resilience4j

Resilience4j is a fault tolerance library for Java™

Resilience4j is a lightweight fault tolerance library inspired by Netflix Hystrix, but designed for functional programming. Lightweight, because the library only uses Vavr, which does not have any other external library dependencies. Netflix Hystrix, in contrast, has a compile dependency to Archaius which has many more external library dependencies such as Guava and Apache Commons Configuration.

Resilience4j provides higher-order functions (decorators) to enhance any functional interface, lambda expression or method reference with a Circuit Breaker, Rate Limiter, Retry or Bulkhead. You can stack more than one decorator on any functional interface, lambda expression or method reference. The advantage is that you have the choice to select the decorators you need and nothing else.

Supplier<String> supplier = () -> backendService.doSomething(param1, param2);

Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
  .withRetry(Retry.ofDefaults("name"))
  .withCircuitBreaker(CircuitBreaker.ofDefaults("name"))
  .withBulkhead(Bulkhead.ofDefaults("name"));  

String result = Try.ofSupplier(decoratedSupplier)
  .recover(throwable -> "Hello from Recovery").get();

With Resilience4j you don’t have to go all-in, you can pick what you need.

Get Started

Bulkhead

Getting started with resilience4j-bulkhead

Introduction

Resilience4j provides two implementations of a bulkhead pattern that can be used to limit the number of concurrent execution:

  • a SemaphoreBulkhead which uses Semaphores
  • a FixedThreadPoolBulkhead which uses a bounded queue and a fixed thread pool.

The SemaphoreBulkheadshould work well across a variety of threading and I/O models. It is based on a semaphore, and unlike Hystrix, does not provide "shadow" thread pool option. It is up to the client to ensure correct thread pool sizing that will be consistent with bulkhead configuration.

Create a BulkheadRegistry

Just like the CircuitBreaker module, this module provides an in-memory BulkheadRegistry and a ThreadPoolBulkheadRegistry which you can use to manage (create and retrieve) Bulkhead instances.

BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();

ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = 
  ThreadPoolBulkheadRegistry.ofDefaults();

Create and configure a Bulkhead

You can provide a custom global BulkheadConfig. In order to create a custom global BulkheadConfig, you can use the BulkheadConfig builder. You can use the builder to configure the following properties.

Config property
Default value
Description

maxConcurrentCalls

25

Max amount of parallel executions allowed by the bulkhead

maxWaitDuration

0

Max amount of time a thread should be blocked for when attempting to enter a saturated bulkhead.

// Create a custom configuration for a Bulkhead
BulkheadConfig config = BulkheadConfig.custom()
  	.maxConcurrentCalls(150)
  	.maxWaitDuration(Duration.ofMillis(500))
  	.build();

// Create a BulkheadRegistry with a custom global configuration
BulkheadRegistry registry = BulkheadRegistry.of(config);

// Get or create a Bulkhead from the registry - 
// bulkhead will be backed by the default config
Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");

// Get or create a Bulkhead from the registry, 
// use a custom configuration when creating the bulkhead
Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);

Create and configure a ThreadPoolBulkhead

You can provide a custom global ThreadPoolBulkheadConfig. In order to create a custom global ThreadPoolBulkheadConfig, you can use the ThreadPoolBulkheadConfig builder. You can use the builder to configure the following properties.

Config property
Default value
Description

maxThreadPoolSize

Runtime.getRuntime()
.availableProcessors()

Configures the max thread pool size.

coreThreadPoolSize

Runtime.getRuntime()
.availableProcessors() - 1

Configures the core thread pool size

queueCapacity

100

Configures the capacity of the queue.

keepAliveDuration

20 [ms]

When the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.

ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(10)
  .coreThreadPoolSize(2)
  .queueCapacity(20)
  .build();
        
// Create a BulkheadRegistry with a custom global configuration
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);

// Get or create a ThreadPoolBulkhead from the registry - 
// bulkhead will be backed by the default config
ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");

// Get or create a Bulkhead from the registry, 
// use a custom configuration when creating the bulkhead
ThreadPoolBulkheadConfig custom = BulkheadConfig.custom()
  .maxThreadPoolSize(5)
  .build();

ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);

Decorate and execute a functional interface

As you can guess Bulkhead has all sort of higher order decorator functions just like CircuitBreaker. You can decorate any Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer or CompletionStage with a Bulkhead.

// Given
Bulkhead bulkhead = Bulkhead.of("name", config);

// When I decorate my function
CheckedFunction0<String> decoratedSupplier = Bulkhead
  .decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");

// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
  .map(value -> value + " world'");

// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
    .maxThreadPoolSize(10)
    .coreThreadPoolSize(2)
    .queueCapacity(20)
    .build();

ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("name", config);

CompletionStage<String> supplier = ThreadPoolBulkhead
    .executeSupplier(bulkhead, backendService::doSomething);

Consume emitted RegistryEvents

You can register event consumer on a BulkheadRegistry and take actions whenever a Bulkhead is created, replaced or deleted.

BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
registry.getEventPublisher()
  .onEntryAdded(entryAddedEvent -> {
    Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
    LOG.info("Bulkhead {} added", addedBulkhead.getName());
  })
  .onEntryRemoved(entryRemovedEvent -> {
    Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
    LOG.info("Bulkhead {} removed", removedBulkhead.getName());
  });

Consume emitted BulkheadEvents

The BulkHead emits a stream of BulkHeadEvents. There are two types of events emitted: permitted execution, rejected execution & finished execution. If you want to consume these events, you have to register an event consumer.

bulkhead.getEventPublisher()
    .onCallPermitted(event -> logger.info(...))
    .onCallRejected(event -> logger.info(...))
    .onCallFinished(event -> logger.info(...));

What's Next

RateLimiter

Bulkhead


Getting started with resilience4j-bulkhead

Suggested Edits are limited on API Reference Pages

You can only suggest edits to Markdown body content, but not to the API spec.