You can use the InsertField
Single Message Transform (SMT) to add the message timestamp into each message that Kafka Connect sends to a sink.
To use the Single Message Transform specify the name of the field (timestamp.field
) that you want to add to hold the message timestamp:
"transforms" : "insertTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
The message timestamp can be set by the producer API explicitly, or allowed to default to the setting on the broker (log.message.timestamp.type
) or topic (message.timestamp.type
) which by default is the time on the broker at which the message is created (CreateTime
). Message timestamps were added in Apache Kafka 0.10 in KIP-32.
Example - JDBC Sink connector #
Given a JDBC sink connector that looks like this:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "transactions",
"tasks.max" : "4",
"auto.create" : "true"
}'
you can add the Single Message Transform as shown here:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "insertTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
}'
Note auto.evolve=true
otherwise the target table wonโt hold the new field unless it happens to exist already.
The JDBC connector correctly populates the new field as a timestamp type:
mysql> describe transactions;
+-------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| customer_id | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| item | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| messageTS | datetime(3) | YES | | NULL | |
+-------------+-------------+------+-----+---------+-------+
5 rows in set (0.01 sec)
mysql> select * from transactions limit 5;
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
| customer_id | cost | item | card_type | messageTS |
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
| b7187b3a-8ef4-469f-a99d-29dbc0cc3608 | 40.26 | Ten FIDY | discover | 2020-12-08 22:42:26.503 |
| c44a2596-ad4a-4c51-ba81-0a86cc48a2d3 | 96.60 | Trois Pistoles | maestro | 2020-12-08 22:42:26.842 |
| b7187b3a-8ef4-469f-a99d-29dbc0cc3608 | 53.44 | Chimay Grande Rserve | visa | 2020-12-08 22:42:27.341 |
| c44a2596-ad4a-4c51-ba81-0a86cc48a2d3 | 25.33 | Chimay Grande Rserve | dankort | 2020-12-08 22:42:27.841 |
| 573a732e-1d42-4749-a99d-a89391cd2858 | 43.95 | Arrogant Bastard Ale | switch | 2020-12-08 22:42:28.342 |
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
5 rows in set (0.00 sec)
See also ๐ฅ Kafka Connect in Action : JDBC Sink (๐พ demo code
) and ๐ฅ ksqlDB & Kafka Connect JDBC Sink in action (๐พ demo code
)
Example - S3 Sink connector #
InsertField
writes the timestamp as a unix epoch value - if youโd rather it in a string then you can use an additional Single Message Transform, TimestampConverter
as shown here in an example with the AWS S3 sink connector:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-00/config \
-d '{
"connector.class" : "io.confluent.connect.s3.S3SinkConnector",
"storage.class" : "io.confluent.connect.s3.storage.S3Storage",
"s3.region" : "us-west-2",
"s3.bucket.name" : "rmoff-smt-demo-01",
"topics" : "customers,transactions",
"tasks.max" : "4",
"flush.size" : "16",
"format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility" : "NONE",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"transforms" : "insertTS,formatTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field" : "messageTS",
"transforms.formatTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTS.format" : "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTS.field" : "messageTS",
"transforms.formatTS.target.type" : "string"
}'
This writes messages as JSON to S3 that look like this:
{
"customer_id": "d0394941-2d2a-4d34-a374-23e5f5364ea9",
"cost": "25.21",
"item": "Founders Breakfast Stout",
"card_type": "jcb",
"messageTS": "2020-12-08 16:07:39:904"
}
See also ๐ฅ Kafka Connect in Action : S3 Sink (๐พ kafka-to-s3 demo code
)