Monday, February 15, 2021

Airflow: Glue Operator loses its region name

The other day, I tried to run an AWS Glue script from our Airflow instance. Nothing fancy: it just converts a Parquet file to CSV between two S3 buckets. Looking for an operator to use, I found that there is indeed a Glue Operator. It looked pretty easy to configure, so I tried it out:

from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator

parquet2csv_glue = AwsGlueJobOperator(
    task_id='parquet2csv_glue',
    dag=parquet2csv_dag,
    job_name='glue-parquet2csv',
    region_name='eu-west-3',
    aws_conn_id=None,
    script_args={
        "--SOURCE_PATH": input_path,
        "--TARGET_PATH": output_path,
    }
)
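
The Glue job itself is beside the point here, but for context, a minimal Parquet-to-CSV script might look something like this (a sketch; the argument names match the script_args above, everything else is assumed):

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Resolve the arguments passed via script_args (without the leading dashes)
args = getResolvedOptions(sys.argv, ['SOURCE_PATH', 'TARGET_PATH'])

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Read Parquet from the source path and write it back out as CSV
df = spark.read.parquet(args['SOURCE_PATH'])
df.write.mode('overwrite').option('header', 'true').csv(args['TARGET_PATH'])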

After triggering the DAG, it failed, telling me that I needed to set the region name. Well, I thought I did!

This probably comes from the fact that recent boto3 versions require the region to be set, while it used to be optional. As Airflow is open source, I decided to have a look at the code. As it turns out, the bug was not too hard to spot: the Glue Operator creates a Glue Hook, which declares this constructor:

class AwsGlueJobHook(AwsBaseHook):
    def __init__(
        self,
        s3_bucket: Optional[str] = None,
        job_name: Optional[str] = None,
        desc: Optional[str] = None,
        concurrent_run_limit: int = 1,
        script_location: Optional[str] = None,
        retry_limit: int = 0,
        num_of_dpus: int = 10,
        region_name: Optional[str] = None,
        iam_role_name: Optional[str] = None,
        *args,
        **kwargs,
    ):
        self.job_name = job_name
        self.desc = desc
        self.concurrent_run_limit = concurrent_run_limit
        self.script_location = script_location
        self.retry_limit = retry_limit
        self.num_of_dpus = num_of_dpus
        self.region_name = region_name
        self.s3_bucket = s3_bucket
        self.role_name = iam_role_name
        self.s3_glue_logs = 'logs/glue-logs/'
        kwargs['client_type'] = 'glue'
        super().__init__(*args, **kwargs)

The Hook derives from a base class, AwsBaseHook, which handles the connection logic common to all AWS Hooks. The call to the superclass constructor does not forward the region name. It should probably be called like this:

        super().__init__(region_name=region_name, *args, **kwargs)
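
To see why this matters, here is a minimal, self-contained illustration of the pattern (hypothetical classes, not Airflow code): a keyword argument consumed by a subclass constructor never reaches the base class unless it is forwarded explicitly.

class Base:
    def __init__(self, region_name=None, **kwargs):
        self.region_name = region_name

class Broken(Base):
    def __init__(self, region_name=None, **kwargs):
        # region_name is captured here but never forwarded to Base
        super().__init__(**kwargs)

class Fixed(Base):
    def __init__(self, region_name=None, **kwargs):
        super().__init__(region_name=region_name, **kwargs)

print(Broken(region_name='eu-west-3').region_name)  # None
print(Fixed(region_name='eu-west-3').region_name)   # eu-west-3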

I opened a bug report. But in the meantime, I still needed my code to work, so I came up with quite an ugly patch. There is probably a better way, but since I could see that the boto3 session was created in the AwsBaseHook class, and since our Airflow instance runs on an EC2 instance whose profile it can simply inherit, I made this simple workaround:

import boto3
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Ugly monkeypatch to work around the bug in the Glue Hook: always return a
# boto3 session pinned to our region. The second element of the returned
# tuple is the endpoint URL, which we leave unset.
def _get_credentials(self, region_name):
    # Credentials come from the EC2 instance profile, so only the
    # region needs to be hard-coded here.
    return boto3.session.Session(region_name="eu-west-3"), None

AwsBaseHook._get_credentials = _get_credentials
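
As a quick sanity check (a sketch, assuming the provider's module layout at the time), any AWS hook built after the patch should end up with a client in the hard-coded region:

from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook

hook = AwsGlueJobHook(job_name='glue-parquet2csv', aws_conn_id=None)
client = hook.get_conn()          # goes through the patched _get_credentials
print(client.meta.region_name)    # expected: eu-west-3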

Hopefully, it won't stay here long...
