Node.js Walkthrough: Build a Simple Event-Driven Application with Kafka

Published: 6/25/2024
Categories: kafka, eventdriven, node, tutorial
Author: Alvin Lee

Have you ever wondered how some of your favorite apps handle real-time updates? Live sports scores, stock market tickers, or even social media notifications — they all rely on event-driven architecture (EDA) to process data instantly. EDA is like having a conversation where every new piece of information triggers an immediate response. It’s what makes an application more interactive and responsive.

In this walkthrough, we’ll guide you through building a simple event-driven application using Apache Kafka on Heroku. We’ll cover:

  • Setting up a Kafka cluster on Heroku

  • Building a Node.js application that produces and consumes events

  • Deploying your application to Heroku

Apache Kafka is a powerful tool for building EDA systems. It’s an open-source platform designed for handling real-time data feeds. Apache Kafka on Heroku is a Heroku add-on that provides Kafka as a service. Heroku makes it pretty easy to deploy and manage applications, and I’ve been using it more in my projects recently. Combining Kafka with Heroku simplifies the setup process when you want to run an event-driven application.

By the end of this guide, you’ll have a running application that demonstrates the power of EDA with Apache Kafka on Heroku. Let’s get started!

Getting Started

Before we dive into the code, let’s quickly review some core concepts. Once you understand these, following along will be easier.

  • Events are pieces of data that signify some occurrence in the system, like a temperature reading from a sensor.

  • Topics are categories or channels where events are published. Think of them as the subjects you subscribe to in a newsletter.

  • Producers are the entities that create and send events to topics. In our demo EDA application, our producers will be a set of weather sensors.

  • Consumers are the entities that read and process events from topics. Our application will have a consumer that listens for weather data events and logs them.
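
To make these four terms concrete before Kafka enters the picture, here is a toy in-memory sketch. It is not Kafka and not part of the demo project: the ToyBroker class and its method names are made up for illustration, and it has no network, persistence, partitions, or consumer groups.

```javascript
// Toy in-memory model of events, topics, producers, and consumers.
// ToyBroker is a hypothetical class for illustration only; it is NOT Kafka.
class ToyBroker {
  constructor() {
    this.topics = new Map(); // topic name -> array of subscriber callbacks
  }

  // A consumer registers interest in a topic.
  subscribe(topic, handler) {
    if (!this.topics.has(topic)) this.topics.set(topic, []);
    this.topics.get(topic).push(handler);
  }

  // A producer publishes an event; every subscriber of that topic receives it.
  publish(topic, event) {
    for (const handler of this.topics.get(topic) ?? []) handler(event);
  }
}

const broker = new ToyBroker();
const received = [];

// Consumer: collects weather events from one topic.
broker.subscribe('weather-data', (event) => received.push(event));

// Producer: a sensor emits one temperature reading.
broker.publish('weather-data', { sensor: 'sensor01', reading: 'temperature', value: 71.5 });

// Nobody subscribed to this topic, so the event is not delivered anywhere.
broker.publish('other-topic', { ignored: true });

console.log(received);
```

Only the event published to weather-data reaches the consumer, which is the same topic-based routing idea we rely on with Kafka in the rest of this walkthrough.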

Introduction to our application

We’ll build a Node.js application using the KafkaJS library. Here’s a quick overview of how our application will work:

  1. Our weather sensors (the producers) will periodically generate data — such as temperature, humidity, and barometric pressure — and send these events to Apache Kafka. For demo purposes, the data will be randomly generated.

  2. We’ll have a consumer listening to the topic. When a new event is received, it will write the data to a log.

  3. We’ll deploy the entire setup to Heroku and use Heroku logs to monitor the events as they occur.

Prerequisites

Before we start, make sure you have the following:

  • A Heroku account: If you don’t have one, sign up at Heroku.

  • Heroku CLI: Download and install the Heroku CLI.

  • Node.js installed on your local machine for development. On my machine, I’m using Node (v20.9.0) and npm (10.4.0).

The codebase for this entire project is available in this GitHub repository. Feel free to clone the code and follow along throughout this post.

Now that we’ve covered the basics, let’s set up our Kafka cluster on Heroku and start building.

Setting up a Kafka Cluster on Heroku

Let’s get everything set up on Heroku. It’s a pretty quick and easy process.

Step 1: Log in via the Heroku CLI

~/project$ heroku login

Step 2: Create a Heroku app

~/project$ heroku create weather-eda

(I’ve named my Heroku app weather-eda. Heroku app names must be unique across all of Heroku, so you’ll need to choose a different name for your app.)

Step 3: Add the Apache Kafka on Heroku add-on

~/project$ heroku addons:create heroku-kafka:basic-0

Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month)
The cluster should be available in a few minutes.
Run `heroku kafka:wait` to wait until the cluster is ready.
You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka
kafka-adjacent-07560 is being created in the background. The app will restart when complete...
Use heroku addons:info kafka-adjacent-07560 to check creation progress
Use heroku addons:docs heroku-kafka to view documentation

You can find more information about the Apache Kafka on Heroku add-on here. For our demo, I’m adding the Basic 0 tier of the add-on. The cost of the add-on is $0.139/hour. As I went through building this demo application, I used the add-on for less than an hour, and then I spun it down.

It takes a few minutes for Heroku to get Kafka spun up and ready for you. Pretty soon, this is what you’ll see:

~/project$ heroku addons:info kafka-adjacent-07560

=== kafka-adjacent-07560
Attachments:  weather-eda::KAFKA
Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time)
Max Price:    $100/month
Owning app:   weather-eda
Plan:         heroku-kafka:basic-0
Price:        ~$0.139/hour
State:        created

Step 4: Get Kafka credentials and configurations

With our Kafka cluster spun up, we will need to get credentials and other configurations. Heroku creates several config vars for our application, populating them with information from the Kafka cluster that was just created. We can see all of these config vars by running the following:

~/project$ heroku config
=== weather-eda Config Vars

KAFKA_CLIENT_CERT:     -----BEGIN CERTIFICATE-----
MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h
...
-----END CERTIFICATE-----

KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3
...
-----END RSA PRIVATE KEY-----

KAFKA_PREFIX:          columbia-68051.
KAFKA_TRUSTED_CERT:    -----BEGIN CERTIFICATE-----
MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h
...
F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4=
-----END CERTIFICATE-----

KAFKA_URL:             kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096

As you can see, we have several config variables. We’ll want a file in our project root folder called .env with all of these config var values. To do this, we simply run the following command:

~/project$ heroku config --shell > .env

Our .env file looks like this:

KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----"
KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----"
KAFKA_PREFIX="columbia-68051."
KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----"
KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"

Also, we make sure to add .env to our .gitignore file. We wouldn’t want to commit this sensitive data to our repository.
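
Since the application depends on every one of these values, it can help to fail fast when one is missing. The following is a minimal sketch, not part of the original project: the helper name missingKafkaVars is hypothetical, and the variable names are the ones shown in the heroku config output above.

```javascript
// Variable names taken from the `heroku config` output shown above.
const REQUIRED_KAFKA_VARS = [
  'KAFKA_URL',
  'KAFKA_PREFIX',
  'KAFKA_TRUSTED_CERT',
  'KAFKA_CLIENT_CERT',
  'KAFKA_CLIENT_CERT_KEY',
];

// Hypothetical helper: returns the names of required variables that are
// unset or empty in the given environment object.
const missingKafkaVars = (env = process.env) =>
  REQUIRED_KAFKA_VARS.filter((name) => !env[name]);

// Example with a deliberately incomplete environment (placeholder values).
const missing = missingKafkaVars({
  KAFKA_URL: 'kafka+ssl://broker-1.example.com:9096',
  KAFKA_PREFIX: 'demo.',
});
console.log(missing);
// [ 'KAFKA_TRUSTED_CERT', 'KAFKA_CLIENT_CERT', 'KAFKA_CLIENT_CERT_KEY' ]
```

A process could call this once at startup, after loading .env, and exit with a clear message if the returned list is not empty.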

Step 5: Install the Kafka plugin into the Heroku CLI

The Heroku CLI doesn’t come with Kafka-related commands right out of the box. Since we’re using Kafka, we’ll need to install the CLI plugin.

~/project$ heroku plugins:install heroku-kafka
Installing plugin heroku-kafka... installed v2.12.0

Now, we can manage our Kafka cluster from the CLI.

~/project$ heroku kafka:info
=== KAFKA_URL
Plan:       heroku-kafka:basic-0
Status:     available
Version:    2.8.2
Created:    2024-05-27T18:44:38.023+00:00
Topics:     [··········] 0 / 40 topics, see heroku kafka:topics
Prefix:     columbia-68051.
Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor)
Messages:   0 messages/s
Traffic:    0 bytes/s in / 0 bytes/s out
Data Size:  [··········] 0 bytes / 4.00 GB (0.00%)
Add-on:     kafka-adjacent-07560

~/project$ heroku kafka:topics
=== Kafka Topics on KAFKA_URL

No topics found on this Kafka cluster.
Use heroku kafka:topics:create to create a topic (limit 40)

Step 6: Test out interacting with the cluster

Just as a sanity check, let’s play around with our Kafka cluster. We start by creating a topic.

~/project$ heroku kafka:topics:create test-topic-01
Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done
Use `heroku kafka:topics:info test-topic-01` to monitor your topic.
Your topic is using the prefix columbia-68051..

~/project$ heroku kafka:topics:info test-topic-01
 ▸    topic test-topic-01 is not available yet

Within a minute or so, our topic becomes available.

~/project$ heroku kafka:topics:info test-topic-01
=== kafka-adjacent-07560 :: test-topic-01

Topic Prefix:       columbia-68051.
Producers:          0 messages/second (0 bytes/second) total
Consumers:          0 bytes/second total
Partitions:         8 partitions
Replication Factor: 3
Compaction:         Compaction is disabled for test-topic-01
Retention:          24 hours

Next, in this terminal window, we’ll act as a consumer, listening on this topic by tailing it.

~/project$ heroku kafka:topics:tail test-topic-01

From here, the terminal simply waits for any events published to the topic.

In a separate terminal window, we’ll act as a producer, and we’ll publish some messages to the topic.

~/project$ heroku kafka:topics:write test-topic-01 "hello world!"

Back in our consumer’s terminal window, this is what we see:

~/project$ heroku kafka:topics:tail test-topic-01
test-topic-01 0 0 12 hello world!

Excellent! We have successfully produced an event to a topic in our Kafka cluster and consumed it. We’re ready to move on to our Node.js application. Let’s destroy this test topic to keep our playground tidy.

~/project$ heroku kafka:topics:destroy test-topic-01
 ▸    This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda
 ▸    To proceed, type weather-eda or re-run this command with --confirm weather-eda

> weather-eda
Deleting topic test-topic-01... done
Your topic has been marked for deletion, and will be removed from the cluster shortly

~/project$ heroku kafka:topics
=== Kafka Topics on KAFKA_URL

No topics found on this Kafka cluster.
Use heroku kafka:topics:create to create a topic (limit 40).

Step 7: Prepare Kafka for our application

To prepare for our application to use Kafka, we will need to create two things: a topic and a consumer group.

Let’s create the topic that our application will use.

~/project$ heroku kafka:topics:create weather-data

Next, we’ll create the consumer group that our application’s consumer will be a part of:

~/project$ heroku kafka:consumer-groups:create weather-consumers

We’re ready to build our Node.js application!

Build the Application

Let’s initialize a new project and install our dependencies.

~/project$ npm init -y
~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty

Our project will have two processes running:

  1. consumer.js, which is subscribed to the topic and logs any events that are published.

  2. producer.js, which will publish some randomized weather data to the topic every few seconds.

Both of these processes will need to use KafkaJS to connect to our Kafka cluster, so we will modularize our code to make it reusable.

Working with the Kafka client

In the project src folder, we create a file called kafka.js. It looks like this:

const { Kafka } = require('kafkajs');

const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://', ''))
const TOPIC = `${process.env.KAFKA_PREFIX}weather-data`
const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers`

const kafka = new Kafka({
  clientId: 'weather-eda-app-nodejs-client',
  brokers: BROKER_URLS,
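  ssl: {
    // With rejectUnauthorized: false, the client connects even if the broker's
    // certificate fails verification. That keeps the demo simple, but
    // reconsider it before using this configuration in production.
    rejectUnauthorized: false,
    ca: process.env.KAFKA_TRUSTED_CERT,
    key: process.env.KAFKA_CLIENT_CERT_KEY,
    cert: process.env.KAFKA_CLIENT_CERT,
  },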
})

const producer = async () => {
  const p = kafka.producer()
  await p.connect()
  return p;
}

const consumer = async () => {
  const c = kafka.consumer({
    groupId: CONSUMER_GROUP,
    sessionTimeout: 30000
  })
  await c.connect()
  await c.subscribe({ topics: [TOPIC] });
  return c;
}

module.exports = {
  producer,
  consumer,
  topic: TOPIC,
  groupId: CONSUMER_GROUP
};

In this file, we start by creating a new Kafka client. This requires URLs for the Kafka brokers, which we are able to parse from the KAFKA_URL variable in our .env file (which originally came from calling heroku config). To authenticate the connection attempt, we need to provide KAFKA_TRUSTED_CERT, KAFKA_CLIENT_CERT_KEY, and KAFKA_CLIENT_CERT.

Then, from our Kafka client, we create a producer and a consumer, making sure to subscribe our consumer to the weather-data topic.
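
To see what the broker parsing step does in isolation, here is a small standalone sketch. The helper name parseBrokers is hypothetical, and the hostnames are placeholders rather than real brokers. It behaves like the BROKER_URLS line in kafka.js, with two defensive additions (trimming whitespace and dropping empty entries) that the original does not have.

```javascript
// Hypothetical helper mirroring the BROKER_URLS line in kafka.js.
// Turns a comma-separated list of kafka+ssl:// URLs into host:port strings.
const parseBrokers = (kafkaUrl) =>
  kafkaUrl
    .split(',')
    .map((uri) => uri.trim().replace(/^kafka\+ssl:\/\//, ''))
    .filter((hostPort) => hostPort.length > 0);

// Placeholder hostnames, not real brokers.
const brokers = parseBrokers(
  'kafka+ssl://broker-1.example.com:9096,kafka+ssl://broker-2.example.com:9096'
);
console.log(brokers);
// [ 'broker-1.example.com:9096', 'broker-2.example.com:9096' ]
```

The resulting array of host:port strings is the shape that the brokers option of the KafkaJS client expects.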

Clarification on the Kafka prefix

Notice in kafka.js that we prepend KAFKA_PREFIX to our topic and consumer group name. We’re using the Basic 0 plan for Apache Kafka on Heroku, which is a multi-tenant Kafka plan. This means we work with a KAFKA_PREFIX. Even though we named our topic weather-data and our consumer group weather-consumers, their actual names in our multi-tenant Kafka cluster must have the KAFKA_PREFIX prepended to them (to ensure they are unique).

So, technically, for our demo, the actual topic name is columbia-68051.weather-data, not weather-data. (Likewise for the consumer group name.)
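
The naming rule can be captured in a tiny helper. This is only a sketch: withPrefix is a hypothetical name, and the guard against double-prefixing is an addition that kafka.js itself does not include.

```javascript
// Hypothetical helper: prepend the multi-tenant prefix unless the name
// already starts with it. A missing prefix is treated as an empty string.
const withPrefix = (prefix, name) => {
  const p = prefix ?? '';
  return name.startsWith(p) ? name : `${p}${name}`;
};

console.log(withPrefix('columbia-68051.', 'weather-data'));
// columbia-68051.weather-data
console.log(withPrefix('columbia-68051.', 'columbia-68051.weather-consumers'));
// columbia-68051.weather-consumers
```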

The producer process

Now, let’s create our background process which will act as our weather sensor producers. In our project root folder, we have a file called producer.js. It looks like this:

require('dotenv').config();
const kafka = require('./src/kafka.js');
const { faker } = require('@faker-js/faker');

const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05'];
const MAX_DELAY_MS = 20000;
const READINGS = ['temperature','humidity','barometric_pressure'];
const MAX_TEMP = 130;
const MIN_PRESSURE = 2910;
const PRESSURE_RANGE = 160;

const getRandom = (arr) => arr[faker.number.int(arr.length - 1)];

const getRandomReading = {
  temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100),
  humidity: () => faker.number.int(100) / 100,
  barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100
};

const sleep = (ms) => {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
};

(async () => {
  const producer = await kafka.producer()

  while(true) {
    const sensor = getRandom(SENSORS)
    const reading = getRandom(READINGS)
    const value = getRandomReading[reading]()
    const data = { reading, value }
    await producer.send({
      topic: kafka.topic,
      messages: [{
        key: sensor,
        value: JSON.stringify(data)
      }]
    })
    await sleep(faker.number.int(MAX_DELAY_MS))
  }
})()

A lot of the code in the file has to do with generating random values. I’ll highlight the important parts:

  • We’ll simulate having five different weather sensors. Their names are found in SENSORS.

  • A sensor will emit (publish) a value for one of three possible readings: temperature, humidity, or barometric_pressure. The getRandomReading object has a function for each of these readings, to generate a reasonable corresponding value.

  • The entire process runs as an async function with an infinite while loop.

Within the while loop, we:

  • Choose a sensor at random.

  • Choose a reading at random.

  • Generate a random value for that reading.

  • Call producer.send to publish this data to the topic. The sensor serves as the key for the event, while the reading and value will form the event message.

  • Then, we wait for up to 20 seconds before our next iteration of the loop.
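
To look at the shape of a single event without connecting to Kafka, the value generation can be sketched with the standard library alone. In this sketch, randomInt stands in for faker.number.int (both return an integer from 0 to max, inclusive), and buildMessage is a hypothetical helper that returns the object producer.js places in the messages array. The constants are copied from producer.js.

```javascript
// Inclusive random integer in [0, max], standing in for faker.number.int(max).
// The rng parameter is injectable so the function can be tested deterministically.
const randomInt = (max, rng = Math.random) => Math.floor(rng() * (max + 1));

// Constants copied from producer.js.
const MAX_TEMP = 130;
const MIN_PRESSURE = 2910;
const PRESSURE_RANGE = 160;

const readings = {
  // 0.00 to 131.00
  temperature: (rng) => randomInt(MAX_TEMP, rng) + randomInt(100, rng) / 100,
  // 0.00 to 1.00
  humidity: (rng) => randomInt(100, rng) / 100,
  // 29.10 to 30.70
  barometric_pressure: (rng) => (MIN_PRESSURE + randomInt(PRESSURE_RANGE, rng)) / 100,
};

// Hypothetical helper: builds one entry for the `messages` array of producer.send.
// The sensor name is the key; the reading and its value are JSON-encoded.
const buildMessage = (sensor, reading, rng = Math.random) => ({
  key: sensor,
  value: JSON.stringify({ reading, value: readings[reading](rng) }),
});

const message = buildMessage('sensor03', 'barometric_pressure');
console.log(message);
```

Because the sensor name is the message key, Kafka's default partitioning keeps all events from one sensor on the same partition, which preserves their relative order.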

The consumer process

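The background process in consumer.js is considerably simpler. It imports a small logger module from src/logger.js, which is not shown in this post but is part of the project repository.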

require('dotenv').config();
const logger = require('./src/logger.js');
const kafka = require('./src/kafka.js');

(async () => {
  const consumer = await kafka.consumer()
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const sensorId = message.key.toString()
      const messageObj = JSON.parse(message.value.toString())
      const logMessage = { sensorId }
      logMessage[messageObj.reading] = messageObj.value
      logger.info(logMessage)
    }
  })
})()

Our consumer is already subscribed to the weather-data topic. We call consumer.run and pass it an eachMessage handler. Whenever the consumer receives a message from Kafka, the handler logs it. That’s all there is to it.
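
The transformation inside the handler can be tried on its own, without a running cluster. Below is a sketch of it as a pure function. The name toLogMessage is hypothetical, and the fallback for a missing key is an addition; consumer.js assumes the key is always present.

```javascript
// Hypothetical pure version of the eachMessage body in consumer.js.
// KafkaJS delivers message.key and message.value as Buffers; the key can be
// null when a message was produced without one, so we fall back to 'unknown'.
const toLogMessage = (message) => {
  const sensorId = message.key ? message.key.toString() : 'unknown';
  const { reading, value } = JSON.parse(message.value.toString());
  return { sensorId, [reading]: value };
};

// A hand-built message shaped like the ones our producer sends.
const sample = {
  key: Buffer.from('sensor01'),
  value: Buffer.from(JSON.stringify({ reading: 'temperature', value: 87.84 })),
};
console.log(toLogMessage(sample));
// { sensorId: 'sensor01', temperature: 87.84 }
```

Keeping this logic in a separate function makes it straightforward to unit test with hand-built Buffers like the one above.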

Processes and the Procfile

In the package.json file, we need to add a few scripts which start up our producer and consumer background processes. The file should now include the following:

...
  "scripts": {
    "start": "echo 'do nothing'",
    "start:consumer": "node consumer.js",
    "start:producer": "node producer.js"
  },
...

The important ones are start:consumer and start:producer. But we keep start in our file (even though it doesn’t do anything meaningful) because the Heroku builder expects it to be there.

Next, we create a Procfile which will tell Heroku how to start up the various workers we need for our Heroku app. In the root folder of our project, the Procfile should look like this:

consumer_worker: npm run start:consumer
producer_worker: npm run start:producer

Pretty simple, right? We’ll have a background process worker called consumer_worker, and another called producer_worker. You’ll notice that we don’t have a web process, which is what you would typically see in a Procfile for a web application. For our Heroku app, we just need the two background workers. We don’t need web.

Deploy and Test the Application

With that, all of our code is set. We’ve committed all of our code to the repo, and we’re ready to deploy.

~/project$ git push heroku main
…
remote: -----> Build succeeded!
…
remote: -----> Compressing...
remote:        Done: 48.6M
remote: -----> Launching...
…
remote: Verifying deploy... done

After we’ve deployed, we want to make sure that we scale our dynos properly. We don’t need a dyno for a web process, but we’ll need one each for consumer_worker and producer_worker. We run the following command to set these processes based on our needs.

~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1
Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco

Now, everything should be up and running. Behind the scenes, our producer_worker should connect to the Kafka cluster and then begin publishing weather sensor data every few seconds. Then, our consumer_worker should connect to the Kafka cluster and log any messages that it receives from the topic that it is subscribed to.

To see what our consumer_worker is doing, we can look in our Heroku logs.

~/project$ heroku logs --tail
…
heroku[producer_worker.1]: Starting process with command `npm run start:producer`
heroku[producer_worker.1]: State changed from starting to up
app[producer_worker.1]: 
app[producer_worker.1]: > [email protected] start:producer
app[producer_worker.1]: > node producer.js
app[producer_worker.1]: 


…

heroku[consumer_worker.1]: Starting process with command `npm run start:consumer`
heroku[consumer_worker.1]: State changed from starting to up
app[consumer_worker.1]: 
app[consumer_worker.1]: > [email protected] start:consumer
app[consumer_worker.1]: > node consumer.js
app[consumer_worker.1]: 
app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"}
app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041}
app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84}
app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3}
app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11}
app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71}
app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55}
app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58}
app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25}
app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1}
app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34}
app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61}
app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36}
app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}

It works! We know that our producer is periodically publishing messages to Kafka because our consumer is receiving them and then logging them.

Of course, in a larger EDA app, every sensor is a producer. They might publish to multiple topics for various purposes, or they might all publish to the same topic. And your consumer can be subscribed to multiple topics. Also, in our demo app, our consumer simply emitted a log on eachMessage; but in an EDA application, a consumer might respond by calling a third-party API, sending an SMS notification, or querying a database.
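
As a rough illustration of a consumer that reacts rather than just logs, the sketch below raises an alert when a temperature reading crosses a threshold. Everything here is hypothetical and not part of the demo: the threshold value, the handleEvent function, and the notify callback, which stands in for whatever side effect you need (an SMS, an HTTP call, a database write).

```javascript
// Hypothetical threshold; not part of the original demo.
const HEAT_ALERT_THRESHOLD = 100;

// `event` has the same shape as the objects our consumer logs.
// `notify` is an injected async function standing in for a real side effect.
// Resolves to true if an alert was sent, false otherwise.
const handleEvent = async (event, notify) => {
  if (event.temperature !== undefined && event.temperature > HEAT_ALERT_THRESHOLD) {
    await notify(`Heat alert from ${event.sensorId}: ${event.temperature}`);
    return true;
  }
  return false;
};

// Usage with a fake notifier that records what it was asked to send.
const sent = [];
const fakeNotify = async (text) => {
  sent.push(text);
};

handleEvent({ sensorId: 'sensor03', temperature: 104.55 }, fakeNotify)
  .then((alerted) => console.log(alerted, sent));
```

Passing the side effect in as a function keeps the decision logic testable and lets you swap in a real notification client later.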

Now that you have a basic understanding of events, topics, producers, and consumers, and you know how to work with Kafka, you can start to design and build your own EDA applications to satisfy more complex business use cases.

Conclusion

EDA is pretty powerful — you can decouple your systems while enjoying key features like easy scalability and real-time data processing. For EDA, Kafka is a key tool, helping you handle high-throughput data streams with ease. Using Apache Kafka on Heroku helps you get started quickly. Since it’s a managed service, you don’t need to worry about the complex parts of Kafka cluster management. You can just focus on building your apps.

From here, it’s time for you to experiment and prototype. Identify which use cases fit well with EDA. Dive in, test it out on Heroku, and build something amazing. Happy coding!
