Apache Pinot: Exploring range queries
In our last post about the Chicago Crimes dataset and Apache Pinot, we learnt how to use various indexes to filter columns by exact values. In this post we’re going to learn how to write range queries against the dataset.
Recap
To recap, the Chicago Crimes dataset contains more than 7 million crimes committed in Chicago from 2001 until today. For each crime we have various identifiers, a timestamp, location, and codes representing the type of crime that’s been committed.
A subset of the data is shown below:
We loaded this data into Apache Pinot and then analysed the numEntriesScannedInFilter and timeUsedMs values returned in the query metadata to see how well we’d optimised our queries.
Setup
We’re going to use the same Docker Compose script as before, which spins up the various components for Apache Pinot 0.9.0:
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: manual-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.9.0
    command: "StartController -zkAddress manual-zookeeper:2181"
    container_name: "manual-pinot-controller"
    volumes:
      - ./config:/config
      - ./data:/data
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.0
    command: "StartBroker -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-broker"
    volumes:
      - ./config:/config
      - ./data:/data
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.0
    command: "StartServer -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-server"
    volumes:
      - ./config:/config
      - ./data:/data
    depends_on:
      - pinot-broker
Schema
We’ve made a small change to our schema since the last blog post.
Instead of storing the timestamp as a STRING, we’re going to store it as a TIMESTAMP so that we can perform date time operations. We’ll also change the column name from Timestamp to DateEpoch.
The updated schema is shown below:
{
  "schemaName": "crimes",
  "dimensionFieldSpecs": [
    {
      "name": "ID",
      "dataType": "INT"
    },
    {
      "name": "CaseNumber",
      "dataType": "STRING"
    },
    {
      "name": "Block",
      "dataType": "STRING"
    },
    {
      "name": "IUCR",
      "dataType": "STRING"
    },
    {
      "name": "PrimaryType",
      "dataType": "STRING"
    },
    {
      "name": "Arrest",
      "dataType": "BOOLEAN"
    },
    {
      "name": "Domestic",
      "dataType": "BOOLEAN"
    },
    {
      "name": "Beat",
      "dataType": "STRING"
    },
    {
      "name": "District",
      "dataType": "STRING"
    },
    {
      "name": "Ward",
      "dataType": "STRING"
    },
    {
      "name": "CommunityArea",
      "dataType": "STRING"
    },
    {
      "name": "FBICode",
      "dataType": "STRING"
    },
    {
      "name": "Latitude",
      "dataType": "DOUBLE"
    },
    {
      "name": "Longitude",
      "dataType": "DOUBLE"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "DateEpoch",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
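Storing DateEpoch as a TIMESTAMP means we can apply Pinot’s date time functions to it directly in our queries. As a rough sketch of the kind of thing that enables (not a query from the original analysis, and it assumes the data has already been imported), we could count crimes per day using the ToDateTime function:
-- Sketch: busiest days by number of crimes (assumes the crimes table has been loaded)
SELECT ToDateTime(DateEpoch, 'yyyy-MM-dd') AS crimeDate, count(*) AS numCrimes
FROM crimes
GROUP BY ToDateTime(DateEpoch, 'yyyy-MM-dd')
ORDER BY count(*) DESC
LIMIT 10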
If you read the first post about this dataset, you’ll remember that the timestamp was a DateTime string in the MM/dd/yyyy HH:mm:ss a format, rather than the number of milliseconds since the epoch.
We’re going to take care of that with a transformation function in our table config.
Table
The table config is described below:
{
  "tableName": "crimes",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "sortedColumn": ["Beat"]
  },
  "nullHandlingEnabled": true,
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "CaseNumber", "transformFunction": "\"Case Number\""},
      {"columnName": "PrimaryType", "transformFunction": "\"Primary Type\""},
      {"columnName": "CommunityArea", "transformFunction": "\"Community Area\""},
      {"columnName": "FBICode", "transformFunction": "\"FBI Code\""},
      {"columnName": "DateEpoch", "transformFunction": "FromDateTime(\"Date\", 'MM/dd/yyyy HH:mm:ss a')"}
    ]
  },
  "metadata": {}
}
As mentioned in the previous section, we’re using the FromDateTime transformation function to convert the DateTime strings in the Date column into timestamps.
For more details on how the data transformation works, see my blog post Converting a DateTime String to Timestamp in Apache Pinot.
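If you want to sanity check the conversion, you can also call FromDateTime directly in a query. Here’s a hypothetical spot check using a made-up value in the same format as the Date column (the real values obviously vary), which you can run once the table has some data in it:
-- Hypothetical spot check: convert a Date-style string into epoch milliseconds
SELECT FromDateTime('01/01/2019 10:30:00 AM', 'MM/dd/yyyy HH:mm:ss a') AS epochMillis
FROM crimes
LIMIT 1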
Now let’s create the table and schema:
docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table.json \
-schemaFile /config/schema.json -exec
Import CSV
Now we’re going to import the crimes into Pinot, using the following ingestion spec:
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/Crimes_beat_sorted.csv'
outputDirURI: ' '
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'crimes'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
docker exec \
-it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /config/job-spec-sorted.yml
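Once the job has finished, we can run a quick sanity check to make sure everything was ingested. The count should line up with the 7 million plus crimes mentioned at the start of the post:
SELECT count(*)
FROM crimes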
Range Querying
Now it’s time to do some range querying. Let’s start by counting the crimes committed on 1st January 2019, which we can do by running the following query:
SELECT count(*)
FROM crimes
WHERE DateEpoch BETWEEN
FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
FromDateTime('2019-01-02', 'yyyy-MM-dd')
As in the last post, we’ll be looking at the output in JSON format to see what’s going on under the covers:
{
  "numDocsScanned": 1065,
  "numEntriesScannedInFilter": 7434990,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 80
}
As we might expect, the SQL engine has scanned all 7,434,990 rows to check which ones match the date range. 1,065 crimes were committed on this day and the query took 80 ms.
Now we’re going to create another table called crimes_range_index that has a range index applied to the DateEpoch column:
{
  "tableName": "crimes_range_index",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1,
    "schemaName": "crimes" (1)
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "sortedColumn": ["Beat"],
    "rangeIndexVersion": 2,
    "rangeIndexColumns": ["DateEpoch"] (2)
  },
  "nullHandlingEnabled": true,
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "CaseNumber", "transformFunction": "\"Case Number\""},
      {"columnName": "PrimaryType", "transformFunction": "\"Primary Type\""},
      {"columnName": "CommunityArea", "transformFunction": "\"Community Area\""},
      {"columnName": "FBICode", "transformFunction": "\"FBI Code\""},
      {"columnName": "DateEpoch", "transformFunction": "FromDateTime(\"Date\", 'MM/dd/yyyy HH:mm:ss a')"}
    ]
  },
  "metadata": {}
}
(1) We need to explicitly specify the schema name since it doesn’t match the table name.
(2) Add a range index on the DateEpoch column.
Run the following command to create the table:
docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table-range-index.json \
-schemaFile /config/schema.json -exec
Now let’s copy the segment from the crimes table to the crimes_range_index table.
We can do this with the following ingestion spec:
executionFrameworkSpec:
  name: 'standalone'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentTarPush
outputDirURI: '/opt/pinot/data/crimes'
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
tableSpec:
  tableName: 'crimes_range_index'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
Run the ingestion spec:
docker exec \
-it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /config/job-spec-download-only.yml
For more details on how this ingestion spec works, see my blog post Apache Pinot: Copying a segment to a new table.
Now let’s re-run the query against the new table:
SELECT count(*)
FROM crimes_range_index
WHERE DateEpoch BETWEEN
FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
FromDateTime('2019-01-02', 'yyyy-MM-dd')
{
  "numDocsScanned": 1065,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 14
}
numEntriesScannedInFilter is now down to 0 and the query is more than 5 times faster than it was before.
Now let’s see what happens if we filter on multiple columns.
We have a sorted index on the Beat column, so let’s find the crimes committed on a specific beat on 1st January 2019:
SELECT count(*)
FROM crimes
WHERE DateEpoch BETWEEN
FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
FromDateTime('2019-01-02', 'yyyy-MM-dd')
AND Beat = '0421'
{
  "numDocsScanned": 13,
  "numEntriesScannedInFilter": 57573,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 3
}
SELECT count(*)
FROM crimes_range_index
WHERE DateEpoch BETWEEN
FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
FromDateTime('2019-01-02', 'yyyy-MM-dd')
AND Beat = '0421'
{
  "numDocsScanned": 13,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 12
}
In this case, the combination of the sorted index on Beat and a scan of the forward index on DateEpoch is quicker than querying indexes on both fields.
Admittedly this is far from the normal usage of Pinot. I’m only running one query at a time, whereas there would normally be thousands (if not more) concurrent queries, which would tip things in favour of using the indexes on both fields. Having said that, there is a GitHub issue to have the query engine be smarter when deciding whether to scan or use an index.
If we include more beats, the query that uses both indexes is slightly quicker:
SELECT count(*)
FROM crimes
WHERE DateEpoch BETWEEN
FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
FromDateTime('2019-01-02', 'yyyy-MM-dd')
AND Beat IN ('0421', '0423', '0624', '1834', '0511', '1112', '1533', '0823', '0414', '1522')
{
  "numDocsScanned": 90,
  "numEntriesScannedInFilter": 513052,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 12
}
SELECT count(*)
FROM crimes_range_index
WHERE DateEpoch BETWEEN
FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
FromDateTime('2019-01-02', 'yyyy-MM-dd')
AND Beat IN ('0421', '0423', '0624', '1834', '0511', '1112', '1533', '0823', '0414', '1522')
{
  "numDocsScanned": 90,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 11
}
Conclusion
In this post we learnt how to use Apache Pinot’s range index so that we could find crimes that happened on a given day. We then combined filter predicates and saw that sometimes it can be faster to do a column scan rather than merging together the results of querying two indexes.
We haven’t yet explored how to query the spatial data that this dataset contains, so perhaps that’s what we’ll look at in our next post!
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.