Cluster: clusterReadHandler() reworked to be more correct and simpler to follow.

This commit is contained in:
antirez 2013-09-03 11:43:07 +02:00
parent 1036b4b21b
commit 354a5de270

View File

@ -1169,53 +1169,52 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
REDIS_NOTUSED(el); REDIS_NOTUSED(el);
REDIS_NOTUSED(mask); REDIS_NOTUSED(mask);
again: while(1) { /* Read as long as there is data to read. */
rcvbuflen = sdslen(link->rcvbuf); rcvbuflen = sdslen(link->rcvbuf);
if (rcvbuflen < 4) { if (rcvbuflen < 4) {
/* First, obtain the first four bytes to get the full message /* First, obtain the first four bytes to get the full message
* length. */ * length. */
readlen = 4 - rcvbuflen; readlen = 4 - rcvbuflen;
} else { } else {
/* Finally read the full message. */ /* Finally read the full message. */
hdr = (clusterMsg*) link->rcvbuf; hdr = (clusterMsg*) link->rcvbuf;
if (rcvbuflen == 4) { if (rcvbuflen == 4) {
/* Perform some sanity check on the message length. */ /* Perform some sanity check on the message length. */
if (ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) { if (ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) {
redisLog(REDIS_WARNING, redisLog(REDIS_WARNING,
"Bad message length received from Cluster bus."); "Bad message length received from Cluster bus.");
handleLinkIOError(link); handleLinkIOError(link);
return; return;
}
} }
readlen = ntohl(hdr->totlen) - rcvbuflen;
if (readlen > sizeof(buf)) readlen = sizeof(buf);
} }
readlen = ntohl(hdr->totlen) - rcvbuflen;
}
nread = read(fd,buf,readlen); nread = read(fd,buf,readlen);
if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */ if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
if (nread <= 0) { if (nread <= 0) {
/* I/O error... */ /* I/O error... */
redisLog(REDIS_DEBUG,"I/O error reading from node link: %s", redisLog(REDIS_DEBUG,"I/O error reading from node link: %s",
(nread == 0) ? "connection closed" : strerror(errno)); (nread == 0) ? "connection closed" : strerror(errno));
handleLinkIOError(link); handleLinkIOError(link);
return; return;
} else { } else {
/* Read data and recast the pointer to the new buffer. */ /* Read data and recast the pointer to the new buffer. */
link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread); link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
hdr = (clusterMsg*) link->rcvbuf; hdr = (clusterMsg*) link->rcvbuf;
rcvbuflen += nread; rcvbuflen += nread;
} }
/* Total length obtained? read the payload now instead of burning /* Total length obtained? Process this packet. */
* cycles waiting for a new event to fire. */ if (rcvbuflen >= 4 && rcvbuflen == ntohl(hdr->totlen)) {
if (rcvbuflen == 4) goto again; if (clusterProcessPacket(link)) {
sdsfree(link->rcvbuf);
/* Whole packet in memory? We can process it. */ link->rcvbuf = sdsempty();
if (rcvbuflen == ntohl(hdr->totlen)) { } else {
if (clusterProcessPacket(link)) { return; /* Link no longer valid. */
sdsfree(link->rcvbuf); }
link->rcvbuf = sdsempty();
rcvbuflen = 0; /* Useless line of code currently... defensive. */
} }
} }
} }