Process MKSTREAM option of XGROUP CREATE at a later time.

This avoids issues with having to replicate a command that produced
errors.
This commit is contained in:
antirez 2018-10-17 12:04:03 +02:00
parent ab11c5ebd1
commit 2e3d403349

View File

@ -1701,10 +1701,8 @@ NULL
sds grpname = NULL; sds grpname = NULL;
streamCG *cg = NULL; streamCG *cg = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */ char *opt = c->argv[1]->ptr; /* Subcommand name. */
int mkstream = 0;
/* Lookup the key now, this is common for all the subcommands but HELP. */ robj *o;
if (c->argc >= 4) {
robj *o = lookupKeyWrite(c->db,c->argv[2]);
/* CREATE has an MKSTREAM option that creates the stream if it /* CREATE has an MKSTREAM option that creates the stream if it
* does not exist. */ * does not exist. */
@ -1713,12 +1711,19 @@ NULL
addReplySubcommandSyntaxError(c); addReplySubcommandSyntaxError(c);
return; return;
} }
if (o == NULL) { mkstream = 1;
o = createStreamObject(); grpname = c->argv[3]->ptr;
dbAdd(c->db,c->argv[2],o);
}
} }
/* Everything but the "HELP" option requires a key and group name. */
if (c->argc > 4) {
o = lookupKeyWrite(c->db,c->argv[2]);
if (o) s = o->ptr;
grpname = c->argv[3]->ptr;
}
/* Check for missing key/group. */
if (c->argc >= 4 && !mkstream) {
/* At this point key must exist, or there is an error. */ /* At this point key must exist, or there is an error. */
if (o == NULL) { if (o == NULL) {
addReplyError(c, addReplyError(c,
@ -1729,8 +1734,6 @@ NULL
} }
if (checkType(c,o,OBJ_STREAM)) return; if (checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
grpname = c->argv[3]->ptr;
/* Certain subcommands require the group to exist. */ /* Certain subcommands require the group to exist. */
if ((cg = streamLookupCG(s,grpname)) == NULL && if ((cg = streamLookupCG(s,grpname)) == NULL &&
@ -1752,6 +1755,14 @@ NULL
} else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) { } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {
return; return;
} }
/* Handle the MKSTREAM option now that the command can no longer fail. */
if (s == NULL && mkstream) {
robj *o = createStreamObject();
dbAdd(c->db,c->argv[2],o);
s = o->ptr;
}
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
if (cg) { if (cg) {
addReply(c,shared.ok); addReply(c,shared.ok);