redis消息队列方案
# redis 消息队列方案
观察角度:消息有序,重复消息处理,消息可靠性保证
# pub/sub 发布订阅机制
# list集合
消息有序:lpush和rpop可以保证消息顺序的被消费
重复消息处理:list没有为消息提供唯一标识,需要生产者提供唯一标识,消费程序自行判断消息是否重复
消息可靠性保证:讲的是消息因为网络,停电,宕机等突发因素丢失,list没有为消息保存副本,这样消费者如果正在消费时有突发因素,那么就会发生消息丢失。
所以list做消息队列时,如果是对消息丢失不是很敏感的场景可以使用。
# stream
优点:支持持久化,支持消费组形式的消息读取
Streams 同样能够满足消息队列的三大需求。而且,它还支持消费组形式的消息读取
# Stream基本结构
Redis Stream像是一个仅追加内容的消息链表,把所有加入的消息都一个一个串起来,每个消息都有一个唯一的 ID 和内容,它还从 Kafka 借鉴了另一种概念:消费者组(Consumer Group),这让Redis Stream变得更加复杂。
Redis Stream的结构如下:
每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 XADD 指令追加消息时自动创建。
- Consumer Group:消费者组,消费者组记录了Starem的状态**,使用 XGROUP CREATE 命令手动创建,在同一个Stream内消费者组名称唯一。一个消费组可以有多个消费者(Consumer)同时进行组内消费,所有消费者共享Stream内的所有信息,但同一条消息只会有一个消费者消费到,不同的消费者会消费Stream中不同的消息,这样就可以应用在分布式的场景中来保证消息消费的唯一性。
- last_delivered_id:游标,用来记录某个消费者组在Stream上的消费位置信息**,每个消费组会有个游标,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。创建消费者组时需要指定从Stream的哪一个消息ID(哪个位置)开始消费,该位置之前的数据会被忽略,同时还用来初始化 last_delivered_id 这个变量。这个last_delivered_id一般来说就是最新消费的消息ID。
- pending_ids:消费者内部的状态变量,作用是维护消费者的未确认的消息ID。pending_ids记录了当前已经被客户端读取,但是还没有 ack (Acknowledge character:确认字符)的消息。 目的是为了保证客户端至少消费了消息一次,而不会在网络传输的中途丢失而没有对消息进行处理。如果客户端没有 ack,那么这个变量里面的消息ID 就会越来越多,一旦某个消息被ack,它就会对应开始减少。这个变量也被 Redis 官方称为 PEL (Pending Entries List)。
# 插入消息
往名称为 mqstream 的消息队列中插入一条消息,消息的键是 repo,值是 5。*号代表自动创建全局唯一的id,也可以手动生成,但必须是增长且唯一的。
XADD mqstream * repo 5
"1599203861727-0"
1
2
2
# 读取消息
使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
1
- count :数量
- milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
- key :队列名
- id :消息 ID
# 创建消费者组 (XGROUP CREATE)
使用 XGROUP CREATE 创建消费者组,语法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
1
- key :队列名称,如果不存在就创建
- groupname :组名。
- $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0
1
从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $
1
消费组命令
# 消费者组消费消息(XREADGROUP GROUP)
使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
1
- group :消费组名
- consumer :消费者名。
- count : 读取数量。
- milliseconds : 阻塞毫秒数。
- key : 队列名。
- ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
1
上次更新: 2023/05/07, 23:16:56