Friday, March 12, 2021

AWS Glue

Glue is an automated ETL tool with many advantages: it auto-generates scripts, supports common ETL operations such as ApplyMapping, lets you bring your own custom scripts, and its crawlers can automatically infer schemas. Compared with Data Pipeline and Step Functions, it is a higher-level abstraction.

First impressions of Glue

A quick example of creating an ETL job from the UI

Required IAM role policies:

S3FullAccess

AWSGlueServiceRole

CloudWatchLogsFullAccess

Transformation:

CSV, TXT, and TSV can be converted to JSON. The JSON output is not a list; the objects are written back to back, e.g. {"city": "b"}{"city": "a"}. The reverse conversion also works, but the JSON input likewise cannot be a list, otherwise transforms such as ApplyMapping cannot recognize it. Nested JSON is supported (three or more levels deep is fine).

Crawler:

A crawler can automatically crawl the metadata of the files in a specified bucket, such as column names, file type, how many header rows to skip, record count, and so on.
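
A minimal boto3 sketch of creating and starting such a crawler, assuming placeholder names for the crawler, role, catalog database, and S3 path:

    import boto3

    glue = boto3.client("glue", region_name="us-west-2")

    # Placeholder crawler pointed at a placeholder bucket/prefix
    glue.create_crawler(
        Name="raw-data-crawler",
        Role="AWSGlueServiceRole-demo",
        DatabaseName="my_catalog_db",
        Targets={"S3Targets": [{"Path": "s3://my-bucket/raw/"}]},
    )
    glue.start_crawler(Name="raw-data-crawler")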


Custom Scripts (Spark DataFrame)

A Custom transformation node lets you plug your own script directly into the ETL job, but the script has to implement the required API. Here is an example:

    from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection

    def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
        # Take the first DynamicFrame in the collection and convert it to a Spark DataFrame
        df = dfc.select(list(dfc.keys())[0]).toDF()
        df_filtered = df.filter(df["year"] > 2018)
        # Wrap the filtered DataFrame back into a DynamicFrame and return it as a collection
        dyf_filtered = DynamicFrame.fromDF(df_filtered, glueContext, "filter_votes")
        return DynamicFrameCollection({"CustomTransform0": dyf_filtered}, glueContext)

Example script

Removing empty rows:

    df_filtered = df.filter("videoName != ''")

If empty rows are detected, force an error so that it is surfaced in the Glue errorMessage:

    if df.count() != df_filtered.count():
        raise ValueError('Empty row detected.')

A custom script has to be used together with SelectFromCollection, as sketched below.
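
For example, if the custom transform above returns its result under the key "CustomTransform0", the next node can pull that frame out of the collection roughly like this (dfc_out stands for the returned DynamicFrameCollection):

    from awsglue.transforms import SelectFromCollection

    # dfc_out is the DynamicFrameCollection returned by the custom transform
    selected = SelectFromCollection.apply(
        dfc=dfc_out,
        key=list(dfc_out.keys())[0],
        transformation_ctx="select_from_collection",
    )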


Trigger:

A Lambda function can act as the trigger:

https://aws.amazon.com/premiumsupport/knowledge-center/start-glue-job-crawler-completes-lambda/

A job-succeeded event can also trigger another job, which chains jobs into a workflow.
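
A rough sketch of such a triggering Lambda, which simply calls start_job_run on a placeholder job name when it is invoked (e.g. by the crawler-completed event from the link above):

    import boto3

    glue = boto3.client("glue")

    def lambda_handler(event, context):
        # Kick off the downstream (placeholder) Glue job when the trigger fires
        response = glue.start_job_run(JobName="my-etl-job")
        return response["JobRunId"]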


Programming:

The visual editor and the code are interchangeable: first create the ETL job visually, then switch to the script tab to see the auto-generated code. Conversely, that code can be used to recreate the ETL job.

To add logging: logger = glueContext.get_logger()

Python: https://github.com/aws-samples/aws-glue-samples/blob/master/examples/data_cleaning_and_lambda.md

Integration with other AWS services: use the boto3 library.

Glue API:

Supports Scala and Python.

Python: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html


start_job_run

Just run aws glue start-job-run, as below, with job parameters such as job-name and scriptLocation, and a run is started; it returns a JobRunId.

$ aws glue start-job-run --job-name "CSV to CSV" --arguments='--scriptLocation="s3://my_glue/libraries/test_lib.py"'

Custom parameters such as --source_file_s3_path and --targetFileS3Path are also supported, up to 50 of them. This makes the pipeline more flexible and generic, so it can handle different inputs, outputs, and transformations.

To read them inside the job script:

    import sys
    from awsglue.utils import getResolvedOptions

    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'source_file_s3_path'])
    logger.info('path name: ' + args['source_file_s3_path'])

Example: https://stackoverflow.com/questions/52316668/aws-glue-job-input-parameters

Concurrency can also be configured so that runs of the same job with different parameters can execute at the same time.
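
For instance, a rough boto3 sketch that launches two concurrent runs of the same job with different custom parameters (the job name and S3 paths are placeholders):

    import boto3

    glue = boto3.client("glue")

    # Two runs of the same job, each with its own source/target paths
    for src, dst in [("s3://in/a.csv", "s3://out/a/"), ("s3://in/b.csv", "s3://out/b/")]:
        glue.start_job_run(
            JobName="CSV to CSV",
            Arguments={
                "--source_file_s3_path": src,
                "--targetFileS3Path": dst,
            },
        )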

Job parameters for CDK:

https://awscdk.io/packages/@aws-cdk/aws-glue@1.22.0/#/./@aws-cdk_aws-glue.CfnJob

Default parameters:

https://docs.amazonaws.cn/en_us/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html


get_job_runs

The inputs are the job name and the JobRunId; it returns the JobRunState (e.g. FAILED, SUCCEEDED) and an errorMessage.
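
A small sketch using boto3's get_job_run (singular) to poll one run, assuming job_run_id is the value returned by start_job_run:

    import boto3

    glue = boto3.client("glue")

    run = glue.get_job_run(JobName="CSV to CSV", RunId=job_run_id)["JobRun"]
    state = run["JobRunState"]        # e.g. RUNNING, SUCCEEDED, FAILED
    error = run.get("ErrorMessage")   # only present when the run failed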


Sharing code across Glue jobs via a Python library

https://medium.com/@bv_subhash/sharing-re-usable-code-across-multiple-aws-glue-jobs-290e7e8b3025
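
A common pattern is to zip the shared module, upload it to S3, and pass it to the job via the --extra-py-files job parameter; the module and helper names below are hypothetical:

    # Job parameter (set in the console or via start_job_run Arguments):
    #   --extra-py-files  s3://my_glue/libraries/shared_utils.zip

    # Inside the job script the shared module can then be imported as usual
    import shared_utils  # hypothetical module packaged in the zip above

    df_clean = shared_utils.drop_empty_rows(df)  # hypothetical helper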


Glue error codes:

Glue Concurrency

Timeout

Custom error

Internal Failure


Deployment:

Since the ETL Python scripts live in S3, code committed to git would otherwise have to be uploaded to S3 by hand. The solution is to use CDK Assets to upload the local code to S3. Ref
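
A rough CDK (Python, v1-style to match the CDK link earlier) sketch of that idea: an Asset uploads the local script and CfnJob points at it. The construct names, script path, and role ARN are placeholders, and the Glue role still needs read access to the asset bucket.

    from aws_cdk import aws_glue as glue, aws_s3_assets as assets, core

    class GlueJobStack(core.Stack):
        def __init__(self, scope, construct_id, **kwargs):
            super().__init__(scope, construct_id, **kwargs)

            # Upload the local ETL script to the CDK-managed assets bucket
            script_asset = assets.Asset(self, "EtlScript", path="glue/etl_script.py")

            glue.CfnJob(
                self, "EtlJob",
                name="csv-to-csv",
                role="arn:aws:iam::123456789012:role/AWSGlueServiceRole-demo",  # placeholder
                command=glue.CfnJob.JobCommandProperty(
                    name="glueetl",
                    python_version="3",
                    script_location=script_asset.s3_object_url,
                ),
            )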

DocumentDB:

Glue can connect to DocumentDB for ETL; here we use it to update a record. The DocumentDB connection has to be configured manually in Glue. (The URI and credentials in the snippet below are placeholders.)

    # Secrets Manager client (e.g. for fetching the DocumentDB credentials)
    secrets_manager_client = boto3.client("secretsmanager", region_name="us-west-2")

    workflow_status = [{
        "_id": job_run_id,
        "status": "xxx"
    }]

    workflow_status_frame = DynamicFrame.fromDF(spark.createDataFrame(workflow_status), glueContext, "nested")

    db_writer(workflow_status_frame, "my_db", "workflow_status")


    def db_writer(df, database_name, collection_name):
        write_documentdb_options = {
            "uri": documentdb_uri,  # placeholder for the DocumentDB connection URI
            "database": database_name,
            "collection": collection_name,
            # ... credentials and other connection options
        }
        glueContext.write_dynamic_frame.from_options(df, connection_type="documentdb", connection_options=write_documentdb_options)
