Redis的分布式锁问题(十)最强分布式锁工具:Redisson

news/2024/7/23 15:02:53 标签: redis, 分布式锁, 联锁, 可重入锁, 可重试

Redisson的引入

我们先来看看之前的基于setnx实现的分布式锁存在的问题:

我们之前实现的分布式锁是基于redissetnx命令的特性的! 

但是,这样子实现起来会有很多弊端!

不可重入

简单的来说就是一旦setnx [key] [value]后,就不能再对这个key做任何操作了(除了删除)

 假设我们在开发中有A和B两个业务,在业务A中,执行了setnx操作,然后在业务A中调用业务B。

然后在业务B中也有setnx的操作(同一个KEY

此时,业务B就会阻塞在这里,等待业务A释放锁

但是,业务A肯定不会释放锁,因为业务A还没有执行完(调B)。故就会发生死锁。 

可重试 

在我们之前业务逻辑中,尝试获取锁,如果获取不到就直接return了,没有“重来”的机会!也无法提供重试的机制!

超时释放

我们之前,分析过分布式锁被误删的问题。这个问题是已经解决了。

但是,仍然会存在隐患!我们这里是用TTL来控制它。业务执行,时间多少,这是一个未知数,TTL要怎么设置?如何处理业务阻塞?

主从一致性

主节点上获取到了锁,但是主节点突然宕机了,就会从从结点中选出一个节点,作为主节点。

但由于,因为之前的那个主节点宕机了。在新选举出来的这个主节点中是无法获取到之前的锁

所以之前的那个锁相当于失效了!

Redisson 

要解决上述问题并不是那么容易的,如果我们自己实现很有可能会出一些问题!所以最好的办法就是使用市面上的一些框架来解决!

什么是Redisson? 

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

Redisson使用手册 

0. 项目介绍 - 《Redisson 使用手册》 - 书栈网 · BookStackhttps://www.bookstack.cn/read/redisson-wiki-zh/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D.md里面提到了Redisson可以实现大致如下的分布式锁

Redisson快速入门(Demo) 

(1)导依赖 

<!-- redis-redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

(2)配置Redisson客户端

/**
 * 配置 Redisson
 */
@Configuration
public class RedisConfig {
    @Bean
    public RedissonClient redissonClient() {

        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.89.128:6379").setPassword("888888");

        // 创建 RedissonClient 对象
        return Redisson.create(config);
    }
}

(3)使用Redisson分布式锁 

@Test
void testRedisson() throws Exception {
    RLock anyLock = redissonClient.getLock("anyLock");
    boolean isLock = anyLock.tryLock(1, 10, TimeUnit.SECONDS);
    if(isLock) {
        try {
            System.out.println("执行业务");
        } finally {
            anyLock.unlock();
        }
    }
}

测试结果

Redisson实现可重入锁

这里可重入锁的实现 和 Java的 ReentrantLock 类似!

获取锁的时候,先判断是不是同一个对象,是就将 value+1,释放锁的时候就 value-1,当其小于0时就将该key删除!

在Redis中使用 Hash结构 去存储!

Redisson 解决重入问题

获取锁 

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        if (ttlRemaining) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    // 在Lua脚本中起始位是1
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

上述代码中字符串部分就是Lua脚本,Redisson用其实现可重入锁!  

Redisson 获取锁中的Lua脚本源码解析

-- 判断锁是否存在
if (redis.call('exists', KEYS[1]) == 0) then
    -- 不存在,获取锁
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- 设置有效期
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 锁已经存在,判断是否是自己?!
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- 自增+1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- 重置有效期
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
return redis.call('pttl', KEYS[1]);
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

Redisson 释放锁中的Lua脚本源码解析

-- 判断当前锁是否还是被自己持有
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    -- 不是就就直接返回
    return nil;
    end;
-- 是自己,则重入次数 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 判断重入次数是否已经为0
if (counter > 0) then
    -- 大于0,说明不能释放,重置有效期即可
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- 等于0,说明可以直接删除
    redis.call('del', KEYS[1]);
    -- 发消息
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
    end;
return nil;

测试代码

我们这边模拟一下锁重入的场景。方法A上锁后调方法B,方法B也获取锁(如果是不可重入,这里就会阻塞!)

/**
 * Redisson的单元测试
 */
@SpringBootTest
@Slf4j
public class RedissonTest {

    @Resource
    private RedissonClient redissonClient;

    private RLock lock;

    @BeforeEach
    void setUp() {
        lock = redissonClient.getLock("order");
    }

    @Test
    void method1() {
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("获取锁失败 ... 1");
            return;
        }
        try {
            log.info("获取锁成功 ... 1");
            method2();
            log.info("开始执行业务 ... 1");
        } finally {
            log.warn("准备释放锁 ... 1");
            lock.unlock();
        }
    }

    @Test
    void method2() {
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("获取锁失败 ... 2");
            return;
        }
        try {
            log.info("获取锁成功 ... 2");
            log.info("开始执行业务 ... 2");
        } finally {
            log.warn("准备释放锁 ... 2");
            lock.unlock();
        }
    }
}

运行结果 

Redis 中值的情况 

Redisson实现重试、超时续约问题

  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间

Redisson实现主从一致性问题

问题描述

为了Redis的可靠性,我们一般会使用Redis的主从模式。

使用了主从模式,一般会采用读写分离的策略,主节点写,从节点读!

那么,当数据被写入主节点的时候,主节点时需要向从节点去同步数据的!

这个过程一定会有延时,一致性问题也就发生在这里!

假如,在主节点中获取到了锁,在主节点向从节点同步这个锁信息的时候,主节点宕机了!那么从节点就会从中挑选一个作为主节点!

可是,此时之前的锁信息就丢失了!也就发生了锁失效的问题!!!

Redisson的解决方案——MultiLock

之前我们分析了,主从模式是导致锁失效的原因,所以Redisson中就直接将它们视为相同的角色!

此时,我们获取锁的方式就变了,获取锁的时候,我们需要依次向全部节点获取锁,只有都获取成功时才算成功!!!

如果此时也发生了刚刚描述的问题,是不会出现锁失效的问题的!

分析如下

这套方案就是Redisson中的联锁——MultiLock

官网对于MultiLock的描述

分布式锁和同步器 - 《Redisson 官方文档中文翻译》 - 书栈网 · BookStackhttps://www.bookstack.cn/read/redisson-doc-cn/distributed_locks.md#83-multilock

代码实现

配置Redis连接的客户端——3个Client

@Configuration
public class RedisConfig {
    @Bean
    public RedissonClient redissonClient() {

        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.89.128:6379").setPassword("888888");

        // 创建 RedissonClient 对象
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2() {

        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.89.128:6380");

        // 创建 RedissonClient 对象
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient3() {

        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.89.128:6381");

        // 创建 RedissonClient 对象
        return Redisson.create(config);
    }
}

联锁的使用

其实和我们之前的代码没有什么差别! 

@SpringBootTest
@Slf4j
public class RedissonTest {

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private RedissonClient redissonClient2;

    @Resource
    private RedissonClient redissonClient3;

    private RLock lock;

    @BeforeEach
    void setUp() {
        RLock lock1 = redissonClient.getLock("order");
        RLock lock2 = redissonClient.getLock("order");
        RLock lock3 = redissonClient.getLock("order");

        // 创建联锁
        lock = redissonClient.getMultiLock(lock1, lock2, lock3);
    }

    @Test
    void method1() throws InterruptedException {
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            log.error("获取锁失败 ... 1");
            return;
        }
        try {
            log.info("获取锁成功 ... 1");
            method2();
            log.info("开始执行业务 ... 1");
        } finally {
            log.warn("准备释放锁 ... 1");
            lock.unlock();
        }
    }

    @Test
    void method2() {
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("获取锁失败 ... 2");
            return;
        }
        try {
            log.info("获取锁成功 ... 2");
            log.info("开始执行业务 ... 2");
        } finally {
            log.warn("准备释放锁 ... 2");
            lock.unlock();
        }
    }
}

为什么使用redissonClient.getMultiLock(...)就行?

我们可以跟一下代码看看!

@Override
public RLock getMultiLock(RLock... locks) {
    return new RedissonMultiLock(locks);
}

在这里就可以发生,不管是哪一个对象来调,其实都是一样的,这里面其实是在new一个对象RedissonMultiLock() ,所以谁去调getMultiLock()都是一样的!!!

final List<RLock> locks = new ArrayList<>();

public RedissonMultiLock(RLock... locks) {
    if (locks.length == 0) {
        throw new IllegalArgumentException("Lock objects are not defined");
    }
    this.locks.addAll(Arrays.asList(locks));
}

在这里可以发现,这个可变参数被视为集合,然后都添加到数组(集合)里面去了!

所以,按照联锁的原理,在获取锁的时候,也会依次把集合中的每一个锁都获取一次

MultiLock源码分析 !!!

我们这里跟一下tryLock的源码(RedissonMultiLock) 

@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {

    long newLeaseTime = -1;
    if (leaseTime != -1) {
        // waitTime 为 -1,表示不重试
        if (waitTime == -1) {
            newLeaseTime = unit.toMillis(leaseTime);
        } else {
            // 如果重试就会对时间做个扩容(放弃waitTime,使用newLeaseTime!)
            newLeaseTime = unit.toMillis(waitTime)*2;  
        }
    }

    long time = System.currentTimeMillis();
    // 剩余时间
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }

    // 锁等待时间 与 剩余时间是一样的!
    long lockWaitTime = calcLockWaitTime(remainTime);

    int failedLocksLimit = failedLocksLimit();

    // 定义一个获取锁成功的集合,初始化肯定是0
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            lockAcquired = false;
        }

        if (lockAcquired) {
            acquiredLocks.add(lock);
        } else {
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }

        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }


http://www.niftyadmin.cn/n/14332.html

相关文章

搭建springWeb保姆级教程

经过我们对mybatis和spring框架的学习&#xff0c;我们即将要用框架进行前后端数据交互&#xff0c;已经脱离了那种用servlet的方式进行数据传输&#xff0c;今天让我们来搭建最基本的springweb框架&#xff01;&#xff01;&#xff01; 1.创建一个web项目 1. 2. 选择一个we…

写一个flutter程序2

需求 完成一个简单的移动应用程序&#xff0c;功能是&#xff1a;为一个创业公司生成建议的名称。用户可以选择和取消选择的名称、保存&#xff08;收藏&#xff09;喜欢的名称。该代码一次生成十个名称&#xff0c;当用户滚动时&#xff0c;会生成一新批名称。用户可以点击导…

OpenCV入门(C++/Python)-使用OpenCV裁剪图像(四)

裁剪是为了从图像中删除所有不需要的物体或区域。甚至突出显示图像的特定功能。 使用OpenCV裁剪没有特定的功能&#xff0c;NumPy数组切片是工作。读取的每个图像都存储在2D数组中&#xff08;对于每个颜色通道&#xff09;。只需指定要裁剪区域的高度和宽度&#xff08;以像素…

家政小程序开发,改变传统难题

首先来说说家政小程序的优势&#xff1a;过去家政门店是根据四处贴到广告宣传、发传单、亲朋好友推 荐等方法处于被动等候顾客上门服务&#xff0c;拓客高效率较低&#xff0c;而且 服务销售市场还存有混乱市场竞争、供求不平衡、员工行为规范参差不齐等难题&#xff0c;限定了…

【GD32F427开发板试用】IAR flash loader 下载GD32F427流程简要分析

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;andeyqi 很高兴获的社区的GD32F427开发板的评测机会&#xff0c;这几年芯片慌大家都能感受到&#xff0c;项目上经常因为货源紧张不断更换替代…

如何用 7 分钟击破 Serverless 落地难点?

当前&#xff0c;Serverless 覆盖的技术场景正在不断变广。Serverless 已在微服务、在线应用、事件驱动、任务处理等众多场景被验证且广泛应用 。当你想要部署一个网站时&#xff0c;需要自己购买服务器并花费时间去维护&#xff0c;造成资源浪费不说&#xff0c;还要耗费精力。…

通信基础之天线知识梳理---2022/12/05

天线知识梳理基本振子的辐射和天线电参数天线阵常用线天线天线测量声明&#xff1a; 学习资料来源于慕课。 哈工大的天线原理课程 基本振子的辐射和天线电参数 天线电参数主要有&#xff1a; 方向性函数、方向图、方向性系数、辐射电阻、效率、增益、输入阻抗、频带、极化。 天…

【LeetCode】最大连续 1 的个数

485. 最大连续 1 的个数 给定一个二进制数组 nums &#xff0c; 计算其中最大连续 1 的个数。 输入&#xff1a;nums [1,1,0,1,1,1] 输出&#xff1a;3 解释&#xff1a;开头的两位和最后的三位都是连续 1 &#xff0c;所以最大连续 1 的个数是 3. 示例 2: 输入&#xff1a;n…