dev-resources.site
for different kinds of informations.
Running a self-managed Kafka Connect worker for Confluent Cloud
Confluent Cloud is not only a fully -managed Apache Kafka service, but also provides important additional pieces for building applications and pipelines including managed connectors, Schema Registry, and ksqlDB. Managed Connectors are run for you (hence, managed!) within Confluent Cloud - you just specify the technology to which you want to integrate in or out of Kafka and Confluent Cloud does the rest.
The rate at which managed connectors are being added is impressive, but there you may find that the connector you want to use with Confluent Cloud isn’t yet available. In that case you need to run your own Kafka Connect worker which then connects to Confluent Cloud.
In my case I have a Confluent Cloud cluster running on GCP, so it makes sense to run my worker there too (although I could run it anywhere, closer to the cluster seems sensible). I have an ActiveMQ connector that’s pulling data from a 3rd party service that’s also Cloud-based (hence wanting to get all my processing into the Cloud too).
After processing the data in Confluent Cloud (with ksqlDB) I’m going to be streaming the data over to Elasticsearch - in Elastic Cloud. For this I will be able to take advantage of a Confluent Cloud managed connector for Elasticsearch:
Let’s take a look at what’s involved in running a self-managed Kafka Connect worker alongside Confluent Cloud.
What are my options for running a Kafka Connect worker?
Ultimately, Kafka Connect workers are just JVM processes. You can deploy on bare metal or containers. A few options present themselves:
Bare-metal on-premises install of Confluent Platform
IaaS Compute (AWS EC2, Google Compute Engine, etc) install of Confluent Platform
-
Terraform:
Ricardo Ferreira’s Confluent Cloud Tools (based on Terraform)
-
Docker
- On-premises
- Cloud-based
In this article I’m going to look at the latter of these — Docker.
Running Kafka Connect from Docker Compose, connecting to Confluent Cloud
The reason I love working with Docker is that running software no longer looks like this:
Download the software
Unpack the software
Run the installer
Install other stuff to meet dependency requirements
Uninstall previous version that’re creating conflicts
Try to find previous config files
Cry
Instead is looks like this:
Define software requirements and configuration in a Docker Compose file
Run Docker Compose
Win
I’m missing some of the points, but it’s the ability to simply declare the config and runtime and then instantiate it that makes Docker such a joy to use.
To run Kafka Connect using Docker you start with the base image. From there you need to do a few things before the container launches the worker:
Specify environment variables
Install necessary connector plugins (as well as any Single Message Transform and Converters if using ones other than those that ship with the image)
Install any other requires files, such as JDBC Drivers.
Once the worker is running you can then:
- Create connectors. This can be automated, or run manually. I like to automate it :)
Here’s what a Docker Compose looks like for running Kafka Connect locally, connecting to Confluent Cloud. Note that it is configured to also use the Schema Registry hosted in Confluent Cloud by default for the key and value converters.
---
version: '3'
services:
kafka-connect-ccloud:
image: confluentinc/cp-kafka-connect-base:6.0.1
container_name: kafka-connect-ccloud
ports:
- 8083:8083
environment:
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CUB_KAFKA_TIMEOUT: 300
CONNECT_BOOTSTRAP_SERVERS: "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-ccloud'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect-group-01-v04
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://MY-SR-CCLOUD-ENDPOINT.gcp.confluent.cloud"
CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SR_USER:SR_PASSWORD"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://MY-SR-CCLOUD-ENDPOINT.gcp.confluent.cloud"
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SR_USER:SR_PASSWORD"
CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
# Confluent Cloud config
CONNECT_REQUEST_TIMEOUT_MS: "20000"
CONNECT_RETRY_BACKOFF_MS: "500"
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_SASL_MECHANISM: "PLAIN"
CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
#
CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
#
CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
command:
- bash
- -c
- |
echo "Installing connector plugins"
confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
while : ; do
curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
if [$$curl_status -eq 200] ; then
break
fi
sleep 5
done
echo -e "\n--\n+> Creating Kafka Connect source connectors"
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" \
http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/config \
-d '{
"connector.class" : "io.confluent.connect.activemq.ActiveMQSourceConnector",
"activemq.url" : "tcp://my-activemq-endpoint:61619",
"activemq.username" : "ACTIVEMQ_USER",
"activemq.password" : "ACTIVEMQ_PASSWORD",
"jms.destination.type" : "topic",
"jms.destination.name" : "TRAIN_MVT_EA_TOC",
"kafka.topic" : "networkrail_TRAIN_MVT",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "false",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : "false",
"topic.creation.default.partitions" : 1,
"topic.creation.default.replication.factor" : 3,
"confluent.license" : "",
"confluent.topic.bootstrap.servers" : "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092",
"confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";",
"confluent.topic.security.protocol" : "SASL_SSL",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.sasl.mechanism" : "PLAIN",
"confluent.topic.request.timeout.ms" : "20000",
"confluent.topic.retry.backoff.ms" : "500"
}'
#
#
sleep infinity
Note that this does everything needed:
Installs the connector (ActiveMQ)
Launches the Kafka Connect worker (forked to a background process with
&
)Waits for the worker to be available
-
Creates the connector
- Observe that
topic.creation.default.partitions
andtopic.creation.default.replication.factor
are set - this means that Confluent Cloud will create the target topics that the connector is to write to automagically. This is possible because of KIP-158 which I wrote about recently.
- Observe that
One other point to note is that the worker uses Kafka itself to store state including configuration and status, and it does so in the topics defined under
CONNECT_CONFIG_STORAGE_TOPIC
CONNECT_OFFSET_STORAGE_TOPIC
CONNECT_STATUS_STORAGE_TOPIC
If you’re [re]creating workers make sure that you don’t have a clash on these topics - use a unique number appended to the end, or as I did here use the epoch as part of the unique name.
Deploying a Docker image to Google Compute Engine (GCE) / Google Cloud Platform (GCP)
NOTE: This it is 💯 a Proof-of-Concept (i.e. not blessed by Confluent in any way as "The Right Way"), and builds on my previous experimentation. If you are doing this in anger then for sure you should figure out how to do it properly, but for my purposes of a quick & dirty solution it worked well. It Works On My Machine [well, Google’s]™
.
So taking the above Docker Compose definition, we can then use GCE’s feature to run Containers on Compute Engine to provision this directly on GCE. For AWS see the approach that I wrote about here.
To launch a container on GCE either use the Web UI, or the gcloud
commandline. The first part of it is simple enough - we name the VM holding the container, we specify the image to use, and so on:
gcloud compute instances create-with-container \
rmoff-connect-source-v01 \
--zone=us-east1-b \
--tags kafka-connect \
--metadata=google-logging-enabled=true \
--container-image confluentinc/cp-kafka-connect-base:6.0.1 \
--container-restart-policy=never \
[…]
When the image starts up, by default it runs the Kafka Connect worker. However, we can override this by specifying a custom command
. We run /bin/bash
as the command, and then pass in -c
as the argument followed by an argument that holds the actual shell script we want to execute:
[…]
--container-command=/bin/bash \
--container-arg=-c \
--container-arg='set -x
# Run this stuff when the container launches
[…]
#
sleep infinity'
Within that command block we use the command
seen in the Docker Compose YAML above. So far, so good.
But (you knew there was a but coming, didn’t you), we also need to specify environment variables, and not just a few - and not just with straightforward values. We’ve got dozens of values, and because we’re specifying SASL config there’s quote marks in there, escape characters, and more. The gcloud
CLI has the --container-env
argument in which we can pass the environment variables as a comma-separated list of key=value pairs, and the =
can be overriden to a custom character - but you still end up with an awful mess like this:
It’s not pretty, and it’s a bit of a bugger to debug. You can pass in a separate file holding environment values but I’m always keen on keeping things self-contained if possible. So instead, since I was overriding the command to run as container launch anyway, I overrode the environment variables at that point instead:
[…]
--container-command=/bin/bash \
--container-arg=-c \
--container-arg='set -x
#
# Set the environment variables
export CONNECT_REST_ADVERTISED_HOST_NAME=rmoff-connect-source-v01
[…]
#
[…]
#
sleep infinity'
Most important is to finish with sleep infinity
so that the container does not exit (since the Kafka Connect worker process is forked to the background).
It needs some tricky escaping, both of the curl
data (-d
) block, as well as the quoted passages within it. Here is the final shell invocation:
gcloud compute instances create-with-container rmoff-connect-source-v01 \
--zone=us-east1-b \
--tags kafka-connect \
--metadata=google-logging-enabled=true \
--container-image confluentinc/cp-kafka-connect-base:6.0.1 \
--container-restart-policy=never \
--container-command=/bin/bash \
--container-arg=-c \
--container-arg='set -x
#
# Set the environment variables
export CONNECT_CUB_KAFKA_TIMEOUT=300
export CONNECT_BOOTSTRAP_SERVERS=MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092
export CONNECT_REST_ADVERTISED_HOST_NAME=rmoff-connect-source-v01
export CONNECT_REST_PORT=8083
export CONNECT_GROUP_ID=kafka-connect-group-gcp-v01
export CONNECT_CONFIG_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-configs
export CONNECT_OFFSET_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-offsets
export CONNECT_STATUS_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-status
export CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
export CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
export CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
export CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
export CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
export CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
export CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
export CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
export CONNECT_RETRY_BACKOFF_MS=500
export CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
export CONNECT_SASL_MECHANISM=PLAIN
export CONNECT_SECURITY_PROTOCOL=SASL_SSL
export CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
export CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
export CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
export CONNECT_CONSUMER_RETRY_BACKOFF_MS=500
export CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
export CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
export CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
export CONNECT_PRODUCER_RETRY_BACKOFF_MS=500
export CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
export CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
export CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
#
echo "Installing connector plugins"
confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
while : ; do
curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
if [$curl_status -eq 200] ; then
break
fi
sleep 5
done
echo -e "\n--\n+> Creating Kafka Connect source connectors"
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/config \
-d '"'"'{
"connector.class" : "io.confluent.connect.activemq.ActiveMQSourceConnector",
"activemq.url" : "tcp://my-activemq-endpoint:61619",
"activemq.username" : "ACTIVEMQ_USER",
"activemq.password" : "ACTIVEMQ_PASSWORD",
"jms.destination.type" : "topic",
"jms.destination.name" : "TRAIN_MVT_EA_TOC",
"kafka.topic" : "networkrail_TRAIN_MVT_v01",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "false",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : "false",
"topic.creation.default.partitions" : 1,
"topic.creation.default.replication.factor" : 3,
"confluent.license" : "",
"confluent.topic.bootstrap.servers" : "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092",
"confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'CCLOUD_USER'\" password=\"'CCLOUD_PASSWORD'\";",
"confluent.topic.security.protocol" : "SASL_SSL",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.sasl.mechanism" : "PLAIN",
"confluent.topic.request.timeout.ms" : "20000",
"confluent.topic.retry.backoff.ms" : "500"
}'"'"'
#
sleep infinity'
Container logs
You can use the rather useful gcloud compute ssh
to connect to the VM directly that’s been launched
gcloud compute ssh --zone "us-east1-b" "rmoff-connect-source-v01"
If you run it too soon after launch you’ll get an error
Warning: Permanently added 'compute.8428359303178581516' (ED25519) to the list of known hosts.
[email protected]: Permission denied (publickey).
ERROR: (gcloud.compute.ssh) [/usr/bin/ssh] exited with return code [255].
Once the VM is running properly you’ll get a shell prompt
########################[Welcome]########################
# You have logged in to the guest OS. #
# To access your containers use 'docker attach' command #
###########################################################
rmoff@rmoff-connect-source-v01 ~ $
From here, you can see the containers running on the VM. To start with you’ll see a couple of internal ones (stackdriver-logging-agent
, konlet
):
rmoff@rmoff-connect-source-v01 ~ $ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4a04df77a0be gcr.io/gce-containers/konlet:v.0.11-latest "/bin/gce-containers…" 35 seconds ago Up 32 seconds pedantic_tu
0d008a624e56 gcr.io/stackdriver-agents/stackdriver-logging-agent:0.2-1.5.33-1-1 "/entrypoint.sh /usr…" 2 days ago Up 2 days stackdriver-logging-agent
and soon after, the actual container that you’ve configured to run:
rmoff@rmoff-connect-source-v01 ~ $ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1e349180aa20 confluentinc/cp-kafka-connect-base:6.0.1 "/bin/bash -c 'set -…" 33 seconds ago Up 30 seconds (health: starting) klt-rmoff-connect-source-v01-qjez
At this point you’re just in normal Docker world, and can look at the logs as you would locally:
rmoff@rmoff-connect-source-v01 ~ $ docker logs -f klt-rmoff-connect-source-v01-qjez|more
+ export CONNECT_CUB_KAFKA_TIMEOUT=300
+ CONNECT_CUB_KAFKA_TIMEOUT=300
[…]
Installing connector plugins
+ echo 'Installing connector plugins'
+ confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
Running in a "--no-prompt" mode
[…]
[2021-01-11 21:56:38,614] INFO [Worker clientId=connect-1, groupId=kafka-connect-group-gcp-v01] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[…]
With all of this done, you should now see topics on your Confluent Cloud cluster for both the internal Kafka Connect worker topics, and any populated by the connector:
When you want to shut down the VM you can use delete
:
gcloud compute instances delete --zone "us-east1-b" "rmoff-connect-source-v01"
Featured ones: