分布式缓存服务 DCS-DCS Redis 5.0支持的新特性说明:Stream数据结构

时间:2023-11-01 16:16:57

Stream数据结构

Stream是Redis 5.0引入的一种新数据类型,它是一个全新的支持多播的可持久化消息队列。

Redis Stream的结构示意图如图1所示,它是一个可持久化的数据结构,用一个消息链表,将所有加入进来的消息都串起来。

Stream数据结构具有以下特性
  1. Stream中可以有多个消费者组。
  2. 每个消费组都含有一个Last_delivered_id,指向消费组当前已消费的最后一个元素(消息)。
  3. 每个消费组可以含有多个消费者对象,消费者共享消费组中的Last_delivered_id,相同消费组内的消费者存在竞争关系,即一个元素只能被其中一个消费者进行消费。
  4. 消费者对象内还维持了一个Pending_ids,Pending_ids记录已发送给客户端,但是还没完成ACK(消费确认)的元素id。
  5. Stream与Redis其他数据结构的比较,见表1
图1 Stream数据结构示意图
表1 Stream与Redis现有数据结构比较

比较项

Stream

List、Pub/Sub、Zset

复杂度

获取元素高效,复杂度为O(logN)

List获取元素的复杂度为O(N)

offset

支持offset,每个消息元素有唯一id。不会因为新元素加入或者其他元素淘汰而改变id。

List没有offset概念,如果有元素被逐出,无法确定最新的元素

持久化

支持消息元素持久化,可以保存到AOF和RDB中。

Pub/Sub不支持持久化消息。

消费分组

支持消费分组

Pub/Sub不支持消费分组

消息确认

支持ACK(消费确认)

Pub/Sub不支持

性能

Stream性能与消费者数量无明显关系

Pub/Sub性能与客户端数量正相关

逐出

允许按时间线逐出历史数据,支持block,给予radix tree和listpack,内存开销少。

Zset不能重复添加相同元素,不支持逐出和block,内存开销大。

删除元素

不能从中间删除消息元素。

Zset支持删除任意元素

Stream相关命令介绍

接下来按照使用流程中出现的顺序介绍Stream相关命令。详细命令见表2
  1. 首先使用XADD添加流元素,即创建Stream,添加流元素时可指定消息数量最大保存范围。
  2. 然后通过XGROUP创建消费者组。
  3. 消费者使用XREADGROUP指令进行消费。
  4. 客户端消费完毕后使用XACK命令确认消息已消费成功。
图2 Stream相关命令介绍
表2 Stream的详细命令

命令

说明

语法

XACK

从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。

XACK key group ID [ID ...]

XADD

将指定的流条目追加到指定key的流中。 如果key不存在,作为运行这个命令的副作用,将使用流的条目自动创建key。

XADD key ID field string [field string ...]

XCLAIM

在流的消费者组上下文中,此命令改变待处理消息的所有权, 因此新的所有者是在命令参数中指定的消费者。

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

XDEL

从指定流中移除指定的条目,并返回成功删除的条目的数量,在传递的ID不存在的情况下, 返回的数量可能与传递的ID数量不同。

XDEL key ID [ID ...]

XGROUP

该命令用于管理流数据结构关联的消费者组。使用XGROUP你可以:

  • 创建与流关联的新消费者组。
  • 销毁一个消费者组。
  • 从消费者组中移除指定的消费者。
  • 将消费者组的最后交付ID设置为其他内容。

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

XINFO

检索关于流和关联的消费者组的不同的信息。

XINFO [CONSUMERS key groupname] key key [HELP]

XLEN

返回流中的条目数。如果指定的key不存在,则此命令返回0,就好像该流为空。

XLEN key

XPENDING

通过消费者组从流中获取数据。检查待处理消息列表的接口,用于观察和了解消费者组中哪些客户端是活跃的,哪些消息在等待消费,或者查看是否有空闲的消息。

XPENDING key group [start end count] [consumer]

XRANGE

返回流中满足给定ID范围的条目。

XRANGE key start end [COUNT count]

XREAD

从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

XREADGROUP

XREAD命令的特殊版本,指定消费者组进行读取。

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

XREVRANGE

与XRANGE相同,但显著的区别是以相反的顺序返回条目,并以相反的顺序获取开始-结束参数

XREVRANGE key end start [COUNT count]

XTRIM

XTRIM将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。

XTRIM key MAXLEN [~] count

消息(流元素)消费确认

Stream与相比Pub/Sub,不仅增加消费分组模式,还支持消息消费确认。

当一条消息被某个消费者调用XREADGROUP命令读取或调用XCLAIM命令接管的时候, 服务器尚不确定它是否至少被处理了一次。 因此,一旦消费者成功处理完一条消息,它应该调用XACK知会Stream,这样这个消息就不会被再次处理, 同时关于此消息的PEL(pending_ids)条目也会被清除,从Redis服务器释放内存

某些情况下,因为网络问题等,客户端消费完毕后没有调用XACK,这时候PEL内会保留对应的元素ID。待客户端重新连上后,XREADGROUP的起始消息ID建议设置为0-0,表示读取所有的PEL消息及自last_id之后的消息。同时,消费者消费消息时需要能够支持消息重复传递。

图3 ACK机制解读
support.huaweicloud.com/dcs_faq/Redis5-newfeature.html