个人随笔
目录
用Java模拟一个Gossip协议的简单例子
2023-05-03 20:00:34

Gossip是什么

Gossip协议是一个通信协议,一种传播消息的方式,灵感来自于:瘟疫、社交网络等。使用Gossip协议的有:Redis Cluster、Consul、Apache Cassandra等。

原理

Gossip协议基本思想就是:一个节点想要分享一些信息给网络中的其他的一些节点。于是,它周期性的随机选择一些节点,并把信息传递给这些节点。这些收到信息的节点接下来会做同样的事情,即把这些信息传递给其他一些随机选择的节点。一般而言,信息会周期性的传递给N个目标节点,而不只是一个。这个N被称为fanout(这个单词的本意是扇出)。

例子

我们这里假设用N个节点,然后其中某一个节点的数据改变后,所有节点都需要改成相同的(最终相同),举个例子,有10个节点

  1. 12345678910

假设扇出是4,然后节点1的数据变化

第一次扩散、节点1随机从剩下的节点中取四个通知

假设取到的是2,3,4,5,那么第一次扩散就已经有1,2,3,4,5个节点一致了

第二次扩散、节点2,3,4,5分别从剩下的节点中随机取四个

假设2取到的是6,7,8,9;3取到的是7,8,9,10,也有可能2,3,4,5都取到6,7,8,9
那么剩下的节点10就得第三次扩散才能扩散过去。

这里因为每个节点都是异步的,可以不考虑是否扩散成功,所以有些节点可能会多次扩散到,所以我下面的例子获取的扩散次数其实假设只扩散一次,实际应该会比我下面的例子次数多一点,举个例子,redis有多个节点,节点2和节点3准备扩散的时候,同时选了节点4,此时节点4状态还是为扩散的,所以会被扩散2次,如果节点数多,重复扩散数目会更多。

代码

1、主程序GossipTest

  1. public class GossipTest {
  2. public static void main(String[] args) {
  3. //节点数目
  4. int num = 20;
  5. //扇出也就是每次扩散的节点数
  6. int fanout = 4;
  7. List<Node> nodes = new ArrayList<Node>();
  8. for (int i = 1; i <=num; i++) {
  9. Node node = new Node();
  10. node.setFanout(fanout);
  11. node.setIndex(i);
  12. node.setNodes(nodes);
  13. nodes.add(node);
  14. }
  15. //表明这个是第一次传播
  16. ConsistentUtil.map.put(nodes.get(0).getIndex(),0);
  17. nodes.get(0).setData("大家都设置为1");
  18. try {
  19. Thread.sleep(10000);
  20. System.out.println("到结束,总共扩散了"+ConsistentUtil.max+"次"+nodes);
  21. } catch (InterruptedException e) {
  22. }
  23. }
  24. }

作用就是根据节点的数目和扇出的数目初始化相关设置,然后假设第一个节点数据变化,触发了扩散,当然这里的扩散只能用线程模拟,真实情况可能是RPC调用别的服务器的资源。

2、节点Node

  1. /**
  2. * 每一个节点,最终目的是每个节点的数据最终一致
  3. */
  4. class Node{
  5. //节点顺序号
  6. private int index;
  7. //设置扇出
  8. private int fanout;
  9. //节点数据,这里简单的用字符串代替
  10. private String data;
  11. //持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可)
  12. private List<Node> nodes;
  13. public int getIndex() {
  14. return index;
  15. }
  16. public void setIndex(int index) {
  17. this.index = index;
  18. }
  19. public String getData() {
  20. return data;
  21. }
  22. public void setData(String data) {
  23. this.data = data;
  24. //这里设置完消息后需要进行传播消息,这里启动一个线程来模拟
  25. //但是这里可能会导致二次调用,所以这里要锁一下
  26. try {
  27. //这里捕获异常,允许传播失败
  28. new ConsistentUtil(nodes, this).start();
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. public List<Node> getNodes() {
  34. return nodes;
  35. }
  36. public void setNodes(List<Node> nodes) {
  37. this.nodes = nodes;
  38. }
  39. public int getFanout() {
  40. return fanout;
  41. }
  42. public void setFanout(int fanout) {
  43. this.fanout = fanout;
  44. }
  45. @Override
  46. public String toString() {
  47. return "Node [index=" + index + ", data=" + data + "]";
  48. }
  49. }

上面的属性有节点编号,扇出,节点数据,以及所有节点信息,这里就单纯用list来保存所有节点信息了,并且所有节点信息的修改不需要加任何的锁,因为扩散可能是多次的,失败后会有别的节点继续扩散,这样的话就完全解耦合了。,扩散的触发点是如下代码

  1. public void setData(String data) {
  2. this.data = data;
  3. //这里设置完消息后需要进行传播消息,这里启动一个线程来模拟
  4. //但是这里可能会导致二次调用,所以这里要锁一下
  5. try {
  6. //这里捕获异常,允许传播失败
  7. new ConsistentUtil(nodes, this).start();
  8. } catch (Exception e) {
  9. e.printStackTrace();
  10. }
  11. }

但是世界情况,可能是数据改变后,会有个固定的时间去进行扩散,而不会立马,并且不通的应用场景有不通的需求,这里的话暂时接收到改变就立马处理。扩散的方式也是启动一个线程去处理,并且该线程运行失败也不用处理,因为就算这个节点没有扩散成功,会有别的节点扩散成功的,除非扩散的节点真的一直挂掉了,那就是另一个使用场景了。

3、扩散线程

  1. class ConsistentUtil extends Thread{
  2. public static Map<Integer,Integer> map = new ConcurrentHashMap<Integer,Integer>();
  3. public static int max=0;
  4. public static synchronized void compare(int num) {
  5. if(num>max) {
  6. max=num;
  7. //System.out.println("最大循环次数:"+max);
  8. }
  9. }
  10. //持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可)
  11. private List<Node> nodes;
  12. //执行的node
  13. private Node node;
  14. public ConsistentUtil(List<Node> nodes, Node node) {
  15. super();
  16. this.nodes = nodes;
  17. this.node = node;
  18. }
  19. @Override
  20. public void run() {
  21. //1、先检查是否全部都已经一致了
  22. boolean flag = checkConsistent();
  23. if(!flag) {
  24. //随机获取fanout个Node
  25. List<Node> choice = this.getChoiceNodes();
  26. if(choice.size()>0) {
  27. //传播的次数加1
  28. for (Node node : choice) {
  29. //System.out.println(this.node.getIndex()+"向"+node.getIndex()+"进行传播");
  30. //获取上一个节点扩散的次数
  31. Integer num = map.get(this.node.getIndex());
  32. //本次扩散次数加1
  33. map.put(node.getIndex(),num+1);
  34. node.setData(this.node.getData());
  35. }
  36. }
  37. }else {
  38. //这里表示结束了
  39. System.out.println(this.node.getIndex()+"已经结束了");
  40. count();
  41. }
  42. }
  43. private boolean checkConsistent() {
  44. Node pre = null;
  45. for (Node node : nodes) {
  46. if(pre==null) {
  47. pre=node;
  48. }else {
  49. if(pre.getData().equals(node.getData())) {
  50. pre = node;
  51. }else {
  52. //有不一致的
  53. return false;
  54. }
  55. }
  56. }
  57. return true;
  58. }
  59. //选择没有被传播的节点
  60. private List<Node> getChoiceNodes() {
  61. List<Node> noSets = new ArrayList<Node>();
  62. for (Node n : nodes) {
  63. //这个没有传播过,且不是自己,继续传播
  64. if(!this.node.getData().equals(n.getData())&&(this.node.getIndex()!=n.getIndex())) {
  65. noSets.add(n);
  66. }
  67. }
  68. List<Node> choice = new ArrayList<Node>();
  69. //1、获取当前节点数目
  70. int num = noSets.size();
  71. //2、随机获取fanout个下标
  72. if(num<=node.getFanout()) {
  73. //这里就获取全部
  74. return noSets;
  75. }else {
  76. int[] indexs = this.getRandomNums(node.getFanout(), num);
  77. for (int i : indexs) {
  78. choice.add(noSets.get(i));
  79. }
  80. }
  81. return choice;
  82. }
  83. /**
  84. * 从all里面随机产生size个不重复的下标
  85. * @param size 需要产生的下标数
  86. * @param all 范围
  87. * @return
  88. */
  89. private int[] getRandomNums(int size,int all) {
  90. if(size>all) {
  91. System.err.println("size大于all"+size+">"+all);
  92. return null;
  93. }
  94. SecureRandom random = new SecureRandom();
  95. //把all当成一个list
  96. List<Integer> list = new ArrayList<Integer>();
  97. for(int i=0;i<all;i++) {
  98. list.add(i);
  99. }
  100. int[] result = new int[size];
  101. for(int j=0;j<size;j++) {
  102. //随机生成一个下标
  103. int index = random.nextInt(list.size());
  104. //根据下标去取list中的值
  105. result[j]=list.get(index);
  106. //从list移除该值
  107. list.remove(index);
  108. }
  109. return result;
  110. }
  111. //通过map统计次数
  112. private static void count() {
  113. int max = 0;
  114. for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
  115. //Integer mapKey = entry.getKey();
  116. Integer mapValue = entry.getValue();
  117. if(mapValue>max) {
  118. max = mapValue;
  119. }
  120. }
  121. compare(max);
  122. }
  123. }

原理就是,从所有节点中随机获取扇出(fanout)个数据还没有一致的节点,然后修改它们的值。

那么下面就是运行测试数据扇出是4.

节点数目 20 60 180 500 1000 3000 8000 20000
扩散次数 4 6 8 9 10 13 18 18

可以看到,随着节点数的增加,扩散次数其实变化的不大,毕竟扩散是指数级别递增的。

当然我上面的例子,肯定还有很多没考虑的地方,不过大体思路应是这样,只是随手用java模拟一下下而已,有错误麻烦指正下下。
图解Gossip:可能是最有趣的一致性协议(转)

 267

啊!这个可能是世界上最丑的留言输入框功能~


当然,也是最丑的留言列表

有疑问发邮件到 : suibibk@qq.com 侵权立删
Copyright : 个人随笔   备案号 : 粤ICP备18099399号-2