Merge pull request #2215 from advance512/spopWithCount

SPOP optional count argument. (issue #1793, supersedes pull request #1803)
This commit is contained in:
Salvatore Sanfilippo 2014-12-17 17:59:59 +01:00
commit 70674ac677
9 changed files with 414 additions and 12 deletions

View File

@ -651,8 +651,8 @@ struct commandHelp {
0,
"1.0.0" },
{ "SPOP",
"key",
"Remove and return a random member from a set",
"key [count]",
"Remove and return one or multiple random members from a set",
3,
"1.0.0" },
{ "SRANDMEMBER",

View File

@ -261,6 +261,100 @@ int64_t intsetRandom(intset *is) {
return _intsetGet(is,rand()%intrev32ifbe(is->length));
}
/* How many times bigger should the set length be compared to the requested
* count of members for us to use the Floyd algorithm instead of
the Knuth algorithm */
#define RANDOMMEMBERS_ALGORITHM_SELECTION_RATIO (2)
/* Copies 'count' random members from the set into the 'values' array.
'values' must be an array of int64_t values, of length 'count'.
Returns the amount of items returned. If this amount is less than 'count',
then the remaining 'values' are left uninitialized. */
int intsetRandomMembers(intset *is, int64_t* values, int count) {
/* We don't check that is and values are non-NULL - the caller must
play nice. */
/* redisAssert(is != NULL);
redisAssert(values != NULL);
redisAssert(count > 0); */
int length = intsetLen(is);
if (count > length) {
/* Return everything in the set */
count = length;
}
/* Choose between the Knuth shuffle algorithm, O(1) space, O(length) time,
and the Floyd algorithm, O(length) space, O(count) time. */
if ((RANDOMMEMBERS_ALGORITHM_SELECTION_RATIO * count) > length) {
/* If the count of members requested is almost the length of the set,
use the Knuth shuffle algorithm, O(1) space, O(length) time. */
/* First, fill the values array with unique random indexes inside
the set. */
int in, im, rn, rm;
im = 0;
for (in = 0;
in < length && im < count;
in++) {
rn = length - in;
rm = count - im;
if (rand() % rn < rm) {
values[im++] = in;
}
}
} else {
/* If the length is considerably more than the count of members
requested, use Robert Floyd's algorithm, O(length) space,
O(count) time.
Based on Jon Bentley's Programming Pearls */
int64_t *is_used = zcalloc(sizeof(int64_t) * length);
int in, im, r;
r = 0;
im = 0;
for (in = length - count;
in < length && im < count;
in++) {
/* Generate a random number r */
r = rand() % (in + 1);
/* Do we already have the value in r? */
if (is_used[r]) {
/* Use in instead of the generated number */
r = in;
}
values[im++] = r ;
/* Mark it as used */
is_used[r] = 1;
}
zfree(is_used);
}
/* Replace each random index with the value stored there in the intset */
uint8_t encoding = intrev32ifbe(is->encoding);
for (int currentValue = 0;
currentValue < count;
currentValue++) {
values[currentValue] =
_intsetGetEncoded(is, values[currentValue], encoding);
}
return count;
}
/* Sets the value to the value at the given position. When this position is
* out of range the function returns 0, when in range it returns 1. */
uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value) {
@ -363,8 +457,10 @@ int main(int argc, char **argv) {
assert(_intsetValueEncoding(+2147483647) == INTSET_ENC_INT32);
assert(_intsetValueEncoding(-2147483649) == INTSET_ENC_INT64);
assert(_intsetValueEncoding(+2147483648) == INTSET_ENC_INT64);
assert(_intsetValueEncoding(-9223372036854775808ull) == INTSET_ENC_INT64);
assert(_intsetValueEncoding(+9223372036854775807ull) == INTSET_ENC_INT64);
assert(_intsetValueEncoding(-9223372036854775808ull) ==
INTSET_ENC_INT64);
assert(_intsetValueEncoding(+9223372036854775807ull) ==
INTSET_ENC_INT64);
ok();
}
@ -461,7 +557,8 @@ int main(int argc, char **argv) {
start = usec();
for (i = 0; i < num; i++) intsetSearch(is,rand() % ((1<<bits)-1),NULL);
printf("%ld lookups, %ld element set, %lldusec\n",num,size,usec()-start);
printf("%ld lookups, %ld element set, %lldusec\n",
num,size,usec()-start);
}
printf("Stress add+delete: "); {

View File

@ -43,6 +43,7 @@ intset *intsetAdd(intset *is, int64_t value, uint8_t *success);
intset *intsetRemove(intset *is, int64_t value, int *success);
uint8_t intsetFind(intset *is, int64_t value);
int64_t intsetRandom(intset *is);
int intsetRandomMembers(intset *is, int64_t* value, int count);
uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value);
uint32_t intsetLen(intset *is);
size_t intsetBlobLen(intset *is);

View File

@ -162,7 +162,7 @@ struct redisCommand redisCommandTable[] = {
{"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
{"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
{"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
{"spop",spopCommand,2,"wRsF",0,NULL,1,1,1,0,0},
{"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0},
{"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
{"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},

View File

@ -1269,6 +1269,7 @@ void setTypeReleaseIterator(setTypeIterator *si);
int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele);
robj *setTypeNextObject(setTypeIterator *si);
int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele);
int setTypeRandomElements(robj *set, int count, robj *aux_set);
unsigned long setTypeSize(robj *subject);
void setTypeConvert(robj *subject, int enc);

View File

@ -33,7 +33,8 @@
* Set Commands
*----------------------------------------------------------------------------*/
void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op);
void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum,
robj *dstkey, int op);
/* Factory method to return a set that *can* hold "value". When the object has
* an integer-encodable value, an intset will be returned. Otherwise a regular
@ -68,7 +69,8 @@ int setTypeAdd(robj *subject, robj *value) {
/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK);
redisAssertWithInfo(NULL,value,
dictAdd(subject->ptr,value,NULL) == DICT_OK);
incrRefCount(value);
return 1;
}
@ -205,6 +207,100 @@ int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) {
return setobj->encoding;
}
/* Return a number of random elements from a non empty set.
*
* This is a version of setTypeRandomElement() that is modified in order to
* return multiple entries, using dictGetRandomKeys() and intsetRandomMembers().
*
* The elements are stored into 'aux_set' which should be of a set type.
*
* The function returns the number of items stored into 'aux_set', that may
* be less than 'count' if the hash table has less than 'count' elements
* inside.
*
* Note that this function is not suitable when you need a good distribution
* of the returned items, but only when you need to "sample" a given number
* of continuous elements to run some kind of algorithm or to produce
* statistics. However the function is much faster than setTypeRandomElement()
* at producing N elements, and the elements are guaranteed to be non
* repeating.
*/
int setTypeRandomElements(robj *set, int count, robj *aux_set) {
int set_size;
int elements_to_return = count;
int elements_copied = 0;
int current_element = 0;
/* Like all setType* functions, we assume good behavior on part of the caller,
* so no extra parameter checks are made. */
/* If the number of elements in the the set is less than the count
requested, just return all of them. */
set_size = setTypeSize(set);
if (set_size < count) {
elements_to_return = set_size;
}
/* TODO: It is definitely faster adding items to the set by directly handling the Dict
or intset inside it, avoiding the constant encoding checks inside setTypeAdd().
However, We don't want to touch the set internals in non setType* functions.
So, we just call setTypeAdd() multiple times, but this isn't an optimial
solution.
Another option would be to create a bulk-add function: setTypeAddBulk(). */
if (set->encoding == REDIS_ENCODING_HT) {
/* Allocate result array */
dictEntry **random_elements =
zmalloc(sizeof(dictEntry*) * elements_to_return);
/* Get the random elements */
elements_copied =
dictGetRandomKeys(set->ptr, random_elements, elements_to_return);
redisAssert(elements_copied == elements_to_return);
/* Put them into the set */
for (current_element = 0;
current_element < elements_copied;
current_element++) {
/* We get the key and duplicate it, as we know it is a string */
setTypeAdd(aux_set,
dupStringObject(dictGetKey(random_elements[current_element])));
}
zfree(random_elements);
} else if (set->encoding == REDIS_ENCODING_INTSET) {
/* Allocate result array */
int64_t *random_elements = zmalloc(sizeof(int64_t) * elements_to_return);
elements_copied =
intsetRandomMembers((intset*) set->ptr,
random_elements,
elements_to_return);
redisAssert(elements_copied == elements_to_return);
/* Put them into the set */
for (current_element = 0;
current_element < elements_copied;
current_element++) {
/* Put the values in the set */
setTypeAdd(aux_set,
createStringObjectFromLongLong(
random_elements[current_element]));
}
zfree(random_elements);
} else {
redisPanic("Unknown set encoding");
}
/* We have a set with random elements. Return the actual elements in
the aux_set. */
return elements_copied;
}
unsigned long setTypeSize(robj *subject) {
if (subject->encoding == REDIS_ENCODING_HT) {
return dictSize((dict*)subject->ptr);
@ -235,7 +331,8 @@ void setTypeConvert(robj *setobj, int enc) {
si = setTypeInitIterator(setobj);
while (setTypeNext(si,NULL,&intele) != -1) {
element = createStringObjectFromLongLong(intele);
redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK);
redisAssertWithInfo(NULL,element,
dictAdd(d,element,NULL) == DICT_OK);
}
setTypeReleaseIterator(si);
@ -377,15 +474,147 @@ void scardCommand(redisClient *c) {
addReplyLongLong(c,setTypeSize(o));
}
/* handle the "SPOP key <count>" variant. The normal version of the
* command is handled by the spopCommand() function itself. */
void spopWithCountCommand(redisClient *c) {
long l;
unsigned long count, size;
int elements_returned;
robj *set, *aux, *aux_set;
int64_t llele;
/* Get the count argument */
if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != REDIS_OK) return;
if (l >= 0) {
count = (unsigned) l;
} else {
addReply(c,shared.outofrangeerr);
return;
}
/* Make sure a key with the name inputted exists, and that it's type is
indeed a set. Otherwise, return nil */
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk))
== NULL || checkType(c,set,REDIS_SET)) return;
/* If count is zero, serve an empty multibulk ASAP to avoid special
cases later. */
if (count == 0) {
addReply(c,shared.emptymultibulk);
return;
}
/* Get the size of the set. It is always > 0, as empty sets get
deleted. */
size = setTypeSize(set);
/* Generate an SPOP keyspace notification */
notifyKeyspaceEvent(REDIS_NOTIFY_SET,"spop",c->argv[1],c->db->id);
/* CASE 1:
* The number of requested elements is greater than or equal to
* the number of elements inside the set: simply return the whole set. */
if (count >= size) {
/* We just return the entire set */
sunionDiffGenericCommand(c,c->argv+1,1,NULL,REDIS_OP_UNION);
/* Delete the set as it is now empty */
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
/* Replicate/AOF this command as an SREM operation */
aux = createStringObject("DEL",3);
rewriteClientCommandVector(c,2,aux,c->argv[1]);
decrRefCount(aux);
return;
}
/* CASE 2:
* The number of requested elements is less than the number
* of elements inside the set. */
/* We need an auxiliary set. Optimistically, we create a set using an
Intset internally. */
aux = createStringObjectFromLongLong(0);
aux_set = setTypeCreate(aux);
decrRefCount(aux);
/* Get the count requested of random elements from the set into our
auxiliary set. */
elements_returned = setTypeRandomElements(set, count, aux_set);
redisAssert(elements_returned == count);
{
setTypeIterator *si;
robj *objele;
int element_encoding;
addReplyMultiBulkLen(c, elements_returned);
/* Replicate/AOF this command as an SREM operation */
aux = createStringObject("SREM",4);
si = setTypeInitIterator(aux_set);
while ((element_encoding = setTypeNext(si, &objele, &llele)) != -1) {
if (element_encoding == REDIS_ENCODING_HT) {
addReplyBulk(c, objele);
/* Replicate/AOF this command as an SREM commands */
rewriteClientCommandVector(c, 3, aux, c->argv[1], objele);
setTypeRemove(set, objele);
}
else if (element_encoding == REDIS_ENCODING_INTSET) {
/* TODO: setTypeRemove() forces us to convert all of the ints
to string... isn't there a nicer way to do this? */
objele = createStringObjectFromLongLong(llele);
addReplyBulk(c, objele);
/* Replicate/AOF this command as an SREM commands */
rewriteClientCommandVector(c, 3, aux, c->argv[1], objele);
setTypeRemove(set, objele);
/* We created it, we kill it. */
decrRefCount(objele);
}
else {
redisPanic("Unknown set encoding");
}
}
setTypeReleaseIterator(si);
decrRefCount(aux);
}
/* Free the auxiliary set - we need it no more. */
freeSetObject(aux_set);
}
void spopCommand(redisClient *c) {
robj *set, *ele, *aux;
int64_t llele;
int encoding;
if (c->argc == 3) {
spopWithCountCommand(c);
return;
} else if (c->argc > 3) {
addReply(c,shared.syntaxerr);
return;
}
/* Make sure a key with the name inputted exists, and that it's type is
indeed a set */
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,set,REDIS_SET)) return;
/* Get a random element from the set */
encoding = setTypeRandomElement(set,&ele,&llele);
/* Remove the element from the set */
if (encoding == REDIS_ENCODING_INTSET) {
ele = createStringObjectFromLongLong(llele);
set->ptr = intsetRemove(set->ptr,llele,NULL);
@ -393,6 +622,7 @@ void spopCommand(redisClient *c) {
incrRefCount(ele);
setTypeRemove(set,ele);
}
notifyKeyspaceEvent(REDIS_NOTIFY_SET,"spop",c->argv[1],c->db->id);
/* Replicate/AOF this command as an SREM operation */
@ -401,11 +631,16 @@ void spopCommand(redisClient *c) {
decrRefCount(ele);
decrRefCount(aux);
/* Add the element to the reply */
addReplyBulk(c,ele);
/* Delete the set if it's empty */
if (setTypeSize(set) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
/* Set has been modified */
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
@ -587,7 +822,8 @@ int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {
return (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : 0);
}
void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
void sinterGenericCommand(redisClient *c, robj **setkeys,
unsigned long setnum, robj *dstkey) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *eleobj, *dstset = NULL;
@ -734,7 +970,8 @@ void sinterstoreCommand(redisClient *c) {
#define REDIS_OP_DIFF 1
#define REDIS_OP_INTER 2
void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) {
void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum,
robj *dstkey, int op) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *ele, *dstset = NULL;

View File

@ -204,6 +204,30 @@ tags {"aof"} {
}
}
## Test that SPOP with <count> (that modifies the client's argc/argv) is correctly free'd
create_aof {
append_to_aof [formatCommand sadd set foo]
append_to_aof [formatCommand sadd set bar]
append_to_aof [formatCommand sadd set gah]
append_to_aof [formatCommand spop set 2]
}
start_server_aof [list dir $server_path] {
test "AOF+SPOP: Server should have been started" {
assert_equal 1 [is_alive $srv]
}
test "AOF+SPOP: Set should have 1 member" {
set client [redis [dict get $srv host] [dict get $srv port]]
wait_for_condition 50 100 {
[catch {$client ping} e] == 0
} else {
fail "Loading DB is taking too much time."
}
assert_equal 1 [$client scard set]
}
}
## Test that EXPIREAT is loaded correctly
create_aof {
append_to_aof [formatCommand rpush list foo]

View File

@ -413,7 +413,7 @@ start_server {tags {"scripting"}} {
r sadd myset a b c
r mset a 1 b 2 c 3 d 4
assert {[r spop myset] ne {}}
assert {[r spop myset] ne {}}
assert {[r spop myset 1] ne {}}
assert {[r spop myset] ne {}}
assert {[r mget a b c d] eq {1 2 3 4}}
assert {[r spop myset] eq {}}

View File

@ -293,6 +293,13 @@ start_server {
assert_equal 0 [r scard myset]
}
test "SPOP with <count>=1 - $type" {
create_set myset $contents
assert_encoding $type myset
assert_equal $contents [lsort [list [r spop myset 1] [r spop myset 1] [r spop myset 1]]]
assert_equal 0 [r scard myset]
}
test "SRANDMEMBER - $type" {
create_set myset $contents
unset -nocomplain myset
@ -304,6 +311,41 @@ start_server {
}
}
foreach {type contents} {
hashtable {a b c d e f g h i j k l m n o p q r s t u v w x y z}
intset {1 10 11 12 13 14 15 16 17 18 19 2 20 21 22 23 24 25 26 3 4 5 6 7 8 9}
} {
test "SPOP with <count>" {
create_set myset $contents
assert_encoding $type myset
assert_equal $contents [lsort [concat [r spop myset 11] [r spop myset 9] [r spop myset 0] [r spop myset 4] [r spop myset 1] [r spop myset 0] [r spop myset 1] [r spop myset 0]]]
assert_equal 0 [r scard myset]
}
}
# As seen in intsetRandomMembers
test "SPOP using integers, testing Knuth's and Floyd's algorithm" {
create_set myset {1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20}
assert_encoding intset myset
assert_equal 20 [r scard myset]
r spop myset 1
assert_equal 19 [r scard myset]
r spop myset 2
assert_equal 17 [r scard myset]
r spop myset 3
assert_equal 14 [r scard myset]
r spop myset 10
assert_equal 4 [r scard myset]
r spop myset 10
assert_equal 0 [r scard myset]
r spop myset 1
assert_equal 0 [r scard myset]
} {}
test "SPOP using integers with Knuth's algorithm" {
r spop nonexisting_key 100
} {}
test "SRANDMEMBER with <count> against non existing key" {
r srandmember nonexisting_key 100
} {}