云南昆明做网站,苏州网站建设名字,成都建设银行官方网站,正规的徐州网站开发1.基于Redis实现分布式锁#xfeff; Redis分布式锁原理如上图所示#xff0c;当有多个Set命令发送到Redis时#xff0c;Redis会串行处理#xff0c;最终只有一个Set命令执行成功#xff0c;从而只有一个线程加锁成功
2.SetNx命令加锁
利用Redis的setNx命令在Redis数据库…1.基于Redis实现分布式锁 Redis分布式锁原理如上图所示当有多个Set命令发送到Redis时Redis会串行处理最终只有一个Set命令执行成功从而只有一个线程加锁成功
2.SetNx命令加锁
利用Redis的setNx命令在Redis数据库中创建一个KeyValue记录这条命令只有当Redis中没有这个Key的时候才执行成功当已经有这个Key的时候会返回失败。
可以借助于redis中的命令setnx(key, value)key不存在就新增存在就什么都不做。同时有多个客户端发送setnx命令只有一个客户端可以成功返回1true其他的客户端返回0false流程图如下图所示
多个客户端同时尝试获取锁setnx获取成功执行业务逻辑执行完成释放锁del其他客户端等待重试 利用如上的setNx命令便可以简单的实现加锁功能当多个线程去执行这个加锁命令时只有一个线程执行成功然后执行业务逻辑其他线程加锁失败返回或者重试
public void testLock() {// 1. 从redis中获取锁,setnxBoolean lock this.redisTemplate.opsForValue().setIfAbsent(lock, 111);if (lock) {// 查询redis中的num值String value this.redisTemplate.opsForValue().get(num);// 没有该值returnif (StringUtils.isBlank(value)){return ;}// 有值就转成成intint num Integer.parseInt(value);// 把redis中的num值1this.redisTemplate.opsForValue().set(num, String.valueOf(num));// 2. 释放锁 delthis.redisTemplate.delete(lock);} else {// 3. 每隔1秒钟回调一次再次尝试获取锁try {Thread.sleep(1000);testLock();} catch (InterruptedException e) {e.printStackTrace();}}
}
3.优化分布式锁_设置过期时间
设置过期有俩种方式可以选择
通过expire设置过期时间缺乏原子性如果在setnx和expire之间出现异常锁也无法释放在set时指定过期时间推荐 代码实现优化就是在设置锁的时候设置过期时间:
public void testLock() {// 1. 从redis中获取锁,setnxBoolean lock this.redisTemplate.opsForValue().setIfAbsent(lock, 111,3, TimeUnit.MINUTES);if (lock) {//与之前相同代码略过...}
}那么还会不会存在问题呢? 场景如果业务逻辑的执行时间是7s。执行流程如下:
index1业务逻辑没执行完3秒后锁被自动释放。index2获取到锁执行业务逻辑3秒后锁被自动释放。index3获取到锁执行业务逻辑index1业务逻辑执行完成开始调用del释放锁这时释放的是index3的锁导致index3的业务只执行1s就被别人释放。最终等于没锁的情况。
解决setnx获取锁时设置一个指定的唯一值例如uuid释放前获取这个值判断是否自己的锁。
4.错误删除锁问题
上面直接删除key来解锁方式会存在一个问题考虑下面这种情况
1线程1执行业务时间过长导致自己加的锁过期
2这时线程2进来加锁成功
3然后线程1业务逻辑执行完毕开始执行del key命令
4这时就会出现错误删除线程2加的锁
5错误删除线程2的锁后线程3又可以加锁成功导致有两个线程执行业务代码 5.优化分布式锁_防止误删除 public void testLock() {// 1. 从redis中获取锁,setnxString uuid UUID.randomUUID().toString();Boolean lock this.redisTemplate.opsForValue().setIfAbsent(lock, uuid,3, TimeUnit.MINUTES);if (lock) {//与之前相同代码略过...// 2. 释放锁 delif (StringUtils.equals(redisTemplate.opsForValue().get(lock),uuid)){this.redisTemplate.delete(lock);}}
}场景
index1执行删除时查询到的lock值确实和uuid相等index1执行删除前lock刚好过期时间已到被redis自动释放index2获取了lockindex1执行删除此时会把index2的lock删除问题缺乏原子性
上面的setNx命令实现了基本的加锁功能但存在一个致命的问题是当程序在执行业务代码崩溃时无法再执行到下面的解锁指令从而导致出现死锁问题 为了解决死锁问题这里就需要引入过期时间的概念过期时间是给当前这个key设置一定的存活时间当存活时间到期后Redis就会自动删除这个过期的Key从而使得程序在崩溃时也能到期自动释放锁 如上图所示使用Redis的expire命令来为锁设置过期时间从而实现到期自动解锁的功能但这里仍然还存在一个问题就是加锁与给锁设置过期时间这两个操作命令并不是原子命令
考虑下面这种情况
当程序在加锁完成后在设置过期时间前崩溃这时仍然会造成锁无法自动释放从而产生死锁现象。
6.优化分布式锁_LUA脚本保证删除的原子性
首先我们先简单介绍一下lua脚本的基本知识lua脚本是c语言 定义变量 全局变量a 11局部变量local b 22redis不允许lua脚本创建全局变量只能声明局部变量 流程控制:if(exp) then业务逻辑elseif(exp) then业务逻辑else业务逻辑end redis中执行lua脚本: eval script numkeys keys[] args[] eval指令的输出不是lua脚本的打印而是lua脚本的返回值 scriptlua脚本字符串定义动态变量KEYS[1] ARGV[1] numkeyskey数组的元素个数 keyskeys数组 argsargv数组 redis集群执行lua脚本可能会报错如果所有keys不在同一个分片上lua脚本就会报错解决方案是: keys只传一个 可以使用CLUSTER KEYSLOT bb{xx}删除LUA脚本
if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 endpublic void testLock() {// 1. 从redis中获取锁,setnxString uuid UUID.randomUUID().toString();Boolean lock this.redisTemplate.opsForValue().setIfAbsent(lock, uuid, 3, TimeUnit.SECONDS);if (lock) {//与之前相同代码略过...// 2. 释放锁 delString script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end;this.redisTemplate.execute(new DefaultRedisScript(script), Arrays.asList(lock), uuid);} else {// 3. 每隔1秒钟回调一次再次尝试获取锁try {Thread.sleep(1000);testLock();} catch (InterruptedException e) {e.printStackTrace();}}
}7.优化分布式锁_可以重入
上述加锁命令使用了 SETNX 一旦键存在就无法再设置成功这就导致后续同一线程内继续加锁将会加锁失败。当一个线程执行一段代码成功获取锁之后继续执行时又遇到加锁的子任务代码可重入性就保证线程能继续执行而不可重入就是需要等待锁释放之后再次获取锁成功才能继续往下执行。 可重入锁最大特性就是计数计算加锁的次数。所以当可重入锁需要在分布式环境实现时我们也就需要统计加锁次数。我们基于Redis Hash 实现方案 Redis 提供了 Hash 哈希表这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数然后利用 lua 脚本判断逻辑。
加锁
if (redis.call(exists, KEYS[1]) 0 or redis.call(hexists, KEYS[1], ARGV[1]) 1)
thenredis.call(hincrby, KEYS[1], ARGV[1], 1);redis.call(expire, KEYS[1], ARGV[2]);return 1;
elsereturn 0;
end假设值为KEYS:[lock], ARGV[uuid, expire]
如果锁不存在或者这是自己的锁就通过hincrby不存在新增存在就加1获取锁或者锁次数加1。 代码实例如下
private Boolean tryLock(String lockName, String uuid, Long expire){String script if (redis.call(exists, KEYS[1]) 0 or redis.call(hexists, KEYS[1], ARGV[1]) 1) then redis.call(hincrby, KEYS[1], ARGV[1], 1); redis.call(expire, KEYS[1], ARGV[2]); return 1; else return 0; end;if (!this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName), uuid, expire.toString())){try {// 没有获取到锁重试Thread.sleep(200);tryLock(lockName, uuid, expire);} catch (InterruptedException e) {e.printStackTrace();}}// 获取到锁返回truereturn true;
}解锁
lua复制代码-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在在尝试解其他线程的锁解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if (redis.call(hexists, KEYS[1], ARGV[1]) 0) thenreturn nil;
end;
-- 小于等于 0 代表可以解锁
if (redis.call(hincrby, KEYS[1], ARGV[1], -1) 0) thenreturn 0;
elseredis.call(del, KEYS[1]);return 1;
end;这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中三个返回值含义如下
1 代表解锁成功锁被释放0 代表可重入次数被减 1null 代表其他线程尝试解锁解锁失败如果返回值使用 BooleanSpring-data-redis 进行类型转换时将会把 null 转为 false这就会影响我们逻辑判断所以返回类型只好使用 Long。
private void unlock(String lockName, String uuid){String script if (redis.call(hexists, KEYS[1], ARGV[1]) 0) then return nil; end; if (redis.call(hincrby, KEYS[1], ARGV[1], -1) 0) then return 0; else redis.call(del, KEYS[1]); return 1; end;;// 这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中三个返回值含义如下// 1 代表解锁成功锁被释放// 0 代表可重入次数被减 1// null 代表其他线程尝试解锁解锁失败Long result this.redisTemplate.execute(new DefaultRedisScript(script, Long.class), Lists.newArrayList(lockName), uuid);// 如果未返回值代表尝试解其他线程的锁if (result null) {throw new IllegalMonitorStateException(attempt to unlock lock, not locked by lockName: lockName with request: uuid);}
}使用
public void testLock() {// 加锁String uuid UUID.randomUUID().toString();Boolean lock this.tryLock(lock, uuid, 300l);if (lock) {// 读取redis中的num值String numString this.redisTemplate.opsForValue().get(num);if (StringUtils.isBlank(numString)) {return;}// 操作Integer num Integer.parseInt(numString);num;// 放入redisthis.redisTemplate.opsForValue().set(num, String.valueOf(num));// 测试可重入性this.testSubLock(uuid);// 释放锁this.unlock(lock, uuid);}
}
// 测试可重入性
private void testSubLock(String uuid){// 加锁Boolean lock this.tryLock(lock, uuid, 300l);if (lock) {System.out.println(分布式可重入锁。。。);this.unlock(lock, uuid);}
}8.优化分布式锁_自动续期
A线程超时时间设为10s为了解决死锁问题但代码执行时间可能需要30s然后redis服务端10s后将锁删除。 此时B线程恰好申请锁redis服务端不存在该锁可以申请也执行了代码。那么问题来了 A、B线程都同时获取到锁并执行业务逻辑这与分布式锁最基本的性质相违背在任意一个时刻只有一个客户端持有锁即独享排他。 对于上述的这种情况原因是由于设置的过期时间太短或者业务执行时间太长导致锁过期但是为了避免死锁问题又必须设置过期时间那这就需要引入自动续期的功能即在加锁成功时开启一个定时任务自动刷新Redis加锁key的超时时间从而避免上诉情况发生如下图所示 锁延期方法开启子线程执行延期。在加锁成功后可以启动一个定时任务来对锁进行自动续期定时任务的执行逻辑是
1判断Redis中的锁是否是自己的
2如果存在的话就使用expire命令重新设置过期时间
这里由于需要两个Redis的命令所以也需要使用lua脚本来实现原子操作代码如下所示
/*** 锁延期* 线程等待超时时间的2/3时间后,执行锁延时代码,直到业务逻辑执行完毕,因此在此过程中,其他线程无法获取到锁,保证了线程安全性* param lockName* param expire 单位毫秒*/
private void renewTime(String lockName, String uuid, Long expire){String script if(redis.call(hexists, KEYS[1], ARGV[1]) 1) then redis.call(expire, KEYS[1], ARGV[2]); return 1; else return 0; end;new Thread(() - {while (this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Lists.newArrayList(lockName), uuid, expire.toString())){try {// 到达过期时间的2/3时间自动续期Thread.sleep(expire / 3);} catch (InterruptedException e) {e.printStackTrace();}}}).start();
}获取锁成功后调用延期方法给锁 定时延期
private Boolean tryLock(String lockName, String uuid, Long expire){String script if (redis.call(exists, KEYS[1]) 0 or redis.call(hexists, KEYS[1], ARGV[1]) 1) then redis.call(hincrby, KEYS[1], ARGV[1], 1); redis.call(expire, KEYS[1], ARGV[2]); return 1; else return 0; end;if (!this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName), uuid, expire.toString())){try {// 没有获取到锁重试Thread.sleep(200);tryLock(lockName, uuid, expire);} catch (InterruptedException e) {e.printStackTrace();}}// 锁续期this.renewTime(lockName, uuid, expire * 1000);// 获取到锁返回truereturn true;
}9.优化分布式锁_Redlock算法
redis集群状态下的问题
客户端A从master获取到锁在master将锁同步到slave之前master宕掉了。slave节点被晋级为master节点客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。 安全失效 解决集群下锁失效参照redis官方网站针对redlock文档redis.io/topics/dist…
10.本地锁会出现的问题
我们知道java中有synchronized、lock锁、读写锁ReadWriteLock众所周知这些锁都是本地锁。 提到锁就不得不提JUC:java.util.concurrent包,又称concurrent包。jdk1.5提供为多线程高并发编程而提供的包,但此文章的场景是分布式场景后续会出JUC的文章。
简单的介绍一下synchronized及lock锁 synchronized是一个关键字lock是一个接口ReentrantLock是实现了lock接口的一个类 ReentrantLock悲观的独占的互斥的排他的可公平可不公平的可重入锁 synchronized悲观的独占的互斥的排他的非公平的可重入锁
11.准备
redis、ab工具(压测)
11.1不使用任何锁的情况下
我们首先创建一个测试方法testNoLock
GetMapping(/test)
public void testNoLock(){String count (String) this.redisTemplate.opsForValue().get(count);if (count null){//没有值直接返回return;}// 有值就转成成intint number Integer.parseInt(count);// 把redis中的num值1this.redisTemplate.opsForValue().set(count, String.valueOf(number));
}测试之前的查看值为1
GetMapping(/getCount)
public String getCount(){String count String.valueOf(this.redisTemplate.opsForValue().get(count));return count; //1
}接下来使用ab压力测试工具
cmd复制代码// ab -n一次发送的请求数 -c请求的并发数 访问路径
ab -n100 -c50 http://127.0.0.1:8080/test/test再次查询结果为6。 11.2使用本地锁
public synchronized void testNoLock(){String count String.valueOf(this.redisTemplate.opsForValue().get(count));if (null.equals(count)){//没有值直接返回return;}// 有值就转成成intint number Integer.parseInt(count);// 把redis中的num值1this.redisTemplate.opsForValue().set(count, String.valueOf(number));}再次使用ab压力测试工具
ab -n100 -c50 http://127.0.0.1:8080/test/test此次结果为106说明结果是正确的看样子结果是非常完美的但是真的很完美吗 11.3使用集群本地锁
我们只需要在idea中在启动俩个服务修改端口号三个运行实例的名称是相同的并且网关的配置就是通过服务名在负载均衡所以我们只需要访问网关网关就会给我们做负载均衡了。 再次使用ab压力测试工具将count重置为1
cmd复制代码ab -n100 -c50 http://127.0.0.1:8080/test/test此次的结果为58 到此说明了本地锁是有局限性的。
12.可重入锁
对于一个功能完整的锁来说可重入功能是必不可少的特性所谓的锁可重入就是同一个线程第一次加锁成功后在第二次加锁时无需进行排队等待只需要判断是否是自己的锁就行了可以直接再次获取锁来执行业务逻辑如下图所示 实现可重入机制的原理就是在加锁的时候记录加锁次数在释放锁的时候减少加锁次数这个加锁的次数记录可以存在Redis中如下图所示 如上图所示加入可重入功能后加锁的步骤就变为如下步骤
1判断锁是否存在
2判断锁是否是自己的
3增加加锁的次数
由于增加次数以及减少次数是多个操作这里需要再次使用lua脚本来实现同时由于这里需要在Redis中存入加锁的次数所以需要使用到Redis中的Map数据结构Map(key,uuid,lockCount)加锁lua脚本如下
//锁不存在
if (redis.call(exists, key) 0) thenredis.call(hset, key, uuid, 1); redis.call(expire, key, time); return 1;
end;
//锁存在判断是否是自己的锁
if (redis.call(hexists, key, uuid) 1) thenredis.call(hincrby, key, uuid, 1); redis.call(expire, key, uuid);return 1;
end;
//锁不是自己的返回加锁失败
return 0
加入可重入功能后的解锁逻辑就变为
1判断锁是否是自己的
2如果是自己的则减少加锁次数否则返回解锁失败
//判断锁是否是自己的,不是自己的直接返回错误
if (redis.call(hexists, keyuuid) 0) thenreturn 0;
end;
//锁是自己的则对加锁次数-1
local counter redis.call(hincrby, key, uuid, -1);
if (counter 0) then //剩余加锁次数大于0则不能释放锁重新设置过期时间redis.call(expire, key, uuid); return 1;
else
//等于0代表可以释放锁了redis.call(del, key); return 1;
end;
到此在实现基本的加锁与解锁的逻辑上又加入了可重入和自动续期的功能。
13.Zookeeper实现分布式锁
Zookeeper是一个分布式协调服务分布式协调主要是来解决分布式系统中多个应用之间的数据一致性,Zookeeper内部的数据存储方式类似于文件目录形式的存储结构,它的内存结果如下图所示 14.Zookeeper加锁原理
在Zookeeper中的指定路径下创建节点然后客户端根据当前路径下的节点状态来判断是否加锁成功如下图一种情况为例线程1创建节点成功后线程2再去创建节点就会创建失败 15.Zookeeper节点类型
持久节点在Zookeeper中创建后会进行持久储存直到客户端主动删除
临时节点以客户端会话Session维度创建节点一旦客户端会话断开节点就会自动删除
临时/持久顺序节点在同一个路径下创建的节点会对每个节点按创建先后顺序编号 zookeeper.exists(/watchpath,new Watcher() {Overridepublic void process(WatchedEvent event) {System.out.println(进入监听器);System.out.println(监听路径Pathevent.getPath());System.out.println(监听事件类型EventTypeevent.getType()); }
});
16.利用临时顺序节点和监听机制来实现分布式锁
实现分布式锁的方式有多种我们可以使用临时节点和顺序节点这种方案来实现分布式锁
1使用临时节点可以在客户端程序崩溃时自动释放锁避免死锁问题
2使用顺序节点的好处是可以利用锁释放的事件监听机制来实现阻塞监听式的分布式锁
下面将基于这两个特性来实现分布式锁
17.加锁原理
1首先在Zookeeper上创建临时顺序节点Node01、Node02等
2第二步客户端拿到加锁路径下所有创建的节点
3判断自己的序号是否最小如果最小的话代表加锁成功如果不是最小的话就对前一个节点创建监听器
4如果前一个节点删除监听器就会通知客户端来准备重新获取锁
加锁原理和代码入下图所示 //加锁路径
String lockPath;
//用来阻塞线程
CountDownLatch cc new CountDownLatch(1);
//创建锁节点的路径
Sting LOCK_ROOT_PATH /locks//先创建锁
public void createLock(){//lockPath /locks/lock_01 lockPath zkClient.create(LOCK_ROOT_PATH/lock_, CreateMode.EPHEMERAL_SEQUENTIAL);
}//获取锁
public boolean acquireLock(){//获取当前加锁路径下所有的节点allLocks zkClient.getChildren(/locks);//按节点顺序大小排序Collections.sort(allLocks);//判断自己是否是第一个节点int index allLocks.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() 1));//如果是第一个节点则加锁成功if (index 0) {System.out.println(Thread.currentThread().getName() 获得锁成功, lockPath: lockPath);return true;} else {//不是序号最小的节点则监听前一个节点String preLock allLocks.get(index - 1);//创建监听器Stat status zkClient.exists(LOCK_ROOT_PATH / preLockPath, watcher);// 前一个节点不存在了则重新获取锁if (status null) {return acquireLock();} else { //阻塞当前进程直到前一个节点释放锁System.out.println( 等待前一个节点锁释放prelocakPathpreLockPath);//唤醒当前线程继续尝试获取锁cc.await();return acquireLock();}}
}private Watcher watcher new Watcher() {Overridepublic void process(WatchedEvent event) {//监听到前一个节点释放锁唤醒当前线程cc.countDown();}
}
18.可重入锁实现
Zookeeper实现可重入分布式锁的机制是在本地维护一个Map记录因为如果在Zookeeper节点维护数据的话Zookeeper的写操作是很慢集群内部需要进行投票同步数据所以在本地维护一个Map记录来记录当前加锁的次数和加锁状态在释放锁的时候减少加锁的次数原理如下图所示 //利用Map记录线程持有的锁
ConcurrentMapThread, LockData lockMap Maps.newConcurrentMap();
public Boolean lock(){Thread currentThread Thread.currentThread();LockData lockData lockMap.get(currentThread);//LockData不为空则说明已经有锁if (lockData ! null) {//加锁次数加一lockData.lockCount.increment();return true;}//没有锁则尝试获取锁Boolean lockResult acquireLock();//获取到锁if (lockResult) {LockData newLockData new LockData(currentThread,1);lockMap.put(currentThread, newLockData);return true;}//获取锁失败return false;
}
19.解锁原理
解锁的步骤如下
1判断锁是不是自己的
2如果是则减少加锁次数
3如果加锁次数等于0则释放锁删除掉创建的临时节点下一个监听这个节点的客户端会感知到节点删除事件从而重新去获取锁 public Boolean releaseLock(){LockData lockData lockMap.get(currentThread);//没有锁if(lockData null){return false; }//有锁则加锁次数减一lockCount lockData.lockCount.decrement();if(lockCount 0){return true;} //加锁次数为0try{//删除节点zkClient.delete(lockPath);//断开连接zkClient.close();finally{//删除加锁记录lockMap.remove(currentThread);}return true;
}
20.Redis和Zookeeper锁对比 Redis Zookeeper 读性能 基于内存 基于内存 加锁性能 直接写内存加锁 Master节点创建好后与其他Follower节点进行同步,半数成功后才能返回写入成功 数据一致性 AP架构Redis集群之间的数据同步是存在一定的延迟的当主节点宕机后数据如果还没有同步到从节点上就会导致分布式锁失效会造成数据的不一致 CP架构当Leader节点宕机后会进行集群重新选举如果此时只有一部分节点收到了数据的话会在集群内进行数据同步保证集群数据的一致性
21.总结
使用Redis还是Zookeeper来实现分布式锁最终还是要基于业务来决定可以参考以下两种情况
1如果业务并发量很大Redis分布式锁高效的读写性能更能支持高并发。
2如果业务要求锁的强一致性那么使用Zookeeper可能是更好的选择。
3在做技术选型的时候也应该酌情考虑团队成员技能及现有资源情况如果部署有Redsi集群克优先考虑使用Redis。
性能角度redis zk mysql安全角度zk redis mysql难易程度zk redis mysql