## Data plumbing with Luigi
Saturday, February 16, 2019
![Pycon Belarus logo](img/logo_pycon_color.png)
### A little about me
* I'm the Chief Maintiner of [Luigi](https://github.com/spotify/luigi).
* Currently working at Google in Zurich. (there is nothing about that in these slides)
* Arash Rouhani. [@Tarrasch](https://github.com/Tarrasch) on github.
### This talk
1. Introduction / what luigi can be used for.
2. Example code and running luigi.
3. How you can parallelize with luigi.
### What is luigi?
[github.com/Spotify/luigi](https://github.com/Spotify/luigi)
![Luigi logo](img/luigi.png "luigi logo")
A task orchestrator made in house at Spotify.
Open sourced in 2012.
## Description
> Conceptually, Luigi is similar to GNU Make where you have certain
> tasks and these tasks in turn may have dependencies on other tasks.
> Luigi is not built specifically for Hadoop
> and it’s easy to extend it with other kinds of tasks.
> However, these roots still influce its design today.
Airflow is also in this space.
#### even has spinoffs: luiti, mario, sciluigi
![Luiti logo](img/luiti_rectangle_logo.png)
![Mario logo](img/mario.png)
![SciLuigi logo](img/sciluigi.png)
## A dependency graph
![Dependency graph](img/dependency_graph.png)
Luigi does ETL well
## Example pipeline
A luigi Pipeline can be these steps:
* Extract: `wget` some file from internet to storage
* Extract: dump table `X` from database to storage
* Transform: process using say Apache Spark
* Transform: use legacy tool `Y` for refinement
* Load: Upload to database **Z**.
Then we can serve webpages/music recommendations/whatever using **Z**. (that is outside luigi)
In the world of code it could be like:
* `WgetWeatherReportTask`
* `GetWeatherDataSnapshotTask`
* `FilterWeatherDataTask`
* `PreprocessWithLegacyToolTask`
* `IngestToDatabaseTask`
In code, you write tasks like this:
```python
class WgetWeatherReportTask(luigi.Task): ...
```
## Task DSL
It is very much like GNU Make.
```python
class MyTask(luigi.Task):
some_parameter = luigi.Parameter(default="hello")
def complete(self):
return True or False
def requires(self):
return [TaskA(), TaskB(param='yay')]
def run(self):
print(self.some_parameter, 'world')
```
# Part 2. Running examples
## Small example
```python
import luigi
class HelloWorldTask(luigi.Task):
def run(self):
with self.output().open('w') as fd:
fd.write('Hello World')
def output(self):
return luigi.LocalTarget('hello.txt')
```
(default `complete()` checks for file `output()`)
Lets run it!
```bash
$ luigi --module helloworld HelloWorldTask --local-scheduler
...
INFO: Scheduled HelloWorldTask() (PENDING)
...
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 HelloWorldTask()
===== Luigi Execution Summary =====
$ cat hello.txt
Hello World
```
If we run luigi again, it won't run the task again.
```bash
$ luigi --module helloworld HelloWorldTask --local-scheduler
...
INFO: Scheduled HelloWorldTask() (DONE)
...
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
- 1 HelloWorldTask()
Did not run any tasks
===== Luigi Execution Summary =====
```
## A more complicated example
```python
class ParentTask(luigi.Task):
...
class ChildTask(luigi.Task):
my_param = luigi.Parameter()
def run(self):
with self.output().open('w') as fd:
fd.write(str(eval(self.my_param)))
time.sleep(30)
def output(self):
return luigi.LocalTarget('/tmp/presentation/' +
self.my_param)
```
```python
class ParentTask(luigi.Task):
def requires(self):
return [ChildTask(my_param='4+6'),
ChildTask(my_param='len("Hello world")')]
def output(self):
return luigi.LocalTarget(
'/tmp/presentation/final_result')
...
```
```python
class ParentTask(luigi.Task):
...
def run(self):
acc = 0
for f in self.input():
with f.open():
acc += int(f.read())
with self.output().open('w') as fd:
fd.write(str(acc))
```
On the scheduler this happens
![Scheduler screenshot](img/sched1.png)
![Scheduler screenshot](img/sched2.png)
![Scheduler screenshot](img/sched3.png)
![Scheduler screenshot](img/sched4.png)
It also failed on the client
```bash
$ PYTHONPATH=. ./bin/luigi --module hello ParentTask
...
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 2 ran successfully:
- 2 ChildTask(my_param=4+6,len("Hello world"))
* 1 failed:
- 1 ParentTask()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
```
And let's fix the bug ...
```python
class ParentTask(luigi.Task):
...
def run(self):
acc = 0
for target in self.input():
with target.open() as fd:
acc += int(fd.read())
with self.output().open('w') as fd:
fd.write(str(acc))
```
## Run it again
```bash
$ PYTHONPATH=. ./bin/luigi --module hello ParentTask
...
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 2 complete ones were encountered:
- 2 ChildTask(my_param=4+6,len("Hello world"))
* 1 ran successfully:
- 1 ParentTask()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
```
```bash
$ cat /tmp/presentation/final_result
21
```
# Part 3. The Scheduler
Thanks to the luigi scheduler, we make sure that we don't run things
unneccessarily and it part of what gives us parallelism.
![Scheduler screenshot](img/visualiser_front_page.png)
The scheduler lives in the same repo and is also written in Python, but it can
be rewritten in anything since it communicates only using RPCs.
Main task of scheduler:
* Register tasks from luigi clients (called Workers)
* Give out tasks to run when a Worker ask for it.
Which translates to this API
* `add_task(task_id, worker_id, status)`
* `get_work(worker_id)`
Now let's look at how it can look like when two Workers want to do the same task `A`, and how they can parallelize with the luigi model.
The scheduler also implements:
* Virtual resource limits
* Setting cooldown for failed tasks.
* Disabling often failing tasks.
* and much more.