Redis的底层类型之stream

werbenhu / 118 / 2023-09-25 11:25:39

ChatGPT 可用网址,仅供交流学习使用,如对您有所帮助,请收藏并推荐给需要的朋友。
https://ckai.xyz

stream

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

  • XADD
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] count] <* | id> field value [field value ...]

如果 mystream 不存在,NOMKSTREAM 选项将阻止自动创建新的 Stream

count: 是阈值,限制消息的数量

<stream>:指定要添加条目的流的名称。

MAXLEN [~|=] <count>:(可选)用于限制流的长度。~ 表示不包括最新的条目,| 表示包括最新的条目,<count> 指定要保留的最大条目数量。例如,MAXLEN ~ 100 表示保留最新的 100 个条目,MAXLEN | 100 表示保留最新的 101 个条目(包括最新的一个)。

ID <id>:(可选)指定新条目的 ID。如果省略,Redis 会自动生成一个唯一的 ID。

<field> <value>:指定要添加到条目的字段和值。

  • XGROUP

XGROUP 是 Redis 中用于管理 Stream 消费者组的命令。它允许您创建、管理和配置 Stream 消费者组,以便多个消费者能够协作处理 Stream 中的消息。

XGROUP CREATE key group <id | $> [MKSTREAM]
  [ENTRIESREAD entries-read]

<stream>:指定要创建消费者组的 Stream 的名称。

<groupname>:指定消费者组的名称。

<id-or-$>:指定从 Stream 的哪个位置开始消费消息。可以是特定的消息 ID,也可以是 $,表示从 Stream 的最新消息开始消费。

[MKSTREAM]:可选参数,如果指定,将创建一个新的 Stream,如果该 Stream 不存在。

  • XGROUP CREATECONSUMER
  • XGROUP DELCONSUMER
  • XGROUP DESTROY
  • XGROUP SETID

创建一个消费者

XGROUP CREATECONSUMER key group consumer
  • XACK

XACK 是 Redis Streams 中的命令,用于从消费者组中确认接收并处理消息。在消费者组中,多个消费者可以协作处理 Stream 中的消息,但为了确保消息不会被多次处理,每个消息都需要由某个消费者进行确认(ACK)。XACK 命令用于执行此确认操作。

  • XAUTOCLAIM

自动获取并处理消息。它通常用于消费者组,以自动从 Stream 中获取消息并将其分配给不同的消费者以处理。这个命令转移所有符合指定条件的待处理流条目的所有权。从概念上讲,XAUTOCLAIM 相当于先调用 XPENDING,然后再调用 XCLAIM,但它通过类似 SCAN 的语义提供了一种更直观的方式来处理消息传递失败。

  • XPENDING

查询有关消费者组(Consumer Group)中未处理消息的信息,以及有关每个消费者的消费进度的信息。

  • XCLAIM

将未确认(pending)的消息重新分配给指定的消费者。当一个消费者无法处理一条消息时,该消息会变为未确认状态,然后可以使用 XCLAIM 命令将它分配给另一个消费者。

  • XINFO CONSUMERS
  • XINFO GROUPS
  • XINFO STREAM
  • XLEN
  • XREADGROUP
  • XREAD

XREAD:用于读取消息。XREAD 命令用于从一个或多个流中读取消息。它可以用于实时地获取消息并进行处理。
您可以设置阻塞时间(BLOCK)来等待新消息到达,或者指定要返回的消息数量(COUNT)。
适用于消费者需要获取消息内容的情况。
XPENDING:用于管理挂起的消息(Pending Messages)。

XPENDING 命令用于查看挂起的消息的信息,如待处理消息的数量、最早和最新的消息 ID、以及消费者的相关信息。
这个命令通常用于监控和管理待处理的消息,以确保消息在消费者之间得到正确处理。
适用于监控和管理挂起的消息。
总之,XREAD 用于读取消息内容,而 XPENDING 用于查看和管理待处理消息的信息。这两个命令在不同的情境下有不同的用途,通常用于实现流式数据处理系统中的不同功能。

  • XTRIM

XTRIM 是 Redis Streams 中的命令,用于修剪(Trim)流的大小,以限制流中消息的数量。这可以帮助您控制流的存储大小,以防止其无限增长。


作者
werbenhu
许可协议
CC BY 4.0
发布于
2023-09-25
修改于
2024-05-29
Bonnie image
尚未登录