Apache Airflow is a powerful open-source platform for orchestrating workflows and managing data pipelines. Used extensively in data engineering, Airflow allows teams to schedule, monitor, and organize complex workflows using Python code. This guide provides an in-depth look at Airflow, from basic concepts to practical examples, to help you effectively use Airflow in real-world scenarios.
Apache Airflow was designed to solve the problem of orchestrating complex workflows, often with dependencies between tasks and varied execution schedules. Written in Python, Airflow enables users to:
Common use cases of Apache Airflow include:
Understanding Airflow's core components is essential for creating effective pipelines. Here are some key concepts:
To start using Airflow, let's set up a local environment and install the necessary components. Installation via pip is currently the only officially supported method. For demonstration purposes, we'll use Airflow 2.5.0.
```bash
pip install apache-airflow==2.5.0
```
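The official installation guide also recommends pinning dependencies with a version-specific constraints file; assuming Python 3.8 (adjust the version in the URL to match your environment), the command looks roughly like this:

```bash
pip install "apache-airflow==2.5.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.0/constraints-3.8.txt"
```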
To start Airflow, you can either use the standalone command or run the individual commands manually to initialize the database, create a user, and start each component.
```bash
# Start Airflow with one command
airflow standalone

# Start Airflow with individual commands
airflow db init
airflow users create --username guru --firstname Workflows --lastname Guru --role Admin --email guru@workflows.guru
airflow webserver --port 8080
airflow scheduler
```
To access the web interface, navigate to `http://localhost:8080`. The web UI provides visibility into DAGs, tasks, and their status.
Let's create a simple DAG with two tasks: fetching data from an API and processing that data. Start by creating a new Python file under `dags/`, for example `example_dag.py`. This file will contain the DAG's configuration and tasks.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Define the default arguments
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1),
}

# Define the DAG
with DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Task 1: Fetch data
    def fetch_data():
        # Here you would add code to fetch data, e.g., from an API
        print("Fetching data…")

    fetch_data_task = PythonOperator(
        task_id='fetch_data_task',
        python_callable=fetch_data,
    )

    # Task 2: Process data
    def process_data():
        # Add data processing code here
        print("Processing data…")

    process_data_task = PythonOperator(
        task_id='process_data_task',
        python_callable=process_data,
    )

    # Set task dependencies
    fetch_data_task >> process_data_task
```
The DAG is scheduled to run daily and starts from a specific date. `fetch_data` and `process_data` are defined as Python functions and wrapped into `PythonOperator` tasks. `fetch_data_task` must complete before `process_data_task` begins.
Once the DAG file is saved, you can trigger it through the web interface or with the command line.
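For example, from the command line you could use commands like the following (the DAG and task IDs match the example above):

```bash
# Confirm the scheduler has picked up the DAG
airflow dags list

# Run a single task without recording state (handy while developing)
airflow tasks test example_dag fetch_data_task 2023-01-01

# Trigger a full DAG run
airflow dags trigger example_dag
```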
In the previous DAG, `fetch_data` and `process_data` are independent. If we want to use the data fetched in the first task inside `process_data`, we should use XCom (short for cross-communication), which lets tasks share small pieces of data. The example below shows how to share data between tasks:
```python
# Inside the same DAG file as before; in Airflow 2 the task context is
# passed automatically, so provide_context is no longer needed.
def fetch_data(**context):
    context['ti'].xcom_push(key="fetched_data", value={"data": "my_data"})

def process_data(**context):
    value = context['ti'].xcom_pull(task_ids="fetch_data_task", key="fetched_data")
    print("Pulled value:", value)

fetch_data_task = PythonOperator(
    task_id='fetch_data_task',
    python_callable=fetch_data,
)

process_data_task = PythonOperator(
    task_id='process_data_task',
    python_callable=process_data,
)

fetch_data_task >> process_data_task
```
As we can see, `fetch_data` uses `xcom_push` to push the data to XCom, and in the `process_data` task we use `xcom_pull` with the same key (and the upstream `task_id`) to retrieve the data passed from the previous task.
Airflow allows you to define complex dependencies and conditional paths. In the example below we use `BranchPythonOperator` to dynamically choose which path the DAG will follow, based on the condition evaluated in the `choose_branch` function.
```python
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

# Define the branching condition
def choose_branch():
    some_condition = True  # placeholder: replace with your own logic
    return 'branch_1' if some_condition else 'branch_2'

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
)

branch_1 = DummyOperator(task_id='branch_1')
branch_2 = DummyOperator(task_id='branch_2')

# Set up dependencies
branch_task >> [branch_1, branch_2]
```
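One caveat worth noting if you extend this example: tasks downstream of a skipped branch are skipped as well by default, so a task that joins both branches usually needs a more permissive trigger rule. A minimal sketch, reusing the `branch_1`/`branch_2` tasks above with a hypothetical `join_task`:

```python
# Hypothetical join task: 'none_failed_min_one_success' lets it run when
# one branch succeeded, even though the other was skipped by the branch task.
join_task = DummyOperator(
    task_id='join_task',
    trigger_rule='none_failed_min_one_success',
)

[branch_1, branch_2] >> join_task
```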
In the previous examples, we used the traditional, more verbose way to declare DAGs and pass data between tasks. Starting with Airflow 2.0, the TaskFlow API was introduced. Using TaskFlow, the previous DAG could be written as follows:
```python
import pendulum

from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_dag():

    @task()
    def fetch_data():
        # Fetch data, e.g., from an API
        print("Fetching data…")
        return {"data": "data"}

    @task()
    def process_data(fetched_data: dict):
        print("Processing data…", fetched_data)
        return {"processed_data": "processed"}

    fetched_data = fetch_data()
    processed_data = process_data(fetched_data)


example_dag()
```
We have invoked the fetch_data task, obtained its return value, and passed it to the process_data task. The dependencies between the tasks and the passing of data between them, which could be running on different workers on different nodes of the network, is all handled by Airflow. All of the XCom usage for passing data between these tasks is abstracted away from the DAG author in Airflow 2.0. However, XCom variables are still used behind the scenes and can be viewed in the Airflow UI as needed for debugging or DAG monitoring. Similarly, task dependencies are generated automatically within TaskFlow based on the functional invocation of tasks. In Airflow 1.x, tasks had to be created explicitly and dependencies specified by hand, as shown in the earlier examples.
In Airflow's architecture, executors determine how the tasks within DAGs are executed. Airflow supports several types of executor: the SequentialExecutor, the default, executes tasks one at a time and is used mainly for testing or local development; the LocalExecutor runs tasks in parallel on a single machine but cannot distribute work across multiple nodes; the CeleryExecutor and KubernetesExecutor distribute tasks across multiple workers or containers.
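To check which executor your installation is currently configured to use, you can query the configuration from the CLI (assuming a standard `airflow.cfg` setup):

```bash
airflow config get-value core executor
```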
For production environments, the CeleryExecutor or KubernetesExecutor are commonly chosen because they can handle distributed, parallel task execution across multiple workers or containers. In the next section, we will see how to use Celery as the executor. Check out this course if you want to learn how to create workflows using Celery.
Celery is one of the most popular executors for Airflow and is particularly useful for scaling workflows across multiple machines. The CeleryExecutor lets you run multiple tasks in parallel across a distributed system and makes scaling as simple as adding more workers.
To use the CeleryExecutor, configure it in your `airflow.cfg` file as follows:
```ini
[core]
executor = CeleryExecutor

[celery]
# Message broker: Redis here, or RabbitMQ, e.g., amqp://localhost
broker_url = redis://localhost:6379/0
# Where task results are stored
result_backend = db+postgresql://user:password@localhost/airflow
```
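With the configuration in place, each worker machine then runs an Airflow Celery worker process (this assumes the Celery extra is installed, e.g. `pip install 'apache-airflow[celery]'`); Flower, Celery's monitoring UI, is optional but useful:

```bash
# Start a Celery worker on each worker machine
airflow celery worker

# Optionally start Flower to monitor workers (defaults to port 5555)
airflow celery flower
```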
The best thing about Airflow is that it is highly extensible. The core functionality of Airflow (such as authentication) can also be extended to leverage external services. Executors are the mechanism by which task instances get run. Airflow ships with several built-in executor implementations, each with its own characteristics and capabilities.
If you have a private tool or service for task execution that is only available to you or your organization, you may consider building your own executor. All Airflow executors implement a common interface so that they are pluggable, and any executor has access to all the abilities and integrations within Airflow. The Airflow scheduler uses this interface to interact with the executor. The public interface is the `BaseExecutor`. For more information, see the executor documentation: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html
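To give an idea of what implementing that interface involves, here is a minimal, hypothetical sketch of a custom executor that runs each task command synchronously via subprocess. It assumes the Airflow 2.x `BaseExecutor` hooks (`execute_async`, `sync`, `end`) and is meant only as an illustration, not a production-ready executor:

```python
import subprocess

from airflow.executors.base_executor import BaseExecutor


class InProcessExecutor(BaseExecutor):
    """Hypothetical toy executor: runs each task command synchronously."""

    def __init__(self, parallelism: int = 1):
        super().__init__(parallelism=parallelism)
        self._finished = {}  # task instance key -> succeeded?

    def execute_async(self, key, command, queue=None, executor_config=None):
        # `command` is the CLI invocation the scheduler built for the task,
        # e.g. ["airflow", "tasks", "run", ...]; run it and remember the result.
        completed = subprocess.run(command)
        self._finished[key] = (completed.returncode == 0)

    def sync(self):
        # Called on every executor heartbeat; report task outcomes back to Airflow.
        while self._finished:
            key, ok = self._finished.popitem()
            if ok:
                self.success(key)
            else:
                self.fail(key)

    def end(self):
        # Nothing runs in the background, so just flush any pending results.
        self.sync()
```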
Apache Airflow is a powerful tool for managing workflows, especially in data engineering. With Airflow, you can automate and monitor pipelines with ease, improving the efficiency and reliability of your workflows. By following this guide, you should have a solid understanding of the basics of Airflow and how to get started with building and managing DAGs.
In this article, we covered:
With this knowledge, you can start leveraging Apache Airflow to automate complex data pipelines and workflows. While building your workflows with Airflow, you should also consider the following best practices: