Merge branch 'unstable' of github.com:/antirez/redis into unstable

This commit is contained in:
antirez 2020-02-21 13:48:52 +01:00
commit 8a14fff545
11 changed files with 391 additions and 53 deletions

View File

@ -363,6 +363,7 @@ void debugCommand(client *c) {
"LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.",
"LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.",
"OBJECT <key> -- Show low level info about key and associated value.",
"OOM -- Crash the server simulating an out-of-memory error.",
"PANIC -- Crash the server simulating a panic.",
"POPULATE <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.",
"RELOAD -- Save the RDB on disk and reload it back in memory.",

View File

@ -5,8 +5,8 @@
* We do that by scanning the keyspace and for each pointer we have, we can try to
* ask the allocator if moving it to a new address will help reduce fragmentation.
*
* Copyright (c) 2017, Oran Agra
* Copyright (c) 2017, Redis Labs, Inc
* Copyright (c) 2020, Oran Agra
* Copyright (c) 2020, Redis Labs, Inc
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -408,25 +408,32 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL;
}
long activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head, *newnode;
long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
long defragged = 0;
unsigned char *newzl;
if ((newnode = activeDefragAlloc(node))) {
if (newnode->prev)
newnode->prev->next = newnode;
else
ql->head = newnode;
if (newnode->next)
newnode->next->prev = newnode;
else
ql->tail = newnode;
*node_ref = node = newnode;
defragged++;
}
if ((newzl = activeDefragAlloc(node->zl)))
defragged++, node->zl = newzl;
return defragged;
}
long activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head;
long defragged = 0;
while (node) {
if ((newnode = activeDefragAlloc(node))) {
if (newnode->prev)
newnode->prev->next = newnode;
else
ql->head = newnode;
if (newnode->next)
newnode->next->prev = newnode;
else
ql->tail = newnode;
node = newnode;
defragged++;
}
if ((newzl = activeDefragAlloc(node->zl)))
defragged++, node->zl = newzl;
defragged += activeDefragQuickListNode(ql, &node);
node = node->next;
}
return defragged;
@ -440,12 +447,48 @@ void defragLater(redisDb *db, dictEntry *kde) {
listAddNodeTail(db->defrag_later, key);
}
long scanLaterList(robj *ob) {
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
quicklist *ql = ob->ptr;
quicklistNode *node;
long iterations = 0;
int bookmark_failed = 0;
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
return 0;
server.stat_active_defrag_scanned+=ql->len;
return activeDefragQuickListNodes(ql);
if (*cursor == 0) {
/* if cursor is 0, we start new iteration */
node = ql->head;
} else {
node = quicklistBookmarkFind(ql, "_AD");
if (!node) {
/* if the bookmark was deleted, it means we reached the end. */
*cursor = 0;
return 0;
}
node = node->next;
}
(*cursor)++;
while (node) {
(*defragged) += activeDefragQuickListNode(ql, &node);
server.stat_active_defrag_scanned++;
if (++iterations > 128 && !bookmark_failed) {
if (ustime() > endtime) {
if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
bookmark_failed = 1;
} else {
ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
return 1;
}
}
iterations = 0;
}
node = node->next;
}
quicklistBookmarkDelete(ql, "_AD");
*cursor = 0;
return bookmark_failed? 1: 0;
}
typedef struct {
@ -638,7 +681,8 @@ int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime,
void *newdata = activeDefragAlloc(ri.data);
if (newdata)
raxSetData(ri.node, ri.data=newdata), (*defragged)++;
if (++iterations > 16) {
server.stat_active_defrag_scanned++;
if (++iterations > 128) {
if (ustime() > endtime) {
serverAssert(ri.key_len==sizeof(last));
memcpy(last,ri.key,ri.key_len);
@ -900,8 +944,7 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
server.stat_active_defrag_hits += scanLaterList(ob);
*cursor = 0; /* list has no scan, we must finish it in one go */
return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits);
} else if (ob->type == OBJ_SET) {
server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
} else if (ob->type == OBJ_ZSET) {
@ -961,11 +1004,6 @@ int defragLaterStep(redisDb *db, long long endtime) {
if (defragLaterItem(de, &defrag_later_cursor, endtime))
quit = 1; /* time is up, we didn't finish all the work */
/* Don't start a new BIG key in this loop, this is because the
* next key can be a list, and scanLaterList must be done in once cycle */
if (!defrag_later_cursor)
quit = 1;
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
* (if we have a lot of pointers in one hash bucket, or rehashing),
* check if we reached the time limit. */

View File

@ -2044,7 +2044,7 @@ void clientCommand(client *c) {
"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
"SETNAME <name> -- Assign the name <name> to the current connection.",
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
"TRACKING (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.",
"TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] ... -- Enable client keys tracking for client side caching.",
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
NULL
};
@ -2233,18 +2233,30 @@ NULL
if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
j++;
if (redir != 0) {
addReplyError(c,"A client can only redirect to a single "
"other client");
zfree(prefix);
return;
}
if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
C_OK) return;
C_OK)
{
zfree(prefix);
return;
}
/* We will require the client with the specified ID to exist
* right now, even if it is possible that it gets disconnected
* later. Still a valid sanity check. */
if (lookupClientByID(redir) == NULL) {
addReplyError(c,"The client ID you want redirect to "
"does not exist");
zfree(prefix);
return;
}
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
bcast++;
bcast = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
j++;
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));

View File

@ -1102,13 +1102,13 @@ sds getMemoryDoctorReport(void) {
num_reports++;
}
/* Allocator fss is higher than 1.1 and 10MB ? */
/* Allocator rss is higher than 1.1 and 10MB ? */
if (mh->allocator_rss > 1.1 && mh->allocator_rss_bytes > 10<<20) {
high_alloc_rss = 1;
num_reports++;
}
/* Non-Allocator fss is higher than 1.1 and 10MB ? */
/* Non-Allocator rss is higher than 1.1 and 10MB ? */
if (mh->rss_extra > 1.1 && mh->rss_extra_bytes > 10<<20) {
high_proc_rss = 1;
num_reports++;

View File

@ -70,6 +70,12 @@ static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
} while (0);
#endif
/* Bookmarks forward declarations */
#define QL_MAX_BM ((1 << QL_BM_BITS)-1)
quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name);
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node);
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm);
/* Simple way to give quicklistEntry structs default values with one call. */
#define initEntry(e) \
do { \
@ -100,10 +106,11 @@ quicklist *quicklistCreate(void) {
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
quicklist->bookmark_count = 0;
return quicklist;
}
#define COMPRESS_MAX (1 << 16)
#define COMPRESS_MAX (1 << QL_COMP_BITS)
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
if (compress > COMPRESS_MAX) {
compress = COMPRESS_MAX;
@ -113,7 +120,7 @@ void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
quicklist->compress = compress;
}
#define FILL_MAX (1 << 15)
#define FILL_MAX (1 << (QL_FILL_BITS-1))
void quicklistSetFill(quicklist *quicklist, int fill) {
if (fill > FILL_MAX) {
fill = FILL_MAX;
@ -169,6 +176,7 @@ void quicklistRelease(quicklist *quicklist) {
quicklist->len--;
current = next;
}
quicklistBookmarksClear(quicklist);
zfree(quicklist);
}
@ -578,6 +586,15 @@ quicklist *quicklistCreateFromZiplist(int fill, int compress,
REDIS_STATIC void __quicklistDelNode(quicklist *quicklist,
quicklistNode *node) {
/* Update the bookmark if any */
quicklistBookmark *bm = _quicklistBookmarkFindByNode(quicklist, node);
if (bm) {
bm->node = node->next;
/* if the bookmark was to the last node, delete it. */
if (!bm->node)
_quicklistBookmarkDelete(quicklist, bm);
}
if (node->next)
node->next->prev = node->prev;
if (node->prev)
@ -1410,6 +1427,87 @@ void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
}
}
/* Create or update a bookmark in the list which will be updated to the next node
* automatically when the one referenced gets deleted.
* Returns 1 on success (creation of new bookmark or override of an existing one).
* Returns 0 on failure (reached the maximum supported number of bookmarks).
* NOTE: use short simple names, so that string compare on find is quick.
* NOTE: bookmakrk creation may re-allocate the quicklist, so the input pointer
may change and it's the caller responsibilty to update the reference.
*/
int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node) {
quicklist *ql = *ql_ref;
if (ql->bookmark_count >= QL_MAX_BM)
return 0;
quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
if (bm) {
bm->node = node;
return 1;
}
ql = zrealloc(ql, sizeof(quicklist) + (ql->bookmark_count+1) * sizeof(quicklistBookmark));
*ql_ref = ql;
ql->bookmarks[ql->bookmark_count].node = node;
ql->bookmarks[ql->bookmark_count].name = zstrdup(name);
ql->bookmark_count++;
return 1;
}
/* Find the quicklist node referenced by a named bookmark.
* When the bookmarked node is deleted the bookmark is updated to the next node,
* and if that's the last node, the bookmark is deleted (so find returns NULL). */
quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name) {
quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
if (!bm) return NULL;
return bm->node;
}
/* Delete a named bookmark.
* returns 0 if bookmark was not found, and 1 if deleted.
* Note that the bookmark memory is not freed yet, and is kept for future use. */
int quicklistBookmarkDelete(quicklist *ql, const char *name) {
quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
if (!bm)
return 0;
_quicklistBookmarkDelete(ql, bm);
return 1;
}
quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name) {
unsigned i;
for (i=0; i<ql->bookmark_count; i++) {
if (!strcmp(ql->bookmarks[i].name, name)) {
return &ql->bookmarks[i];
}
}
return NULL;
}
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node) {
unsigned i;
for (i=0; i<ql->bookmark_count; i++) {
if (ql->bookmarks[i].node == node) {
return &ql->bookmarks[i];
}
}
return NULL;
}
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm) {
int index = bm - ql->bookmarks;
zfree(bm->name);
ql->bookmark_count--;
memmove(bm, bm+1, (ql->bookmark_count - index)* sizeof(*bm));
/* NOTE: We do not shrink (realloc) the quicklist yet (to avoid resonance,
* it may be re-used later (a call to realloc may NOP). */
}
void quicklistBookmarksClear(quicklist *ql) {
while (ql->bookmark_count)
zfree(ql->bookmarks[--ql->bookmark_count].name);
/* NOTE: We do not shrink (realloc) the quick list. main use case for this
* function is just before releasing the allocation. */
}
/* The rest of this file is test cases and test helpers. */
#ifdef REDIS_TEST
#include <stdint.h>
@ -2641,6 +2739,54 @@ int quicklistTest(int argc, char *argv[]) {
printf("Compressions: %0.2f seconds.\n", (float)(stop - start) / 1000);
printf("\n");
TEST("bookmark get updated to next item") {
quicklist *ql = quicklistNew(1, 0);
quicklistPushTail(ql, "1", 1);
quicklistPushTail(ql, "2", 1);
quicklistPushTail(ql, "3", 1);
quicklistPushTail(ql, "4", 1);
quicklistPushTail(ql, "5", 1);
assert(ql->len==5);
/* add two bookmarks, one pointing to the node before the last. */
assert(quicklistBookmarkCreate(&ql, "_dummy", ql->head->next));
assert(quicklistBookmarkCreate(&ql, "_test", ql->tail->prev));
/* test that the bookmark returns the right node, delete it and see that the bookmark points to the last node */
assert(quicklistBookmarkFind(ql, "_test") == ql->tail->prev);
assert(quicklistDelRange(ql, -2, 1));
assert(quicklistBookmarkFind(ql, "_test") == ql->tail);
/* delete the last node, and see that the bookmark was deleted. */
assert(quicklistDelRange(ql, -1, 1));
assert(quicklistBookmarkFind(ql, "_test") == NULL);
/* test that other bookmarks aren't affected */
assert(quicklistBookmarkFind(ql, "_dummy") == ql->head->next);
assert(quicklistBookmarkFind(ql, "_missing") == NULL);
assert(ql->len==3);
quicklistBookmarksClear(ql); /* for coverage */
assert(quicklistBookmarkFind(ql, "_dummy") == NULL);
quicklistRelease(ql);
}
TEST("bookmark limit") {
int i;
quicklist *ql = quicklistNew(1, 0);
quicklistPushHead(ql, "1", 1);
for (i=0; i<QL_MAX_BM; i++)
assert(quicklistBookmarkCreate(&ql, genstr("",i), ql->head));
/* when all bookmarks are used, creation fails */
assert(!quicklistBookmarkCreate(&ql, "_test", ql->head));
/* delete one and see that we can now create another */
assert(quicklistBookmarkDelete(ql, "0"));
assert(quicklistBookmarkCreate(&ql, "_test", ql->head));
/* delete one and see that the rest survive */
assert(quicklistBookmarkDelete(ql, "_test"));
for (i=1; i<QL_MAX_BM; i++)
assert(quicklistBookmarkFind(ql, genstr("",i)) == ql->head);
/* make sure the deleted ones are indeed gone */
assert(!quicklistBookmarkFind(ql, "0"));
assert(!quicklistBookmarkFind(ql, "_test"));
quicklistRelease(ql);
}
if (!err)
printf("ALL TESTS PASSED!\n");
else

View File

@ -28,6 +28,8 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdint.h> // for UINTPTR_MAX
#ifndef __QUICKLIST_H__
#define __QUICKLIST_H__
@ -64,19 +66,51 @@ typedef struct quicklistLZF {
char compressed[];
} quicklistLZF;
/* Bookmarks are padded with realloc at the end of of the quicklist struct.
* They should only be used for very big lists if thousands of nodes were the
* excess memory usage is negligible, and there's a real need to iterate on them
* in portions.
* When not used, they don't add any memory overhead, but when used and then
* deleted, some overhead remains (to avoid resonance).
* The number of bookmarks used should be kept to minimum since it also adds
* overhead on node deletion (searching for a bookmark to update). */
typedef struct quicklistBookmark {
quicklistNode *node;
char *name;
} quicklistBookmark;
#if UINTPTR_MAX == 0xffffffff
/* 32-bit */
# define QL_FILL_BITS 14
# define QL_COMP_BITS 14
# define QL_BM_BITS 4
#elif UINTPTR_MAX == 0xffffffffffffffff
/* 64-bit */
# define QL_FILL_BITS 16
# define QL_COMP_BITS 16
# define QL_BM_BITS 4 /* we can encode more, but we rather limit the user
since they cause performance degradation. */
#else
# error unknown arch bits count
#endif
/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
* 'fill' is the user-requested (or default) fill factor. */
* 'fill' is the user-requested (or default) fill factor.
* 'bookmakrs are an optional feature that is used by realloc this struct,
* so that they don't consume memory when not used. */
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : 16; /* fill factor for individual nodes */
unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
int fill : QL_FILL_BITS; /* fill factor for individual nodes */
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
typedef struct quicklistIter {
@ -158,6 +192,12 @@ unsigned long quicklistCount(const quicklist *ql);
int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len);
size_t quicklistGetLzf(const quicklistNode *node, void **data);
/* bookmarks */
int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node);
int quicklistBookmarkDelete(quicklist *ql, const char *name);
quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name);
void quicklistBookmarksClear(quicklist *ql);
#ifdef REDIS_TEST
int quicklistTest(int argc, char *argv[]);
#endif

View File

@ -1850,6 +1850,8 @@ NULL
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
c->argv[2],c->db->id);
/* We want to unblock any XREADGROUP consumers with -NOGROUP. */
signalKeyAsReady(c->db,c->argv[2]);
} else {
addReply(c,shared.czero);
}

View File

@ -44,7 +44,7 @@
rax *TrackingTable = NULL;
rax *PrefixTable = NULL;
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
the whole tracking table. This givesn
the whole tracking table. This gives
an hint about the total memory we
are using server side for CSC. */
robj *TrackingChannelName;
@ -145,9 +145,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
}
}
/* This function is called after the excution of a readonly command in the
/* This function is called after the execution of a readonly command in the
* case the client 'c' has keys tracking enabled. It will populate the
* tracking ivalidation table according to the keys the user fetched, so that
* tracking invalidation table according to the keys the user fetched, so that
* Redis will know what are the clients that should receive an invalidation
* message with certain groups of keys are modified. */
void trackingRememberKeys(client *c) {
@ -292,19 +292,12 @@ void trackingInvalidateKey(robj *keyobj) {
}
/* This function is called when one or all the Redis databases are flushed
* (dbid == -1 in case of FLUSHALL). Caching slots are not specific for
* each DB but are global: currently what we do is sending a special
* (dbid == -1 in case of FLUSHALL). Caching keys are not specific for
* each DB but are global: currently what we do is send a special
* notification to clients with tracking enabled, invalidating the caching
* slot "-1", which means, "all the keys", in order to avoid flooding clients
* key "", which means, "all the keys", in order to avoid flooding clients
* with many invalidation messages for all the keys they may hold.
*
* However trying to flush the tracking table here is very costly:
* we need scanning 16 million caching slots in the table to check
* if they are used, this introduces a big delay. So what we do is to really
* flush the table in the case of FLUSHALL. When a FLUSHDB is called instead
* we just send the invalidation message to all the clients, but don't
* flush the table: it will slowly get garbage collected as more keys
* are modified in the used caching slots. */
*/
void freeTrackingRadixTree(void *rt) {
raxFree(rt);
}
@ -325,6 +318,7 @@ void trackingInvalidateKeysOnFlush(int dbid) {
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
if (dbid == -1 && TrackingTable) {
raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
TrackingTable = raxNew();
TrackingTableTotalItems = 0;
}
}

View File

@ -74,6 +74,7 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_ReplyWithError(ctx, err);
goto final;
}
/* Make sure we can't convert a string that has \0 in it */
char buf[4] = "123";
buf[1] = '\0';
@ -81,8 +82,11 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
long double ld3;
if (RedisModule_StringToLongDouble(s3, &ld3) == REDISMODULE_OK) {
RedisModule_ReplyWithError(ctx, "Invalid string successfully converted to long double");
RedisModule_FreeString(ctx, s3);
goto final;
}
RedisModule_FreeString(ctx, s3);
RedisModule_ReplyWithLongDouble(ctx, ld2);
final:
RedisModule_FreeString(ctx, s1);

View File

@ -209,5 +209,97 @@ start_server {tags {"defrag"}} {
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
} {OK}
test "Active defrag big list" {
r flushdb
r config resetstat
r config set save "" ;# prevent bgsave from interfereing with save below
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75
r config set active-defrag-ignore-bytes 2mb
r config set maxmemory 0
r config set list-max-ziplist-size 5 ;# list of 500k items will have 100k quicklist nodes
# create big keys with 10k items
set rd [redis_deferring_client]
set expected_frag 1.7
# add a mass of list nodes to two lists (allocations are interlaced)
set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation
for {set j 0} {$j < 500000} {incr j} {
$rd lpush biglist1 $val
$rd lpush biglist2 $val
}
for {set j 0} {$j < 500000} {incr j} {
$rd read ; # Discard replies
$rd read ; # Discard replies
}
# create some fragmentation
r del biglist2
# start defrag
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
if {$::verbose} {
puts "frag $frag"
}
assert {$frag >= $expected_frag}
r config set latency-monitor-threshold 5
r latency reset
set digest [r debug digest]
catch {r config set activedefrag yes} e
if {![string match {DISABLED*} $e]} {
# wait for the active defrag to start working (decision once a second)
wait_for_condition 50 100 {
[s active_defrag_running] ne 0
} else {
fail "defrag not started."
}
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
} else {
after 120 ;# serverCron only updates the info once in 100ms
puts [r info memory]
puts [r info stats]
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
# test the the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
lassign $event eventname time latency max
if {$eventname == "active-defrag-cycle"} {
set max_latency $max
}
}
if {$::verbose} {
puts "frag $frag"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
}
assert {$frag < 1.1}
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 7.5ms
assert {$max_latency <= 12}
}
# verify the data isn't corrupted or changed
set newdigest [r debug digest]
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
r del biglist1 ;# coverage for quicklistBookmarksClear
} {1}
}
}

View File

@ -161,6 +161,15 @@ start_server {
assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}}
}
test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">"
r XGROUP DESTROY mystream mygroup
assert_error "*NOGROUP*" {$rd read}
}
test {XCLAIM can claim PEL items from another consumer} {
# Add 3 items into the stream, and create a consumer group
r del mystream