# Day4 - ML Pipelines

## Pipeline

It is a declarative description of a workflow or process, typically in code or configuration, that specifies:

- What tasks should be run
- How they are connected or ordered
- What inputs and outputs are involved
- Any conditions, retries, or resources needed

## Orchestrator

An orchestrator is a system or tool that manages and coordinates the execution of complex workflows or tasks, especially when those tasks involve multiple steps, dependencies, and resources.

## Comparison Between Popular ML Pipelines Orchestrators

| Feature         | **Dagster**                                       | **Airflow**                    | **Kubeflow**           | **Prefect**                      | **Metaflow**                         |
| ------------------------- | ------------------------------------------------- | ------------------------------ | -------------------------------- | -------------------------------- | ------------------------------------ |
| **Primary Language**      | Python                                            | Python                         | Python                           | Python                           | Python                               |
| **Designed For**          | Data & ML pipelines                               | General workflow orchestration | ML pipelines on Kubernetes       | General workflow orchestration   | ML pipelines, prototyping to prod    |
| **ML Native Features**    | ✅ Strong ML support (IO management, type systems) | ❌ Limited ML support           | ✅ Tight integration with TF, K8s | ⚠️ Minimal built-in ML features  | ✅ ML-focused abstractions            |
| **Kubernetes Native**     | ✅ (via Dagster K8s executor)                      | ✅ (with Helm, K8sExecutor)     | ✅ Fully K8s-native               | ✅ Optional                       | ✅ Optional                           |
| **Local Dev Experience**  | ✅ Very good (CLI & UI)                            | ⚠️ Okay but clunky             | ❌ Heavyweight (needs K8s)        | ✅ Excellent (easy local → cloud) | ✅ Excellent (local-first)            |
| **UI / Observability**    | ✅ Excellent UI & asset tracking                   | ✅ Basic but mature             | ✅ Full UI                        | ✅ Good (flow run UI)             | ✅ Great (incl. lineage, retry)       |
| **Type Safety / IO Mgmt** | ✅ Strong typing & asset materialization           | ❌ Minimal support              | ⚠️ Basic through component specs | ⚠️ Basic typing                  | ✅ Simple but effective               |
| **Data Lineage**          | ✅ First-class asset lineage                       | ⚠️ Custom plugins needed       | ✅ via ML Metadata store          | ⚠️ Partial                       | ✅ Built-in                           |
| **Execution Flexibility** | ✅ Local, multiprocess, K8s, etc.                  | ✅ Executors: Celery, K8s, etc. | ❌ K8s only                       | ✅ Cloud or local agents          | ✅ AWS, K8s, local                    |
| **Extensibility**         | ✅ Modular, Pythonic design                        | ✅ Strong DAG customization     | ⚠️ Custom container components   | ✅ Flows as Python code           | ✅ Highly extensible Python code      |
| **Community / Maturity**  | ⭐⭐ Growing fast                                   | ⭐⭐⭐ Very mature (but older)    | ⭐⭐ Kubernetes/Google centric     | ⭐⭐ Fast-growing                  | ⭐⭐ Strong in enterprise/data science |
| **Best Use Case**         | ML + data pipelines, asset-driven                 | ETL, batch jobs                | Large-scale ML on K8s            | Lightweight orchestration        | ML workflows from notebook to prod   |

🟦 **Dagster**

- Pros: Modern, type-safe, asset-centric, good dev UX, great observability.
- Cons: Learning curve around asset concepts.
- Best for: ML teams looking for data lineage and strong development ergonomics.

🟩 **Airflow**

- Pros: Battle-tested, huge ecosystem, flexible.
- Cons: DSL is clunky for ML; weak typing; hard to trace ML artifacts.
- Best for: Traditional ETL and teams with existing Airflow setups.

🟥 **Kubeflow**

- Pros: Cloud-native, scalable, integrates well with K8s and TensorFlow ecosystem.
- Cons: Complex setup, Kubernetes-only, poor local dev UX.
- Best for: Teams deploying ML at scale on Kubernetes.

🟨 **Prefect**

- Pros: Simple, Pythonic, cloud-native agents, strong developer experience.
- Cons: Not ML-specific; less focus on artifact tracking.
- Best for: Lightweight workflows, dataops, hybrid cloud/local orchestration.

🟪 **Metaflow**

- Pros: Very ML-friendly, notebook integration, supports branching, versioning, retries.
- Cons: Less customizable for general workflows.
- Best for: ML teams needing reproducibility from notebook → prod.

## Dagster

![**Dagster**](../imgs/dagster.png)

An open-source data orchestrator for ML, analytics and (Extract, Transform, Load) ETL. It enables to build, run and monitor complex data pipelines.
Dagster offers:

- **Declarative** pipeline definitions (data dependencies and configuration).
- **Type-safe** operations.
- Native support for **assets**, **schedules**, and **sensors**.
- **Integration** with popular data tools (e.g., dbt, Spark, MLFlow).

### Core Concepts

| Concept               | Description                                      |
| --------------------- | ------------------------------------------------ |
| `@op`                 | A function that performs a unit of work          |
| `@job`                | A directed graph of `op`s                        |
| `@asset`              | A first-class, versioned data product            |
| `Graph`               | A reusable composition of ops                    |
| `Resource`            | External dependency like S3, DB, API             |
| `Sensor` / `Schedule` | Triggers jobs by event/time                      |

## Getting Started

- Install Dagster:

  ```powershell
  pip install dagster dagit
  ```

- Initialize a new Dagster project:

  ```powershell
  dagster project scaffold --name dagster_tutorial
  cd dagster_tutorial
  ```

- Run the Dagster development server:

  ```powershell
  dagster dev
  ```

- Open the Dagit UI in your browser at `http://localhost:3000`.

- Create a new file `ops.py` in the subdirectory `dagster_tutorial` and add the following code:

  ```python
  from dagster import op
  @op
  def get_numbers():
      return [1, 2, 3]
  @op
  def multiply(numbers):
      return [x * 10 for x in numbers]
  ```

- Create a new file `jobs.py` in the subdirectory `dagster_tutorial` and add the following code:

  ```python
  from dagster import job
  from .ops import get_numbers, multiply
  @job
  def process_job():
      multiply(get_numbers())
  ```

- In the `definitions.py` file, import the job and add it to the repository:

  ```python
  from dagster import Definitions, load_assets_from_modules
  from dagster_tutorial import assets  # noqa: TID252
  from dagster_tutorial.jobs import process_job
  all_assets = load_assets_from_modules([assets])
  defs = Definitions(
      assets=all_assets,
      jobs=[process_job],
  )
  ```

- Modify the `multiply` function in `ops.py` to get runtime config:

  ```python
  from typing import List
  from dagster import op, Config
  class MultiplyConfig(Config):
      factor: int
  @op
  def multiply(config:MultiplyConfig,numbers:List[int]):
      return [x * config.factor for x in numbers]
  ```

- In the launchpad, you can now run the `process_job` with a configuration:

  ```yaml
  ops:
    multiply:
      config:
        factor: 10
  ```

- You can enable logging in your Dagster project by adding this anywhere you want to add logging:

  ```python
  from dagster import get_dagster_logger
  logger = get_dagster_logger()
  logger.info("This is an info message")
  ```

- You can also use `assets` to define, persist and version your data products. For example, you can create a new file `assets.py` in the subdirectory `dagster_tutorial` and add the following code:

  ```python
  from dagster import asset
  @asset
  def raw_data():
      return [1, 2, 3, 4]
  @asset
  def squared_data(raw_data):
      return [x**2 for x in raw_data]
  ```

- You can add a scheduler to run a job at a specific time. To do that, add the following code to the `definitions.py` file:

  ```python
  from dagster import ScheduleDefinition
  hourly_schedule = ScheduleDefinition(
      job=process_job,
      cron_schedule="0 * * * *",  # Every hour
  )
  defs = Definitions(
    assets=all_assets,
    jobs=[process_job],
    schedules=[hourly_schedule],

  )
  ```

## Mlflow

![**Mlflow**](../imgs/mlflow.png)

- Open-source platform for managing the ML lifecycle, including experimentation, reproducibility, and deployment.
- It provides a central repository for tracking experiments, packaging code into reproducible runs, and sharing and deploying models.
- It has four main components:
  - **Tracking**: Log and query experiments.
  - **Projects**: Package code in a reusable and reproducible way.
  - **Models**: Manage and deploy models from various ML libraries.
  - **Registry**: Store and manage models in a central repository.

- Dagster can be used to orchestrate ML workflows and integrate with MLflow for tracking experiments and managing models.
- You can use Dagster to define and run ML pipelines, and use MLflow to log and track experiments, models, and artifacts.
- You can use Dagster's `@op` decorator to define MLflow operations, and use MLflow's Python API to log and track experiments.
- You can use Dagster's `@job` decorator to define MLflow jobs, and use MLflow's Python API to log and track experiments.
- You can use Dagster's `@asset` decorator to define MLflow assets, and use MLflow's Python API to log and track experiments.
- You can use Dagster's `@schedule` decorator to define MLflow schedules, and use MLflow's Python API to log and track experiments.
- You can use Dagster's `@sensor` decorator to define MLflow sensors, and use MLflow's Python API to log and track experiments.

To test MLFlow access, you can run the following python code:

```python
import os
from dotenv import load_dotenv
import mlflow

load_dotenv("../../.env")

MLFLOW_SERVER_URL = os.getenv("MLFLOW_SERVER_URL", "http://localhost:5000")
MLFLOW_TRACKING_USERNAME = os.getenv("MLFLOW_TRACKING_USERNAME")
MLFLOW_TRACKING_PASSWORD = os.getenv("MLFLOW_TRACKING_PASSWORD")

MY_PREFIX = "mohanad-experiment"
os.environ["MLFLOW_TRACKING_USERNAME"] = MLFLOW_TRACKING_USERNAME
os.environ["MLFLOW_TRACKING_PASSWORD"] = MLFLOW_TRACKING_PASSWORD


mlflow.set_tracking_uri(MLFLOW_SERVER_URL)
mlflow.set_experiment(f"/{MY_PREFIX}/classification")
with mlflow.start_run():
    mlflow.log_metric("metric1", 1.0)

```

## Dagster + Xarray + Dask to Train ERA5 Forecasting Model

You can run the advanced Dagster project `era5_forecast` that integrates `xarray`, `Dask` with `Dagster`.

### Instructions

1- Create the dagster project:

```powershell
dagster project scaffold -n era5_forecast
cd era5_forecast
```

2- Assuming the dependencies are installed, run the dagster server:

```powershell
dagster dev
```

3- Open the Dagit UI in your browser at `http://localhost:3000`.
4- On jobs tab, select the `era5_forecast` job and click on the launchpad button to run it.

5- Open the Dask UI in your browser at `http://localhost:8787/status`.

`ops.py`

```{literalinclude} era5_forecast/era5_forecast/ops.py
:language: python
:linenos:
```

`resources.py`

```{literalinclude} era5_forecast/era5_forecast/resources.py
:language: python
:linenos:
```

`jobs.py`

```{literalinclude} era5_forecast/era5_forecast/jobs.py
:language: python
:linenos:
```

`definitions.py`

```{literalinclude} era5_forecast/era5_forecast/definitions.py
:language: python
:linenos:
```

## Dagster + Stackstac + Dask + MLflow to Train Sentinel-2 Land Cover Classification Model

You can run the advanced Dagster project `esa_worldcover_classification` that integrates `xarray`, `Dask` with `Dagster` and `MLflow`.

:::{note}
The ESA WorldCover dataset is a global land cover map produced by the European Space Agency, offering 10-meter resolution classification based on Sentinel-1 and Sentinel-2 satellite imagery. Released in 2021, it provides detailed and consistent information on land cover types such as forests, croplands, urban areas, and water bodies. Designed to support environmental monitoring, climate change studies, and sustainable land management, WorldCover is freely accessible and regularly updated, making it a valuable resource for researchers, policymakers, and Earth observation applications worldwide.
The ESA WorldCover dataset includes 11 land cover classes, based on the UN FAO Land Cover Classification System (LCCS). Here’s the list of classes:
:::

| Class ID | Land Cover Class         |
| -------- | ------------------------ |
| 10       | Tree cover               |
| 20       | Shrubland                |
| 30       | Grassland                |
| 40       | Cropland                 |
| 50       | Built-up                 |
| 60       | Bare / sparse vegetation |
| 70       | Snow and ice             |
| 80       | Permanent water bodies   |
| 90       | Herbaceous wetland       |
| 95       | Mangroves                |
| 100      | Moss and lichen          |

[WorldCover Documentation](https://worldcover2020.esa.int/data/docs/WorldCover_PUM_V1.1.pdf)

### Instructions

1- Run the mlflow server:

```powershell
mlflow server \
  --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root ./mlruns \
  --host 0.0.0.0 \
  --port 8000
```

2- Create the dagster project:

```powershell
dagster project scaffold -n eas_worldcover_classification
cd eas_worldcover_classification
```

3- Assuming the dependencies are installed, run the dagster server:

```powershell
dagster dev
```

4- Open the Dagit UI in your browser at `http://localhost:3000`.

5- On jobs tab, select the `esa_worldcover_classification` job and click on the launchpad button to run it. Use the following configuration:

:::{note}
Change the mlflow experiment name with your name, e.g. `/mohanad_s2_classification`. Make sure to have the mlflow server URL, username and password set in the `.env` file.
:::

```yaml
ops:
  fetch_s2_stack:
    config:
      bbox: [21.0, 38.0, 21.5, 38.5]
      time_range: "2020-01-01/2020-01-31"

  fetch_worldcover_stack:
    config:
      bbox: [21.0, 38.0, 21.5, 38.5]
      time_range: "2020-01-01/2020-01-31"

  save_to_zarr:
    config:
      zarr_cache_dir: "cache"

  train_unet:
    config:
      patch_size: 64
      stride: 32
      batch_size: 16
      model: "unet"
      epochs: 1
      learning_rate: 0.001
      loss: "cross_entropy"
      num_workers: 4
      in_channels: 5
      out_classes: 4
      mlflow_tracking_uri: null
      mlflow_experiment_name: "s2_classification"
      model_path: "models/unet_model.pth"
      zarr_cache_dir: "cache"
      device: "mps"
```

6- Open the MLflow UI in your browser at `http://localhost:8000`.

`ops.py`

```{literalinclude} esa_worldcover_classification/esa_worldcover_classification/ops.py
:language: python
:linenos:
```

`model.py`

```{literalinclude} esa_worldcover_classification/esa_worldcover_classification/model.py
:language: python
:linenos:
```

`dataset.py`

```{literalinclude} esa_worldcover_classification/esa_worldcover_classification/dataset.py
:language: python
:linenos:
```

`train.py`

```{literalinclude} esa_worldcover_classification/esa_worldcover_classification/train.py
:language: python
:linenos:
```

`utils.py`

```{literalinclude} esa_worldcover_classification/esa_worldcover_classification/utils.py
:language: python
:linenos:
```

`jobs.py`

```{literalinclude} esa_worldcover_classification/esa_worldcover_classification/jobs.py
:language: python
:linenos:
```

`definitions.py`

```{literalinclude} esa_worldcover_classification/esa_worldcover_classification/definitions.py
:language: python
:linenos:
```
