diff --git a/src/blocked.c b/src/blocked.c index f438c3353..d560a8f38 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -442,7 +442,9 @@ void unblockClientWaitingData(client *c) { } if (c->bpop.xread_group) { decrRefCount(c->bpop.xread_group); + decrRefCount(c->bpop.xread_consumer); c->bpop.xread_group = NULL; + c->bpop.xread_consumer = NULL; } } diff --git a/src/networking.c b/src/networking.c index 51c239fc8..c29adc1e0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -137,6 +137,7 @@ client *createClient(int fd) { c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL); c->bpop.target = NULL; c->bpop.xread_group = NULL; + c->bpop.xread_consumer = NULL; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; c->woff = 0; diff --git a/src/server.h b/src/server.h index 72a6563a3..d4f989264 100644 --- a/src/server.h +++ b/src/server.h @@ -653,7 +653,8 @@ typedef struct blockingState { /* BLOCK_STREAM */ size_t xread_count; /* XREAD COUNT option. */ - robj *xread_group; /* XREAD group name. */ + robj *xread_group; /* XREADGROUP group name. */ + robj *xread_consumer; /* XREADGROUP consumer name. */ mstime_t xread_retry_time, xread_retry_ttl; /* BLOCKED_WAIT */ diff --git a/src/t_stream.c b/src/t_stream.c index 82c926b39..65a926ae1 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -943,8 +943,8 @@ void xreadCommand(client *c) { streamID *ids = static_ids; streamCG **groups = NULL; int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ - sds groupname = NULL; - sds consumername = NULL; + robj *groupname = NULL; + robj *consumername = NULL; /* Parse arguments. */ for (int i = 1; i < c->argc; i++) { @@ -976,8 +976,8 @@ void xreadCommand(client *c) { "XREADGROUP. You called XREAD instead."); return; } - groupname = c->argv[i+1]->ptr; - consumername = c->argv[i+2]->ptr; + groupname = c->argv[i+1]; + consumername = c->argv[i+2]; i += 2; } else { addReply(c,shared.syntaxerr); @@ -1011,12 +1011,12 @@ void xreadCommand(client *c) { o = lookupKeyRead(c->db,key); if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; if (o == NULL || - (group = streamLookupCG(o->ptr,groupname)) == NULL) + (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) { addReplyErrorFormat(c, "No such key '%s' or consumer " "group '%s' in XREADGROUP with GROUP " "option", - key->ptr,groupname); + key->ptr,groupname->ptr); goto cleanup; } groups[id_idx] = group; @@ -1093,7 +1093,8 @@ void xreadCommand(client *c) { * in case the ID provided is too low, we do not want the server to * block just to serve this client a huge stream of messages. */ c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; - c->bpop.xread_group = NULL; /* Not used for now. */ + c->bpop.xread_group = groupname; + c->bpop.xread_consumer = consumername; goto cleanup; }