Redis ZipMap 数据结构和源码剖析

在redis中,使用 hashtable 实现了 set、sorted set、hash 结构。而单纯的 hashtable ,因为是一次分配一大块内存,所以在存储少量数据时会存在空间利用率很低的问题。

redis 是内存数据库,而内存是昂贵的,所以需要尽量避免内存的浪费,所以有了 zipmap 结构。

zipmap?

我们通过 what、why、how 来对 zipmap 进行全面的认识

what?(它是什么)

ZipMap 实质上就是一个特殊格式的字符串。

why?(为什么要用它)

因为redis是个内存数据库,要尽量避免内存浪费

how?(它是怎么实现的)

zipmap 是用连续内存保存 key-value 对的结构,查询时是依次遍列每个 key-value 对,直到查到为止。其存储结构如下所示:
zipmap 结构图示

各字段详解:

zmlen:

zmlen 是1个字节的无符号整型数,表示 zipmap 当前的键值对数量,最多只能表示253个键值对。
当 zipmap 中的元素数量大于或等于254时,只能通过遍历一遍 zipmap 来确定其大小。

key length:

如果随后的字符串(key)长度小于或等于253,直接用1个字节表示其长度。
如果随后的字符串(key)长度超过254,则用5个字节表示,其中第一个字节值为254,接下来的4个字节才是字符串长度。

key:

一个 k-v 对的 key,例如我们在 zipmap 中存储了一个 key 为 abc , value 为123的 k-v 对,那么 key 就是abc,
并且前面的 key length 就为1个字节,其值为3

value length:

与 key length 相同

free:

free 字段是占1个字节的整型数,用于表示 value 中当前的空闲字节数(内存碎片),它的值一般都比较小,
如果空闲区间超过阈值(默认为4),zipmap 会进行调整以使整个 map 尽可能小。这个操作发生在写 zipmap 时。

假设 zipmap 存在 "foo" => "bar" 这样一个键值对,随后我们将 value 更改为 "hi",
此时free = 1,表示value字符串当前有1个字节大小的空闲空间。

value:

与 key 相同,区别在于当 value 更新后所占字节变小时,会有空闲空间,此空间大小由 free 表示

end:

end是 zipmap 的结尾符,占用1个字节,其值为255。表示 zipmap 的结尾

源码剖析:

部分常量定义:

#define ZIPMAP_BIGLEN 254
#define ZIPMAP_END 255
#define ZIPMAP_VALUE_MAX_FREE 4

/* The following macro returns the number of bytes needed to encode the length
 * for the integer value _l, that is, 1 byte for lengths < ZIPMAP_BIGLEN and
 * 5 bytes for all the other lengths. */
//定义了检查len字段长度的方法,若小于254,则返回1个字节,否则返回5个字节
#define ZIPMAP_LEN_BYTES(_l) (((_l) < ZIPMAP_BIGLEN) ? 1 : sizeof(unsigned int)+1)

查找:

static unsigned char *zipmapLookupRaw(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned int *totlen) {
    unsigned char *p = zm+1, *k = NULL;//因为zm指针的第一位指向的是zmlen,所以需要+1,将指针指向第一个元素
    unsigned int l,llen;

    while(*p != ZIPMAP_END) {//遍历zipmap字符串,查找对应的k-v对
        unsigned char free;

        /* Match or skip the key */
        l = zipmapDecodeLength(p);//获取key length
        llen = zipmapEncodeLength(NULL,l);//获取key length 所占字节(1或5)

        //key存在 && k为NULL && 查找出的当前key长度l和目标key长度klen相同 && 两块内存的前l个字节完全相同,则视作查找到了目标    key
        if (key != NULL && k == NULL && l == klen && !memcmp(p+llen,key,l)) {
            /* Only return when the user doesn't care
             * for the total length of the zipmap. */
            if (totlen != NULL) {
                k = p;
            } else {
                return p;
            }
        }
        p += llen+l;
        /* Skip the value as well */
        l = zipmapDecodeLength(p);
        p += zipmapEncodeLength(NULL,l);
        free = p[0];
        p += l+1+free; /* +1 to skip the free byte */
    }
    if (totlen != NULL) *totlen = (unsigned int)(p-zm)+1;//返回当前zipmap占用多少字节,+1是加上END位
    return k;
}

插入/更新:

unsigned char *zipmapSet(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned char *val, unsigned int vlen, int *update) {
    unsigned int zmlen, offset;
    unsigned int freelen, reqlen = zipmapRequiredLength(klen,vlen);
    unsigned int empty, vempty;
    unsigned char *p;

    freelen = reqlen;
    if (update) *update = 0;
    p = zipmapLookupRaw(zm,key,klen,&zmlen);//先查找是否已存在于zipmap中
    if (p == NULL) {//若不存在,则插入
        /* Key not found: enlarge */
        zm = zipmapResize(zm, zmlen+reqlen);//重新分配zm内存
        p = zm+zmlen-1;//指针指向zipmap尾部
        zmlen = zmlen+reqlen;
        /* Increase zipmap length (this is an insert) */
        if (zm[0] < ZIPMAP_BIGLEN) zm[0]++;//若zmlen小于254,则更新zmlen
    } else {//若存在,则更新
        /* Key found. Is there enough space for the new value? */
        /* Compute the total length: */
        if (update) *update = 1;
        freelen = zipmapRawEntryLength(p);//这里是当前value所占用的字节数
        if (freelen < reqlen) {//若当前value的空间不足以存放更新后的value,则申请内存
            /* Store the offset of this key within the current zipmap, so
             * it can be resized. Then, move the tail backwards so this
             * pair fits at the current position. */
            offset = p-zm;
            zm = zipmapResize(zm, zmlen-freelen+reqlen);
            p = zm+offset;

            /* The +1 in the number of bytes to be moved is caused by the
             * end-of-zipmap byte. Note: the *original* zmlen is used. */
            memmove(p+reqlen, p+freelen, zmlen-(offset+freelen+1));//将zipmap后续的整段内存向后移动,为当前key腾出空间
            zmlen = zmlen-freelen+reqlen;
            freelen = reqlen;
        }
    }

    /* We now have a suitable block where the key/value entry can
     * be written. If there is too much free space, move the tail
     * of the zipmap a few bytes to the front and shrink the zipmap,
     * as we want zipmaps to be very space efficient. */
    empty = freelen-reqlen;
    if (empty >= ZIPMAP_VALUE_MAX_FREE) {//如果旧value所占字节比更新后value大4个字节以上,则将后续整段内容向前移动,减少内存浪费
        /* First, move the tail <empty> bytes to the front, then resize
         * the zipmap to be <empty> bytes smaller. */
        offset = p-zm;
        memmove(p+reqlen, p+freelen, zmlen-(offset+freelen+1));    
            zmlen -= empty;
           zm = zipmapResize(zm, zmlen);
            p = zm+offset;
           vempty = 0;
    } else {
            vempty = empty;
    }

    //在这里对zipmap进行写入
    /* Just write the key + value and we are done. */
    /* Key: */
    p += zipmapEncodeLength(p,klen);//这里将key length写入了zipmap,并移动p指针到key位置
    memcpy(p,key,klen);//写入key
    p += klen;//指针移动到value length 位置
    /* Value: */
    p += zipmapEncodeLength(p,vlen);//这里将value length写入了zipmap,并移动指针到free位置
    *p++ = vempty;//将当前free值写入free域,并移动指针到value位置
    memcpy(p,val,vlen);//写入value
    return zm;
}

删除:

unsigned char *zipmapDel(unsigned char *zm, unsigned char *key, unsigned int klen, int *deleted) {
    unsigned int zmlen, freelen;
    unsigned char *p = zipmapLookupRaw(zm,key,klen,&zmlen);
    if (p) {
        freelen = zipmapRawEntryLength(p);//获取要释放的内存长度
        memmove(p, p+freelen, zmlen-((p-zm)+freelen+1));//后段内存直接覆盖前段内存
        zm = zipmapResize(zm, zmlen-freelen);//释放尾部空闲空间

        //所以当zipmap已经存储超过254个k-v对后,使用将zipmap删除到253个,zmlen域也不会在这里变为253,
        //而是在调用 zipmapLen 方法时才去更新 zmlen
        /* Decrease zipmap length */
        if (zm[0] < ZIPMAP_BIGLEN) zm[0]--;

        if (deleted) *deleted = 1;
    } else {
        if (deleted) *deleted = 0;
    }
    return zm;
}

获取当前 zipmap 元素数量:

/* Return the number of entries inside a zipmap */
unsigned int zipmapLen(unsigned char *zm) {
    unsigned int len = 0;
    if (zm[0] < ZIPMAP_BIGLEN) { //如果小于254,则zmlen真实表示了 zm 元素数
        len = zm[0];
    } else { //若zmlen为254,则需要遍历整个 zm 从而获取当前元素数量
        unsigned char *p = zipmapRewind(zm);
        while((p = zipmapNext(p,NULL,NULL,NULL,NULL)) != NULL) len++;
        /* Re-store length if small enough */
        if (len < ZIPMAP_BIGLEN) zm[0] = len; //如果zmlen域目前小于254,其实就意味着在数量超过254后又删除至253以内,此时会再度更新zmlen
    }
    return len;
}

总结:

zipmap 通过在一块连续的内存空间上依次存放 key-value 对来维持 key 和 value 的映射结构,但它跟我们传统意义上的散列结构不同,只能通过依次遍历每个 key-value 节点来查找给定 key 值的 value 信息,所以其时间复杂度为O(n),但是,相比于 hash 结构,zipmap 通过固定的字节编码节省了很多内存空间。 另外,由于使用了一块连续的内存空间,zipmap 的每一次插入、删除、更新操作都有可能造成空间的重新分配。

从 zipmap 的实现上我们可以发现,当 zipmap 中存储的元素超过254后,每次统计元素数量,都需要完整遍历整个 zipmap, 并且 由于 zipmap的线性查找方式,所以当数据量很大时,查找和修改的性能也会跟着下降很多(因为需要遍历到对应位置再做相应操作)。所以我们得出结论:zipmap 不适合用来存放大量的 key-value 对。 故而在运用上,redis中内置的 Hash 结构在保存的元素数量较少时会采用 zipmap 来存放键值对,当元素的数量到达一定值后,会转为用哈希表来存储,从而达到性能和空间的均衡。