
How to Build a Kafka Producer in Rust with Partitioning

Published at: 3/3/2023
Categories: rust, kafka, tracing
Author: schultyy

Hero Image by Askhat Gilyakhov

In this blog post, we build an Apache Kafka producer in Rust and showcase how to partition data for a specific topic.

When it comes to writing producers for Kafka, a simple producer is straightforward to write. With a growing amount of data, however, it becomes necessary to publish data in a way that allows consumers to work with it efficiently.

To showcase how to produce Kafka events, we will leverage Tokio's tracing crate to generate log data.

Set Up a New Project

In this tutorial, our dev setup consists of VSCode, Rust, and Docker to build a Kafka Producer.

Let's get started with a new Rust project. In your terminal, run:

$ cargo new tracing_publisher

As mentioned above, we use VSCode's Dev Containers feature to run our Rust code, but also to run Kafka and Zookeeper.

Dev Containers require a config file in a separate directory:

$ cd tracing_publisher
$ mkdir .devcontainer
$ cat > .devcontainer/devcontainer.json
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/rust
{
  "name": "Rust",
  "service": "rust-log-processing",
  "dockerComposeFile": "../docker-compose.yml",
  "features": {
    "ghcr.io/devcontainers/features/rust:1": {}
  },
  "workspaceFolder": "/workspaces/${localWorkspaceFolderBasename}",
  "shutdownAction": "stopCompose"
}

Additionally, we also need a docker-compose.yml file to orchestrate all Docker containers:

#docker-compose.yml
version: '3.8'
services:
  rust-log-processing:
    image: mcr.microsoft.com/devcontainers/rust:0-1-bullseye
    volumes:
      - ..:/workspaces:cached
    cap_add:
      - SYS_PTRACE
    security_opt:
      - seccomp:unconfined
    command: /bin/sh -c "while sleep 1000; do :; done"
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
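You don't need to start these services by hand; VSCode brings them up when it opens the dev container. If you want to confirm from a host terminal that all three containers are running, Docker Compose can list them (use docker-compose ps on older Docker installations):

$ docker compose ps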

With these configuration files in place, let's open the project in VSCode:

$ code .

Please note: Once VSCode has loaded the project, it should present you with a message in the bottom right corner, asking if you'd like to open this project in a container:

(Image: VSCode prompt to reopen the project in a Dev Container)

Click on Reopen in Container. This step will take a few moments to complete, as it downloads all Docker images and installs several components into our development container.

Once VSCode has finished loading, click View > Terminal to open the integrated terminal. We will use this terminal session to execute all commands inside the dev container unless noted otherwise.

Before we add the first few lines of code, let's install the dependencies we will need (run these commands in the VSCode terminal):

$ cargo add anyhow
$ cargo add kafka
$ cargo add serde_json
$ cargo add tracing
$ cargo add tracing-subscriber

Collecting Telemetry Data from a Rust Program

We will build a custom tracing subscriber layer to gather telemetry data produced by the tracing crate. We will then send that telemetry data to Kafka.

We won't cover the actual collection mechanism in detail. This part is based on Bryan Burgers' excellent blog post "Custom Logging in Rust Using tracing and tracing-subscriber".

Before we add code, in the terminal, run:

$ cargo run 
Hello, world!

to ensure everything works. Next, let's add the following code to src/main.rs:

use tracing::info;
use tracing_subscriber::prelude::*;

mod custom_layer;
use custom_layer::CustomLayer;

fn main() {
    // Set up how `tracing-subscriber` will deal with tracing data.
    tracing_subscriber::registry().with(CustomLayer).init();

    // Log something simple. In `tracing` parlance, this creates an "event".
    info!(a_bool = true, answer = 42, message = "first example");
}

Then, create a new module called custom_layer.rs:

// Credit: https://github.com/bryanburgers/tracing-blog-post/blob/main/examples/figure_3/custom_layer.rs
use std::collections::BTreeMap;
use tracing_subscriber::Layer;

pub struct CustomLayer;

impl<S> Layer<S> for CustomLayer
where
    S: tracing::Subscriber,
{
    fn on_event(
        &self,
        event: &tracing::Event<'_>,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        // Convert the values into a JSON object
        let mut fields = BTreeMap::new();
        let mut visitor = JsonVisitor(&mut fields);
        event.record(&mut visitor);

        // Output the event in JSON
        let output = serde_json::json!({
            "target": event.metadata().target(),
            "name": event.metadata().name(),
            "level": format!("{:?}", event.metadata().level()),
            "fields": fields,
        });
        println!("{}", serde_json::to_string_pretty(&output).unwrap());
    }
}

struct JsonVisitor<'a>(&'a mut BTreeMap<String, serde_json::Value>);

impl<'a> tracing::field::Visit for JsonVisitor<'a> {
    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_error(
        &mut self,
        field: &tracing::field::Field,
        value: &(dyn std::error::Error + 'static),
    ) {
        self.0.insert(
            field.name().to_string(),
            serde_json::json!(value.to_string()),
        );
    }

    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        self.0.insert(
            field.name().to_string(),
            serde_json::json!(format!("{:?}", value)),
        );
    }
}

With this code in place, let's run it:

$ cargo run 
{
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
}

Now, every time we use info!, warn!, trace!, or error!, our new layer turns the event into JSON.

In the next step, we will use the generated JSON and produce Kafka messages from it.

Implement a Kafka Publisher with a Partitioning Strategy

We will implement the publishing capability in a separate impl block in custom_layer.rs. Before we get started with that, let's enhance the CustomLayer struct. Change this:

//src/custom_layer.rs
pub struct CustomLayer;

to:

//src/custom_layer.rs
use std::collections::BTreeMap;
use tracing::Level;
use tracing_subscriber::Layer;

pub struct CustomLayer {
    kafka_broker: String,
}

Next, in src/custom_layer.rs, create a new impl block in which we will add a constructor, as well as our Kafka functionality:

impl CustomLayer {
    pub fn new(kafka_broker: &str) -> Self {
        Self {
            kafka_broker: kafka_broker.into(),
        }
    }
}

The initializer accepts a Kafka broker address (e.g. broker:9092).
Since we're using Docker Compose, we can refer to the broker by its container name (broker).
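Hard-coding the address is fine for this tutorial. If you'd rather make it configurable, here is a small sketch that reads the address from an environment variable instead (the KAFKA_BROKER variable name and the broker_address helper are assumptions of this sketch, not part of the tutorial's code):

use std::env;

// Hypothetical helper: fall back to the Docker Compose service name
// when no KAFKA_BROKER environment variable is set.
fn broker_address() -> String {
    env::var("KAFKA_BROKER").unwrap_or_else(|_| "broker:9092".to_string())
}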

send_event contains the most important functionality:

use std::{collections::BTreeMap, time::Duration};
use kafka::producer::{Producer, Record, RequiredAcks};

impl CustomLayer {
    //...
    fn send_event(&self, topic: &str, log_level: &Level, serialized_event: &str) {
        let mut producer = self.create_producer();
        let partition_key = log_level.as_str();

        let record = Record {
            topic: &self.to_kafka_topic_name(topic),
            key: partition_key.to_lowercase(),
            partition: -1,
            value: serialized_event.as_bytes(),
        };

        producer.send(&record).unwrap();
    }

    fn to_kafka_topic_name(&self, input: &str) -> String {
        input.replace('_', "").replace("::", "-")
    }

    fn create_producer(&self) -> Producer {
        Producer::from_hosts(vec![self.kafka_broker.to_owned()])
            .with_ack_timeout(Duration::from_secs(1))
            .with_required_acks(RequiredAcks::One)
            .create()
            .unwrap()
    }
}

send_event accepts a Kafka topic, the event's log level (which we will use to determine the partition) and a payload (serialized_event).

It creates a Record, a Kafka data structure, and sends it off to the broker.
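A note on error handling: the unwrap() calls keep the example short, but since anyhow is already among our dependencies, a fallible variant is easy to sketch. The following try_send_event is hypothetical and not part of the tutorial's code:

use anyhow::anyhow;
use kafka::producer::Record;
use tracing::Level;

impl CustomLayer {
    // Hypothetical fallible variant of send_event: propagates failures
    // to the caller instead of panicking.
    fn try_send_event(&self, topic: &str, log_level: &Level, serialized_event: &str) -> anyhow::Result<()> {
        let mut producer = self.create_producer();
        let record = Record {
            topic: &self.to_kafka_topic_name(topic),
            key: log_level.as_str().to_lowercase(),
            partition: -1,
            value: serialized_event.as_bytes(),
        };
        producer
            .send(&record)
            .map_err(|e| anyhow!("failed to send record to Kafka: {e}"))?;
        Ok(())
    }
}

Since Layer::on_event cannot return a Result, the caller would still have to decide what to do with the error, for example writing it to stderr.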

A Primer on Kafka Partitions

Kafka organizes messages in topics. A topic is a logical separation between different kinds of messages: one topic might contain log messages, for instance, while another holds customer events.

Often, with a high amount of data, it makes sense to split a topic into several partitions. We do this for several reasons:

  • It's easier to replicate a partition across different Kafka brokers
  • It's easier to distribute the load on the consumer side

If you're curious to learn more about the details, check out this post. In our case, we're distributing log messages into separate partitions based on their severity.

Let's examine send_event in a bit more detail:

fn send_event(&self, topic: &str, log_level: &Level, serialized_event: &str) {
    let mut producer = self.create_producer();
    let partition_key = log_level.as_str();

    let record = Record {
        topic: &self.to_kafka_topic_name(topic),
        key: partition_key.to_lowercase(),
        partition: -1,
        value: serialized_event.as_bytes(),
    };
    //...
}

When we construct a new Record, besides providing the topic and the payload (value), we also specify a key, as well as a partition.

Both key and partition allow us to influence which partition this record ends up in.

partition: -1 tells the Kafka broker to determine the partition on its own. Since we provided a key, it will use the key to decide. key is an optional value: if it's not provided, Kafka uses a round-robin approach to distribute events across partitions. If we do provide a key, it should be related to the application's domain, such as a user id or, in our case, the event's severity.
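To make key-based routing concrete, here is an illustrative sketch of the idea: a stable hash of the key, modulo the partition count. This is not Kafka's actual algorithm (the default partitioner uses a murmur2 hash); it only demonstrates that equal keys always map to the same partition:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: maps a record key to one of `num_partitions` partitions.
// Equal keys always hash to the same value, so they land in the same partition.
fn partition_for_key(key: &str, num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}

fn main() {
    // With two partitions, every "warn" event lands in one partition
    // and every "info" event in one (possibly the same) partition.
    println!("info -> partition {}", partition_for_key("info", 2));
    println!("warn -> partition {}", partition_for_key("warn", 2));
}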

Publish Log Events

With all Kafka-related code in place, let's start publishing events.
Find the on_event method in the Layer impl block and add a call to send_event:

use std::time::{SystemTime, UNIX_EPOCH}; //add this import at the top of custom_layer.rs

fn on_event(
     &self,
     event: &tracing::Event<'_>,
     _ctx: tracing_subscriber::layer::Context<'_, S>,
 ) {
     let mut fields = BTreeMap::new();
     let mut visitor = JsonVisitor(&mut fields);
     event.record(&mut visitor);

     // Output the event in JSON
     let output = serde_json::json!({
         "target": event.metadata().target(),
         "name": event.metadata().name(),
         "level": format!("{:?}", event.metadata().level()),
         "fields": fields,
         "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
     });
     //add this line
     let serialized_event = serde_json::to_string_pretty(&output).unwrap();
     println!("{}", serialized_event);
     //add this line
     self.send_event(event.metadata().target(), event.metadata().level(), &serialized_event);
 }

From now on, every time we intercept a tracing event, it will get sent to Kafka.

Before we can start running the program, we need a little bit of configuration in main.rs:

use tracing::info;
use tracing_subscriber::prelude::*;

mod custom_layer;
use custom_layer::CustomLayer;

fn main() {
    // Set up how `tracing-subscriber` will deal with tracing data.
    //change this line
    tracing_subscriber::registry().with(CustomLayer::new("broker:9092")).init();

    // Log something simple. In `tracing` parlance, this creates an "event".
    info!(a_bool = true, answer = 42, message = "first example");
}

If we run our code now, we will see the following output:

$ cargo run
{
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
 }
 thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Kafka(UnknownTopicOrPartition)', src/custom_layer.rs:35:32
 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Our program fails because we haven't created the Kafka topic yet. While Kafka can be configured to create topics on an ad-hoc basis, we will create the topic manually.
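As an aside, the Confluent broker image exposes topic auto-creation through an environment variable. Enabling it in docker-compose.yml would look like the line below, but auto-created topics get the broker's default partition count, which is why we create ours manually:

# Optional, local development only: add to the broker service's environment in docker-compose.yml.
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'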

Configure Kafka

Before we create the topic, let's quickly discuss how our code determines topic names.

fn on_event(
    &self,
    event: &tracing::Event<'_>,
    _ctx: tracing_subscriber::layer::Context<'_, S>,
) {
    // Convert the values into a JSON object
    let mut fields = BTreeMap::new();
    let mut visitor = JsonVisitor(&mut fields);
    event.record(&mut visitor);

    // Output the event in JSON
    let output = serde_json::json!({
        "target": event.metadata().target(),
        "name": event.metadata().name(),
        "level": format!("{:?}", event.metadata().level()),
        "fields": fields,
    });
    let serialized_event = serde_json::to_string_pretty(&output).unwrap();
    println!("{}", serialized_event);
    self.send_event(
        event.metadata().target(), //This is our topic name
        event.metadata().level(),
        &serialized_event,
    );
}

The send_event method accepts a topic name as an argument. In on_event, we provide event.metadata().target() as our topic name. target contains the application's name: tracing_publisher. After processing the name via to_kafka_topic_name, our topic name will be tracingpublisher.

If we had more applications sending log data, we would want to separate data per application; therefore, we create a topic dedicated to a single application.
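Here is what to_kafka_topic_name produces for our target and for a hypothetical module-level target (my_app::auth is a made-up example; layer stands for a CustomLayer instance):

// Underscores are dropped and "::" becomes "-":
assert_eq!(layer.to_kafka_topic_name("tracing_publisher"), "tracingpublisher");
assert_eq!(layer.to_kafka_topic_name("my_app::auth"), "myapp-auth");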

Let's create the topic. In a terminal on your host machine, run the following command to create a topic with two partitions:

$ docker exec broker \
   kafka-topics --bootstrap-server broker:9092 \
             --create \
             --topic tracingpublisher --partitions 2
 Created topic tracingpublisher.
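To double-check that the topic exists and has two partitions, you can describe it (again on the host machine):

$ docker exec broker \
   kafka-topics --bootstrap-server broker:9092 \
             --describe \
             --topic tracingpublisher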

With that out of the way, let's try running the code again (Run this command in the VSCode terminal):

$ cargo run
{
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
 }

Now we don't see any additional output. To verify it worked, let's use kafkacat to consume the topic's events. (We install kafkacat inside the dev container; run the following command in VSCode's terminal.)

$ sudo apt-get update && sudo apt-get install -y kafkacat

(Note: kafkacat has been renamed to kcat. We refer to the old name because the dev container's package sources haven't picked up the new name yet.)
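If your environment already ships the renamed binary, the flags are identical; the consume command below would simply become:

$ kcat -C -b broker:9092 -t tracingpublisher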

Let's run kafkacat in Consumer mode and listen to the
tracingpublisher topic:

$ kafkacat -C -b broker:9092 -t tracingpublisher 
 {
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
 }
 % Reached end of topic tracingpublisher [0] at offset 1
 % Reached end of topic tracingpublisher [1] at offset 0   

Now we see our event. But what about the partitions? When we created the topic, we created two partitions.

Let's update our main function and run everything again.

use tracing::{info, warn}; //update imports
use tracing_subscriber::prelude::*;

mod custom_layer;
use custom_layer::CustomLayer;

fn main() {
    // Set up how `tracing-subscriber` will deal with tracing data.
    tracing_subscriber::registry().with(CustomLayer::new("broker:9092")).init();

    // Log something simple. In `tracing` parlance, this creates an "event".
    info!(a_bool = true, answer = 42, message = "first example");
    //add this line 
    warn!(a_bool = true, answer = 42, message = "first example"); 
}

Let's run the program again:

$ cargo run
 {
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
 }
 {
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Warn)",
   "name": "event src/main.rs:13",
   "target": "tracing_publisher"
 }

Now we get two log outputs, one for info! and one for warn!. Let's run kafkacat again to verify the output:

$ kafkacat -C -b broker:9092 -t tracingpublisher 
{
  "fields": {
    "a_bool": true,
    "answer": 42,
    "message": "first example"
  },
  "level": "Level(Info)",
  "name": "event src/main.rs:12",
  "target": "tracing_publisher"
}
{
  "fields": {
    "a_bool": true,
    "answer": 42,
    "message": "first example"
  },
  "level": "Level(Info)",
  "name": "event src/main.rs:12",
  "target": "tracing_publisher"
}
% Reached end of topic tracingpublisher [0] at offset 2
{
  "fields": {
    "a_bool": true,
    "answer": 42,
    "message": "first example"
  },
  "level": "Level(Warn)",
  "name": "event src/main.rs:13",
  "target": "tracing_publisher"
}
% Reached end of topic tracingpublisher [1] at offset 1

Without additional parameters, we consume both partitions at the same time. But if we're only interested in, say, warnings, we could do the following:

$ kafkacat -C -b broker:9092 -t tracingpublisher -p 1
 {
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Warn)",
   "name": "event src/main.rs:13",
   "target": "tracing_publisher"
 }
 % Reached end of topic tracingpublisher [1] at offset 1

Partitions don't have a name, but are identified by an index. In our case, index 1 contains all warnings.
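If you'd like kafkacat to show which partition and key each message carries, its -f format string can print them alongside the payload (%p is the partition, %k the key, %s the payload):

$ kafkacat -C -b broker:9092 -t tracingpublisher -f 'partition=%p key=%k payload=%s\n'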

Summary

In this tutorial, we used tracing and a custom tracing_subscriber Layer to capture telemetry data and send it to Apache Kafka for further processing.

Instead of sending all data into a single partition, we divided the telemetry events based on severity. A consumer can then choose to work selectively with certain subsets of the data, based on event severity.
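As a sketch of that consumer side (not part of this tutorial's code, and assuming the broker address and topic used throughout this post), the same kafka crate can subscribe to just the warnings partition:

use kafka::consumer::{Consumer, FetchOffset};

// Sketch: consume only partition 1 of our topic, which holds the warnings.
fn main() {
    let mut consumer = Consumer::from_hosts(vec!["broker:9092".to_owned()])
        .with_topic_partitions("tracingpublisher".to_owned(), &[1])
        .with_fallback_offset(FetchOffset::Earliest)
        .create()
        .unwrap();

    loop {
        for message_set in consumer.poll().unwrap().iter() {
            for message in message_set.messages() {
                println!("{}", String::from_utf8_lossy(message.value));
            }
            consumer.consume_messageset(message_set).unwrap();
        }
    }
}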

Running Apache Kafka on a local machine as part of your development setup is straightforward. Deploying Kafka into a production environment, however, is a different story. We know deadlines sometimes approach quickly, and it's challenging to deliver all your features while also becoming a Kafka operations expert. With Calisti, you can run Apache Kafka and Zookeeper as a Kubernetes workload without having to become an expert first. Calisti takes care of deployment and configuration, so you can focus on running your application.

Ready to deploy? Check out Calisti.app to get started with our free
tier.

Find the project's source code on GitHub.
