diff --git a/src/hyperloglog.c b/src/hyperloglog.c index a95a21c91..5e5064b1b 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -202,6 +202,8 @@ struct hllhdr { #define HLL_SPARSE 1 /* Sparse encoding */ #define HLL_MAX_ENCODING 1 +#define HLL_SPARSE_MAX 3000 + /* =========================== Low level bit macros ========================= */ /* Macros to access the dense representation. @@ -555,8 +557,46 @@ double hllDenseSum(uint8_t *registers, double *PE, int *ezp) { /* ================== Sparse representation implementation ================= */ -sds hllSparseToDense(sds *sparse) { - return sdsnew("TODO"); +/* Convert the HLL with sparse representation given as input in its dense + * representation. Both representations are represented by SDS strings, and + * the input representation is freed as a side effect. */ +sds hllSparseToDense(sds sparse) { + sds dense; + struct hllhdr *hdr, *oldhdr = (struct hllhdr*)sparse; + int idx = 0, runlen, regval; + uint8_t *p = (uint8_t*)sparse, *end = p+sdslen(sparse); + + /* Create a string of the right size filled with zero bytes. + * Note that the cached cardinality is set to 0 as a side effect + * that is exactly the cardinality of an empty HLL. */ + dense = sdsnewlen(NULL,HLL_DENSE_SIZE); + hdr = (struct hllhdr*) dense; + *hdr = *oldhdr; /* This will copy the magic and cached cardinality. */ + hdr->encoding = HLL_DENSE; + + /* Now read the sparse representation and set non-zero registers + * accordingly. */ + p += HLL_HDR_SIZE; + while(p < end) { + if (HLL_SPARSE_IS_ZERO(p)) { + runlen = HLL_SPARSE_ZERO_LEN(p); + idx += runlen; + } else if (HLL_SPARSE_IS_XZERO(p)) { + runlen = HLL_SPARSE_XZERO_LEN(p); + idx += runlen; + } else { + runlen = HLL_SPARSE_VAL_LEN(p); + regval = HLL_SPARSE_VAL_VALUE(p); + while(runlen--) { + HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval); + idx++; + } + } + } + + /* Free the old representation and return the new one. */ + sdsfree(sparse); + return dense; } /* "Add" the element in the sparse hyperloglog data structure. @@ -739,6 +779,8 @@ int hllSparseAdd(robj *o, unsigned char *ele, size_t elesize) { int seqlen = seq-n; int oldlen = is_xzero ? 2 : 1; int deltalen = seqlen-oldlen; + + if (deltalen > 0 && sdslen(o->ptr) > HLL_SPARSE_MAX) goto promote; if (deltalen && next) { memmove(next+deltalen,next,next-sparse); sdsIncrLen(o->ptr,deltalen); @@ -774,7 +816,6 @@ double hllSparseSum(uint8_t *sparse, int sparselen, double *PE, int *ezp) { uint8_t *end = sparse+sparselen, *p = sparse; while(p < end) { - /* Set span to the number of registers covered by this opcode. */ if (HLL_SPARSE_IS_ZERO(p)) { runlen = HLL_SPARSE_ZERO_LEN(p); idx += runlen;