redis的list数据结构简介
list是计算机科学中最基本的数据结构之一,也是redis支持的数据类型之一。
最常见的list是链表(Linked List),redis是内存数据库,节约存储空间至关重要,因此redis中还实现了其它几种存储效率更高的list:ziplist、quicklist、listpack。
本文将一一介绍它们的具体结构,还会简单分析redis 5.0版本他们的insert方法的源代码
双端链表(Double Linked List)
双端链表的数据结构如下:
___________________________________________________________________
| |
| V
[(metadata)(tailP)(headP)]-->[(pp1)(dataP1)(pn1)]<--->[(pp2)(dataP2)(pn2)]......[(ppN)(dataPN)(pnN)]
- 一开始是链表头结点,存储链表的基本信息(metadata),如结点的个数。除此之外,还有一个指向第一个数据结点的指针(头指针),指向最后一个结点的指针(尾指针)
- 第一个数据结点由三部分组成。第一部分指针pp1指向前一个数据结点(第一个数据结点不存在前一个结点,因此pp1等于NULL),第二部分指针dataP1指向真正的数据,第三部分指针pn1指向下一个数据结点(即第二个数据结点)
- 第二个数据结点由三部分组成。第一部分指针pp2指向前一个数据结点(即第一个数据结点),第二部分指针dataP2指向真正的数据,第三部分指针pn2指向下一个数据结点(即第三个数据结点)
……
…… - 第N个数据结点由三部分组成。第一部分ppN指向前一个数据结点(即第N-1个数据结点),第二部分指针dataPN指向真正的数据,第三部分指针pnN指向下一个结点(最后一个结点不存在下一个结点,因此pnN等于NULL)
双端链表是最常见的链表,结构简单清晰,可以从链表头从左到右遍历整个链表,也可以从链表尾从右到左遍历整个链表。
然而对于redis应用来说,双端列表有一个严重的缺陷:元数据占用的内存太多。我们可以具体分析下,一个数据节点有3个指针(64系统里指针占8个字节)占24字节,数据指针指向的是一个redis object,redis object中又包含一些元数据,算下来,每个节点都需要约40字节存储元数据。而真正的数据,比如一个整数或者短字符串,远小于40字节,这样大多数的内存都用在了存储元数据上,非常不划算。
为了解决这个问题,redis使用另一种存储效率更高的链表:ziplist。
ziplist
双端链链表的数据在内存中不是连续的,需要通过指针把数据串联起来,而指针过多导致了占用的内存也会变多。
所以ziplist解决这个问题的思路是:ziplist在内存中占据一段连续空间,要访问某个结点,只需要知道它相对于ziplist开头在偏移量,完全避免了指针。
ziplist数据结构如下:
[zlbyte][zltail][zllen][entry1][entry2]....[END]
每个entry的结构如下:
[<prevlen><encoding><entry-data>]
- zlbyte:4个字节的32位无符号整数,表示链表的字节数
- zltail:4个字节的32位无符号整数,表示最后一个结点的偏移量(类似双端链表中尾指针的作用,可以快速找到最后一个结点)
- zllen:2个字节的无符号整数,表示结点的个数。
- entry:结点。结点由三部分组成:prevlen、encoding、entry-data
- prevlen:前一个结点的长度(单位是字节),采用变长编码。
当长度小于254时,占一个字节;
当长度大于或等于254时占5个字节,其中第一个字节固定为254,剩下的4字节表示32位无符号整数 - encoding:数据的编码类型,可能是整形或字符串,变长编码,第一个字节即可确定编码类型。例如:
如果第一个字节的二进制形式是00pppppp,00开头表示后面的数据是字符串,字符串长度是pppppp表示的6位无符号整数
如果前两个字节的二进制是01pppppp qqqqqqqq,01开头表示后面的数据是字符串,字符串长度是pppppp qqqqqqqq表示的14位无符号整数
如果前两个字节的二进制是11111110 qqqqqqqq,11开头表示后面的数据是整数,11111110表示后面的是8位有符号整数,qqqqqqqq 就是8位的数据
这里只是列举了部分编码,还有其它情况可以参见源码。 - entry-data:存储真正的数据
- prevlen:前一个结点的长度(单位是字节),采用变长编码。
- END:1个字节,结束标记,固定为255
例子
下面的ziplist包含两个结点,第一个结点的存储整数50,第二个结点存储字符串hello
[15 00 00 00] [0d 00 00 00] [02 00] [00 fe 32] [03 05 68 65 6c 6c 6f] [ff]
| | | | | |
zlbytes zltail entries entry1:50 entry2:hello end
- 1-4字节zlbytes,表示ziplist的总字节数是21字节
- 5-8字节zltail,表示最后一个结点相对ziplist开头的偏移量是13字节
- 9-10字节entries,表示结点的个数是2个
- 11字节prevlen,表示前一个结点的长度是0(因为第一个结点不存在前一个结点)
- 12字节encoding,fe转化为二进制是11111110,说明数据是一个8位有符号整数
- 13字节entrydata,真正的数据,整数50
- 14字节prevlen,表示前一个结点的长度是3字节
- 15字节encoding,fe转化为二进制是00000101,说明数据是字符串,字符串长度是该字节后6位表示的无符号整数(000101),转化为十进制是5
- 16-20字节entrydata,真正的数据,hello字符串的asccii码
- 21字节END
ziplist的缺点
- ziplist结点数量有限制,不适宜太多。这是因为ziplist要求连续在内存空间,插入和删除结点后都涉及到内存分配和大量结点的整体移动。
quciklist可以为解决这个问题 - ziplist结点的prevlen字段可以让我们从尾结点向头结点的方向遍历整个list,但这个设计也带来了较高的复杂度。
试想一下,插入一个结点后,下一个结点的prevlen字段的值很可能发生变化,prevlen是变长编码,那么prevlen占的空间也可能发生变化。这种情况下,会进一步导致下下个结点的prevlen字段的值发生变化,引发连锁更新效应,非常复杂。
listpack可以解决这个问题
quicklist
quick创造性的把linked list和ziplist结合起来,多个小的ziplist组合成doubled linked listi,这样一来,既能高效的利用内存空间又没有结点数量的限制。quicklist数据结构如下:
[ziplist0]<-->[ziplist1]<-->...<-->[ziplistN]
虽然整个quicklist没有结点的数量的限制,但单个ziplist结点数量限制仍然存在。当向ziplist中插入一个结点时,如果超过了最大结点数限制,那么当前ziplist必须分裂为两个小的ziplist,这个地方也是比较麻烦的
listpack
listpack和ziplist类似也是占据一片连续的内存空间,listpack数据结构如下:
[total-bytes][num-elements][element1][element2]...[elementN][END]
每个element的结构如下:
[<encoding-type><element-data><element-total-len>]
- total-bytes:固定为4字节,32位无符号整数,表示listpack的长度
- num-elements:固定为2字节,16位无符号整数,表示结点个数。
16位无符号整数的最大能表示的整数是65535。结点个数大于或等于65535的情况下,这个字段始终为65535,获取结点的真实个数,需要把整个list遍历一遍 - element1:数据结点1,结构如下
- encoding-type:变长编码,数据编码类型,举两个例子
当这个字节的二进制形式为0xxxxxxx时,表示数据是一个7位无符号整数,该字节的后7位就用来存储这个整数。这种情况下,不需要专门的element-data字段,encoding-type和element-data共用一个字节
当这个字节的二进制形式为10xxxxxx时,表示数据是一个字符串,字符串的长度是一个6位的无符号整数,存储在该字节的后6位中。 - element-data:真正的数据
- element-total-len:变长编码,该结点的长度(不包含element-total-len字段自身),单位是字节,这个字段的细节后面单独介绍。
- encoding-type:变长编码,数据编码类型,举两个例子
- element2:数据结点2
- END:结束标记,固定为FF
element-total-len
element-total-len字段存储本结点的长度(不包含element-total-len字段本身),本结点的长度存储在本结点中,而不是像ziplist那样存储在后一个结点中,避免了ziplist中可能出现的连锁更新效应。
存储结点长度本质上是为了能够从右向左遍历list,为此element-total-len有一些特殊的地方:
- 为了适应从右向左解析,element-total-len的低位字节存放在右边(大端序big endian)
- 为了节省空间element-total-len仍然采用变长编码
- 每个字节的最高位标记当前字节后是否还有更多字节属于element-total-len字段,剩下的7位表示数据。
最高位等于0表示没有更多字节,当前字节就是element-total-len字段的最后一个字节;最高位等于1表示还有更多字节。
假设elment-total-len是500,那么编码是怎么样的呢?
500用二进制表示是
111110100
然后拆分为两个字节(每个字节只用7位)表示
0000011 1110100
每个字节加止第八位,最左边字节后面没有更多字节第八位加0,其他加1
[0]0000011 [1]1110100
| |
`- no more bytes `- more bytes to the left
从右到左遍历listpack时,根据listpack总长度,可以得到结束标识的位置,与结束标记相临的是最后一个结点。
于是可以从尾结点开始向左解析结点数据,解析结点数据时,首先可以解析得到element-total-len,这样可以得到当前结点的开始位置,从左到右解析当前结点。
当前结点的开头向右一个字节是下一个结点的结尾,继续从右向左解析结点数据。如此循环,可以遍历完所有结点
从左到右遍历listpack时,首先直接跳过前6个字节,直接从头结点开始向右解析结点数据。
解析结点时,根据编码规则解析element-type和element-data字段,解析完这两个字段后,已经知道了结点长度,根据element-total-len的编码规则,可以知道element-total-len的长度。
跳过当前element-total-len的所占的字节,得到下一个结点的开头,继续从左向右解析结点数据。如此循环,可以遍历完所有结点数据。
例子
仍然和上面ziplist例子一样,包含两个结点,第一个结点的存储整数50,第二个结点存储字符串hello
[00 00 00 10][00 02][32 01][85 68 65 6c 6c 6f 06][FF]
| | | | |
total-bytes num 50 hello END
- 1-4字节:listpack的总长度16字节
- 5-6字节:结点个数2个
- 7字节:结点1的encoding-type。32是十六进制,转化为二进制是00110010,根据encoding-type的编码规则,该结点的数据是7位无符号整数,并且这个整数存储在当前字节的后7位(0110010),代表整数50
- 8字节:结点1的长度(不包含当前字段)为1
- 9字节:结点2的encoding-type,转化为二进制是10000101。根据encoding-type编码规则,该结点的数据是字符串,字符串长度是该字节的后6位(000101),长度是5
- 10-14字节:结点2的数据hello
- 15字节:结点2的长度(不包含当前字段)为6
- 16字节:结束标识
talk is cheap, show me the code
ziplist的insert方法
/*
* @param zl:插入的zplist
* @param p:插入的位置
* @param s:待插入的字符串
* @param slen:待插入的字符串的长度
* Insert item at "p".
*/
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p,
unsigned char *s, unsigned int slen) {
/* curlen表示ziplist的插入前的长度
reqlen表示存储新插入结点的长度
*/
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
/* prevlen:待插入结点的前一个结点的长度
prevlensize:存储prvlen字段所需的长度*/
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789; /* initialized to avoid warning. Using a value
that is easy to see if for some reason
we use it uninitialized. */
zlentry tail;
/* Find out prevlen for the entry that is inserted. */
/* 得到prvelen*/
if (p[0] != ZIP_END) {
/* 插入位置不是最末尾,那么p指向的是一个正常结点
新插入结点的前一个结点是p结点的前一个结点
所以prevlen等于p结点的prevlen字段*/
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else {
/* 插入位置是最末尾*/
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) {
/* 当前list不为空时,新插入结点的前一个结点是尾结点
所以prevlen等于尾结点的长度*/
prevlen = zipRawEntryLength(ptail);
}
}
/* See if the entry can be encoded */
/* 检验新插入的数据是否能编码为整数存储*/
if (zipTryEncoding(s,slen,&value,&encoding)) {
/* 'encoding' is set to the appropriate integer encoding */
/* 可以编码为整数,数据(entry-data字段)的长度*/
reqlen = zipIntSize(encoding);
} else {
/* 'encoding' is untouched, however zipStoreEntryEncoding will use the
* string length to figure out how to encode it. */
/* 编码为字符串存储时,数据(entry-data字段)的长度*/
reqlen = slen;
}
/* We need space for both the length of the previous entry and
* the length of the payload. */
/* 计算prevlen字段的长度*/
reqlen += zipStorePrevEntryLength(NULL,prevlen);
/* 计算encoding字段的长度*/
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
/* When the insert position is not equal to the tail, we need to
* make sure that the next entry can hold this entry's length in
* its prevlen field. */
/* 如果p指向一个正常结点时,插入后,p结点的前一个结点变成了新插入的结点
p结点的prevlen的值可能发生了变化,因为prevlen采用变长编码,
p结点的prevlen占的空间长度也可能发生变化。
这是先计算prevlen长度的变化值,后面ziplist扩容时可以正确计算这部分空间的变化*/
int forcelarge = 0; /*结点p的prevlen字段是否变长*/
/*nextdiff表示变化的字节数*/
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}
/* Store offset because a realloc may change the address of zl. */
/* zl进行扩容
扩容前存储结点p的偏移量,因为扩容后zl的地址可能发生变化*/
offset = p-zl;
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
p = zl+offset;
/* 扩容后,空间已经足够
先把结点p及它后面的数据整体右移
移动后,尾结点发生了变化,同时更新zltail字段
*/
/* Apply memory move when necessary and update tail offset. */
if (p[0] != ZIP_END) {
/* Subtract one because of the ZIP_END bytes */
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
/* Encode this entry's raw length in the next entry. */
/* 更新结点p的prevlen字段 */
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);
/* Update offset for tail */
/* 更新zltail字段 */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
/* When the tail contains more than one entry, we need to take
* "nextdiff" in account as well. Otherwise, a change in the
* size of prevlen doesn't have an effect on the *tail* offset. */
/* 结点p的prevlen字段长度的变化可能也会影响zltail字段
1. 如果结点p是尾结点那么不影响
2. 如果结点p不是尾结点那么影响*/
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
/* This element will be the new tail. */
/* 插入最末尾的情况下,不需要整体平移,只更新zltail字段*/
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}
/* When nextdiff != 0, the raw length of the next entry has changed, so
* we need to cascade the update throughout the ziplist */
/* 结点p的长度发生变化时,处理可能发生连锁更新*/
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}
/* Write the entry */
/* 写入新插入的元素*/
p += zipStorePrevEntryLength(p,prevlen);
p += zipStoreEntryEncoding(p,encoding,slen);
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
/* 更新zllen字段*/
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}
quicklist的insert方法
/* Insert a new entry before or after existing entry 'entry'.
*
* If after==1, the new value is inserted after 'entry', otherwise
* the new value is inserted before 'entry'. */
/* 在entry结点前面或者后面插入一个新的结点
* 如果after==1,新的结点插入entry结点后面
* 其他情况,新的结点插入entry结点前面
* @param quicklist:结点插入的quicklist
* @param entry:插入位置
* @param value:新插入结点的值
* @param sz:value的长度
* @param after:after==1 插入entry结点的后面,其他情况前面
*/
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
/* fill参数和单个ziplist的容量限制有关,目前有两种方式
* 1. 按照据占在空间,这是默认方式,不超过8Kb
* 2. 按照ziplist中结点个数限制,在配置文件(list-max-ziplist-size)中配置
*/
int fill = quicklist->fill;
/* quicklistNode结构体表示quicklist中一个ziplist*/
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;
if (!node) {
/* we have no reference node, so let's create only node in the list */
/*ziplist为空的情况,新建ziplist并插入结点*/
D("No node given!");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
__quicklistInsertNode(quicklist, NULL, new_node, after);
new_node->count++;
quicklist->count++;
return;
}
/* Populate accounting flags for easier boolean checks later */
/* 根据fill参数的设置,检查当前ziplist是否超过容量限制
full = 1表示当前ziplist达到容量限制*/
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
D("Current node is full with count %d with requested fill %lu",
node->count, fill);
full = 1;
}
if (after && (entry->offset == node->count)) {
/*插入位置是当前ziplist末尾时,检查下一个ziplist是否达到容量限制
* full_next=1 表示下一个ziplist已经达到容量限制
*/
D("At Tail of current ziplist");
at_tail = 1;
if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
D("Next node is full too.");
full_next = 1;
}
}
if (!after && (entry->offset == 0)) {
/*插入位置是当前ziplist开头时,检查上一个ziplist是否达到容量限制
* full_prev=1 表示上一个ziplist已经达到容量限制
*/
D("At Head");
at_head = 1;
if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
D("Prev node is full too.");
full_prev = 1;
}
}
/* Now determine where and how to insert the new element */
if (!full && after) {
/* 当前ziplist未达到容量限制,插入末尾*/
D("Not full, inserting after current position.");
/* ziplist中间的结点可能是压缩存储的(配置文件里的list-compress-depth控制),
如果是,这里会先解压*/
quicklistDecompressNodeForUse(node);
unsigned char *next = ziplistNext(node->zl, entry->zi);
if (next == NULL) {
node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
} else {
node->zl = ziplistInsert(node->zl, next, value, sz);
}
//更新ziplist的结点数
node->count++;
//更新quicklist的结点数
quicklistNodeUpdateSz(node);
//重新压缩,如果需要的话
quicklistRecompressOnly(quicklist, node);
} else if (!full && !after) {
/* 当前ziplist未达到容量限制,插入开头*/
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node);
node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (full && at_tail && node->next && !full_next && after) {
/* 当前ziplist达到容量限制,插入末尾,下一个ziplist末达到容量限制
插入下一个ziplist开头*/
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && at_head && node->prev && !full_prev && !after) {
/* 当前ziplist达到容量限制,插入末尾,上一个ziplist末达到容量限制
插入下一个ziplist的结尾*/
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && ((at_tail && node->next && full_next && after) ||
(at_head && node->prev && full_prev && !after))) {
/*插入位置在ziplist开头或末尾,而且ziplist都达到容量限制,
直接创建新的ziplist*/
/* If we are: full, and our prev/next is full, then:
* - create new node and attach to quicklist */
D("\tprovisioning new node...");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
} else if (full) {
/*插入位置在ziplist中间,而且ziplist都达到容量限制,
当前ziplist分裂为两个ziplist*/
/* else, node is full we need to split it. */
/* covers both after and !after cases */
D("\tsplitting node...");
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
new_node->zl = ziplistPush(new_node->zl, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
_quicklistMergeNodes(quicklist, node);
}
quicklist->count++;
}
listpack的insert方法
/* Insert, delete or replace the specified element 'ele' of length 'len' at
* the specified position 'p', with 'p' being a listpack element pointer
* obtained with lpFirst(), lpLast(), lpIndex(), lpNext(), lpPrev() or
* lpSeek().
*
* The element is inserted before, after, or replaces the element pointed
* by 'p' depending on the 'where' argument, that can be LP_BEFORE, LP_AFTER
* or LP_REPLACE.
*
* If 'ele' is set to NULL, the function removes the element pointed by 'p'
* instead of inserting one.
*
* Returns NULL on out of memory or when the listpack total length would exceed
* the max allowed size of 2^32-1, otherwise the new pointer to the listpack
* holding the new element is returned (and the old pointer passed is no longer
* considered valid)
*
* If 'newp' is not NULL, at the end of a successful call '*newp' will be set
* to the address of the element just added, so that it will be possible to
* continue an interation with lpNext() and lpPrev().
*
* For deletion operations ('ele' set to NULL) 'newp' is set to the next
* element, on the right of the deleted one, or to NULL if the deleted element
* was the last one. */
/* 该方法有插入、删除和替换三种功能,这里重点说明插入
* @param lp:listpack
* @param ele:插入结点数据
* @param size:插入结点数据长度
* @param p:插入结点的位置
* @param where: LP_BEFORE表示前面插入,LE_AFTER表示后面插入,LP_REPLACE 表示替换
* @param newp:新插入结点的指针
*/
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size,
unsigned char *p, int where, unsigned char **newp) {
/*intenc存储encoding-type字段*/
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
/*backlen存储element-total-len字段*/
unsigned char backlen[LP_MAX_BACKLEN_SIZE];
uint64_t enclen; /* The length of the encoded element. */
/* An element pointer set to NULL means deletion, which is conceptually
* replacing the element with a zero-length element. So whatever we
* get passed as 'where', set it to LP_REPLACE. */
if (ele == NULL) where = LP_REPLACE;
/* If we need to insert after the current element, we just jump to the
* next element (that could be the EOF one) and handle the case of
* inserting before. So the function will actually deal with just two
* cases: LP_BEFORE and LP_REPLACE. */
if (where == LP_AFTER) {
p = lpSkip(p);
where = LP_BEFORE;
}
/* Store the offset of the element 'p', so that we can obtain its
* address again after a reallocation. */
/*插入位置的偏移量*/
unsigned long poff = p-lp;
/* Calling lpEncodeGetType() results into the encoded version of the
* element to be stored into 'intenc' in case it is representable as
* an integer: in that case, the function returns LP_ENCODING_INT.
* Otherwise if LP_ENCODING_STR is returned, we'll have to call
* lpEncodeString() to actually write the encoded string on place later.
*
* Whatever the returned encoding is, 'enclen' is populated with the
* length of the encoded element. */
/*得到插入结点的编码类型*/
int enctype;
if (ele) {
enctype = lpEncodeGetType(ele,size,intenc,&enclen);
} else {
enctype = -1;
enclen = 0;
}
/* We need to also encode the backward-parsable length of the element
* and append it to the end: this allows to traverse the listpack from
* the end to the start. */
/*得到新插入结点的element-total-len字段所需的存储空间*/
unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
uint32_t replaced_len = 0;
if (where == LP_REPLACE) {
replaced_len = lpCurrentEncodedSize(p);
replaced_len += lpEncodeBacklen(NULL,replaced_len);
}
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
- replaced_len;
if (new_listpack_bytes > UINT32_MAX) return NULL;
/* We now need to reallocate in order to make space or shrink the
* allocation (in case 'when' value is LP_REPLACE and the new element is
* smaller). However we do that before memmoving the memory to
* make room for the new element if the final allocation will get
* larger, or we do it after if the final allocation will get smaller. */
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
/* Realloc before: we need more room. */
if (new_listpack_bytes > old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Setup the listpack relocating the elements to make the exact room
* we need to store the new one. */
/* 从插入位置开始,结点整体右移,插入情况下,replaced_len=0*/
if (where == LP_BEFORE) {
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
} else { /* LP_REPLACE. */
long lendiff = (enclen+backlen_size)-replaced_len;
memmove(dst+replaced_len+lendiff,
dst+replaced_len,
old_listpack_bytes-poff-replaced_len);
}
/* Realloc after: we need to free space. */
if (new_listpack_bytes < old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Store the entry. */
if (newp) {
*newp = dst;
/* In case of deletion, set 'newp' to NULL if the next element is
* the EOF element. */
if (!ele && dst[0] == LP_EOF) *newp = NULL;
}
/*写入新的结点*/
if (ele) {
if (enctype == LP_ENCODING_INT) {
memcpy(dst,intenc,enclen);
} else {
lpEncodeString(dst,ele,size);
}
dst += enclen;
memcpy(dst,backlen,backlen_size);
dst += backlen_size;
}
/* Update header. */
if (where != LP_REPLACE || ele == NULL) {
uint32_t num_elements = lpGetNumElements(lp);
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
if (ele)
lpSetNumElements(lp,num_elements+1);
else
lpSetNumElements(lp,num_elements-1);
}
}
lpSetTotalBytes(lp,new_listpack_bytes);
return lp;
}