Precise timeouts: reference client pointer directly.

This commit is contained in:
antirez 2020-03-30 15:22:55 +02:00
parent dd7e61d77f
commit 3ef59b50c4

View File

@ -93,18 +93,19 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ #define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
/* Given client ID and timeout, write the resulting radix tree key in buf. */ /* Given client ID and timeout, write the resulting radix tree key in buf. */
void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, client *c) {
timeout = htonu64(timeout); timeout = htonu64(timeout);
memcpy(buf,&timeout,sizeof(timeout)); memcpy(buf,&timeout,sizeof(timeout));
memcpy(buf+8,&id,sizeof(id)); memcpy(buf+8,&c,sizeof(c));
if (sizeof(c) == 4) memset(buf+12,0,4); /* Zero padding for 32bit target. */
} }
/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write /* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
* the timeout into *toptr and the client ID into *idptr. */ * the timeout into *toptr and the client pointer into *cptr. */
void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, client **cptr) {
memcpy(toptr,buf,sizeof(*toptr)); memcpy(toptr,buf,sizeof(*toptr));
*toptr = ntohu64(*toptr); *toptr = ntohu64(*toptr);
memcpy(idptr,buf+8,sizeof(*idptr)); memcpy(cptr,buf+8,sizeof(*cptr));
} }
/* Add the specified client id / timeout as a key in the radix tree we use /* Add the specified client id / timeout as a key in the radix tree we use
@ -113,9 +114,8 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
void addClientToTimeoutTable(client *c) { void addClientToTimeoutTable(client *c) {
if (c->bpop.timeout == 0) return; if (c->bpop.timeout == 0) return;
uint64_t timeout = c->bpop.timeout; uint64_t timeout = c->bpop.timeout;
uint64_t id = c->id;
unsigned char buf[CLIENT_ST_KEYLEN]; unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf,timeout,id); encodeTimeoutKey(buf,timeout,c);
if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
c->flags |= CLIENT_IN_TO_TABLE; c->flags |= CLIENT_IN_TO_TABLE;
} }
@ -126,9 +126,8 @@ void removeClientFromTimeoutTable(client *c) {
if (!(c->flags & CLIENT_IN_TO_TABLE)) return; if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
c->flags &= ~CLIENT_IN_TO_TABLE; c->flags &= ~CLIENT_IN_TO_TABLE;
uint64_t timeout = c->bpop.timeout; uint64_t timeout = c->bpop.timeout;
uint64_t id = c->id;
unsigned char buf[CLIENT_ST_KEYLEN]; unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf,timeout,id); encodeTimeoutKey(buf,timeout,c);
raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
} }
@ -142,14 +141,12 @@ void handleBlockedClientsTimeout(void) {
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) { while(raxNext(&ri)) {
uint64_t id, timeout; uint64_t timeout;
decodeTimeoutKey(ri.key,&timeout,&id); client *c;
decodeTimeoutKey(ri.key,&timeout,&c);
if (timeout >= now) break; /* All the timeouts are in the future. */ if (timeout >= now) break; /* All the timeouts are in the future. */
client *c = lookupClientByID(id); c->flags &= ~CLIENT_IN_TO_TABLE;
if (c) { checkBlockedClientTimeout(c,now);
c->flags &= ~CLIENT_IN_TO_TABLE;
checkBlockedClientTimeout(c,now);
}
raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
} }