
🎄 Twelve Days of SMT 🎄 - Day 10: ReplaceField

Published at 1/4/2021
Categories: apachekafka, kafkaconnect, dataengineering, twelvedaysofsmt
Author: rmoff

The ReplaceField Single Message Transform has three modes of operation on fields of data passing through Kafka Connect:

  • Include only the fields specified in the list (include / whitelist)

  • Include all fields except the ones specified (exclude / blacklist)

  • Rename field(s) (renames)

It can be used in both a source and sink connector depending on requirements.

(KIP-629 has started to be implemented in Apache Kafka 2.7. If you are using an earlier version then you will have to use blacklist and whitelist in place of exclude and include respectively)
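
To see the three modes side by side, here is a minimal properties-style sketch; the transform alias restructure and the field names are placeholders, and in practice you would normally use only one mode per transform. The demos below use the pre-2.7 property names:

transforms=restructure
transforms.restructure.type=org.apache.kafka.connect.transforms.ReplaceField$Value

# 1. Keep only the listed fields ("include" from Apache Kafka 2.7)
transforms.restructure.whitelist=fieldA,fieldB

# 2. ...or drop the listed fields ("exclude" from Apache Kafka 2.7)
#transforms.restructure.blacklist=fieldC

# 3. ...or rename fields, as comma-separated old:new pairs
#transforms.restructure.renames=fieldC:field_c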

👾 Demo code

Dropping fields in a sink connector

Imagine we have a source topic with multiple fields in it:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day10-transactions -C -c1 -o-1 -u -q -J | jq '.payload'
{
  "Gen0": {
    "cost" : { "string": "12.85" },
    "units" : { "string": "2" },
    "card_type" : { "string": "maestro" },
    "item" : { "string": "Hercules Double IPA" },
    "customer_remarks": { "string": "Perfect! My experiment worked! They're all exactly 25 minutes slow!" },
    "cc_num" : { "string": "1228-1221-1221-1431" },
    "cc_exp" : { "string": "2013-9-12" },
    "txn_date" : { "string": "Wed Dec 16 07:59:52 GMT 2020" }
  }
}

This data is in Kafka, but the system to which we're going to stream it with a sink connector mustn't hold sensitive information such as credit card data. One option is to mask it (for example with the MaskField Single Message Transform), but that retains the fields in the payload, which is wasteful if we simply don't want them in the target system. Instead, we can use ReplaceField to exclude a set of fields from passing through Kafka Connect:

curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-01/config \
  -d '{
      "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url" : "jdbc:mysql://mysql:3306/demo",
      "connection.user" : "mysqluser",
      "connection.password" : "mysqlpw",
      "topics" : "day10-transactions",
      "tasks.max" : "4",
      "auto.create" : "true",
      "auto.evolve" : "true",
      "transforms" : "dropCC",
      "transforms.dropCC.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.dropCC.blacklist": "cc_num,cc_exp,card_type"
      }'

In the target system (a database, in this case) the credit card fields are not present, exactly as intended:

mysql> describe `day10-transactions`;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost             | text | YES  |     | NULL    |       |
| units            | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| customer_remarks | text | YES  |     | NULL    |       |
| txn_date         | text | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)
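If you want to check the data itself and not just the schema, you could run an illustrative query such as the one below (assuming the connector has already written some rows):

mysql> SELECT item, cost, units, txn_date FROM `day10-transactions` LIMIT 5;

Selecting cc_num or cc_exp here would fail with an unknown-column error, because those fields never made it to the database.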

Including only certain fields in a source connector

This time we have a source connector that's ingesting data from a system that includes numerous fields that we don't want to ingest into Kafka. Because a Single Message Transform applies to the pipeline before a message is written to Kafka, not after, we can deliberately ensure that certain data is never stored in Kafka if it's not intended to be.

The source connector in this example is reading data from a database with a schema that looks like this:

mysql> describe production_data;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost             | text | YES  |     | NULL    |       |
| units            | text | YES  |     | NULL    |       |
| card_type        | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| customer_remarks | text | YES  |     | NULL    |       |
| cc_num           | text | YES  |     | NULL    |       |
| cc_exp           | text | YES  |     | NULL    |       |
| txn_date         | text | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
8 rows in set (0.00 sec)

Unlike the scenario in the first section, this time we want to extract data from the source system, but only the fields that we need for our particular analytics pipeline. It may be that it's inefficient to ingest a large number of redundant fields, or that the data is sensitive and we're not allowed to store it in our topic. In this case we specify just the list of fields to include:

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-day10-00/config \
  -H "Content-Type: application/json" -d '{
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:mysql://mysql:3306/demo",
    "connection.user" : "mysqluser",
    "connection.password" : "mysqlpw",
    "topic.prefix" : "day10-",
    "poll.interval.ms" : 10000,
    "tasks.max" : 1,
    "table.whitelist" : "production_data",
    "mode" : "bulk",
    "transforms" : "selectFields",
    "transforms.selectFields.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.selectFields.whitelist": "item,cost,units,txn_date"
  }'

The resulting Kafka topic is populated with only the fields of interest:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day10-production_data -C -o-1 -u -q -J | jq '.payload'
{
  "cost"    : { "string": "48.54" },
  "units"   : { "string": "41" },
  "item"    : { "string": "Oak Aged Yeti Imperial Stout" },
  "txn_date": { "string": "Mon Dec 14 11:43:56 GMT 2020" }
}

Renaming fields

Perhaps you want to keep all the fields in the payload but change the names of some of them. This could be for various reasons, including:

  • Standardising on a common name for the same business measure as data is ingested into Kafka

  • Changing a field name to match an existing column in a target object in a sink connector

Here's an example renaming a field in a sink connector:

curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-02/config \
  -d '{
      "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url" : "jdbc:mysql://mysql:3306/demo",
      "connection.user" : "mysqluser",
      "connection.password" : "mysqlpw",
      "topics" : "day10-production_data",
      "tasks.max" : "4",
      "auto.create" : "true",
      "auto.evolve" : "true",
      "transforms" : "renameTS",
      "transforms.renameTS.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameTS.renames": "txn_date:transaction_timestamp"
      }'

The resulting table in the database has the amended field name (transaction_timestamp):

mysql> describe `day10-production_data`;
+-----------------------+------+------+-----+---------+-------+
| Field                 | Type | Null | Key | Default | Extra |
+-----------------------+------+------+-----+---------+-------+
| cost                  | text | YES  |     | NULL    |       |
| units                 | text | YES  |     | NULL    |       |
| card_type             | text | YES  |     | NULL    |       |
| item                  | text | YES  |     | NULL    |       |
| customer_remarks      | text | YES  |     | NULL    |       |
| cc_num                | text | YES  |     | NULL    |       |
| cc_exp                | text | YES  |     | NULL    |       |
| transaction_timestamp | text | YES  |     | NULL    |       |
+-----------------------+------+------+-----+---------+-------+
8 rows in set (0.01 sec)
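Note that renames takes a comma-separated list of old:new pairs, so several fields can be renamed in one transform. A hypothetical sketch (the second mapping is not part of this demo):

# hypothetical: rename two fields in a single ReplaceField transform
transforms.renameTS.renames=txn_date:transaction_timestamp,cc_num:card_number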

Try it out!

You can find the full code for trying this out (including a Docker Compose so you can spin it up on your local machine) 👾 here
