7.ZSet 数据结构1)介绍
【1】ZSet 为有序的 , 自动去重的集合数据类型,ZSet 数据结构底层实现为 字典(dict) + 跳表(skiplist) ,当数据比较少时,用ziplist编码结构存储 。
zset-max-ziplist-entries128// 元素个数超过128 ,将用skiplist编码zset-max-ziplist-value64//单个元素大小超过 64 byte, 将用 skiplist编码【2】数据比较少时,用ziplist编码结构存储的图示:

文章插图
2)skiplist 分析解析
【1】数据结构代码
// 创建zset 数据结构: 字典 + 跳表robj *createZsetObject(void) { zset *zs = zmalloc(sizeof(*zs)); robj *o; // dict用来查询数据到分数的对应关系, 如 zscore 就可以直接根据 元素拿到分值 zs->dict = dictCreate(&zsetDictType,NULL); // skiplist用来根据分数查询数据(可能是范围查找) zs->zsl = zslCreate(); // 设置对象类型 o = createObject(OBJ_ZSET,zs); // 设置编码类型 o->encoding = OBJ_ENCODING_SKIPLIST; return o;}//位于edis/src/server.h 中#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */#define ZSKIPLIST_P 0.25/* Skiplist P = 1/4 */typedef struct zskiplistNode {sds ele;//存储字符串类型数据 redis3.0版本中使用robj类型表示,但是在redis4.0.1中直接使用sds类型表示double score;//存储排序的分值struct zskiplistNode *backward;//指向上一个节点,用于zrevrange命令struct zskiplistLevel {struct zskiplistNode *forward;//指向下一个节点unsigned long span;//到达后一个节点的跨度(两个相邻节点span为1)} level[];//该节点在各层的信息,柔性数组成员} zskiplistNode;typedef struct zskiplist {struct zskiplistNode *header, *tail;// 跳跃表头尾节点unsigned long length;//节点个数int level;//除头结点外最大的层数} zskiplist;typedef struct zset {dict *dict;zskiplist *zsl;} zset;【2】追踪添加函数
//在server.c发现跳表的添加函数为zaddCommand//去t_zset.c文件查看流程void zaddCommand(client *c) {zaddGenericCommand(c,ZADD_NONE);}void zaddGenericCommand(client *c, int flags) {static char *nanerr = "resulting score is not a number (NaN)";robj *key = c->argv[1];robj *zobj;sds ele;double score = 0, *scores = NULL;int j, elements;int scoreidx = 0;/* The following vars are used in order to track what the command actually* did during the execution, to reply to the client and to trigger the* notification of keyspace change. */int added = 0;//新添加元素的数量int updated = 0;//更新分数的元素数量int processed = 0;//被处理的元素数量/* Parse options. At the end 'scoreidx' is set to the argument position* of the score of the first score-element pair. */scoreidx = 2;// 输入参数解析while(scoreidx < c->argc) {char *opt = c->argv[scoreidx]->ptr;if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;else break;scoreidx++;}/* Turn options into simple to check vars. */int incr = (flags & ZADD_INCR) != 0;int nx = (flags & ZADD_NX) != 0;int xx = (flags & ZADD_XX) != 0;int ch = (flags & ZADD_CH) != 0;/* After the options, we expect to have an even number of args, since* we expect any number of score-element pairs. */elements = c->argc-scoreidx;if (elements % 2 || !elements) {addReply(c,shared.syntaxerr);return;}elements /= 2; /* Now this holds the number of score-element pairs. *//* Check for incompatible options. */if (nx && xx) {addReplyError(c,"XX and NX options at the same time are not compatible");return;}if (incr && elements > 1) {addReplyError(c,"INCR option supports a single increment-element pair");return;}/* Start parsing all the scores, we need to emit any syntax error* before executing additions to the sorted set, as the command should* either execute fully or nothing at all. */scores = zmalloc(sizeof(double)*elements);for (j = 0; j < elements; j++) {if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)!= C_OK) goto cleanup;}/* Lookup the key and create the sorted set if does not exist.查询对应的 key 在对应的 db 即 hash table中,是否存在*/zobj = lookupKeyWrite(c->db,key);if (zobj == NULL) {if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */// 如果 zset_max_ziplist_entries ==0//// 或者 zadd 元素的长度 > zset_max_ziplist_value//// 则直接创建 skiplist 数据结构//// 否则创建ziplist 压缩列表数据结构if (server.zset_max_ziplist_entries == 0 ||server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)){zobj = createZsetObject();} else {zobj = createZsetZiplistObject();}// 关联对象到dbdbAdd(c->db,key,zobj);} else {if (zobj->type != OBJ_ZSET) {addReply(c,shared.wrongtypeerr);goto cleanup;}}// 处理所有元素for (j = 0; j < elements; j++) {double newscore;// 分值score = scores[j];int retflags = flags;// 元素ele = c->argv[scoreidx+1+j*2]->ptr;// 往zobj 添加元素int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);if (retval == 0) {addReplyError(c,nanerr);goto cleanup;}if (retflags & ZADD_ADDED) added++;if (retflags & ZADD_UPDATED) updated++;if (!(retflags & ZADD_NOP)) processed++;score = newscore;}server.dirty += (added+updated);reply_to_client:if (incr) { /* ZINCRBY or INCR option. */if (processed)addReplyDouble(c,score);elseaddReplyNull(c);} else { /* ZADD. */addReplyLongLong(c,ch ? added+updated : added);}cleanup:zfree(scores);if (added || updated) {signalModifiedKey(c,c->db,key);notifyKeyspaceEvent(NOTIFY_ZSET,incr ? "zincr" : "zadd", key, c->db->id);}}// 创建zset 数据结构: 字典 + 跳表robj *createZsetObject(void) {zset *zs = zmalloc(sizeof(*zs));robj *o;// dict用来查询数据到分数的对应关系 , 如 zscore 就可以直接根据 元素拿到分值zs->dict = dictCreate(&zsetDictType,NULL);// skiplist用来根据分数查询数据(可能是范围查找)zs->zsl = zslCreate();// 设置对象类型o = createObject(OBJ_ZSET,zs);// 设置编码类型o->encoding = OBJ_ENCODING_SKIPLIST;return o;}// 创建zset 数据结构: ZipListrobj *createZsetZiplistObject(void) {unsigned char *zl = ziplistNew();robj *o = createObject(OBJ_ZSET,zl);o->encoding = OBJ_ENCODING_ZIPLIST;return o;}int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {/* Turn options into simple to check vars.可选参数解析*/int incr = (*flags & ZADD_INCR) != 0;int nx = (*flags & ZADD_NX) != 0;int xx = (*flags & ZADD_XX) != 0;*flags = 0; /* We'll return our response flags. */double curscore;/* NaN as input is an error regardless of all the other parameters.数值判断*/if (isnan(score)) {*flags = ZADD_NAN;return 0;}/* Update the sorted set according to its encoding.数据类型为ziplist 的情况*/if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {unsigned char *eptr;if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {/* NX? Return, same element already exists. */if (nx) {*flags |= ZADD_NOP;return 1;}/* Prepare the score for the increment if needed. */if (incr) {score += curscore;if (isnan(score)) {*flags |= ZADD_NAN;return 0;}if (newscore) *newscore = score;}/* Remove and re-insert when score changed.元素 score 有变化,则删除老节点,重新插入*/if (score != curscore) {zobj->ptr = zzlDelete(zobj->ptr,eptr);zobj->ptr = zzlInsert(zobj->ptr,ele,score);*flags |= ZADD_UPDATED;}return 1;} else if (!xx) {/* Optimize: check if the element is too large or the list* becomes too long *before* executing zzlInsert. */zobj->ptr = zzlInsert(zobj->ptr,ele,score);if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||sdslen(ele) > server.zset_max_ziplist_value)// 元素个数 或者 单个元素大小超过阈值任意条件满足就转化为skiplistzsetConvert(zobj,OBJ_ENCODING_SKIPLIST);if (newscore) *newscore = score;*flags |= ZADD_ADDED;return 1;} else {*flags |= ZADD_NOP;return 1;}// 数据类型为 跳表的情况} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {// 获取值指针zset *zs = zobj->ptr;zskiplistNode *znode;dictEntry *de;// O(1) 的时间复杂度,获取到元素de = dictFind(zs->dict,ele);if (de != NULL) {/* NX? Return, same element already exists.NX 互斥*/if (nx) {*flags |= ZADD_NOP;return 1;}// 当前分值curscore = *(double*)dictGetVal(de);/* Prepare the score for the increment if needed. */// 递增if (incr) {score += curscore;if (isnan(score)) {*flags |= ZADD_NAN;return 0;}if (newscore) *newscore = score;}/* Remove and re-insert when score changes.分值不同的场景*/if (score != curscore) {znode = zslUpdateScore(zs->zsl,curscore,ele,score);/* Note that we did not removed the original element from* the hash table representing the sorted set, so we just* update the score.*hash 表中不需要移除元素, 修改分值就可以了***/dictGetVal(de) = &znode->score; /* Update score ptr. */*flags |= ZADD_UPDATED;}return 1;// 元素不存在} else if (!xx) {ele = sdsdup(ele);// 插入新元素znode = zslInsert(zs->zsl,score,ele);serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);*flags |= ZADD_ADDED;if (newscore) *newscore = score;return 1;} else {*flags |= ZADD_NOP;return 1;}} else {serverPanic("Unknown sorted set encoding");}return 0; /* Never reached. */}// 往跳表中 新增元素zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;unsigned int rank[ZSKIPLIST_MAXLEVEL];int i, level;// 数值判断serverAssert(!isnan(score));x = zsl->header;// 遍历所有层高 , 寻找插入点: 高位 -> 低位for (i = zsl->level-1; i >= 0; i--) {/* store rank that is crossed to reach the insert position存储排位,便于更新*/rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];while (x->level[i].forward &&(x->level[i].forward->score < score ||// 找到第一个比新分值大的节点 , 前面一个位置即是插入点(x->level[i].forward->score == score &&sdscmp(x->level[i].forward->ele,ele) < 0)))//相同分值则按字典序排序{rank[i] += x->level[i].span;// 累加排位分值x = x->level[i].forward;}update[i] = x;// 每一层的拐点}/* we assume the element is not already inside, since we allow duplicated* scores, reinserting the same element should never happen since the* caller of zslInsert() should test in the hash table if the element is* already inside or not.** */level = zslRandomLevel();// 幂次定律, 随机生成层高 ,越高的层出现概率越低if (level > zsl->level) {//随机层高大于当前的最大层高,则初始化新的层高for (i = zsl->level; i < level; i++) {rank[i] = 0;update[i] = zsl->header;update[i]->level[i].span = zsl->length;//header 最层都是跳表的长度}zsl->level = level;}x = zslCreateNode(level,score,ele);// 创建新的节点for (i = 0; i < level; i++) {x->level[i].forward = update[i]->level[i].forward;// 插入新节点update[i]->level[i].forward = x;/* update span covered by update[i] as x is inserted here更新 span信息*/x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);update[i]->level[i].span = (rank[0] - rank[i]) + 1;}/* increment span for untouched levels新加入节点,更新顶层 span*/for (i = level; i < zsl->level; i++) {update[i]->level[i].span++;}// 更新后退指针 和尾指针x->backward = (update[0] == zsl->header) ? NULL : update[0];if (x->level[0].forward)x->level[0].forward->backward = x;elsezsl->tail = x;zsl->length++;return x;}//返回一个随机的层数,不是level的索引是层数int zslRandomLevel(void) {int level = 1;while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//有1/4的概率加入到上一层中level += 1;return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;}
推荐阅读
- 七 Netty 学习:NioEventLoop 对应线程的创建和启动源码说明
- Spring mvc源码分析系列--Servlet的前世今生
- spring cron表达式源码分析
- 集合框架——LinkedList集合源码分析
- 含源码 手把手教你使用LabVIEW OpenCV DNN实现手写数字识别
- 补充部分---ScheduledThreadPoolExecutor类分析 线程池底层原理详解与源码分析
- 五 Netty 学习:服务端启动核心流程源码说明
- HashMap底层原理及jdk1.8源码解读
- 含源码 手把手教你使用LabVIEW人工智能视觉工具包快速实现传统Opencv算子的调用
- 含源码 手把手教你使用LabVIEW人工智能视觉工具包快速实现图像读取与采集