个人随笔
目录
二、Java客户端调用zookeeper的API案例
2020-11-24 22:56:31

上面一篇文章,我们学会了zookeeper的基本API和命令,这一篇文章里我们用Java客户端来调用zookeeper的API,只是基本调用不会太详细,需要详细的可网上自行看文档。

一、环境准备

我这里的环境是window上的虚拟机IP为192.168.157.6,然后需要关闭防火墙(或者开放2181端口,因为这里只是测试,所以就直接执行命令systemctl stop firewalld关闭);新建一个maven项目,因为这篇案例笔记用了Java的客户端以及封装的更全的客户端zkClient所以pom.xml引入如下依赖:

  1. <!-- https://mvnrepository.com/artifact/junit/junit -->
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>4.13</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
  8. <dependency>
  9. <groupId>org.apache.zookeeper</groupId>
  10. <artifactId>zookeeper</artifactId>
  11. <version>3.6.2</version>
  12. </dependency>
  13. <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
  14. <dependency>
  15. <groupId>com.101tec</groupId>
  16. <artifactId>zkclient</artifactId>
  17. <version>0.11</version>
  18. </dependency>

二、Java客户端测试

这里发现初始化链接的时候超级慢,如果超时时间为10秒以内,基本上都会超时,推测原因为当使用ip创建ZooKeeper对象时,如果host中没有ip到主机名的映射,ZooKeeper创建过程中会调用ZooInetAddress.getHostName()这个方法从网络中获取主机名,这里耗费时间太长所致。所以如果不相等就设置下host进行主机名的映射吧。

  1. package com.suibibk.zookeeper;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import org.apache.zookeeper.AsyncCallback;
  6. import org.apache.zookeeper.CreateMode;
  7. import org.apache.zookeeper.KeeperException;
  8. import org.apache.zookeeper.WatchedEvent;
  9. import org.apache.zookeeper.Watcher;
  10. import org.apache.zookeeper.ZooDefs.Perms;
  11. import org.apache.zookeeper.ZooKeeper;
  12. import org.apache.zookeeper.data.ACL;
  13. import org.apache.zookeeper.data.Id;
  14. import org.apache.zookeeper.data.Stat;
  15. import org.junit.Before;
  16. import org.junit.Test;
  17. /**
  18. * zookeeper API测试
  19. * @author 小林同学
  20. *
  21. */
  22. public class CURD {
  23. ZooKeeper zookeeper;
  24. @Before
  25. public void initZookeeper() throws IOException {
  26. String connectString = "192.168.157.6:2181";
  27. int sessionTimeout = 50*1000;
  28. zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  29. public void process(WatchedEvent event) {
  30. //可做其他操作(设置监听或观察者)
  31. System.out.println("初始化监听:"+event.getPath());
  32. }
  33. });
  34. }
  35. //------------------------------查询-------------------------------//
  36. /**
  37. * 查询数据:没有启动监听,/test2节点数据改变不会促发监听
  38. * @throws KeeperException
  39. * @throws InterruptedException
  40. */
  41. @Test
  42. public void getData1() throws KeeperException, InterruptedException {
  43. String path ="/test2";
  44. //不启动监听
  45. Boolean watch = false;
  46. Stat stat = new Stat();
  47. byte[] bytes = zookeeper.getData(path, watch, stat);
  48. String result =new String(bytes);
  49. System.out.println("result:"+result);
  50. System.out.println("stat:"+stat);
  51. //暂停线程
  52. Thread.sleep(Long.MAX_VALUE);
  53. }
  54. /**
  55. * 查询数据:启动监听,/test2节点数据改变会促发监听,但是指只会监听一次
  56. * @throws KeeperException
  57. * @throws InterruptedException
  58. */
  59. @Test
  60. public void getData2() throws KeeperException, InterruptedException {
  61. String path ="/test2";
  62. //启动监听
  63. Boolean watch = true;
  64. Stat stat = new Stat();
  65. byte[] bytes = zookeeper.getData(path, watch, stat);
  66. String result =new String(bytes);
  67. System.out.println("result:"+result);
  68. System.out.println("stat:"+stat);
  69. //暂停线程
  70. Thread.sleep(Long.MAX_VALUE);
  71. }
  72. /**
  73. * 查询数据:自定义监听,这个也是只能监听一次,有自定义监听器了后就不会触发初始化时候的监听器
  74. * @throws KeeperException
  75. * @throws InterruptedException
  76. */
  77. @Test
  78. public void getData3() throws KeeperException, InterruptedException {
  79. String path ="/test2";
  80. //启动监听
  81. Stat stat = new Stat();
  82. byte[] bytes = zookeeper.getData(path, new Watcher() {
  83. public void process(WatchedEvent event) {
  84. System.out.println("getData3:"+event);
  85. }
  86. }, stat);
  87. String result =new String(bytes);
  88. System.out.println("result:"+result);
  89. System.out.println("stat:"+stat);
  90. //暂停线程
  91. Thread.sleep(Long.MAX_VALUE);
  92. }
  93. /**
  94. * 查询数据:自定义监听,循环监听,有自定义监听器了后就不会触发初始化时候的监听器
  95. * @throws KeeperException
  96. * @throws InterruptedException
  97. */
  98. @Test
  99. public void getData4() throws KeeperException, InterruptedException {
  100. String path ="/test2";
  101. //启动监听
  102. Stat stat = new Stat();
  103. byte[] bytes = zookeeper.getData(path, new Watcher() {
  104. public void process(WatchedEvent event) {
  105. System.out.println("getData3:"+event);
  106. //重新监听
  107. try {
  108. zookeeper.getData(event.getPath(), this, null);
  109. } catch (Exception e) {
  110. // TODO Auto-generated catch block
  111. e.printStackTrace();
  112. }
  113. }
  114. }, stat);
  115. String result =new String(bytes);
  116. System.out.println("result:"+result);
  117. System.out.println("stat:"+stat);
  118. //暂停线程
  119. Thread.sleep(Long.MAX_VALUE);
  120. }
  121. /**
  122. * 查询数据:用DataCallback返回
  123. * @throws KeeperException
  124. * @throws InterruptedException
  125. */
  126. @Test
  127. public void getData5() throws KeeperException, InterruptedException {
  128. String path ="/test2";
  129. //启动监听
  130. String ctx ="我是外部参数";
  131. zookeeper.getData(path, false,new AsyncCallback.DataCallback() {
  132. public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
  133. //状态码OK表示成功 对应 0
  134. System.out.println("rc:"+rc);
  135. //路径
  136. System.out.println("path:"+path);
  137. //上下文
  138. System.out.println("ctx:"+ctx);
  139. //数据
  140. System.out.println("data:"+new String(data));
  141. System.out.println("stat:"+stat);
  142. }
  143. }, ctx);
  144. //暂停线程(因为是异步获取的)
  145. Thread.sleep(Long.MAX_VALUE);
  146. }
  147. /**
  148. * 查询子节点
  149. * @throws KeeperException
  150. * @throws InterruptedException
  151. */
  152. @Test
  153. public void getChildren() throws KeeperException, InterruptedException {
  154. String path ="/test2";
  155. List<String> paths = zookeeper.getChildren(path, false);
  156. for (String p : paths) {
  157. System.out.println("path:"+p);
  158. }
  159. //暂停线程
  160. Thread.sleep(Long.MAX_VALUE);
  161. }
  162. /**
  163. * 查询子节点,开启监听,添加自定义监听
  164. * @throws KeeperException
  165. * @throws InterruptedException
  166. */
  167. @Test
  168. public void getChildren2() throws KeeperException, InterruptedException {
  169. String path ="/test2";
  170. List<String> paths = zookeeper.getChildren(path, new Watcher() {
  171. public void process(WatchedEvent event) {
  172. //这里监听打印的是父节点:只有在子节点的增删才会触发
  173. System.out.println("event:"+event);
  174. }
  175. });
  176. for (String p : paths) {
  177. System.out.println("path:"+p);
  178. }
  179. //暂停线程
  180. Thread.sleep(Long.MAX_VALUE);
  181. }
  182. //-------------------------------------添加----------------------------//
  183. /**
  184. * 创建节点CreateMode可以选择持久节点、临时节点、持久序号节点、临时序号界节点
  185. * @throws InterruptedException
  186. * @throws KeeperException
  187. */
  188. @Test
  189. public void create() throws KeeperException, InterruptedException {
  190. String path="/test2/suibibk";
  191. byte[] data ="https://www.suibibk.com".getBytes();
  192. //ACL事务是有Perms中的属性来或的
  193. //int READ = 1 << 0; 00000001 1
  194. //int WRITE = 1 << 1; 00000010 2
  195. //int CREATE = 1 << 2; 00000100 4
  196. //int DELETE = 1 << 3; 00001000 8
  197. //int ADMIN = 1 << 4; 00010000 16
  198. //int ALL = READ | WRITE | CREATE | DELETE | ADMIN;
  199. //规律如下,若想要READ和WRITE的权限,则只需要READ|WRITE即可,值在十进制为1+2
  200. //1,2,3,6,16这几个数字刚刚号可以凑出1~31来
  201. //eg:读写权限
  202. int perms = Perms.READ|Perms.WRITE;
  203. ACL acl1 = new ACL(perms, new Id("world", "anyone"));
  204. ACL acl2 = new ACL(perms, new Id("ip", "127.0.0.1"));
  205. ACL acl3 = new ACL(perms, new Id("ip", "192.168.157.1"));
  206. List<ACL> acl =new ArrayList<ACL>();
  207. acl.add(acl1);
  208. acl.add(acl2);
  209. acl.add(acl3);
  210. zookeeper.create(path, data, acl, CreateMode.PERSISTENT);
  211. }
  212. /**
  213. * 删除节点
  214. * @throws InterruptedException
  215. * @throws KeeperException
  216. */
  217. @Test
  218. public void delete() throws KeeperException, InterruptedException {
  219. String path="/test2/suibibk";
  220. //-1表示不考虑当前版本信息
  221. zookeeper.delete(path, -1);
  222. }
  223. /**
  224. * 修改节点
  225. * @throws InterruptedException
  226. * @throws KeeperException
  227. */
  228. @Test
  229. public void update() throws KeeperException, InterruptedException {
  230. String path="/test2";
  231. //-1表示不考虑当前版本信息
  232. zookeeper.setData(path, "炼词小程序".getBytes(), -1);
  233. }
  234. }

大家看例子应该可以很清楚了

三、zkClient客户端测试

  1. package com.suibibk.zookeeper;
  2. import java.io.IOException;
  3. import java.util.List;
  4. import org.I0Itec.zkclient.IZkChildListener;
  5. import org.I0Itec.zkclient.IZkDataListener;
  6. import org.I0Itec.zkclient.ZkClient;
  7. import org.apache.zookeeper.CreateMode;
  8. import org.junit.Before;
  9. import org.junit.Test;
  10. /**
  11. * ZKClient在原生API接口上进行了包装,同时在内部实现了诸如session超时重连、watcher反复注册等功能,
  12. * 使得zookeeper客户端繁琐的细节对开发人员透明。
  13. * @author 小林同学
  14. */
  15. public class ZkClientTest {
  16. ZkClient zkClient;
  17. @Before
  18. public void initZookeeper() throws IOException {
  19. String connectString = "192.168.157.6:2181";
  20. zkClient = new ZkClient(connectString, 50*1000);
  21. }
  22. /**
  23. * 创建节点
  24. */
  25. @Test
  26. public void create() {
  27. zkClient.create("/test2/suibibk", "https://www.suibibkc.om", CreateMode.PERSISTENT);
  28. }
  29. /**
  30. * 查询节点
  31. */
  32. @Test
  33. public void get() {
  34. List<String> strs = zkClient.getChildren("/test2");
  35. for (String path : strs) {
  36. System.out.println(path);
  37. }
  38. }
  39. /**
  40. * 修改节点
  41. */
  42. @Test
  43. public void update() {
  44. zkClient.writeData("/test2", "lianci");
  45. }
  46. /**
  47. * 删除节点
  48. */
  49. @Test
  50. public void delete() {
  51. zkClient.delete("/test2/suibibk");
  52. }
  53. /**
  54. * 添加监听
  55. * @throws InterruptedException
  56. */
  57. @Test
  58. public void subscribe() throws InterruptedException {
  59. System.out.println("开始监听");
  60. //发现不会触发,由于zkClient创建连接的时候指定了默认的序列化类-new SerializableSerializer(),
  61. //所以存储在节点上的值也是序列化后的字节数组,当使用zkCli.sh在控制台set /xxx/xx的值时,存储的是普通的字符串字节数组。
  62. //所以当set值时虽然触发了值改变事件,但zkClient无法反序列化这个值。
  63. zkClient.subscribeDataChanges("/test2", new IZkDataListener() {
  64. public void handleDataDeleted(String dataPath) throws Exception {
  65. System.out.println("监听到节点删除:"+dataPath);
  66. }
  67. public void handleDataChange(String dataPath, Object data) throws Exception {
  68. System.out.println("监听到节点:"+dataPath+";数据改变:"+data);
  69. }
  70. });
  71. //这里才会触发
  72. zkClient.writeData("/test2", "dog");
  73. Thread.sleep(Integer.MAX_VALUE);
  74. }
  75. /**
  76. * 添加监听,监听子节点
  77. * @throws InterruptedException
  78. */
  79. @Test
  80. public void subscribeChilds() throws InterruptedException {
  81. System.out.println("开始监听");
  82. //对父节点添加监听子节点变化。
  83. zkClient.subscribeChildChanges("/test2", new IZkChildListener() {
  84. public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
  85. // 节点
  86. System.out.println(">>>parentPath: " + parentPath);
  87. // 子节点
  88. System.out.println(">>>currentChilds: " + currentChilds);
  89. }
  90. });
  91. Thread.sleep(Integer.MAX_VALUE);
  92. }
  93. }

结束!

 199

啊!这个可能是世界上最丑的留言输入框功能~


当然,也是最丑的留言列表

有疑问发邮件到 : suibibk@qq.com 侵权立删
Copyright : 个人随笔   备案号 : 粤ICP备18099399号-2