Loading...

Redis 如何保证高性能

最近是真的是太懒了😥,都快忘记要写博客这一回事了。之前还可以借口说工作太忙,现在就只能坦率地承认了。意识到这点,赶紧狠狠地呼了自己一耳光,继续学习,😡Fighting!!!。

Redis 简介

关于 NoSQL 的优缺点,及其与关系型数据库的对比,此前已在 MongoDB 基础教程一文中已有描述,可自行前往查看。

Redis 诞生于 2009 年,其全称是 Remote Dictionary Server,远程词典服务器,是一个基于内存的键值型 NoSQL 数据库。

Redis 特征:

  • 键值型,value 支持多种不同数据结构,功能丰富。
  • 单线程,每个命令具备原子性。
  • 低延迟,速度快(基于内存,IO多路复用,良好的编码)。
  • 支持数据持久化。
  • 支持主从集群、分片集群。
  • 支持多语言客户端。

卸载安装

Redis 卸载

ps aux | grep redis # 查看是否启动redis,如已启动则停止
whereis redis # 搜索redis,核对并删除对应文件及目录

Redis 安装

  1. Redis 安装启动

    cd /usr/local
    mkdir redis
    cd redis/
    wget https://download.redis.io/releases/redis-6.2.6.tar.gz
    tar xzf redis-6.2.6.tar.gz
    cd redis-6.2.6
    make && make install
    # 检查安装
    cd /usr/local/bin
    # 任意位置运行redis
    cd ~
    redis-server # 前台启动
    # 后台启动
    cd /usr/local/redis/redis-6.2.6
    cp redis.conf redis.conf.backup # backup
    vi redis.conf # 根据需要修改配置
    redis-server redis.conf
    ps aux | grep redis

    redis.conf 配置项示例:

    # 监听地址,学习时可以放开允许所有IP
    bind 0.0.0.0
    # 后台运行,默认为前台运行
    daemonize yes
    # 设置redis密码
    requirepass foobared
  2. Redis 开机自启

    # 编辑启动脚本
    vi /etc/systemd/system/redis.service
    systemctl daemon-reload # 重载系统服务
    systemctl start redis # 启动redis
    systemctl status redis # 查看redis状态
    systemctl stop redis # 停止redis
    systemctl enable redis # 允许redis开机自启

    redis.service 启动脚本内容示例如下:

    [Unit]
    Description=redis-server
    After=network.target
    
    [Service]
    Type=forking
    ExecStart=/usr/local/bin/redis-server /usr/local/redis/redis-6.2.6/redis.conf
    PrivateTmp=true
    
    [Install]
    WantedBy=multi-user.target

    注:

    • 如启动不成功,尝试检查配置文件内容是否正确,端口是否被占用。

Redis 连接

  1. 命令行客户端:redis-cli

    cd ~ # 任意位置
    redis-cli -h 127.0.0.1 -p 6379 # 语法:redis-cli [options] [commonds]
    AUTH foobared # 语法:AUTH [username] password
    set name chinmoku
    get name
    exit # 退出
  2. 图形化客户端:RedisDesktopManager

    源码地址:https://github.com/uglide/RedisDesktopManager/releases

    源码下载后需要手动编译,也可以点击此处下载对应的编译版本。

    注:

    • 通过 windows 连接服务器的 redis 服务,需要注意对应的防火墙、安全组等是否开放。

      # 添加防火墙指定端口
      firewall-cmd --add-port=6379/tcp --permanent
      firewall-cmd --reload # 重载生效

Redis 命令

数据结构

Redis 是一个以 key-value 形式存储数据的数据库,key 一般是 String 类型,但 value 类型则存在多种:

基本数据类型:

  • String - 存储字符串,例如:hello world

  • Hash - 存储较为复杂的数据结构,例如:{name: "Chinmoku", age: 20}

  • List - 存储数组列表,例如:[A, B, C, C]

  • Set - 存储元素不可重复的数组,例如:{A, B, C}

  • SortedSet - 存储有序的,且元素不可重复的数组,例如:{A: 1, B: 2, C: 3}

特殊类型:

  • GEO
  • BitMap
  • HyperLog

通用命令

Redis 命令参考:https://redis.io/commands

或者通过 help [commands] 即可查看相关命令用法。

Redis 通用命令示例:

  1. KEYS pattern 查询以字母 a 开头的 key。

    由于 redis 是单线程执行,当数据量较大时,会导致业务阻塞,不建议在生产环境下使用。

  2. DEL key [key …] 删除指定的一个或多个 key。

  3. EXISTS key [key …] 判断指定的 key 是否存在。

  4. EXPIRE key seconds 设置 key 的有效期。

  5. TTL key 获取 key 的有效期。

    如不指定,则默认为 -1 表示永久有效。如果 key 不存在,查询其有效期则会返回 -2

String 类型

String 类型是 redis 中最简单的数据类型,但其 value 根据字符串的不同格式,又可以分为三类:

  • string 普通字符串。
  • int 整数类型,可以进行自增、自减等操作。
  • float 浮点类型,可以进行自增、自减等操作。

不论是哪种格式的字符串类型,其底层都是使用字节数组进行存储的,只是采用的编码方式不同而已。字符串类型的最大空间不能超过 512m。

String 类型常见命令:

  1. SET key value
  2. GET key
  3. MSET key value [key value …] 批量插入。
  4. MGET key [key …] 批量获取。
  5. INCR key 设置某个 key 自增。
  6. INCRBY key increment 自增幅度。
  7. INCRBYFLOAT key increment 设置浮点数自增。
  8. SETNX key value 当 key 不存在时,才进行插入。
  9. SETEX key seconds value 设置指定 key 的有效期。

PS. Redis 中 key 的层级格式:

在业务使用过程中,我们定义的 key 要求全局唯一,往往是以对象的 ID 来表示,但当 redis 中存在不同类型的对象时,key 重复的几率会相应地增加,并且难以对不同的 key 进行区分。因此,在 redis 使用过程中,我们通常会对 redis 的 key 格式做出一定的限制,但这种限制并不是强制的,企业或开发者可以自行设计,例如:项目名:业务名:id

此外,这种 key 的设计方式,在部分 redis 客户端中,也可以形成不同的层级显示。

Hash 类型

Hash 类型,也叫散列,其 value 是一个无序字典,类似于 java 中的 HashMap 结构。

Hash 类型常见命令:

  1. HSET key field value [field value …]
  2. HGET key field
  3. HMSET key field value [field value …] 设置多个 Hash 字段。
  4. HMGET key field [field …] 获取多个 Hash 字段。
  5. HGETALL key 获取 key 对应的所有 Hash 键值对。
  6. HKEYS key 获取 key 对应的所有 Hash 键。
  7. HVALS key 获取 key 对应的所有 Hash 值。
  8. HINCRBY key field increment 设置 Hash 中某一字段按步长自增。
  9. HSETNX key field value 当某一 key 对应 Hash 的字段不存在时,才进行赋值。

🎈 为什么 redis 中推荐使用 Hash 而非 String 来存储 JSON 的数据结构?

在 Redis 中使用 String 的数据结构来存储 JSON,其实是将 JSON 字符串序列化之后再进行存储的,当需要获取 JSON 内部的某个字段时,必须获取整个字符串内容,而使用 Hash 结构存储 JSON 数据时,则会将 JSON 中的每个字段独立存储,它更加方便对 JSON 中的单个字段做 CRUD 操作。

List 类型

Redis 中的 List 可以看做是一个双向链表,既可以支持正向检索,也支持反向检索。与 Java 中的 LinkedList 相似,它也具有如下特征:

  • 有序。
  • 可重复。
  • 插入和删除效率高。
  • 查询效率较低。

List 类型常见命令:

  1. LPUSH key element [element …] 向数组左侧追加元素。

使用 LPUSH 后,其放入和取出元素的顺序是相反的,与之对应的这是 RPUSH。

  1. LPOP key [count] 从左侧开始获取并移除数组中的元素。

  2. LRANGE key start stop 获取数组中指定下标范围内的元素。

  3. BLPOP key [key …] timeout 从左侧开始阻塞式地获取并移除数组中的元素。

    当 BLPOP 无法获取到元素时,会等待指定的时间,若等待期间可以获取到元素,则直接取出并返回;如等待时间过期仍未获取到元素,则返回 nil

🎈 如何利用 Redis 的 List 数据结构来模拟栈、队列及阻塞队列?

使用 LPUSH 和 LPOP,或 RPUSH 和 RPOP,都从左侧或都从右侧存入和取出元素,则可以模拟栈的数据结构;相反地,左侧存入右侧取出,或右侧存入左侧取出,则可以模拟队列的数据结构。而对于阻塞队列,则是在队列特征基础上,取出元素时,使用 BLOPO 或 BRPOP 即可。

Set 类型

Redis 中的 Set 结构与 Java 中的 HashSet 相似,它也具备与 HashSet 类似的特征:

  • 无序。
  • 不可重复。
  • 查询效率高。
  • 支持交集、并集、差集等。

Set 类型常见命令:

  1. SADD key member [member …] 向 Set 中插入元素。
  2. SREM key member [member …] 移除 Set 中的元素。
  3. SCARD key 查询 Set 中元素数量。
  4. SISMEMBER key member 判断 Set 中是否存在某元素。
  5. SMEMBERS key 获取 Set 中所有元素。
  6. SINTER key [key …] 交集。
  7. SDIFF key [key …] 差集。
  8. SUNION key [key …] 并集。

SortedSet 类型

Redis 中的 SortedSet 是一个可排序的 Set 集合。SortedSet 中每一个元素都带有一个 score 属性,可以基于 score 属性对元素进行排序,其底层实现是一个跳表(SkipList)加 Hash 表。SortedSet 具备如下特征:

  • 可排序。
  • 不重复。
  • 查询效率高。

SortedSet 类型常见命令:

  1. ZADD key
  2. ZREM key member [member …]
  3. ZSCORE key member 获取某元素的 score 值。
  4. ZRANK key member 获取某元素在该集合中的排名。
  5. ZCARD key
  6. ZCOUNT key min max 统计指定 score 范围内的元素个数。
  7. ZINCRBY key increment member 使集合中指定元素按步长自增。
  8. ZRANGE key min max 获取指定排名范围的元素。
  9. ZRANGEBYSCORE key min max 获取指定 score 范围的元素。
  10. ZINTER numkeys key [key …] 交集。
  11. ZDIFF numkeys key [key …] 差集。
  12. ZUNION numkeys key [key …] 并集。

SortedSet 查询排序默认采用升序,如需使用降序,只需在字母 Z 之后添加 REV 即可,如 ZREVRANGE。

Java 客户端

Redis 官方推荐如下三款客户端产品:

  1. Jedis,其特点是学习成本低,基本使用 redis 命令作为 API 方法名。但基于 Jedis 的连接无法保证线程安全,在多线程环境下需要基于连接池来使用。
  2. Lettuce,基于 Netty 实现,支持同步、异步和响应式编程,并且是线程安全的,支持 Redis 哨兵模式、集群模式和管道模式。
  3. Redisson,是一个基于 Redis 实现的分布式、可伸缩的 Java 数据结构集合,包含了诸如 Map、Queue、Lock、Semaphere、AtomicLong 等强大功能。

出于对本文篇幅的考虑,此处仅对 Jedis 做扩展介绍,如对其他客户端产品感兴趣,可自行了解。

PS. 综合各种 Redis 客户端的优缺点,也可以考虑使用 Spring 集成的工具包 Spring Data Redis

Jedis

项目地址:https://github.com/redis/jedis

基本使用方式:

  1. 添加依赖:redis.clients.jedis:4.1.1

  2. 建立连接:

    private Jedis jedis;
    
    @BeforeEach
    void setUp() {
        jedis = new Jedis("127.0.0.1", 6379);// 建立连接
        jedis.auth("123456");// 密码
        jedis.select(0);// 选择库
    }
  3. 测试使用:

    @Test
    void test() {
        String result = jedis.set("name", "Chinmoku");
        System.out.println("result = " + result);
        String name = jedis.get("name");
        System.out.println("name = " + name);
    }

    类似地,其他 redis 相关操作调用的方法名称,也基本与 redis 命令相同。

  4. 释放资源:

    @AfterEach
    void tearDown() {
        if (jedis != null) {
            jedis.close();
        }
    }

Jedis 连接池:

Jedis 本身是线程不安全的,并且频繁地创建和销毁连接会产生极大的性能消耗,因此在使用 Jedis 的过程中,不推荐使用 Jedis 直连,而建议实现连接池。

public class JedisConnectionFactory {
    private static final JedisPool jedisPoll;
    
    static {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(8);// 最大连接数
        jedisPoolConfig.setMaxIdle(8);// 最大空闲连接数
        jedisPoolConfig.setMinIdle(8);// 最小空闲连接数
        jedisPoolConfig.setMaxWaitMillis(200);// 最长等待时间
        jedisPoll = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379, 1000, "123456");
    }
    
    public static Jedis getJedis() {
        return jedisPool.getResource();
    }
}

SpringDataRedis

SpringData 是 Sring 中数据操作的模块,包含对各种数据库的集成,其中对 Redis 的集成模块即为 SpringDataRedis:

  • 提供了对不同 Redis 客户端的整合(Lettuce 和 Jedis)。
  • 提供了 RedisTemplate 统一 API 来操作 Redis。
  • 支持 Redis 的发布订阅模式。
  • 支持 Redis 的哨兵模式和集群模式。
  • 支持基于 Lettuce 的响应式编程。
  • 支持基于 JDK、JSON、字符串、Spring 对象的数据序列号及反序列化。
  • 支持基于 Redis 的 JDKCollection 实现。

SpringDataRedis 官方文档:https://spring.io/projects/spring-data-redis

SpringDataRedis 提供了 RedisTemplate 工具类,其中封装了各种对 Redis 的操作,并且将不同数据类型的操作 API 封装到了不同的类型中:

API 返回值类型 说明
redisTemplate.opsForValue() valueOperations 操作String类型数据
redisTemplate.opsForHash() HashOperations 操作Hash类型数据
redisTemplate.opsForList() ListOperations 操作List类型数据
redisTemplate.opsForSet() SetOperations 操作Set类型数据
redisTemplate.opsForZSet() ZSetOperations 操作SortedSet类型数据
redisTemplate 通用命令

基本使用方式:

  1. 添加依赖:

    • Redis 依赖:org.springframework.boot.spring-boot-starter-data-redis:2.6.2
    • 连接池依赖:org.apache.commons.commons-pool2:2.11.1
  2. 配置文件:

    spring:
      redis:
        host: 127.0.0.1
        port: 6379
        password: 123456
        # 默认使用lettuce,如需使用jedis,需引入对应依赖
        lettuce:
          pool:
            max-active: 8
            max-idle: 8
            min-idle: 0
            max-wait: 100 # 连接等待时间
            timeout: 600 # 连接超时时间
  3. 注入 RedisTemplate 并使用:

    @Autowired
    private RedisTemplate redisTemplate;
    
    @Test
    void test() {
        redisTemplate.opsForValue().set("name", "chinmoku");// 插入
        Object name = redisTemplate.opsForValue().get("name");// 读取
        System.out.println("name = " + name);
        // 存储对象
        redisTemplate.opsForValue().set("user:1", new User("chinmoku"));
        User user = (User) redisTemplate.opsForValue().get("user:1");
        System.out.println("user = " + user);
    }

SpringDataRedis 序列化方式:

SpringDataRedis 可以接收任意 Object 作为值写入 Redis,但在写入之前都会将其序列化为字节形式,并且默认采用 JdkSerializationRedisSerializer,这会导致可读性差和不必要的内存占用的问题。因此,我们可以对 SpringDataRedis 进行个性化实现:

/**
 * SpringDataRedis 自定义序列化方式
 */
@Configuation
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
        // 设置连接工厂
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 设置序列化工具(注:使用json序列化需要引入jackson-databind依赖)
        GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();

        // key和hashKey采用string序列化
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        // value和hashValue采用json序列化
        redisTemplate.setValueSerializer(jsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jsonRedisSerializer);
        return redisTemplate;
    }
}

如上示例,当存储 Java 对象时,我们可以使用 JSON 序列化器,自动实现对象的序列化和反序列化,但这种方式会在 Redis 中额外存储一个 @class 字段,用于指定序列化和反序列化对象所属的类,这也会在一定程度上造成不必要的空间浪费。

为了节省空间,在编码过程中,我们通常推荐在存储数据之前,手动进行对象的序列化处理,然后再通过 String 序列化方式进行存储;取值时,再通过程序判断进行反序列化。为此,对于 String 类型的数据存储,SpringDataRedis 提供了一个 StringRedisTemplate 类,它默认即采用 String 序列化方式:

@Autowired
private StringRedisTemplate stringRedisTemplate;
// JSON工具(也可以使用其他序列化工具,如fastjson等)
private static final ObjectMapper mapper = new ObjectMapper();

@Test
void testStringTemplate() throws JsonProcessingException {
    User user = new User("chinmoku");
    // 手动序列化
    String json = mapper.writeValueAsString(user);
    redisTemplate.opsForValue().set("user:1", json);
    
    String value = redisTemplate.opsForValue().get("user:1");
    User u = mapper.readVal(value, User.class);// 反序列化
    System.out.println("user = " + u);
}

RedisTemplate 操作 Hash:

@Test
void testHash() {
    stringRedisTemplate.opsForHash().put("user:2", "name", "zhangsan");
    stringRedisTemplate.opsForHash().put("user:2", "age", "21");
    Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries("user:2");
    System.out.println("entries = " + entries);
}

缓存问题及解决方案

缓存更新策略

所谓缓存,就是数据交换的缓冲区,是存储数据的地方,一般读写性能较高。

操作缓存和数据库时,需要考虑如下三个问题:

  1. 更新数据库时,对于缓存数据,是应当执行更新操作还是删除操作?

    • 更新缓存:每次更新数据库都更新缓存,容易产生较多的无效写操作。
    • 删除缓存:更新数据库时使缓存失效,查询数据库时再更新缓存(推荐)。
  2. 如何保证缓存和数据库操作同时成功或同时失败?

    • 单系统环境下,需要将缓存和数据库操作放在同一个事务中。
    • 分布式系统环境下,可以利用 TCC 等分布式事务方案。
  3. 执行数据操作时,先操作缓存还是数据库?

    redis-cache

    如上图所示,不论是先操作缓存还是先操作数据库,在并发环境下,都有可能出现数据不一致问题。但通常而言,缓存操作效率远高于数据库操作效率,因此先操作数据库,再删除缓存出现数据不一致的概率较低,但也仅限于概率较低而已,仍存在数据不一致的风险。

扩展:

基于数据库与缓存之间的数据不一致问题,目前有如下几种较为常见的解决方案:

  1. 延时双删

    所谓的延时双删,可以通过如下伪代码进行表示:

    public void update(String key, Object data) {
        redis.delKey(key);
        db.update(data);
        Thread.sleep(500);
        redis.delKey(key);
    }

    这里的延时,主要是为了确保在进行第二次删除之前,其他交叉线程已经完成数据库查询和更新缓存操作,如下图所示:

    redis-cache-sleep

    在延时双删过程中,如果存在其他线程对缓存进行更新,那么该更新缓存的线程所得数据仍为旧数据,但新的线程访问时,能保证得到的数据是新的数据。

  2. 异步更新缓存

    TODO

    值得注意的是,在保证 Redis 性能的前提下,数据库与缓存之间的数据其实是无法保证绝对一致的,我们通常所说的缓存一致性,往往需要将实际业务场景纳入考虑范围,进行综合考虑。

缓存穿透

缓存穿透是指客户端请求的数据在缓存和数据库中均不存在,这样就使得缓存永远无法生效,导致请求始终都会直接访问数据库,从而影响服务性能。

针对缓存穿透,有如下两种常见解决方案:

  1. 缓存空对象:即当数据库中不存在该数据时,则向缓存中存入空对象,同时设置较短的过期时间。

    优点:实现简单,方便维护。

    缺点:

    • 占用不必要的内存。
    • 可能造成短期数据不一致(针对这一缺点,可以在更新数据时同步更新到缓存)。
  2. 布隆过滤

    相关介绍可查阅布隆过滤器_百度百科

    优点:内存占用少,没有多余的 key。

    缺点:

    • 实现复杂。
    • 存在误判的可能。

除此之外,在业务上我们也应当做相应的控制,例如增加 ID 复杂度、做好数据的基础格式校验、加强用户权限校验、做好热点数据限流等。

缓存雪崩

缓存雪崩是指在同一时段内的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求直接到达数据库,给数据库带来巨大压力。

针对缓存雪崩,有如下几种常见解决方案:

  1. 给不同 key 的 TTL 添加随机值,尽量分散缓存的过期时间。
  2. 利用 Redis 集群提高服务的可用性(以解决 Redis 服务宕机问题)。
  3. 给缓存服务添加降级限流策略,做好服务容错。
  4. 给业务添加多级缓存。

缓存击穿

缓存击穿也被称为热点 key 问题,就是一个被高并发访问并且缓存重建业务较为复杂的 key 突然失效,大量请求瞬间访问到数据库,导致数据库承受巨大压力。

针对缓存击穿,有如下几种常见解决方案:

  1. 互斥锁

    重建数据时,进行加锁,获取到锁的线程才进行数据重建,其他线程进行一定的等待和重试。这种方案由于需要线程等待,性能相对较差。

  2. 逻辑过期

    不采用 Redis 提供的 TTL,而是自定义一个逻辑过期字段用于记录过期时间,当程序通过该字段判断数据过期时,则使用互斥锁开启一个新的线程,用该线程来执行更新缓存数据操作,更新完数据后,释放锁。其他未获取到互斥锁时,直接返回缓存中的旧数据。

代码实现

为了更加方便开发过程中,对解决缓存穿透和缓存击穿等问题整理思路,此处引用一份黑马教学编写的示例代码,点击此处可以查看源代码位置。

@Slf4j
@Component
public class CacheClient {

    private final StringRedisTemplate stringRedisTemplate;

    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    public CacheClient(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public void set(String key, Object value, Long time, TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
    }

    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
        // 设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        // 写入Redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }

    public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isNotBlank(json)) {
            // 3.存在,直接返回
            return JSONUtil.toBean(json, type);
        }
        // 判断命中的是否是空值
        if (json != null) {
            // 返回一个错误信息
            return null;
        }

        // 4.不存在,根据id查询数据库
        R r = dbFallback.apply(id);
        // 5.不存在,返回错误
        if (r == null) {
            // 将空值写入redis
            stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
            // 返回错误信息
            return null;
        }
        // 6.存在,写入redis
        this.set(key, r, time, unit);
        return r;
    }

    public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isBlank(json)) {
            // 3.存在,直接返回
            return null;
        }
        // 4.命中,需要先把json反序列化为对象
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        // 5.判断是否过期
        if(expireTime.isAfter(LocalDateTime.now())) {
            // 5.1.未过期,直接返回店铺信息
            return r;
        }
        // 5.2.已过期,需要缓存重建
        // 6.缓存重建
        // 6.1.获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        // 6.2.判断是否获取锁成功
        if (isLock){
            // 6.3.成功,开启独立线程,实现缓存重建
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    // 查询数据库
                    R newR = dbFallback.apply(id);
                    // 重建缓存
                    this.setWithLogicalExpire(key, newR, time, unit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    // 释放锁
                    unlock(lockKey);
                }
            });
        }
        // 6.4.返回过期的商铺信息
        return r;
    }

    public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            // 3.存在,直接返回
            return JSONUtil.toBean(shopJson, type);
        }
        // 判断命中的是否是空值
        if (shopJson != null) {
            // 返回一个错误信息
            return null;
        }

        // 4.实现缓存重建
        // 4.1.获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        R r = null;
        try {
            boolean isLock = tryLock(lockKey);
            // 4.2.判断是否获取成功
            if (!isLock) {
                // 4.3.获取锁失败,休眠并重试
                Thread.sleep(50);
                return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
            }
            // 4.4.获取锁成功,根据id查询数据库
            r = dbFallback.apply(id);
            // 5.不存在,返回错误
            if (r == null) {
                // 将空值写入redis
                stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
                // 返回错误信息
                return null;
            }
            // 6.存在,写入redis
            this.set(key, r, time, unit);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            // 7.释放锁
            unlock(lockKey);
        }
        // 8.返回
        return r;
    }

    private boolean tryLock(String key) {
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }
}

Redis 分布式锁

所谓分布式锁,就是满足分布式系统或集群模式下多进程可见并互斥的锁。

分布式锁的核心是实现多线程之间的互斥,其实现方式常见有如下三种:

MySQL Redis Zookeeper
互斥 利用mysql本身的互斥锁机制 利用setnx这样的互斥命令 利用节点唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用超时机制,到期释放 临时节点,断开连接自动释放

基于 Redis 实现分布式锁

基本实现思路:

  1. 版本一:

    # 添加锁,利用SETNX的互斥性
    SETNX lock thread-1
    # 添加锁过期时间,避免服务宕机导致死锁
    EXPIRE lock 10
    # 释放锁
    DEL lock
  2. 版本二:

    在版本一中,由于命令逐条执行,考虑到在添加锁和设置锁过期时间之间仍有可能出现服务宕机的情况,因此我们还需要保证添加锁和设置锁过期时间这两个操作同时执行,具有原子性,因此需要对该方案进行升级:

    # 添加互斥锁并设置过期时间(原子性)
    SET lock thread-1 EX 10 NX
    # 释放锁
    DEL lock
  3. 版本三:

    基于版本二,基本上能够解决大部分并发情况,但在部分极端情况下,仍旧存在线程安全问题。主要表现在当获取到互斥锁的线程(Thread-A)执行时间超过 Redis TTL 过期时间,就会导致锁提前失效,在当前线程(Thread-A)未释放锁的情况下,此时进入新的线程(Thread-B)仍旧可以获取锁并执行业务逻辑,并且,在新的线程(Thread-B)执行过程中,先前获取到锁的线程(Thread-A)执行完业务逻辑后,就会释放线程(Thread-B)持有的锁。这样就会导致大量锁持有异常,从而引发业务灾难。

    出现这样的问题,根本上是由于释放了非当前线程持有的锁导致的,因此,要解决这个问题,只需要在线程释放锁时,判断其释放的锁是否是当前线程持有即可。

    /**
     * 注:此段代码只用于方案说明,无法直接使用。
     */
    String lockKeyName = "lock:user:192838";
    String threadId = UUID.randomUUID().toString(true) + "-" + Thread.currentThread().getId();
    // 获取锁
    Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(lockKeyName, threadId, 200, TimeUtil.SECONDS);
    
    // 释放锁
    String id = stringRedisTemplate.opsForValue().get(lockKeyName);
    // 校验锁拥有者
    if (threadId.equals(id)) {
        // 释放锁
        stringRedisTemplate.delete(lockKeyName);
    }
  4. 最终版:

    可以说即便是基于版本三,仍旧有出现线程安全问题的可能。考虑这样一种情况:当程序在释放锁时,先通过校验锁是否为当前线程持有,校验通过后,如果此时线程(Thread-A)突然发生阻塞(可能是由 JVM 垃圾回收导致),并且导致锁超时自动释放,此时在该线程阻塞的状态下,其他线程(Thread-B)就仍可以获取到锁,而当线程(Thread-A)阻塞通过后,由于它已经通过锁持有者校验,因此它将会进行释放锁,但此时它锁释放的锁,其实际上并非这个线程(Thread-A)所持有,从而再次引起版本三种提到的问题。

    出现这样的问题,根本上是因为校验锁持有者的操作与释放锁操作之间存在时间空隙,导致锁被提前释放,因此,要解决这个问题,就需要保证校验锁持有者操作和释放锁操作同时执行,具有原子性。

    这里推荐使用 Lua 脚本,来保证一个原子性的操作,Lua 脚本基本语法,可参考 Lua 教程 | 菜鸟教程

    Lua 脚本示例:

    # 无参脚本
    EVAL "return redis.call('set', 'name', 'chinmoku')" 0
    # 带参脚本
    EVAL "return redis.call('set', KEYS[1], ARGV[1])" 2 name chinmoku

    基于 Lua 脚本,我们可以通过如下方式来解决版本三中遗留的问题:

    • 编写释放锁的 Lua 脚本:

      -- FileName: unlock.lua
      -- 校验锁拥有者
      if (redis.call('get', KEYS[1]) == ARGV[1]) then
          -- 释放锁 del key
          return redis.call('del', KEYS[1])
      end
      return 0
    • 在业务中执行 Lua 脚本:

      String lockKeyName = "lock:user:192838";
      String threadId = UUID.randomUUID().toString(true) + "-" + Thread.currentThread().getId();
      
      private static DefaultRedisScript<Long> UNLOCK_SCRIPT;
      static {
          UNLOCK_SCRIPT = new DefaultRedisScript<>();
          UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
          UNLOCK_SCRIPT.setResultType(Long.class);
      }
      
      public void unlock() {
          stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(lockKeyName), threadId);
      }

Redisson

上述对 Redis 分布式锁进行了一系列方案探索,已经能够解决大部分并发场景下 Redis 分布式锁问题,但这种基于 SETNX 命令实现的分布式锁仍旧存在一些不足,例如:

  • 不可重入:同一个线程无法多次获取同一把锁。
  • 不可重试:线程获取锁只尝试了一次,没有重试机制。
  • 超时释放:锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,仍旧会导致锁释放,因而存在安全隐患。
  • 主从一致性:如果 Redis 提供主从集群,主从节点同步存在延迟时,如果主节点突然宕机,未能完成同步,则会导致数据不一致。

鉴于如上问题,可以推荐一个更加使用的 Redis 解决方案,即 Redisson。

Redisson 是一个在 Redis 基础上实现扩展库,它不仅提供一系列分布式的 Java 常用对象,还提供了许多分布式服务,包括各种分布式锁的实现。

官网地址:https://redisson.org

Github 首页:https://github.com/redisson/redisson

基本使用:

  1. 引入依赖:org.redisson.redisson:3.13.6

  2. 客户端配置:

    @Configuration
    public class RedisConfig {
        @Bean
        public RedissonClient redissonClient() {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");
          return Redisson.create(config);  
        }
    }
  3. 测试使用:

    @Resource
    private RedissonClient redissonClient;
    
    @Test
    void test() throws InterruptedException {
        RLock lock = redissonClient.getLock("lock:order:123764");
        boolean isLock = lock.tryLock();
        if (isLock) {
            try {
                System.out.println("---> TODO 执行业务逻辑");
            } finally {
                lock.unlock();
            }
        }
    }

Redisson 分布式锁原理

redisson-lock

Redisson 如何处理分布式锁问题:

  1. 可重入问题:利用 Hash 结构记录线程 ID 和重入次数。

  2. 可重试问题:利用信号量和 PubSub 功能实现等待、唤醒、获取锁失败的重试机制。

  3. 超时续约:利用 watchDog,每隔一段时间(releaseTime / 3)重置锁的超时时间。

  4. 主从一致性问题:

    Redisson 使用联锁(multiLock)机制,同时存在多个 Redis 主节点,并且获取锁时,需要保证所有主节点内该锁都已被释放。

    @Resource
    private RedissonClient redissonClient1;
    @Resource
    private RedissonClient redissonClient2;
    @Resource
    private RedissonClient redissonClient3;
    
    private RLock lock;
    
    @BeforeEach
    void setUp() {
        RLock lock1 = redissonClient1.getLock("order");
        RLock lock2 = redissonClient2.getLock("order");
        RLock lock3 = redissonClient3.getLock("order");
        lock = redissonClient1.getMultiLock(lock1, lock2, lock3);
    }

    其本质上是对 MultiLock 内所有的锁进行遍历。

Redis 消息队列

通常,对于并发和性能要求都较高的系统或业务,往往需要将流程分片化。例如典型的秒杀项目,出于性能考虑,我们通常会将较为复杂的、耗时较长但无需即时返回结果的请求存储到消息队列中,开启异步线程进行处理,这样既不会到影响业务,同时也提高了系统的性能。

目前比较流行的消息队列产品包括:RabbitMQ、ActiveMQ、RocketMQ、kafka 等。

此外,Redis 也提供了三种不同的方式来实现消息队列:

  1. List 结构:基于 List 结构模拟消息队列。
  2. PubSub:基本的点对点消息模型。
  3. Stream:比较完善的消息队列模型(推荐)。

基于 List 模拟消息队列

消息队列,简单理解即时存放消息的队列,而 Redis 的 List 数据结构是一个双向链表,因此我们可以很容易地通过 LPUSH 结合 RPOP 或 RPUSH 结合 LPOP 来模拟出队列的效果。

但值得注意的是,当 Redis List 中没有消息时,RPOP 或 LPOP 会返回 nil,而不是像 JVM 的阻塞队列那样阻塞并等待消息,因此,在取出消息时应当使用 BRPOP 或 BLPOP 来实现阻塞效果。

相比 JVM 阻塞队列来说,Redis 实现的阻塞队列具有如下优点:

  1. 不受 JVM 内存大小限制。
  2. 基于 Redis 持久化,可以保证数据安全。
  3. 可以满足消息的有序性。

缺点:

  1. 无法避免消息丢失(当取出消息但未及时处理时,服务出现异常,会导致消息丢失)。
  2. 只支持单消费者,无法实现多消费者同时消费。

基于 PubSub 的消息队列

PubSub 是 Redis 2.0 版本引入的消息传递模型,消费者可以订阅一个或多个 channel,生产者想对应的 channel 发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel] 订阅一个或多个频道。
  • PUBLISH channel msg 向一个频道发送消息。
  • PSUBSCRIBE pattern [pattern] 订阅与 pattern 格式匹配的所有频道。

PubSub 有点:

  1. 采用发布订阅模型,支持多生产、多消费。

缺点:

  1. 不支持数据持久化。
  2. 无法避免消息丢失。
  3. 消息堆积有上限,超出时消息会丢失。

基于 Stream 的消息队列

Stream 是 Redis 5.0 引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。

Stream 消息队列相关命令:

  1. XADD key *|ID field value [field value …] 发送消息。

    # 添加消息
    XADD user:1001 * name chinmoku
  2. XREAD STREAMS key [key …] ID [ID …]

    # 从0开始读取消息
    XREAD COUNT 1 STREAMS user:1001 0
    # 读取最新消息
    XREAD COUNT 1 STREAMS user:1001 $
    # 阻塞读取消息
    XREAD COUNT 1 BLOCK 1000 STREAMS user:1001 $

实现消息队列示例:

while(true) {
    // 尝试读取队列中的消息,最多阻塞2s
    Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS user:1001 $");
    if (msg == null) {
        continue;
    }
    // 处理消息
    handleMessage(msg);
}

STREAM 类型消息队列的 XREAD 命令特点:

  • 消息可回溯。
  • 一个消息可以被多个消费者读取。
  • 可以阻塞读取。
  • 有消息漏读的风险。

消费者组(Consumer Group)

由于单消费者具有消息漏读的风险,因此引入了消费者组概念,所谓消费者组,就是将多个消费者划分到一个组中,监听同一个队列,它具备如下特点:

  1. 消息分流:

    队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。

  2. 消息标示:

    消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,仍旧会从标示之后读取消息,保证每一条消息都会被消费。

  3. 消息确认:

    消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list 中,当处理完成后需要通过 XACK 来确认消息,标记消息为已处理,这时才会从 pending-list 中移除。

Stream Consumer Group 相关操作命令:

  1. XGROUP CREATE key groupName ID [MKSTREAM] 创建消费者组。
  2. XGROUP DESTORY key groupName 删除消费者组。
  3. XGROUP CREATECONSUMER key groupName consumername 给指定消费者组添加消费者。
  4. XGROUP DELCONSUMER key groupName consumername 删除消费者组中指定消费者。
  5. XREADGROUP GROUP group consumer STREAMS key [key …] ID [ID …] 从消费者组读取消息。
  6. XACK key group ID [ID …] 确认消息(读取消息后需要进行确认)。
  7. XPENDING key group [start end count] 获取未确认的消息。

伪代码实现消费者组监听逻辑:

white(true) {
    // 尝试监听队列,使用阻塞模式,最长等待2000ms
    Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 200 STREAMS S1 >");
    if (msg == null) {// 没有消息,则继续下一次
        continue;
    }
    try {
        // 处理消息,完成后并做ACK
        handleMessage(msg);
    } catch(Exception e) {
        while(true) {
            // 从 pending-list 中读取消息
            Object msg = redis.call("XREADGROUP GROUP g1 s1 COUNT 1 STREAMS s1 0");
            if (msg == null) {
                break;
            }
            try {
                // 存在异常消息,再次处理
                handleMessage(msg);
            } catch(Exception e) {
                // 再次出现异常,继续循环
                continue;
            }
        }
    }
}

STREAM 类型消息队列的 XREADGROUP 命令特点:

  • 消息可回溯。
  • 可以多消费者争抢消息,加快消费速度。
  • 可以阻塞读取。
  • 没有消息漏读风险。
  • 有消息确认机制,保证消息至少被消费一次。

Redis 消息队列对比:

List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受内存空间限制,可以利用多消费者加快处理 受限于消费者缓冲区 受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持

使用 Redis 作为消息队列,推荐使用 Stream,但 Redis 毕竟并非主要用于消息队列,因此在对消息队列性能要求较高的项目中,还是推荐使用专门的消息队列产品。

参考

参考内容来源于网络,本文不保证参考链接的长期有效性,以及参考内容的原创性。

版权声明

本文链接:https://www.chinmoku.cc/java/database/redis-tutorial/

本博客中的所有内容,包括但不限于文字、图片、音频、视频、图表和其他可视化材料,均受版权法保护。未经本博客所有者书面授权许可,禁止在任何媒体、网站、社交平台或其他渠道上复制、传播、修改、发布、展示或以任何其他方式使用此博客中的任何内容。

Press ESC to close