redict/src/t_string.c
antirez 894eba07c8 Introduction of a new string encoding: EMBSTR
Previously two string encodings were used for string objects:

1) REDIS_ENCODING_RAW: a string object with obj->ptr pointing to an sds
string.

2) REDIS_ENCODING_INT: a string object where the obj->ptr void pointer
is cast to a long.

This commit introduces an experimental new encoding called
REDIS_ENCODING_EMBSTR that implements an object represented by an sds
string that is not modifiable but allocated in the same memory chunk as
the robj structure itself.

The chunk looks like the following:

+--------------+-----------+------------+--------+----+
| robj data... | robj->ptr | sds header | string | \0 |
+--------------+-----+-----+------------+--------+----+
                     |                       ^
                     +-----------------------+

The robj->ptr points to the contiguous sds string data, so the object
can be manipulated with the same functions used to manipulate plain
string objects, however we need just one malloc and one free in order to
allocate or release this kind of object. Moreover it has better cache
locality.

This new allocation strategy should benefit both memory usage and
performance. A performance gain between 60 and 70% was observed
during micro-benchmarks, however there is more work to do to evaluate
the performance impact and the memory usage behavior.
2013-07-22 10:31:38 +02:00

460 lines
15 KiB
C

/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "redis.h"
#include <math.h> /* isnan(), isinf() */
/*-----------------------------------------------------------------------------
* String Commands
*----------------------------------------------------------------------------*/
/* Validate the length a string value would have after a write.
 *
 * Returns REDIS_OK when 'size' is within the 512MB limit. Otherwise an error
 * reply is added to the client 'c' and REDIS_ERR is returned.
 *
 * A negative 'size' is rejected as well: callers obtain the value by adding
 * unsigned lengths to user supplied offsets, so a negative number here means
 * the sum wrapped around, not that the resulting string is small. */
static int checkStringLength(redisClient *c, long long size) {
    if (size < 0 || size > 512*1024*1024) {
        addReplyError(c,"string exceeds maximum allowed size (512MB)");
        return REDIS_ERR;
    }
    return REDIS_OK;
}
/* The setGenericCommand() function implements the SET operation with different
 * options and variants. This function is called in order to implement the
 * following commands: SET, SETEX, PSETEX, SETNX.
 *
 * 'flags' changes the behavior of the command (NX or XX, see below).
 *
 * 'expire' represents an expire to set in form of a Redis object as passed
 * by the user, or NULL for no expire. It is interpreted according to the
 * specified 'unit' (UNIT_SECONDS or UNIT_MILLISECONDS) and must be a
 * positive integer.
 *
 * 'ok_reply' and 'abort_reply' are what the function will reply to the client
 * if the operation is performed, or when it is not because of the NX or
 * XX flags.
 *
 * If ok_reply is NULL shared.ok is used.
 * If abort_reply is NULL shared.nullbulk is used. */
#define REDIS_SET_NO_FLAGS 0
#define REDIS_SET_NX (1<<0) /* Set if key not exists. */
#define REDIS_SET_XX (1<<1) /* Set if key exists. */
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid a harmless warning */

    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
            return;
        /* Reject non-positive values, and values expressed in seconds that
         * are so large that converting them to milliseconds would overflow
         * a signed long long (undefined behavior in C). */
        if (milliseconds <= 0 ||
            (unit == UNIT_SECONDS && milliseconds > LLONG_MAX/1000))
        {
            addReplyError(c,"invalid expire time in SETEX");
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }

    /* NX: abort if the key already exists. XX: abort if it does not. */
    if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }

    setKey(c->db,key,val);
    server.dirty++;
    /* NOTE(review): mstime()+milliseconds can still overflow for values
     * close to LLONG_MAX; consider bounding the sum as well. */
    if (expire) setExpire(c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>]
 *
 * Parses the optional arguments starting at argv[3] and hands the work over
 * to setGenericCommand(). Option names are matched case insensitively. Any
 * unknown option, or an EX/PX option without a following argument, produces
 * a syntax error reply. */
void setCommand(redisClient *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = REDIS_SET_NO_FLAGS;
    int j;

    for (j = 3; j < c->argc; j++) {
        char *opt = c->argv[j]->ptr;
        robj *next = (j+1 < c->argc) ? c->argv[j+1] : NULL;
        /* Every option is exactly two characters long and ends with x/X.
         * The first character is tested before the others so we never read
         * past the terminator of an empty argument. */
        int ok = opt[0] != '\0' &&
                 (opt[1] == 'x' || opt[1] == 'X') &&
                 opt[2] == '\0';

        if (ok) {
            switch (opt[0]) {
            case 'n': case 'N':
                flags |= REDIS_SET_NX;
                break;
            case 'x': case 'X':
                flags |= REDIS_SET_XX;
                break;
            case 'e': case 'E':
            case 'p': case 'P':
                /* EX and PX consume the next argument as the expire. */
                if (next == NULL) {
                    ok = 0;
                    break;
                }
                if (opt[0] == 'e' || opt[0] == 'E')
                    unit = UNIT_SECONDS;
                else
                    unit = UNIT_MILLISECONDS;
                expire = next;
                j++;
                break;
            default:
                ok = 0;
                break;
            }
        }

        if (!ok) {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
/* SETNX key value
 *
 * Sets the key only when it does not exist (REDIS_SET_NX), without expire.
 * Replies with shared.cone when the key was set, shared.czero otherwise. */
void setnxCommand(redisClient *c) {
    robj *val = tryObjectEncoding(c->argv[2]);

    c->argv[2] = val;
    setGenericCommand(c,REDIS_SET_NX,c->argv[1],val,NULL,0,
        shared.cone,shared.czero);
}
/* SETEX key seconds value
 *
 * Unconditional set with an expire taken from argv[2], in seconds. */
void setexCommand(redisClient *c) {
    robj *val = tryObjectEncoding(c->argv[3]);

    c->argv[3] = val;
    setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],val,c->argv[2],
        UNIT_SECONDS,NULL,NULL);
}
/* PSETEX key milliseconds value
 *
 * Unconditional set with an expire taken from argv[2], in milliseconds. */
void psetexCommand(redisClient *c) {
    robj *val = tryObjectEncoding(c->argv[3]);

    c->argv[3] = val;
    setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],val,c->argv[2],
        UNIT_MILLISECONDS,NULL,NULL);
}
/* Reply with the string stored at argv[1].
 *
 * A missing key produces the shared.nullbulk reply and counts as success.
 * Returns REDIS_ERR, after replying with a wrong type error, only when the
 * key holds a value that is not a string; REDIS_OK otherwise. */
int getGenericCommand(redisClient *c) {
    robj *val = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);

    if (val == NULL) return REDIS_OK;

    if (val->type == REDIS_STRING) {
        addReplyBulk(c,val);
        return REDIS_OK;
    }

    addReply(c,shared.wrongtypeerr);
    return REDIS_ERR;
}
/* GET key: thin wrapper that discards the status of getGenericCommand(). */
void getCommand(redisClient *c) {
    (void) getGenericCommand(c);
}
/* GETSET key value
 *
 * Replies with the old value through getGenericCommand(), then stores the
 * new one. Nothing is written when the existing key is not a string. */
void getsetCommand(redisClient *c) {
    robj *key = c->argv[1];
    robj *val;

    if (getGenericCommand(c) == REDIS_ERR) return;

    val = tryObjectEncoding(c->argv[2]);
    c->argv[2] = val;
    setKey(c->db,key,val);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id);
    server.dirty++;
}
/* SETRANGE key offset value
 *
 * Overwrites part of the string stored at key with 'value', starting at
 * 'offset'. The string is grown with sdsgrowzero() when needed, and a missing
 * key is created starting from an empty string. The reply is the length of
 * the string after the operation.
 *
 * An empty 'value' never modifies anything: the reply is the current length
 * of the string (0 for a missing key). */
void setrangeCommand(redisClient *c) {
    robj *o;
    long offset;
    sds value = c->argv[3]->ptr;
    size_t vlen = sdslen(value);

    if (getLongFromObjectOrReply(c,c->argv[2],&offset,NULL) != REDIS_OK)
        return;

    if (offset < 0) {
        addReplyError(c,"offset is out of range");
        return;
    }

    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o == NULL) {
        /* Return 0 when setting nothing on a non-existing string */
        if (vlen == 0) {
            addReply(c,shared.czero);
            return;
        }

        /* Return when the resulting string exceeds allowed size. The offset
         * is validated alone first: offset+vlen is computed in unsigned
         * arithmetic and, for offsets close to LONG_MAX, would wrap around
         * and slip through the size check. */
        if (checkStringLength(c,offset) != REDIS_OK ||
            checkStringLength(c,offset+vlen) != REDIS_OK)
            return;

        o = createObject(REDIS_STRING,sdsempty());
        dbAdd(c->db,c->argv[1],o);
    } else {
        size_t olen;

        /* Key exists, check type */
        if (checkType(c,o,REDIS_STRING))
            return;

        /* Return existing string length when setting nothing */
        olen = stringObjectLen(o);
        if (vlen == 0) {
            addReplyLongLong(c,olen);
            return;
        }

        /* Return when the resulting string exceeds allowed size. See the
         * branch above for why the offset is checked on its own first. */
        if (checkStringLength(c,offset) != REDIS_OK ||
            checkStringLength(c,offset+vlen) != REDIS_OK)
            return;

        /* Create a copy when the object is shared or encoded. */
        if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW) {
            robj *decoded = getDecodedObject(o);
            o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
            decrRefCount(decoded);
            dbOverwrite(c->db,c->argv[1],o);
        }
    }

    /* Both branches above return early on an empty value, so there is
     * always something to write at this point. */
    o->ptr = sdsgrowzero(o->ptr,offset+vlen);
    memcpy((char*)o->ptr+offset,value,vlen);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,
        "setrange",c->argv[1],c->db->id);
    server.dirty++;
    addReplyLongLong(c,sdslen(o->ptr));
}
/* GETRANGE key start end
 *
 * Replies with the bytes of the string stored at key between the indexes
 * 'start' and 'end', both inclusive. Negative indexes count from the end of
 * the string. Out of range indexes are clamped to the string boundaries, and
 * an empty bulk reply is sent when the key is missing or the range is empty.
 * Integer encoded values are rendered into a local buffer first. */
void getrangeCommand(redisClient *c) {
    robj *o;
    long start, end;
    char *str, llbuf[32];
    size_t len;

    if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != REDIS_OK)
        return;
    if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != REDIS_OK)
        return;
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptybulk)) == NULL ||
        checkType(c,o,REDIS_STRING)) return;

    if (o->encoding == REDIS_ENCODING_INT) {
        str = llbuf;
        len = ll2string(llbuf,sizeof(llbuf),(long)o->ptr);
    } else {
        str = o->ptr;
        len = sdslen(str);
    }

    /* Nothing can be selected from an empty string. */
    if (len == 0) {
        addReply(c,shared.emptybulk);
        return;
    }

    /* Convert negative indexes */
    if (start < 0) start = len+start;
    if (end < 0) end = len+end;
    if (start < 0) start = 0;
    if (end < 0) end = 0;
    /* 'end' is not negative here. Compare it at full width: casting to a
     * plain unsigned would truncate a 64 bit long, letting values such as
     * 2^32 skip the clamping and read past the end of the string. */
    if ((unsigned long)end >= len) end = len-1;

    /* Precondition: end >= 0 && end < len, so the only condition where
     * nothing can be returned is: start > end. */
    if (start > end) {
        addReply(c,shared.emptybulk);
    } else {
        addReplyBulkCBuffer(c,(char*)str+start,end-start+1);
    }
}
/* MGET key [key ...]
 *
 * Emits a multi bulk reply with one element per requested key. Keys that
 * are missing, or that hold something that is not a string, are reported
 * as shared.nullbulk. */
void mgetCommand(redisClient *c) {
    int idx;

    addReplyMultiBulkLen(c,c->argc-1);
    for (idx = 1; idx < c->argc; idx++) {
        robj *val = lookupKeyRead(c->db,c->argv[idx]);

        if (val != NULL && val->type == REDIS_STRING)
            addReplyBulk(c,val);
        else
            addReply(c,shared.nullbulk);
    }
}
/* Shared implementation of MSET and MSETNX.
 *
 * The arguments after the command name must be key/value pairs, otherwise
 * an error is returned. When 'nx' is non-zero nothing is set, and
 * shared.czero is the reply, if at least one of the keys already exists.
 * On success the reply is shared.cone for the NX variant, shared.ok
 * otherwise. */
void msetGenericCommand(redisClient *c, int nx) {
    int pos;

    if (c->argc % 2 == 0) {
        addReplyError(c,"wrong number of arguments for MSET");
        return;
    }

    /* Handle the NX flag. The MSETNX semantic is to return zero and set
     * nothing at all if at least one key already exists. Every key is
     * looked up, even after an existing one was found. */
    if (nx) {
        int existing = 0;

        for (pos = 1; pos < c->argc; pos += 2)
            existing += (lookupKeyWrite(c->db,c->argv[pos]) != NULL);

        if (existing != 0) {
            addReply(c, shared.czero);
            return;
        }
    }

    for (pos = 1; pos < c->argc; pos += 2) {
        robj *key = c->argv[pos];
        robj *val = tryObjectEncoding(c->argv[pos+1]);

        c->argv[pos+1] = val;
        setKey(c->db,key,val);
        notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id);
    }

    server.dirty += (c->argc-1)/2;
    if (nx)
        addReply(c, shared.cone);
    else
        addReply(c, shared.ok);
}
/* MSET key value [key value ...]: unconditional multi set. */
void msetCommand(redisClient *c) {
    const int nx = 0;

    msetGenericCommand(c,nx);
}
/* MSETNX key value [key value ...]: multi set with the NX check enabled. */
void msetnxCommand(redisClient *c) {
    const int nx = 1;

    msetGenericCommand(c,nx);
}
/* Shared implementation of INCR, DECR, INCRBY and DECRBY.
 *
 * Adds 'incr' (which may be negative) to the integer stored at argv[1] and
 * replies with the new value as an integer reply. An error is returned,
 * and nothing is modified, when the key holds the wrong type, its value
 * cannot be parsed as a long long, or the sum would not fit in a
 * long long. */
void incrDecrCommand(redisClient *c, long long incr) {
    long long current, result;
    robj *existing, *updated;
    int overflow = 0;

    existing = lookupKeyWrite(c->db,c->argv[1]);
    if (existing != NULL && checkType(c,existing,REDIS_STRING)) return;
    if (getLongLongFromObjectOrReply(c,existing,&current,NULL) != REDIS_OK)
        return;

    /* The sum can only overflow when both operands have the same sign. */
    if (incr < 0 && current < 0)
        overflow = incr < (LLONG_MIN-current);
    else if (incr > 0 && current > 0)
        overflow = incr > (LLONG_MAX-current);

    if (overflow) {
        addReplyError(c,"increment or decrement would overflow");
        return;
    }

    result = current + incr;
    updated = createStringObjectFromLongLong(result);
    if (existing != NULL)
        dbOverwrite(c->db,c->argv[1],updated);
    else
        dbAdd(c->db,c->argv[1],updated);

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
    server.dirty++;

    /* Integer reply: ":" followed by the new value and CRLF. */
    addReply(c,shared.colon);
    addReply(c,updated);
    addReply(c,shared.crlf);
}
/* INCR key: add one to the stored integer. */
void incrCommand(redisClient *c) {
    const long long delta = 1;

    incrDecrCommand(c,delta);
}
/* DECR key: subtract one from the stored integer. */
void decrCommand(redisClient *c) {
    const long long delta = -1;

    incrDecrCommand(c,delta);
}
/* INCRBY key increment: parse the increment from argv[2] and apply it. */
void incrbyCommand(redisClient *c) {
    long long delta;

    if (getLongLongFromObjectOrReply(c, c->argv[2], &delta, NULL) == REDIS_OK)
        incrDecrCommand(c,delta);
}
/* DECRBY key decrement
 *
 * Parses the decrement from argv[2] and applies its negation through
 * incrDecrCommand(). LLONG_MIN is rejected with an error because it has no
 * positive counterpart: negating it is a signed overflow, which is
 * undefined behavior in C. */
void decrbyCommand(redisClient *c) {
    long long incr;

    if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != REDIS_OK) return;
    if (incr == LLONG_MIN) {
        addReplyError(c,"decrement would overflow");
        return;
    }
    incrDecrCommand(c,-incr);
}
/* INCRBYFLOAT key increment
 *
 * Adds the floating point increment in argv[2] to the value stored at
 * argv[1] and replies with the new value as a bulk string. An error is
 * returned when the result is NaN or Infinity. */
void incrbyfloatCommand(redisClient *c) {
    long double current, delta;
    robj *existing, *updated, *setname;

    existing = lookupKeyWrite(c->db,c->argv[1]);
    if (existing != NULL && checkType(c,existing,REDIS_STRING)) return;

    /* Parse the stored value first, then the increment. */
    if (getLongDoubleFromObjectOrReply(c,existing,&current,NULL) != REDIS_OK)
        return;
    if (getLongDoubleFromObjectOrReply(c,c->argv[2],&delta,NULL) != REDIS_OK)
        return;

    current += delta;
    if (isnan(current) || isinf(current)) {
        addReplyError(c,"increment would produce NaN or Infinity");
        return;
    }

    updated = createStringObjectFromLongDouble(current);
    if (existing != NULL)
        dbOverwrite(c->db,c->argv[1],updated);
    else
        dbAdd(c->db,c->argv[1],updated);

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id);
    server.dirty++;
    addReplyBulk(c,updated);

    /* Always replicate INCRBYFLOAT as a SET command with the final value
     * in order to make sure that differences in float precision or formatting
     * will not create differences in replicas or after an AOF restart. */
    setname = createStringObject("SET",3);
    rewriteClientCommandArgument(c,0,setname);
    decrRefCount(setname);
    rewriteClientCommandArgument(c,2,updated);
}
/* APPEND key value
 *
 * Appends argv[2] to the string stored at argv[1], creating the key with
 * that value when it does not exist. Replies with the length of the string
 * after the operation. */
void appendCommand(redisClient *c) {
    size_t newlen;
    robj *target = lookupKeyWrite(c->db,c->argv[1]);

    if (target != NULL) {
        robj *suffix;

        /* Key exists, check type */
        if (checkType(c,target,REDIS_STRING))
            return;

        /* The suffix is a command argument, so always an sds */
        suffix = c->argv[2];
        newlen = stringObjectLen(target)+sdslen(suffix->ptr);
        if (checkStringLength(c,newlen) != REDIS_OK)
            return;

        /* A shared or encoded object can't be modified in place: replace
         * it in the database with a private raw copy. */
        if (target->refcount != 1 || target->encoding != REDIS_ENCODING_RAW) {
            robj *decoded = getDecodedObject(target);

            target = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
            decrRefCount(decoded);
            dbOverwrite(c->db,c->argv[1],target);
        }

        /* Append the value */
        target->ptr = sdscatlen(target->ptr,suffix->ptr,sdslen(suffix->ptr));
        newlen = sdslen(target->ptr);
    } else {
        /* Create the key: the argument itself becomes the stored value,
         * so take one more reference to it. */
        c->argv[2] = tryObjectEncoding(c->argv[2]);
        dbAdd(c->db,c->argv[1],c->argv[2]);
        incrRefCount(c->argv[2]);
        newlen = stringObjectLen(c->argv[2]);
    }

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"append",c->argv[1],c->db->id);
    server.dirty++;
    addReplyLongLong(c,newlen);
}
/* STRLEN key
 *
 * Replies with the length of the string stored at key. A missing key gets
 * the shared.czero reply. */
void strlenCommand(redisClient *c) {
    robj *val = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (val == NULL) return;
    if (checkType(c,val,REDIS_STRING)) return;
    addReplyLongLong(c,stringObjectLen(val));
}