Kotlin
Getting started with resilience4j-kotlin
Introduction
Integration for Kotlin coroutines. This module adds extensions for executing and decorating suspend
functions as well as custom operators for the coroutine reactive primitive Flow.
Extension functions that accept Kotlin suspend functions are provided for RateLimiter, Retry, CircuitBreaker, TimeLimiter, and semaphore-based Bulkheads. No extensions for ThreadPoolBulkhead or Caches are currently provided.
For Flow, there are operators provided for RateLimiter, Retry, CircuitBreaker, TimeLimiter, and semaphore-based Bulkheads. No extensions for ThreadPoolBulkhead or Caches are currently provided.
Setup
Add the Kotlin module of Resilience4j to your compile dependency, along with whichever core modules are needed:
repositories {
jCenter()
}
dependencies {
compile "io.github.resilience4j:resilience4j-kotlin:${resilience4jVersion}"
// also have a dependency on the core module(s) needed - for example, retry:
compile "io.github.resilience4j:resilience4j-retry:${resilience4jVersion}"
}
Usage
Two extension functions are declared for each of CircuitBreaker
, RateLimiter
, Retry
, and TimeLimiter
: one to execute a suspend function, and one to decorate a suspend function.
Usage - Suspending Functions
Two extension functions are declared for each of CircuitBreaker
, RateLimiter
, Retry
, and TimeLimiter
: one to execute a suspend function, and one to decorate a suspend function.
val circuitBreaker = CircuitBreaker.ofDefaults()
val result = circuitBreaker.executeSuspendFunction {
// call suspending functions here
}
val function = circuitBreaker.decorateSuspendFunction {
// call suspending functions here
}
val result = function()
The suspend functions suspend where usage of the normal methods would block. For example, calls to a RateLimiter
which need to be delayed to fit within the rate limit suspend before the given function is executed.
No changes are made to the coroutine context of the given suspend functions.
Usage - Flow
There are extensions for Flow<T>
defined for each of CircuitBreaker
, RateLimiter
, Retry
, and TimeLimiter
. The order in which operators are chain is important and directly correlates to the order in which resilience primitives are evaluated. Considering the following example, flowOf(1).retry(...).timeLimiter(...)
, all of the retry attempts must completed within the duration of the supplied TimeLimiter.
val retry = Retry.ofDefaults()
val rateLimiter = RateLimiter.ofDefaults()
val timeLimiter = TimeLimiter.ofDefaults()
val circuitBreaker = CircuitBreaker.ofDefaults()
flowOf(1, 2, 3)
.retry(retry)
.rateLimiter(rateLimiter)
.timeLimiter(timeLimiter)
.circuitBreaker(circuitBreaker)
.collect { println(it) }
Bulkhead
Decorating a suspend function or Flow with a Bulkhead
special attention needs to be paid to configuration value of maxWaitTime
. If maxWaitTime
is non-zero, the call will block until the max wait time is reached or permission is obtained. This blocking is restricted to the dispatcher Dispatchers.IO
. Although this helps mitigate some of the performance impact introduced by the usage of the blocking API, it is not recommended to use this extension function with Bulkheads with non-zero max wait times.
If the coroutine scope is cancelled, either normally or exceptionally, the acquired permission will be released. After which no success or failure events will be recorded.
No extension functions for thread-pool based bulkheads are provided.
CircuitBreaker
Decorating a suspend function or Flow with a CircuitBreaker
does not add any additional suspension points. In the case of decorated Flows, attempts to collect will throw a CallNotPermittedException
if the circuit breaker state is OPEN
.
If the coroutine scope is cancelled, either normally or exceptionally, the acquired permission will be released. After which no success or failure events will be recorded.
Rate Limiter
Suspend functions decorated with a RateLimiter
use delay()
in order to suspend before executing the decorated function if the rate limit has been reached.
Flows decorated with a RateLimiter
will invoke delay()
on attempts to collect if the rate limit has been reached.
Retry
Suspend functions and Flows decorated with a Retry
use delay()
in order to suspend between retries.
Time Limiter
The TimeLimiter
extension functions simply uses withTimeout()
from kotlinx-coroutines
, using the timeout from the receiver's configuration. Specifically, this means:
- On timeout, a
TimeoutCancellationException
is raised, rather than aTimeoutException
as with methods for non-suspending functions. - When a timeout occurs, the coroutine is cancelled, rather than the thread being interrupted as with methods for non-suspending functions.
- After the timeout, the given block can only be stopped at a cancellable suspending function call.
- The
cancelRunningFuture
configuration setting is ignored - on timeout, the suspend function is always cancelled even if thecancelRunningFuture
is set tofalse
.
Flows decorated with a TimeLimiter
will ensure that the entire Flow is consumed within the configured limit once collection has begun.
Updated almost 5 years ago