Spring Cloud Functions, Kafka | How to interact asynchronous

Published at
12/24/2023
Categories
springcloudstream
springcloudfunctions
kafka
java
Author
Yegor Voronianskii
Introduction

This article shows how to use the Spring Cloud Function project. Spring Cloud Function is a convenient way to implement straightforward business logic.
Message brokers such as Kafka are also increasingly widely used.
In this article, I will implement the simple system shown below.

Diagram of developing system

Implementation

I will start with a simple example of a cloud function that behaves like a REST endpoint in a classic Spring Boot web application. You can create the project from your favorite IDE or from Spring Initializr.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>io.vrnsky</groupId>
    <artifactId>cloud-function-push</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cloud-function-push</name>
    <description>cloud-function-push</description>
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2023.0.0</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-function-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-function-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Your pom.xml file (or the equivalent build.gradle) should look like the one above.

The first step is to describe the events related to the service. I will stick with the following definition of an event.

package io.vrnsky.cloudfunctionpush.domain;

import java.util.List;

public record PullRequestAccepted(
        String author,
        List<String> reviewers,

        boolean havePassedAllChecks
) {
}

The second step is to create a configuration class that declares the function as a bean.

package io.vrnsky.cloudfunctionpush.config;

import io.vrnsky.cloudfunctionpush.domain.PullRequestAccepted;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Function;


@Configuration
public class FunctionConfig {

    @Bean
    public Function<PullRequestAccepted, String> pullRequestAccepted() {
        return value -> """
                Pull request from %s has been reviewed by %s. Have passed all checks: %b
                """
                .formatted(value.author(), value.reviewers().size(), value.havePassedAllChecks());
    }
}
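Because the bean is a plain java.util.function.Function, its logic can be exercised without starting Spring at all. The following is a minimal sketch under that assumption, not part of the original project: the class name FunctionSketch is hypothetical, the record is redeclared only to keep the example self-contained, and the text block is swapped for a single-line string so the result carries no trailing newline.

```java
import java.util.List;
import java.util.function.Function;

public class FunctionSketch {

    // Same shape as the article's event, redeclared to keep the sketch self-contained.
    public record PullRequestAccepted(String author, List<String> reviewers, boolean havePassedAllChecks) {
    }

    // Same logic as the bean method in FunctionConfig, minus the Spring annotations.
    public static Function<PullRequestAccepted, String> pullRequestAccepted() {
        return value -> "Pull request from %s has been reviewed by %s. Have passed all checks: %b"
                .formatted(value.author(), value.reviewers().size(), value.havePassedAllChecks());
    }

    public static void main(String[] args) {
        var event = new PullRequestAccepted("Yegor Voronianskii", List.of(), true);
        // Prints: Pull request from Yegor Voronianskii has been reviewed by 0. Have passed all checks: true
        System.out.println(pullRequestAccepted().apply(event));
    }
}
```

Note that the second placeholder receives the number of reviewers, not their names, which is why the message reads "reviewed by 0" for an empty list.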

It’s time to start our cloud function push service and check that it works correctly.

curl http://localhost:8080/pullRequestAccepted -H "Content-type:application/json" -d '{"author":"Yegor Voronianskii","reviewers":[],"havePassedAllChecks":true}'
Pull request from Yegor Voronianskii has been reviewed by 0. Have passed all checks: true
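If you prefer to call the function from Java rather than curl, the same request can be built with the JDK's built-in HTTP client. This is only a sketch: the class name PushClientSketch is hypothetical, the base URL assumes the service runs locally on the default port 8080, and main needs the push service to be running.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PushClientSketch {

    // Builds the same POST request as the curl command; baseUrl is an assumption (local default port).
    public static HttpRequest buildRequest(String baseUrl) {
        String json = """
                {"author":"Yegor Voronianskii","reviewers":[],"havePassedAllChecks":true}""";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/pullRequestAccepted"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Requires the push service to be up; otherwise send(...) throws a connection exception.
        HttpResponse<String> response =
                client.send(buildRequest("http://localhost:8080"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```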

The next step is to publish the event to Kafka instead of only returning a string. To achieve that, we must add the following dependencies to pom.xml or build.gradle.

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

Before starting the service, it is required to start Kafka and Zookeeper. Here is a docker-compose file to start both.

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "22181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
      - "29092:29092"

You can start Kafka and Zookeeper with the following command:

docker compose up
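Before starting the service, it can help to confirm that the broker port published by the compose file is actually open. The sketch below is a plain TCP check, assuming the broker is exposed on localhost:29092 as configured above. The class name BrokerPortCheck is hypothetical, and an open port only shows that something is listening, not that Kafka has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 29092 is the EXTERNAL listener port published by the compose file.
        System.out.println("Kafka port reachable: " + isReachable("localhost", 29092, 1000));
    }
}
```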

The next step is to configure our application and add some code. Here, you can see the updated version of the FunctionConfig class, where an instance of StreamBridge is injected through the constructor. You can use Lombok to generate the constructor for brevity. It is up to you.

package io.vrnsky.cloudfunctionpush.config;

import io.vrnsky.cloudfunctionpush.domain.PullRequestAccepted;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Function;


@Configuration
public class FunctionConfig {

    private final StreamBridge streamBridge;

    public FunctionConfig(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Bean
    public Function<PullRequestAccepted, String> pullRequestAccepted() {
        return value -> {
            streamBridge.send("pullRequestAccepted-out-0", value);
            return "Message have been sent";
        };
    }
}
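StreamBridge.send(bindingName, data) returns a boolean, which the function above ignores. A minimal sketch of taking that result into account is shown below. To keep it runnable without Spring, a hypothetical Sender interface stands in for StreamBridge, and the class name SendResultSketch and the failure message are assumptions as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SendResultSketch {

    // Hypothetical stand-in for StreamBridge: same shape as send(bindingName, data) returning boolean.
    @FunctionalInterface
    public interface Sender {
        boolean send(String bindingName, Object data);
    }

    public record PullRequestAccepted(String author, List<String> reviewers, boolean havePassedAllChecks) {
    }

    // Like the article's function, but the reply depends on what send(...) returned.
    public static Function<PullRequestAccepted, String> pullRequestAccepted(Sender sender) {
        return value -> sender.send("pullRequestAccepted-out-0", value)
                ? "Message have been sent"
                : "Message has not been sent";
    }

    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();
        // In-memory sender that records the payload instead of talking to Kafka.
        Sender recording = (binding, data) -> sent.add(data);
        var event = new PullRequestAccepted("Yegor Voronianskii", List.of(), true);
        System.out.println(pullRequestAccepted(recording).apply(event));
        System.out.println("Recorded payloads: " + sent.size());
    }
}
```

In the real service, the sender could be a lambda such as (binding, data) -> streamBridge.send(binding, data). The same indirection also makes the function easy to unit test without a broker.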

Below is the updated application configuration, where I specify the broker URL and other required settings.

spring:
  kafka:
    bootstrap-servers:
      - localhost:29092
  cloud:
    stream:
      kafka:
        binder:
          brokers:
            - localhost:29092
      output-bindings: pullRequestAccepted-out-0

logging:
  level:
    org.springframework.cloud.stream: TRACE

Now, we can check that the message has been sent to the broker.

curl http://localhost:8080/pullRequestAccepted -H "Content-type:application/json" -d '{"author":"Yegor Voronianskii","reviewers":[],"havePassedAllChecks":true}'
Message have been sent

In the logs of the cloud function push service, you should see something like this.

--- [nio-8080-exec-1] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
--- [nio-8080-exec-1] o.s.c.s.m.DirectWithAttributesChannel    : preSend on channel 'bean 'pullRequestAccepted-out-0'', message: GenericMessage [payload=byte[73], headers={id=7c176f51-1a1a-63d9-b0ed-20cc1ce71fc8, contentType=application/json, target-protocol=kafka, timestamp=1703409370932}]
--- [nio-8080-exec-1] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@7d99be9b received message: GenericMessage [payload=byte[73], headers={id=7c176f51-1a1a-63d9-b0ed-20cc1ce71fc8, contentType=application/json, target-protocol=kafka, timestamp=1703409370932}]
--- [nio-8080-exec-1] nder$ProducerConfigurationMessageHandler : org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@d36ee78 received message: GenericMessage [payload=byte[73], headers={id=d9ab9d27-fdf6-53a7-9db6-ae98e6130272, contentType=application/json, target-protocol=kafka, timestamp=1703409370933}]
--- [nio-8080-exec-1] nder$ProducerConfigurationMessageHandler : handler 'org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@d36ee78' produced no reply for request Message: GenericMessage [payload=byte[73], headers={id=d9ab9d27-fdf6-53a7-9db6-ae98e6130272, contentType=application/json, target-protocol=kafka, timestamp=1703409370933}]
--- [nio-8080-exec-1] o.s.c.s.m.DirectWithAttributesChannel    : postSend (sent=true) on channel 'bean 'pullRequestAccepted-out-0'', message: GenericMessage [payload=byte[73], headers={id=7c176f51-1a1a-63d9-b0ed-20cc1ce71fc8, contentType=application/json, target-protocol=kafka, timestamp=1703409370932}]
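The payload=byte[73] entries in these log lines are consistent with the event being sent as compact JSON. As a quick check, assuming the record is serialized with the same field order and no extra whitespace as the curl body, the UTF-8 length can be computed directly (the class name PayloadSizeSketch is hypothetical):

```java
import java.nio.charset.StandardCharsets;

public class PayloadSizeSketch {

    // The JSON body sent by the curl command, assumed to match what the binder serialized.
    public static final String JSON =
            "{\"author\":\"Yegor Voronianskii\",\"reviewers\":[],\"havePassedAllChecks\":true}";

    // Number of bytes the JSON occupies when encoded as UTF-8.
    public static int payloadSize() {
        return JSON.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        System.out.println(payloadSize()); // prints 73
    }
}
```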

The next step is to implement a Spring Cloud Function that consumes events from the message broker. The project creation steps are similar to those of the cloud function push service. To configure our cloud function pull service, we need an application.properties or application.yml file with the following settings.

spring:
  kafka:
    bootstrap-servers:
      - localhost:29092
  cloud:
    function:
      definition: pullRequestAccepted
    stream:
      kafka:
        binder:
          brokers:
            - localhost:29092
      input-bindings: pullRequestAccepted-in-0
      bindings:
        pullRequestAccepted-in-0:
          destination: pullRequestAccepted-out-0
          group: myGroup
logging:
  level:
    org.springframework.cloud.stream: TRACE
server:
  port: 9090
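The binding names in both configurations follow the functionName-in-index and functionName-out-index convention. Since the push service sets no explicit destination for its output binding, the binding name itself is used as the topic, which is why the consumer's destination is pullRequestAccepted-out-0. The helper below is only an illustration of the naming scheme; BindingNameSketch and bindingName are hypothetical names, not part of Spring Cloud Stream.

```java
import java.util.Locale;

public class BindingNameSketch {

    public enum Direction { IN, OUT }

    // Composes a binding name following the <functionName>-<in|out>-<index> convention.
    public static String bindingName(String functionName, Direction direction, int index) {
        return functionName + "-" + direction.name().toLowerCase(Locale.ROOT) + "-" + index;
    }

    public static void main(String[] args) {
        System.out.println(bindingName("pullRequestAccepted", Direction.IN, 0));  // pullRequestAccepted-in-0
        System.out.println(bindingName("pullRequestAccepted", Direction.OUT, 0)); // pullRequestAccepted-out-0
    }
}
```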

To handle incoming messages, we have to define a consumer for them. I will stick with a simple solution that prints the message to standard output.

package com.example.cloudfunctionpull.config;

import com.example.cloudfunctionpull.domain.PullRequestAccepted;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

@Configuration
public class FunctionsConfig {

    @Bean
    public Consumer<Message<PullRequestAccepted>> pullRequestAccepted() {
        return message -> {
            PullRequestAccepted payload = message.getPayload();
            System.out.println("Received message: " + payload);
        };
    }

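    // Not bound to anything by the configuration shown above. To route failures from the
    // consumer here, set spring.cloud.stream.bindings.pullRequestAccepted-in-0.error-handler-definition
    // to myErrorHandler.
    @Bean
    public Consumer<Message<?>> myErrorHandler() {
        return errorMessage -> {
            System.out.println("Handling error message " + errorMessage);
        };
    }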
}

After the consuming service has started successfully, we can push a message to the broker again and check that the pull service has consumed it.

kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@1721c358, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=pullRequestAccepted-out-0, kafka_receivedTimestamp=1703411981591, kafka_groupId=myGroup, target-protocol=kafka}]
Received message: PullRequestAccepted[author=Yegor Voronianskii, reviewers=[], havePassedAllChecks=true]
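The last line is the record's generated toString() appended to the "Received message: " prefix. A minimal sketch that reproduces it without Kafka or Spring is shown below. It assumes a plain Consumer of the payload instead of a Consumer of Message, and it collects the lines into a list so they can be inspected. The class name ConsumerSketch and the sink parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerSketch {

    public record PullRequestAccepted(String author, List<String> reviewers, boolean havePassedAllChecks) {
    }

    // Same idea as the article's consumer, but the line goes to a sink so it can be inspected.
    public static Consumer<PullRequestAccepted> pullRequestAccepted(List<String> sink) {
        return payload -> sink.add("Received message: " + payload);
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        pullRequestAccepted(lines).accept(new PullRequestAccepted("Yegor Voronianskii", List.of(), true));
        // Prints: Received message: PullRequestAccepted[author=Yegor Voronianskii, reviewers=[], havePassedAllChecks=true]
        lines.forEach(System.out::println);
    }
}
```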




Conclusion

In my honest opinion, Spring Cloud Function is a great tool for small, short-lived microservices, and Spring Cloud Stream is an excellent tool for building an event-driven microservices architecture. But there are cons, of course: the complexity of these projects can bring a lot of pain in terms of maintenance and of understanding new paradigms.

