Debezium: Capture changes from MySQL
I’ve been working on a Real-Time Analytics workshop that I’m going to be presenting at the ODSC Europe conference in June 2023 and I wanted to have Debezium publish records from a MySQL database without including the schema.
I’m using the debezium/connect:2.3
Docker image to run Debezium locally and I have a MySQL database running with the hostname mysql
on port 3306
.
Below is the way that I configured this:
curl -X PUT \
-H "Content-Type:application/json" \
http://localhost:8083/connectors/mysql/config \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": 3306,
"database.user": "debezium",
"database.password": "dbz",
"database.server.name": "mysql",
"database.server.id": "223344",
"database.allowPublicKeyRetrieval": true,
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "mysql-history",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "mysql-schema-history",
"database.include.list": "pizzashop",
"time.precision.mode": "connect",
"topic.prefix": "mysql",
"include.schema.changes": false
}'
My database pizzashop
contains the tables products
and users
, which will map to the following Kafka topics:
-
products
→mysql.pizzashop.products
-
users
→mysql.pizzashop.users
We can use kcat
to see the messages published to these topics:
kcat -C -b localhost:29092 -t mysql.pizzashop.users -c1 -u | jq
{
"before": null,
"after": {
"id": 1,
"first_name": "Kismat",
"last_name": "Shroff",
"email": "drishyamallick@hotmail.com",
"residence": "575, Edwin Circle, Bahraich-255519",
"lat": "TYu5Xg==",
"lon": "Ac6oUak=",
"created_at": "2023-05-31T08:44:19Z",
"updated_at": 1685522659000
},
"source": {
"version": "2.3.0.Beta1",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1685524951000,
"snapshot": "first_in_data_collection",
"db": "pizzashop",
"sequence": null,
"table": "users",
"server_id": 0,
"gtid": null,
"file": "binlog.000002",
"pos": 156,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1685524951843,
"transaction": null
}
We can then go to our MySQL table and update one of the products:
update products
set name = 'Super Awesome Moroccan Pasta Pizza - Veg'
where id = 1;
And then if we query the Kafka topic again, we’ll see this output:
{
"before": {
"id": 1,
"name": "Moroccan Spice Pasta Pizza - Veg",
"description": "A pizza loaded with a spicy combination of Harissa sauce and delicious pasta.",
"category": "veg pizzas",
"price": 335,
"image": "https://www.dominos.co.in//files/items/MoroccanSpicePPVG_N.jpg",
"created_at": "2023-05-31T08:44:19Z",
"updated_at": 1685522659000
},
"after": {
"id": 1,
"name": "Super Awesome Moroccan Pasta Pizza - Veg",
"description": "A pizza loaded with a spicy combination of Harissa sauce and delicious pasta.",
"category": "veg pizzas",
"price": 335,
"image": "https://www.dominos.co.in//files/items/MoroccanSpicePPVG_N.jpg",
"created_at": "2023-05-31T08:44:19Z",
"updated_at": 1685527662000
},
"source": {
"version": "2.3.0.Beta1",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1685527662000,
"snapshot": "false",
"db": "pizzashop",
"sequence": null,
"table": "products",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 414,
"row": 0,
"thread": 4714,
"query": null
},
"op": "u",
"ts_ms": 1685527691344,
"transaction": null
}
Let’s now delete one record:
DELETE FROM pizzashop.products WHERE id =1;
And back to Kafka:
{
"before": {
"id": 1,
"name": "Super Awesome Moroccan Pasta Pizza - Veg",
"description": "A pizza loaded with a spicy combination of Harissa sauce and delicious pasta.",
"category": "veg pizzas",
"price": 335,
"image": "https://www.dominos.co.in//files/items/MoroccanSpicePPVG_N.jpg",
"created_at": "2023-05-31T08:44:19Z",
"updated_at": 1685527662000
},
"after": null,
"source": {
"version": "2.3.0.Beta1",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1685527882000,
"snapshot": "false",
"db": "pizzashop",
"sequence": null,
"table": "products",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 1143,
"row": 0,
"thread": 4868,
"query": null
},
"op": "d",
"ts_ms": 1685527882035,
"transaction": null
}
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.