An Introduction to Cloud Composer

Workflows are a common theme in data analytics – they involve ingesting, transforming, and analyzing data to find the meaningful information within. In Google Cloud Platform (GCP), the tool for hosting workflows is Cloud Composer, a hosted version of the popular open source workflow tool Apache Airflow.

Setup and requirements

Log in to your Cloud Console.

Activate Google Cloud Shell

Google Cloud Shell is a virtual machine that is loaded with development tools. It offers a persistent 5GB home directory and runs on the Google Cloud. Google Cloud Shell provides command-line access to your GCP resources.

You can list the active account name with this command:

gcloud auth list

You can list the project ID with this command:

gcloud config list project

Create Cloud Composer environment

In this section, you create a Cloud Composer environment.

  1. Go to Navigation menu > Composer.
  2. Click CREATE ENVIRONMENT and set the following for your environment:

     Property      Value
     Name          highcpu
     Location      us-central1
     Zone          us-central1-a
     Machine type  n1-highcpu-4

     Leave all other settings as default.

  3. Click Create.

The environment creation process is complete when a green checkmark appears to the left of the environment name on the Environments page in the GCP Console.

It can take 10-15 minutes for the environment to complete the setup process. Continue with the lab while the environment spins up.

Create a Cloud Storage bucket

Create a Cloud Storage bucket in your project. This bucket will store the output of the Hadoop job from Dataproc.

  1. Go to Navigation menu > Storage > Browser and then click Create bucket.
  2. Give your bucket a globally unique name, then click Create.

Remember the Cloud Storage bucket name as you’ll use it as an Airflow variable later.

Airflow and core concepts

While waiting for your Composer environment to get created, review some terms that are used with Airflow.

Airflow is a platform to programmatically author, schedule and monitor workflows.

Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

Core concepts

DAG

A Directed Acyclic Graph is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Operator

The description of a single task; it is usually atomic. For example, the BashOperator is used to execute a bash command.

Task

A parameterised instance of an Operator; a node in the DAG.

Task Instance

A specific run of a task, characterized by a DAG, a Task, and a point in time. It has an indicative state: running, success, failed, skipped, …
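To make the idea of "following the specified dependencies" concrete, here is a minimal sketch that uses only the Python standard library, not Airflow itself. The task names are hypothetical, and the real Airflow scheduler does far more (retries, workers, state tracking); the sketch only shows how a dependency graph determines a valid run order.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task to the set of tasks it depends on (its upstream tasks).
# These task names are hypothetical and only mirror the vocabulary above.
dependencies = {
    "create_cluster": set(),
    "run_job": {"create_cluster"},
    "delete_cluster": {"run_job"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)  # ['create_cluster', 'run_job', 'delete_cluster']
```

Because the graph must be acyclic, a dependency loop such as "a needs b, b needs a" is rejected instead of producing an order.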

Defining the workflow

Now let’s discuss the workflow you’ll be using. Cloud Composer workflows are composed of DAGs (Directed Acyclic Graphs). DAGs are defined in standard Python files that are placed in Airflow’s DAG_FOLDER. Airflow executes the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks. In general, each one should correspond to a single logical workflow.

Below is the hadoop_tutorial.py workflow code, also referred to as the DAG:

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket used as output for the Hadoop jobs
  from Dataproc. See https://cloud.google.com/storage/docs/creating-buckets
  for creating a bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

To orchestrate the three workflow tasks, the DAG imports the following operators:

  1. DataprocClusterCreateOperator: Creates a Cloud Dataproc cluster.
  2. DataProcHadoopOperator: Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.
  3. DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges.
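The delete task in the listing sets trigger_rule to ALL_DONE so that the cluster is removed even when the Hadoop job fails. The following is a simplified model of that behaviour in plain Python, not Airflow's implementation; the function name and the state strings are assumptions chosen for illustration.

```python
# States in which an upstream task is considered finished (simplified model).
FINISHED_STATES = {"success", "failed", "skipped"}


def should_run(trigger_rule, upstream_states):
    """Decide whether a task may start, given its upstream task states."""
    if trigger_rule == "all_success":
        # Default rule: every upstream task must have succeeded.
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        # Run once every upstream task has finished, whatever the outcome.
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


# The Hadoop job failed: the default rule would skip the cleanup,
# while all_done still lets the delete task run.
print(should_run("all_success", ["failed"]))  # False
print(should_run("all_done", ["failed"]))     # True
```

With the default rule, a failed job would leave the cluster running and billing; all_done trades that risk for a cleanup task that always executes once the job has finished.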

The tasks run sequentially, which you can see in this section of the file:

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

The name of the DAG is composer_sample_quickstart, and the DAG runs once each day.

with models.DAG(
        'composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Because the start_date that is passed in to default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG uploads.
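Why "yesterday" leads to an immediate run can be sketched with plain datetime arithmetic. This assumes the usual Airflow behaviour that a run is triggered once its schedule interval has fully elapsed; the helper names below are hypothetical and the code does not call Airflow.

```python
import datetime


def midnight_yesterday(now):
    """Midnight at the start of the day before `now`, as in the DAG file."""
    return datetime.datetime.combine(
        now.date() - datetime.timedelta(days=1), datetime.time.min)


def first_run_is_due(start_date, interval, now):
    """Assumed rule: the first run is due once start_date + interval has passed."""
    return start_date + interval <= now


now = datetime.datetime(2020, 5, 1, 9, 30)  # fixed example timestamp
start = midnight_yesterday(now)
one_day = datetime.timedelta(days=1)

print(start)                                   # 2020-04-30 00:00:00
print(first_run_is_due(start, one_day, now))   # True
```

Had start_date been set to the current moment instead, the first daily interval would not close for another 24 hours, so under this assumption nothing would run right after the upload.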

Viewing environment information

  1. Go back to Composer to check on the status of your environment.
  2. Once your environment has been created, click the name of the environment (highcpu) to see its details.

On the Environment details page you’ll see information such as the Airflow web interface URL, Kubernetes Engine cluster ID, and a link to the DAGs folder, which is stored in your bucket.

Note: Cloud Composer only schedules the workflows in the /dags folder.

Using the Airflow UI

To access the Airflow web interface using the GCP Console:

  1. Go back to the Environments page.
  2. In the Airflow webserver column for the environment, click Airflow.
  3. Click on your lab credentials.
  4. The Airflow web interface opens in a new browser window.

Setting Airflow variables

Airflow variables are an Airflow-specific concept that is distinct from environment variables.

  1. Select Admin > Variables from the Airflow menu bar, then Create.
  2. Create the following Airflow variables: gcp_project, gcs_bucket, and gce_zone.

     KEY | VALUE | Details
     gcp_project | <your project-id> | The Google Cloud Platform project you’re using for this quickstart.
     gcs_bucket | gs://<my-bucket> | Replace <my-bucket> with the name of the Cloud Storage bucket you made earlier. This bucket stores the output from the Hadoop jobs from Dataproc.
     gce_zone | us-central1-a | This is the Compute Engine zone where your Cloud Dataproc cluster will be created. To choose a different zone, see Available regions & zones.

Click Save and Add Another after adding each variable. When you’re finished, the Variables table should list all three keys.
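The gcs_bucket value matters because the DAG joins it with a folder name and a timestamp to form the job's output location. Here is a small sketch of that path construction. It uses posixpath instead of the os.path and os.sep calls in the DAG file, an assumption made so the result is the same on any operating system; the function name and the bucket name are hypothetical.

```python
import datetime
import posixpath


def wordcount_output_path(gcs_bucket, now):
    """Build the output location the way the DAG does: bucket/wordcount/timestamp/."""
    stamp = now.strftime('%Y%m%d-%H%M%S')
    return posixpath.join(gcs_bucket, 'wordcount', stamp) + '/'


path = wordcount_output_path(
    'gs://my-bucket', datetime.datetime(2020, 5, 1, 9, 30, 0))
print(path)  # gs://my-bucket/wordcount/20200501-093000/
```

This is why the variable value must include the gs:// prefix: the DAG passes the joined string directly to the Hadoop job as its output argument.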

Uploading the DAG to Cloud Storage

To upload the DAG:

  1. In Cloud Shell, clone the repository that contains hadoop_tutorial.py onto your Cloud Shell virtual machine:

     git clone https://github.com/GoogleCloudPlatform/python-docs-samples

  2. Change to the directory that contains the workflow samples:

     cd python-docs-samples/composer/workflows

  3. Upload a copy of hadoop_tutorial.py to the Cloud Storage bucket that was created automatically with the environment. To find it, go to Composer > Environments and click the environment you created earlier to open its details. Find DAGs folder, copy its value, and use it in place of <DAGs_folder_path> in the following command:

     gsutil cp hadoop_tutorial.py <DAGs_folder_path>

Cloud Composer adds the DAG to Airflow and schedules it automatically. DAG changes take effect within 3-5 minutes. The workflow is now referred to by its DAG name, composer_sample_quickstart.

You will be able to see the task status in the Airflow web interface.

Exploring DAG runs

When you upload your DAG file to the dags folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately.
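Since a file with errors never shows up in the DAG listing, it can save a few minutes to check for syntax errors locally before uploading. The sketch below uses only the standard library; the function name is hypothetical, and the check catches syntax errors only, not missing imports, missing Airflow variables, or other problems that appear when Airflow actually executes the file.

```python
import ast


def find_syntax_error(source, filename='<dag>'):
    """Return a short error description, or None if the source parses."""
    try:
        ast.parse(source, filename=filename)
    except SyntaxError as exc:
        return f"{filename}:{exc.lineno}: {exc.msg}"
    return None


good_source = "x = 1\n"
bad_source = "with open('f') as\n"  # incomplete statement

print(find_syntax_error(good_source))             # None
print(find_syntax_error(bad_source) is not None)  # True
```

To check the real file, read hadoop_tutorial.py into a string and pass it to the function before running the gsutil cp command.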

Make sure that you’re on the DAGs tab in the Airflow web interface. It takes several minutes for this process to complete. Refresh your browser to make sure you’re looking at the latest information.

  1. In Airflow, click composer_hadoop_tutorial to open the DAG details page. This page includes a graphical representation of the workflow tasks and dependencies.
  2. In the toolbar, click Graph View. Hover over the graphic for each task to see its status. Note that the border around each task also indicates the status (green border = running; red = failed, etc.).
  3. Click the “Refresh” link to make sure you’re looking at the most recent information. The borders of the tasks change color as their state changes.

Once your process reaches the Success state, run the workflow again from the Graph View:

  1. Click the create_dataproc_cluster graphic.
  2. Click Clear to reset the three tasks.
  3. Then click OK to confirm.

Notice that the color around create_dataproc_cluster has changed and the state is “running”.

You can also monitor the process in the GCP Console.

  1. Once the status for create_dataproc_cluster has changed to “running”, go to Navigation menu > Dataproc, then click on:
     • Clusters to monitor cluster creation and deletion. The cluster created by the workflow is ephemeral: it only exists for the duration of the workflow and is deleted as part of the last workflow task.
     • Jobs to monitor the Apache Hadoop wordcount job. Click the Job ID to see job log output.
  2. Once Dataproc gets to a state of “Running”, return to Airflow and click Refresh to see that the cluster is complete.

When the run_dataproc_hadoop process is complete, go to Navigation menu > Storage > Browser and click on the name of your bucket to see the results of the wordcount in the wordcount folder.
