Kafka: Writing data to a topic from the command line
I’ve been doing more Apache Pinot documentation - this time covering the JSON functions - and I needed to quickly write some data into Kafka to test things out. I’d normally do that using the Python Kafka client, but this time I wanted to do it using only command line tools. So that’s what we’ll be doing in this blog post - it’s more for future me than anyone else!
Setup
Let’s start by spinning up Kafka using Docker Compose:
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: zookeeper-blog
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: wurstmeister/kafka:latest
    restart: unless-stopped
    container_name: "kafka-blog"
    ports:
      - "9092:9092"
    expose:
      - "9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-blog:2181/kafka
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_HOST_NAME: kafka-blog
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-blog:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
The Kafka broker exposes two listeners: PLAINTEXT on port 9093 for clients inside the Docker network, and OUTSIDE on port 9092 for clients connecting from the host machine.
We can spin up both containers by running the following command:
docker-compose up
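Before going any further, we can check that both containers came up:
docker-compose ps
Both zookeeper-blog and kafka-blog should show up with a state of Up.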
Write messages
Now it’s time to import some data using the kafka-console-producer.sh script.
If we want to write a single message to the docs topic, we can run the following:
echo '{"timestamp": "2019-10-09 21:25:25", "meta": {"age": 12}}' |
docker exec -i kafka-blog /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic docs
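As an aside, the console producer can also write keyed messages via its parse.key and key.separator properties. A minimal sketch - the docs-keyed topic and the | separator are made up for illustration, and we’ll keep our docs topic key-free so the message counts later in the post still add up:
# Everything before the | becomes the message key, everything after it the value
echo 'doc-1|{"timestamp": "2019-10-09 21:25:25", "meta": {"age": 12}}' |
docker exec -i kafka-blog /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs-keyed \
  --property parse.key=true \
  --property "key.separator=|"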
But what if we have a file full of documents, like documents.json below:
{"timestamp": "2019-10-09 22:25:25", "meta": {"age": 18}}
{"timestamp": "2019-10-09 23:25:25", "meta": {"age": 14}}
{"timestamp": "2019-10-09 23:40:25", "meta": {"age": 16}}
We can pipe those into kafka-console-producer.sh like this:
docker exec -i kafka-blog /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs \
  < documents.json
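If we don’t have a file at hand, we can also generate messages on the fly and pipe them straight in. A quick sketch that writes three synthetic documents to a made-up docs-generated topic (again, so our docs counts below stay intact):
# Generate three JSON documents and pipe them into the producer
for i in 1 2 3; do
  echo "{\"timestamp\": \"2019-10-10 0${i}:00:00\", \"meta\": {\"age\": 1${i}}}"
done |
docker exec -i kafka-blog /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs-generated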
Read messages
So far, so good. We haven’t got any errors, but let’s double check that our messages have made it into Kafka.
We can check the offsets (which in this case indicate the number of documents ingested) for the docs topic by running the following command:
docker exec -it kafka-blog /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic docs
docs:0:4
The output indicates that partition 0 of the docs topic has an end offset of 4, which means this partition contains 4 documents.
We imported one message on its own and three from documents.json, so that’s what we’d expect.
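If the topic had more than one partition, we’d get one topic:partition:offset line per partition, and we could sum the third field to get the total message count. A sketch of that (note the -i instead of -it, since we’re piping the output):
docker exec -i kafka-blog /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic docs |
awk -F ':' '{sum += $3} END {print sum}'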
We can return a maximum of 4 messages (i.e. all of them) from the docs topic, like so:
docker exec -i kafka-blog /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs \
  --from-beginning \
  --max-messages 4
{"timestamp": "2019-10-09 21:25:25", "meta": {"age": 12}}
{"timestamp": "2019-10-09 22:25:25", "meta": {"age": 18}}
{"timestamp": "2019-10-09 23:25:25", "meta": {"age": 14}}
{"timestamp": "2019-10-09 23:40:25", "meta": {"age": 16}}
Processed a total of 4 messages
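If we also want to see when each message was written, the console consumer can prefix each message with its timestamp via the print.timestamp property (there’s a print.key property too, for keyed topics):
docker exec -i kafka-blog /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs \
  --from-beginning \
  --max-messages 4 \
  --property print.timestamp=true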