Friday, December 30, 2016

AWS Lambda

Lambda的特点是只要写代码而不需关心provision和manager server。计费方式是按程序运行时间。

使用场景

使用场景为event-driven,也就是需要trigger event的。
1. S3(文件类型存储服务如FTP)中文件到达后进行ETL
2. DynamoDB中某一天数据load进去后进行model计算。此用例类似于一。
3. API Gateway(HTTP)收到request后进行计算。如某TV供应商不想管理server或者与产生event的server不是一个语言,只能通过http json进行沟通。一个更具体的例子是在Alexa创建一个skill时,就要lambda编程,如ask WeatherHelper to get weather,这个HTTP发到我用任何语言(Java, python)编写lambda(叫WeatherHelper),然后在我的lambda里写程序去调用weather.com去获取天气再返回结果。所以lambda其实是事件驱动构件,无编程环境只与语言相关的EC2。
4. 移动程序数据分析:kinesis->lambda (数据转换json->csv)

另一个较著名的使用场景是从外部网站买数据导入到dynamoDB,新数据触发lambda更新elasticSearch的数据。















功能

重点:
事件源和计算(重于与其他服务integration)

事件源
更改 Amazon S3 存储桶或 Amazon DynamoDB 表中的数据
使用 Amazon API Gateway 运行代码以响应 HTTP 请求。
AWS Lambda Invoke API
Amazon S3 始终异步调用 Lambda 函数,Amazon Cognito 始终同步调用 Lambda 函数。对于基于轮询的 AWS 服务(Amazon Kinesis、Amazon DynamoDB、Amazon Simple Queue Service),AWS Lambda 轮询流或消息队列并同步调用您的 Lambda 函数。

限制
运行时间15min,内存3G,所有lambda总并发数1000。异步调用,重试调用两次,且在重试之间有一定的延迟。同步调用由caller决定收到200后是否重试。

并发
您无需担心同步线程或进程的问题。不过,您可以使用异步语言功能并行处理事件批次,并将数据保存到 /tmp 目录以便在同一实例上的未来调用中使用。
亲身经历:用SQS发多个messages来触发多个lambda的实例,lambda可以设置concurrency为2,表示最多有2个instance同时运行,其他未被处理的messages会放入到in flight中。

Coldstart
比较慢,10分钟之内instance如果没有活动,就会被reclaimed,下一次lambda再invoke,就会出现cold start问题,cold start大概6-8秒,log不会看到这个gap。
解决方案用Provisioned Concurrency (Config -> Concurrency),此法还是不需要和canary结合用,一些heavy的资源需要初始化如db connections。此法缺点是alias跟version bind在一起,所以新代码不会被deployed。所以需要手动publish新的version,然后alias不用改(bind to latest)。这些都可以用CDK自动化,需要在CDK的test dependency中加入lambda package,这样lambda的改动会trigger CDK去publish一个新的version。


访问其他资源
AWS资源如S3
非AWS资源
VPC中访问RDS和internet

环境变量
lambda的config

函数版本控制和别名
用于发布,别名指向单个Lambda 函数版本

别名的流量转移
类似于weblab来测试和发布新版本的lambda


lambda中的library,最多5个,所有layer大小不超过250M。

重试
基于轮询的事件源(基于流)DynamoDB,Lambda 尝试处理错误的记录批次,直至数据过期,这最多可以为七天。

私信队列DLQ
将异步执行重试两次后失败的运行发送到SQS/SNS进行原因分析

CloudFormation
用其进行lambda代码的管理和部署

其他AWS服务结合
Cloud Events定时trigger
LB
Alexa
Kinesis/Kinesis Firehose
S3
DDB
SQS/SNS


最佳实践

API gateway->lambda->RDS
为了封装weather.com以及其他企业内部数据,所以用lambda作为一个到weather.com的API gateway。

request如
{
    "operation": "create",
    "tableName": "LambdaTable",
    "payload": {
        "Item": {
            "Id": "1",
            "name": "Bob"
        }
    }
}

Lambda中handler为Lambda函数入口:
exports.handler = function(event, context, callback) {
    var operation = event.operation;
    if (event.tableName) {
        event.payload.TableName = event.tableName;
    }
    switch (operation) {
        case 'create':
            dynamo.put(event.payload, callback);
            break;

配置:

假设lambda要写入数据到RDS,首先配置好RDS,然后需要一个执行lambda的role,这个role要有以下这些权限(IAM中配置):
AmazonRDSFullAccess
AWSLambdaFullAccess
AWSLambdaExecute
AWSLambdaVPCAccessExecutionRole
CloudWatchFullAccess

Handler为文件名.入口函数如abc.lambda_handler

abc.py:
   def lambda_handler(event, context):

另一个Lambda的完整例子:S3->Lambda->dynamoDB


这个例子是实现存取删单词

1. 准备dynamoDB表:叫Words, partition key: CustomerId, sort key: Word
2. 给lambda_basic_execution role的权限:AmazonDynamoDBFullAccess
3. 用js写Lambda,API参考
4. 写Test event

lambda的触发事件:

只要上传一个新的文件到S3就可以触发lambda执行,这个事件叫S3 put

for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        print('bucket='+bucket+',key='+key)

测试lambda时候,可以create一个test event,只需要修改bucket name和key即可
         "object": {
          "key": "folder/test.html",
        },
        "bucket": {
          "name": "bucket-1",

测试完毕后就可以add trigger把bucket=bucket-1以及prefix=folder/ 的put事件作为trigger,这样只要上传新文件到这个文件夹lambda即可运行。


'use strict';

console.log('Loading function');

var AWS = require('aws-sdk');
var DOC = require('dynamodb-doc');
var dynamo = new DOC.DynamoDB();
var docClient = new AWS.DynamoDB.DocumentClient();
exports.handler = function(event, context) {

-------------------------------------------------------------------------------------ADD
    var item = {
                 CustomerId: event.Records[0].user,
                 Word: event.Records[0].word
            };

    var cb = function(err, data) {
        if(err) {
            console.log(err);
            context.fail('unable to update Words at this time');
        } else {
            console.log("Successfully saved a word");
            context.done(null, data);
        }
    };
 
    dynamo.putItem({TableName:"Words", Item:item}, cb);
 
    var cbget = function(err, data) {
      if(err) {
         console.log('error on GetWordsInfo: ',err);
         context.done('Unable to retrieve words information', null);
      } else {
         console.log("Query succeeded: Records returned " + data.Items.length);
         var rand = data.Items[Math.floor(Math.random() * data.Items.length)];
         console.log("Random word: " + rand.Word);
      }
   };
-------------------------------------------------------------------------------------GET

    var params = {
        TableName : "Words",
        KeyConditionExpression: "#cust = :customerid",
        ExpressionAttributeNames:{
            "#cust": "CustomerId"
        },
        ExpressionAttributeValues: {
            ":customerid": event.Records[0].user
        }
    };

   dynamo.query(params, cbget);
 
 
   -------------------------------------------------------------------------------------DELETE
   var cbdelete = function(err, data) {
      if (err) {
           console.log('error on deleting word: ', err);
      } else {
           console.log("Successfully removed a word");
      }
   };
     
   var deleteparams = {
        TableName: "Words",
        Key:{
             "CustomerId": event.Records[0].user,
             "Word": event.Records[0].word
        }
    };
 
    console.log("Attempting a delete...");
    docClient.delete(deleteparams, cbdelete);
 
};

Test Event:
Sample event: S3 put

{
  "Records": [
    {
      "eventSource": "aws:s3",
      "eventID": "110",
      "user": "Jack",
      "word": "back"
   
    }
  ]
}

retry:
lambda的retry取决于caller,若无caller自己会retry,否则caller决定,包括ddb stream (https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html)

ref:
官方指南(中文)
webtask
lambda2RDS
官方例子: lambda->RDS
官方例子: S3->lambda

AWS DynamoDB

DynamoDB是AWS产品线之一,它有如下特点:
1. NoSQL
2. schema less设计。也就是每个item(行)的列名不固定可省空间。原理是每item按json存。
3. 只能query(partition)/sort (range key),不能query/sort非主键。除非创建GSI(global secondary index)
4. 计价方式颠覆传统数据库。传统用空间大小计费,而DynamoDB用request峰值计算
5. 存储按partition key来分配存储位置,不用像传统一样做增加空间、shrink、备份等DBA工作
6. 每份数据至少有3个备份

新特性:
1. 无需规划容量read/write capacity,采用按请求付费的定价模式
2. 支持事务await dynamoDb.transactWriteItems
3. Accelerator - dynamoDB的cache


架构

重点:
存储key-value和更改trigger(stream)

核心概念:
Table:与传统DB一样
Item:传统DB的行
Attribute:传统DB的列名
Primary key:是必须的。有两种primary key:一种只用partition key,此时partition key在全表中唯一。另一种是partition key+sort key作为组合主键(如名字+生日唯一确定一个人)。数据时按partition key来存储的,同一partition内,数据按sort key排序。partition key叫hash attribute只能用于相等性比较,而sort key叫range attribute,可以进行相等和大小比较。
Secondary Indexes:额外索引来提高非主键数据读取速度,由于primary key已是索引,所以这里叫额外索引。索引包括全局和局部索引。全局索引针对全表,如studentID(HashKey), courseID(SortKey),  score(IndexHashKey)可以获得某个分数的所有人。局部索引是针对每个partition key的索引,类似于SQL Server的filter index。指定index时还要选include primary key还是其他非key属性或是全部,类似与SQL Server中index的include。但是数据一致性并不能保证。写入新数据,可能需要一些时间才会写到二级索引。
read capacity:默认的read capacity为5,表示每秒只允许5个读操作。

最佳实践
CID, DeviceId+type
EntityType, entityId
如果存epoch time,用Number存
customerid+startPoint作为hash key,endPoint做range Key,再对startPoint做secondary index

底层API:

创建table如下,dynamoDB.getTable("Movies").putItem, getItem, batchGetItem, batchWriteItem, query, scan, delete,这是dynamoDB的API属于low level API。

 AmazonDynamoDBClient client = new AmazonDynamoDBClient()
            .withEndpoint("http://localhost:8000");

 DynamoDB dynamoDB = new DynamoDB(client);

 Table table = dynamoDB.createTable(“Movies”,
                Arrays.asList(
                    new KeySchemaElement("year", KeyType.HASH),  //Partition key
                    new KeySchemaElement("title", KeyType.RANGE)), //Sort key
                    Arrays.asList(
                        new AttributeDefinition("year", ScalarAttributeType.N),
                        new AttributeDefinition("title", ScalarAttributeType.S)),
                    new ProvisionedThroughput(10L, 10L));
            table.waitForActive();

封装层 - Mapper:

AWS SDK
Developer Guide中的例子:

代表一个item:
@DynamoDBTable(tableName="Music")
public class MusicItem {
    private String artist;
    private String songTitle;
    private String albumTitle;
    private int year;

    @DynamoDBHashKey(attributeName="Artist")
    public String getArtist() { return artist;}
    public void setArtist(String artist) {this.artist = artist;}

    @DynamoDBRangeKey(attributeName="SongTitle")
    public String getSongTitle() { return songTitle;}
    public void setSongTitle(String songTitle) {this.songTitle = songTitle;}

    @DynamoDBAttribute(attributeName = "AlbumTitle")
    public String getAlbumTitle() { return albumTitle;}
    public void setAlbumTitle(String albumTitle) {this.artist = albumTitle;}
}

@DynamoDBAttribute不是必须,与lombok连用:

@DynamoDBTable(tableName="Music")
@Data
public class MusicItem {
     @DynamoDBHashKey(attributeName="Artist")
      private String artist;
      private String albumTitle;

      @DynamoDBVersionAttribute
      private Integer version;
}

根据primary key读取一个item:
        AmazonDynamoDBClient client = new AmazonDynamoDBClient();
        DynamoDBMapper mapper = new DynamoDBMapper(client);
     
        MusicItem keySchema = new MusicItem();
        keySchema.setArtist("No One You Know");
        keySchema.setSongTitle("Call Me Today");
        MusicItem result = mapper.load(keySchema);

SDK中核心类为DynamoDBMapper作为持久层的接口来操作DynamoDB,作用也类似于ORM或Hibernate作为Table和class的桥梁。DynamoDBMapper只能用于某一个table的item的创建、读、更新、删除,若要对table进行操作就要用low level API。
AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
DynamoDBMapper mapper = new DynamoDBMapper(client);
CatalogItem item = new CatalogItem();
mapper.save(item);    

query第一个参数是结果类,第二参数为query条件也是用结果类封装:
CatalogItem partitionKey = new CatalogItem();
partitionKey.setId(102);
DynamoDBQueryExpression<CatalogItem> queryExpression = new DynamoDBQueryExpression<CatalogItem>()
    .withHashKeyValues(partitionKey);
List<CatalogItem> itemList = mapper.query(CatalogItem.class, queryExpression);
for (int i = 0; i < itemList.size(); i++) {
    System.out.println(itemList.get(i).getTitle());
    System.out.println(itemList.get(i).getBookAuthors());
}

mapper的方法包括save, batchSave, load, query, delete
非DynamoDBMapper方法去读数据

Index:
DynamoDBQueryExpression<TableClass> expression = new DynamoDBQueryExpression<TableClass>()
        .withIndexName("index-name")
        .withConsistentRead(false)
        .withHashKeyValues(tableObject)

功能


操作:
GUI:创建表格、增加item都可以通过AWS界面完成

Table区域:
table按区域,也就是east-2有table,west-1就不会有。

Auto scaling:
30min才会生效。

GSI/LSI:
GSI是任何两个属性,而LSI的分区键与原分区键一致,sort key不同。GSI是不保证强一致性,LSI则可以。

乐观锁:
指定一个属性作为版本号。如果保存,版本号不一致,更新失败。用意在于确保,更新的改动不会被其他transaction覆盖。具体实现是客户端版本号+1,输入后端,若match DDB中版本号+1,就成功更新,返回整个item包括新版本号。否则不成功。

PutItem:
覆盖现有item。但可以用条件先判断是否存在。

TTL:
每个item的生存时间(time to live),dynamoDB会48小时内删除过期的项目。用户修改table属性指定TTL的attribute,然后写入item时候用户指定过期时间如currentTime+5 days. 

DDB streams:
用例有DB不同区域sync,DDB数据分析,新用户通知。一个stream是一个改动,只能保留24小时。Kinesis是DDB streams客户端适配器。
DDB stream->lambda->SNS->notification

DDB stream默认设置
Retry = -1 无限重试 (这个retry控制lambda的retry,lambda本身的retry不起作用,所以lambda设的DLQ也没用)
Maximum age of event = -1 无时间限制,event永远留在Stream
https://enrico-portolan.medium.com/how-aws-lambda-retry-really-works-fb823e78b4a1

全局表:
跨区的相同表结构,不保证强一致性,只保证最终一致性。

静态加密:
加密敏感数据。

事务:
仅支持10个项目

DAX:
Cache层,若需要强一致性,不推荐使用。从10毫秒降到微秒。

QueryFilter
用于DAO层的实现如contains, id=(:id).

On-demand mode
provisioned会让request throttled,即使设置了auto-scaling也会有delay。但on-demand就自动30分钟增加两倍,对于6000以下RPU的,就可以设置这个。因为初始为6000。 

最佳实践:
多对多关系:同一个表中,用自join存储。

Export/Import:
用data pipeline来export到S3. 不要设onTerminate的时间,因为大概要过10分钟才会真正开始,若设置了时间过短,就不会开始。

常见问题

resource not found exception
由于throttling,提高read capacity可以解决


AWS CLI:

shorthand syntax:
aws dynamodb create-table --table-name MusicCollection --attribute-definitions AttributeName=Artist,AttributeType=S AttributeName=SongTitle,AttributeType=S --key-schema AttributeName=Artist,KeyType=HASH AttributeName=SongTitle,KeyType=RANGE --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

CLI比较方便,但文档不足,也可以用于其他AWS产品如S3。或者backup到S3再import


Ref:
官方指南(中文)
DynamoDB深度体验
DynamoDB使用经验

Sunday, December 18, 2016

Mockito简介

Mockito是基于JUnit,可以先学习一下JUnit。原则是依赖什么,模拟什么
Mockito可以验证
1. 值是否相等(assertEquals)
2. 方法调用几次(verify)
3. 方法调用参数是否一致(eq/ArgumentCaptor)
4. 方法调用是否按顺序

Stub

比如一个FTP程序,要测试如果断开连接是否会log错误信息Disconnected from FTP。这里断开连接是测试前提(或叫条件)是Stub,会否log错误信息是测试对象(或叫目标)。when里面只能是mock对象,不能是真实对象。when(getName(anyString(), "abc")这是不对,因为参数如果一个是matcher,其他就一定要matcher(eq, any...), i.e. eq("ewf", any())

FTPClient mockedFTP = mock(FTPClient.class);
或者
@Mock
FTPClient mockedFTP

//Stub “无法连接”
when(mockedFTP.isConnected())).thenReturn(false);
when可以任意值如when(foo.add(lt(5), eq(8)))第一参数任意小于5的值及第二参数=8时返回。。
但thenReturn一定要具体指不能用any(Test.class)

//开始测试,真实调用
test.isFTPConnected();

//检查结果
assert(true, Logger.endWith("Disconnected from FTP"));

这样的测试看似显而易见,由代码一看就知道是正确,但为了确保后续维护或者refactor仍能保持这个逻辑(需求),有必要写这样的测试。

Mockito不支持连续mock,如when(mockedFTP.getConn().isConnected).thenReturn(true); 此情况可以逐个mock来做接力
@Mock Connect conn;
when(conn.isConnected).thenReturn(true);

any用于假设,any(class)用于verify
when(mockClass.get(any())).thenReturn();
verify(dynamoDao).put(any(List.class)

API:
doReturn
doNothing
doThrow

Exception的测试
  @Test(expected=ArithmeticException.class) public void testException2(){ doThrow(ArithmeticException.class).when(customer).getCustomerId(); // Act underTest.buildAnimal("tiger", 4); }

InOrder的测试
声明mock的预期调用顺序,verify再验证
// Arrange
InOrder inOrder = inOrder(customer, myJsonHelper);

// Act
underTest.callInOrder();

// Assert
inOrder.verify(customer).setCustomerId(Mockito.anyInt());
inOrder.verify(myJsonHelper).toObject();

Verify

Mockito包括stub, verify, spy三种API。Verify是检查mock对象的某一个函数+参数这个组合(参数不同视为不同调用)的调用次数。使用场合:如果测试对象不返回结果或结果极为复杂就用此校验法。

//真实调用
mockedList.add("one");
//验证add("one")这个调用是否一次
verify(mockedList).add("one");

另一个例子:
request.timestamp=1483685164
device.timestamp=1483685164
//真实调用
updateTimestampActivity.update(request);
//验证add("one")这个调用是否一次
verify(dynamoDao).put(devices);
verify(dynamoDao, times(0)).put(any(Device.class));

Main.java:
update(request){
    request.timestamp=Date.now();
    Devices devices = convertToObject(request);
    dynamoDB.put(devices);
}
这个例子中verify对象时dynamoDao的put方法以及devices参数,由于devices在真实调用中被修改成当前时间与1483685164不一致,所以会报错:参数不一样。代码要更改成verify(dynamoDao).put(any(List.class));表示匹配任何List参数也就是不验证参数。
Matchers.any(Test.class)
Matchers.anyListOf(Test.class)

参数捕捉:

public void test(){
    //act
     underTest.exec();

    //assert
     verify(math).pow(baseCaptor.capture(), exponenetCaptor.capture());
     Integer base = baseCaptor.getValue();
     Float exponenet = exponenetCaptor.getValue();

     assertEqual(2, base);
   
}

@Captor
ArgumentCaptor<Integer> baseCaptor;

@Captor
ArgumentCaptor<Float> exponenetCaptor;

@Mock
Math math;

@InjectMocks
Test underTest;

捕捉参数还可以用eq来代替,如verify(math).pow(eq(2), eq(1.3));等价于verify(math).pow(2,1,3)当prmitive类型时候。如果参数为复合类型,eq会调用equals去做匹配,如eq(student),而不加eq时候会比较指针。

Spy

spy是修改部分测试对象的部分API以满足测试条件,此法慎用。
假设UnderTest有两个函数getProfile(), getName(),而getProfile()内部调用getName(), 此时getName若用when().thenReturn做mock会报错因为getName()也是UnderTest的一个函数。这是可以用spy来修改UnderTest的API来满足。@InjectMocks和@Spy可以连用。例如:

StudentTest:

public void test(){
    //因为student是spy,所以student.get(Name)可以假设
    doReturn("Gary").when(student).getName();
}

@InjectMocks
@Spy
Student student;


Mockito用于有dependency情况下测试,JUnit是无条件下测试,两者结合使用。

Annotation:

annotation显得代码更简洁
annotation mock 功能
@RunWith(MockitoJUnitRunner.class) MockitoAnnotations.initMocks(this); 初始化
@Mock Mockito.mock(ArrayList.class) 自动产生实例,用于假设或叫依赖。如果mock类成员一定要与@InjectMocks连用,否则时null不自动产生实例
@Captor ArgumentCaptor.forClass(String.class) 参数捕捉
@InjectMocks MyDictionary dic; 自动产生实例。需要测试的obj也叫UnderTest,而它的域成员可以mock,用来测试每个函数

class Student {
     String name;
     public void exam(Dialog dialog){}
}

StudentTest:

@Mock
String name;

@InjectMocks
Student student;

@Mock
Dialog dialog;

这里dialog是独立的,即使没有InjectMocks也不影响。但name依赖于student的存在。

另一个例子:

@Test
public void test_BuildAnimal(){
// Arrange
when(customer.getCustomerId()).thenReturn(0);

// Act
Animal animal = underTest.buildAnimal("tiger", 4);

// Assert
assertEquals("0:tiger", animal.drivedName());

}

@InjectMocks
VideoSpeechlet underTest;

@Mock
Customer customer;

Mockito有一定局限性,解决方案是PowerMock

mock contructor:
https://lkrnac.net/blog/2014/01/mock-constructor/

EqualsBuilder
排除某些fields如id
assertTrue(EqualsBuilder.reflectionEquals(student, student2, new String[]{"id"}))
assertTrue(EqualsBuilder.reflectionEquals(student, student2, "id"))

Ref:
理论:
http://blog.csdn.net/zhangxin09/article/details/42422643
API:
http://blog.csdn.net/onlyqi/article/details/6544989
实例:
http://blog.csdn.net/onlyqi/article/details/6546589
JUnit:
http://blog.csdn.net/zhangxin09/article/details/42418441