Quix Streams: Consuming and Producing JSON messages
I’ve been meaning to take Quix Streams for a spin for a while and got the chance while building a recent demo. Quix Streams is a library for building streaming applications on time-series data, but I wanted to use it to do some basic consuming and producing of JSON messages. That’s what we’re going to do in this blog post.
We’re going to use Redpanda to store our messages. We’ll launch a Redpanda instance using the following Docker Compose file:
version: '3.7'
services:
  redpanda:
    container_name: "redpanda-quix"
    image: docker.redpanda.com/vectorized/redpanda:v22.2.2
    command:
      - redpanda start
      - --smp 1
      - --overprovisioned
      - --node-id 0
      - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
      - --pandaproxy-addr 0.0.0.0:8082
      - --advertise-pandaproxy-addr localhost:8082
    ports:
      - 9092:9092
Run docker compose up to start the cluster, and then we'll create some topics:
rpk topic create \
-c cleanup.policy=compact \
-r 1 -p 5 \
events big-events
events is where we'll write our generated data, and big-events will be used later in the blog post.
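If you want to double-check that both topics were created with the expected partition count, rpk can list them:
rpk topic list --brokers localhost:9092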
Our data generator is shown below:
import datetime
import uuid
import random
import json

while True:
    ts = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(
        json.dumps({"tsString": ts, "uuid": id[:3], "count": count})
    )
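Before hooking the generator up to Redpanda, we can eyeball a few records (the 2>/dev/null suppresses the BrokenPipeError that Python raises once head closes the pipe):
python datagen.py 2>/dev/null | head -n3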
We can then call it to generate some data and ingest it into Redpanda using the kcat tool. The jq step joins each record's uuid and the full JSON document with a 😊 separator, and kcat's -K flag uses that same separator to split each line into a message key and value:
python datagen.py 2>/dev/null |
jq -cr --arg sep 😊 '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -K😊
Let’s check the messages have been ingested using the rpk tool:
rpk topic consume events --brokers localhost:9092 |
jq -Cc '.value | fromjson' |
head -n5
{"tsString":"2023-07-11T13:42:34.986675Z","uuid":"7cc","count":779}
{"tsString":"2023-07-11T13:42:34.986734Z","uuid":"122","count":234}
{"tsString":"2023-07-11T13:42:34.986811Z","uuid":"1b9","count":13}
{"tsString":"2023-07-11T13:42:34.986819Z","uuid":"22f","count":298}
{"tsString":"2023-07-11T13:42:34.986841Z","uuid":"688","count":986}
So far, so good. Next, let’s install Quix Streams:
pip install quixstreams
And now we're going to create a little application that consumes data from the events topic and writes any events that have a count bigger than 500 to the big-events topic:
import quixstreams as qx
from quixstreams import StreamConsumer, EventData
import json

client = qx.KafkaStreamingClient('127.0.0.1:9092')

topic_consumer = client.get_topic_consumer(  (1)
    topic="events",
    auto_offset_reset=qx.AutoOffsetReset.Earliest,
    consumer_group="events-consumer-group"
)

def on_event_data_received_handler(stream: StreamConsumer, data: EventData):
    with data:
        payload = json.loads(data.value)
        if payload["count"] > 500:  (2)
            with (producer := client.get_raw_topic_producer("big-events")):  (3)
                message = qx.RawMessage(json.dumps(payload, indent=2).encode('utf-8'))
                message.key = payload["uuid"].encode('utf-8')
                producer.publish(message)  (4)

def on_stream_received_handler(stream_received: StreamConsumer):
    stream_received.events.on_data_received = on_event_data_received_handler

print("Listening to streams. Press CTRL-C to exit.")

topic_consumer.on_stream_received = on_stream_received_handler
topic_consumer.subscribe()

qx.App.run()
(1) Create a consumer for the events topic
(2) Only keep events that have a count bigger than 500
(3) Create a producer for the big-events topic
(4) Publish to big-events
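One thing to note: this handler creates and closes a raw topic producer for every matching message, which is fine for a demo. If you'd rather reuse a single producer across messages, a variation like the one below should work (a sketch based on the same API calls used above, not code from the original application):

# Create the producer once at module level instead of per message
producer = client.get_raw_topic_producer("big-events")

def on_event_data_received_handler(stream: StreamConsumer, data: EventData):
    with data:
        payload = json.loads(data.value)
        if payload["count"] > 500:
            message = qx.RawMessage(json.dumps(payload, indent=2).encode('utf-8'))
            message.key = payload["uuid"].encode('utf-8')
            # Reuse the long-lived producer rather than opening a new one each time
            producer.publish(message)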
Let’s run this file:
python filter_events.py
We can then check if any events have made it into the big-events topic:
rpk topic consume big-events --brokers localhost:9092 |
jq -Cc '.value | fromjson' |
head -n5
{"tsString":"2023-07-11T13:42:34.986716Z","uuid":"277","count":794}
{"tsString":"2023-07-11T13:42:34.986975Z","uuid":"280","count":585}
{"tsString":"2023-07-11T13:42:34.987040Z","uuid":"92c","count":505}
{"tsString":"2023-07-11T13:42:34.987062Z","uuid":"d12","count":929}
{"tsString":"2023-07-11T13:42:34.987081Z","uuid":"6f6","count":582}
Job done!
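Once you're finished experimenting, you can tear down the Redpanda container:
docker compose down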
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.