
Neo4j APOC: Importing data from Strava's paginated JSON API

Over the weekend I’ve been playing around with loading data from the Strava API into Neo4j. I started with the following Python script, which creates a node with a Run label for each of my activities.

If you want to follow along on your own data you’ll need to get an API key via the 'My API Application' section of the website. Once you’ve got that, put it in the TOKEN environment variable. The script also reads your Neo4j password from the NEO4J_PASSWORD environment variable, so set that too and you should be good to go.

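import os
# Newer releases of the Python driver no longer ship the neo4j.v1 package.
from neo4j import GraphDatabase

# Credentials come from the environment rather than being hard-coded.
password = os.environ["NEO4J_PASSWORD"]

driver = GraphDatabase.driver("bolt://localhost", auth=("neo4j", password))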
with driver.session() as session:
    page = 1
    while True:
        result = session.run("""\
        WITH "https://www.strava.com/api/v3/athlete/activities?page=" + $page AS uri
        CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
        YIELD value

        MERGE (run:Run {id: value.id})
        SET run.distance = toFloat(value.distance),
            run.startDate = datetime(value.start_date_local),
            run.elapsedTime = duration({seconds: value.elapsed_time})

        RETURN count(*) AS count
        """, {"page": page, "stravaToken": "Bearer {0}".format(os.environ["TOKEN"])})

        runs_imported = result.peek()["count"]
        print("Runs imported:", runs_imported)
        if runs_imported == 0:
            break
        else:
            page += 1

The Strava API is a bit unusual in that it doesn’t return any metadata indicating whether there are more pages to come - you get the data and only the data! We’ll receive an empty array once we reach the end, so we have to check for that condition and exit our loop when it’s met.
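
The stop condition can be looked at in isolation from Neo4j. What follows is a minimal sketch rather than part of the import script: fetch_page is a hypothetical callable standing in for the Strava request, and iter_activities and fake_pages are names made up for this illustration.

```python
from typing import Callable, Dict, Iterator, List

Activity = Dict[str, object]


def iter_activities(fetch_page: Callable[[int], List[Activity]]) -> Iterator[Activity]:
    """Yield activities page by page until the API hands back an empty list."""
    page = 1
    while True:
        activities = fetch_page(page)  # hypothetical stand-in for the HTTP call
        if not activities:  # an empty array is the only end-of-data signal
            return
        yield from activities
        page += 1


# Fake two-page API used purely for illustration.
fake_pages = {1: [{"id": 1}, {"id": 2}], 2: [{"id": 3}]}
ids = [a["id"] for a in iter_activities(lambda page: fake_pages.get(page, []))]
print(ids)
```

Because there is no "next page" marker, the loop always makes one request more than there are pages of data.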

Most of the work is being done by APOC's apoc.load.jsonParams procedure - we’re only handling the pagination in Python. Much as I love a good Python script, I was curious whether I could write the whole import using just Cypher and get rid of the Python code completely.


Attempt 1: Using an Import metadata node

Let’s get started! Before we do anything we’ll create a parameter containing our Strava token:

:params {stravaToken: "Bearer <insert-strava-token>"}

My first solution for handling this pagination is to create a separate metadata node which keeps track of the page we’re up to. We should then be able to increment a page property on that node after every call to the Strava API. We’ll wrap our call to apoc.load.jsonParams inside one of APOC’s periodic commit procedures.

First let’s create our metadata node:

CREATE (:Import {page: 1})

Now we need to work out which of the periodic commit procedures is the best fit. There are several to pick from:

  • apoc.periodic.commit - runs the given statement in separate transactions until it returns 0

  • apoc.periodic.rock_n_roll_while - run the action statement in batches over the iterator statement’s results in a separate thread.

  • apoc.periodic.rock_n_roll - run the action statement in batches over the iterator statement’s results in a separate thread.

  • apoc.periodic.iterate - run the second statement for each item returned by the first statement.

After playing around with a solution in my head and then sketching out a toy example in the Neo4j browser (as well as doubting that this was even possible), I figured out that apoc.periodic.commit would be best suited.

We can wrap our call to apoc.load.jsonParams in the following call to apoc.periodic.commit:

call apoc.periodic.commit("
  MATCH (import:Import)
  WITH 'https://www.strava.com/api/v3/athlete/activities?page=' + import.page AS uri, import.page AS initialPage, import
  CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
  YIELD value

  MERGE (run:Run {id: value.id})
  SET run.distance = toFloat(value.distance),
      run.startDate = datetime(value.start_date_local),
      run.elapsedTime = duration({seconds: value.elapsed_time})

  WITH initialPage, import, CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
  FOREACH(ignoreMe in CASE WHEN count = 0 THEN [] ELSE [1] END |
    SET import.page = initialPage+1
  )
  RETURN count
", {stravaToken: $stravaToken})

This script reads the page property from our Import node and calls the Strava API for that page via apoc.load.jsonParams. We create a node with the label Run for each activity and then check how many rows were returned. The default number of activities per page is 30, so if we receive fewer than that we know we’ve reached the end of the stream and can return a count of 0. Otherwise we use the FOREACH hack to increment the page property on the Import node. Finally we return the count, and once the value returned is 0 apoc.periodic.commit stops re-running the statement.

This works, but it’s a bit annoying to have to create the Import node to store our page number. We also can’t easily reuse the code to pick up any new runs. We’d have to set the page back to 1 and then it would go through everything again, which is a bit of a waste.
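
To make the control flow of Attempt 1 easier to see, here is a rough Python model of it. It is only a sketch under stated assumptions: run_until_zero imitates the "re-run until the statement returns 0" behaviour described for apoc.periodic.commit and is not APOC's implementation, while page_result, import_one_page, the state dictionary and the 70-activity account are all invented for the illustration.

```python
from typing import Callable

PAGE_SIZE = 30  # Strava's default number of activities per page


def page_result(rows_on_page: int, page_size: int = PAGE_SIZE) -> int:
    """Mirror the CASE expression: a short page means we are done, so report 0."""
    return 0 if rows_on_page < page_size else rows_on_page


def run_until_zero(statement: Callable[[], int]) -> int:
    """Simplified model of the loop: re-run the statement until it returns 0."""
    executions = 0
    while True:
        executions += 1
        if statement() == 0:
            return executions


# Simulate an account with 70 activities: pages of 30, 30 and 10 rows.
state = {"page": 1}  # plays the role of the Import node
page_sizes = {1: 30, 2: 30, 3: 10}


def import_one_page() -> int:
    count = page_result(page_sizes.get(state["page"], 0))
    if count != 0:  # same guard as the FOREACH: only advance after a full page
        state["page"] += 1
    return count


executions = run_until_zero(import_one_page)
print(executions, state["page"])
```

The model also shows why a re-run is wasteful: the stored page stays at the last page reached, so picking up new runs means resetting it to 1 and walking every page again.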

Attempt 2: The after parameter and timestamps

While trying to come up with a cleaner way to paginate I realised that the Strava API allows you to pass in an after parameter. The after parameter indicates a minimum timestamp for the activities that should be returned. We can use this to simplify our Cypher statement!

The Run nodes that we’re creating contain a startDate which we can convert into a timestamp and pass to the API. If we haven’t loaded any runs yet we can use the coalesce function to start from 0.

We now end up with a much simpler script:

call apoc.periodic.commit("
  OPTIONAL MATCH (run:Run)
  WITH run ORDER BY run.startDate DESC LIMIT 1
  WITH coalesce(run.startDate.epochSeconds, 0) AS after
  WITH 'https://www.strava.com/api/v3/athlete/activities?after=' + after AS uri
  CALL apoc.load.jsonParams(uri, {Authorization: $stravaToken}, null)
  YIELD value

  MERGE (run:Run {id: value.id})
  SET run.distance = toFloat(value.distance),
      run.startDate = datetime(value.start_date_local),
      run.elapsedTime = duration({seconds: value.elapsed_time})

  RETURN CASE WHEN count(*) < 30 THEN 0 ELSE count(*) END AS count
", {stravaToken: $stravaToken})

We no longer need to create a metadata node, and we can easily re-run this statement to pick up new activities.
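
The way the after value is derived can also be sketched in Python. This is an illustration only: after_timestamp and activities_uri are hypothetical helper names, and the two sample start dates are made up. It mirrors the ORDER BY ... LIMIT 1 plus coalesce logic, taking the newest start date as epoch seconds and falling back to 0 when nothing has been loaded.

```python
from datetime import datetime, timezone
from typing import Iterable, Optional


def after_timestamp(start_dates: Iterable[datetime]) -> int:
    """Return the newest start date as epoch seconds, or 0 if nothing is loaded."""
    latest: Optional[datetime] = max(start_dates, default=None)
    return int(latest.timestamp()) if latest is not None else 0


def activities_uri(after: int) -> str:
    """Build the request URI the same way the Cypher statement concatenates it."""
    return "https://www.strava.com/api/v3/athlete/activities?after=" + str(after)


# Made-up start dates standing in for the startDate property on Run nodes.
runs = [
    datetime(2018, 6, 1, 7, 30, tzinfo=timezone.utc),
    datetime(2018, 6, 3, 18, 0, tzinfo=timezone.utc),
]
print(activities_uri(after_timestamp([])))    # first import: starts from 0
print(activities_uri(after_timestamp(runs)))  # later imports: newest run so far
```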

Thank you APOC!
