mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
TLS: Add missing redis-cli options. (#7456)
* Tests: fix and reintroduce redis-cli tests. These tests have been broken and disabled for 10 years now! * TLS: add remaining redis-cli support. This adds support for the redis-cli --pipe, --rdb and --replica options previously unsupported in --tls mode. * Fix writeConn().
This commit is contained in:
parent
b23e251036
commit
d9f970d8d3
102
src/redis-cli.c
102
src/redis-cli.c
@ -1989,6 +1989,7 @@ static void repl(void) {
|
||||
|
||||
if (argv == NULL) {
|
||||
printf("Invalid argument(s)\n");
|
||||
fflush(stdout);
|
||||
linenoiseFree(line);
|
||||
continue;
|
||||
} else if (argc > 0) {
|
||||
@ -6784,10 +6785,53 @@ void sendCapa() {
|
||||
sendReplconf("capa", "eof");
|
||||
}
|
||||
|
||||
/* Wrapper around hiredis to allow arbitrary reads and writes.
|
||||
*
|
||||
* We piggybacks on top of hiredis to achieve transparent TLS support,
|
||||
* and use its internal buffers so it can co-exist with commands
|
||||
* previously/later issued on the connection.
|
||||
*
|
||||
* Interface is close to enough to read()/write() so things should mostly
|
||||
* work transparently.
|
||||
*/
|
||||
|
||||
/* Write a raw buffer through a redisContext. If we already have something
|
||||
* in the buffer (leftovers from hiredis operations) it will be written
|
||||
* as well.
|
||||
*/
|
||||
static ssize_t writeConn(redisContext *c, const char *buf, size_t buf_len)
|
||||
{
|
||||
int done = 0;
|
||||
|
||||
c->obuf = sdscatlen(c->obuf, buf, buf_len);
|
||||
if (redisBufferWrite(c, &done) == REDIS_ERR) {
|
||||
sdsrange(c->obuf, 0, -(buf_len+1));
|
||||
if (!(c->flags & REDIS_BLOCK))
|
||||
errno = EAGAIN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t left = sdslen(c->obuf);
|
||||
sdsclear(c->obuf);
|
||||
if (!done) {
|
||||
return buf_len - left;
|
||||
}
|
||||
|
||||
return buf_len;
|
||||
}
|
||||
|
||||
/* Read raw bytes through a redisContext. The read operation is not greedy
|
||||
* and may not fill the buffer entirely.
|
||||
*/
|
||||
static ssize_t readConn(redisContext *c, char *buf, size_t len)
|
||||
{
|
||||
return c->funcs->read(c, buf, len);
|
||||
}
|
||||
|
||||
/* Sends SYNC and reads the number of bytes in the payload. Used both by
|
||||
* slaveMode() and getRDB().
|
||||
* returns 0 in case an EOF marker is used. */
|
||||
unsigned long long sendSync(int fd, char *out_eof) {
|
||||
unsigned long long sendSync(redisContext *c, char *out_eof) {
|
||||
/* To start we need to send the SYNC command and return the payload.
|
||||
* The hiredis client lib does not understand this part of the protocol
|
||||
* and we don't want to mess with its buffers, so everything is performed
|
||||
@ -6796,7 +6840,7 @@ unsigned long long sendSync(int fd, char *out_eof) {
|
||||
ssize_t nread;
|
||||
|
||||
/* Send the SYNC command. */
|
||||
if (write(fd,"SYNC\r\n",6) != 6) {
|
||||
if (writeConn(c, "SYNC\r\n", 6) != 6) {
|
||||
fprintf(stderr,"Error writing to master\n");
|
||||
exit(1);
|
||||
}
|
||||
@ -6804,7 +6848,7 @@ unsigned long long sendSync(int fd, char *out_eof) {
|
||||
/* Read $<payload>\r\n, making sure to read just up to "\n" */
|
||||
p = buf;
|
||||
while(1) {
|
||||
nread = read(fd,p,1);
|
||||
nread = readConn(c,p,1);
|
||||
if (nread <= 0) {
|
||||
fprintf(stderr,"Error reading bulk length while SYNCing\n");
|
||||
exit(1);
|
||||
@ -6825,11 +6869,10 @@ unsigned long long sendSync(int fd, char *out_eof) {
|
||||
}
|
||||
|
||||
static void slaveMode(void) {
|
||||
int fd = context->fd;
|
||||
static char eofmark[RDB_EOF_MARK_SIZE];
|
||||
static char lastbytes[RDB_EOF_MARK_SIZE];
|
||||
static int usemark = 0;
|
||||
unsigned long long payload = sendSync(fd, eofmark);
|
||||
unsigned long long payload = sendSync(context,eofmark);
|
||||
char buf[1024];
|
||||
int original_output = config.output;
|
||||
|
||||
@ -6849,7 +6892,7 @@ static void slaveMode(void) {
|
||||
while(payload) {
|
||||
ssize_t nread;
|
||||
|
||||
nread = read(fd,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
|
||||
nread = readConn(context,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
|
||||
if (nread <= 0) {
|
||||
fprintf(stderr,"Error reading RDB payload while SYNCing\n");
|
||||
exit(1);
|
||||
@ -6892,14 +6935,15 @@ static void slaveMode(void) {
|
||||
/* This function implements --rdb, so it uses the replication protocol in order
|
||||
* to fetch the RDB file from a remote server. */
|
||||
static void getRDB(clusterManagerNode *node) {
|
||||
int s, fd;
|
||||
int fd;
|
||||
redisContext *s;
|
||||
char *filename;
|
||||
if (node != NULL) {
|
||||
assert(node->context);
|
||||
s = node->context->fd;
|
||||
s = node->context;
|
||||
filename = clusterManagerGetNodeRDBFilename(node);
|
||||
} else {
|
||||
s = context->fd;
|
||||
s = context;
|
||||
filename = config.rdb_filename;
|
||||
}
|
||||
static char eofmark[RDB_EOF_MARK_SIZE];
|
||||
@ -6934,7 +6978,7 @@ static void getRDB(clusterManagerNode *node) {
|
||||
while(payload) {
|
||||
ssize_t nread, nwritten;
|
||||
|
||||
nread = read(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
|
||||
nread = readConn(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
|
||||
if (nread <= 0) {
|
||||
fprintf(stderr,"I/O Error reading RDB payload from socket\n");
|
||||
exit(1);
|
||||
@ -6968,7 +7012,7 @@ static void getRDB(clusterManagerNode *node) {
|
||||
} else {
|
||||
fprintf(stderr,"Transfer finished with success.\n");
|
||||
}
|
||||
close(s); /* Close the file descriptor ASAP as fsync() may take time. */
|
||||
redisFree(s); /* Close the file descriptor ASAP as fsync() may take time. */
|
||||
fsync(fd);
|
||||
close(fd);
|
||||
fprintf(stderr,"Transfer finished with success.\n");
|
||||
@ -6985,11 +7029,9 @@ static void getRDB(clusterManagerNode *node) {
|
||||
|
||||
#define PIPEMODE_WRITE_LOOP_MAX_BYTES (128*1024)
|
||||
static void pipeMode(void) {
|
||||
int fd = context->fd;
|
||||
long long errors = 0, replies = 0, obuf_len = 0, obuf_pos = 0;
|
||||
char ibuf[1024*16], obuf[1024*16]; /* Input and output buffers */
|
||||
char obuf[1024*16]; /* Output buffer */
|
||||
char aneterr[ANET_ERR_LEN];
|
||||
redisReader *reader = redisReaderCreate();
|
||||
redisReply *reply;
|
||||
int eof = 0; /* True once we consumed all the standard input. */
|
||||
int done = 0;
|
||||
@ -6999,47 +7041,38 @@ static void pipeMode(void) {
|
||||
srand(time(NULL));
|
||||
|
||||
/* Use non blocking I/O. */
|
||||
if (anetNonBlock(aneterr,fd) == ANET_ERR) {
|
||||
if (anetNonBlock(aneterr,context->fd) == ANET_ERR) {
|
||||
fprintf(stderr, "Can't set the socket in non blocking mode: %s\n",
|
||||
aneterr);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
context->flags &= ~REDIS_BLOCK;
|
||||
|
||||
/* Transfer raw protocol and read replies from the server at the same
|
||||
* time. */
|
||||
while(!done) {
|
||||
int mask = AE_READABLE;
|
||||
|
||||
if (!eof || obuf_len != 0) mask |= AE_WRITABLE;
|
||||
mask = aeWait(fd,mask,1000);
|
||||
mask = aeWait(context->fd,mask,1000);
|
||||
|
||||
/* Handle the readable state: we can read replies from the server. */
|
||||
if (mask & AE_READABLE) {
|
||||
ssize_t nread;
|
||||
int read_error = 0;
|
||||
|
||||
/* Read from socket and feed the hiredis reader. */
|
||||
do {
|
||||
nread = read(fd,ibuf,sizeof(ibuf));
|
||||
if (nread == -1 && errno != EAGAIN && errno != EINTR) {
|
||||
fprintf(stderr, "Error reading from the server: %s\n",
|
||||
strerror(errno));
|
||||
if (!read_error && redisBufferRead(context) == REDIS_ERR) {
|
||||
read_error = 1;
|
||||
break;
|
||||
}
|
||||
if (nread > 0) {
|
||||
redisReaderFeed(reader,ibuf,nread);
|
||||
last_read_time = time(NULL);
|
||||
}
|
||||
} while(nread > 0);
|
||||
|
||||
/* Consume replies. */
|
||||
do {
|
||||
if (redisReaderGetReply(reader,(void**)&reply) == REDIS_ERR) {
|
||||
reply = NULL;
|
||||
if (redisGetReply(context, (void **) &reply) == REDIS_ERR) {
|
||||
fprintf(stderr, "Error reading replies from server\n");
|
||||
exit(1);
|
||||
}
|
||||
if (reply) {
|
||||
last_read_time = time(NULL);
|
||||
if (reply->type == REDIS_REPLY_ERROR) {
|
||||
fprintf(stderr,"%s\n", reply->str);
|
||||
errors++;
|
||||
@ -7072,7 +7105,7 @@ static void pipeMode(void) {
|
||||
while(1) {
|
||||
/* Transfer current buffer to server. */
|
||||
if (obuf_len != 0) {
|
||||
ssize_t nwritten = write(fd,obuf+obuf_pos,obuf_len);
|
||||
ssize_t nwritten = writeConn(context,obuf+obuf_pos,obuf_len);
|
||||
|
||||
if (nwritten == -1) {
|
||||
if (errno != EAGAIN && errno != EINTR) {
|
||||
@ -7088,6 +7121,10 @@ static void pipeMode(void) {
|
||||
loop_nwritten += nwritten;
|
||||
if (obuf_len != 0) break; /* Can't accept more data. */
|
||||
}
|
||||
if (context->err) {
|
||||
fprintf(stderr, "Server I/O Error: %s\n", context->errstr);
|
||||
exit(1);
|
||||
}
|
||||
/* If buffer is empty, load from stdin. */
|
||||
if (obuf_len == 0 && !eof) {
|
||||
ssize_t nread = read(STDIN_FILENO,obuf,sizeof(obuf));
|
||||
@ -7138,7 +7175,6 @@ static void pipeMode(void) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
redisReaderFree(reader);
|
||||
printf("errors: %lld, replies: %lld\n", errors, replies);
|
||||
if (errors)
|
||||
exit(1);
|
||||
|
@ -1,14 +1,13 @@
|
||||
source tests/support/cli.tcl
|
||||
|
||||
start_server {tags {"cli"}} {
|
||||
proc open_cli {} {
|
||||
proc open_cli {{opts "-n 9"}} {
|
||||
set ::env(TERM) dumb
|
||||
set cmdline [rediscli [srv port] "-n 9"]
|
||||
set cmdline [rediscli [srv port] $opts]
|
||||
set fd [open "|$cmdline" "r+"]
|
||||
fconfigure $fd -buffering none
|
||||
fconfigure $fd -blocking false
|
||||
fconfigure $fd -translation binary
|
||||
assert_equal "redis> " [read_cli $fd]
|
||||
set _ $fd
|
||||
}
|
||||
|
||||
@ -32,11 +31,14 @@ start_server {tags {"cli"}} {
|
||||
}
|
||||
|
||||
# Helpers to run tests in interactive mode
|
||||
|
||||
proc format_output {output} {
|
||||
set _ [string trimright [regsub -all "\r" $output ""] "\n"]
|
||||
}
|
||||
|
||||
proc run_command {fd cmd} {
|
||||
write_cli $fd $cmd
|
||||
set lines [split [read_cli $fd] "\n"]
|
||||
assert_equal "redis> " [lindex $lines end]
|
||||
join [lrange $lines 0 end-1] "\n"
|
||||
set _ [format_output [read_cli $fd]]
|
||||
}
|
||||
|
||||
proc test_interactive_cli {name code} {
|
||||
@ -58,7 +60,7 @@ start_server {tags {"cli"}} {
|
||||
|
||||
proc _run_cli {opts args} {
|
||||
set cmd [rediscli [srv port] [list -n 9 {*}$args]]
|
||||
foreach {key value} $args {
|
||||
foreach {key value} $opts {
|
||||
if {$key eq "pipe"} {
|
||||
set cmd "sh -c \"$value | $cmd\""
|
||||
}
|
||||
@ -72,7 +74,7 @@ start_server {tags {"cli"}} {
|
||||
fconfigure $fd -translation binary
|
||||
set resp [read $fd 1048576]
|
||||
close $fd
|
||||
set _ $resp
|
||||
set _ [format_output $resp]
|
||||
}
|
||||
|
||||
proc run_cli {args} {
|
||||
@ -80,11 +82,11 @@ start_server {tags {"cli"}} {
|
||||
}
|
||||
|
||||
proc run_cli_with_input_pipe {cmd args} {
|
||||
_run_cli [list pipe $cmd] {*}$args
|
||||
_run_cli [list pipe $cmd] -x {*}$args
|
||||
}
|
||||
|
||||
proc run_cli_with_input_file {path args} {
|
||||
_run_cli [list path $path] {*}$args
|
||||
_run_cli [list path $path] -x {*}$args
|
||||
}
|
||||
|
||||
proc test_nontty_cli {name code} {
|
||||
@ -101,7 +103,7 @@ start_server {tags {"cli"}} {
|
||||
test_interactive_cli "INFO response should be printed raw" {
|
||||
set lines [split [run_command $fd info] "\n"]
|
||||
foreach line $lines {
|
||||
assert [regexp {^[a-z0-9_]+:[a-z0-9_]+} $line]
|
||||
assert [regexp {^$|^#|^[a-z0-9_]+:.+} $line]
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,7 +123,7 @@ start_server {tags {"cli"}} {
|
||||
test_interactive_cli "Multi-bulk reply" {
|
||||
r rpush list foo
|
||||
r rpush list bar
|
||||
assert_equal "1. \"foo\"\n2. \"bar\"" [run_command $fd "lrange list 0 -1"]
|
||||
assert_equal "1) \"foo\"\n2) \"bar\"" [run_command $fd "lrange list 0 -1"]
|
||||
}
|
||||
|
||||
test_interactive_cli "Parsing quotes" {
|
||||
@ -144,35 +146,35 @@ start_server {tags {"cli"}} {
|
||||
}
|
||||
|
||||
test_tty_cli "Status reply" {
|
||||
assert_equal "OK\n" [run_cli set key bar]
|
||||
assert_equal "OK" [run_cli set key bar]
|
||||
assert_equal "bar" [r get key]
|
||||
}
|
||||
|
||||
test_tty_cli "Integer reply" {
|
||||
r del counter
|
||||
assert_equal "(integer) 1\n" [run_cli incr counter]
|
||||
assert_equal "(integer) 1" [run_cli incr counter]
|
||||
}
|
||||
|
||||
test_tty_cli "Bulk reply" {
|
||||
r set key "tab\tnewline\n"
|
||||
assert_equal "\"tab\\tnewline\\n\"\n" [run_cli get key]
|
||||
assert_equal "\"tab\\tnewline\\n\"" [run_cli get key]
|
||||
}
|
||||
|
||||
test_tty_cli "Multi-bulk reply" {
|
||||
r del list
|
||||
r rpush list foo
|
||||
r rpush list bar
|
||||
assert_equal "1. \"foo\"\n2. \"bar\"\n" [run_cli lrange list 0 -1]
|
||||
assert_equal "1) \"foo\"\n2) \"bar\"" [run_cli lrange list 0 -1]
|
||||
}
|
||||
|
||||
test_tty_cli "Read last argument from pipe" {
|
||||
assert_equal "OK\n" [run_cli_with_input_pipe "echo foo" set key]
|
||||
assert_equal "OK" [run_cli_with_input_pipe "echo foo" set key]
|
||||
assert_equal "foo\n" [r get key]
|
||||
}
|
||||
|
||||
test_tty_cli "Read last argument from file" {
|
||||
set tmpfile [write_tmpfile "from file"]
|
||||
assert_equal "OK\n" [run_cli_with_input_file $tmpfile set key]
|
||||
assert_equal "OK" [run_cli_with_input_file $tmpfile set key]
|
||||
assert_equal "from file" [r get key]
|
||||
}
|
||||
|
||||
@ -188,7 +190,7 @@ start_server {tags {"cli"}} {
|
||||
|
||||
test_nontty_cli "Bulk reply" {
|
||||
r set key "tab\tnewline\n"
|
||||
assert_equal "tab\tnewline\n" [run_cli get key]
|
||||
assert_equal "tab\tnewline" [run_cli get key]
|
||||
}
|
||||
|
||||
test_nontty_cli "Multi-bulk reply" {
|
||||
@ -208,4 +210,79 @@ start_server {tags {"cli"}} {
|
||||
assert_equal "OK" [run_cli_with_input_file $tmpfile set key]
|
||||
assert_equal "from file" [r get key]
|
||||
}
|
||||
|
||||
proc test_redis_cli_rdb_dump {} {
|
||||
r flushdb
|
||||
|
||||
set dir [lindex [r config get dir] 1]
|
||||
|
||||
assert_equal "OK" [r debug populate 100000 key 1000]
|
||||
catch {run_cli --rdb "$dir/cli.rdb"} output
|
||||
assert_match {*Transfer finished with success*} $output
|
||||
|
||||
file delete "$dir/dump.rdb"
|
||||
file rename "$dir/cli.rdb" "$dir/dump.rdb"
|
||||
|
||||
assert_equal "OK" [r set should-not-exist 1]
|
||||
assert_equal "OK" [r debug reload nosave]
|
||||
assert_equal {} [r get should-not-exist]
|
||||
}
|
||||
|
||||
test_nontty_cli "Dumping an RDB" {
|
||||
# Disk-based master
|
||||
assert_match "OK" [r config set repl-diskless-sync no]
|
||||
test_redis_cli_rdb_dump
|
||||
|
||||
# Disk-less master
|
||||
assert_match "OK" [r config set repl-diskless-sync yes]
|
||||
assert_match "OK" [r config set repl-diskless-sync-delay 0]
|
||||
test_redis_cli_rdb_dump
|
||||
}
|
||||
|
||||
test_nontty_cli "Connecting as a replica" {
|
||||
set fd [open_cli "--replica"]
|
||||
wait_for_condition 50 500 {
|
||||
[string match {*slave0:*state=online*} [r info]]
|
||||
} else {
|
||||
fail "redis-cli --replica did not connect"
|
||||
}
|
||||
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
r set test-key test-value-$i
|
||||
}
|
||||
r client kill type slave
|
||||
catch {
|
||||
assert_match {*SET*key-a*} [read_cli $fd]
|
||||
}
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test_nontty_cli "Piping raw protocol" {
|
||||
set fd [open_cli "--pipe"]
|
||||
fconfigure $fd -blocking true
|
||||
|
||||
# Create a new deferring client and overwrite its fd
|
||||
set client [redis [srv 0 "host"] [srv 0 "port"] 1 0]
|
||||
set ::redis::fd($::redis::id) $fd
|
||||
$client select 9
|
||||
|
||||
r del test-counter
|
||||
for {set i 0} {$i < 10000} {incr i} {
|
||||
$client incr test-counter
|
||||
$client set large-key [string repeat "x" 20000]
|
||||
}
|
||||
|
||||
for {set i 0} {$i < 1000} {incr i} {
|
||||
$client set very-large-key [string repeat "x" 512000]
|
||||
}
|
||||
|
||||
close $fd write
|
||||
set output [read_cli $fd]
|
||||
|
||||
assert_equal {10000} [r get test-counter]
|
||||
assert_match {*All data transferred*errors: 0*replies: 21001*} $output
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ set ::all_tests {
|
||||
integration/psync2
|
||||
integration/psync2-reg
|
||||
integration/psync2-pingoff
|
||||
integration/redis-cli
|
||||
unit/pubsub
|
||||
unit/slowlog
|
||||
unit/scripting
|
||||
|
Loading…
Reference in New Issue
Block a user