Redis高并发分布式锁详解( 三 )

2.代码配置
@Beanpublic RedissonClient redisson() throws IOException {Config config = Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream());RedissonClient redisson = Redisson.create(config);return redisson;} //或者@Beanpublic Redisson redisson() {// 此为集群模式Config config = new Config();config.useClusterServers().addNodeAddress("redis://127.0.0.1:6379").addNodeAddress("redis://127.0.0.1:6389").addNodeAddress("redis://127.0.0.1:6399").addNodeAddress("redis://127.0.0.1:6369");return (Redisson) Redisson.create(config);}3.业务代码示例
@RequestMapping("/redlock")public String redlock() {RLock lock1 = redisson.getLock("Key1_product_001");RLock lock2 = redisson.getLock("Key2_product_001");RLock lock3 = redisson.getLock("Key3_product_001");/*** 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)*/RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {/*** waitTimeout 尝试获取锁的最大等待时间,超过这个值 , 则认为获取锁失败* leaseTime锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);if (res) {//成功获得锁 , 在这里处理业务}} catch (Exception e) {throw new RuntimeException("lock fail");} finally {//无论如何, 最后都要解锁redLock.unlock();}return "end";}4.分析说明(为什么不推荐用)
1)如果不是集群,为保证高可用 , 要对三个节点都添加了从节点(因为如果没有从节点,线上只要有两个服务宕机了 , 那么这个分布式锁将不再可用)
2)针对三主三从的情况,A线程对redis_1_主 和 redis_2_主 加锁成功,对 redis_3_主 加锁失败 , 则可以获得分布式锁,执行任务 。但是还没同步情况下,redis_1_主宕机,redis_1_从 晋升成功数据丢失,此时B线程来加锁,redis_1_从加锁成功和 redis_3_主 加锁成功,对 redis_2_主 加锁失败,也能获得分布式锁 。【概率不大但还是会存在问题】
3)针对集群如果不搞主从【一旦出现宕机,数据量大,且访问高的话,这里面就存在着缓存雪崩的危机】 , 此外如果集群半数节点宕机,集群会被迫停了,此外如果加锁节点越多 , 加锁效率越低下 。
4)既然原理与zookeeper的差不多而且也损失了高性能的特性,那其实还不如使用zookeeper分布式锁 。
5.原理分析

Redis高并发分布式锁详解

文章插图
6.源码剖析
1)Redisson类#getLock方法
public RLock getLock(String name) {return new RedissonLock(this.connectionManager.getCommandExecutor(), name);}public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();}2)Redisson类#lock方法
public void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}//RedissonLock类#lockInterruptibly方法public void lockInterruptibly() throws InterruptedException {lockInterruptibly(-1, null);}//RedissonLock类#lockInterruptibly方法public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}//先在redis中发布订阅消息,等待用完锁的线程通知RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {//再次尝试获取锁ttl = tryAcquire(leaseTime, unit, threadId);if (ttl == null) {break;}if (ttl >= 0) {//利用 Semaphore 信号量的方式获得许可,但是这种休眠是定时的getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);}}//RedissonLock类#tryAcquire方法//利用future的方式阻塞式等待返回结果private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId));}//RedissonObject类#get方法protected <V> V get(RFuture<V> future) {return commandExecutor.get(future);}//RedissonLock类#subscribe方法protected RFuture<RedissonLockEntry> subscribe(long threadId) {return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());}//PublishSubscribe类#subscribe方法public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);final RPromise<E> newPromise = new RedissonPromise<E>() {@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return semaphore.remove(listenerHolder.get());}};Runnable listener = new Runnable() {@Overridepublic void run() {// 1:判断RedisLockEntry 是否存在E entry = entries.get(entryName);if (entry != null) {entry.aquire();semaphore.release();entry.getPromise().addListener(new TransferListener<E>(newPromise));return;}// 2:创建RedisLockEntryE value = https://www.huyubaike.com/biancheng/createEntry(newPromise);value.aquire();E oldValue = entries.putIfAbsent(entryName, value);if (oldValue != null) {oldValue.aquire();semaphore.release();oldValue.getPromise().addListener(new TransferListener

推荐阅读