Manage Apache Kafka Connect connectors with kcctl

Published at 10/13/2021
Categories: apachekafka, kafkaconnect, kcctl
Author: ftisiot

Image showing a terminal with a floating astronaut and the words "cli power"

Apache Kafka is widely used as a company's data backbone, with Kafka Connect acting as a bridge to other systems. This enables easy, reliable and scalable integration of Kafka with existing technologies. The Kafka Connect REST API provides a way to manage connectors via HTTP calls, but crafting URLs in a terminal can be tricky.
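For comparison, here is a minimal Python sketch of the raw REST requests that a command line tool saves us from crafting by hand. The endpoint paths are part of the standard Kafka Connect REST API. The base URL https://connect.example.com:443 and the helper name connect_request are hypothetical, and the sketch only builds the requests without sending them.

```python
from urllib.parse import quote
from urllib.request import Request

# Standard Kafka Connect REST API operations: name -> (HTTP method, path template)
OPERATIONS = {
    "list": ("GET", "/connectors"),
    "status": ("GET", "/connectors/{name}/status"),
    "pause": ("PUT", "/connectors/{name}/pause"),
    "resume": ("PUT", "/connectors/{name}/resume"),
    "plugins": ("GET", "/connector-plugins"),
}


def connect_request(base_url: str, operation: str, name: str = "") -> Request:
    """Build (but do not send) the HTTP request for a Kafka Connect operation.

    base_url is a hypothetical example such as "https://connect.example.com:443".
    """
    method, template = OPERATIONS[operation]
    # Connector names become part of the path, so URL-encode them
    path = template.format(name=quote(name, safe=""))
    return Request(base_url.rstrip("/") + path, method=method)


if __name__ == "__main__":
    req = connect_request("https://connect.example.com:443", "pause", "pg-incremental-source")
    print(req.get_method(), req.full_url)
```

Running it prints PUT https://connect.example.com:443/connectors/pg-incremental-source/pause. Authentication, error handling and response parsing would still be up to us.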

In this blog post we will explore kcctl, a new open source command line tool for Kafka Connect. We'll show how to point it at a Kafka Connect cluster and use it to manage the connectors linking Apache Kafka to other systems.

At Aiven, we offer similar functionality in the Aiven command line interface, which you can use to create, drop, change, verify, pause and resume any Kafka Connect connector running on Aiven services. If all your Kafka Connect instances are Aiven services, the Aiven command line interface is all you need. If, on the other hand, you want to use the same tool for any Kafka Connect instance, whether it runs on-premises, on Aiven or with other cloud providers, then kcctl is your friend.

Create an Apache Kafka instance with Kafka Connect

To follow along with this article, you need an Apache Kafka environment with Kafka Connect up and running. If you don't have one, don't worry: Aiven can provide one in minutes. Just create one in the Aiven Console, or with the Aiven command line interface using the following command:

avn service create demo-kafka               \
    --service-type kafka                    \
    --cloud google-europe-west3             \
    --plan business-4                       \
    -c kafka_connect=true                   \
    -c kafka.auto_create_topics_enable=true                  

The above command creates a three-node Aiven for Apache Kafka cluster named demo-kafka (business-4 plan) in the google-europe-west3 region, with automatic topic creation and Kafka Connect enabled. With Aiven, you can deploy Kafka Connect as part of your Kafka cluster on business and premium plans, or as a separate, standalone cluster. To read more about Aiven for Apache Kafka and related Kafka Connect topics, check out the dedicated page.

We can wait until the service is ready with:

avn service wait demo-kafka

Install kcctl

At the time of writing, kcctl is in early access. The current installation instructions can be found in its GitHub repository.

Once kcctl is installed, we can add its bin subdirectory to our PATH and check that it works in our terminal by executing:

kcctl

If we did everything correctly, then we should see the usage information. Now it's time to connect to our Kafka Connect cluster.

Connect

To connect to Kafka Connect, we first retrieve the cluster URL. We can do this with the Aiven CLI, using jq to parse the JSON output:

avn service get demo-kafka --json | jq '.connection_info.kafka_connect_uri'
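If you prefer to script this step, the following Python sketch does the same lookup. It also splits the URI into a credential-free base URL and an HTTP Basic Authorization header. This is handy because some HTTP clients, such as Python's urllib.request, don't pick up credentials embedded in the URL. The sketch assumes the avn JSON output contains the connection_info.kafka_connect_uri field that the jq filter above reads. The function name connect_endpoint is hypothetical.

```python
import base64
import json
from typing import Tuple
from urllib.parse import unquote, urlsplit, urlunsplit


def connect_endpoint(service_json: str) -> Tuple[str, str]:
    """Return (base URL without credentials, Basic Authorization header value).

    service_json is assumed to be the output of `avn service get demo-kafka --json`,
    containing connection_info.kafka_connect_uri as the jq filter above implies.
    """
    uri = json.loads(service_json)["connection_info"]["kafka_connect_uri"]
    parts = urlsplit(uri)
    # Rebuild the network location without the user:password@ prefix
    netloc = parts.hostname or ""
    if parts.port is not None:
        netloc = f"{netloc}:{parts.port}"
    base_url = urlunsplit((parts.scheme, netloc, parts.path, "", ""))
    # Credentials may be percent-encoded inside a URL, so decode them first
    user = unquote(parts.username or "")
    password = unquote(parts.password or "")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return base_url, f"Basic {token}"
```

The header value can then be sent with any request to the returned base URL.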

We can now create a kcctl configuration context with the following command, replacing the --cluster value with the URL retrieved above:

kcctl config set-context \
    --cluster https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443 \
    my_kafka_cluster

The above creates a context named my_kafka_cluster pointing to the demo-kafka instance. To verify the configuration:

kcctl info

This shows the URL of the current kcctl configuration context, together with the version, commit and Kafka cluster ID reported by the Kafka Connect cluster:

URL:               https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443
Version:           2.7.2-SNAPSHOT
Commit:            d15ddddd3ef3f5ef
Kafka Cluster ID:  -DvILyiXQxSpnFSK9M1qgQ
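The Version, Commit and Kafka Cluster ID fields match what a Kafka Connect worker returns from the root of its REST API (GET /): a JSON object with version, commit and kafka_cluster_id keys. The following Python sketch turns that response into similar output. The layout only imitates kcctl, and the function name format_info is hypothetical.

```python
import json


def format_info(url: str, root_response: str) -> str:
    """Format the JSON body returned by GET / on a Kafka Connect worker.

    The worker reports version, commit and kafka_cluster_id; the layout
    below only imitates the kcctl info output.
    """
    info = json.loads(root_response)
    rows = [
        ("URL:", url),
        ("Version:", info["version"]),
        ("Commit:", info["commit"]),
        ("Kafka Cluster ID:", info["kafka_cluster_id"]),
    ]
    # Left-align the labels in a fixed-width column
    return "\n".join(f"{label:<19}{value}" for label, value in rows)
```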

Create a data source in PostgreSQL

To see the connectors in action, let's create a PostgreSQL database and configure a Kafka Connect JDBC source connector to bring its data into Kafka. The connector will read data from a table named pasta in a PostgreSQL database and write it to a Kafka topic. If you don't have a PostgreSQL database handy, you can create one on Aiven with the following Aiven CLI command:

avn service create demo-pg               \
    --service-type pg                    \
    --cloud google-europe-west3          \
    --plan hobbyist

Once the demo-pg PostgreSQL instance is up and running (use avn service wait demo-pg to wait for it), connect to it:

avn service cli demo-pg

Now we can create our sample pasta table and fill it with data, using the following SQL statements in the open database session:

create table pasta (id serial, name varchar, cooking_minutes int);
insert into pasta (name, cooking_minutes) values ('spaghetti', 8);
insert into pasta (name, cooking_minutes) values ('spaghettini', 6);
insert into pasta (name, cooking_minutes) values ('fusilli', 9);
insert into pasta (name, cooking_minutes) values ('trofie', 5);

Create a new Kafka Connect connector

Once the source data is available, we can create the Kafka Connect JDBC source connector, which reads the pasta table in incrementing mode based on the id column. To get the required PostgreSQL connection details such as hostname, port, user and password, use this command:

avn service get demo-pg --format '{service_uri_params}'

Create a file named my_jdbc_connect_source.json with the following JSON content, substituting <HOST>, <PORT> and <PASSWORD> with the values retrieved in the previous step:

{
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOST>:<PORT>/defaultdb?sslmode=require",
    "connection.user": "avnadmin",
    "connection.password": "<PASSWORD>",
    "table.whitelist": "pasta",
    "mode": "incrementing",
    "incrementing.column.name":"id",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}
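Rather than editing the placeholders by hand, you can also generate the file. The following Python sketch assumes you pass in the host, port and password retrieved above. The function name write_source_config is hypothetical, and the keys and values are those of the JSON above.

```python
import json
from pathlib import Path


def write_source_config(host: str, port: int, password: str,
                        path: str = "my_jdbc_connect_source.json") -> dict:
    """Write the JDBC source connector config with the placeholders filled in.

    host, port and password are the values returned by
    `avn service get demo-pg --format '{service_uri_params}'`.
    """
    config = {
        "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
        "connection.url": f"jdbc:postgresql://{host}:{port}/defaultdb?sslmode=require",
        "connection.user": "avnadmin",
        "connection.password": password,
        "table.whitelist": "pasta",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        # Kept as a string, matching the JSON above
        "poll.interval.ms": "2000",
        "topic.prefix": "pg_source_",
    }
    Path(path).write_text(json.dumps(config, indent=4) + "\n")
    return config
```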

Now we can create the connector with kcctl, running the following in a new terminal window:

kcctl apply -f my_jdbc_connect_source.json --name pg-incremental-source
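With the Kafka Connect REST API, you can create or update a connector with PUT /connectors/{name}/config. This creates the connector if it doesn't exist and replaces its configuration otherwise. We haven't verified which endpoint kcctl apply calls, so treat the following Python sketch only as an illustration of the REST equivalent. It only builds the request, and the base URL and function name apply_request are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def apply_request(base_url: str, name: str, config_path: str) -> Request:
    """Build (but do not send) a PUT /connectors/{name}/config request.

    Kafka Connect creates the connector if it does not exist and updates
    its configuration otherwise.
    """
    with open(config_path) as f:
        config = json.load(f)
    url = f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/config"
    return Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```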

We can verify that the connector was successfully created:

kcctl describe connector pg-incremental-source

The command output shows the pg-incremental-source connector in the RUNNING state, along with all the details associated with it.
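In scripts, you can make the same check against the GET /connectors/{name}/status endpoint of the Kafka Connect REST API. It reports the state of the connector and of each of its tasks. The following Python sketch treats the connector as healthy only if the connector and every task are RUNNING. The function name is hypothetical.

```python
import json


def is_fully_running(status_json: str) -> bool:
    """Return True if the connector and all of its tasks report RUNNING.

    status_json is the body returned by GET /connectors/{name}/status.
    """
    status = json.loads(status_json)
    tasks = status.get("tasks", [])
    if not tasks:
        # A connector without tasks is not copying any data
        return False
    states = [status["connector"]["state"]] + [task["state"] for task in tasks]
    return all(state == "RUNNING" for state in states)
```

A connector can report RUNNING while one of its tasks has FAILED, which is why the sketch checks the tasks too.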

Check the data in Apache Kafka with kcat

We can also check that there is a new Kafka topic called pg_source_pasta containing the same data stored in PostgreSQL. We'll use kcat for this, starting by downloading the required certificates:

avn service user-creds-download demo-kafka \
    --username avnadmin                    \
    -d certs

Then create a kcat.config file containing the following entries, replacing <HOST> and <PORT> with the host and port of the demo-kafka service:

bootstrap.servers=<HOST>:<PORT>
security.protocol=ssl
ssl.key.location=certs/service.key
ssl.certificate.location=certs/service.cert
ssl.ca.location=certs/ca.pem
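To catch a missing certificate before running kcat, you can generate the file while checking that the files downloaded by avn service user-creds-download are in place. The following Python sketch takes the host and port of your demo-kafka service. The example values and the name render_kcat_config are hypothetical.

```python
from pathlib import Path

# kcat.config keys and the file names downloaded into the certs directory
CERT_FILES = {
    "ssl.key.location": "service.key",
    "ssl.certificate.location": "service.cert",
    "ssl.ca.location": "ca.pem",
}


def render_kcat_config(host: str, port: int, cert_dir: str = "certs") -> str:
    """Return kcat.config content, raising early if a certificate file is missing."""
    lines = [f"bootstrap.servers={host}:{port}", "security.protocol=ssl"]
    for key, filename in CERT_FILES.items():
        cert_path = Path(cert_dir) / filename
        if not cert_path.is_file():
            raise FileNotFoundError(f"missing {cert_path}; download the credentials first")
        lines.append(f"{key}={cert_path.as_posix()}")
    return "\n".join(lines) + "\n"
```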

Now we can read from the pg_source_pasta topic with the following kcat invocation:

kcat -F kcat.config -C -t pg_source_pasta

If we now insert some rows into the PostgreSQL pasta table, we can see the same changes appearing in Kafka via kcat:

GIF showing rows inserted into PostgreSQL flowing into Kafka via kcat
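The new rows show up because, in incrementing mode, the JDBC source connector remembers the highest value it has seen in the incrementing column (id here). Every poll.interval.ms, it fetches only rows with a larger value. The following Python sketch is a simplified in-memory model of that behavior, not the connector's actual code. The penne row and the function name are hypothetical.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def poll_incrementing(table: List[Row], last_id: Optional[int],
                      column: str = "id") -> Tuple[List[Row], Optional[int]]:
    """Return rows whose incrementing column is above the stored offset.

    Simplified model: the highest id produced so far is the offset, and
    each poll only fetches rows beyond it, in id order.
    """
    new_rows = sorted(
        (row for row in table if last_id is None or row[column] > last_id),
        key=lambda row: row[column],
    )
    if new_rows:
        last_id = new_rows[-1][column]
    return new_rows, last_id


if __name__ == "__main__":
    pasta = [
        {"id": 1, "name": "spaghetti", "cooking_minutes": 8},
        {"id": 2, "name": "spaghettini", "cooking_minutes": 6},
        {"id": 3, "name": "fusilli", "cooking_minutes": 9},
        {"id": 4, "name": "trofie", "cooking_minutes": 5},
    ]
    rows, offset = poll_incrementing(pasta, None)
    print(len(rows), offset)  # 4 4: the first poll reads the whole table
    pasta.append({"id": 5, "name": "penne", "cooking_minutes": 11})  # hypothetical new row
    pasta[0]["cooking_minutes"] = 9  # an update to an existing row
    rows, offset = poll_incrementing(pasta, offset)
    print([row["name"] for row in rows], offset)  # ['penne'] 5: the update is not picked up
```

The second poll shows a limitation of this mode: updating an existing row does not produce a new record, because its id is not above the stored offset. To capture updates, the connector's timestamp-based modes are needed instead.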

Managing Kafka Connect connectors with kcctl

Creating connectors is only part of the game with kcctl: we can also manage them! Need a list of all the deployed connectors? Just run the following command:

kcctl get connectors

Need to pause and resume connectors? For example, the following command pauses the connector named pg-incremental-source:

kcctl pause connector pg-incremental-source

What types of connectors can we create? Glad you asked! We can find the full plugin list with:

kcctl get plugins

The command shows all the available connector plugins with their type (source or sink) and version. On Aiven for Apache Kafka, this is a quick way to check which managed Kafka Connect connector types you can create:

TYPE     CLASS                                                                                VERSION
source   com.couchbase.connect.kafka.CouchbaseSourceConnector                                 4.0.6
source   com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector  2.1.3
source   com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector            2.1.3
source   com.google.pubsub.kafka.source.CloudPubSubSourceConnector                            2.7.2-SNAPSHOT
source   com.google.pubsublite.kafka.source.PubSubLiteSourceConnector                         2.7.2-SNAPSHOT
...
sink     io.aiven.kafka.connect.gcs.GcsSinkConnector                                          0.9.0
sink     io.aiven.kafka.connect.http.HttpSinkConnector                                        0.4.0
sink     io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector                           2.12.0
...
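The same information is available from the GET /connector-plugins endpoint of the Kafka Connect REST API, as a JSON list of objects with class, type and version fields. The following Python sketch groups the plugin classes by type, for example to check in a script that a connector class is available before applying a configuration. The function name is hypothetical.

```python
import json
from collections import defaultdict
from typing import Dict, List


def plugins_by_type(response: str) -> Dict[str, List[str]]:
    """Group connector plugin classes by type from a GET /connector-plugins body."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for plugin in json.loads(response):
        grouped[plugin["type"]].append(plugin["class"])
    # Sort class names so the result is stable regardless of response order
    return {kind: sorted(classes) for kind, classes in grouped.items()}
```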

Wrapping up

Managing Kafka Connect connectors from the terminal is only a few commands away. kcctl makes it easy to inspect, deploy, update, pause and resume connectors in any of our environments. This unifies the end-user experience for Kafka Connect clusters, whether they are deployed on-premises, self-hosted or on Aiven.

Further reading:
