Friday, July 2, 2021

AWS Data Pipeline

 AWS Data Pipeline用于ETL,一个use case是prod accout上的DDB数据clone到alpha account上DDB。先用Data Pipeline复制到alpha上的S3(CloudWatch 定时event),然后再用lambda将数据加载到DDB(CloudWatch 定时event)。缺点是如果这两个步骤是独立的。

Glue有ApplyMapping等,如果需要对数据进行编辑,就应该用Glue

Sunday, June 27, 2021

AWS Secrets Manager简介

用于存RDS,documentDB等密码

如果lambda在VPC的话,不能直接访问Secrets Manager,要开放VPC endpoint才可以
https://docs.aws.amazon.com/secretsmanager/latest/userguide/vpc-endpoint-overview.html

Saturday, June 26, 2021

Google Search 技巧

 


"vtasters"Exact Match, "use * card"
site:stackoverflow.com how to install java 在某个网站查找



Nasdaq news after:2020时间范围查询
Nasdaq news before:2020-03-01
Nasdaq news on:2020-04-15
fngu download filetype:pdf文件格式

intitle:cloudwatch只搜title
allintitle:cloudwatch dashboard所有关键字都在title
related:amazon.com相似网站



Ref


Thursday, June 10, 2021

AWS Glue中PySpark和Spark SQL

 Glue封装了PySpark和Spark SQL


PySpark Select columns


DataSource0.count()

DataSource0.printSchema()

df = DataSource0.toDF()

找到value column中含数字字母的

df.filter(df['value'].rlike('\w+')).show()

找到value column中只含数字字母的

df.filter(df['value'].rlike('^a-zA-Z\d\s:') == False).show()



Sunday, June 6, 2021

AWS CDK简介

 CDK是用多种语言实现的打包工具。


下面介绍是type script (类似于Node.js)

按这几个步骤初试


Step Function

Glue

可以用json来实现state,而不是用new tasks.GlueStartJobRun. 这是因为有些功能并不支持如GlueStartJobRun.sync. 


const stateJson = {

  Type: 'Task',

  Resource: 'arn:aws:states:::dynamodb:putItem',

  Parameters: {

    TableName: table.tableName,

    Item: {

      id: {

        S: 'MyEntry',

      },

    },

  },

  ResultPath: null,

};


Lambda

CDK中step function中的lambda支持payload参数

new tasks.LambdaInvoke(this, 'Invoke with payload field in the state input', {

     lambdaFunction: fn,

      payload: sfn.TaskInput.fromObject({

            "execution.$": "$$.Excution.Id",

            "catalogId": sfn.JsonPath.stringAt('$.catalogId'),

      }),


});


API gateway

https://docs.aws.amazon.com/cdk/api/latest/docs/aws-apigateway-readme.html#integration-targets

Saturday, June 5, 2021

AWS Step Function简介

 产生StepFunction可以用console的template产生,节省时间


如何在Step function用变量

每一个task都有input和output,如果一个task的TaskStateEntered(AWS Console)的input是

{"input": {

       "catalogId": "abc-cde"

    } 

}

State machine中,这样取值$.catalogId, 根目录对应的是input. 这个task也并不需要写ResultPath(只做filter之用).

input对应$.

output对应$.output.

content对应$..

通过选择,可以选取Lambda InputStream的输入,如

Parameters: { 

    "FunctionName": "xxx",

    "Payload": {

         "execution.$": "$$.Execution.Id",

         "categoryId.$":: "$.catalogId",

         "runId.$": "$output.JobRunState"

    }

}

Payload对应Lambda的InputStream的输入


这个task(假设是lambda)是按照以下方式写入output:

public void handle(InputStream in, OutputStream out, Context context) {

     HashMap map;

     map.put("catalogId, "abc-cde");

    String json = objectwriter.toJson(map);

    out.write(json);

}

Map换成一个object也是一样的。


Context变量

Context含有step function execution arn,用的时候用$$.

"Execution.$": "$$.Execution.Id"


StepFunction可以invoke Glue

Step function能支持的Lambda的参数包括Payload,这是lambda的输入

"GetJobParams": {

   "Type": "Task",

   "Resource": "arn:aws:states:::lambda:invoke",

   "Parameters": {

         "FunctionName": "arn...Lambda",

         "Payload": {

                 "execution.$": "$$.Execution.Id",

                 "catalogId.$": "$.catalogId"

          }

     }

}

$.catalogId是原有的lambda input的一个attribute,这里是将execution添加到Lambda的input中。


"Glue StartJobRun": {

      "Type": "Task",

      "Resource": "arn:aws:states:::glue:startJobRun.sync",

      "Parameters": {

        "JobName": "my-etl-job"

      },

      "Next": "ValidateOutput"

    },

Resource不能改,一般来说是用ARN,但Glue没有ARN,Step function通过JobName来定位Glue job。JobRunId不能用自定义格式或者不能加入作为参数,否则会说resource找不到。返回值是Id (JobRunId),JobRunState,ErrorMessage, StartedOn等等。

如果某个Task failed还想继续执行下一个任务,可以用Catch,如ETL job失败,还是想将失败状态写入数据库

"Glue StartJobRun": {

      "Type": "Task",

      "Resource": "arn:aws:states:::glue:startJobRun.sync",

      "Parameters": {

        "JobName": "my-etl-job"

      },

      "Catch": [ {

            "ErrorEquals": ["States.Timeout", "States.TaskFailed", "HandledError"],

            "Next": "ValidateOutput"

         } ],

      "ResultPath": "$.output",

      "Next": "ValidateOutput"

    },

$.output存储Glue的输出结果,下一个task如lambda可以使用。


如何给一个task赋值

刚才讲到怎么调用变量,现在讲怎么给一个task的参数赋值。关键在于尾部加入.$

"Glue StartJobRun": {

        "JobName": "my-etl-job",

        "Arguments": {

               "--catalog_id.$": "$.catalogId"

          }

      },

Step function只支持这些参数


Lambda retry

lambda本身的retry并不支持,需要再step function里面定义retry

"Retry": [

        {

          "ErrorEquals": [

            "States.ALL"

          ],

          "IntervalSeconds": 30,

          "MaxAttempts": 2,

          "BackoffRate": 2

        }

      ],

Saturday, May 15, 2021

AWS API Gateway简介

一步一步建立第一个API + lambda application

AWS CDK 开发 API gateway

Lambda and API gateway Java example


API gateway也可以直接invoke Step function (默认为async的startExecution API,大概80ms latency,也可以invoke stopExcution, startSyncExecution),返回值为step function run id, start time等,一定要用Post。

Post在input里用$request.body支持request payload


IAM role,需要有APIGatewayInvokeFullAccess和StepFunctionFullAccess


Logging:

默认没有log,可以到Monitor->Logging去enable

需要自己创建一个log group

format需要加入$context.error.message, $context.integrationErrorMessage,否则即使是500,也不会知道是什么错误,其他log参数


Authentication & Authorization

可以用Federate + Cognito Identity pool/Cognito user pool 来Authenticate 用户,但这多数用于真正用户。另一种方法是用Lambda Authorizer作为interceptor来做验证。

作为程序用户,可以用IAM user(非AWS resource)或者IAM role(AWS resource)来验证。

需要AmazonAPIGatewayInvokeFullAccess的permission

Frontend用不checkin的方法来本地存API key 


CORS

Request有两种:simple和non simple。我的case是返回contentType=json,不是Simple所以需要CORS support。API gateway设置如下:

allow-control-allow-orgin=https://www.vtasters.com (no slash)

access-control-allow-headers=access-control-allow-origin, content-type, authorization, referer

access-control-allow-methods=POST, GET

access-control-max-age=600 (Same as Chrome)


Thursday, May 13, 2021

DocumentDB简介

Document-based database. MongoDB wrapper. 

大部分功能如index,连接方法类似于RDS,RDS上有full text search功能,支持词频搜索如shop coffee或coffee shop

DocumentDB初次setup

Terms:

Collection: Table

Transaction:

multi-doc for 4.0 above


Other features

Change stream for a collection


Setup + Cloud 9 (IDE)

https://aws.amazon.com/blogs/database/part-2-getting-started-with-amazon-documentdb-using-aws-cloud9/


Shell命令

show dbs

show collections

use mydb (will create or switch to db)

db.createCollection("metadata")   / db.mydb.createCollection("metadata")

db.metadata.insert({"name":"documentdb"})

db.metadata.findOne()

db.metadata.count()

db.metadata.find({name: "Netflix"})

db.metadata.find({name: /Netflix/}) -- like '%Netflix%' (case sensitive)

db.metadata.find({name: /^netflix$/i}) -- like '%netflix%' (case insensitive)


Java coding

import com.mongodb.MongoClient来获取数据

mongoClient.getDatabase('mydb');


DB connection

DB connection会看到是Database operation的两倍,因为会自动产生读和写两个connection,叫replica

https://docs.aws.amazon.com/documentdb/latest/developerguide/connect-to-replica-set.html


1000 per cluster: https://docs.aws.amazon.com/documentdb/latest/developerguide/limits.html

Friday, March 12, 2021

AWS Glue

Glue是一个自动化的工具,有很多优点如自动生成script,支持常见ETL操作如ApplyMapping,BYOD custom script,crawler自动识别scheme等等。相比于DataPipeline和step functions属于更高层的封装。

Glue之初感

用UI创建一个ETLjob快速实例

需要的IAM role: 

S3FullAccess

AWSGlueServiceRole

CloudWatchLogsFullAccess

Transformation:

可以将CSV, TXT, TSV转化成JSON,JSON的形式不是List而是每个object并排如{"city": "b"}{"city": "a"}. 反之转换也可以,但JSON的输入形式也不能是list,否则ApplyMapping等不能识别。可以支持nested JSON(三层以上均可)

Crawler:

Crawler 自动可以crawl指定bucket里面的file的metadata如column names, file type,ski header,count等等。


Custom Scripts (Spark DataFrame)

Custom transformation可以插入自定义的script,直接integrate到ETL job,但需要实现指定的API。下面是一段例子:

    df = dfc.select(list(dfc.keys())[0]).toDF()

    df_filtered = df.filter(df["year"] > 2018)

    dyf_filtered = DynamicFrame.fromDF(df_filtered, glueContext, "filter_votes")

    return (DynamicFrameCollection({"CustomTransform0": dyf_filtered}, glueContext))

Example script

去除空的row:

    df_filtered = df.filter("videoName != ''")

如果出现空行,强制输出错误,返回到Glue errorMessage

   if df.count != df_filtered.count:

      raise ValueError('Empty row detected.')

custom script 需要和SelectFromCollection连用


Trigger:

Lambda可以作为trigger

https://aws.amazon.com/premiumsupport/knowledge-center/start-glue-job-crawler-completes-lambda/

可以用一个job succeeded event来trigger另一个job,这样就形成一个workflow


Programming:

可视化和代码可以自由转换。首先,可视化创建ETL job,然后转到scripts就看到自动生成的代码。所以只要用这个代码,就可以反过来创建ETL job

要加log的话:logger = glueContext.get_logger()

Python: https://github.com/aws-samples/aws-glue-samples/blob/master/examples/data_cleaning_and_lambda.md

Integration with AWS service: 用boto3 library

Glue API:

支持Scala和Python

Python: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html


start_job_run

只要用下面命令glue start-job-run指明如job-name和scriptLocation等job参数就可以创建。返回JobRunId

$ aws glue start-job-run --job-name "CSV to CSV" --arguments='--scriptLocation="s3://my_glue/libraries/test_lib.py"'

还支持custom parameters比如--source_file_s3_path, --targetFileS3Path等最多达50个。这样就可以令pipeline更灵活和generic,支持不同的输入和输出以及变换操作。

用的时候

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'source_file_s3_path']

logger.info('path name:' + args['source_file_s3_path'])

例子:https://stackoverflow.com/questions/52316668/aws-glue-job-input-parameters

还可以设置concurrency,这样可以同时跑不同parameters对应的job。

Job parameters for CDK:

https://awscdk.io/packages/@aws-cdk/aws-glue@1.22.0/#/./@aws-cdk_aws-glue.CfnJob

Default parameters:

https://docs.amazonaws.cn/en_us/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html


get_job_runs

输入参数为jobName和jobRunId,返回JobRunState(Failed, Succeeded)和errorMessage


Shared code in Glue jobs by Python lib

https://medium.com/@bv_subhash/sharing-re-usable-code-across-multiple-aws-glue-jobs-290e7e8b3025


Glue error Code:

Glue Concurrency

Timeout

Custom error

Internal Failure


Deployment:

由于ETL的Python script都是保存在S3的,所以如果代码commit到git的话就要手动上传到S3。解决方案是利用CDK里面的Assets将本地代码上传到S3. Ref

DocumentDB:

Glue可以连documentDB,用于ETL,这里我们用来做update 一个record. 需要在glue手动设置DocDB的连接。

secrets_manager_client = boto3.client("secretsmanager", region_name="us-west-2")

workflow_status = [{

     "_id": job_run_id,

    "statux": "xxx"

}]

workflow_status_frame = DynamicFrame.fromDF(spark.createDataFrame(workflow_status), glueContext, "nested")

db_writer(workflow_status_frame,  "my_db", "workflow_status")


db_writer(df, database_name, colection_name):

    write_documentdb_options = {

        "uri":

       "database": database_name

       "collection": colection_name

      ....

    }

    glueContext.write_dynamic_frame.from_options(df, connection_type="documentdb", connection_options=write_documentdb_options)