mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Rename bstate to bpop.
This commit is contained in:
parent
59bd44d1c8
commit
e3c51c4b1b
@ -41,10 +41,10 @@ redisClient *createClient(int fd) {
|
|||||||
c->reply = listCreate();
|
c->reply = listCreate();
|
||||||
listSetFreeMethod(c->reply,decrRefCount);
|
listSetFreeMethod(c->reply,decrRefCount);
|
||||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||||
c->bstate.keys = NULL;
|
c->bpop.keys = NULL;
|
||||||
c->bstate.count = 0;
|
c->bpop.count = 0;
|
||||||
c->bstate.timeout = 0;
|
c->bpop.timeout = 0;
|
||||||
c->bstate.target = NULL;
|
c->bpop.target = NULL;
|
||||||
c->io_keys = listCreate();
|
c->io_keys = listCreate();
|
||||||
c->watched_keys = listCreate();
|
c->watched_keys = listCreate();
|
||||||
listSetFreeMethod(c->io_keys,decrRefCount);
|
listSetFreeMethod(c->io_keys,decrRefCount);
|
||||||
@ -679,7 +679,7 @@ void closeTimedoutClients(void) {
|
|||||||
redisLog(REDIS_VERBOSE,"Closing idle client");
|
redisLog(REDIS_VERBOSE,"Closing idle client");
|
||||||
freeClient(c);
|
freeClient(c);
|
||||||
} else if (c->flags & REDIS_BLOCKED) {
|
} else if (c->flags & REDIS_BLOCKED) {
|
||||||
if (c->bstate.timeout != 0 && c->bstate.timeout < now) {
|
if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
|
||||||
addReply(c,shared.nullmultibulk);
|
addReply(c,shared.nullmultibulk);
|
||||||
unblockClientWaitingData(c);
|
unblockClientWaitingData(c);
|
||||||
}
|
}
|
||||||
|
@ -326,7 +326,7 @@ typedef struct redisClient {
|
|||||||
long repldboff; /* replication DB file offset */
|
long repldboff; /* replication DB file offset */
|
||||||
off_t repldbsize; /* replication DB file size */
|
off_t repldbsize; /* replication DB file size */
|
||||||
multiState mstate; /* MULTI/EXEC state */
|
multiState mstate; /* MULTI/EXEC state */
|
||||||
blockingState bstate; /* blocking state */
|
blockingState bpop; /* blocking state */
|
||||||
list *io_keys; /* Keys this client is waiting to be loaded from the
|
list *io_keys; /* Keys this client is waiting to be loaded from the
|
||||||
* swap file in order to continue. */
|
* swap file in order to continue. */
|
||||||
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
||||||
|
38
src/t_list.c
38
src/t_list.c
@ -694,10 +694,10 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
|
|||||||
list *l;
|
list *l;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
c->bstate.keys = zmalloc(sizeof(robj*)*numkeys);
|
c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
|
||||||
c->bstate.count = numkeys;
|
c->bpop.count = numkeys;
|
||||||
c->bstate.timeout = timeout;
|
c->bpop.timeout = timeout;
|
||||||
c->bstate.target = target;
|
c->bpop.target = target;
|
||||||
|
|
||||||
if (target != NULL) {
|
if (target != NULL) {
|
||||||
incrRefCount(target);
|
incrRefCount(target);
|
||||||
@ -705,7 +705,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
|
|||||||
|
|
||||||
for (j = 0; j < numkeys; j++) {
|
for (j = 0; j < numkeys; j++) {
|
||||||
/* Add the key in the client structure, to map clients -> keys */
|
/* Add the key in the client structure, to map clients -> keys */
|
||||||
c->bstate.keys[j] = keys[j];
|
c->bpop.keys[j] = keys[j];
|
||||||
incrRefCount(keys[j]);
|
incrRefCount(keys[j]);
|
||||||
|
|
||||||
/* And in the other "side", to map keys -> clients */
|
/* And in the other "side", to map keys -> clients */
|
||||||
@ -734,28 +734,28 @@ void unblockClientWaitingData(redisClient *c) {
|
|||||||
list *l;
|
list *l;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
redisAssert(c->bstate.keys != NULL);
|
redisAssert(c->bpop.keys != NULL);
|
||||||
/* The client may wait for multiple keys, so unblock it for every key. */
|
/* The client may wait for multiple keys, so unblock it for every key. */
|
||||||
for (j = 0; j < c->bstate.count; j++) {
|
for (j = 0; j < c->bpop.count; j++) {
|
||||||
/* Remove this client from the list of clients waiting for this key. */
|
/* Remove this client from the list of clients waiting for this key. */
|
||||||
de = dictFind(c->db->blocking_keys,c->bstate.keys[j]);
|
de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
|
||||||
redisAssert(de != NULL);
|
redisAssert(de != NULL);
|
||||||
l = dictGetEntryVal(de);
|
l = dictGetEntryVal(de);
|
||||||
listDelNode(l,listSearchKey(l,c));
|
listDelNode(l,listSearchKey(l,c));
|
||||||
/* If the list is empty we need to remove it to avoid wasting memory */
|
/* If the list is empty we need to remove it to avoid wasting memory */
|
||||||
if (listLength(l) == 0)
|
if (listLength(l) == 0)
|
||||||
dictDelete(c->db->blocking_keys,c->bstate.keys[j]);
|
dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
|
||||||
decrRefCount(c->bstate.keys[j]);
|
decrRefCount(c->bpop.keys[j]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (c->bstate.target != NULL) {
|
if (c->bpop.target != NULL) {
|
||||||
decrRefCount(c->bstate.target);
|
decrRefCount(c->bpop.target);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Cleanup the client structure */
|
/* Cleanup the client structure */
|
||||||
zfree(c->bstate.keys);
|
zfree(c->bpop.keys);
|
||||||
c->bstate.keys = NULL;
|
c->bpop.keys = NULL;
|
||||||
c->bstate.target = NULL;
|
c->bpop.target = NULL;
|
||||||
c->flags &= (~REDIS_BLOCKED);
|
c->flags &= (~REDIS_BLOCKED);
|
||||||
server.blpop_blocked_clients--;
|
server.blpop_blocked_clients--;
|
||||||
/* We want to process data if there is some command waiting
|
/* We want to process data if there is some command waiting
|
||||||
@ -789,7 +789,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
|||||||
redisAssert(ln != NULL);
|
redisAssert(ln != NULL);
|
||||||
receiver = ln->value;
|
receiver = ln->value;
|
||||||
|
|
||||||
if (receiver->bstate.target == NULL) {
|
if (receiver->bpop.target == NULL) {
|
||||||
/* BRPOP/BLPOP return a multi-bulk with the name
|
/* BRPOP/BLPOP return a multi-bulk with the name
|
||||||
* of the popped list */
|
* of the popped list */
|
||||||
addReplyMultiBulkLen(receiver,2);
|
addReplyMultiBulkLen(receiver,2);
|
||||||
@ -798,16 +798,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
/* BRPOPLPUSH */
|
/* BRPOPLPUSH */
|
||||||
robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target);
|
robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target);
|
||||||
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
|
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
|
||||||
|
|
||||||
addReplyBulk(receiver,ele);
|
addReplyBulk(receiver,ele);
|
||||||
|
|
||||||
if (!handleClientsWaitingListPush(receiver, receiver->bstate.target, ele)) {
|
if (!handleClientsWaitingListPush(receiver, receiver->bpop.target, ele)) {
|
||||||
/* Create the list if the key does not exist */
|
/* Create the list if the key does not exist */
|
||||||
if (!dobj) {
|
if (!dobj) {
|
||||||
dobj = createZiplistObject();
|
dobj = createZiplistObject();
|
||||||
dbAdd(receiver->db, receiver->bstate.target, dobj);
|
dbAdd(receiver->db, receiver->bpop.target, dobj);
|
||||||
}
|
}
|
||||||
|
|
||||||
listTypePush(dobj, ele, REDIS_HEAD);
|
listTypePush(dobj, ele, REDIS_HEAD);
|
||||||
|
Loading…
Reference in New Issue
Block a user