list是计算机科学中最基本的数据结构之一,也是redis支持的数据类型之一。
最常见的list是链表(Linked List),redis是内存数据库,节约存储空间至关重要,因此redis中还实现了其它几种存储效率更高的list:ziplist、quicklist、listpack。
本文将一一介绍它们的具体结构,还会简单分析redis 5.0版本他们的insert方法的源代码

双端链表(Double Linked List)

双端链表的数据结构如下:

               ___________________________________________________________________
              |                                                                   |
              |                                                                   V
[(metadata)(tailP)(headP)]-->[(pp1)(dataP1)(pn1)]<--->[(pp2)(dataP2)(pn2)]......[(ppN)(dataPN)(pnN)]
  • 一开始是链表头结点,存储链表的基本信息(metadata),如结点的个数。除此之外,还有一个指向第一个数据结点的指针(头指针),指向最后一个结点的指针(尾指针)
  • 第一个数据结点由三部分组成。第一部分指针pp1指向前一个数据结点(第一个数据结点不存在前一个结点,因此pp1等于NULL),第二部分指针dataP1指向真正的数据,第三部分指针pn1指向下一个数据结点(即第二个数据结点)
  • 第二个数据结点由三部分组成。第一部分指针pp2指向前一个数据结点(即第一个数据结点),第二部分指针dataP2指向真正的数据,第三部分指针pn2指向下一个数据结点(即第三个数据结点)
    ……
    ……
  • 第N个数据结点由三部分组成。第一部分ppN指向前一个数据结点(即第N-1个数据结点),第二部分指针dataPN指向真正的数据,第三部分指针pnN指向下一个结点(最后一个结点不存在下一个结点,因此pnN等于NULL)

双端链表是最常见的链表,结构简单清晰,可以从链表头从左到右遍历整个链表,也可以从链表尾从右到左遍历整个链表。
然而对于redis应用来说,双端列表有一个严重的缺陷:元数据占用的内存太多。我们可以具体分析下,一个数据节点有3个指针(64系统里指针占8个字节)占24字节,数据指针指向的是一个redis object,redis object中又包含一些元数据,算下来,每个节点都需要约40字节存储元数据。而真正的数据,比如一个整数或者短字符串,远小于40字节,这样大多数的内存都用在了存储元数据上,非常不划算。

为了解决这个问题,redis使用另一种存储效率更高的链表:ziplist。

ziplist

双端链链表的数据在内存中不是连续的,需要通过指针把数据串联起来,而指针过多导致了占用的内存也会变多。
所以ziplist解决这个问题的思路是:ziplist在内存中占据一段连续空间,要访问某个结点,只需要知道它相对于ziplist开头在偏移量,完全避免了指针。
ziplist数据结构如下:

[zlbyte][zltail][zllen][entry1][entry2]....[END]
每个entry的结构如下:
[<prevlen><encoding><entry-data>]
  • zlbyte:4个字节的32位无符号整数,表示链表的字节数
  • zltail:4个字节的32位无符号整数,表示最后一个结点的偏移量(类似双端链表中尾指针的作用,可以快速找到最后一个结点)
  • zllen:2个字节的无符号整数,表示结点的个数。
  • entry:结点。结点由三部分组成:prevlen、encoding、entry-data
    • prevlen:前一个结点的长度(单位是字节),采用变长编码。
      当长度小于254时,占一个字节;
      当长度大于或等于254时占5个字节,其中第一个字节固定为254,剩下的4字节表示32位无符号整数
    • encoding:数据的编码类型,可能是整形或字符串,变长编码,第一个字节即可确定编码类型。例如:
      如果第一个字节的二进制形式是00pppppp,00开头表示后面的数据是字符串,字符串长度是pppppp表示的6位无符号整数
      如果前两个字节的二进制是01pppppp qqqqqqqq,01开头表示后面的数据是字符串,字符串长度是pppppp qqqqqqqq表示的14位无符号整数
      如果前两个字节的二进制是11111110 qqqqqqqq,11开头表示后面的数据是整数,11111110表示后面的是8位有符号整数,qqqqqqqq 就是8位的数据
      这里只是列举了部分编码,还有其它情况可以参见源码。
    • entry-data:存储真正的数据
  • END:1个字节,结束标记,固定为255

例子

下面的ziplist包含两个结点,第一个结点的存储整数50,第二个结点存储字符串hello

 [15 00 00 00] [0d 00 00 00] [02 00] [00 fe 32] [03 05 68 65 6c 6c 6f] [ff]
       |             |          |        |                |             |
    zlbytes        zltail    entries   entry1:50    entry2:hello	   end
  • 1-4字节zlbytes,表示ziplist的总字节数是21字节
  • 5-8字节zltail,表示最后一个结点相对ziplist开头的偏移量是13字节
  • 9-10字节entries,表示结点的个数是2个
  • 11字节prevlen,表示前一个结点的长度是0(因为第一个结点不存在前一个结点)
  • 12字节encoding,fe转化为二进制是11111110,说明数据是一个8位有符号整数
  • 13字节entrydata,真正的数据,整数50
  • 14字节prevlen,表示前一个结点的长度是3字节
  • 15字节encoding,fe转化为二进制是00000101,说明数据是字符串,字符串长度是该字节后6位表示的无符号整数(000101),转化为十进制是5
  • 16-20字节entrydata,真正的数据,hello字符串的asccii码
  • 21字节END

ziplist的缺点

  • ziplist结点数量有限制,不适宜太多。这是因为ziplist要求连续在内存空间,插入和删除结点后都涉及到内存分配和大量结点的整体移动。
    quciklist可以为解决这个问题
  • ziplist结点的prevlen字段可以让我们从尾结点向头结点的方向遍历整个list,但这个设计也带来了较高的复杂度。
    试想一下,插入一个结点后,下一个结点的prevlen字段的值很可能发生变化,prevlen是变长编码,那么prevlen占的空间也可能发生变化。这种情况下,会进一步导致下下个结点的prevlen字段的值发生变化,引发连锁更新效应,非常复杂。
    listpack可以解决这个问题

quicklist

quick创造性的把linked list和ziplist结合起来,多个小的ziplist组合成doubled linked listi,这样一来,既能高效的利用内存空间又没有结点数量的限制。quicklist数据结构如下:

[ziplist0]<-->[ziplist1]<-->...<-->[ziplistN]

虽然整个quicklist没有结点的数量的限制,但单个ziplist结点数量限制仍然存在。当向ziplist中插入一个结点时,如果超过了最大结点数限制,那么当前ziplist必须分裂为两个小的ziplist,这个地方也是比较麻烦的

listpack

listpack和ziplist类似也是占据一片连续的内存空间,listpack数据结构如下:

[total-bytes][num-elements][element1][element2]...[elementN][END]
每个element的结构如下:
[<encoding-type><element-data><element-total-len>]
  • total-bytes:固定为4字节,32位无符号整数,表示listpack的长度
  • num-elements:固定为2字节,16位无符号整数,表示结点个数。
    16位无符号整数的最大能表示的整数是65535。结点个数大于或等于65535的情况下,这个字段始终为65535,获取结点的真实个数,需要把整个list遍历一遍
  • element1:数据结点1,结构如下
    • encoding-type:变长编码,数据编码类型,举两个例子
      当这个字节的二进制形式为0xxxxxxx时,表示数据是一个7位无符号整数,该字节的后7位就用来存储这个整数。这种情况下,不需要专门的element-data字段,encoding-type和element-data共用一个字节
      当这个字节的二进制形式为10xxxxxx时,表示数据是一个字符串,字符串的长度是一个6位的无符号整数,存储在该字节的后6位中。
    • element-data:真正的数据
    • element-total-len:变长编码,该结点的长度(不包含element-total-len字段自身),单位是字节,这个字段的细节后面单独介绍。
  • element2:数据结点2
  • END:结束标记,固定为FF

element-total-len

element-total-len字段存储本结点的长度(不包含element-total-len字段本身),本结点的长度存储在本结点中,而不是像ziplist那样存储在后一个结点中,避免了ziplist中可能出现的连锁更新效应。
存储结点长度本质上是为了能够从右向左遍历list,为此element-total-len有一些特殊的地方:

  1. 为了适应从右向左解析,element-total-len的低位字节存放在右边(大端序big endian)
  2. 为了节省空间element-total-len仍然采用变长编码
  3. 每个字节的最高位标记当前字节后是否还有更多字节属于element-total-len字段,剩下的7位表示数据。
    最高位等于0表示没有更多字节,当前字节就是element-total-len字段的最后一个字节;最高位等于1表示还有更多字节。

假设elment-total-len是500,那么编码是怎么样的呢?
500用二进制表示是

111110100

然后拆分为两个字节(每个字节只用7位)表示

0000011 1110100

每个字节加止第八位,最左边字节后面没有更多字节第八位加0,其他加1

[0]0000011          [1]1110100
 |                   |
 `- no more bytes    `- more bytes to the left

右到左遍历listpack时,根据listpack总长度,可以得到结束标识的位置,与结束标记相临的是最后一个结点。
于是可以从尾结点开始向左解析结点数据,解析结点数据时,首先可以解析得到element-total-len,这样可以得到当前结点的开始位置,从左到右解析当前结点。
当前结点的开头向右一个字节是下一个结点的结尾,继续从右向左解析结点数据。如此循环,可以遍历完所有结点

左到右遍历listpack时,首先直接跳过前6个字节,直接从头结点开始向右解析结点数据。
解析结点时,根据编码规则解析element-type和element-data字段,解析完这两个字段后,已经知道了结点长度,根据element-total-len的编码规则,可以知道element-total-len的长度。
跳过当前element-total-len的所占的字节,得到下一个结点的开头,继续从左向右解析结点数据。如此循环,可以遍历完所有结点数据。

例子

仍然和上面ziplist例子一样,包含两个结点,第一个结点的存储整数50,第二个结点存储字符串hello

[00 00 00 10][00 02][32 01][85 68 65 6c 6c 6f 06][FF]
      |         |      |             |             |
  total-bytes  num     50          hello          END
  • 1-4字节:listpack的总长度16字节
  • 5-6字节:结点个数2个
  • 7字节:结点1的encoding-type。32是十六进制,转化为二进制是00110010,根据encoding-type的编码规则,该结点的数据是7位无符号整数,并且这个整数存储在当前字节的后7位(0110010),代表整数50
  • 8字节:结点1的长度(不包含当前字段)为1
  • 9字节:结点2的encoding-type,转化为二进制是10000101。根据encoding-type编码规则,该结点的数据是字符串,字符串长度是该字节的后6位(000101),长度是5
  • 10-14字节:结点2的数据hello
  • 15字节:结点2的长度(不包含当前字段)为6
  • 16字节:结束标识

talk is cheap, show me the code

ziplist的insert方法

/* 
 * @param zl:插入的zplist
 * @param p:插入的位置
 * @param s:待插入的字符串
 * @param slen:待插入的字符串的长度
 * Insert item at "p". 
 */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p,
					 unsigned char *s, unsigned int slen) {
	/* curlen表示ziplist的插入前的长度
	   reqlen表示存储新插入结点的长度
	*/
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
	/* prevlen:待插入结点的前一个结点的长度
	   prevlensize:存储prvlen字段所需的长度*/
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;

    /* Find out prevlen for the entry that is inserted. */
	/* 得到prvelen*/
    if (p[0] != ZIP_END) {
		/* 插入位置不是最末尾,那么p指向的是一个正常结点
		   新插入结点的前一个结点是p结点的前一个结点
		   所以prevlen等于p结点的prevlen字段*/
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
		/* 插入位置是最末尾*/
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
			/* 当前list不为空时,新插入结点的前一个结点是尾结点
			   所以prevlen等于尾结点的长度*/
            prevlen = zipRawEntryLength(ptail);
        }
    }

    /* See if the entry can be encoded */
	/* 检验新插入的数据是否能编码为整数存储*/
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
		/* 可以编码为整数,数据(entry-data字段)的长度*/
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipStoreEntryEncoding will use the
         * string length to figure out how to encode it. */
		 /* 编码为字符串存储时,数据(entry-data字段)的长度*/
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
	 /* 计算prevlen字段的长度*/
    reqlen += zipStorePrevEntryLength(NULL,prevlen);
	/* 计算encoding字段的长度*/
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
	/* 如果p指向一个正常结点时,插入后,p结点的前一个结点变成了新插入的结点
	   p结点的prevlen的值可能发生了变化,因为prevlen采用变长编码,
	   p结点的prevlen占的空间长度也可能发生变化。
	   这是先计算prevlen长度的变化值,后面ziplist扩容时可以正确计算这部分空间的变化*/
    int forcelarge = 0;	/*结点p的prevlen字段是否变长*/
	/*nextdiff表示变化的字节数*/
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    if (nextdiff == -4 && reqlen < 4) {
        nextdiff = 0;
        forcelarge = 1;
    }

    /* Store offset because a realloc may change the address of zl. */
	/* zl进行扩容 
	   扩容前存储结点p的偏移量,因为扩容后zl的地址可能发生变化*/
    offset = p-zl;
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

	/* 扩容后,空间已经足够
	   先把结点p及它后面的数据整体右移
	   移动后,尾结点发生了变化,同时更新zltail字段
	   */
    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry's raw length in the next entry. */
		/* 更新结点p的prevlen字段 */
        if (forcelarge)
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            zipStorePrevEntryLength(p+reqlen,reqlen);

        /* Update offset for tail */
		/* 更新zltail字段 */
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
		/* 结点p的prevlen字段长度的变化可能也会影响zltail字段
		   1. 如果结点p是尾结点那么不影响
		   2. 如果结点p不是尾结点那么影响*/
        zipEntry(p+reqlen, &tail);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
		/* 插入最末尾的情况下,不需要整体平移,只更新zltail字段*/
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
	/* 结点p的长度发生变化时,处理可能发生连锁更新*/
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
	/* 写入新插入的元素*/
    p += zipStorePrevEntryLength(p,prevlen);
    p += zipStoreEntryEncoding(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
	/* 更新zllen字段*/
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

quicklist的insert方法

/* Insert a new entry before or after existing entry 'entry'.
 *
 * If after==1, the new value is inserted after 'entry', otherwise
 * the new value is inserted before 'entry'. */
/* 在entry结点前面或者后面插入一个新的结点
 * 如果after==1,新的结点插入entry结点后面
 * 其他情况,新的结点插入entry结点前面
 * @param quicklist:结点插入的quicklist
 * @param entry:插入位置
 * @param value:新插入结点的值
 * @param sz:value的长度
 * @param after:after==1 插入entry结点的后面,其他情况前面
 */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
                                   void *value, const size_t sz, int after) {
    int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
    /* fill参数和单个ziplist的容量限制有关,目前有两种方式
     * 1. 按照据占在空间,这是默认方式,不超过8Kb
     * 2. 按照ziplist中结点个数限制,在配置文件(list-max-ziplist-size)中配置
     */
    int fill = quicklist->fill;
	/* quicklistNode结构体表示quicklist中一个ziplist*/
    quicklistNode *node = entry->node;
    quicklistNode *new_node = NULL;

    if (!node) {
        /* we have no reference node, so let's create only node in the list */
        /*ziplist为空的情况,新建ziplist并插入结点*/
        D("No node given!");
        new_node = quicklistCreateNode();
        new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        __quicklistInsertNode(quicklist, NULL, new_node, after);
        new_node->count++;
        quicklist->count++;
        return;
    }

    /* Populate accounting flags for easier boolean checks later */
    /* 根据fill参数的设置,检查当前ziplist是否超过容量限制
       full = 1表示当前ziplist达到容量限制*/
    if (!_quicklistNodeAllowInsert(node, fill, sz)) {
        D("Current node is full with count %d with requested fill %lu",
          node->count, fill);
        full = 1;
    }

    if (after && (entry->offset == node->count)) {
        /*插入位置是当前ziplist末尾时,检查下一个ziplist是否达到容量限制
         * full_next=1 表示下一个ziplist已经达到容量限制
         */
        D("At Tail of current ziplist");
        at_tail = 1;
        if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
            D("Next node is full too.");
            full_next = 1;
        }
    }

    if (!after && (entry->offset == 0)) {
        /*插入位置是当前ziplist开头时,检查上一个ziplist是否达到容量限制
         * full_prev=1 表示上一个ziplist已经达到容量限制
         */
        D("At Head");
        at_head = 1;
        if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
            D("Prev node is full too.");
            full_prev = 1;
        }
    }

    /* Now determine where and how to insert the new element */
    if (!full && after) {
        /* 当前ziplist未达到容量限制,插入末尾*/
        D("Not full, inserting after current position.");
        /* ziplist中间的结点可能是压缩存储的(配置文件里的list-compress-depth控制),
          如果是,这里会先解压*/
        quicklistDecompressNodeForUse(node);
        unsigned char *next = ziplistNext(node->zl, entry->zi);
        if (next == NULL) {
            node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
        } else {
            node->zl = ziplistInsert(node->zl, next, value, sz);
        }
        //更新ziplist的结点数
        node->count++;
        //更新quicklist的结点数
        quicklistNodeUpdateSz(node);
        //重新压缩,如果需要的话
        quicklistRecompressOnly(quicklist, node);
    } else if (!full && !after) {
        /* 当前ziplist未达到容量限制,插入开头*/
        D("Not full, inserting before current position.");
        quicklistDecompressNodeForUse(node);
        node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
        node->count++;
        quicklistNodeUpdateSz(node);
        quicklistRecompressOnly(quicklist, node);
    } else if (full && at_tail && node->next && !full_next && after) {
        /* 当前ziplist达到容量限制,插入末尾,下一个ziplist末达到容量限制
           插入下一个ziplist开头*/
        /* If we are: at tail, next has free space, and inserting after:
         *   - insert entry at head of next node. */
        D("Full and tail, but next isn't full; inserting next node head");
        new_node = node->next;
        quicklistDecompressNodeForUse(new_node);
        new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        quicklistRecompressOnly(quicklist, new_node);
    } else if (full && at_head && node->prev && !full_prev && !after) {
        /* 当前ziplist达到容量限制,插入末尾,上一个ziplist末达到容量限制
           插入下一个ziplist的结尾*/
        /* If we are: at head, previous has free space, and inserting before:
         *   - insert entry at tail of previous node. */
        D("Full and head, but prev isn't full, inserting prev node tail");
        new_node = node->prev;
        quicklistDecompressNodeForUse(new_node);
        new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        quicklistRecompressOnly(quicklist, new_node);
    } else if (full && ((at_tail && node->next && full_next && after) ||
                        (at_head && node->prev && full_prev && !after))) {
        /*插入位置在ziplist开头或末尾,而且ziplist都达到容量限制,
          直接创建新的ziplist*/
        /* If we are: full, and our prev/next is full, then:
         *   - create new node and attach to quicklist */
        D("\tprovisioning new node...");
        new_node = quicklistCreateNode();
        new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        __quicklistInsertNode(quicklist, node, new_node, after);
    } else if (full) {
        /*插入位置在ziplist中间,而且ziplist都达到容量限制,
          当前ziplist分裂为两个ziplist*/
        /* else, node is full we need to split it. */
        /* covers both after and !after cases */
        D("\tsplitting node...");
        quicklistDecompressNodeForUse(node);
        new_node = _quicklistSplitNode(node, entry->offset, after);
        new_node->zl = ziplistPush(new_node->zl, value, sz,
                                   after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        __quicklistInsertNode(quicklist, node, new_node, after);
        _quicklistMergeNodes(quicklist, node);
    }

    quicklist->count++;
}

listpack的insert方法

/* Insert, delete or replace the specified element 'ele' of length 'len' at
 * the specified position 'p', with 'p' being a listpack element pointer
 * obtained with lpFirst(), lpLast(), lpIndex(), lpNext(), lpPrev() or
 * lpSeek().
 *
 * The element is inserted before, after, or replaces the element pointed
 * by 'p' depending on the 'where' argument, that can be LP_BEFORE, LP_AFTER
 * or LP_REPLACE.
 *
 * If 'ele' is set to NULL, the function removes the element pointed by 'p'
 * instead of inserting one.
 *
 * Returns NULL on out of memory or when the listpack total length would exceed
 * the max allowed size of 2^32-1, otherwise the new pointer to the listpack
 * holding the new element is returned (and the old pointer passed is no longer
 * considered valid)
 *
 * If 'newp' is not NULL, at the end of a successful call '*newp' will be set
 * to the address of the element just added, so that it will be possible to
 * continue an interation with lpNext() and lpPrev().
 *
 * For deletion operations ('ele' set to NULL) 'newp' is set to the next
 * element, on the right of the deleted one, or to NULL if the deleted element
 * was the last one. */
/* 该方法有插入、删除和替换三种功能,这里重点说明插入
 * @param lp:listpack
 * @param ele:插入结点数据
 * @param size:插入结点数据长度
 * @param p:插入结点的位置
 * @param where: LP_BEFORE表示前面插入,LE_AFTER表示后面插入,LP_REPLACE 表示替换
 * @param newp:新插入结点的指针
 */
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, 
		unsigned char *p, int where, unsigned char **newp) {
    /*intenc存储encoding-type字段*/
    unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
    /*backlen存储element-total-len字段*/
    unsigned char backlen[LP_MAX_BACKLEN_SIZE];

    uint64_t enclen; /* The length of the encoded element. */

    /* An element pointer set to NULL means deletion, which is conceptually
     * replacing the element with a zero-length element. So whatever we
     * get passed as 'where', set it to LP_REPLACE. */
    if (ele == NULL) where = LP_REPLACE;

    /* If we need to insert after the current element, we just jump to the
     * next element (that could be the EOF one) and handle the case of
     * inserting before. So the function will actually deal with just two
     * cases: LP_BEFORE and LP_REPLACE. */
    if (where == LP_AFTER) {
        p = lpSkip(p);
        where = LP_BEFORE;
    }

    /* Store the offset of the element 'p', so that we can obtain its
     * address again after a reallocation. */
    /*插入位置的偏移量*/
    unsigned long poff = p-lp;

    /* Calling lpEncodeGetType() results into the encoded version of the
     * element to be stored into 'intenc' in case it is representable as
     * an integer: in that case, the function returns LP_ENCODING_INT.
     * Otherwise if LP_ENCODING_STR is returned, we'll have to call
     * lpEncodeString() to actually write the encoded string on place later.
     *
     * Whatever the returned encoding is, 'enclen' is populated with the
     * length of the encoded element. */
    /*得到插入结点的编码类型*/
    int enctype;
    if (ele) {
        enctype = lpEncodeGetType(ele,size,intenc,&enclen);
    } else {
        enctype = -1;
        enclen = 0;
    }

    /* We need to also encode the backward-parsable length of the element
     * and append it to the end: this allows to traverse the listpack from
     * the end to the start. */
    /*得到新插入结点的element-total-len字段所需的存储空间*/
    unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
    uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
    uint32_t replaced_len  = 0;
    if (where == LP_REPLACE) {
        replaced_len = lpCurrentEncodedSize(p);
        replaced_len += lpEncodeBacklen(NULL,replaced_len);
    }

    uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
                                  - replaced_len;
    if (new_listpack_bytes > UINT32_MAX) return NULL;

    /* We now need to reallocate in order to make space or shrink the
     * allocation (in case 'when' value is LP_REPLACE and the new element is
     * smaller). However we do that before memmoving the memory to
     * make room for the new element if the final allocation will get
     * larger, or we do it after if the final allocation will get smaller. */

    unsigned char *dst = lp + poff; /* May be updated after reallocation. */

    /* Realloc before: we need more room. */
    if (new_listpack_bytes > old_listpack_bytes) {
        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
        dst = lp + poff;
    }

    /* Setup the listpack relocating the elements to make the exact room
     * we need to store the new one. */
    /* 从插入位置开始,结点整体右移,插入情况下,replaced_len=0*/
    if (where == LP_BEFORE) {
        memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
    } else { /* LP_REPLACE. */
        long lendiff = (enclen+backlen_size)-replaced_len;
        memmove(dst+replaced_len+lendiff,
                dst+replaced_len,
                old_listpack_bytes-poff-replaced_len);
    }

    /* Realloc after: we need to free space. */
    if (new_listpack_bytes < old_listpack_bytes) {
        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
        dst = lp + poff;
    }

    /* Store the entry. */
    if (newp) {
        *newp = dst;
        /* In case of deletion, set 'newp' to NULL if the next element is
         * the EOF element. */
        if (!ele && dst[0] == LP_EOF) *newp = NULL;
    }
    /*写入新的结点*/
    if (ele) {
        if (enctype == LP_ENCODING_INT) {
            memcpy(dst,intenc,enclen);
        } else {
            lpEncodeString(dst,ele,size);
        }
        dst += enclen;
        memcpy(dst,backlen,backlen_size);
        dst += backlen_size;
    }

    /* Update header. */
    if (where != LP_REPLACE || ele == NULL) {
        uint32_t num_elements = lpGetNumElements(lp);
        if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
            if (ele)
                lpSetNumElements(lp,num_elements+1);
            else
                lpSetNumElements(lp,num_elements-1);
        }
    }
    lpSetTotalBytes(lp,new_listpack_bytes);

    return lp;
}

参考资料