Luigi: An ExternalProgramTask example - Converting JSON to CSV
I’ve been playing around with the Python library Luigi, which is used to build pipelines of batch jobs. I struggled to find an example of an ExternalProgramTask, so this is my attempt at filling that void.
I’m building a little data pipeline to get data from the meetup.com API and put it into CSV files that can be loaded into Neo4j using the LOAD CSV command.
The first task I created calls the /groups endpoint and saves the result into a JSON file:
import luigi
import requests
import json
from collections import Counter

class GroupsToJSON(luigi.Task):
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()

    def run(self):
        seed_topic = "nosql"
        uri = "https://api.meetup.com/2/groups?&topic={0}&lat={1}&lon={2}&key={3}".format(seed_topic, self.lat, self.lon, self.key)
        r = requests.get(uri)

        all_topics = [topic["urlkey"] for result in r.json()["results"] for topic in result["topics"]]
        c = Counter(all_topics)
        topics = [entry[0] for entry in c.most_common(10)]

        groups = {}
        for topic in topics:
            uri = "https://api.meetup.com/2/groups?&topic={0}&lat={1}&lon={2}&key={3}".format(topic, self.lat, self.lon, self.key)
            r = requests.get(uri)
            for group in r.json()["results"]:
                groups[group["id"]] = group

        with self.output().open('w') as groups_file:
            json.dump(list(groups.values()), groups_file, indent=4, sort_keys=True)

    def output(self):
        return luigi.LocalTarget("/tmp/groups.json")
We define a few parameters at the top of the class, which are passed in when the task is executed. The most interesting lines of the run function are the last couple, where we write the JSON to a file. self.output() refers to the target defined in the output function, which in this case is /tmp/groups.json.
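That output target is also how Luigi decides whether a task needs to run at all: a task is considered complete when its target already exists. Stripped of Luigi, the pattern looks roughly like this (a plain-Python sketch of the idea, not Luigi's actual implementation; the file name is made up):

```python
import json
import os
import tempfile

def run_task(output_path, produce):
    """Run `produce` only if `output_path` doesn't already exist,
    mirroring how Luigi checks a target before running a task."""
    if os.path.exists(output_path):
        return False  # target exists, task is "complete", nothing to do
    data = produce()
    with open(output_path, "w") as f:
        json.dump(data, f, indent=4, sort_keys=True)
    return True

path = os.path.join(tempfile.mkdtemp(), "groups-sketch.json")
ran_first = run_task(path, lambda: [{"id": 1}])   # does the work
ran_second = run_task(path, lambda: [{"id": 1}])  # no-op, file exists
```

This is also why re-running the pipeline later in the post does nothing: the targets are already on disk.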
Now we need a task to convert that JSON file into CSV format. The jq command line tool does this job well, so we’ll use that. The following task wires it in:
from luigi.contrib.external_program import ExternalProgramTask

class GroupsToCSV(ExternalProgramTask):
    file_path = "/tmp/groups.csv"
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()

    def program_args(self):
        return ["./groups.sh", self.input()[0].path, self.output().path]

    def output(self):
        return luigi.LocalTarget(self.file_path)

    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)
groups.sh
#!/bin/bash
in="${1}"
out="${2}"

echo "id,name,urlname,link,rating,created,description,organiserName,organiserMemberId" > "${out}"
jq -r '.[] | [.id, .name, .urlname, .link, .rating, .created, .description, .organizer.name, .organizer.member_id] | @csv' "${in}" >> "${out}"
I wanted to call jq directly from the Python code, but I couldn’t figure out how to do it, so putting that code in a shell script is my workaround.
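Another option would be to skip jq entirely and do the conversion in the task's run method with Python's csv module. A sketch, assuming the JSON file contains the same fields the jq filter pulls out (this replaces jq rather than calling it, and `groups_json_to_csv` is a hypothetical helper, not part of the pipeline above):

```python
import csv
import json

# Top-level fields, matching the jq filter above
FIELDS = ["id", "name", "urlname", "link", "rating", "created", "description"]

def groups_json_to_csv(in_path, out_path):
    """Convert the groups JSON file to CSV, pulling the organiser
    name and member id out of the nested "organizer" object."""
    with open(in_path) as f:
        groups = json.load(f)
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(FIELDS + ["organiserName", "organiserMemberId"])
        for group in groups:
            organizer = group.get("organizer", {})
            writer.writerow(
                [group.get(field) for field in FIELDS]
                + [organizer.get("name"), organizer.get("member_id")]
            )
```

The csv module handles the quoting of embedded commas and newlines in descriptions, which is the fiddly part that @csv was doing for us.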
The last piece of the puzzle is a wrapper task that launches the others:
import os

class Meetup(luigi.WrapperTask):
    def run(self):
        print("Running Meetup")

    def requires(self):
        key = os.environ['MEETUP_API_KEY']
        lat = os.getenv('LAT', "51.5072")
        lon = os.getenv('LON', "0.1275")
        yield GroupsToCSV(key, lat, lon)
Now we’re ready to run the tasks:
$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup
DEBUG: Checking if Meetup() is complete
DEBUG: Checking if GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275) is complete
INFO: Informed scheduler that task Meetup__99914b932b has status PENDING
DEBUG: Checking if GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275) is complete
INFO: Informed scheduler that task GroupsToCSV_xxx_51_5072_0_1275_e07372cebf has status PENDING
INFO: Informed scheduler that task GroupsToJSON_xxx_51_5072_0_1275_e07372cebf has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task GroupsToJSON_xxx_51_5072_0_1275_e07372cebf has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
INFO: Running command: ./groups.sh /tmp/groups.json /tmp/groups.csv
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task GroupsToCSV_xxx_51_5072_0_1275_e07372cebf has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running Meetup()
Running Meetup
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done Meetup()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Meetup__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 3 ran successfully:
- 1 GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
- 1 GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
- 1 Meetup()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
Looks good! Let’s quickly look at our CSV file:
$ head -n10 /tmp/groups.csv
id,name,urlname,link,rating,created,description,organiserName,organiserMemberId
1114381,"London NoSQL, MySQL, Open Source Community","london-nosql-mysql","https://www.meetup.com/london-nosql-mysql/",4.28,1208505614000,"<p>Meet others in London interested in NoSQL, MySQL, and Open Source Databases.</p>
","Sinead Lawless",185675230
1561841,"Enterprise Search London Meetup","es-london","https://www.meetup.com/es-london/",4.66,1259157419000,"<p>Enterprise Search London is a meetup for anyone interested in building search and discovery experiences — from intranet search and site search, to advanced discovery applications and beyond.</p>
<p>Disclaimer: This meetup is NOT about SEO or search engine marketing.</p>
<p><strong>What people are saying:</strong></p>
<ul>
<li><span>""Join this meetup if you have a passion for enterprise search and user experience that you would like to share with other able-minded practitioners."" — Vegard Sandvold</span></li>
<li><span>""Full marks for vision and execution. Looking forward to the next Meetup."" — Martin White</span></li>
<li><span>“Consistently excellent” — Helen Lippell</span></li>
</ul>
Sweet! And what if we run it again?
$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup
DEBUG: Checking if Meetup() is complete
INFO: Informed scheduler that task Meetup__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=172768377, workers=1, host=Marks-MBP-4, username=markneedham, pid=4531) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
- 1 Meetup()
Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
As expected, nothing happens since our dependencies are already satisfied, and we have our first Luigi pipeline up and running.
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.