New API to force propagation.

The old REDIS_CMD_FORCE_REPLICATION flag was removed from the
implementation of Redis, now there is a new API to force specific
executions of a command to be propagated to AOF / Replication link:

    void forceCommandPropagation(int flags);

The new API is also compatible with Lua scripting, so a script that will
execute commands that are forced to be propagated, will also be
propagated itself accordingly even if no change to data is operated.

As a side effect, this new design fixes the issue with scripts not able
to propagate PUBLISH to slaves (issue #873).
This commit is contained in:
antirez 2013-06-21 12:07:53 +02:00
parent b0c2cdd6a7
commit 515a26bbc1
4 changed files with 34 additions and 5 deletions

View File

@ -307,6 +307,7 @@ void punsubscribeCommand(redisClient *c) {
void publishCommand(redisClient *c) { void publishCommand(redisClient *c) {
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
addReplyLongLong(c,receivers); addReplyLongLong(c,receivers);
} }

View File

@ -239,7 +239,7 @@ struct redisCommand redisCommandTable[] = {
{"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
{"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0}, {"publish",publishCommand,3,"pltr",0,NULL,0,0,0,0,0},
{"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0}, {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
@ -1528,7 +1528,6 @@ void populateCommandTable(void) {
case 'm': c->flags |= REDIS_CMD_DENYOOM; break; case 'm': c->flags |= REDIS_CMD_DENYOOM; break;
case 'a': c->flags |= REDIS_CMD_ADMIN; break; case 'a': c->flags |= REDIS_CMD_ADMIN; break;
case 'p': c->flags |= REDIS_CMD_PUBSUB; break; case 'p': c->flags |= REDIS_CMD_PUBSUB; break;
case 'f': c->flags |= REDIS_CMD_FORCE_REPLICATION; break;
case 's': c->flags |= REDIS_CMD_NOSCRIPT; break; case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
case 'R': c->flags |= REDIS_CMD_RANDOM; break; case 'R': c->flags |= REDIS_CMD_RANDOM; break;
case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break; case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
@ -1652,9 +1651,18 @@ void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target); redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
} }
/* It is possible to call the function forceCommandPropagation() inside a
* Redis command implementaiton in order to to force the propagation of a
* specific command execution into AOF / Replication. */
void forceCommandPropagation(redisClient *c, int flags) {
if (flags & REDIS_PROPAGATE_REPL) c->flags |= REDIS_FORCE_REPL;
if (flags & REDIS_PROPAGATE_AOF) c->flags |= REDIS_FORCE_AOF;
}
/* Call() is the core of Redis execution of a command */ /* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) { void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration; long long dirty, start = ustime(), duration;
int client_old_flags = c->flags;
/* Sent the command to clients in MONITOR mode, only if the commands are /* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */ * not generated from reading an AOF. */
@ -1666,6 +1674,7 @@ void call(redisClient *c, int flags) {
} }
/* Call the command. */ /* Call the command. */
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
redisOpArrayInit(&server.also_propagate); redisOpArrayInit(&server.also_propagate);
dirty = server.dirty; dirty = server.dirty;
c->cmd->proc(c); c->cmd->proc(c);
@ -1677,6 +1686,16 @@ void call(redisClient *c, int flags) {
if (server.loading && c->flags & REDIS_LUA_CLIENT) if (server.loading && c->flags & REDIS_LUA_CLIENT)
flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
/* If the caller is Lua, we want to force the EVAL caller to propagate
* the script if the command flag or client flag are forcing the
* propagation. */
if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {
if (c->flags & REDIS_FORCE_REPL)
server.lua_caller->flags |= REDIS_FORCE_REPL;
if (c->flags & REDIS_FORCE_AOF)
server.lua_caller->flags |= REDIS_FORCE_AOF;
}
/* Log the command into the Slow log if needed, and populate the /* Log the command into the Slow log if needed, and populate the
* per-command statistics that we show in INFO commandstats. */ * per-command statistics that we show in INFO commandstats. */
if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand) if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand)
@ -1690,14 +1709,19 @@ void call(redisClient *c, int flags) {
if (flags & REDIS_CALL_PROPAGATE) { if (flags & REDIS_CALL_PROPAGATE) {
int flags = REDIS_PROPAGATE_NONE; int flags = REDIS_PROPAGATE_NONE;
if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
flags |= REDIS_PROPAGATE_REPL; if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
if (dirty) if (dirty)
flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF); flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
if (flags != REDIS_PROPAGATE_NONE) if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags); propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
} }
/* Restore the old FORCE_AOF/REPL flags, since call can be executed
* recursively. */
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);
/* Handle the alsoPropagate() API to handle commands that want to propagate /* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. */ * multiple separated commands. */
if (server.also_propagate.numops) { if (server.also_propagate.numops) {

View File

@ -139,7 +139,7 @@
#define REDIS_CMD_WRITE 1 /* "w" flag */ #define REDIS_CMD_WRITE 1 /* "w" flag */
#define REDIS_CMD_READONLY 2 /* "r" flag */ #define REDIS_CMD_READONLY 2 /* "r" flag */
#define REDIS_CMD_DENYOOM 4 /* "m" flag */ #define REDIS_CMD_DENYOOM 4 /* "m" flag */
#define REDIS_CMD_FORCE_REPLICATION 8 /* "f" flag */ #define REDIS_CMD_NOT_USED_1 8 /* no longer used flag */
#define REDIS_CMD_ADMIN 16 /* "a" flag */ #define REDIS_CMD_ADMIN 16 /* "a" flag */
#define REDIS_CMD_PUBSUB 32 /* "p" flag */ #define REDIS_CMD_PUBSUB 32 /* "p" flag */
#define REDIS_CMD_NOSCRIPT 64 /* "s" flag */ #define REDIS_CMD_NOSCRIPT 64 /* "s" flag */
@ -217,6 +217,8 @@
#define REDIS_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */ #define REDIS_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
#define REDIS_DIRTY_EXEC (1<<12) /* EXEC will fail for errors while queueing */ #define REDIS_DIRTY_EXEC (1<<12) /* EXEC will fail for errors while queueing */
#define REDIS_MASTER_FORCE_REPLY (1<<13) /* Queue replies even if is master */ #define REDIS_MASTER_FORCE_REPLY (1<<13) /* Queue replies even if is master */
#define REDIS_FORCE_AOF (1<<14) /* Force AOF propagation of current cmd. */
#define REDIS_FORCE_REPL (1<<15) /* Force replication of current cmd. */
/* Client request types */ /* Client request types */
#define REDIS_REQ_INLINE 1 #define REDIS_REQ_INLINE 1
@ -1211,6 +1213,7 @@ struct redisCommand *lookupCommandOrOriginal(sds name);
void call(redisClient *c, int flags); void call(redisClient *c, int flags);
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags); void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target); void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
void forceCommandPropagation(redisClient *c, int flags);
int prepareForShutdown(); int prepareForShutdown();
#ifdef __GNUC__ #ifdef __GNUC__
void redisLog(int level, const char *fmt, ...) void redisLog(int level, const char *fmt, ...)

View File

@ -1042,6 +1042,7 @@ void scriptCommand(redisClient *c) {
} }
addReplyBulkCBuffer(c,funcname+2,40); addReplyBulkCBuffer(c,funcname+2,40);
sdsfree(sha); sdsfree(sha);
forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) { } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) {
if (server.lua_caller == NULL) { if (server.lua_caller == NULL) {
addReplySds(c,sdsnew("-NOTBUSY No scripts in execution right now.\r\n")); addReplySds(c,sdsnew("-NOTBUSY No scripts in execution right now.\r\n"));