Simplify AWS Glue job orchestration and monitoring with Amazon MWAA

Organizations across all industries have complex data processing requirements for their analytical use cases across different analytics systems, such as data lakes on AWS, data warehouses (Amazon Redshift), search (Amazon OpenSearch Service), NoSQL (Amazon DynamoDB), machine learning (Amazon SageMaker), and more. Analytics professionals are tasked with deriving value from data stored in these distributed systems to create better, secure, and cost-optimized experiences for their customers. For example, digital media companies seek to combine and process datasets in internal and external databases to build unified views of their customer profiles, spur ideas for innovative features, and increase platform engagement.

In these scenarios, customers looking for a serverless data integration offering use AWS Glue as a core component for processing and cataloging data. AWS Glue is well integrated with AWS services and partner products, and provides low-code/no-code extract, transform, and load (ETL) options to enable analytics, machine learning (ML), or application development workflows. AWS Glue ETL jobs may be one component in a more complex pipeline. Orchestrating the run of and managing dependencies between these components is a key capability in a data strategy. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) orchestrates data pipelines using distributed technologies including on-premises resources, AWS services, and third-party components.

In this post, we show how to simplify monitoring an AWS Glue job orchestrated by Airflow using the latest features of Amazon MWAA.

Overview of solution

This post discusses the following:

  • How to upgrade an Amazon MWAA environment to version 2.4.3.
  • How to orchestrate an AWS Glue job from an Airflow Directed Acyclic Graph (DAG).
  • The Airflow Amazon provider package’s observability enhancements in Amazon MWAA. You can now consolidate run logs of AWS Glue jobs on the Airflow console to simplify troubleshooting data pipelines. The Amazon MWAA console becomes a single reference to monitor and analyze AWS Glue job runs. Previously, support teams needed to access the AWS Management Console and take manual steps for this visibility. This feature is available by default from Amazon MWAA version 2.4.3.

The following diagram illustrates our solution architecture.


You need the following prerequisites:

Set up the Amazon MWAA environment

For instructions on creating your environment, refer to Create an Amazon MWAA environment. For existing users, we recommend upgrading to version 2.4.3 to take advantage of the observability enhancements featured in this post.

The steps to upgrade Amazon MWAA to version 2.4.3 differ depending on whether the current version is 1.10.12 or 2.2.2. We discuss both options in this post.

Prerequisites for setting up an Amazon MWAA environment

You must meet the following prerequisites:

Upgrade from version 1.10.12 to 2.4.3

If you’re using Amazon MWAA version 1.10.12, refer to Migrating to a new Amazon MWAA environment to upgrade to 2.4.3.

Upgrade from version 2.0.2 or 2.2.2 to 2.4.3

If you’re using Amazon MWAA environment version 2.2.2 or lower, complete the following steps:

  1. Create a requirements.txt for any custom dependencies with specific versions required for your DAGs.
  2. Upload the file to Amazon S3 in the appropriate location where the Amazon MWAA environment points to the requirements.txt for installing dependencies.
  3. Follow the steps in Migrating to a new Amazon MWAA environment and select version 2.4.3.
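
As a minimal sketch of step 1, the following Python snippet renders a requirements.txt body with exact version pins and rejects entries that are not pinned. The function name and the package versions shown are illustrative assumptions, not recommendations from this post; substitute the dependencies your DAGs actually need.

```python
import re

# Matches "package==1.2.3" style lines (exact pins only).
PINNED = re.compile(r"^[A-Za-z0-9_.\-\[\]]+==[A-Za-z0-9_.\-+!]+$")


def render_requirements(pins: dict) -> str:
    """Render a requirements.txt body with one exact pin per line."""
    lines = [f"{name}=={version}" for name, version in sorted(pins.items())]
    for line in lines:
        if not PINNED.match(line):
            raise ValueError(f"Not an exact pin: {line!r}")
    return "\n".join(lines) + "\n"


# Hypothetical example pins; replace with your own dependencies.
example = render_requirements({"pandas": "1.5.3", "awswrangler": "2.19.0"})
print(example)
```

Pinning exact versions keeps the new environment's dependency set reproducible, so a failure after the upgrade can be traced to the Airflow version change rather than to a drifting library.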

Update your DAGs

Customers who upgraded from an older Amazon MWAA environment may need to make updates to existing DAGs. In Airflow version 2.4.3, the Airflow environment uses the Amazon provider package version 6.0.0 by default. This package may include some potentially breaking changes, such as changes to operator names. For example, the AWSGlueJobOperator has been deprecated and replaced with the GlueJobOperator. To maintain compatibility, update your Airflow DAGs by replacing any deprecated or unsupported operators from previous versions with the new ones. Complete the following steps:

  1. Navigate to Amazon AWS Operators.
  2. Select the appropriate version installed in your Amazon MWAA instance (6.0.0 by default) to find a list of supported Airflow operators.
  3. Make the necessary changes in the existing DAG code and upload the modified files to the DAG location in Amazon S3.
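
To help locate DAG code that needs updating, a small scan like the following can flag deprecated operator names. This is a sketch under stated assumptions: the mapping contains only the one rename mentioned in this post, and the function name and sample DAG text are hypothetical. Extend the mapping from the provider documentation for your installed version.

```python
import re

# Deprecated name -> replacement. Only the pair named in this post is listed.
RENAMED_OPERATORS = {"AWSGlueJobOperator": "GlueJobOperator"}


def find_deprecated_operators(dag_source: str) -> list:
    """Return (line_number, old_name, new_name) for each deprecated usage."""
    findings = []
    for line_number, line in enumerate(dag_source.splitlines(), start=1):
        for old_name, new_name in RENAMED_OPERATORS.items():
            # Word boundaries avoid matching longer identifiers that
            # merely contain the old name.
            if re.search(rf"\b{re.escape(old_name)}\b", line):
                findings.append((line_number, old_name, new_name))
    return findings


# Hypothetical DAG source text used only for illustration.
sample_dag = (
    "from airflow.providers.amazon.aws.operators.glue import AWSGlueJobOperator\n"
    "job = AWSGlueJobOperator(task_id='ingest', job_name='my-job')\n"
)
for line_number, old, new in find_deprecated_operators(sample_dag):
    print(f"line {line_number}: replace {old} with {new}")
```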

Orchestrate the AWS Glue job from Airflow

This section covers the details of orchestrating an AWS Glue job within Airflow DAGs. Airflow eases the development of data pipelines with dependencies between heterogeneous systems such as on-premises processes, external dependencies, other AWS services, and more.

Orchestrate CloudTrail log aggregation with AWS Glue and Amazon MWAA

In this example, we go through a use case of using Amazon MWAA to orchestrate an AWS Glue Python Shell job that persists aggregated metrics based on CloudTrail logs.

CloudTrail enables visibility into AWS API calls that are being made in your AWS account. A common use case with this data would be to gather usage metrics on principals acting on your account’s resources for auditing and regulatory needs.

As CloudTrail events are being logged, they’re delivered as JSON files in Amazon S3, which aren’t ideal for analytical queries. We want to aggregate this data and persist it as Parquet files to allow for optimal query performance. As an initial step, we can use Athena to do the initial querying of the data before doing additional aggregations in our AWS Glue job. For more information about creating an AWS Glue Data Catalog table, refer to Creating the table for CloudTrail logs in Athena using partition projection data. After we’ve explored the data via Athena and decided what metrics we want to retain in aggregate tables, we can create an AWS Glue job.

Create a CloudTrail table in Athena

First, we need to create a table in our Data Catalog that allows CloudTrail data to be queried via Athena. The following sample query creates a table with two partitions on the Region and date (called snapshot_date). Be sure to replace the placeholders for your CloudTrail bucket, AWS account ID, and CloudTrail table name:

create external table if not exists `<<<CLOUDTRAIL_TABLE_NAME>>>`(
  `eventversion` string comment 'from deserializer',
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> comment 'from deserializer',
  `eventtime` string comment 'from deserializer',
  `eventsource` string comment 'from deserializer',
  `eventname` string comment 'from deserializer',
  `awsregion` string comment 'from deserializer',
  `sourceipaddress` string comment 'from deserializer',
  `useragent` string comment 'from deserializer',
  `errorcode` string comment 'from deserializer',
  `errormessage` string comment 'from deserializer',
  `requestparameters` string comment 'from deserializer',
  `responseelements` string comment 'from deserializer',
  `additionaleventdata` string comment 'from deserializer',
  `requestid` string comment 'from deserializer',
  `eventid` string comment 'from deserializer',
  `resources` array<struct<arn:string,accountid:string,type:string>> comment 'from deserializer',
  `eventtype` string comment 'from deserializer',
  `apiversion` string comment 'from deserializer',
  `readonly` string comment 'from deserializer',
  `recipientaccountid` string comment 'from deserializer',
  `serviceeventdetails` string comment 'from deserializer',
  `sharedeventid` string comment 'from deserializer',
  `vpcendpointid` string comment 'from deserializer')
partitioned by (
  `region` string,
  `snapshot_date` string)
-- The ROW FORMAT, LOCATION, and TBLPROPERTIES clauses follow here; see the Athena
-- documentation referenced earlier for the CloudTrail partition projection settings.

Run the preceding query on the Athena console, and note the table name and AWS Glue Data Catalog database where it was created. We use these values later in the Airflow DAG code.

Sample AWS Glue job code

The following code is a sample AWS Glue Python Shell job that does the following:

  • Takes arguments (which we pass from our Amazon MWAA DAG) on what day’s data to process
  • Uses the AWS SDK for Pandas to run an Athena query to do the initial filtering of the CloudTrail JSON data outside AWS Glue
  • Uses Pandas to do simple aggregations on the filtered data
  • Outputs the aggregated data to the AWS Glue Data Catalog in a table
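  • Uses logging during processing, which will be visible in Amazon MWAA

import awswrangler as wr
import pandas as pd
import sys
import logging
from awsglue.utils import getResolvedOptions
from datetime import datetime, timedelta

# Logging setup, redirects all logs to stdout
LOGGER = logging.getLogger()
formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')
streamHandler = logging.StreamHandler(sys.stdout)
streamHandler.setFormatter(formatter)
LOGGER.addHandler(streamHandler)
LOGGER.setLevel(logging.INFO)

LOGGER.info(f"Passed Args :: {sys.argv}")

# The select list is an assumption, reconstructed from the columns
# that the aggregation below groups by.
sql_query_template = """
select
    region,
    useridentity.arn,
    eventsource,
    eventname,
    useragent
from "{cloudtrail_glue_db}"."{cloudtrail_table}"
where snapshot_date='{process_date}'
and region in ('us-east-1','us-east-2')
"""

# Argument names other than CLOUDTRAIL_GLUE_DB are assumed from how they are
# used below and from the arguments the DAG passes in.
required_args = ['CLOUDTRAIL_GLUE_DB',
                 'CLOUDTRAIL_TABLE',
                 'TARGET_BUCKET',
                 'TARGET_DB',
                 'TARGET_TABLE',
                 'ACCOUNT_ID']
arg_keys = [*required_args, 'PROCESS_DATE'] if '--PROCESS_DATE' in sys.argv else required_args
JOB_ARGS = getResolvedOptions(sys.argv, arg_keys)

LOGGER.info(f"Parsed Args :: {JOB_ARGS}")

# if process date was not passed as an argument, process yesterday's data
process_date = (
    JOB_ARGS['PROCESS_DATE']
    if JOB_ARGS.get('PROCESS_DATE','NONE') != "NONE"
    else (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
)

LOGGER.info(f"Taking snapshot for :: {process_date}")

RAW_CLOUDTRAIL_DB = JOB_ARGS['CLOUDTRAIL_GLUE_DB']
RAW_CLOUDTRAIL_TABLE = JOB_ARGS['CLOUDTRAIL_TABLE']
TARGET_BUCKET = JOB_ARGS['TARGET_BUCKET']
TARGET_DB = JOB_ARGS['TARGET_DB']
TARGET_TABLE = JOB_ARGS['TARGET_TABLE']

# The partition value uses slashes (YYYY/MM/DD) rather than dashes
final_query = sql_query_template.format(
    process_date=process_date.replace("-","/"),
    cloudtrail_glue_db=RAW_CLOUDTRAIL_DB,
    cloudtrail_table=RAW_CLOUDTRAIL_TABLE
)

LOGGER.info(f"Running Query :: {final_query}")

raw_cloudtrail_df = wr.athena.read_sql_query(
    sql=final_query,
    database=RAW_CLOUDTRAIL_DB,
    ctas_approach=False,
    s3_output=f"s3://{TARGET_BUCKET}/athena-results",  # assumed results location
)

# Each row is one event; summing this column gives the event count per group
raw_cloudtrail_df['ct'] = 1

agg_df = raw_cloudtrail_df.groupby(['arn','region','eventsource','eventname','useragent'],as_index=False).agg({'ct':'sum'})
agg_df['snapshot_date'] = process_date

upload_path = f"s3://{TARGET_BUCKET}/{TARGET_DB}/{TARGET_TABLE}"

if not agg_df.empty:
    LOGGER.info(f"Upload to {upload_path}")
    try:
        # The write options below are assumptions; adjust them to your table layout
        response = wr.s3.to_parquet(
            df=agg_df,
            path=upload_path,
            dataset=True,
            database=TARGET_DB,
            table=TARGET_TABLE,
            mode="overwrite_partitions",
            partition_cols=["snapshot_date"],
            index=False
        )
        LOGGER.info(response)
    except Exception as exc:
        LOGGER.error("Uploading to S3 failed")
        raise exc
else:
    LOGGER.info(f"Dataframe was empty, nothing to upload to {upload_path}")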
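
The date handling in the job can be checked in isolation. The following is a minimal sketch, assuming the "NONE" sentinel passed by the DAG and a YYYY/MM/DD partition value as suggested by the script's dash-to-slash replacement. The function names are hypothetical and not part of the job script.

```python
from datetime import date, timedelta


def resolve_process_date(passed_value: str, today: date) -> str:
    """Return the date to process as YYYY-MM-DD.

    "NONE" is the sentinel the DAG sends when no date was supplied,
    in which case yesterday (relative to `today`) is used.
    """
    if passed_value != "NONE":
        return passed_value
    return (today - timedelta(days=1)).strftime("%Y-%m-%d")


def to_partition_value(process_date: str) -> str:
    """Convert YYYY-MM-DD to the slash-separated form used in the query filter."""
    return process_date.replace("-", "/")


default_date = resolve_process_date("NONE", date(2023, 3, 1))
backfill_partition = to_partition_value(resolve_process_date("2023-01-15", date(2023, 3, 1)))
print(default_date)        # 2023-02-28
print(backfill_partition)  # 2023/01/15
```

Passing `today` in as a parameter, rather than reading the clock inside the function, makes the fallback behavior easy to test.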

The following are some key advantages in this AWS Glue job:

  • We use an Athena query to ensure initial filtering is done outside of our AWS Glue job. As such, a Python Shell job with minimal compute is still sufficient for aggregating a large CloudTrail dataset.
  • We ensure the analytics library-set option is turned on when creating our AWS Glue job to use the AWS SDK for Pandas library.
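
To make the aggregation step concrete, here is a plain-Python sketch of what grouping by principal, Region, event source, event name, and user agent and summing a per-row count produces. It uses collections.Counter instead of Pandas so that it runs without extra libraries. The rows are hypothetical sample data, and this is an illustration of the computation rather than the job's code.

```python
from collections import Counter

GROUP_KEYS = ("arn", "region", "eventsource", "eventname", "useragent")

# Hypothetical filtered CloudTrail rows, shaped like the Athena query result.
rows = [
    {"arn": "arn:aws:iam::111122223333:user/alice", "region": "us-east-1",
     "eventsource": "s3.amazonaws.com", "eventname": "GetObject", "useragent": "aws-cli"},
    {"arn": "arn:aws:iam::111122223333:user/alice", "region": "us-east-1",
     "eventsource": "s3.amazonaws.com", "eventname": "GetObject", "useragent": "aws-cli"},
    {"arn": "arn:aws:iam::111122223333:user/bob", "region": "us-east-2",
     "eventsource": "s3.amazonaws.com", "eventname": "PutObject", "useragent": "boto3"},
]


def count_events(rows):
    """Count rows per distinct combination of the grouping keys."""
    counts = Counter(tuple(row[key] for key in GROUP_KEYS) for row in rows)
    return [dict(zip(GROUP_KEYS, key), ct=ct) for key, ct in counts.items()]


aggregated = count_events(rows)
for record in aggregated:
    print(record["arn"], record["eventname"], record["ct"])
```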

Create an AWS Glue job

Complete the following steps to create your AWS Glue job:

  1. Copy the script in the preceding section and save it in a local file. For this post, the file is called
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Create a new job and select Python Shell script editor.
  4. Select Upload and edit an existing script and upload the file you saved locally.
  5. Choose Create.
  6. On the Job details tab, enter a name for your AWS Glue job.
  7. For IAM role, choose an existing role or create a new role that has the required permissions for Amazon S3, AWS Glue, and Athena. The role needs to query the CloudTrail table you created earlier and write to an output location.

You can use the following sample policy code. Replace the placeholders with your CloudTrail logs bucket, output table name, output AWS Glue database, output S3 bucket, CloudTrail table name, AWS Glue database containing the CloudTrail table, and your AWS account ID.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [ ... ],
            "Resource": [ ... ],
            "Effect": "Allow",
            "Sid": "GetS3CloudtrailData"
        },
        {
            "Action": [ ... ],
            "Resource": [ ... ],
            "Effect": "Allow",
            "Sid": "GetGlueCatalogCloudtrailData"
        },
        {
            "Action": [ ... ],
            "Resource": [ ... ],
            "Effect": "Allow",
            "Sid": "WriteOutputToS3"
        },
        {
            "Action": [ ... ],
            "Resource": [ ... ],
            "Effect": "Allow",
            "Sid": "AllowOutputToGlue"
        },
        {
            "Action": [ ... ],
            "Resource": "arn:aws:logs:*:*:/aws-glue/*",
            "Effect": "Allow",
            "Sid": "LogsAccess"
        },
        {
            "Action": [ ... ],
            "Resource": [ ... ],
            "Effect": "Allow",
            "Sid": "AccessToAthenaResults"
        },
        {
            "Action": [ ... ],
            "Resource": [ ... ],
            "Effect": "Allow",
            "Sid": "AllowAthenaQuerying"
        }
    ]
}

  8. For Python version, choose Python 3.9.
  9. Select Load common analytics libraries.
  10. For Data processing units, choose 1 DPU.
  11. Leave the other options as default or adjust as needed.
  12. Choose Save to save your job configuration.
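
If you prefer to script the job creation rather than use the console, the settings above map roughly onto the AWS Glue CreateJob API. The following sketch only builds the request arguments. The job name, role ARN, and script location are hypothetical, and the library-set default argument is our understanding of the job parameter behind Load common analytics libraries, so verify it against the AWS Glue Python shell documentation before relying on it.

```python
def build_python_shell_job(name: str, role_arn: str, script_s3_uri: str) -> dict:
    """Build keyword arguments for the AWS Glue CreateJob API."""
    return {
        "Name": name,
        "Role": role_arn,
        "Command": {
            "Name": "pythonshell",  # Python Shell job type
            "ScriptLocation": script_s3_uri,
            "PythonVersion": "3.9",
        },
        # Assumed equivalent of "Load common analytics libraries".
        "DefaultArguments": {"library-set": "analytics"},
        "MaxCapacity": 1.0,  # 1 DPU
    }


# Hypothetical values for illustration only.
job_definition = build_python_shell_job(
    name="cloudtrail-aggregation",
    role_arn="arn:aws:iam::111122223333:role/example-glue-role",
    script_s3_uri="s3://example-bucket/scripts/job.py",
)
print(job_definition)

# To create the job (requires boto3 and AWS credentials):
# import boto3
# boto3.client("glue").create_job(**job_definition)
```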

Configure an Amazon MWAA DAG to orchestrate the AWS Glue job

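The following code is for a DAG that can orchestrate the AWS Glue job that we created. We take advantage of the following key features in this DAG:

"""Sample DAG"""
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow import DAG
from datetime import datetime, timedelta

# allow backfills via DAG run parameters
process_date = '{{ dag_run.conf.get("process_date") if dag_run.conf.get("process_date") else "NONE" }}'

dag = DAG(
    dag_id = "cloudtrail_logs_processing", # assumed name; choose your own
    default_args = {
        'depends_on_past': False # assumed; the original defaults are not shown
    },
    start_date = datetime(2023, 1, 1), # assumed; any past date works for an unscheduled DAG
    catchup = False, # set on the DAG itself; it has no effect inside default_args
    schedule_interval = None, # None for unscheduled or a cron expression - E.G. "00 12 * * 2" - at 12noon Tuesday
    dagrun_timeout = timedelta(minutes=30),
    max_active_runs = 1,
    max_active_tasks = 1 # since there is only one task in our DAG
)

## Log ingest. Assumes Glue Job is already created
glue_ingestion_job = GlueJobOperator(
    task_id = "glue_cloudtrail_ingestion", # assumed name; choose your own
    job_name = "<<<GLUE_JOB_NAME>>>",
    script_args = {
        "--ACCOUNT_ID": "<<<YOUR_AWS_ACCT_ID>>>",
        "--CLOUDTRAIL_GLUE_DB": "<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>",
        "--CLOUDTRAIL_TABLE": "<<<CLOUDTRAIL_TABLE_NAME>>>",
        "--TARGET_BUCKET": "<<<OUTPUT_S3_BUCKET>>>",
        "--TARGET_DB": "<<<OUTPUT_GLUE_DB>>>", # should already exist
        "--TARGET_TABLE": "<<<OUTPUT_TABLE_NAME>>>",
        "--PROCESS_DATE": process_date
    },
    verbose = True, # relay the AWS Glue job run logs into the Airflow task logs
    dag = dag
)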

Increase observability of AWS Glue jobs in Amazon MWAA

The AWS Glue jobs write logs to Amazon CloudWatch. With the recent observability enhancements to Airflow’s Amazon provider package, these logs are now integrated with Airflow task logs. This consolidation provides Airflow users with end-to-end visibility directly in the Airflow UI, eliminating the need to search in CloudWatch or the AWS Glue console.

To use this feature, ensure the IAM role attached to the Amazon MWAA environment has the following permissions to retrieve and write the necessary logs:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [ ... ],
      "Resource": [
        "arn:aws:logs:*:*:log-group:airflow-243-<<<Your environment name>>>-*"--Your Amazon MWAA Log Stream Name
      ]
    }
  ]
}

If verbose=True, the AWS Glue job run logs show in the Airflow task logs. The default is False. For more information, refer to Parameters.

When enabled, the DAGs read from the AWS Glue job’s CloudWatch log stream and relay them to the Airflow DAG AWS Glue job step logs. This provides detailed insights into an AWS Glue job’s run in real time via the DAG logs. Note that AWS Glue jobs generate an output and error CloudWatch log group based on the job’s STDOUT and STDERR, respectively. All logs in the output log group and exception or error logs from the error log group are relayed into Amazon MWAA.
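
The relay rule described above can be pictured with a small sketch: every line from the output log group is kept, while lines from the error log group are kept only when they look like errors or exceptions. This is an illustration of the described behavior, not the provider package's implementation. The function name, the marker strings, and the sample log lines are assumptions.

```python
# Assumed markers for "exception or error" lines; the provider's actual
# filter may differ.
ERROR_MARKERS = ("ERROR", "Exception")


def select_relayed_lines(output_lines, error_lines):
    """Return the log lines that would be relayed to the Airflow task log."""
    relayed = list(output_lines)  # everything written to STDOUT is relayed
    relayed.extend(
        line for line in error_lines
        if any(marker in line for marker in ERROR_MARKERS)
    )
    return relayed


# Hypothetical log lines for illustration.
stdout_lines = ["INFO Taking snapshot for :: 2023-01-15", "INFO Upload complete"]
stderr_lines = ["WARN deprecated option", "ERROR Uploading to S3 failed"]
relayed = select_relayed_lines(stdout_lines, stderr_lines)
for line in relayed:
    print(line)
```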

AWS admins can now limit a support team’s access to only Airflow, making Amazon MWAA the single pane of glass on job orchestration and job health management. Previously, users needed to check AWS Glue job run status in the Airflow DAG steps and retrieve the job run identifier. They then needed to access the AWS Glue console to find the job run history, search for the job of interest using the identifier, and finally navigate to the job’s CloudWatch logs to troubleshoot.

Create the DAG

To create the DAG, complete the following steps:

  1. Save the preceding DAG code to a local .py file, replacing the indicated placeholders.

The values for your AWS account ID, AWS Glue job name, AWS Glue database with CloudTrail table, and CloudTrail table name should already be known. You can adjust the output S3 bucket, output AWS Glue database, and output table name as needed, but make sure that the AWS Glue job’s IAM role that you used earlier is configured accordingly.

  2. On the Amazon MWAA console, navigate to your environment to see where the DAG code is stored.

The DAGs folder is the prefix within the S3 bucket where your DAG file should be placed.

  3. Upload your edited file there.
  4. Open the Amazon MWAA console to confirm that the DAG appears in the table.

Run the DAG

To run the DAG, complete the following steps:

  1. Choose from the following options:
    • Trigger DAG – This causes yesterday’s data to be used as the data to process
    • Trigger DAG w/ config – With this option, you can pass in a different date, potentially for backfills, which is retrieved using dag_run.conf in the DAG code and then passed into the AWS Glue job as a parameter

The following screenshot shows the additional configuration options if you choose Trigger DAG w/ config.

  2. Monitor the DAG as it runs.
  3. When the DAG is complete, open the run’s details.

On the right pane, you can view the logs, or choose Task Instance Details for a full view.

  4. View the AWS Glue job output logs in Amazon MWAA without using the AWS Glue console thanks to the GlueJobOperator verbose flag.

The AWS Glue job will have written results to the output table you specified.

  5. Query this table via Athena to confirm it was successful.
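
For the Trigger DAG w/ config option in the steps above, the configuration is a small JSON document. The sketch below builds that document for a backfill date and mirrors how the DAG's template falls back to "NONE" when no date is given. The process_date key comes from the DAG code in this post, while the helper names are hypothetical.

```python
import json
from datetime import datetime


def build_trigger_conf(process_date: str) -> str:
    """Return the JSON configuration for a backfill run."""
    # Raises ValueError if the value is not a valid date in year-month-day order.
    datetime.strptime(process_date, "%Y-%m-%d")
    return json.dumps({"process_date": process_date})


def process_date_from_conf(conf: dict) -> str:
    """Mirror the DAG's template: fall back to "NONE" when no date is given."""
    return conf.get("process_date") or "NONE"


backfill_conf = build_trigger_conf("2023-01-15")
print(backfill_conf)
print(process_date_from_conf(json.loads(backfill_conf)))
print(process_date_from_conf({}))
```

Validating the date before triggering catches typos early, instead of letting the AWS Glue job run a query against a partition that doesn't exist.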


Amazon MWAA now provides a single place to track AWS Glue job status and enables you to use the Airflow console as the single pane of glass for job orchestration and health management. In this post, we walked through the steps to orchestrate AWS Glue jobs via Airflow using GlueJobOperator. With the new observability enhancements, you can seamlessly troubleshoot AWS Glue jobs in a unified experience. We also demonstrated how to upgrade your Amazon MWAA environment to a compatible version, update dependencies, and change the IAM role policy accordingly.

For more information about common troubleshooting steps, refer to Troubleshooting: Creating and updating an Amazon MWAA environment. For in-depth details of migrating to an Amazon MWAA environment, refer to Upgrading from 1.10 to 2. To learn about the open-source code changes for increased observability of AWS Glue jobs in the Airflow Amazon provider package, refer to the relay logs from AWS Glue jobs.

Finally, we recommend visiting the AWS Big Data Blog for other material on analytics, ML, and data governance on AWS.

About the Authors

Rushabh Lokhande is a Data & ML Engineer with the AWS Professional Services Analytics Practice. He helps customers implement big data, machine learning, and analytics solutions. Outside of work, he enjoys spending time with family, reading, running, and golf.

Ryan Gomes is a Data & ML Engineer with the AWS Professional Services Analytics Practice. He is passionate about helping customers achieve better outcomes through analytics and machine learning solutions in the cloud. Outside of work, he enjoys fitness, cooking, and spending quality time with friends and family.

Vishwa Gupta is a Senior Data Architect with the AWS Professional Services Analytics Practice. He helps customers implement big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, and trying new food.
