HomeGuidesChangelog
GuidesGitHubLog In

Bulkhead

Getting started with resilience4j-bulkhead

Introduction

Resilience4j provides two implementations of a bulkhead pattern that can be used to limit the number of concurrent execution:

  • a SemaphoreBulkhead which uses Semaphores
  • a FixedThreadPoolBulkhead which uses a bounded queue and a fixed thread pool.

The SemaphoreBulkheadshould work well across a variety of threading and I/O models. It is based on a semaphore, and unlike Hystrix, does not provide "shadow" thread pool option. It is up to the client to ensure correct thread pool sizing that will be consistent with bulkhead configuration.

Create a BulkheadRegistry

Just like the CircuitBreaker module, this module provides an in-memory BulkheadRegistry and a ThreadPoolBulkheadRegistry which you can use to manage (create and retrieve) Bulkhead instances.

BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();

ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = 
  ThreadPoolBulkheadRegistry.ofDefaults();

Create and configure a Bulkhead

You can provide a custom global BulkheadConfig. In order to create a custom global BulkheadConfig, you can use the BulkheadConfig builder. You can use the builder to configure the following properties.

Config propertyDefault valueDescription
maxConcurrentCalls25Max amount of parallel executions allowed by the bulkhead
maxWaitDuration0Max amount of time a thread should be blocked for when attempting to enter a saturated bulkhead.
// Create a custom configuration for a Bulkhead
BulkheadConfig config = BulkheadConfig.custom()
  	.maxConcurrentCalls(150)
  	.maxWaitDuration(Duration.ofMillis(500))
  	.build();

// Create a BulkheadRegistry with a custom global configuration
BulkheadRegistry registry = BulkheadRegistry.of(config);

// Get or create a Bulkhead from the registry - 
// bulkhead will be backed by the default config
Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");

// Get or create a Bulkhead from the registry, 
// use a custom configuration when creating the bulkhead
Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);

Create and configure a ThreadPoolBulkhead

You can provide a custom global ThreadPoolBulkheadConfig. In order to create a custom global ThreadPoolBulkheadConfig, you can use the ThreadPoolBulkheadConfig builder. You can use the builder to configure the following properties.

Config propertyDefault valueDescription
maxThreadPoolSizeRuntime.getRuntime()
.availableProcessors()
Configures the max thread pool size.
coreThreadPoolSizeRuntime.getRuntime()
.availableProcessors() - 1
Configures the core thread pool size
queueCapacity100Configures the capacity of the queue.
keepAliveDuration20 [ms]When the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(10)
  .coreThreadPoolSize(2)
  .queueCapacity(20)
  .build();
        
// Create a BulkheadRegistry with a custom global configuration
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);

// Get or create a ThreadPoolBulkhead from the registry - 
// bulkhead will be backed by the default config
ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");

// Get or create a Bulkhead from the registry, 
// use a custom configuration when creating the bulkhead
ThreadPoolBulkheadConfig custom = BulkheadConfig.custom()
  .maxThreadPoolSize(5)
  .build();

ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);

Decorate and execute a functional interface

As you can guess Bulkhead has all sort of higher order decorator functions just like CircuitBreaker. You can decorate any Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer or CompletionStage with a Bulkhead.

// Given
Bulkhead bulkhead = Bulkhead.of("name", config);

// When I decorate my function
CheckedFunction0<String> decoratedSupplier = Bulkhead
  .decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");

// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
  .map(value -> value + " world'");

// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
    .maxThreadPoolSize(10)
    .coreThreadPoolSize(2)
    .queueCapacity(20)
    .build();

ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("name", config);

CompletionStage<String> supplier = ThreadPoolBulkhead
    .executeSupplier(bulkhead, backendService::doSomething);

Consume emitted RegistryEvents

You can register event consumer on a BulkheadRegistry and take actions whenever a Bulkhead is created, replaced or deleted.

BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
registry.getEventPublisher()
  .onEntryAdded(entryAddedEvent -> {
    Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
    LOG.info("Bulkhead {} added", addedBulkhead.getName());
  })
  .onEntryRemoved(entryRemovedEvent -> {
    Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
    LOG.info("Bulkhead {} removed", removedBulkhead.getName());
  });

Consume emitted BulkheadEvents

The BulkHead emits a stream of BulkHeadEvents. There are two types of events emitted: permitted execution, rejected execution & finished execution. If you want to consume these events, you have to register an event consumer.

bulkhead.getEventPublisher()
    .onCallPermitted(event -> logger.info(...))
    .onCallRejected(event -> logger.info(...))
    .onCallFinished(event -> logger.info(...));

What’s Next