2024-03-21 05:56:59 -04:00
# Copyright (C) 2009-2014 Salvatore Sanfilippo
2024-03-21 09:30:47 -04:00
# SPDX-FileCopyrightText: 2024 Redict Contributors
# SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo <antirez at gmail dot com>
2024-03-21 05:56:59 -04:00
#
2024-03-21 09:30:47 -04:00
# SPDX-License-Identifier: BSD-3-Clause
2024-03-21 15:11:44 -04:00
# SPDX-License-Identifier: LGPL-3.0-only
2024-03-21 05:56:59 -04:00
package require Tcl 8.5
package provide redict 0.1
source [ file join [ file dirname [ info script] ] " r e s p o n s e _ t r a n s f o r m e r s . t c l " ]
# Client state lives in the ::redict namespace. Apart from the id counter,
# every variable below is an array indexed by the connection id.
namespace eval redict {}

# Monotonic counter; incremented each time a new client is created.
set ::redict::id 0

# fd            - channel of the connection (empty string once the link is lost)
# addr          - [list server port], used to reconnect
# blocking      - 1 when replies are read synchronously
# deferred      - 1 when replies are not read right after sending a command
# readraw       - 1 when replies are returned as raw protocol lines
# attributes    - Holds the RESP3 attributes from the last call
# reconnect     - 1 when a lost link should be transparently re-established
# tls           - 1 when the connection uses TLS
# callback      - pending callbacks used in non-blocking mode
# state         - State in non-blocking reply reading
# statestack    - Stack of states, for nested mbulks
# curr_argv     - Remember the current argv, to be used in response_transformers.tcl
# testing_resp3 - Indicating if the current client is using RESP3 (only if the
#                 test is trying to test RESP3 specific behavior. It won't be
#                 on in case of force_resp3)
foreach slot {
    fd addr blocking deferred readraw attributes reconnect tls callback
    state statestack curr_argv testing_resp3
} {
    array set ::redict::$slot {}
}

set ::force_resp3 0
set ::log_req_res 0
# Open a new client connection and register its per-connection state.
#
# Arguments:
#   server, port - address to connect to
#   defer        - initial value of the "deferred" flag
#   tls          - when true, connect through the tls package using the
#                  certificates found under $::tlsdir
#   tlsoptions   - extra options appended to the ::tls::init call
#   readraw      - initial value of the "readraw" flag
#
# The last command creates the alias ::redict::redictHandle<id>, which forwards
# to ::redict::__dispatch__ <id>; its result is the result of this proc.
proc redict {{server 127.0.0.1} {port 6379} {defer 0} {tls 0} {tlsoptions {}} {readraw 0}} {
    if {!$tls} {
        set sock [socket $server $port]
    } else {
        package require tls
        ::tls::init \
            -cafile "$::tlsdir/ca.crt" \
            -certfile "$::tlsdir/client.crt" \
            -keyfile "$::tlsdir/client.key" \
            {*}$tlsoptions
        set sock [::tls::socket $server $port]
    }
    fconfigure $sock -translation binary

    set id [incr ::redict::id]
    foreach {slot value} [list \
        fd            $sock \
        addr          [list $server $port] \
        blocking      1 \
        deferred      $defer \
        readraw       $readraw \
        reconnect     0 \
        curr_argv     0 \
        testing_resp3 0 \
        tls           $tls \
    ] {
        set ::redict::${slot}($id) $value
    }
    ::redict::redict_reset_state $id
    interp alias {} ::redict::redictHandle$id {} ::redict::__dispatch__ $id
}
# On recent versions of tcl-tls/OpenSSL, reading from a dropped connection
# results with an error we need to catch and mimic the old behavior.
#
# Reads $len bytes from $fd, or everything available when $len is -1.
# A read error whose message matches "*connection abort*" is reported as an
# empty result; any other error is re-raised with the same message.
proc ::redict::redict_safe_read {fd len} {
    set readcmd [list read $fd]
    if {$len != -1} {
        lappend readcmd $len
    }
    if {[catch $readcmd result] == 0} {
        return $result
    }
    if {[string match "*connection abort*" $result]} {
        return {}
    }
    error $result
}
# Line-oriented counterpart of redict_safe_read: returns the next line read
# from $fd with gets. An error matching "*connection abort*" yields an empty
# result; any other error is re-raised with the same message.
proc ::redict::redict_safe_gets {fd} {
    if {![catch {gets $fd} line]} {
        return $line
    }
    if {[string match "*connection abort*" $line]} {
        return {}
    }
    error $line
}
# This is a wrapper to the actual dispatching procedure that handles
# reconnection if needed.
#
# The command is attempted once. If it failed, the connection has the
# reconnect flag set and its fd was cleared (link lost), it is attempted a
# second time. The completion code and result of the last attempt are returned
# to the caller unchanged.
proc ::redict::__dispatch__ {id method args} {
    set status [catch {::redict::__dispatch__raw__ $id $method $args} outcome]
    # The && chain is evaluated lazily: the per-connection arrays are only
    # consulted when the first attempt actually failed.
    if {$status != 0 && $::redict::reconnect($id) && $::redict::fd($id) eq {}} {
        # Try again if the connection was lost.
        # FIXME: we don't re-select the previously selected DB, nor we check
        # if we are inside a transaction that needs to be re-issued from
        # scratch.
        set status [catch {::redict::__dispatch__raw__ $id $method $args} outcome]
    }
    return -code $status $outcome
}
# Execute one client call: either a local pseudo-method (a proc named
# ::redict::__method__<method>) or a command sent to the server.
#
# Arguments:
#   id     - connection id
#   method - method or command name
#   argv   - list of arguments; in non-blocking mode the last element is the
#            callback to invoke with the reply
#
# Returns the reply when the connection is blocking and not deferred, the
# pseudo-method's result for local methods, and an empty string otherwise.
# Raises "I/O error reading reply" (and clears the stored fd) when flushing
# the command fails.
proc ::redict::__dispatch__raw__ {id method argv} {
    set fd $::redict::fd($id)

    # Reconnect the link if needed.
    if {$fd eq {} && $method ne {close}} {
        lassign $::redict::addr($id) host port
        if {$::redict::tls($id)} {
            set fd [::tls::socket $host $port]
        } else {
            set fd [socket $host $port]
        }
        set ::redict::fd($id) $fd
        fconfigure $fd -translation binary
    }

    # Transform HELLO 2 to HELLO 3 if force_resp3
    # Also set the connection var testing_resp3 in case of HELLO 3
    if {[llength $argv] > 0 && [string equal -nocase $method "HELLO"]} {
        set wants_resp3 [expr {[lindex $argv 0] == 3}]
        set ::redict::testing_resp3($id) $wants_resp3
        if {!$wants_resp3 && $::force_resp3} {
            # If we are in force_resp3 we run HELLO 3 instead of HELLO 2
            lset argv 0 3
        }
    }

    set blocking $::redict::blocking($id)
    set deferred $::redict::deferred($id)
    if {$blocking == 0} {
        # The trailing argument is the reply callback; it is split off before
        # the method lookup below, so it applies to pseudo-methods as well.
        if {[llength $argv] == 0} {
            error "Please provide a callback in non-blocking mode"
        }
        set callback [lindex $argv end]
        set argv [lrange $argv 0 end-1]
    }

    # Local pseudo-methods are run in the caller's frame.
    if {[info command ::redict::__method__$method] ne {}} {
        return [uplevel 1 [list ::redict::__method__$method $id $fd] $argv]
    }

    # Anything else is encoded as a multi-bulk request and sent to the server.
    catch {unset ::redict::attributes($id)}
    set cmd "*[expr {[llength $argv] + 1}]\r\n"
    foreach word [linsert $argv 0 $method] {
        append cmd "$[string length $word]\r\n$word\r\n"
    }
    ::redict::redict_write $fd $cmd
    if {[catch {flush $fd}]} {
        catch {close $fd}
        set ::redict::fd($id) {}
        return -code error "I/O error reading reply"
    }

    set ::redict::curr_argv($id) [concat $method $argv]
    if {$deferred} {
        return
    }
    if {$blocking} {
        return [::redict::redict_read_reply $id $fd]
    }
    # Every well formed reply read will pop an element from this
    # list and use it as a callback. So pipelining is supported
    # in non blocking mode.
    lappend ::redict::callback($id) $callback
    fileevent $fd readable [list ::redict::redict_readable $fd $id]
}
# Pseudo-method "blocking": records the blocking flag for connection $id and
# applies the same value to the channel's -blocking option.
proc ::redict::__method__blocking {id fd val} {
    set ::redict::blocking($id) $val
    fconfigure $fd -blocking $val
}
# Pseudo-method "reconnect": stores the reconnect flag for connection $id.
# The flag is consulted by ::redict::__dispatch__ after a failed call.
proc ::redict::__method__reconnect {id fd val} {
    set ::redict::reconnect($id) $val
}
# Pseudo-method "read": reads one reply from the connection and returns it,
# by delegating to ::redict::redict_read_reply.
proc ::redict::__method__read {id fd} {
    return [::redict::redict_read_reply $id $fd]
}
# Pseudo-method "rawread": returns raw data read from the channel through
# redict_safe_read. With the default len of -1 everything available is read.
proc ::redict::__method__rawread {id fd {len -1}} {
    set data [redict_safe_read $fd $len]
    return $data
}
# Pseudo-method "write": hands $buf to ::redict::redict_write for channel $fd.
# No flush is performed here.
proc ::redict::__method__write {id fd buf} {
    ::redict::redict_write $fd $buf
}
# Pseudo-method "flush": flushes buffered output of the connection's channel.
proc ::redict::__method__flush {id fd} {
    flush $fd
}
# Pseudo-method "close": closes the channel, drops every piece of state kept
# for connection $id and removes its handle alias. Each step is best-effort;
# failures (already closed channel, missing state entry or alias) are ignored.
proc ::redict::__method__close {id fd} {
    catch {close $fd}
    foreach slot {
        fd addr blocking deferred readraw attributes reconnect tls
        state statestack callback curr_argv testing_resp3
    } {
        catch {unset ::redict::${slot}($id)}
    }
    catch {interp alias {} ::redict::redictHandle$id {}}
}
# Pseudo-method "channel": returns the channel passed in as $fd.
proc ::redict::__method__channel {id fd} {
    return $fd
}
# Pseudo-method "deferred": stores the deferred flag for connection $id.
# While the flag is true, __dispatch__raw__ sends commands without reading
# their replies.
proc ::redict::__method__deferred {id fd val} {
    set ::redict::deferred($id) $val
}
# Pseudo-method "readraw": stores the readraw flag for connection $id. While
# the flag is true, redict_read_reply_logic returns protocol lines unparsed.
proc ::redict::__method__readraw {id fd val} {
    set ::redict::readraw($id) $val
}
# Pseudo-method "readingraw": returns the current readraw flag of connection
# $id.
proc ::redict::__method__readingraw {id fd} {
    set flag $::redict::readraw($id)
    return $flag
}
# Pseudo-method "attributes": returns the RESP3 attributes stored for
# connection $id. Raises an error when no attributes are stored, because
# __dispatch__raw__ unsets the entry before each command is sent.
proc ::redict::__method__attributes {id fd} {
    return $::redict::attributes($id)
}
# Write $buf to channel $fd exactly as given (no trailing newline is added).
# The data is not flushed here.
proc ::redict::redict_write {fd buf} {
    puts -nonewline $fd $buf
}
# Write $buf followed by a CR LF terminator to $fd, then flush the channel.
proc ::redict::redict_writenl {fd buf} {
    foreach chunk [list $buf "\r\n"] {
        redict_write $fd $chunk
    }
    flush $fd
}
# Read a payload of $len bytes from $fd and consume the two bytes after it.
# Returns only the payload.
proc ::redict::redict_readnl {fd len} {
    set payload [redict_safe_read $fd $len]
    # discard CR LF
    redict_safe_read $fd 2
    return $payload
}
# Read a bulk string: a length line followed by that many bytes and CR LF.
# A length of -1 yields an empty result without reading any payload.
proc ::redict::redict_bulk_read {fd} {
    set count [redict_read_line $fd]
    if {$count == -1} {
        return {}
    }
    return [redict_readnl $fd $count]
}
# Read an aggregate reply: a count line followed by that many nested replies.
# Returns the list of elements, or an empty result for a count of -1.
#
# All elements are consumed even when some of them are error replies, so the
# stream stays in sync; the first non-empty error message is then raised.
proc ::redict::redict_multi_bulk_read {id fd} {
    set count [redict_read_line $fd]
    if {$count == -1} {
        return {}
    }
    set items {}
    set first_error {}
    for {set i 0} {$i < $count} {incr i} {
        if {[catch {redict_read_reply_logic $id $fd} item]} {
            if {$first_error eq {}} {
                set first_error $item
            }
        } else {
            lappend items $item
        }
    }
    if {$first_error ne {}} {
        return -code error $first_error
    }
    return $items
}
# Read a map reply: a count line followed by that many key/value pairs, each
# item being a nested reply. Returns a dict, or an empty result for a count
# of -1.
#
# Reading continues after a failing pair; the first non-empty error message is
# raised once the loop has finished.
proc ::redict::redict_read_map {id fd} {
    set count [redict_read_line $fd]
    if {$count == -1} {
        return {}
    }
    set result {}
    set first_error {}
    for {set i 0} {$i < $count} {incr i} {
        set failed [catch {
            set key [redict_read_reply_logic $id $fd]
            set value [redict_read_reply_logic $id $fd]
            dict set result $key $value
        } message]
        if {$failed && $first_error eq {}} {
            set first_error $message
        }
    }
    if {$first_error ne {}} {
        return -code error $first_error
    }
    return $result
}
# Read one line from $fd and return it with leading and trailing whitespace
# (including the CR left over by binary translation) removed.
proc ::redict::redict_read_line {fd} {
    set raw [redict_safe_gets $fd]
    return [string trim $raw]
}
# Consume the remainder of a null reply line and return an empty result.
proc ::redict::redict_read_null {fd} {
    # The line content is irrelevant; it only has to be taken off the stream.
    redict_safe_gets $fd
    return {}
}
# Read a RESP3 boolean: the line "t" maps to 1 and "f" maps to 0. Any other
# content is reported as a protocol error.
proc ::redict::redict_read_bool {fd} {
    set v [redict_read_line $fd]
    switch -exact -- $v {
        t { return 1 }
        f { return 0 }
    }
    return -code error "Bad protocol, '$v' as bool type"
}
# Read a RESP3 double.
#
# unlike many other DTs, there is a textual difference between double and a
# string with the same value, so we need to transform to double if we are
# testing RESP3 (i.e. some tests check that a double reply is "1.0" and not
# "1"). When the reply is going to be translated back to RESP2 the text is
# returned untouched.
proc ::redict::redict_read_double {id fd} {
    set v [redict_read_line $fd]
    if {![should_transform_to_resp2 $id]} {
        return [expr {double($v)}]
    }
    return $v
}
# Read a RESP3 verbatim string: a bulk payload whose first 4 chars ("txt:")
# are a format prefix that is stripped from the result.
proc ::redict::redict_read_verbatim_str {fd} {
    return [string range [redict_bulk_read $fd] 4 end]
}
# Read one complete reply from $fd and return it as a Tcl value.
#
# In readraw mode a single protocol line is returned unparsed. Otherwise the
# first byte selects the decoder. An attribute frame ('|') is stored in
# ::redict::attributes($id) and the reply that follows it is then read.
#
# Raises an error for error replies ('-'), for an unknown type byte, and for
# an empty read; in the latter case the channel is closed and the stored fd
# is cleared.
proc ::redict::redict_read_reply_logic {id fd} {
    if {$::redict::readraw($id)} {
        return [redict_read_line $fd]
    }

    while {1} {
        set type [redict_safe_read $fd 1]
        if {$type eq {}} {
            # Nothing could be read: treat the link as lost.
            catch {close $fd}
            set ::redict::fd($id) {}
            return -code error "I/O error reading reply"
        }
        # NOTE: every first word below is a switch pattern, including "#"
        # (the boolean type byte), which is NOT a comment in this position.
        switch -exact -- $type {
            _   { return [redict_read_null $fd] }
            :   -
            (   -
            +   { return [redict_read_line $fd] }
            ,   { return [redict_read_double $id $fd] }
            "#" { return [redict_read_bool $fd] }
            =   { return [redict_read_verbatim_str $fd] }
            -   { return -code error [redict_read_line $fd] }
            $   { return [redict_bulk_read $fd] }
            >   -
            ~   -
            *   { return [redict_multi_bulk_read $id $fd] }
            %   { return [redict_read_map $id $fd] }
            |   {
                set ::redict::attributes($id) [redict_read_map $id $fd]
                continue
            }
            default {
                return -code error "Bad protocol, '$type' as reply type byte"
            }
        }
    }
}
# Read one reply for connection $id and pass it, together with the argv of
# the command that produced it, through
# ::response_transformers::transform_response_if_needed. The transformer's
# result is returned.
proc ::redict::redict_read_reply {id fd} {
    set raw_reply [redict_read_reply_logic $id $fd]
    return [::response_transformers::transform_response_if_needed \
        $id $::redict::curr_argv($id) $raw_reply]
}
# Reset the non-blocking parser of connection $id to its idle state: empty
# buffer and reply, no bulk or multi-bulk in progress (-1), empty state stack.
proc ::redict::redict_reset_state {id} {
    set idle [dict create buf {} mbulk -1 bulk -1 reply {}]
    set ::redict::state($id) $idle
    set ::redict::statestack($id) {}
}
2024-03-25 10:30:46 -04:00
# Pop the oldest pending callback of connection $id and invoke it at global
# level with three extra arguments: the connection handle command name, the
# event type and the reply. Afterwards the non-blocking parser state is reset.
proc ::redict::redict.call_callback {id type reply} {
    set pending $::redict::callback($id)
    set ::redict::callback($id) [lrange $pending 1 end]
    uplevel #0 [lindex $pending 0] [list ::redict::redictHandle$id $type $reply]
    ::redict::redict_reset_state $id
}
# Read a reply in non-blocking mode.
#
# Installed as the readable handler of the channel. Each invocation makes one
# step of progress using the parser state kept in ::redict::state($id):
#   - bulk == -1: a header line is expected. Simple replies (':', '+', '(')
#     and errors ('-') are delivered at once; '$' and '*' only record the
#     announced bulk length / element count.
#   - otherwise: the missing bytes of the current bulk payload are read. Once
#     the payload is complete it is delivered directly, or appended to the
#     pending multi-bulk reply, which is delivered after its last element.
# Replies are delivered through redict.call_callback with type "reply", "err"
# or, on end of file, "eof" (after which the connection is closed).
proc ::redict::redict_readable {fd id} {
    if {[eof $fd]} {
        redict.call_callback $id eof {}
        ::redict::__method__close $id $fd
        return
    }
    if {[dict get $::redict::state($id) bulk] == -1} {
        set line [gets $fd]
        if {$line eq {}} return ;# No complete line available, return
        set type [string index $line 0]
        # Payload without the type byte and without the trailing CR.
        set payload [string range $line 1 end-1]
        switch -exact -- $type {
            : -
            + -
            ( {redict.call_callback $id reply $payload}
            - {redict.call_callback $id err $payload}
            $ {
                # +2 accounts for the CR LF that follows the payload. The
                # expression is braced so wire data is never re-substituted.
                dict set ::redict::state($id) bulk [expr {$payload + 2}]
                if {[dict get $::redict::state($id) bulk] == 1} {
                    # We got a $-1, hack the state to play well with this.
                    dict set ::redict::state($id) bulk 2
                    dict set ::redict::state($id) buf "\r\n"
                    ::redict::redict_readable $fd $id
                }
            }
            * {
                dict set ::redict::state($id) mbulk $payload
                # Handle *-1
                if {[dict get $::redict::state($id) mbulk] == -1} {
                    redict.call_callback $id reply {}
                }
            }
            default {
                redict.call_callback $id err \
                    "Bad protocol, $type as reply type byte"
            }
        }
    } else {
        set totlen [dict get $::redict::state($id) bulk]
        set buflen [string length [dict get $::redict::state($id) buf]]
        set toread [expr {$totlen-$buflen}]
        set data [read $fd $toread]
        dict append ::redict::state($id) buf $data
        # Check if we read a complete bulk reply
        if {[string length [dict get $::redict::state($id) buf]] ==
            [dict get $::redict::state($id) bulk]} {
            if {[dict get $::redict::state($id) mbulk] == -1} {
                redict.call_callback $id reply \
                    [string range [dict get $::redict::state($id) buf] 0 end-2]
            } else {
                dict with ::redict::state($id) {
                    lappend reply [string range $buf 0 end-2]
                    incr mbulk -1
                    set bulk -1
                    # Start the next element with an empty buffer; otherwise
                    # the stale payload would be counted as already-read bytes
                    # of the following bulk.
                    set buf {}
                }
                if {[dict get $::redict::state($id) mbulk] == 0} {
                    redict.call_callback $id reply \
                        [dict get $::redict::state($id) reply]
                }
            }
        }
    }
}
# when forcing resp3 some tests that rely on resp2 can fail, so we have to
# translate the resp3 response to resp2
#
# Returns 1 when RESP3 is being forced globally and connection $id did not
# ask for RESP3 itself, 0 otherwise. The per-connection flag is only read
# when ::force_resp3 is true.
proc ::redict::should_transform_to_resp2 {id} {
    if {!$::force_resp3} {
        return 0
    }
    return [expr {!$::redict::testing_resp3($id)}]
}