个人网站可以做社交类型,滨海天津网站建设,普洱专业企业网站建设,中国网站建设公司有哪些序#xff1a;使用订阅发布功能的时候想查一下客户端是如何接收消息的(客户端订阅了频道之后也会注册一个回调函数#xff0c;服务端publish消息之后回调函数会获取到消息。这块没看到太多内容只有C的源码)#xff0c;无意中查到这篇博客写的简单明了。转一下也做一个记录。…序使用订阅发布功能的时候想查一下客户端是如何接收消息的(客户端订阅了频道之后也会注册一个回调函数服务端publish消息之后回调函数会获取到消息。这块没看到太多内容只有C的源码)无意中查到这篇博客写的简单明了。转一下也做一个记录。这些命令被广泛用于构建即时通信应用比如网络聊天室(chatroom)和实时广播、实时提醒等。本文通过分析 Redis 源码里的 pubsub.c 文件了解发布和订阅机制的底层实现籍此加深对 Redis 的理解。订阅、发布和退订在开始研究源码之前不妨先来回顾一下几个相关命令的使用方式。PUBLISH 命令用于向给定的频道发送信息返回值为接收到信息的订阅者数量redis PUBLISH treehole top secret here ...(integer) 0redis PUBLISH chatroom hi?(integer) 1SUBSCRIBE 命令订阅给定的一个或多个频道redis SUBSCRIBE chatroomReading messages... (press Ctrl-C to quit)1) subscribe # 订阅反馈2) chatroom # 订阅的频道3) (integer) 1 # 目前客户端已订阅频道/模式的数量1) message # 信息2) chatroom # 发送信息的频道3) hi? # 信息内容SUBSCRIBE 的返回值当中 1) 为 subscribe 的是订阅的反馈信息而 1) 为 message 的则是订阅的频道所发送的信息。SUBSCRIBE 还可以订阅多个频道这样一来它接收到的信息就可能来自多个频道redis SUBSCRIBE chatroom talk-to-jackReading messages... (press Ctrl-C to quit)1) subscribe # 订阅 chatroom 的反馈2) chatroom3) (integer) 11) subscribe # 订阅 talk-to-jack 的反馈2) talk-to-jack3) (integer) 21) message # 来自 chatroom 的消息2) chatroom3) yahoo1) message # 来自 talk-to-peter 的消息2) talk-to-jack3) Goodmorning, peter.PSUBSCRIBE 提供了一种订阅符合给定模式的所有频道的方法比如说使用 it.* 为输入就可以订阅所有以 it. 开头的频道比如 it.news 、 it.blog 、 it.tweets 诸如此类redis PSUBSCRIBE it.*Reading messages... (press Ctrl-C to quit)1) psubscribe2) it.*3) (integer) 11) pmessage2) it.* # 匹配的模式3) it.news # 消息的来源频道4) Redis 2.6rc5 release # 消息内容1) pmessage2) it.*3) it.blog4) Why NoSQL matters1) pmessage2) it.*3) it.tweet4) redis: when will the 2.6 stable release?当然 PSUBSCRIBE 也可以接受多个参数从而匹配多种模式。最后 UNSUBSCRIBE 命令和 PUNSUBSCRIBE 负责退订给定的频道或模式。发布和订阅机制当一个客户端通过 PUBLISH 命令向订阅者发送信息的时候我们称这个客户端为发布者(publisher)。而当一个客户端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的时候我们称这个客户端为订阅者(subscriber)。为了解耦发布者(publisher)和订阅者(subscriber)之间的关系Redis 使用了 channel (频道)作为两者的中介 —— 发布者将信息直接发布给 channel 而 channel 负责将信息发送给适当的订阅者发布者和订阅者之间没有相互关系也不知道对方的存在知道了发布和订阅的机制之后接下来就可以开始研究具体的实现了我们从 Redis 的订阅命令开始说起。SUBSCRIBE 命令的实现前面说到Redis 将所有接受和发送信息的任务交给 channel 来进行而所有 channel 的信息就储存在 redisServer 这个结构中struct redisServer {// 省略 ...dict *pubsub_channels; // Map channels to list of subscribed clients// 省略 ...};pubsub_channels 是一个字典字典的键就是一个个 channel 而字典的值则是一个链表链表中保存了所有订阅这个 channel 的客户端。举个例子如果在一个 redisServer 实例中有一个叫做 news 的频道这个频道同时被client_123 和 client_456 两个客户端订阅那么这个 redisServer 结构看起来应该是这样子可以看出实现 SUBSCRIBE 命令的关键就是将客户端添加到给定 channel 的订阅链表中。函数 pubsubSubscribeChannel 是 SUBSCRIBE 命令的底层实现它完成了将客户端添加到订阅链表中的工作// 订阅指定频道// 订阅成功返回 1 如果已经订阅过返回 0int pubsubSubscribeChannel(redisClient *c, robj *channel) {struct dictEntry *de;list *clients NULL;int retval 0;/* Add the channel to the client - channels hash table */// dictAdd 在添加新元素成功时返回 DICT_OK// 因此这个判断句表示如果新订阅 channel 成功那么 。。。if (dictAdd(c-pubsub_channels,channel,NULL) DICT_OK) {retval 1;incrRefCount(channel);/* Add the client to the channel - list of clients hash table */// 将 client 添加到订阅给定 channel 的链表中// 这个链表是一个哈希表的值哈希表的键是给定 channel// 这个哈希表保存在 server.pubsub_channels 里de dictFind(server.pubsub_channels,channel);if (de NULL) {// 如果 de 等于 NULL// 表示这个客户端是首个订阅这个 channel 的客户端// 那么创建一个新的列表 并将它加入到哈希表中clients listCreate();dictAdd(server.pubsub_channels,channel,clients);incrRefCount(channel);} else {// 如果 de 不为空就取出这个 clients 链表clients dictGetVal(de);}// 将客户端加入到链表中listAddNodeTail(clients,c);}/* Notify the client */addReply(c,shared.mbulkhdr[3]);addReply(c,shared.subscribebulk);// 返回订阅的频道addReplyBulk(c,channel);// 返回客户端当前已订阅的频道和模式数量的总和addReplyLongLong(c,dictSize(c-pubsub_channels)listLength(c-pubsub_patterns));return retval;}PSUBSCRIBE 命令的实现除了直接订阅给定 channel 外还可以使用 PSUBSCRIBE 订阅一个模式(pattern)订阅一个模式等同于订阅所有匹配这个模式的 channel 。和 redisServer.pubsub_channels 属性类似 redisServer.pubsub_patterns 属性用于保存所有被订阅的模式和 pubsub_channels 不同的是 pubsub_patterns 是一个链表(而不是字典)struct redisServer {// 省略 ...list *pubsub_patterns; // A list of pubsub_patterns// 省略 ...};pubsub_patterns 的每一个节点都是一个 pubsubPattern 结构的实例它保存了被订阅的模式以及订阅这个模式的客户客户端typedef struct pubsubPattern {redisClient *client;robj *pattern;} pubsubPattern;举个例子假设在一个 redisServer 实例中有一个叫做 news.* 的模式同时被客户端client_789 和 client_999 订阅那么这个 redisServer 结构看起来应该是这样子现在可以知道实现 PSUBSCRIBE 命令的关键就是将客户端和订阅的模式添加到redisServer.pubsub_patterns 当中。pubsubSubscribePattern 是 PSUBSCRIBE 的底层实现它将客户端和所订阅的模式添加到redisServer.pubsub_patterns 当中// 订阅指定模式// 订阅成功返回 1 如果已经订阅过返回 0int pubsubSubscribePattern(redisClient *c, robj *pattern) {int retval 0;// 向 c-pubsub_patterns 中查找指定 pattern// 如果返回值为 NULL 说明这个 pattern 还没被这个客户端订阅过if (listSearchKey(c-pubsub_patterns,pattern) NULL) {retval 1;// 添加 pattern 到客户端 pubsub_patternslistAddNodeTail(c-pubsub_patterns,pattern);incrRefCount(pattern);// 将 pattern 添加到服务器pubsubPattern *pat;pat zmalloc(sizeof(*pat));pat-pattern getDecodedObject(pattern);pat-client c;listAddNodeTail(server.pubsub_patterns,pat);}/* Notify the client */addReply(c,shared.mbulkhdr[3]);addReply(c,shared.psubscribebulk);// 返回被订阅的模式addReplyBulk(c,pattern);// 返回客户端当前已订阅的频道和模式数量的总和addReplyLongLong(c,dictSize(c-pubsub_channels)listLength(c-pubsub_patterns));return retval;}PUBLISH 命令的实现使用 PUBLISH 命令向订阅者发送消息需要执行以下两个步骤1) 使用给定的频道作为键在 redisServer.pubsub_channels 字典中查找记录了订阅这个频道的所有客户端的链表遍历这个链表将消息发布给所有订阅者。2) 遍历 redisServer.pubsub_patterns 链表将链表中的模式和给定的频道进行匹配如果匹配成功那么将消息发布到相应模式的客户端当中。举个例子假设有两个客户端分别订阅 it.news 频道和 it.* 模式当执行命令PUBLISH it.news hello moto 的时候 it.news 频道的订阅者会在步骤 1 收到信息而当PUBLISH 进行到步骤 2 的时候 it.* 模式的订阅者也会收到信息。PUBLISH 命令的实际实现由 pubsubPublishMessage 函数完成它的完整定义如下// 发送消息int pubsubPublishMessage(robj *channel, robj *message) {int receivers 0;struct dictEntry *de;listNode *ln;listIter li;/* Send to clients listening for that channel */// 向所有频道的订阅者发送消息de dictFind(server.pubsub_channels,channel);if (de) {list *list dictGetVal(de); // 取出所有订阅者listNode *ln;listIter li;// 遍历所有订阅者 向它们发送消息listRewind(list,li);while ((ln listNext(li)) ! NULL) {redisClient *c ln-value;addReply(c,shared.mbulkhdr[3]);addReply(c,shared.messagebulk);addReplyBulk(c,channel); // 打印频道名addReplyBulk(c,message); // 打印消息receivers; // 更新接收者数量}}/* Send to clients listening to matching channels */// 向所有被匹配模式的订阅者发送消息if (listLength(server.pubsub_patterns)) {listRewind(server.pubsub_patterns,li); // 取出所有模式channel getDecodedObject(channel);while ((ln listNext(li)) ! NULL) {pubsubPattern *pat ln-value; // 取出模式// 如果模式和 channel 匹配的话// 向这个模式的订阅者发送消息if (stringmatchlen((char*)pat-pattern-ptr,sdslen(pat-pattern-ptr),(char*)channel-ptr,sdslen(channel-ptr),0)) {addReply(pat-client,shared.mbulkhdr[4]);addReply(pat-client,shared.pmessagebulk);addReplyBulk(pat-client,pat-pattern); // 打印被匹配的模式addReplyBulk(pat-client,channel); // 打印频道名addReplyBulk(pat-client,message); // 打印消息receivers; // 更新接收者数量}}decrRefCount(channel); // 释放用过的 channel}return receivers; // 返回接收者数量}UNSUBSCRIBE 和 PUNSUBSCRIBE 的实现UNSUBSCRIBE 和 PUNSUBSCRIBE 分别是 SUBSCRIBE 和 PSUBSCRIBE 的反操作如果明白了SUBSCRIBE 和 PSUBSCRIBE 的工作机制的话应该不难理解这两个反操作的原理所以这里就省略详细的分析了有兴趣的可以直接看代码。小节Redis 的 pubsub 机制的分析就到此结束了跟往常一样带有注释的完整 pubsub.c 文件可以到原作者的 GITHUB 上找 https://github.com/huangz1990/reading_redis_source