
Apache Airflow (an Introduction)

Published: at 04:00 PM
Author: Sebastian Talamoni

Context and Introduction

Apache Airflow Concepts

Executors

Executors, as the name suggests, determine where and how your tasks are executed. Options range from local execution (LocalExecutor) to distributed execution across multiple machines (CeleryExecutor, KubernetesExecutor).

Tip: use LocalExecutor for DEV

For DEV mode you probably want a setup that “fits” on your notebook and does not depend on anything external. The LocalExecutor also has the advantage of running tasks in parallel, so it uses your CPU wisely. This article focuses on the LocalExecutor, so keep in mind that with this executor all tasks run on the same machine as the scheduler (as subprocesses of it).


DAG, Task

This is important, but I won’t write my own definitions here; you are better off reading the official docs for DAGs and Tasks.

XComs

Definition: XComs (short for “Cross-Communication”) are used in Apache Airflow to allow tasks within a DAG to exchange messages or data. They help in passing small pieces of information (like a variable or status) between tasks, ensuring that the workflow can be dynamic and responsive based on previous task outcomes.

- AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True (needed if you want to exchange objects that are not JSON-serializable)

In the Airflow UI you can inspect a task’s XCom return value on the task instance’s XCom tab.

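To make the mechanics concrete, here is a minimal sketch (the task names are hypothetical, not taken from my actual DAG) of two TaskFlow tasks exchanging a small value: the return value of the first task is pushed to XCom and pulled automatically when it is passed to the second.

    from pendulum import datetime

    from airflow.decorators import dag, task


    @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
    def xcom_demo():

        @task
        def read_feed_urls():
            # Keep XCom values small: a list of URLs is fine, a full dataset is not
            return ["https://example.com/rss", "https://example.org/rss"]

        @task
        def count_feeds(feeds):
            # The list arrives via XCom; Airflow resolves it at runtime
            return len(feeds)

        count_feeds(read_feed_urls())


    xcom_demo()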

Note: It's quite common to start returning huge datasets from each task; this will not only give you errors but also a very slow UI, since XComs are not meant for that. Store large datasets in external storage (Azure, AWS S3, GCS) or databases instead.

Dynamic tasks

I will write a separate article to go deeper into this topic, since it’s really cool. My use case was a table with “Countries”, and I wanted to run “separate” tasks for each country without repeating all the code. This reduces code duplication and improves scalability by dynamically generating task instances based on input parameters (see the sketch below).

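As a preview, here is a minimal sketch using dynamic task mapping (with a hypothetical hard-coded country list standing in for the real “Countries” table): .expand() creates one task instance per country at runtime.

    from pendulum import datetime

    from airflow.decorators import dag, task


    @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
    def per_country_demo():

        @task
        def get_countries():
            # In the real DAG this list would come from the "Countries" table
            return ["NL", "AR", "DE"]

        @task
        def process_country(country):
            print(f"processing feeds for {country}")

        # expand() generates one mapped task instance per element, no code duplication
        process_country.expand(country=get_countries())


    per_country_demo()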

Dataset

Datasets in Apache Airflow are a structured way to define, track, and manage data dependencies between tasks and DAGs. They represent a specific data entity (e.g., a file, a database table) and allow tasks to explicitly declare their relationship to it, either as a producer (updating/creating the dataset) or a consumer (reading/using the dataset). This improves observability and makes it easier to manage complex data workflows.

What’s nice about this is mainly the traceability and Data Lineage, which you basically get by declaring tasks as producers and consumers of a dataset:

Example: task “produces” a dataset

You need to define your task with “outlets” (making it a producer) and use the dataset content as the return value of your function.

    from airflow.datasets import Dataset
    from airflow.decorators import task


    @task(outlets=[Dataset("News_RSS_active_feeds")])
    def read_feeds():
        ...
        return rss_feeds

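On the consumer side, a DAG can be scheduled directly on that dataset. A minimal sketch (the DAG and task names are hypothetical; the dataset URI matches the producer above):

    from pendulum import datetime

    from airflow.datasets import Dataset
    from airflow.decorators import dag, task


    @dag(
        start_date=datetime(2024, 1, 1),
        # Runs whenever a producer task with this outlet completes successfully
        schedule=[Dataset("News_RSS_active_feeds")],
        catchup=False,
    )
    def consume_feeds():

        @task
        def process_feeds():
            print("feed list was updated, processing...")

        process_feeds()


    consume_feeds()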

Practical Guide

Below are some practical tips, in no particular order ;)

Share your datasets via storage (Azure Data Lake Storage Gen2)

I found this to be the most practical way to share big datasets. It also integrates very nicely with Azure Fabric (more on that later). What I found works nicely as a return value is a small JSON report.
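A minimal sketch of the idea, assuming the azure-storage-file-datalake package is installed; the account, container, and file path below are placeholders, not my actual setup:

    import json

    from azure.storage.filedatalake import DataLakeServiceClient

    from airflow.decorators import task


    @task
    def upload_feeds_file(local_path: str):
        # Upload the big file to ADLS Gen2 instead of returning it via XCom
        service = DataLakeServiceClient(
            account_url="https://<youraccount>.dfs.core.windows.net",
            credential="<your-account-key>",
        )
        file_client = service.get_file_system_client("raw").get_file_client(
            "news/feeds.parquet"
        )
        with open(local_path, "rb") as data:
            file_client.upload_data(data, overwrite=True)

        # Return only a small JSON report as the XCom value
        return json.dumps({"path": "raw/news/feeds.parquet", "status": "ok"})

In a real DAG you would of course pull the credentials from an Airflow connection or a secrets backend instead of hard-coding them.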

Use the terminal to check/temp install packages

Note: if you only want to do a small test and are not really sure yet about using a package, don’t forget you can always connect to the container’s terminal and install it via pip. This is obviously a temporary solution; it will be lost after a restart.


How to install a Python package when using LocalExecutor?

  scheduler:
    image: apache/airflow:2.10.2
    restart: always
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://{YourUser}:{YourPassword}@postgres/airflow
      - AIRFLOW__CORE__FERNET_KEY="{YourKey}"
      - AIRFLOW__DATASETS__ENABLE_DATASET_AWARE_SCHEDULING=True
      - _PIP_ADDITIONAL_REQUIREMENTS=apache-airflow-providers-mongo feedparser  # <---- add it here
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    depends_on:
      - postgres
    command: "airflow scheduler"  

Note: As mentioned in my intro, this is going to be slow since the packages get installed after each restart, but it’s perfect for my DEV/test case. In production, it’s better to build a custom Docker image with these dependencies pre-installed to avoid the installation overhead.

Tip: if an ODBC driver seems to be missing, first check which ones are already installed.
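A quick way to do that from Python, assuming pyodbc is available in the environment:

    import pyodbc

    # Lists the ODBC drivers visible to the process,
    # e.g. ['ODBC Driver 18 for SQL Server']
    print(pyodbc.drivers())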

Further readings (and final thoughts)

