CuratorFramework
Summary
Dependency: curator-recipes (pulls in ZooKeeper transitively)
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>4.0.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.6</version>
  <!--
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
  -->
</dependency>
- Note: the ZooKeeper version bundled with curator-recipes is excluded and replaced with 3.4.6 above, because the bundled beta version may have issues.
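CuratorFramework zkClient = CuratorFrameworkFactory.builder()...build()
Key properties:
- connectString: server list, in the format host1:port1,host2:port2
- namespace: the namespace; every path used by this client is resolved under it
- retryPolicy: the retry policy; several policies are built in, or implement the RetryPolicy interface yourself (eg: ExponentialBackoffRetry(3000, 3))
- sessionTimeoutMs: session timeout in milliseconds, default 60000ms
- connectionTimeoutMs: connection timeout in milliseconds, default 15000ms
Lifecycle: zkClient.start(), zkClient.close()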
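To give a feel for what ExponentialBackoffRetry(3000, 3) means in practice, here is a minimal stdlib-only sketch of an exponential backoff delay. It assumes the delay is the base sleep time multiplied by a random factor whose upper bound doubles with each retry, which is how Curator's policy is commonly described; the class name BackoffSketch and the method sleepMs are hypothetical, and the real policy also caps the delay at a maximum sleep time, so treat this as an illustration rather than the library's implementation.

```java
import java.util.Random;

/** Toy exponential backoff calculation; NOT Curator's actual class (assumed formula). */
final class BackoffSketch {

    /**
     * Delay before retry number retryCount (0-based):
     * base * max(1, random integer in [0, 2^(retryCount + 1))).
     */
    static long sleepMs(int baseSleepTimeMs, int retryCount, Random random) {
        int bound = 1 << (retryCount + 1); // 2, 4, 8, ...
        return (long) baseSleepTimeMs * Math.max(1, random.nextInt(bound));
    }

    public static void main(String[] args) {
        Random random = new Random();
        // base sleep 3000 ms, 3 retries, as in ExponentialBackoffRetry(3000, 3)
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + " -> sleep " + sleepMs(3000, retry, random) + " ms");
        }
    }
}
```

Under this formula the first retry always waits exactly the base time, and later retries wait a random multiple of it with a growing upper bound.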
CRUD
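- Create: create()
  - creatingParentsIfNeeded() / creatingParentContainersIfNeeded(): recursively creates all missing parent nodes (the second variant creates the parents as container nodes where the server supports them)
  - withMode(CreateMode.xxx): the creation mode
    - CreateMode.PERSISTENT: persistent (default)
    - CreateMode.PERSISTENT_SEQUENTIAL: persistent, with a sequence number appended
    - CreateMode.EPHEMERAL: ephemeral
    - CreateMode.EPHEMERAL_SEQUENTIAL: ephemeral, with a sequence number appended
  - withACL(aclList): the ACL list for the new node, eg:
    - Ids.OPEN_ACL_UNSAFE (default)
    - Ids.CREATOR_ALL_ACL
    - Ids.READ_ACL_UNSAFE
    - Note: Ids.ANYONE_ID_UNSAFE and Ids.AUTH_IDS are Id instances used to build an ACL; they are not ACL lists themselves
  - eg:
    String createdPath=zkClient.create().creatingParentsIfNeeded().forPath(path,dataBytes)
    String createdPath=zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path,dataBytes)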
- Read: getData()
  - Throws NoNodeException if the node does not exist
  - Returns byte[]
  - storingStatIn(stat): also fills stat with the node's Stat
  - eg:
    byte[] data=zkClient.getData().forPath(path)
    byte[] data=zkClient.getData().storingStatIn(stat).forPath(path)
- Update: setData()
  - Throws NoNodeException if the node does not exist
  - Returns a Stat instance
  - withVersion(num): updates only if the node's current version equals num
  - eg:
    Stat stat=zkClient.setData().forPath(path,dataBytes)
    Stat stat=zkClient.setData().withVersion(0).forPath(path,dataBytes) (throws BadVersionException if the version does not match)
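The version check behind withVersion(num) is an optimistic-concurrency check: the write goes through only if nobody else has modified the node since the version you read. The following is a toy in-memory model of that rule, not ZooKeeper or Curator code; the class VersionedNodeSketch and its methods are hypothetical, and IllegalStateException merely stands in for BadVersionException.

```java
/** Toy in-memory stand-in for a single znode; illustrates the version check only. */
final class VersionedNodeSketch {
    private byte[] data;
    private int version = 0; // a freshly created node starts at data version 0

    VersionedNodeSketch(byte[] initial) {
        this.data = initial.clone();
    }

    /** Like setData().forPath(...): no version check (expected version -1 matches anything). */
    synchronized int setData(byte[] newData) {
        return setData(newData, -1);
    }

    /** Like setData().withVersion(expected).forPath(...): rejected unless the versions match. */
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "bad version: expected " + expectedVersion + ", actual " + version);
        }
        data = newData.clone();
        return ++version; // every successful write bumps the version
    }

    synchronized int version() {
        return version;
    }

    synchronized byte[] data() {
        return data.clone();
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch("aaa".getBytes());
        System.out.println("after first write, version = " + node.setData("aaa-aaa".getBytes()));
        try {
            node.setData("aaa-bbb".getBytes(), 0); // stale version
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is why the updateTest demo later in these notes expects BadVersionException: the first setData() moves the node to version 1, so a second write pinned to version 0 is stale.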
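- Delete: delete()
  - Throws NoNodeException if the node does not exist
  - By default only leaf nodes can be deleted
  - guaranteed(): a safeguard; if the delete fails, Curator keeps retrying it in the background until it succeeds
  - deletingChildrenIfNeeded(): recursively deletes all child nodes
  - withVersion(num): deletes only if the node's current version equals num
  - eg:
    zkClient.delete().guaranteed().deletingChildrenIfNeeded().withVersion(12).forPath(path)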
- More:
  - checkExists(): checks whether the node exists; returns null if it does not, eg:
    Stat stat=zkClient.checkExists().forPath(path)
    client.checkExists().creatingParentContainersIfNeeded().forPath(path)
  - getChildren(): gets the names of all children of a node, eg:
    List<String> childrenPathList=zkClient.getChildren().forPath(zkPath)
Watch
usingWatcher(watcher):
- Target: watches the node itself; changes to its children do not trigger it!
- Times: fires only once; the watch is removed after it triggers
- Watcher: interface CuratorWatcher
  public interface CuratorWatcher{
      public void process(WatchedEvent event) throws Exception; // event.getPath(),getType(),getState(),...
  }
- Usage:
  - getData().usingWatcher(watcher).forPath(watchPath)
    - The watched node (watchPath) must exist, otherwise NoNodeException is thrown
    - Observable EventTypes: NodeDeleted, NodeDataChanged
  - checkExists().usingWatcher(watcher).forPath(watchPath)
    - The watched node (watchPath) does not need to exist
    - Observable EventTypes: NodeCreated, NodeDeleted, NodeDataChanged
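Because a watcher fires only once, code that wants continuous notifications has to register again from inside the callback. The following toy model, with no ZooKeeper involved, sketches that one-shot behavior and the re-registration pattern; OneShotWatchSketch, watch, fire and watchForever are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of one-shot watches on a single node; not ZooKeeper or Curator code. */
final class OneShotWatchSketch {
    private final List<Consumer<String>> watchers = new ArrayList<>();

    /** Registers a watcher that will receive the NEXT event only. */
    void watch(Consumer<String> watcher) {
        watchers.add(watcher);
    }

    /** Delivers an event: every registered watcher is removed first, then notified once. */
    void fire(String eventType) {
        List<Consumer<String>> toNotify = new ArrayList<>(watchers);
        watchers.clear(); // one-shot: the registration is consumed by this event
        for (Consumer<String> watcher : toNotify) {
            watcher.accept(eventType);
        }
    }

    /** Re-registration pattern: the callback registers itself again before handling the event. */
    static void watchForever(OneShotWatchSketch node, Consumer<String> handler) {
        node.watch(new Consumer<String>() {
            @Override
            public void accept(String event) {
                node.watch(this); // register again so the next change is seen too
                handler.accept(event);
            }
        });
    }

    public static void main(String[] args) {
        OneShotWatchSketch node = new OneShotWatchSketch();
        node.watch(e -> System.out.println("one-shot saw: " + e));
        watchForever(node, e -> System.out.println("persistent saw: " + e));
        node.fire("NodeDataChanged"); // both print
        node.fire("NodeDataChanged"); // only the re-registering watcher prints
    }
}
```

The cache classes described next (NodeCache, PathChildrenCache, TreeCache) take care of this re-registration for you, which is why they are described as "register once, notified n times".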
NodeCache:
- Target: watches the node itself; changes to its children do not trigger it!
- Times: register once, notified n times (the watched node does not need to exist)
- Listener: interface NodeCacheListener
  public interface NodeCacheListener{
      public void nodeChanged() throws Exception;
  }
- Usage:
  NodeCache nodeCache = new NodeCache(zkClient, watchPath);
  nodeCache.getListenable().addListener(new NodeCacheListener() {
      @Override
      public void nodeChanged() throws Exception {
          // ChildData data=nodeCache.getCurrentData(); ChildData#getPath(),getData(),getStat() -- get watchPath node information
      }
  });
  nodeCache.start(true); // start (buildInitial: true/false); note: starting does not trigger the listener
  ChildData data=nodeCache.getCurrentData(); // get watchPath node information
PathChildrenCache:
- Target: watches create/update/delete of the direct (first-level) children of the node. Notes:
  - Changes to the watched node itself do not trigger the listener; the watched node does not need to exist
  - If the watched node is deleted, the watch stops working and later child changes no longer trigger the listener
- Times: register once, notified n times
- Listener: interface PathChildrenCacheListener
  public interface PathChildrenCacheListener{
      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
  }
- Usage:
  PathChildrenCache pathChildrenCache=new PathChildrenCache(zkClient, watchPath, true); // cacheData: true/false (true lets the client read the node data)
  pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
      @Override
      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
          // event.getType()        // Type.CHILD_ADDED,CHILD_UPDATED,CHILD_REMOVED,CONNECTION_SUSPENDED,CONNECTION_RECONNECTED,CONNECTION_LOST,INITIALIZED
          // event.getData()        // ChildData, ChildData#getPath,getData,getStat -- the node that triggered the event
          // event.getInitialData() // List<ChildData> -- the initial data, available when Type.INITIALIZED is triggered
      }
  });
  pathChildrenCache.start(StartMode.BUILD_INITIAL_CACHE); // start
  // StartMode controls how the initial cache is warmed up:
  //   StartMode.NORMAL                 asynchronous (default mode); the cache starts empty and existing children are reported as CHILD_ADDED events
  //   StartMode.POST_INITIALIZED_EVENT asynchronous; once the cache is primed, a PathChildrenCacheEvent.Type#INITIALIZED event is posted
  //   StartMode.BUILD_INITIAL_CACHE    synchronous; rebuild() is called before start() returns. Note: initialization does not trigger the listener
  List<ChildData> childDataList= pathChildrenCache.getCurrentData(); // get the watched children (watchPath's child nodes)
TreeCache:
- Target: watches the node and all of its descendants (maxDepth can be set at creation time)
- Times: register once, notified n times
- Listener: interface TreeCacheListener
  public interface TreeCacheListener{
      public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception;
  }
- Usage:
  TreeCache treeCache=TreeCache.newBuilder(zkClient, watchPath).setCacheData(false).build(); // cacheData: true/false; the builder also offers setMaxDepth(n)
  treeCache.getListenable().addListener(new TreeCacheListener() {
      @Override
      public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
          // event.getType() Type.NODE_ADDED,NODE_UPDATED,NODE_REMOVED,CONNECTION_SUSPENDED,CONNECTION_RECONNECTED,CONNECTION_LOST,INITIALIZED
          // event.getData() ChildData, ChildData#getPath,getData,getStat -- the node that triggered the event
      }
  });
  treeCache.start(); // start
  ChildData childData=treeCache.getCurrentData(watchPath); // get the watched node (watchPath node) information
  Map<String,ChildData> childMap=treeCache.getCurrentChildren(watchPath); // get the watched node's children (watchPath's child nodes)
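ACL (access control)
- Usage: an ACL entry is written as scheme:id:permission
  - scheme: the permission scheme (the authentication strategy)
  - id: the authorized subject, i.e. the user or entity the permission is granted to

    scheme   id
    world    only one id: anyone (matches every client)
    ip       an IP address or IP range
    auth     "" (empty); uses the users already authenticated on the session, eg: digest:username:password
    digest   username:encrypted password, usually username:BASE64(SHA-1(username:password))

  - permission: the permissions
    - CREATE : c can create child nodes
    - DELETE : d can delete child nodes (direct children only)
    - READ : r can read the node's data and list its children
    - WRITE : w can set the node's data
    - ADMIN : a can set the node's access control list
- Features:
  - ZooKeeper access control is per znode; permissions have to be set on each node
  - Each znode supports multiple schemes and multiple permissions
  - Child nodes do not inherit the parent's permissions; a client without access to a node may still be able to access its children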
cmd examples:
- world
  # setAcl <path> world:anyone:<permission>
  $ setAcl /node1 world:anyone:cdrwa
  $ getAcl /node1
- ip
  # setAcl <path> ip:<ip>:<permission>
  $ setAcl /node2 ip:192.168.100.1:cdrwa
  $ getAcl /node2
- auth
  # addauth digest <user>:<password>          # add an authenticated user
  # setAcl <path> auth:<user>:<permission>    # set the ACL
  $ addauth digest tom:123456
  $ setAcl /node3 auth:tom:cdrwa
- digest
  # echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64
  # password: the ciphertext produced by SHA-1 followed by BASE64
  # setAcl <path> digest:<user>:<password>:<permission>
  $ echo -n tom:123456 | openssl dgst -binary -sha1 | openssl base64
  3YvKnq60bERLJOlabQFeB1f+/n0=
  $ setAcl /node4 digest:tom:3YvKnq60bERLJOlabQFeB1f+/n0=:cdrwa
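The digest id follows the rule username:BASE64(SHA-1(username:password)), which is what the openssl pipeline in the digest example computes. As a rough stdlib-only sketch of the same calculation in Java (the class DigestSketch and its method are hypothetical; in a real project you would more likely call ZooKeeper's own DigestAuthenticationProvider.generateDigest):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Computes a digest-scheme id with the JDK only; an illustration, not ZooKeeper code. */
final class DigestSketch {

    /** Returns "user:BASE64(SHA-1(user:password))". */
    static String generateDigest(String user, String password) {
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest((user + ":" + password).getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a required algorithm on every Java platform, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints the value to use as the <id> part of a digest ACL entry
        System.out.println(generateDigest("tom", "123456"));
    }
}
```

The result is the complete user:digest pair, so it can be placed directly after "digest:" in a setAcl command.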
Curator
new ACL(perms,id)
- parameter1: Perms.xxx
  - Perms.ADMIN -- can change the node's ACL (setAcl)
  - Perms.READ -- can read the node (ls,get)
  - Perms.WRITE -- can change the node's data (set)
  - Perms.CREATE -- can create child nodes (create)
  - Perms.DELETE -- can delete child nodes (delete)
- parameter2: new org.apache.zookeeper.data.Id(String scheme,String id)
- eg:
  Id id1=new Id("digest",DigestAuthenticationProvider.generateDigest("id01:12345"));
  ACL aclRW=new ACL(Perms.READ|Perms.WRITE,id1);
- Util: org.apache.zookeeper.ZooDefs.Ids provides some commonly used Id and ACL instances
  ANYONE_ID_UNSAFE = new Id("world", "anyone")
  AUTH_IDS = new Id("auth", "")
  OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)))
  CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)))
  READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)))
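Permissions are combined with bitwise OR because each one is a single bit. The sketch below mirrors the bit values of ZooDefs.Perms (READ=1, WRITE=2, CREATE=4, DELETE=8, ADMIN=16) in a standalone class and converts the cdrwa letters used by the command-line client into that bitmask; PermsSketch, parse and allows are hypothetical helpers, not part of ZooKeeper or Curator.

```java
/** Standalone mirror of the ZooKeeper permission bits; for illustration only. */
final class PermsSketch {
    static final int READ = 1 << 0;
    static final int WRITE = 1 << 1;
    static final int CREATE = 1 << 2;
    static final int DELETE = 1 << 3;
    static final int ADMIN = 1 << 4;
    static final int ALL = READ | WRITE | CREATE | DELETE | ADMIN;

    /** Converts permission letters such as "cdrwa" or "rw" into a bitmask. */
    static int parse(String letters) {
        int perms = 0;
        for (char c : letters.toCharArray()) {
            switch (c) {
                case 'r': perms |= READ; break;
                case 'w': perms |= WRITE; break;
                case 'c': perms |= CREATE; break;
                case 'd': perms |= DELETE; break;
                case 'a': perms |= ADMIN; break;
                default: throw new IllegalArgumentException("unknown permission letter: " + c);
            }
        }
        return perms;
    }

    /** True if every bit in required is present in perms. */
    static boolean allows(int perms, int required) {
        return (perms & required) == required;
    }

    public static void main(String[] args) {
        System.out.println("rw    = " + parse("rw"));
        System.out.println("cdrwa = " + parse("cdrwa"));
        System.out.println("cd allows READ? " + allows(parse("cd"), READ));
    }
}
```

So an ACL built with Perms.READ|Perms.WRITE corresponds to the letters rw, and one built with Perms.CREATE|Perms.DELETE grants no read access at all.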
aclProvider(aclProvider): same as the zkCli command setAcl path acl
public interface ACLProvider{
    public List<ACL> getDefaultAcl();
    public List<ACL> getAclForPath(String path);
}
- aclProvider(new DefaultACLProvider()): the default; uses ZooDefs.Ids.OPEN_ACL_UNSAFE
- aclProvider(new ACLProvider(){ ... }): override getDefaultAcl, getAclForPath
authorization(authInfoList): same as the zkCli command addauth scheme auth (eg: addauth digest id02:12345)
- new AuthInfo(String scheme, byte[] auth)
- eg:
  new AuthInfo("digest", "id01:12345".getBytes())
withACL(): same as the zkCli command setAcl path acl
- setACL().withACL(aclList).forPath(path)
- create().withACL(aclList).forPath(path)
- Note:
  - setACL() requires the ADMIN permission (Perms.ADMIN)
  - getACL() requires no authentication
Demo: Start
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;

public class CuratorTest {
    public static String connectString="127.0.0.1:2181";
    public static String namespace="micro";
    public static String zkPath="/test";
    private CuratorFramework zkClient=null;

    @Before
    public void init(){
        // base sleep time 3000 ms, at most 3 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
        zkClient=CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(100000)
                .connectionTimeoutMs(100000)
                .namespace(namespace)
                .build();
        zkClient.start();
        CuratorFrameworkState state=zkClient.getState();
        System.out.println("Init zkClient :"+state.name());
    }

    @After
    public void close(){
        if(zkClient!=null){
            zkClient.close();
            CuratorFrameworkState state=zkClient.getState();
            System.out.println("Close zkClient :"+state.name());
        }
    }
    /*....*/
}
Demo: CRUD
Create
create().creatingParentsIfNeeded().forPath(path,data)
@Test
public void createTest() throws Exception{
    String result=zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)   // default: PERSISTENT
            .withACL(Ids.OPEN_ACL_UNSAFE)      // default: Ids.OPEN_ACL_UNSAFE
            .forPath(zkPath+"/a","aaa".getBytes());
    System.out.println("create: "+result);     // print created path
}

@Test
public void createChildrenTest() throws Exception {
    zkClient.create().creatingParentsIfNeeded().forPath(zkPath+"/a/01/02");
}
checkExists().forPath(path)
@Test
public void checkExistTest() throws Exception{
    Stat stat=zkClient.checkExists().forPath(zkPath+"/a");
    System.out.println(stat);
    // null if the node does not exist
    stat=zkClient.checkExists().forPath(zkPath+"/aa"); // null
    System.out.println(stat);
}
Read
getData().forPath(path)
@Test
public void getDataTest() throws Exception{
    byte[] data=zkClient.getData().forPath(zkPath+"/a");
    System.out.println(new String(data));
    // NoNodeException: /aa does not exist
    data=zkClient.getData().forPath(zkPath+"/aa");
    if(data!=null)
        System.out.println(new String(data));
}
getData().storingStatIn(stat).forPath(path)
@Test
public void getStatTest() throws Exception{
    Stat stat = new Stat();
    byte[] data=zkClient.getData().storingStatIn(stat).forPath(zkPath+"/a");
    if(data!=null){
        System.out.println("data: "+new String(data));
        System.out.println("version: "+stat.getVersion());
    }
    // NoNodeException: /aa does not exist
    data=zkClient.getData().storingStatIn(stat).forPath(zkPath+"/aa");
    if(data!=null){
        System.out.println("data: "+new String(data));
        System.out.println("version: "+stat.getVersion());
    }
}
getChildren().forPath(path)
@Test
public void getChildrenTest() throws Exception{
    List<String> children=zkClient.getChildren().forPath(zkPath);
    for(String child:children){
        // note: getChildren() returns node names only, so prepend the parent path
        byte[] data=zkClient.getData().forPath(zkPath+"/"+child);
        System.out.println(child+":"+new String(data));
    }
}
Update
setData().forPath(path,data)
@Test
public void updateTest() throws Exception{
    Stat stat=zkClient.setData().forPath(zkPath+"/a","aaa-aaa".getBytes());
    System.out.println(stat.getVersion());
    byte[] data=zkClient.getData().forPath(zkPath+"/a");
    System.out.println(new String(data));

    // BadVersionException: the node is no longer at version 0 after the update above
    stat=zkClient.setData().withVersion(0).forPath(zkPath+"/a","aaa-bbb".getBytes());
    System.out.println(stat.getVersion());

    // NoNodeException
    //zkClient.setData().forPath(zkPath+"/a/01","a01".getBytes());
}
Delete
delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
@Test
public void deleteTest() throws Exception{
    zkClient.delete()
            .guaranteed()                  // if the delete fails, keep retrying in the background until it succeeds
            .deletingChildrenIfNeeded()    // delete child nodes too, if there are any
            .forPath(zkPath+"/a/01");
    Stat stat=zkClient.checkExists().forPath(zkPath+"/a/01");
    System.out.println(zkPath+"/a/01 :"+(stat!=null?"Exist":"NotExist"));
    stat=zkClient.checkExists().forPath(zkPath+"/a/01/02");
    System.out.println(zkPath+"/a/01/02 :"+(stat!=null?"Exist":"NotExist"));

    // NoNodeException: /aa does not exist
    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(zkPath+"/aa");
}
Demo: Watch
usingWatcher: the watcher fires only once and is removed afterwards, and it does not cover child nodes! (a Watcher can be attached to getData() and checkExists())
getData().usingWatcher(new CuratorWatcher(){ ... } ).forPath(watchPath): watches changes of this node only (NodeDeleted, NodeDataChanged); the watched node (watchPath) must exist, otherwise NoNodeException is thrown
@Test
public void usingWatcherTest() throws Exception{
    Stat stat=zkClient.checkExists().forPath(zkPath+"/b");
    if(stat==null)
        zkClient.create().creatingParentContainersIfNeeded().forPath(zkPath+"/b","bbb".getBytes());

    zkClient.getData().usingWatcher(new CuratorWatcher() {
        @Override
        public void process(WatchedEvent event) throws Exception {
            System.out.println("Path:"+event.getPath());
            System.out.println("Type:"+event.getType());
            System.out.println("State:"+event.getState());
            if(!event.getType().equals(EventType.NodeDeleted)){
                byte[] data=zkClient.getData().forPath(event.getPath());
                if(data!=null)
                    System.out.println("Data:"+new String(data));
            }
        }
    }).forPath(zkPath+"/b");

    // case 1: node data change -- only trigger once
    // first time - trigger event: NodeDataChanged
    // zkClient.setData().forPath(zkPath+"/b","bbb-bbb".getBytes());
    // second time - no watcher trigger
    // zkClient.setData().forPath(zkPath+"/b","bbb-ccc".getBytes());

    // case 2: create children -- no trigger
    // zkClient.create().creatingParentsIfNeeded().forPath(zkPath+"/b/01/02","Hello".getBytes());

    // case 3: update children data -- no trigger
    // zkClient.setData().forPath(zkPath+"/b/01","b01".getBytes());

    // case 4: delete children -- no trigger
    // zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath+"/b/01/02");

    // case 5: delete node -- trigger event: NodeDeleted
    zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath+"/b");

    Thread.sleep(10000);
}
checkExists().usingWatcher(new CuratorWatcher(){ ... } ).forPath(watchPath): watches changes of this node only (NodeCreated, NodeDeleted, NodeDataChanged); the watched node (watchPath) does not need to exist
@Test
public void usingWatcher2Test() throws Exception{
    // Stat stat=zkClient.checkExists().forPath(zkPath+"/b");
    // if(stat==null)
    //     zkClient.create().creatingParentContainersIfNeeded().forPath(zkPath+"/b","bbb".getBytes());

    zkClient.checkExists().usingWatcher(new CuratorWatcher() {
        @Override
        public void process(WatchedEvent event) throws Exception {
            System.out.println("Path:"+event.getPath());
            System.out.println("Type:"+event.getType());
            System.out.println("State:"+event.getState());
            if(!event.getType().equals(EventType.NodeDeleted)){
                byte[] data=zkClient.getData().forPath(event.getPath());
                if(data!=null)
                    System.out.println("Data:"+new String(data));
            }
        }
    }).forPath(zkPath+"/b");

    // case 1: node data change
    // first time - trigger event: NodeDataChanged
    // zkClient.setData().forPath(zkPath+"/b","bbb-bbb".getBytes());
    // second time - no watcher trigger
    // zkClient.setData().forPath(zkPath+"/b","bbb-ccc".getBytes());

    // case 2: create children -- no trigger
    // zkClient.create().creatingParentsIfNeeded().forPath(zkPath+"/b/01/02","Hello".getBytes());

    // case 3: update children data -- no trigger
    // zkClient.setData().forPath(zkPath+"/b/01","b01".getBytes());

    // case 4: delete children -- no trigger
    // zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath+"/b/01/02");

    // case 5: delete node -- trigger event: NodeDeleted
    // zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath+"/b");

    // case 6: create node -- trigger event: NodeCreated
    zkClient.create().creatingParentContainersIfNeeded().forPath(zkPath+"/b","bbb".getBytes());

    Thread.sleep(10000);
}
NodeCache: register once, notified n times -- watches the node itself, not its children! (the watched node does not need to exist)
@Test
public void nodeCacheWatchTest() throws Exception{
    String watchPath=zkPath+"/c";
    if(zkClient.checkExists().forPath(watchPath)==null)
        zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"ccc".getBytes());

    NodeCache nodeCache = new NodeCache(zkClient, watchPath);
    System.out.println("Add Listener...");
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("Trigger Watcher...");
            ChildData data=nodeCache.getCurrentData();
            if(data!=null){
                System.out.println(data.getPath());
                System.out.println(new String(data.getData()));
                System.out.println(data.getStat().getVersion());
            }else {
                System.out.println("Empty!");
            }
        }
    });

    // start -- note: starting does not trigger the listener, whether or not the initial data is built
    // nodeCache.start();     // buildInitial=false: do not build the initial data
    nodeCache.start(true);    // buildInitial=true: build the initial data before returning

    // check initial data
    ChildData data=nodeCache.getCurrentData();
    if(data!=null){
        System.out.println("Initial node data:");
        System.out.println(data.getPath());
        System.out.println(new String(data.getData()));
        System.out.println(data.getStat().getVersion());
    }else {
        System.out.println("Initial node data is empty!");
    }

    System.out.println("Begin Case 1 : watchPath -- trigger watcher");
    // node data change -- always triggers the listener (NodeDataChanged)
    zkClient.setData().forPath(watchPath,"ccc-ccc".getBytes());
    Thread.sleep(2000);
    zkClient.setData().forPath(watchPath,"ccc-ddd".getBytes());
    // delete node -- triggers the listener (NodeDeleted)
    zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath);
    // create node -- triggers the listener (NodeCreated)
    zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"ccc".getBytes());
    Thread.sleep(2000);

    System.out.println("Begin Case 2 : children -- no trigger");
    // create children
    zkClient.create().creatingParentsIfNeeded().forPath(watchPath+"/01/02","Hello".getBytes());
    // update children data
    zkClient.setData().forPath(watchPath+"/01","c01".getBytes());
    // delete children
    zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath+"/01/02");

    Thread.sleep(10000);
    if(nodeCache!=null)
        nodeCache.close();
}
PathChildrenCache: watches create/update/delete of the direct children of the node (Note: changes to the watched node itself do not trigger the listener, and the watched node does not need to exist; if the watched node is deleted, the watch stops working and later child changes no longer trigger the listener)
@Test
public void pathChildenCacheWatchTest() throws Exception{
    String watchPath=zkPath+"/d";
    if(zkClient.checkExists().forPath(watchPath)==null){
        zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"ddd".getBytes());
    }
    if(zkClient.checkExists().forPath(watchPath+"/0")==null){
        zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath+"/0","00".getBytes());
    }

    // cacheData: true/false
    PathChildrenCache childCache=new PathChildrenCache(zkClient, watchPath, true);
    System.out.println("Add Listener...");
    childCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            System.out.println(event.getType());
            ChildData child=event.getData();
            if(child!=null)
                System.out.println(child.getPath()+":"+new String(child.getData())+",version:"+child.getStat().getVersion());
            if(event.getType().equals(Type.INITIALIZED)){
                System.out.println("Initial node data seen by the listener:");
                List<ChildData> childDataList=event.getInitialData();
                for(ChildData c:childDataList){
                    System.out.println(c.getPath()+":"+new String(c.getData())+",version:"+c.getStat().getVersion());
                }
            }
        }
    });

    System.out.println("Start Cache: ");
    // childCache.start();                                  // default: StartMode.NORMAL, asynchronous -- existing children arrive as CHILD_ADDED events
    // childCache.start(StartMode.POST_INITIALIZED_EVENT);  // asynchronous -- initialization triggers the listener: Type.INITIALIZED
    childCache.start(StartMode.BUILD_INITIAL_CACHE);        // synchronous -- initialization does not trigger the listener
    // Thread.sleep(2000);

    List<ChildData> childDataList= childCache.getCurrentData();
    System.out.println("Initial node data after start:");
    for(ChildData child:childDataList){
        System.out.println(child.getPath()+":"+new String(child.getData())+",version:"+child.getStat().getVersion());
    }

    System.out.println("Begin Case 1 : watchPath -- no trigger");
    // node data change
    // zkClient.setData().forPath(watchPath,"ddd-ddd".getBytes());
    // delete node -- Note: this makes the watch stop working
    // zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath);
    // create node
    // zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"ddd".getBytes());
    // Thread.sleep(2000);

    System.out.println("Begin Case 2 : children -- trigger");
    // create children -- trigger CHILD_ADDED -- Note: only "/01" triggers CHILD_ADDED; "/01/02" is not a direct child and triggers nothing
    zkClient.create().creatingParentsIfNeeded().forPath(watchPath+"/01/02","Hello".getBytes());
    // update children data -- trigger CHILD_UPDATED
    zkClient.setData().forPath(watchPath+"/01","d01".getBytes());
    // delete children -- trigger CHILD_REMOVED
    zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath+"/01");

    Thread.sleep(10000);
    if(childCache!=null)
        childCache.close();
}
TreeCache: watches the node and all of its descendants (maxDepth can be set at creation time)
@Test
public void treeCacheWatchTest() throws Exception{
    String watchPath=zkPath+"/e";
    if(zkClient.checkExists().forPath(watchPath)==null){
        zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"eee".getBytes());
    }
    if(zkClient.checkExists().forPath(watchPath+"/0")==null){
        zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath+"/0","00".getBytes());
    }

    System.out.println("Add Listener...");
    // node data is cached by default; nodes that already exist trigger NODE_ADDED on start
    TreeCache treeCache=new TreeCache(zkClient, watchPath);
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            System.out.println("Watch:");
            System.out.println(event.getType());
            ChildData childData=event.getData();
            if(childData!=null)
                System.out.println(childData.getPath()+":"+new String(childData.getData())+",version:"+childData.getStat().getVersion());
        }
    });
    treeCache.start();

    System.out.println("Initial Data...");
    ChildData childData=treeCache.getCurrentData(watchPath);
    if(childData!=null)
        System.out.println("Current:"+childData.getPath()+":"+new String(childData.getData())+",version:"+childData.getStat().getVersion());
    Map<String,ChildData> childMap=treeCache.getCurrentChildren(watchPath);
    if(childMap!=null){
        for(String key:childMap.keySet()){
            System.out.println("Child:"+key+":"+childMap.get(key));
        }
    }

    System.out.println("Begin case 1 : watchPath -- trigger");
    // node data change -- trigger NODE_UPDATED
    zkClient.setData().forPath(watchPath,"eee-eee".getBytes());
    // delete node -- trigger NODE_REMOVED 2 times for path '/test/e/0','/test/e'
    zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath);
    // create node -- trigger NODE_ADDED
    zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"eee".getBytes());
    Thread.sleep(2000);

    System.out.println("Begin Case 2 : children -- trigger");
    // create children -- trigger NODE_ADDED 2 times for path: '/test/e/01','/test/e/01/02'
    zkClient.create().creatingParentsIfNeeded().forPath(watchPath+"/01/02","Hello".getBytes());
    // update children data -- trigger NODE_UPDATED
    zkClient.setData().forPath(watchPath+"/01","d01".getBytes());
    // delete children -- trigger NODE_REMOVED 2 times for path: '/test/e/01/02','/test/e/01'
    zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath+"/01");

    Thread.sleep(10000);
    if(treeCache!=null)
        treeCache.close();
}
Demo: ACL
init
public class CuratorAclTest {
    public static String connectString="127.0.0.1:2181";
    public static String namespace="micro";
    public static String zkPath="/bb";
    private CuratorFramework zkClient=null;
    private ACL aclAdmin=null;
    private ACL aclRW=null;
    private ACL aclCD=null;

    /*
     * Perms.ADMIN  -- can change the node's ACL (setAcl)
     * Perms.READ   -- can read the node (ls,get)
     * Perms.WRITE  -- can change the node's data (set)
     * Perms.CREATE -- can create child nodes (create)
     * Perms.DELETE -- can delete child nodes (delete)
     */
    @Before
    public void init() throws NoSuchAlgorithmException{
        Id id0=new Id("digest",DigestAuthenticationProvider.generateDigest("id00:admin"));
        Id id1=new Id("digest",DigestAuthenticationProvider.generateDigest("id01:12345"));
        Id id2=new Id("digest",DigestAuthenticationProvider.generateDigest("id02:12345"));
        aclAdmin=new ACL(Perms.ADMIN, id0);
        aclRW=new ACL(Perms.READ|Perms.WRITE,id1);
        aclCD=new ACL(Perms.CREATE|Perms.DELETE,id2);
    }

    @After
    public void close(){
        if(zkClient!=null){
            zkClient.close();
            CuratorFrameworkState state=zkClient.getState();
            System.out.println("Close zkClient :"+state.name());
        }
    }
    /* ... */
}
ACL
aclProvider/setACL: same as the zkCli command setAcl path acl
@Test
public void aclProviderTest() throws Exception{
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
    zkClient=CuratorFrameworkFactory.builder()
            //.aclProvider(new DefaultACLProvider()) // default
            .aclProvider(new ACLProvider() {
                // Note: called when a node is created, to set the ACL of the new node
                @Override
                public List<ACL> getDefaultAcl() {
                    System.out.println("setting default acl...");
                    return ZooDefs.Ids.OPEN_ACL_UNSAFE;
                }
                @Override
                public List<ACL> getAclForPath(String path) {
                    System.out.println("setting acl...");
                    // set the acl for the zkPath node and everything below it
                    if(path.startsWith("/"+namespace+zkPath)){ // note: the path includes the namespace (namespace+zkPath)
                        System.out.println("setting aclAdmin,aclRW,aclCD...");
                        return Arrays.asList(aclAdmin,aclRW,aclCD);
                    }
                    else
                        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
                }
            })
            .connectString(connectString)
            .retryPolicy(retryPolicy)
            .sessionTimeoutMs(100000)
            .connectionTimeoutMs(100000)
            .namespace(namespace)
            .build();
    zkClient.start();
    CuratorFrameworkState state=zkClient.getState();
    System.out.println("Init zkClient :"+state.name());

    // case 1 -- Create -zkPath --> Success
    aclCreateTest();
    // case 2 -- Read and Write -zkPath --> NoAuthException!!
    // aclReadWriteTest();
    // case 3 -- Delete -zkPath --> NoAuthException!!
    aclDeleteTest();
    // case 4 -- Create -zkPath --> Success
    aclCreateTest();

    System.out.println("child case...");
    // case 5 -- Delete and Create -zkPath child --> NoAuthException!!
    // aclDeleteChildTest();
    // aclCreateChildTest();
    // case 6 -- Read and Write -zkPath child --> NoAuthException!!
    // aclReadWriteChildTest();
}
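One thing to watch in getAclForPath: a plain startsWith check on "/micro/bb" also matches sibling nodes such as "/micro/bbc". If only the node and its descendants should receive the restricted ACL, a slightly stricter match helps. The following stdlib-only sketch shows one way to do it; AclPathMatchSketch and isUnder are hypothetical names, and whether the stricter rule is wanted depends on your node layout.

```java
/** Path-prefix matching helper for choosing an ACL by path; an illustrative sketch. */
final class AclPathMatchSketch {

    /** True if path is root itself or lies somewhere below root. */
    static boolean isUnder(String root, String path) {
        return path.equals(root) || path.startsWith(root + "/");
    }

    public static void main(String[] args) {
        String root = "/micro/bb"; // "/" + namespace + zkPath, as in the demo
        System.out.println(isUnder(root, "/micro/bb"));      // the node itself
        System.out.println(isUnder(root, "/micro/bb/a"));    // a descendant
        System.out.println(isUnder(root, "/micro/bbc"));     // a sibling that merely shares the prefix
        System.out.println("/micro/bbc".startsWith(root));   // what the plain check would say
    }
}
```

Inside an ACLProvider the check would then read isUnder("/"+namespace+zkPath, path) instead of path.startsWith(...).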
authorization: same as the zkCli command addauth scheme auth (eg: addauth digest id02:12345)
@Test
public void authorizationTest() throws Exception{
    List<AuthInfo> authInfos = new ArrayList<AuthInfo>();
    authInfos.add(new AuthInfo("digest", "id00:admin".getBytes()));     // admin: authentication for changing ACLs
    // authInfos.add(new AuthInfo("digest", "id01:12345".getBytes())); // rw: authentication for reading/writing nodes
    authInfos.add(new AuthInfo("digest", "id02:12345".getBytes()));     // cd: authentication for creating/deleting nodes

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
    zkClient=CuratorFrameworkFactory.builder()
            // Note:
            //   no aclProvider is set, which equals .aclProvider(new DefaultACLProvider()), i.e. ZooDefs.Ids.OPEN_ACL_UNSAFE
            //   so newly created nodes get the acl ZooDefs.Ids.OPEN_ACL_UNSAFE
            .authorization(authInfos) // attach the AuthInfo credentials used to authenticate node access
            .connectString(connectString)
            .retryPolicy(retryPolicy)
            .sessionTimeoutMs(100000)
            .connectionTimeoutMs(100000)
            .namespace(namespace)
            .build();
    zkClient.start();
    CuratorFrameworkState state=zkClient.getState();
    System.out.println("Init zkClient :"+state.name());

    // get acl -- no permission needed
    getACLTest();
    // set acl -zkPath --> only possible when authenticated with the admin permission
    setACLTest();
    // getChildACLTest();

    // case 1: read & write -zkPath --> requires the rw permissions
    aclReadWriteTest();
    // case 2: create -zkPath child --> requires the c permission; a new node without an explicit acl gets the default acl {world,anyone}
    aclCreateChildTest();
    // case 3: read & write -zkPath child
    aclReadWriteChildTest();
    // case 4: delete -zkPath child
    aclDeleteChildTest();
    // case 5: delete -zkPath --> requires the d permission
    aclDeleteTest();
}
Set/Get ACL
- getACL (no authentication required)
public void getACLTest() throws Exception{
    if(zkClient.checkExists().forPath(zkPath)==null){
        System.out.println(zkPath+" not exist!");
        return;
    }
    System.out.println("list acl:");
    List<ACL> aclList=zkClient.getACL().forPath(zkPath);
    for(ACL acl:aclList)
        System.out.println(acl);
}

public void getChildACLTest() throws Exception{
    System.out.println("list child acl:");
    List<String> childList=zkClient.getChildren().forPath(zkPath);
    for(String child:childList){
        List<ACL> aclList=zkClient.getACL().forPath(zkPath+"/"+child);
        System.out.println(child+" acl: ");
        for(ACL acl:aclList)
            System.out.println("\t"+acl);
    }
}
- setACL
public void setACLTest() throws Exception{
    if(zkClient.checkExists().forPath(zkPath)!=null){
        System.out.println("Set ACL");
        Stat stat=zkClient.setACL().withACL(Arrays.asList(aclAdmin,aclRW,aclCD)).forPath(zkPath);
        System.out.println(stat.getVersion());
    }else{
        System.out.println("Create and Set ACL");
        zkClient.create().creatingParentsIfNeeded().withACL(Arrays.asList(aclAdmin,aclRW,aclCD)).forPath(zkPath,"access test".getBytes());
    }
    getACLTest();
}
CRUD: helper methods called by the tests above
/* Current Path */
public void aclReadWriteTest() throws Exception{
    System.out.println("aclReadWriteTest....");
    if(zkClient.checkExists().forPath(zkPath)==null){
        System.out.println(zkPath+" not exist!");
        return;
    }
    //read
    System.out.println("read..."+zkPath);
    byte[] data=zkClient.getData().forPath(zkPath);
    if(data!=null)
        System.out.println(new String(data));
    //write
    System.out.println("write..."+zkPath);
    Stat stat=zkClient.setData().forPath(zkPath,"access test-acl".getBytes());
    System.out.println(stat.getVersion());
}

public void aclCreateTest() throws Exception{
    System.out.println("acl create test...");
    if(zkClient.checkExists().forPath(zkPath)!=null){
        System.out.println(zkPath+" already exist!");
    }else{
        //create
        System.out.println("create..."+zkPath);
        zkClient.create().creatingParentsIfNeeded().forPath(zkPath,"access test-new".getBytes());
    }
    //print acl
    System.out.println("list acl..."+zkPath);
    List<ACL> aclList=zkClient.getACL().forPath(zkPath);
    for(ACL acl:aclList){
        System.out.println(acl);
    }
}

public void aclDeleteTest() throws Exception{
    System.out.println("aclDeleteTest....");
    if(zkClient.checkExists().forPath(zkPath)==null){
        System.out.println(zkPath+" not exist!");
    }else{
        //delete
        System.out.println("delete..."+zkPath);
        zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath);
    }
}

/* Children CRUD */
public void aclReadWriteChildTest() throws Exception{
    System.out.println("aclReadWriteChildTest....");
    String accessPath=zkPath+"/a";
    //read
    System.out.println("read..."+accessPath);
    byte[] data=zkClient.getData().forPath(accessPath);
    if(data!=null)
        System.out.println(new String(data));
    //write
    System.out.println("write..."+accessPath);
    Stat stat=zkClient.setData().forPath(accessPath,"aaa-acl".getBytes());
    System.out.println(stat.getVersion());
}

public void aclCreateChildTest() throws Exception{
    System.out.println("aclCreateChildTest....");
    String accessPath=zkPath+"/a";
    if(zkClient.checkExists().forPath(accessPath)!=null){
        System.out.println(accessPath+" already exist!");
    }else{
        //create
        System.out.println("create..."+accessPath);
        zkClient.create().creatingParentsIfNeeded().forPath(accessPath,"aaa-new".getBytes());
    }
    //print acl
    System.out.println("list acl..."+accessPath);
    List<ACL> aclList=zkClient.getACL().forPath(accessPath);
    for(ACL acl:aclList){
        System.out.println(acl);
    }
}

public void aclDeleteChildTest() throws Exception{
    System.out.println("aclDeleteChildTest....");
    String accessPath=zkPath+"/a";
    if(zkClient.checkExists().forPath(accessPath)==null){
        System.out.println(accessPath+" not exist!");
    }else{
        //delete
        System.out.println("delete..."+accessPath);
        zkClient.delete().deletingChildrenIfNeeded().forPath(accessPath);
    }
}
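An Application Example
The client watches for files being added and deleted, so that files are synchronized from the server side to the client side.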
Summary
Two Parts:
- Part 1: FileServer -> CRUD files
Part 2: ClientUser -> Auto download from FileServer / delete local file depends on the files updates on FileServer.
admin (/admin): FileServer
- upload file: POST /images -> upload(MultipartFile photo)
  - save the file as {image.id}.{image.type} in the local fileLocation
  - save the file information in the DB
  - add zk node: /{image.id}_{PERSISTENT_SEQUENTIAL} with data ADD:{image.id}:{image.type}:{image.originalName}
- delete file: DELETE /images/{id} -> delete(id)
  - delete the file information from the DB
  - add zk node: /{image.id}_{PERSISTENT_SEQUENTIAL} with data DEL:{image.id}:{image.type}:{image.originalName}
- list all files information: GET /images -> list()
- get file information by id: GET /images/{id} -> get(id)
- config:
  - fileLocation: /Users/cj/space/java/admin-uploads
  - zk
    - server: 127.0.0.1:2181
    - namespace: demo
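The event node names above depend on how ZooKeeper names sequential nodes: it appends a 10-digit, zero-padded counter to the requested path. Since the controller passes "/" + image.getId() as the path, the counter follows the id directly. A minimal sketch of that naming, without any ZooKeeper connection; the class and method names (SequentialNodeNames, expectedPath, imageIdOf) are hypothetical and only for illustration:

```java
final class SequentialNodeNames {

    // ZooKeeper pads the sequence counter to 10 digits (format %010d).
    private static final int SUFFIX_LENGTH = 10;

    // Path a sequential create would return for the given requested path and counter value.
    static String expectedPath(String requestedPath, int counter) {
        return String.format("%s%010d", requestedPath, counter);
    }

    // Recover the image id from a created node path by dropping the counter suffix.
    static String imageIdOf(String createdPath) {
        String name = createdPath.substring(createdPath.lastIndexOf('/') + 1);
        if (name.length() <= SUFFIX_LENGTH) {
            throw new IllegalArgumentException("Not a sequential node name: " + createdPath);
        }
        return name.substring(0, name.length() - SUFFIX_LENGTH);
    }

    public static void main(String[] args) {
        String created = expectedPath("/1d0fa647-9dbe-410a-8d9c-0e1c973a98e2", 3);
        System.out.println(created);
        System.out.println(imageIdOf(created));
    }
}
```

Because every upload or delete creates a new node with its own counter, several events for the same image id can coexist without name clashes.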
user (/user): Client User
- get local file: GET /images/{filename} -> get(filename)
- zk: listen on / for CHILD_ADDED events
  - ADD:{filename}:{extension}:{originalName} -> download the file from the fileServer to local, then delete this child node
  - DEL:{filename}:{extension}:{originalName} -> delete the local file, then delete this child node
- config:
  - local fileLocation: /Users/cj/space/java/user-downloads
  - remote fileServer: http://localhost:8080/zookeeper-demo/admin/images
  - zk
    - server: 127.0.0.1:2181
    - namespace: demo
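The node data is a colon-separated string (action, id, type, original name). A small sketch of parsing it defensively; FileEvent is a hypothetical helper, not part of the demo code. It splits with a limit of 4 so that an original file name containing ":" stays intact, and it rejects malformed payloads instead of failing later with an index error:

```java
final class FileEvent {
    final String action;        // "ADD" or "DEL"
    final String id;            // image id (UUID in the demo)
    final String type;          // file extension
    final String originalName;  // name of the uploaded file

    private FileEvent(String action, String id, String type, String originalName) {
        this.action = action;
        this.id = id;
        this.type = type;
        this.originalName = originalName;
    }

    static FileEvent parse(String payload) {
        // Limit 4: everything after the third ':' belongs to the original name.
        String[] parts = payload.split(":", 4);
        if (parts.length < 4) {
            throw new IllegalArgumentException("Malformed event payload: " + payload);
        }
        if (!"ADD".equals(parts[0]) && !"DEL".equals(parts[0])) {
            throw new IllegalArgumentException("Unknown action: " + parts[0]);
        }
        return new FileEvent(parts[0], parts[1], parts[2], parts[3]);
    }

    // Name of the file on disk: {id}.{type}
    String localFilename() {
        return id + "." + type;
    }

    public static void main(String[] args) {
        FileEvent event = FileEvent.parse("ADD:1d0fa647:jpg:sun:set.jpg");
        System.out.println(event.action + " " + event.localFilename() + " " + event.originalName);
    }
}
```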
verify:
# upload: POST /admin/images
$ curl -i -F 'photo=@/Users/cj/Pictures/design/极光2.jpg' -X POST http://localhost:8080/zookeeper-demo/admin/images
# list: GET /admin/images
$ curl -i -H "Content-Type: application/json" http://localhost:8080/zookeeper-demo/admin/images
# get: GET /admin/images/{id}
$ curl -i -H "Content-Type: application/json" http://localhost:8080/zookeeper-demo/admin/images/1d0fa647-9dbe-410a-8d9c-0e1c973a98e2
# delete: DELETE /admin/images/{id}
$ curl -i -H "Content-Type: application/json" -X DELETE http://localhost:8080/zookeeper-demo/admin/images/1d0fa647-9dbe-410a-8d9c-0e1c973a98e2
# visit:
# /user/images/1d0fa647-9dbe-410a-8d9c-0e1c973a98e2.jpg
# More (for test)
# /light.jpg
# /admin/images
# /admin/images/test1/虫洞.gif
# /admin/images/test2/极光1.jpg
Dependencies
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Curator (includes ZooKeeper) -->
<!-- Note: exclude the bundled ZooKeeper and pin a stable version instead; the beta version pulled in by default has issues -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- FileUpload -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
Admin(FileServer)
application.yml
server:
  port: 8080
  servlet:
    context-path: /zookeeper-demo
zk:
  server: 127.0.0.1:2181
  namespace: demo
admin:
  fileLocation: /Users/cj/space/java/admin-uploads
ImageAdminController
// MediaTypeFactory lives in org.springframework.http (Spring 5+), next to MediaType and ResponseEntity
@RestController
@RequestMapping("/admin/images")
public class ImageAdminController {

    @Value("${admin.fileLocation}")
    private String fileLocation;
    @Autowired
    private ImageService imageService;
    @Resource(name="zkService")
    private ZkCuratorService zkService;

    @PostMapping
    public Object upload(@RequestParam(value="photo") MultipartFile photo) throws Exception {
        System.out.println("Name:" + photo.getName());                          // photo
        System.out.println("OriginalFilename:" + photo.getOriginalFilename());  // 极光2.jpg
        System.out.println("Size:" + photo.getSize());                          // 186316
        System.out.println("ContentType:" + photo.getContentType());            // image/jpeg

        Image image = new Image(UUID.randomUUID().toString(), photo.getOriginalFilename(), "Active");
        String destFilePath = this.fileLocation + "/" + image.getId() + "." + image.getType();
        System.out.println("dest:" + destFilePath);
        // copyToFile leaves the stream open, so close it here
        try (InputStream in = photo.getInputStream()) {
            FileUtils.copyToFile(in, new File(destFilePath));
        }
        boolean result = this.imageService.save(image);

        // zookeeper: publish an ADD event node for the clients
        String data = "ADD:" + image.getId() + ":" + image.getType() + ":" + image.getOriginalName();
        String zkPath = zkService.create("/" + image.getId(), data.getBytes());
        System.out.println("create zkPath(for ADD):" + zkPath);

        return ResponseEntity.ok(result);
    }

    @DeleteMapping("/{id}")
    public Object delete(@PathVariable("id") String id) throws Exception {
        Image image = this.imageService.delete(id);
        // zookeeper: publish a DEL event node for the clients
        if (image != null) {
            String data = "DEL:" + image.getId() + ":" + image.getType() + ":" + image.getOriginalName();
            String zkPath = zkService.create("/" + image.getId(), data.getBytes());
            System.out.println("create zkPath(for DEL):" + zkPath);
        }
        return ResponseEntity.ok(image != null);
    }

    @GetMapping
    public Object list() {
        return ResponseEntity.ok(this.imageService.list());
    }

    @GetMapping("/{id}")
    public Object get(@PathVariable("id") String id) throws IOException {
        Image image = this.imageService.get(id);
        if (image == null || !"Active".equals(image.getStatus())
                || StringUtils.isEmpty(image.getType()) || "Unknow".equals(image.getType()))
            return ResponseEntity.ok("Not Available!");
        return imageResponse(this.fileLocation, image.getId() + "." + image.getType());
    }

    /* ------ For Test: -------- */
    @GetMapping(value="/test1/{name}",
            produces={MediaType.IMAGE_JPEG_VALUE, MediaType.IMAGE_PNG_VALUE, MediaType.IMAGE_GIF_VALUE})
    public Object getImageByName1(@PathVariable("name") String filename) throws IOException {
        String filePath = "/Users/cj/Pictures/design";
        try (FileInputStream in = new FileInputStream(filePath + "/" + filename)) {
            return IOUtils.toByteArray(in);   // or ResponseEntity.ok(IOUtils.toByteArray(in));
        }
    }

    @GetMapping(value="/test2/{name}")
    public Object getImageByName2(@PathVariable("name") String filename) throws IOException {
        return imageResponse("/Users/cj/Pictures/design", filename);
    }

    // Each contentType(...) call replaces the previous value, so chaining JPEG/PNG/GIF
    // would always send image/gif. Set it once, derived from the file name.
    private ResponseEntity<byte[]> imageResponse(String dir, String filename) throws IOException {
        MediaType type = MediaTypeFactory.getMediaType(filename).orElse(MediaType.APPLICATION_OCTET_STREAM);
        try (FileInputStream in = new FileInputStream(dir + "/" + filename)) {
            return ResponseEntity.ok().contentType(type).body(IOUtils.toByteArray(in));
        }
    }

    @PostConstruct
    private void init() throws Exception {
        System.out.println("init:" + this.fileLocation);
        FileUtils.forceMkdir(new File(this.fileLocation));

        // seed one image so the demo has something to serve
        String id = "1d0fa647-9dbe-410a-8d9c-0e1c973a98e2";
        Image image = new Image(id, "light.jpg", "Active");
        FileUtils.copyFile(new File("/Users/cj/Pictures/design/极光1.jpg"),
                new File(this.fileLocation + "/" + id + ".jpg"));
        this.imageService.save(image);

        // zookeeper
        String data = "ADD:" + image.getId() + ":" + image.getType() + ":" + image.getOriginalName();
        String zkPath = zkService.create("/" + image.getId(), data.getBytes());
        System.out.println("create zkPath(for ADD):" + zkPath);
    }

    @PreDestroy
    private void clear() throws Exception {
        FileUtils.cleanDirectory(new File(this.fileLocation));
        // zookeeper: remove the namespace root together with all event nodes
        zkService.delete("/");
    }
}
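Serving an image needs a single Content-Type that matches the file being returned. A dependency-free sketch of choosing it from the stored extension; ImageContentTypes and forExtension are hypothetical names, and the mapping only covers the three image formats this demo mentions:

```java
import java.util.Locale;

final class ImageContentTypes {

    // Fallback for anything that is not a known image extension.
    static final String DEFAULT_TYPE = "application/octet-stream";

    // Map a file extension (without the dot) to a MIME type.
    static String forExtension(String extension) {
        if (extension == null) {
            return DEFAULT_TYPE;
        }
        switch (extension.toLowerCase(Locale.ROOT)) {
            case "jpg":
            case "jpeg":
                return "image/jpeg";
            case "png":
                return "image/png";
            case "gif":
                return "image/gif";
            default:
                return DEFAULT_TYPE;
        }
    }

    public static void main(String[] args) {
        System.out.println(forExtension("JPG"));
        System.out.println(forExtension("gif"));
        System.out.println(forExtension("Unknow"));
    }
}
```

In a Spring controller the resulting string could be turned into a MediaType with MediaType.parseMediaType(...) and passed to contentType(...) exactly once.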
main
@SpringBootApplication
@Configuration
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @Bean(name="zkService")
    public ZkCuratorService zkService() {
        return new ZkCuratorService();
    }
}
User(ClientUser)
application.yml
server:
  port: 8090
  servlet:
    context-path: /zookeeper-demo
zk:
  server: 127.0.0.1:2181
  namespace: demo
user:
  fileLocation: /Users/cj/space/java/user-downloads
  fileServer: http://localhost:8080/zookeeper-demo/admin/images
ImageUserController
// MediaTypeFactory lives in org.springframework.http (Spring 5+), next to MediaType and ResponseEntity
@RestController
@RequestMapping("/user/images")
public class ImageUserController {

    @Value("${user.fileLocation}")
    private String fileLocation;
    @Value("${user.fileServer}")
    private String fileServer;
    @Resource(name="zkClientService") // @Resource(name="zkService")
    private ZkCuratorService zkService;

    @GetMapping("/{filename}")
    public Object get(@PathVariable("filename") String filename) throws IOException {
        try (FileInputStream in = new FileInputStream(this.fileLocation + "/" + filename)) {
            // contentType(...) replaces any earlier value, so set it once from the file name
            return ResponseEntity.ok()
                    .contentType(MediaTypeFactory.getMediaType(filename).orElse(MediaType.APPLICATION_OCTET_STREAM))
                    .body(IOUtils.toByteArray(in));
        }
    }

    @PostConstruct
    public void init() throws Exception {
        PathChildrenCacheListener listener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("Get Event:" + event.getType());
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    ChildData child = event.getData();
                    if (child == null || child.getData() == null)
                        return;
                    // payload: {ADD|DEL}:{id}:{type}:{originalName}
                    String data = new String(child.getData());
                    String array[] = data.split(":");
                    if (array.length < 3) {
                        System.out.println("ignore malformed event " + child.getPath() + ":" + data);
                        return;
                    }
                    String filename = array[1] + "." + array[2];
                    System.out.println(child.getPath() + ":" + data);
                    if ("ADD".equals(array[0])) {
                        try {
                            Thread.sleep(3000); // ! for test, to avoid a connection-refused exception
                            FileUtils.copyURLToFile(new URL(fileServer + "/" + filename),
                                    new File(fileLocation + "/" + filename));
                            // the event is handled, remove its node
                            zkService.delete(child.getPath());
                        } catch (ConnectException ex) {
                            // node is kept, so the event is not lost when the server is unreachable
                            System.out.println(ex.getMessage());
                        }
                    } else if ("DEL".equals(array[0])) {
                        try {
                            FileUtils.forceDelete(new File(fileLocation + "/" + filename));
                        } catch (FileNotFoundException ex) {
                            System.out.println("not exist:" + fileLocation + "/" + filename);
                        }
                        zkService.delete(child.getPath());
                    }
                }
            }
        };
        zkService.addPathChildrenWatcher("/", true, listener);
    }

    @PreDestroy
    public void clear() throws Exception {
        FileUtils.cleanDirectory(new File(this.fileLocation));
        // zookeeper
        zkService.delete("/");
    }
}
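The listener sleeps for 3 seconds before downloading so that the FileServer has time to come up. A bounded retry is one alternative to a fixed sleep. The sketch below is plain Java and not part of the demo; Retry.call, maxAttempts and delayMs are hypothetical names. It retries only on IOException (ConnectException is a subclass) and rethrows the last failure once the attempts are used up:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

final class Retry {

    // Run the task, retrying on IOException up to maxAttempts times with a fixed delay.
    static <T> T call(Callable<T> task, int maxAttempts, long delayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (IOException ex) {
                last = ex;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMs);   // wait before the next attempt
                }
            }
        }
        throw last;   // all attempts failed
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        String result = Retry.call(() -> {
            if (++calls[0] < 3) {
                throw new java.net.ConnectException("Connection refused");
            }
            return "downloaded";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the listener, the download call (FileUtils.copyURLToFile) could be wrapped in such a helper. Blocking inside a cache listener still delays later events, so short delays and few attempts are the safer choice.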
main
@SpringBootApplication
@Configuration
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @Bean(name="zkClientService")
    public ZkCuratorService zkClientService() {
        return new ZkCuratorService();
    }
}
Common: Service & Entity
ImageService
@Service
public class ImageService {

    // in-memory store standing in for the DB
    private final ConcurrentMap<String,Image> images = new ConcurrentHashMap<String,Image>();

    public boolean save(Image image) {
        return images.put(image.getId(), image) == null;
    }

    // soft delete: the entry stays in the map with status "InActive"
    public Image delete(String id) {
        Image image = images.get(id);
        if (image == null)
            return null;
        image.setStatus("InActive");
        images.replace(id, image);
        return image;
    }

    public Image get(String id) {
        return images.get(id);
    }

    public Collection<Image> list() {
        return images.values();
    }
}
ZkCuratorService
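import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Value;

public class ZkCuratorService {

    private CuratorFramework zkClient;
    @Value("${zk.server}")
    private String connectString;
    @Value("${zk.namespace}")
    private String namespace;
    private PathChildrenCache childCache;

    public CuratorFramework getZkClient() {
        return zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @PostConstruct
    public void init() {
        System.out.println("ZK Service init");
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
        zkClient = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(10000)
                .connectionTimeoutMs(10000)
                .namespace(namespace)
                .build();
        zkClient.start();
    }

    @PreDestroy
    public void close() {
        // close the cache (if one was started) before the client it depends on
        if (childCache != null)
            CloseableUtils.closeQuietly(childCache);
        if (zkClient != null)
            zkClient.close();
    }

    public void addPathChildrenWatcher(String watchPath, boolean createIfNotExist,
            PathChildrenCacheListener listener) throws Exception {
        if (createIfNotExist) {
            if (zkClient.checkExists().forPath(watchPath) == null) {
                zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath);
            }
        }
        // cacheData=true: events carry the node data as well as the path
        childCache = new PathChildrenCache(zkClient, watchPath, true);
        System.out.println("Add Listener...");
        childCache.getListenable().addListener(listener);
        System.out.println("Start Cache...");
        // asynchronous initialization; the listener receives Type.INITIALIZED once the cache is primed
        childCache.start(StartMode.POST_INITIALIZED_EVENT);
    }

    // creates a persistent sequential node; returns the actual path including the sequence suffix
    public String create(String path, byte[] data) throws Exception {
        return zkClient.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                .forPath(path, data);
    }

    public void delete(String path) throws Exception {
        zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
    }

    public Stat update(String path, byte[] data) throws Exception {
        return zkClient.setData().forPath(path, data);
    }

    public Stat checkExists(String path) throws Exception {
        return zkClient.checkExists().forPath(path);
    }
}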
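To get a feel for what ExponentialBackoffRetry(3000, 3) means in waiting time, here is a rough model in plain Java. It assumes, based on my reading of Curator's implementation, that the sleep for retry n (starting at 0) is the base time multiplied by a random factor between 1 and 2^(n+1) - 1; treat this as an approximation and check the Curator source for the exact behaviour. BackoffBounds and its methods are hypothetical names:

```java
final class BackoffBounds {

    // Upper bound of the sleep before retry number retryCount (0-based), under the assumed model.
    static long maxSleepMs(long baseSleepTimeMs, int retryCount) {
        long maxMultiplier = (1L << (retryCount + 1)) - 1;
        return baseSleepTimeMs * Math.max(1L, maxMultiplier);
    }

    // Worst-case total waiting time over all retries.
    static long maxTotalSleepMs(long baseSleepTimeMs, int maxRetries) {
        long total = 0;
        for (int retry = 0; retry < maxRetries; retry++) {
            total += maxSleepMs(baseSleepTimeMs, retry);
        }
        return total;
    }

    public static void main(String[] args) {
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + ": up to " + maxSleepMs(3000, retry) + " ms");
        }
        System.out.println("worst case total: " + maxTotalSleepMs(3000, 3) + " ms");
    }
}
```

Under this model the three retries wait at most 3, 9 and 21 seconds, which is worth keeping in mind next to the 10 second session and connection timeouts configured above.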
Entity: Image
public class Image {

    private String id;
    private String originalName;
    private String type;
    private String status;

    public Image() {}

    public Image(String id, String originalName, String status) {
        this.id = id;
        this.originalName = originalName;
        this.status = status;
        // type = extension after the last dot, or "Unknow" when there is none
        int pos = originalName.lastIndexOf(".");
        this.type = (pos != -1 ? originalName.substring(pos + 1) : "Unknow");
    }

    /* getter & setter .... */
}
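The type field drives both the stored file name and the availability check in the admin controller, so the edge cases of the extension rule matter. A standalone sketch of the same lastIndexOf logic; FileExtensions is a hypothetical helper used only to show the behaviour:

```java
final class FileExtensions {

    // Same rule as the Image constructor: text after the last dot, or "Unknow" without a dot.
    static String of(String originalName) {
        int pos = originalName.lastIndexOf('.');
        return pos != -1 ? originalName.substring(pos + 1) : "Unknow";
    }

    public static void main(String[] args) {
        System.out.println(of("light.jpg"));       // extension after the only dot
        System.out.println(of("archive.tar.gz"));  // only the last segment counts
        System.out.println(of("README"));          // no dot at all
        System.out.println("[" + of("name.") + "]"); // trailing dot gives an empty type
    }
}
```

A name without a dot and a name ending in a dot both end up as "not available" in the admin get(id) endpoint, the first through the "Unknow" check and the second through the empty-string check.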