pipeline: do not sdsrange querybuf unless all commands processed

This is an optimization for pipeline processing. We discussed a
problem in issue #5229: clients may be paused if we apply the `CLIENT
PAUSE` command, and then querybuf may grow too large, so the cost of
the memmove in sdsrange after parsing each completed command will be
horrible. The optimization is to parse all commands in querybuf
first; after that we can call sdsrange just once.
This commit is contained in:
zhaozhao.zz 2018-08-14 00:43:36 +08:00
parent 39c70e728b
commit 14c4ddb5a6
2 changed files with 48 additions and 40 deletions

View File

@ -110,6 +110,7 @@ client *createClient(int fd) {
c->fd = fd; c->fd = fd;
c->name = NULL; c->name = NULL;
c->bufpos = 0; c->bufpos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty(); c->querybuf = sdsempty();
c->pending_querybuf = sdsempty(); c->pending_querybuf = sdsempty();
c->querybuf_peak = 0; c->querybuf_peak = 0;
@ -1119,11 +1120,11 @@ int processInlineBuffer(client *c) {
size_t querylen; size_t querylen;
/* Search for end of line */ /* Search for end of line */
newline = strchr(c->querybuf,'\n'); newline = strchr(c->querybuf+c->qb_pos,'\n');
/* Nothing to do without a \r\n */ /* Nothing to do without a \r\n */
if (newline == NULL) { if (newline == NULL) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big inline request"); addReplyError(c,"Protocol error: too big inline request");
setProtocolError("too big inline request",c,0); setProtocolError("too big inline request",c,0);
} }
@ -1131,12 +1132,12 @@ int processInlineBuffer(client *c) {
} }
/* Handle the \r\n case. */ /* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r') if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
newline--, linefeed_chars++; newline--, linefeed_chars++;
/* Split the input buffer up to the \r\n */ /* Split the input buffer up to the \r\n */
querylen = newline-(c->querybuf); querylen = newline-(c->querybuf+c->qb_pos);
aux = sdsnewlen(c->querybuf,querylen); aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
argv = sdssplitargs(aux,&argc); argv = sdssplitargs(aux,&argc);
sdsfree(aux); sdsfree(aux);
if (argv == NULL) { if (argv == NULL) {
@ -1152,7 +1153,8 @@ int processInlineBuffer(client *c) {
c->repl_ack_time = server.unixtime; c->repl_ack_time = server.unixtime;
/* Leave data after the first line of the query in the buffer */ /* Leave data after the first line of the query in the buffer */
sdsrange(c->querybuf,querylen+linefeed_chars,-1); sdsrange(c->querybuf,c->qb_pos+querylen+linefeed_chars,-1);
c->qb_pos = 0;
/* Setup argv array on client structure */ /* Setup argv array on client structure */
if (argc) { if (argc) {
@ -1182,10 +1184,10 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
/* Sample some protocol to given an idea about what was inside. */ /* Sample some protocol to given an idea about what was inside. */
char buf[256]; char buf[256];
if (sdslen(c->querybuf) < PROTO_DUMP_LEN) { if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf); snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
} else { } else {
snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf, sdslen(c->querybuf)-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
} }
/* Remove non printable chars. */ /* Remove non printable chars. */
@ -1202,6 +1204,7 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
} }
c->flags |= CLIENT_CLOSE_AFTER_REPLY; c->flags |= CLIENT_CLOSE_AFTER_REPLY;
sdsrange(c->querybuf,pos,-1); sdsrange(c->querybuf,pos,-1);
c->qb_pos -= pos;
} }
/* Process the query buffer for client 'c', setting up the client argument /* Process the query buffer for client 'c', setting up the client argument
@ -1217,7 +1220,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
* to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */
int processMultibulkBuffer(client *c) { int processMultibulkBuffer(client *c) {
char *newline = NULL; char *newline = NULL;
long pos = 0;
int ok; int ok;
long long ll; long long ll;
@ -1226,9 +1228,9 @@ int processMultibulkBuffer(client *c) {
serverAssertWithInfo(c,NULL,c->argc == 0); serverAssertWithInfo(c,NULL,c->argc == 0);
/* Multi bulk length cannot be read without a \r\n */ /* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf,'\r'); newline = strchr(c->querybuf+c->qb_pos,'\r');
if (newline == NULL) { if (newline == NULL) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big mbulk count string"); addReplyError(c,"Protocol error: too big mbulk count string");
setProtocolError("too big mbulk count string",c,0); setProtocolError("too big mbulk count string",c,0);
} }
@ -1236,22 +1238,23 @@ int processMultibulkBuffer(client *c) {
} }
/* Buffer should also contain \n */ /* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
return C_ERR; return C_ERR;
/* We know for sure there is a whole line since newline != NULL, /* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */ * so go ahead and find out the multi bulk length. */
serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
if (!ok || ll > 1024*1024) { if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length"); addReplyError(c,"Protocol error: invalid multibulk length");
setProtocolError("invalid mbulk count",c,pos); setProtocolError("invalid mbulk count",c,c->qb_pos);
return C_ERR; return C_ERR;
} }
pos = (newline-c->querybuf)+2; c->qb_pos = (newline-c->querybuf)+2;
if (ll <= 0) { if (ll <= 0) {
sdsrange(c->querybuf,pos,-1); sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
return C_OK; return C_OK;
} }
@ -1266,9 +1269,9 @@ int processMultibulkBuffer(client *c) {
while(c->multibulklen) { while(c->multibulklen) {
/* Read bulk length if unknown */ /* Read bulk length if unknown */
if (c->bulklen == -1) { if (c->bulklen == -1) {
newline = strchr(c->querybuf+pos,'\r'); newline = strchr(c->querybuf+c->qb_pos,'\r');
if (newline == NULL) { if (newline == NULL) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c, addReplyError(c,
"Protocol error: too big bulk count string"); "Protocol error: too big bulk count string");
setProtocolError("too big bulk count string",c,0); setProtocolError("too big bulk count string",c,0);
@ -1278,25 +1281,25 @@ int processMultibulkBuffer(client *c) {
} }
/* Buffer should also contain \n */ /* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
break; break;
if (c->querybuf[pos] != '$') { if (c->querybuf[c->qb_pos] != '$') {
addReplyErrorFormat(c, addReplyErrorFormat(c,
"Protocol error: expected '$', got '%c'", "Protocol error: expected '$', got '%c'",
c->querybuf[pos]); c->querybuf[c->qb_pos]);
setProtocolError("expected $ but got something else",c,pos); setProtocolError("expected $ but got something else",c,c->qb_pos);
return C_ERR; return C_ERR;
} }
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
if (!ok || ll < 0 || ll > server.proto_max_bulk_len) { if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
addReplyError(c,"Protocol error: invalid bulk length"); addReplyError(c,"Protocol error: invalid bulk length");
setProtocolError("invalid bulk length",c,pos); setProtocolError("invalid bulk length",c,c->qb_pos);
return C_ERR; return C_ERR;
} }
pos += newline-(c->querybuf+pos)+2; c->qb_pos = newline-c->querybuf+2;
if (ll >= PROTO_MBULK_BIG_ARG) { if (ll >= PROTO_MBULK_BIG_ARG) {
size_t qblen; size_t qblen;
@ -1304,8 +1307,8 @@ int processMultibulkBuffer(client *c) {
* try to make it likely that it will start at c->querybuf * try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation * boundary so that we can optimize object creation
* avoiding a large copy of data. */ * avoiding a large copy of data. */
sdsrange(c->querybuf,pos,-1); sdsrange(c->querybuf,c->qb_pos,-1);
pos = 0; c->qb_pos = 0;
qblen = sdslen(c->querybuf); qblen = sdslen(c->querybuf);
/* Hint the sds library about the amount of bytes this string is /* Hint the sds library about the amount of bytes this string is
* going to contain. */ * going to contain. */
@ -1316,14 +1319,14 @@ int processMultibulkBuffer(client *c) {
} }
/* Read bulk argument */ /* Read bulk argument */
if (sdslen(c->querybuf)-pos < (size_t)(c->bulklen+2)) { if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
/* Not enough data (+2 == trailing \r\n) */ /* Not enough data (+2 == trailing \r\n) */
break; break;
} else { } else {
/* Optimization: if the buffer contains JUST our bulk element /* Optimization: if the buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we * instead of creating a new object by *copying* the sds we
* just use the current sds string. */ * just use the current sds string. */
if (pos == 0 && if (c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG && c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2)) sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{ {
@ -1333,20 +1336,16 @@ int processMultibulkBuffer(client *c) {
* likely... */ * likely... */
c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
sdsclear(c->querybuf); sdsclear(c->querybuf);
pos = 0;
} else { } else {
c->argv[c->argc++] = c->argv[c->argc++] =
createStringObject(c->querybuf+pos,c->bulklen); createStringObject(c->querybuf+c->qb_pos,c->bulklen);
pos += c->bulklen+2; c->qb_pos += c->bulklen+2;
} }
c->bulklen = -1; c->bulklen = -1;
c->multibulklen--; c->multibulklen--;
} }
} }
/* Trim to pos */
if (pos) sdsrange(c->querybuf,pos,-1);
/* We're done when c->multibulk == 0 */ /* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) return C_OK; if (c->multibulklen == 0) return C_OK;
@ -1360,8 +1359,9 @@ int processMultibulkBuffer(client *c) {
* pending query buffer, already representing a full command, to process. */ * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) { void processInputBuffer(client *c) {
server.current_client = c; server.current_client = c;
/* Keep processing while there is something in the input buffer */ /* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) { while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */ /* Return if clients are paused. */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
@ -1377,7 +1377,7 @@ void processInputBuffer(client *c) {
/* Determine request type when unknown. */ /* Determine request type when unknown. */
if (!c->reqtype) { if (!c->reqtype) {
if (c->querybuf[0] == '*') { if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK; c->reqtype = PROTO_REQ_MULTIBULK;
} else { } else {
c->reqtype = PROTO_REQ_INLINE; c->reqtype = PROTO_REQ_INLINE;
@ -1400,7 +1400,7 @@ void processInputBuffer(client *c) {
if (processCommand(c) == C_OK) { if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */ /* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf); c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
} }
/* Don't reset the client structure for clients blocked in a /* Don't reset the client structure for clients blocked in a
@ -1416,6 +1416,13 @@ void processInputBuffer(client *c) {
if (server.current_client == NULL) break; if (server.current_client == NULL) break;
} }
} }
/* Trim to pos */
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
server.current_client = NULL; server.current_client = NULL;
} }

View File

@ -710,6 +710,7 @@ typedef struct client {
redisDb *db; /* Pointer to currently SELECTed DB. */ redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */ robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */ sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the represents the yet not applied portion of the
replication stream that we are receiving from replication stream that we are receiving from