DynamoDB Stream

2020-03-05| NoSQL, DynamoDB, Data-Intensive

DynamoDB的表能够存储大量的数据,为了提高查找性能,研发人员通常会将关联但不同的数据实体(比如User和Order)集中存放在一台服务器上,这就导致表中的数据关系难以理解!如果直接基于该表来分析其中的数据,则分析工作将会变得困难起来,除此之外,也会影响终端用户的用户体验(分析任务会占用该表的读写单元)。为了使分析工作变得简单,则需要将表中的数据导入到其它分析系统,最终依赖其它分析系统来分析数据。将数据导出到其它系统主要有2种办法,它们分别是:1.遍历整张表,并将每一项数据写入到其它系统;2.表中的数据每变更一次,则将变更写入到其它系统。前者不适用于数据量庞大的情景,而后者可以很好地避开处理大量数据,但需要借助DynamoDB Stream功能来实现。

DynamoDB Stream是DynamoDB提供的一个功能,现实世界里有许多场景会使用到它,这些场景有:

  • 同步不同区域的数据。比如你的业务分布在中国和美国,那么你需要将美国的数据同步到中国,反之亦然。此时你需要借助DynamoDB Stream来实现数据同步,最终确保中国用户与美国用户访问的数据是一致的。
  • 发送消息。比如你的App有一个用户注册功能,每当一个新用户注册时,你需要向新用户发送一封表示欢迎的邮件通知。此时你需要DynamoDB Stream的通知机制来触发分发邮件的服务。
  • 索引数据。DynamoDB不适合全文搜索,因此如果你想要搜索表中的数据,那么最好的办法是将表中的数据快速同步到Elastic Search服务或者algolia里,最终通过这些服务来搜索数据。为了能够快速地将数据更新到搜索系统,那么则需要DynamoDB Stream。
  • 聚合或统计数据。有时你想快速得到一些统计信息,比如某个区域的总销量,每家门店每个月所需的成本等等。那么你可以使用DynamoDB Stream来聚合这些数据。

为了实现以上提到的应用场景,则需要了解DynamoDB Stream内部的逻辑以及它与其它服务的关系。这篇文章将从以下几个方面来讲解DynamoDB Stream:

  1. DynamoDB Stream的构成以及其周边服务
  2. 使用DynamoDB Stream的注意事项
  3. 基于DynamoDB Stream的设计模式
  4. 结论
  5. 参考

DynamoDB Stream的构成以及其周边服务

DynamoDB Stream是DynamoDB服务所提供的一个功能,它需要结合DynamoDB Table来使用。开启DynamoDB Stream功能的Table能集成其它服务,最终能够延伸DynamoDB的功能(比如,使用Elastic Search来检索表中的数据,并提供全文搜索功能!)。下图展示了DynamoDB Stream与其它服务的关系:

图 1.1 DynamoDB Stream与Table,Lambda之间的关系

上图涉及到DynamoDB Stream的工作流程是:

  1. Producers将向表中修改数据,包括添加数据,修改已有数据,删除数据等。这些操作均是基于HTTPS协议来发起的。
  2. DynamoDB将修改之后的数据写入内部的Transactions Log文件,DynamoDB Stream从该文件中读取更新之后的数据并存放24小时,在这之后,数据将自动从DynamoDB Stream中移除。
  3. Lambda平台向每一个Shard发送HTTPS请求并以poll的方式监听所有的Shards。
  4. Lambda平台从Shard中批量读取新数据(Records),并以Sync的方式启动一个Lambda Function实例,并将Records传入并执行该实例。如果结果返回成功,那么Lambda平台会继续读取下一批数据,失败则把当前的数据集传入该函数实例,并重新执行。

上图涉及到DynamoDB Stream的内部逻辑以及外部交互:

研发人员在使用DynamoDB服务时,需要创建表(Table),每一张表其内部又根据数据量划分了好几个分区(如上图的Partition A,Partition B,Partition C)。每一个分区其实会分布于不同的服务器上。

如果在该表上启用了DynamoDB Stream,那么这个Stream会根据分区数量来创建Shard(如上图有3个Shard)。每一个分区中修改的数据只会发送到对应的Shard上,并且是有序的。每一个Shard里的数据(如上图的Record),其生命周期是24小时,在这之后,该数据项自动移除。

DynamoDB Stream只允许使用者(也就是上图的Consumers)从中批量读取数据,其它操作,比如删除其中的数据项或者修改其中的数据项是禁止的。DynamoDB Stream的吞吐量(MB/S)受到Shard的限制,Shard的数量越多,则其吞吐量越大(每个Shard的吞吐量是2MB/S,每个Consumer每秒最多能读取1MB的数据)。每个Consumer一次最多能从对应的Shard中读取1000条数据(Record)。每个Shard最多同时被2个Consumers来使用,超过这个数量,则会导致读取失败,这一点使得DynamoDB Stream无法直接支持超过2个以上的服务(比如,无法同时支持新用户注册收到欢迎邮件,将数据导出到其它分析系统,检索DynamoDB中的数据等)。

上图使用了AWS Lambda服务作为DynamoDB Stream的Consumers。研发人员只需要创建一个Lambda Function,然后将该Function的触发器设置成DynamoDB,就能接收DynamoDB Stream的通知了。当每个Shard里有新的数据时,Lambda服务会根据该函数的设置(比如最多一次取10000个Records)读取Records(如果Shard里有100万条数据,每1000条的数据量是0.8MB<1MB,那么Lambda服务会调用10次GetRecords操作,并得到8MB的数据,但是由于Lambda Function的request PayLoad最大是6MB,因此,此时调用Lambda Function时会引发异常。),然后调用Function实例,并将读取到的Records批量传递给Function,由Function决定发送给哪些下游服务,如果这些批量数据均处理成功,那么Lambda服务将继续从对应的Shard中读取下一批Record,直到对应的Shard中没有新的数据。

除了使用Lambda Function,还可以使用EC2,并运行KCL和DynamoDB Streams Kinesis Adapter应用来读取数据,如下图所示:

图 1.2 访问DynamoDB Stream的2种不同方式:KCL Worker和Lambda

与Lambda Function作为Consumers不同,KCL Workers需要运行在服务器上(比如EC2或Kubernetes的节点上),并且需要研发人员考虑规模化的问题,比如决定运行几个Workers等。除此之外,研发人员还要基于KCL和DynamoDB Streams Kinesis Adapter编写数据读取应用。

使用DynamoDB Stream的注意事项

DynamoDB Stream能够捕获DynamoDB Table中变化的数据,并保留24小时。每项数据(Record)按照作用到表上的时间先后进入到Stream中的某一个Shard(根据该Record的Key来选择Shard),而且同一Record只会出现一次而不会出现多次,这些特性使得DynamoDB Stream能够用于同步数据(比如将不同区域的数据同步成一致的状态)。

DynamoDB Stream不会占用DynamoDB Table的读写单元(WCU和RCU),而是直接使用了类似于DB中的Transactions Log结构来获取变更的数据,因此其它服务从DynamoDB Stream读取数据时,不会影响其关联(Table)的性能。

DynamoDB Stream会根据DynamoDB Table中分区(Partition)的数量创建相同数量的Shards,每一个Shard都会有一些限制,比如,调用GetRecords接口最多能返回1000条Record,返回的数据量最多是1MB。因此,当需要将数据从DynamoDB Stream中发送到另外一个下游服务,则需要考虑下游服务的写入数据的速率。比如,从DynamoDB Stream中读取数据的速率是10M/S,而下游Elastic Search只能支撑最大的写入数据的速率是1M/S,那么此时需要调整读取数据的应用(Lambda Function或KCL Worker)或者提高Elastic Search写入数据的速率,以便上下游的服务的数据流转速率是匹配的。

在DynamoDB Stream内部,每一个Shard,最多能同时支持2个Consumers,如果有2个以上Consumer来读取同一个Shard,那么很有可能会引发数据读取失败的错误。因此要想规模化DynamoDB Stream的读取数据的操作,则首先要确保其关联表中的数据是均匀分布的,其次要将Consumers均匀分配给不同的Shards,最后要确保每一个Shard不能同时被2个以上的Consumers占用。

对于需要DynamoDB Stream同步数据的应用场景,如果在同步过程中出现了错误,那么需要将已经执行成功的数据回退并重新处理整批数据。原因在于数据的最终状态依赖于DB中Transactions的次序,而DynamoDB Stream中变更的数据,其次序与DB中Transactions的次序是一致的。假设我们向DynamoDB Table中新增一项数据(Record,其ID是123),接着再进行修改,之后再删除,此时DynamoDB Table的Transaction Log里会出现3个Transactions,分别对应Insert,Update和Delete操作以及相关的数据,而这些操作所产生的Records会以同样的次序出现在DynamoDB Stream里,如下所示:

Records[
    {
        "EventName":"Insert",
        "ID":"123",
        "Name":"2cloudlab.com"
    },
    {
        "EventName":"Update",
        "ID":"123",
        "Name":"https://2cloudlab.com"
    },
    {
        "EventName":"Delete",
        "ID":"123",
        "Name":"https://2cloudlab.com"
    }
]

假设,在一次同步过程中出错了,比如处理以下数据时出错了:

{
    "EventName":"Insert",
    "ID":"123",
    "Name":"2cloudlab.com"
}

如果同步程序忽略错误而继续处理后续的数据,比如:

{
    "EventName":"Update",
    "ID":"123",
    "Name":"https://2cloudlab.com"
}

由于目标数据库没有ID为123的数据,因此这次更新目标数据库的操作将失败。将DynamoDB Stream中的数据发送到其它系统这个过程会伴随着错误,因此研发人员需要根据不同的应用场景设计出错误处理方案。接下来,让我们看看如何基于DynamoDB Stream来设计健壮的事件驱动型系统。

基于DynamoDB Stream的设计模式

DynamoDB Stream非常适合于事件驱动型架构(event-driven architecture)。事件驱动型架构的基本思想是:当一个事件发生了(比如用户注册或者购买了一件商品),该事件将被记录下来,并由事件响应者(比如全文搜索系统或分析系统等)处理这些事件。接下来,让我们通过一个例子来应用这种架构。

假设,你是The New York Times报社,当前的用户量有100+万。用户希望能够快速打开热门的新闻,除此之外,还希望能够通过关键字来搜索相关文章。为了解决当下这些问题,让我们看一个比较简单的方案。

1. 一个简单的基于事件驱动型架构的例子

根据以上需求,我们选择了DynamoDB,Lambda以及Elastic Search来解决提到的问题,结果如下图所示:

图 1.3 利用DynamoDB Stream来捕获更新,并将更新写入Elastic Search

其中,DynamoDB用于存储新闻,并提供高效获取数据的能力,新闻工作者可以将编辑的新闻提交到DynamoDB,终端用户可以根据新闻的ID打开这篇新闻。

为了支持查询,我们需要引入Elastic Search,它具有全文搜索的能力,但是依旧需要研发人员启动并管理多个实例(如果需要Serverless的搜索服务,那么可以考虑使用algolia)。

接下来,我们需要向Elastic Search填充数据。为了持续填充变化的数据,则需要借助DynamoDB Stream和Lambda。研发人员需要在存储新闻信息的表上启用DynamoDB Stream,选择合适的语言(GO,Python,Java)编写一个Lambda函数,该函数通过调用Elastic Search的SDK来填入数据。最后,将DynamoDB Stream与Lambda函数映射在一起。

该解决方案的运作过程是:新闻编辑者将新闻上传到DynamoDB,紧接着DynamoDB Stream将触发Lambda函数,而Lambda函数会将新闻的变化(比如新添,修改或删除新闻)填充到Elastic Search中。终端用户一方面能够从DynamoDB中获取一篇或多篇文章,另外一方面也能够通过关键字从Elastic Search中搜索感兴趣的文章。

到目前为止,一切都非常顺利,但是如果以上描述的整个过程中某一个环节出现了错误(比如上图的环节1或者2)应该怎么办?

2. 如何处理事件驱动型架构中的错误

事件驱动型系统中的错误主要分为2类:组件与组件之间通信相关的错误和组件自身的错误。前者是可以自动修复的,比如某一个时间段网络太拥堵而导致信息丢失了,这类错误只需要通过重新尝试就可以解决。而后者是无法自动修复,并需要人为干预,才能修复。比如,新闻工作者想上传一篇超过400KB的文章,可是基于现有的系统,传输时会提示失败,原因在于DynamoDB的Item,其大小不能超过400KB。

任何一类错误均有可能在上图的1和2中发生,如果发生,则需要一个异常处理的机制来纠正这些错误。针对组件与组件之间的错误,则只需要通过重新尝试的机制来解决。比如上图的第2步,如果因网络原因无法将数据填入Elastic Search,那么Lambda会自动重新执行Lambda函数,重试的时间间隔按照Exponential backoff的方式来决定。AWS Lambda的重试行为会因调用者来决定,具体如下:

  • 同步调用(比如通过API Gateway调用Lambda函数):不会触发重试行为,而是由调用者决定重试次数
  • 异步调用(比如通过SQS和SNS调用Lambda函数):默认情况会重试2次,因此重试结束后,如果依然没有解决错误,那么需要将事件存储到Dead Letter Queue (DLQ)
  • 基于流的调用(比如通过Dynamo DB Streams调用Lambda函数): Lambda会反复执行失败的任务,直到数据过期或者执行成功

但是如果是Lambda函数自身的错误,那么为了纠正这类错误则需要分场景来考虑。一种场景是需要依赖数据的次序的,比如,现在所遇到的往Elastic Search中填入数据的例子。如果我们处理某一个数据出错时,这个错误是因为Lambda函数自身所引起的,那么这个错误是不能忽略的,而是要及时通知给相关的研发人员。接到通知后,研发人员需要分析Lambda函数的日志,并采取修补措施。

另外一种场景是不依赖于数据的次序的,比如,统计投票数量等信息。此时,如果我们处理某一个数据出错时,则可以将这个出错的信息记录到一个队列里,并继续处理下一条记录。当这些错误的信息超过一定量时,则通知相关的研发人员。接到通知后,研发人员则需要分析Lambda函数的日志,并采取修补措施。下图为这种场景的异常处理机制:

图 1.4 利用SQS存储异常发生时的数据上下文,以便恢复异常之后重新执行

上面的解决方案依然存在问题。如果步骤2和步骤10处理同一个Record,那么结果是Statistics中的信息将是不准确的。因此为了避免这类错误,则需要将步骤2和步骤10之前的2个Lambda函数设计成具有idempotent属性。具有该属性的函数即便是执行多次,也不会影响Statistics中的最终结果。

3. 如何大规模读取DynamoDB Stream

为了获得变更之后的数据,你需要借助一个处理器来从DynamoDB Stream中读取数据,常见的处理器有Lambda函数。现在假设,市场部门的人找到你,让你帮忙从DynamoDB中获取新闻的点击量,以便他们能分析哪类新闻,在哪些时间段的点击量是最高的。一个较好的做法是编写另外一个Lambda函数,并将DynamoDB Stream设置成该Lambda函数的触发器。这个Lambda函数的作用是把关于新闻流行度的统计信息分发到市场部门的分析系统,以便市场人员能够使用它们的分析系统来找到最受欢迎的新闻以及人群分布情况等。加上这个新的Lambda函数之后,The New York Times报社的解决方案如下:

图 1.5 不同的Lambda函数同时读取DynamoDB Stream中的数据

虽然,DynamoDB Stream能同时支持2个不同的Lambda函数(上图的Lambda和Transform statistics),但是这么做会导致这2个Lambda函数占用相同的资源,从而每一个Lambda函数的读取数据的效率变低。尤其是当你需要再添加一个Lambda函数,用于同步数据时,DynamoDB Stream就无法同时支持这3个Lambda函数。这个就是我们所说的规模化问题,接下来让我们看看有哪些方法能够规模化读取DynamoDB Stream。

规模化DynamoDB Stream的方法主要有2种,分别是不考虑数据次序的规模化和考虑次序的规模化。比如,之前我们要为市场部门提取统计信息的应用场景则是不需要考虑数据产生的次序的,而像数据同步,检索数据等应用场景则需要考虑数据产生的次序。下面我们来针对这2种不同的应用场景来实施规模化DynamoDB Stream方案。

不考虑数据次序来规模化DynamoDB Stream,则可以借助SNS,SQS和Lambda服务来实现。具体思路是只允许使用一个Lambda函数从DynamoDB Stream读取数据,并将读取到的数据发送给SNS,接着SNS再以广播的方式将数据分发到不同的SQS,每个SQS均有一群Consumers,如下图所示:

图 1.6 使用SQS提高DynamoDB Stream所支持的并发数,同时也丢失了原来数据的次序

考虑数据次序来规模化DynamoDB Stream,则可以考虑使用Kinesis Streams和Lambda。Kinesis Streams也是流服务,与DynamoDB不同的是,它可以同时支持多个不同类别的Consumers。为了使用Kinesis Streams服务,需要在Lambda函数里调用KCL SDK,该SDK专门用于操作Kinesis Streams服务。为了让数据按照DynamoDB Stream中的次序进入到Kinesis Streams,则需要设置为每一个数据设置SequenceNumberForOrderingPartitionKey,前者用于确保数据的次序,后者则确保该数据存在于Kinesis Streams中的哪一个Shard。由于Kinesis Streams与众多AWS服务集成,因此可以轻松实现多种规模化场景,如下图所示:

图 1.7 使用Kinesis Stream提高DynamoDB Stream所支持的并发数,原来数据的次序得到了保留

让我们回到The New York Times报社例子。之前提到,一共有3个不同的Lambda函数同时读取DynamoDB Stream,它们的作用分别是检索新闻,同步数据和分析数据。为了突破DynamoDB Stream的限制,则需要借助Kinesis Streams,最终我们的The New York Times报社解决方案如下所示:

图 1.8 结合了搜索服务,数据同步服务,分析服务的DynamoDB Stream

从上图可知,我们的用户分布于全球,主要是美国和日本,因此上图的DynamoDB有2份,其中的数据是一样的(通过Sync Data函数来实现同步,当然你也可以使用Global Table来同步数据),它们分别位于美国和中国的机房。中国的用户通常优先访问中国区的数据,如果因为各种原因(比如,机房停电)导致中国区的数据无法使用,那么才会退而访问美国区的DynamoDB,然而这会导致中国区用户打开报社网站变慢。

为了加快不同地区的用户打开报社网站的速度,我们添加了CloudFront节点,它是一款能够根据用户所在区域(比如,中国,日本,美国等)来缓存页面的资源(包括CSS,JS,HTML,Image和视频等)。该服务主要缓存了2部分资源,它们分别是API Gateway和S3。

API Gateway的主要作用是将DynamoDB服务和Elastic Search服务对外暴露出来。除此之外,它还能够限流,以免访问报社网站的流量激增时导致DynamoDB或Elastic Search无法处理部分请求。最后,它还能对外提供Restful APIs,这些APIs能被手机端和浏览器端的应用调用。注意,我们在这里直接将API Gateway挂接到了DynamoDB,这种方式适用于简单的业务场景(比如用户的界面的数据结构与DynamoDB中存储的数据结构相似),如果涉及到复杂的应用场景,那么则需要在它们之间添加Lambda函数,这么做不仅增加了系统的复杂度,同时也会增加费用。DynamoDB不支持全文搜索,为了搜索根据关键字搜索相关新闻,则需要借助Elastic Search。为了向其中填入数据,则需要基于DynamoDB Stream,数据库中的每一次修改(新增,更新,删除等)都会及时更新到Elastic Search。之后,API Gateway只需要将将Elastic API暴露出来,其它应用(包括手机端和网页端)就能使用它来实现全文搜索功能。

S3是AWS的对象存储服务,它能存储各种对象,包括文本文件,图片,视频等,单个对象的大小最多是5GB。我们报社的例子是基于浏览器的Web应用,因此它的前端相关的代码(CSS,JS,HTML)均存放在S3中。不仅如此,每一篇新闻都会包含图片或视频,这些资源也都存放在S3中。由于每篇新闻所占存储空间无法确定(有可能超过400KB,DynamoDB中每一个数据项其大小不能超过400KB),因此为了支持字数较多的新闻,则需要将新闻内容放到S3中,并为每篇新闻生成一个S3链接存储于DynamoDB中。虽然解决了存储大对象的问题,但是却分离了数据(一部分数据在DynamoDB中,另外一部分数据在S3中),从而引发了数据一致性的问题。因此,可以考虑将字数少的新闻直接存放在DynamoDB中,而对于字数较多的新闻存放于S3中。

最后,该报社有3类服务需要用到DynamoDB Stream,它们分别是检索数据,分析数据和同步数据。因此我们使用了Kinesis Stream,这个服务能够规模化DynamoDB Stream。这3类服务需要通过Lambda服务来实现,分别定义了Sync Data,Aggregate Data,Index Data函数。其中Aggregate Data将数据聚合到RedShift中,市场团队基于RedShift来分析哪类新闻是最受欢迎的,以及报社的用户群体分布等信息。

虽然图 1.8能够解决报社的问题,但是也引发了更多的问题。首先,由于DynamoDB Stream集成了许多服务,比如Elastic Search,Firehose等,因此系统变得更加复杂,也更加容易出错。另外,增加了监控系统运行状态的工作量,比如,需要为每一个环节上的节点实施监控。除此之外,潜在的风险也变多了,因为需要针对每一个节点设计安全权限,以免外来入侵者攻破一个节点,便可以入侵整个系统。最后,费用上升了,因为用了更多的服务。

结论

本文介绍了DynamoDB Stream的内部结构和用途。DynamoDB Stream用于捕获DynamoDB Table中数据的变化,这些变化能够被Lambda服务检测到,并根据变化的数据来执行Lambda函数。Lambda服务是一个计算平台,它通过HTTPS协议来与DynamoDB Stream通信,并将数据读取出来,接着以同步(Sync)的方式来调用Lambda函数。DynamoDB Stream的内部由多个Shards组成,每个Shard会从对应的分区(比如图 1.1中的Partition A)读取数据,并保留24小时。Lambda服务会根据通知来启动Lambda函数实例,每一个Shard最多同时允许2个Lambda函数实例来读取数据,超过3个实例则可能会导致其中一个实例无法读取数据,这是DynamoDB Stream的限制。通过Lambda服务,DynamoDB能够与很多服务集成,比如支持搜索的Elastic Search,支持分析的QuickSight,支持同步数据的DynamoDB等。

使用DynamoDB Stream需要注意很多细节。其中最为重要的是它的并发量,它最多只能支持2个Lambda函数或KCL Worker,除此之外还需要考虑它存储数据的周期(24小时),最后还需要根据下游服务的吞吐量来使用DynamoDB Stream。

DynamoDB Stream能够实现事件驱动系统,这类系统是由事件驱动的。事件是指已经发生的事情,比如,新闻编辑向DynamoDB中添加一篇新闻,这就是一个事件。由于DynamoDB只适合存取数据,并不适合搜索或分析,因此为了给系统添加这些功能,则需要引入其它服务,比如Elastic Search服务用于搜索,Firehose用于分析等。因此需要使用DynamoDB Stream将这些事件散播出去,以便这些服务能够处理这些事件。基于事件驱动的系统也会带来一些挑战,比如如何处理其中发生的异常,如何确保事件的次序是正确的,如果是基于DynamoDB Stream的事件,那么如何提高它的并发量等。处理异常的方法需要考虑实际情况,比如不考虑事件次序的场景,其异常处理变得比较简单,只需要忽略异常,并将异常发生时的数据上下文存储到SQS中,再通知研发人员。要想提高DynamoDB Stream的并发量,并保持事件的原始次序,则需要借助Kinesis。

参考

听起来还不错 ?

如果你所在的企业遇到了以下问题:
研发流程混乱不堪或者效率低下、经历了持续上升的运维成本、无法及时向用户发布新的服务或产品以及想使用云计算技术但缺乏经验!
那么,请毫不犹疑地

联系我们