## Data plumbing with Luigi
Saturday, February 16, 2019

### A little about me
* I'm the Chief Maintainer of [Luigi](https://github.com/spotify/luigi).
* Currently working at Google in Zurich. (there is nothing about that in these slides)
* Arash Rouhani. [@Tarrasch](https://github.com/Tarrasch) on GitHub.
### This talk
1. Introduction / what luigi can be used for.
2. Example code and running luigi.
3. How you can parallelize with luigi.
### What is luigi?
[github.com/Spotify/luigi](https://github.com/Spotify/luigi)

A task orchestrator made in house at Spotify.
Open sourced in 2012.
## Description
> Conceptually, Luigi is similar to GNU Make where you have certain
> tasks and these tasks in turn may have dependencies on other tasks.
> Luigi is not built specifically for Hadoop
> and it’s easy to extend it with other kinds of tasks.
> However, these roots still influence its design today.
Airflow is also in this space.
#### even has spinoffs: luiti, mario, sciluigi



## A dependency graph

Luigi does ETL well
## Example pipeline
A Luigi pipeline can consist of steps like these:
* Extract: `wget` some file from internet to storage
* Extract: dump table `X` from database to storage
* Transform: process using say Apache Spark
* Transform: use legacy tool `Y` for refinement
* Load: Upload to database **Z**.
Then we can serve webpages/music recommendations/whatever using **Z**. (that is outside luigi)
In the world of code it could be like:
* `WgetWeatherReportTask`
* `GetWeatherDataSnapshotTask`
* `FilterWeatherDataTask`
* `PreprocessWithLegacyToolTask`
* `IngestToDatabaseTask`
In code, you write tasks like this:
```python
class WgetWeatherReportTask(luigi.Task): ...
```
## Task DSL
It is very much like GNU Make.
```python
class MyTask(luigi.Task):
    some_parameter = luigi.Parameter(default="hello")

    def complete(self):
        return True  # or False: is this task's work already done?

    def requires(self):
        return [TaskA(), TaskB(param='yay')]

    def run(self):
        print(self.some_parameter, 'world')
```
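To make the GNU Make comparison concrete, here is a minimal sketch in plain Python, without Luigi itself. `ToyTask` and `build` are hypothetical names invented for this illustration; they only mimic the `complete()` / `requires()` / `run()` contract and are not how Luigi is implemented.

```python
class ToyTask:
    """Stand-in for a task (hypothetical, not luigi.Task)."""

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)
        self.done = False

    def complete(self):
        return self.done

    def requires(self):
        return self.deps

    def run(self):
        self.done = True


def build(task, log):
    """Run `task` after its requirements, skipping anything already complete."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
    task.run()
    log.append(task.name)


a = ToyTask("A")
b = ToyTask("B")
b.done = True  # pretend B's output already exists
top = ToyTask("MyTask", deps=[a, b])

log = []
build(top, log)
print(log)       # ['A', 'MyTask'] (B was skipped)
build(top, log)  # second call does nothing: everything is complete
print(log)       # ['A', 'MyTask']
```

As with Make, only the missing pieces are built, and asking for the same target twice is harmless.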
# Part 2. Running examples
## Small example
```python
import luigi


class HelloWorldTask(luigi.Task):
    def run(self):
        with self.output().open('w') as fd:
            fd.write('Hello World')

    def output(self):
        return luigi.LocalTarget('hello.txt')
```
(default `complete()` checks for file `output()`)
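A rough sketch of that default check, in plain Python. `FileTarget` and `default_complete` are hypothetical names for this illustration, and treating a task without outputs as incomplete is an assumption of the sketch rather than something stated in these slides.

```python
import os
import tempfile


class FileTarget:
    """Toy file-backed target (hypothetical, not luigi.LocalTarget)."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


def default_complete(outputs):
    """A task counts as complete once every one of its outputs exists."""
    return bool(outputs) and all(target.exists() for target in outputs)


workdir = tempfile.mkdtemp()
target = FileTarget(os.path.join(workdir, "hello.txt"))

before = default_complete([target])  # file not written yet
with open(target.path, "w") as fd:
    fd.write("Hello World")
after = default_complete([target])   # the output now exists

print(before, after)  # False True
```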
Let's run it!
```bash
$ luigi --module helloworld HelloWorldTask --local-scheduler
...
INFO: Scheduled HelloWorldTask() (PENDING)
...
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 HelloWorldTask()
===== Luigi Execution Summary =====
$ cat hello.txt
Hello World
```
If we run luigi again, it won't run the task again.
```bash
$ luigi --module helloworld HelloWorldTask --local-scheduler
...
INFO: Scheduled HelloWorldTask() (DONE)
...
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
- 1 HelloWorldTask()
Did not run any tasks
===== Luigi Execution Summary =====
```
## A more complicated example
```python
import time
import luigi

class ParentTask(luigi.Task):
    ...

class ChildTask(luigi.Task):
    my_param = luigi.Parameter()

    def run(self):
        # eval() is tolerable here only because the inputs are hard-coded demo strings.
        with self.output().open('w') as fd:
            fd.write(str(eval(self.my_param)))
        time.sleep(30)

    def output(self):
        return luigi.LocalTarget('/tmp/presentation/' +
                                 self.my_param)
```
```python
class ParentTask(luigi.Task):
    def requires(self):
        return [ChildTask(my_param='4+6'),
                ChildTask(my_param='len("Hello world")')]

    def output(self):
        return luigi.LocalTarget(
            '/tmp/presentation/final_result')
    ...
```
```python
class ParentTask(luigi.Task):
    ...
    def run(self):
        acc = 0
        for f in self.input():
            with f.open():
                acc += int(f.read())
        with self.output().open('w') as fd:
            fd.write(str(acc))
```
On the scheduler this happens




It also failed on the client
```bash
$ PYTHONPATH=. ./bin/luigi --module hello ParentTask
...
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 2 ran successfully:
- 2 ChildTask(my_param=4+6,len("Hello world"))
* 1 failed:
- 1 ParentTask()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
```
And let's fix the bug (we called `read()` on the target instead of on the opened file) ...
```python
class ParentTask(luigi.Task):
    ...
    def run(self):
        acc = 0
        for target in self.input():
            with target.open() as fd:
                acc += int(fd.read())
        with self.output().open('w') as fd:
            fd.write(str(acc))
```
## Run it again
```bash
$ PYTHONPATH=. ./bin/luigi --module hello ParentTask
...
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 2 complete ones were encountered:
- 2 ChildTask(my_param=4+6,len("Hello world"))
* 1 ran successfully:
- 1 ParentTask()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
```
```bash
$ cat /tmp/presentation/final_result
21
```
# Part 3. The Scheduler
Thanks to the luigi scheduler, we make sure that we don't run things
unnecessarily, and it is part of what gives us parallelism.

The scheduler lives in the same repo and is also written in Python, but it can
be rewritten in anything since it communicates only using RPCs.
Main tasks of the scheduler:
* Register tasks from luigi clients (called Workers)
* Give out tasks to run when a Worker asks for them.
Which translates to this API:
* `add_task(task_id, worker_id, status)`
* `get_work(worker_id)`
Now let's look at what it can look like when two Workers want to do the same task `A`, and how they can parallelize with the luigi model.
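The interaction can be simulated with a toy in-memory scheduler. This is only a sketch of the idea behind `add_task` and `get_work`: the class `ToyScheduler`, the extra `deps` argument, and the status strings are assumptions made for the illustration, and the real scheduler is reached over RPC rather than through direct method calls. Here task `A` is assumed to require two hypothetical tasks `B` and `C`.

```python
class ToyScheduler:
    """In-memory sketch of add_task/get_work (not Luigi's real scheduler)."""

    def __init__(self):
        self.status = {}      # task_id -> 'PENDING' | 'RUNNING' | 'DONE'
        self.deps = {}        # task_id -> task_ids it requires (assumed argument)
        self.running_on = {}  # task_id -> worker_id that picked it up

    def add_task(self, task_id, worker_id, status, deps=()):
        # A repeated PENDING registration must not reset RUNNING or DONE tasks.
        if status == 'PENDING' and self.status.get(task_id) in ('RUNNING', 'DONE'):
            return
        self.status[task_id] = status
        self.deps.setdefault(task_id, list(deps))

    def get_work(self, worker_id):
        # Hand out the first pending task whose requirements are all done.
        for task_id, status in self.status.items():
            ready = all(self.status.get(d) == 'DONE' for d in self.deps[task_id])
            if status == 'PENDING' and ready:
                self.status[task_id] = 'RUNNING'
                self.running_on[task_id] = worker_id
                return task_id
        return None


sched = ToyScheduler()
# Both workers want A, so both register the same three tasks.
for worker in ('w1', 'w2'):
    sched.add_task('B', worker, 'PENDING')
    sched.add_task('C', worker, 'PENDING')
    sched.add_task('A', worker, 'PENDING', deps=['B', 'C'])

first = sched.get_work('w1')   # w1 gets B
second = sched.get_work('w2')  # w2 gets C, so B and C run in parallel
sched.add_task(first, 'w1', 'DONE')
sched.add_task(second, 'w2', 'DONE')
third = sched.get_work('w1')   # A is now unblocked
fourth = sched.get_work('w2')  # nothing left: A is not run twice

print(first, second, third, fourth)  # B C A None
```

Because every Worker registers the full graph and then only asks for work, the two Workers split `B` and `C` between them and `A` runs exactly once.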
The scheduler also implements:
* Virtual resource limits
* Setting cooldown for failed tasks.
* Disabling tasks that fail often.
* and much more.
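As an illustration of the cooldown and disabling ideas, here is a small sketch of per-task failure bookkeeping. `FailureTracker`, `cooldown_seconds`, and `max_failures` are hypothetical names chosen for this example and do not correspond to Luigi's actual configuration options.

```python
class FailureTracker:
    """Toy failure bookkeeping for a single task (all names hypothetical)."""

    def __init__(self, cooldown_seconds, max_failures):
        self.cooldown_seconds = cooldown_seconds
        self.max_failures = max_failures
        self.failure_times = []

    def record_failure(self, now):
        self.failure_times.append(now)

    def is_disabled(self):
        # Too many failures: stop handing this task out at all.
        return len(self.failure_times) >= self.max_failures

    def can_retry(self, now):
        if self.is_disabled():
            return False
        if not self.failure_times:
            return True
        # Otherwise wait until the cooldown since the last failure has passed.
        return now - self.failure_times[-1] >= self.cooldown_seconds


tracker = FailureTracker(cooldown_seconds=60, max_failures=3)
tracker.record_failure(now=0)
print(tracker.can_retry(now=30))    # False: still cooling down
print(tracker.can_retry(now=60))    # True: cooldown elapsed
tracker.record_failure(now=60)
tracker.record_failure(now=120)
print(tracker.can_retry(now=1000))  # False: disabled after 3 failures
```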