Apache Airflow on Docker With AWS S3 | by Narotam Aggarwal | May, 2022

Write your first DAG

Photograph by Emin Sefiyarov on Unsplash

This blog is for anyone who wants to get started with Apache Airflow quickly. It assumes a basic understanding of Apache Airflow, Docker, and AWS.

By the end of this blog, you will have your first DAG written and orchestrated in Airflow.

The main focus is on how to launch Airflow using an extended image on Docker, construct a DAG with PythonOperator-based tasks, make use of XComs (a mechanism that lets Tasks communicate with one another), use Python modules, and finally publish data to and retrieve data from an AWS S3 bucket.

To understand this better, we'll work through a small project with the following tasks:

a) Create a weblog (web-log) data file using a Python script
b) Upload the file created in the previous step to an AWS S3 bucket
c) Connect to AWS S3 using the AWS CLI for object validation

We'll complete the Airflow setup and start Docker by following the steps below, after which we'll be able to run our pipeline in Airflow and retrieve the data.

  1. Docker configuration for Airflow
  2. Docker configuration for Airflow's extended image
  3. Docker configuration for AWS
  4. Executing the Docker image to create a container
  5. DAG and Task creation in Airflow
  6. Executing the DAG from the Airflow UI
  7. Accessing the S3 bucket / objects using the AWS CLI

1. Docker configuration for Airflow

We'll use Docker on macOS to run containers for the Airflow setup. We will use the docker-compose.yaml file from the Airflow documentation as a base and add the required configuration on top of it. Here is the link and some information from the official Airflow website.

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.5/docker-compose.yaml'
  • airflow-scheduler — The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
  • airflow-webserver — The webserver is available at http://localhost:8080.
  • airflow-worker — The worker that executes the tasks given by the scheduler.
  • airflow-init — The initialization service.
  • flower — The flower app for monitoring the environment. It is available at http://localhost:5555.
  • postgres — The database.
  • redis — The broker that forwards messages from the scheduler to the worker.

2. Docker configuration for Airflow's extended image

The next step is to create a Dockerfile that will allow us to extend the Airflow base image to include Python packages that are not part of the original image (apache/airflow:2.2.5).

Create a file called "Dockerfile" in the same directory as the docker-compose.yaml file and paste the lines below into it.

FROM apache/airflow:2.2.5
RUN pip3 install Faker numpy boto3 botocore

Here we take Airflow's base image and extend it with the additional libraries, packages, modules, etc. required for our use case. You can add/remove packages based on your data-processing requirements.

There are a couple of ways the Dockerfile can be executed:

  1. Comment out the image line and uncomment the build line in the docker-compose.yaml file
  2. Don't make any changes to the docker-compose.yaml file at this stage and force the build while running Docker (explained in Section 4).

# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.5}
build: .

3. Docker configuration for AWS

The next step is to add your AWS credentials as ENV variables in the docker-compose.yaml file.

Add the variables below under the environment section and fill in the credentials from your AWS account.
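As a sketch, this could look as follows. The variable names are the standard credential variables that boto3 and the AWS CLI read; the placement under the shared x-airflow-common environment block is an assumption based on the base compose file's layout, and the placeholder values must be replaced with your own credentials:

```yaml
# Under the x-airflow-common -> environment section of docker-compose.yaml
AWS_ACCESS_KEY_ID: <your-access-key-id>
AWS_SECRET_ACCESS_KEY: <your-secret-access-key>
AWS_DEFAULT_REGION: <your-region>
```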


Let's also include the AWS CLI image in the docker-compose.yaml file, which can be used to access AWS S3 objects and verify whether the required data has been stored on S3.

aws-cli:
  image: amazon/aws-cli
  entrypoint: tail -f /dev/null
  environment:
    <<: *airflow-common-env

entrypoint → This will keep the CLI container running

environment → Passes the Airflow environment to the AWS CLI container so it can use the AWS credentials

4. Executing the Docker image to create the container

Now run the following command to initialise the environment. It verifies that the Airflow image isn't too old or unsupported, that the UID is configured, and that the requisite RAM, disk space, and resources are available.

docker-compose up airflow-init

Let's run the docker-compose.yaml file now, which will build all of the images specified in the file (including the one from the Dockerfile) and then run the containers using those images.

docker-compose up -d

-d → run containers in the background (detached mode)


If you haven't commented out the image line and uncommented the build line in the docker-compose.yaml file, run this command instead.

docker-compose up --build -d

--build → build images before starting the containers (runs the Dockerfile)

Once this has completed successfully, you can verify the status of the containers using the following command or by clicking on the newly created Containers / Apps in the Docker UI.

docker ps

Use the command below to log in to any of these containers using the CLI.

docker exec -ti <Container ID> /bin/bash

5. DAG and Task creation in Airflow:

Place weblog_gen.py (from the GitHub link – https://github.com/narotam333/de-project-1) under the dags folder. This script is used to generate weblog data and is imported into our DAG as a module.

Let's import the libraries, packages, and modules we will need for our first DAG.

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Pendulum is a Python package that eases datetime manipulation
import pendulum
# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
# Python module for weblog data generation
from weblog_gen import generate_log
# Other packages for the AWS connection and data processing
import os
import boto3
from botocore.exceptions import ClientError
import logging

DAG: A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships that say how they should run.

Let's start writing our first DAG…

# [START instantiate_dag]
with DAG(
    'my_first_dag',
    default_args={'retries': 2},
    description='ETL DAG tutorial',
    schedule_interval=None,  # or a timedelta / cron expression
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
# [END instantiate_dag]

my_first_dag → This is the DAG ID and must consist only of alphanumeric characters, dashes, dots, and underscores (all ASCII). Every DAG must have a unique id.

default_args → A dictionary of default parameters to be used as constructor keyword parameters when initialising operators (optional)

description → The description of the DAG, e.g. to be shown on the webserver (optional)

schedule_interval → Defines how often the DAG runs; this timedelta object gets added to your latest task instance's execution_date to figure out the next schedule.

start_date → The timestamp from which the scheduler will attempt to backfill (optional)

catchup → Perform scheduler catchup (or only run the latest)? Defaults to True.

tags → List of tags to help filter DAGs in the UI (optional)

Next, we will write our first Task using the PythonOperator.

Task: A Task is the basic unit of execution in Airflow. Tasks are arranged into DAGs, and then have upstream and downstream dependencies set between them in order to express the order in which they should run.

Operator: An Operator is conceptually a template for a predefined Task that you can simply define declaratively inside your DAG.

In this Task, we will call the function f_generate_log and pass the required argument to it. When running our callable, Airflow will pass a set of arguments/keyword arguments that can be used in our function.

# [START weblog_function]
def f_generate_log(*op_args, **kwargs):
    ti = kwargs['ti']
    lines = op_args[0]
    logFile = generate_log(lines)
    ti.xcom_push(key='logFileName', value=logFile)
# [END weblog_function]

# [START weblog task]
create_weblog_task = PythonOperator(
    task_id='weblog',
    python_callable=f_generate_log,
    op_args=[30],
)
# [END weblog task]

task_id → This should be unique across all the tasks within a DAG

python_callable → A reference to an object that is callable

op_args → A list of positional arguments that will get unpacked when calling your callable

op_kwargs → A dictionary of keyword arguments that will get unpacked in your function

generate_log → This is the module function that generates the weblog data file. Please go through the weblog_gen.py script to see how it works; it basically generates weblog data in a file, with the number of lines equal to the number passed as an argument. It returns the log file name once the file has been created.
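For intuition, a stripped-down, hypothetical version of such a generator might look like the sketch below. The function name, field values, and log format here are invented for illustration; the real weblog_gen.py uses Faker to produce richer data.

```python
import datetime
import random


def generate_log_sketch(lines, path='weblog.log'):
    """Write `lines` fake Apache-style access-log lines and return the file name.

    Hypothetical stand-in for weblog_gen.generate_log.
    """
    methods = ['GET', 'POST']
    pages = ['/home', '/cart', '/checkout']
    now = datetime.datetime.now(datetime.timezone.utc).strftime('%d/%b/%Y:%H:%M:%S')
    with open(path, 'w') as f:
        for _ in range(lines):
            # Random IPv4 address plus a random request line and response size
            ip = '.'.join(str(random.randint(1, 254)) for _ in range(4))
            f.write(f'{ip} - - [{now} +0000] "{random.choice(methods)} '
                    f'{random.choice(pages)} HTTP/1.1" 200 {random.randint(200, 5000)}\n')
    return path
```

Calling `generate_log_sketch(30)` mirrors how the DAG invokes the real module with `op_args=[30]`: it writes 30 lines and returns the file name, which is what gets pushed into XCom.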

ti.xcom_push() → The value for the key logFileName is pushed into XCom as a key-value pair using the taskInstance (ti) keyword argument

key='logFileName' → This holds the key name

value=logFile → This holds the value returned from the generate_log module (the name of the log file)

Let's move on to the second Task in our DAG.

# [START s3_upload_file function]
def s3_upload_file(**kwargs):
    ti = kwargs['ti']
    bucketName = kwargs['bucketName']
    fileName = ti.xcom_pull(task_ids='weblog', key='logFileName')
    objectName = os.path.basename(fileName)
    s3_client = boto3.client('s3')
    try:
        response = s3_client.upload_file(fileName, bucketName, objectName)
    except ClientError as e:
        logging.error(e)
        return False
    return True
# [END s3_upload_file function]

# [START s3 upload task]
s3_upload_log_file_task = PythonOperator(
    task_id='s3_upload_log_file',
    python_callable=s3_upload_file,
    op_kwargs={'bucketName': '<your-bucket-name>'},
)
# [END s3 upload task]

We're using keyword arguments in this Task since we need to pass the AWS S3 bucket name. Please provide the name of a bucket you created or that is available in your AWS account.

We call the s3_upload_file function in this Task and pass the required keyword arguments.

ti.xcom_pull() → Using the XCom taskInstance (ti) keyword argument, the value for the key logFileName is retrieved

task_ids='weblog' → The task id of the previous task (the one that pushed the value)

key='logFileName' → The key that holds the generated weblog file name
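Conceptually, XCom is a small per-task key-value store kept in Airflow's metadata database. The toy model below (FakeXCom and the file name are invented purely for illustration; real XComs are accessed through the task instance) shows the push/pull semantics the two tasks rely on:

```python
# Toy model of XCom semantics: values are stored per (task_id, key)
# and pulled back by naming the producing task and the key.
class FakeXCom:
    def __init__(self):
        self._store = {}

    def xcom_push(self, task_id, key, value):
        self._store[(task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self._store.get((task_ids, key))


xcom = FakeXCom()
# What create_weblog_task does:
xcom.xcom_push(task_id='weblog', key='logFileName', value='log_20220101.txt')
# What s3_upload_log_file_task does:
print(xcom.xcom_pull(task_ids='weblog', key='logFileName'))  # log_20220101.txt
```

This is why the second task must name both the producing task (task_ids='weblog') and the key (logFileName) to retrieve the file name.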

The next step is to establish the flow between the newly created Tasks.

create_weblog_task >> s3_upload_log_file_task

This completes the development of the DAG file for our use case, which includes a DAG object, Tasks, and Operators, as well as the orchestration of the tasks declared at the end.

6. Executing the DAG from the Airflow UI

To begin, log in to the Airflow UI with the credentials listed below.

Hyperlink → http://localhost:8080/
Username / Password → airflow

You should be able to see the DAG we created under the DAGs menu.

Let's run our DAG now by hitting the play button under Actions on the right-hand side. If you click on the DAG, you'll be able to see the DAG's and Tasks' progress via colour coding.

You can also see the logs at any time by selecting a Task, then Log.

Once the Log is open, click XCom to see the value of the key we passed from one Task to the next.

And that concludes our DAG's successful execution in Airflow.

7. Accessing the S3 bucket/objects using the AWS CLI

Let's take a quick peek at the AWS S3 bucket to see if the expected file/object has been generated. To do so, use the commands below to log in to the AWS CLI container and run the s3 command to display all objects.
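A sketch of the commands involved; the container ID and bucket name are placeholders for your own values:

```shell
# Log in to the AWS CLI container (find its ID with `docker ps`)
docker exec -ti <aws-cli-container-id> /bin/bash
# List the objects in the bucket
aws s3 ls s3://<your-bucket-name>/
```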

Here it is! The file was successfully uploaded to the S3 bucket. This marks the completion of our first DAG run in Airflow, as well as its successful validation.

GitHub link for the full code: https://github.com/narotam333/de-project-1
