网站/小程序/APP个性化定制开发,二开,改版等服务,加扣:8582-36016

Redis是一个开源的、支持网络、基于内存的键值对存储系统,它可以用作数据库、缓存和消息中间件。它支持多种数据类型,包括字符串、散列、列表、集合、位图等,拥有极快的读写速度,并且支持丰富的特性,如事务、持久化、复制、脚本、发布/订阅等。


一、Redis的特性

1.1 Redis为什么快?

  • 基于内存操作,操作不需要跟磁盘交互,单次执行很快

  • 命令执行是单线程,因为是基于内存操作,单次执行的时间快于线程切换时间,同时通信采用多路复用

  • Redis本身就是一个k-v结构,类似于hashMap,所以查询性能接近于O(1)

  • 同时redis自己底层数据结构支持,比如跳表、SDS

  • lO多路复用,单个线程中通过记录跟踪每一个sock(I/O流)的状态来管理多个I/O流

1.2 Redis其他特性

  • 更丰富的数据类型,虽然都是k、v结构,value可以存储很多的数据类型

  • 完善的内存管理机制、保证数据一致性:持久化机制、过期策略

  • 支持多种编程语言

  • 高可用,集群、保证高可用

1.3 Redis高可用

  • 很完善的内存管理机制,过期、淘汰、持久化

  • 集群模式,主从、哨兵、cluster集群

二、Redis数据类型以及使用场景

Redis的数据类型有String、Hash、Set、List、Zset、bitMap(基于String类型)、 Hyperloglog(基于String类型)、Geo(地理位置)、Streams流。

2.1 String

2.1.1 基本指令

// 批量设置
> mset key1 value1 key2 value2
// 批量获取
> mget key1 key2
// 获取长度
> strlen key 
//  字符串追加内容
> append key xxx
// 获取指定区间的字符
> getrange key 0 5
// 整数值递增 (递增指定的值)
> incr intkey (incrby intkey 10)
// 当key 存在时将覆盖
> SETEX key seconds value
// 将 key 的值设为 value ,当且仅当 key 不存在。
> SETNX key value

2.1.2 应用场景

  1. 缓存相关场景 缓存、 token(跟过期属性完美契合)

  2. 线程安全的计数场景 (软限流、分布式ID等)

2.2 Hash

2.2.1 基本指令

// 将哈希表 key 中的域 field 的值设为 value 。
> HSET key field value
// 返回哈希表 key 中给定域 field 的值。
>  HGET key field
// 返回哈希表 key 中的所有域。
> HKEYS key
// 返回哈希表 key 中所有域的值。
>  HVALS key
// 为哈希表 key 中的域 field 的值加上增量 increment 。
> HINCRBY key field increment
// 查看哈希表 key 中,给定域 field 是否存在。
> HEXISTS key field

2.2.2 应用场景

  1. 存储对象类的数据(官网说的)比如一个对象下有多个字段

  2. 统计类的数据,可以对单个统计数据进行单独操作

2.3 List

存储有序的字符串列表,元素可以重复。列表的最大长度为 2^32 - 1 个元素(4294967295,每个列表超过 40 亿个元素)。

2.3.1 基本指令

// 将一个或多个值 value 插入到列表 key 的表头
> LPUSH key value [value ...]
// 将一个或多个值 value 插入到列表 key 的表尾(最右边)。
> RPUSH key value [value ...]
// 移除并返回列表 key 的头元素。
> LPOP key
// 移除并返回列表 key 的尾元素。
> RPOP key
// BLPOP 是列表的阻塞式(blocking)弹出原语。
> BLPOP key [key ...] timeout
// BRPOP 是列表的阻塞式(blocking)弹出原语。
> BRPOP key [key ...] timeout
// 获取指点位置元素
> LINDEX key index

2.3.2 应用场景

  1. 用户消息时间线

因为list是有序的,所以我们可以先进先出,先进后出的列表都可以做

2.4 Set

2.4.1 基本指令

// 将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略。
> SADD key member [member ...]
// 返回集合 key 中的所有成员。
> SMEMBERS key
// 返回集合 key 的基数(集合中元素的数量)。
> SCARD key
// 如果命令执行时,只提供了 key 参数,那么返回集合中的一个随机元素。
> SRANDMEMBER key [count]
// 移除并返回集合中的一个随机元素。
> SPOP key
// 移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略。
> SREM key member [member ...]
// 判断 member 元素是否集合 key 的成员。
> SISMEMBER key member
// 获取前一个集合有而后面1个集合没有的
> sdiff huihuiset huihuiset1
// 获取交集
> sinter huihuiset huihuiset1
// 获取并集
> sunion huihuiset huihuiset1

2.4.2 应用场景

  1. 抽奖 spop跟srandmember随机弹出或者获取元素

  2. 点赞、签到等,sadd集合存储

  3. 交集并集 关注等场景

2.5 ZSet(SortedSet)

sorted set,有序的set,每个元素有个score。

score相同时,按照key的ASCII码排序。

2.5.1 基本指令

//将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
> ZADD key score member [[score member] [score member] ...]
// 返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递增(从小到大)来排序
> ZRANGE key start stop [WITHSCORES]
// 返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递减(从大到小)来排列。
> ZREVRANGE key start stop [WITHSCORES]
// 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。
有序集成员按 score 值递增(从小到大)次序排列。
>  ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
// 移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
> ZREM key member [member ...]
// 返回有序集 key 的基数。
> ZCARD key
// 为有序集 key 的成员 member 的 score 值加上增量 increment 。
> ZINCRBY key increment member
// 返回有序集 key 中, score 值在 min 和 max 之间(默认包括 score 值等于 min 或 max )的成员的数量。
> ZCOUNT key min max
// 返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递增(从小到大)顺序排列。
> ZRANK key member

2.5.2 应用场景

  1. 排行榜

三、Redis的事务

3.1 事务基本操作

// 1. multi命令开启事务,exec命令提交事务
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
// 2. 组队的过程中可以通过discard来放弃组队。
 127.0.0.1:6379> multi
OK
 127.0.0.1:6379(TX)> set k3 v3
QUEUED
 127.0.0.1:6379(TX)> set k4 v4
QUEUED
 127.0.0.1:6379(TX)> discard
OK

3.2 Redis事务特性

  1. 单独的隔离操作

事务中的所有命令都会序列化、按顺序地执行。

事务在执行过程中,不会被其他客户端发送来的命令请求所打断。

  1. 没有隔离级别的概念

队列中的命令没有提交之前,都不会被实际地执行,因为事务提交之前任何指令都不会被实际执行,也就不存在“事务内的查询要看到事务里的更新,在事务外查询不能看到”这个让人万分头疼的问题。

  1. 不保证原子性

Redis同一个事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚。

四、Redis 分布式锁

4.1 Lua 实现分布式锁

local lockSet = redis.call('exists', KEYS[1])
if lockSet == 0
  then
    redis.call('set', KEYS[1], ARG[1])
    redis.call('expire', KEYS[1], ARG[2])
  end
return lockSet

5.1 Redis dict字典

5.1.1 RedisDb数据接口(server.h文件)

数据最外层的结构为RedisDb

typedef struct redisDb {
    dict *dict; /* The keyspace for this DB */ //数据库的键
    dict *expires; /* Timeout of keys with a timeout set */ //设置了超时时间的键
    dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //客户端等待的keys
    dict *ready_keys; /* Blocked keys that received a PUSH */
    dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
    int id; /* Database ID */ //所在数 据库ID
    long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,仅用于统计
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

5.1.2 dict数据结构(dict.h文件)

我们发现数据存储在dict中

typedef struct dict {
    dictType *type; //理解为面向对象思想,为支持不同的数据类型对应dictType抽象方法,
    不同的数据类型可以不同实现
    void *privdata; //也可不同的数据类型相关,不同类型特定函数的可选参数。
    dictht ht[2]; //2个hash表,用来数据存储 2个dictht
    long rehashidx; /* rehashing not in progress if rehashidx == -1 
    */ // rehash标记 -1代表不再rehash
    unsigned long iterators; /* number of iterators currently running */
} dict;

5.1.3 dictht结构(dict.h文件)

typedef struct dictht {
    dictEntry **table; //dictEntry 数组
    unsigned long size; //数组大小 默认为4 #define DICT_HT_INITIAL_SIZE 4
    unsigned long sizemask; //size-1 用来取模得到数据的下标值
    unsigned long used; //改hash表中已有的节点数据
} dictht;

5.1.4 dictEntry节点结构(dict.h文件)

typedef struct dictEntry {
    void *key; //key
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v; //value
    struct dictEntry *next; //指向下一个节点
} dictEntry;
/* Objects encoding. Some kind of objects like Strings and
Hashes can be
* internally represented in multiple ways. The 'encoding'
field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old
list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string
encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list
of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree
of listpacks */
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of
obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution
in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object
never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object
allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
typedef struct redisObject {
unsigned type:4; //数据类型 string hash list
unsigned encoding:4; //底层的数据结构 跳表
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant
8 bits frequency
* and most significant 16 bits
access time). */
int refcount; //用于回收,引用计数法
void *ptr; //指向具体的数据结构的内存地址 比如 跳表、压缩列表
} robj;

5.2 Redis数据结构图

一文搞懂Redis_Redis

5.3 Redis扩容机制<360>

因为我们的dictEntry数组默认大小是4,如果不进行扩容,那么我们所有的数据都会以链表的形式添加至数组下标 随着数据量越来越大,之前只需要hash取模就能得到下标位置,现在得去循环我下标的链表,所以性能会越来越慢,当我们的数据量达到一定程度后,我们就得去触发扩容操作。

5.3.1 什么时候扩容

5.3.1.1 扩容机制
  1. 当没有fork子进程在进行RDB或AOF持久化时,ht[0]的used大于size时,触发扩容

  2. 如果有子进程在进行RDB或者AOF时,ht[0]的used大于等于size的5倍的时候,会触发扩容

5.3.1.2 源码验证

扩容,肯定是在添加数据的时候才会扩容,所以我们找一个添加数据的入口。

  1. 入口,添加或替换dictReplace (dict.c文件)

int dictReplace(dict *d, void *key, void *val) {
    dictEntry *entry, *existing, auxentry;
    /* Try to add the element. If the key
    * does not exists dictAdd will succeed. */
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);
        return 1;
    }
    /* Set the new value and free the old one. Note that
    it is important
    * to do that in this order, as the value may just be
    exactly the same
    * as the previous one. In this context, think to
    reference counting,
    * you want to increment (set), and then decrement
    (free), and not the
    * reverse. */
    auxentry = *existing;
    dictSetVal(d, existing, val);
    dictFreeVal(d, &auxentry);
    return 0;
}
  1. 进入dictAddRaw方法 (dict.c文件)

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing){
    long index;
    dictEntry *entry;
    dictht *ht;
         if (dictIsRehashing(d)) _dictRehashStep(d); //如果正在Rehash 进行渐进式hash 按步rehash
         /* Get the index of the new element, or -1 if
    * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1)
        return NULL;
         /* Allocate the memory and store the new entry.
    * Insert the element in top, with the assumption that
    in a database
    * system it is more likely that recently added
    entries are accessed
    * more frequently. */
    //如果在hash 放在ht[1] 否则放在ht[0]
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    //采用头插法插入hash桶
    entry->next = ht->table[index];
    ht->table[index] = entry;
    //已使用++
    ht->used++;
         /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
static long _dictKeyIndex(dict *d, const void *key,uint64_t hash, dictEntry **existing){
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;
         /* Expand the hash table if needed */
    //扩容如果需要
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    //比较是否存在这个值
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain
        the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key,he->key)) {
                if (existing) *existing = he;
                    return -1;
                }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d){
    /* Incremental rehashing already in progress. Return.*/
    if (dictIsRehashing(d)) return DICT_OK; //正在rehash,返回
         /* If the hash table is empty expand it to the initialsize. */
    //如果ht[0]的长度为0,设置默认长度4 dictExpand为扩容的关键方法
    if (d->ht[0].size == 0)
            return dictExpand(d,DICT_HT_INITIAL_SIZE);
     //扩容条件 hash表中已使用的数量大于等于 hash表的大小
    //并且dict_can_resize为1 或者 已使用的大小大于hash表大小的 5倍,大于等于1倍的时候,
    下面2个满足一个条件即可
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){
        //扩容成原来的2倍
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}
5.3.2.1 扩容步骤
  1. 当满足我扩容条件,触发扩容时,判断是否在扩容,如果在扩容,或者 扩容的大小跟我现在的ht[0].size一样,这次扩容不做

  2. new一个新的dictht,大小为ht[0].used * 2(但是必须向上2的幂,比 如6 ,那么大小为8) ,并且ht[1]=新创建的dictht

  3. 我们有个更大的table了,但是需要把数据迁移到ht[1].table ,所以将 dict的rehashidx(数据迁移的偏移量)赋值为0 ,代表可以进行数据迁 移了,也就是可以rehash了

  4. 等待数据迁移完成,数据不会马上迁移,而是采用渐进式rehash,慢慢的把数据迁移到ht[1]

  5. 当数据迁移完成,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0

  6. 把dict的rehashidex=-1

5.3.2.2 源码验证
  1. dictExpand方法在_dictExpandIfNeeded 方法中调用 (dict.c文件),为扩容的关键方法

int dictExpand(dict *d, unsigned long size){
    /* the size is invalid if it is smaller than the
    number of
    * elements already inside the hash table */
    //正在扩容,不允许第二次扩容,或者ht[0]的数据量大于扩容后的数量,没有意义,这次不扩容
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;
         dictht n; /* the new hash table */
    //得到最接近2的幂 跟hashMap思想一样
    unsigned long realsize = _dictNextPower(size);
         /* Rehashing to the same table size is not useful. */
    //如果跟ht[0]的大小一致 直接返回错误
    if (realsize == d->ht[0].size) return DICT_ERR;
         /* Allocate the new hash table and initialize all
    pointers to NULL */
    //为新的dictht赋值
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;
         /* Is this the first initialization? If so it's not
    really a rehashing
    * we just set the first hash table so that it can
    accept keys. */
    //ht[0].table为空,代表是初始化
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }
         /* Prepare a second hash table for incremental rehashing */
    //将扩容后的dictht赋值给ht[1]
    d->ht[1] = n;
    //标记为0,代表可以开始rehash
    d->rehashidx = 0;
    return DICT_OK;
}
5.3.3.1 迁移方式

假如一次性把数据全部迁移会很耗时间,会让单条指令等待很久,会形成阻塞。

所以,redis采用渐进式Rehash,所谓渐进式,就是慢慢的,不会一次性把所有数据迁移。

那什么时候会进行渐进式数据迁移?

  1. 每次进行key的crud操作都会进行一个hash桶的数据迁移

  2. 定时任务,进行部分数据迁移

5.3.3.2 源码验证
CRUD触发都会触发_dictRehashStep(渐进式hash)
  1. 每次增删改的时候都会调用_dictRehashStep方法

if (dictIsRehashing(d)) _dictRehashStep(d); //这个代码在增删改查的方法中都会调用
  1. _dictRehashStep方法

static void _dictRehashStep(dict *d) {
  if (d->iterators == 0) dictRehash(d,1);
}
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty
    buckets to visit. */ //要访问的最大的空桶数 防止此次耗时过长
         if (!dictIsRehashing(d)) return 0; //如果没有在rehash返回0
         //循环,最多1次,并且ht[0]的数据没有迁移完 进入循环
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;
        /* Note that rehashidx can't overflow as we are
        sure there are more
        * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        //rehashidx rehash的索引,默认0 如果hash桶为空 进入自旋 并且判断是否到了最大的访问空桶数
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下标为rehashidx桶
        /* Move all the keys in this bucket from the old
        to the new hash HT */
        while(de) { //循环hash桶的链表 并且转移到ht[1]
            uint64_t h;
            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d-
            >ht[1].sizemask;
            de->next = d->ht[1].table[h]; //头插
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        //转移完后 将ht[0]相对的hash桶设置为null
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    /* Check if we already rehashed the whole table... */
    //ht[0].used=0 代表rehash全部完成
    if (d->ht[0].used == 0) {
        //清空ht[0]table
        zfree(d->ht[0].table);
        //将ht[0] 重新指向已经完成rehash的ht[1]
        d->ht[0] = d->ht[1];
        //将ht[1]所有数据重新设置
        _dictReset(&d->ht[1]);
        //设置-1,代表rehash完成
        d->rehashidx = -1;
        return 0;
    }
    /* More to rehash... */
    return 1;
}
 //将ht[1]的属性复位
static void _dictReset(dictht *ht)
{
    ht->table = NULL;
    ht->size = 0;
    ht->sizemask = 0;
    ht->used = 0;
}

再讲定时任务之前,我们要知道Redis中有个时间事件驱动,在 server.c文件下有个serverCron 方法。

serverCron 每隔多久会执行一次,执行频率根据redis.conf中的hz配置,serverCorn中有个方法databasesCron()

  1. 定时方法serverCron

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //.......
    /* We need to do a few operations on clients
    asynchronously. */
    clientsCron();
    /* Handle background operations on Redis databases. */
    databasesCron(); //处理Redis数据库上的后台操作。
    //.......
}
  1. databasesCron方法

void databasesCron(void) {
    /* Expire keys by random sampling. Not required for
    slaves
    * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else {
            expireSlaveKeys();
        }
    }
    /* Defrag keys gradually. */
    activeDefragCycle();
    /* Perform hash tables rehashing if needed, but only
    if there are no
    * other processes saving the DB on disk. Otherwise
    rehashing is bad
    * as will cause a lot of copy-on-write of memory
    pages. */
    if (!hasActiveChildProcess()) {
        /* We use global counters so if we stop the
        computation at a given
        * DB we'll be able to start from the successive
        in the next
        * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;
        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum)
            dbs_per_call = server.dbnum;
        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }
        /* Rehash */ //Rehash逻辑
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                //进入incrementallyRehash
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop
                        here, we'll do
                        * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash,
                        we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
}
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1;
        /* already used our millisecond for this
        loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this
        loop... */
    }
    return 0;
}
  1. 进入dictRehashMilliseconds(dict.c)

int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;
    //进入rehash方法 dictRehash 去完成渐进时HASH
    while(dictRehash(d,100)) {
        rehashes += 100;
        //判断是否超时
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

5.4 String类型数据结构

Redis中String的底层没有用c的char来实现,而是采用了SDS(simple Dynamic String)的数据结构来实现。并且提供了5种不同的类型

5.4.1 SDS数据结构定义(sds.h文件)

typedef char *sds;
/* Note: sdshdr5 is never used, we just access the flags
    byte directly.
    * However is here to document the layout of type 5 SDS
strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of
        string length */
    char buf[];
}
 struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len;  /* used */ //已使用的长度
    uint8_t alloc; /* excluding the header and null terminator 
    */ //分配的总容量 不包含头和空终止符
    unsigned char flags; /* 3 lsb of type, 5 unused bits */ //低三位类型 高5bit未使用
    char buf[]; //数据buf 存储字符串数组
};
 struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};
 struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};
 struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};
  1. char字符串不记录自身长度,所以获取一个字符串长度的复杂度是O(n),但是SDS记录分配的长度alloc,已使用的长度len,获取长度为 O(1)

  2. 减少修改字符串带来的内存重分配次数

  3. 空间预分配,SDS长度如果小于1MB,预分配跟长度一样的,大于1M,每次跟len的大小多1M

  4. 惰性空间释放,截取的时候不马上释放空间,供下次使用!同时提供相应的释放SDS未使用空间的API

  5. 二进制安全

5.5 Hash类型数据结构

Redis的字典相当于Java语言中的HashMap,他是无序字典。内部结构上同样是数组 + 链表二维结构。第一维hash的数组位置碰撞时,就会将碰撞的元素使用链表串起来。

5.5.1 Hash数据结构图

一文搞懂Redis_Redis_02

5.5.2 Hash的压缩列表

压缩列表会根据存入的数据的不同类型以及不同大小,分配不同大小的空间。所以是为了节省内存而采用的。

因为是一块完整的内存空间,当里面的元素发生变更时,会产生连锁更新,严重影响我们的访问性能。所以,只适用于数据量比较小的场景。

所以,Redis会有相关配置,Hashes只有小数据量的时候才会用到ziplist,当hash对象同时满足以下两个条件的时候,使用ziplist编码:

  1. 哈希对象保存的键值对数量<512个

  2. 所有的键值对的键和值的字符串长度都<64byte(一个英文字母一个字节)

hash-max-ziplist-value 64 // ziplist中最大能存放的值长度
hash-max-ziplist-entries 512 // ziplist中最多能存放的entry节点数量

5.6 List类型数据结构

5.6.1 quickList快速列表

兼顾了ziplist的节省内存,并且一定程度上解决了连锁更新的问题,我们的 quicklistNode节点里面是个ziplist,每个节点是分开的。那么就算发生了连锁更新,也只会发生在一个quicklistNode节点。

quicklist.h文件

typedef struct{
    struct quicklistNode *prev; //前指针
    struct quicklistNode *next; //后指针
    unsigned char *zl; //数据指针 指向ziplist结果
    unsigned int sz; //ziplist大小 /* ziplist
    size in bytes */
    unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素
    unsigned int encoding : 2; /* RAW==1 or LZF==2 */ //
    是否压缩, 1没有压缩 2 lzf压缩
    unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //预留容器字段
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for  future usage */ //预留字段
} quicklistNode;

5.6.2 list数据结构图

一文搞懂Redis_redis dict_03

5.7 Set类型数据结构

Redis用intset或hashtable存储set。满足下面条件,就用inset存储。

  1. 如果不是整数类型,就用dictht hash表(数组+链表)

  2. 如果元素个数超过512个,也会用hashtable存储。跟一个配置有关:

set-max-intset-entries 512

不满足上述条件的,都使用hash表存储,value存null。

5.8 ZSet数据结构

5.8.1 ZipList

默认使用ziplist编码。

在ziplist的内部,按照score排序递增来存储。插入的时候要移动之后的数据。

如果元素数量大于等于128个,或者任一member长度大于等于64字节使用 skiplist+dict存储。

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

如果不满足条件,采用跳表。

5.8.2 跳表(skiplist)

结构定义(servier.h)

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
 sds ele; //sds数据
 double score; //score
 struct zskiplistNode *backward; //后退指针
 //层级数组
 struct zskiplistLevel {
     struct zskiplistNode *forward; //前进指针
     unsigned long span; //跨度
 } level[];
} zskiplistNode;
 //跳表列表
typedef struct zskiplist {
 struct zskiplistNode *header, *tail; //头尾节点
 unsigned long length; //节点数量
 int level; //最大的节点层级
} zskiplist;

zslInsert 添加节点方法 (t_zset.c)

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds
ele) {
 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 unsigned int rank[ZSKIPLIST_MAXLEVEL];
 int i, level;
 serverAssert(!isnan(score));
 x = zsl->header;
 for (i = zsl->level-1; i >= 0; i--) {
     /* store rank that is crossed to reach the insert
     position */
     rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
     while (x->level[i].forward &&
         (x->level[i].forward->score < score ||
         咕泡出品 必属精品精品属精品 咕泡出品 必属精品 咕泡出品 必属精品 咕泡咕泡
         (x->level[i].forward->score == score
         &&
         sdscmp(x->level[i].forward->ele,ele) <
         0)))
         {
         rank[i] += x->level[i].span;
         x = x->level[i].forward;
     }
     update[i] = x;
 }
 /* we assume the element is not already inside, since
 we allow duplicated
    * scores, reinserting the same element should never
    happen since the
    * caller of zslInsert() should test in the hash table
    if the element is
    * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(leve                                                    

评论 0

暂无评论
0
0
0
立即
投稿
发表
评论
返回
顶部