Quix Streams: Process a certain number of Kafka messages
In a recent demo, I wanted to use Quix Streams to process a specified number of messages from a Kafka topic, write a message to another stream, and then exit the Quix app. This is an unusual use of Quix Streams, so it took me a while to figure out how to do it.
Let’s assume we have a Kafka broker running.
We’ll create a couple of topics using the rpk tool:
rpk topic create events -p1
rpk topic create massaged-events -p1
Now, we’re going to start by installing Quix Streams and Click, a library for processing command line arguments:
pip install quixstreams click
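If Click is new to you, here's a minimal sketch of the pattern we'll rely on later; the option name mirrors the one we'll add to the real script further down:

import click

@click.command()
@click.option('--number-events', default=1, help="Number of events to process before exiting.")
def run_app(number_events):
    # Click parses --number-events from the command line and passes it in
    click.echo(f"Would process {number_events} event(s)")

if __name__ == "__main__":
    run_app()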
We’re going to start by creating a version of the application that processes all messages.
Create a file called massage.py and add the following imports:
import quixstreams as qx
from quixstreams import StreamConsumer, EventData, CommitMode
import json
import click
Next, let’s create a 'main' method that’s going to bootstrap the app:
@click.command()
def run_app():
    global client, topic_consumer, producer

    client = qx.KafkaStreamingClient('127.0.0.1:9092') (1)
    topic_consumer = client.get_topic_consumer( (2)
        topic="events",
        auto_offset_reset=qx.AutoOffsetReset.Earliest,
        consumer_group="events-consumer",
        commit_settings=CommitMode.Manual
    )
    producer = client.get_raw_topic_producer("massaged-events") (3)

    print("Listening to streams. Press CTRL-C to exit.")
    topic_consumer.on_stream_received = on_stream_received_handler
    topic_consumer.subscribe()

    qx.App.run(before_shutdown=before_shutdown)
(1) Create the Kafka client
(2) Create a consumer for the events topic
(3) Create a producer for the massaged-events topic
We then need to add the following functions to process each message and handle the shutdown of the app:
def on_event_data_received_handler(stream: StreamConsumer, data: EventData):
    with data:
        payload = json.loads(data.value)
        payload["count"] *= 2 (1)

        message = qx.RawMessage(json.dumps(payload, indent=2).encode('utf-8')) (2)
        message.key = str(payload["id"]).encode('utf-8')
        producer.publish(message) (3)
        topic_consumer.commit()

def on_stream_received_handler(stream_received: StreamConsumer):
    stream_received.events.on_data_received = on_event_data_received_handler

def before_shutdown():
    print('before shutdown')
    topic_consumer.dispose()
    producer.dispose()
(1) Multiply the count property by 2
(2) Create a new message
(3) Publish the message to the massaged-events topic
Finally, let’s call the 'main' function:
if __name__ == "__main__":
    run_app()
If we run this script, it will process any messages received by the events topic and write a new message to massaged-events with the count property doubled.
Let’s ingest a message into Kafka to see if it works:
echo '{"id": 1, "count": 4}' |
jq -cr --arg sep ø '[.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
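The jq incantation is just building a keyøvalue line for kcat's -K flag to split on. As a sanity check, this small Python snippet (purely illustrative) produces the same line:

import json

event = {"id": 1, "count": 4}

# kcat's -Kø flag splits each input line on ø into a message key and value;
# the jq filter '[.id, tostring] | join($sep)' builds exactly this string
line = f'{event["id"]}ø{json.dumps(event, separators=(",", ":"))}'
print(line)  # 1ø{"id":1,"count":4}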
And now we’ll check the contents of the events and massaged-events topics:
kcat -C -b localhost:9092 -t events -e | jq -c
{"id":1,"count":4}
kcat -C -b localhost:9092 -t massaged-events -e | jq -c
{"id":1,"count":8}
So far, so good.
If we write any more messages to the events topic, they will automatically be processed as well.
But now we want to update our script so that we can specify how many messages to consume before stopping. If we then run the script again, it will continue from where we left off, because our topic consumer was created with a consumer group that keeps track of the last read offset. We can view that offset by running the following command:
rpk group describe events-consumer
GROUP        events-consumer
COORDINATOR  0
STATE        Empty
BALANCER
MEMBERS      0

TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  MEMBER-ID  CLIENT-ID  HOST
events  0          1               1               0
The CURRENT-OFFSET of 1 tells us the consumer group has committed one message, which matches the single event we’ve processed so far. Let’s update our imports:
from quixstreams import StreamConsumer, EventData, CancellationTokenSource, CommitMode
import threading
And now we’ll update the run_app function to look like this:
@click.command()
@click.option('--number-events', default=1)
def run_app(number_events):
    global client, topic_consumer, producer
    global events_to_consume, events_consumed, thread_lock, cancellation_thread

    client = qx.KafkaStreamingClient('127.0.0.1:9092')
    topic_consumer = client.get_topic_consumer(
        topic="events",
        auto_offset_reset=qx.AutoOffsetReset.Earliest,
        consumer_group="events-consumer",
        commit_settings=CommitMode.Manual
    )
    producer = client.get_raw_topic_producer("massaged-events")

    thread_lock = threading.Lock()
    cts = CancellationTokenSource() (1)
    cancellation_thread = threading.Thread(target=lambda: cts.cancel()) (2)

    events_to_consume = number_events
    events_consumed = 0

    print("Listening to streams. Press CTRL-C to exit.")
    topic_consumer.on_stream_received = on_stream_received_handler
    topic_consumer.subscribe()

    qx.App.run(cts.token, before_shutdown=before_shutdown)

    if cancellation_thread.is_alive(): (3)
        cancellation_thread.join()
(1) Cancellation token used to stop message processing
(2) Cancellation thread, which will trigger the cancellation token
(3) Join the cancellation thread to the main thread before exiting
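The shape of this pattern, a message handler asking a helper thread to trigger the shutdown, can be sketched without Quix Streams at all. Here's a minimal, purely illustrative version that uses a threading.Event as a stand-in for the cancellation token:

import threading
import time

stop = threading.Event()   # stand-in for the Quix cancellation token
lock = threading.Lock()
processed = 0
limit = 3

def handle(message):
    global processed
    with lock:
        processed += 1
        if processed >= limit:
            stop.set()     # plays the role of cts.cancel()

def consume_loop():
    # fake consumer: pretend a message arrives every 100ms
    while not stop.is_set():
        handle("message")
        time.sleep(0.1)

worker = threading.Thread(target=consume_loop)
worker.start()
worker.join()              # returns once the handler sets the stop event
print(f"processed {processed} messages")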
And the on_event_data_received_handler needs to be updated to keep track of the messages consumed:
def on_event_data_received_handler(stream: StreamConsumer, data: EventData):
    global events_consumed
    with data:
        payload = json.loads(data.value)
        payload["count"] *= 2

        message = qx.RawMessage(json.dumps(payload, indent=2).encode('utf-8'))
        message.key = str(payload["id"]).encode('utf-8')
        producer.publish(message)
        topic_consumer.commit()

        with thread_lock:
            events_consumed += 1 (1)
            if events_consumed >= events_to_consume: (2)
                if not cancellation_thread.is_alive():
                    cancellation_thread.start() (3)
                    print("Cancellation token triggered")
                return
(1) Increment the number of messages consumed
(2) Check if we’ve reached the requested count
(3) Start the cancellation thread, which will cancel the token
We can then call our Python script like this to process one event:
python massage.py --number-events 1
Let’s now add another message to Kafka:
echo '{"id": 42, "count": 9000}' |
jq -cr --arg sep ø '[.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
We’ll see the following output from massage.py:
Listening to streams. Press CTRL-C to exit.
Cancellation token triggered
before shutdown
And if we look at the massaged-events topic, it now has the following message:
{"id":42,"count":18000}
You can find the full code in this GitHub Gist.
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.