Apache Pinot: Geospatial - java.nio.BufferUnderflowException: null
I’ve been working on a blog post showing how to use geospatial indexes in Apache Pinot and ran into an interesting exception along the way, which I’ll explain in this post.
Setup
But first, let’s take a look at the structure of the data that I’m ingesting from Apache Kafka. Below is an example of one of those events:
{
  "trainCompany": "London Overground",
  "atocCode": "LO",
  "lat": 51.541615,
  "lon": -0.122528896,
  "ts": "2023-03-10 11:35:20",
  "trainId": "202303107145241"
}
As you’ve probably guessed, I’m importing the locations of trains in the UK. I created the following schema:
{
  "schemaName": "trains",
  "dimensionFieldSpecs": [
    {"name": "trainCompany", "dataType": "STRING"},
    {"name": "trainId", "dataType": "STRING"},
    {"name": "atocCode", "dataType": "STRING"},
    {"name": "point", "dataType": "BYTES"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
The first three columns are automatically mapped from the data source.
The point column is going to store a Point object based on the lat/lon values in the event. We’ll create that object in the table config, which you can see below:
{
  "tableName": "trains",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "trains",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "fieldConfigList": [
    {
      "name": "point",
      "encodingType": "RAW",
      "indexType": "H3",
      "properties": {
        "resolutions": "5"
      }
    }
  ],
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "noDictionaryColumns": ["point"],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "trains",
      "stream.kafka.broker.list": "kafka-geospatial:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "1000",
      "realtime.segment.flush.threshold.time": "1h"
    }
  },
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "point",
        "transformFunction": "STPoint(lon, lat, 1)"
      }
    ]
  },
  "tenants": {},
  "metadata": {}
}
The function under ingestionConfig.transformConfigs creates a Point geometry object, which is stored in the point column. We also create a geospatial index on the point column, which is defined under fieldConfigList.
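To sanity-check the transform, you can read the stored bytes back as WKT. This is just a quick sketch (ST_AsText is the same function that shows up in the failing query in the next section):
select trainId, ST_AsText(point) AS wkt
from trains
limit 10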
The BufferUnderflowException
Once I’d created this table, data started ingesting, but I sometimes saw the following error on the Pinot server:
pinot-server-geospatial | 2023/03/09 12:49:05.889 ERROR [BaseCombineOperator] [pqw-5] Caught exception while processing query: QueryContext{_tableName='trains_REALTIME', _subquery=null, _selectExpressions=[ts, trainId, atocCode, trainCompany, stastext(point)], _aliasList=[null, null, null, null, null], _filter=stwithin(point,'84000000010000000600000000bfc097f3a00000004049c107e8691db8bfbbd01b7fffffff4049c10bc3b2dbd8bfbac631c00000014049bf9dcb81ef38bfc0ae8a200000014049be7fdbcf81e0bfc3b13b000000004049bf9dee86bbf8bfc097f3a00000004049c107e8691db8') = '1', _groupByExpressions=null, _havingFilter=null, _orderByExpressions=[ts DESC], _limit=10, _offset=0, _queryOptions={responseFormat=sql, groupByMode=sql, timeoutMs=10000}, _expressionOverrideHints={}, _explain=false}
pinot-server-geospatial | java.nio.BufferUnderflowException: null
pinot-server-geospatial | at java.nio.Buffer.nextGetIndex(Buffer.java:643) ~[?:?]
pinot-server-geospatial | at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:165) ~[?:?]
pinot-server-geospatial | at org.apache.pinot.segment.local.utils.GeometrySerializer.readGeometry(GeometrySerializer.java:83) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.segment.local.utils.GeometrySerializer.readGeometry(GeometrySerializer.java:79) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.segment.local.utils.GeometrySerializer.deserialize(GeometrySerializer.java:68) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.core.geospatial.transform.function.BaseBinaryGeoTransformFunction.transformGeometryToIntValuesSV(BaseBinaryGeoTransformFunction.java:99) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.core.geospatial.transform.function.StWithinFunction.transformToIntValuesSV(StWithinFunction.java:46) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator.processProjectionBlock(ExpressionScanDocIdIterator.java:140) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator.applyAnd(ExpressionScanDocIdIterator.java:120) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.core.operator.filter.H3InclusionIndexFilterOperator.getFilterBlock(H3InclusionIndexFilterOperator.java:131) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.core.operator.filter.H3InclusionIndexFilterOperator.getNextBlock(H3InclusionIndexFilterOperator.java:113) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.core.operator.filter.H3InclusionIndexFilterOperator.getNextBlock(H3InclusionIndexFilterOperator.java:49) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial | at org.apache.pinot.core.operator.BaseOperator.nextBlock(BaseOperator.java:43) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
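For reference, the QueryContext in that log corresponds to a query shaped roughly like the one below. The serialized polygon in the filter has been swapped for a toSphericalGeography(ST_GeomFromText(...)) literal with made-up coordinates, so treat the WHERE clause as illustrative rather than the exact query I ran:
select ts, trainId, atocCode, trainCompany, ST_AsText(point)
from trains
where ST_Within(
        point,
        toSphericalGeography(ST_GeomFromText('POLYGON ((-0.2 51.4, 0.1 51.4, 0.1 51.6, -0.2 51.6, -0.2 51.4))'))
      ) = 1
order by ts DESC
limit 10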
A bit of debugging revealed that the value in the point column was sometimes an empty byte array. It turns out that an empty byte array is the default value for the BYTES column type, and that default was being inserted whenever my transformation function failed. And the transformation function failed if either the lat or lon value was null!
I figured this out by running the following command against the Kafka stream:
kcat -C -b localhost:9092 -t trains -u | jq 'select(.lat == null or .lon == null)'
And eventually saw the following output:
{
  "trainCompany": "London Overground",
  "atocCode": "LO",
  "lat": null,
  "lon": null,
  "ts": "2023-03-10 12:56:13",
  "trainId": "202303107145033"
}
% Reached end of topic trains [0] at offset 4475923
This error was stopping any segments from being committed, so I needed to fix it. The fix I came up with is to create a default value that represents a point near the Arctic, as I’m fairly sure no UK trains will be going that far north!
Default values for BYTES columns
The default value goes in the schema config and it must be hex encoded. I ran the following query to get a hex-encoded representation of a location in the Arctic:
SELECT ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)')
FROM trains
This query returns the value 003fe5f4a42008f90c4054e004d205fbe4, which I added to my schema, as shown below:
{
  "schemaName": "trains",
  "dimensionFieldSpecs": [
    {"name": "trainCompany", "dataType": "STRING"},
    {"name": "trainId", "dataType": "STRING"},
    {"name": "atocCode", "dataType": "STRING"},
    {"name": "point", "dataType": "BYTES", "defaultNullValue": "003fe5f4a42008f90c4054e004d205fbe4"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
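With that default in place, events where the transform fails are stored as the Arctic point rather than an empty byte array. If you want to see how often that’s happening, a count along these lines should do the trick:
select count(*)
from trains
where point = ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)')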
When we’re querying the table, we can filter those values out like this:
select $docId, $hostName, $segmentName, *
from trains
where point <> ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)')
limit 10
The AddTableCommand
When I was trying to create the schema and table from scratch with a default value, I found that I couldn’t use the AddTable command, as it was getting into a mess by trying to decode the default value twice: once in the command itself and once on the Pinot Controller when it received the table config.
The error message looked like this:
2023/03/10 13:15:04.401 INFO [AddTableCommand] [main] {"code":400,"error":"Invalid TableConfigs. Cannot convert value: 'AD/l9KQgCPkMQFTgBNIF++Q=' to type: BYTES\n at [Source: (String)\"{\"tableName\":\"trains\",\"schema\":{\"schemaName\":\"trains\",\"primaryKeyColumns\":null,\"dimensionFieldSpecs\":[{\"name\":\"trainCompany\",\"maxLength\":512,\"dataType\":\"STRING\",\"transformFunction\":null,\"defaultNullValue\":\"null\",\"singleValueField\":true,\"virtualColumnProvider\":null,\"defaultNullValueString\":\"null\"},{\"name\":\"trainId\",\"maxLength\":512,\"dataType\":\"STRING\",\"transformFunction\":null,\"defaultNullValue\":\"null\",\"singleValueField\":true,\"virtualColumnProvider\":null,\"defaultNullValueString\":\"null\"},{\"name\":\"at\"[truncated 2231 chars]; line: 1, column: 777] (through reference chain: org.apache.pinot.spi.config.TableConfigs[\"schema\"]->org.apache.pinot.spi.data.Schema[\"dimensionFieldSpecs\"]->java.util.ArrayList[3]->org.apache.pinot.spi.data.DimensionFieldSpec[\"defaultNullValue\"])"}
I’m not really sure how to get this to work, but luckily there is a workaround. First, create the schema:
docker run \
--network geospatial \
-v $PWD/config:/config \
apachepinot/pinot:0.12.0-arm64 AddSchema \
-schemaFile /config/schema.json \
-controllerHost "pinot-controller-geospatial" \
-exec
And then create the table using the HTTP API:
curl -X POST http://localhost:9000/tables --data @config/table.json
Not ideal, but it works!
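Once the table is up, a quick query using the $segmentName virtual column (the same one used in the filtering query earlier) shows that rows are landing and which segments they belong to:
select $segmentName, count(*)
from trains
group by $segmentName
limit 10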
Conclusion
This ended up being super fiddly, but it does work! If you’re stuck on something similar, do join the StarTree Community Slack and I’ll do my best to help.
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.