Apache Pinot: Convert DateTime string to Timestamp - IllegalArgumentException: Invalid timestamp

In this post we’ll learn how to deal with a field that contains DateTime strings when importing a CSV file into Apache Pinot. We’ll also cover some of the error messages that you’ll see if you do it the wrong way.

Figure 1. Apache Pinot - Convert DateTime string to Timestamp

Setup

We’re going to spin up a local Pinot cluster using the following Docker Compose config:

docker-compose.yml
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: manual-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.9.0
    command: "StartController -zkAddress manual-zookeeper:2181"
    container_name: "manual-pinot-controller"
    volumes:
      - ./config:/config
      - ./data:/data
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.0
    command: "StartBroker -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-broker"
    volumes:
      - ./config:/config
      - ./data:/data
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.0
    command: "StartServer -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-server"
    volumes:
      - ./config:/config
      - ./data:/data
    depends_on:
      - pinot-broker
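
Assuming Docker and Docker Compose are installed, we can launch everything like so:

docker-compose up -d

Once the containers are up, the Pinot UI will be available at http://localhost:9000.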

Data

We’ll be working with the following CSV file that has ID and Date columns:

Table 1. dates.csv
ID         Date
10224738   09/05/2015 01:30:00 PM
10224739   09/04/2015 11:30:00 AM
11646166   09/01/2018 12:01:00 AM
10224740   09/05/2015 12:45:00 PM
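
If you want to follow along, here’s the raw file, which lives in the local data directory that gets mounted into the containers at /data. Note the header row, which the CSV record reader expects by default:

/data/dates.csv
ID,Date
10224738,09/05/2015 01:30:00 PM
10224739,09/04/2015 11:30:00 AM
11646166,09/01/2018 12:01:00 AM
10224740,09/05/2015 12:45:00 PM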

Create Table

Now we’re going to create a Pinot schema and table based on this CSV file.

The schema is defined below:

/config/schema.json
{
    "schemaName": "dates",
    "dimensionFieldSpecs": [
      {
        "name": "ID",
        "dataType": "INT"
      }
    ],
    "dateTimeFieldSpecs": [
      {
        "name": "Date",
        "dataType": "TIMESTAMP",
        "format" : "1:SECONDS:SIMPLE_DATE_FORMAT:MM/dd/yyyy HH:mm:ss a",
        "granularity": "1:HOURS"
      }
    ]
}

We’re going to use the TIMESTAMP data type so that we can write queries that require date operations against the values in this column.
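
For example, once some data is loaded we should be able to filter on the Date column directly, as in this illustrative query (the cut-off date is made up, and Date needs double quotes because it’s a reserved word):

select ID, "Date"
from dates
where "Date" > CAST('2016-01-01 00:00:00' AS TIMESTAMP)
limit 10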

Our table config is defined below:

/config/table.json
{
    "tableName": "dates",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "replication": 1
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP"
    },
    "nullHandlingEnabled": true,
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      }
    },
    "metadata": {}
  }

Now let’s create the table and schema:

docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table.json   \
  -schemaFile /config/schema.json -exec
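
If that command succeeds, we can sanity-check that the schema and table were created using the controller’s REST API:

curl "http://localhost:9000/schemas/dates" -H "accept: application/json"
curl "http://localhost:9000/tables/dates" -H "accept: application/json"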

Import CSV file

After we’ve done that, it’s time to import the CSV file. We’ll do this using the ingestion job spec defined below:

/config/job-spec.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/dates.csv'
outputDirURI: '/opt/pinot/data/dates'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'dates'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

Notice that includeFileNamePattern refers to the dates.csv file that we saw earlier in this post.

We can run the ingestion job like so:

docker exec \
  -it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml
Output
2021/12/03 12:16:56.848 ERROR [RecordReaderSegmentCreationDataSource] [pool-2-thread-1] Caught exception while gathering stats
java.lang.RuntimeException: Caught exception while transforming data type for column: Date
	at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:95) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.segment.local.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:83) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource.gatherStats(RecordReaderSegmentCreationDataSource.java:80) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource.gatherStats(RecordReaderSegmentCreationDataSource.java:42) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:173) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:155) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.init(SegmentIndexCreationDriverImpl.java:104) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.run(SegmentGenerationTaskRunner.java:118) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.lambda$submitSegmentGenTask$1(SegmentGenerationJobRunner.java:263) ~[pinot-batch-ingestion-standalone-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalArgumentException: Invalid timestamp: '09/05/2015 01:30:00 PM'
	at org.apache.pinot.spi.utils.TimestampUtils.toTimestamp(TimestampUtils.java:43) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.common.utils.PinotDataType$10.toTimestamp(PinotDataType.java:524) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.common.utils.PinotDataType$9.convert(PinotDataType.java:485) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.common.utils.PinotDataType$9.convert(PinotDataType.java:442) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:90) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	... 13 more
2021/12/03 12:16:56.856 ERROR [SegmentGenerationJobRunner] [pool-2-thread-1] Failed to generate Pinot segment for file - file:/data/dates.csv
java.lang.RuntimeException: Caught exception while transforming data type for column: Date
	... (same stack trace as above)
Caused by: java.lang.IllegalArgumentException: Invalid timestamp: '09/05/2015 01:30:00 PM'
	... (same stack trace as above)

Hmmm, that didn’t quite work as expected. The job failed while trying to convert our strings to timestamps, and if we take a look at TimestampUtils#toTimestamp, we can see that this function only supports the following input formats:

  • 'yyyy-mm-dd hh:mm:ss[.fffffffff]'

  • Millis since epoch

Our DateTime strings are in MM/dd/yyyy hh:mm:ss a format (a 12-hour clock with an AM/PM marker), which isn’t supported.

One way to fix this problem would be to massage dates.csv so that the values in the Date column are in the expected format. The CSV file would then look like this:

Table 2. dates.csv
ID         Date
10224738   2015-09-05 13:30:00
10224739   2015-09-04 11:30:00
11646166   2018-09-01 00:01:00
10224740   2015-09-05 12:45:00
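
If you’d rather not edit the file by hand, here’s a rough sketch of that conversion using GNU date (an assumption on my part: this runs on Linux, and macOS users would need gdate from coreutils). It rewrites the Date column and replaces the original file:

# Rough sketch: convert the Date column to 'yyyy-MM-dd HH:mm:ss' values.
# Assumes GNU date and that dates.csv has a header row.
{ IFS= read -r header; printf '%s\n' "$header"
  while IFS=, read -r id d; do
    printf '%s,%s\n' "$id" "$(date -d "$d" '+%Y-%m-%d %H:%M:%S')"
  done; } < data/dates.csv > data/dates-converted.csv \
  && mv data/dates-converted.csv data/dates.csv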

We can then re-run the ingestion job and the dates will be imported into the Date column.

Alternatively, we could write a transformation function in our table config to take care of it. Our table config would then look like this:

/config/table-transform.json
{
    "tableName": "dates",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "replication": 1
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP"
    },
    "nullHandlingEnabled": true,
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      },
      "transformConfigs": [
        {"columnName": "Date", "transformFunction": "FromDateTime(\"Date\", 'MM/dd/yyyy HH:mm:ss a')" }
      ]
    },
    "metadata": {}
}

This transform function uses the FromDateTime function, which converts a DateTime string in the specified format to milliseconds since epoch. Note that the pattern uses hh (clock hour, 1-12) together with the AM/PM marker a; a 24-hour HH would not combine correctly with AM/PM values.
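
FromDateTime should also be available at query time (Pinot exposes scalar functions like this one in queries), so once data is loaded we can sanity-check the pattern with an illustrative query like:

select FromDateTime('09/05/2015 01:30:00 PM', 'MM/dd/yyyy hh:mm:ss a') AS millis
from dates
limit 1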

Before we update the table config, let’s drop the segment that we created earlier (by running the ingestion job):

Drop dates table segments
curl -X DELETE "http://localhost:9000/segments/dates?type=OFFLINE" -H "accept: application/json"

Now we can update the table config:

Update table config
curl 'http://localhost:9000/tables/dates_OFFLINE' \
 -X 'PUT' \
 -H 'Content-Type: application/json' \
 --data-binary "@config/table-transform.json"
Output
{"_code":400,"_error":"Invalid table config: dates_OFFLINE with error: Arguments of a transform function '[Date]' cannot contain the destination column 'Date'"}

Hmmm, a transform function can’t write to the same column that it reads from. We’ll instead update our schema and table config to store the transformed value in a new Timestamp column.

The updated schema and table config are shown below:

/config/schema-transform.json
{
  "schemaName": "dates",
  "dimensionFieldSpecs": [
    {
      "name": "ID",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "Timestamp",
      "dataType": "TIMESTAMP",
      "format" : "1:MILLISECONDS:EPOCH",
      "granularity": "1:HOURS"
    }
  ]
}
/config/table-transform.json
{
    "tableName": "dates",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "replication": 1
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP"
    },
    "nullHandlingEnabled": true,
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      },
      "transformConfigs": [
        {"columnName": "Timestamp", "transformFunction": "FromDateTime(\"Date\", 'MM/dd/yyyy HH:mm:ss a')" }
      ]
    },
    "metadata": {}
}

We can’t remove columns from a schema, so let’s drop the schema and table and re-create them both:

curl -X DELETE "http://localhost:9000/schemas/dates" -H "accept: application/json" &&
curl -X DELETE "http://localhost:9000/tables/dates?type=offline" -H "accept: application/json"
docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table-transform.json   \
  -schemaFile /config/schema-transform.json -exec

We can then run the ingestion job again:
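
docker exec \
  -it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml

This time the job completes, and the DateTime values are converted and loaded into the Timestamp column, as shown in the screenshot below: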

Figure 2. The dates table with timestamps loaded
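
If you’d rather check from the command line than from the Query Console, we can run the same kind of query against the broker’s SQL endpoint:

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select * from dates limit 10"}'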

Success!
