## Data plumbing with Luigi Saturday, February 16, 2019 ![Pycon Belarus logo](img/logo_pycon_color.png)
### A little about me * I'm the Chief Maintiner of [Luigi](https://github.com/spotify/luigi). * Currently working at Google in Zurich. (there is nothing about that in these slides) * Arash Rouhani. [@Tarrasch](https://github.com/Tarrasch) on github.
### This talk 1. Introduction / what luigi can be used for. 2. Example code and running luigi. 3. How you can parallelize with luigi.
# Introduction to luigi
### What is luigi? [github.com/Spotify/luigi](https://github.com/Spotify/luigi) ![Luigi logo](img/luigi.png "luigi logo") A task orchestrator made in house at Spotify. Open sourced in 2012.
## Description > Conceptually, Luigi is similar to GNU Make where you have certain > tasks and these tasks in turn may have dependencies on other tasks. > Luigi is not built specifically for Hadoop > and it’s easy to extend it with other kinds of tasks. > However, these roots still influce its design today. Airflow is also in this space.
#### even has spinoffs: luiti, mario, sciluigi ![Luiti logo](img/luiti_rectangle_logo.png) ![Mario logo](img/mario.png) ![SciLuigi logo](img/sciluigi.png)
## A dependency graph ![Dependency graph](img/dependency_graph.png)
Luigi does ETL well Etl Pipeline
## Example pipeline A luigi Pipeline can be these steps: * Extract: `wget` some file from internet to storage * Extract: dump table `X` from database to storage * Transform: process using say Apache Spark * Transform: use legacy tool `Y` for refinement * Load: Upload to database **Z**. Then we can serve webpages/music recommendations/whatever using **Z**. (that is outside luigi)
In the world of code it could be like: * `WgetWeatherReportTask` * `GetWeatherDataSnapshotTask` * `FilterWeatherDataTask` * `PreprocessWithLegacyToolTask` * `IngestToDatabaseTask` In code, you write tasks like this: ```python class WgetWeatherReportTask(luigi.Task): ... ```
## Task DSL It is very much like GNU Make. ```python class MyTask(luigi.Task): some_parameter = luigi.Parameter(default="hello") def complete(self): return True or False def requires(self): return [TaskA(), TaskB(param='yay')] def run(self): print(self.some_parameter, 'world') ```
# Part 2. Running examples
## Small example ```python import luigi class HelloWorldTask(luigi.Task): def run(self): with self.output().open('w') as fd: fd.write('Hello World') def output(self): return luigi.LocalTarget('hello.txt') ``` (default `complete()` checks for file `output()`)
Lets run it! ```bash $ luigi --module helloworld HelloWorldTask --local-scheduler ... INFO: Scheduled HelloWorldTask() (PENDING) ... ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 HelloWorldTask() ===== Luigi Execution Summary ===== $ cat hello.txt Hello World ```
If we run luigi again, it won't run the task again. ```bash $ luigi --module helloworld HelloWorldTask --local-scheduler ... INFO: Scheduled HelloWorldTask() (DONE) ... INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 present dependencies were encountered: - 1 HelloWorldTask() Did not run any tasks ===== Luigi Execution Summary ===== ```
## A more complicated example ```python class ParentTask(luigi.Task): ... class ChildTask(luigi.Task): my_param = luigi.Parameter() def run(self): with self.output().open('w') as fd: fd.write(str(eval(self.my_param))) time.sleep(30) def output(self): return luigi.LocalTarget('/tmp/presentation/' + self.my_param) ```
```python class ParentTask(luigi.Task): def requires(self): return [ChildTask(my_param='4+6'), ChildTask(my_param='len("Hello world")')] def output(self): return luigi.LocalTarget( '/tmp/presentation/final_result') ... ```
```python class ParentTask(luigi.Task): ... def run(self): acc = 0 for f in self.input(): with f.open(): acc += int(f.read()) with self.output().open('w') as fd: fd.write(str(acc)) ```
On the scheduler this happens ![Scheduler screenshot](img/sched1.png)
![Scheduler screenshot](img/sched2.png)
![Scheduler screenshot](img/sched3.png)
![Scheduler screenshot](img/sched4.png)
It also failed on the client ```bash $ PYTHONPATH=. ./bin/luigi --module hello ParentTask ... ===== Luigi Execution Summary ===== Scheduled 3 tasks of which: * 2 ran successfully: - 2 ChildTask(my_param=4+6,len("Hello world")) * 1 failed: - 1 ParentTask() This progress looks :( because there were failed tasks ===== Luigi Execution Summary ===== ```
And let's fix the bug ... ```python class ParentTask(luigi.Task): ... def run(self): acc = 0 for target in self.input(): with target.open() as fd: acc += int(fd.read()) with self.output().open('w') as fd: fd.write(str(acc)) ```
## Run it again ```bash $ PYTHONPATH=. ./bin/luigi --module hello ParentTask ... ===== Luigi Execution Summary ===== Scheduled 3 tasks of which: * 2 complete ones were encountered: - 2 ChildTask(my_param=4+6,len("Hello world")) * 1 ran successfully: - 1 ParentTask() This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary ===== ``` ```bash $ cat /tmp/presentation/final_result 21 ```
# Part 3. The Scheduler Thanks to the luigi scheduler, we make sure that we don't run things unneccessarily and it part of what gives us parallelism.
![Scheduler screenshot](img/visualiser_front_page.png)
The scheduler lives in the same repo and is also written in Python, but it can be rewritten in anything since it communicates only using RPCs.
Main task of scheduler: * Register tasks from luigi clients (called Workers) * Give out tasks to run when a Worker ask for it.
Which translates to this API * `add_task(task_id, worker_id, status)` * `get_work(worker_id)` Now let's look at how it can look like when two Workers want to do the same task `A`, and how they can parallelize with the luigi model.
The scheduler also implements: * Virtual resource limits * Setting cooldown for failed tasks. * Disabling often failing tasks. * and much more.
# Questions?