
🎄 Twelve Days of SMT 🎄 - Day 7: TimestampRouter

Published at 12/16/2020
Categories: apachekafka, kafkaconnect, dataengineering, twelvedaysofsmt
Author: rmoff

Just like the RegexRouter, the TimestampRouter can be used to modify the topic name of messages as they pass through Kafka Connect. Since the topic name is usually the basis for naming the object to which a sink connector writes messages, this is a great way to achieve time-based partitioning of those objects if required. For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices, e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.

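The TimestampRouter takes two arguments: the format of the final topic name to generate (topic.format, in which ${topic} and ${timestamp} are placeholders for the original topic name and the formatted timestamp), and the format of the timestamp to put in the topic name (timestamp.format, a SimpleDateFormat pattern).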

"transforms" : "addTimestampToTopic",
"transforms.addTimestampToTopic.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimestampToTopic.topic.format" : "${topic}_${timestamp}",
"transforms.addTimestampToTopic.timestamp.format": "yyyy-MM-dd"
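As a rough illustration of what the two settings do, here is a minimal Java sketch of the topic-name computation. It assumes (an assumption about the transform's internals, not its documented contract) that the Kafka message timestamp is formatted with SimpleDateFormat in UTC and substituted for ${timestamp} in topic.format, with ${topic} replaced by the original topic name. The class and method names are hypothetical. It also shows why the pattern should use yyyy (calendar year) rather than YYYY (week year): the two can differ for dates near the turn of the year, and the week-year result depends on locale, so the sketch pins Locale.US to make it reproducible.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical sketch of TimestampRouter-style topic naming (not the Kafka source).
class TopicNameSketch {

    // Formats the message timestamp (epoch millis) with a SimpleDateFormat pattern
    // in UTC (assumed), then fills in the ${topic} and ${timestamp} placeholders.
    static String routedTopic(String topic, long timestampMs,
                              String topicFormat, String timestampFormat) {
        SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat, Locale.US);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        String formatted = fmt.format(new Date(timestampMs));
        return topicFormat.replace("${topic}", topic).replace("${timestamp}", formatted);
    }

    public static void main(String[] args) {
        long ts = 1607681956641L; // 2020-12-11T10:19:16.641Z
        System.out.println(routedTopic("day7-transactions", ts, "${topic}_${timestamp}", "yyyy-MM-dd"));
        // -> day7-transactions_2020-12-11

        // Monthly buckets, as in the cars example
        System.out.println(routedTopic("cars", ts, "${topic}_${timestamp}", "yyyy-MM"));
        // -> cars_2020-12

        // yyyy (calendar year) vs YYYY (week year) for 2020-12-28T00:00:00Z
        long lateDecember = 1609113600000L;
        System.out.println(routedTopic("cars", lateDecember, "${topic}_${timestamp}", "yyyy-MM-dd"));
        // -> cars_2020-12-28
        System.out.println(routedTopic("cars", lateDecember, "${topic}_${timestamp}", "YYYY-MM-dd"));
        // -> cars_2021-12-28 (week 1 of 2021 in the US locale)
    }
}
```

The YYYY case is the classic trap: for a few days each December the "wrong" year silently appears in the topic, and therefore in the target table or index name.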

Note that the TimestampRouter uses the timestamp of the Kafka message itself. What that timestamp represents depends on the timestamp type, set on the broker (log.message.timestamp.type) or overridden per topic (message.timestamp.type). With the default, CreateTime, the timestamp is the one supplied by the producer: either set explicitly through the producer API, or defaulting to the producer's current time when the record is sent. With LogAppendTime, the broker overwrites it with the time at which it appended the message to its log. Message timestamps were added in Apache Kafka 0.10 in KIP-32.
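To make the timestamp rules concrete, here is a small Java model of which timestamp ends up on the stored record, and therefore which date the TimestampRouter will put in the topic name. The names are hypothetical; this is not a Kafka API. It encodes Kafka's semantics: under CreateTime an explicit producer timestamp wins, otherwise the producer's clock at send time is used; under LogAppendTime the broker's append time replaces whatever the producer sent. It ignores edge cases such as broker-side limits on how far a CreateTime value may drift from the broker's clock.

```java
// Hypothetical model of Kafka's message timestamp rules (not a Kafka API).
class MessageTimestampModel {

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * @param type           effective setting (log.message.timestamp.type / message.timestamp.type)
     * @param explicitTs     timestamp set via the producer API, or null if none was set
     * @param producerNowMs  producer's clock when the record was sent
     * @param brokerAppendMs broker's clock when the record was appended to the log
     */
    static long storedTimestamp(TimestampType type, Long explicitTs,
                                long producerNowMs, long brokerAppendMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            return brokerAppendMs; // broker overwrites the producer's value
        }
        // CreateTime: explicit producer timestamp if present, else producer's send time
        return explicitTs != null ? explicitTs : producerNowMs;
    }

    public static void main(String[] args) {
        long producerNow = 1_000L;
        long brokerAppend = 1_050L;
        System.out.println(storedTimestamp(TimestampType.CREATE_TIME, null, producerNow, brokerAppend));     // 1000
        System.out.println(storedTimestamp(TimestampType.CREATE_TIME, 500L, producerNow, brokerAppend));     // 500
        System.out.println(storedTimestamp(TimestampType.LOG_APPEND_TIME, 500L, producerNow, brokerAppend)); // 1050
    }
}
```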

👾 Demo code

Route data to target objects named based on the timestamp of the source data

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day7-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "addTimestampToTopic",
          "transforms.addTimestampToTopic.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
          "transforms.addTimestampToTopic.topic.format" : "${topic}_${timestamp}",
          "transforms.addTimestampToTopic.timestamp.format": "yyyy-MM-dd"
        }'

The data is read from the source topic (day7-transactions) and passes through the transformation, which takes the message timestamp, formats it, and appends it to the topic name. Because the JDBC sink names the target table after the topic, this results in a date-suffixed table in the target database:

mysql> show tables;
+-------------------------------+
| Tables_in_demo                |
+-------------------------------+
| day7-transactions_2020-12-16  |
+-------------------------------+

Reference: JDBC sink connector, see also 🎥 Kafka Connect in Action : JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)

What about using a timestamp value from the message itself?

The payload itself may well contain a date field holding the business date by which you want to partition the data. In the data streamed to the database above, for example, there is txn_date:

mysql> SELECT txn_date, item, cost FROM `day7-transactions_2020-12-11` LIMIT 5;
+------------------------------+------------------------------+-------+
| txn_date                     | item                         | cost  |
+------------------------------+------------------------------+-------+
| Thu Dec 03 02:37:03 GMT 2020 | Hop Rod Rye                  | 34.52 |
| Sat Dec 05 18:53:49 GMT 2020 | Duvel                        | 97.81 |
| Fri Dec 04 22:45:54 GMT 2020 | Trois Pistoles               | 41.45 |
| Fri Dec 04 16:37:25 GMT 2020 | Ten FIDY                     | 63.56 |
| Fri Dec 04 17:16:59 GMT 2020 | Stone Imperial Russian Stout | 94.66 |
+------------------------------+------------------------------+-------+
5 rows in set (0.00 sec)

Note that the txn_date values (3 to 5 December) don't match the date in the table name. This is the difference between 'system date' (when the data was loaded into Kafka) and 'business date' (when the actual event took place).

There is a Single Message Transform called MessageTimestampRouter which is part of Confluent Platform and can be used to route data based on a time field in the message value itself.

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day7-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "addTimestampToTopicFromField",
          "transforms.addTimestampToTopicFromField.type" : "io.confluent.connect.transforms.MessageTimestampRouter",
          "transforms.addTimestampToTopicFromField.message.timestamp.keys" : "txn_date",
          "transforms.addTimestampToTopicFromField.message.timestamp.format": "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.addTimestampToTopicFromField.topic.format" : "${topic}_${timestamp}",
          "transforms.addTimestampToTopicFromField.topic.timestamp.format" : "yyyy-MM-dd"
        }'
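What the MessageTimestampRouter configuration above is aiming for can be sketched in plain Java: parse the txn_date string with the message.timestamp.format pattern, then re-format it with the topic.timestamp.format pattern to build the topic name. This is a hypothetical helper, not the Confluent implementation, and it assumes UTC for the output date. It uses the record from the TRACE log below (Kafka timestamp 1607681956641, txn_date of Thu Dec 03 02:25:24 GMT 2020) to compare the 'system date' and 'business date' suffixes.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical sketch: route on a date field in the payload instead of the Kafka timestamp.
class BusinessDateSketch {

    // English locale so that "Thu"/"Dec" parse; UTC for the formatted output (assumed).
    static SimpleDateFormat utcFormat(String pattern) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.ENGLISH);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt;
    }

    // Suffix taken from the Kafka message timestamp ("system date").
    static String systemDateTopic(String topic, long kafkaTimestampMs) {
        return topic + "_" + utcFormat("yyyy-MM-dd").format(new Date(kafkaTimestampMs));
    }

    // Suffix taken from a date string in the payload ("business date").
    static String businessDateTopic(String topic, String fieldValue) throws ParseException {
        Date parsed = utcFormat("EEE MMM dd HH:mm:ss zzz yyyy").parse(fieldValue);
        return topic + "_" + utcFormat("yyyy-MM-dd").format(parsed);
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(systemDateTopic("day7-transactions", 1607681956641L));
        // day7-transactions_2020-12-11
        System.out.println(businessDateTopic("day7-transactions", "Thu Dec 03 02:25:24 GMT 2020"));
        // day7-transactions_2020-12-03
    }
}
```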

Currently, this fails:

org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [appending message's timestamp field to topic], found: org.apache.kafka.connect.data.Struct
        at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30)
        at io.confluent.connect.transforms.MessageTimestampRouter.apply(MessageTimestampRouter.java:132)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
        ... 14 more

TRACE [sink-jdbc-mysql-day7-00|task-2] Applying transformation io.confluent.connect.transforms.MessageTimestampRouter to SinkRecord{kafkaOffset=2300, timestampType=CreateTime} ConnectRecord{topic='day7-transactions', kafkaPartition=0, key=013e350e-ac03-44cd-bc2b-7b348ec4df6b, keySchema=Schema{STRING}, value=Struct{txn_date=Thu Dec 03 02:25:24 GMT 2020,cost=73.58,item=Delirium Noctorum,card_type=mastercard,customer_remarks=He laid out Biff in one punch. I didn't know he had it in him. He's never stood up to Biff in his life!}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1607681956641, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)

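The cause is that this Single Message Transform currently only handles schemaless records, such as raw JSON, where the value reaches the transform as a Map. It does not support data written with a schema (Avro, Protobuf, or JSON Schema), for which the converter hands the value to the transform as a Struct, as the TRACE log above shows.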
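The failing check is the one named in the stack trace, Requirements.requireMap. As a minimal sketch of what the error message implies (modeled on the message text, not on the Confluent source; requireMapValue and StructLike are hypothetical names), a schemaless-only transform can read a field from a java.util.Map, which is what a converter produces for schemaless JSON, but rejects any other value type, such as the Struct that a schema-aware converter produces.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of a schemaless-only field lookup (not the Confluent source).
class SchemalessCheckSketch {

    // Hypothetical stand-in for a value that carries a schema, such as a Connect Struct.
    static final class StructLike {
        final String schemaName;
        final Map<String, Object> fields;

        StructLike(String schemaName, Map<String, Object> fields) {
            this.schemaName = schemaName;
            this.fields = fields;
        }
    }

    // Accepts only Map values, echoing the DataException message in the stack trace above.
    static Map<?, ?> requireMapValue(Object value, String purpose) {
        if (!(value instanceof Map)) {
            String found = (value == null) ? "null" : value.getClass().getName();
            throw new IllegalArgumentException("Only Map objects supported in absence of schema for ["
                    + purpose + "], found: " + found);
        }
        return (Map<?, ?>) value;
    }

    public static void main(String[] args) {
        Map<String, Object> schemaless = Collections.singletonMap("txn_date", "Thu Dec 03 02:25:24 GMT 2020");
        System.out.println(requireMapValue(schemaless, "reading txn_date").get("txn_date"));

        StructLike withSchema = new StructLike("io.mdrogalis.Gen0", schemaless);
        try {
            requireMapValue(withSchema, "reading txn_date");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```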
