diff --git a/src/aof.c b/src/aof.c index 9359deabf..9d4587781 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2035,10 +2035,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Append XSETID after XADD, make sure lastid is correct, * in case of XDEL lastid. */ - if (!rioWriteBulkCount(r,'*',3) || + if (!rioWriteBulkCount(r,'*',7) || !rioWriteBulkString(r,"XSETID",6) || !rioWriteBulkObject(r,key) || - !rioWriteBulkStreamID(r,&s->last_id)) + !rioWriteBulkStreamID(r,&s->last_id) || + !rioWriteBulkString(r,"ENTRIESADDED",12) || + !rioWriteBulkLongLong(r,s->entries_added) || + !rioWriteBulkString(r,"MAXDELETEDID",12) || + !rioWriteBulkStreamID(r,&s->max_deleted_entry_id)) { streamIteratorStop(&si); return 0; @@ -2053,12 +2057,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { while(raxNext(&ri)) { streamCG *group = ri.data; /* Emit the XGROUP CREATE in order to create the group. */ - if (!rioWriteBulkCount(r,'*',5) || + if (!rioWriteBulkCount(r,'*',7) || !rioWriteBulkString(r,"XGROUP",6) || !rioWriteBulkString(r,"CREATE",6) || !rioWriteBulkObject(r,key) || !rioWriteBulkString(r,(char*)ri.key,ri.key_len) || - !rioWriteBulkStreamID(r,&group->last_id)) + !rioWriteBulkStreamID(r,&group->last_id) || + !rioWriteBulkString(r,"ENTRIESREAD",11) || + !rioWriteBulkLongLong(r,group->entries_read)) { raxStop(&ri); streamIteratorStop(&si); diff --git a/src/cluster.c b/src/cluster.c index d1626117c..ef381f29a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -5701,7 +5701,7 @@ int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) { if (len < 10) return C_ERR; footer = p+(len-10); - /* Verify RDB version */ + /* Set and verify RDB version. */ rdbver = (footer[1] << 8) | footer[0]; if (rdbver_ptr) { *rdbver_ptr = rdbver; diff --git a/src/commands.c b/src/commands.c index d6e6163eb..2de24bc2a 100644 --- a/src/commands.c +++ b/src/commands.c @@ -5998,7 +5998,10 @@ struct redisCommandArg XDEL_Args[] = { /********** XGROUP CREATE ********************/ /* XGROUP CREATE history */ -#define XGROUP_CREATE_History NULL +commandHistory XGROUP_CREATE_History[] = { +{"7.0.0","Added the `entries_read` named argument."}, +{0} +}; /* XGROUP CREATE tips */ #define XGROUP_CREATE_tips NULL @@ -6016,6 +6019,7 @@ struct redisCommandArg XGROUP_CREATE_Args[] = { {"groupname",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"id",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=XGROUP_CREATE_id_Subargs}, {"mkstream",ARG_TYPE_PURE_TOKEN,-1,"MKSTREAM",NULL,NULL,CMD_ARG_OPTIONAL}, +{"entries_read",ARG_TYPE_INTEGER,-1,"ENTRIESREAD",NULL,NULL,CMD_ARG_OPTIONAL}, {0} }; @@ -6077,7 +6081,10 @@ struct redisCommandArg XGROUP_DESTROY_Args[] = { /********** XGROUP SETID ********************/ /* XGROUP SETID history */ -#define XGROUP_SETID_History NULL +commandHistory XGROUP_SETID_History[] = { +{"7.0.0","Added the optional `entries_read` argument."}, +{0} +}; /* XGROUP SETID tips */ #define XGROUP_SETID_tips NULL @@ -6094,6 +6101,7 @@ struct redisCommandArg XGROUP_SETID_Args[] = { {"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE}, {"groupname",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"id",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=XGROUP_SETID_id_Subargs}, +{"entries_read",ARG_TYPE_INTEGER,-1,"ENTRIESREAD",NULL,NULL,CMD_ARG_OPTIONAL}, {0} }; @@ -6104,7 +6112,7 @@ struct redisCommand XGROUP_Subcommands[] = { {"delconsumer","Delete a consumer from a consumer group.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_DELCONSUMER_History,XGROUP_DELCONSUMER_tips,xgroupCommand,5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_DELCONSUMER_Args}, {"destroy","Destroy a consumer group.","O(N) where N is the number of entries in the group's pending entries list (PEL).","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_DESTROY_History,XGROUP_DESTROY_tips,xgroupCommand,4,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_DESTROY_Args}, {"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_HELP_History,XGROUP_HELP_tips,xgroupCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_STREAM}, -{"setid","Set a consumer group to an arbitrary last delivered ID value.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_SETID_History,XGROUP_SETID_tips,xgroupCommand,5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_SETID_Args}, +{"setid","Set a consumer group to an arbitrary last delivered ID value.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_SETID_History,XGROUP_SETID_tips,xgroupCommand,-5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_SETID_Args}, {0} }; @@ -6137,7 +6145,10 @@ struct redisCommandArg XINFO_CONSUMERS_Args[] = { /********** XINFO GROUPS ********************/ /* XINFO GROUPS history */ -#define XINFO_GROUPS_History NULL +commandHistory XINFO_GROUPS_History[] = { +{"7.0.0","Added the `entries-read` and `lag` fields"}, +{0} +}; /* XINFO GROUPS tips */ #define XINFO_GROUPS_tips NULL @@ -6159,7 +6170,10 @@ struct redisCommandArg XINFO_GROUPS_Args[] = { /********** XINFO STREAM ********************/ /* XINFO STREAM history */ -#define XINFO_STREAM_History NULL +commandHistory XINFO_STREAM_History[] = { +{"7.0.0","Added the `max-deleted-entry-id`, `entries-added`, `recorded-first-entry-id`, `entries-read` and `lag` fields"}, +{0} +}; /* XINFO STREAM tips */ #define XINFO_STREAM_tips NULL @@ -6338,7 +6352,10 @@ struct redisCommandArg XREVRANGE_Args[] = { /********** XSETID ********************/ /* XSETID history */ -#define XSETID_History NULL +commandHistory XSETID_History[] = { +{"7.0.0","Added the `entries_added` and `max_deleted_entry_id` arguments."}, +{0} +}; /* XSETID tips */ #define XSETID_tips NULL @@ -6347,6 +6364,8 @@ struct redisCommandArg XREVRANGE_Args[] = { struct redisCommandArg XSETID_Args[] = { {"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE}, {"last-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, +{"entries_added",ARG_TYPE_INTEGER,-1,"ENTRIESADDED",NULL,NULL,CMD_ARG_OPTIONAL}, +{"max_deleted_entry_id",ARG_TYPE_STRING,-1,"MAXDELETEDID",NULL,NULL,CMD_ARG_OPTIONAL}, {0} }; @@ -7057,7 +7076,7 @@ struct redisCommand redisCommandTable[] = { {"xread","Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.","For each stream mentioned: O(N) with N being the number of elements being returned, it means that XREAD-ing with a fixed COUNT is O(1). Note that when the BLOCK option is used, XADD will pay O(M) time in order to serve the M clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREAD_History,XREAD_tips,xreadCommand,-4,CMD_BLOCKING|CMD_READONLY|CMD_BLOCKING,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",1},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREAD_Args}, {"xreadgroup","Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.","For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREADGROUP_History,XREADGROUP_tips,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",4},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREADGROUP_Args}, {"xrevrange","Return a range of elements in a stream, with IDs matching the specified IDs interval, in reverse order (from greater to smaller IDs) compared to XRANGE","O(N) with N being the number of elements returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREVRANGE_History,XREVRANGE_tips,xrevrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XREVRANGE_Args}, -{"xsetid","An internal command for replicating stream values","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XSETID_History,XSETID_tips,xsetidCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XSETID_Args}, +{"xsetid","An internal command for replicating stream values","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XSETID_History,XSETID_tips,xsetidCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XSETID_Args}, {"xtrim","Trims the stream to (approximately if '~' is passed) a certain size","O(N), with N being the number of evicted entries. Constant times are very small however, since entries are organized in macro nodes containing multiple entries that can be released with a single deallocation.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XTRIM_History,XTRIM_tips,xtrimCommand,-4,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XTRIM_Args}, /* string */ {"append","Append a value to a key","O(1). The amortized time complexity is O(1) assuming the appended value is small and the already present value is of any size, since the dynamic string library used by Redis will double the free space available on every reallocation.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STRING,APPEND_History,APPEND_tips,appendCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STRING,{{NULL,CMD_KEY_RW|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=APPEND_Args}, diff --git a/src/commands/xgroup-create.json b/src/commands/xgroup-create.json index a14c0e577..2b1ee03b4 100644 --- a/src/commands/xgroup-create.json +++ b/src/commands/xgroup-create.json @@ -7,6 +7,12 @@ "arity": -5, "container": "XGROUP", "function": "xgroupCommand", + "history": [ + [ + "7.0.0", + "Added the `entries_read` named argument." + ] + ], "command_flags": [ "WRITE", "DENYOOM" @@ -64,6 +70,12 @@ "name": "mkstream", "type": "pure-token", "optional": true + }, + { + "token": "ENTRIESREAD", + "name": "entries_read", + "type": "integer", + "optional": true } ] } diff --git a/src/commands/xgroup-setid.json b/src/commands/xgroup-setid.json index 5c2ffcf19..af4b83c19 100644 --- a/src/commands/xgroup-setid.json +++ b/src/commands/xgroup-setid.json @@ -4,9 +4,15 @@ "complexity": "O(1)", "group": "stream", "since": "5.0.0", - "arity": 5, + "arity": -5, "container": "XGROUP", "function": "xgroupCommand", + "history": [ + [ + "7.0.0", + "Added the optional `entries_read` argument." + ] + ], "command_flags": [ "WRITE" ], @@ -57,6 +63,12 @@ "token": "$" } ] + }, + { + "name": "entries_read", + "token": "ENTRIESREAD", + "type": "integer", + "optional": true } ] } diff --git a/src/commands/xinfo-groups.json b/src/commands/xinfo-groups.json index 546d2030e..e9b61ba06 100644 --- a/src/commands/xinfo-groups.json +++ b/src/commands/xinfo-groups.json @@ -6,6 +6,12 @@ "since": "5.0.0", "arity": 3, "container": "XINFO", + "history": [ + [ + "7.0.0", + "Added the `entries-read` and `lag` fields" + ] + ], "function": "xinfoCommand", "command_flags": [ "READONLY" diff --git a/src/commands/xinfo-stream.json b/src/commands/xinfo-stream.json index 43ae9bc8b..5b7d9ad57 100644 --- a/src/commands/xinfo-stream.json +++ b/src/commands/xinfo-stream.json @@ -6,6 +6,12 @@ "since": "5.0.0", "arity": -3, "container": "XINFO", + "history": [ + [ + "7.0.0", + "Added the `max-deleted-entry-id`, `entries-added`, `recorded-first-entry-id`, `entries-read` and `lag` fields" + ] + ], "function": "xinfoCommand", "command_flags": [ "READONLY" diff --git a/src/commands/xsetid.json b/src/commands/xsetid.json index 8faa2c7e9..7654784e1 100644 --- a/src/commands/xsetid.json +++ b/src/commands/xsetid.json @@ -4,8 +4,14 @@ "complexity": "O(1)", "group": "stream", "since": "5.0.0", - "arity": 3, + "arity": -3, "function": "xsetidCommand", + "history": [ + [ + "7.0.0", + "Added the `entries_added` and `max_deleted_entry_id` arguments." + ] + ], "command_flags": [ "WRITE", "DENYOOM", @@ -43,6 +49,18 @@ { "name": "last-id", "type": "string" + }, + { + "name": "entries_added", + "token": "ENTRIESADDED", + "type": "integer", + "optional": true + }, + { + "name": "max_deleted_entry_id", + "token": "MAXDELETEDID", + "type": "string", + "optional": true } ] } diff --git a/src/rdb.c b/src/rdb.c index 360ca605b..d5f853dd8 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -692,7 +692,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) { else serverPanic("Unknown hash encoding"); case OBJ_STREAM: - return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS); + return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_2); case OBJ_MODULE: return rdbSaveType(rdb,RDB_TYPE_MODULE_2); default: @@ -986,6 +986,19 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { nwritten += n; if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1; nwritten += n; + /* Save the first entry ID. */ + if ((n = rdbSaveLen(rdb,s->first_id.ms)) == -1) return -1; + nwritten += n; + if ((n = rdbSaveLen(rdb,s->first_id.seq)) == -1) return -1; + nwritten += n; + /* Save the maximal tombstone ID. */ + if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.ms)) == -1) return -1; + nwritten += n; + if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.seq)) == -1) return -1; + nwritten += n; + /* Save the offset. */ + if ((n = rdbSaveLen(rdb,s->entries_added)) == -1) return -1; + nwritten += n; /* The consumer groups and their clients are part of the stream * type, so serialize every consumer group. */ @@ -1020,6 +1033,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { return -1; } nwritten += n; + + /* Save the group's logical reads counter. */ + if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) { + raxStop(&ri); + return -1; + } + nwritten += n; /* Save the global PEL. */ if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) { @@ -2321,7 +2341,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } - } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) { + } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { o = createStreamObject(); stream *s = o->ptr; uint64_t listpacks = rdbLoadLen(rdb,NULL); @@ -2397,6 +2417,30 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { /* Load the last entry ID. */ s->last_id.ms = rdbLoadLen(rdb,NULL); s->last_id.seq = rdbLoadLen(rdb,NULL); + + if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { + /* Load the first entry ID. */ + s->first_id.ms = rdbLoadLen(rdb,NULL); + s->first_id.seq = rdbLoadLen(rdb,NULL); + + /* Load the maximal deleted entry ID. */ + s->max_deleted_entry_id.ms = rdbLoadLen(rdb,NULL); + s->max_deleted_entry_id.seq = rdbLoadLen(rdb,NULL); + + /* Load the offset. */ + s->entries_added = rdbLoadLen(rdb,NULL); + } else { + /* During migration the offset can be initialized to the stream's + * length. At this point, we also don't care about tombstones + * because CG offsets will be later initialized as well. */ + s->max_deleted_entry_id.ms = 0; + s->max_deleted_entry_id.seq = 0; + s->entries_added = s->length; + + /* Since the rax is already loaded, we can find the first entry's + * ID. */ + streamGetEdgeID(s,1,1,&s->first_id); + } if (rioGetReadError(rdb)) { rdbReportReadError("Stream object metadata loading failed."); @@ -2432,8 +2476,22 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { decrRefCount(o); return NULL; } + + /* Load group offset. */ + uint64_t cg_offset; + if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { + cg_offset = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream cgroup offset loading failed."); + sdsfree(cgname); + decrRefCount(o); + return NULL; + } + } else { + cg_offset = streamEstimateDistanceFromFirstEverEntry(s,&cg_id); + } - streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); + streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id,cg_offset); if (cgroup == NULL) { rdbReportCorruptRDB("Duplicated consumer group name %s", cgname); diff --git a/src/rdb.h b/src/rdb.h index 3c7b5ffcc..0d298c40d 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -94,10 +94,11 @@ #define RDB_TYPE_HASH_LISTPACK 16 #define RDB_TYPE_ZSET_LISTPACK 17 #define RDB_TYPE_LIST_QUICKLIST_2 18 +#define RDB_TYPE_STREAM_LISTPACKS_2 19 /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ /* Test if a type is an object type. */ -#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18)) +#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 19)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_FUNCTION 246 /* engine data */ diff --git a/src/server.c b/src/server.c index 3f2dffa79..9bf2193f0 100644 --- a/src/server.c +++ b/src/server.c @@ -1767,6 +1767,7 @@ void createSharedObjects(void) { shared.retrycount = createStringObject("RETRYCOUNT",10); shared.force = createStringObject("FORCE",5); shared.justid = createStringObject("JUSTID",6); + shared.entriesread = createStringObject("ENTRIESREAD",11); shared.lastid = createStringObject("LASTID",6); shared.default_username = createStringObject("default",7); shared.ping = createStringObject("ping",4); diff --git a/src/server.h b/src/server.h index 82a8549ff..30c40f946 100644 --- a/src/server.h +++ b/src/server.h @@ -1223,7 +1223,7 @@ struct sharedObjectsStruct { *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, - *time, *pxat, *absttl, *retrycount, *force, *justid, + *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk,*sunsubscribebulk, diff --git a/src/stream.h b/src/stream.h index 724f2c2ad..2d4997919 100644 --- a/src/stream.h +++ b/src/stream.h @@ -15,8 +15,11 @@ typedef struct streamID { typedef struct stream { rax *rax; /* The radix tree holding the stream. */ - uint64_t length; /* Number of elements inside this stream. */ + uint64_t length; /* Current number of elements inside this stream. */ streamID last_id; /* Zero if there are yet no items. */ + streamID first_id; /* The first non-tombstone entry, zero if empty. */ + streamID max_deleted_entry_id; /* The maximal ID that was deleted. */ + uint64_t entries_added; /* All time count of elements added. */ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ } stream; @@ -34,6 +37,7 @@ typedef struct streamIterator { unsigned char *master_fields_ptr; /* Master field to emit next. */ int entry_flags; /* Flags of entry we are emitting. */ int rev; /* True if iterating end to start (reverse). */ + int skip_tombstones; /* True if not emitting tombstone entries. */ uint64_t start_key[2]; /* Start key as 128 bit big endian. */ uint64_t end_key[2]; /* End key as 128 bit big endian. */ raxIterator ri; /* Rax iterator. */ @@ -52,6 +56,11 @@ typedef struct streamCG { streamID last_id; /* Last delivered (not acknowledged) ID for this group. Consumers that will just ask for more messages will served with IDs > than this. */ + long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no + XGROUP SETID, ...), this is the total number of + group reads. In the real world, the reasoning behind + this value is detailed at the top comment of + streamEstimateDistanceFromFirstEverEntry(). */ rax *pel; /* Pending entries list. This is a radix tree that has every message delivered to consumers (without the NOACK option) that was yet not acknowledged @@ -105,6 +114,8 @@ struct client; #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ #define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */ +#define SCG_INVALID_ENTRIES_READ -1 + stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); @@ -117,7 +128,7 @@ void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags); -streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); +streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); @@ -131,6 +142,8 @@ int streamParseID(const robj *o, streamID *id); robj *createObjectFromStreamID(streamID *id); int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given); int streamDeleteItem(stream *s, streamID *id); +void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id); +long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); int64_t streamTrimByLength(stream *s, long long maxlen, int approx); int64_t streamTrimByID(stream *s, streamID minid, int approx); diff --git a/src/t_stream.c b/src/t_stream.c index 9ff6a17ac..408c0199a 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -68,8 +68,13 @@ stream *streamNew(void) { stream *s = zmalloc(sizeof(*s)); s->rax = raxNew(); s->length = 0; + s->first_id.ms = 0; + s->first_id.seq = 0; s->last_id.ms = 0; s->last_id.seq = 0; + s->max_deleted_entry_id.seq = 0; + s->max_deleted_entry_id.ms = 0; + s->entries_added = 0; s->cgroups = NULL; /* Created on demand to save memory when not used. */ return s; } @@ -184,7 +189,10 @@ robj *streamDup(robj *o) { new_lp, NULL); } new_s->length = s->length; + new_s->first_id = s->first_id; new_s->last_id = s->last_id; + new_s->max_deleted_entry_id = s->max_deleted_entry_id; + new_s->entries_added = s->entries_added; raxStop(&ri); if (s->cgroups == NULL) return sobj; @@ -196,7 +204,8 @@ robj *streamDup(robj *o) { while (raxNext(&ri_cgroups)) { streamCG *cg = ri_cgroups.data; streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key, - ri_cgroups.key_len, &cg->last_id); + ri_cgroups.key_len, &cg->last_id, + cg->entries_read); serverAssert(new_cg != NULL); @@ -378,37 +387,21 @@ int streamCompareID(streamID *a, streamID *b) { return 0; } -void streamGetEdgeID(stream *s, int first, streamID *edge_id) +/* Retrieves the ID of the stream edge entry. An edge is either the first or + * the last ID in the stream, and may be a tombstone. To filter out tombstones, + * set the'skip_tombstones' argument to 1. */ +void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id) { - raxIterator ri; - raxStart(&ri, s->rax); - int empty; - if (first) { - raxSeek(&ri, "^", NULL, 0); - empty = !raxNext(&ri); - } else { - raxSeek(&ri, "$", NULL, 0); - empty = !raxPrev(&ri); + streamIterator si; + int64_t numfields; + streamIteratorStart(&si,s,NULL,NULL,!first); + si.skip_tombstones = skip_tombstones; + int found = streamIteratorGetID(&si,edge_id,&numfields); + if (!found) { + streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX}; + *edge_id = first ? max_id : min_id; } - if (empty) { - /* Stream is empty, mark edge ID as lowest/highest possible. */ - edge_id->ms = first ? UINT64_MAX : 0; - edge_id->seq = first ? UINT64_MAX : 0; - raxStop(&ri); - return; - } - - unsigned char *lp = ri.data; - - /* Read the master ID from the radix tree key. */ - streamID master_id; - streamDecodeID(ri.key, &master_id); - - /* Construct edge ID. */ - lpGetEdgeStreamID(lp, first, &master_id, edge_id); - - raxStop(&ri); } /* Adds a new item into the stream 's' having the specified number of @@ -664,7 +657,9 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ if (ri.data != lp) raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); s->length++; + s->entries_added++; s->last_id = id; + if (s->length == 1) s->first_id = id; if (added_id) *added_id = id; return C_OK; } @@ -842,7 +837,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { } deleted += deleted_from_lp; - /* Now we the entries/deleted counters. */ + /* Now we update the entries/deleted counters. */ p = lpFirst(lp); lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp); p = lpNext(lp,p); /* Skip deleted field. */ @@ -864,8 +859,16 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { break; /* If we are here, there was enough to delete in the current node, so no need to go to the next node. */ } - raxStop(&ri); + + /* Update the stream's first ID after the trimming. */ + if (s->length == 0) { + s->first_id.ms = 0; + s->first_id.seq = 0; + } else if (deleted) { + streamGetEdgeID(s,1,1,&s->first_id); + } + return deleted; } @@ -1089,9 +1092,10 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI } } si->stream = s; - si->lp = NULL; /* There is no current listpack right now. */ + si->lp = NULL; /* There is no current listpack right now. */ si->lp_ele = NULL; /* Current listpack cursor. */ - si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ + si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ + si->skip_tombstones = 1; /* By default tombstones aren't emitted. */ } /* Return 1 and store the current item ID at 'id' if there are still @@ -1189,10 +1193,10 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { serverAssert(*numfields>=0); /* If current >= start, and the entry is not marked as - * deleted, emit it. */ + * deleted or tombstones are included, emit it. */ if (!si->rev) { if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && - !(flags & STREAM_ITEM_FLAG_DELETED)) + (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED))) { if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) return 0; /* We are already out of range. */ @@ -1203,7 +1207,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { } } else { if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 && - !(flags & STREAM_ITEM_FLAG_DELETED)) + (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED))) { if (memcmp(buf,si->start_key,sizeof(streamID)) < 0) return 0; /* We are already out of range. */ @@ -1270,7 +1274,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { int64_t aux; /* We do not really delete the entry here. Instead we mark it as - * deleted flagging it, and also incrementing the count of the + * deleted by flagging it, and also incrementing the count of the * deleted entries in the listpack header. * * We start flagging: */ @@ -1314,7 +1318,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { streamIteratorStop(si); streamIteratorStart(si,si->stream,&start,&end,si->rev); - /* TODO: perform a garbage collection here if the ration between + /* TODO: perform a garbage collection here if the ratio between * deleted and valid goes over a certain limit. */ } @@ -1386,6 +1390,148 @@ robj *createObjectFromStreamID(streamID *id) { id->ms,id->seq)); } +/* Returns non-zero if the ID is 0-0. */ +int streamIDEqZero(streamID *id) { + return !(id->ms || id->seq); +} + +/* A helper that returns non-zero if the range from 'start' to `end` + * contains a tombstone. + * + * NOTE: this assumes that the caller had verified that 'start' is less than + * 's->last_id'. */ +int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) { + streamID start_id, end_id; + + if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) { + /* The stream is empty or has no tombstones. */ + return 0; + } + + if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) { + /* The latest tombstone is before the first entry. */ + return 0; + } + + if (start) { + start_id = *start; + } else { + start_id.ms = 0; + start_id.seq = 0; + } + + if (end) { + end_id = *end; + } else { + end_id.ms = UINT64_MAX; + end_id.seq = UINT64_MAX; + } + + if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 && + streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0) + { + /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */ + return 1; + } + + /* The range doesn't includes a tombstone. */ + return 0; +} + +/* Replies with a consumer group's current lag, that is the number of messages + * in the stream that are yet to be delivered. In case that the lag isn't + * available due to fragmentation, the reply to the client is a null. */ +void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) { + int valid = 0; + long long lag = 0; + + if (!s->entries_added) { + /* The lag of a newly-initialized stream is 0. */ + lag = 0; + valid = 1; + } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) { + /* No fragmentation ahead means that the group's logical reads counter + * is valid for performing the lag calculation. */ + lag = (long long)s->entries_added - cg->entries_read; + valid = 1; + } else { + /* Attempt to retrieve the group's last ID logical read counter. */ + long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id); + if (entries_read != SCG_INVALID_ENTRIES_READ) { + /* A valid counter was obtained. */ + lag = (long long)s->entries_added - entries_read; + valid = 1; + } + } + + if (valid) { + addReplyLongLong(c,lag); + } else { + addReplyNull(c); + } +} + +/* This function returns a value that is the ID's logical read counter, or its + * distance (the number of entries) from the first entry ever to have been added + * to the stream. + * + * A counter is returned only in one of the following cases: + * 1. The ID is the same as the stream's last ID. In this case, the returned + * is the same as the stream's entries_added counter. + * 2. The ID equals that of the currently first entry in the stream, and the + * stream has no tombstones. The returned value, in this case, is the result + * of subtracting the stream's length from its added_entries, incremented by + * one. + * 3. The ID less than the stream's first current entry's ID, and there are no + * tombstones. Here the estimated counter is the result of subtracting the + * stream's length from its added_entries. + * 4. The stream's added_entries is zero, meaning that no entries were ever + * added. + * + * The special return value of ULLONG_MAX signals that the counter's value isn't + * obtainable. It is returned in these cases: + * 1. The provided ID, if it even exists, is somewhere between the stream's + * current first and last entries' IDs, or in the future. + * 2. The stream contains one or more tombstones. */ +long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) { + /* The counter of any ID in an empty, never-before-used stream is 0. */ + if (!s->entries_added) { + return 0; + } + + /* In the empty stream, if the ID is smaller or equal to the last ID, + * it can set to the current added_entries value. */ + if (!s->length && streamCompareID(id,&s->last_id) < 1) { + return s->entries_added; + } + + int cmp_last = streamCompareID(id,&s->last_id); + if (cmp_last == 0) { + /* Return the exact counter of the last entry in the stream. */ + return s->entries_added; + } else if (cmp_last > 0) { + /* The counter of a future ID is unknown. */ + return SCG_INVALID_ENTRIES_READ; + } + + int cmp_id_first = streamCompareID(id,&s->first_id); + int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id); + if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) { + /* There's definitely no fragmentation ahead. */ + if (cmp_id_first < 0) { + /* Return the estimated counter. */ + return s->entries_added - s->length; + } else if (cmp_id_first == 0) { + /* Return the exact counter of the first entry in the stream. */ + return s->entries_added - s->length + 1; + } + } + + /* The ID is either before an XDEL that fragments the stream or an arbitrary + * ID. Either case, so we can't make a prediction. */ + return SCG_INVALID_ENTRIES_READ; +} + /* As a result of an explicit XCLAIM or XREADGROUP command, new entries * are created in the pending list of the stream and consumers. We need * to propagate this changes in the form of XCLAIM commands. */ @@ -1425,19 +1571,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam * that was consumed by XREADGROUP with the NOACK option: in that case we can't * propagate the last ID just using the XCLAIM LASTID option, so we emit * - * XGROUP SETID + * XGROUP SETID ENTRIESREAD */ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { - robj *argv[5]; + robj *argv[7]; argv[0] = shared.xgroup; argv[1] = shared.setid; argv[2] = key; argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); + argv[5] = shared.entriesread; + argv[6] = createStringObjectFromLongLong(group->entries_read); - alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[4]); + decrRefCount(argv[6]); } /* We need this when we want to propagate creation of consumer that was created @@ -1476,6 +1625,10 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds * function will not return it to the client. * 3. An entry in the pending list will be created for every entry delivered * for the first time to this consumer. + * 4. The group's read counter is incremented if it is already valid and there + * are no future tombstones, or is invalidated (set to 0) otherwise. If the + * counter is invalid to begin with, we try to obtain it for the last + * delivered ID. * * The behavior may be modified passing non-zero flags: * @@ -1532,6 +1685,15 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end while(streamIteratorGetID(&si,&id,&numfields)) { /* Update the group last_id if needed. */ if (group && streamCompareID(&id,&group->last_id) > 0) { + if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) { + /* A valid counter and no future tombstones mean we can + * increment the read counter to keep tracking the group's + * progress. */ + group->entries_read++; + } else if (s->entries_added) { + /* The group's counter may be invalid, so we try to obtain it. */ + group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id); + } group->last_id = id; /* Group last ID should be propagated only if NOACK was * specified, otherwise the last id will be included @@ -1805,7 +1967,7 @@ void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) arg = createStringObjectFromLongLong(s->length); } else { streamID first_id; - streamGetEdgeID(s, 1, &first_id); + streamGetEdgeID(s,1,0,&first_id); arg = createObjectFromStreamID(&first_id); } @@ -2298,10 +2460,10 @@ void streamFreeConsumer(streamConsumer *sc) { } /* Create a new consumer group in the context of the stream 's', having the - * specified name and last server ID. If a consumer group with the same name - * already existed NULL is returned, otherwise the pointer to the consumer - * group is returned. */ -streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { + * specified name, last server ID and reads counter. If a consumer group with + * the same name already exists NULL is returned, otherwise the pointer to the + * consumer group is returned. */ +streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) { if (s->cgroups == NULL) s->cgroups = raxNew(); if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) return NULL; @@ -2310,6 +2472,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { cg->pel = raxNew(); cg->consumers = raxNew(); cg->last_id = *id; + cg->entries_read = entries_read; raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); return cg; } @@ -2389,8 +2552,8 @@ void streamDelConsumer(streamCG *cg, streamConsumer *consumer) { * Consumer groups commands * ----------------------------------------------------------------------- */ -/* XGROUP CREATE [MKSTREAM] - * XGROUP SETID +/* XGROUP CREATE [MKSTREAM] [ENTRIESADDED count] + * XGROUP SETID [ENTRIESADDED count] * XGROUP DESTROY * XGROUP CREATECONSUMER * XGROUP DELCONSUMER */ @@ -2400,21 +2563,33 @@ void xgroupCommand(client *c) { streamCG *cg = NULL; char *opt = c->argv[1]->ptr; /* Subcommand name. */ int mkstream = 0; + long long entries_read = SCG_INVALID_ENTRIES_READ; robj *o; - /* CREATE has an MKSTREAM option that creates the stream if it - * does not exist. */ - if (c->argc == 6 && !strcasecmp(opt,"CREATE")) { - if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) { - addReplySubcommandSyntaxError(c); - return; - } - mkstream = 1; - grpname = c->argv[3]->ptr; - } - /* Everything but the "HELP" option requires a key and group name. */ if (c->argc >= 4) { + /* Parse optional arguments for CREATE and SETID */ + int i = 5; + int create_subcmd = !strcasecmp(opt,"CREATE"); + int setid_subcmd = !strcasecmp(opt,"SETID"); + while (i < c->argc) { + if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) { + mkstream = 1; + i++; + } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) { + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK) + return; + if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyError(c,"value for ENTRIESREAD must be positive or -1"); + return; + } + i += 2; + } else { + addReplySubcommandSyntaxError(c); + return; + } + } + o = lookupKeyWrite(c->db,c->argv[2]); if (o) { if (checkType(c,o,OBJ_STREAM)) return; @@ -2454,18 +2629,20 @@ void xgroupCommand(client *c) { " Create a new consumer group. Options are:", " * MKSTREAM", " Create the empty stream if it does not exist.", +" * ENTRIESREAD entries_read", +" Set the group's entries_read counter (internal use)." "CREATECONSUMER ", " Create a new consumer in the specified group.", "DELCONSUMER ", " Remove the specified consumer.", "DESTROY ", " Remove the specified group.", -"SETID ", -" Set the current group ID.", +"SETID [ENTRIESREAD entries_read]", +" Set the current group ID and entries_read counter.", NULL }; addReplyHelp(c, help); - } else if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) { + } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) { streamID id; if (!strcmp(c->argv[4]->ptr,"$")) { if (s) { @@ -2487,7 +2664,7 @@ NULL signalModifiedKey(c,c->db,c->argv[2]); } - streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); + streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read); if (cg) { addReply(c,shared.ok); server.dirty++; @@ -2496,7 +2673,7 @@ NULL } else { addReplyError(c,"-BUSYGROUP Consumer Group name already exists"); } - } else if (!strcasecmp(opt,"SETID") && c->argc == 5) { + } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) { streamID id; if (!strcmp(c->argv[4]->ptr,"$")) { id = s->last_id; @@ -2504,6 +2681,7 @@ NULL return; } cg->last_id = id; + cg->entries_read = entries_read; addReply(c,shared.ok); server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id); @@ -2542,16 +2720,46 @@ NULL } } -/* XSETID +/* XSETID [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id] * - * Set the internal "last ID" of a stream. */ + * Set the internal "last ID", "added entries" and "maximal deleted entry ID" + * of a stream. */ void xsetidCommand(client *c) { + streamID id, max_xdel_id = {0, 0}; + long long entries_added = -1; + + if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) + return; + + int i = 3; + while (i < c->argc) { + int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ + char *opt = c->argv[i]->ptr; + if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) { + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) { + return; + } else if (entries_added < 0) { + addReplyError(c,"entries_added must be positive"); + return; + } + i += 2; + } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) { + if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) { + return; + } else if (streamCompareID(&id,&max_xdel_id) < 0) { + addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id"); + return; + } + i += 2; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_STREAM)) return; - stream *s = o->ptr; - streamID id; - if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) return; /* If the stream has at least one item, we want to check that the user * is setting a last ID that is equal or greater than the current top @@ -2561,12 +2769,22 @@ void xsetidCommand(client *c) { streamLastValidID(s,&maxid); if (streamCompareID(&id,&maxid) < 0) { - addReplyError(c,"The ID specified in XSETID is smaller than the " - "target stream top item"); + addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item"); + return; + } + + /* If an entries_added was provided, it can't be lower than the length. */ + if (entries_added != -1 && s->length > (uint64_t)entries_added) { + addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length"); return; } } + s->last_id = id; + if (entries_added != -1) + s->entries_added = entries_added; + if (!streamIDEqZero(&max_xdel_id)) + s->max_deleted_entry_id = max_xdel_id; addReply(c,shared.ok); server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id); @@ -3289,8 +3507,31 @@ void xdelCommand(client *c) { /* Actually apply the command. */ int deleted = 0; + int first_entry = 0; for (int j = 2; j < c->argc; j++) { - deleted += streamDeleteItem(s,&ids[j-2]); + streamID *id = &ids[j-2]; + if (streamDeleteItem(s,id)) { + /* We want to know if the first entry in the stream was deleted + * so we can later set the new one. */ + if (streamCompareID(id,&s->first_id) == 0) { + first_entry = 1; + } + /* Update the stream's maximal tombstone if needed. */ + if (streamCompareID(id,&s->max_deleted_entry_id) > 0) { + s->max_deleted_entry_id = *id; + } + deleted++; + }; + } + + /* Update the stream's first ID. */ + if (deleted) { + if (s->length == 0) { + s->first_id.ms = 0; + s->first_id.seq = 0; + } else if (first_entry) { + streamGetEdgeID(s,1,1,&s->first_id); + } } /* Propagate the write if needed. */ @@ -3398,7 +3639,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { } } - addReplyMapLen(c,full ? 6 : 7); + addReplyMapLen(c,full ? 9 : 10); addReplyBulkCString(c,"length"); addReplyLongLong(c,s->length); addReplyBulkCString(c,"radix-tree-keys"); @@ -3407,6 +3648,12 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { addReplyLongLong(c,s->rax->numnodes); addReplyBulkCString(c,"last-generated-id"); addReplyStreamID(c,&s->last_id); + addReplyBulkCString(c,"max-deleted-entry-id"); + addReplyStreamID(c,&s->max_deleted_entry_id); + addReplyBulkCString(c,"entries-added"); + addReplyLongLong(c,s->entries_added); + addReplyBulkCString(c,"recorded-first-entry-id"); + addReplyStreamID(c,&s->first_id); if (!full) { /* XINFO STREAM */ @@ -3445,7 +3692,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { raxSeek(&ri_cgroups,"^",NULL,0); while(raxNext(&ri_cgroups)) { streamCG *cg = ri_cgroups.data; - addReplyMapLen(c,5); + addReplyMapLen(c,7); /* Name */ addReplyBulkCString(c,"name"); @@ -3455,6 +3702,18 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { addReplyBulkCString(c,"last-delivered-id"); addReplyStreamID(c,&cg->last_id); + /* Read counter of the last delivered ID */ + addReplyBulkCString(c,"entries-read"); + if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyLongLong(c,cg->entries_read); + } else { + addReplyNull(c); + } + + /* Group lag */ + addReplyBulkCString(c,"lag"); + streamReplyWithCGLag(c,s,cg); + /* Group PEL count */ addReplyBulkCString(c,"pel-count"); addReplyLongLong(c,raxSize(cg->pel)); @@ -3632,7 +3891,7 @@ NULL raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { streamCG *cg = ri.data; - addReplyMapLen(c,4); + addReplyMapLen(c,6); addReplyBulkCString(c,"name"); addReplyBulkCBuffer(c,ri.key,ri.key_len); addReplyBulkCString(c,"consumers"); @@ -3641,6 +3900,14 @@ NULL addReplyLongLong(c,raxSize(cg->pel)); addReplyBulkCString(c,"last-delivered-id"); addReplyStreamID(c,&cg->last_id); + addReplyBulkCString(c,"entries-read"); + if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyLongLong(c,cg->entries_read); + } else { + addReplyNull(c); + } + addReplyBulkCString(c,"lag"); + streamReplyWithCGLag(c,s,cg); } raxStop(&ri); } else if (!strcasecmp(opt,"STREAM")) { diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl index 86c7dd246..d2491306a 100644 --- a/tests/integration/corrupt-dump.tcl +++ b/tests/integration/corrupt-dump.tcl @@ -193,9 +193,8 @@ test {corrupt payload: listpack invalid size header} { test {corrupt payload: listpack too long entry len} { start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { r config set sanitize-dump-payload no - r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x89\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x88\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x40\x63\xC9\x37\x03\xA2\xE5\x68" catch { - r xinfo stream key full + r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x89\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x88\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x40\x63\xC9\x37\x03\xA2\xE5\x68" } err assert_equal [count_log_message 0 "crashed by signal"] 0 assert_equal [count_log_message 0 "ASSERTION FAILED"] 1 @@ -205,9 +204,9 @@ test {corrupt payload: listpack too long entry len} { test {corrupt payload: listpack very long entry len} { start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { r config set sanitize-dump-payload no - r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x9C\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x63\x6F\x42\x8E\x7C\xB5\xA2\x9D" catch { - r xinfo stream key full + # This will catch migrated payloads from v6.2.x + r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x9C\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x63\x6F\x42\x8E\x7C\xB5\xA2\x9D" } err assert_equal [count_log_message 0 "crashed by signal"] 0 assert_equal [count_log_message 0 "ASSERTION FAILED"] 1 diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 9dadc1dc5..27cbc686e 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -623,22 +623,30 @@ start_server { r XDEL x 103 set reply [r XINFO STREAM x FULL] - assert_equal [llength $reply] 12 - assert_equal [lindex $reply 1] 4 ;# stream length - assert_equal [lindex $reply 9] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}" ;# entries - assert_equal [lindex $reply 11 0 1] "g1" ;# first group name - assert_equal [lindex $reply 11 0 7 0 0] "100-0" ;# first entry in group's PEL - assert_equal [lindex $reply 11 0 9 0 1] "Alice" ;# first consumer - assert_equal [lindex $reply 11 0 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL - assert_equal [lindex $reply 11 1 1] "g2" ;# second group name - assert_equal [lindex $reply 11 1 9 0 1] "Charlie" ;# first consumer - assert_equal [lindex $reply 11 1 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL - assert_equal [lindex $reply 11 1 9 0 7 1 0] "101-0" ;# second entry in first consumer's PEL + assert_equal [llength $reply] 18 + assert_equal [dict get $reply length] 4 + assert_equal [dict get $reply entries] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}" + + # First consumer group + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group name] "g1" + assert_equal [lindex [dict get $group pending] 0 0] "100-0" + set consumer [lindex [dict get $group consumers] 0] + assert_equal [dict get $consumer name] "Alice" + assert_equal [lindex [dict get $consumer pending] 0 0] "100-0" ;# first entry in first consumer's PEL + + # Second consumer group + set group [lindex [dict get $reply groups] 1] + assert_equal [dict get $group name] "g2" + set consumer [lindex [dict get $group consumers] 0] + assert_equal [dict get $consumer name] "Charlie" + assert_equal [lindex [dict get $consumer pending] 0 0] "100-0" ;# first entry in first consumer's PEL + assert_equal [lindex [dict get $consumer pending] 1 0] "101-0" ;# second entry in first consumer's PEL set reply [r XINFO STREAM x FULL COUNT 1] - assert_equal [llength $reply] 12 - assert_equal [lindex $reply 1] 4 - assert_equal [lindex $reply 9] "{100-0 {a 1}}" + assert_equal [llength $reply] 18 + assert_equal [dict get $reply length] 4 + assert_equal [dict get $reply entries] "{100-0 {a 1}}" } test {XGROUP CREATECONSUMER: create consumer if does not exist} { @@ -702,7 +710,7 @@ start_server { set grpinfo [r xinfo groups mystream] r debug loadaof - assert {[r xinfo groups mystream] == $grpinfo} + assert_equal [r xinfo groups mystream] $grpinfo set reply [r xinfo consumers mystream mygroup] set consumer_info [lindex $reply 0] assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name @@ -741,6 +749,154 @@ start_server { } } + test {Consumer group read counter and lag in empty streams} { + r DEL x + r XGROUP CREATE x g1 0 MKSTREAM + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $reply max-deleted-entry-id] "0-0" + assert_equal [dict get $reply entries-added] 0 + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 0 + + r XADD x 1-0 data a + r XDEL x 1-0 + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $reply max-deleted-entry-id] "1-0" + assert_equal [dict get $reply entries-added] 1 + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 0 + } + + test {Consumer group read counter and lag sanity} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XADD x 5-0 data e + r XGROUP CREATE x g1 0 + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 5 + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 1 + assert_equal [dict get $group lag] 4 + + r XREADGROUP GROUP g1 c12 COUNT 10 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 0 + + r XADD x 6-0 data f + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 1 + } + + test {Consumer group lag with XDELs} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XADD x 5-0 data e + r XDEL x 3-0 + r XGROUP CREATE x g1 0 + r XGROUP CREATE x g2 0 + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] {} + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] {} + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] {} + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] {} + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 0 + + r XADD x 6-0 data f + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 1 + + r XTRIM x MINID = 3-0 + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 1 + set group [lindex [dict get $reply groups] 1] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 3 + + r XTRIM x MINID = 5-0 + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 1 + set group [lindex [dict get $reply groups] 1] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 2 + } + + test {Loading from legacy (Redis <= v6.2.x, rdb_ver < 10) persistence} { + # The payload was DUMPed from a v5 instance after: + # XADD x 1-0 data a + # XADD x 2-0 data b + # XADD x 3-0 data c + # XADD x 4-0 data d + # XADD x 5-0 data e + # XADD x 6-0 data f + # XDEL x 3-0 + # XGROUP CREATE x g1 0 + # XGROUP CREATE x g2 0 + # XREADGROUP GROUP g1 c11 COUNT 4 STREAMS x > + # XTRIM x MAXLEN = 2 + + r DEL x + r RESTORE x 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\xC3\x40\x4A\x40\x57\x16\x57\x00\x00\x00\x23\x00\x02\x01\x04\x01\x01\x01\x84\x64\x61\x74\x61\x05\x00\x01\x03\x01\x00\x20\x01\x03\x81\x61\x02\x04\x20\x0A\x00\x01\x40\x0A\x00\x62\x60\x0A\x00\x02\x40\x0A\x00\x63\x60\x0A\x40\x22\x01\x81\x64\x20\x0A\x40\x39\x20\x0A\x00\x65\x60\x0A\x00\x05\x40\x0A\x00\x66\x20\x0A\x00\xFF\x02\x06\x00\x02\x02\x67\x31\x05\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x01\x03\x63\x31\x31\x3E\xF7\x83\x43\x7A\x01\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x02\x67\x32\x00\x00\x00\x00\x09\x00\x3D\x52\xEF\x68\x67\x52\x1D\xFA" + + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply max-deleted-entry-id] "0-0" + assert_equal [dict get $reply entries-added] 2 + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 1 + assert_equal [dict get $group lag] 1 + set group [lindex [dict get $reply groups] 1] + assert_equal [dict get $group entries-read] 0 + assert_equal [dict get $group lag] 2 + } + start_server {tags {"external:skip"}} { set master [srv -1 client] set master_host [srv -1 host] @@ -841,7 +997,7 @@ start_server { waitForBgrewriteaof r r debug loadaof assert {[dict get [r xinfo stream mystream] length] == 0} - assert {[r xinfo groups mystream] == $grpinfo} + assert_equal [r xinfo groups mystream] $grpinfo } } } diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 7ba3ed116..bd689cd29 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -760,7 +760,9 @@ start_server {tags {"stream xsetid"}} { test {XSETID can set a specific ID} { r XSETID mystream "200-0" - assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"} + set reply [r XINFO stream mystream] + assert_equal [dict get $reply last-generated-id] "200-0" + assert_equal [dict get $reply entries-added] 1 } test {XSETID cannot SETID with smaller ID} { @@ -774,6 +776,98 @@ start_server {tags {"stream xsetid"}} { catch {r XSETID stream 1-1} err set _ $err } {ERR no such key} + + test {XSETID cannot run with an offset but without a maximal tombstone} { + catch {r XSETID stream 1-1 0} err + set _ $err + } {ERR syntax error} + + test {XSETID cannot run with a maximal tombstone but without an offset} { + catch {r XSETID stream 1-1 0-0} err + set _ $err + } {ERR syntax error} + + test {XSETID errors on negstive offset} { + catch {r XSETID stream 1-1 ENTRIESADDED -1 MAXDELETEDID 0-0} err + set _ $err + } {ERR*must be positive} + + test {XSETID cannot set the maximal tombstone with larger ID} { + r DEL x + r XADD x 1-0 a b + + catch {r XSETID x "1-0" ENTRIESADDED 1 MAXDELETEDID "2-0" } err + r XADD mystream MAXLEN 0 * a b + set err + } {ERR*smaller*} + + test {XSETID cannot set the offset to less than the length} { + r DEL x + r XADD x 1-0 a b + + catch {r XSETID x "1-0" ENTRIESADDED 0 MAXDELETEDID "0-0" } err + r XADD mystream MAXLEN 0 * a b + set err + } {ERR*smaller*} +} + +start_server {tags {"stream offset"}} { + test {XADD advances the entries-added counter and sets the recorded-first-entry-id} { + r DEL x + r XADD x 1-0 data a + + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply entries-added] 1 + assert_equal [dict get $reply recorded-first-entry-id] "1-0" + + r XADD x 2-0 data a + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply entries-added] 2 + assert_equal [dict get $reply recorded-first-entry-id] "1-0" + } + + test {XDEL/TRIM are reflected by recorded first entry} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data a + r XADD x 3-0 data a + r XADD x 4-0 data a + r XADD x 5-0 data a + + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply entries-added] 5 + assert_equal [dict get $reply recorded-first-entry-id] "1-0" + + r XDEL x 2-0 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply recorded-first-entry-id] "1-0" + + r XDEL x 1-0 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply recorded-first-entry-id] "3-0" + + r XTRIM x MAXLEN = 2 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply recorded-first-entry-id] "4-0" + } + + test {Maxmimum XDEL ID behaves correctly} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply max-deleted-entry-id] "0-0" + + r XDEL x 2-0 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply max-deleted-entry-id] "2-0" + + r XDEL x 1-0 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply max-deleted-entry-id] "2-0" + } } start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} { @@ -796,7 +890,7 @@ start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb- waitForBgrewriteaof r r debug loadaof assert {[dict get [r xinfo stream mystream] length] == 1} - assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"} + assert_equal [dict get [r xinfo stream mystream] last-generated-id] "2-2" } }