XGROUP CREATE: MKSTREAM option for automatic stream creation.

This commit is contained in:
antirez 2018-10-17 11:27:09 +02:00
parent 2f8f29aa0e
commit cb27dd1a68

View File

@ -1683,13 +1683,14 @@ uint64_t streamDelConsumer(streamCG *cg, sds name) {
* Consumer groups commands
* ----------------------------------------------------------------------- */
/* XGROUP CREATE <key> <groupname> <id or $>
/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
* XGROUP SETID <key> <groupname> <id or $>
* XGROUP DESTROY <key> <groupname>
* XGROUP DELCONSUMER <key> <groupname> <consumername> */
void xgroupCommand(client *c) {
const char *help[] = {
"CREATE <key> <groupname> <id or $> -- Create a new consumer group.",
"CREATE <key> <groupname> <id or $> [opt] -- Create a new consumer group.",
" option MKSTREAM: create the empty stream if it does not exist.",
"SETID <key> <groupname> <id or $> -- Set the current group ID.",
"DESTROY <key> <groupname> -- Remove the specified group.",
"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.",
@ -1703,8 +1704,31 @@ NULL
/* Lookup the key now, this is common for all the subcommands but HELP. */
if (c->argc >= 4) {
robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
robj *o = lookupKeyWrite(c->db,c->argv[2]);
/* CREATE has an MKSTREAM option that creates the stream if it
* does not exist. */
if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
addReplySubcommandSyntaxError(c);
return;
}
if (o == NULL) {
o = createStreamObject();
dbAdd(c->db,c->argv[2],o);
}
}
/* At this point key must exist, or there is an error. */
if (o == NULL) {
addReplyError(c,
"The XGROUP subcommand requires the key to exist. "
"Note that for CREATE you may want to use the MKSTREAM "
"option to create an empty stream automatically.");
return;
}
if (checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
grpname = c->argv[3]->ptr;
@ -1721,7 +1745,7 @@ NULL
}
/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CREATE") && c->argc == 5) {
if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id;