Apache Pinot: Convert DateTime string to Timestamp - IllegalArgumentException: Invalid timestamp
In this post we’ll learn how to deal with a field that contains DateTime strings when importing a CSV file into Apache Pinot. We’ll also cover some of the error messages that you’ll see if you do it the wrong way.
Setup
We’re going to spin up a local instance of Pinot using the following Docker compose config:
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: manual-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.9.0
    command: "StartController -zkAddress manual-zookeeper:2181"
    container_name: "manual-pinot-controller"
    volumes:
      - ./config:/config
      - ./data:/data
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.0
    command: "StartBroker -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-broker"
    volumes:
      - ./config:/config
      - ./data:/data
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.0
    command: "StartServer -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-server"
    volumes:
      - ./config:/config
      - ./data:/data
    depends_on:
      - pinot-broker
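Assuming that config is saved as docker-compose.yml, we can launch everything in the background like so:

# Start ZooKeeper, the Pinot controller, broker, and server
docker-compose up -d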
Data
We’ll be working with the following CSV file, which has `ID` and `Date` columns:
| ID | Date |
|---|---|
| 10224738 | 09/05/2015 01:30:00 PM |
| 10224739 | 09/04/2015 11:30:00 AM |
| 11646166 | 09/01/2018 12:01:00 AM |
| 10224740 | 09/05/2015 12:45:00 PM |
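The raw file, saved as data/dates.csv (inside the directory that we mounted into each container), looks like this, assuming a plain comma-delimited layout:

ID,Date
10224738,09/05/2015 01:30:00 PM
10224739,09/04/2015 11:30:00 AM
11646166,09/01/2018 12:01:00 AM
10224740,09/05/2015 12:45:00 PM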
Create Table
Now we’re going to create a Pinot schema and table based on this CSV file.
The schema is defined below:
{
  "schemaName": "dates",
  "dimensionFieldSpecs": [
    {
      "name": "ID",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "Date",
      "dataType": "TIMESTAMP",
      "format": "1:SECONDS:SIMPLE_DATE_FORMAT:MM/dd/yyyy HH:mm:ss a",
      "granularity": "1:HOURS"
    }
  ]
}
We’re going to use the `TIMESTAMP` data type so that we can write queries that require date operations against the values in this column.
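For example, once the data has been ingested we’d like to run date-range queries against the broker. This is just a sketch of the kind of query we have in mind, sent to the broker’s /query/sql endpoint on the port we mapped earlier (1441411200000 is 2015-09-05 00:00:00 UTC in epoch millis):

# Find rows with a Date after 5th September 2015 (UTC)
curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT ID, \"Date\" FROM dates WHERE \"Date\" > 1441411200000 LIMIT 10"}'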
Our table config is defined below:
{
  "tableName": "dates",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "nullHandlingEnabled": true,
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "metadata": {}
}
Now let’s create the table and schema:
docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table.json \
-schemaFile /config/schema.json -exec
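We can sanity-check that the table was created by asking the controller for its list of tables:

# We expect to see "dates" in the response
curl -X GET "http://localhost:9000/tables" -H "accept: application/json"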
Import CSV file
After we’ve done that, it’s time to import the CSV file. We’ll do this using the ingestion job spec defined below:
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/dates.csv'
outputDirURI: '/opt/pinot/data/dates'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'dates'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
Notice that `includeFileNamePattern` refers to the `dates.csv` file that we saw earlier in this post.
We can run the ingestion job like so:
docker exec \
-it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml
2021/12/03 12:16:56.848 ERROR [RecordReaderSegmentCreationDataSource] [pool-2-thread-1] Caught exception while gathering stats
java.lang.RuntimeException: Caught exception while transforming data type for column: Date
at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:95) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:83) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource.gatherStats(RecordReaderSegmentCreationDataSource.java:80) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource.gatherStats(RecordReaderSegmentCreationDataSource.java:42) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:173) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:155) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:104) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.run(SegmentGenerationTaskRunner.java:118) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.lambda$submitSegmentGenTask$1(SegmentGenerationJobRunner.java:263) ~[pinot-batch-ingestion-standalone-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalArgumentException: Invalid timestamp: '09/05/2015 01:30:00 PM'
at org.apache.pinot.spi.utils.TimestampUtils.toTimestamp(TimestampUtils.java:43) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.common.utils.PinotDataType$10.toTimestamp(PinotDataType.java:524) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.common.utils.PinotDataType$9.convert(PinotDataType.java:485) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.common.utils.PinotDataType$9.convert(PinotDataType.java:442) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:90) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
... 13 more
2021/12/03 12:16:56.856 ERROR [SegmentGenerationJobRunner] [pool-2-thread-1] Failed to generate Pinot segment for file - file:/data/dates.csv
java.lang.RuntimeException: Caught exception while transforming data type for column: Date
at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:95) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:83) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource.gatherStats(RecordReaderSegmentCreationDataSource.java:80) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource.gatherStats(RecordReaderSegmentCreationDataSource.java:42) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:173) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:155) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:104) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.run(SegmentGenerationTaskRunner.java:118) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.lambda$submitSegmentGenTask$1(SegmentGenerationJobRunner.java:263) ~[pinot-batch-ingestion-standalone-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalArgumentException: Invalid timestamp: '09/05/2015 01:30:00 PM'
at org.apache.pinot.spi.utils.TimestampUtils.toTimestamp(TimestampUtils.java:43) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.common.utils.PinotDataType$10.toTimestamp(PinotDataType.java:524) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.common.utils.PinotDataType$9.convert(PinotDataType.java:485) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.common.utils.PinotDataType$9.convert(PinotDataType.java:442) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:90) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
... 13 more
Hmmm, that didn’t quite work as expected.
There was a failure when trying to convert the string to a timestamp. If we take a look at `TimestampUtils#toTimestamp`, we can see that this function only supports the following input formats:
- `yyyy-mm-dd hh:mm:ss[.fffffffff]`
- Millis since epoch
Our DateTime strings have the `MM/dd/yyyy HH:mm:ss a` format, which isn’t supported.
One way to fix this problem would be to massage `dates.csv` so that the values in the `Date` column are in the expected format. The CSV file would then look like this:
| ID | Date |
|---|---|
| 10224738 | 2015-09-05 13:30:00 |
| 10224739 | 2015-09-04 11:30:00 |
| 11646166 | 2018-09-01 00:01:00 |
| 10224740 | 2015-09-05 12:45:00 |
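If we don’t want to edit the file by hand, a throwaway script can do the massaging for us. This is only a sketch: it assumes GNU date is available, that the file is plain comma-delimited with a header row, and it writes to a hypothetical data/dates-fixed.csv that we’d then rename over the original:

# Rewrite the Date column into the yyyy-MM-dd HH:mm:ss format
# that Pinot's TIMESTAMP type can parse
{
  read -r header && echo "$header"
  while IFS=, read -r id dt; do
    echo "$id,$(date -d "$dt" '+%Y-%m-%d %H:%M:%S')"
  done
} < data/dates.csv > data/dates-fixed.csv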
We can then re-run the ingestion job and the dates will be imported into the `Date` column.
Alternatively, we could write a transformation function in our table config to take care of it. Our table config would then look like this:
{
  "tableName": "dates",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "nullHandlingEnabled": true,
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "Date", "transformFunction": "FromDateTime(\"Date\", 'MM/dd/yyyy HH:mm:ss a')"}
    ]
  },
  "metadata": {}
}
This transform uses the `FromDateTime` function, which converts a DateTime string to epoch millis.
Before we update the table config, let’s drop the segment that we created earlier when we ran the ingestion job:
curl -X DELETE "http://localhost:9000/segments/dates?type=OFFLINE" -H "accept: application/json"
Now we can update the table config:
curl 'http://localhost:9000/tables/dates_OFFLINE' \
-X 'PUT' \
-H 'Content-Type: application/json' \
--data-binary "@config/table-transform.json"
{"_code":400,"_error":"Invalid table config: dates_OFFLINE with error: Arguments of a transform function '[Date]' cannot contain the destination column 'Date'"}
Hmmm, we can’t use the same name for the transformed and source columns.
We’ll instead have to update our schema and table config to store the transformed value in a separate `Timestamp` column.
The updated schema and table config are shown below:
{
  "schemaName": "dates",
  "dimensionFieldSpecs": [
    {
      "name": "ID",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "Timestamp",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:HOURS"
    }
  ]
}
{
  "tableName": "dates",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "nullHandlingEnabled": true,
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "Timestamp", "transformFunction": "FromDateTime(\"Date\", 'MM/dd/yyyy HH:mm:ss a')"}
    ]
  },
  "metadata": {}
}
We can’t remove columns from a schema, so let’s drop the schema and table and re-create them both:
curl -X DELETE "http://localhost:9000/schemas/dates" -H "accept: application/json" &&
curl -X DELETE "http://localhost:9000/tables/dates?type=offline" -H "accept: application/json"
docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table-transform.json \
-schemaFile /config/schema-transform.json -exec
We can then run the ingestion job again and the DateTime values will be loaded into the `Timestamp` column.
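We can check that everything worked by querying the table via the broker (again assuming its /query/sql endpoint on port 8099):

# The Timestamp column should now contain parsed timestamp values
curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT ID, \"Timestamp\" FROM dates LIMIT 10"}'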
Success!
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.