Swapdb should make transaction fail if there is any client watching keys (#8239)

This PR not only fixes the problem that swapdb does not make the
transaction fail, but also optimizes the FLUSHALL and FLUSHDB command to
set the CLIENT_DIRTY_CAS flag to avoid unnecessary traversal of clients.

FLUSHDB was changed to first iterate on all watched keys, and then on the
clients watching each key.
Instead of iterating though all clients, and for each iterate on watched keys.

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Yang Bodong 2021-01-04 20:48:28 +08:00 committed by GitHub
parent ecd5351870
commit 10f94b0ab1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 22 deletions

View File

@ -573,7 +573,18 @@ void signalModifiedKey(client *c, redisDb *db, robj *key) {
} }
void signalFlushedDb(int dbid, int async) { void signalFlushedDb(int dbid, int async) {
touchWatchedKeysOnFlush(dbid); int startdb, enddb;
if (dbid == -1) {
startdb = 0;
enddb = server.dbnum-1;
} else {
startdb = enddb = dbid;
}
for (int j = startdb; j <= enddb; j++) {
touchAllWatchedKeysInDb(&server.db[j], NULL);
}
trackingInvalidateKeysOnFlush(async); trackingInvalidateKeysOnFlush(async);
} }
@ -1331,9 +1342,14 @@ int dbSwapDatabases(long id1, long id2) {
* However normally we only do this check for efficiency reasons * However normally we only do this check for efficiency reasons
* in dbAdd() when a list is created. So here we need to rescan * in dbAdd() when a list is created. So here we need to rescan
* the list of clients blocked on lists and signal lists as ready * the list of clients blocked on lists and signal lists as ready
* if needed. */ * if needed.
*
* Also the swapdb should make transaction fail if there is any
* client watching keys */
scanDatabaseForReadyLists(db1); scanDatabaseForReadyLists(db1);
touchAllWatchedKeysInDb(db1, db2);
scanDatabaseForReadyLists(db2); scanDatabaseForReadyLists(db2);
touchAllWatchedKeysInDb(db2, db1);
return C_OK; return C_OK;
} }

View File

@ -374,31 +374,36 @@ void touchWatchedKey(redisDb *db, robj *key) {
} }
} }
/* On FLUSHDB or FLUSHALL all the watched keys that are present before the /* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty.
* flush but will be deleted as effect of the flushing operation should * It may happen in the following situations:
* be touched. "dbid" is the DB that's getting the flush. -1 if it is * FLUSHDB, FLUSHALL, SWAPDB
* a FLUSHALL operation (all the DBs flushed). */ *
void touchWatchedKeysOnFlush(int dbid) { * replaced_with: for SWAPDB, the WATCH should be invalidated if
listIter li1, li2; * the key exists in either of them, and skipped only if it
* doesn't exist in both. */
void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
listIter li;
listNode *ln; listNode *ln;
dictEntry *de;
/* For every client, check all the waited keys */ if (dictSize(emptied->watched_keys) == 0) return;
listRewind(server.clients,&li1);
while((ln = listNext(&li1))) {
client *c = listNodeValue(ln);
listRewind(c->watched_keys,&li2);
while((ln = listNext(&li2))) {
watchedKey *wk = listNodeValue(ln);
/* For every watched key matching the specified DB, if the dictIterator *di = dictGetSafeIterator(emptied->watched_keys);
* key exists, mark the client as dirty, as the key will be while((de = dictNext(di)) != NULL) {
* removed. */ robj *key = dictGetKey(de);
if (dbid == -1 || wk->db->id == dbid) { list *clients = dictGetVal(de);
if (dictFind(wk->db->dict, wk->key->ptr) != NULL) if (!clients) continue;
c->flags |= CLIENT_DIRTY_CAS; listRewind(clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (dictFind(emptied->dict, key->ptr)) {
c->flags |= CLIENT_DIRTY_CAS;
} else if (replaced_with && dictFind(replaced_with->dict, key->ptr)) {
c->flags |= CLIENT_DIRTY_CAS;
} }
} }
} }
dictReleaseIterator(di);
} }
void watchCommand(client *c) { void watchCommand(client *c) {

View File

@ -1864,7 +1864,7 @@ void initClientMultiState(client *c);
void freeClientMultiState(client *c); void freeClientMultiState(client *c);
void queueMultiCommand(client *c); void queueMultiCommand(client *c);
void touchWatchedKey(redisDb *db, robj *key); void touchWatchedKey(redisDb *db, robj *key);
void touchWatchedKeysOnFlush(int dbid); void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with);
void discardTransaction(client *c); void discardTransaction(client *c);
void flagTransaction(client *c); void flagTransaction(client *c);
void execCommandAbort(client *c, sds error); void execCommandAbort(client *c, sds error);

View File

@ -196,6 +196,29 @@ start_server {tags {"multi"}} {
r exec r exec
} {PONG} } {PONG}
test {SWAPDB is able to touch the watched keys that exist} {
r flushall
r select 0
r set x 30
r watch x ;# make sure x (set to 30) doesn't change (SWAPDB will "delete" it)
r swapdb 0 1
r multi
r ping
r exec
} {}
test {SWAPDB is able to touch the watched keys that do not exist} {
r flushall
r select 1
r set x 30
r select 0
r watch x ;# make sure the key x (currently missing) doesn't change (SWAPDB will create it)
r swapdb 0 1
r multi
r ping
r exec
} {}
test {WATCH is able to remember the DB a key belongs to} { test {WATCH is able to remember the DB a key belongs to} {
r select 5 r select 5
r set x 30 r set x 30