Some people learn through doing, and for that there are a bunch of good ksqlDB tutorials here and here. Others may prefer to watch and listen first, before getting hands-on. For them, I humbly offer this little series of videos all about ksqlDB. They're all based on a set of demo scripts that you can run for yourself and try out.
🚨 Make sure you subscribe to my YouTube channel so that you don’t miss more videos like these!
S01E01 : Filtering #
Using ksqlDB you can filter streams of data in Apache Kafka and write new Kafka topics populated by a subset of another. For example:
CREATE STREAM ORDERS_NY AS
SELECT *
FROM ORDERS
WHERE ADDRESS_STATE='New York';
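To check what the filter wrote, you can run a push query against the new stream. This is a minimal sketch that assumes the `ORDERS_NY` stream above has been created and that you want to read from the start of its topic:

```sql
-- Read from the beginning of the topic rather than only new messages
SET 'auto.offset.reset' = 'earliest';

-- Push query: stream back the first five rows that passed the filter
SELECT *
  FROM ORDERS_NY
  EMIT CHANGES
 LIMIT 5;
```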
S01E02 : Schema Manipulation and Data Transformation #
There are lots of transformations that you can do on streams in ksqlDB including:
- Remove/drop fields
- Rename fields
- CAST datatypes
- Reformat timestamps from BIGINT epoch to human-readable strings
- Flatten nested objects (STRUCT)
For example:
CREATE STREAM ORDERS_PROCESSED AS
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS ORDER_TS,
CAST(ORDER_VALUE AS DECIMAL(9,2)) AS ORDER_VALUE_USD,
ORDER->ADDRESS AS ADDRESS
FROM ORDERS;
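Here's a slightly fuller sketch that shows dropping, renaming, casting, and flattening in one statement. The `ORDERS_NESTED` stream, its `orders_nested` topic, and all of its column names are hypothetical, made up purely for illustration:

```sql
-- Hypothetical source stream with a nested STRUCT column
CREATE STREAM ORDERS_NESTED (
    ORDERID       INT,
    ORDER_VALUE   DOUBLE,
    INTERNAL_NOTE VARCHAR,
    CUSTOMER      STRUCT<NAME VARCHAR,
                         ADDRESS STRUCT<CITY VARCHAR, COUNTRY VARCHAR>>
) WITH (KAFKA_TOPIC='orders_nested', VALUE_FORMAT='JSON');

CREATE STREAM ORDERS_FLAT AS
  SELECT ORDERID AS ORDER_ID,                                  -- rename
         CAST(ORDER_VALUE AS DECIMAL(9,2)) AS ORDER_VALUE_USD, -- cast
         CUSTOMER->NAME AS CUSTOMER_NAME,                      -- flatten one level
         CUSTOMER->ADDRESS->CITY AS CITY,                      -- flatten two levels
         CUSTOMER->ADDRESS->COUNTRY AS COUNTRY
    FROM ORDERS_NESTED;
-- INTERNAL_NOTE is dropped simply by leaving it out of the SELECT
```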
S01E03 : Joins #
Using ksqlDB you can enrich messages on a Kafka topic with reference data held in another topic. This could come from a database, message queue, producer API, etc. With the JOIN clause you can define relationships between streams and/or tables in ksqlDB (which are built on topics in Kafka). For example:
CREATE STREAM ORDERS_ENRICHED AS
SELECT O.ORDERTIME AS ORDER_TIMESTAMP,
O.ORDERID,
I.MAKE,
O.ORDERUNITS,
O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
FROM ORDERS O
INNER JOIN ITEM_REFERENCE_01 I
ON O.ITEMID = I.ITEM_ID
PARTITION BY ORDERID;
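For a stream-table join like this, the reference data needs to be declared as a table keyed on the join column. Here's a rough sketch of what that declaration could look like. The topic name `item_reference`, the JSON format, and the column types are assumptions on my part, not taken from the demo scripts:

```sql
-- Assumed: reference data sits in a topic called 'item_reference',
-- keyed on the item ID, with JSON values
CREATE TABLE ITEM_REFERENCE_01 (
    ITEM_ID   INT PRIMARY KEY,   -- the join key must be the table's primary key
    MAKE      VARCHAR,
    UNIT_COST DECIMAL(9,2)
) WITH (
    KAFKA_TOPIC  = 'item_reference',
    VALUE_FORMAT = 'JSON'
);
```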
S01E04 : Integration with other systems #
ksqlDB can pull data in from other systems (e.g. databases, JMS message queues, etc.), and push data down to other systems (NoSQL stores, Elasticsearch, databases, Neo4j, etc.). This is done using Kafka Connect, which can be run embedded within ksqlDB or as a separate cluster of workers. ksqlDB can be used to create and control the connectors. For example:
CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics' = 'ORDERS_ENRICHED',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc'
);
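Going the other way, here's a sketch of pulling data in with a source connector, plus the statements for checking on connectors once they're running. It assumes the JDBC source connector plugin is installed on the Connect worker. The connector name, database URL, table name, and column name are all placeholders:

```sql
-- Hypothetical source connector: ingest rows from a database table into Kafka
CREATE SOURCE CONNECTOR SOURCE_JDBC_ITEMS_01 WITH (
  'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'           = 'jdbc:postgresql://postgres:5432/demo', -- placeholder
  'table.whitelist'          = 'items',                                -- placeholder
  'mode'                     = 'incrementing',
  'incrementing.column.name' = 'id',                                   -- placeholder
  'topic.prefix'             = 'db-'
);

-- List the connectors and inspect the status of one
SHOW CONNECTORS;
DESCRIBE CONNECTOR SINK_ELASTIC_ORDERS_01;
```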
S01E05 : Reserialising Streams #
Using ksqlDB you can reserialise data in Apache Kafka topics. For example, you can take a stream of CSV data and write it to a new topic in Avro. ksqlDB supports many serialisation formats including Avro, Protobuf, JSON Schema, JSON, and Delimited (CSV, TSV, etc.). This statement writes the ORDERS stream to a new topic as delimited (CSV) data:
CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED',
KAFKA_TOPIC='orders_csv') AS
SELECT * FROM ORDERS;
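The CSV-to-Avro direction looks much the same. Delimited data carries no schema, so you have to declare the columns yourself when you register the source stream. In this sketch the stream names, topic names, and column types are assumptions, and writing Avro assumes ksqlDB is configured with a Schema Registry:

```sql
-- Hypothetical stream over an existing topic of CSV data;
-- columns are declared in the order they appear in each line
CREATE STREAM ORDERS_CSV_SRC (
    ORDERTIME  BIGINT,
    ORDERID    INT,
    ITEMID     INT,
    ORDERUNITS INT
) WITH (KAFKA_TOPIC='orders_csv_src', VALUE_FORMAT='DELIMITED');

-- Reserialise to Avro in a new topic
CREATE STREAM ORDERS_AVRO WITH (VALUE_FORMAT='AVRO',
                                KAFKA_TOPIC='orders_avro') AS
  SELECT * FROM ORDERS_CSV_SRC;
```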
S01E06 : Split and Merge Kafka Topics #
You can split streams of data in Apache Kafka based on values in a field using ksqlDB. You can also merge separate streams of data together into one.
- Splitting a stream:
CREATE STREAM ORDERS_UK AS SELECT * FROM ORDERS WHERE COUNTRY='UK';
CREATE STREAM ORDERS_OTHER AS SELECT * FROM ORDERS WHERE COUNTRY!='UK';
- Merging streams:
CREATE STREAM INVENTORY_COMBINED AS SELECT 'WH1' AS SOURCE, * FROM INVENTORY_WH1;
INSERT INTO INVENTORY_COMBINED SELECT 'WH2' AS SOURCE, * FROM INVENTORY_WH2;
S01E07 : Aggregates #
Using ksqlDB you can build stateful aggregations on events in Apache Kafka topics. These are persisted as Kafka topics and held in a state store within ksqlDB that you can query directly or from an external application using the Java client or REST API.
ksqlDB uses SQL to describe the stream processing that you want to do. For example:
CREATE TABLE ORDERS_BY_MAKE AS
SELECT MAKE,
COUNT(*) AS ORDER_COUNT,
SUM(TOTAL_ORDER_VALUE) AS TOTAL_ORDER_VALUE
FROM ORDERS_ENRICHED
GROUP BY MAKE;
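Once the table exists you can query its state directly. A quick sketch, assuming the `ORDERS_BY_MAKE` table above has been created (the value `'Ford'` is just an illustrative key):

```sql
-- Pull query: fetch the current aggregate for one key and return immediately
SELECT MAKE, ORDER_COUNT, TOTAL_ORDER_VALUE
  FROM ORDERS_BY_MAKE
 WHERE MAKE = 'Ford';

-- Push query: keep the query open and receive every change to the aggregates
SELECT MAKE, ORDER_COUNT, TOTAL_ORDER_VALUE
  FROM ORDERS_BY_MAKE
  EMIT CHANGES;
```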
S01E08 : Time Handling #
When you do processing in ksqlDB that is based on time (such as windowed aggregations, or stream-stream joins) it is important that you correctly define the timestamp by which you want your data to be processed. This could be the timestamp that's part of the Kafka message metadata, or it could be a field in the value of the Kafka message itself.
By default ksqlDB will use the timestamp of the Kafka message. You can change this by specifying WITH (TIMESTAMP='…') in your CREATE STREAM statement to identify a value field to use as the timestamp instead.
Use the ROWTIME system field to view the timestamp of the ksqlDB row.
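As a rough sketch of how that fits together, here's a stream that takes its timestamp from a value field, followed by a windowed aggregation that buckets rows by that time. The stream name, topic, format, and column types are assumptions, and `ORDERTIME` is assumed to hold epoch milliseconds:

```sql
-- Hypothetical stream that uses the ORDERTIME value field for event time
CREATE STREAM ORDERS_EVENT_TIME (
    ORDERTIME  BIGINT,
    ORDERID    INT,
    ITEMID     INT,
    ORDERUNITS INT
) WITH (
    KAFKA_TOPIC  = 'orders',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP    = 'ORDERTIME'
);

-- ROWTIME now reflects ORDERTIME, not the Kafka message timestamp
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS ROW_TS,
       ORDERID
  FROM ORDERS_EVENT_TIME
  EMIT CHANGES
 LIMIT 5;

-- Hourly order counts per item, windowed on the same timestamp
SELECT ITEMID, COUNT(*) AS ORDER_COUNT
  FROM ORDERS_EVENT_TIME
  WINDOW TUMBLING (SIZE 1 HOUR)
 GROUP BY ITEMID
  EMIT CHANGES;
```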
Questions? #
👉 Head over to the Confluent Community forum or Slack group.