Sunday, June 27, 2021
Introduction to AWS Secrets Manager
Saturday, June 26, 2021
Google Search Tips
"vtasters" | Exact match; "use * card" matches any word in place of the * |
site:stackoverflow.com how to install java | Search within a specific site |
Nasdaq news after:2020 | Date-range search; also: Nasdaq news before:2020-03-01, Nasdaq news on:2020-04-15 |
fngu download filetype:pdf | File type |
intitle:cloudwatch | Search titles only |
allintitle:cloudwatch dashboard | All keywords must appear in the title |
related:amazon.com | Similar sites |
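As a small sketch of how these operators combine, the snippet below assembles a query string and URL-encodes it. The helper name build_query and the search URL shape are assumptions for illustration, not part of the original notes.

```python
from urllib.parse import quote_plus

def build_query(terms, site=None, filetype=None, after=None, before=None):
    """Join free-text terms with optional search operators (hypothetical helper)."""
    parts = [terms]
    if site:
        parts.append(f"site:{site}")
    if filetype:
        parts.append(f"filetype:{filetype}")
    if after:
        parts.append(f"after:{after}")
    if before:
        parts.append(f"before:{before}")
    return " ".join(parts)

query = build_query("how to install java", site="stackoverflow.com")
# Assumed URL shape; the encoded query goes into the q parameter
url = "https://www.google.com/search?q=" + quote_plus(query)
```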
Thursday, June 10, 2021
PySpark and Spark SQL in AWS Glue
Glue wraps PySpark and Spark SQL.
DataSource0.count()
DataSource0.printSchema()
df = DataSource0.toDF()
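Find rows whose value column contains word characters (letters, digits, or underscore):
df.filter(df['value'].rlike(r'\w+')).show()
Find rows whose value column contains only letters, digits, whitespace, or colons:
df.filter(~df['value'].rlike(r'[^a-zA-Z\d\s:]')).show()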
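The two filters can be checked outside Spark with Python's re module. This is only a sketch: it assumes the second pattern is meant as the negated character class [^a-zA-Z\d\s:], it relies on these simple patterns behaving the same in Java regex (which rlike uses) and in Python, and the sample strings are made up.

```python
import re

samples = ["abc 123", "12:30", "a_b#c", "###", ""]

# Equivalent of rlike(r'\w+'): at least one word character somewhere in the string
has_word = [s for s in samples if re.search(r"\w+", s)]

# Equivalent of ~rlike(r'[^a-zA-Z\d\s:]'): no character outside the allowed set
only_allowed = [s for s in samples if not re.search(r"[^a-zA-Z\d\s:]", s)]

print(has_word)
print(only_allowed)
```

Note that the empty string passes the second filter, because it contains no disallowed character.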
Sunday, June 6, 2021
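Introduction to AWS CDK
CDK is a tool for packaging and deploying AWS resources, available in several programming languages.
The examples below use TypeScript (a typed superset of JavaScript that runs on Node.js).
Follow these steps for a first try: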
Step Function
Glue
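A state can be defined with raw JSON instead of new tasks.GlueStartJobRun. This helps because some features are not supported by the construct, for example GlueStartJobRun with .sync.
const stateJson = {
  Type: 'Task',
  Resource: 'arn:aws:states:::dynamodb:putItem',
  Parameters: {
    TableName: table.tableName,
    Item: {
      id: {
        S: 'MyEntry',
      },
    },
  },
  ResultPath: null,
};
// Wrap the raw definition in a state that can be chained like any other
const custom = new sfn.CustomState(this, 'my custom task', { stateJson });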
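The same idea applies to the Glue case mentioned above: a raw state definition for glue:startJobRun.sync is just plain data. The sketch below builds it as a Python dict and serializes it. The helper name glue_sync_state is an illustrative assumption, and wiring the result into a CDK stack is left out.

```python
import json

def glue_sync_state(job_name, next_state=None):
    """Return a raw task state that starts a Glue job and waits for it (hypothetical helper)."""
    state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::glue:startJobRun.sync",
        "Parameters": {"JobName": job_name},
    }
    # A state needs either a Next target or End: true
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state

state_json = glue_sync_state("my-etl-job", next_state="ValidateOutput")
print(json.dumps(state_json, indent=2))
```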
Lambda
In CDK, the Lambda task for Step Functions supports a payload parameter:
new tasks.LambdaInvoke(this, 'Invoke with payload field in the state input', {
  lambdaFunction: fn,
  payload: sfn.TaskInput.fromObject({
    "execution.$": "$$.Execution.Id",
    "catalogId": sfn.JsonPath.stringAt('$.catalogId'),
  }),
});
API Gateway
https://docs.aws.amazon.com/cdk/api/latest/docs/aws-apigateway-readme.html#integration-targets
Saturday, June 5, 2021
Introduction to AWS Step Functions
You can create a state machine from a template in the console, which saves time.
How to use variables in Step Functions
Every task has an input and an output. Suppose the input shown for a task's TaskStateEntered event (in the AWS Console) is
{
  "input": {
    "catalogId": "abc-cde"
  }
}
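In the state machine, read the value with $.catalogId; the root ($) corresponds to the input. This task does not need a ResultPath (it only acts as a filter).
input corresponds to $.
output corresponds to $.output (when ResultPath is set to $.output).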
context corresponds to $$.
By selecting paths, you can choose what is passed to the Lambda's InputStream, for example:
"Parameters": {
  "FunctionName": "xxx",
  "Payload": {
    "execution.$": "$$.Execution.Id",
    "categoryId.$": "$.catalogId",
    "runId.$": "$.output.JobRunState"
  }
}
Payload is what the Lambda receives on its InputStream.
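To make the path selection concrete, here is a toy model of how keys ending in .$ are resolved. Step Functions performs this substitution itself; the function names, the sample input, and the execution ARN below are made up for illustration, and only simple dotted paths are handled.

```python
def lookup(path, data):
    """Resolve a simple dotted path like '$.a.b' against a dict (no wildcards or filters)."""
    node = data
    for key in path.split(".")[1:]:
        node = node[key]
    return node

def resolve_parameters(params, state_input, context):
    """Replace every 'name.$' key with the value its path selects."""
    resolved = {}
    for key, value in params.items():
        if isinstance(value, dict):
            resolved[key] = resolve_parameters(value, state_input, context)
        elif key.endswith(".$"):
            if value.startswith("$$"):
                # '$$.x' reads from the context object
                resolved[key[:-2]] = lookup(value[1:], context)
            else:
                # '$.x' reads from the state input
                resolved[key[:-2]] = lookup(value, state_input)
        else:
            resolved[key] = value
    return resolved

state_input = {"catalogId": "abc-cde", "output": {"JobRunState": "SUCCEEDED"}}
context = {"Execution": {"Id": "arn:aws:states:us-east-1:123456789012:execution:demo:run-1"}}
params = {
    "FunctionName": "xxx",
    "Payload": {
        "execution.$": "$$.Execution.Id",
        "catalogId.$": "$.catalogId",
        "runState.$": "$.output.JobRunState",
    },
}
payload = resolve_parameters(params, state_input, context)["Payload"]
print(payload)
```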
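The task (assuming it is a Lambda) writes its output like this:
public void handle(InputStream in, OutputStream out, Context context) throws IOException {
    Map<String, String> map = new HashMap<>();
    map.put("catalogId", "abc-cde");
    // Gson is assumed here; the original only names an unspecified "objectwriter"
    String json = new Gson().toJson(map);
    // OutputStream.write takes bytes, so encode the JSON string first
    out.write(json.getBytes(StandardCharsets.UTF_8));
}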
Replacing the Map with a plain object works the same way.
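For comparison, a Python Lambda produces its output by returning a dict, which the Lambda runtime serializes to JSON. A minimal sketch, with the handler name and the echoed field chosen for illustration:

```python
import json

def handler(event, context):
    """Echo the catalogId from the incoming payload back as the task result."""
    return {"catalogId": event.get("catalogId", "abc-cde")}

# Local check only: in Lambda, the runtime JSON-serializes the returned dict
result = handler({"catalogId": "abc-cde"}, None)
serialized = json.dumps(result)
print(serialized)
```

With the arn:aws:states:::lambda:invoke integration, the returned value appears under Payload in the task result.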
Context variables
The context object contains the Step Functions execution ARN; reference it with $$.
"Execution.$": "$$.Execution.Id"
The Lambda parameters that Step Functions supports include Payload, which is the Lambda's input.
"GetJobParams": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": {
    "FunctionName": "arn...Lambda",
    "Payload": {
      "execution.$": "$$.Execution.Id",
      "catalogId.$": "$.catalogId"
    }
  }
}
"Glue StartJobRun": {
  "Type": "Task",
  "Resource": "arn:aws:states:::glue:startJobRun.sync",
  "Parameters": {
    "JobName": "my-etl-job"
  },
  "Next": "ValidateOutput"
},
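The Resource value cannot be changed. A resource is usually identified by an ARN, but the Glue integration does not take a job ARN; Step Functions locates the Glue job by JobName. JobRunId cannot use a custom format or be passed in as a parameter; otherwise you get a resource-not-found error. The return value includes Id (the JobRunId), JobRunState, ErrorMessage, StartedOn, and so on.
If a task fails and you still want to continue to the next task, use Catch. For example, when the ETL job fails you may still want to write the failure status to the database.
"Glue StartJobRun": {
  "Type": "Task",
  "Resource": "arn:aws:states:::glue:startJobRun.sync",
  "Parameters": {
    "JobName": "my-etl-job"
  },
  "Catch": [
    {
      "ErrorEquals": ["States.Timeout", "States.TaskFailed", "HandledError"],
      "Next": "ValidateOutput"
    }
  ],
  "ResultPath": "$.output",
  "Next": "ValidateOutput"
},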
$.output stores the Glue result, which the next task (for example a Lambda) can use.
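A sketch of what the next task could look like, assuming ValidateOutput is a Python Lambda that receives the whole state input as its event (the handler name and the returned fields are hypothetical). On success it reads the Glue result from output, as set by ResultPath. When a Catch fires without its own ResultPath, the state input is replaced by an error object with Error and Cause fields, so the handler checks for that shape first.

```python
def validate_output(event, context):
    """Summarize the Glue job outcome for a later 'write status to the database' step."""
    if "Error" in event:
        # Catch path: the event is the error object, not the original input
        return {"status": "FAILED", "reason": event.get("Cause", event["Error"])}
    # Success path: the Glue result sits under 'output' because of ResultPath
    run = event.get("output", {})
    return {"status": run.get("JobRunState", "UNKNOWN"), "jobRunId": run.get("Id")}

ok = validate_output(
    {"catalogId": "abc-cde", "output": {"Id": "jr_1", "JobRunState": "SUCCEEDED"}}, None
)
failed = validate_output({"Error": "States.TaskFailed", "Cause": "job crashed"}, None)
print(ok, failed)
```

Adding "ResultPath": "$.error" inside the Catch entry would keep the original input and attach the error next to it, which makes the failure branch simpler to handle.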
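How to assign values to a task
The previous part covered reading variables; this part covers assigning values to a task's parameters. The key is to append .$ to the parameter name.
"Glue StartJobRun": {
  "Type": "Task",
  "Resource": "arn:aws:states:::glue:startJobRun.sync",
  "Parameters": {
    "JobName": "my-etl-job",
    "Arguments": {
      "--catalog_id.$": "$.catalogId"
    }
  }
},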
Step Functions supports only these parameters.
Lambda retry
Lambda's own retry is not supported here; define the retry in Step Functions instead.
"Retry": [
  {
    "ErrorEquals": [
      "States.ALL"
    ],
    "IntervalSeconds": 30,
    "MaxAttempts": 2,
    "BackoffRate": 2
  }
],
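The wait before each retry grows geometrically: the first retry waits IntervalSeconds, and each later one multiplies the previous wait by BackoffRate. A small sketch of that arithmetic (the function name is an assumption for illustration):

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Seconds waited before each retry: interval * rate**n for n = 0..max_attempts-1."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]

# Values from the Retry block above
delays = retry_delays(interval_seconds=30, max_attempts=2, backoff_rate=2)
print(delays)
```

With these settings the task runs at most three times in total: the initial attempt plus two retries, after waits of 30 and 60 seconds.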