Kafka: A basic tutorial
In this post we’re going to learn how to launch Kafka locally and write to and read from a topic using one of the Python drivers.
To make things easy for myself, I’ve created a Docker Compose template that launches 3 containers:
- broker - our Kafka broker
- zookeeper - used by Kafka for leader election
- jupyter - notebooks for connecting to our Kafka broker
This template can be downloaded from the mneedham/basic-kafka-tutorial repository, and reads as follows:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper-tutorial
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker-tutorial
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  jupyter:
    container_name: jupyter-tutorial
    image: jupyter/scipy-notebook:${JUPYTER_VERSION:-latest}
    volumes:
      - ./notebooks:/home/jovyan/
    ports:
      - "8888:8888"
The easiest way to use this template is to first clone the repository locally using the following command:
git clone git@github.com:mneedham/basic-kafka-tutorial.git && cd basic-kafka-tutorial
And then launch the Docker containers using the following command:
docker-compose up
This command outputs a lot of stuff, and we’ll need to keep a close eye on the first few lines so that we can grab the Jupyter Notebook token. The output we’re looking for looks like this:
jupyter-tutorial | [I 10:35:27.804 NotebookApp] The Jupyter Notebook is running at:
jupyter-tutorial | [I 10:35:27.804 NotebookApp] http://(4a031e4b5326 or 127.0.0.1):8888/?token=7907fef53948168308c829d48d9978b8f9c475b7c621c7d1
jupyter-tutorial | [I 10:35:27.804 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
jupyter-tutorial | [C 10:35:27.811 NotebookApp]
jupyter-tutorial |
jupyter-tutorial | To access the notebook, open this file in a browser:
jupyter-tutorial | file:///home/jovyan/.local/share/jupyter/runtime/nbserver-6-open.html
jupyter-tutorial | Or copy and paste one of these URLs:
jupyter-tutorial | http://(4a031e4b5326 or 127.0.0.1):8888/?token=7907fef53948168308c829d48d9978b8f9c475b7c621c7d1
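While that's starting up, we can run the following command from another terminal (still in the basic-kafka-tutorial directory) to check on our containers:
docker-compose ps
If everything worked we should see zookeeper-tutorial, broker-tutorial, and jupyter-tutorial all listed as Up, although the exact output format varies by Docker Compose version.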
We then need to navigate to http://localhost:8888?token=<token> in our web browser. Based on the output above, we'd navigate to http://localhost:8888?token=7907fef53948168308c829d48d9978b8f9c475b7c621c7d1.
Once we’ve done that we can open Kafka Tutorial.ipynb. Let’s go through that notebook.
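The jupyter/scipy-notebook image doesn't come with a Kafka client, so the notebook starts by installing kafka-python from a notebook cell:
!pip install kafka-python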
Once we've installed the kafka-python library, we write the following functions to create a Kafka producer and publish a message to a topic:
from kafka import KafkaConsumer, KafkaProducer
import json
import uuid
def publish_message(producer_instance, topic_name, key, value):
    try:
        # Kafka messages are sent as bytes, so encode the key and value first
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        # block until all buffered messages have actually been sent
        producer_instance.flush()
        print('Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))


def connect_kafka_producer(server):
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[server], api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        # returns None if the connection failed
        return _producer
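As an aside, kafka-python can handle the byte conversion for us: KafkaProducer accepts key_serializer and value_serializer callables. A sketch of such a variant (connect_json_producer is a hypothetical helper, not part of the notebook):

def connect_json_producer(server):
    # hypothetical variant of connect_kafka_producer: kafka-python does the
    # encoding, so callers can pass plain strings as keys and dicts as values
    return KafkaProducer(
        bootstrap_servers=[server],
        api_version=(0, 10),
        key_serializer=lambda k: k.encode('utf-8'),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))

With a producer like this, send(topic_name, key=some_string, value=some_dict) works without any manual encoding.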
Now we’ll create a producer:
server = 'broker:9093'
kafka_producer = connect_kafka_producer(server)
And publish a JSON message to the broker:
topic_name = "intro-to-kafka"
message = json.dumps({"name": "Mark"})
publish_message(kafka_producer, topic_name, str(uuid.uuid4()), message)
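If the send succeeds, publish_message prints Message published successfully.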
And now we'll create a consumer to process all the messages on that topic:
consumer = KafkaConsumer(topic_name,
                         auto_offset_reset='earliest',
                         bootstrap_servers=[server],
                         api_version=(0, 10),
                         value_deserializer=json.loads,
                         consumer_timeout_ms=1000)

for msg in consumer:
    # keys arrive as raw bytes; values are deserialized to dicts by json.loads
    print(msg.key.decode("utf-8"), msg.value)
If we run this code we’ll see the following output:
716a46df-2b10-4270-b294-b04ced51bd73 {'name': 'Mark'}
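Two of the consumer's parameters do the heavy lifting here: auto_offset_reset='earliest' tells the consumer to start from the beginning of the topic when it has no committed offset, and consumer_timeout_ms=1000 stops the iteration after a second without new messages instead of blocking forever.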
And now we’re ready to go and do some more fun stuff with streams!
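And when we're done experimenting, we can stop and remove all three containers:
docker-compose down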
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.