diff --git a/src/debug.c b/src/debug.c index 9e97868dd..0144c78ca 100644 --- a/src/debug.c +++ b/src/debug.c @@ -121,7 +121,7 @@ void computeDatasetDigest(unsigned char *final) { } else if (o->type == REDIS_SET) { setTypeIterator *si = setTypeInitIterator(o); robj *ele; - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { xorObjectDigest(digest,ele); decrRefCount(ele); } diff --git a/src/redis.h b/src/redis.h index e012db4c4..01ba0c656 100644 --- a/src/redis.h +++ b/src/redis.h @@ -813,7 +813,8 @@ int setTypeRemove(robj *subject, robj *value); int setTypeIsMember(robj *subject, robj *value); setTypeIterator *setTypeInitIterator(robj *subject); void setTypeReleaseIterator(setTypeIterator *si); -robj *setTypeNext(setTypeIterator *si); +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele); +robj *setTypeNextObject(setTypeIterator *si); int setTypeRandomElement(robj *setobj, robj **objele, long long *llele); unsigned long setTypeSize(robj *subject); void setTypeConvert(robj *subject, int enc); diff --git a/src/sort.c b/src/sort.c index 79f790105..355a0db64 100644 --- a/src/sort.c +++ b/src/sort.c @@ -222,7 +222,7 @@ void sortCommand(redisClient *c) { } else if (sortval->type == REDIS_SET) { setTypeIterator *si = setTypeInitIterator(sortval); robj *ele; - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { vector[j].obj = ele; vector[j].u.score = 0; vector[j].u.cmpobj = NULL; diff --git a/src/t_set.c b/src/t_set.c index e15952c05..0b4128adf 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -101,39 +101,68 @@ void setTypeReleaseIterator(setTypeIterator *si) { } /* Move to the next entry in the set. Returns the object at the current - * position, or NULL when the end is reached. This object will have its - * refcount incremented, so the caller needs to take care of this. */ -robj *setTypeNext(setTypeIterator *si) { - robj *ret = NULL; + * position. + * + * Since set elements can be internally be stored as redis objects or + * simple arrays of integers, setTypeNext returns the encoding of the + * set object you are iterating, and will populate the appropriate pointer + * (eobj) or (llobj) accordingly. + * + * When there are no longer elements -1 is returned. + * Returned objects ref count is not incremented, so this function is + * copy on write friendly. */ +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) { if (si->encoding == REDIS_ENCODING_HT) { dictEntry *de = dictNext(si->di); - if (de != NULL) { - ret = dictGetEntryKey(de); - incrRefCount(ret); - } + if (de == NULL) return -1; + *objele = dictGetEntryKey(de); } else if (si->encoding == REDIS_ENCODING_INTSET) { - int64_t llval; - if (intsetGet(si->subject->ptr,si->ii++,&llval)) - ret = createStringObjectFromLongLong(llval); + if (!intsetGet(si->subject->ptr,si->ii++,llele)) + return -1; } - return ret; + return si->encoding; } +/* The not copy on write friendly version but easy to use version + * of setTypeNext() is setTypeNextObject(), returning new objects + * or incrementing the ref count of returned objects. So if you don't + * retain a pointer to this object you should call decrRefCount() against it. + * + * This function is the way to go for write operations where COW is not + * an issue as the result will be anyway of incrementing the ref count. */ +robj *setTypeNextObject(setTypeIterator *si) { + int64_t intele; + robj *objele; + int encoding; + + encoding = setTypeNext(si,&objele,&intele); + switch(encoding) { + case -1: return NULL; + case REDIS_ENCODING_INTSET: + return createStringObjectFromLongLong(intele); + case REDIS_ENCODING_HT: + incrRefCount(objele); + return objele; + default: + redisPanic("Unsupported encoding"); + } + return NULL; /* just to suppress warnings */ +} /* Return random element from a non empty set. - * The returned element can be a long long value if the set is encoded + * The returned element can be a int64_t value if the set is encoded * as an "intset" blob of integers, or a redis object if the set * is a regular set. * * The caller provides both pointers to be populated with the right * object. The return value of the function is the object->encoding * field of the object and is used by the caller to check if the - * long long pointer or the redis object pointere was populated. + * int64_t pointer or the redis object pointere was populated. * * When an object is returned (the set was a real set) the ref count * of the object is not incremented so this function can be considered - * copy-on-write friendly. */ -int setTypeRandomElement(robj *setobj, robj **objele, long long *llele) { + * copy on write friendly. */ +int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) { if (setobj->encoding == REDIS_ENCODING_HT) { dictEntry *de = dictGetRandomKey(setobj->ptr); *objele = dictGetEntryKey(de); @@ -158,25 +187,30 @@ unsigned long setTypeSize(robj *subject) { /* Convert the set to specified encoding. The resulting dict (when converting * to a hashtable) is presized to hold the number of elements in the original * set. */ -void setTypeConvert(robj *subject, int enc) { +void setTypeConvert(robj *setobj, int enc) { setTypeIterator *si; - robj *element; - redisAssert(subject->type == REDIS_SET); + redisAssert(setobj->type == REDIS_SET && + setobj->encoding == REDIS_ENCODING_INTSET); if (enc == REDIS_ENCODING_HT) { + int64_t intele; dict *d = dictCreate(&setDictType,NULL); - /* Presize the dict to avoid rehashing */ - dictExpand(d,intsetLen(subject->ptr)); + robj *element; - /* setTypeGet returns a robj with incremented refcount */ - si = setTypeInitIterator(subject); - while ((element = setTypeNext(si)) != NULL) + /* Presize the dict to avoid rehashing */ + dictExpand(d,intsetLen(setobj->ptr)); + + /* To add the elements we extract integers and create redis objects */ + si = setTypeInitIterator(setobj); + while (setTypeNext(si,NULL,&intele) != -1) { + element = createStringObjectFromLongLong(intele); redisAssert(dictAdd(d,element,NULL) == DICT_OK); + } setTypeReleaseIterator(si); - subject->encoding = REDIS_ENCODING_HT; - zfree(subject->ptr); - subject->ptr = d; + setobj->encoding = REDIS_ENCODING_HT; + zfree(setobj->ptr); + setobj->ptr = d; } else { redisPanic("Unsupported set conversion"); } @@ -292,7 +326,7 @@ void scardCommand(redisClient *c) { void spopCommand(redisClient *c) { robj *set, *ele; - long long llele; + int64_t llele; int encoding; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || @@ -313,7 +347,7 @@ void spopCommand(redisClient *c) { void srandmemberCommand(redisClient *c) { robj *set, *ele; - long long llele; + int64_t llele; int encoding; if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || @@ -334,9 +368,11 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) { void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; - robj *ele, *dstset = NULL; + robj *eleobj, *dstset = NULL; + int64_t intobj; void *replylen = NULL; unsigned long j, cardinality = 0; + int encoding; for (j = 0; j < setnum; j++) { robj *setobj = dstkey ? @@ -382,20 +418,60 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * the element against all the other sets, if at least one set does * not include the element it is discarded */ si = setTypeInitIterator(sets[0]); - while((ele = setTypeNext(si)) != NULL) { - for (j = 1; j < setnum; j++) - if (!setTypeIsMember(sets[j],ele)) break; + while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) { + for (j = 1; j < setnum; j++) { + if (encoding == REDIS_ENCODING_INTSET) { + /* intset with intset is simple... and fast */ + if (sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,intobj)) + { + break; + /* in order to compare an integer with an object we + * have to use the generic function, creating an object + * for this */ + } else if (sets[j]->encoding == REDIS_ENCODING_HT) { + eleobj = createStringObjectFromLongLong(intobj); + if (!setTypeIsMember(sets[j],eleobj)) { + decrRefCount(eleobj); + break; + } + decrRefCount(eleobj); + } + } else if (encoding == REDIS_ENCODING_HT) { + /* Optimization... if the source object is integer + * encoded AND the target set is an intset, we can get + * a much faster path. */ + if (eleobj->encoding == REDIS_ENCODING_INT && + sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr)) + { + break; + /* else... object to object check is easy as we use the + * type agnostic API here. */ + } else if (!setTypeIsMember(sets[j],eleobj)) { + break; + } + } + } /* Only take action when all sets contain the member */ if (j == setnum) { if (!dstkey) { - addReplyBulk(c,ele); + if (encoding == REDIS_ENCODING_HT) + addReplyBulk(c,eleobj); + else + addReplyBulkLongLong(c,intobj); cardinality++; } else { - setTypeAdd(dstset,ele); + if (encoding == REDIS_ENCODING_INTSET) { + eleobj = createStringObjectFromLongLong(intobj); + setTypeAdd(dstset,eleobj); + decrRefCount(eleobj); + } else { + setTypeAdd(dstset,eleobj); + } } } - decrRefCount(ele); } setTypeReleaseIterator(si); @@ -463,7 +539,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * if (!sets[j]) continue; /* non existing keys are like empty sets */ si = setTypeInitIterator(sets[j]); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { if (op == REDIS_OP_UNION || j == 0) { if (setTypeAdd(dstset,ele)) { cardinality++; @@ -485,7 +561,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * if (!dstkey) { addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { addReplyBulk(c,ele); decrRefCount(ele); }