Flink SQL: Exporting nested JSON to a Kafka topic
I’ve been playing around with Flink as part of a workshop that I’m doing at JFokus in a couple of weeks and I wanted to export some data from Flink to Apache Kafka in a nested format. In this blog post we’ll learn how to do that.
Setup
We’re going to be using the following Docker Compose config:
version: "3"
services:
zookeeper:
image: zookeeper:latest
container_name: zookeeper
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
kafka:
image: confluentinc/cp-kafka:7.1.0
hostname: kafka
container_name: kafka
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
depends_on:
[zookeeper]
healthcheck: {test: nc -z localhost 9092, interval: 1s, start_period: 120s}
sql-client:
container_name: 'flink-sql-client'
build:
context: .
dockerfile: sql-client/Dockerfile
depends_on:
- jobmanager
environment:
FLINK_JOBMANAGER_HOST: jobmanager
volumes:
- ./flink/settings/:/settings
jobmanager:
image: flink:1.16.0-scala_2.12-java11
container_name: 'flink-jobmanager'
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 30
volumes:
- ./flink/settings/:/settings
- ./flink/data/:/data
taskmanager:
image: flink:1.16.0-scala_2.12-java11
container_name: 'flink-taskmanager'
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 30
volumes:
- ./flink/settings/:/settings
- ./flink/data/:/data
The Flink config used here is adapted from Francesco Tisiot’s repository at github.com/aiven/sql-cli-for-apache-flink-docker/, so thanks Francesco!
Let’s get started:
docker-compose up
Data Generation
We’re going to generate data using the github.com/mneedham/livestream-data-generator repository, which simulates users joining and leaving live stream events. Once we’ve checked out that repository, we can install the dependencies:
pip install -r requirements.txt
And then run the data generator:
python loop.py \
--timeout 1 \
--users 1000 \
--events 100 \
--max-start-delay 0 \
--min-event-length 60 \
--max-event-length 180
We’ll see output similar to this:
{"eventTime": "2023-01-23T10:15:24.089789", "eventId": "89653462-d58c-4751-974b-cc94d9fa9a11", "userId": "cf29d9e5-4f52-43cf-99c6-b6138ae612eb", "name": "Beverly Kelley", "lat": "51.5085", "lng": "-0.1257", "city": "London", "region": "England", "action": "Join"}
{"eventTime": "2023-01-23T10:15:24.048042", "eventId": "3bf77680-7664-44a2-b5eb-7281fb759999", "userId": "31ab115f-b7a4-48a6-a282-5e5a3c15ddf0", "name": "Jeffery Adams", "lat": "32.5530", "lng": "-92.0422", "city": "Monroe", "region": "Louisiana", "action": "Join"}
{"eventTime": "2023-01-23T10:15:24.033714", "eventId": "0283e8e6-14a7-4d8e-8aab-5d40d38eb52d", "userId": "0acad44c-2545-4c81-bd3f-33385d21160f", "name": "Alexander Fuller", "lat": "43.2501", "lng": "-79.8496", "city": "Hamilton", "region": "Ontario", "action": "Join"}
{"eventTime": "2023-01-23T10:15:23.979862", "eventId": "bf061b6c-b03d-43d7-9d04-f4f8c71a9ab0", "userId": "0a9f8527-a2b4-4c5f-b82b-2a3930182c62", "name": "Julie Grant", "lat": "35.6910", "lng": "139.7679", "city": "Tokyo", "region": "Tokyo", "action": "Join"}
{"eventTime": "2023-01-23T10:15:24.075753", "eventId": "868320eb-96b2-496f-af72-9cd83d9726c0", "userId": "fc20e945-f4ce-4c71-9a55-8f7253583c1c", "name": "Natalie Martinez", "lat": "39.9690", "lng": "-83.0114", "city": "Columbus", "region": "Ohio", "action": "Join"}
Ingesting data into Kafka
We can pipe the generator’s output straight into Kafka with kcat, using jq to prefix each message with its eventId (separated by an emoji) so that kcat can use that value as the message key:
python loop.py \
--timeout 1 \
--users 1000 \
--events 100 \
--max-start-delay 0 \
--min-event-length 60 \
--max-event-length 180 |
jq -cr --arg sep 😊 '[.eventId, tostring] | join($sep)' |
kcat -P -b localhost:29092 -t events -K😊
Note: I’ve created a video showing how to ingest data using this technique on my YouTube channel, Learn Data with Mark, which is embedded below.
We can also use kcat to check that the data has made its way into Kafka:
kcat -C -b localhost:29092 -t events -c3 | jq
{
"eventTime": "2023-01-24T11:13:05.213589",
"eventId": "ebfec380-c3f1-4471-b30b-da822db57117",
"userId": "983aa5c2-4f98-4507-937b-d60f41e1407e",
"name": "Joanne Walters",
"lat": "42.2399",
"lng": "-83.1508",
"city": "Dearborn",
"region": "Michigan",
"action": "Join"
}
{
"eventTime": "2023-01-24T11:13:05.216398",
"eventId": "9cc0a7f5-0638-4b27-a897-2a61ddab98ac",
"userId": "1043a9d5-e722-4d86-91f6-8926046050d5",
"name": "Michael Miller",
"lat": "34.0498",
"lng": "-117.4706",
"city": "Fontana",
"region": "California",
"action": "Join"
}
{
"eventTime": "2023-01-24T11:13:05.291973",
"eventId": "2bb4a79b-f5ec-488e-84ee-72bf2fb8c293",
"userId": "cd0e36ff-9171-41b5-91d2-bd32feaae17c",
"name": "David Trujillo",
"lat": "35.9139",
"lng": "47.0239",
"city": "Dīvāndarreh",
"region": "Kordestān",
"action": "Join"
}
So far, so good.
Transforming the stream with Flink
Now let’s say that we want to transform these events to have a nested structure like this:
{
"event": {
"time": "2023-01-24T11:13:05.213589",
"id": "ebfec380-c3f1-4471-b30b-da822db57117"
},
"user": {
"id": "983aa5c2-4f98-4507-937b-d60f41e1407e",
"name": "Joanne Walters",
"lat": "42.2399",
"lng": "-83.1508",
"city": "Dearborn",
"region": "Michigan",
},
"action": "Join"
}
We’re going to do this using Flink, a popular stateful stream processor. We can interact with Flink via its SQL client:
docker exec -it flink-sql-client /opt/sql-client/sql-client.sh
Create a table on the events stream:
CREATE TABLE Events (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`eventTime` STRING,
`eventId` STRING,
`userId` STRING,
`name` STRING,
`lat` DOUBLE,
`lng` DOUBLE,
`city` STRING,
`region` STRING,
`action` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'eventsGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
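If we want to double-check that the table was registered with the schema we expect, the SQL client also supports a DESCRIBE statement (an optional sanity check):
DESCRIBE Events;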
We can query this table to view some of the events:
select eventTime, eventId, userId, name, lat, lng, city, region, action
FROM Events
LIMIT 5;
eventTime | eventId | userId | name | lat | lng | city | region | action
---|---|---|---|---|---|---|---|---
2023-01-24T11:35:21.017729 | 6a8fbce6-aa20-46eb-a201-75807~ | ff8663b0-4376-448a-a443-c14bb | Terri Simmons | -27.4679 | 153.0281 | Brisbane | Queensland | Join
2023-01-24T11:35:21.037883 | 9d46a3ad-63f6-436e-9623-e2338~ | 63aa6903-9e37-4009-8571-bbf1c | Matthew Baldwin | 35.691 | 139.7679 | Tokyo | Tokyo | Join
2023-01-24T11:35:21.049036 | dbb56840-130c-4356-9b36-ddec6~ | 3a748d48-7adb-45aa-8d9d-e0021 | Leah Cook | 35.0202 | 135.734 | Kyoto | Kyoto | Join
2023-01-24T11:35:21.029667 | 6994ffa4-3c0d-44a6-9cc0-ab866~ | 3d322e87-f090-4484-8ea1-c7d15 | Kendra Crawford | 32.0809 | 34.7806 | Tel Aviv | Tel Aviv | Join
2023-01-24T11:35:20.988061 | 3d443ffc-f1cd-4ee4-8675-ac99e~ | a41f260f-5b57-4e69-9bb2-31715 | Jeremy Jones | 30.6667 | 104.0667 | Chengdu | Sichuan | Join
We can use the map function to massage this data into the nested structure. The following query does this:
SELECT map[
'time', eventTime,
'id', eventId
] AS event,
map [
'id', userId,
'name', name,
'lat', lat,
'lng', lng,
'city', city,
'region', region
] AS `user`,
action
FROM Events
LIMIT 1;
If we run this query, we’ll get the following exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Parameters must be of the same type
The problem is that map expects all of its values to be of the same type, but the lat and lng fields are doubles while the rest are strings.
The easiest solution is to cast these values to strings, as shown below:
SELECT map[
'time', eventTime,
'id', eventId
] AS event,
map [
'id', userId,
'name', name,
'lat', CAST(lat AS STRING),
'lng', CAST(lng AS STRING),
'city', city,
'region', region
] AS `user`,
action
FROM Events
LIMIT 1;
Now let’s create a table that exports its contents to another Kafka topic called events-nested. Note that the columns are declared in the same order as the expressions in our query, because INSERT INTO matches columns by position:
CREATE TABLE EventsNested (
`event` MAP<STRING,STRING>,
`user` MAP<STRING,STRING>,
`action` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'events-nested',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'testGroup',
'value.format' = 'json'
);
And now let’s ingest the previous query into that table:
INSERT INTO EventsNested
SELECT map[
'time', eventTime,
'id', eventId
] AS event,
map [
'id', userId,
'name', name,
'lat', CAST(lat AS STRING),
'lng', CAST(lng AS STRING),
'city', city,
'region', region
] AS `user`,
action
FROM Events;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f2e6f707a9c69b84f4d6e9ca7bc34fc6
We can then check that data is making its way into that topic using kcat:
kcat -C -b localhost:29092 -t events-nested -c1 | jq
{
"user": {
"time": "2023-01-24T11:35:21.017729",
"id": "6a8fbce6-aa20-46eb-a201-7580762c2a16"
},
"event": {
"city": "Brisbane",
"lng": "153.0281",
"id": "ff8663b0-4376-448a-a443-c14bb0cee0cc",
"region": "Queensland",
"name": "Terri Simmons",
"lat": "-27.4679"
},
"action": "Join"
}
Job done!
In Summary
The map structure is very helpful for creating nested structures, but it took me a little while to figure out how to use it. Hopefully this blog post will save you going through that same journey of exploration.
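One thing that bugged me a little was having to cast lat and lng to strings. If you want to keep them as numbers, it’s also worth looking at ROW types, since the JSON format can write nested objects from those as well. Below is a rough, untested sketch of that variation (the EventsNestedRow table and events-nested-row topic are names I’ve made up for illustration):
-- Hypothetical sink table using ROW types instead of MAP,
-- so lat/lng can stay as DOUBLE in the nested JSON
CREATE TABLE EventsNestedRow (
`event` ROW<`time` STRING, `id` STRING>,
`user` ROW<`id` STRING, `name` STRING, `lat` DOUBLE, `lng` DOUBLE, `city` STRING, `region` STRING>,
`action` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'events-nested-row',
'properties.bootstrap.servers' = 'kafka:9092',
'value.format' = 'json'
);

-- The ROW constructor builds each nested object; the field names in the
-- output JSON come from the sink table's schema, so no aliases are needed
INSERT INTO EventsNestedRow
SELECT
ROW(eventTime, eventId),
ROW(userId, name, lat, lng, city, region),
action
FROM Events;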
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.