SINTER/MEMBERS are now COW friendly, also some refactoring around was needed to get this result.

This commit is contained in:
antirez 2010-12-09 21:11:56 +01:00
parent a5be65f71c
commit 1b508da7ca
4 changed files with 118 additions and 41 deletions

View File

@ -121,7 +121,7 @@ void computeDatasetDigest(unsigned char *final) {
} else if (o->type == REDIS_SET) { } else if (o->type == REDIS_SET) {
setTypeIterator *si = setTypeInitIterator(o); setTypeIterator *si = setTypeInitIterator(o);
robj *ele; robj *ele;
while((ele = setTypeNext(si)) != NULL) { while((ele = setTypeNextObject(si)) != NULL) {
xorObjectDigest(digest,ele); xorObjectDigest(digest,ele);
decrRefCount(ele); decrRefCount(ele);
} }

View File

@ -813,7 +813,8 @@ int setTypeRemove(robj *subject, robj *value);
int setTypeIsMember(robj *subject, robj *value); int setTypeIsMember(robj *subject, robj *value);
setTypeIterator *setTypeInitIterator(robj *subject); setTypeIterator *setTypeInitIterator(robj *subject);
void setTypeReleaseIterator(setTypeIterator *si); void setTypeReleaseIterator(setTypeIterator *si);
robj *setTypeNext(setTypeIterator *si); int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele);
robj *setTypeNextObject(setTypeIterator *si);
int setTypeRandomElement(robj *setobj, robj **objele, long long *llele); int setTypeRandomElement(robj *setobj, robj **objele, long long *llele);
unsigned long setTypeSize(robj *subject); unsigned long setTypeSize(robj *subject);
void setTypeConvert(robj *subject, int enc); void setTypeConvert(robj *subject, int enc);

View File

@ -222,7 +222,7 @@ void sortCommand(redisClient *c) {
} else if (sortval->type == REDIS_SET) { } else if (sortval->type == REDIS_SET) {
setTypeIterator *si = setTypeInitIterator(sortval); setTypeIterator *si = setTypeInitIterator(sortval);
robj *ele; robj *ele;
while((ele = setTypeNext(si)) != NULL) { while((ele = setTypeNextObject(si)) != NULL) {
vector[j].obj = ele; vector[j].obj = ele;
vector[j].u.score = 0; vector[j].u.score = 0;
vector[j].u.cmpobj = NULL; vector[j].u.cmpobj = NULL;

View File

@ -101,39 +101,68 @@ void setTypeReleaseIterator(setTypeIterator *si) {
} }
/* Move to the next entry in the set. Returns the object at the current /* Move to the next entry in the set. Returns the object at the current
* position, or NULL when the end is reached. This object will have its * position.
* refcount incremented, so the caller needs to take care of this. */ *
robj *setTypeNext(setTypeIterator *si) { * Since set elements can be internally be stored as redis objects or
robj *ret = NULL; * simple arrays of integers, setTypeNext returns the encoding of the
* set object you are iterating, and will populate the appropriate pointer
* (eobj) or (llobj) accordingly.
*
* When there are no longer elements -1 is returned.
* Returned objects ref count is not incremented, so this function is
* copy on write friendly. */
int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) {
if (si->encoding == REDIS_ENCODING_HT) { if (si->encoding == REDIS_ENCODING_HT) {
dictEntry *de = dictNext(si->di); dictEntry *de = dictNext(si->di);
if (de != NULL) { if (de == NULL) return -1;
ret = dictGetEntryKey(de); *objele = dictGetEntryKey(de);
incrRefCount(ret);
}
} else if (si->encoding == REDIS_ENCODING_INTSET) { } else if (si->encoding == REDIS_ENCODING_INTSET) {
int64_t llval; if (!intsetGet(si->subject->ptr,si->ii++,llele))
if (intsetGet(si->subject->ptr,si->ii++,&llval)) return -1;
ret = createStringObjectFromLongLong(llval);
} }
return ret; return si->encoding;
} }
/* The not copy on write friendly version but easy to use version
* of setTypeNext() is setTypeNextObject(), returning new objects
* or incrementing the ref count of returned objects. So if you don't
* retain a pointer to this object you should call decrRefCount() against it.
*
* This function is the way to go for write operations where COW is not
* an issue as the result will be anyway of incrementing the ref count. */
robj *setTypeNextObject(setTypeIterator *si) {
int64_t intele;
robj *objele;
int encoding;
encoding = setTypeNext(si,&objele,&intele);
switch(encoding) {
case -1: return NULL;
case REDIS_ENCODING_INTSET:
return createStringObjectFromLongLong(intele);
case REDIS_ENCODING_HT:
incrRefCount(objele);
return objele;
default:
redisPanic("Unsupported encoding");
}
return NULL; /* just to suppress warnings */
}
/* Return random element from a non empty set. /* Return random element from a non empty set.
* The returned element can be a long long value if the set is encoded * The returned element can be a int64_t value if the set is encoded
* as an "intset" blob of integers, or a redis object if the set * as an "intset" blob of integers, or a redis object if the set
* is a regular set. * is a regular set.
* *
* The caller provides both pointers to be populated with the right * The caller provides both pointers to be populated with the right
* object. The return value of the function is the object->encoding * object. The return value of the function is the object->encoding
* field of the object and is used by the caller to check if the * field of the object and is used by the caller to check if the
* long long pointer or the redis object pointere was populated. * int64_t pointer or the redis object pointere was populated.
* *
* When an object is returned (the set was a real set) the ref count * When an object is returned (the set was a real set) the ref count
* of the object is not incremented so this function can be considered * of the object is not incremented so this function can be considered
* copy-on-write friendly. */ * copy on write friendly. */
int setTypeRandomElement(robj *setobj, robj **objele, long long *llele) { int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) {
if (setobj->encoding == REDIS_ENCODING_HT) { if (setobj->encoding == REDIS_ENCODING_HT) {
dictEntry *de = dictGetRandomKey(setobj->ptr); dictEntry *de = dictGetRandomKey(setobj->ptr);
*objele = dictGetEntryKey(de); *objele = dictGetEntryKey(de);
@ -158,25 +187,30 @@ unsigned long setTypeSize(robj *subject) {
/* Convert the set to specified encoding. The resulting dict (when converting /* Convert the set to specified encoding. The resulting dict (when converting
* to a hashtable) is presized to hold the number of elements in the original * to a hashtable) is presized to hold the number of elements in the original
* set. */ * set. */
void setTypeConvert(robj *subject, int enc) { void setTypeConvert(robj *setobj, int enc) {
setTypeIterator *si; setTypeIterator *si;
robj *element; redisAssert(setobj->type == REDIS_SET &&
redisAssert(subject->type == REDIS_SET); setobj->encoding == REDIS_ENCODING_INTSET);
if (enc == REDIS_ENCODING_HT) { if (enc == REDIS_ENCODING_HT) {
int64_t intele;
dict *d = dictCreate(&setDictType,NULL); dict *d = dictCreate(&setDictType,NULL);
/* Presize the dict to avoid rehashing */ robj *element;
dictExpand(d,intsetLen(subject->ptr));
/* setTypeGet returns a robj with incremented refcount */ /* Presize the dict to avoid rehashing */
si = setTypeInitIterator(subject); dictExpand(d,intsetLen(setobj->ptr));
while ((element = setTypeNext(si)) != NULL)
/* To add the elements we extract integers and create redis objects */
si = setTypeInitIterator(setobj);
while (setTypeNext(si,NULL,&intele) != -1) {
element = createStringObjectFromLongLong(intele);
redisAssert(dictAdd(d,element,NULL) == DICT_OK); redisAssert(dictAdd(d,element,NULL) == DICT_OK);
}
setTypeReleaseIterator(si); setTypeReleaseIterator(si);
subject->encoding = REDIS_ENCODING_HT; setobj->encoding = REDIS_ENCODING_HT;
zfree(subject->ptr); zfree(setobj->ptr);
subject->ptr = d; setobj->ptr = d;
} else { } else {
redisPanic("Unsupported set conversion"); redisPanic("Unsupported set conversion");
} }
@ -292,7 +326,7 @@ void scardCommand(redisClient *c) {
void spopCommand(redisClient *c) { void spopCommand(redisClient *c) {
robj *set, *ele; robj *set, *ele;
long long llele; int64_t llele;
int encoding; int encoding;
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
@ -313,7 +347,7 @@ void spopCommand(redisClient *c) {
void srandmemberCommand(redisClient *c) { void srandmemberCommand(redisClient *c) {
robj *set, *ele; robj *set, *ele;
long long llele; int64_t llele;
int encoding; int encoding;
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
@ -334,9 +368,11 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
robj **sets = zmalloc(sizeof(robj*)*setnum); robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si; setTypeIterator *si;
robj *ele, *dstset = NULL; robj *eleobj, *dstset = NULL;
int64_t intobj;
void *replylen = NULL; void *replylen = NULL;
unsigned long j, cardinality = 0; unsigned long j, cardinality = 0;
int encoding;
for (j = 0; j < setnum; j++) { for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ? robj *setobj = dstkey ?
@ -382,20 +418,60 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
* the element against all the other sets, if at least one set does * the element against all the other sets, if at least one set does
* not include the element it is discarded */ * not include the element it is discarded */
si = setTypeInitIterator(sets[0]); si = setTypeInitIterator(sets[0]);
while((ele = setTypeNext(si)) != NULL) { while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
for (j = 1; j < setnum; j++) for (j = 1; j < setnum; j++) {
if (!setTypeIsMember(sets[j],ele)) break; if (encoding == REDIS_ENCODING_INTSET) {
/* intset with intset is simple... and fast */
if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,intobj))
{
break;
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
} else if (sets[j]->encoding == REDIS_ENCODING_HT) {
eleobj = createStringObjectFromLongLong(intobj);
if (!setTypeIsMember(sets[j],eleobj)) {
decrRefCount(eleobj);
break;
}
decrRefCount(eleobj);
}
} else if (encoding == REDIS_ENCODING_HT) {
/* Optimization... if the source object is integer
* encoded AND the target set is an intset, we can get
* a much faster path. */
if (eleobj->encoding == REDIS_ENCODING_INT &&
sets[j]->encoding == REDIS_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr))
{
break;
/* else... object to object check is easy as we use the
* type agnostic API here. */
} else if (!setTypeIsMember(sets[j],eleobj)) {
break;
}
}
}
/* Only take action when all sets contain the member */ /* Only take action when all sets contain the member */
if (j == setnum) { if (j == setnum) {
if (!dstkey) { if (!dstkey) {
addReplyBulk(c,ele); if (encoding == REDIS_ENCODING_HT)
addReplyBulk(c,eleobj);
else
addReplyBulkLongLong(c,intobj);
cardinality++; cardinality++;
} else { } else {
setTypeAdd(dstset,ele); if (encoding == REDIS_ENCODING_INTSET) {
eleobj = createStringObjectFromLongLong(intobj);
setTypeAdd(dstset,eleobj);
decrRefCount(eleobj);
} else {
setTypeAdd(dstset,eleobj);
}
} }
} }
decrRefCount(ele);
} }
setTypeReleaseIterator(si); setTypeReleaseIterator(si);
@ -463,7 +539,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
if (!sets[j]) continue; /* non existing keys are like empty sets */ if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]); si = setTypeInitIterator(sets[j]);
while((ele = setTypeNext(si)) != NULL) { while((ele = setTypeNextObject(si)) != NULL) {
if (op == REDIS_OP_UNION || j == 0) { if (op == REDIS_OP_UNION || j == 0) {
if (setTypeAdd(dstset,ele)) { if (setTypeAdd(dstset,ele)) {
cardinality++; cardinality++;
@ -485,7 +561,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
if (!dstkey) { if (!dstkey) {
addReplyMultiBulkLen(c,cardinality); addReplyMultiBulkLen(c,cardinality);
si = setTypeInitIterator(dstset); si = setTypeInitIterator(dstset);
while((ele = setTypeNext(si)) != NULL) { while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulk(c,ele); addReplyBulk(c,ele);
decrRefCount(ele); decrRefCount(ele);
} }