Optimize quicklistIndex to seek from the nearest end (#9454)

Until now, giving a negative index seeks from the end of a list and a
positive seeks from the beginning. This change makes it seek from
the nearest end, regardless of the sign of the given index.

quicklistIndex is used by all list commands which operate by index.

LINDEX key 999999 in a list if 1M elements is greately optimized by
this change. Latency is cut by 75%.

LINDEX key -1000000 in a list of 1M elements, likewise.

LRANGE key -1 -1 is affected by this, since LRANGE converts the
indices to positive numbers before seeking.

The tests for corrupt dumps are updated to make sure the corrup
data is seeked in the same direction as before.
This commit is contained in:
Viktor Söderqvist 2021-09-06 08:12:38 +02:00 committed by GitHub
parent 763fd09416
commit 547c3405d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 19 deletions

View File

@ -861,7 +861,7 @@ REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
/* Populate accounting flags for easier boolean checks later */
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
D("Current node is full with count %d with requested fill %lu",
D("Current node is full with count %d with requested fill %d",
node->count, fill);
full = 1;
}
@ -1244,31 +1244,36 @@ int quicklistIndex(const quicklist *quicklist, const long long idx,
initEntry(entry);
entry->quicklist = quicklist;
if (!forward) {
index = (-idx) - 1;
n = quicklist->tail;
} else {
index = idx;
n = quicklist->head;
}
index = forward ? idx : (-idx) - 1;
if (index >= quicklist->count)
return 0;
/* Seek in the other direction if that way is shorter. */
int seek_forward = forward;
unsigned long long seek_index = index;
if (index > (quicklist->count - 1) / 2) {
seek_forward = !forward;
seek_index = quicklist->count - 1 - index;
}
n = seek_forward ? quicklist->head : quicklist->tail;
while (likely(n)) {
if ((accum + n->count) > index) {
if ((accum + n->count) > seek_index) {
break;
} else {
D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
accum);
accum += n->count;
n = forward ? n->next : n->prev;
n = seek_forward ? n->next : n->prev;
}
}
if (!n)
return 0;
/* Fix accum so it looks like we seeked in the other direction. */
if (seek_forward != forward) accum = quicklist->count - n->count - accum;
D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
accum, index, index - accum, (-index) - 1 + accum);
@ -1278,7 +1283,7 @@ int quicklistIndex(const quicklist *quicklist, const long long idx,
entry->offset = index - accum;
} else {
/* reverse = need negative offset for tail-to-head, so undo
* the result of the original if (index < 0) above. */
* the result of the original index = (-idx) - 1 above. */
entry->offset = (-index) - 1 + accum;
}
@ -1852,7 +1857,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
unsigned int sz;
long long lv;
ql_info(ql);
quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv);
assert(quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv));
assert(data != NULL);
assert(sz == 32);
if (strcmp(populate, (char *)data))
@ -1870,7 +1875,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
unsigned int sz;
long long lv;
ql_info(ql);
quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv);
assert(quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv));
assert(data == NULL);
assert(lv == 55513);
ql_verify(ql, 0, 0, 0, 0);
@ -2735,9 +2740,11 @@ int quicklistTest(int argc, char *argv[], int accurate) {
if (step == 1) {
for (int i = 0; i < list_sizes[list] / 2; i++) {
unsigned char *data;
quicklistPop(ql, QUICKLIST_HEAD, &data, NULL, NULL);
assert(quicklistPop(ql, QUICKLIST_HEAD, &data,
NULL, NULL));
zfree(data);
quicklistPop(ql, QUICKLIST_TAIL, &data, NULL, NULL);
assert(quicklistPop(ql, QUICKLIST_TAIL, &data,
NULL, NULL));
zfree(data);
}
}

View File

@ -405,7 +405,7 @@ void addListRangeReply(client *c, robj *o, long start, long end, int reverse) {
while(rangelen--) {
listTypeEntry entry;
listTypeNext(iter, &entry);
serverAssert(listTypeNext(iter, &entry)); /* fail on corrupt data */
quicklistEntry *qe = &entry.entry;
if (qe->value) {
addReplyBulkCBuffer(c,qe->value,qe->sz);

View File

@ -82,7 +82,7 @@ test {corrupt payload: valid zipped hash header, dup records} {
test {corrupt payload: quicklist big ziplist prev len} {
start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
r config set sanitize-dump-payload no
r restore key 0 "\x0E\x01\x13\x13\x00\x00\x00\x0E\x00\x00\x00\x02\x00\x00\x02\x61\x00\x0E\x02\x62\x00\xFF\x09\x00\x49\x97\x30\xB2\x0D\xA1\xED\xAA"
r restore key 0 "\x0e\x01\x1b\x1b\x00\x00\x00\x16\x00\x00\x00\x04\x00\x00\x02\x61\x00\x04\x02\x62\x00\x04\x02\x63\x00\x19\x02\x64\x00\xff\x09\x00\xec\x42\xe9\xf5\xd6\x19\x9e\xbd"
catch {r lindex key -2}
assert_equal [count_log_message 0 "crashed by signal"] 0
assert_equal [count_log_message 0 "ASSERTION FAILED"] 1
@ -107,7 +107,7 @@ test {corrupt payload: quicklist ziplist wrong count} {
# we'll be able to push, but iterating on the list will assert
r lpush key header
r rpush key footer
catch { [r lrange key -1 -1] }
catch { [r lrange key 0 -1] }
assert_equal [count_log_message 0 "crashed by signal"] 0
assert_equal [count_log_message 0 "ASSERTION FAILED"] 1
}