From bb75ecddfdcab4fb530bb9c78088e53b28687816 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 18 Feb 2016 10:24:11 +0100 Subject: [PATCH] New options for GEORADIUS: STORE and STOREDIST. Related to issue #3019. --- src/geo.c | 142 ++++++++++++++++++++++++++++++++++++--------------- src/server.c | 4 +- src/server.h | 1 + src/t_zset.c | 18 +++++-- 4 files changed, 116 insertions(+), 49 deletions(-) diff --git a/src/geo.c b/src/geo.c index 560c34109..caa9e1863 100644 --- a/src/geo.c +++ b/src/geo.c @@ -425,10 +425,12 @@ void geoaddCommand(client *c) { #define RADIUS_MEMBER 2 /* GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC] - * [COUNT count] + * [COUNT count] [STORE key] [STOREDIST key] * GEORADIUSBYMEMBER key member radius unit ... options ... */ void georadiusGeneric(client *c, int type) { robj *key = c->argv[1]; + robj *storekey = NULL; + int storedist = 0; /* 0 for STORE, 1 for STOREDIST. */ /* Look up the requested zset */ robj *zobj = NULL; @@ -489,6 +491,14 @@ void georadiusGeneric(client *c, int type) { return; } i++; + } else if (!strcasecmp(arg, "store") && (i+1) < remaining) { + storekey = c->argv[base_args+i+1]; + storedist = 0; + i++; + } else if (!strcasecmp(arg, "storedist") && (i+1) < remaining) { + storekey = c->argv[base_args+i+1]; + storedist = 1; + i++; } else { addReply(c, shared.syntaxerr); return; @@ -496,6 +506,14 @@ void georadiusGeneric(client *c, int type) { } } + /* Trap options not compatible with STORE and STOREDIST. */ + if (storekey && (withdist || withhash || withcoords)) { + addReplyError(c, + "STORE option in GEORADIUS is not compatible with " + "WITHDIST, WITHHASH and WITHCOORDS options"); + return; + } + /* COUNT without ordering does not make much sense, force ASC * ordering if COUNT was specified but no sorting was requested. */ if (count != 0 && sort == SORT_NONE) sort = SORT_ASC; @@ -509,33 +527,17 @@ void georadiusGeneric(client *c, int type) { membersOfAllNeighbors(zobj, georadius, xy[0], xy[1], radius_meters, ga); /* If no matching results, the user gets an empty reply. */ - if (ga->used == 0) { + if (ga->used == 0 && storekey == NULL) { addReply(c, shared.emptymultibulk); geoArrayFree(ga); return; } long result_length = ga->used; + long returned_items = (count == 0 || result_length < count) ? + result_length : count; long option_length = 0; - /* Our options are self-contained nested multibulk replies, so we - * only need to track how many of those nested replies we return. */ - if (withdist) - option_length++; - - if (withcoords) - option_length++; - - if (withhash) - option_length++; - - /* The multibulk len we send is exactly result_length. The result is either - * all strings of just zset members *or* a nested multi-bulk reply - * containing the zset member string _and_ all the additional options the - * user enabled for this request. */ - addReplyMultiBulkLen(c, (count == 0 || result_length < count) ? - result_length : count); - /* Process [optional] requested sorting */ if (sort == SORT_ASC) { qsort(ga->array, result_length, sizeof(geoPoint), sort_gp_asc); @@ -543,35 +545,91 @@ void georadiusGeneric(client *c, int type) { qsort(ga->array, result_length, sizeof(geoPoint), sort_gp_desc); } - /* Finally send results back to the caller */ - int i; - for (i = 0; i < result_length; i++) { - geoPoint *gp = ga->array+i; - gp->dist /= conversion; /* Fix according to unit. */ - - /* If we have options in option_length, return each sub-result - * as a nested multi-bulk. Add 1 to account for result value itself. */ - if (option_length) - addReplyMultiBulkLen(c, option_length + 1); - - addReplyBulkSds(c,gp->member); - gp->member = NULL; + if (storekey == NULL) { + /* No target key, return results to user. */ + /* Our options are self-contained nested multibulk replies, so we + * only need to track how many of those nested replies we return. */ if (withdist) - addReplyDoubleDistance(c, gp->dist); + option_length++; + + if (withcoords) + option_length++; if (withhash) - addReplyLongLong(c, gp->score); + option_length++; - if (withcoords) { - addReplyMultiBulkLen(c, 2); - addReplyDouble(c, gp->longitude); - addReplyDouble(c, gp->latitude); + /* The multibulk len we send is exactly result_length. The result is + * either all strings of just zset members *or* a nested multi-bulk + * reply containing the zset member string _and_ all the additional + * options the user enabled for this request. */ + addReplyMultiBulkLen(c, returned_items); + + /* Finally send results back to the caller */ + int i; + for (i = 0; i < returned_items; i++) { + geoPoint *gp = ga->array+i; + gp->dist /= conversion; /* Fix according to unit. */ + + /* If we have options in option_length, return each sub-result + * as a nested multi-bulk. Add 1 to account for result value + * itself. */ + if (option_length) + addReplyMultiBulkLen(c, option_length + 1); + + addReplyBulkSds(c,gp->member); + gp->member = NULL; + + if (withdist) + addReplyDoubleDistance(c, gp->dist); + + if (withhash) + addReplyLongLong(c, gp->score); + + if (withcoords) { + addReplyMultiBulkLen(c, 2); + addReplyDouble(c, gp->longitude); + addReplyDouble(c, gp->latitude); + } + } + } else { + /* Target key, create a sorted set with the results. */ + robj *zobj; + zset *zs; + int i; + size_t maxelelen = 0; + + if (returned_items) { + zobj = createZsetObject(); + zs = zobj->ptr; } - /* Stop if COUNT was specified and we already provided the - * specified number of elements. */ - if (count != 0 && count == i+1) break; + for (i = 0; i < returned_items; i++) { + zskiplistNode *znode; + geoPoint *gp = ga->array+i; + gp->dist /= conversion; /* Fix according to unit. */ + double score = storedist ? gp->dist : gp->score; + size_t elelen = sdslen(gp->member); + + if (maxelelen < elelen) maxelelen = elelen; + znode = zslInsert(zs->zsl,score,gp->member); + serverAssert(dictAdd(zs->dict,gp->member,&znode->score) == DICT_OK); + gp->member = NULL; + } + + if (returned_items) { + zsetConvertToZiplistIfNeeded(zobj,maxelelen); + setKey(c->db,storekey,zobj); + decrRefCount(zobj); + notifyKeyspaceEvent(NOTIFY_LIST,"georadiusstore",storekey, + c->db->id); + server.dirty += returned_items; + } else if (dbDelete(c->db,storekey)) { + signalModifiedKey(c->db,storekey); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id); + server.dirty++; + } + addReplyLongLong(c, returned_items); } geoArrayFree(ga); } diff --git a/src/server.c b/src/server.c index 8ed105d93..56e1e3b73 100644 --- a/src/server.c +++ b/src/server.c @@ -283,8 +283,8 @@ struct redisCommand redisCommandTable[] = { {"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0}, {"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0}, {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0}, - {"georadius",georadiusCommand,-6,"r",0,NULL,1,1,1,0,0}, - {"georadiusbymember",georadiusByMemberCommand,-5,"r",0,NULL,1,1,1,0,0}, + {"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0}, + {"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0}, {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0}, {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0}, {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0}, diff --git a/src/server.h b/src/server.h index e3e6690ec..306568eeb 100644 --- a/src/server.h +++ b/src/server.h @@ -1311,6 +1311,7 @@ void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); unsigned int zsetLength(robj *zobj); void zsetConvert(robj *zobj, int encoding); +void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen); int zsetScore(robj *zobj, sds member, double *score); unsigned long zslGetRank(zskiplist *zsl, double score, sds o); diff --git a/src/t_zset.c b/src/t_zset.c index 84bea5aa8..7956f6e9c 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1183,6 +1183,18 @@ void zsetConvert(robj *zobj, int encoding) { } } +/* Convert the sorted set object into a ziplist if it is not already a ziplist + * and if the number of elements and the maximum element size is within the + * expected ranges. */ +void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) { + if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return; + zset *zset = zobj->ptr; + + if (zset->zsl->length <= server.zset_max_ziplist_entries && + maxelelen <= server.zset_max_ziplist_value) + zsetConvert(zobj,OBJ_ENCODING_ZIPLIST); +} + /* Return (by reference) the score of the specified member of the sorted set * storing it into *score. If the element does not exist C_ERR is returned * otherwise C_OK is returned and *score is correctly populated. @@ -2174,11 +2186,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { server.dirty++; } if (dstzset->zsl->length) { - /* Convert to ziplist when in limits. */ - if (dstzset->zsl->length <= server.zset_max_ziplist_entries && - maxelelen <= server.zset_max_ziplist_value) - zsetConvert(dstobj,OBJ_ENCODING_ZIPLIST); - + zsetConvertToZiplistIfNeeded(dstobj,maxelelen); dbAdd(c->db,dstkey,dstobj); addReplyLongLong(c,zsetLength(dstobj)); if (!touched) signalModifiedKey(c->db,dstkey);