Apache Pinot: Geospatial - java.nio.BufferUnderflowException: null

I’ve been working on a blog post showing how to use Geospatial indexes in Apache Pinot and ran into an interesting exception, which I’ll explain here.

Set up

But first, let’s take a look at the structure of the data that I’m ingesting from Apache Kafka. Below is an example of one of those events:

{
  "trainCompany": "London Overground",
  "atocCode": "LO",
  "lat": 51.541615,
  "lon": -0.122528896,
  "ts": "2023-03-10 11:35:20",
  "trainId": "202303107145241"
}

As you’ve probably guessed, I’m importing the locations of trains in the UK. I created the following schema:

Pinot Schema
{
    "schemaName": "trains",
    "dimensionFieldSpecs": [
      {"name": "trainCompany", "dataType": "STRING"},
      {"name": "trainId", "dataType": "STRING"},
      {"name": "atocCode", "dataType": "STRING"},
      {"name": "point", "dataType": "BYTES"}
    ],
    "dateTimeFieldSpecs": [
      {
        "name": "ts",
        "dataType": "TIMESTAMP",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
}

The first three columns are automatically mapped from the data source. The point column is going to store a Point object based on the lat/lon values in the event. We’ll create that object in the table config, which you can see below:

Pinot Table Config
{
    "tableName": "trains",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "trains",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "fieldConfigList": [
      {
        "name": "point",
        "encodingType": "RAW",
        "indexType": "H3",
        "properties": {
          "resolutions": "5"
        }
      }
    ],
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "noDictionaryColumns": ["point"],
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "trains",
        "stream.kafka.broker.list": "kafka-geospatial:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.rows":"1000",
        "realtime.segment.flush.threshold.time":"1h"
      }
    },
    "ingestionConfig": {
      "transformConfigs": [
        {
          "columnName": "point",
          "transformFunction": "STPoint(lon, lat, 1)"
        }
      ]
    },
    "tenants": {},
    "metadata": {}
}

The function under ingestionConfig.transformConfigs creates a Point Geometry object, which is stored in the point column. We also create a Geospatial index on the point column, which is defined under fieldConfigList.

The BufferUnderflowException

Once I’d created this table the data started ingesting, but I was sometimes ending up with the following error on the Pinot server:

pinot-server-geospatial      | 2023/03/09 12:49:05.889 ERROR [BaseCombineOperator] [pqw-5] Caught exception while processing query: QueryContext{_tableName='trains_REALTIME', _subquery=null, _selectExpressions=[ts, trainId, atocCode, trainCompany, stastext(point)], _aliasList=[null, null, null, null, null], _filter=stwithin(point,'84000000010000000600000000bfc097f3a00000004049c107e8691db8bfbbd01b7fffffff4049c10bc3b2dbd8bfbac631c00000014049bf9dcb81ef38bfc0ae8a200000014049be7fdbcf81e0bfc3b13b000000004049bf9dee86bbf8bfc097f3a00000004049c107e8691db8') = '1', _groupByExpressions=null, _havingFilter=null, _orderByExpressions=[ts DESC], _limit=10, _offset=0, _queryOptions={responseFormat=sql, groupByMode=sql, timeoutMs=10000}, _expressionOverrideHints={}, _explain=false}
pinot-server-geospatial      | java.nio.BufferUnderflowException: null
pinot-server-geospatial      | 	at java.nio.Buffer.nextGetIndex(Buffer.java:643) ~[?:?]
pinot-server-geospatial      | 	at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:165) ~[?:?]
pinot-server-geospatial      | 	at org.apache.pinot.segment.local.utils.GeometrySerializer.readGeometry(GeometrySerializer.java:83) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.segment.local.utils.GeometrySerializer.readGeometry(GeometrySerializer.java:79) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.segment.local.utils.GeometrySerializer.deserialize(GeometrySerializer.java:68) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.core.geospatial.transform.function.BaseBinaryGeoTransformFunction.transformGeometryToIntValuesSV(BaseBinaryGeoTransformFunction.java:99) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.core.geospatial.transform.function.StWithinFunction.transformToIntValuesSV(StWithinFunction.java:46) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator.processProjectionBlock(ExpressionScanDocIdIterator.java:140) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator.applyAnd(ExpressionScanDocIdIterator.java:120) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.core.operator.filter.H3InclusionIndexFilterOperator.getFilterBlock(H3InclusionIndexFilterOperator.java:131) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.core.operator.filter.H3InclusionIndexFilterOperator.getNextBlock(H3InclusionIndexFilterOperator.java:113) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.core.operator.filter.H3InclusionIndexFilterOperator.getNextBlock(H3InclusionIndexFilterOperator.java:49) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
pinot-server-geospatial      | 	at org.apache.pinot.core.operator.BaseOperator.nextBlock(BaseOperator.java:43) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]

I did a bit of debugging, which revealed that the value in the point column was sometimes an empty byte array. It turns out that an empty byte array is the default value for the BYTES column type, and that default value was being inserted whenever my transformation function failed. And the transformation function failed if either the lat or lon value was null! When a query later tried to deserialize one of those empty arrays as a geometry, there were no bytes to read, hence the BufferUnderflowException.

I figured this out by running the following command against the Kafka stream:

kcat -C -b localhost:9092 -t trains -u | jq 'select(.lat == null or .lon == null)'

And eventually saw the following output:

{
  "trainCompany": "London Overground",
  "atocCode": "LO",
  "lat": null,
  "lon": null,
  "ts": "2023-03-10 12:56:13",
  "trainId": "202303107145033"
}
% Reached end of topic trains [0] at offset 4475923
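
If jq isn’t to hand, the same check can be sketched in Python. This is only a rough equivalent of the filter above: it assumes one JSON event per line (for example, piped in from kcat), and the function names are my own rather than anything from Pinot or kcat.

```python
import json


def has_missing_coords(event: dict) -> bool:
    """True when lat or lon is absent or null, like the jq filter above."""
    return event.get("lat") is None or event.get("lon") is None


def find_bad_events(lines):
    """Yield parsed events that have a missing coordinate.

    Assumes one JSON object per line; anything else is skipped.
    """
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):  # e.g. kcat status lines
            continue
        event = json.loads(line)
        if has_missing_coords(event):
            yield event


# Sample lines modelled on the events shown in this post
sample = [
    '{"trainId": "202303107145241", "lat": 51.541615, "lon": -0.122528896}',
    '{"trainId": "202303107145033", "lat": null, "lon": null}',
    "% Reached end of topic trains [0] at offset 4475923",
]
bad = list(find_bad_events(sample))
print(bad)
```

Only the second sample event is reported, since it is the only one with a null coordinate.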

This error was stopping any segments from being committed, so I needed to fix it. The fix that I’ve come up with is to create a default value that represents a point in the Arctic, as I’m fairly sure no UK trains will be going that far north!

Default values for BYTES column

The default value goes in the schema config and it should be a hex-encoded value. I ran the following query to get a hex-encoded representation of a location in the Arctic:

SELECT ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)')
FROM trains

This query returns the value 003fe5f4a42008f90c4054e004d205fbe4, which I added to my schema, as shown below:

{
    "schemaName": "trains",
    "dimensionFieldSpecs": [
      {"name": "trainCompany", "dataType": "STRING"},
      {"name": "trainId", "dataType": "STRING"},
      {"name": "atocCode", "dataType": "STRING"},
      {"name": "point", "dataType": "BYTES", "defaultNullValue": "003fe5f4a42008f90c4054e004d205fbe4"}
    ],
    "dateTimeFieldSpecs": [
      {
        "name": "ts",
        "dataType": "TIMESTAMP",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
}
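
As a sanity check on that defaultNullValue, the hex string can be decoded back into coordinates. The sketch below assumes the serialized point is one type byte followed by two big-endian doubles (x/longitude, then y/latitude). I inferred that layout from this one value rather than from the Pinot source, so treat it as a guess; decode_point is a hypothetical helper, not a Pinot API.

```python
import struct

DEFAULT_POINT_HEX = "003fe5f4a42008f90c4054e004d205fbe4"


def decode_point(raw: bytes):
    """Decode a point under the ASSUMED layout:
    1 type byte + big-endian double x (lon) + big-endian double y (lat).
    """
    if len(raw) != 17:
        raise ValueError(f"expected 17 bytes, got {len(raw)}")
    type_byte = raw[0]
    lon, lat = struct.unpack(">dd", raw[1:])
    return type_byte, lon, lat


type_byte, lon, lat = decode_point(bytes.fromhex(DEFAULT_POINT_HEX))
print(type_byte, lon, lat)
```

The decoded coordinates should come out close to the POINT used in the query. An empty byte array, the implicit BYTES default, fails the length check straight away, which is roughly what the BufferUnderflowException was telling us.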

When we’re querying the table we can filter those values out like this:

select $docId, $hostName, $segmentName, *
from trains
where point <> ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)')
limit 10

The AddTableCommand

When I was trying to create the schema and table from scratch with a default value, I found that I couldn’t use the AddTable command as it was getting into a mess by trying to decode the default value twice - once in the command itself and once on the Pinot Controller when it received the table config. The error message looked like this:

2023/03/10 13:15:04.401 INFO [AddTableCommand] [main] {"code":400,"error":"Invalid TableConfigs. Cannot convert value: 'AD/l9KQgCPkMQFTgBNIF++Q=' to type: BYTES\n at [Source: (String)\"{\"tableName\":\"trains\",\"schema\":{\"schemaName\":\"trains\",\"primaryKeyColumns\":null,\"dimensionFieldSpecs\":[{\"name\":\"trainCompany\",\"maxLength\":512,\"dataType\":\"STRING\",\"transformFunction\":null,\"defaultNullValue\":\"null\",\"singleValueField\":true,\"virtualColumnProvider\":null,\"defaultNullValueString\":\"null\"},{\"name\":\"trainId\",\"maxLength\":512,\"dataType\":\"STRING\",\"transformFunction\":null,\"defaultNullValue\":\"null\",\"singleValueField\":true,\"virtualColumnProvider\":null,\"defaultNullValueString\":\"null\"},{\"name\":\"at\"[truncated 2231 chars]; line: 1, column: 777] (through reference chain: org.apache.pinot.spi.config.TableConfigs[\"schema\"]->org.apache.pinot.spi.data.Schema[\"dimensionFieldSpecs\"]->java.util.ArrayList[3]->org.apache.pinot.spi.data.DimensionFieldSpec[\"defaultNullValue\"])"}
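
One way to see what’s going on: the value in that error message, AD/l9KQgCPkMQFTgBNIF++Q=, is the Base64 encoding of exactly the same bytes as the hex default in my schema. The sketch below checks that, and shows that a hex decoder rejects the Base64 text. That the controller expects hex at this point is my assumption based on the error, not something I’ve confirmed in the code.

```python
import base64

hex_default = "003fe5f4a42008f90c4054e004d205fbe4"  # from schema.json
b64_from_error = "AD/l9KQgCPkMQFTgBNIF++Q="          # from the 400 response

raw = bytes.fromhex(hex_default)

# Same bytes, two different text encodings
same_bytes = base64.b64decode(b64_from_error) == raw
print(same_bytes)

# A hex decoder cannot parse the Base64 form ('/', '+', '=' are not hex digits)
try:
    bytes.fromhex(b64_from_error)
    hex_parse_ok = True
except ValueError:
    hex_parse_ok = False
print(hex_parse_ok)
```

So it looks like the command decodes the hex string to bytes, re-serializes those bytes as Base64 when it builds the request, and the controller then fails to read that as a BYTES value.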

I’m not really sure how to get this to work, but luckily there is a workaround. First, create the schema:

docker run \
  --network geospatial \
  -v $PWD/config:/config \
  apachepinot/pinot:0.12.0-arm64 AddSchema \
    -schemaFile /config/schema.json \
    -controllerHost "pinot-controller-geospatial" \
    -exec

And then create the table using the HTTP API:

curl -X POST http://localhost:9000/tables --data @config/table.json

Not ideal, but it works!

Conclusion

This ended up being super fiddly, but it does work! If you’re stuck on something similar, do join the StarTree Community Slack and I’ll do my best to help.
