## Luigi Task Orchestrator
Wednesday, October 14, 2015
Budapest BI Forum 2015
### Who's this guy?
* Data Engineer at Spotify
* I write code most of the time, but not for the sake of it!
* Arash Rouhani, [@Tarrasch](https://github.com/Tarrasch) on GitHub. Author of [zsh-bd](https://github.com/Tarrasch/zsh-bd)
### Spoti-what?
We stream the right music for the right moment.
75M+ active users. 30M+ songs. $3bn paid to rightsholders.
### This talk
1. Introduction to luigi + declarative programming
2. Building up an open source community
3. Story time - fun anecdote
### What is luigi?
![Luigi logo](img/luigi.png "luigi logo")
A task orchestrator built in-house at Spotify.
Open sourced late 2012.
#### It even has spin-offs ;)
![Luiti logo](img/luiti_rectangle_logo.png)
![Mario logo](img/mario.png)
## A dependency graph
![Dependency graph](img/dependency_graph.png)
## Task DSL
It is very much like GNU Make.
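```python
import luigi

class MyTask(luigi.Task):
    some_parameter = luigi.Parameter(default="hello")

    def complete(self):
        # Return True if this task's work is already done, False otherwise
        return False

    def requires(self):
        # Tasks that must be complete before run() is called
        return [TaskA(), TaskB(param='yay')]

    def run(self):
        print(self.some_parameter, 'world')
```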
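The Make analogy can be made concrete with a toy model of the three methods. This is a stdlib-only sketch, not luigi's scheduler (which also deals with workers, retries and a central scheduler); `ToyTask`, `Extract`, `Report` and `build` are hypothetical names. `build` checks `complete()` first, satisfies `requires()` depth-first, and only then calls `run()`, much like Make skipping up-to-date targets.

```python
class ToyTask:
    """Hypothetical stand-in for luigi.Task: same three methods, no scheduler."""
    done = False  # class-level default; run() sets it on the instance

    def requires(self):
        return []  # no dependencies by default

    def complete(self):
        return self.done

    def run(self):
        self.done = True


log = []  # records the order in which run() is called


class Extract(ToyTask):
    def run(self):
        log.append("Extract")
        super().run()


class Report(ToyTask):
    def __init__(self, extract):
        self.extract = extract

    def requires(self):
        return [self.extract]

    def run(self):
        log.append("Report")
        super().run()


def build(task):
    """Make-style resolution: skip complete tasks, satisfy requirements first."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep)
    task.run()


extract = Extract()
report = Report(extract)
build(report)
print(log)  # ['Extract', 'Report']
build(report)
print(log)  # unchanged: both tasks are already complete
```

Running `build` twice mirrors the two runs of the small example that follows: the second call finds everything complete and does nothing.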
## Small example
```python
import luigi

class HelloWorldTask(luigi.Task):
    def run(self):
        with self.output().open('w') as fd:
            fd.write('Hello World')

    def output(self):
        return luigi.LocalTarget('hello.txt')
```
## Let's run it!
```bash
$ luigi --module helloworld HelloWorldTask --local-scheduler
INFO: Scheduled HelloWorldTask() (PENDING)
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 HelloWorldTask()
===== Luigi Execution Summary =====
$ cat hello.txt
Hello World
```
## And let's run it again!
```bash
$ luigi --module helloworld HelloWorldTask --local-scheduler
INFO: Scheduled HelloWorldTask() (DONE)
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 HelloWorldTask()
Did not run any tasks
===== Luigi Execution Summary =====
```
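Why did the second run do nothing? By default, a luigi task counts as complete when every target returned by `output()` exists, so the `hello.txt` left by the first run is enough. The sketch below mimics that check with the standard library only; `hello_world_task` and `write_atomically` are hypothetical names. It also writes through a temporary file that is renamed into place, a common way to make file outputs atomic; treat that part as an assumption of this sketch, not a description of `LocalTarget` internals.

```python
import os
import tempfile


def write_atomically(path, text):
    """Write to a temp file in the same directory, then rename it into place,
    so readers never observe a half-written `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        f.write(text)
    os.replace(tmp_path, path)  # atomic rename within one file system


def hello_world_task(path):
    """Run only if the output is missing (complete() == all outputs exist)."""
    if os.path.exists(path):
        return "already complete"
    write_atomically(path, "Hello World")
    return "ran"


workdir = tempfile.mkdtemp()
target = os.path.join(workdir, "hello.txt")
print(hello_world_task(target))  # ran
print(hello_world_task(target))  # already complete
```

Deleting `hello.txt` would make the task incomplete again, which is how re-running a luigi task is typically forced.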
#### Does it feel declarative?
Kind of. Even though class definitions in Python are *statements*, we've made
using luigi feel declarative.
This feels like a statement in Python, not a declaration:
```python
class MyNewClass(object):
    pass
```
So how does one achieve this declarative feel?
```python
import luigi

class MyTask(luigi.Task):
    def run(self): pass
    def complete(self): pass
    def requires(self): pass
```
```bash
$ luigi --module my_tasks MyTask
```
We need to bring out our Python-fu!
By having a registry!
```python
# from the luigi library source code:
class Task(object):
    class Register(abc.ABCMeta):
        def __new__(metacls, classname, bases, classdict):
            """ Is called when a Task class is "declared". """
            cls = super(Register, metacls).__new__(...)
            return cls
```
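Here is a minimal, self-contained sketch of the registry idea under simplified assumptions: a metaclass records every class at the moment its `class` statement executes, so a command line such as `luigi --module my_tasks MyTask` only has to import the module and look the name up. `Register`, `_reg` and `by_name` here are illustrative; in particular `by_name` is a hypothetical helper, not a luigi API.

```python
import abc


class Register(abc.ABCMeta):
    """Metaclass that records every class created with it."""
    _reg = []

    def __new__(metacls, classname, bases, classdict):
        cls = super().__new__(metacls, classname, bases, classdict)
        metacls._reg.append(cls)  # runs when the class statement executes
        return cls

    @classmethod
    def by_name(metacls, name):
        """Find a registered class by name, as a command line tool might."""
        matches = [c for c in metacls._reg if c.__name__ == name]
        if len(matches) != 1:
            raise LookupError(f"expected exactly one task named {name!r}")
        return matches[0]


class Task(metaclass=Register):
    def run(self):
        pass


# Merely defining the class registers it; no explicit call is needed.
class MyTask(Task):
    def run(self):
        return "MyTask ran"


task_cls = Register.by_name("MyTask")
print(task_cls().run())  # MyTask ran
```

The user-facing code stays a plain class definition; the side effect of registering lives entirely in the base class's metaclass, which is what makes it read like a declaration.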
# Building an Open Source Community
#### Current (Oct 1st) GitHub pages for orchestrators
## Observations
* Clearly, a community isn't built just by being an Apache project.
* However, being one attracts users and enterprise support!
* Both Cloudera and Hortonworks support Oozie and include it in their Hadoop distributions
At Spotify, we've debated whether to push Spotify Luigi to become Apache Luigi.
My guess is that Luigi's community success comes from:
* Spotify being responsive on the mailing list and in GitHub PRs and issues
* It being hosted on GitHub
* The problem of orchestration is wide
* It takes only about 10 minutes to try Luigi out.
* The API is very intuitive.
## Significant contributions by non-Spotifiers
* 100% of the Web UI
* Most of the scheduling logic
* Edge-case handling in the worker
* Many storage and compute engine integrations
  * AWS, GCP, Redshift, Elasticsearch, SQLAlchemy, etc.
# The Thanksgiving bug
Story time!
### Reporting to labels at Spotify
When you listen to music on Spotify, we report this to labels and pay them.
![Money overview](img/overview.png)
### How client logs are stored at Spotify
When you listen to a song, we save an EndSong message.
```bash
$ hadoop fs -ls /logs/EndSong/2015-08-27/06/
```
The messages are split across multiple Avro files.
### The `mv` command in Unix
What does this do?
```bash
$ mkdir my_directcry
$ mv my_file.txt my_directory
```
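`mv` picks between two behaviors based on the destination: if it names an existing directory, the source is moved *inside* it; otherwise the source is renamed to that path. Python's `shutil.move` documents the same rule, so here is a small stdlib-only illustration (all paths are hypothetical scratch files):

```python
import os
import shutil
import tempfile

base = tempfile.mkdtemp()  # scratch directory for the demo

# Case 1: the destination is an existing directory, so the file goes inside it.
src_a = os.path.join(base, "a.txt")
existing = os.path.join(base, "existing_dir")
open(src_a, "w").close()
os.mkdir(existing)
shutil.move(src_a, existing)
print(os.path.isfile(os.path.join(existing, "a.txt")))  # True

# Case 2: the destination does not exist, so the file is renamed to it.
src_b = os.path.join(base, "b.txt")
renamed = os.path.join(base, "missing_dir")
open(src_b, "w").close()
shutil.move(src_b, renamed)
print(os.path.isfile(renamed))  # True: a plain file named "missing_dir"
```

The same command line therefore does different things depending on state that may have been created by someone else.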
### What luigi did
```bash
$ hadoop jar mapreduce.jar -Poutput=output-luigi-tmp-123456
$ hadoop fs -mv output-luigi-tmp-123456 output
```
Now what can go wrong? ;)
### An awesome fix I did! [spotify/luigi#522](https://github.com/spotify/luigi/pull/522)
```bash
$ hadoop jar mapreduce.jar -Poutput=output-luigi-tmp-123456
$ hadoop fs -mkdir output
$ hadoop fs -mv output-luigi-tmp-123456/* output
```
The trick is that `hadoop fs -mkdir output` fails if the folder already exists.
That's good: only one thread can successfully create it. But...
Just a week or so after Thanksgiving, an analyst came down to the Data
> Analyst: Hey, a label reported that they received only 80% of the expected
> revenue for last Thursday. We think there's a bug in your systems. If
> labels get pissed, we will all lose our jobs!
> Engineer: Well duuh, it was Thanksgiving last Thursday. Of course people listen
> to less music! And you're supposed to be the guys working with user behaviors.
### No animals were harmed
The "fix" was reverted in [spotify/luigi#557], and a proper fix was implemented
in [spotify/luigi#605]. We used a new file system primitive
with *rename-dont-move* semantics, also contributed by Spotify in [spotify/snakebite#128].
[spotify/luigi#557]: https://github.com/spotify/luigi/pull/557
[spotify/luigi#605]: https://github.com/spotify/luigi/pull/605
[spotify/snakebite#128]: https://github.com/spotify/snakebite/pull/128
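The difference between the two primitives can be sketched on a local file system. `rename_dont_move` below is a hypothetical stand-in for the HDFS primitive: it refuses to act when the destination already exists, instead of silently nesting the source inside it. Unlike a server-side HDFS rename, this check-then-rename is not atomic, so it only illustrates the semantics.

```python
import os
import shutil
import tempfile


def rename_dont_move(src, dst):
    """Rename src to dst, failing instead of moving src *into* an existing dst.

    Sketch only: the exists() check and the rename are two separate steps,
    so a concurrent writer could still slip in between them.
    """
    if os.path.exists(dst):
        raise FileExistsError(dst)
    os.rename(src, dst)


base = tempfile.mkdtemp()
tmp_out = os.path.join(base, "output-luigi-tmp-123456")
out = os.path.join(base, "output")
os.mkdir(tmp_out)
os.mkdir(out)  # pretend something else already created `output`

# Plain move semantics: the temp dir silently ends up nested inside `output`.
shutil.move(tmp_out, out)
print(os.listdir(out))  # ['output-luigi-tmp-123456']

# Rename-dont-move semantics: the same situation is reported as an error.
os.mkdir(tmp_out)
try:
    rename_dont_move(tmp_out, out)
except FileExistsError:
    print("refused: output already exists")
```

Failing loudly leaves the temporary output in place for a retry, rather than producing an `output` directory whose contents are not what downstream jobs expect.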
### Thanks for listening
[@Tarrasch](https://github.com/Tarrasch) on github