Gossip是什么
Gossip协议是一个通信协议,一种传播消息的方式,灵感来自于:瘟疫、社交网络等。使用Gossip协议的有:Redis Cluster、Consul、Apache Cassandra等。
原理
Gossip协议基本思想就是:一个节点想要分享一些信息给网络中的其他的一些节点。于是,它周期性的随机选择一些节点,并把信息传递给这些节点。这些收到信息的节点接下来会做同样的事情,即把这些信息传递给其他一些随机选择的节点。一般而言,信息会周期性的传递给N个目标节点,而不只是一个。这个N被称为fanout(这个单词的本意是扇出)。
例子
我们这里假设用N个节点,然后其中某一个节点的数据改变后,所有节点都需要改成相同的(最终相同),举个例子,有10个节点
1,2,3,4,5,6,7,8,9,10
假设扇出是4,然后节点1的数据变化
第一次扩散、节点1随机从剩下的节点中取四个通知
假设取到的是2,3,4,5,那么第一次扩散就已经有1,2,3,4,5个节点一致了
第二次扩散、节点2,3,4,5分别从剩下的节点中随机取四个
假设2取到的是6,7,8,9;3取到的是7,8,9,10,也有可能2,3,4,5都取到6,7,8,9
那么剩下的节点10就得第三次扩散才能扩散过去。
这里因为每个节点都是异步的,可以不考虑是否扩散成功,所以有些节点可能会多次扩散到,所以我下面的例子获取的扩散次数其实假设只扩散一次,实际应该会比我下面的例子次数多一点,举个例子,redis有多个节点,节点2和节点3准备扩散的时候,同时选了节点4,此时节点4状态还是为扩散的,所以会被扩散2次,如果节点数多,重复扩散数目会更多。
代码
1、主程序GossipTest
public class GossipTest {
public static void main(String[] args) {
//节点数目
int num = 20;
//扇出也就是每次扩散的节点数
int fanout = 4;
List<Node> nodes = new ArrayList<Node>();
for (int i = 1; i <=num; i++) {
Node node = new Node();
node.setFanout(fanout);
node.setIndex(i);
node.setNodes(nodes);
nodes.add(node);
}
//表明这个是第一次传播
ConsistentUtil.map.put(nodes.get(0).getIndex(),0);
nodes.get(0).setData("大家都设置为1");
try {
Thread.sleep(10000);
System.out.println("到结束,总共扩散了"+ConsistentUtil.max+"次"+nodes);
} catch (InterruptedException e) {
}
}
}
作用就是根据节点的数目和扇出的数目初始化相关设置,然后假设第一个节点数据变化,触发了扩散,当然这里的扩散只能用线程模拟,真实情况可能是RPC调用别的服务器的资源。
2、节点Node
/**
* 每一个节点,最终目的是每个节点的数据最终一致
*/
class Node{
//节点顺序号
private int index;
//设置扇出
private int fanout;
//节点数据,这里简单的用字符串代替
private String data;
//持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可)
private List<Node> nodes;
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
//这里设置完消息后需要进行传播消息,这里启动一个线程来模拟
//但是这里可能会导致二次调用,所以这里要锁一下
try {
//这里捕获异常,允许传播失败
new ConsistentUtil(nodes, this).start();
} catch (Exception e) {
e.printStackTrace();
}
}
public List<Node> getNodes() {
return nodes;
}
public void setNodes(List<Node> nodes) {
this.nodes = nodes;
}
public int getFanout() {
return fanout;
}
public void setFanout(int fanout) {
this.fanout = fanout;
}
@Override
public String toString() {
return "Node [index=" + index + ", data=" + data + "]";
}
}
上面的属性有节点编号,扇出,节点数据,以及所有节点信息,这里就单纯用list来保存所有节点信息了,并且所有节点信息的修改不需要加任何的锁,因为扩散可能是多次的,失败后会有别的节点继续扩散,这样的话就完全解耦合了。,扩散的触发点是如下代码
public void setData(String data) {
this.data = data;
//这里设置完消息后需要进行传播消息,这里启动一个线程来模拟
//但是这里可能会导致二次调用,所以这里要锁一下
try {
//这里捕获异常,允许传播失败
new ConsistentUtil(nodes, this).start();
} catch (Exception e) {
e.printStackTrace();
}
}
但是世界情况,可能是数据改变后,会有个固定的时间去进行扩散,而不会立马,并且不通的应用场景有不通的需求,这里的话暂时接收到改变就立马处理。扩散的方式也是启动一个线程去处理,并且该线程运行失败也不用处理,因为就算这个节点没有扩散成功,会有别的节点扩散成功的,除非扩散的节点真的一直挂掉了,那就是另一个使用场景了。
3、扩散线程
class ConsistentUtil extends Thread{
public static Map<Integer,Integer> map = new ConcurrentHashMap<Integer,Integer>();
public static int max=0;
public static synchronized void compare(int num) {
if(num>max) {
max=num;
//System.out.println("最大循环次数:"+max);
}
}
//持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可)
private List<Node> nodes;
//执行的node
private Node node;
public ConsistentUtil(List<Node> nodes, Node node) {
super();
this.nodes = nodes;
this.node = node;
}
@Override
public void run() {
//1、先检查是否全部都已经一致了
boolean flag = checkConsistent();
if(!flag) {
//随机获取fanout个Node
List<Node> choice = this.getChoiceNodes();
if(choice.size()>0) {
//传播的次数加1
for (Node node : choice) {
//System.out.println(this.node.getIndex()+"向"+node.getIndex()+"进行传播");
//获取上一个节点扩散的次数
Integer num = map.get(this.node.getIndex());
//本次扩散次数加1
map.put(node.getIndex(),num+1);
node.setData(this.node.getData());
}
}
}else {
//这里表示结束了
System.out.println(this.node.getIndex()+"已经结束了");
count();
}
}
private boolean checkConsistent() {
Node pre = null;
for (Node node : nodes) {
if(pre==null) {
pre=node;
}else {
if(pre.getData().equals(node.getData())) {
pre = node;
}else {
//有不一致的
return false;
}
}
}
return true;
}
//选择没有被传播的节点
private List<Node> getChoiceNodes() {
List<Node> noSets = new ArrayList<Node>();
for (Node n : nodes) {
//这个没有传播过,且不是自己,继续传播
if(!this.node.getData().equals(n.getData())&&(this.node.getIndex()!=n.getIndex())) {
noSets.add(n);
}
}
List<Node> choice = new ArrayList<Node>();
//1、获取当前节点数目
int num = noSets.size();
//2、随机获取fanout个下标
if(num<=node.getFanout()) {
//这里就获取全部
return noSets;
}else {
int[] indexs = this.getRandomNums(node.getFanout(), num);
for (int i : indexs) {
choice.add(noSets.get(i));
}
}
return choice;
}
/**
* 从all里面随机产生size个不重复的下标
* @param size 需要产生的下标数
* @param all 范围
* @return
*/
private int[] getRandomNums(int size,int all) {
if(size>all) {
System.err.println("size大于all"+size+">"+all);
return null;
}
SecureRandom random = new SecureRandom();
//把all当成一个list
List<Integer> list = new ArrayList<Integer>();
for(int i=0;i<all;i++) {
list.add(i);
}
int[] result = new int[size];
for(int j=0;j<size;j++) {
//随机生成一个下标
int index = random.nextInt(list.size());
//根据下标去取list中的值
result[j]=list.get(index);
//从list移除该值
list.remove(index);
}
return result;
}
//通过map统计次数
private static void count() {
int max = 0;
for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
//Integer mapKey = entry.getKey();
Integer mapValue = entry.getValue();
if(mapValue>max) {
max = mapValue;
}
}
compare(max);
}
}
原理就是,从所有节点中随机获取扇出(fanout)个数据还没有一致的节点,然后修改它们的值。
那么下面就是运行测试数据扇出是4.
节点数目 | 20 | 60 | 180 | 500 | 1000 | 3000 | 8000 | 20000 |
---|---|---|---|---|---|---|---|---|
扩散次数 | 4 | 6 | 8 | 9 | 10 | 13 | 18 | 18 |
可以看到,随着节点数的增加,扩散次数其实变化的不大,毕竟扩散是指数级别递增的。
当然我上面的例子,肯定还有很多没考虑的地方,不过大体思路应是这样,只是随手用java模拟一下下而已,有错误麻烦指正下下。
图解Gossip:可能是最有趣的一致性协议(转)