Confluent Kafka: DeprecationWarning: AvroProducer has been deprecated. Use AvroSerializer instead.
I’ve been creating a demo showing how to ingest Avro-encoded data from Apache Kafka into Apache Pinot and ran into a deprecation warning. In this blog post, I’ll show how to update code using the Confluent Kafka Python client to get rid of that warning.
I started by installing the following libraries:
pip install confluent-kafka avro urllib3 requests
And then my code to publish an Avro encoded event to Kafka looked like this:
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

schema_name = "telemetry.avsc"
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081',
    'broker.address.family': 'v4'
}

value_schema = avro.load(schema_name)
producer = AvroProducer(producer_config, default_value_schema=value_schema)

event = {
    # all my fields
}
producer.produce(topic="telemetry", value=event)
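For completeness, a hypothetical telemetry.avsc might look something like this (the fields here are made up; any valid Avro record schema will do):

{
    "type": "record",
    "name": "Telemetry",
    "namespace": "demo",
    "fields": [
        {"name": "device_id", "type": "string"},
        {"name": "temperature", "type": "double"},
        {"name": "ts", "type": "long"}
    ]
}

A matching event would then be a dict like {"device_id": "device-1", "temperature": 21.5, "ts": 1690240000000}.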
When I ran this script, data did make its way into Kafka, but I also got the following warning on the AvroProducer line:
DeprecationWarning: AvroProducer has been deprecated. Use AvroSerializer instead.
It took me a little while to figure out where AvroSerializer lived and how to use it, but I eventually ended up with the following code for creating the producer:
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import (
    SerializationContext,
    MessageField,
)

schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

with open("telemetry.avsc") as f:
    value_schema = f.read()

avro_serializer = AvroSerializer(schema_registry_client, value_schema)

producer_conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(producer_conf)
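A quick aside before we run it: if the schema was already registered with the Schema Registry, we could fetch it from there rather than reading the local file. A minimal sketch, assuming the schema lives under the default TopicNameStrategy subject name, telemetry-value:

# Assumption: the schema is already registered under the subject "telemetry-value"
latest = schema_registry_client.get_latest_version("telemetry-value")
avro_serializer = AvroSerializer(schema_registry_client, latest.schema.schema_str)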
I ran this code and got the following error message:
Traceback (most recent call last):
  File "/Users/markhneedham/projects/hugo-blog/blog/content/2023/07/25/new.py", line 5, in <module>
    from confluent_kafka.schema_registry.avro import AvroSerializer
  File "/Users/markhneedham/projects/hugo-blog/blog/content/2023/07/25/.venv/lib/python3.11/site-packages/confluent_kafka/schema_registry/avro.py", line 22, in <module>
    from fastavro import (parse_schema,
ModuleNotFoundError: No module named 'fastavro'
Let’s get fastavro installed:
pip install fastavro
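In hindsight, installing the client with its Avro extras would have pulled in fastavro (and the other Avro dependencies) from the start:

pip install "confluent-kafka[avro]"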
When I ran the script again, the deprecation warning was gone, but I got the following error message instead:
Traceback (most recent call last):
  File "/Users/markhneedham/projects/hugo-blog/blog/content/2023/07/25/new.py", line 47, in <module>
    producer.produce(topic="telemetry", value=event)
TypeError: a bytes-like object is required, not 'dict'
With the previous API, messages were encoded inside the producer, but here we need to do the encoding explicitly using the Avro serialiser. We therefore need to update this line:
producer.produce(topic="telemetry", value=event)
To read like this:
producer.produce(
    topic="telemetry",
    value=avro_serializer(event, SerializationContext("telemetry", MessageField.VALUE)),
)
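As an aside, if our keys were Avro-encoded too, the same pattern would apply using MessageField.KEY. A sketch, where key_serializer, key_schema, and key are hypothetical (this demo only serialises values):

# Hypothetical: a second serializer for Avro-encoded keys
key_serializer = AvroSerializer(schema_registry_client, key_schema)
producer.produce(
    topic="telemetry",
    key=key_serializer(key, SerializationContext("telemetry", MessageField.KEY)),
    value=avro_serializer(event, SerializationContext("telemetry", MessageField.VALUE)),
)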
Now if I run the script, messages make their way into Kafka and I don’t have any warnings or errors!
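One final tweak worth making, although it isn’t forced on us by the new API: produce is asynchronous, so a short-lived script like this should flush before exiting. A minimal sketch with a delivery callback so that failures don’t disappear silently:

# Optional: report delivery results instead of firing and forgetting
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

producer.produce(
    topic="telemetry",
    value=avro_serializer(event, SerializationContext("telemetry", MessageField.VALUE)),
    on_delivery=delivery_report,
)
producer.flush()  # block until all outstanding messages are delivered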
Note: If you want to see the entirety of both code samples, I’ve included them both in a GitHub Gist.
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.