banner

背景

分布式锁在业务场景中的应用十分广泛,当你的服务部署在分布式的多节点集群中,在执行某一个方法时,需要控制并发的情况,保证同一时间只有一个请求可以执行,你一定需要分布式锁来实现。

最常见的分布式锁,一般就是Redis和ZK(亦或者MySQL的for update也可以充当),当然也有很多其他的实现,本篇的主角,就是Redis,我们来逐步使用实现一个简单易用的Redis分布式锁。

RedisLock

首先,我们先来回忆一下Java的Lock是怎么玩的:

ReentrantLock lock = new ReentrantLock();
try {
    lock.lock();
    // do something...
} finally {
    lock.unlock();
}

很简单,很优雅,那么我们就仿照Java的API来实现我们的Redis Lock。

最直观的实现方式,就是使用Redis String的setnx方法,如果key存在就放弃,如果key不存在就set一个值,并指定过期时间,释放锁时,就把key删除,非常easy。

Version1:

@Component
public class RedisLockUtil {
  	@Autowired
    private JedisClient jedisClient;

    public boolean lock(String key, long lockTimeout) {
        key = key + ".lock";
        UUID uuid = UUID.randomUUID();
        String res = jedisClient.set(key, uuid.toString(), "nx", "px", lockTimeout);
        if (Objects.equals("OK", res)) {
            return true;
        }
        return false;
    }

    public boolean unlock(String key) {
        key = key + ".lock";
        Long res = jedisClient.del(key);
        return Objects.equals(res, 1L);
    }
}  

Version1的实现非常简单,也可以满足基本的需求,但是有一个问题,任何线程,只要知道lockkey的名称,都可以调用unlock方法,这显然不是我们所希望的,所以我们在此基础上,进行一点优化,只有加锁的人,才可以释放锁,因此,我们需要保存一些额外的信息。

Version2:

我们新建一个Bean,存储RedisLock的相关信息,改造后我们的实现如下:

public class RedisLock {
    private String key;
    private final UUID uuid;
    private long lockTimeout;
    private long startLockTimeMillis;
    private long getLockTimeMillis;

    public RedisLock(String key, UUID uuid, long lockTimeout, long startLockTimeMillis, long getLockTimeMillis) {
        this.key = key;
        this.uuid = uuid;
        this.lockTimeout = lockTimeout;
        this.startLockTimeMillis = startLockTimeMillis;
        this.getLockTimeMillis = getLockTimeMillis;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public UUID getUuid() {
        return uuid;
    }

    public long getLockTimeout() {
        return lockTimeout;
    }

    public void setLockTimeout(long lockTimeout) {
        this.lockTimeout = lockTimeout;
    }

    public long getGetLockTimeMillis() {
        return getLockTimeMillis;
    }

    public void setGetLockTimeMillis(long getLockTimeMillis) {
        this.getLockTimeMillis = getLockTimeMillis;
    }

    public long getStartLockTimeMillis() {
        return startLockTimeMillis;
    }

    public void setStartLockTimeMillis(long startLockTimeMillis) {
        this.startLockTimeMillis = startLockTimeMillis;
    }
}
@Component
public class RedisLockUtil {

    @Autowired
    private JedisClient jedisClient;

    // 释放锁的Lua脚本
    private static final String LUA_SCRIPT_UNLOCK =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "    redis.call('del', KEYS[1]); " +
                    "    return 'suc' " +
                    "else " +
                    "    return 'fail' " +
                    "end";

    public RedisLock lock(String key, long lockTimeout) {
        key = key + ".lock";
        UUID uuid = UUID.randomUUID();
        String res = jedisClient.set(key, uuid.toString(), "nx", "px", lockTimeout);
        if (Objects.equals("OK", res)) {
            return new RedisLock(key, uuid, lockTimeout, System.currentTimeMillis());
        }
        return null;
    }

    public boolean unlock(RedisLock lock) {
        if (lock == null) {
            return false;
        }

        try {
            String lockValue = lock.getUuid().toString();
            JedisClusterPipeline pipeline = jedisClient.getPipelined();
            Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
                    Collections.singletonList(lock.getKey()),
                    Collections.singletonList(lockValue)
            );
            pipeline.sync();
            return response != null && Objects.equals(response.get(), "suc");
        } catch (Exception e) {
            return false;
        }
    }
}

Well,改造后的方法,好像比Version1复杂了一些,别着急,其实并不复杂,为了保证原子性,删除lockkey部分我们采用了Lua脚本的方式实现,只有当lockkey的value校验一致时,才可以对key进行删除,这样就解决了Version1中出现的问题。

Version2基本可以运行的很好了,但是还有一个问题,我们知道Java的synchronized和ReentrantLock的实现,都是带有自旋抢锁功能的,当第一次没有获取到锁后,会尝试自旋等待,再次尝试抢锁,而我们的实现,好像并不具备这个功能,这怎么可以呢?那我们也要加入自旋。

Version3:

在RedisLock Bean中加入一些额外的信息:

public class RedisLock {
    private String key;
    private final UUID uuid;
    private long lockTimeout;
    private long startLockTimeMillis;
    private long getLockTimeMillis;
    private int tryCount;

    public RedisLock(String key, UUID uuid, long lockTimeout, long startLockTimeMillis, long getLockTimeMillis, int tryCount) {
        this.key = key;
        this.uuid = uuid;
        this.lockTimeout = lockTimeout;
        this.startLockTimeMillis = startLockTimeMillis;
        this.getLockTimeMillis = getLockTimeMillis;
        this.tryCount = tryCount;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public UUID getUuid() {
        return uuid;
    }

    public long getLockTimeout() {
        return lockTimeout;
    }

    public void setLockTimeout(long lockTimeout) {
        this.lockTimeout = lockTimeout;
    }

    public long getGetLockTimeMillis() {
        return getLockTimeMillis;
    }

    public void setGetLockTimeMillis(long getLockTimeMillis) {
        this.getLockTimeMillis = getLockTimeMillis;
    }

    public long getStartLockTimeMillis() {
        return startLockTimeMillis;
    }

    public void setStartLockTimeMillis(long startLockTimeMillis) {
        this.startLockTimeMillis = startLockTimeMillis;
    }

    public int getTryCount() {
        return tryCount;
    }

    public void setTryCount(int tryCount) {
        this.tryCount = tryCount;
    }
}  
@Component
public class RedisLockUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisDemo.class);

    @Autowired
    private JedisClient jedisClient;

    //拿锁的EVAL函数
    private static final String LUA_SCRIPT_LOCK = "return redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) ";

    // 释放锁的Lua脚本
    private static final String LUA_SCRIPT_UNLOCK =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "    redis.call('del', KEYS[1]); " +
                    "    return 'suc' " +
                    "else " +
                    "    return 'fail' " +
                    "end";

    public RedisLock lock(String key, long lockTimeout, long tryLockTimeout) {
        long timestamp = System.currentTimeMillis();
        try {
            key = key + ".lock";
            UUID uuid = UUID.randomUUID();
            String lockValue = uuid.toString();
            int tryCount = 0;

            while (tryLockTimeout == 0 || (System.currentTimeMillis() - timestamp) < tryLockTimeout) {
                JedisClusterPipeline pipeline = jedisClient.getPipelined();
                Response<String> result = pipeline.eval(LUA_SCRIPT_LOCK,
                        Collections.singletonList(key),
                        Arrays.asList(lockValue, String.valueOf(lockTimeout))
                );
                pipeline.sync();
                tryCount++;
                if (result != null && "OK".equals(result.get())) {
                    return new RedisLock(key, uuid, lockTimeout, timestamp, System.currentTimeMillis(), tryCount);
                } else {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Fail to get lock key: {}", key, e);
        }
        return null;
    }

    public boolean unlock(RedisLock lock) {
        if (lock == null) {
            return false;
        }

        try {
            String lockValue = lock.getUuid().toString();
            JedisClusterPipeline pipeline = jedisClient.getPipelined();
            Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
                    Collections.singletonList(lock.getKey()),
                    Collections.singletonList(lockValue)
            );
            pipeline.sync();
            return response != null && Objects.equals(response.get(), "suc");
        } catch (Exception e) {
          	logger.error("Fail to unlock key: {}", key, e);
            return false;
        }
    }
}

Well,Version3版本可以算是一个里程碑的版本,我们在获取锁的方法中,加入了自旋抢锁的功能,可以指定抢锁的超时时间,在此之前,当前线程会一直自旋尝试获取锁,在这个版本中,我们将加锁的方式,也换成了LUA脚本,由于老旧版本的Redis还不支持setnx指定过期时间,为了兼容性考虑,我们这里采用LUA的方式。

Version3已经是一个非常可用的版本了,但是如果我们有更高的要求,还是有一点问题的,就是线程安全问题,假如线程A获取锁成功,并拿到了RedisLock对象,而线程B拿着该对象,调用解锁的方法,是可以解锁成功的,虽然这是一种极端场景,但是也确实有可能发生,为了避免这个问题,我们需要考虑线程一致性的问题,即加锁和解锁的线程,必须是同一线程。

Version4:

在RedisLock Bean中加入一些额外的信息:

public class RedisLock {
    private String key;
    private final UUID uuid;
    private final long threadId;  // 新增线程ID字段
  	.....省略其他代码.....
      
    
@Component
public class RedisLockUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisDemo.class);

    @Autowired
    private JedisClient jedisClient;

    //拿锁的EVAL函数
    private static final String LUA_SCRIPT_LOCK = "return redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) ";

    // 释放锁的Lua脚本
    private static final String LUA_SCRIPT_UNLOCK =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "    redis.call('del', KEYS[1]); " +
                    "    return 'suc' " +
                    "else " +
                    "    return 'fail' " +
                    "end";

    public RedisLock lock(String key, long lockTimeout, long tryLockTimeout) {
        long timestamp = System.currentTimeMillis();
        try {
            key = key + ".lock";
            UUID uuid = UUID.randomUUID();
            // 获取当前线程ID
            String lockValue = uuid + ":" + Thread.currentThread().getId();
            int tryCount = 0;

            while (tryLockTimeout == 0 || (System.currentTimeMillis() - timestamp) < tryLockTimeout) {
                JedisClusterPipeline pipeline = jedisClient.getPipelined();
                Response<String> result = pipeline.eval(LUA_SCRIPT_LOCK,
                        Collections.singletonList(key),
                        Arrays.asList(lockValue, String.valueOf(lockTimeout))  // 使用包含线程ID的lockValue
                );
                pipeline.sync();
                tryCount++;
                if (result != null && "OK".equals(result.get())) {
                    return new RedisLock(key, uuid, Thread.currentThread().getId(), lockTimeout, timestamp, System.currentTimeMillis(), tryCount);
                } else {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Fail to get lock key: {}", key, e);
        }
        return null;
    }

    public boolean unlock(RedisLock lock) {
        if (lock == null) {
            return false;
        }

        // 验证当前线程是否为加锁线程
        if (lock.getThreadId() != Thread.currentThread().getId()) {
            logger.warn("Current thread[{}] is not the lock owner[{}], will not unlock",
                    Thread.currentThread().getId(), lock.getThreadId());
            return false;
        }

        try {
            String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
            JedisClusterPipeline pipeline = jedisClient.getPipelined();
            Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
                    Collections.singletonList(lock.getKey()),
                    Collections.singletonList(lockValue)  // 使用包含线程ID的lockValue
            );
            pipeline.sync();
            return response != null && Objects.equals(response.get(), "suc");
        } catch (Exception e) {
            logger.error("Failed to release lock: {}", lock.getKey(), e);
            return false;
        }
    }
}

在Version4版本中,我们中RedisLock中加入了ThreadId字段,表示当前获取锁的线程ID,在加锁和解锁时,分别加入当前线程的ID标识,保证解锁的线程必须和加锁的线程一致。

至此,我们基本上完成了Redis分布式锁的实现,最后的Version4已经是一个生产环境可用的版本,足以应对绝对大部分的业务场景使用。

Think More

But,我们是追求完美主义的程序员(Doge),Version4已经足够可用,但是再想想,可否继续优化呢?

设想一个场景,加入线程A获取到锁资源,将锁的过期时间设定为3秒,然后线程A就去执行业务逻辑代码了,但是业务代码中,存在一个调用外部API的方法,这个方法出了点意外,耗时超过了3秒,此时锁已经过期,但是业务逻辑并没有执行完毕,此时等待抢锁的线程B,正好就拿到了锁,执行相同的业务代码,这显然不是我们所期望的,那么该如何解决?

如果使用过Redisson的童鞋想必已经猜到了,我们需要一个锁(Lock)的看门狗(Watch Dog),来检查锁是否需要进行续期操作。

Version5:

@Component
public class RedisLockUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);

    @Autowired
    private JedisClient jedisClient;

    // 锁的过期时间,单位毫秒
    private static final long DEFAULT_LOCK_TIMEOUT = 3000;

    // 争抢锁的超时时间,单位毫秒,0代表永不超时(一直抢到锁为止)
    private static final long DEFAULT_TRY_LOCK_TIMEOUT = 0;

    //拿锁的EVAL函数
    private static final String LUA_SCRIPT_LOCK = "return redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) ";

    // 释放锁的Lua脚本
    private static final String LUA_SCRIPT_UNLOCK =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "    redis.call('del', KEYS[1]); " +
                    "    return 'suc' " +
                    "else " +
                    "    return 'fail' " +
                    "end";

    // 添加续期相关的常量
    private static final String LUA_SCRIPT_RENEW =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "    redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "    return 'suc' " +
                    "else " +
                    "    return 'fail' " +
                    "end";

    // 续期时间点(锁过期时间的2/3处开始续期)
    private static final double RENEW_FACTOR = 2.0/3;

    public RedisLock lock(String key) {
        return lock(key, DEFAULT_LOCK_TIMEOUT, DEFAULT_TRY_LOCK_TIMEOUT);
    }

    public <T> T lockAndExecute(String key, Supplier<T> supplier) {
        return lockAndExecute(key, DEFAULT_LOCK_TIMEOUT, DEFAULT_TRY_LOCK_TIMEOUT, supplier);
    }

    public void lockAndExecute(String key, Runnable runnable) {
        lockAndExecute(key, DEFAULT_LOCK_TIMEOUT, DEFAULT_TRY_LOCK_TIMEOUT, runnable);
    }

    /**
     * 获取分布式锁
     *
     * @param key   redis key名称
     * @param lockTimeout   锁时长
     * @param tryLockTimeout   尝试获取锁等待时间
     * @return
     */
    public RedisLock lock(String key, long lockTimeout, long tryLockTimeout) {
        long timestamp = System.currentTimeMillis();
        try {
            key = key + ".lock";
            UUID uuid = UUID.randomUUID();
            String lockValue = uuid + ":" + Thread.currentThread().getId();
            int tryCount = 0;

            while (tryLockTimeout == 0 || (System.currentTimeMillis() - timestamp) < tryLockTimeout) {
                JedisClusterPipeline pipeline = jedisClient.getPipelined();
                Response<String> result = pipeline.eval(LUA_SCRIPT_LOCK,
                        Collections.singletonList(key),
                        Arrays.asList(lockValue, String.valueOf(lockTimeout))
                );
                pipeline.sync();
                tryCount++;
                if (result != null && "OK".equals(result.get())) {
                    RedisLock lock = new RedisLock(key, uuid, Thread.currentThread().getId(), 
                            lockTimeout, timestamp, System.currentTimeMillis(), tryCount);
                    // 启动续期守护线程
                    startRenewThread(lock);
                    return lock;
                } else {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Fail to get lock key: {}", key, e);
        }
        return null;
    }

    /**
     * 启动续期守护线程
     */
    private void startRenewThread(RedisLock lock) {
        Thread renewThread = new Thread(() -> {
            long renewInterval = (long) (lock.getLockTimeout() * RENEW_FACTOR);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(renewInterval);
                    boolean renewed = renewLock(lock);
                    if (!renewed) {
                        break;
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        renewThread.setDaemon(true);
        renewThread.start();
        lock.setRenewThread(renewThread);  // 需要在RedisLock类中添加renewThread字段
    }

    /**
     * 续期锁
     */
    private boolean renewLock(RedisLock lock) {
        try {
            String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
            JedisClusterPipeline pipeline = jedisClient.getPipelined();
            Response<String> response = pipeline.eval(LUA_SCRIPT_RENEW,
                    Collections.singletonList(lock.getKey()),
                    Arrays.asList(lockValue, String.valueOf(lock.getLockTimeout()))
            );
            pipeline.sync();
            boolean success = response != null && Objects.equals(response.get(), "suc");
            if (success) {
                logger.debug("Successfully renewed lock: {}", lock.getKey());
            } else {
                logger.warn("Failed to renew lock: {}", lock.getKey());
            }
            return success;
        } catch (Exception e) {
            logger.error("Error while renewing lock: {}", lock.getKey(), e);
            return false;
        }
    }

    /**
     * 加锁执行业务逻辑,执行完成后自动释放锁
     *
     * @param key      锁的key
     * @param supplier 需要执行的业务逻辑
     * @param <T>      返回值类型
     * @return 业务逻辑的执行结果
     */
    public <T> T lockAndExecute(String key, long lockTimeout, long tryLockTimeout, Supplier<T> supplier) {
        RedisLock lock = null;
        try {
            lock = lock(key, lockTimeout, tryLockTimeout);
            if (Objects.nonNull(lock)) {
                return supplier.get();
            }
            throw new RuntimeException("获取锁失败: " + key);
        } finally {
            if (Objects.nonNull(lock)) {
                unlock(lock);
            }
        }
    }

    /**
     * 加锁执行无返回值的业务逻辑,执行完成后自动释放锁
     *
     * @param key      锁的key
     * @param runnable 需要执行的业务逻辑
     */
    public void lockAndExecute(String key, long lockTimeout, long tryLockTimeout, Runnable runnable) {
        RedisLock lock = null;
        try {
            lock = lock(key, lockTimeout, tryLockTimeout);
            if (Objects.nonNull(lock)) {
                runnable.run();
                return;
            }
            throw new RuntimeException("获取锁失败: " + key);
        } finally {
            if (Objects.nonNull(lock)) {
                unlock(lock);
            }
        }
    }
    public boolean unlock(RedisLock lock) {
        if (lock == null) {
            return false;
        }
        
        // 验证当前线程是否为加锁线程
        if (lock.getThreadId() != Thread.currentThread().getId()) {
            logger.warn("Current thread[{}] is not the lock owner[{}], will not unlock", 
                Thread.currentThread().getId(), lock.getThreadId());
            return false;
        }
        
        try {
            // 停止续期线程
            if (lock.getRenewThread() != null) {
                lock.getRenewThread().interrupt();
            }

            String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
            JedisClusterPipeline pipeline = jedisClient.getPipelined();
            Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
                    Collections.singletonList(lock.getKey()),
                    Collections.singletonList(lockValue)  // 使用包含线程ID的lockValue
            );
            pipeline.sync();
            return response != null && Objects.equals(response.get(), "suc");
        } catch (Exception e) {
            logger.error("Failed to release lock: {}", lock.getKey(), e);
            return false;
        }
    }
}

在这个版本中,我们加入了看门狗的锁续期逻辑,在锁的过期时间达到我们指定的阈值时,会触发锁续期的线程,执行锁续期逻辑,同时我们还加入了一些门面方法,将RedisLockUtil更加易用,就此,我们基本完成了一个非常优秀的分布式锁实现。

Something Else…

关于RedisLock的完整实现基本完成,如果了解过Java的ReentrantLock的同学应该知道,concurrent package中的锁的实现,基本都是基于AQS队列实现的,如果您对AQS不太了解,可以点击这里:

聊聊并发:(七)concurrent包之AbstractQueuedSynchronizer分析

那么我们的RedisLock是否也可以与AQS结合呢?这是个有意思的问题,我们来一起尝试一下:

Version6:

public class RedisLock {
    private String key;
    private final UUID uuid;
    private final long threadId;
    private long lockTimeout;
    private long startLockTimeMillis;
    private long getLockTimeMillis;
    private int tryCount;
    private Thread renewThread;
    private final RedisLockUtil.Sync sync;  // 添加sync字段

    public RedisLock(String key, UUID uuid, long threadId, long lockTimeout, long startLockTimeMillis, long getLockTimeMillis, int tryCount, RedisLockUtil.Sync sync) {
        this.key = key;
        this.uuid = uuid;
        this.threadId = threadId;
        this.lockTimeout = lockTimeout;
        this.startLockTimeMillis = startLockTimeMillis;
        this.getLockTimeMillis = getLockTimeMillis;
        this.tryCount = tryCount;
        this.sync = sync;
    }
  .....省略部分代码.....
@Component
public class RedisLockUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);

    @Autowired
    private JedisClient jedisClient;

    //拿锁的EVAL函数
    private static final String LUA_SCRIPT_LOCK = "return redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) ";

    // 释放锁的Lua脚本
    private static final String LUA_SCRIPT_UNLOCK =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "    redis.call('del', KEYS[1]); " +
                    "    return 'suc' " +
                    "else " +
                    "    return 'fail' " +
                    "end";

    // 添加续期相关的常量
    private static final String LUA_SCRIPT_RENEW =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "    redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "    return 'suc' " +
                    "else " +
                    "    return 'fail' " +
                    "end";

    // 续期时间点(锁过期时间的2/3处开始续期)
    private static final double RENEW_FACTOR = 2.0/3;


    /**
     * 基于AQS的同步器实现
     */
    static class Sync extends AbstractQueuedSynchronizer {
        private final Redis1 lockUtil;
        private final String key;
        private final long lockTimeout;
        private RedisLock redisLock;

        public Sync(Redis1 lockUtil, String key, long lockTimeout) {
            this.lockUtil = lockUtil;
            this.key = key;
            this.lockTimeout = lockTimeout;
        }

        @Override
        protected boolean tryAcquire(int acquires) {
            // 尝试获取Redis锁
            redisLock = lockUtil.tryLock(key, lockTimeout, this);
            if (redisLock != null) {
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (redisLock != null) {
                return lockUtil.unlock(redisLock);
            }
            return false;
        }

        public RedisLock getRedisLock() {
            return redisLock;
        }
    }

    /**
     * 尝试获取锁的内部方法,不包含AQS逻辑
     */
    private RedisLock tryLock(String key, long lockTimeout, Sync sync) {
        // 将原来lock方法的实现移到这里
        long timestamp = System.currentTimeMillis();
        try {
            key = key + ".lock";
            UUID uuid = UUID.randomUUID();
            String lockValue = uuid + ":" + Thread.currentThread().getId();

            JedisClusterPipeline pipeline = jedisClient.getPipelined();
            Response<String> result = pipeline.eval(LUA_SCRIPT_LOCK,
                    Collections.singletonList(key),
                    Arrays.asList(lockValue, String.valueOf(lockTimeout))
            );
            pipeline.sync();

            if (result != null && "OK".equals(result.get())) {
                RedisLock lock = new RedisLock(key, uuid, Thread.currentThread().getId(),
                        lockTimeout, timestamp, System.currentTimeMillis(), 1, sync);
                startRenewThread(lock);
                return lock;
            }
        } catch (Exception e) {
            logger.error("Fail to get lock key: {}", key, e);
        }
        return null;
    }

    /**
     * 获取分布式锁(使用AQS实现线程排队)
     */
    public RedisLock lock(String key, long lockTimeout) {
        Sync sync = new Sync(this, key, lockTimeout);
        sync.acquire(1); // 阻塞直到获取到锁
        return sync.getRedisLock();
    }

    /**
     * 尝试获取分布式锁(非阻塞)
     */
    public RedisLock tryLock(String key, long lockTimeout) {
        Sync sync = new Sync(this, key, lockTimeout);
        return sync.tryAcquire(1) ? sync.getRedisLock() : null;
    }

    /**
     * 带超时的尝试获取分布式锁
     */
    public RedisLock tryLock(String key, long lockTimeout, long tryLockTimeout, TimeUnit unit) throws InterruptedException {
        Sync sync = new Sync(this, key, lockTimeout);
        return sync.tryAcquireNanos(1, unit.toNanos(tryLockTimeout)) ? sync.getRedisLock() : null;
    }


    /**
     * 启动续期守护线程
     */
    private void startRenewThread(RedisLock lock) {
        Thread renewThread = new Thread(() -> {
            long renewInterval = (long) (lock.getLockTimeout() * RENEW_FACTOR);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(renewInterval);
                    boolean renewed = renewLock(lock);
                    if (!renewed) {
                        break;
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        renewThread.setDaemon(true);
        renewThread.start();
        lock.setRenewThread(renewThread);  // 需要在RedisLock类中添加renewThread字段
    }

    /**
     * 续期锁
     */
    private boolean renewLock(RedisLock lock) {
        try {
            String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
            JedisClusterPipeline pipeline = jedisClient.getPipelined();
            Response<String> response = pipeline.eval(LUA_SCRIPT_RENEW,
                    Collections.singletonList(lock.getKey()),
                    Arrays.asList(lockValue, String.valueOf(lock.getLockTimeout()))
            );
            pipeline.sync();
            boolean success = response != null && Objects.equals(response.get(), "suc");
            if (success) {
                logger.debug("Successfully renewed lock: {}", lock.getKey());
            } else {
                logger.warn("Failed to renew lock: {}", lock.getKey());
            }
            return success;
        } catch (Exception e) {
            logger.error("Error while renewing lock: {}", lock.getKey(), e);
            return false;
        }
    }

    public boolean unlock(RedisLock lock) {
        if (lock == null) {
            return false;
        }

        // 验证当前线程是否为加锁线程
        if (lock.getThreadId() != Thread.currentThread().getId()) {
            logger.warn("Current thread[{}] is not the lock owner[{}], will not unlock",
                    Thread.currentThread().getId(), lock.getThreadId());
            return false;
        }

        try {
            // 停止续期线程
            if (lock.getRenewThread() != null) {
                lock.getRenewThread().interrupt();
            }

            String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
            JedisClusterPipeline pipeline = jedisClient.getPipelined();
            Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
                    Collections.singletonList(lock.getKey()),
                    Collections.singletonList(lockValue)  // 使用包含线程ID的lockValue
            );
            pipeline.sync();
            return response != null && Objects.equals(response.get(), "suc");
        } catch (Exception e) {
            logger.error("Failed to release lock: {}", lock.getKey(), e);
            return false;
        }
    }
}

结语

本篇,我们从0-1,从简单到复杂完整了实现了一个Redis分布式锁,实现的细节难免粗糙,如果有问题请您指出,希望本篇可以给您提供一个思路,谢谢。