
Kafka: Python Consumer - No messages with group id/consumer group

When I’m learning a new technology, I often come across things that are incredibly confusing at first, but make complete sense afterwards. In this post I’ll explain my experience writing a Kafka consumer that wasn’t finding any messages when using consumer groups.

Setting up Kafka infrastructure

We’ll set up the Kafka infrastructure locally using the Docker Compose template that I describe in my Kafka: A Basic Tutorial blog post. We can clone the repository with the following command:

git clone git@github.com:mneedham/basic-kafka-tutorial.git && cd basic-kafka-tutorial

And then launch the Docker containers using the following command:

docker-compose up

While that’s running, let’s install the kafka-python library, which we’ll use to put messages onto a Kafka topic as well as consume messages from it. We can install it using the following command:

pip install kafka-python
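
If you want to double check which version got installed (I was on 1.4.6 at the time of writing), a quick way is to print the library version from Python:

import kafka
print(kafka.__version__)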

Creating a Kafka topic

Now let’s create a topic named foobar, which we can do using the kafka-topics tool:

docker exec broker-tutorial kafka-topics \
  --zookeeper zookeeper:2181 \
  --create \
  --topic foobar \
  --partitions 2 \
  --replication-factor 1
Output
Created topic "foobar".

Cool! Now we’re ready to write some messages to the topic.
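
As an aside, if you’d rather not shell into the broker container, kafka-python also ships an admin client that can create topics. A rough equivalent of the command above would look something like this (the admin client was still fairly new at the time, so the CLI remains the safer bet):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# create a topic with the same partition/replication settings as the CLI command
admin.create_topics([
    NewTopic(name='foobar', num_partitions=2, replication_factor=1)
])
admin.close()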

Producing and consuming messages

The following code adds 10 JSON messages to the foobar topic:

from kafka import KafkaProducer
import uuid
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for _ in range(10):
    producer.send('foobar', {"id": str(uuid.uuid4())})
    producer.flush()

Let’s read the messages from the topic. The following code does this:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('foobar',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=1000,
    value_deserializer=json.loads)

for msg in consumer:
    print(msg.value)

Note that we set auto_offset_reset to earliest so that our consumer reads all the messages from the beginning of the topic. If we don’t add this config, our consumer will only see messages produced after it starts. The output of running the consumer is below:

python dummy_consumer.py
Output
{'id': '939cd4fe-79e9-4050-a9bc-3f94b31f62e3'}
{'id': 'a90c71b3-4516-4df9-871d-047264f1d6b6'}
{'id': '06d45529-6888-4d2d-a4df-fec15b4b1d87'}
{'id': '25642eee-b51c-432a-89d1-d7a17c7ef30a'}
{'id': '03229045-94c9-4825-9cb6-ce495a68e7a9'}
{'id': '3b6875f4-0a64-443f-9206-3bf1a3d31dc8'}
{'id': '9f45a12b-97d2-4585-b296-a80ec5c0223c'}
{'id': '485b4946-18a9-47d1-a849-87a6fc60365a'}
{'id': 'b1c9d75a-d56e-4dd8-9e08-89b3a818fbb1'}
{'id': '14486560-b5cb-41fa-bd0e-845a424c8ed4'}

If we run that code again, we’ll see the same list of 10 messages. But what happens if we provide a consumer group, via the group_id config? The following consumer reads from the foobar topic using a group id of blog_group:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('foobar',
    bootstrap_servers='localhost:9092',
    group_id='blog_group',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,
    value_deserializer=json.loads)

for msg in consumer:
    print(msg.value)

The first time we run this script we’ll see those 10 messages, but if we run it again we won’t get any messages at all. The reason is that when we provide a group id, the broker keeps track of the offsets that the group has already consumed, so messages aren’t consumed twice. We can run the following command to see this:

docker exec broker-tutorial kafka-consumer-groups \
  --bootstrap-server broker:9093 \
  --group blog_group \
  --describe
Output
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
foobar         0          8               8               0               kafka-python-1.4.6-74f3f185-b563-40c8-958a-ad6c0c4815c0 /172.26.0.1     kafka-python-1.4.6
foobar         1          2               2               0               kafka-python-1.4.6-74f3f185-b563-40c8-958a-ad6c0c4815c0 /172.26.0.1     kafka-python-1.4.6

From this output we need to look at two columns:

  • CURRENT-OFFSET, which indicates the offset that our consumer group has read up to

  • LOG-END-OFFSET, which indicates the maximum offset for that partition
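
We can also check these numbers from Python rather than via the CLI, since the consumer exposes committed() and end_offsets(). A small sketch along these lines (the structure is just illustrative) prints the committed offset, end offset, and lag for each partition:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='blog_group',
    enable_auto_commit=False)

# one TopicPartition per partition of the foobar topic
partitions = [TopicPartition('foobar', p)
              for p in consumer.partitions_for_topic('foobar')]
end_offsets = consumer.end_offsets(partitions)

for tp in sorted(partitions):
    committed = consumer.committed(tp) or 0
    print(tp.partition, committed, end_offsets[tp], end_offsets[tp] - committed)

consumer.close()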

If we want to consume all the messages on the foobar topic again, we’ll need to reset CURRENT-OFFSET back to 0.

Resetting offsets

We can do this by passing the --reset-offsets argument to kafka-consumer-groups. The following shows what a dry run of this command will do:

docker exec broker-tutorial kafka-consumer-groups \
  --bootstrap-server broker:9093 \
  --group blog_group \
  --topic foobar \
  --reset-offsets \
  --to-earliest \
  --dry-run
Output
TOPIC                          PARTITION  NEW-OFFSET
foobar                        0          0
foobar                        1          0

And if we want to execute it for real, we need to change --dry-run to --execute:

docker exec broker-tutorial kafka-consumer-groups \
  --bootstrap-server broker:9093 \
  --group blog_group \
  --topic foobar \
  --reset-offsets \
  --to-earliest --execute
Output
TOPIC                          PARTITION  NEW-OFFSET
foobar                        0          0
foobar                        1          0

Once we’ve done this we can re-run our group id consumer and we’ll be able to read all the messages again.
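
If we wanted to do the reset from kafka-python rather than the CLI, one option is to commit offset 0 for each partition on behalf of blog_group. The sketch below assumes no other consumer in the group is running at the time, and that we’re on the kafka-python release used in this post:

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='blog_group',
    enable_auto_commit=False)

partitions = [TopicPartition('foobar', p)
              for p in consumer.partitions_for_topic('foobar')]
consumer.assign(partitions)

# commit offset 0 for every partition, i.e. rewind the group to the beginning
consumer.commit({tp: OffsetAndMetadata(0, '') for tp in partitions})
consumer.close()

After running that, the describe command from earlier should show CURRENT-OFFSET back at 0 for both partitions.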
