Thursday, March 16, 2017

lombok

ORM工具,你只要写field就可以产生自动产生getter setter
@Data
=
其他
@ToString
@EqualsAndHashCode
@NoArgsConstructor(access=AccessLevel.PROTECTED)

boolean getter
@Getter
private boolean isGood; // => isGood()

@Getter
private boolean good; // => isGood()

@Getter
private Boolean isGood; // => getIsGood()

Code
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class User {
    private int id;
    private String username;
    private String email;
    public User() {
    }
    public User(int id, String username, String email) {
        this.id = id;
        this.username = username;
        this.email = email;
    }
    // 如果提供了访问函数,则 Lombok 不会为其再生成
    public String getEmail() {
        return "Email: " + email;
    }
}


<=>
public class User {
    private int id;
    private String username;
    private String email;
    public User() {
    }
    public User(int id, String username, String email) {
        this.id = id;
        this.username = username;
        this.email = email;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getEmail() {
        return email;
    }
    public void setEmail(String email) {
        this.email = email;
    }
}

@Builder
Student.builder().id(5).build();

Others:
import lombok.Data;

@Data
public class Teacher {
     private String name;
     private String title;
}

Eclipse安装Lombok这样自动产生class
https://stackoverflow.com/questions/22310414/how-to-configure-lombok-in-eclipse-luna

ref:
http://xtuer.github.io/java-lombok/

Saturday, March 11, 2017

AWS SQS

云端队列服务,客户端polling获取。用于不同系统间通信和解耦。

使用场景

SQS是非实时处理任务,将同步变异步解耦削峰,是一个分布式的FIFO队列。需要创建一个叫QueueName的队列,然后通过SQSclient发消息到此队列,另一个SQSclient从此队列收消息。


架构


队列(保存多个消息) 在多个 Amazon SQS 服务器上冗余存储消息。

短轮询
Wait time set to 0. 采样方式获取信息,因为采用冗余,这次获取不了某信息,下次将会获得。

Queue参数
Delivery delay: 延时一段时间sender的SQS client才发送到云端Queue中
Receive message wait time: recevier的SQS客户端每隔多长时间会从云端Queue中poll信息。默认为
20s,即使空的信息也会poll。会看到客户端的latency就会是这个数。用SNS可以将Poll变成Push模式。
Retention period: msg保留时长1min-14天


sendMessage, receiveMessage API的latency是几十到低几百毫秒。


最佳实践

常与SNS连用。
DynamoDB stream->SQS/SNS->Elastic search
System 1->SQS->System 2

Queue Actions 中,选择 Subscribe Queue to SNS Topic (或 Subscribe Queues to SNS Topic)。

队列操作中,选择Configure Trigger for Lambda Function (为 函数配置触发器)。
SQS作为触发器,会有5个线程每20秒poll一次,所以一分钟是15个 empty receives. 一天是21600个SQS
request,free tier是1百万次,如果超过,基本上是1个SQS用2蚊一个月。

底层API

final SendMessageResult sendMessageResult = sqs.sendMessage(
new SendMessageRequest(myQueueUrl, "This is my message text."));
final String messageId = sendMessageResult.getMessageId();


SendMessageRequest request = new SendMessageRequest(myQueueUrl, "This is my message
text.");
request.setDelaySeconds(5);


final ReceiveMessageRequest receiveMessageRequest = new
 ReceiveMessageRequest(myQueueUrl);
final List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (final Message message : messages) {
    System.out.println("Message");
    System.out.println("  MessageId: " + message.getMessageId());
    System.out.println("  ReceiptHandle: " + message.getReceiptHandle());
    System.out.println("  MD5OfBody: " + message.getMD5OfBody());
    System.out.println("  Body: " + message.getBody());
    for (final Entry<String, String> entry : message.getAttributes().entrySet()) {
        System.out.println("Attribute");
        System.out.println("  Name: " + entry.getKey());
        System.out.println("  Value: " + entry.getValue());
} }

功能


标准队列
无限吞吐量 – 标准 队列每个 操作支持接近无限的每 秒事务数 (TPS)。 
(含重)至少传送一次 – 消息至少传送一次,但偶尔会传送 消息的多个副本。
标准 队列每个 操作支持接近无限的每秒事务数 (TPS)。 标准队列支 持至少一次消息传递。但是,
由于存在允许近乎无限吞吐量的高度分布式架构,偶尔会有一条消息的多个副本不按顺序传送。标准
队列会尽最大努力进行排序,保证了消息大致按其发送的顺序进行传递。亲身经历,即使我发了两条
信息到SQS,但看到message in flight是6,重复了2倍。如果lambda是heavy计算且会写入S3或DB的
话,最好用FIFO。

In flight
当一个信息被一个lambda pickup,该message就会被移到in flight,当可见性超时,就会从in flight移回
queue中。


FIFO 队列
高吞吐量 – 默认情况下,借助批处 理,FIFO 队列每秒支持多达 3000 条消息。
(去重)仅传输一次处理 – 消息传递一次并在使用者处理 并删除它之前保持可用。不会将重复项引入
到队列 中。这个通过指定dedupeID(messageGroupId specified by sender)实现或者enable
先进先出传递 – 严格保持消息的发送和接收顺序。

吞吐量很重要时用标准队列,当事件的顺序重要时用FIFO。

可见性超时: 收到消息后,消息将立即保留在队列中。为防止其他使用者再次处理消息,Amazon 
SQS 设置了可见性超 时,这是 Amazon SQS 防止其他使用者接收和处理消息的时间段。消息的
默认可见性超时为 30 秒。最小值 为 0 秒。最大值为 12 小时。 这是console的配置,针对所有信息。
API可针对单个或多个信息。

一般设置为lambda timeout的6倍,因为2次retry
To allow your function time to process each batch of records, set the source queue's visibility timeout to at least 6 times the timeout that you configure on your function.

延迟队列

延迟队列可让您将针对队列的新消息传递操作推迟特定的秒数。如果您创建延迟队列,则发送到该
队列的任何消息在延迟期间对用户都保持不可见。

死信队列Dead letter queue:死信队列是其他(源)队列可将其作为无法成功处理(使用)的消息的目标的队列。当消息的 ReceiveCount 超出队列的 maxReceiveCount 时,Amazon SQS 会将该消息移到死信队列。FIFO 队列的死信队列也必须为 FIFO 队列。同样,标准 队列的死信队列也必须为 标准 队列。 死信队列的主要任务是处理消息失败。利用死信队列,您可以留出和隔离无法正确处理的消息以确定其处理 失败的原因。

我做过一个的例子是用sqs触发lambda,只要在lambda中throw exception就可以让msg等待可见性时段,SQS会自动将它放回原队列中,让它重试,这是maxReceiveCount+1。当超过maxReceiveCount时,msg会被放入DLQ,所以若maxReceiveCount设置比较大,会lambda会多次运行。Retention period要多于maxReceiveCount,否则,msg会被删除而没有机会进入DLQ。SQS-Lambda的可视时间设置为15分钟(=lambda的timeout时间),若maxReceiveCount=2,要等30分钟,msg会被放入DLQ。DLQ每poll一次(包括打开UI),count就会+1. msg被成功处理后,会被自动删除,不需要consumer做。Lambda最好不要有cache(class variable甚至是static),因为如果某段时间内,若lambda没有收到任何request,含lambda的容器才会重启,cache才有机会被refresh。所以若一直有request,cache永远不会被更新。
总结:
1. Lambda不需要retry,因为maxReceiveCount就是retry
2. retention period和maxReceiveCount共同决定retry次数。默认值分别为4天和500,若lambda时间为15分钟,500是一个非常大的值,lambda会跑4天,所以我的经验是设置rentention为12小时,maxReceives为3.

使用 Amazon S3 管理大型 Amazon SQS 消息。只适用于Java SDK


Kinesis的区别:Kinesis是实时的且可用于大数据。


官网
Github的Java例子
SQS->Lambda
Lambda中的cache

Friday, March 10, 2017

二级缓存

一级缓存是内存,二级缓存是硬盘或者分散式网络内存,它是一级缓存的扩展,弥补一级缓存容量有限且不能共享的缺点,并且可以让整个application(硬盘)或不同server(分散式网络内存)共享,虽然它比一级缓存稍慢。

Monday, March 6, 2017

Guice简介

Guice也是采用DI标准。Guice比Spring轻量级,启动速度稍快,但是没有Spring完善。
Guice发明目的为了1.分离依赖 2.方便测试(插入fake依赖),当然这也正是DI提出的目标。

bind主要用于勾画类的关系(如继承)

与Spring比较
@Named -> @Inject

没继承 -> @Inject 不需要@Named
@provides -> @Inject 如果提供实例需另外处理。当然还可以加上@Singleton,它可在provides或类名前。

最简单的例子

模拟在商店用信用卡买食品的系统:

public class App
{
    public static void main( String[] args )
    {
        Injector injector = Guice.createInjector(new BillingModule());
        BillingService billingService = injector.getInstance(BillingService.class);
        billingService.chargeOrder("pizza", new CreditCard());
    }
}

public class BillingModule extends AbstractModule {
@Override
protected void configure() {}
}

public class CreditCardProcessor {
String name = "Paypal";
}

public class BillingService {
private final CreditCardProcessor processor;
private final TransactionLog transactionLog;

@Inject
BillingService(CreditCardProcessor processor, TransactionLog transactionLog) {
this.processor = processor;
this.transactionLog = transactionLog;
}

public int chargeOrder(String order, CreditCard creditCard) {
System.out.println("processing purchase");
System.out.println(processor.toString());
System.out.println(transactionLog.toString());
return 0;
}
}

1. 在Module里面的binding不是必须的,如果有类关系(如继承)才需要在这里配置。
2. Guice的Inject一般在构造函数,这样私有成员可以声明final,比较安全。只要用@Inject,     Guice就会自动注入,不需要在CreditCardProcessor加上Named(Spring做法)
3. Guice首先根据Module生成Injector,然后获得启动类再启动服务。
比较一下Spring的启动方式,很相似,区别就在于类关系表现在class还是xml
ApplicationContext context = new ClassPathXmlApplicationContext("com/vtasters/beans.xml");
Video t = (Video) context.getBean("video");

Binding绑定

当Inject一个interface时候,需要在config说明用哪个实现类代替这个interface,这就是binding的作用。
@Inject
BillingService(CreditCardProcessor processor, TransactionLog transactionLog)
CreditCardProcessor此时是Interface非实现类,所以要说明其实现类。

在config中加入bind(CreditCardProcessor.class).to(PaypalCreditCardProcessor.class);

interface CreditCardProcessor {
String getName();
}

public class PaypalCreditCardProcessor implements CreditCardProcessor{
String name = "Paypal";
public String getName(){return name;}
}

另一方法是JustInTimeBindings用@ImplementedBy
@ImplementedBy(PayPalCreditCardProcessor.class)
public interface CreditCardProcessor
这样就不用写bind

BindingAnnotations注释绑定

用注释来提高绑定的代码可读性

第一种方法是@interface法,用@interface创建自定义注释。
粗体部分为新加入。加入@PayPal后目的是提高代码易读性,这样不用去binding就知道CreditCardProcessor绑定到PaypalCreditCardProcessor。当然同时也产生了额外的代码(@interface类)

@BindingAnnotation
@Target({ FIELD, PARAMETER, METHOD })
@Retention(RUNTIME)
public @interface PayPal {}

bind(CreditCardProcessor.class).annotatedWith(PayPal.class).to(PayPalCreditCardProcessor.class);

@Inject BillingService(@PayPal CreditCardProcessor processor, TransactionLog transactionLog)

第二种方法是@Named法,内嵌式注释。此法省去@interface创建,但忘记绑定或绑定字符串typo编译器不能发现,所以Guice不推荐

public class DatabaseTransactionLog implements TransactionLog加入实现类

bind(TransactionLog.class).annotatedWith(Names.named("Database")).to(DatabaseTransactionLog.class);

@Inject BillingService(@PayPal CreditCardProcessor processor,
                                      @Named("Database") TransactionLog transactionLog)

InstanceBindings绑定实例

通过注释绑定一个String常数的实例,但复合类型不用此法而是用@Provides
bind(String.class).annotatedWith(Names.named("url")).toInstance("jdbc:mysql://localhost/pizza");
可以用此法绑定更好
bindConstant().annotatedWith(Names.named("dbname")).to("MySQL");


@Provides提供实例

如果不用绑定可以用@Provides,如下例provideStore方法代替了bind(Store.class).to(RegularStore.class)

public class BillingModule extends AbstractModule {
@Provides
Store provideStore() {
RegularStore store = new RegularStore();
store.setId("123");
return store;
}
}

@Inject BillingService(@PayPal CreditCardProcessor processor, @Named("Database") TransactionLog transactionLog, Store store)

Provides也可以与binding一样加注释:
        @Provides
@Named("regular")
Store provideStore()

@Inject BillingService(@PayPal CreditCardProcessor processor, @Named("Database") TransactionLog transactionLog, @Named("regular") Store store)


ProviderBindings提供绑定

如果@Provides模块太大需要独立成一个class可以实现Provider做到,还要加入bind中。
public class RegularEmployeeProvider implements Provider<Employee> {
public Employee get() {
RegularEmployee employee = new RegularEmployee("Sue");
return employee;
}
}

@Inject BillingService(@PayPal CreditCardProcessor processor, @Named("Database") TransactionLog transactionLog, @Named("regular") Store store, Employee employee)

bind(Employee.class).toProvider(RegularEmployeeProvider.class);

另一方法是JustInTimeBindings用@ProvidedBy
@ProvidedBy(RegularEmployeeProvider.class)
public interface Employee
这样就不用写bind

范围(如单例)

表示每次需要实例时,提供同一实例。在application运行期均为单例。
@Provides
@Named("regular")
@Singleton
Store provideStore()

以下两个服务都返回同一个RegularStore
        BillingService billingService = injector.getInstance(BillingService.class);    
        BillingService billingService2 = injector.getInstance(BillingService.class);

上法是Provides法,还可以用binding法和加到RegularStore上面,若冲突,以binding为准。
bind(Store.class).to(RegularStore.class).in(Singleton.class);

@Singleton
public class RegularStore implements Store

非继承类

无继承的类可以用@Provides来提供实例,正如之前提过不用binding
@Provides
@Singleton
Cache provideCache(){
Cache cache = new Cache();
cache.setName("mycache");
return cache;
}
@Inject BillingService(@PayPal CreditCardProcessor processor, @Named("Database") TransactionLog transactionLog, @Named("regular") Store store, Employee employee, Cache cache)


ref:
官方
HeroModule
provider design pattern
HelloGuiceServiceImpl
singleton

负载测试Load test

负载测试用于负载测试和性能测试,使用它来查找和发现相关的性能和负载管理问题,尤其是Web应用程序的性能问题。

原理是手动写一或多个request,JMeter会用多线程随机产生很多这类requests发给要测试的service。测试会分阶段进行比如第一分钟5个request,第二分钟10个,如此类推。一般来说,前期latency基本不变,但某一个点后延迟会显著增加。

Throughout(TPS: transactions per second):当增加连接(request)数时,如果latency显著增加,这样throughout并不会显著增加甚至下降,server遇到瓶颈。这就是最大的TPS。这时候就知道系统是否可以处理到某个TPS值如800.
























ref:
JMeter


AWS Elasticsearch原理

报纸公司一开始把文章存在数据库的一个叫contents列中,但发现多关键词检索难以做到,即使做到(MySQL的full text search)也比较慢且难以全表搜索(比如把title,tags列也作为搜索目标)。此时,全文搜索应运而生,它可以满足:

1. 全文搜索:多个列中搜索
2. 多关键词:不用写SQL的like一样麻烦
3. 速度更快
4. 高级搜索:比如一次性搜索中,某些关键词权重高些

以下是公司用例:
GitHub:搜索代码和checkin日志
StackOverflow: 搜索问题和答案
HotelTonight:这是结构性数据,有人会有疑问,但它用ES来做多列搜索,如价钱+评分+位置
Wikipedia:搜索文章,用于autocomplete

原理

如ADB课程的proj。首先将文档tokenize,统计每个词(单词规范化,小写过滤)的出现的文档(倒排索引)
data           Doc 1, Doc2
engineer    Doc1
program    Doc2
将这些结果写到多个shard(分片,是实际存储数据的Lucene索引 - DB,含其他某一分片的一个副本,如data存于主片1,当然主片1的副本可含engineer,engineer存于主片2)中,可以保证每个shard做独立搜索,加快速度。然后将结果整合,排序,最后(根据汇总结果)请求原始文档。
更新时候,类似于hashmap,可以找到该词找到相应的shard,更新shard,然后写到文件中,ES会对副本进行异步更新。删除是用soft delete,维护文档版本号,query时候过滤掉旧版本号。

基本概念

ES与MySQL对应概念
MySQL Elasticsearch
Database Index
Table Type
Row Document
Column Field
Schema Mappping
Index Everything Indexed by default
SQL Query DSL

写入数据:
client.index({
    index : 'test',
    type : 'article',
    id : '100',
    body : {
        title : '什么是 JS?',
        slug :'what-is-js',
        tags : ['JS', 'JavaScript', 'TEST'],
        content : 'JS 是 JavaScript 的缩写!',
        update_date : '2015-12-15T13:05:55Z',
    }
})

全文搜索JS:
client.search({
    index : 'test',
    type : 'article',
    q : 'JS',
});

搜索结果:结果都在hits中
























高级搜索DSL(类似于SQL):
1. 只搜某个Document(例如message):match
    "query": {
        "match" : {
            "message" : "this is a test"
        }
    }
2. 准确查找:term
 "query": {
    "term" : { "user" : "Kimchy" }
  }
3. 范围查找:range
"query": {
        "range" : {
            "age" : {
                "gte" : 10,
                "lte" : 20,
                "boost" : 2.0
            }
        }
    }
boost是权重,默认1.0,表示这个query权重较高

AWS

ES由Lucene进化而来,AWS实现了ES从而推出了自己的产品。它的步骤是
1. 创建domain,如叫movies
2. 上传json数据













3. 搜索:全文搜索nightmare
curl -XGET 'search-movies-4oy.us-west-1.es.amazonaws.com/movies/_search?q=nightmare'

AWS例子中,有4步曲,要设为public access以及设置master user名和密码
query的例子:

在某index上搜索
curl -XGET -u 'master-user:master-user-password' 'domain-endpoint/movies/_search?q=mars&pretty=true'

在所有index上搜索
curl -XGET -u 'master-user:master-user-password' 'domain-endpoint/_search?q=mars&pretty=true'

在所有index上搜索多个关键字(类似于或的搜索)
curl -XGET -u 'master-user:master-user-password' 'domain-endpoint/_search?q=mars%20Jack&pretty=true'

在所有index上多域搜索
curl -XGET -u 'master-user:master-user-password' 'domain-endpoint/_search?q=title:mars%20AND%20actors:Jack&pretty=true'

Ref

原理

Sunday, March 5, 2017

AWS CloudWatch

AWS CloudWatch是Amazon的运营数据监测和警报系统,用于监测server的CPU,memory,IO读写,吞吐量警报,server开启关闭,等等。

















创建定制化的dashboard:
所以metrics都汇总到这里如RDS, SQS。只要创建一个自定义如关于SQS的dashboard含msgSent, msgReceived等,然后可以share到一个wiki。

ref:
简介



AWS Kinesis Stream

AWS Kinesis Stream是近乎实时收集和处理大数据记录流。可以理解为近乎实时数据传输管道,相当于以前用FTP传输数据,但它是用API且将数据作为字节流传输。1秒的平均传播延迟。

使用场景

用于快速而持续的数据引入和聚合
使用的数据类型可以包括 IT 基础设施日志数据、应用程序日志、社交媒体、市场数据源和 Web点击流数据。由于数据引入和处理的响应时间是实时的,因此处理通常是轻量级的。

日志引入:推送系统和应用程序日志=metrics
实时数据分析:实时处理网站点击流
实时聚合数据,然后将聚合数据加载到数据仓库或 map-reduce群集

架构

关键在于分片shard,流可以指定多个分片,而AWS Kinesis需要花时间为该流分配出分片。分为生产者和使用者,分别是产生数据和消费数据。

Kinesis Data Stream:

保留期(retention period)是数据记录在添加到流中后可供访问的时间长度。在创建之后,流的保留期设置
为默认值 24小时。最多到7天。

分片是流中数据记录的唯一标识序列。一个流由一个或多个分片组成,每个分片提供一个固定的容量单
位。流的总容量是其分片容量的总和。
分区键用于按分片对流中的数据进行分组。MD5 哈希函数用于将分区键映射到 128 位整数值并将关联的
数据记录映射到分片。应用程序需要对每个数据指定分区键。在创建流时,您将指定流的分片数。
number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)

使用者称为Amazon Kinesis Data Streams Application
Kinesis Client Library(供使用者使用,是Kinesis服务的客户端)KCL 负责许多与分布式计算相关的复杂
任务,例如对多个实例进行负载均衡、
对实例故障做出响应、对已处理的数据执行检查点操作和对重新分 片做出反应。KCL 可让您将精力
放在编写记录处理逻辑上。
它使用Amazon DynamoDB表存储控制数据。它会为每个正在处理数据的应用程序创建一个表。实例中,
选择 KinesisDataVisSampleApp-KCLDynamoDBTable-[randomString] 表。
在表中有两个条目,指示特 定分片 (leaseKey)、流中的位置 (checkpoint) 和读取数据的应用程序 
(leaseOwner)。

用 AWS KMS 主密钥进行加密


最佳实践

AWS实例是统计URL实时数目(2s window)。
分析实时股票数据

底层API

创建和描述stream,需要的时间创建stream主要用于分配shard
create-stream --stream-name Foo --shard-count 1
describe-stream --stream-name Foo

Response:
{
   "StreamDescription": {
       "StreamStatus": "ACTIVE",
       "StreamName": "Foo",
       "StreamARN": "arn:aws:kinesis:us-west-2:account-id:stream/Foo",
       "Shards": [
           {
               "ShardId": "shardId-000000000000",
               "HashKeyRange": {
                   "EndingHashKey": "170141183460469231731687303715884105727",
                   "StartingHashKey": "0"
               },
               "SequenceNumberRange": {
                   "StartingSequenceNumber": "495469866831355442865074579357546397794"
               }
           }
       ]
   }
}

AWS 区域的默认分片限制为 500 分片
单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录.

每个 PutRecord 调用都需要流名称、分区键和创建者正在添加到流的数据记录。
put-record --stream-name Foo --partition-key 123 --data testdata
Response:
{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546156785154" }

要将数据放入流,您必须指定流的名称、分区键和要添加到流的数据 Blob。GetRecords
可以从单个分片中检索多达每个调用 10 MiB 数据,每个调用多达 10000 个记录。每个分片可以通过 GetRecords 支持每秒 2 MiB 的最大总数据读取速率。
首先获得分片的迭代器(指针)
get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
得到迭代器的id,然后将其带入到getRecords API

get-records --shard-iterator AAAAAAAAAAHSywlj
Response:
{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 
1.441215410867E9, 
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], 
"MillisBehindLatest":24000, "NextShardIterator":"AAAA
Data为base64编码
NextShardIterator下一个iterator,即使流没有新数据
当Records为空的时候才表示全部读完
MillisBehindLatest表示距流末端(最新数据)还有多久。零值指示正进行记录处理,此时没有新
的记录要处理

术语检查点操作是指记录到流中目前已使用和处理的数据记录所在的点,这样一来,当应用程序
发生崩溃时,系统将从该点读取流,而不是从头开始读取流。
只要记录SequenceNumber。然后获取Iterator时指定这个checkpoint即可。
GetShardIterator::StartingSequenceNumber

封装层

生产器  - KPL
Kinesis Producer Library (KPL) 简化了创建器应用程序的开发.KPL 是一个易于使用的、高度可配置的库,可帮助您对 Kinesis data stream 进行写入. 由于 KPL 可在将记录发送到 Kinesis Data Streams 之前对其进行缓冲处理,从而产生高吞入量。KPL 会导致库(用户可配置的)中产生高达 RecordMaxBufferedTime 的额外处理延 迟。RecordMaxBufferedTime 值越大,产生的包装效率和性能就越高。

使用器 - KCL
RecordMaxBufferedTime 上述以提到,这是应用程序可以用到的库

使用期 - Kinesis Firehose - 更高级封装

Amazon Kinesis Data Firehose,AWS Lambda 开发使用器,Amazon Kinesis Data Analytics 开发
使用器。您可以使用 Kinesis Data Firehose 读取和处理 Kinesis 流中的记录。Kinesis Data Firehose 
是一个完全托管 的服务,用于将实时流数据传输到目标
(如 Amazon S3、Amazon Redshift、Amazon Elasticsearch Service 和 Splunk)。也是不再需要
编写Java代码就可以直接将数据加载到目标。
例如目标为redshift
{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-0.05,"PRICE":84.51}
=>
create table firehose_test_table
(
TICKER_SYMBOL varchar(4),
SECTOR varchar(16),
CHANGE float,
PRICE float
);
Firehose还可以单独使用,用于让数据流传输到适用的目标如S3。与SQS非常相似也是提供一个有时限的缓存,不同系统之间传输信息,区别在于实时vs非实时,大数据vs msg。
与Kinesis区别是firehose缓存大小对于S3只有128M,比Kinesis小很大,缓存的时间buffer interval也只有60s到900s,比kenesis小很多,目标也只能有限几个AWS产品,而kinesis可以是任何下游产品。kinesis类似于Kafka。
最佳实践为music应用,metrics传输到第三方partner,DDB->lambda->firehose->S3->Redshift

功能

单个分片->单个工作线程取数据。如果数据量大,可以将某几个shard聚合再放入到下一层的
stream。通过两层stream聚合来更快处理数据。
支持重新分片,这使您能够调整流中的分片数量以适应流中数据流量的变化。

增强型扇出功能 是一种 Amazon Kinesis Data Streams 功能,使用者利用此功能能够接收数据流(其
中每分 片每秒的专用吞吐量高达 2 MiB 数据)中的记录。此吞吐量是专用的,
这意味着,使用增强型扇出 功能的使用者不必与接收流中数据的其他使用者争夺。
Kinesis Data Streams 将流中的数据记录推送到使用 增强型扇出功能的使用者。
因此,这些使用者无需轮询数据。

Amazon Kinesis Data Streams 维度与指标用cloudwatch来记录

常见问题

(读取数据终止条件)即使流中有数据,GetRecords 仍然返回空记录阵列

ShardIterator 指向的分片部分附近没有数据。 此情况很微妙,但却是避免在检索数据时搜寻时间
无止境(延迟)的一种必要的设计折衷。因此,流使 用应用程序应循环并调用 GetRecords,并
且理所当然地处理空记录。 在生产场景中,仅当 NextShardIterator 值为 NULL 时,
才应退出连续循环。当 NextShardIterator 为 NULL 时,这意味着当前分片已关闭,ShardIterator
值的指向应越过最后一条记录。如果使用应用 程序从不调用 SplitShard 或 MergeShards,分片将
保持打开状态,并且对GetRecords 的调用从不返回为 NextShardIterator 的 NULL 值。

Java代码示例

下面发布者通过kinesisClient发布一个股票交易到AWS。而流事件是生产者自定义的。这是流事件的ID,通过这个ID,使用者者可只关注某些流事件。
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();

    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
 
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section     //below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    kinesisClient.putRecord(putRecord);
}

使用者:

订阅者通过kinesisClient得到record,然后可以得到trade
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
stockStats.addStockTrade(trade);

与SQS的区别:

Kinesis是实时的且可用于大数据,而SQS是非实时处理任务,将同步变异步解耦削峰。


ref:
官方指南
FireHose官方指南
分布式发布订阅系统
概念
发布者
订阅者
访问权限
与SQS区别