
Alpakka Kafka and Clojure

Published: 6/26/2023
Categories: clojure, kafka, akka, alpakka
Author: fr33m0nk

Kafka Streams is an amazing abstraction for working with Kafka and has excellent JVM support. However, if a stream application performs I/O operations, it may not be the right fit. Alpakka Kafka uses flow-control (back-pressure) optimizations that are well suited for creating streams in which I/O operations take place. For a detailed comparison of Kafka Streams and Alpakka Kafka, refer to this Medium story.

Alpakka Kafka uses Akka under the hood and offers rich Scala and Java APIs. The Clojure story, however, is pretty lackluster: using Alpakka Kafka from Clojure essentially means doing a whole lot of interop with its Java API.
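
For a sense of what that interop looks like, here is a rough sketch of building a committable source directly against the Java API (a hypothetical illustration, not code from the wrapper; it assumes an akka.actor.ActorSystem named system is already in scope):

    (import '(akka.kafka ConsumerSettings Subscriptions)
            '(akka.kafka.javadsl Consumer)
            '(org.apache.kafka.common.serialization StringDeserializer))

    ;; chain the Java builder-style with* calls by hand
    (def committable-source
      (Consumer/committableSource
        (-> (ConsumerSettings/create system (StringDeserializer.) (StringDeserializer.))
            (.withBootstrapServers "localhost:9092")
            (.withGroupId "a-test-consumer"))
        (Subscriptions/topics (into-array String ["testing_stuff"]))))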

I have created a Clojure wrapper that abstracts away the interop with the Alpakka Kafka Java API and lets library consumers use Alpakka Kafka without much hassle.

I will demo setting up an Alpakka Kafka stream using the aforementioned Clojure wrapper library:

  1. Add the required dependencies to the project’s deps.edn.

     net.clojars.fr33m0nk/clj-alpakka-kafka {:mvn/version "0.1.6"}
     org.apache.kafka/kafka-clients {:mvn/version "3.3.2"}
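
     If the project uses Leiningen instead of deps.edn, the equivalent coordinates (same artifacts and versions, vector syntax) would be:

     [net.clojars.fr33m0nk/clj-alpakka-kafka "0.1.6"]
     [org.apache.kafka/kafka-clients "3.3.2"]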
    
  2. Require the needed namespaces and Java classes in the demo namespace alpakka-kafka-demo. Note that clojure.string and java.util.concurrent.CompletableFuture are included here because the code below uses them.

     (ns alpakka-kafka-demo
       (:require
         [clojure.string :as str]
         [fr33m0nk.akka.actor :as actor]
         [fr33m0nk.akka.stream :as s]
         [fr33m0nk.alpakka-kafka.committer :as committer]
         [fr33m0nk.alpakka-kafka.consumer :as consumer]
         [fr33m0nk.alpakka-kafka.producer :as producer]
         [fr33m0nk.utils :as utils])
       (:import
         [java.util.concurrent CompletableFuture]
         [org.apache.kafka.common.serialization StringDeserializer StringSerializer]))
    
  3. We will create a new stream topology.

    1. This stream topology consumes messages from a Kafka topic, transforms them, and publishes the results to another Kafka topic.

        (defn test-stream-with-producer
          [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
          (-> (consumer/->committable-source consumer-settings consumer-topics)
              (s/map-async 2
                           (fn [message]
                             (let [_key (consumer/key message)
                                   value (consumer/value message)
                                   committable-offset (consumer/committable-offset message)
                                   message-to-publish (producer/->producer-record producer-topic (str/upper-case value))]
                               (producer/single-producer-message-envelope committable-offset message-to-publish))))
              (s/to-mat (producer/committable-sink producer-settings committer-settings) consumer/create-draining-control)
              (s/run actor-system)))
      
    2. s/map-async executes the mapping function with a parallelism of 2, i.e. up to 2 messages are processed concurrently

    3. Then we publish the transformed messages to the output topic and commit offsets to Kafka via s/to-mat and producer/committable-sink

    4. Finally, we run the stream with our actor-system using s/run

  4. Let’s create the required pieces: the actor system and the committer, consumer, and producer settings.

     (def actor-system (actor/->actor-system "test-actor-system"))
    
     (def committer-settings (committer/committer-settings actor-system {:batch-size 1}))
    
     (def consumer-settings
       (consumer/consumer-settings actor-system
                                   {:group-id "a-test-consumer"
                                    :bootstrap-servers "localhost:9092"
                                    :key-deserializer (StringDeserializer.)
                                    :value-deserializer (StringDeserializer.)}))

     (def producer-settings
       (producer/producer-settings actor-system
                                   {:bootstrap-servers "localhost:9092"
                                    :key-serializer (StringSerializer.)
                                    :value-serializer (StringSerializer.)}))
    
  5. Let’s run the stream and see it in action

     (def consumer-control (test-stream-with-producer actor-system consumer-settings committer-settings producer-settings ["testing_stuff"] "output-topic"))
    

    Streams in action :D
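
     To actually see messages flow, publish a few records to the input topic with plain kafka-clients interop (a minimal sketch using the KafkaProducer API directly; it reuses the StringSerializer import from step 2 and the input topic testing_stuff from above):

     (import '(org.apache.kafka.clients.producer KafkaProducer ProducerRecord))

     ;; a throwaway producer for seeding the input topic
     (with-open [test-producer (KafkaProducer. {"bootstrap.servers" "localhost:9092"}
                                               (StringSerializer.)
                                               (StringSerializer.))]
       (doseq [value ["hello" "alpakka" "kafka"]]
         ;; deref the returned Future to block until the send is acknowledged
         @(.send test-producer (ProducerRecord. "testing_stuff" value))))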

  6. We can shut this Alpakka Kafka stream down as follows.

     ;; shutdown streams using consumer-control var
     @(consumer/drain-and-shutdown consumer-control
                                   (CompletableFuture/supplyAsync
                                     (utils/->fn0 (fn [] ::done)))
                                   (actor/get-dispatcher actor-system))
    

     utils/->fn0 adapts a zero-arity Clojure function to the java.util.function.Supplier interface via reify.
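
     In plain Clojure, that adapter is roughly equivalent to the following (a sketch of the idea, not the library’s actual implementation):

     ;; a zero-arity Clojure fn surfaced as a java.util.function.Supplier
     (reify java.util.function.Supplier
       (get [_] ::done))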

  7. Finally, we can shut the Akka actor system down as well.

     @(actor/terminate actor-system)
    

I hope this story introduced Alpakka Kafka as a viable alternative to Kafka Streams for Clojure users. I have documented more examples of using Alpakka Kafka with my Clojure wrapper library here.
