Boost up performance for redis PUB-SUB patterns matching

If lots of clients PSUBSCRIBE to same patterns, multiple pattens matching will take place. This commit change it into just one single pattern matching by using a `dict *` to store the unique pattern and which clients subscribe to it.
This commit is contained in:
伯成 2018-03-01 11:46:56 +08:00
parent 3a5bf75ede
commit dfb12f0628
3 changed files with 48 additions and 12 deletions

View File

@ -125,6 +125,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) { int pubsubSubscribePattern(client *c, robj *pattern) {
dictEntry *de;
list *clients;
int retval = 0; int retval = 0;
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
@ -136,6 +138,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
pat->pattern = getDecodedObject(pattern); pat->pattern = getDecodedObject(pattern);
pat->client = c; pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat); listAddNodeTail(server.pubsub_patterns,pat);
/* Add the client to the pattern -> list of clients hash table */
de = dictFind(server.pubsub_patterns_dict,pattern);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_patterns_dict,pattern,clients);
incrRefCount(pattern);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
} }
/* Notify the client */ /* Notify the client */
addReply(c,shared.mbulkhdr[3]); addReply(c,shared.mbulkhdr[3]);
@ -148,6 +160,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */ * 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
dictEntry *de;
list *clients;
listNode *ln; listNode *ln;
pubsubPattern pat; pubsubPattern pat;
int retval = 0; int retval = 0;
@ -160,6 +174,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
pat.pattern = pattern; pat.pattern = pattern;
ln = listSearchKey(server.pubsub_patterns,&pat); ln = listSearchKey(server.pubsub_patterns,&pat);
listDelNode(server.pubsub_patterns,ln); listDelNode(server.pubsub_patterns,ln);
/* Remove the client from the pattern -> clients list hash table */
de = dictFind(server.pubsub_patterns_dict,pattern);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client. */
dictDelete(server.pubsub_patterns_dict,pattern);
}
} }
/* Notify the client */ /* Notify the client */
if (notify) { if (notify) {
@ -225,6 +251,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
int pubsubPublishMessage(robj *channel, robj *message) { int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0; int receivers = 0;
dictEntry *de; dictEntry *de;
dictIterator *di;
listNode *ln; listNode *ln;
listIter li; listIter li;
@ -247,25 +274,32 @@ int pubsubPublishMessage(robj *channel, robj *message) {
} }
} }
/* Send to clients listening to matching channels */ /* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) { di = dictGetIterator(server.pubsub_patterns_dict);
listRewind(server.pubsub_patterns,&li); if (di) {
channel = getDecodedObject(channel); channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) { while((de = dictNext(di)) != NULL) {
pubsubPattern *pat = ln->value; robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
if (stringmatchlen((char*)pat->pattern->ptr, if (!stringmatchlen((char*)pattern->ptr,
sdslen(pat->pattern->ptr), sdslen(pattern->ptr),
(char*)channel->ptr, (char*)channel->ptr,
sdslen(channel->ptr),0)) { sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]); continue;
addReply(pat->client,shared.pmessagebulk); }
addReplyBulk(pat->client,pat->pattern); listRewind(clients,&li);
addReplyBulk(pat->client,channel); while ((ln = listNext(&li)) != NULL) {
addReplyBulk(pat->client,message); client *c = listNodeValue(ln);
addReply(c,shared.mbulkhdr[4]);
addReply(c,shared.pmessagebulk);
addReplyBulk(c,pattern);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++; receivers++;
} }
} }
decrRefCount(channel); decrRefCount(channel);
dictReleaseIterator(di);
} }
return receivers; return receivers;
} }

View File

@ -1900,6 +1900,7 @@ void initServer(void) {
evictionPoolAlloc(); /* Initialize the LRU keys pool. */ evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType,NULL); server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate(); server.pubsub_patterns = listCreate();
server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL);
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.cronloops = 0; server.cronloops = 0;

View File

@ -1163,6 +1163,7 @@ struct redisServer {
/* Pubsub */ /* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */ dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */ list *pubsub_patterns; /* A list of pubsub_patterns */
dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */ xor of NOTIFY_... flags. */
/* Cluster */ /* Cluster */