Installing and using Apache Airflow on the Google Cloud Platform

In this article, we will see how to install Apache Airflow on a Google Compute Engine instance and how to use it to schedule periodic processing jobs that make use of the various GCP services.

 

Introduction

The Google Cloud Platform provides many great managed services on top of which we build our own services or data platforms. Whether we need a database (relational or not), a data warehouse, a data processing solution (based on Hadoop/Spark or on Apache Beam) or simply a place to store data, there is a managed service for that.

Unfortunately, when it comes to tying all these services together to create complex ETL / data processing pipelines, or simply to scheduling periodic executions of some work, the GCP currently doesn’t provide a managed service like those we can find on the other cloud platforms (see AWS Data Pipeline or Azure Data Factory).

We could obviously use a GCE instance, configure some Cron jobs and write Bash or Python scripts to achieve what we want but that’s clearly not an ideal solution. It would solve the scheduling issue but we wouldn’t be able to (simply) monitor the execution of the tasks or even restart the failed ones. It could be a solution for simple tasks but it simply doesn’t scale for complex pipelines.

In fact, the “official” way to schedule work execution on the GCP is to use the App Engine Cron service to trigger the execution of a deployed App Engine service. This service in turn delegates (directly, or indirectly through Pub/Sub) the actual execution of the task to another custom service deployed on a GCE instance. This is, in my opinion, quite complicated for mere task scheduling, and this solution doesn’t offer built-in monitoring or task management. However, as we implement the task execution as a custom service, the advantage is that we can code whatever we want, however complex it may be.

As we can see, these solutions have two main drawbacks: a lot of code has to be written to implement the actual task, and there is no built-in monitoring or task management. And neither of them can be compared to the data pipeline services offered by the other cloud providers.

This is where Airflow comes to the rescue. For a full description of the product, have a look at the documentation. In short, Airflow is a platform for scheduling and monitoring workflows. These workflows are declared in Python code as Directed Acyclic Graphs (DAGs) of tasks. The tasks represent the different steps that are executed and can be of very different natures: executing a bash command, calling an API endpoint via an HTTP request, sending an email, etc. The true power of Airflow when deployed on a GCP project is its integration with the GCP services via dedicated operators; it is for example possible to export/import data from/to BigQuery, create a Dataproc cluster and submit a Spark job, start a Dataflow job…
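To make the notion of a DAG of tasks more concrete, here is a minimal sketch in plain Python (no Airflow involved) that computes a valid execution order from task dependencies. The task names and the execution_order helper are hypothetical and only serve as an illustration; Airflow's scheduler is of course far more elaborate.

```python
from collections import deque


def execution_order(dependencies):
    """Return a valid execution order for {task: set of upstream tasks}.

    Assumes every upstream task also appears as a key of the dictionary.
    Raises ValueError if the graph contains a cycle (i.e. it is not a DAG).
    """
    # Number of upstream tasks that have not been executed yet, per task.
    remaining = {task: len(upstream) for task, upstream in dependencies.items()}
    # Reverse mapping: for each task, the tasks that depend on it.
    downstream = {task: [] for task in dependencies}
    for task, upstream in dependencies.items():
        for parent in upstream:
            downstream[parent].append(task)

    # Tasks without upstream dependencies can start right away.
    ready = deque(sorted(task for task, count in remaining.items() if count == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in downstream[task]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    if len(order) != len(dependencies):
        raise ValueError("cycle detected: the graph is not a DAG")
    return order


# Hypothetical pipeline: two independent steps, then a job, then a notification.
example_dependencies = {
    "export_data": set(),
    "create_cluster": set(),
    "submit_spark_job": {"export_data", "create_cluster"},
    "send_email": {"submit_spark_job"},
}

if __name__ == "__main__":
    print(execution_order(example_dependencies))
```

The "acyclic" part matters: if two tasks depended on each other, no valid order would exist, which is why the sketch raises an error in that case.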

Airflow also comes with a rich user interface for visualizing the structure and execution status of the workflows.

To summarize, Airflow is a fantastic tool that allows us to easily create, schedule and monitor complex pipelines, with great integration with the GCP services. Compared to the previous solutions (Cron on a GCE instance and the App Engine Cron service), it’s simply the best of both worlds: simple but powerful task scheduling. The only drawback of using Airflow on the GCP is that it is not (yet?) offered as a managed service, so we have to deploy it ourselves on a GCE instance, or on a cluster of instances for complex deployments with multiple workers (not covered here).

Database setup

Airflow uses a database to store the execution status and history of the DAGs. With the default configuration, a SQLite database is created on the file system of the GCE instance where Airflow is running. As we are on the GCP, we are going to use CloudSQL, which allows us to persist the database outside of the GCE instance running Airflow.

CloudSQL instance creation

To set up the database, go to the CloudSQL console and create a new MySQL instance named airflow-db.

Once the instance is created (it takes a few minutes), go to the instance configuration and create a new database named airflow.

Finally, create a user account named airflow-user that will be used by Airflow, with a password of your choice (the configuration shown later in this article uses airflow-pwd).

That’s it, we now have a fully managed database server up and running!

Introducing the CloudSQL proxy

Now that the database server is started and the database created, we have to make sure that Airflow is able to connect to it. Upon creation, CloudSQL instances are by default not reachable by anyone (see the Access Control / Authorization tab).

One solution would be to whitelist the public IP address of the GCE instance running Airflow in the CloudSQL instance configuration. But that’s not ideal: this address may change when the Airflow instance restarts, in which case Airflow would lose access to the database while whoever is assigned the old address would gain it…

The best solution is to use the CloudSQL proxy. It requires a little more configuration, but it is a much more secure and flexible option. Have a look at the documentation to understand what it provides and how it works. Once installed and configured on a GCE instance, the proxy handles the connection to the CloudSQL database with the provided credentials through the CloudSQL API, which does not require IP whitelisting.

The CloudSQL proxy can be configured in many ways, depending on which credentials you want to use, how you provide them, and whether you specify the CloudSQL instance explicitly or rely on automatic instance discovery. Refer to the documentation to select the appropriate configuration for your project.

In our case, we will keep it simple and use the credentials associated with the Compute Engine instance (not a dedicated service account as we would do for production environments) and explicit CloudSQL instance specification.

The purpose of this section was only to present the CloudSQL proxy; the actual installation and configuration will be performed in the next steps.

Creating a service account

We want to write some DAGs that use a few GCP services, so we have to make sure that the tasks executed by Airflow have the required permissions.

One solution would be to set the required access authorizations when creating the GCE instance on which we will install Airflow. This is not recommended for (at least) two reasons:

  • All services and DAGs executed on this instance will share the same access permissions, which then must be the union of all the required ones;
  • For security reasons, the API access authorizations of a compute instance must be set at creation and can’t be modified afterwards; so, if a new DAG needs access to a new service, we have to re-create the GCE instance.

The right way to do it is to create a dedicated service account with only the permissions required by our DAGs. The permissions of a service account can be modified at any time, which will be useful if we need to allow access to a new service for example. You can go even further and create multiple service accounts for fine-grained access control or usage statistics for your various DAGs, but we will keep it simple and use only one account.

To create the service account, go to IAM & Admin / Service Accounts and create a new account named airflow-service-account. Select the “BigQuery User”, “BigQuery Data Editor” and “Storage Object Admin” roles, and select the “Furnish a new private key” option.

Upon account creation, the private key is automatically downloaded to your computer. Keep it safe; we will use it later.

Installing Airflow on a GCE instance

Now that we have created the database and the service account, it’s time to create the Google Compute Engine instance and to install Airflow.

Create the instance

Simply create a new GCE instance based on the Debian 8 image with the configuration that suits your needs, name it whatever you want and in the Identity and API access section enable the Cloud SQL API.

Set up the CloudSQL proxy

Open an SSH session on the instance and execute the following commands to install and start the CloudSQL proxy:

# Download the proxy and make it executable.
sudo mkdir -p /opt/cloud_sql_proxy
cd /opt/cloud_sql_proxy
sudo wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O cloud_sql_proxy
sudo chmod +x /opt/cloud_sql_proxy/cloud_sql_proxy

# Start the CloudSQL proxy, specifying the database instance to connect to.
# Replace [INSTANCE_CONNECTION_NAME] (brackets included) with your actual CloudSQL instance connection name.
# It can be found in the instance properties on the GCP console.
# The proxy runs in the foreground, so run the next steps in a second SSH session.
./cloud_sql_proxy -instances=[INSTANCE_CONNECTION_NAME]=tcp:3306

You may want to configure your system so that the CloudSQL proxy is launched when the instance is started; see this chapter for instructions.
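Before going further, it can be handy to check that the proxy is actually listening on the local port. The following is a minimal sketch using only the Python standard library; the is_port_open helper is a hypothetical name, and the host and port simply mirror the tcp:3306 setting used above. It only verifies that something accepts TCP connections on that port, not that the database credentials are valid.

```python
import socket


def is_port_open(host="127.0.0.1", port=3306, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, unreachable host, etc.
        return False


if __name__ == "__main__":
    print("CloudSQL proxy reachable:", is_port_open())
```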

Install and configure Airflow

An issue prevents us from using the current release (1.8.1) of Airflow, so we have to install version 1.8.2rc1. Execute the following steps to install Airflow with all its dependencies:

# Install prerequisites.
sudo apt-get update && sudo apt-get install -y \
python3-pip \
python3-dev \
build-essential \
libssl-dev \
libffi-dev \
libmysqlclient-dev

# Upgrade pip.
sudo easy_install3 -U pip

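# Install the MySQL driver used by Airflow to connect to the metadata database.
sudo pip3 install mysqlclient

# Install Airflow with the extra package gcp_api containing the hooks and operators for the GCP services.
# The quotes prevent the shell from interpreting the square brackets.
sudo pip3 install 'apache-airflow[gcp_api]==1.8.2rc1'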

We now have to run Airflow a first time, so that it creates the configuration file that we will update. You can have a look at the Setting Configuration Options and Setting up a Backend sections of the documentation to understand the options we will modify.

# Create AIRFLOW_HOME directory.
export AIRFLOW_HOME=/airflow
sudo mkdir $AIRFLOW_HOME
sudo chmod 777 $AIRFLOW_HOME
cd $AIRFLOW_HOME

# Run Airflow a first time to create the airflow.cfg configuration file and edit it.
airflow version
vim $AIRFLOW_HOME/airflow.cfg

# Update the following properties (replace airflow-pwd with the password of the airflow-user account):
#     executor = LocalExecutor
#     sql_alchemy_conn = mysql://airflow-user:airflow-pwd@127.0.0.1:3306/airflow
#     load_examples = False

# Run Airflow again to initialize the metadata database.
airflow initdb
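As an alternative to editing airflow.cfg by hand, the Setting Configuration Options section of the documentation describes overriding options through environment variables named AIRFLOW__{SECTION}__{KEY}. The sketch below builds such overrides for the three properties above, assuming they live in the [core] section as they do in this version. The airflow_env_overrides helper is a hypothetical name, and URL-encoding the password is a precaution for passwords containing characters such as @ or /.

```python
from urllib.parse import quote_plus


def airflow_env_overrides(user, password, host="127.0.0.1", port=3306, database="airflow"):
    """Build environment variables mirroring the airflow.cfg properties above."""
    # Special characters in the credentials must be URL-encoded in the connection URI.
    conn = "mysql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, database)
    return {
        "AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
        "AIRFLOW__CORE__SQL_ALCHEMY_CONN": conn,
        "AIRFLOW__CORE__LOAD_EXAMPLES": "False",
    }


if __name__ == "__main__":
    # Print "export" lines that can be pasted into a shell profile.
    for name, value in airflow_env_overrides("airflow-user", "airflow-pwd").items():
        print("export {}='{}'".format(name, value))
```

Whether you prefer the configuration file or environment variables is mostly a matter of taste; the variables are convenient when the same image is deployed on several instances.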

Now that Airflow is installed, you may want to configure your system so that it runs as a managed daemon, automatically launched when the instance is started. Have a look at the Integration with systemd and Integration with upstart sections of the documentation to do this.

Finally, it’s time to launch the Airflow web server. Make sure that port 8080 is accessible from the Internet (Networking/Firewall rules), or change it to something else in the following command line:

# Start the web server.
airflow webserver -p 8080

In your preferred browser, go to http://[INSTANCE_PUBLIC_IP]:8080 and you should see the Airflow UI.

Create the GCP connection

We will now create the connection in Airflow that will be used to access the GCP services. For that, we will use the service account that we created in a previous step. First, copy the service account’s private key somewhere on the Airflow instance. Then, in the Admin menu of the Airflow UI, select the Connections option and click Create.

Enter the following values:

  • Connection ID: _my_gcp_conn_ (will be used in the DAGs)
  • Connection type: Google Cloud Platform
  • Project ID: the unique identifier of your GCP project
  • Keyfile path: path to the service account’s private key
  • Scopes: https://www.googleapis.com/auth/cloud-platform
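A mistyped path or a wrong file in the keyfile field only shows up later, when a task fails. As a quick sanity check, the following sketch inspects the key file before you reference it in the connection. It assumes the key was downloaded in JSON format; the validate_key and load_key helpers are hypothetical names, and the checked fields are the ones commonly present in a JSON service account key.

```python
import json

# Fields expected in a JSON service account key.
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")


def validate_key(key):
    """Return a list of problems found in a parsed service account key (empty if none)."""
    problems = ["missing field: " + name for name in REQUIRED_FIELDS if not key.get(name)]
    if key.get("type") and key["type"] != "service_account":
        problems.append("unexpected type: " + str(key["type"]))
    return problems


def load_key(path):
    """Read and parse the JSON key file located at the given path."""
    with open(path, encoding="utf-8") as key_file:
        return json.load(key_file)


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        # Usage: python3 check_key.py /path/to/keyfile.json
        print(validate_key(load_key(sys.argv[1])) or "key file looks fine")
```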

Creating your first DAG

Now that we have a running Airflow instance with the appropriate configuration, it’s time to write our first DAG. And guess what, our first example will be a hello world. :)

The first thing to know is that the DAG definition files are plain Python scripts that must reside in the $AIRFLOW_HOME/dags directory. Create this directory and put a new file named _hello_world_dag.py_ with the following content:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 7, 17),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

def format_hello(**kwargs):
    return 'Hello from Python !! Current execution time is ' + kwargs['execution_date'].strftime('%Y-%m-%d')

with DAG('hello-world-dag', schedule_interval=timedelta(minutes=5), catchup=False, default_args=default_args) as dag:

    # Define the task that prints hello with the bash operator.
    t1 = BashOperator(
        task_id='hello_from_bash',
        bash_command='echo Hello world from Bash !!')

    # Define a task that does nothing.
    t2 = DummyOperator(task_id='noop')

    # Define the task that prints hello using Python code.
    t3 = PythonOperator(task_id='hello_from_python', python_callable=format_hello, provide_context=True)

    # Define the DAG structure.
    t1 >> t2 >> t3

As you can see, the DAG definition is achieved with really simple Python code. In this example we create a DAG named hello-world-dag, scheduled to be executed every 5 minutes and composed of three tasks executed in the following order:

  1. The _hello_from_bash_ task prints some text using the echo command;
  2. The _noop_ task does nothing;
  3. The _hello_from_python_ task returns some text built with regular Python code, which Airflow writes to the task log.

The DAG structure (i.e. the relationships between the tasks) is declared with the following statement: t1 >> t2 >> t3. The >> operator means that the task on the left side is executed before the one on the right side (there is also a << operator for the opposite dependency relation).
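If you wonder how a statement like t1 >> t2 >> t3 can declare dependencies, it relies on Python operator overloading. The sketch below shows the idea with a toy Task class; it is a hypothetical stand-in written for this illustration, not Airflow's actual implementation.

```python
class Task:
    """Toy stand-in for an Airflow operator, used only to illustrate >> and <<."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []    # tasks that must run before this one
        self.downstream = []  # tasks that must run after this one

    def __rshift__(self, other):
        """self >> other: self is executed before other."""
        self.downstream.append(other)
        other.upstream.append(self)
        # Returning the right-hand side is what makes chaining possible.
        return other

    def __lshift__(self, other):
        """self << other: other is executed before self."""
        other >> self
        return other


t1 = Task("hello_from_bash")
t2 = Task("noop")
t3 = Task("hello_from_python")

# Evaluated as (t1 >> t2) >> t3, i.e. t1 before t2, then t2 before t3.
t1 >> t2 >> t3

if __name__ == "__main__":
    for task in (t1, t2, t3):
        print(task.task_id, "->", [child.task_id for child in task.downstream])
```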

Now that the DAG definition file is created, the first thing to do is to check that it is valid Python code and that the Airflow environment is correctly installed. For that, execute the following command from the $AIRFLOW_HOME/dags directory:

python3 hello_world_dag.py

If no error is raised, then the file has successfully been parsed by the Python interpreter.
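When the dags directory grows, running each file by hand becomes tedious. Here is a minimal sketch that checks the syntax of every Python file in a directory using the built-in compile function. The syntax_error and find_syntax_errors helpers are hypothetical names. Note that this is a weaker check than the command above: it only catches syntax errors and does not verify that the Airflow imports resolve.

```python
from pathlib import Path


def syntax_error(source, filename="<dag>"):
    """Return None if the source compiles, else a short 'line N: message' string."""
    try:
        compile(source, filename, "exec")
    except SyntaxError as exc:
        return "line {}: {}".format(exc.lineno, exc.msg)
    return None


def find_syntax_errors(dags_dir):
    """Return {file path: error} for every .py file under dags_dir that fails to compile."""
    errors = {}
    for path in sorted(Path(dags_dir).rglob("*.py")):
        error = syntax_error(path.read_text(encoding="utf-8"), str(path))
        if error is not None:
            errors[str(path)] = error
    return errors


if __name__ == "__main__":
    import os
    # Falls back to the AIRFLOW_HOME value used in this article.
    dags_dir = os.path.join(os.environ.get("AIRFLOW_HOME", "/airflow"), "dags")
    print(find_syntax_errors(dags_dir) or "no syntax error found")
```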

As suggested in the Airflow tutorial, you can run the following commands to validate this script further:

# Print the list of active DAGs.
airflow list_dags

# Print the list of tasks of the "hello-world-dag" DAG.
airflow list_tasks hello-world-dag

# Print the hierarchy of tasks in the "hello-world-dag" DAG.
airflow list_tasks hello-world-dag --tree

We can also test the tasks defined in the DAG and check their output. The DAG state will not be impacted and nothing will be written to the database:

# Test the “hello_from_bash” task.
airflow test hello-world-dag hello_from_bash 2017-07-17

# Test the “hello_from_python” task.
airflow test hello-world-dag hello_from_python 2017-07-17

Go to the Airflow UI and reload it. You should see your DAG appear in the DAG list.

Click on it and explore the different views of the DAG structure.

Now that we have checked that the different tasks of the DAG work as expected, it’s time to start the Airflow scheduler and to activate the DAG:

# Start the Airflow scheduler.
airflow scheduler

To activate the DAG, go back to the Airflow UI and click the switch that changes its status to “un-paused”.

Airflow will immediately run your DAG for some execution dates in the past and from now on will start executing it every 5 minutes as we specified in the DAG configuration. You can periodically reload the Airflow UI and monitor the executions (DAG runs).

That’s it: with only a few lines of Python, we have created a simple DAG that is scheduled to be executed periodically!

Creating a DAG using the GCP services

Now that we have seen how to create a simple DAG, it’s time to write a more complex one that puts the advertised integration with the GCP services into action.

In this example we will use the NOAA Global Surface Summary of the Day Weather Data public dataset. Let’s say we want to build a pipeline executed every day that extracts and processes the wind gust speed above a certain threshold measured on the previous day. We will write a new DAG that implements the first two steps of this data processing pipeline:

  1. Running a query to extract the measures of the previous day and materialize the result set on a new BigQuery table;
  2. Exporting the content of this table to a new directory in Cloud Storage.

First, we have to create the BigQuery dataset that will contain the tables created in step 1. Go to BigQuery and create a dataset named _wind_gust_speeds_. The data location must be US, as the NOAA dataset is itself located in the US region.

Then, we have to create the GCS bucket to which the table data will be exported in step 2. Go to Cloud Storage and create a new bucket (as bucket names must be globally unique on the GCP, it’s up to you to choose one).

Step 1 consists of executing a SQL query whose result set is written to a BigQuery table. Create a new file named $AIRFLOW_HOME/dags/gcp_dag/query_template.sql with the following content:

WITH
  gust_measures AS (
    SELECT
      stn,
      year,
      mo,
      da,
      ROUND((gust * 1.852), 2) AS gust_speed_kph
    FROM
      `bigquery-public-data.noaa_gsod.gsod{{ macros.ds_format(yesterday_ds_nodash, "%Y%m%d", "%Y") }}`
    WHERE
      stn != '999999'
      AND year = '{{ macros.ds_format(yesterday_ds_nodash, "%Y%m%d", "%Y") }}'
      AND mo = '{{ macros.ds_format(yesterday_ds_nodash, "%Y%m%d", "%m") }}'
      AND da = '{{ macros.ds_format(yesterday_ds_nodash, "%Y%m%d", "%d") }}'
      AND gust > (50 / 1.852) -- gust speed > 50 km/h
      AND gust < 999.9
  ),
  stations AS (
    SELECT
      usaf AS stn,
      country,
      state,
      name,
      lat,
      lon,
      CASE WHEN elev = '' THEN NULL ELSE CAST(elev AS FLOAT64) END AS elev
    FROM
      `bigquery-public-data.noaa_gsod.stations`
  )

SELECT
  *
FROM
  gust_measures
INNER JOIN
  stations USING (stn)

This is a BigQuery standard SQL query using Jinja2 templates and macros that will be replaced with their actual values when the DAG is executed. Have a look at the documentation to see how it works in Airflow and how to extend it with your own variables.

Here, the templating is used to:

  1. Format the name of the table on which the query is executed (gsod2017 for example);
  2. Select only the measures for the previous day.
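To get a feel for what the template expressions evaluate to, the sketch below reproduces them in plain Python for a given execution date. The ds_format function mimics what the macros.ds_format macro does (parse a date string with one format, write it with another), and render_query_dates is a hypothetical helper written for this illustration; the real rendering is done by Jinja2 inside Airflow.

```python
from datetime import datetime, timedelta


def ds_format(ds, input_format, output_format):
    """Re-format a date string, in the spirit of the macros.ds_format macro."""
    return datetime.strptime(ds, input_format).strftime(output_format)


def render_query_dates(execution_date):
    """Return the values injected in the query for a given execution date."""
    # Equivalent of the yesterday_ds_nodash template variable.
    yesterday_ds_nodash = (execution_date - timedelta(days=1)).strftime("%Y%m%d")
    year = ds_format(yesterday_ds_nodash, "%Y%m%d", "%Y")
    return {
        "table": "bigquery-public-data.noaa_gsod.gsod" + year,
        "year": year,
        "mo": ds_format(yesterday_ds_nodash, "%Y%m%d", "%m"),
        "da": ds_format(yesterday_ds_nodash, "%Y%m%d", "%d"),
    }


if __name__ == "__main__":
    print(render_query_dates(datetime(2017, 6, 10)))
```

Note the effect on January 1st: the previous day belongs to the previous year, so the query reads from the previous year's table, which is exactly why the table name is derived from yesterday's date rather than from the execution date.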

And here is the Python code of the DAG ($AIRFLOW_HOME/dags/gcp_dag.py):

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 7, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# TODO: Replace with your project ID.
PROJECT_ID = 'YOUR_PROJECT_ID'
CONNECTION_ID = 'my_gcp_conn'
BQ_DATASET_NAME = 'wind_gust_speeds'
BQ_TABLE_NAME_PREFIX = 'measures_'
# TODO: Replace with your bucket ID.
GCS_BUCKET_ID = 'YOUR_BUCKET_ID'

with DAG('gcp_dag', schedule_interval=timedelta(days=1), default_args=default_args) as dag:

    # Format table name template.
    table_name_template = PROJECT_ID + '.' + BQ_DATASET_NAME + '.' + BQ_TABLE_NAME_PREFIX + '{{ ds_nodash }}'

    # Format GCS export URI template.
    gcs_export_uri_template = 'gs://' + GCS_BUCKET_ID + '/daily_exports/{{ ds_nodash }}/part-*.gz'

    # 1) Compute data.
    bq_compute_yesterday_data = BigQueryOperator(
        task_id = 'bq_compute_yesterday_data',
        bql = 'gcp_dag/query_template.sql',
        destination_dataset_table = table_name_template,
        write_disposition = 'WRITE_TRUNCATE',
        bigquery_conn_id = CONNECTION_ID,
        use_legacy_sql = False
    )

    # 2) Export to Storage.
    bq2gcs_export_to_storage = BigQueryToCloudStorageOperator(
        task_id = 'export_to_storage',
        source_project_dataset_table = table_name_template,
        destination_cloud_storage_uris = [
            gcs_export_uri_template
        ],
        export_format = 'NEWLINE_DELIMITED_JSON',
        compression = 'GZIP',
        bigquery_conn_id = CONNECTION_ID,
    )

    # 3) Run data processing job.
    # TODO

    # Define the DAG structure.
    bq_compute_yesterday_data >> bq2gcs_export_to_storage

The code is again pretty straightforward. First, we define a set of constants in which you will have to replace the values of PROJECT_ID and GCS_BUCKET_ID with their actual values for your environment.

We then create a DAG named _gcp_dag_, scheduled to be executed every day and composed of two tasks executed in the following order:

  1. The _bq_compute_yesterday_data_ task uses the BigQueryOperator operator to run the query from the _gcp_dag/query_template.sql_ template and write the result to a new table every day;
  2. The _export_to_storage_ task uses the BigQueryToCloudStorageOperator operator to export the content of the table to a new directory in Cloud Storage every day.

As you can see, both tasks make use of the Airflow connection _my_gcp_conn_ that we created in a previous step, which allows them to use the GCP services with the service account credentials.
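The "new table every day" and "new directory every day" behaviour comes from the {{ ds_nodash }} placeholder in the two templates. The sketch below shows what they expand to for a given execution date, using a plain string replacement as a crude stand-in for Jinja2. The render helper is a hypothetical name, and the constants are the placeholders from the DAG above.

```python
from datetime import datetime

PROJECT_ID = "YOUR_PROJECT_ID"
BQ_DATASET_NAME = "wind_gust_speeds"
BQ_TABLE_NAME_PREFIX = "measures_"
GCS_BUCKET_ID = "YOUR_BUCKET_ID"

# Same templates as in the DAG definition.
table_name_template = PROJECT_ID + "." + BQ_DATASET_NAME + "." + BQ_TABLE_NAME_PREFIX + "{{ ds_nodash }}"
gcs_export_uri_template = "gs://" + GCS_BUCKET_ID + "/daily_exports/{{ ds_nodash }}/part-*.gz"


def render(template, execution_date):
    """Crude stand-in for Jinja2: substitute only the {{ ds_nodash }} placeholder."""
    return template.replace("{{ ds_nodash }}", execution_date.strftime("%Y%m%d"))


if __name__ == "__main__":
    for day in (datetime(2017, 6, 10), datetime(2017, 6, 11)):
        print(render(table_name_template, day))
        print(render(gcs_export_uri_template, day))
```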

You can test the _bq_compute_yesterday_data_ task for a particular date to check that the query is correctly performed and that a new table is created:

# Test the “bq_compute_yesterday_data” task on the date 2017-06-10 and check that the data for the previous day (2017-06-09) are correctly computed.
airflow test gcp_dag bq_compute_yesterday_data 2017-06-10

During the execution of the task you can see in the logs the actual SQL query formatted from the template.

In the same way, you can test the _export_to_storage_ task and check that the table data is correctly exported to the bucket:

# Test the “export_to_storage” task on the date 2017-06-10 and check that the data for the previous day (2017-06-09) are correctly exported to Cloud Storage.
airflow test gcp_dag export_to_storage 2017-06-10

Finally, go back to the Airflow UI and click the switch to activate the DAG.

In the Python code we specified a start date of 2017-07-01. As soon as we activate the DAG, Airflow will execute it for all the days between this start date and the current day minus one. This process is called catchup. You can now check that the corresponding tables have been created in BigQuery and that their content has been exported to your bucket.
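The "current day minus one" rule can look surprising at first. The sketch below is a simplified model of it, assuming that a run for a given execution date is only triggered once its schedule interval has fully elapsed. The catchup_execution_dates helper is a hypothetical name and does not reflect the actual scheduler code.

```python
from datetime import datetime, timedelta


def catchup_execution_dates(start_date, now, interval=timedelta(days=1)):
    """List the execution dates whose schedule interval has fully elapsed at `now`."""
    dates = []
    current = start_date
    # A run covers [current, current + interval) and starts once that period is over.
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


if __name__ == "__main__":
    # Activating the DAG on 2017-07-05 at noon with a start date of 2017-07-01.
    for date in catchup_execution_dates(datetime(2017, 7, 1), datetime(2017, 7, 5, 12, 0)):
        print(date.strftime("%Y-%m-%d"))
```

In this example the last execution date is 2017-07-04: the run for 2017-07-05 will only start once that day is over.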

You’re now free to extend the pipeline and to add some actual data processing tasks. You may, for example, want to perform some computation on the files stored in Cloud Storage with a Dataflow job, or with a Spark job executed on a Dataproc cluster created on the fly, and then import the results back into a new BigQuery table.

Conclusion

The Google Cloud Platform offers great managed services but clearly lacks one for building complex ETL / data processing pipelines on top of them. Airflow fills this void with a set of operators that leverage the managed services and unleash the full power of automation on the GCP.

We can build our DAGs using the existing operators and sensors, but Airflow is also an open platform that we can extend, if needed, by writing our own custom components.

Let’s hope that Google will someday offer it as a fully integrated and managed service!
