# Return the entries in the CLUSTER LINKS output of the instance identified by
# `this_instance_id` that involve the peer identified by `peer_nodename`
proc get_links_with_peer {this_instance_id peer_nodename} {
    set links [R $this_instance_id cluster links]
    set links_with_peer {}
    foreach l $links {
        if {[dict get $l node] eq $peer_nodename} {
            lappend links_with_peer $l
        }
    }
    return $links_with_peer
}

# Return the entry in the CLUSTER LINKS output of the instance identified by
# `this_instance_id` that corresponds to the link established toward the peer
# identified by `peer_nodename`
proc get_link_to_peer {this_instance_id peer_nodename} {
    set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
    foreach l $links_with_peer {
        if {[dict get $l direction] eq "to"} {
            return $l
        }
    }
    return {}
}

# Return the entry in the CLUSTER LINKS output of the instance identified by
# `this_instance_id` that corresponds to the link accepted from the peer
# identified by `peer_nodename`
proc get_link_from_peer {this_instance_id peer_nodename} {
    set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
    foreach l $links_with_peer {
        if {[dict get $l direction] eq "from"} {
            return $l
        }
    }
    return {}
}
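
# Example usage of the helpers above (illustrative sketch; `peer_node_id` is
# assumed to hold a 40-char node id as returned by CLUSTER MYID on the peer):
#   set link [get_link_to_peer 0 $peer_node_id]
#   assert_equal "to" [dict get $link direction]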

# Reset cluster links to their original state
proc reset_links {id} {
    set limit [lindex [R $id CONFIG get cluster-link-sendbuf-limit] 1]

    # Set a 1-byte limit and wait for the cluster cron, which runs every
    # 100ms, to terminate the links for exceeding it
    R $id CONFIG SET cluster-link-sendbuf-limit 1
    after 150

    # Reset limit
    R $id CONFIG SET cluster-link-sendbuf-limit $limit

    # Wait until the cluster links come back up for each node
    wait_for_condition 50 100 {
        [number_of_links $id] == [expr [number_of_peers $id] * 2]
    } else {
        fail "Cluster links did not come back up"
    }
}

# Number of cluster peers of instance `id` (every node sees the same count,
# so `id` is unused)
proc number_of_peers {id} {
    expr [llength $::servers] - 1
}

# Number of entries in the CLUSTER LINKS output of instance `id`
proc number_of_links {id} {
    llength [R $id cluster links]
}

# Publish `num_msgs` messages of `msg_size` bytes each via `server`
proc publish_messages {server num_msgs msg_size} {
    for {set i 0} {$i < $num_msgs} {incr i} {
        $server PUBLISH channel [string repeat "x" $msg_size]
    }
}
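
# For reference, each parsed CLUSTER LINKS entry handled by the helpers above
# is a dict with the six fields asserted in "Validate cluster links format"
# below. An illustrative (made-up) entry:
#   direction to node 9fd8800b31d569538a2dfdd6ce17dc0fd1d9f1c1 \
#   create-time 1679986712604 events r \
#   send-buffer-allocated 4096 send-buffer-used 0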

start_cluster 1 2 {tags {external:skip cluster}} {
    set primary_id 0
    set replica1_id 1

    set primary [Rn $primary_id]
    set replica1 [Rn $replica1_id]

    test "Broadcast message across a cluster shard while a cluster link is down" {
        set replica1_node_id [$replica1 CLUSTER MYID]

        set channelname ch3

        # subscribe on replica1
        set subscribeclient1 [redis_deferring_client -1]
        $subscribeclient1 deferred 1
        $subscribeclient1 SSUBSCRIBE $channelname
        $subscribeclient1 read

        # subscribe on replica2
        set subscribeclient2 [redis_deferring_client -2]
        $subscribeclient2 deferred 1
        $subscribeclient2 SSUBSCRIBE $channelname
        $subscribeclient2 read

        # Verify number of links with cluster stable state
        assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id]

        # Disconnect the cluster link between the primary and replica1, then
        # publish a message while it is down.
        $primary MULTI
        $primary DEBUG CLUSTERLINK KILL TO $replica1_node_id
        $primary SPUBLISH $channelname hello
        set res [$primary EXEC]

        # Verify no client exists on the primary to receive the published message.
        assert_equal $res {OK 0}

        # Wait until all the cluster links are healthy
        wait_for_condition 50 100 {
            [number_of_peers $primary_id]*2 == [number_of_links $primary_id]
        } else {
            fail "All peer links couldn't be established"
        }

        # Publish a message afterwards.
        $primary SPUBLISH $channelname world

        # Verify replica1 has received only (world); (hello) was lost while the
        # link was down.
        assert_equal "smessage ch3 world" [$subscribeclient1 read]

        # Verify replica2 has received both messages (hello/world)
        assert_equal "smessage ch3 hello" [$subscribeclient2 read]
        assert_equal "smessage ch3 world" [$subscribeclient2 read]
    } {} {needs:debug}
}

start_cluster 3 0 {tags {external:skip cluster}} {
    test "Each node has two links with each peer" {
        for {set id 0} {$id < [llength $::servers]} {incr id} {
            # Assert that from the point of view of each node, there are two
            # links for each peer. It might take a while for the cluster to
            # stabilize so wait up to 5 seconds.
            wait_for_condition 50 100 {
                [number_of_peers $id]*2 == [number_of_links $id]
            } else {
                assert_equal [expr [number_of_peers $id]*2] [number_of_links $id]
            }

            set nodes [get_cluster_nodes $id]
            set links [R $id cluster links]

            # For each peer there should be exactly one
            # link "to" it and one link "from" it.
            foreach n $nodes {
                if {[cluster_has_flag $n myself]} continue
                set peer [dict get $n id]
                set to 0
                set from 0
                foreach l $links {
                    if {[dict get $l node] eq $peer} {
                        if {[dict get $l direction] eq "to"} {
                            incr to
                        } elseif {[dict get $l direction] eq "from"} {
                            incr from
                        }
                    }
                }
                assert {$to eq 1}
                assert {$from eq 1}
            }
        }
    }

    test {Validate cluster links format} {
        set lines [R 0 cluster links]
        foreach l $lines {
            if {$l eq {}} continue
            assert_equal [llength $l] 12
            assert_equal 1 [dict exists $l "direction"]
            assert_equal 1 [dict exists $l "node"]
            assert_equal 1 [dict exists $l "create-time"]
            assert_equal 1 [dict exists $l "events"]
            assert_equal 1 [dict exists $l "send-buffer-allocated"]
            assert_equal 1 [dict exists $l "send-buffer-used"]
        }
    }

    set primary1_id 0
    set primary2_id 1

    set primary1 [Rn $primary1_id]
    set primary2 [Rn $primary2_id]

    test "Disconnect link when send buffer limit reached" {
        # On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts
        set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
        $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]

        # Get primary1's links with primary2
        set primary2_name [dict get [cluster_get_myself $primary2_id] id]
        set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
        set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]

        # On primary1, set the cluster link send buffer limit to 256KB, which is large
        # enough not to be overflowed by regular gossip messages, but also small enough
        # that it doesn't take too much memory to overflow it. If it is set too high,
        # Redis may get OOM killed by the kernel before this limit is overflowed in
        # some RAM-limited test environments.
        set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
        $primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024]
        assert {[CI $primary1_id total_cluster_links_buffer_limit_exceeded] eq 0}

        # To manufacture an ever-growing send buffer from primary1 to primary2,
        # make primary2 unresponsive.
        set primary2_pid [srv [expr -1*$primary2_id] pid]
        exec kill -SIGSTOP $primary2_pid

        # On primary1, send 128KB Pubsub messages in a loop until the send buffer of the
        # link from primary1 to primary2 exceeds the buffer limit and the link is therefore
        # dropped. For the send buffer to grow, we first need to exhaust the TCP send buffer
        # of primary1 and the TCP receive buffer of primary2. The sizes of these two buffers
        # vary by OS, but 100 128KB messages should be sufficient.
        set i 0
        wait_for_condition 100 0 {
            [catch {incr i} e] == 0 &&
            [catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 &&
            [catch {after 500} e] == 0 &&
            [CI $primary1_id total_cluster_links_buffer_limit_exceeded] >= 1
        } else {
            fail "Cluster link not freed as expected"
        }

        # A new link to primary2 should have been recreated
        set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
        assert {[dict get $new_link_p1_to_p2 create-time] > [dict get $orig_link_p1_to_p2 create-time]}

        # The link from primary2 should not be affected
        set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
        assert {[dict get $same_link_p1_from_p2 create-time] eq [dict get $orig_link_p1_from_p2 create-time]}

        # Revive primary2
        exec kill -SIGCONT $primary2_pid

        # Reset configs on primary1 so config changes don't leak out to other tests
        $primary1 CONFIG set cluster-node-timeout $oldtimeout
        $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit

        reset_links $primary1_id
    }

    test "Link memory increases with publishes" {
        set server_id 0
        set server [Rn $server_id]
        set msg_size 10000
        set num_msgs 10

        # Remove any sendbuf limit
        $primary1 CONFIG set cluster-link-sendbuf-limit 0

        # Publish ~100KB to one of the servers
        $server MULTI
        $server INFO memory
        publish_messages $server $num_msgs $msg_size
        $server INFO memory
        set res [$server EXEC]

        set link_mem_before_pubs [getInfoProperty $res mem_cluster_links]

        # Remove the first half of the response string, which contains the
        # first "INFO memory" results, and search for the property again
        set res [string range $res [expr [string length $res] / 2] end]
        set link_mem_after_pubs [getInfoProperty $res mem_cluster_links]

        # We expect the memory to have increased by more than
        # the cumulative size of the publish messages
        set mem_diff_floor [expr $msg_size * $num_msgs]
        set mem_diff [expr $link_mem_after_pubs - $link_mem_before_pubs]
        assert {$mem_diff > $mem_diff_floor}

        # Reset links to ensure no leftover data for the next test
        reset_links $server_id
    }

    test "Link memory resets after publish messages flush" {
        set server [Rn 0]
        set msg_size 100000
        set num_msgs 10

        set link_mem_before [status $server mem_cluster_links]

        # Publish ~1MB to one of the servers
        $server MULTI
        publish_messages $server $num_msgs $msg_size
        $server EXEC

        # Wait until the cluster link memory has returned to below the pre-publish value.
        # We can't guarantee it returns to the exact same value since gossip messages
        # can cause the values to fluctuate.
        wait_for_condition 1000 500 {
            [status $server mem_cluster_links] <= $link_mem_before
        } else {
            fail "Cluster link memory did not settle back to expected range"
        }
    }
}