一、背景
我们再分布式集群环境中,可能会有多个节点,作为管理员,有必要知道线上有多少个节点在运行中,以及节点变化情况和每个节点的内存、CPU、磁盘的使用情况,若是有超出预警值需要及时通知我们,发短信或者邮件,这样子就可以快速响应生产故障或者预防生产故障。
为了实现上面的目标,这里打算借助zookeeper来监控每个节点的存活情况已经内存CPU的使用情况,关键点是用zookeeper的临时序号节点和监听。下面举个例子吧。
二、例子
启动几个节点,节点启动的时候就上报道zookeeper,然后定时上报自己内存CPU等资源的使用情况。监控程序监控节点的变化情况以及节点内容的变化情况,有异常则发邮件给管理员。
三、代码实例
这里不用zookeeper的原生API,原生API实现起来比较复杂。比如要自己去实现循环监听的情况。所以这里用zkClient来。
1、新建maven项目,pom.xml加入如下依赖
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
</dependencies>
2、建立节点类Node
public class Node {
public static void main(String[] args) throws InterruptedException {
new ZkClientMsg().init("2");
System.out.println("执行正常业务逻辑...");
Thread.sleep(Long.MAX_VALUE);
}
}
每个节点在初始化的似乎都调用ZkClientMsg类来将信息上报道zookeeper,然后再执行自己的业务逻辑。
3、ZkClientMsg
该类就是实现了链接zookeeper以及创建临时序号节点,用临时序号节点的特性:当节点挂掉后临时序号节点也会自动清除来监控节点的存活情况,以及定时上报自己的服务器相关信息,代码如下:
package com.suibibk.zookeeper;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import com.google.gson.Gson;
public class ZkClientMsg {
private ZkClient zkClient;
//根节点:为持久及诶单
private String rootPath = "/service";
//子节点:为临时序号节点
private String servicePath = "/service/service";
private String NODE = "1";
//获取当前节点的名称
private String nowPath = "";
public void init(String node){
try {
//初始化
System.out.println("Node start..."+node);
NODE = node;
//上报Zookeeper
initZookeeper();
//创建根节点:根节点用的是持久节点
createRoot();
//创建当前节点:为临时序号节点
createCurrentNode();
//定时上报IP,内存,cpu,磁盘等使用情况
new Thread(new Runnable() {
public void run() {
while(true) {
try {
updateCurrentNode();
//5秒钟上报一次
Thread.sleep(5*1000l);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}).start();
System.out.println("Node finish");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void createRoot() {
//检查根节点是否存在
try {
Boolean result = zkClient.exists(rootPath);
if(!result) {
//根节点不存在,创建
System.out.println("根节点不存在,创建持久节点");
create(rootPath, "service", CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private String create(String path,String data,CreateMode mode) {
try {
String path2 = zkClient.create(path,data,mode);
return path2;
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
public void initZookeeper() throws IOException {
String connectString = "192.168.209.4:2181";
zkClient = new ZkClient(connectString, 50*1000);
}
/**
* 创建临时节点
*/
private void createCurrentNode() {
//获取用户的IP,CPU,RAN,DISK
String result = getOSInfo();
//创建临时序号节点
String path = create(servicePath, result, CreateMode.EPHEMERAL_SEQUENTIAL);
nowPath=path;
}
/**
*更新当前临时节点
*/
private void updateCurrentNode() {
//获取用户的IP,CPU,RAN,DISK
String result = getOSInfo();
//创建临时序号节点,这里要返回节点名称
//-1表示不考虑当前版本信息
try {
zkClient.writeData(nowPath, result);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取系统信息
* @return
*/
private String getOSInfo() {
String str0 = GetSystemInfo.getLocalIp();
String str1 = GetSystemInfo.getMemery();
List<String> strs = GetSystemInfo.getDisk();
String str3 = GetSystemInfo.getCpuRatioForWindows();
Map<String,Object> map = new HashMap<String,Object>();
map.put("TIME",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
map.put("IP",str0);
map.put("CPU",str3);
map.put("RAN",str1);
map.put("DISK",strs);
map.put("NODE",NODE);
Gson gson = new Gson();
String result = gson.toJson(map);
return result;
}
}
4、GetSystemInfo
这是一个获取服务器相关信息的工具类,CPU只支持在win上的获取,这里只是举一个例子,所以就不要求这么精确了,如果是在正式环境使用,建议用专门的工具类来:
package com.suibibk.zookeeper;
import java.io.File;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import com.sun.management.OperatingSystemMXBean;
public class GetSystemInfo {
private static final int CPUTIME = 500;
private static final int PERCENT = 100;
private static final int FAULTLENGTH = 10;
/**
* 获取内存使用情况
* @return
*/
public static String getMemery() {
OperatingSystemMXBean osmxb = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
//物理内存
long totalPhysicalMemory = osmxb.getTotalPhysicalMemorySize();
//可用物理内存
long freePhysicalMemorySize = osmxb.getFreePhysicalMemorySize();
//虚拟内存
long totalvirtualMemory = osmxb.getTotalSwapSpaceSize();
//可用虚拟内存
long freeSwapSpaceSize = osmxb.getFreeSwapSpaceSize();
// System.out.println("物理内存:" + totalPhysicalMemory / 1024 / 1024 + "MB");
// System.out.println("可用物理内存:" + freePhysicalMemorySize / 1024 / 1024 + "MB");
// System.out.println("虚拟内存 :" + totalvirtualMemory / 1024 / 1024 + "MB");
// System.out.println("可用虚拟内存:" + freeSwapSpaceSize / 1024 / 1024 + "MB");
Double compare = Double.valueOf((1 - freePhysicalMemorySize * 1.0 / totalPhysicalMemory) * 100);
String str = "物理内存已使用:" +compare.intValue()+ "%";
return str;
}
/**
* 获取文件系统使用率
* @return
*/
public static List<String> getDisk()
{
// 操作系统
List<String> list = new ArrayList<String>();
for (char c = 'A'; c <= 'Z'; c++)
{
String dirName = c + ":/";
File win = new File(dirName);
if (win.exists())
{
long total = (long)win.getTotalSpace();
long free = (long)win.getFreeSpace();
Double compare = Double.valueOf((1 - free * 1.0 / total) * 100);
String str = c + ":盘 已使用 " + compare.intValue() + "%";
list.add(str);
}
}
return list;
}
/**
* 获取Windows下CPU的使用率
* @return
*/
public static String getCpuRatioForWindows()
{
try
{
String procCmd =
System.getenv("windir")
+ "\\system32\\wbem\\wmic.exe process get Caption,CommandLine,KernelModeTime,ReadOperationCount,ThreadCount,UserModeTime,WriteOperationCount";
// 取进程信息
long[] c0 = readCpu(Runtime.getRuntime().exec(procCmd));
Thread.sleep(CPUTIME);
long[] c1 = readCpu(Runtime.getRuntime().exec(procCmd));
if (c0 != null && c1 != null)
{
long idletime = c1[0] - c0[0];
long busytime = c1[1] - c0[1];
return "CPU使用率:" + Double.valueOf(PERCENT * (busytime) * 1.0 / (busytime + idletime)).intValue() + "%";
}
else
{
return "CPU使用率:" + 0 + "%";
}
}
catch (Exception ex)
{
ex.printStackTrace();
return "CPU使用率:" + 0 + "%";
}
}
/**
* 获取IP
* @return
*/
public static String getLocalIp() {
InetAddress addr = null;
try {
addr = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
return addr.getHostAddress();
}
//读取cpu相关信息
private static long[] readCpu(final Process proc)
{
long[] retn = new long[2];
try
{
proc.getOutputStream().close();
InputStreamReader ir = new InputStreamReader(proc.getInputStream());
LineNumberReader input = new LineNumberReader(ir);
String line = input.readLine();
if (line == null || line.length() < FAULTLENGTH)
{
return null;
}
int capidx = line.indexOf("Caption");
int cmdidx = line.indexOf("CommandLine");
int rocidx = line.indexOf("ReadOperationCount");
int umtidx = line.indexOf("UserModeTime");
int kmtidx = line.indexOf("KernelModeTime");
int wocidx = line.indexOf("WriteOperationCount");
long idletime = 0;
long kneltime = 0;
long usertime = 0;
while ((line = input.readLine()) != null)
{
if (line.length() < wocidx)
{
continue;
}
// 字段出现顺序:Caption,CommandLine,KernelModeTime,ReadOperationCount,
// ThreadCount,UserModeTime,WriteOperation
String caption = substring(line, capidx, cmdidx - 1).trim();
String cmd = substring(line, cmdidx, kmtidx - 1).trim();
if (cmd.indexOf("wmic.exe") >= 0)
{
continue;
}
String s1 = substring(line, kmtidx, rocidx - 1).trim();
String s2 = substring(line, umtidx, wocidx - 1).trim();
if (caption.equals("System Idle Process") || caption.equals("System"))
{
if (s1.length() > 0)
idletime += Long.valueOf(s1).longValue();
if (s2.length() > 0)
idletime += Long.valueOf(s2).longValue();
continue;
}
if (s1.length() > 0)
kneltime += Long.valueOf(s1).longValue();
if (s2.length() > 0)
usertime += Long.valueOf(s2).longValue();
}
retn[0] = idletime;
retn[1] = kneltime + usertime;
return retn;
}
catch (Exception ex)
{
ex.printStackTrace();
}
finally
{
try
{
proc.getInputStream().close();
}
catch (Exception e)
{
e.printStackTrace();
}
}
return null;
}
/**
* 由于String.subString对汉字处理存在问题(把一个汉字视为一个字节),因此在 包含汉字的字符串时存在隐患,现调整如下:
* @param src 要截取的字符串
* @param start_idx 开始坐标(包括该坐标)
* @param end_idx 截止坐标(包括该坐标)
* @return
*/
private static String substring(String src, int start_idx, int end_idx)
{
byte[] b = src.getBytes();
String tgt = "";
for (int i = start_idx; i <= end_idx; i++)
{
tgt += (char)b[i];
}
return tgt;
}
public static void main(String[] args) {
String str0 = getLocalIp();
System.out.println(str0);
String str1 = getMemery();
System.out.println(str1);
List<String> strs = getDisk();
for (String str : strs) {
System.out.println(str);
}
String str3 = getCpuRatioForWindows();
System.out.println(str3);
}
}
上面节点就基本上完成啦,只需要在Node上启动就可以了。
5、监控器:ZkClientMonitor
下面这里列一下监控器,监控器的实现也很简单,就是监控节点下面的子节点数目的改变,子节点就是每个节点的临时序号节点。然后再监听每个子节点的数据变化,如果超过规定值就预警。
package com.suibibk.zookeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import com.google.gson.Gson;
/**
* @author 小林同学
*/
public class ZkClientMonitor {
final static String path ="/service";
private static ZkClient zkClient;
public static void main(String[] args) throws InterruptedException {
String connectString = "192.168.209.4:2181";
zkClient = new ZkClient(connectString, 50*1000);
initWatch(path);
Thread.sleep(Long.MAX_VALUE);
}
/**
* 初始化监听器
* @param nodePath
*/
public static void initWatch(String path) {
//1、先移除所有监听
zkClient.unsubscribeAll();
//2、添加子节点变更监听
List<String> paths = zkClient.subscribeChildChanges(path, new IZkChildListener() {
public void handleChildChange(String arg0, List<String> arg1) throws Exception {
// TODO Auto-generated method stub
//System.out.println("重新初始化监听器:"+arg0);
List<String> strs = new ArrayList<String>();
for (String string : arg1) {
String str = zkClient.readData(arg0+"/"+string);
strs.add(str);
}
System.out.println("节点数目有变化,当前存活节点为:"+strs+"请留意");
initWatch(arg0);
}
});
for (String string : paths) {
//System.out.println("监听path:"+string+"的数据变化");
//这里进行数据变化监听
zkClient.subscribeDataChanges(path+"/"+string, new IZkDataListener() {
public void handleDataDeleted(String arg0) throws Exception {
}
public void handleDataChange(String arg0, Object arg1) throws Exception {
//System.out.println("arg0:"+arg0+"change data");
System.out.println("节点数据变化:"+arg1);
//这里对数据进行监控判断,若有数据变化超过预警值则发邮件通知管理员
Map map = new Gson().fromJson(arg1.toString(), Map.class);
//获取内存
String ip = (String) map.get("IP");
String ran = (String) map.get("RAN");
//System.out.println("ran:"+ran);
int r = Integer.parseInt(ran.replace("%", "").replaceAll("物理内存已使用:", ""));
//System.out.println("r:"+r);
if(r>60) {
System.out.println("IP:"+ip+"的内存使用高达"+r+"%请留意");
}
}
});
}
}
}
要重新移除监听的原因是因为我这没有具体的对哪些节点是新增的做筛选,实际实现中可以先保存为一个变量,然后在节点数目变化的情况下去匹对,只对新增的节点进行监控数据的改变,这样效果更好,不过我这里只是举个例子而已,为了防止已经加了监听的节点重新加监听我这里就直接先移除所有监听,为什么不移除单个监听的呢?原因如下:
注:我这里采取的是如果有变化就移除该监控节点的所监听然后再初始化监听,因为我尝试过用zkClient.unsubscribeDataChanges(arg0, arg1)这种只移除对应监听的方法完全不生效,就算用zookeeper原生API的removeAllWatchs也完全没有效果,目前还没有找到解决方案,希望有知道的可以告知我一下。
四、测试
测试的话就比较简单,这里先启动节点,然后启动监控,会发现打印如下日志:
节点数据变化:{"NODE":"1","IP":"192.168.209.1","CPU":"CPU使用率:7%","TIME":"2020-12-03 16:11:46","DISK":["C:盘 已使用 45%","D:盘 已使用 29%","E:盘 已使用 50%"],"RAN":"物理内存已使用:70%"}
IP:192.168.209.1的内存使用高达70%请留意
然后我再启动一个节点。日志会打印如下
节点数目有变化,当前存活节点为:[{"NODE":"2","IP":"192.168.209.1","CPU":"CPU使用率:5%","TIME":"2020-12-03 16:12:14","DISK":["C:盘 已使用 45%","D:盘 已使用 29%","E:盘 已使用 50%"],"RAN":"物理内存已使用:71%"}, {"NODE":"1","IP":"192.168.209.1","CPU":"CPU使用率:10%","TIME":"2020-12-03 16:12:09","DISK":["C:盘 已使用 45%","D:盘 已使用 29%","E:盘 已使用 50%"],"RAN":"物理内存已使用:71%"}]请留意
节点数据变化:{"NODE":"2","IP":"192.168.209.1","CPU":"CPU使用率:5%","TIME":"2020-12-03 16:12:17","DISK":["C:盘 已使用 45%","D:盘 已使用 29%","E:盘 已使用 50%"],"RAN":"物理内存已使用:71%"}
IP:192.168.209.1的内存使用高达71%请留意
表明监控到了数目的变化以及对新的节点数据的变化进行监听,实现了目标,收工。