Apache Pinot: Resetting a segment after an invalid JSON Transformation
I recently had a typo in a Pinot ingestion transformation function and wanted to have Pinot re-process the Kafka stream without having to restart all the things. In this blog post we’ll learn how to do that.
Setup
We’re going to spin up a local instance of Pinot and Kafka using the following Docker compose config:
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: zookeeper-json
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: wurstmeister/kafka:latest
    restart: unless-stopped
    container_name: "kafka-json"
    ports:
      - "9092:9092"
    expose:
      - "9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-json:2181/kafka
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_HOST_NAME: kafka-json
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-json:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
  pinot-controller:
    image: apachepinot/pinot:0.9.3
    command: "StartController -zkAddress zookeeper-json:2181"
    container_name: "pinot-controller-json"
    volumes:
      - ./config:/config
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.3
    command: "StartBroker -zkAddress zookeeper-json:2181"
    restart: unless-stopped
    container_name: "pinot-broker-json"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.3
    command: "StartServer -zkAddress zookeeper-json:2181"
    restart: unless-stopped
    container_name: "pinot-server-json"
    depends_on:
      - pinot-broker
We can launch all the components by running the following command:
docker-compose up
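Once that’s running we can do a quick (optional) sanity check on the setup described above. This assumes the service names from the Docker Compose file and the controller’s standard health endpoint on port 9000:
# List the containers and confirm they're all up
docker-compose ps

# Check the Pinot controller health endpoint (it may take a little while before it responds)
curl http://localhost:9000/health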
Create Schema
We’re going to use the following schema:
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "age",
      "dataType": "LONG"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
It’s only small, but it will be enough for our purposes. We can create the schema by running the following command:
docker exec -it pinot-controller-json bin/pinot-admin.sh AddSchema \
-schemaFile /config/schema.json -exec
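If we want to double-check that the schema was created, we can retrieve it back from the controller’s REST API (an optional sanity check):
# Retrieve the schema we just created
curl -X GET "http://localhost:9000/schemas/events" -H "accept: application/json"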
Create Table
Now let’s create a real-time table based on that schema:
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "age", "transformFunction": "JSONPATHLONG(payload, '$.ages')"} (1)
    ]
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-json:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "tenants": {},
  "metadata": {}
}
(1) The typo in the JSON path ('$.ages') means that an exception will be thrown when the function is executed.
For documentation on the JSONPATHLONG function, see the JSONPATHLONG function page.
We can create the table by running the following command:
docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table.json \
-exec
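As another optional sanity check, we can retrieve the table config back from the controller and confirm that it contains the transformConfigs we just defined:
# Retrieve the table config we just created
curl -X GET "http://localhost:9000/tables/events" -H "accept: application/json"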
Ingest Data into Kafka
Now let’s ingest a few messages into the Kafka events topic:
printf '{"timestamp": "2019-10-09 22:25:25", "payload": {"age": 18}}
{"timestamp": "2019-10-09 23:25:25", "payload": {"age": 14}}
{"timestamp": "2019-10-09 23:40:25", "payload": {"age": 16}}\n' |
docker exec -i kafka-json /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic events
We can check that the messages have been ingested by running the following command:
docker exec -i kafka-json /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic events \
--from-beginning \
--max-messages 3
{"timestamp": "2019-10-09 22:25:25", "payload": {"age": 18}}
{"timestamp": "2019-10-09 23:25:25", "payload": {"age": 14}}
{"timestamp": "2019-10-09 23:40:25", "payload": {"age": 16}}
Processed a total of 3 messages
All good so far.
Let’s navigate to http://localhost:9000/#/query and query the events table:
Hmmm, no documents.
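If you prefer the command line, the same query can be run against the broker’s SQL endpoint (assuming the default broker port 8099 that we exposed in the Docker Compose config above). It comes back empty as well:
# Query the events table via the broker's SQL endpoint
curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select * from events limit 10"}'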
The Debug API
We can find out what’s going on by querying the debug API for this table:
curl -X GET "http://localhost:9000/debug/tables/events?verbosity=0" -H "accept: application/json"
[
  {
    "tableName": "events_REALTIME",
    "numSegments": 1,
    "numServers": 1,
    "numBrokers": 1,
    "segmentDebugInfos": [
      {
        "segmentName": "events__0__0__20220131T1057Z",
        "serverState": {
          "Server_172.24.0.6_8098": {
            "idealState": "CONSUMING",
            "externalView": "CONSUMING",
            "segmentSize": "0 bytes",
            "consumerInfo": {
              "segmentName": "events__0__0__20220131T1057Z",
              "consumerState": "CONSUMING",
              "lastConsumedTimestamp": 1643626843673,
              "partitionToOffsetMap": {
                "0": "3"
              }
            },
            "errorInfo": {
              "timestamp": "2022-01-31 10:57:50 UTC",
              "errorMessage": "Caught exception while transforming the record: {\n \"nullValueFields\" : [ ],\n \"fieldToValueMap\" : {\n \"payload\" : {\n \"age\" : 16\n },\n \"age\" : null,\n \"timestamp\" : \"2019-10-09 23:40:25\"\n }\n}",
"stackTrace": "java.lang.RuntimeException: Caught exception while executing function: jsonPathLong(payload,'$.ages')\n\tat org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:124)\n\tat org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:88)\n\tat org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:96)\n\tat org.apache.pinot.segment.local.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:83)\n\tat org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.processStreamEvents(LLRealtimeSegmentDataManager.java:518)\n\tat org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.consumeLoop(LLRealtimeSegmentDataManager.java:420)\n\tat org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:568)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.IllegalStateException: Caught exception while invoking method: public static long org.apache.pinot.common.function.scalar.JsonFunctions.jsonPathLong(java.lang.Object,java.lang.String) with arguments: [{age=16}, $.ages]\n\tat org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:131)\n\tat org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:122)\n\t... 7 more\nCaused by: java.lang.reflect.InvocationTargetException\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:128)\n\t... 8 more\nCaused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['ages']\n\tat com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue(EvaluationContextImpl.java:133)\n\tat com.jayway.jsonpath.JsonPath.read(JsonPath.java:187)\n\tat com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:102)\n\tat com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:85)\n\tat org.apache.pinot.common.function.scalar.JsonFunctions.jsonPath(JsonFunctions.java:89)\n\tat org.apache.pinot.common.function.scalar.JsonFunctions.jsonPathLong(JsonFunctions.java:152)\n\t... 13 more\n"
            }
          }
        }
      }
    ],
    "serverDebugInfos": [],
    "brokerDebugInfos": [],
    "tableSize": {
      "reportedSize": "0 bytes",
      "estimatedSize": "0 bytes"
    },
    "ingestionStatus": {
      "ingestionState": "HEALTHY",
      "errorMessage": ""
    }
  }
]
As expected, Pinot failed to find the ages property because it doesn’t exist in those messages.
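We can see the same problem outside of Pinot. As a quick illustration with jq (assuming it’s installed locally; this has nothing to do with Pinot itself), the path .ages doesn’t match anything in the payload, whereas .age does:
# '$.ages' has no match in the payload, so the transform throws; '$.age' would work
echo '{"age": 18}' | jq '.ages'   # prints: null
echo '{"age": 18}' | jq '.age'    # prints: 18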
Let’s fix the transformation by updating the table config. We can apply the fixed config by running the following command:
docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table-fixed.json \
-exec
The contents of /config/table-fixed.json are shown below:
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "age", "transformFunction": "JSONPATHLONG(payload, '$.age')"} (1)
    ]
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-json:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "tenants": {},
  "metadata": {}
}
(1) The typo has now been fixed ('$.age' instead of '$.ages').
This fixes the table config, but it doesn’t retrospectively ingest the messages that threw an exception. We can retrieve the current Kafka offset of the consuming segment by running the following command:
curl -X GET "http://localhost:9000/tables/events/consumingSegmentsInfo" -H "accept: application/json"
{
  "_segmentToConsumingInfoMap": {
    "events__0__0__20220131T1057Z": [
      {
        "serverName": "Server_172.24.0.6_8098",
        "consumerState": "CONSUMING",
        "lastConsumedTimestamp": 1643627394569,
        "partitionToOffsetMap": {
          "0": "3"
        }
      }
    ]
  }
}
The offset for partition 0 is 3, but we want to process offsets 0-2.
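If we want to confirm that from the Kafka side, we can ask for the topic’s end offsets. This is a rough sketch using GetOffsetShell; the exact flags vary a little between Kafka versions:
# Latest offset per partition (--time -1); this should report events:0:3,
# i.e. messages exist at offsets 0, 1 and 2
docker exec -i kafka-json /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events \
  --time -1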
Resetting the consuming segment
To do that we’ll need to reset the consuming segment by running the following command:
curl -X POST "http://localhost:9000/segments/events_REALTIME/events__0__0__20220131T1057Z/reset" -H "accept: application/json"
{"status":"Successfully reset segment: events__0__0__20220131T1057Z of table: events_REALTIME"}
If we now go back to the query editor and re-run the query, we’ll see that those documents have now been ingested.
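Alternatively, we can verify from the command line with a count query against the broker’s SQL endpoint (same port 8099 assumption as before); it should now return 3:
# Count the ingested documents via the broker
curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select count(*) from events"}'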
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.