CG: add & populate group+consumer in the blocking state.

This commit is contained in:
antirez 2018-01-19 16:39:09 +01:00
parent b8e5232161
commit ccdae09046
4 changed files with 13 additions and 8 deletions

View File

@ -442,7 +442,9 @@ void unblockClientWaitingData(client *c) {
} }
if (c->bpop.xread_group) { if (c->bpop.xread_group) {
decrRefCount(c->bpop.xread_group); decrRefCount(c->bpop.xread_group);
decrRefCount(c->bpop.xread_consumer);
c->bpop.xread_group = NULL; c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
} }
} }

View File

@ -137,6 +137,7 @@ client *createClient(int fd) {
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL); c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL; c->bpop.target = NULL;
c->bpop.xread_group = NULL; c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
c->bpop.numreplicas = 0; c->bpop.numreplicas = 0;
c->bpop.reploffset = 0; c->bpop.reploffset = 0;
c->woff = 0; c->woff = 0;

View File

@ -653,7 +653,8 @@ typedef struct blockingState {
/* BLOCK_STREAM */ /* BLOCK_STREAM */
size_t xread_count; /* XREAD COUNT option. */ size_t xread_count; /* XREAD COUNT option. */
robj *xread_group; /* XREAD group name. */ robj *xread_group; /* XREADGROUP group name. */
robj *xread_consumer; /* XREADGROUP consumer name. */
mstime_t xread_retry_time, xread_retry_ttl; mstime_t xread_retry_time, xread_retry_ttl;
/* BLOCKED_WAIT */ /* BLOCKED_WAIT */

View File

@ -943,8 +943,8 @@ void xreadCommand(client *c) {
streamID *ids = static_ids; streamID *ids = static_ids;
streamCG **groups = NULL; streamCG **groups = NULL;
int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
sds groupname = NULL; robj *groupname = NULL;
sds consumername = NULL; robj *consumername = NULL;
/* Parse arguments. */ /* Parse arguments. */
for (int i = 1; i < c->argc; i++) { for (int i = 1; i < c->argc; i++) {
@ -976,8 +976,8 @@ void xreadCommand(client *c) {
"XREADGROUP. You called XREAD instead."); "XREADGROUP. You called XREAD instead.");
return; return;
} }
groupname = c->argv[i+1]->ptr; groupname = c->argv[i+1];
consumername = c->argv[i+2]->ptr; consumername = c->argv[i+2];
i += 2; i += 2;
} else { } else {
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
@ -1011,12 +1011,12 @@ void xreadCommand(client *c) {
o = lookupKeyRead(c->db,key); o = lookupKeyRead(c->db,key);
if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
if (o == NULL || if (o == NULL ||
(group = streamLookupCG(o->ptr,groupname)) == NULL) (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
{ {
addReplyErrorFormat(c, "No such key '%s' or consumer " addReplyErrorFormat(c, "No such key '%s' or consumer "
"group '%s' in XREADGROUP with GROUP " "group '%s' in XREADGROUP with GROUP "
"option", "option",
key->ptr,groupname); key->ptr,groupname->ptr);
goto cleanup; goto cleanup;
} }
groups[id_idx] = group; groups[id_idx] = group;
@ -1093,7 +1093,8 @@ void xreadCommand(client *c) {
* in case the ID provided is too low, we do not want the server to * in case the ID provided is too low, we do not want the server to
* block just to serve this client a huge stream of messages. */ * block just to serve this client a huge stream of messages. */
c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
c->bpop.xread_group = NULL; /* Not used for now. */ c->bpop.xread_group = groupname;
c->bpop.xread_consumer = consumername;
goto cleanup; goto cleanup;
} }