Redis分布式锁实现指北
Redis分布式锁实现指北
背景
分布式锁在业务场景中的应用十分广泛,当你的服务部署在分布式的多节点集群中,在执行某一个方法时,需要控制并发的情况,保证同一时间只有一个请求可以执行,你一定需要分布式锁来实现。
最常见的分布式锁,一般就是Redis和ZK(亦或者MySQL的for update也可以充当),当然也有很多其他的实现,本篇的主角,就是Redis,我们来逐步使用实现一个简单易用的Redis分布式锁。
RedisLock
首先,我们先来回忆一下Java的Lock是怎么玩的:
ReentrantLock lock = new ReentrantLock();
try {
lock.lock();
// do something...
} finally {
lock.unlock();
}
很简单,很优雅,那么我们就仿照Java的API来实现我们的Redis Lock。
最直观的实现方式,就是使用Redis String的setnx方法,如果key存在就放弃,如果key不存在就set一个值,并指定过期时间,释放锁时,就把key删除,非常easy。
Version1:
@Component
public class RedisLockUtil {
@Autowired
private JedisClient jedisClient;
public boolean lock(String key, long lockTimeout) {
key = key + ".lock";
UUID uuid = UUID.randomUUID();
String res = jedisClient.set(key, uuid.toString(), "nx", "px", lockTimeout);
if (Objects.equals("OK", res)) {
return true;
}
return false;
}
public boolean unlock(String key) {
key = key + ".lock";
Long res = jedisClient.del(key);
return Objects.equals(res, 1L);
}
}
Version1的实现非常简单,也可以满足基本的需求,但是有一个问题,任何线程,只要知道lockkey的名称,都可以调用unlock方法,这显然不是我们所希望的,所以我们在此基础上,进行一点优化,只有加锁的人,才可以释放锁,因此,我们需要保存一些额外的信息。
Version2:
我们新建一个Bean,存储RedisLock的相关信息,改造后我们的实现如下:
public class RedisLock {
private String key;
private final UUID uuid;
private long lockTimeout;
private long startLockTimeMillis;
private long getLockTimeMillis;
public RedisLock(String key, UUID uuid, long lockTimeout, long startLockTimeMillis, long getLockTimeMillis) {
this.key = key;
this.uuid = uuid;
this.lockTimeout = lockTimeout;
this.startLockTimeMillis = startLockTimeMillis;
this.getLockTimeMillis = getLockTimeMillis;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public UUID getUuid() {
return uuid;
}
public long getLockTimeout() {
return lockTimeout;
}
public void setLockTimeout(long lockTimeout) {
this.lockTimeout = lockTimeout;
}
public long getGetLockTimeMillis() {
return getLockTimeMillis;
}
public void setGetLockTimeMillis(long getLockTimeMillis) {
this.getLockTimeMillis = getLockTimeMillis;
}
public long getStartLockTimeMillis() {
return startLockTimeMillis;
}
public void setStartLockTimeMillis(long startLockTimeMillis) {
this.startLockTimeMillis = startLockTimeMillis;
}
}
@Component
public class RedisLockUtil {
@Autowired
private JedisClient jedisClient;
// 释放锁的Lua脚本
private static final String LUA_SCRIPT_UNLOCK =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('del', KEYS[1]); " +
" return 'suc' " +
"else " +
" return 'fail' " +
"end";
public RedisLock lock(String key, long lockTimeout) {
key = key + ".lock";
UUID uuid = UUID.randomUUID();
String res = jedisClient.set(key, uuid.toString(), "nx", "px", lockTimeout);
if (Objects.equals("OK", res)) {
return new RedisLock(key, uuid, lockTimeout, System.currentTimeMillis());
}
return null;
}
public boolean unlock(RedisLock lock) {
if (lock == null) {
return false;
}
try {
String lockValue = lock.getUuid().toString();
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
Collections.singletonList(lock.getKey()),
Collections.singletonList(lockValue)
);
pipeline.sync();
return response != null && Objects.equals(response.get(), "suc");
} catch (Exception e) {
return false;
}
}
}
Well,改造后的方法,好像比Version1复杂了一些,别着急,其实并不复杂,为了保证原子性,删除lockkey部分我们采用了Lua脚本的方式实现,只有当lockkey的value校验一致时,才可以对key进行删除,这样就解决了Version1中出现的问题。
Version2基本可以运行的很好了,但是还有一个问题,我们知道Java的synchronized和ReentrantLock的实现,都是带有自旋抢锁功能的,当第一次没有获取到锁后,会尝试自旋等待,再次尝试抢锁,而我们的实现,好像并不具备这个功能,这怎么可以呢?那我们也要加入自旋。
Version3:
在RedisLock Bean中加入一些额外的信息:
public class RedisLock {
private String key;
private final UUID uuid;
private long lockTimeout;
private long startLockTimeMillis;
private long getLockTimeMillis;
private int tryCount;
public RedisLock(String key, UUID uuid, long lockTimeout, long startLockTimeMillis, long getLockTimeMillis, int tryCount) {
this.key = key;
this.uuid = uuid;
this.lockTimeout = lockTimeout;
this.startLockTimeMillis = startLockTimeMillis;
this.getLockTimeMillis = getLockTimeMillis;
this.tryCount = tryCount;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public UUID getUuid() {
return uuid;
}
public long getLockTimeout() {
return lockTimeout;
}
public void setLockTimeout(long lockTimeout) {
this.lockTimeout = lockTimeout;
}
public long getGetLockTimeMillis() {
return getLockTimeMillis;
}
public void setGetLockTimeMillis(long getLockTimeMillis) {
this.getLockTimeMillis = getLockTimeMillis;
}
public long getStartLockTimeMillis() {
return startLockTimeMillis;
}
public void setStartLockTimeMillis(long startLockTimeMillis) {
this.startLockTimeMillis = startLockTimeMillis;
}
public int getTryCount() {
return tryCount;
}
public void setTryCount(int tryCount) {
this.tryCount = tryCount;
}
}
@Component
public class RedisLockUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisDemo.class);
@Autowired
private JedisClient jedisClient;
//拿锁的EVAL函数
private static final String LUA_SCRIPT_LOCK = "return redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) ";
// 释放锁的Lua脚本
private static final String LUA_SCRIPT_UNLOCK =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('del', KEYS[1]); " +
" return 'suc' " +
"else " +
" return 'fail' " +
"end";
public RedisLock lock(String key, long lockTimeout, long tryLockTimeout) {
long timestamp = System.currentTimeMillis();
try {
key = key + ".lock";
UUID uuid = UUID.randomUUID();
String lockValue = uuid.toString();
int tryCount = 0;
while (tryLockTimeout == 0 || (System.currentTimeMillis() - timestamp) < tryLockTimeout) {
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> result = pipeline.eval(LUA_SCRIPT_LOCK,
Collections.singletonList(key),
Arrays.asList(lockValue, String.valueOf(lockTimeout))
);
pipeline.sync();
tryCount++;
if (result != null && "OK".equals(result.get())) {
return new RedisLock(key, uuid, lockTimeout, timestamp, System.currentTimeMillis(), tryCount);
} else {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
logger.error("Fail to get lock key: {}", key, e);
}
return null;
}
public boolean unlock(RedisLock lock) {
if (lock == null) {
return false;
}
try {
String lockValue = lock.getUuid().toString();
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
Collections.singletonList(lock.getKey()),
Collections.singletonList(lockValue)
);
pipeline.sync();
return response != null && Objects.equals(response.get(), "suc");
} catch (Exception e) {
logger.error("Fail to unlock key: {}", key, e);
return false;
}
}
}
Well,Version3版本可以算是一个里程碑的版本,我们在获取锁的方法中,加入了自旋抢锁的功能,可以指定抢锁的超时时间,在此之前,当前线程会一直自旋尝试获取锁,在这个版本中,我们将加锁的方式,也换成了LUA脚本,由于老旧版本的Redis还不支持setnx指定过期时间,为了兼容性考虑,我们这里采用LUA的方式。
Version3已经是一个非常可用的版本了,但是如果我们有更高的要求,还是有一点问题的,就是线程安全问题,假如线程A获取锁成功,并拿到了RedisLock对象,而线程B拿着该对象,调用解锁的方法,是可以解锁成功的,虽然这是一种极端场景,但是也确实有可能发生,为了避免这个问题,我们需要考虑线程一致性的问题,即加锁和解锁的线程,必须是同一线程。
Version4:
在RedisLock Bean中加入一些额外的信息:
public class RedisLock {
private String key;
private final UUID uuid;
private final long threadId; // 新增线程ID字段
.....省略其他代码.....
@Component
public class RedisLockUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisDemo.class);
@Autowired
private JedisClient jedisClient;
//拿锁的EVAL函数
private static final String LUA_SCRIPT_LOCK = "return redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) ";
// 释放锁的Lua脚本
private static final String LUA_SCRIPT_UNLOCK =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('del', KEYS[1]); " +
" return 'suc' " +
"else " +
" return 'fail' " +
"end";
public RedisLock lock(String key, long lockTimeout, long tryLockTimeout) {
long timestamp = System.currentTimeMillis();
try {
key = key + ".lock";
UUID uuid = UUID.randomUUID();
// 获取当前线程ID
String lockValue = uuid + ":" + Thread.currentThread().getId();
int tryCount = 0;
while (tryLockTimeout == 0 || (System.currentTimeMillis() - timestamp) < tryLockTimeout) {
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> result = pipeline.eval(LUA_SCRIPT_LOCK,
Collections.singletonList(key),
Arrays.asList(lockValue, String.valueOf(lockTimeout)) // 使用包含线程ID的lockValue
);
pipeline.sync();
tryCount++;
if (result != null && "OK".equals(result.get())) {
return new RedisLock(key, uuid, Thread.currentThread().getId(), lockTimeout, timestamp, System.currentTimeMillis(), tryCount);
} else {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
logger.error("Fail to get lock key: {}", key, e);
}
return null;
}
public boolean unlock(RedisLock lock) {
if (lock == null) {
return false;
}
// 验证当前线程是否为加锁线程
if (lock.getThreadId() != Thread.currentThread().getId()) {
logger.warn("Current thread[{}] is not the lock owner[{}], will not unlock",
Thread.currentThread().getId(), lock.getThreadId());
return false;
}
try {
String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
Collections.singletonList(lock.getKey()),
Collections.singletonList(lockValue) // 使用包含线程ID的lockValue
);
pipeline.sync();
return response != null && Objects.equals(response.get(), "suc");
} catch (Exception e) {
logger.error("Failed to release lock: {}", lock.getKey(), e);
return false;
}
}
}
在Version4版本中,我们中RedisLock中加入了ThreadId字段,表示当前获取锁的线程ID,在加锁和解锁时,分别加入当前线程的ID标识,保证解锁的线程必须和加锁的线程一致。
至此,我们基本上完成了Redis分布式锁的实现,最后的Version4已经是一个生产环境可用的版本,足以应对绝对大部分的业务场景使用。
Think More
But,我们是追求完美主义的程序员(Doge),Version4已经足够可用,但是再想想,可否继续优化呢?
设想一个场景,加入线程A获取到锁资源,将锁的过期时间设定为3秒,然后线程A就去执行业务逻辑代码了,但是业务代码中,存在一个调用外部API的方法,这个方法出了点意外,耗时超过了3秒,此时锁已经过期,但是业务逻辑并没有执行完毕,此时等待抢锁的线程B,正好就拿到了锁,执行相同的业务代码,这显然不是我们所期望的,那么该如何解决?
如果使用过Redisson的童鞋想必已经猜到了,我们需要一个锁(Lock)的看门狗(Watch Dog),来检查锁是否需要进行续期操作。
Version5:
@Component
public class RedisLockUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
@Autowired
private JedisClient jedisClient;
// 锁的过期时间,单位毫秒
private static final long DEFAULT_LOCK_TIMEOUT = 3000;
// 争抢锁的超时时间,单位毫秒,0代表永不超时(一直抢到锁为止)
private static final long DEFAULT_TRY_LOCK_TIMEOUT = 0;
//拿锁的EVAL函数
private static final String LUA_SCRIPT_LOCK = "return redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) ";
// 释放锁的Lua脚本
private static final String LUA_SCRIPT_UNLOCK =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('del', KEYS[1]); " +
" return 'suc' " +
"else " +
" return 'fail' " +
"end";
// 添加续期相关的常量
private static final String LUA_SCRIPT_RENEW =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('pexpire', KEYS[1], ARGV[2]); " +
" return 'suc' " +
"else " +
" return 'fail' " +
"end";
// 续期时间点(锁过期时间的2/3处开始续期)
private static final double RENEW_FACTOR = 2.0/3;
public RedisLock lock(String key) {
return lock(key, DEFAULT_LOCK_TIMEOUT, DEFAULT_TRY_LOCK_TIMEOUT);
}
public <T> T lockAndExecute(String key, Supplier<T> supplier) {
return lockAndExecute(key, DEFAULT_LOCK_TIMEOUT, DEFAULT_TRY_LOCK_TIMEOUT, supplier);
}
public void lockAndExecute(String key, Runnable runnable) {
lockAndExecute(key, DEFAULT_LOCK_TIMEOUT, DEFAULT_TRY_LOCK_TIMEOUT, runnable);
}
/**
* 获取分布式锁
*
* @param key redis key名称
* @param lockTimeout 锁时长
* @param tryLockTimeout 尝试获取锁等待时间
* @return
*/
public RedisLock lock(String key, long lockTimeout, long tryLockTimeout) {
long timestamp = System.currentTimeMillis();
try {
key = key + ".lock";
UUID uuid = UUID.randomUUID();
String lockValue = uuid + ":" + Thread.currentThread().getId();
int tryCount = 0;
while (tryLockTimeout == 0 || (System.currentTimeMillis() - timestamp) < tryLockTimeout) {
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> result = pipeline.eval(LUA_SCRIPT_LOCK,
Collections.singletonList(key),
Arrays.asList(lockValue, String.valueOf(lockTimeout))
);
pipeline.sync();
tryCount++;
if (result != null && "OK".equals(result.get())) {
RedisLock lock = new RedisLock(key, uuid, Thread.currentThread().getId(),
lockTimeout, timestamp, System.currentTimeMillis(), tryCount);
// 启动续期守护线程
startRenewThread(lock);
return lock;
} else {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
logger.error("Fail to get lock key: {}", key, e);
}
return null;
}
/**
* 启动续期守护线程
*/
private void startRenewThread(RedisLock lock) {
Thread renewThread = new Thread(() -> {
long renewInterval = (long) (lock.getLockTimeout() * RENEW_FACTOR);
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(renewInterval);
boolean renewed = renewLock(lock);
if (!renewed) {
break;
}
} catch (InterruptedException e) {
break;
}
}
});
renewThread.setDaemon(true);
renewThread.start();
lock.setRenewThread(renewThread); // 需要在RedisLock类中添加renewThread字段
}
/**
* 续期锁
*/
private boolean renewLock(RedisLock lock) {
try {
String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> response = pipeline.eval(LUA_SCRIPT_RENEW,
Collections.singletonList(lock.getKey()),
Arrays.asList(lockValue, String.valueOf(lock.getLockTimeout()))
);
pipeline.sync();
boolean success = response != null && Objects.equals(response.get(), "suc");
if (success) {
logger.debug("Successfully renewed lock: {}", lock.getKey());
} else {
logger.warn("Failed to renew lock: {}", lock.getKey());
}
return success;
} catch (Exception e) {
logger.error("Error while renewing lock: {}", lock.getKey(), e);
return false;
}
}
/**
* 加锁执行业务逻辑,执行完成后自动释放锁
*
* @param key 锁的key
* @param supplier 需要执行的业务逻辑
* @param <T> 返回值类型
* @return 业务逻辑的执行结果
*/
public <T> T lockAndExecute(String key, long lockTimeout, long tryLockTimeout, Supplier<T> supplier) {
RedisLock lock = null;
try {
lock = lock(key, lockTimeout, tryLockTimeout);
if (Objects.nonNull(lock)) {
return supplier.get();
}
throw new RuntimeException("获取锁失败: " + key);
} finally {
if (Objects.nonNull(lock)) {
unlock(lock);
}
}
}
/**
* 加锁执行无返回值的业务逻辑,执行完成后自动释放锁
*
* @param key 锁的key
* @param runnable 需要执行的业务逻辑
*/
public void lockAndExecute(String key, long lockTimeout, long tryLockTimeout, Runnable runnable) {
RedisLock lock = null;
try {
lock = lock(key, lockTimeout, tryLockTimeout);
if (Objects.nonNull(lock)) {
runnable.run();
return;
}
throw new RuntimeException("获取锁失败: " + key);
} finally {
if (Objects.nonNull(lock)) {
unlock(lock);
}
}
}
public boolean unlock(RedisLock lock) {
if (lock == null) {
return false;
}
// 验证当前线程是否为加锁线程
if (lock.getThreadId() != Thread.currentThread().getId()) {
logger.warn("Current thread[{}] is not the lock owner[{}], will not unlock",
Thread.currentThread().getId(), lock.getThreadId());
return false;
}
try {
// 停止续期线程
if (lock.getRenewThread() != null) {
lock.getRenewThread().interrupt();
}
String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
Collections.singletonList(lock.getKey()),
Collections.singletonList(lockValue) // 使用包含线程ID的lockValue
);
pipeline.sync();
return response != null && Objects.equals(response.get(), "suc");
} catch (Exception e) {
logger.error("Failed to release lock: {}", lock.getKey(), e);
return false;
}
}
}
在这个版本中,我们加入了看门狗的锁续期逻辑,在锁的过期时间达到我们指定的阈值时,会触发锁续期的线程,执行锁续期逻辑,同时我们还加入了一些门面方法,将RedisLockUtil更加易用,就此,我们基本完成了一个非常优秀的分布式锁实现。
Something Else…
关于RedisLock的完整实现基本完成,如果了解过Java的ReentrantLock的同学应该知道,concurrent package中的锁的实现,基本都是基于AQS队列实现的,如果您对AQS不太了解,可以点击这里:
聊聊并发:(七)concurrent包之AbstractQueuedSynchronizer分析
那么我们的RedisLock是否也可以与AQS结合呢?这是个有意思的问题,我们来一起尝试一下:
Version6:
public class RedisLock {
private String key;
private final UUID uuid;
private final long threadId;
private long lockTimeout;
private long startLockTimeMillis;
private long getLockTimeMillis;
private int tryCount;
private Thread renewThread;
private final RedisLockUtil.Sync sync; // 添加sync字段
public RedisLock(String key, UUID uuid, long threadId, long lockTimeout, long startLockTimeMillis, long getLockTimeMillis, int tryCount, RedisLockUtil.Sync sync) {
this.key = key;
this.uuid = uuid;
this.threadId = threadId;
this.lockTimeout = lockTimeout;
this.startLockTimeMillis = startLockTimeMillis;
this.getLockTimeMillis = getLockTimeMillis;
this.tryCount = tryCount;
this.sync = sync;
}
.....省略部分代码.....
@Component
public class RedisLockUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
@Autowired
private JedisClient jedisClient;
//拿锁的EVAL函数
private static final String LUA_SCRIPT_LOCK = "return redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) ";
// 释放锁的Lua脚本
private static final String LUA_SCRIPT_UNLOCK =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('del', KEYS[1]); " +
" return 'suc' " +
"else " +
" return 'fail' " +
"end";
// 添加续期相关的常量
private static final String LUA_SCRIPT_RENEW =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('pexpire', KEYS[1], ARGV[2]); " +
" return 'suc' " +
"else " +
" return 'fail' " +
"end";
// 续期时间点(锁过期时间的2/3处开始续期)
private static final double RENEW_FACTOR = 2.0/3;
/**
* 基于AQS的同步器实现
*/
static class Sync extends AbstractQueuedSynchronizer {
private final Redis1 lockUtil;
private final String key;
private final long lockTimeout;
private RedisLock redisLock;
public Sync(Redis1 lockUtil, String key, long lockTimeout) {
this.lockUtil = lockUtil;
this.key = key;
this.lockTimeout = lockTimeout;
}
@Override
protected boolean tryAcquire(int acquires) {
// 尝试获取Redis锁
redisLock = lockUtil.tryLock(key, lockTimeout, this);
if (redisLock != null) {
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if (redisLock != null) {
return lockUtil.unlock(redisLock);
}
return false;
}
public RedisLock getRedisLock() {
return redisLock;
}
}
/**
* 尝试获取锁的内部方法,不包含AQS逻辑
*/
private RedisLock tryLock(String key, long lockTimeout, Sync sync) {
// 将原来lock方法的实现移到这里
long timestamp = System.currentTimeMillis();
try {
key = key + ".lock";
UUID uuid = UUID.randomUUID();
String lockValue = uuid + ":" + Thread.currentThread().getId();
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> result = pipeline.eval(LUA_SCRIPT_LOCK,
Collections.singletonList(key),
Arrays.asList(lockValue, String.valueOf(lockTimeout))
);
pipeline.sync();
if (result != null && "OK".equals(result.get())) {
RedisLock lock = new RedisLock(key, uuid, Thread.currentThread().getId(),
lockTimeout, timestamp, System.currentTimeMillis(), 1, sync);
startRenewThread(lock);
return lock;
}
} catch (Exception e) {
logger.error("Fail to get lock key: {}", key, e);
}
return null;
}
/**
* 获取分布式锁(使用AQS实现线程排队)
*/
public RedisLock lock(String key, long lockTimeout) {
Sync sync = new Sync(this, key, lockTimeout);
sync.acquire(1); // 阻塞直到获取到锁
return sync.getRedisLock();
}
/**
* 尝试获取分布式锁(非阻塞)
*/
public RedisLock tryLock(String key, long lockTimeout) {
Sync sync = new Sync(this, key, lockTimeout);
return sync.tryAcquire(1) ? sync.getRedisLock() : null;
}
/**
* 带超时的尝试获取分布式锁
*/
public RedisLock tryLock(String key, long lockTimeout, long tryLockTimeout, TimeUnit unit) throws InterruptedException {
Sync sync = new Sync(this, key, lockTimeout);
return sync.tryAcquireNanos(1, unit.toNanos(tryLockTimeout)) ? sync.getRedisLock() : null;
}
/**
* 启动续期守护线程
*/
private void startRenewThread(RedisLock lock) {
Thread renewThread = new Thread(() -> {
long renewInterval = (long) (lock.getLockTimeout() * RENEW_FACTOR);
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(renewInterval);
boolean renewed = renewLock(lock);
if (!renewed) {
break;
}
} catch (InterruptedException e) {
break;
}
}
});
renewThread.setDaemon(true);
renewThread.start();
lock.setRenewThread(renewThread); // 需要在RedisLock类中添加renewThread字段
}
/**
* 续期锁
*/
private boolean renewLock(RedisLock lock) {
try {
String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> response = pipeline.eval(LUA_SCRIPT_RENEW,
Collections.singletonList(lock.getKey()),
Arrays.asList(lockValue, String.valueOf(lock.getLockTimeout()))
);
pipeline.sync();
boolean success = response != null && Objects.equals(response.get(), "suc");
if (success) {
logger.debug("Successfully renewed lock: {}", lock.getKey());
} else {
logger.warn("Failed to renew lock: {}", lock.getKey());
}
return success;
} catch (Exception e) {
logger.error("Error while renewing lock: {}", lock.getKey(), e);
return false;
}
}
public boolean unlock(RedisLock lock) {
if (lock == null) {
return false;
}
// 验证当前线程是否为加锁线程
if (lock.getThreadId() != Thread.currentThread().getId()) {
logger.warn("Current thread[{}] is not the lock owner[{}], will not unlock",
Thread.currentThread().getId(), lock.getThreadId());
return false;
}
try {
// 停止续期线程
if (lock.getRenewThread() != null) {
lock.getRenewThread().interrupt();
}
String lockValue = lock.getUuid().toString() + ":" + lock.getThreadId();
JedisClusterPipeline pipeline = jedisClient.getPipelined();
Response<String> response = pipeline.eval(LUA_SCRIPT_UNLOCK,
Collections.singletonList(lock.getKey()),
Collections.singletonList(lockValue) // 使用包含线程ID的lockValue
);
pipeline.sync();
return response != null && Objects.equals(response.get(), "suc");
} catch (Exception e) {
logger.error("Failed to release lock: {}", lock.getKey(), e);
return false;
}
}
}
结语
本篇,我们从0-1,从简单到复杂完整了实现了一个Redis分布式锁,实现的细节难免粗糙,如果有问题请您指出,希望本篇可以给您提供一个思路,谢谢。