dev-resources.site

Handle backpressure between Kafka and a database with Vert.x

Published at
8/6/2020
Categories
vertx
backpressure
java
async
Author
ndrbrt_23

As we already discussed in the past, asynchronous programming brings many advantages when developing reactive and responsive applications. However, it also carries drawbacks and challenges, and one of the main ones is the backpressure problem.

What is backpressure?

In physics, it is "a resistance or force opposing the desired flow of fluid through pipes" (Wikipedia).

We can translate the problem to a well-known scenario: persisting messages polled from a bus. There is a huge number of messages on the bus and our application polls them really fast, but the database underneath is really slow.

[Image: real world example]

How can the funnel overflow be avoided?

In a synchronous scenario there is no backpressure issue: the synchronous nature of the computation blocks the polling from the bus until the current message has been processed.
In the async world, however, polling is executed with no clue about what is happening on the database. So if the database can't keep up with the messages coming from the bus, the messages remain "in between", that is, in the memory of our service.
This can lead to failures or, at worst, to a crash of the service.
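
To make the buildup concrete, here is a minimal sketch that uses only the JDK instead of Vert.x: a single-threaded executor stands in for the slow database and a counter tracks how many messages are held in memory at once. The class `InFlightDemo`, the method `peakInFlight` and the 20 ms delay are assumptions made for illustration, not part of the original service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class InFlightDemo {

  // Hands `messages` messages to a slow single-threaded "database" and returns
  // the highest number of messages that were held in memory at the same time.
  static int peakInFlight(int messages, boolean waitForEachWrite) {
    ExecutorService slowDb = Executors.newSingleThreadExecutor();
    AtomicInteger inFlight = new AtomicInteger();
    List<CompletableFuture<Void>> writes = new ArrayList<>();
    int peak = 0;
    try {
      for (int i = 0; i < messages; i++) {
        // "Poll" one message: from now on it lives in our memory.
        peak = Math.max(peak, inFlight.incrementAndGet());
        CompletableFuture<Void> write = CompletableFuture.runAsync(() -> {
          sleepQuietly(20);            // simulated slow insert
          inFlight.decrementAndGet();  // persisted, memory released
        }, slowDb);
        if (waitForEachWrite) {
          write.join(); // synchronous style: block before polling again
        }
        writes.add(write);
      }
      // Wait for the remaining writes before reporting.
      CompletableFuture.allOf(writes.toArray(new CompletableFuture[0])).join();
    } finally {
      slowDb.shutdown();
    }
    return peak;
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println("sync peak:  " + peakInFlight(20, true));
    System.out.println("async peak: " + peakInFlight(20, false));
  }
}
```

In the synchronous variant the peak is always one message. In the asynchronous variant the loop finishes long before the first write does, so the peak is typically close to the total number of messages, which is exactly the memory growth described above.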

Let's try to develop an application that persists messages in a database, and make it evolve to handle backpressure.

Automatic polling

At first, our verticle will perform these operations:

  • initialize the JDBC client
  • initialize the Kafka client
  • subscribe to the topic
  • persist the records

The code is quite simple, and it works well with small amounts of messages.
When the load gets bigger and bigger a problem appears: using the handler of the Vert.x Kafka consumer means there is no control over the message rate, so it polls continuously without considering the persistence rate, causing memory overload.



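import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise)
      // the handler is invoked for every record, as fast as the consumer can poll
      .handler(record -> {
        persist(jdbc, record)
          .onSuccess(result -> System.out.println("Message persisted"))
          .onFailure(cause -> System.err.println("Message not persisted " + cause));
      });
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Note: subscribing to a topic normally also requires a group.id
    // (GROUP_ID_CONFIG), which this listing omits.
    return config;
  }

  // Runs the SQL statement asynchronously and exposes its outcome as a Future
  private Future<UpdateResult> persist(JDBCClient jdbc, KafkaConsumerRecord<String, String> record) {
    Promise<UpdateResult> promise = Promise.promise();
    JsonArray params = toParams(record);
    jdbc.updateWithParams("insert or update query to persist record", params, promise);
    return promise.future();
  }

  private JsonObject datasourceConfiguration() {
    // TODO datasource configuration
    return null;
  }

  private JsonArray toParams(KafkaConsumerRecord<String, String> record) {
    // TODO: convert the record into params for the sql command
    return null;
  }
}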

Explicit polling

To handle backpressure we need explicit polling: instead of setting a handler on the Kafka consumer, we call poll manually (in the following case, with a timeout of 100 ms).
With this approach, each poll is performed only after the whole batch of previously polled messages has been persisted.
We achieve this by collecting the future of every persist operation with CompositeFuture.all, which succeeds only when all of them have succeeded; only then is the next poll issued.
If at least one of the futures fails, the composite future fails and polling stops.
There are various solutions that can be adopted to make the service handle the failure, e.g. sending the message to a Dead Letter Queue, but we will not cover this case.

The problem with this code is that a message that fails to persist is lost: the consumer is set to auto-commit, so the topic offset is committed automatically rather than by our code, whether or not the message was persisted.



public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        // start one write per polled record
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        // succeeds only when every write of the batch has succeeded
        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .onSuccess(composite -> {
        System.out.println("All messages persisted");
        // poll again only now that the whole batch is persisted
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting messages: " + cause));
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  ...
}
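
The gating idea of this section (poll a batch, wait for every write, then poll again) can also be tried without Kafka or a database. The following is a minimal sketch that uses only the JDK: `CompletableFuture.allOf` plays the role of `CompositeFuture.all`, an in-memory queue stands in for the topic and a thread pool for the database. All names (`BatchGatedPolling`, `pollLoop`, `peakInFlight`) are hypothetical, and unlike the real consumer the loop simply ends when the queue is drained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BatchGatedPolling {
  private final Queue<String> topic;  // stands in for the Kafka topic
  private final ExecutorService db;   // stands in for the database
  private final int maxBatch;
  private final AtomicInteger inFlight = new AtomicInteger();
  final AtomicInteger peakInFlight = new AtomicInteger();
  final AtomicInteger persisted = new AtomicInteger();

  BatchGatedPolling(List<String> messages, int maxBatch, ExecutorService db) {
    this.topic = new ConcurrentLinkedQueue<>(messages);
    this.maxBatch = maxBatch;
    this.db = db;
  }

  // Polls one batch, starts all its writes, and polls the next batch only
  // after every write of the current one has completed successfully.
  CompletableFuture<Void> pollLoop() {
    List<CompletableFuture<Void>> writes = new ArrayList<>();
    String message;
    while (writes.size() < maxBatch && (message = topic.poll()) != null) {
      writes.add(persist(message));
    }
    if (writes.isEmpty()) {
      return CompletableFuture.completedFuture(null); // nothing left to poll
    }
    // If any write fails, allOf fails and the next poll is never issued.
    return CompletableFuture.allOf(writes.toArray(new CompletableFuture[0]))
        .thenCompose(done -> pollLoop());
  }

  private CompletableFuture<Void> persist(String message) {
    peakInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
    return CompletableFuture.runAsync(() -> {
      // A real implementation would write `message` to the database here.
      persisted.incrementAndGet();
      inFlight.decrementAndGet();
    }, db);
  }

  public static void main(String[] args) {
    ExecutorService db = Executors.newFixedThreadPool(4);
    List<String> messages = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
      messages.add("message-" + i);
    }
    BatchGatedPolling poller = new BatchGatedPolling(messages, 10, db);
    poller.pollLoop().join();
    db.shutdown();
    System.out.println("persisted=" + poller.persisted.get()
        + ", peak in flight=" + poller.peakInFlight.get());
  }
}
```

Because a new batch is requested only after the previous one has been fully written, the number of messages held in memory can never exceed the batch size, whatever the speed of the database.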

Manual commit

With the ENABLE_AUTO_COMMIT_CONFIG property set to false, the service takes ownership of the topic offset commit.
The commit is performed only when every message of the batch has been persisted; with this trick, at-least-once delivery is achieved.



public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      // commit the offsets only after the whole batch has been persisted
      .compose(composite -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    // disable auto-commit: offsets are committed explicitly in poll()
    config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }
  ...
}
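
Why committing after persisting gives at-least-once delivery can be shown with a small simulation. This is a sketch under stated assumptions, not the Kafka or Vert.x API: a list stands in for a topic partition, an `int` for the committed offset, and the hypothetical `consumeBatch` method blocks on the writes for brevity instead of composing futures. The class name `CommitAfterPersist` and the failing message `"b"` are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

class CommitAfterPersist {
  private final List<String> log;   // stands in for a topic partition
  private int committedOffset = 0;  // stands in for the offset stored by Kafka

  CommitAfterPersist(List<String> log) {
    this.log = log;
  }

  int committedOffset() {
    return committedOffset;
  }

  // Reads up to maxBatch messages starting at the committed offset and commits
  // only if every write succeeded. Returns true when the batch was committed.
  boolean consumeBatch(int maxBatch, Function<String, CompletableFuture<Void>> persist) {
    int end = Math.min(log.size(), committedOffset + maxBatch);
    List<CompletableFuture<Void>> writes = new ArrayList<>();
    for (String message : log.subList(committedOffset, end)) {
      writes.add(persist.apply(message));
    }
    try {
      CompletableFuture.allOf(writes.toArray(new CompletableFuture[0])).join();
    } catch (CompletionException e) {
      return false; // offset untouched: the same batch will be read again
    }
    committedOffset = end; // "commit" only after all writes succeeded
    return true;
  }

  public static void main(String[] args) {
    Set<String> store = ConcurrentHashMap.newKeySet(); // idempotent "upsert" target
    AtomicBoolean failOnce = new AtomicBoolean(true);
    // Simulated write that fails the first time it sees message "b".
    Function<String, CompletableFuture<Void>> persist = message ->
        CompletableFuture.runAsync(() -> {
          if (message.equals("b") && failOnce.getAndSet(false)) {
            throw new IllegalStateException("database unavailable");
          }
          store.add(message);
        });

    CommitAfterPersist consumer = new CommitAfterPersist(Arrays.asList("a", "b", "c"));
    // prints "false offset=0": one write failed, nothing was committed
    System.out.println(consumer.consumeBatch(10, persist) + " offset=" + consumer.committedOffset());
    // prints "true offset=3": the whole batch was re-read and persisted
    System.out.println(consumer.consumeBatch(10, persist) + " offset=" + consumer.committedOffset());
  }
}
```

Note that on the second attempt the messages that had already been written are written again. At-least-once delivery means duplicates are possible, which is why an idempotent statement such as an "insert or update" query is a good fit for the persist step.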

Bonus feature: achieve ordering

With a little effort it's possible to achieve ordering:
future composition lets every persist operation wait for the completion of the previous one.
This is done by chaining the async computations one after another, so each one is executed only when the previous future succeeds.
This is a smart pattern to use whenever operations must be executed sequentially.



public class MainVerticle extends AbstractVerticle {
  ...
  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      // chain the writes: each persist starts only after the previous one succeeded
      .compose(records -> IntStream.range(0, records.size())
        .mapToObj(records::recordAt)
        .reduce(Future.<UpdateResult>succeededFuture(),
          (acc, record) -> acc.compose(it -> persist(jdbc, record)),
          (a, b) -> a // combiner, not used because the stream is sequential
        )
      )
      // lastResult is the outcome of the last write of the chain
      .compose(lastResult -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }
  ...
}
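
The chaining pattern is not specific to Vert.x futures. As an illustration, here is a minimal sketch of the same idea with the JDK's `CompletableFuture`, where `thenCompose` plays the role of `compose`. The names `SequentialPersist`, `persistInOrder` and `completionOrder` are hypothetical, and the simulated writes just sleep for the given number of milliseconds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

class SequentialPersist {

  // Chains the writes so that each one starts only after the previous one
  // succeeded. If a write fails, the remaining ones are never started.
  static <T> CompletableFuture<Void> persistInOrder(
      List<T> records, Function<T, CompletableFuture<Void>> persist) {
    CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
    for (T record : records) {
      chain = chain.thenCompose(ignored -> persist.apply(record));
    }
    return chain;
  }

  // Runs simulated writes whose latency is given in milliseconds and returns
  // the order in which they actually completed.
  static List<Integer> completionOrder(List<Integer> delaysMillis) {
    List<Integer> completed = new CopyOnWriteArrayList<>();
    persistInOrder(delaysMillis, delay -> CompletableFuture.runAsync(() -> {
      try {
        Thread.sleep(delay);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      completed.add(delay);
    })).join();
    return completed;
  }

  public static void main(String[] args) {
    // The slowest write comes first, yet the input order is preserved.
    System.out.println(completionOrder(Arrays.asList(30, 20, 10))); // prints [30, 20, 10]
  }
}
```

A plain loop is used here instead of the three-argument `reduce`, so no combiner function is needed. The trade-off is the same as in the Vert.x version: ordering is guaranteed, but the writes of a batch no longer run concurrently.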

Conclusion

Backpressure is a fundamental topic to cover when working with async programming.
It does not come for free out of the Vert.x box, but it can be achieved with some simple tricks.
