软文一般发布在哪些平台,seo实战密码电子版,wordpress进入,营销推广运营 网站使用Redis中的list实现消息队列
list是Redis的一种数据结构#xff0c;可以把它理解成双向链表
可以从头部插入数据然后从尾部取出数据#xff0c;从而实现消息队列的效果
利用命令 LPUSH和RPOP #xff08;从左边插入数据从右边取出数据#xff09;
lpush l1 e1 e2rpo…使用Redis中的list实现消息队列
list是Redis的一种数据结构可以把它理解成双向链表
可以从头部插入数据然后从尾部取出数据从而实现消息队列的效果
利用命令 LPUSH和RPOP 从左边插入数据从右边取出数据
lpush l1 e1 e2rpop l1或者 RPUSH和LPOP 从左边插入数据从右边取出数据从而实现消息队列
rpush l1 e1 e2lpop l1但是使用list作为消息队列也有弊端 - 不能实现广播功能只能单对单的进行消息队列
使用Redis中的pubsub实现消息队列
PubSub是Redis引入的一种消息传递的模型。消费者可以订阅一个或者多个Channel从Channel中获取数据当生产者向Channel发送数据的时候所有的消费者都可以接收数据
(XXX是Channel xxx是消息)
发送消息
publish order.XXX xxx 接收消息
subscribe order.XXX但是使用PubSub作为消息队列也有弊端 - 当消息堆积以后会造成消息的丢失
使用Redis中的stream实现消息队列
stream是Redis引入的新的消息队列是功能比较完善的消息队列
使用XADD用于添加信息
具体做法可以参考下面这张图
编写一段命令
XADD l1 20 * name jack age 21使用XREAD来接收命令 编写一段命令来接收刚刚发送的消息
XREAD Count 1 BLOCK 20 Stream l1 0我们也可以$来表示获取最新的消息
XREAD Count 1 BLOCK 20 Stream l1 $当然我们也可以创建一个消息组利用消息组来处理消息
创建消息组
XGROUP CREATE l1 g1 $利用消息组读取消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]group消费组名称 consumer消费者名称如果消费者不存在会自动创建一个消费者count本次查询的最大数量BLOCK milliseconds当没有消息时最长等待时间NOACK无需手动ACK获取到消息后自动确认STREAMS key指定队列名称ID获取消息的起始ID
当消息没有被确认的时候就会被放进PendingList里面等待处理
下面编写一段命令来创建两个消费者
XREADGROUP GROUP g1 c1 BLOCK 2000 STREAM l1XREADGROUP GROUP g1 c2 BLOCK 2000 STREAM l1下面来演示一下怎么使用
public class voucherOrderHandler implements Runnable{Orrvidepublic void run(){while(true){try{//接收信息ListMapRecordString,Object,Object list stringRedisTemplate.opsFotStream().read(Consumer.from(g1,c1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(stream.orders,ReadOffset.lastConsumed()));//省略一堆业务代码//确认机制stringRedisTemplate.opsForStream.acknowledge(s1,g1,record.getId(); }catch(Exception e){log.erroe(消息处理异常,e);handlePendingList();}}}//当消息没有被确认就会出现异常那么我们就从PendingList里面尝试取出数据public void handlePendingList(){//几乎同样的逻辑再来一遍while(true){try{//接收信息 从PendingList中读取消息不需要阻塞ListMapRecordString,Object,Object list stringRedisTemplate.opsFotStream().read(Consumer.from(g1,c1),StreamReadOptions.empty().count(1)StreamOffset.create(stream.orders,ReadOffset.from(0)))//省略一堆业务代码当处理完消息的时候记得退出循环//确认机制stringRedisTemplate.opsForStream.acknowledge(s1,g1,record.getId(); //当出现异常的时候由于我们已经设置类while(true)所以会自动循环 }catch(Exception e){log.erroe(消息处理异常,e);try{Thread.sleep(20);}catch(Exception e){e.printStackTrace();}}}}
}