上面一篇文章,我们学会了zookeeper的基本API和命令,这一篇文章里我们用Java客户端来调用zookeeper的API,只是基本调用不会太详细,需要详细的可网上自行看文档。
一、环境准备
我这里的环境是window上的虚拟机IP为192.168.157.6,然后需要关闭防火墙(或者开放2181端口,因为这里只是测试,所以就直接执行命令systemctl stop firewalld关闭);新建一个maven项目,因为这篇案例笔记用了Java的客户端以及封装的更全的客户端zkClient所以pom.xml引入如下依赖:
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
二、Java客户端测试
这里发现初始化链接的时候超级慢,如果超时时间为10秒以内,基本上都会超时,推测原因为当使用ip创建ZooKeeper对象时,如果host中没有ip到主机名的映射,ZooKeeper创建过程中会调用ZooInetAddress.getHostName()这个方法从网络中获取主机名,这里耗费时间太长所致。所以如果不相等就设置下host进行主机名的映射吧。
package com.suibibk.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
/**
* zookeeper API测试
* @author 小林同学
*
*/
public class CURD {
ZooKeeper zookeeper;
@Before
public void initZookeeper() throws IOException {
String connectString = "192.168.157.6:2181";
int sessionTimeout = 50*1000;
zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
//可做其他操作(设置监听或观察者)
System.out.println("初始化监听:"+event.getPath());
}
});
}
//------------------------------查询-------------------------------//
/**
* 查询数据:没有启动监听,/test2节点数据改变不会促发监听
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void getData1() throws KeeperException, InterruptedException {
String path ="/test2";
//不启动监听
Boolean watch = false;
Stat stat = new Stat();
byte[] bytes = zookeeper.getData(path, watch, stat);
String result =new String(bytes);
System.out.println("result:"+result);
System.out.println("stat:"+stat);
//暂停线程
Thread.sleep(Long.MAX_VALUE);
}
/**
* 查询数据:启动监听,/test2节点数据改变会促发监听,但是指只会监听一次
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void getData2() throws KeeperException, InterruptedException {
String path ="/test2";
//启动监听
Boolean watch = true;
Stat stat = new Stat();
byte[] bytes = zookeeper.getData(path, watch, stat);
String result =new String(bytes);
System.out.println("result:"+result);
System.out.println("stat:"+stat);
//暂停线程
Thread.sleep(Long.MAX_VALUE);
}
/**
* 查询数据:自定义监听,这个也是只能监听一次,有自定义监听器了后就不会触发初始化时候的监听器
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void getData3() throws KeeperException, InterruptedException {
String path ="/test2";
//启动监听
Stat stat = new Stat();
byte[] bytes = zookeeper.getData(path, new Watcher() {
public void process(WatchedEvent event) {
System.out.println("getData3:"+event);
}
}, stat);
String result =new String(bytes);
System.out.println("result:"+result);
System.out.println("stat:"+stat);
//暂停线程
Thread.sleep(Long.MAX_VALUE);
}
/**
* 查询数据:自定义监听,循环监听,有自定义监听器了后就不会触发初始化时候的监听器
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void getData4() throws KeeperException, InterruptedException {
String path ="/test2";
//启动监听
Stat stat = new Stat();
byte[] bytes = zookeeper.getData(path, new Watcher() {
public void process(WatchedEvent event) {
System.out.println("getData3:"+event);
//重新监听
try {
zookeeper.getData(event.getPath(), this, null);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, stat);
String result =new String(bytes);
System.out.println("result:"+result);
System.out.println("stat:"+stat);
//暂停线程
Thread.sleep(Long.MAX_VALUE);
}
/**
* 查询数据:用DataCallback返回
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void getData5() throws KeeperException, InterruptedException {
String path ="/test2";
//启动监听
String ctx ="我是外部参数";
zookeeper.getData(path, false,new AsyncCallback.DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
//状态码OK表示成功 对应 0
System.out.println("rc:"+rc);
//路径
System.out.println("path:"+path);
//上下文
System.out.println("ctx:"+ctx);
//数据
System.out.println("data:"+new String(data));
System.out.println("stat:"+stat);
}
}, ctx);
//暂停线程(因为是异步获取的)
Thread.sleep(Long.MAX_VALUE);
}
/**
* 查询子节点
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void getChildren() throws KeeperException, InterruptedException {
String path ="/test2";
List<String> paths = zookeeper.getChildren(path, false);
for (String p : paths) {
System.out.println("path:"+p);
}
//暂停线程
Thread.sleep(Long.MAX_VALUE);
}
/**
* 查询子节点,开启监听,添加自定义监听
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void getChildren2() throws KeeperException, InterruptedException {
String path ="/test2";
List<String> paths = zookeeper.getChildren(path, new Watcher() {
public void process(WatchedEvent event) {
//这里监听打印的是父节点:只有在子节点的增删才会触发
System.out.println("event:"+event);
}
});
for (String p : paths) {
System.out.println("path:"+p);
}
//暂停线程
Thread.sleep(Long.MAX_VALUE);
}
//-------------------------------------添加----------------------------//
/**
* 创建节点CreateMode可以选择持久节点、临时节点、持久序号节点、临时序号界节点
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void create() throws KeeperException, InterruptedException {
String path="/test2/suibibk";
byte[] data ="https://www.suibibk.com".getBytes();
//ACL事务是有Perms中的属性来或的
//int READ = 1 << 0; 00000001 1
//int WRITE = 1 << 1; 00000010 2
//int CREATE = 1 << 2; 00000100 4
//int DELETE = 1 << 3; 00001000 8
//int ADMIN = 1 << 4; 00010000 16
//int ALL = READ | WRITE | CREATE | DELETE | ADMIN;
//规律如下,若想要READ和WRITE的权限,则只需要READ|WRITE即可,值在十进制为1+2
//1,2,3,6,16这几个数字刚刚号可以凑出1~31来
//eg:读写权限
int perms = Perms.READ|Perms.WRITE;
ACL acl1 = new ACL(perms, new Id("world", "anyone"));
ACL acl2 = new ACL(perms, new Id("ip", "127.0.0.1"));
ACL acl3 = new ACL(perms, new Id("ip", "192.168.157.1"));
List<ACL> acl =new ArrayList<ACL>();
acl.add(acl1);
acl.add(acl2);
acl.add(acl3);
zookeeper.create(path, data, acl, CreateMode.PERSISTENT);
}
/**
* 删除节点
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void delete() throws KeeperException, InterruptedException {
String path="/test2/suibibk";
//-1表示不考虑当前版本信息
zookeeper.delete(path, -1);
}
/**
* 修改节点
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void update() throws KeeperException, InterruptedException {
String path="/test2";
//-1表示不考虑当前版本信息
zookeeper.setData(path, "炼词小程序".getBytes(), -1);
}
}
大家看例子应该可以很清楚了
三、zkClient客户端测试
package com.suibibk.zookeeper;
import java.io.IOException;
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.junit.Before;
import org.junit.Test;
/**
* ZKClient在原生API接口上进行了包装,同时在内部实现了诸如session超时重连、watcher反复注册等功能,
* 使得zookeeper客户端繁琐的细节对开发人员透明。
* @author 小林同学
*/
public class ZkClientTest {
ZkClient zkClient;
@Before
public void initZookeeper() throws IOException {
String connectString = "192.168.157.6:2181";
zkClient = new ZkClient(connectString, 50*1000);
}
/**
* 创建节点
*/
@Test
public void create() {
zkClient.create("/test2/suibibk", "https://www.suibibkc.om", CreateMode.PERSISTENT);
}
/**
* 查询节点
*/
@Test
public void get() {
List<String> strs = zkClient.getChildren("/test2");
for (String path : strs) {
System.out.println(path);
}
}
/**
* 修改节点
*/
@Test
public void update() {
zkClient.writeData("/test2", "lianci");
}
/**
* 删除节点
*/
@Test
public void delete() {
zkClient.delete("/test2/suibibk");
}
/**
* 添加监听
* @throws InterruptedException
*/
@Test
public void subscribe() throws InterruptedException {
System.out.println("开始监听");
//发现不会触发,由于zkClient创建连接的时候指定了默认的序列化类-new SerializableSerializer(),
//所以存储在节点上的值也是序列化后的字节数组,当使用zkCli.sh在控制台set /xxx/xx的值时,存储的是普通的字符串字节数组。
//所以当set值时虽然触发了值改变事件,但zkClient无法反序列化这个值。
zkClient.subscribeDataChanges("/test2", new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("监听到节点删除:"+dataPath);
}
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("监听到节点:"+dataPath+";数据改变:"+data);
}
});
//这里才会触发
zkClient.writeData("/test2", "dog");
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 添加监听,监听子节点
* @throws InterruptedException
*/
@Test
public void subscribeChilds() throws InterruptedException {
System.out.println("开始监听");
//对父节点添加监听子节点变化。
zkClient.subscribeChildChanges("/test2", new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
// 节点
System.out.println(">>>parentPath: " + parentPath);
// 子节点
System.out.println(">>>currentChilds: " + currentChilds);
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}
结束!