Apache Pinot: Importing CSV files with columns containing spaces
I’ve been playing around with one of my favourite datasets from the Chicago Data Portal and spent a while figuring out how to import columns that contain spaces into Apache Pinot. In this blog post we’ll learn how to do that using a subset of the data.
Setup
We’re going to spin up a local instance of Pinot using the following Docker compose config:
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: manual-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.9.0
    command: "StartController -zkAddress manual-zookeeper:2181"
    container_name: "manual-pinot-controller"
    volumes:
      - ./config:/config
      - ./data:/data
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.0
    command: "StartBroker -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-broker"
    volumes:
      - ./config:/config
      - ./data:/data
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.0
    command: "StartServer -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-server"
    volumes:
      - ./config:/config
      - ./data:/data
    depends_on:
      - pinot-broker
We can launch the containers by running the following command:
docker-compose up
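Once the services have started, we can check that all four containers are up:

docker-compose ps

The Pinot UI should then be available at http://localhost:9000.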
Data
Let’s say we have the following CSV file that contains two columns, one that contains spaces and one that doesn’t:
ID | Case Number |
---|---|
10224738 | HY411648 |
10224739 | HY411615 |
11646166 | JC213529 |
10224740 | HY411595 |
This CSV contains a subset of the crimes data from the Chicago Data Portal.
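On disk this is just a plain CSV file. Assuming we've saved it as import.csv under the data directory that we mounted into the containers (the ingestion job spec later in this post looks for that file under /data), the raw file looks like this:

ID,Case Number
10224738,HY411648
10224739,HY411615
11646166,JC213529
10224740,HY411595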
Create Table
We’re going to create a `crimes` Pinot table and associated schema based on this CSV file. First, the schema:
{
  "schemaName": "crimes",
  "dimensionFieldSpecs": [
    {
      "name": "ID",
      "dataType": "INT"
    },
    {
      "name": "Case Number",
      "dataType": "STRING"
    }
  ]
}
And the table config:

{
  "tableName": "crimes",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "metadata": {}
}
Let’s try to create a table based on this config:
docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable -tableConfigFile /config/table.json -schemaFile /config/schema.json -exec
2021/11/25 11:57:27.088 ERROR [AddTableCommand] [main] Got Exception to upload Pinot Schema: crimes
org.apache.pinot.common.exception.HttpErrorStatusException: Got error status code: 400 (Bad Request) with reason: "Cannot add invalid schema: crimes. Reason: The column name "Case Number" should not contain blank space." while sending request: http://192.168.144.3:9000/schemas to controller: d15b07933b22, version: Unknown
at org.apache.pinot.common.utils.FileUploadDownloadClient.sendRequest(FileUploadDownloadClient.java:510) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.common.utils.FileUploadDownloadClient.addSchema(FileUploadDownloadClient.java:616) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.tools.admin.command.AddTableCommand.uploadSchema(AddTableCommand.java:166) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.tools.admin.command.AddTableCommand.execute(AddTableCommand.java:203) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.tools.Command.call(Command.java:33) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.tools.Command.call(Command.java:29) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at picocli.CommandLine.executeUserObject(CommandLine.java:1953) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at picocli.CommandLine.access$1300(CommandLine.java:145) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2352) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at picocli.CommandLine$RunLast.handle(CommandLine.java:2346) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at picocli.CommandLine$RunLast.handle(CommandLine.java:2311) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at picocli.CommandLine.execute(CommandLine.java:2078) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.tools.admin.PinotAdministrator.execute(PinotAdministrator.java:161) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.tools.admin.PinotAdministrator.main(PinotAdministrator.java:192) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
Pinot schemas don’t allow column names that contain spaces, so we’ll have to get rid of the space in `Case Number`.
We can update the schema to look like this:
{
  "schemaName": "crimes",
  "dimensionFieldSpecs": [
    {
      "name": "ID",
      "dataType": "INT"
    },
    {
      "name": "CaseNumber",
      "dataType": "STRING"
    }
  ]
}
If we re-run the `AddTable` command, we’ll see the following output:
2021/11/25 12:02:04.606 INFO [AddTableCommand] [main] Executing command: AddTable -tableConfigFile /config/table.json -schemaFile /config/schema.json -controllerProtocol http -controllerHost 192.168.144.3 -controllerPort 9000 -user null -password [hidden] -exec
2021/11/25 12:02:05.084 INFO [AddTableCommand] [main] {"status":"Table crimes_OFFLINE succesfully added"}
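As an optional sanity check, we can also fetch the schema back from the controller's REST API and confirm it matches what we uploaded:

curl -X GET "http://localhost:9000/schemas/crimes" -H "accept: application/json"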
Ingest CSV file
Now we’re going to import the CSV file that we saw at the beginning of the post. To do this, we’ll create the following data ingestion job spec:
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/import.csv'
outputDirURI: '/opt/pinot/data/crimes/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'crimes'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
We can import the CSV file by running the following command:
docker exec \
-it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml
Once we’ve run this we can navigate to the Query Console at http://localhost:9000/#/query and run the following query:
select * from crimes limit 10
CaseNumber | ID |
---|---|
null | 10224738 |
null | 10224739 |
null | 11646166 |
null | 10224740 |
Hmmm, `CaseNumber` is always null, which isn’t what we want.
To deal with this problem we’ll need to add an ingestion transformation config to our table config.
Update table config and reingest
Let’s update our table config to add the transform config:
{
  "tableName": "crimes",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "CaseNumber", "transformFunction": "\"Case Number\""}
    ]
  },
  "metadata": {}
}
The `columnName` refers to the column name in the schema, and the `transformFunction` describes a function for processing a value from the source data. In this case we’re specifying the name of the column from our CSV file, wrapped in escaped double quotes because it contains a space, and Pinot will extract the values from that column.
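The same escaping trick works for any other column containing spaces. For example, if we later wanted to ingest the Chicago dataset's Primary Type column (not part of the subset used in this post), a hypothetical transform config entry would look like this:

{"columnName": "PrimaryType", "transformFunction": "\"Primary Type\""}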
Before we update the table config, let’s first delete the segment that we ingested in the previous section:
curl -X DELETE "http://localhost:9000/segments/crimes?type=OFFLINE" -H "accept: application/json"
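If we want to confirm that the segment has actually been removed, we can list the segments for the table, which should come back empty after the delete:

curl -X GET "http://localhost:9000/segments/crimes" -H "accept: application/json"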
Now we can update the table config:
curl 'http://localhost:9000/tables/crimes_OFFLINE' \
-X 'PUT' \
-H 'Content-Type: application/json' \
--data-binary "@config/table.json"
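To make sure the update took effect, we can fetch the table config back from the controller and check that it now includes the transformConfigs section:

curl -X GET "http://localhost:9000/tables/crimes" -H "accept: application/json"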
And finally, we can run the data ingestion job again:
docker exec \
-it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml
Now if we run a query against this table we’ll see populated values for `CaseNumber`:
select * from crimes limit 10
CaseNumber | ID |
---|---|
HY411648 | 10224738 |
HY411615 | 10224739 |
JC213529 | 11646166 |
HY411595 | 10224740 |
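And since `CaseNumber` now behaves like any other STRING column, we can also filter on it, for example using one of the values from the table above:

select ID, CaseNumber from crimes where CaseNumber = 'HY411648'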
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.