mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
50ee0f5be8
Based on feedback from interested parties
183 lines
7.4 KiB
Tcl
183 lines
7.4 KiB
Tcl
# SPDX-FileCopyrightText: 2024 Redict Contributors
|
|
# SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo <antirez at gmail dot com>
|
|
#
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
# SPDX-License-Identifier: LGPL-3.0-only
|
|
|
|
set testmodule [file normalize tests/modules/stream.so]
|
|
|
|
start_server {tags {"modules"}} {
|
|
r module load $testmodule
|
|
|
|
test {Module stream add and delete} {
|
|
r del mystream
|
|
# add to empty key
|
|
set streamid1 [r stream.add mystream item 1 value a]
|
|
# add to existing stream
|
|
set streamid2 [r stream.add mystream item 2 value b]
|
|
# check result
|
|
assert { [string match "*-*" $streamid1] }
|
|
set items [r XRANGE mystream - +]
|
|
assert_equal $items \
|
|
"{$streamid1 {item 1 value a}} {$streamid2 {item 2 value b}}"
|
|
# delete one of them and try deleting non-existing ID
|
|
assert_equal OK [r stream.delete mystream $streamid1]
|
|
assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
|
|
assert_error "Invalid stream ID*" {r stream.delete mystream foo}
|
|
assert_equal "{$streamid2 {item 2 value b}}" [r XRANGE mystream - +]
|
|
# check error condition: wrong type
|
|
r del mystream
|
|
r set mystream mystring
|
|
assert_error "ERR StreamAdd*" {r stream.add mystream item 1 value a}
|
|
assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
|
|
}
|
|
|
|
test {Module stream add unblocks blocking xread} {
|
|
r del mystream
|
|
|
|
# Blocking XREAD on an empty key
|
|
set rd1 [redict_deferring_client]
|
|
$rd1 XREAD BLOCK 3000 STREAMS mystream $
|
|
# wait until client is actually blocked
|
|
wait_for_condition 50 100 {
|
|
[s 0 blocked_clients] eq {1}
|
|
} else {
|
|
fail "Client is not blocked"
|
|
}
|
|
set id [r stream.add mystream field 1 value a]
|
|
assert_equal "{mystream {{$id {field 1 value a}}}}" [$rd1 read]
|
|
|
|
# Blocking XREAD on an existing stream
|
|
set rd2 [redict_deferring_client]
|
|
$rd2 XREAD BLOCK 3000 STREAMS mystream $
|
|
# wait until client is actually blocked
|
|
wait_for_condition 50 100 {
|
|
[s 0 blocked_clients] eq {1}
|
|
} else {
|
|
fail "Client is not blocked"
|
|
}
|
|
set id [r stream.add mystream field 2 value b]
|
|
assert_equal "{mystream {{$id {field 2 value b}}}}" [$rd2 read]
|
|
}
|
|
|
|
test {Module stream add benchmark (1M stream add)} {
|
|
set n 1000000
|
|
r del mystream
|
|
set result [r stream.addn mystream $n field value]
|
|
assert_equal $result $n
|
|
}
|
|
|
|
test {Module stream XADD big fields doesn't create empty key} {
|
|
set original_proto [config_get_set proto-max-bulk-len 2147483647] ;#2gb
|
|
set original_query [config_get_set client-query-buffer-limit 2147483647] ;#2gb
|
|
|
|
r del mystream
|
|
r write "*4\r\n\$10\r\nstream.add\r\n\$8\r\nmystream\r\n\$5\r\nfield\r\n"
|
|
catch {
|
|
write_big_bulk 1073741824 ;#1gb
|
|
} err
|
|
assert {$err eq "ERR StreamAdd failed"}
|
|
assert_equal 0 [r exists mystream]
|
|
|
|
# restore defaults
|
|
r config set proto-max-bulk-len $original_proto
|
|
r config set client-query-buffer-limit $original_query
|
|
} {OK} {large-memory}
|
|
|
|
test {Module stream iterator} {
|
|
r del mystream
|
|
set streamid1 [r xadd mystream * item 1 value a]
|
|
set streamid2 [r xadd mystream * item 2 value b]
|
|
# range result
|
|
set result1 [r stream.range mystream "-" "+"]
|
|
set expect1 [r xrange mystream "-" "+"]
|
|
assert_equal $result1 $expect1
|
|
# reverse range
|
|
set result_rev [r stream.range mystream "+" "-"]
|
|
set expect_rev [r xrevrange mystream "+" "-"]
|
|
assert_equal $result_rev $expect_rev
|
|
|
|
# only one item: range with startid = endid
|
|
set result2 [r stream.range mystream "-" $streamid1]
|
|
assert_equal $result2 "{$streamid1 {item 1 value a}}"
|
|
assert_equal $result2 [list [list $streamid1 {item 1 value a}]]
|
|
# only one item: range with startid = endid
|
|
set result3 [r stream.range mystream $streamid2 $streamid2]
|
|
assert_equal $result3 "{$streamid2 {item 2 value b}}"
|
|
assert_equal $result3 [list [list $streamid2 {item 2 value b}]]
|
|
}
|
|
|
|
test {Module stream iterator delete} {
|
|
r del mystream
|
|
set id1 [r xadd mystream * normal item]
|
|
set id2 [r xadd mystream * selfdestruct yes]
|
|
set id3 [r xadd mystream * another item]
|
|
# stream.range deletes the "selfdestruct" item after returning it
|
|
assert_equal \
|
|
"{$id1 {normal item}} {$id2 {selfdestruct yes}} {$id3 {another item}}" \
|
|
[r stream.range mystream - +]
|
|
# now, the "selfdestruct" item is gone
|
|
assert_equal \
|
|
"{$id1 {normal item}} {$id3 {another item}}" \
|
|
[r stream.range mystream - +]
|
|
}
|
|
|
|
test {Module stream trim by length} {
|
|
r del mystream
|
|
# exact maxlen
|
|
r xadd mystream * item 1 value a
|
|
r xadd mystream * item 2 value b
|
|
r xadd mystream * item 3 value c
|
|
assert_equal 3 [r xlen mystream]
|
|
assert_equal 0 [r stream.trim mystream maxlen = 5]
|
|
assert_equal 3 [r xlen mystream]
|
|
assert_equal 2 [r stream.trim mystream maxlen = 1]
|
|
assert_equal 1 [r xlen mystream]
|
|
assert_equal 1 [r stream.trim mystream maxlen = 0]
|
|
# check that there is no limit for exact maxlen
|
|
r stream.addn mystream 20000 item x value y
|
|
assert_equal 20000 [r stream.trim mystream maxlen = 0]
|
|
# approx maxlen (100 items per node implies default limit 10K items)
|
|
r stream.addn mystream 20000 item x value y
|
|
assert_equal 20000 [r xlen mystream]
|
|
assert_equal 10000 [r stream.trim mystream maxlen ~ 2]
|
|
assert_equal 9900 [r stream.trim mystream maxlen ~ 2]
|
|
assert_equal 0 [r stream.trim mystream maxlen ~ 2]
|
|
assert_equal 100 [r xlen mystream]
|
|
assert_equal 100 [r stream.trim mystream maxlen ~ 0]
|
|
assert_equal 0 [r xlen mystream]
|
|
}
|
|
|
|
test {Module stream trim by ID} {
|
|
r del mystream
|
|
# exact minid
|
|
r xadd mystream * item 1 value a
|
|
r xadd mystream * item 2 value b
|
|
set minid [r xadd mystream * item 3 value c]
|
|
assert_equal 3 [r xlen mystream]
|
|
assert_equal 0 [r stream.trim mystream minid = -]
|
|
assert_equal 3 [r xlen mystream]
|
|
assert_equal 2 [r stream.trim mystream minid = $minid]
|
|
assert_equal 1 [r xlen mystream]
|
|
assert_equal 1 [r stream.trim mystream minid = +]
|
|
# check that there is no limit for exact minid
|
|
r stream.addn mystream 20000 item x value y
|
|
assert_equal 20000 [r stream.trim mystream minid = +]
|
|
# approx minid (100 items per node implies default limit 10K items)
|
|
r stream.addn mystream 19980 item x value y
|
|
set minid [r xadd mystream * item x value y]
|
|
r stream.addn mystream 19 item x value y
|
|
assert_equal 20000 [r xlen mystream]
|
|
assert_equal 10000 [r stream.trim mystream minid ~ $minid]
|
|
assert_equal 9900 [r stream.trim mystream minid ~ $minid]
|
|
assert_equal 0 [r stream.trim mystream minid ~ $minid]
|
|
assert_equal 100 [r xlen mystream]
|
|
assert_equal 100 [r stream.trim mystream minid ~ +]
|
|
assert_equal 0 [r xlen mystream]
|
|
}
|
|
|
|
test "Unload the module - stream" {
|
|
assert_equal {OK} [r module unload stream]
|
|
}
|
|
}
|