
Kafka: Writing data to a topic from the command line

I’ve been doing more Apache Pinot documentation, this time covering the JSON functions, and I needed to quickly write some data into Kafka to test things out. I’d normally do that using the Python Kafka client, but this time I wanted to do it using only command-line tools. So that’s what we’ll be doing in this blog post, and it’s more for future me than anyone else!


Setup

Let’s start by spinning up Kafka using Docker Compose:

docker-compose.yml
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: zookeeper-blog
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: wurstmeister/kafka:latest
    restart: unless-stopped
    container_name: "kafka-blog"
    ports:
      - "9092:9092"
    expose:
      - "9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-blog:2181/kafka
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_HOST_NAME: kafka-blog
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-blog:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT

We can spin up those containers by running the following command:

docker-compose up
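
Once the containers are up, we can sanity check that the broker is reachable. A quick sketch, assuming the Kafka scripts live under /opt/kafka/bin (the same path the producer commands below use) and a Kafka version recent enough for kafka-topics.sh to accept --bootstrap-server, is to list the topics:

docker exec -it kafka-blog /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --list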

Write messages

Now it’s time to import some data using the kafka-console-producer.sh script.

If we want to ingest a single message to the docs topic, we can run the following:

echo '{"timestamp": "2019-10-09 21:25:25", "meta": {"age": 12}}' |
docker exec -i kafka-blog /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs
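
We won’t use message keys in this post, but for reference, the console producer can also parse a key out of each input line via the parse.key and key.separator properties. A sketch, using | as an arbitrary separator and a hypothetical doc-1 key (don’t run this if you want the message counts later in the post to stay at 4):

echo 'doc-1|{"timestamp": "2019-10-09 21:25:25", "meta": {"age": 12}}' |
docker exec -i kafka-blog /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs \
  --property parse.key=true \
  --property key.separator='|'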

But what if we have a file full of documents, like the one shown below:

documents.json
{"timestamp": "2019-10-09 22:25:25", "meta": {"age": 18}}
{"timestamp": "2019-10-09 23:25:25", "meta": {"age": 14}}
{"timestamp": "2019-10-09 23:40:25", "meta": {"age": 16}}

We can pipe those into kafka-console-producer.sh like this:

docker exec -i kafka-blog /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs \
< documents.json
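
Note that we never explicitly created the docs topic. Assuming the broker’s default of auto.create.topics.enable=true is in effect (we haven’t overridden it in the Docker Compose file), the topic was auto-created on the first write, and we can describe it to see its partitions:

docker exec -it kafka-blog /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic docs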

Read messages

So far, so good. We haven’t had any errors, but let’s double-check that our messages have made it into Kafka.

We can check the offsets (which in this case indicate the number of documents ingested) for the docs topic by running the following command:

docker exec -it kafka-blog kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic docs
Output
docs:0:4

The output indicates that partition 0 of the docs topic has an offset of 4, which means this partition of the topic contains 4 documents. We imported one message on its own and three from documents.json, so that’s what we’d expect.
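
Our topic only has one partition, but for a topic with several, GetOffsetShell prints one line per partition. A small sketch, assuming the same topic:partition:offset output format shown above, that sums the end offsets with awk to get a total message count:

docker exec -i kafka-blog kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic docs |
awk -F ':' '{sum += $3} END {print sum}'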

We can return a maximum of 4 messages (i.e. all of them) from the docs topic, like so:

docker exec -i kafka-blog /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs \
  --from-beginning \
  --max-messages 4
Output
{"timestamp": "2019-10-09 21:25:25", "meta": {"age": 12}}
{"timestamp": "2019-10-09 22:25:25", "meta": {"age": 18}}
{"timestamp": "2019-10-09 23:25:25", "meta": {"age": 14}}
{"timestamp": "2019-10-09 23:40:25", "meta": {"age": 16}}
Processed a total of 4 messages
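
If we want to see some metadata alongside each message, the console consumer’s default formatter can print that too, for example via the print.timestamp and print.key properties (the keys will show as null here, since we didn’t set any):

docker exec -i kafka-blog /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic docs \
  --from-beginning \
  --max-messages 4 \
  --property print.timestamp=true \
  --property print.key=true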