本网站(662p.com)打包出售,且带程序代码数据,662p.com域名,程序内核采用TP框架开发,需要联系扣扣:2360248666 /wx:lianweikj
精品域名一口价出售:1y1m.com(350元) ,6b7b.com(400元) , 5k5j.com(380元) , yayj.com(1800元), jiongzhun.com(1000元) , niuzen.com(2800元) , zennei.com(5000元)
需要联系扣扣:2360248666 /wx:lianweikj
从Curator实现分布式锁的源码再到羊群效应
manongba · 372浏览 · 发布于2022-01-14 +关注

一、前言

Curator是一款由Java编写的,操作Zookeeper的客户端工具,在其内部封装了分布式锁、选举等高级功能。

今天主要是分析其实现分布式锁的主要原理,有关分布式锁的一些介绍或其他实现,有兴趣的同学可以翻阅以下文章:

我用了上万字,走了一遍Redis实现分布式锁的坎坷之路,从单机到主从再到多实例,原来会发生这么多的问题_阳阳的博客-CSDN博客

Redisson可重入与锁续期源码分析_阳阳的博客-CSDN博客

在使用Curator获取分布式锁时,Curator会在指定的path下创建一个有序的临时节点,如果该节点是最小的,则代表获取锁成功。

接下来,在准备工作中,我们可以观察是否会创建出一个临时节点出来。

二、准备工作

首先我们需要搭建一个zookeeper集群,当然你使用单机也行。

在这篇文章面试官:能给我画个Zookeeper选举的图吗?,介绍了一种使用docker-compose方式快速搭建zk集群的方式。

在pom中引入依赖:

<dependency> 
         <groupId>org.apache.curator</groupId> 
         <artifactId>curator-recipes</artifactId> 
         <version>2.12.0</version> 
     </dependency>

    Curator客户端的配置项:

    /** 
     * @author qcy 
     * @create 2022/01/01 22:59:34 
     */ 
    @Configuration 
    public class CuratorFrameworkConfig { 
    
        //zk各节点地址 
        private static final String CONNECT_STRING = "localhost:2181,localhost:2182,localhost:2183"; 
        //连接超时时间(单位:毫秒) 
        private static final int CONNECTION_TIME_OUT_MS = 10 * 1000; 
        //会话超时时间(单位:毫秒) 
        private static final int SESSION_TIME_OUT_MS = 30 * 1000; 
        //重试的初始等待时间(单位:毫秒) 
        private static final int BASE_SLEEP_TIME_MS = 2 * 1000; 
        //最大重试次数 
        private static final int MAX_RETRIES = 3; 
    
        @Bean 
        public CuratorFramework getCuratorFramework() { 
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() 
                    .connectString(CONNECT_STRING) 
                    .connectionTimeoutMs(CONNECTION_TIME_OUT_MS) 
                    .sessionTimeoutMs(SESSION_TIME_OUT_MS) 
                    .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES)) 
                    .build(); 
            curatorFramework.start(); 
            return curatorFramework; 
        } 
         
    }

      SESSION_TIME_OUT_MS参数则会保证,在某个客户端获取到锁之后突然宕机,zk能在该时间内删除当前客户端创建的临时有序节点。

      测试代码如下:

      //临时节点路径,qcy是博主名字缩写哈 
         private static final String LOCK_PATH = "/lockqcy"; 
      
         @Resource 
         CuratorFramework curatorFramework; 
      
         public void testCurator() throws Exception { 
             InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, LOCK_PATH); 
             interProcessMutex.acquire(); 
      
             try { 
                 //模拟业务耗时 
                 Thread.sleep(30 * 1000); 
             } catch (Exception e) { 
                 e.printStackTrace(); 
             } finally { 
                 interProcessMutex.release(); 
             } 
         }

        当使用接口调用该方法时,在Thread.sleep处打上断点,进入到zk容器中观察创建出来的节点。

        使用 docker exec -it zk容器名 /bin/bash 以交互模式进入容器,接着使用 ./bin/zkCli.sh 连接到zk的server端。

        然后使用 ls path 查看节点

        这三个节点都是持久节点,可以使用 get path 查看节点的数据结构信息

        若一个节点的ephemeralOwner值为0,即该节点的临时拥有者的会话id为0,则代表该节点为持久节点。

        当走到断点Thread.sleep时,确实发现在lockqcy下创建出来一个临时节点

        到这里吗,准备工作已经做完了,接下来分析interProcessMutex.acquire与release的流程

        三、源码分析

        Curator支持多种类型的锁,例如

        • InterProcessMutex,可重入锁排它锁

        • InterProcessReadWriteLock,读写锁

        • InterProcessSemaphoreMutex,不可重入排它锁

        今天主要是分析InterProcessMutex的加解锁过程,先看加锁过程

        加锁

        public void acquire() throws Exception { 
              if (!internalLock(-1, null)) { 
                  throw new IOException("Lost connection while trying to acquire lock: " + basePath); 
              } 
          }

          这里是阻塞式获取锁,获取不到锁,就一直进行阻塞。所以对于internalLock方法,超时时间设置为-1,时间单位设置成null。

          private boolean internalLock(long time, TimeUnit unit) throws Exception { 
                 Thread currentThread = Thread.currentThread(); 
                 //通过能否在map中取到该线程的LockData信息,来判断该线程是否已经持有锁 
                 LockData lockData = threadData.get(currentThread); 
                 if (lockData != null) { 
                     //进行可重入,直接返回加锁成功 
                     lockData.lockCount.incrementAndGet(); 
                     return true; 
                 } 
                 //进行加锁 
                 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); 
                 if (lockPath != null) { 
                     //加锁成功,保存到map中 
                     LockData newLockData = new LockData(currentThread, lockPath); 
                     threadData.put(currentThread, newLockData); 
                     return true; 
                 } 
          
                 return false; 
             }

            其中threadData是一个map,key线程对象,value为该线程绑定的锁数据。

            LockData中保存了加锁线程owningThread,重入计数lockCount与加锁路径lockPath,例如

            /lockqcy/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005

              private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); 
              
                  private static class LockData { 
                      final Thread owningThread; 
                      final String lockPath; 
                      final AtomicInteger lockCount = new AtomicInteger(1); 
              
                      private LockData(Thread owningThread, String lockPath) { 
                          this.owningThread = owningThread; 
                          this.lockPath = lockPath; 
                      } 
                  }

                进入到internals.attemptLock方法中

                String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { 
                      //开始时间 
                      final long startMillis = System.currentTimeMillis(); 
                      //将超时时间统一转化为毫秒单位 
                      final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; 
                      //节点数据,这里为null 
                      final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; 
                      //重试次数 
                      int retryCount = 0; 
                      //锁路径 
                      String ourPath = null; 
                      //是否获取到锁 
                      boolean hasTheLock = false; 
                      //是否完成 
                      boolean isDone = false; 
                
                      while (!isDone) { 
                          isDone = true; 
                
                          try { 
                              //创建一个临时有序节点,并返回节点路径 
                              //内部调用client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); 
                              ourPath = driver.createsTheLock(client, path, localLockNodeBytes); 
                              //依据返回的节点路径,判断是否抢到了锁 
                              hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); 
                          } catch (KeeperException.NoNodeException e) { 
                              //在会话过期时,可能导致driver找不到临时有序节点,从而抛出NoNodeException 
                              //这里就进行重试 
                              if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { 
                                  isDone = false; 
                              } else { 
                                  throw e; 
                              } 
                          } 
                      } 
                      //获取到锁,则返回节点路径,供调用方记录到map中 
                      if (hasTheLock) { 
                          return ourPath; 
                      } 
                
                      return null; 
                  }

                  接下来,将会在internalLockLoop中利用刚才创建出来的临时有序节点,判断是否获取到了锁。

                  private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { 
                         //是否获取到锁 
                         boolean haveTheLock = false; 
                         boolean doDelete = false; 
                         try { 
                             if (revocable.get() != null) { 
                                 //当前不会进入这里 
                                 client.getData().usingWatcher(revocableWatcher).forPath(ourPath); 
                             } 
                             //一直尝试获取锁 
                             while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) { 
                                 //返回basePath(这里是lockqcy)下所有的临时有序节点,并且按照后缀从小到大排列 
                                 List<String> children = getSortedChildren(); 
                                 //取出当前线程创建出来的临时有序节点的名称,这里就是/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005 
                                 String sequenceNodeName = ourPath.substring(basePath.length() + 1); 
                                 //判断当前节点是否处于排序后的首位,如果处于首位,则代表获取到了锁 
                                 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); 
                                 if (predicateResults.getsTheLock()) { 
                                     //获取到锁之后,则终止循环 
                                     haveTheLock = true; 
                                 } else { 
                                     //这里代表没有获取到锁 
                                     //获取比当前节点索引小的前一个节点 
                                     String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); 
                  
                                     synchronized (this) { 
                                         try { 
                                             //如果前一个节点不存在,则直接抛出NoNodeException,catch中不进行处理,在下一轮中继续获取锁 
                                             //如果前一个节点存在,则给它设置一个监听器,监听它的释放事件 
                                             client.getData().usingWatcher(watcher).forPath(previousSequencePath); 
                                             if (millisToWait != null) { 
                                                 millisToWait -= (System.currentTimeMillis() - startMillis); 
                                                 startMillis = System.currentTimeMillis(); 
                                                 //判断是否超时 
                                                 if (millisToWait <= 0) { 
                                                     //获取锁超时,删除刚才创建的临时有序节点 
                                                     doDelete = true; 
                                                     break; 
                                                 } 
                                                 //没超时的话,在millisToWait内进行等待 
                                                 wait(millisToWait); 
                                             } else { 
                                                 //无限期阻塞等待,监听到前一个节点被删除时,才会触发唤醒操作 
                                                 wait(); 
                                             } 
                                         } catch (KeeperException.NoNodeException e) { 
                                             //如果前一个节点不存在,则直接抛出NoNodeException,catch中不进行处理,在下一轮中继续获取锁 
                                         } 
                                     } 
                                 } 
                             } 
                         } catch (Exception e) { 
                             ThreadUtils.checkInterrupted(e); 
                             doDelete = true; 
                             throw e; 
                         } finally { 
                             if (doDelete) { 
                                 //删除刚才创建出来的临时有序节点 
                                 deleteOurPath(ourPath); 
                             } 
                         } 
                         return haveTheLock; 
                     }

                    判断是否获取到锁的核心逻辑位于getsTheLock中

                    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { 
                         //获取当前节点在所有子节点排序后的索引位置 
                         int ourIndex = children.indexOf(sequenceNodeName); 
                         //判断当前节点是否处于子节点中 
                         validateOurIndex(sequenceNodeName, ourIndex); 
                         //InterProcessMutex的构造方法,会将maxLeases初始化为1 
                         //ourIndex必须为0,才能使得getsTheLock为true,也就是说,当前节点必须是basePath下的最小节点,才能代表获取到了锁 
                         boolean getsTheLock = ourIndex < maxLeases; 
                         //如果获取不到锁,则返回上一个节点的名称,用作对其设置监听 
                         String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); 
                    
                         return new PredicateResults(pathToWatch, getsTheLock); 
                     } 
                    
                     static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException { 
                         if (ourIndex < 0) { 
                             //可能会由于连接丢失导致临时节点被删除,因此这里属于保险措施 
                             throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName); 
                         } 
                     }

                      那什么时候,在internalLockLoop处于wait的线程能被唤醒呢?

                      在internalLockLoop方法中,已经使用

                      client.getData().usingWatcher(watcher).forPath(previousSequencePath);

                        给前一个节点设置了监听器,当该节点被删除时,将会触发watcher中的回调

                                                           java
                                                         

                        相关推荐

                        PHP实现部分字符隐藏

                        沙雕mars · 1312浏览 · 2019-04-28 09:47:56
                        Java中ArrayList和LinkedList区别

                        kenrry1992 · 896浏览 · 2019-05-08 21:14:54
                        Tomcat 下载及安装配置

                        manongba · 957浏览 · 2019-05-13 21:03:56
                        JAVA变量介绍

                        manongba · 953浏览 · 2019-05-13 21:05:52
                        什么是SpringBoot

                        iamitnan · 1077浏览 · 2019-05-14 22:20:36
                        加载中

                        0评论

                        评论
                        分类专栏
                        小鸟云服务器
                        扫码进入手机网页