Kafka: Python Consumer - No messages with group id/consumer group
When I’m learning a new technology, I often run into things that are incredibly confusing at first, but make complete sense afterwards. In this post I’ll explain my experience writing a Kafka consumer that wasn’t finding any messages when using consumer groups.
Setting up Kafka infrastructure
We’ll set up the Kafka infrastructure locally using the Docker Compose Template that I describe in my Kafka: A Basic Tutorial blog post. We can run the following command to do this:
git clone git@github.com:mneedham/basic-kafka-tutorial.git && cd basic-kafka-tutorial
And then launch the Docker containers using the following command:
docker-compose up
While that’s running let’s install the kafka-python library, which we’ll use to put messages onto a Kafka topic, as well as consume messages from that topic. We can install this library using the following command:
pip install kafka-python
Creating a Kafka topic
Now let’s create a topic named foobar, which we can do using the kafka-topics tool:
docker exec broker-tutorial kafka-topics \
--zookeeper zookeeper:2181 \
--create \
--topic foobar \
--partitions 2 \
--replication-factor 1
Created topic "foobar".
Cool! Now we’re ready to write some messages to the topic.
Producing and consuming messages
The following code adds 10 JSON messages to the foobar topic:
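from kafka import KafkaProducer
import uuid
import json

# Serialize each message value as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Send 10 messages, each with a random id
for _ in range(10):
    producer.send('foobar', {"id": str(uuid.uuid4())})

# Block until all buffered messages have been sent
producer.flush()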
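To make the value_serializer a bit less abstract, here is a minimal sketch that doesn't need a broker. It shows the bytes that would be sent for one message and how json.loads, which the consumers in this post use as their value_deserializer, reverses the process. The names serialize, deserialize and sample are illustrative only and aren't part of kafka-python.

```python
import json
import uuid


def serialize(value):
    """Mirror of the producer's value_serializer: dict -> UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode('utf-8')


def deserialize(payload):
    """Mirror of the consumer's value_deserializer (json.loads accepts bytes)."""
    return json.loads(payload)


sample = {"id": str(uuid.uuid4())}
payload = serialize(sample)

print(type(payload).__name__)          # bytes
print(deserialize(payload) == sample)  # True
```

The producer and consumer have to agree on this pair: whatever the serializer writes, the deserializer must be able to read back.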
Let’s read the messages from the topic. The following code does this:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('foobar',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         # Stop iterating if no message arrives for 1 second
                         consumer_timeout_ms=1000,
                         value_deserializer=json.loads)

for msg in consumer:
    print(msg.value)
Note that we set auto_offset_reset to earliest so that our consumer will read all the messages from the beginning. If we don’t add this config, the default value of latest applies and our consumer will only see new messages.
The output of running the consumer is below:
python dummy_consumer.py
{'id': '939cd4fe-79e9-4050-a9bc-3f94b31f62e3'}
{'id': 'a90c71b3-4516-4df9-871d-047264f1d6b6'}
{'id': '06d45529-6888-4d2d-a4df-fec15b4b1d87'}
{'id': '25642eee-b51c-432a-89d1-d7a17c7ef30a'}
{'id': '03229045-94c9-4825-9cb6-ce495a68e7a9'}
{'id': '3b6875f4-0a64-443f-9206-3bf1a3d31dc8'}
{'id': '9f45a12b-97d2-4585-b296-a80ec5c0223c'}
{'id': '485b4946-18a9-47d1-a849-87a6fc60365a'}
{'id': 'b1c9d75a-d56e-4dd8-9e08-89b3a818fbb1'}
{'id': '14486560-b5cb-41fa-bd0e-845a424c8ed4'}
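If we run that code again, we’ll see the same list of 10 messages. A consumer without a group_id doesn’t commit its offsets, so it starts from the earliest offset on every run.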
What about if we provide a consumer group (using the group_id config)? The following consumer reads from the foobar topic using a group id named blog_group:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('foobar',
                         bootstrap_servers='localhost:9092',
                         # Offsets are now committed under this consumer group
                         group_id='blog_group',
                         auto_offset_reset='earliest',
                         # Stop iterating if no message arrives for 10 seconds
                         consumer_timeout_ms=10000,
                         value_deserializer=json.loads)

for msg in consumer:
    print(msg.value)
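The first time we run this script we’ll see those 10 messages, but if we run it again we won’t get any messages. The reason for this is that when we provide a group id, the consumer commits the offsets it has read, and the broker keeps track of them so that messages aren’t consumed twice. The auto_offset_reset setting only applies when the group has no committed offset, so it has no effect on the second run. We can run the following command to see this: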
docker exec broker-tutorial kafka-consumer-groups \
--bootstrap-server broker:9093 \
--group blog_group \
--describe
TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                              HOST         CLIENT-ID
foobar  0          8               8               0    kafka-python-1.4.6-74f3f185-b563-40c8-958a-ad6c0c4815c0  /172.26.0.1  kafka-python-1.4.6
foobar  1          2               2               0    kafka-python-1.4.6-74f3f185-b563-40c8-958a-ad6c0c4815c0  /172.26.0.1  kafka-python-1.4.6
From this output we need to look at two columns:
- CURRENT-OFFSET, which indicates the offset that our consumer has read up to
- LOG-END-OFFSET, which indicates the offset at the end of that partition’s log, i.e. where the next message will be written
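The LAG column is just LOG-END-OFFSET minus CURRENT-OFFSET. As a small sketch, here are the first four columns of the two rows above, copied in by hand, with the lag computed per partition. The helper names parse_rows and lag_by_partition are made up for this example and aren't part of any Kafka tooling.

```python
# First four columns of the describe output, copied by hand:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET
DESCRIBE_ROWS = """\
foobar 0 8 8
foobar 1 2 2
"""


def parse_rows(text):
    """Yield (topic, partition, current_offset, log_end_offset) tuples."""
    for line in text.splitlines():
        topic, partition, current, end = line.split()
        yield topic, int(partition), int(current), int(end)


def lag_by_partition(text):
    """Map partition -> number of messages the group hasn't consumed yet."""
    return {partition: end - current
            for _, partition, current, end in parse_rows(text)}


lags = lag_by_partition(DESCRIBE_ROWS)
print(lags)                # {0: 0, 1: 0}
print(sum(lags.values()))  # 0
```

A total lag of 0 means the group has nothing left to read, which is exactly why the second run of the consumer prints nothing.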
If we want to consume all the messages on the foobar topic again, we’ll need to reset CURRENT-OFFSET back to 0.
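The behaviour we’ve seen so far can be reproduced without a broker. The following is a toy in-memory model, not kafka-python code: ToyBroker and consume are made-up names, and the model assumes a single partition, a commit at the end of every run, and auto_offset_reset set to earliest.

```python
class ToyBroker:
    """Toy stand-in for one partition plus the broker's committed-offset store."""

    def __init__(self, messages):
        self.log = list(messages)
        self.committed = {}  # group_id -> next offset to read

    def reset_to_earliest(self, group_id):
        """Rough equivalent of resetting a group's offset back to 0."""
        self.committed[group_id] = 0


def consume(broker, group_id=None):
    """Read to the end of the log, committing only when a group id is given."""
    if group_id is None:
        # No group: nothing is committed, so every run starts at the earliest offset.
        return list(broker.log)
    # A committed offset wins; 'earliest' (offset 0 here) is only the fallback.
    start = broker.committed.get(group_id, 0)
    batch = broker.log[start:]
    broker.committed[group_id] = len(broker.log)
    return batch


broker = ToyBroker(f"msg-{i}" for i in range(10))

print(len(consume(broker)), len(consume(broker)))  # 10 10
print(len(consume(broker, "blog_group")))          # 10
print(len(consume(broker, "blog_group")))          # 0
broker.reset_to_earliest("blog_group")
print(len(consume(broker, "blog_group")))          # 10
```

The second blog_group run returns nothing because the committed offset already points at the end of the log. Only after the reset does the group see the messages again.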
Resetting offsets
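We can do this by passing the --reset-offsets argument to kafka-consumer-groups. Offsets can only be reset while the group has no active consumers, so make sure the consumer script has finished first. The following command shows what a dry run will do: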
docker exec broker-tutorial kafka-consumer-groups \
--bootstrap-server broker:9093 \
--group blog_group \
--topic foobar \
--reset-offsets \
--to-earliest \
--dry-run
TOPIC   PARTITION  NEW-OFFSET
foobar  0          0
foobar  1          0
And if we want to execute it for real, we need to change --dry-run to --execute:
docker exec broker-tutorial kafka-consumer-groups \
--bootstrap-server broker:9093 \
--group blog_group \
--topic foobar \
--reset-offsets \
--to-earliest \
--execute
TOPIC   PARTITION  NEW-OFFSET
foobar  0          0
foobar  1          0
Once we’ve done this we can re-run our group id consumer and we’ll be able to read all the messages again.
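If you find yourself resetting offsets often, one option is to build the command from a small script that defaults to a dry run. This is only a sketch: the function name reset_offsets_command is made up, and the container name and broker address are the ones from the Docker setup used in this post.

```python
def reset_offsets_command(group, topic, execute=False,
                          container="broker-tutorial",
                          bootstrap_server="broker:9093"):
    """Build the kafka-consumer-groups reset command from this post as an argv list."""
    return [
        "docker", "exec", container, "kafka-consumer-groups",
        "--bootstrap-server", bootstrap_server,
        "--group", group,
        "--topic", topic,
        "--reset-offsets",
        "--to-earliest",
        # Only change the committed offsets when explicitly asked to.
        "--execute" if execute else "--dry-run",
    ]


cmd = reset_offsets_command("blog_group", "foobar")
print(" ".join(cmd))
```

The resulting list can be handed to subprocess.run(cmd, check=True) to actually run it, which assumes Docker is available and the containers are up. Passing execute=True swaps --dry-run for --execute.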
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.