mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
don't use small shared integer objects when disk store is enabled
This commit is contained in:
parent
82ef6ebf73
commit
98a9abb66d
@ -87,6 +87,8 @@
|
|||||||
*
|
*
|
||||||
* - If dsSet() fails on the write thread log the error and reschedule the
|
* - If dsSet() fails on the write thread log the error and reschedule the
|
||||||
* key for flush.
|
* key for flush.
|
||||||
|
*
|
||||||
|
* - Check why INCR will not update the LRU info for the object.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* Virtual Memory is composed mainly of two subsystems:
|
/* Virtual Memory is composed mainly of two subsystems:
|
||||||
@ -133,6 +135,7 @@ void dsInit(void) {
|
|||||||
server.io_processed = listCreate();
|
server.io_processed = listCreate();
|
||||||
server.io_ready_clients = listCreate();
|
server.io_ready_clients = listCreate();
|
||||||
pthread_mutex_init(&server.io_mutex,NULL);
|
pthread_mutex_init(&server.io_mutex,NULL);
|
||||||
|
pthread_cond_init(&server.io_condvar,NULL);
|
||||||
server.io_active_threads = 0;
|
server.io_active_threads = 0;
|
||||||
if (pipe(pipefds) == -1) {
|
if (pipe(pipefds) == -1) {
|
||||||
redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
|
redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
|
||||||
@ -329,13 +332,14 @@ void *IOThreadEntryPoint(void *arg) {
|
|||||||
REDIS_NOTUSED(arg);
|
REDIS_NOTUSED(arg);
|
||||||
|
|
||||||
pthread_detach(pthread_self());
|
pthread_detach(pthread_self());
|
||||||
|
lockThreadedIO();
|
||||||
while(1) {
|
while(1) {
|
||||||
|
/* Wait for more work to do */
|
||||||
|
pthread_cond_wait(&server.io_condvar,&server.io_mutex);
|
||||||
/* Get a new job to process */
|
/* Get a new job to process */
|
||||||
lockThreadedIO();
|
|
||||||
if (listLength(server.io_newjobs) == 0) {
|
if (listLength(server.io_newjobs) == 0) {
|
||||||
/* No new jobs in queue, exit. */
|
/* No new jobs in queue, reiterate. */
|
||||||
unlockThreadedIO();
|
unlockThreadedIO();
|
||||||
sleep(1);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ln = listFirst(server.io_newjobs);
|
ln = listFirst(server.io_newjobs);
|
||||||
@ -345,6 +349,7 @@ void *IOThreadEntryPoint(void *arg) {
|
|||||||
listAddNodeTail(server.io_processing,j);
|
listAddNodeTail(server.io_processing,j);
|
||||||
ln = listLast(server.io_processing); /* We use ln later to remove it */
|
ln = listLast(server.io_processing); /* We use ln later to remove it */
|
||||||
unlockThreadedIO();
|
unlockThreadedIO();
|
||||||
|
|
||||||
redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
|
redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
|
||||||
(long) pthread_self(),
|
(long) pthread_self(),
|
||||||
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
|
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
|
||||||
@ -367,15 +372,17 @@ void *IOThreadEntryPoint(void *arg) {
|
|||||||
/* Done: insert the job into the processed queue */
|
/* Done: insert the job into the processed queue */
|
||||||
redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
|
redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
|
||||||
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
|
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
|
||||||
|
|
||||||
lockThreadedIO();
|
lockThreadedIO();
|
||||||
listDelNode(server.io_processing,ln);
|
listDelNode(server.io_processing,ln);
|
||||||
listAddNodeTail(server.io_processed,j);
|
listAddNodeTail(server.io_processed,j);
|
||||||
unlockThreadedIO();
|
|
||||||
|
|
||||||
/* Signal the main thread there is new stuff to process */
|
/* Signal the main thread there is new stuff to process */
|
||||||
redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
|
redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
|
||||||
}
|
}
|
||||||
return NULL; /* never reached */
|
/* never reached, but that's the full pattern... */
|
||||||
|
unlockThreadedIO();
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void spawnIOThread(void) {
|
void spawnIOThread(void) {
|
||||||
@ -449,6 +456,7 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
|
|||||||
|
|
||||||
lockThreadedIO();
|
lockThreadedIO();
|
||||||
queueIOJob(j);
|
queueIOJob(j);
|
||||||
|
pthread_cond_signal(&server.io_condvar);
|
||||||
unlockThreadedIO();
|
unlockThreadedIO();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,7 +167,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
|
|||||||
|
|
||||||
void addReply(redisClient *c, robj *obj) {
|
void addReply(redisClient *c, robj *obj) {
|
||||||
if (_installWriteEvent(c) != REDIS_OK) return;
|
if (_installWriteEvent(c) != REDIS_OK) return;
|
||||||
redisAssert(!server.ds_enabled || obj->storage == REDIS_DS_MEMORY);
|
redisAssert(!server.ds_enabled || obj->storage != REDIS_DS_SAVING);
|
||||||
|
|
||||||
/* This is an important place where we can avoid copy-on-write
|
/* This is an important place where we can avoid copy-on-write
|
||||||
* when there is a saving child running, avoiding touching the
|
* when there is a saving child running, avoiding touching the
|
||||||
|
@ -32,6 +32,7 @@ robj *createStringObject(char *ptr, size_t len) {
|
|||||||
robj *createStringObjectFromLongLong(long long value) {
|
robj *createStringObjectFromLongLong(long long value) {
|
||||||
robj *o;
|
robj *o;
|
||||||
if (value >= 0 && value < REDIS_SHARED_INTEGERS &&
|
if (value >= 0 && value < REDIS_SHARED_INTEGERS &&
|
||||||
|
!server.ds_enabled &&
|
||||||
pthread_equal(pthread_self(),server.mainthread)) {
|
pthread_equal(pthread_self(),server.mainthread)) {
|
||||||
incrRefCount(shared.integers[value]);
|
incrRefCount(shared.integers[value]);
|
||||||
o = shared.integers[value];
|
o = shared.integers[value];
|
||||||
@ -214,7 +215,7 @@ robj *tryObjectEncoding(robj *o) {
|
|||||||
* Note that we also avoid using shared integers when maxmemory is used
|
* Note that we also avoid using shared integers when maxmemory is used
|
||||||
* because every object needs to have a private LRU field for the LRU
|
* because every object needs to have a private LRU field for the LRU
|
||||||
* algorithm to work well. */
|
* algorithm to work well. */
|
||||||
if (server.ds_enabled == 0 &&
|
if (!server.ds_enabled &&
|
||||||
server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
|
server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
|
||||||
pthread_equal(pthread_self(),server.mainthread))
|
pthread_equal(pthread_self(),server.mainthread))
|
||||||
{
|
{
|
||||||
|
@ -459,7 +459,7 @@ struct redisServer {
|
|||||||
list *io_processed; /* List of VM I/O jobs already processed */
|
list *io_processed; /* List of VM I/O jobs already processed */
|
||||||
list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
|
list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
|
||||||
pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
|
pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
|
||||||
pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
|
pthread_cond_t io_condvar; /* I/O threads conditional variable */
|
||||||
pthread_attr_t io_threads_attr; /* attributes for threads creation */
|
pthread_attr_t io_threads_attr; /* attributes for threads creation */
|
||||||
int io_active_threads; /* Number of running I/O threads */
|
int io_active_threads; /* Number of running I/O threads */
|
||||||
int vm_max_threads; /* Max number of I/O threads running at the same time */
|
int vm_max_threads; /* Max number of I/O threads running at the same time */
|
||||||
|
Loading…
Reference in New Issue
Block a user