
🎄 Twelve Days of SMT 🎄 - Day 12: Community Transformations

Published at: 1/4/2021
Categories: apachekafka, kafkaconnect, dataengineering, twelvedaysofsmt
Author: rmoff

Apache Kafka ships with many Single Message Transformations included - but the great thing about it being an open API is that people can, and do, write their own transformations. Many of these are shared with the wider community, and in this final installment of the series I'm going to look at some of the transformations written by Jeremy Custenborder and available in kafka-connect-transform-common, which can be downloaded and installed from Confluent Hub (or built from source, if you like that kind of thing). Also check out the XML transformation by the same author, which I've written about previously.
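
If you're using the Confluent Hub client, installing it looks something like this (a sketch - check the plugin's Confluent Hub page for the current version coordinate, and restart the Connect worker afterwards so that it picks up the newly-installed plugin):

# Install into the default component directory without prompting
confluent-hub install --no-prompt jcustenborder/kafka-connect-transform-common:latest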

Change the topic case

👉 Reference

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day12-00/config \
  -d '{
      "connector.class"          : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"           : "jdbc:mysql://mysql:3306/demo",
      "connection.user"          : "mysqluser",
      "connection.password"      : "mysqlpw",
      "topics"                   : "day12-sys01",
      "tasks.max"                : "4",
      "auto.create"              : "true",
      "auto.evolve"              : "true",

      "transforms"               : "topicCase",
      "transforms.topicCase.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
      "transforms.topicCase.from": "LOWER_HYPHEN",
      "transforms.topicCase.to"  : "UPPER_CAMEL"
      }'

The source topic name day12-sys01 gets modified to Day12Sys01, which is the name of the table that the JDBC sink then creates:

mysql> show tables;
+----------------+
| Tables_in_demo |
+----------------+
| Day12Sys01     |
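
Note that the rename happens inside the sink connector's transform chain; the underlying Kafka topic keeps its original name. A quick way to confirm this, assuming you have kcat (formerly kafkacat) installed and a broker listening on localhost:9092:

# List broker metadata and look for the source topic - it is still day12-sys01,
# even though the table created by the JDBC sink is Day12Sys01
kcat -b localhost:9092 -L | grep -i day12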

Use the timestamp of a field as the message timestamp

A nice little triumvirate of transformations here, which take the timestamp from a field of the message, set it as the Kafka message timestamp, and then use it to modify the topic name.

The three steps are:

  1. TimestampConverter to transform the field from a string to a Timestamp (not necessary if it already is)

  2. ExtractTimestamp to set the timestamp of the Kafka message to the value of the specified field

  3. TimestampRouter to modify the topic name to include the timestamp components required

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day12-01/config \
  -d '{
      "connector.class"                        : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                         : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                        : "mysqluser",
      "connection.password"                    : "mysqlpw",
      "topics"                                 : "day12-sys01",
      "tasks.max"                              : "4",
      "auto.create"                            : "true",
      "auto.evolve"                            : "true",

      "transforms"                              : "convertTS,extractTS,setTopicName",
      "transforms.convertTS.type"               : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.convertTS.field"              : "txn_date",
      "transforms.convertTS.format"             : "EEE MMM dd HH:mm:ss zzz yyyy",
      "transforms.convertTS.target.type"        : "Timestamp",
      "transforms.extractTS.type"               : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
      "transforms.extractTS.field.name"         : "txn_date",
      "transforms.setTopicName.type"            : "org.apache.kafka.connect.transforms.TimestampRouter",
      "transforms.setTopicName.topic.format"    : "${topic}_${timestamp}",
      "transforms.setTopicName.timestamp.format": "YYYY-MM-dd"
      }'

The resulting topic name takes the date from the message field txn_date, and the sink creates table names accordingly:

mysql> show tables;
+------------------------+
| Tables_in_demo         |
+------------------------+
| day12-sys01_2020-12-07 |
| day12-sys01_2020-12-08 |
| day12-sys01_2020-12-09 |
| day12-sys01_2020-12-10 |
| day12-sys01_2020-12-11 |
| day12-sys01_2020-12-12 |
| day12-sys01_2020-12-13 |
| day12-sys01_2020-12-14 |
| day12-sys01_2020-12-15 |
| day12-sys01_2020-12-16 |
+------------------------+
12 rows in set (0.01 sec)
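
Since timestamp.format is a SimpleDateFormat pattern you can change the granularity of the routing too. For example, this variation of the config above (with a hypothetical connector name) would write one table per month rather than one per day:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day12-01-monthly/config \
  -d '{
      "connector.class"                        : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                         : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                        : "mysqluser",
      "connection.password"                    : "mysqlpw",
      "topics"                                 : "day12-sys01",
      "tasks.max"                              : "4",
      "auto.create"                            : "true",
      "auto.evolve"                            : "true",

      "transforms"                              : "convertTS,extractTS,setTopicName",
      "transforms.convertTS.type"               : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.convertTS.field"              : "txn_date",
      "transforms.convertTS.format"             : "EEE MMM dd HH:mm:ss zzz yyyy",
      "transforms.convertTS.target.type"        : "Timestamp",
      "transforms.extractTS.type"               : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
      "transforms.extractTS.field.name"         : "txn_date",
      "transforms.setTopicName.type"            : "org.apache.kafka.connect.transforms.TimestampRouter",
      "transforms.setTopicName.topic.format"    : "${topic}_${timestamp}",
      "transforms.setTopicName.timestamp.format": "yyyy-MM"
      }'

This would give tables such as day12-sys01_2020-12 instead of one per day.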

Add the current timestamp to the message payload

👉 Reference

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day12-02/config \
  -d '{
      "connector.class"          : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"           : "jdbc:mysql://mysql:3306/demo",
      "connection.user"          : "mysqluser",
      "connection.password"      : "mysqlpw",
      "topics"                   : "day12-sys01",
      "tasks.max"                : "4",
      "auto.create"              : "true",
      "auto.evolve"              : "true",

      "transforms"                : "addTSNow",
      "transforms.addTSNow.type"  : "com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value",
      "transforms.addTSNow.fields": "processingTS"
      }'

Querying the target table shows the new processingTS field populated with the time at which the sink processed each record:

mysql> select product, amount, txn_date, processingTS from `day12-sys01` ORDER BY units LIMIT 5;
+------------------------------+--------+------------------------------+-------------------------+
| product                      | amount | txn_date                     | processingTS            |
+------------------------------+--------+------------------------------+-------------------------+
| Sublimely Self-Righteous Ale | 61.25  | Mon Dec 14 09:12:03 GMT 2020 | 2020-12-17 00:43:02.550 |
| Arrogant Bastard Ale         | 88.65  | Wed Dec 09 18:05:02 GMT 2020 | 2020-12-17 00:43:02.559 |
| Sublimely Self-Righteous Ale | 30.81  | Fri Dec 11 14:49:14 GMT 2020 | 2020-12-17 00:43:02.551 |
| Arrogant Bastard Ale         | 20.45  | Tue Dec 08 10:30:21 GMT 2020 | 2020-12-17 00:43:02.223 |
| Sublimely Self-Righteous Ale | 56.95  | Wed Dec 16 23:12:23 GMT 2020 | 2020-12-17 00:43:02.233 |
+------------------------------+--------+------------------------------+-------------------------+
5 rows in set (0.00 sec)
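
If you want to double-check which transforms a connector has configured, the Kafka Connect REST API will echo the config back (same worker endpoint as the examples above):

# Retrieve the stored configuration for the connector, including its transforms
curl -s http://localhost:8083/connectors/sink-jdbc-mysql-day12-02/config | jq '.'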

Using SimulatorSinkConnector (and Single Message Transform TRACE logging)

Not a transformation as such, but a useful tip for examining the output of Transforms without needing to route the data to an actual target:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-simulator-day12-02/config \
    -d '{
        "connector.class"           : "com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector",
        "topics"                    : "day12-sys01",
        "log.entries"               : "true",
        "transforms"                : "addTSNow",
        "transforms.addTSNow.type"  : "com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value",
        "transforms.addTSNow.fields": "processingTS"
    }'

You can see the message after it's been processed by the transform(s) in the Kafka Connect worker log:

[2020-12-18 00:29:59,651] INFO [sink-simulator-day12-02|task-0] record.value=Struct{units=39,product=Delirium Tremens,amount=32.60,txn_date=Wed Dec 16 07:27:19 GMT 2020,source=SYS01,processingTS=Fri Dec 18 00:29:59 GMT 2020} (com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)

You can also get the Kafka Connect runtime to log TRACE messages that show the source messages before a transformation (cf. Changing the Logging Level for Kafka Connect Dynamically):

curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain \
    -d '{"level": "TRACE"}' \
    | jq '.'

With that set, the Kafka Connect worker logs the record before it is transformed, and the SimulatorSink then logs its state after the transform:

[2020-12-18 00:31:54,572] TRACE [sink-simulator-day12-02|task-0] Applying transformation
com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value to
SinkRecord{kafkaOffset=121, timestampType=CreateTime} ConnectRecord{topic='day12-sys01',
kafkaPartition=0, key=fd403528-90c3-45a1-a1c5-3f9ebe2799be, keySchema=Schema{STRING},
value=Struct{units=6,product=Nugget Nectar,amount=91.30,txn_date=Thu Dec 10 06:51:22 GMT
2020,source=SYS01}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1608251514568,
headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)

[2020-12-18 00:31:54,572] INFO [sink-simulator-day12-02|task-0]
record.value=Struct{units=6,product=Nugget Nectar,amount=91.30,txn_date=Thu Dec 10 06:51:22 GMT
2020,source=SYS01,processingTS=Fri Dec 18 00:31:54 GMT 2020}
(com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)
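
Once you're done debugging, you can put things back as they were with a couple of housekeeping calls against the same worker:

# Set the TransformationChain logger back to INFO (the usual default)
curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain \
    -d '{"level": "INFO"}' \
    | jq '.'

# And, if you're finished with it, drop the simulator connector
curl -s -X DELETE http://localhost:8083/connectors/sink-simulator-day12-02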

Try it out!

You can find the full code for trying this out, including a Docker Compose so you can spin it up on your local machine, 👾 here
