Mastering Apache Airflow: Building and Managing Apache Airflow Workflows

Overview

Apache Airflow is a powerful open-source platform for orchestrating workflows and managing data pipelines. Used extensively in data engineering, Airflow allows teams to schedule, monitor, and organize complex workflows using Python code. This guide provides an in-depth look at Airflow, from basic concepts to practical examples, to help you effectively use Airflow in real-world scenarios.

Use Cases

Apache Airflow was designed to solve the problem of orchestrating complex workflows, often with dependencies between tasks and varied execution schedules. Written in Python, Airflow enables users to:

  • Design workflows as Directed Acyclic Graphs (DAGs)
  • Schedule and monitor workflows
  • Easily visualize task dependencies and execution states

Common use cases of Apache Airflow include:

  • Data Engineering: Data extraction, transformation, and loading (ETL) pipelines
  • Machine Learning: Training, validation, and deployment of models
  • Reporting: Automating report generation and distribution

Key Advantages

Many teams choose Airflow over other orchestration engines because of its:
  • Scalability: Easily manages workflows with dozens or even hundreds of tasks.
  • Extensibility: Integrates well with numerous databases, cloud services, and APIs.
  • Flexibility: Write workflows in Python, allowing for custom logic within each task.

Core Concepts

Understanding Airflow's core components is essential for creating effective pipelines. Here are some key concepts:

  • DAG (Directed Acyclic Graph): A DAG is a collection of tasks arranged in a way that defines their relationships and dependencies. It represents the workflow in Airflow, where each node is a task, and directed edges show dependencies.
  • Task: A task is a single unit of work. In Airflow, tasks are represented as nodes within a DAG and can include various operations, such as data transformation or database queries.
  • Operator: Operators are templates for specific types of tasks. Examples include the PythonOperator, BashOperator, and MySqlOperator.
  • Scheduler: The Scheduler is responsible for triggering tasks based on the schedule defined within each DAG. It continually monitors for any tasks that are ready to be executed.
  • Executor: Executors determine how and where tasks run. Popular executors include the LocalExecutor (for running locally) and CeleryExecutor (for distributed execution across multiple workers).
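
To tie these concepts together, here is a minimal sketch (the DAG and task names are arbitrary): a DAG of three BashOperator tasks whose directed edges define the execution order. The scheduler triggers the runs, and the configured executor decides where each task actually executes.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'concepts_demo',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo "extract"')
    transform = BashOperator(task_id='transform', bash_command='echo "transform"')
    load = BashOperator(task_id='load', bash_command='echo "load"')

    # Directed edges: extract runs before transform, transform before load
    extract >> transform >> load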

Building Workflows Using Airflow

Setting Up Airflow

To start using Airflow, let's set up a local environment and install the necessary components. Installing via pip is the only officially supported installation method. For demonstration purposes, we'll use Airflow 2.5.0.

pip install apache-airflow==2.5.0

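In practice, the Airflow project recommends installing with a constraints file so that dependency versions match a tested combination. For example (assuming Python 3.8; adjust the constraints URL to your Python version):

pip install "apache-airflow==2.5.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.0/constraints-3.8.txt"
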
To start Airflow, you can either use the standalone command or run the individual commands that initialize the database, create a user, and start each component.

# Start Airflow with one command
airflow standalone


# Start Airflow with individual commands
airflow db init

airflow users create --username guru --firstname Workflows --lastname Guru --role Admin --email guru@workflows.guru

airflow webserver --port 8080

airflow scheduler

To access the web interface, navigate to `http://localhost:8080`. The web UI provides visibility into DAGs, tasks, and their statuses.

Building Your First DAG

Let's create a simple DAG with two tasks: fetching data from an API and processing that data. Start by creating a new Python file under dags/, for example, example_dag.py. This file will contain the DAG's configuration and tasks.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


# Define the default arguments
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1),
}

# Define the DAG
with DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Task 1: Fetch data
    def fetch_data():
        # Here you would add code to fetch data, e.g., from an API
        print("Fetching data…")

    fetch_data_task = PythonOperator(
        task_id='fetch_data_task',
        python_callable=fetch_data,
    )

    # Task 2: Process data
    def process_data():
        # Add data processing code here
        print("Processing data…")

    process_data_task = PythonOperator(
        task_id='process_data_task',
        python_callable=process_data,
    )

    # Set task dependencies
    fetch_data_task >> process_data_task

The DAG is scheduled to run daily and starts from a specific date. `fetch_data` and `process_data` are defined as Python functions and wrapped into `PythonOperator` tasks. `fetch_data_task` must complete before `process_data_task` begins.

Once the DAG file is saved, you can trigger it through the web interface or with the command line.
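
For example, assuming the DAG id example_dag defined above, the following CLI commands (available in Airflow 2.x) unpause the DAG, trigger a run, and execute a single run locally for a given logical date without involving the scheduler:

airflow dags unpause example_dag
airflow dags trigger example_dag

# Run the DAG once locally for a given logical date (useful for debugging)
airflow dags test example_dag 2023-01-01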

XCom for Data Passing

In the previous DAG, fetch_data and process_data are independent. If we want to use the data fetched in the first task inside process_data, we need XCom (short for cross-communication), which lets tasks share small amounts of data. The example below shows how to pass data between tasks:

def fetch_data(**context):
    context['ti'].xcom_push(key="fetched_data", value={"data": "my_data"})

def process_data(**context):
    value = context['ti'].xcom_pull(task_ids="fetch_data_task", key="fetched_data")
    print("Pulled value:", value)

fetch_data_task = PythonOperator(
    task_id='fetch_data_task',
    python_callable=fetch_data,
)

process_data_task = PythonOperator(
    task_id='process_data_task',
    python_callable=process_data,
)

fetch_data_task >> process_data_task

As we see here, fetch_data uses xcom_push to publish data to XCom, and the process_data task calls xcom_pull with the pushing task's id and the same key to retrieve it. Note that in Airflow 2 the context is passed to the callable automatically, so the old provide_context argument is no longer needed.
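
As a side note, any value returned by a PythonOperator callable is pushed to XCom automatically under the key return_value, so the explicit push can often be skipped. A minimal sketch of the same two tasks:

def fetch_data():
    # The return value is pushed to XCom automatically (key: "return_value")
    return {"data": "my_data"}

def process_data(**context):
    # xcom_pull defaults to key="return_value"
    value = context['ti'].xcom_pull(task_ids="fetch_data_task")
    print("Pulled value:", value)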

Task Dependencies

Airflow allows you to define complex dependencies and conditional paths. In the example below, we use BranchPythonOperator to dynamically choose which path the DAG will follow, based on the condition in the choose_branch function.

from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

# Placeholder condition; replace with real logic (e.g., inspect a value or a date)
some_condition = True

# Define branching condition
def choose_branch():
    return 'branch_1' if some_condition else 'branch_2'

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
)

branch_1 = DummyOperator(task_id='branch_1')
branch_2 = DummyOperator(task_id='branch_2')

# Set up dependencies
branch_task >> [branch_1, branch_2]
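
One detail worth noting: the branch that is not chosen gets marked as skipped, so a downstream task that joins both branches should use a trigger rule that tolerates skipped upstreams. A minimal sketch (the trigger rule name below is available in Airflow 2.3+):

# A join task that runs as long as at least one branch succeeded
join_task = DummyOperator(
    task_id='join_task',
    trigger_rule='none_failed_min_one_success',
)

[branch_1, branch_2] >> join_task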

Using TaskFlow

In the previous examples, we used the traditional, more verbose way to declare DAGs and pass data between tasks. Starting with Airflow 2.0, the TaskFlow API was introduced. Using TaskFlow, the previous DAG can be written as follows:

import pendulum

from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_dag():

    @task()
    def fetch_data():
        # Fetch data, e.g., from an API
        print("Fetching data…")
        return {"data": "data"}

    @task()
    def process_data(fetched_data: dict):
        print("Processing data…", fetched_data)
        return {"processed_data": "processed"}

    fetched_data = fetch_data()

    processed_data = process_data(fetched_data)


example_dag()

We invoke the fetch_data task, obtain its return value, and pass it to the process_data task. The dependencies between the tasks, and the passing of data between them (even when they run on different workers on different nodes), are handled by Airflow. All of the XCom usage for data passing between these tasks is abstracted away from the DAG author in Airflow 2.0; however, XCom variables are still used behind the scenes and can be viewed in the Airflow UI for debugging or DAG monitoring. Similarly, task dependencies are generated automatically by the TaskFlow API based on the functional invocation of tasks. In Airflow 1.x, tasks had to be created explicitly and dependencies specified by hand, as in the earlier examples.

Deep Dive into Airflow Executors

In Airflow's architecture, executors determine how tasks within DAGs are executed. Airflow supports several executor types. The SequentialExecutor is the default; it executes tasks one at a time and is mainly used for testing or local development. The LocalExecutor runs tasks in parallel on a single machine but cannot distribute work across multiple nodes. For distributed execution, Airflow provides the CeleryExecutor and the KubernetesExecutor.
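
You can check which executor a given installation is configured to use with the Airflow CLI, or override the setting through Airflow's standard environment-variable convention:

airflow config get-value core executor

# Equivalent override via environment variable
export AIRFLOW__CORE__EXECUTOR=LocalExecutor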

Setting Up Celery Executor

For production environments, the CeleryExecutor or KubernetesExecutor is commonly chosen because they can handle distributed, parallel task execution across multiple workers or containers. In the next section, we will see how to use Celery as the executor.

Celery is one of the most popular executors for Airflow and is particularly useful for scaling workflows across multiple machines. The CeleryExecutor runs multiple tasks in parallel across a distributed system and scales simply by adding more workers.

To use the CeleryExecutor, configure it in your `airflow.cfg` file, as follows:

[core]
executor = CeleryExecutor

[celery]
# Broker: Redis shown here; RabbitMQ also works, e.g., amqp://localhost
broker_url = redis://localhost:6379/0
# Task result storage
result_backend = db+postgresql://user:password@localhost/airflow
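
With this configuration in place, you also need the Celery extra installed and at least one worker process running, for example:

pip install "apache-airflow[celery]==2.5.0"

# Start a Celery worker that executes Airflow tasks
airflow celery worker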

Build Your Own Executor

One of Airflow's greatest strengths is that it is highly extensible; even core functionality (such as authentication) can be extended to leverage external services. Executors are the mechanism by which task instances get run, and several executor implementations are built into Airflow, each with its own characteristics and capabilities.

If you have a private tool or service for task execution that is only available to you or your organization, you may consider building your own executor. All Airflow executors implement a common interface so that they are pluggable, and any executor has access to all abilities and integrations within Airflow. The Airflow scheduler uses this interface to interact with the executor; the public interface is the BaseExecutor. For more information, see the executor documentation: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html
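
As a rough illustration only (not code from Airflow itself), a custom executor subclasses BaseExecutor and implements a handful of lifecycle methods. In this sketch the "backend" is just a local subprocess standing in for a hypothetical private execution service:

import subprocess

from airflow.executors.base_executor import BaseExecutor


class MyServiceExecutor(BaseExecutor):
    """Hypothetical executor: submits Airflow task commands to a private backend."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.running_procs = {}

    def execute_async(self, key, command, queue=None, executor_config=None):
        # `command` is the airflow CLI command (a list of strings) that runs one
        # task instance; a real executor would hand it to your service instead.
        self.running_procs[key] = subprocess.Popen(command)

    def sync(self):
        # Called periodically by the scheduler loop; report finished task instances.
        for key, proc in list(self.running_procs.items()):
            return_code = proc.poll()
            if return_code is None:
                continue  # still running
            if return_code == 0:
                self.success(key)
            else:
                self.fail(key)
            del self.running_procs[key]

    def end(self):
        # Wait for outstanding work before the executor shuts down.
        for proc in self.running_procs.values():
            proc.wait()
        self.sync()

To use it, point the executor option in `airflow.cfg` at the full import path of the class (for example, a hypothetical my_company.executors.MyServiceExecutor); Airflow 2 accepts custom module paths for this setting.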

Wrap Up

Apache Airflow is a powerful tool for managing workflows, especially in data engineering. With Airflow, you can automate and monitor pipelines with ease, improving the efficiency and reliability of your workflows. By following this guide, you should have a solid understanding of the basics of Airflow and how to get started with building and managing DAGs.

In this article, we covered:

  • Airflow's core concepts
  • How to create and run a DAG
  • Advanced concepts, including branching and XCom

With this knowledge, you can start leveraging Apache Airflow to automate complex data pipelines and workflows. While building your workflows with Airflow, you should consider the following best practices:

  • Organize code into reusable components, especially when tasks share similar logic. This makes debugging easier and reduces redundant code.
  • Tasks should be idempotent: they should produce the same result if run multiple times. This is particularly important if tasks need to be retried due to failures.
  • Set retries and use `try-except` blocks within tasks to handle errors gracefully. Additionally, configure alerts for failed DAG runs.
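
A minimal sketch of the last two points, using a hypothetical notify_failure callback and load_data task:

from datetime import timedelta

from airflow.operators.python import PythonOperator


def notify_failure(context):
    # Hypothetical alerting hook: send the failure wherever your team watches
    print(f"Task {context['task_instance'].task_id} failed")


def load_data():
    try:
        # Idempotent load, e.g., overwrite a partition rather than appending blindly
        print("Loading data…")
    except Exception:
        # Log or clean up, then re-raise so Airflow marks the task failed and retries it
        raise


load_task = PythonOperator(
    task_id='load_task',
    python_callable=load_data,
    retries=3,
    retry_delay=timedelta(minutes=5),
    on_failure_callback=notify_failure,
)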