个人随笔
目录
七、zookeeper使用场景之:分布式锁
2020-12-09 09:24:53

一、锁的的基本概念

开发中锁的概念并不陌生,通过锁可以实现在多个线程或多个进程间在争抢资源时,能够合理的分配置资源的所有权。在单体应用中我们可以通过 synchronized 或ReentrantLock 来实现锁。但在分布式系统中,仅仅是加synchronized 是不够的,需要借助第三组件来实现。比如一些简单的做法是使用 关系型数据行级锁来实现不同进程之间的互斥,但大型分布式系统的性能瓶颈往往集中在数据库操作上。为了提高性能得采用如Redis、Zookeeper之内的组件实现分布式锁。

二、锁的获取

某银行帐户,可以同时进行帐户信息的读取,但读取其间不能修改帐户数据。其帐户ID为:888

获得读锁流程:

1、基于资源ID创建临时序号读锁节点 /lock/888.R0000000002 Read
2、获取 /lock 下所有子节点,判断其最小的节点是否为读锁,如果是则获锁成功
3、最小节点不是读锁,则阻塞等待。添加lock/ 子节点变更监听。
4、当节点变更监听触发,执行第2步

数据结构:

获得写锁流程:
1、基于资源ID创建临时序号写锁节点 /lock/888.R0000000002 Write
2、获取 /lock 下所有子节点,判断其最小的节点是否为自己,如果是则获锁成功
3、最小节点不是自己,则阻塞等待。添加lock/ 子节点变更监听。
4、当节点变更监听触发,执行第2步
释放锁:
读取完毕后,手动删除临时节点,如果获锁期间宕机,则会在会话失效后自动删除。

三、关于羊群效应

在等待锁获得期间,所有等待节点都在监听 Lock节点,一但lock 节点变更所有等待节点都会被触发,然后在同时反查Lock 子节点。如果等待对例过大会使用Zookeeper承受非常大的流量压力。

为了改善这种情况,可以采用监听链表的方式,每个等待对列只监听前一个节点,如果前一个节点释放锁的时候,才会被触发通知。这样就形成了一个监听链表。

四、代码实例

  1. package com.suibibk.zookeeper;
  2. public class Lock {
  3. private String lockId;
  4. private String path;
  5. private boolean active;
  6. public Lock(String lockId, String path) {
  7. this.lockId = lockId;
  8. this.path = path;
  9. }
  10. public Lock() {
  11. }
  12. public String getLockId() {
  13. return lockId;
  14. }
  15. public void setLockId(String lockId) {
  16. this.lockId = lockId;
  17. }
  18. public String getPath() {
  19. return path;
  20. }
  21. public void setPath(String path) {
  22. this.path = path;
  23. }
  24. public boolean isActive() {
  25. return active;
  26. }
  27. public void setActive(boolean active) {
  28. this.active = active;
  29. }
  30. }
  1. public class ZookeeperLock {
  2. private String server = "192.168.209.4:2181";
  3. private ZkClient zkClient;
  4. private static final String rootPath = "/tuling-lock";
  5. public ZookeeperLock() {
  6. zkClient = new ZkClient(server, 5000, 20000);
  7. buildRoot();
  8. }
  9. // 构建根节点
  10. public void buildRoot() {
  11. if (!zkClient.exists(rootPath)) {
  12. zkClient.createPersistent(rootPath);
  13. }
  14. }
  15. public Lock lock(String lockId, long timeout) {
  16. Lock lockNode = createLockNode(lockId);
  17. lockNode = tryActiveLock(lockNode);// 尝试激活锁
  18. if (!lockNode.isActive()) {
  19. try {
  20. synchronized (lockNode) {
  21. lockNode.wait(timeout);
  22. }
  23. } catch (InterruptedException e) {
  24. throw new RuntimeException(e);
  25. }
  26. }
  27. if (!lockNode.isActive()) {
  28. throw new RuntimeException(" lock timeout");
  29. }
  30. return lockNode;
  31. }
  32. public void unlock(Lock lock) {
  33. if (lock.isActive()) {
  34. zkClient.delete(lock.getPath());
  35. }
  36. }
  37. // 尝试激活锁
  38. private Lock tryActiveLock(Lock lockNode) {
  39. // 判断当前是否为最小节点
  40. List<String> list = zkClient.getChildren(rootPath)
  41. .stream()
  42. .sorted()
  43. .map(p -> rootPath + "/" + p)
  44. .collect(Collectors.toList());
  45. String firstNodePath = list.get(0);
  46. if (firstNodePath.equals(lockNode.getPath())) {
  47. lockNode.setActive(true);
  48. } else {
  49. String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
  50. zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
  51. @Override
  52. public void handleDataChange(String dataPath, Object data) throws Exception {
  53. }
  54. @Override
  55. public void handleDataDeleted(String dataPath) throws Exception {
  56. // 事件处理 与心跳 在同一个线程,如果Debug时占用太多时间,将导致本节点被删除,从而影响锁逻辑。
  57. System.out.println("节点删除:" + dataPath);
  58. Lock lock = tryActiveLock(lockNode);
  59. synchronized (lockNode) {
  60. if (lock.isActive()) {
  61. lockNode.notify();
  62. }
  63. }
  64. zkClient.unsubscribeDataChanges(upNodePath, this);
  65. }
  66. });
  67. }
  68. return lockNode;
  69. }
  70. public Lock createLockNode(String lockId) {
  71. String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "lock");
  72. return new Lock(lockId, nodePath);
  73. }
  74. static int i = 0;
  75. static String lockId = "lockId";
  76. public static void main(String[] args) {
  77. ZookeeperLock lock = new ZookeeperLock();
  78. for (int j = 0; j < 100; j++) {
  79. new Thread(new Runnable() {
  80. @Override
  81. public void run() {
  82. try {
  83. Thread.currentThread().sleep(100l);
  84. } catch (InterruptedException e) {
  85. // TODO Auto-generated catch block
  86. e.printStackTrace();
  87. }
  88. Lock l = lock.createLockNode(lockId);
  89. i=i+1;
  90. System.out.println(Thread.currentThread()+"i="+i);
  91. lock.unlock(l);
  92. }
  93. }).start();
  94. }
  95. }
  96. }

运行上面的例子,查看日志可以看成功锁住了

  1. Thread[Thread-71,5,main]i=1
  2. Thread[Thread-44,5,main]i=2
  3. Thread[Thread-20,5,main]i=3
  4. Thread[Thread-24,5,main]i=4
  5. Thread[Thread-7,5,main]i=5
  6. Thread[Thread-21,5,main]i=6
  7. ...
  8. Thread[Thread-33,5,main]i=91
  9. Thread[Thread-49,5,main]i=92
  10. Thread[Thread-34,5,main]i=93
  11. Thread[Thread-58,5,main]i=94
  12. Thread[Thread-30,5,main]i=95
  13. Thread[Thread-61,5,main]i=96
  14. Thread[Thread-48,5,main]i=97
  15. Thread[Thread-29,5,main]i=98
  16. Thread[Thread-69,5,main]i=99
  17. Thread[Thread-62,5,main]i=100
 374

啊!这个可能是世界上最丑的留言输入框功能~


当然,也是最丑的留言列表

有疑问发邮件到 : suibibk@qq.com 侵权立删
Copyright : 个人随笔   备案号 : 粤ICP备18099399号-2