Threaded IO: parsing WIP 1: set current_client in a better scoped way.

This commit is contained in:
antirez 2019-04-12 17:18:10 +02:00
parent 463ccf8664
commit 647a66ebba

View File

@ -1563,7 +1563,7 @@ int processMultibulkBuffer(client *c) {
* or because a client was blocked and later reactivated, so there could be * or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */ * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) { void processInputBuffer(client *c) {
server.current_client = c; int deadclient = 0;
/* Keep processing while there is something in the input buffer */ /* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) { while(c->qb_pos < sdslen(c->querybuf)) {
@ -1619,6 +1619,7 @@ void processInputBuffer(client *c) {
resetClient(c); resetClient(c);
} else { } else {
/* Only reset the client when the command was executed. */ /* Only reset the client when the command was executed. */
server.current_client = c;
if (processCommand(c) == C_OK) { if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */ /* Update the applied replication offset of our master. */
@ -1629,23 +1630,26 @@ void processInputBuffer(client *c) {
* module blocking command, so that the reply callback will * module blocking command, so that the reply callback will
* still be able to access the client argv and argc field. * still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */ * The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) if (!(c->flags & CLIENT_BLOCKED) ||
c->btype != BLOCKED_MODULE)
{
resetClient(c); resetClient(c);
}
} }
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may /* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be * result into a slave, that may be the active client, to be
* freed. */ * freed. */
if (server.current_client == NULL) break; if (deadclient) break;
} }
} }
/* Trim to pos */ /* Trim to pos */
if (server.current_client != NULL && c->qb_pos) { if (!deadclient && c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1); sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0; c->qb_pos = 0;
} }
server.current_client = NULL;
} }
/* This is a wrapper for processInputBuffer that also cares about handling /* This is a wrapper for processInputBuffer that also cares about handling
@ -1743,11 +1747,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
} }
/* There is more data in the client input buffer, continue parsing it /* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. * in case to check if there is a full command to execute. */
* Don't do it if the client is flagged as CLIENT_PENDING_READ: it means processInputBufferAndReplicate(c);
* we are currently in the context of an I/O thread. */
if (!(c->flags & CLIENT_PENDING_READ))
processInputBufferAndReplicate(c);
} }
void getClientsMaxBuffers(unsigned long *longest_output_list, void getClientsMaxBuffers(unsigned long *longest_output_list,