Cluster refactor, common API for different implementations (#12742)

This PR reworks the clustering code to support multiple clustering
implementations, specifically, the current "legacy" clustering
implementation or, although not part of this PR, flotilla (see #10875).
Which implementation is used could be a compile-time flag (will be added
later). Legacy clustering functionality remains unchanged.

The basic idea is as follows. The header cluster.h now contains function
declarations that define the "Cluster API." These are the contract and
interface between any clustering implementation and the rest of the
Redis source code. Some of the function definitions are shared between
all clustering implementations. These functions are in cluster.c. The
functions and data structures specific to legacy clustering are in
cluster-legacy.c/h. One consequence of this is that the structs
clusterNode and clusterState which were previously "public" to the rest
of Redis are now hidden behind the Cluster API.

The PR is divided up into commits, each with a commit message explaining
the changes. some are just mass rename or moving code between files (may
not require close inspection / review), others are manual changes.

One other, related change is:
- The "failover" command is now plugged into the Cluster API so that the
clustering implementation can (a) enable/disable the command to begin
with and if enabled (b) perform the actual failover. The "failover"
command remains disabled for legacy clustering.
This commit is contained in:
Oran Agra 2023-11-22 09:31:19 +02:00 committed by GitHub
commit 58cb302526
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 7617 additions and 7373 deletions

View File

@ -345,7 +345,7 @@ endif
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,7 @@
#define __CLUSTER_H
/*-----------------------------------------------------------------------------
* Redis cluster data structures, defines, exported API.
* Redis cluster exported API.
*----------------------------------------------------------------------------*/
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
@ -11,15 +11,6 @@
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
#define CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
/* The following defines are amount of time, sometimes expressed as
* multiplicators of the node timeout value (when ending with MULT). */
#define CLUSTER_FAIL_REPORT_VALIDITY_MULT 2 /* Fail report validity. */
#define CLUSTER_FAIL_UNDO_TIME_MULT 2 /* Undo fail if master is back. */
#define CLUSTER_MF_TIMEOUT 5000 /* Milliseconds to do a manual failover. */
#define CLUSTER_MF_PAUSE_MULT 2 /* Master pause manual failover mult. */
#define CLUSTER_SLAVE_MIGRATION_DELAY 5000 /* Delay for slave migration. */
/* Redirection errors returned by getNodeByQuery(). */
#define CLUSTER_REDIR_NONE 0 /* Node can serve the request. */
@ -31,77 +22,8 @@
#define CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */
#define CLUSTER_REDIR_DOWN_RO_STATE 7 /* -CLUSTERDOWN, allow reads. */
struct clusterNode;
/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
connection *conn; /* Connection to remote node */
list *send_msg_queue; /* List of messages to be sent */
size_t head_msg_send_offset; /* Number of bytes already sent of message at head of queue */
unsigned long long send_msg_queue_mem; /* Memory in bytes used by message queue */
char *rcvbuf; /* Packet reception buffer */
size_t rcvbuf_len; /* Used size of rcvbuf */
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
struct clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
int inbound; /* 1 if this link is an inbound link accepted from the related node */
} clusterLink;
/* Cluster node flags and macros. */
#define CLUSTER_NODE_MASTER 1 /* The node is a master */
#define CLUSTER_NODE_SLAVE 2 /* The node is a slave */
#define CLUSTER_NODE_PFAIL 4 /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL 8 /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF 16 /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR 64 /* We don't know the address of this node */
#define CLUSTER_NODE_MEET 128 /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO 256 /* Master eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
#define nodeWithoutAddr(n) ((n)->flags & CLUSTER_NODE_NOADDR)
#define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL)
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
/* Reasons why a slave is not able to failover. */
#define CLUSTER_CANT_FAILOVER_NONE 0
#define CLUSTER_CANT_FAILOVER_DATA_AGE 1
#define CLUSTER_CANT_FAILOVER_WAITING_DELAY 2
#define CLUSTER_CANT_FAILOVER_EXPIRED 3
#define CLUSTER_CANT_FAILOVER_WAITING_VOTES 4
#define CLUSTER_CANT_FAILOVER_RELOG_PERIOD (10) /* seconds. */
/* clusterState todo_before_sleep flags. */
#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0)
#define CLUSTER_TODO_UPDATE_STATE (1<<1)
#define CLUSTER_TODO_SAVE_CONFIG (1<<2)
#define CLUSTER_TODO_FSYNC_CONFIG (1<<3)
#define CLUSTER_TODO_HANDLE_MANUALFAILOVER (1<<4)
/* Message types.
*
* Note that the PING, PONG and MEET messages are actually the same exact
* kind of packet. PONG is the reply to ping, in the exact format as a PING,
* while MEET is a special PING that forces the receiver to add the sender
* as a node (if it is not already in the list). */
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propagation */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */
#define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
#define CLUSTERMSG_TYPE_MODULE 9 /* Module cluster API message. */
#define CLUSTERMSG_TYPE_PUBLISHSHARD 10 /* Pub/Sub Publish shard propagation */
#define CLUSTERMSG_TYPE_COUNT 11 /* Total number of message types. */
typedef struct _clusterNode clusterNode;
struct clusterState;
/* Flags that a module can set in order to prevent certain Redis Cluster
* features to be enabled. Useful when implementing a different distributed
@ -110,312 +32,87 @@ typedef struct clusterLink {
#define CLUSTER_MODULE_FLAG_NO_FAILOVER (1<<1)
#define CLUSTER_MODULE_FLAG_NO_REDIRECTION (1<<2)
/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
struct clusterNode *node; /* Node reporting the failure condition. */
mstime_t time; /* Time of the last report from this node. */
} clusterNodeFailReport;
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
char shard_id[CLUSTER_NAMELEN]; /* shard id, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
sds hostname; /* The known hostname for this node */
sds human_nodename; /* The known human readable nodename for this node */
int tcp_port; /* Latest known clients TCP port. */
int tls_port; /* Latest known clients TLS port */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
typedef struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */
dict *shards; /* Hash table of shard_id -> list (of nodes) structures */
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
rax *slots_to_channels;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason; /* Why a slave is currently not able to
failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF
or -1 if still not received. */
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The following fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
/* Stats */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
/* Bit map for slots that are no longer claimed by the owner in cluster PING
* messages. During slot migration, the owner will stop claiming the slot after
* the ownership transfer. Set the bit corresponding to the slot when a node
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
} clusterState;
/* Redis cluster messages header */
/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */
typedef struct {
char nodename[CLUSTER_NAMELEN];
uint32_t ping_sent;
uint32_t pong_received;
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
uint16_t port; /* primary port last time it was seen */
uint16_t cport; /* cluster port last time it was seen */
uint16_t flags; /* node->flags copy */
uint16_t pport; /* secondary port last time it was seen */
uint16_t notused1;
} clusterMsgDataGossip;
typedef struct {
char nodename[CLUSTER_NAMELEN];
} clusterMsgDataFail;
typedef struct {
uint32_t channel_len;
uint32_t message_len;
unsigned char bulk_data[8]; /* 8 bytes just as placeholder. */
} clusterMsgDataPublish;
typedef struct {
uint64_t configEpoch; /* Config epoch of the specified instance. */
char nodename[CLUSTER_NAMELEN]; /* Name of the slots owner. */
unsigned char slots[CLUSTER_SLOTS/8]; /* Slots bitmap. */
} clusterMsgDataUpdate;
typedef struct {
uint64_t module_id; /* ID of the sender module. */
uint32_t len; /* ID of the sender module. */
uint8_t type; /* Type from 0 to 255. */
unsigned char bulk_data[3]; /* 3 bytes just as placeholder. */
} clusterMsgModule;
/* The cluster supports optional extension messages that can be sent
* along with ping/pong/meet messages to give additional info in a
* consistent manner. */
typedef enum {
CLUSTERMSG_EXT_TYPE_HOSTNAME,
CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME,
CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE,
CLUSTERMSG_EXT_TYPE_SHARDID,
} clusterMsgPingtypes;
/* Helper function for making sure extensions are eight byte aligned. */
#define EIGHT_BYTE_ALIGN(size) ((((size) + 7) / 8) * 8)
typedef struct {
char hostname[1]; /* The announced hostname, ends with \0. */
} clusterMsgPingExtHostname;
typedef struct {
char human_nodename[1]; /* The announced nodename, ends with \0. */
} clusterMsgPingExtHumanNodename;
typedef struct {
char name[CLUSTER_NAMELEN]; /* Node name. */
uint64_t ttl; /* Remaining time to blacklist the node, in seconds. */
} clusterMsgPingExtForgottenNode;
static_assert(sizeof(clusterMsgPingExtForgottenNode) % 8 == 0, "");
typedef struct {
char shard_id[CLUSTER_NAMELEN]; /* The shard_id, 40 bytes fixed. */
} clusterMsgPingExtShardId;
typedef struct {
uint32_t length; /* Total length of this extension message (including this header) */
uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */
uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
union {
clusterMsgPingExtHostname hostname;
clusterMsgPingExtHumanNodename human_nodename;
clusterMsgPingExtForgottenNode forgotten_node;
clusterMsgPingExtShardId shard_id;
} ext[]; /* Actual extension information, formatted so that the data is 8
* byte aligned, regardless of its content. */
} clusterMsgPingExt;
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
/* Extension data that can optionally be sent for ping/meet/pong
* messages. We can't explicitly define them here though, since
* the gossip array isn't the real length of the gossip data. */
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE */
struct {
clusterMsgModule msg;
} module;
};
#define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */
typedef struct {
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to 1. */
uint16_t port; /* Primary port number (TCP or TLS). */
uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last
epoch advertised by its master if it is a
slave. */
uint64_t offset; /* Master replication offset if node is a master or
processed replication offset if node is a slave. */
char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
uint16_t extensions; /* Number of extensions sent along with this packet. */
char notused1[30]; /* 30 bytes reserved for future usage. */
uint16_t pport; /* Secondary port number: if primary port is TCP port, this is
TLS port, and if primary port is TLS port, this is TCP port.*/
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data;
} clusterMsg;
/* clusterMsg defines the gossip wire protocol exchanged among Redis cluster
* members, which can be running different versions of redis-server bits,
* especially during cluster rolling upgrades.
*
* Therefore, fields in this struct should remain at the same offset from
* release to release. The static asserts below ensures that incompatible
* changes in clusterMsg be caught at compile time.
*/
static_assert(offsetof(clusterMsg, sig) == 0, "unexpected field offset");
static_assert(offsetof(clusterMsg, totlen) == 4, "unexpected field offset");
static_assert(offsetof(clusterMsg, ver) == 8, "unexpected field offset");
static_assert(offsetof(clusterMsg, port) == 10, "unexpected field offset");
static_assert(offsetof(clusterMsg, type) == 12, "unexpected field offset");
static_assert(offsetof(clusterMsg, count) == 14, "unexpected field offset");
static_assert(offsetof(clusterMsg, currentEpoch) == 16, "unexpected field offset");
static_assert(offsetof(clusterMsg, configEpoch) == 24, "unexpected field offset");
static_assert(offsetof(clusterMsg, offset) == 32, "unexpected field offset");
static_assert(offsetof(clusterMsg, sender) == 40, "unexpected field offset");
static_assert(offsetof(clusterMsg, myslots) == 80, "unexpected field offset");
static_assert(offsetof(clusterMsg, slaveof) == 2128, "unexpected field offset");
static_assert(offsetof(clusterMsg, myip) == 2168, "unexpected field offset");
static_assert(offsetof(clusterMsg, extensions) == 2214, "unexpected field offset");
static_assert(offsetof(clusterMsg, notused1) == 2216, "unexpected field offset");
static_assert(offsetof(clusterMsg, pport) == 2246, "unexpected field offset");
static_assert(offsetof(clusterMsg, cport) == 2248, "unexpected field offset");
static_assert(offsetof(clusterMsg, flags) == 2250, "unexpected field offset");
static_assert(offsetof(clusterMsg, state) == 2252, "unexpected field offset");
static_assert(offsetof(clusterMsg, mflags) == 2253, "unexpected field offset");
static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset");
#define CLUSTERMSG_MIN_LEN (sizeof(clusterMsg)-sizeof(union clusterMsgData))
/* Message flags better specify the packet content or are used to
* provide some information about the node state. */
#define CLUSTERMSG_FLAG0_PAUSED (1<<0) /* Master paused for manual failover. */
#define CLUSTERMSG_FLAG0_FORCEACK (1<<1) /* Give ACK to AUTH_REQUEST even if
master is up. */
#define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */
/* ---------------------- API exported outside cluster.c -------------------- */
/* functions requiring mechanism specific implementations */
void clusterInit(void);
void clusterInitListeners(void);
void clusterInitLast(void);
void clusterCron(void);
void clusterBeforeSleep(void);
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
int verifyClusterNodeId(const char *name, int length);
int verifyClusterConfigWithData(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
void clusterUpdateMyselfFlags(void);
void clusterUpdateMyselfIp(void);
void clusterUpdateMyselfHostname(void);
void clusterUpdateMyselfAnnouncedPorts(void);
void clusterUpdateMyselfHumanNodename(void);
void slotToChannelAdd(sds channel);
void slotToChannelDel(sds channel);
void clusterPropagatePublish(robj *channel, robj *message, int sharded);
unsigned long getClusterConnectionsCount(void);
int isClusterHealthy(void);
sds clusterGenNodesDescription(client *c, int filter, int tls_primary);
sds genClusterInfoString(void);
/* handle implementation specific debug cluster commands. Return 1 if handled, 0 otherwise. */
int handleDebugClusterCommand(client *c);
const char **clusterDebugCommandExtendedHelp(void);
/* handle implementation specific cluster commands. Return 1 if handled, 0 otherwise. */
int clusterCommandSpecial(client *c);
const char** clusterCommandExtendedHelp(void);
int clusterAllowFailoverCmd(client *c);
void clusterPromoteSelfToMaster(void);
int clusterManualFailoverTimeLimit(void);
void clusterCommandSlots(client * c);
void clusterCommandMyId(client *c);
void clusterCommandMyShardId(client *c);
void clusterCommandShards(client *c);
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);
int clusterNodeCoversSlot(clusterNode *n, int slot);
int getNodeDefaultClientPort(clusterNode *n);
int clusterNodeIsMyself(clusterNode *n);
clusterNode *getMyClusterNode(void);
char *getMyClusterId(void);
int getClusterSize(void);
int handleDebugClusterCommand(client *c);
int clusterNodePending(clusterNode *node);
int clusterNodeIsMaster(clusterNode *n);
char **getClusterNodesList(size_t *numnodes);
int clusterNodeIsMaster(clusterNode *n);
char *clusterNodeIp(clusterNode *node);
int clusterNodeIsSlave(clusterNode *node);
clusterNode *clusterNodeGetSlaveof(clusterNode *node);
char *clusterNodeGetName(clusterNode *node);
int clusterNodeTimedOut(clusterNode *node);
int clusterNodeIsFailing(clusterNode *node);
int clusterNodeIsNoFailover(clusterNode *node);
char *clusterNodeGetShardId(clusterNode *node);
int clusterNodeNumSlaves(clusterNode *node);
clusterNode *clusterNodeGetSlave(clusterNode *node, int slave_idx);
clusterNode *getMigratingSlotDest(int slot);
clusterNode *getImportingSlotSource(int slot);
clusterNode *getNodeBySlot(int slot);
int clusterNodeClientPort(clusterNode *n, int use_tls);
char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
int clusterRedirectBlockedClientIfNeeded(client *c);
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
void migrateCloseTimedoutSockets(void);
int verifyClusterConfigWithData(void);
unsigned long getClusterConnectionsCount(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
void clusterPropagatePublish(robj *channel, robj *message, int sharded);
unsigned int keyHashSlot(char *key, int keylen);
int patternHashSlot(char *pattern, int length);
void clusterUpdateMyselfFlags(void);
void clusterUpdateMyselfIp(void);
void slotToChannelAdd(sds channel);
void slotToChannelDel(sds channel);
void clusterUpdateMyselfHostname(void);
void clusterUpdateMyselfAnnouncedPorts(void);
sds clusterGenNodesDescription(client *c, int filter, int tls_primary);
sds genClusterInfoString(void);
void freeClusterLink(clusterLink *link);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
void clusterUpdateMyselfHumanNodename(void);
int isValidAuxString(char *s, unsigned int length);
int getNodeDefaultClientPort(clusterNode *n);
void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);
#endif /* __CLUSTER_H */

6426
src/cluster_legacy.c Normal file

File diff suppressed because it is too large Load Diff

360
src/cluster_legacy.h Normal file
View File

@ -0,0 +1,360 @@
#ifndef CLUSTER_LEGACY_H
#define CLUSTER_LEGACY_H
#define CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
/* The following defines are amount of time, sometimes expressed as
* multiplicators of the node timeout value (when ending with MULT). */
#define CLUSTER_FAIL_REPORT_VALIDITY_MULT 2 /* Fail report validity. */
#define CLUSTER_FAIL_UNDO_TIME_MULT 2 /* Undo fail if master is back. */
#define CLUSTER_MF_TIMEOUT 5000 /* Milliseconds to do a manual failover. */
#define CLUSTER_MF_PAUSE_MULT 2 /* Master pause manual failover mult. */
#define CLUSTER_SLAVE_MIGRATION_DELAY 5000 /* Delay for slave migration. */
/* Reasons why a slave is not able to failover. */
#define CLUSTER_CANT_FAILOVER_NONE 0
#define CLUSTER_CANT_FAILOVER_DATA_AGE 1
#define CLUSTER_CANT_FAILOVER_WAITING_DELAY 2
#define CLUSTER_CANT_FAILOVER_EXPIRED 3
#define CLUSTER_CANT_FAILOVER_WAITING_VOTES 4
#define CLUSTER_CANT_FAILOVER_RELOG_PERIOD (10) /* seconds. */
/* clusterState todo_before_sleep flags. */
#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0)
#define CLUSTER_TODO_UPDATE_STATE (1<<1)
#define CLUSTER_TODO_SAVE_CONFIG (1<<2)
#define CLUSTER_TODO_FSYNC_CONFIG (1<<3)
#define CLUSTER_TODO_HANDLE_MANUALFAILOVER (1<<4)
/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
connection *conn; /* Connection to remote node */
list *send_msg_queue; /* List of messages to be sent */
size_t head_msg_send_offset; /* Number of bytes already sent of message at head of queue */
unsigned long long send_msg_queue_mem; /* Memory in bytes used by message queue */
char *rcvbuf; /* Packet reception buffer */
size_t rcvbuf_len; /* Used size of rcvbuf */
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
int inbound; /* 1 if this link is an inbound link accepted from the related node */
} clusterLink;
/* Cluster node flags and macros. */
#define CLUSTER_NODE_MASTER 1 /* The node is a master */
#define CLUSTER_NODE_SLAVE 2 /* The node is a slave */
#define CLUSTER_NODE_PFAIL 4 /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL 8 /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF 16 /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR 64 /* We don't know the address of this node */
#define CLUSTER_NODE_MEET 128 /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO 256 /* Master eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
#define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL)
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
clusterNode *node; /* Node reporting the failure condition. */
mstime_t time; /* Time of the last report from this node. */
} clusterNodeFailReport;
/* Redis cluster messages header */
/* Message types.
*
* Note that the PING, PONG and MEET messages are actually the same exact
* kind of packet. PONG is the reply to ping, in the exact format as a PING,
* while MEET is a special PING that forces the receiver to add the sender
* as a node (if it is not already in the list). */
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propagation */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */
#define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
#define CLUSTERMSG_TYPE_MODULE 9 /* Module cluster API message. */
#define CLUSTERMSG_TYPE_PUBLISHSHARD 10 /* Pub/Sub Publish shard propagation */
#define CLUSTERMSG_TYPE_COUNT 11 /* Total number of message types. */
/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */
typedef struct {
char nodename[CLUSTER_NAMELEN];
uint32_t ping_sent;
uint32_t pong_received;
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
uint16_t port; /* primary port last time it was seen */
uint16_t cport; /* cluster port last time it was seen */
uint16_t flags; /* node->flags copy */
uint16_t pport; /* secondary port last time it was seen */
uint16_t notused1;
} clusterMsgDataGossip;
typedef struct {
char nodename[CLUSTER_NAMELEN];
} clusterMsgDataFail;
typedef struct {
uint32_t channel_len;
uint32_t message_len;
unsigned char bulk_data[8]; /* 8 bytes just as placeholder. */
} clusterMsgDataPublish;
typedef struct {
uint64_t configEpoch; /* Config epoch of the specified instance. */
char nodename[CLUSTER_NAMELEN]; /* Name of the slots owner. */
unsigned char slots[CLUSTER_SLOTS/8]; /* Slots bitmap. */
} clusterMsgDataUpdate;
typedef struct {
uint64_t module_id; /* ID of the sender module. */
uint32_t len; /* ID of the sender module. */
uint8_t type; /* Type from 0 to 255. */
unsigned char bulk_data[3]; /* 3 bytes just as placeholder. */
} clusterMsgModule;
/* The cluster supports optional extension messages that can be sent
* along with ping/pong/meet messages to give additional info in a
* consistent manner. */
typedef enum {
CLUSTERMSG_EXT_TYPE_HOSTNAME,
CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME,
CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE,
CLUSTERMSG_EXT_TYPE_SHARDID,
} clusterMsgPingtypes;
/* Helper function for making sure extensions are eight byte aligned. */
#define EIGHT_BYTE_ALIGN(size) ((((size) + 7) / 8) * 8)
typedef struct {
char hostname[1]; /* The announced hostname, ends with \0. */
} clusterMsgPingExtHostname;
typedef struct {
char human_nodename[1]; /* The announced nodename, ends with \0. */
} clusterMsgPingExtHumanNodename;
typedef struct {
char name[CLUSTER_NAMELEN]; /* Node name. */
uint64_t ttl; /* Remaining time to blacklist the node, in seconds. */
} clusterMsgPingExtForgottenNode;
static_assert(sizeof(clusterMsgPingExtForgottenNode) % 8 == 0, "");
typedef struct {
char shard_id[CLUSTER_NAMELEN]; /* The shard_id, 40 bytes fixed. */
} clusterMsgPingExtShardId;
typedef struct {
uint32_t length; /* Total length of this extension message (including this header) */
uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */
uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
union {
clusterMsgPingExtHostname hostname;
clusterMsgPingExtHumanNodename human_nodename;
clusterMsgPingExtForgottenNode forgotten_node;
clusterMsgPingExtShardId shard_id;
} ext[]; /* Actual extension information, formatted so that the data is 8
* byte aligned, regardless of its content. */
} clusterMsgPingExt;
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
/* Extension data that can optionally be sent for ping/meet/pong
* messages. We can't explicitly define them here though, since
* the gossip array isn't the real length of the gossip data. */
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE */
struct {
clusterMsgModule msg;
} module;
};
#define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */
typedef struct {
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to 1. */
uint16_t port; /* Primary port number (TCP or TLS). */
uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last
epoch advertised by its master if it is a
slave. */
uint64_t offset; /* Master replication offset if node is a master or
processed replication offset if node is a slave. */
char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
uint16_t extensions; /* Number of extensions sent along with this packet. */
char notused1[30]; /* 30 bytes reserved for future usage. */
uint16_t pport; /* Secondary port number: if primary port is TCP port, this is
TLS port, and if primary port is TLS port, this is TCP port.*/
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data;
} clusterMsg;
/* clusterMsg defines the gossip wire protocol exchanged among Redis cluster
* members, which can be running different versions of redis-server bits,
* especially during cluster rolling upgrades.
*
* Therefore, fields in this struct should remain at the same offset from
* release to release. The static asserts below ensures that incompatible
* changes in clusterMsg be caught at compile time.
*/
static_assert(offsetof(clusterMsg, sig) == 0, "unexpected field offset");
static_assert(offsetof(clusterMsg, totlen) == 4, "unexpected field offset");
static_assert(offsetof(clusterMsg, ver) == 8, "unexpected field offset");
static_assert(offsetof(clusterMsg, port) == 10, "unexpected field offset");
static_assert(offsetof(clusterMsg, type) == 12, "unexpected field offset");
static_assert(offsetof(clusterMsg, count) == 14, "unexpected field offset");
static_assert(offsetof(clusterMsg, currentEpoch) == 16, "unexpected field offset");
static_assert(offsetof(clusterMsg, configEpoch) == 24, "unexpected field offset");
static_assert(offsetof(clusterMsg, offset) == 32, "unexpected field offset");
static_assert(offsetof(clusterMsg, sender) == 40, "unexpected field offset");
static_assert(offsetof(clusterMsg, myslots) == 80, "unexpected field offset");
static_assert(offsetof(clusterMsg, slaveof) == 2128, "unexpected field offset");
static_assert(offsetof(clusterMsg, myip) == 2168, "unexpected field offset");
static_assert(offsetof(clusterMsg, extensions) == 2214, "unexpected field offset");
static_assert(offsetof(clusterMsg, notused1) == 2216, "unexpected field offset");
static_assert(offsetof(clusterMsg, pport) == 2246, "unexpected field offset");
static_assert(offsetof(clusterMsg, cport) == 2248, "unexpected field offset");
static_assert(offsetof(clusterMsg, flags) == 2250, "unexpected field offset");
static_assert(offsetof(clusterMsg, state) == 2252, "unexpected field offset");
static_assert(offsetof(clusterMsg, mflags) == 2253, "unexpected field offset");
static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset");
#define CLUSTERMSG_MIN_LEN (sizeof(clusterMsg)-sizeof(union clusterMsgData))
/* Message flags better specify the packet content or are used to
* provide some information about the node state. */
#define CLUSTERMSG_FLAG0_PAUSED (1<<0) /* Master paused for manual failover. */
#define CLUSTERMSG_FLAG0_FORCEACK (1<<1) /* Give ACK to AUTH_REQUEST even if
master is up. */
#define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */
struct _clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
char shard_id[CLUSTER_NAMELEN]; /* shard id, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
clusterNode **slaves; /* pointers to slave nodes */
clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
sds hostname; /* The known hostname for this node */
sds human_nodename; /* The known human readable nodename for this node */
int tcp_port; /* Latest known clients TCP port. */
int tls_port; /* Latest known clients TLS port */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
};
struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */
dict *shards; /* Hash table of shard_id -> list (of nodes) structures */
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
rax *slots_to_channels;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason; /* Why a slave is currently not able to
failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF
or -1 if still not received. */
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The following fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
/* Stats */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
/* Bit map for slots that are no longer claimed by the owner in cluster PING
* messages. During slot migration, the owner will stop claiming the slot after
* the ownership transfer. Set the bit corresponding to the slot when a node
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
};
#endif //CLUSTER_LEGACY_H

View File

@ -2197,7 +2197,7 @@ int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_exp
dict *d;
if (server.cluster_enabled) {
for (int i = 0; i < CLUSTER_SLOTS; i++) {
if (clusterNodeGetSlotBit(server.cluster->myself, i)) {
if (clusterNodeCoversSlot(getMyClusterNode(), i)) {
/* We don't know exact number of keys that would fall into each slot, but we can approximate it, assuming even distribution. */
if (keyType == DB_MAIN) {
d = db->dict[i];

View File

@ -496,11 +496,9 @@ void debugCommand(client *c) {
" In case RESET is provided the peak reset time will be restored to the default value",
"REPLYBUFFER RESIZING <0|1>",
" Enable or disable the reply buffer resize cron job",
"CLUSTERLINK KILL <to|from|all> <node-id>",
" Kills the link based on the direction to/from (both) with the provided node." ,
NULL
};
addReplyHelp(c, help);
addExtendedReplyHelp(c, help, clusterDebugCommandExtendedHelp());
} else if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
/* Compiler gives warnings about writing to a random address
* e.g "*((char*)-1) = 'x';". As a workaround, we map a read-only area
@ -1023,34 +1021,7 @@ NULL
return;
}
addReply(c, shared.ok);
} else if(!strcasecmp(c->argv[1]->ptr,"CLUSTERLINK") &&
!strcasecmp(c->argv[2]->ptr,"KILL") &&
c->argc == 5) {
if (!server.cluster_enabled) {
addReplyError(c, "Debug option only available for cluster mode enabled setup!");
return;
}
/* Find the node. */
clusterNode *n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr);
return;
}
/* Terminate the link based on the direction or all. */
if (!strcasecmp(c->argv[3]->ptr,"from")) {
freeClusterLink(n->inbound_link);
} else if (!strcasecmp(c->argv[3]->ptr,"to")) {
freeClusterLink(n->link);
} else if (!strcasecmp(c->argv[3]->ptr,"all")) {
freeClusterLink(n->link);
freeClusterLink(n->inbound_link);
} else {
addReplyErrorFormat(c, "Unknown direction %s", (char*) c->argv[3]->ptr);
}
addReply(c,shared.ok);
} else {
} else if(!handleDebugClusterCommand(c)) {
addReplySubcommandSyntaxError(c);
return;
}

View File

@ -6466,7 +6466,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,&error_code) !=
server.cluster->myself)
getMyClusterNode())
{
sds msg = NULL;
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
@ -8917,23 +8917,7 @@ char **RM_GetClusterNodesList(RedisModuleCtx *ctx, size_t *numnodes) {
UNUSED(ctx);
if (!server.cluster_enabled) return NULL;
size_t count = dictSize(server.cluster->nodes);
char **ids = zmalloc((count+1)*REDISMODULE_NODE_ID_LEN);
dictIterator *di = dictGetIterator(server.cluster->nodes);
dictEntry *de;
int j = 0;
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue;
ids[j] = zmalloc(REDISMODULE_NODE_ID_LEN);
memcpy(ids[j],node->name,REDISMODULE_NODE_ID_LEN);
j++;
}
*numnodes = j;
ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need
* to also get the count argument. */
dictReleaseIterator(di);
return ids;
return getClusterNodesList(numnodes);
}
/* Free the node list obtained with RedisModule_GetClusterNodesList. */
@ -8947,7 +8931,7 @@ void RM_FreeClusterNodesList(char **ids) {
* is disabled. */
const char *RM_GetMyClusterID(void) {
if (!server.cluster_enabled) return NULL;
return server.cluster->myself->name;
return getMyClusterId();
}
/* Return the number of nodes in the cluster, regardless of their state
@ -8956,7 +8940,7 @@ const char *RM_GetMyClusterID(void) {
* cluster mode, zero is returned. */
size_t RM_GetClusterSize(void) {
if (!server.cluster_enabled) return 0;
return dictSize(server.cluster->nodes);
return getClusterSize();
}
/* Populate the specified info for the node having as ID the specified 'id',
@ -8983,20 +8967,19 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
UNUSED(ctx);
clusterNode *node = clusterLookupNode(id, strlen(id));
if (node == NULL ||
node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
if (node == NULL || clusterNodePending(node))
{
return REDISMODULE_ERR;
}
if (ip) redis_strlcpy(ip,node->ip,NET_IP_STR_LEN);
if (ip) redis_strlcpy(ip, clusterNodeIp(node),NET_IP_STR_LEN);
if (master_id) {
/* If the information is not available, the function will set the
* field to zero bytes, so that when the field can't be populated the
* function kinda remains predictable. */
if (node->flags & CLUSTER_NODE_SLAVE && node->slaveof)
memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
if (clusterNodeIsSlave(node) && clusterNodeGetSlaveof(node))
memcpy(master_id, clusterNodeGetName(clusterNodeGetSlaveof(node)) ,REDISMODULE_NODE_ID_LEN);
else
memset(master_id,0,REDISMODULE_NODE_ID_LEN);
}
@ -9006,12 +8989,12 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
* we can provide binary compatibility. */
if (flags) {
*flags = 0;
if (node->flags & CLUSTER_NODE_MYSELF) *flags |= REDISMODULE_NODE_MYSELF;
if (node->flags & CLUSTER_NODE_MASTER) *flags |= REDISMODULE_NODE_MASTER;
if (node->flags & CLUSTER_NODE_SLAVE) *flags |= REDISMODULE_NODE_SLAVE;
if (node->flags & CLUSTER_NODE_PFAIL) *flags |= REDISMODULE_NODE_PFAIL;
if (node->flags & CLUSTER_NODE_FAIL) *flags |= REDISMODULE_NODE_FAIL;
if (node->flags & CLUSTER_NODE_NOFAILOVER) *flags |= REDISMODULE_NODE_NOFAILOVER;
if (clusterNodeIsMyself(node)) *flags |= REDISMODULE_NODE_MYSELF;
if (clusterNodeIsMaster(node)) *flags |= REDISMODULE_NODE_MASTER;
if (clusterNodeIsSlave(node)) *flags |= REDISMODULE_NODE_SLAVE;
if (clusterNodeTimedOut(node)) *flags |= REDISMODULE_NODE_PFAIL;
if (clusterNodeIsFailing(node)) *flags |= REDISMODULE_NODE_FAIL;
if (clusterNodeIsNoFailover(node)) *flags |= REDISMODULE_NODE_NOFAILOVER;
}
return REDISMODULE_OK;
}

View File

@ -1117,14 +1117,18 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
}
}
/* Add an array of C strings as status replies with a heading.
* This function is typically invoked by from commands that support
* subcommands in response to the 'help' subcommand. The help array
* is terminated by NULL sentinel. */
void addReplyHelp(client *c, const char **help) {
/* This function is similar to the addReplyHelp function but adds the
* ability to pass in two arrays of strings. Some commands have
* some additional subcommands based on the specific feature implementation
* Redis is compiled with (currently just clustering). This function allows
* to pass is the common subcommands in `help` and any implementation
* specific subcommands in `extended_help`.
*/
void addExtendedReplyHelp(client *c, const char **help, const char **extended_help) {
sds cmd = sdsnew((char*) c->argv[0]->ptr);
void *blenp = addReplyDeferredLen(c);
int blen = 0;
int idx = 0;
sdstoupper(cmd);
addReplyStatusFormat(c,
@ -1132,6 +1136,10 @@ void addReplyHelp(client *c, const char **help) {
sdsfree(cmd);
while (help[blen]) addReplyStatus(c,help[blen++]);
if (extended_help) {
while (extended_help[idx]) addReplyStatus(c,extended_help[idx++]);
}
blen += idx;
addReplyStatus(c,"HELP");
addReplyStatus(c," Print this help.");
@ -1141,6 +1149,14 @@ void addReplyHelp(client *c, const char **help) {
setDeferredArrayLen(c,blenp,blen);
}
/* Add an array of C strings as status replies with a heading.
* This function is typically invoked by commands that support
* subcommands in response to the 'help' subcommand. The help array
* is terminated by NULL sentinel. */
void addReplyHelp(client *c, const char **help) {
addExtendedReplyHelp(c, help, NULL);
}
/* Add a suggestive error reply.
* This function is typically invoked by from commands that support
* subcommands in response to an unknown subcommand or argument error. */

View File

@ -951,7 +951,11 @@ void syncCommand(client *c) {
}
if (!strcasecmp(c->argv[1]->ptr,server.replid)) {
replicationUnsetMaster();
if (server.cluster_enabled) {
clusterPromoteSelfToMaster();
} else {
replicationUnsetMaster();
}
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,
"MASTER MODE enabled (failover request from '%s')",client);
@ -3774,7 +3778,7 @@ void replicationCron(void) {
* match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress =
((server.cluster_enabled &&
server.cluster->mf_end) ||
clusterManualFailoverTimeLimit()) ||
server.failover_end_time) &&
isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA);
@ -4061,12 +4065,10 @@ void abortFailover(const char *err) {
* will attempt forever and must be manually aborted.
*/
void failoverCommand(client *c) {
if (server.cluster_enabled) {
addReplyError(c,"FAILOVER not allowed in cluster mode. "
"Use CLUSTER FAILOVER command instead.");
if (!clusterAllowFailoverCmd(c)) {
return;
}
/* Handle special case for abort */
if ((c->argc == 2) && !strcasecmp(c->argv[1]->ptr,"abort")) {
if (server.failover_state == NO_FAILOVER) {

View File

@ -429,7 +429,7 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or
c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING);
c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING);
int hashslot = -1;
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != server.cluster->myself) {
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != getMyClusterNode()) {
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
*err = sdsnew(
"Script attempted to execute a write command while the "

View File

@ -4037,7 +4037,7 @@ int processCommand(client *c) {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&c->slot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (n == NULL || !clusterNodeIsMyself(n)) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
@ -6838,7 +6838,7 @@ int redisIsSupervised(int mode) {
int iAmMaster(void) {
return ((!server.cluster_enabled && server.masterhost == NULL) ||
(server.cluster_enabled && nodeIsMaster(server.cluster->myself)));
(server.cluster_enabled && clusterNodeIsMaster(getMyClusterNode())));
}
#ifdef REDIS_TEST
@ -7161,7 +7161,7 @@ int main(int argc, char **argv) {
ACLLoadUsersAtStartup();
initListeners();
if (server.cluster_enabled) {
clusterInitListeners();
clusterInitLast();
}
InitServerLast();

View File

@ -738,6 +738,7 @@ struct RedisModuleCtx;
struct moduleLoadQueueEntry;
struct RedisModuleKeyOptCtx;
struct RedisModuleCommand;
struct clusterState;
/* Each module type implementation should export a set of methods in order
* to serialize and deserialize the value in the RDB file, rewrite the AOF
@ -2626,6 +2627,7 @@ void addReplySetLen(client *c, long length);
void addReplyAttributeLen(client *c, long length);
void addReplyPushLen(client *c, long length);
void addReplyHelp(client *c, const char **help);
void addExtendedReplyHelp(client *c, const char **help, const char **extended_help);
void addReplySubcommandSyntaxError(client *c);
void addReplyLoadedModules(client *c);
void copyReplicaOutputBuffer(client *dst, client *src);