Redis是一个开源的、支持网络、基于内存的键值对存储系统,它可以用作数据库、缓存和消息中间件。它支持多种数据类型,包括字符串、散列、列表、集合、位图等,拥有极快的读写速度,并且支持丰富的特性,如事务、持久化、复制、脚本、发布/订阅等。
一、Redis的特性
1.1 Redis为什么快?
基于内存操作,操作不需要跟磁盘交互,单次执行很快
命令执行是单线程,因为是基于内存操作,单次执行的时间快于线程切换时间,同时通信采用多路复用
Redis本身就是一个k-v结构,类似于hashMap,所以查询性能接近于O(1)
同时redis自己底层数据结构支持,比如跳表、SDS
lO多路复用,单个线程中通过记录跟踪每一个sock(I/O流)的状态来管理多个I/O流
1.2 Redis其他特性
更丰富的数据类型,虽然都是k、v结构,value可以存储很多的数据类型
完善的内存管理机制、保证数据一致性:持久化机制、过期策略
支持多种编程语言
高可用,集群、保证高可用
1.3 Redis高可用
很完善的内存管理机制,过期、淘汰、持久化
集群模式,主从、哨兵、cluster集群
二、Redis数据类型以及使用场景
Redis的数据类型有String、Hash、Set、List、Zset、bitMap(基于String类型)、 Hyperloglog(基于String类型)、Geo(地理位置)、Streams流。
2.1 String
2.1.1 基本指令
// 批量设置 > mset key1 value1 key2 value2 // 批量获取 > mget key1 key2 // 获取长度 > strlen key // 字符串追加内容 > append key xxx // 获取指定区间的字符 > getrange key 0 5 // 整数值递增 (递增指定的值) > incr intkey (incrby intkey 10) // 当key 存在时将覆盖 > SETEX key seconds value // 将 key 的值设为 value ,当且仅当 key 不存在。 > SETNX key value
2.1.2 应用场景
缓存相关场景 缓存、 token(跟过期属性完美契合)
线程安全的计数场景 (软限流、分布式ID等)
2.2 Hash
2.2.1 基本指令
// 将哈希表 key 中的域 field 的值设为 value 。 > HSET key field value // 返回哈希表 key 中给定域 field 的值。 > HGET key field // 返回哈希表 key 中的所有域。 > HKEYS key // 返回哈希表 key 中所有域的值。 > HVALS key // 为哈希表 key 中的域 field 的值加上增量 increment 。 > HINCRBY key field increment // 查看哈希表 key 中,给定域 field 是否存在。 > HEXISTS key field
2.2.2 应用场景
存储对象类的数据(官网说的)比如一个对象下有多个字段
统计类的数据,可以对单个统计数据进行单独操作
2.3 List
存储有序的字符串列表,元素可以重复。列表的最大长度为 2^32 - 1 个元素(4294967295,每个列表超过 40 亿个元素)。
2.3.1 基本指令
// 将一个或多个值 value 插入到列表 key 的表头 > LPUSH key value [value ...] // 将一个或多个值 value 插入到列表 key 的表尾(最右边)。 > RPUSH key value [value ...] // 移除并返回列表 key 的头元素。 > LPOP key // 移除并返回列表 key 的尾元素。 > RPOP key // BLPOP 是列表的阻塞式(blocking)弹出原语。 > BLPOP key [key ...] timeout // BRPOP 是列表的阻塞式(blocking)弹出原语。 > BRPOP key [key ...] timeout // 获取指点位置元素 > LINDEX key index
2.3.2 应用场景
用户消息时间线
因为list是有序的,所以我们可以先进先出,先进后出的列表都可以做
2.4 Set
2.4.1 基本指令
// 将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略。 > SADD key member [member ...] // 返回集合 key 中的所有成员。 > SMEMBERS key // 返回集合 key 的基数(集合中元素的数量)。 > SCARD key // 如果命令执行时,只提供了 key 参数,那么返回集合中的一个随机元素。 > SRANDMEMBER key [count] // 移除并返回集合中的一个随机元素。 > SPOP key // 移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略。 > SREM key member [member ...] // 判断 member 元素是否集合 key 的成员。 > SISMEMBER key member // 获取前一个集合有而后面1个集合没有的 > sdiff huihuiset huihuiset1 // 获取交集 > sinter huihuiset huihuiset1 // 获取并集 > sunion huihuiset huihuiset1
2.4.2 应用场景
抽奖 spop跟srandmember随机弹出或者获取元素
点赞、签到等,sadd集合存储
交集并集 关注等场景
2.5 ZSet(SortedSet)
sorted set,有序的set,每个元素有个score。
score相同时,按照key的ASCII码排序。
2.5.1 基本指令
//将一个或多个 member 元素及其 score 值加入到有序集 key 当中。 > ZADD key score member [[score member] [score member] ...] // 返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递增(从小到大)来排序 > ZRANGE key start stop [WITHSCORES] // 返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递减(从大到小)来排列。 > ZREVRANGE key start stop [WITHSCORES] // 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。 有序集成员按 score 值递增(从小到大)次序排列。 > ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] // 移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。 > ZREM key member [member ...] // 返回有序集 key 的基数。 > ZCARD key // 为有序集 key 的成员 member 的 score 值加上增量 increment 。 > ZINCRBY key increment member // 返回有序集 key 中, score 值在 min 和 max 之间(默认包括 score 值等于 min 或 max )的成员的数量。 > ZCOUNT key min max // 返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递增(从小到大)顺序排列。 > ZRANK key member
2.5.2 应用场景
排行榜
三、Redis的事务
3.1 事务基本操作
// 1. multi命令开启事务,exec命令提交事务 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> set k1 v1 QUEUED 127.0.0.1:6379(TX)> set k2 v2 QUEUED 127.0.0.1:6379(TX)> exec 1) OK 2) OK // 2. 组队的过程中可以通过discard来放弃组队。 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> set k3 v3 QUEUED 127.0.0.1:6379(TX)> set k4 v4 QUEUED 127.0.0.1:6379(TX)> discard OK
3.2 Redis事务特性
单独的隔离操作
事务中的所有命令都会序列化、按顺序地执行。
事务在执行过程中,不会被其他客户端发送来的命令请求所打断。
没有隔离级别的概念
队列中的命令没有提交之前,都不会被实际地执行,因为事务提交之前任何指令都不会被实际执行,也就不存在“事务内的查询要看到事务里的更新,在事务外查询不能看到”这个让人万分头疼的问题。
不保证原子性
Redis同一个事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚。
四、Redis 分布式锁
4.1 Lua 实现分布式锁
local lockSet = redis.call('exists', KEYS[1]) if lockSet == 0 then redis.call('set', KEYS[1], ARG[1]) redis.call('expire', KEYS[1], ARG[2]) end return lockSet
5.1 Redis dict字典
5.1.1 RedisDb数据接口(server.h文件)
数据最外层的结构为RedisDb
typedef struct redisDb { dict *dict; /* The keyspace for this DB */ //数据库的键 dict *expires; /* Timeout of keys with a timeout set */ //设置了超时时间的键 dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //客户端等待的keys dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ //所在数 据库ID long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,仅用于统计 unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb;
5.1.2 dict数据结构(dict.h文件)
我们发现数据存储在dict中
typedef struct dict { dictType *type; //理解为面向对象思想,为支持不同的数据类型对应dictType抽象方法, 不同的数据类型可以不同实现 void *privdata; //也可不同的数据类型相关,不同类型特定函数的可选参数。 dictht ht[2]; //2个hash表,用来数据存储 2个dictht long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash标记 -1代表不再rehash unsigned long iterators; /* number of iterators currently running */ } dict;
5.1.3 dictht结构(dict.h文件)
typedef struct dictht { dictEntry **table; //dictEntry 数组 unsigned long size; //数组大小 默认为4 #define DICT_HT_INITIAL_SIZE 4 unsigned long sizemask; //size-1 用来取模得到数据的下标值 unsigned long used; //改hash表中已有的节点数据 } dictht;
5.1.4 dictEntry节点结构(dict.h文件)
typedef struct dictEntry { void *key; //key union { void *val; uint64_t u64; int64_t s64; double d; } v; //value struct dictEntry *next; //指向下一个节点 } dictEntry;
/* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. */ #define OBJ_ENCODING_RAW 0 /* Raw representation */ #define OBJ_ENCODING_INT 1 /* Encoded as integer */ #define OBJ_ENCODING_HT 2 /* Encoded as hash table */ #define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ #define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */ #define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ #define OBJ_ENCODING_INTSET 6 /* Encoded as intset */ #define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ #define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */ #define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */ #define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */ #define LRU_BITS 24 #define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */ #define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ #define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */ #define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object allocated in the stack. */ #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT typedef struct redisObject { unsigned type:4; //数据类型 string hash list unsigned encoding:4; //底层的数据结构 跳表 unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; //用于回收,引用计数法 void *ptr; //指向具体的数据结构的内存地址 比如 跳表、压缩列表 } robj;
5.2 Redis数据结构图
5.3 Redis扩容机制<360>
因为我们的dictEntry数组默认大小是4,如果不进行扩容,那么我们所有的数据都会以链表的形式添加至数组下标 随着数据量越来越大,之前只需要hash取模就能得到下标位置,现在得去循环我下标的链表,所以性能会越来越慢,当我们的数据量达到一定程度后,我们就得去触发扩容操作。
5.3.1 什么时候扩容
5.3.1.1 扩容机制
当没有fork子进程在进行RDB或AOF持久化时,ht[0]的used大于size时,触发扩容
如果有子进程在进行RDB或者AOF时,ht[0]的used大于等于size的5倍的时候,会触发扩容
5.3.1.2 源码验证
扩容,肯定是在添加数据的时候才会扩容,所以我们找一个添加数据的入口。
入口,添加或替换dictReplace (dict.c文件)
int dictReplace(dict *d, void *key, void *val) { dictEntry *entry, *existing, auxentry; /* Try to add the element. If the key * does not exists dictAdd will succeed. */ entry = dictAddRaw(d,key,&existing); if (entry) { dictSetVal(d, entry, val); return 1; } /* Set the new value and free the old one. Note that it is important * to do that in this order, as the value may just be exactly the same * as the previous one. In this context, think to reference counting, * you want to increment (set), and then decrement (free), and not the * reverse. */ auxentry = *existing; dictSetVal(d, existing, val); dictFreeVal(d, &auxentry); return 0; }
进入dictAddRaw方法 (dict.c文件)
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing){ long index; dictEntry *entry; dictht *ht; if (dictIsRehashing(d)) _dictRehashStep(d); //如果正在Rehash 进行渐进式hash 按步rehash /* Get the index of the new element, or -1 if * the element already exists. */ if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1) return NULL; /* Allocate the memory and store the new entry. * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ //如果在hash 放在ht[1] 否则放在ht[0] ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; entry = zmalloc(sizeof(*entry)); //采用头插法插入hash桶 entry->next = ht->table[index]; ht->table[index] = entry; //已使用++ ht->used++; /* Set the hash entry fields. */ dictSetKey(d, entry, key); return entry; }
static long _dictKeyIndex(dict *d, const void *key,uint64_t hash, dictEntry **existing){ unsigned long idx, table; dictEntry *he; if (existing) *existing = NULL; /* Expand the hash table if needed */ //扩容如果需要 if (_dictExpandIfNeeded(d) == DICT_ERR) return -1; //比较是否存在这个值 for (table = 0; table <= 1; table++) { idx = hash & d->ht[table].sizemask; /* Search if this slot does not already contain the given key */ he = d->ht[table].table[idx]; while(he) { if (key==he->key || dictCompareKeys(d, key,he->key)) { if (existing) *existing = he; return -1; } he = he->next; } if (!dictIsRehashing(d)) break; } return idx; }
/* Expand the hash table if needed */ static int _dictExpandIfNeeded(dict *d){ /* Incremental rehashing already in progress. Return.*/ if (dictIsRehashing(d)) return DICT_OK; //正在rehash,返回 /* If the hash table is empty expand it to the initialsize. */ //如果ht[0]的长度为0,设置默认长度4 dictExpand为扩容的关键方法 if (d->ht[0].size == 0) return dictExpand(d,DICT_HT_INITIAL_SIZE); //扩容条件 hash表中已使用的数量大于等于 hash表的大小 //并且dict_can_resize为1 或者 已使用的大小大于hash表大小的 5倍,大于等于1倍的时候, 下面2个满足一个条件即可 if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){ //扩容成原来的2倍 return dictExpand(d, d->ht[0].used*2); } return DICT_OK; }
5.3.2.1 扩容步骤
当满足我扩容条件,触发扩容时,判断是否在扩容,如果在扩容,或者 扩容的大小跟我现在的ht[0].size一样,这次扩容不做
new一个新的dictht,大小为ht[0].used * 2(但是必须向上2的幂,比 如6 ,那么大小为8) ,并且ht[1]=新创建的dictht
我们有个更大的table了,但是需要把数据迁移到ht[1].table ,所以将 dict的rehashidx(数据迁移的偏移量)赋值为0 ,代表可以进行数据迁 移了,也就是可以rehash了
等待数据迁移完成,数据不会马上迁移,而是采用渐进式rehash,慢慢的把数据迁移到ht[1]
当数据迁移完成,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0
把dict的rehashidex=-1
5.3.2.2 源码验证
dictExpand方法在_dictExpandIfNeeded 方法中调用 (dict.c文件),为扩容的关键方法
int dictExpand(dict *d, unsigned long size){ /* the size is invalid if it is smaller than the number of * elements already inside the hash table */ //正在扩容,不允许第二次扩容,或者ht[0]的数据量大于扩容后的数量,没有意义,这次不扩容 if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR; dictht n; /* the new hash table */ //得到最接近2的幂 跟hashMap思想一样 unsigned long realsize = _dictNextPower(size); /* Rehashing to the same table size is not useful. */ //如果跟ht[0]的大小一致 直接返回错误 if (realsize == d->ht[0].size) return DICT_ERR; /* Allocate the new hash table and initialize all pointers to NULL */ //为新的dictht赋值 n.size = realsize; n.sizemask = realsize-1; n.table = zcalloc(realsize*sizeof(dictEntry*)); n.used = 0; /* Is this the first initialization? If so it's not really a rehashing * we just set the first hash table so that it can accept keys. */ //ht[0].table为空,代表是初始化 if (d->ht[0].table == NULL) { d->ht[0] = n; return DICT_OK; } /* Prepare a second hash table for incremental rehashing */ //将扩容后的dictht赋值给ht[1] d->ht[1] = n; //标记为0,代表可以开始rehash d->rehashidx = 0; return DICT_OK; }
5.3.3.1 迁移方式
假如一次性把数据全部迁移会很耗时间,会让单条指令等待很久,会形成阻塞。
所以,redis采用渐进式Rehash,所谓渐进式,就是慢慢的,不会一次性把所有数据迁移。
那什么时候会进行渐进式数据迁移?
每次进行key的crud操作都会进行一个hash桶的数据迁移
定时任务,进行部分数据迁移
5.3.3.2 源码验证
CRUD触发都会触发_dictRehashStep(渐进式hash)
每次增删改的时候都会调用_dictRehashStep方法
if (dictIsRehashing(d)) _dictRehashStep(d); //这个代码在增删改查的方法中都会调用
_dictRehashStep方法
static void _dictRehashStep(dict *d) { if (d->iterators == 0) dictRehash(d,1); }
int dictRehash(dict *d, int n) { int empty_visits = n*10; /* Max number of empty buckets to visit. */ //要访问的最大的空桶数 防止此次耗时过长 if (!dictIsRehashing(d)) return 0; //如果没有在rehash返回0 //循环,最多1次,并且ht[0]的数据没有迁移完 进入循环 while(n-- && d->ht[0].used != 0) { dictEntry *de, *nextde; /* Note that rehashidx can't overflow as we are sure there are more * elements because ht[0].used != 0 */ assert(d->ht[0].size > (unsigned long)d->rehashidx); //rehashidx rehash的索引,默认0 如果hash桶为空 进入自旋 并且判断是否到了最大的访问空桶数 while(d->ht[0].table[d->rehashidx] == NULL) { d->rehashidx++; if (--empty_visits == 0) return 1; } de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下标为rehashidx桶 /* Move all the keys in this bucket from the old to the new hash HT */ while(de) { //循环hash桶的链表 并且转移到ht[1] uint64_t h; nextde = de->next; /* Get the index in the new hash table */ h = dictHashKey(d, de->key) & d- >ht[1].sizemask; de->next = d->ht[1].table[h]; //头插 d->ht[1].table[h] = de; d->ht[0].used--; d->ht[1].used++; de = nextde; } //转移完后 将ht[0]相对的hash桶设置为null d->ht[0].table[d->rehashidx] = NULL; d->rehashidx++; } /* Check if we already rehashed the whole table... */ //ht[0].used=0 代表rehash全部完成 if (d->ht[0].used == 0) { //清空ht[0]table zfree(d->ht[0].table); //将ht[0] 重新指向已经完成rehash的ht[1] d->ht[0] = d->ht[1]; //将ht[1]所有数据重新设置 _dictReset(&d->ht[1]); //设置-1,代表rehash完成 d->rehashidx = -1; return 0; } /* More to rehash... */ return 1; } //将ht[1]的属性复位 static void _dictReset(dictht *ht) { ht->table = NULL; ht->size = 0; ht->sizemask = 0; ht->used = 0; }
再讲定时任务之前,我们要知道Redis中有个时间事件驱动,在 server.c文件下有个serverCron 方法。
serverCron 每隔多久会执行一次,执行频率根据redis.conf中的hz配置,serverCorn中有个方法databasesCron()
定时方法serverCron
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { //....... /* We need to do a few operations on clients asynchronously. */ clientsCron(); /* Handle background operations on Redis databases. */ databasesCron(); //处理Redis数据库上的后台操作。 //....... }
databasesCron方法
void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); } } /* Defrag keys gradually. */ activeDefragCycle(); /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. */ if (!hasActiveChildProcess()) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ static unsigned int resize_db = 0; static unsigned int rehash_db = 0; int dbs_per_call = CRON_DBS_PER_CALL; int j; /* Don't test more DBs than we have. */ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; /* Resize */ for (j = 0; j < dbs_per_call; j++) { tryResizeHashTables(resize_db % server.dbnum); resize_db++; } /* Rehash */ //Rehash逻辑 if (server.activerehashing) { for (j = 0; j < dbs_per_call; j++) { //进入incrementallyRehash int work_done = incrementallyRehash(rehash_db); if (work_done) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ break; } else { /* If this db didn't need rehash, we'll try the next one. */ rehash_db++; rehash_db %= server.dbnum; } } } } }
int incrementallyRehash(int dbid) { /* Keys dictionary */ if (dictIsRehashing(server.db[dbid].dict)) { dictRehashMilliseconds(server.db[dbid].dict,1); return 1; /* already used our millisecond for this loop... */ } /* Expires */ if (dictIsRehashing(server.db[dbid].expires)) { dictRehashMilliseconds(server.db[dbid].expires,1); return 1; /* already used our millisecond for this loop... */ } return 0; }
进入dictRehashMilliseconds(dict.c)
int dictRehashMilliseconds(dict *d, int ms) { long long start = timeInMilliseconds(); int rehashes = 0; //进入rehash方法 dictRehash 去完成渐进时HASH while(dictRehash(d,100)) { rehashes += 100; //判断是否超时 if (timeInMilliseconds()-start > ms) break; } return rehashes; }
5.4 String类型数据结构
Redis中String的底层没有用c的char来实现,而是采用了SDS(simple Dynamic String)的数据结构来实现。并且提供了5种不同的类型
5.4.1 SDS数据结构定义(sds.h文件)
typedef char *sds; /* Note: sdshdr5 is never used, we just access the flags byte directly. * However is here to document the layout of type 5 SDS strings. */ struct __attribute__ ((__packed__)) sdshdr5 { unsigned char flags; /* 3 lsb of type, and 5 msb of string length */ char buf[]; } struct __attribute__ ((__packed__)) sdshdr8 { uint8_t len; /* used */ //已使用的长度 uint8_t alloc; /* excluding the header and null terminator */ //分配的总容量 不包含头和空终止符 unsigned char flags; /* 3 lsb of type, 5 unused bits */ //低三位类型 高5bit未使用 char buf[]; //数据buf 存储字符串数组 }; struct __attribute__ ((__packed__)) sdshdr16 { uint16_t len; /* used */ uint16_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; struct __attribute__ ((__packed__)) sdshdr32 { uint32_t len; /* used */ uint32_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; struct __attribute__ ((__packed__)) sdshdr64 { uint64_t len; /* used */ uint64_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; };
char字符串不记录自身长度,所以获取一个字符串长度的复杂度是O(n),但是SDS记录分配的长度alloc,已使用的长度len,获取长度为 O(1)
减少修改字符串带来的内存重分配次数
空间预分配,SDS长度如果小于1MB,预分配跟长度一样的,大于1M,每次跟len的大小多1M
惰性空间释放,截取的时候不马上释放空间,供下次使用!同时提供相应的释放SDS未使用空间的API
二进制安全
5.5 Hash类型数据结构
Redis的字典相当于Java语言中的HashMap,他是无序字典。内部结构上同样是数组 + 链表二维结构。第一维hash的数组位置碰撞时,就会将碰撞的元素使用链表串起来。
5.5.1 Hash数据结构图
5.5.2 Hash的压缩列表
压缩列表会根据存入的数据的不同类型以及不同大小,分配不同大小的空间。所以是为了节省内存而采用的。
因为是一块完整的内存空间,当里面的元素发生变更时,会产生连锁更新,严重影响我们的访问性能。所以,只适用于数据量比较小的场景。
所以,Redis会有相关配置,Hashes只有小数据量的时候才会用到ziplist,当hash对象同时满足以下两个条件的时候,使用ziplist编码:
哈希对象保存的键值对数量<512个
所有的键值对的键和值的字符串长度都<64byte(一个英文字母一个字节)
hash-max-ziplist-value 64 // ziplist中最大能存放的值长度 hash-max-ziplist-entries 512 // ziplist中最多能存放的entry节点数量
5.6 List类型数据结构
5.6.1 quickList快速列表
兼顾了ziplist的节省内存,并且一定程度上解决了连锁更新的问题,我们的 quicklistNode节点里面是个ziplist,每个节点是分开的。那么就算发生了连锁更新,也只会发生在一个quicklistNode节点。
quicklist.h文件
typedef struct{ struct quicklistNode *prev; //前指针 struct quicklistNode *next; //后指针 unsigned char *zl; //数据指针 指向ziplist结果 unsigned int sz; //ziplist大小 /* ziplist size in bytes */ unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素 unsigned int encoding : 2; /* RAW==1 or LZF==2 */ // 是否压缩, 1没有压缩 2 lzf压缩 unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //预留容器字段 unsigned int recompress : 1; /* was this node previous compressed? */ unsigned int attempted_compress : 1; /* node can't compress; too small */ unsigned int extra : 10; /* more bits to steal for future usage */ //预留字段 } quicklistNode;
5.6.2 list数据结构图
5.7 Set类型数据结构
Redis用intset或hashtable存储set。满足下面条件,就用inset存储。
如果不是整数类型,就用dictht hash表(数组+链表)
如果元素个数超过512个,也会用hashtable存储。跟一个配置有关:
set-max-intset-entries 512
不满足上述条件的,都使用hash表存储,value存null。
5.8 ZSet数据结构
5.8.1 ZipList
默认使用ziplist编码。
在ziplist的内部,按照score排序递增来存储。插入的时候要移动之后的数据。
如果元素数量大于等于128个,或者任一member长度大于等于64字节使用 skiplist+dict存储。
zset-max-ziplist-entries 128 zset-max-ziplist-value 64
如果不满足条件,采用跳表。
5.8.2 跳表(skiplist)
结构定义(servier.h)
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele; //sds数据 double score; //score struct zskiplistNode *backward; //后退指针 //层级数组 struct zskiplistLevel { struct zskiplistNode *forward; //前进指针 unsigned long span; //跨度 } level[]; } zskiplistNode; //跳表列表 typedef struct zskiplist { struct zskiplistNode *header, *tail; //头尾节点 unsigned long length; //节点数量 int level; //最大的节点层级 } zskiplist;
zslInsert 添加节点方法 (t_zset.c)
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || 咕泡出品 必属精品精品属精品 咕泡出品 必属精品 咕泡出品 必属精品 咕泡咕泡 (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(leve
发表评论 取消回复