Fix and improve module error reply statistics (#10278)

This PR handles several aspects
1. Calls to RM_ReplyWithError from thread safe contexts don't violate thread safety.
2. Errors returning from RM_Call to the module aren't counted in the statistics (they
  might be handled silently by the module)
3. When a module propagates a reply it got from RM_Call to it's client, then the error
  statistics are counted.

This is done by:
1. When appending an error reply to the output buffer, we avoid updating the global
  error statistics, instead we cache that error in a deferred list in the client struct.
2. When creating a RedisModuleCallReply object, the deferred error list is moved from
  the client into that object.
3. when a module calls RM_ReplyWithCallReply we copy the deferred replies to the dest
  client (if that's a real client, then that's when the error statistics are updated to the server)

Note about RM_ReplyWithCallReply: if the original reply had an array with errors, and the module
replied with just a portion of the original reply, and not the entire reply, the errors are currently not
propagated and the errors stats will not get propagated.

Fix #10180
This commit is contained in:
Oran Agra 2022-02-13 18:37:32 +02:00 committed by GitHub
parent 1193e96d02
commit b099889a3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 4 deletions

View File

@ -60,7 +60,7 @@ struct CallReply {
double d; /* Reply value for double reply. */
struct CallReply *array; /* Array of sub-reply elements. used for set, array, map, and attribute */
} val;
list *deferred_error_list; /* list of errors in sds form or NULL */
struct CallReply *attribute; /* attribute reply, NULL if not exists */
};
@ -237,6 +237,8 @@ void freeCallReply(CallReply *rep) {
freeCallReplyInternal(rep);
}
sdsfree(rep->original_proto);
if (rep->deferred_error_list)
listRelease(rep->deferred_error_list);
zfree(rep);
}
@ -488,6 +490,11 @@ int callReplyIsResp3(CallReply *rep) {
return rep->flags & REPLY_FLAG_RESP3;
}
/* Returns a list of errors in sds form, or NULL. */
list *callReplyDeferredErrorList(CallReply *rep) {
return rep->deferred_error_list;
}
/* Create a new CallReply struct from the reply blob.
*
* The function will own the reply blob, so it must not be used or freed by
@ -496,6 +503,9 @@ int callReplyIsResp3(CallReply *rep) {
* The reply blob will be freed when the returned CallReply struct is later
* freed using freeCallReply().
*
* The deferred_error_list is an optional list of errors that are present
* in the reply blob, if given, this function will take ownership on it.
*
* The private_data is optional and can later be accessed using
* callReplyGetPrivateData().
*
@ -504,7 +514,7 @@ int callReplyIsResp3(CallReply *rep) {
* DESIGNED TO HANDLE USER INPUT and using it to parse invalid replies is
* unsafe.
*/
CallReply *callReplyCreate(sds reply, void *private_data) {
CallReply *callReplyCreate(sds reply, list *deferred_error_list, void *private_data) {
CallReply *res = zmalloc(sizeof(*res));
res->flags = REPLY_FLAG_ROOT;
res->original_proto = reply;
@ -512,5 +522,6 @@ CallReply *callReplyCreate(sds reply, void *private_data) {
res->proto_len = sdslen(reply);
res->private_data = private_data;
res->attribute = NULL;
res->deferred_error_list = deferred_error_list;
return res;
}

View File

@ -34,7 +34,7 @@
typedef struct CallReply CallReply;
CallReply *callReplyCreate(sds reply, void *private_data);
CallReply *callReplyCreate(sds reply, list *deferred_error_list, void *private_data);
int callReplyType(CallReply *rep);
const char *callReplyGetString(CallReply *rep, size_t *len);
long long callReplyGetLongLong(CallReply *rep);
@ -51,6 +51,7 @@ const char *callReplyGetVerbatim(CallReply *rep, size_t *len, const char **forma
const char *callReplyGetProto(CallReply *rep, size_t *len);
void *callReplyGetPrivateData(CallReply *rep);
int callReplyIsResp3(CallReply *rep);
list *callReplyDeferredErrorList(CallReply *rep);
void freeCallReply(CallReply *rep);
#endif /* SRC_CALL_REPLY_H_ */

View File

@ -2918,6 +2918,15 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
size_t proto_len;
const char *proto = callReplyGetProto(reply, &proto_len);
addReplyProto(c, proto, proto_len);
/* Propagate the error list from that reply to the other client, to do some
* post error reply handling, like statistics.
* Note that if the original reply had an array with errors, and the module
* replied with just a portion of the original reply, and not the entire
* reply, the errors are currently not propagated and the errors stats
* will not get propagated. */
list *errors = callReplyDeferredErrorList(reply);
if (errors)
deferredAfterErrorReply(c, errors);
return REDISMODULE_OK;
}
@ -5654,7 +5663,8 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
proto = sdscatlen(proto,o->buf,o->used);
listDelNode(c->reply,listFirst(c->reply));
}
reply = callReplyCreate(proto, ctx);
reply = callReplyCreate(proto, c->deferred_reply_errors, ctx);
c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */
autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
cleanup:

View File

@ -173,6 +173,7 @@ client *createClient(connection *conn) {
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->reply = listCreate();
c->deferred_reply_errors = NULL;
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
@ -439,6 +440,18 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
void afterErrorReply(client *c, const char *s, size_t len) {
/* Module clients fall into two categories:
* Calls to RM_Call, in which case the error isn't being returned to a client, so should not be counted.
* Module thread safe context calls to RM_ReplyWithError, which will be added to a real client by the main thread later. */
if (c->flags & CLIENT_MODULE) {
if (!c->deferred_reply_errors) {
c->deferred_reply_errors = listCreate();
listSetFreeMethod(c->deferred_reply_errors, (void (*)(void*))sdsfree);
}
listAddNodeTail(c->deferred_reply_errors, sdsnewlen(s, len));
return;
}
/* Increment the global error counter */
server.stat_total_error_replies++;
/* Increment the error stats
@ -1024,10 +1037,28 @@ void AddReplyFromClient(client *dst, client *src) {
src->reply_bytes = 0;
src->bufpos = 0;
if (src->deferred_reply_errors) {
deferredAfterErrorReply(dst, src->deferred_reply_errors);
listRelease(src->deferred_reply_errors);
src->deferred_reply_errors = NULL;
}
/* Check output buffer limits */
closeClientOnOutputBufferLimitReached(dst, 1);
}
/* Append the listed errors to the server error statistics. the input
* list is not modified and remains the responsibility of the caller. */
void deferredAfterErrorReply(client *c, list *errors) {
listIter li;
listNode *ln;
listRewind(errors,&li);
while((ln = listNext(&li))) {
sds err = ln->value;
afterErrorReply(c, err, sdslen(err));
}
}
/* Logically copy 'src' replica client buffers info to 'dst' replica.
* Basically increase referenced buffer block node reference count. */
void copyReplicaOutputBuffer(client *dst, client *src) {
@ -1497,6 +1528,8 @@ void freeClient(client *c) {
freeReplicaReferencedReplBuffer(c);
freeClientArgv(c);
freeClientOriginalArgv(c);
if (c->deferred_reply_errors)
listRelease(c->deferred_reply_errors);
/* Unlink the client: this will close the socket, remove the I/O
* handlers, and remove references of the client from different
@ -1863,6 +1896,10 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
if (c->deferred_reply_errors)
listRelease(c->deferred_reply_errors);
c->deferred_reply_errors = NULL;
/* We clear the ASKING flag as well if we are not inside a MULTI, and
* if what we just executed is not the ASKING command itself. */
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)

View File

@ -1087,6 +1087,7 @@ typedef struct client {
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
list *deferred_reply_errors; /* Used for module thread safe contexts. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
@ -2428,6 +2429,7 @@ void addReplySubcommandSyntaxError(client *c);
void addReplyLoadedModules(client *c);
void copyReplicaOutputBuffer(client *dst, client *src);
void addListRangeReply(client *c, robj *o, long start, long end, int reverse);
void deferredAfterErrorReply(client *c, list *errors);
size_t sdsZmallocSize(sds s);
size_t getStringObjectSdsUsedMemory(robj *o);
void freeClientReplyValue(void *o);

View File

@ -180,6 +180,26 @@ start_server {tags {"modules"}} {
assert_no_match "*name=myclient*" $clients
}
test {module client error stats} {
r config resetstat
assert_error "NULL reply returned" {r do_rm_call hgetalllll}
assert_equal [errorrstat NULL r] {count=1}
assert_error "NULL reply returned" {r do_bg_rm_call hgetalllll}
assert_equal [errorrstat NULL r] {count=2}
r do_rm_call set x x
assert_error "ERR wrong number of arguments for 'do_rm_call' command" {r do_rm_call}
assert_equal [errorrstat ERR r] {count=1}
assert_error "WRONGTYPE*" {r do_rm_call hgetall x}
assert_equal [errorrstat WRONGTYPE r] {count=1}
assert_error "WRONGTYPE*" {r do_bg_rm_call hgetall x}
assert_equal [errorrstat WRONGTYPE r] {count=2}
}
test "Unload the module - blockedclient" {
assert_equal {OK} [r module unload blockedclient]
}