请选择 进入手机版 | 继续访问电脑版
设为首页 收藏本站
开启辅助访问 切换到宽版

新浪微博登陆

只需一步, 快速开始

QQ登录

只需一步,快速开始

切换风格 立即注册 找回密码

Java教程网

新浪微博达人勋

242

积分

194

帖子

12

贡献

Rank: 3Rank: 3

积分
242

社区QQ达人

发表于 2018-8-3 19:24:42 | 显示全部楼层 |阅读模式
1、Redis分布式锁实现
a、原理
      Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。
  1. public class RedisKeyLock {

  2.     private static Logger logger = Logger.getLogger(RedisKeyLock.class);

  3.     private final static long ACCQUIRE_LOCK_TIMEOUT_IN_MS = 10 * 1000;

  4.     private final static int EXPIRE_IN_SECOND = 5;//锁失效时间

  5.     private final static long WAIT_INTERVAL_IN_MS = 100;

  6.     private static RedisKeyLock lock;

  7.     private JedisPool jedisPool;

  8.     private RedisKeyLock(JedisPool pool){

  9.         this.jedisPool = pool;

  10.     }

  11.     public static RedisKeyLock getInstance(JedisPool pool){

  12.         if(lock == null){

  13.             lock = new RedisKeyLock(pool);

  14.         }

  15.         return lock;

  16.     }



  17.     public void lock(final String redisKey) {

  18.         Jedis resource = null;

  19.         try {

  20.             long now = System.currentTimeMillis();

  21.             resource = jedisPool.getResource();

  22.             long timeoutAt = now + ACCQUIRE_LOCK_TIMEOUT_IN_MS;

  23.             boolean flag = false;

  24.             while (true) {

  25.                 String expireAt = String.valueOf(now + EXPIRE_IN_SECOND * 1000);

  26.                 long ret = resource.setnx(redisKey, expireAt);

  27.                 if (ret == 1) {//已获取锁

  28.                     flag = true;

  29.                     break;

  30.                 } else {//未获取锁,重试获取锁

  31.                     String oldExpireAt = resource.get(redisKey);

  32.                     if (oldExpireAt != null && Long.parseLong(oldExpireAt) < now) {

  33.                         oldExpireAt = resource.getSet(redisKey, expireAt);

  34.                         if (Long.parseLong(oldExpireAt) < now) {

  35.                             flag = true;

  36.                             break;

  37.                         }

  38.                     }

  39.                 }

  40.                 if (timeoutAt < now) {

  41.                     break;

  42.                 }

  43.               TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);

  44.             }

  45.             if (!flag) {

  46.                 throw new RuntimeException("canot acquire lock now ...");

  47.             }

  48.         } catch (JedisException je) {

  49.             logger.error("lock", je);

  50.             je.printStackTrace();

  51.             if (resource != null) {

  52.                 jedisPool.returnBrokenResource(resource);

  53.             }

  54.         } catch (Exception e) {

  55.             e.printStackTrace();

  56.             logger.error("lock", e);

  57.         } finally {

  58.             if (resource != null) {

  59.                 jedisPool.returnResource(resource);

  60.             }

  61.         }

  62.     }

  63.     public boolean unlock(final String redisKey) {

  64.         Jedis resource = null;

  65.         try {

  66.             resource = jedisPool.getResource();

  67.             resource.del(redisKey);

  68.             return true;

  69.         } catch (JedisException je) {

  70.             je.printStackTrace();

  71.             if (resource != null) {

  72.                 jedisPool.returnBrokenResource(resource);

  73.             }

  74.             return false;

  75.         } catch (Exception e) {

  76.             logger.error("lock", e);

  77.             return false;

  78.         } finally {

  79.             if (resource != null) {

  80.                 jedisPool.returnResource(resource);

  81.             }

  82.         }

  83.     }

  84. }
复制代码



c、代码分析
      lock:通过间隔时间段去请求Redis,来实现阻塞占用,一直到获取锁,或者超时。
     unlock:删除redis中key。
2、Zookeeper分布式锁实现
a、原理
     ZooKeeper核心是一个精简的文件系统,它提供了一些简单的文件操作以及附加的功能 ,它的数据结构原型是一棵znode树(类似Linux的文件系统),并且它们是一些已经被构建好的块,可以用来构建大型的协作数据结构和协议 。
  每个锁都需要一个路径来指定(如:/geffzhang/lock)
1.根据指定的路径, 查找zookeeper集群下的这个节点是否存在.(说明已经有锁了)
2. 如果存在, 根据查询者的一些特征数据(如ip地址/hostname), 当前的锁是不是查询者的
3. 如果不是查询者的锁, 则返回null, 说明创建锁失败
4. 如果是查询者的锁, 则把这个锁返回给查询者
5. 如果这个节点不存在, 说明当前没有锁, 那么创建一个临时节点, 并将查询者的特征信息写入这个节点的数据中, 然后返回这个锁.
据以上5部, 一个分布式的锁就可以创建了.
创建的锁有三种状态:
1. 创建失败(null), 说明该锁被其他查询者使用了.’
2. 创建成功, 但当前没有锁住(unlocked), 可以使用
3. 创建成功, 但当前已经锁住(locked)了, 不能继续加锁.
b、代码样例
  1. public class ZooKeeperLock implements Lock, Watcher {



  2.     private Logger logger = LoggerFactory.getLogger(getClass());



  3.     private static final String SPLITSTR = "_lock_";

  4.     private static final int SESSION_TIMEOUT = 60000;//等锁的毫秒数

  5.     private static final byte[] data = new byte[0];





  6.     private ZooKeeper zk = null;



  7.     private String root = "/locks";//根

  8.     private String lockName;//竞争资源的标志

  9.     private String waitNode;//等待前一个锁

  10.     private String myZnode;//当前锁



  11.     private CountDownLatch latch;//计数器



  12.     /**

  13.      * 创建分布式锁,使用前请确认config配置的zookeeper服务可用

  14.      * @param server 127.0.0.1:2181

  15.      * @param lockName 竞争资源标志,lockName中不能包含单词lock

  16.      */

  17.     public ZooKeeperLock(String server, String lockName){

  18.         this.lockName = lockName;

  19.         // 创建一个与服务器的连接

  20.         try {

  21.             zk = initZk(server);

  22.             Stat stat = zk.exists(root, false);

  23.             if(stat == null){

  24.                 // 创建根节点

  25.                 zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

  26.             }

  27.         } catch (Exception e) {

  28.             throw new LockException(e);

  29.         }

  30.     }



  31.     /**

  32.      * zookeeper节点的监视器

  33.      */

  34.     @Override

  35.     public void process(WatchedEvent event) {

  36.         if(this.latch != null) {

  37.             this.latch.countDown();

  38.         }

  39.     }



  40.     @Override

  41.     public void lock() {

  42.         try {

  43.             if(!tryLock()){

  44.                 boolean locked = waitForLock(waitNode, SESSION_TIMEOUT, TimeUnit.MILLISECONDS);//等待锁

  45.                 if(!locked){

  46.                     logger.error("can not lock...");

  47.                 }

  48.             }

  49.         } catch (Exception e) {

  50.             throw new LockException(e);

  51.         }

  52.     }



  53.     public boolean tryLock() {

  54.         try {

  55.             if(lockName.contains(SPLITSTR)){

  56.                 throw new LockException("lockName can not contains \\u000B");

  57.             }

  58.             //创建临时子节点

  59.             myZnode = zk.create(root + "/" + lockName + SPLITSTR, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);



  60.             //取出所有子节点

  61.             List<String> subNodes = zk.getChildren(root, false);

  62.             //取出所有lockName的锁

  63.             List<String> lockObjNodes = new ArrayList<>();

  64.             for (String node : subNodes) {

  65.                 String _node = node.split(SPLITSTR)[0];

  66.                 if(_node.equals(lockName)){

  67.                     lockObjNodes.add(node);

  68.                 }

  69.             }

  70.             Collections.sort(lockObjNodes);



  71.             if(myZnode.equals(root+"/"+lockObjNodes.get(0))){

  72.                 //如果是最小的节点,则表示取得锁

  73.                 return true;

  74.             }

  75.             //如果不是最小的节点,找到比自己小1的节点

  76.             String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);

  77.             waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);

  78.         } catch (Exception e) {

  79.             throw new LockException(e);

  80.         }

  81.         return false;

  82.     }



  83.     @Override

  84.     public boolean tryLock(long time, TimeUnit unit) {

  85.         try {

  86.             return tryLock() || waitForLock(waitNode, time, unit);

  87.         } catch (Exception e) {

  88.             throw new LockException(e);

  89.         }

  90.     }



  91.     private boolean waitForLock(String lower, long waitTime, TimeUnit unit) throws InterruptedException, KeeperException {

  92.         Stat stat = zk.exists(root + "/" + lower, true);

  93.         //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听

  94.         if(stat != null){

  95.             this.latch = new CountDownLatch(1);

  96.             this.latch.await(waitTime, unit);

  97.             this.latch = null;

  98.         }

  99.         return true;

  100.     }



  101.     @Override

  102.     public void unlock() {

  103.         try {

  104.             zk.delete(myZnode,-1);

  105.             myZnode = null;

  106.         } catch (Exception e) {

  107.             throw new LockException(e);

  108.         }

  109.     }



  110.     private synchronized ZooKeeper initZk(String server) {

  111.         try {

  112.             if(zk == null){

  113.                 zk = new ZooKeeper(server, SESSION_TIMEOUT, this);

  114.             }

  115.         } catch (IOException e) {

  116.             throw new LockException("zk init connect fail" + e.getMessage());

  117.         }

  118.         return zk;

  119.     }



  120.     @Override

  121.     public void lockInterruptibly() throws InterruptedException {

  122.         this.lock();

  123.     }



  124.     @Override

  125.     public Condition newCondition() {

  126.         return null;

  127.     }



  128.     private class LockException extends RuntimeException {

  129.         private static final long serialVersionUID = 1L;

  130.         private LockException(String e){

  131.             super(e);

  132.         }

  133.         private LockException(Exception e){

  134.             super(e);

  135.         }

  136.     }

  137. }
复制代码



c、代码分析
      lock:根据根创建锁节点,然后获取当前已经存在锁的节点,如果第一个节点为自己创建,说明没有锁,不是自己加锁,则给自己创建节点的上一个节点加监听,线程阻塞至上一个节点释放,并通知我,或者等待超时。
      unlock:删除自己创建的节点,zookeeper会自动通知加在节点上的监听。
3、两者区别
       Redis分布式锁,必须使用者自己间隔时间轮询去尝试加锁,当锁被释放后,存在多线程去争抢锁,并且可能每次间隔时间去尝试锁的时候,都不成功,对性能浪费很大。
       Zookeeper分布锁,首先创建加锁标志文件,如果需要等待其他锁,则添加监听后等待通知或者超时,当有锁释放,无须争抢,按照节点顺序,依次通知使用者。

来自群组: java开发组

新浪微博达人勋

6155

积分

1538

帖子

1539

贡献

Rank: 8Rank: 8

积分
6155

社区QQ达人

发表于 2018-8-3 19:25:27 | 显示全部楼层
:lol

新浪微博达人勋

6016

积分

1504

帖子

1504

贡献

Rank: 8Rank: 8

积分
6016
发表于 2018-8-5 02:00:07 | 显示全部楼层
前排,哇咔咔

新浪微博达人勋

2567

积分

643

帖子

641

贡献

Rank: 6Rank: 6

积分
2567
发表于 4 天前 | 显示全部楼层
好,很好,非常好!

新浪微博达人勋

6017

积分

1518

帖子

1499

贡献

Rank: 8Rank: 8

积分
6017
发表于 3 天前 | 显示全部楼层
支持,赞一个
您需要登录后才可以回帖 登录 | 立即注册 新浪微博登陆

本版积分规则

关闭

站长推荐 上一条 /1 下一条

小黑屋|手机版|Archiver|Java教程网    

GMT+8, 2018-8-17 23:47 , Processed in 0.515625 second(s), 39 queries .

Powered by Discuz X3.2

© 2001-2013 JAVA教程网

快速回复 返回顶部 返回列表