Bulkhead
Getting started with resilience4j-bulkhead
Introduction
Resilience4j provides two implementations of the bulkhead pattern that can be used to limit the number of concurrent executions:
- a SemaphoreBulkhead, which uses semaphores
- a FixedThreadPoolBulkhead, which uses a bounded queue and a fixed thread pool.
The SemaphoreBulkhead should work well across a variety of threading and I/O models. It is based on a semaphore and, unlike Hystrix, does not provide a "shadow" thread pool option. It is up to the client to size its thread pools consistently with the bulkhead configuration.
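To make the semaphore idea concrete, here is a minimal sketch that uses only the JDK. The class SemaphoreBulkheadSketch and its methods are hypothetical names made up for this illustration; they are not the Resilience4j implementation and only approximate its behaviour: a call runs if a permit can be obtained within the maximum wait time and is rejected otherwise.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Resilience4j.
final class SemaphoreBulkheadSketch {
    private final Semaphore permits;
    private final Duration maxWait;

    SemaphoreBulkheadSketch(int maxConcurrentCalls, Duration maxWait) {
        this.permits = new Semaphore(maxConcurrentCalls, true);
        this.maxWait = maxWait;
    }

    // Runs the call if a permit becomes available within maxWait; otherwise rejects it.
    <T> T execute(Supplier<T> call) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(maxWait.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a permit", e);
        }
        if (!acquired) {
            throw new IllegalStateException("Bulkhead is full");
        }
        try {
            return call.get();
        } finally {
            permits.release(); // always hand the permit back, even if the call throws
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        SemaphoreBulkheadSketch bulkhead = new SemaphoreBulkheadSketch(1, Duration.ZERO);
        // The outer call holds the only permit, so the nested call is rejected.
        String result = bulkhead.execute(() -> {
            try {
                return bulkhead.execute(() -> "inner");
            } catch (IllegalStateException rejected) {
                return "inner call rejected";
            }
        });
        System.out.println(result);                      // prints "inner call rejected"
        System.out.println(bulkhead.availablePermits()); // prints 1
    }
}
```

Because the calling thread itself runs the work, nothing in this approach limits how many threads exist; that is why thread pool sizing stays the caller's responsibility.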
Create a BulkheadRegistry
Just like the CircuitBreaker module, this module provides an in-memory BulkheadRegistry and a ThreadPoolBulkheadRegistry, which you can use to manage (create and retrieve) Bulkhead instances.
BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry =
    ThreadPoolBulkheadRegistry.ofDefaults();
Create and configure a Bulkhead
You can provide a custom global BulkheadConfig, which you create with the BulkheadConfig builder. The builder lets you configure the following properties.
Config property | Default value | Description |
---|---|---|
maxConcurrentCalls | 25 | Max amount of parallel executions allowed by the bulkhead |
maxWaitDuration | 0 | Max amount of time a thread should be blocked for when attempting to enter a saturated bulkhead. |
// Create a custom configuration for a Bulkhead
BulkheadConfig config = BulkheadConfig.custom()
    .maxConcurrentCalls(150)
    .maxWaitDuration(Duration.ofMillis(500))
    .build();
// Create a BulkheadRegistry with a custom global configuration
BulkheadRegistry registry = BulkheadRegistry.of(config);
// Get or create a Bulkhead from the registry -
// bulkhead will be backed by the default config
Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");
// Get or create a Bulkhead from the registry,
// use a custom configuration when creating the bulkhead
BulkheadConfig custom = BulkheadConfig.custom()
    .maxConcurrentCalls(10) // illustrative value
    .build();
Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
Create and configure a ThreadPoolBulkhead
You can provide a custom global ThreadPoolBulkheadConfig, which you create with the ThreadPoolBulkheadConfig builder. The builder lets you configure the following properties.
Config property | Default value | Description |
---|---|---|
maxThreadPoolSize | Runtime.getRuntime().availableProcessors() | Configures the max thread pool size. |
coreThreadPoolSize | Runtime.getRuntime().availableProcessors() - 1 | Configures the core thread pool size. |
queueCapacity | 100 | Configures the capacity of the queue. |
keepAliveDuration | 20 [ms] | When the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating. |
writableStackTraceEnabled | true | Output the stack trace error when a bulkhead exception is thrown. If false, output a single line with bulkhead exception. |
// Create a custom configuration for a ThreadPoolBulkhead
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
    .maxThreadPoolSize(10)
    .coreThreadPoolSize(2)
    .queueCapacity(20)
    .build();
// Create a ThreadPoolBulkheadRegistry with a custom global configuration
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
// Get or create a ThreadPoolBulkhead from the registry -
// bulkhead will be backed by the default config
ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");
// Get or create a ThreadPoolBulkhead from the registry,
// use a custom configuration when creating the bulkhead
ThreadPoolBulkheadConfig custom = ThreadPoolBulkheadConfig.custom()
    .maxThreadPoolSize(5)
    .build();
ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
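To see how a core size, a maximum size and a bounded queue interact, the following sketch uses a plain java.util.concurrent.ThreadPoolExecutor. Mapping the configuration properties onto the executor's constructor arguments is an assumption made for illustration, and ThreadPoolBulkheadSketch and countRejected are hypothetical names; this is not a description of the library's internals. With a standard ThreadPoolExecutor, work is rejected once all threads are busy and the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of Resilience4j.
final class ThreadPoolBulkheadSketch {

    // Submits `tasks` blocking tasks and returns how many of them were rejected.
    static int countRejected(int coreSize, int maxSize, int queueCapacity, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                coreSize, maxSize,
                20, TimeUnit.MILLISECONDS,                // keep-alive for idle threads above core size
                new ArrayBlockingQueue<>(queueCapacity)); // bounded queue
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads busy and queue full
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 threads + 2 queue slots can hold 4 tasks, so 2 of 6 are rejected.
        System.out.println(countRejected(1, 2, 2, 6)); // prints 2
    }
}
```

Note that a standard ThreadPoolExecutor only grows beyond its core size after the queue is full, which is worth keeping in mind when choosing the core and max values.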
Decorate and execute a functional interface
Just like CircuitBreaker, Bulkhead provides a range of higher-order decorator functions. You can decorate any Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer or CompletionStage with a Bulkhead.
// Given
Bulkhead bulkhead = Bulkhead.of("name", config);
// When I decorate my function
CheckedFunction0<String> decoratedSupplier = Bulkhead
    .decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");
// and chain another function with map
Try<String> result = Try.of(decoratedSupplier)
    .map(value -> value + " world'");
// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
// A ThreadPoolBulkhead runs the call on its own pool and returns a CompletionStage
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
    .maxThreadPoolSize(10)
    .coreThreadPoolSize(2)
    .queueCapacity(20)
    .build();
ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("name", config);
CompletionStage<String> result = bulkhead
    .executeSupplier(backendService::doSomething);
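The decoration idea itself can be sketched with a plain Semaphore and a Supplier. The class DecoratorSketch and its decorateSupplier method are hypothetical and only mimic the shape of the decorator functions described above. The point to notice is that decorating does not run anything: a permit is taken only while the returned function is being invoked.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Resilience4j.
final class DecoratorSketch {

    // Returns a new Supplier that holds a permit for the duration of each invocation.
    static <T> Supplier<T> decorateSupplier(Semaphore permits, Supplier<T> supplier) {
        return () -> {
            if (!permits.tryAcquire()) {
                throw new IllegalStateException("Bulkhead is full");
            }
            try {
                return supplier.get();
            } finally {
                permits.release();
            }
        };
    }

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(1);
        Supplier<String> decorated = decorateSupplier(permits, () -> "Hello");
        // Decorating alone takes no permit.
        System.out.println(permits.availablePermits()); // prints 1
        System.out.println(decorated.get() + " world"); // prints "Hello world"
        // The permit was released once the call finished.
        System.out.println(permits.availablePermits()); // prints 1
    }
}
```

Since the result is an ordinary Supplier, it can be passed to any code that expects one, which is what makes this style easy to combine with other decorators.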
Consume emitted RegistryEvents
You can register an event consumer on a BulkheadRegistry and take action whenever a Bulkhead is created, replaced or deleted.
BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
registry.getEventPublisher()
    .onEntryAdded(entryAddedEvent -> {
        Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
        LOG.info("Bulkhead {} added", addedBulkhead.getName());
    })
    .onEntryRemoved(entryRemovedEvent -> {
        Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
        LOG.info("Bulkhead {} removed", removedBulkhead.getName());
    });
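As a rough mental model of a get-or-create registry that notifies consumers, consider the following JDK-only sketch. RegistrySketch and all of its members are hypothetical names for this illustration, and the entries are plain semaphores rather than real Bulkhead instances. It is assumed here that an "entry added" notification fires only when a name is seen for the first time, not on later lookups of the same name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Resilience4j.
final class RegistrySketch {
    private final ConcurrentMap<String, Semaphore> entries = new ConcurrentHashMap<>();
    private final List<Consumer<String>> entryAddedConsumers = new CopyOnWriteArrayList<>();

    // Registers a consumer that receives the name of every newly added entry.
    void onEntryAdded(Consumer<String> consumer) {
        entryAddedConsumers.add(consumer);
    }

    // Get-or-create: consumers are notified only when the name was not present yet.
    Semaphore bulkhead(String name, int maxConcurrentCalls) {
        Semaphore created = new Semaphore(maxConcurrentCalls);
        Semaphore existing = entries.putIfAbsent(name, created);
        if (existing != null) {
            return existing;
        }
        entryAddedConsumers.forEach(consumer -> consumer.accept(name));
        return created;
    }

    // Collects the names reported to an entry-added consumer.
    static List<String> demo() {
        RegistrySketch registry = new RegistrySketch();
        List<String> added = new ArrayList<>();
        registry.onEntryAdded(added::add);
        registry.bulkhead("name1", 25);
        registry.bulkhead("name1", 25); // existing entry, no notification
        registry.bulkhead("name2", 25);
        return added;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [name1, name2]
    }
}
```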
Consume emitted BulkheadEvents
The Bulkhead emits a stream of BulkheadEvents. Three types of events are emitted: permitted execution, rejected execution and finished execution. To consume these events, register an event consumer.
bulkhead.getEventPublisher()
    .onCallPermitted(event -> logger.info(...))
    .onCallRejected(event -> logger.info(...))
    .onCallFinished(event -> logger.info(...));