First version of evented Redis Tcl client, that will be used for BLPOP and Pub/Sub tests

This commit is contained in:
antirez 2010-04-07 13:55:06 +02:00
parent 4005fef124
commit 686bc0ac5c

115
redis.tcl
View File

@ -9,12 +9,30 @@
# $r lpush mylist bar # $r lpush mylist bar
# $r lrange mylist 0 -1 # $r lrange mylist 0 -1
# $r close # $r close
#
# Non blocking usage example:
#
# proc handlePong {r type reply} {
# puts "PONG $type '$reply'"
# if {$reply ne "PONG"} {
# $r ping [list handlePong]
# }
# }
#
# set r [redis]
# $r blocking 0
# $r get fo [list handlePong]
#
# vwait forever
package provide redis 0.1 package provide redis 0.1
namespace eval redis {} namespace eval redis {}
set ::redis::id 0 set ::redis::id 0
array set ::redis::fd {} array set ::redis::fd {}
array set ::redis::blocking {}
array set ::redis::callback {}
array set ::redis::state {} ;# State in non-blocking reply reading
array set ::redis::bulkarg {} array set ::redis::bulkarg {}
array set ::redis::multibulkarg {} array set ::redis::multibulkarg {}
@ -40,11 +58,21 @@ proc redis {{server 127.0.0.1} {port 6379}} {
fconfigure $fd -translation binary fconfigure $fd -translation binary
set id [incr ::redis::id] set id [incr ::redis::id]
set ::redis::fd($id) $fd set ::redis::fd($id) $fd
set ::redis::blocking($id) 1
::redis::redis_reset_state $id
interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
} }
proc ::redis::__dispatch__ {id method args} { proc ::redis::__dispatch__ {id method args} {
set fd $::redis::fd($id) set fd $::redis::fd($id)
set blocking $::redis::blocking($id)
if {$blocking == 0} {
if {[llength $args] == 0} {
error "Please provide a callback in non-blocking mode"
}
set callback [lindex $args end]
set args [lrange $args 0 end-1]
}
if {[info command ::redis::__method__$method] eq {}} { if {[info command ::redis::__method__$method] eq {}} {
if {[info exists ::redis::bulkarg($method)]} { if {[info exists ::redis::bulkarg($method)]} {
set cmd "$method " set cmd "$method "
@ -65,15 +93,31 @@ proc ::redis::__dispatch__ {id method args} {
append cmd [join $args] append cmd [join $args]
::redis::redis_writenl $fd $cmd ::redis::redis_writenl $fd $cmd
} }
::redis::redis_read_reply $fd if {$blocking} {
::redis::redis_read_reply $fd
} else {
# Every well formed reply read will pop an element from this
# list and use it as a callback. So pipelining is supported
# in non blocking mode.
lappend ::redis::callback($id) $callback
fileevent $fd readable [list ::redis::redis_readable $fd $id]
}
} else { } else {
uplevel 1 [list ::redis::__method__$method $id $fd] $args uplevel 1 [list ::redis::__method__$method $id $fd] $args
} }
} }
proc ::redis::__method__blocking {id fd val} {
set ::redis::blocking($id) $val
fconfigure $fd -blocking $val
}
proc ::redis::__method__close {id fd} { proc ::redis::__method__close {id fd} {
catch {close $fd} catch {close $fd}
catch {unset ::redis::fd($id)} catch {unset ::redis::fd($id)}
catch {unset ::redis::blocking($id)}
catch {unset ::redis::state($id)}
catch {unset ::redis::callback($id)}
catch {interp alias {} ::redis::redisHandle$id {}} catch {interp alias {} ::redis::redisHandle$id {}}
} }
@ -129,3 +173,72 @@ proc ::redis::redis_read_reply fd {
default {return -code error "Bad protocol, $type as reply type byte"} default {return -code error "Bad protocol, $type as reply type byte"}
} }
} }
proc ::redis::redis_reset_state id {
set ::redis::state($id) [dict create buf {} mbulk -1 bulk -1 reply {}]
}
proc ::redis::redis_call_callback {id type reply} {
set cb [lindex $::redis::callback($id) 0]
set ::redis::callback($id) [lrange $::redis::callback($id) 1 end]
uplevel #0 $cb [list ::redis::redisHandle$id $type $reply]
::redis::redis_reset_state $id
}
# Read a reply in non-blocking mode.
proc ::redis::redis_readable {fd id} {
if {[eof $fd]} {
redis_call_callback $id eof {}
::redis::__method__close $id $fd
return
}
if {[dict get $::redis::state($id) bulk] == -1} {
set line [gets $fd]
if {$line eq {}} return ;# No complete line available, return
switch -exact -- [string index $line 0] {
: -
+ {redis_call_callback $id reply [string range $line 1 end-1]}
- {redis_call_callback $id err [string range $line 1 end-1]}
$ {
dict set ::redis::state($id) bulk \
[expr [string range $line 1 end-1]+2]
if {[dict get $::redis::state($id) bulk] == 1} {
# We got a $-1, hack the state to play well with this.
dict set ::redis::state($id) bulk 2
dict set ::redis::state($id) buf "\r\n"
::redis::redis_readable $fd $id
}
}
* {dict set ::redis::state($id) mbulk [string range $line 1 end-1]}
default {
redis_call_callback $id err \
"Bad protocol, $type as reply type byte"
}
}
} else {
set totlen [dict get $::redis::state($id) bulk]
set buflen [string length [dict get $::redis::state($id) buf]]
set toread [expr {$totlen-$buflen}]
set data [read $fd $toread]
set nread [string length $data]
dict append ::redis::state($id) buf $data
# Check if we read a complete bulk reply
if {[string length [dict get $::redis::state($id) buf]] ==
[dict get $::redis::state($id) bulk]} {
if {[dict get $::redis::state($id) mbulk] == -1} {
redis_call_callback $id reply \
[string range [dict get $::redis::state($id) buf] 0 end-2]
} else {
dict with ::redis::state($id) {
lappend reply [string range $buf 0 end-2]
incr mbulk -1
set bulk -1
}
if {[dict get $::redis::state($id) mbulk] == 0} {
redis_call_callback $id reply \
[dict get $::redis::state($id) reply]
}
}
}
}
}