使用场景
使用场景为event-driven,也就是需要trigger event的。1. S3(文件类型存储服务如FTP)中文件到达后进行ETL
2. DynamoDB中某一天数据load进去后进行model计算。此用例类似于一。
3. API Gateway(HTTP)收到request后进行计算。如某TV供应商不想管理server或者与产生event的server不是一个语言,只能通过http json进行沟通。一个更具体的例子是在Alexa创建一个skill时,就要lambda编程,如ask WeatherHelper to get weather,这个HTTP发到我用任何语言(Java, python)编写lambda(叫WeatherHelper),然后在我的lambda里写程序去调用weather.com去获取天气再返回结果。所以lambda其实是事件驱动构件,无编程环境只与语言相关的EC2。
4. 移动程序数据分析:kinesis->lambda (数据转换json->csv)
另一个较著名的使用场景是从外部网站买数据导入到dynamoDB,新数据触发lambda更新elasticSearch的数据。
功能
重点:事件源和计算(重于与其他服务integration)
事件源
更改 Amazon S3 存储桶或 Amazon DynamoDB 表中的数据
使用 Amazon API Gateway 运行代码以响应 HTTP 请求。
AWS Lambda Invoke API
Amazon S3 始终异步调用 Lambda 函数,Amazon Cognito 始终同步调用 Lambda 函数。对于基于轮询的 AWS 服务(Amazon Kinesis、Amazon DynamoDB、Amazon Simple Queue Service),AWS Lambda 轮询流或消息队列并同步调用您的 Lambda 函数。
限制
运行时间15min,内存3G,所有lambda总并发数1000。异步调用,重试调用两次,且在重试之间有一定的延迟。同步调用由caller决定收到200后是否重试。
并发
您无需担心同步线程或进程的问题。不过,您可以使用异步语言功能并行处理事件批次,并将数据保存到 /tmp 目录以便在同一实例上的未来调用中使用。
亲身经历:用SQS发多个messages来触发多个lambda的实例,lambda可以设置concurrency为2,表示最多有2个instance同时运行,其他未被处理的messages会放入到in flight中。
Coldstart
比较慢,10分钟之内instance如果没有活动,就会被reclaimed,下一次lambda再invoke,就会出现cold start问题,cold start大概6-8秒,log不会看到这个gap。
解决方案用Provisioned Concurrency (Config -> Concurrency),此法还是不需要和canary结合用,一些heavy的资源需要初始化如db connections。此法缺点是alias跟version bind在一起,所以新代码不会被deployed。所以需要手动publish新的version,然后alias不用改(bind to latest)。这些都可以用CDK自动化,需要在CDK的test dependency中加入lambda package,这样lambda的改动会trigger CDK去publish一个新的version。
访问其他资源
AWS资源如S3
非AWS资源
VPC中访问RDS和internet
环境变量
lambda的config
函数版本控制和别名
用于发布,别名指向单个Lambda 函数版本
别名的流量转移
类似于weblab来测试和发布新版本的lambda
层
lambda中的library,最多5个,所有layer大小不超过250M。
重试
基于轮询的事件源(基于流)DynamoDB,Lambda 尝试处理错误的记录批次,直至数据过期,这最多可以为七天。
私信队列DLQ
将异步执行重试两次后失败的运行发送到SQS/SNS进行原因分析
CloudFormation
用其进行lambda代码的管理和部署
其他AWS服务结合
Cloud Events定时trigger
LB
Alexa
Kinesis/Kinesis Firehose
S3
DDB
SQS/SNS
最佳实践
API gateway->lambda->RDS为了封装weather.com以及其他企业内部数据,所以用lambda作为一个到weather.com的API gateway。
request如
{
"operation": "create",
"tableName": "LambdaTable",
"payload": {
"Item": {
"Id": "1",
"name": "Bob"
}
}
}
Lambda中handler为Lambda函数入口:
exports.handler = function(event, context, callback) {
var operation = event.operation;
if (event.tableName) {
event.payload.TableName = event.tableName;
}
switch (operation) {
case 'create':
dynamo.put(event.payload, callback);
break;
配置:
假设lambda要写入数据到RDS,首先配置好RDS,然后需要一个执行lambda的role,这个role要有以下这些权限(IAM中配置):AmazonRDSFullAccess
AWSLambdaFullAccess
AWSLambdaExecute
AWSLambdaVPCAccessExecutionRole
CloudWatchFullAccess
Handler为文件名.入口函数如abc.lambda_handler
abc.py:
def lambda_handler(event, context):
另一个Lambda的完整例子:S3->Lambda->dynamoDB
这个例子是实现存取删单词
1. 准备dynamoDB表:叫Words, partition key: CustomerId, sort key: Word
2. 给lambda_basic_execution role的权限:AmazonDynamoDBFullAccess
3. 用js写Lambda,API参考
4. 写Test event
lambda的触发事件:
只要上传一个新的文件到S3就可以触发lambda执行,这个事件叫S3 putfor record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print('bucket='+bucket+',key='+key)
测试lambda时候,可以create一个test event,只需要修改bucket name和key即可
"object": {
"key": "folder/test.html",
},
"bucket": {
"name": "bucket-1",
测试完毕后就可以add trigger把bucket=bucket-1以及prefix=folder/ 的put事件作为trigger,这样只要上传新文件到这个文件夹lambda即可运行。
'use strict';
console.log('Loading function');
var AWS = require('aws-sdk');
var DOC = require('dynamodb-doc');
var dynamo = new DOC.DynamoDB();
var docClient = new AWS.DynamoDB.DocumentClient();
exports.handler = function(event, context) {
-------------------------------------------------------------------------------------ADD
var item = {
CustomerId: event.Records[0].user,
Word: event.Records[0].word
};
var cb = function(err, data) {
if(err) {
console.log(err);
context.fail('unable to update Words at this time');
} else {
console.log("Successfully saved a word");
context.done(null, data);
}
};
dynamo.putItem({TableName:"Words", Item:item}, cb);
var cbget = function(err, data) {
if(err) {
console.log('error on GetWordsInfo: ',err);
context.done('Unable to retrieve words information', null);
} else {
console.log("Query succeeded: Records returned " + data.Items.length);
var rand = data.Items[Math.floor(Math.random() * data.Items.length)];
console.log("Random word: " + rand.Word);
}
};
-------------------------------------------------------------------------------------GET
var params = {
TableName : "Words",
KeyConditionExpression: "#cust = :customerid",
ExpressionAttributeNames:{
"#cust": "CustomerId"
},
ExpressionAttributeValues: {
":customerid": event.Records[0].user
}
};
dynamo.query(params, cbget);
-------------------------------------------------------------------------------------DELETE
var cbdelete = function(err, data) {
if (err) {
console.log('error on deleting word: ', err);
} else {
console.log("Successfully removed a word");
}
};
var deleteparams = {
TableName: "Words",
Key:{
"CustomerId": event.Records[0].user,
"Word": event.Records[0].word
}
};
console.log("Attempting a delete...");
docClient.delete(deleteparams, cbdelete);
};
Sample event: S3 put
{
"Records": [
{
"eventSource": "aws:s3",
"eventID": "110",
"user": "Jack",
"word": "back"
}
]
}
retry:
lambda的retry取决于caller,若无caller自己会retry,否则caller决定,包括ddb stream (https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html)
ref:
官方指南(中文)
webtask
lambda2RDS
官方例子: lambda->RDS
官方例子: S3->lambda