Sunday, March 5, 2017

AWS Kinesis Stream

AWS Kinesis Stream是近乎实时收集和处理大数据记录流。可以理解为近乎实时数据传输管道,相当于以前用FTP传输数据,但它是用API且将数据作为字节流传输。1秒的平均传播延迟。

使用场景

用于快速而持续的数据引入和聚合
使用的数据类型可以包括 IT 基础设施日志数据、应用程序日志、社交媒体、市场数据源和 Web点击流数据。由于数据引入和处理的响应时间是实时的,因此处理通常是轻量级的。

日志引入:推送系统和应用程序日志=metrics
实时数据分析:实时处理网站点击流
实时聚合数据,然后将聚合数据加载到数据仓库或 map-reduce群集

架构

关键在于分片shard,流可以指定多个分片,而AWS Kinesis需要花时间为该流分配出分片。分为生产者和使用者,分别是产生数据和消费数据。

Kinesis Data Stream:

保留期(retention period)是数据记录在添加到流中后可供访问的时间长度。在创建之后,流的保留期设置
为默认值 24小时。最多到7天。

分片是流中数据记录的唯一标识序列。一个流由一个或多个分片组成,每个分片提供一个固定的容量单
位。流的总容量是其分片容量的总和。
分区键用于按分片对流中的数据进行分组。MD5 哈希函数用于将分区键映射到 128 位整数值并将关联的
数据记录映射到分片。应用程序需要对每个数据指定分区键。在创建流时,您将指定流的分片数。
number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)

使用者称为Amazon Kinesis Data Streams Application
Kinesis Client Library(供使用者使用,是Kinesis服务的客户端)KCL 负责许多与分布式计算相关的复杂
任务,例如对多个实例进行负载均衡、
对实例故障做出响应、对已处理的数据执行检查点操作和对重新分 片做出反应。KCL 可让您将精力
放在编写记录处理逻辑上。
它使用Amazon DynamoDB表存储控制数据。它会为每个正在处理数据的应用程序创建一个表。实例中,
选择 KinesisDataVisSampleApp-KCLDynamoDBTable-[randomString] 表。
在表中有两个条目,指示特 定分片 (leaseKey)、流中的位置 (checkpoint) 和读取数据的应用程序 
(leaseOwner)。

用 AWS KMS 主密钥进行加密


最佳实践

AWS实例是统计URL实时数目(2s window)。
分析实时股票数据

底层API

创建和描述stream,需要的时间创建stream主要用于分配shard
create-stream --stream-name Foo --shard-count 1
describe-stream --stream-name Foo

Response:
{
   "StreamDescription": {
       "StreamStatus": "ACTIVE",
       "StreamName": "Foo",
       "StreamARN": "arn:aws:kinesis:us-west-2:account-id:stream/Foo",
       "Shards": [
           {
               "ShardId": "shardId-000000000000",
               "HashKeyRange": {
                   "EndingHashKey": "170141183460469231731687303715884105727",
                   "StartingHashKey": "0"
               },
               "SequenceNumberRange": {
                   "StartingSequenceNumber": "495469866831355442865074579357546397794"
               }
           }
       ]
   }
}

AWS 区域的默认分片限制为 500 分片
单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录.

每个 PutRecord 调用都需要流名称、分区键和创建者正在添加到流的数据记录。
put-record --stream-name Foo --partition-key 123 --data testdata
Response:
{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546156785154" }

要将数据放入流,您必须指定流的名称、分区键和要添加到流的数据 Blob。GetRecords
可以从单个分片中检索多达每个调用 10 MiB 数据,每个调用多达 10000 个记录。每个分片可以通过 GetRecords 支持每秒 2 MiB 的最大总数据读取速率。
首先获得分片的迭代器(指针)
get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
得到迭代器的id,然后将其带入到getRecords API

get-records --shard-iterator AAAAAAAAAAHSywlj
Response:
{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 
1.441215410867E9, 
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], 
"MillisBehindLatest":24000, "NextShardIterator":"AAAA
Data为base64编码
NextShardIterator下一个iterator,即使流没有新数据
当Records为空的时候才表示全部读完
MillisBehindLatest表示距流末端(最新数据)还有多久。零值指示正进行记录处理,此时没有新
的记录要处理

术语检查点操作是指记录到流中目前已使用和处理的数据记录所在的点,这样一来,当应用程序
发生崩溃时,系统将从该点读取流,而不是从头开始读取流。
只要记录SequenceNumber。然后获取Iterator时指定这个checkpoint即可。
GetShardIterator::StartingSequenceNumber

封装层

生产器  - KPL
Kinesis Producer Library (KPL) 简化了创建器应用程序的开发.KPL 是一个易于使用的、高度可配置的库,可帮助您对 Kinesis data stream 进行写入. 由于 KPL 可在将记录发送到 Kinesis Data Streams 之前对其进行缓冲处理,从而产生高吞入量。KPL 会导致库(用户可配置的)中产生高达 RecordMaxBufferedTime 的额外处理延 迟。RecordMaxBufferedTime 值越大,产生的包装效率和性能就越高。

使用器 - KCL
RecordMaxBufferedTime 上述以提到,这是应用程序可以用到的库

使用期 - Kinesis Firehose - 更高级封装

Amazon Kinesis Data Firehose,AWS Lambda 开发使用器,Amazon Kinesis Data Analytics 开发
使用器。您可以使用 Kinesis Data Firehose 读取和处理 Kinesis 流中的记录。Kinesis Data Firehose 
是一个完全托管 的服务,用于将实时流数据传输到目标
(如 Amazon S3、Amazon Redshift、Amazon Elasticsearch Service 和 Splunk)。也是不再需要
编写Java代码就可以直接将数据加载到目标。
例如目标为redshift
{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-0.05,"PRICE":84.51}
=>
create table firehose_test_table
(
TICKER_SYMBOL varchar(4),
SECTOR varchar(16),
CHANGE float,
PRICE float
);
Firehose还可以单独使用,用于让数据流传输到适用的目标如S3。与SQS非常相似也是提供一个有时限的缓存,不同系统之间传输信息,区别在于实时vs非实时,大数据vs msg。
与Kinesis区别是firehose缓存大小对于S3只有128M,比Kinesis小很大,缓存的时间buffer interval也只有60s到900s,比kenesis小很多,目标也只能有限几个AWS产品,而kinesis可以是任何下游产品。kinesis类似于Kafka。
最佳实践为music应用,metrics传输到第三方partner,DDB->lambda->firehose->S3->Redshift

功能

单个分片->单个工作线程取数据。如果数据量大,可以将某几个shard聚合再放入到下一层的
stream。通过两层stream聚合来更快处理数据。
支持重新分片,这使您能够调整流中的分片数量以适应流中数据流量的变化。

增强型扇出功能 是一种 Amazon Kinesis Data Streams 功能,使用者利用此功能能够接收数据流(其
中每分 片每秒的专用吞吐量高达 2 MiB 数据)中的记录。此吞吐量是专用的,
这意味着,使用增强型扇出 功能的使用者不必与接收流中数据的其他使用者争夺。
Kinesis Data Streams 将流中的数据记录推送到使用 增强型扇出功能的使用者。
因此,这些使用者无需轮询数据。

Amazon Kinesis Data Streams 维度与指标用cloudwatch来记录

常见问题

(读取数据终止条件)即使流中有数据,GetRecords 仍然返回空记录阵列

ShardIterator 指向的分片部分附近没有数据。 此情况很微妙,但却是避免在检索数据时搜寻时间
无止境(延迟)的一种必要的设计折衷。因此,流使 用应用程序应循环并调用 GetRecords,并
且理所当然地处理空记录。 在生产场景中,仅当 NextShardIterator 值为 NULL 时,
才应退出连续循环。当 NextShardIterator 为 NULL 时,这意味着当前分片已关闭,ShardIterator
值的指向应越过最后一条记录。如果使用应用 程序从不调用 SplitShard 或 MergeShards,分片将
保持打开状态,并且对GetRecords 的调用从不返回为 NextShardIterator 的 NULL 值。

Java代码示例

下面发布者通过kinesisClient发布一个股票交易到AWS。而流事件是生产者自定义的。这是流事件的ID,通过这个ID,使用者者可只关注某些流事件。
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();

    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
 
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section     //below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    kinesisClient.putRecord(putRecord);
}

使用者:

订阅者通过kinesisClient得到record,然后可以得到trade
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
stockStats.addStockTrade(trade);

与SQS的区别:

Kinesis是实时的且可用于大数据,而SQS是非实时处理任务,将同步变异步解耦削峰。


ref:
官方指南
FireHose官方指南
分布式发布订阅系统
概念
发布者
订阅者
访问权限
与SQS区别

No comments:

Post a Comment