New options for GEORADIUS: STORE and STOREDIST.

Related to issue #3019.
This commit is contained in:
antirez 2016-02-18 10:24:11 +01:00
parent 15f37ebd4a
commit bb75ecddfd
4 changed files with 116 additions and 49 deletions

142
src/geo.c
View File

@ -425,10 +425,12 @@ void geoaddCommand(client *c) {
#define RADIUS_MEMBER 2
/* GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
* [COUNT count]
* [COUNT count] [STORE key] [STOREDIST key]
* GEORADIUSBYMEMBER key member radius unit ... options ... */
void georadiusGeneric(client *c, int type) {
robj *key = c->argv[1];
robj *storekey = NULL;
int storedist = 0; /* 0 for STORE, 1 for STOREDIST. */
/* Look up the requested zset */
robj *zobj = NULL;
@ -489,6 +491,14 @@ void georadiusGeneric(client *c, int type) {
return;
}
i++;
} else if (!strcasecmp(arg, "store") && (i+1) < remaining) {
storekey = c->argv[base_args+i+1];
storedist = 0;
i++;
} else if (!strcasecmp(arg, "storedist") && (i+1) < remaining) {
storekey = c->argv[base_args+i+1];
storedist = 1;
i++;
} else {
addReply(c, shared.syntaxerr);
return;
@ -496,6 +506,14 @@ void georadiusGeneric(client *c, int type) {
}
}
/* Trap options not compatible with STORE and STOREDIST. */
if (storekey && (withdist || withhash || withcoords)) {
addReplyError(c,
"STORE option in GEORADIUS is not compatible with "
"WITHDIST, WITHHASH and WITHCOORDS options");
return;
}
/* COUNT without ordering does not make much sense, force ASC
* ordering if COUNT was specified but no sorting was requested. */
if (count != 0 && sort == SORT_NONE) sort = SORT_ASC;
@ -509,33 +527,17 @@ void georadiusGeneric(client *c, int type) {
membersOfAllNeighbors(zobj, georadius, xy[0], xy[1], radius_meters, ga);
/* If no matching results, the user gets an empty reply. */
if (ga->used == 0) {
if (ga->used == 0 && storekey == NULL) {
addReply(c, shared.emptymultibulk);
geoArrayFree(ga);
return;
}
long result_length = ga->used;
long returned_items = (count == 0 || result_length < count) ?
result_length : count;
long option_length = 0;
/* Our options are self-contained nested multibulk replies, so we
* only need to track how many of those nested replies we return. */
if (withdist)
option_length++;
if (withcoords)
option_length++;
if (withhash)
option_length++;
/* The multibulk len we send is exactly result_length. The result is either
* all strings of just zset members *or* a nested multi-bulk reply
* containing the zset member string _and_ all the additional options the
* user enabled for this request. */
addReplyMultiBulkLen(c, (count == 0 || result_length < count) ?
result_length : count);
/* Process [optional] requested sorting */
if (sort == SORT_ASC) {
qsort(ga->array, result_length, sizeof(geoPoint), sort_gp_asc);
@ -543,35 +545,91 @@ void georadiusGeneric(client *c, int type) {
qsort(ga->array, result_length, sizeof(geoPoint), sort_gp_desc);
}
/* Finally send results back to the caller */
int i;
for (i = 0; i < result_length; i++) {
geoPoint *gp = ga->array+i;
gp->dist /= conversion; /* Fix according to unit. */
/* If we have options in option_length, return each sub-result
* as a nested multi-bulk. Add 1 to account for result value itself. */
if (option_length)
addReplyMultiBulkLen(c, option_length + 1);
addReplyBulkSds(c,gp->member);
gp->member = NULL;
if (storekey == NULL) {
/* No target key, return results to user. */
/* Our options are self-contained nested multibulk replies, so we
* only need to track how many of those nested replies we return. */
if (withdist)
addReplyDoubleDistance(c, gp->dist);
option_length++;
if (withcoords)
option_length++;
if (withhash)
addReplyLongLong(c, gp->score);
option_length++;
if (withcoords) {
addReplyMultiBulkLen(c, 2);
addReplyDouble(c, gp->longitude);
addReplyDouble(c, gp->latitude);
/* The multibulk len we send is exactly result_length. The result is
* either all strings of just zset members *or* a nested multi-bulk
* reply containing the zset member string _and_ all the additional
* options the user enabled for this request. */
addReplyMultiBulkLen(c, returned_items);
/* Finally send results back to the caller */
int i;
for (i = 0; i < returned_items; i++) {
geoPoint *gp = ga->array+i;
gp->dist /= conversion; /* Fix according to unit. */
/* If we have options in option_length, return each sub-result
* as a nested multi-bulk. Add 1 to account for result value
* itself. */
if (option_length)
addReplyMultiBulkLen(c, option_length + 1);
addReplyBulkSds(c,gp->member);
gp->member = NULL;
if (withdist)
addReplyDoubleDistance(c, gp->dist);
if (withhash)
addReplyLongLong(c, gp->score);
if (withcoords) {
addReplyMultiBulkLen(c, 2);
addReplyDouble(c, gp->longitude);
addReplyDouble(c, gp->latitude);
}
}
} else {
/* Target key, create a sorted set with the results. */
robj *zobj;
zset *zs;
int i;
size_t maxelelen = 0;
if (returned_items) {
zobj = createZsetObject();
zs = zobj->ptr;
}
/* Stop if COUNT was specified and we already provided the
* specified number of elements. */
if (count != 0 && count == i+1) break;
for (i = 0; i < returned_items; i++) {
zskiplistNode *znode;
geoPoint *gp = ga->array+i;
gp->dist /= conversion; /* Fix according to unit. */
double score = storedist ? gp->dist : gp->score;
size_t elelen = sdslen(gp->member);
if (maxelelen < elelen) maxelelen = elelen;
znode = zslInsert(zs->zsl,score,gp->member);
serverAssert(dictAdd(zs->dict,gp->member,&znode->score) == DICT_OK);
gp->member = NULL;
}
if (returned_items) {
zsetConvertToZiplistIfNeeded(zobj,maxelelen);
setKey(c->db,storekey,zobj);
decrRefCount(zobj);
notifyKeyspaceEvent(NOTIFY_LIST,"georadiusstore",storekey,
c->db->id);
server.dirty += returned_items;
} else if (dbDelete(c->db,storekey)) {
signalModifiedKey(c->db,storekey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
server.dirty++;
}
addReplyLongLong(c, returned_items);
}
geoArrayFree(ga);
}

View File

@ -283,8 +283,8 @@ struct redisCommand redisCommandTable[] = {
{"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0},
{"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0},
{"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
{"georadius",georadiusCommand,-6,"r",0,NULL,1,1,1,0,0},
{"georadiusbymember",georadiusByMemberCommand,-5,"r",0,NULL,1,1,1,0,0},
{"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0},
{"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0},
{"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
{"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
{"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},

View File

@ -1311,6 +1311,7 @@ void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
unsigned int zsetLength(robj *zobj);
void zsetConvert(robj *zobj, int encoding);
void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen);
int zsetScore(robj *zobj, sds member, double *score);
unsigned long zslGetRank(zskiplist *zsl, double score, sds o);

View File

@ -1183,6 +1183,18 @@ void zsetConvert(robj *zobj, int encoding) {
}
}
/* Convert the sorted set object into a ziplist if it is not already a ziplist
* and if the number of elements and the maximum element size is within the
* expected ranges. */
void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return;
zset *zset = zobj->ptr;
if (zset->zsl->length <= server.zset_max_ziplist_entries &&
maxelelen <= server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_ZIPLIST);
}
/* Return (by reference) the score of the specified member of the sorted set
* storing it into *score. If the element does not exist C_ERR is returned
* otherwise C_OK is returned and *score is correctly populated.
@ -2174,11 +2186,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
server.dirty++;
}
if (dstzset->zsl->length) {
/* Convert to ziplist when in limits. */
if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
maxelelen <= server.zset_max_ziplist_value)
zsetConvert(dstobj,OBJ_ENCODING_ZIPLIST);
zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c,zsetLength(dstobj));
if (!touched) signalModifiedKey(c->db,dstkey);