一致性服务 Zookeeper(CuratorFramework)

CuratorFramework

Summary

  1. dependency: curator-recipes (included zookeeper)

     <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
         <version>4.0.1</version>
         <exclusions>
             <exclusion>
                  <groupId>org.apache.zookeeper</groupId>
                  <artifactId>zookeeper</artifactId>
             </exclusion>
         </exclusions>
     </dependency>
     <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
         <version>3.4.6</version>
         <!-- <exclusions>
             <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-api</artifactId>
             </exclusion>
             <exclusion>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-log4j12</artifactId>
             </exclusion>
         </exclusions> -->
     </dependency>
    
    • Change zookeeper version ( beta version may have issues.)
  2. CuratorFramework zkClient = CuratorFrameworkFactory.builder()...build(), key properties:

    • connectString 服务器列表,格式host1:port1,host2:port2
    • namespace 命名空间
    • retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口(eg: ExponentialBackoffRetry(3000, 3))
    • sessionTimeoutMs 会话超时时间,单位毫秒,默认60000ms
    • connectionTimeoutMs 连接创建超时时间,单位毫秒,默认60000ms
  3. zkClient.start(),zkClient.close()

  4. CRUD

    • Create: create()
      • creatingParentContainersIfNeeded() 自动递归创建所有所需的父节点
      • withMode(CreateMode.xxx) 指定创建模式
        • CreateMode.PERSISTENT 持久化 (default)
        • CreateMode.PERSISTENT_SEQUENTIAL 持久化并且带序列号
        • CreateMode.EPHEMERAL 临时
        • CreateMode.EPHEMERAL_SEQUENTIAL 临时并且带序列号
      • withACL(Ids.xxx)
        • Ids.OPEN_ACL_UNSAFE (default)
        • Ids.CREATOR_ALL_ACL
        • Ids.READ_ACL_UNSAFE
        • Ids.ANYONE_ID_UNSAFE
        • Ids.AUTH_IDS
      • eg:
        • String createdPath=zkClient.create().creatingParentsIfNeeded().forPath(path,dataBytes)
        • String createdPath=zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path,dataBytes)
    • Read: getData()
      • 不存在则抛出NoNodeException
      • 返回值是byte[]
      • storingStatIn(stat) 获取到该节点的stat
      • eg:
        • byte[] data=zkClient.getData().forPath(path)
        • byte[] data=zkClient.getData().storingStatIn(stat).forPath(path)
    • Update: setData()
      • 不存在则抛出NoNodeException
      • 返回一个Stat实例
      • withVersion(num) 强制指定版本进行更新
      • eg:
        • Stat stat=zkClient.setData().forPath(path,dataBytes)
        • Stat stat=zkClient.setData().withVersion(0).forPath(path,dataBytes) (Version不匹配则抛出BadVersionException)
    • Delete: delete()
      • 不存在则抛出NoNodeException
      • 只能删除叶子节点
      • guaranteed() 保障措施, 如果删除失败,那么在后端还是继续会删除,直到成功
      • deletingChildrenIfNeeded() 递归删除其所有的子节点
      • withVersion(num) 强制指定版本进行删除
      • eg:
        • zkClient.delete().guaranteed().deletingChildrenIfNeeded().withVersion(12).forPath(path)
    • More:
      • checkExists() 检查节点是否存在,不存在则返回为null,eg:
        • Stat stat=zkClient.checkExists().forPath(path)
        • client.checkExists().creatingParentContainersIfNeeded().forPath(path)
      • getChildren() 获取某个节点的所有子节点路径,eg:
        • List<String> childrenPathList=zkClient.getChildren().forPath(zkPath)
  5. Watch

    • usingWatcher(watcher):

      • Target: 监听当前节点,对子节点无效!
      • Times: 只触发一次,监听完毕后就销毁
      • Watcher: interface CuratorWatcher
          public interface CuratorWatcher{
              public void process(WatchedEvent event) throws Exception;    // event.getPath(),getType(),getState(),...
          }
        
      • Usage:
        • getData().usingWatcher(watcher).forPath(watchPath)
          • 监听节点(watchPath)需存在,否则报NoNodeException
          • 可监听到的EventType: NodeDeleted,NodeDataChanged
        • checkExists().usingWatcher(watcher).forPath(watchPath)
          • 监听节点(watchPath)可不存在
          • 可监听到的EventType: NodeCreated,NodeDeleted,NodeDataChanged
    • NodeCache:

      • Target: 监听当前节点,对子节点无效!
      • Times: 一次注册,n次监听 (监听节点可不存在)
      • Listener: interface NodeCacheListener
          public interface NodeCacheListener{
              public void nodeChanged() throws Exception;
          }
        
      • Usage:
          NodeCache nodeCache = new NodeCache(zkClient, watchPath);
          nodeCache.getListenable().addListener(new NodeCacheListener() {
              @Override
              public void nodeChanged() throws Exception {
                  // ChildData data=nodeCache.getCurrentData();     ChildData#getPath(),getData(),getStat() -- get watchPath node information
              }
          });
          nodeCache.start(true);                         // 启动(buildInitial: true/false), 注:启动不会触发watcher
          ChildData data=nodeCache.getCurrentData();    // get watchPath node information
        
    • PathChildrenCache:

      • Target: 监听节点的一级子节点的CUD,注:
        • 监听节点不会触发watcher,监听节点可不存在
        • 若监听节点删除,则监听失效,子节点的变化将不会触发watcher
      • Times: 一次注册,n次监听
      • Listener: interface PathChildrenCacheListener
          public interface PathChildrenCacheListener{
               public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
          }
        
      • Usage:

          PathChildrenCache pathChildrenCache=new PathChildrenCache(zkClient, watchPath, true);    // cacheData:true/false (true则Client能够获取到节点数据内容)
          pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
              @Override
              public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                  // event.getType()            // Type.CHILD_ADDED,CHILD_UPDATED,CHILD_REMOVED,CONNECTION_SUSPENDED,CONNECTION_RECONNECTED,CONNECTION_LOST,INITIALIZED
                  // event.getData()            // ChildData, ChildData#getPath,getData,getStat    -- 获取触发Event的节点信息
                  // event.getInitialData()    // List<ChildData>     -- get initial data when triggered Type.INITIALIZED                
              }
          });
          pathChildrenCache.start(StartMode.BUILD_INITIAL_CACHE);                        // 启动
          // StartMode 为初始的cache设置暖场方式
          // StartMode.NORMAL                        异步初始化(default mode), Note: initial not trigger watcher
          // StartMode.POST_INITIALIZED_EVENT        异步初始化, Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件
          // StartMode.BUILD_INITIAL_CACHE        同步初始化, start方法返回之前调用rebuild(), Note: initial not trigger watcher
        
          List<ChildData> childDataList= pathChildrenCache.getCurrentData();            //  get watch node (watchPath's child nodes) information
        
    • TreeCache:
      • Target: 监听节点和该节点的所有子节点 (创建时可设置maxDepth
      • Times: 一次注册,n次监听
      • Listener: interface TreeCacheListener
          public interface TreeCacheListener{
              public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception;
          }
        
      • Usage:
          TreeCache treeCache=new TreeCache(zkClient, watchPath,false);        // cacheData:true/false
          treeCache.getListenable().addListener(new TreeCacheListener() {
              @Override
              public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                  // event.getType()        Type.NODE_ADDED,NODE_UPDATED,NODE_REMOVED,CONNECTION_SUSPENDED,CONNECTION_RECONNECTED,CONNECTION_LOST,INITIALIZED
                  // event.getData()         ChildData,ChildData#getPath,getData,getStat -- 获取触发Event的节点信息
              }
          });
          treeCache.start();                                                            // 启动
          ChildData childData=treeCache.getCurrentData(watchPath);                    // get watch node (watchPath node) information
          Map<String,ChildData> childMap=treeCache.getCurrentChildren(watchPath);        // get watch node (watchPath's child nodes) information
        
  6. ACL 权限控制

    • 使用:schema:id:permission 来标识
      • schema:id
        • schema: 权限模式(鉴权的策略)
        • id: 授权对象,即权限赋予的用户或者一个实体
          schema id
          world 只有一个ID:anyone (任何人都拥有所有权限)
          ip ip地址或ip段
          auth "", 使用已添加认证的用户认证,eg: digest:username:password
          digest 用户名:加密密码 通常是username:BASE64(SHA-1(username:password))
      • permission: 权限
        • CREATE : c 可以创建子节点
        • DELETE : d 可以删除子节点(仅下一级节点)
        • READ : r 可以读取节点数据及显示子节点列表
        • WRITE : w 可以设置节点数据
        • ADMIN : a 可以设置节点访问控制列表权限
    • 特性:

      • ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
      • 每个znode支持设置多种权限控制方案和多个权限
      • 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点
    • cmd示例:

      • world
          # setAcl <path> world:anyone:<permission>
          $ setAcl /node1 world:anyone:cdrwa
          $ getAcl /node1
        
      • ip
          # setAcl <path> ip:<ip>:<permission>
          $ setAcl /node2 ip:192.168.100.1:cdrwa
          $ getAcl /node2
        
      • auth
          # addauth digest <user>:<password>                 # 添加认证用户
          # setAcl <path> auth:<user>:<permission>         # 设置ACL
          $ addauth digest tom:123456
          $ setAcl /node3 auth:tom:cdrwa
        
      • digest
          # echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64   # password:经SHA1及BASE64处理的密文
          # setAcl <path> digest:<user>:<password>:<permission>
          $ echo -n tom:123456 | openssl dgst -binary -sha1 | openssl base64
          3YvKnq60bERLJOlabQFeB1f+/n0=
          $ setAcl /node4 digest:tom:3YvKnq60bERLJOlabQFeB1f+/n0=:cdrwa
        
    • Curator

      • new ACL(perms,id)

        • parameter1: Perms.xxx
          • Perms.ADMIN -- 可以修改节点权限(setAcl)
          • Perms.READ -- 可读取节点(ls,get)
          • Perms.WRITE -- 可修改节点内容 (set)
          • Perms.CREATE -- 可创建节点 (create)
          • Perms.DELETE -- 可删除节点 (delete)
        • parameter2: new org.apache.zookeeper.data.Id(String schema,String id)
        • eg:
            Id id1=new Id("digest",DigestAuthenticationProvider.generateDigest("id01:12345"));
            ACL aclRW=new ACL(Perms.READ|Perms.WRITE,id1);
          
        • Util: org.apache.zookeeper.ZooDefs.Ids 提供一些常用的Id和ACL实例
          • ANYONE_ID_UNSAFE = new Id("world", "anyone")
          • AUTH_IDS = new Id("auth", "")
          • OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)))
          • CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)))
          • READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)))
      • aclProvider(aclProvider) same as zkClient cmd setAcl path acl

          public interface ACLProvider{
              public List<ACL> getDefaultAcl();
              public List<ACL> getAclForPath(String path);
          }
        
        • aclProvider(new DefaultACLProvider()) default,使用ZooDefs.Ids.OPEN_ACL_UNSAFE
        • aclProvider(new ACLProvider(){ ... }) override getDefaultAcl,getAclForPath
      • authorization(authInfoList) : same as zkClient cmd addauth sechema auth ( eg: addauth digest id02:12345 )
        • new AuthInfo(String scheme, byte[] auth)
        • eg: new AuthInfo("digest", "id01:12345".getBytes())
      • withACL(): same as zkClient cmd setAcl path acl
        • setACL().withACL(aclList).forPath(path)
        • create().withACL(aclList).forPath(path)
      • Note:
        • setACL() 需要Admin权限 (Perms.ADMIN)
        • getACL() 无需认权限证

Demo: Start

public class CuratorTest {
    public static String connectString="127.0.0.1:2181";
    public static String namespace="micro";
    public static String zkPath="/test";
    private CuratorFramework zkClient=null;

    @Before
    public void init(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
        zkClient=CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(100000)
                .connectionTimeoutMs(100000)
                .namespace(namespace)
                .build();
        zkClient.start();
        CuratorFrameworkState state=zkClient.getState();
        System.out.println("Init zkClient :"+state.name());
    }

    @After
    public void close(){
        if(zkClient!=null){
            zkClient.close();
            CuratorFrameworkState state=zkClient.getState();
            System.out.println("Close zkClient :"+state.name());
        }
    }

    /*....*/
}

Demo: CRUD

  1. Create

    • create().creatingParentsIfNeeded().forPath(path,data)

        @Test
        public void createTest() throws Exception{
            String result=zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)    //default: PERSISTENT
                    .withACL(Ids.OPEN_ACL_UNSAFE)        //default: Ids.OPEN_ACL_UNSAFE
                    .forPath(zkPath+"/a","aaa".getBytes())
                    ;
      
            System.out.println("create: "+result);    // print created path
        }
      
        @Test
        public void createChildrenTest() throws Exception {
            zkClient.create().creatingParentsIfNeeded().forPath(zkPath+"/a/01/02");
        }
      
    • checkExists().forPath(path)

        @Test
        public void checkExistTest() throws Exception{
            Stat stat=zkClient.checkExists().forPath(zkPath+"/a");
            System.out.println(stat);
      
            // 如果不存在则为空
            stat=zkClient.checkExists().forPath(zkPath+"/aa");    //null
            System.out.println(stat);
        }
      
  2. Read

    • getData().forPath(path)

        @Test
        public void getDataTest() throws Exception{
      
            byte[] data=zkClient.getData().forPath(zkPath+"/a");
            System.out.println(new String(data));
      
            // NoNodeException
            data=zkClient.getData().forPath(zkPath+"/aa");
            if(data!=null)
                System.out.println(new String(data));
        }
      
    • getData().storingStatIn(stat).forPath(path)

        @Test
        public void getStatTest() throws Exception{
            Stat stat = new Stat();
            byte[] data=zkClient.getData().storingStatIn(stat).forPath(zkPath+"/a");
            if(data!=null){
                System.out.println("data: "+new String(data));
                System.out.println("version: "+stat.getVersion());
            }
      
            //NoNodeException
            data=zkClient.getData().storingStatIn(stat).forPath(zkPath+"/aa");    
            if(data!=null){
                System.out.println("data: "+new String(data));
                System.out.println("version: "+stat.getVersion());
            }
        }
      
    • getChildren().forPath(path)
        @Test
        public void getChildrenTest() throws Exception{
            List<String> children=zkClient.getChildren().forPath(zkPath);
            for(String child:children){
                byte[] data=zkClient.getData().forPath(zkPath+"/"+child);    // note: need add parentPath
                System.out.println(child+":"+new String(data));
            }
        }
      
  3. Update

    • setData().forPath(path,data)

        @Test
        public void updateTest() throws Exception{
            Stat stat=zkClient.setData().forPath(zkPath+"/a","aaa-aaa".getBytes());
            System.out.println(stat.getVersion());
      
            byte[] data=zkClient.getData().forPath(zkPath+"/a");
            System.out.println(new String(data));
      
            //BadVersionException
            stat=zkClient.setData().withVersion(0).forPath(zkPath+"/a","aaa-bbb".getBytes());    
            System.out.println(stat.getVersion());
      
            //NoNodeException
            //zkClient.setData().forPath(zkPath+"/a/01","a01".getBytes());
        }
      
  4. Delete

    • delete().guaranteed().deletingChildrenIfNeeded().forPath(path)

        @Test
        public void deleteTest() throws Exception{
      
            zkClient.delete()
                .guaranteed()                    // 如果删除失败,那么在后端还是继续会删除,直到成功
                .deletingChildrenIfNeeded()        // 如果有子节点,就删除
                .forPath(zkPath+"/a/01");
      
            Stat stat=zkClient.checkExists().forPath(zkPath+"/a/01");
            System.out.println(zkPath+"/a/01 :"+(stat!=null?"Exist":"NotExist"));
      
            stat=zkClient.checkExists().forPath(zkPath+"/a/01/02");
            System.out.println(zkPath+"/a/01/02 :"+(stat!=null?"Exist":"NotExist"));
      
            // NoNodeException
            zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(zkPath+"/aa");    
        }
      

Demo: Watch

  1. usingWatcher: 监听只会触发一次,监听完毕后就销毁,且对子节点无效!(getData(),checkExists() 方法可追加 Watcher)

    • getData().usingWatcher(new CuratorWatcher(){ ... } ).forPath(watchPath): 只监听该节点的变化(可监听到 NodeDeleted,NodeDataChanged),监听节点(watchPath)需存在,不然会报NoNodeException

        @Test
        public void usingWatcherTest() throws Exception{
      
            Stat stat=zkClient.checkExists().forPath(zkPath+"/b");
            if(stat==null)
                zkClient.create().creatingParentContainersIfNeeded().forPath(zkPath+"/b","bbb".getBytes());
      
            zkClient.getData().usingWatcher(new CuratorWatcher() {
                @Override
                public void process(WatchedEvent event) throws Exception {
                    System.out.println("Path:"+event.getPath());
                    System.out.println("Type:"+event.getType());
                    System.out.println("State:"+event.getState());
                    if(!event.getType().equals(EventType.NodeDeleted)){
                        byte[] data=zkClient.getData().forPath(event.getPath());
                        if(data!=null)
                            System.out.println("Data:"+new String(data));
                    }
                }
            }).forPath(zkPath+"/b");
      
            // case 1: node data change -- only trigger once
            // first time - trigger event: NodeDataChanged
            // zkClient.setData().forPath(zkPath+"/b","bbb-bbb".getBytes());
            // second time - no watcher trigger
            // zkClient.setData().forPath(zkPath+"/b","bbb-ccc".getBytes());    
      
            // case 2: create children -- no trigger
            // zkClient.create().creatingParentsIfNeeded().forPath(zkPath+"/b/01/02","Hello".getBytes());
      
            // case 3: update children data -- no trigger
            // zkClient.setData().forPath(zkPath+"/b/01","b01".getBytes());
      
            // case 4: delete children -- no trigger
            // zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath+"/b/01/02");
      
            // case 5: delete node -- trigger event: NodeDeleted
            zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath+"/b");
      
            Thread.sleep(10000);
        }
      
    • checkExists().usingWatcher(new CuratorWatcher(){ ... } ).forPath(watchPath): 只监听该节点的变化(可监听到 NodeCreated,NodeDeleted,NodeDataChanged),监听节点(watchPath)可不存在

        @Test
        public void usingWatcher2Test() throws Exception{
            // Stat stat=zkClient.checkExists().forPath(zkPath+"/b");
            // if(stat==null)
            //         zkClient.create().creatingParentContainersIfNeeded().forPath(zkPath+"/b","bbb".getBytes());
      
            zkClient.checkExists().usingWatcher(new CuratorWatcher() {
                @Override
                public void process(WatchedEvent event) throws Exception {
                    System.out.println("Path:"+event.getPath());
                    System.out.println("Type:"+event.getType());
                    System.out.println("State:"+event.getState());
                    if(!event.getType().equals(EventType.NodeDeleted)){
                        byte[] data=zkClient.getData().forPath(event.getPath());
                        if(data!=null)
                            System.out.println("Data:"+new String(data));
                    }
                }
            }).forPath(zkPath+"/b");
      
            // case 1: node data change
            // first time - trigger event:NodeDataChanged
            // zkClient.setData().forPath(zkPath+"/b","bbb-bbb".getBytes());
            // second time - no watcher trigger
            // zkClient.setData().forPath(zkPath+"/b","bbb-ccc".getBytes());    
      
            // case 2: create children -- no trigger
            // zkClient.create().creatingParentsIfNeeded().forPath(zkPath+"/b/01/02","Hello".getBytes());
      
            // case 3: update children data -- no trigger
            // zkClient.setData().forPath(zkPath+"/b/01","b01".getBytes());
      
            // case 4: delete children -- no trigger
            // zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath+"/b/01/02");
      
            // case 5: delete node -- trigger event: NodeDeleted
            // zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath+"/b");
      
            // case 6: create node -- trigger event: NodeCreated
            zkClient.create().creatingParentContainersIfNeeded().forPath(zkPath+"/b","bbb".getBytes());
      
            Thread.sleep(10000);
        }
      
  2. NodeCache: 一次注册,n次监听 -- 监听当前节点,对子节点无效!(监听节点可不存在)

    @Test
    public void nodeCacheWatchTest() throws Exception{
    
     String watchPath=zkPath+"/c";
     if(zkClient.checkExists().forPath(watchPath)==null)
         zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"ccc".getBytes());
    
     NodeCache nodeCache = new NodeCache(zkClient, watchPath);
    
     System.out.println("Add Listener...");
     nodeCache.getListenable().addListener(new NodeCacheListener() {
         @Override
         public void nodeChanged() throws Exception {
             System.out.println("Trigger Watcher...");
             ChildData data=nodeCache.getCurrentData();
             if(data!=null){
                 System.out.println(data.getPath());
                 System.out.println(new String(data.getData()));
                 System.out.println(data.getStat().getVersion());
             }else {
                 System.out.println("Empty!");
             }
         }
     });
    
     // start -- 注意:不管是否cacheData,启动不会触发watcher
     // nodeCache.start();        // 不cache节点数据
     nodeCache.start(true);    // cache节点数据 
    
     // check initial data
     ChildData data=nodeCache.getCurrentData();
     if(data!=null){
         System.out.println("节点初始化数据为:");
         System.out.println(data.getPath());
         System.out.println(new String(data.getData()));
         System.out.println(data.getStat().getVersion());
     }else {
         System.out.println("节点初始化数据为空!");
     }
    
     System.out.println("Begin Case 1 : watchPath -- trigger watcher");
     // node data change -- always trigger event:NodeDataChanged
     zkClient.setData().forPath(watchPath,"ccc-ccc".getBytes());
     Thread.sleep(2000);
     zkClient.setData().forPath(watchPath,"ccc-ddd".getBytes());    
     // delete node -- trigger event: NodeDeleted
     zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath);
     // create node -- trigger event: NodeCreated
     zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"ccc".getBytes());
    
     Thread.sleep(2000);
    
     System.out.println("Begin Case 2 : children -- no trigger");
     // create children
     zkClient.create().creatingParentsIfNeeded().forPath(watchPath+"/01/02","Hello".getBytes());
     // update children data
     zkClient.setData().forPath(watchPath+"/01","c01".getBytes());
     // delete children
     zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath+"/01/02");
    
     Thread.sleep(10000);
    
     if(nodeCache!=null)
         nodeCache.close();
    }
    
  3. PathChildrenCache: 监听节点的一级子节点的CUD (Note: 监听节点不会触发watcher,监听节点可不存在; 若监听节点删除,则监听失效,子节点的变化将不会触发watcher )

     @Test
     public void pathChildenCacheWatchTest() throws Exception{
    
         String watchPath=zkPath+"/d";
         if(zkClient.checkExists().forPath(watchPath)==null){
             zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"ddd".getBytes());
         }
         if(zkClient.checkExists().forPath(watchPath+"/0")==null){
             zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath+"/0","00".getBytes());
         }
    
         // cacheData:true/false
         PathChildrenCache childCache=new PathChildrenCache(zkClient, watchPath, true);
    
         System.out.println("Add Listener...");
         childCache.getListenable().addListener(new PathChildrenCacheListener() {
             @Override
             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                 System.out.println(event.getType());
                 ChildData child=event.getData();
                 if(child!=null)
                     System.out.println(child.getPath()+":"+new String(child.getData())+",version:"+child.getStat().getVersion());
    
                 if(event.getType().equals(Type.INITIALIZED)){
                     System.out.println("watcher到的节点初始化信息:");
                     List<ChildData> childDataList=event.getInitialData();
                     for(ChildData c:childDataList){
                         System.out.println(c.getPath()+":"+new String(c.getData())+",version:"+c.getStat().getVersion());
                     }
                 }
             }
         });
    
         System.out.println("Start Cache: ");
         // childCache.start();                                    //default: StartMode.NORMAL,异步初始化     -- initial data not trigger watcher
         // childCache.start(StartMode.POST_INITIALIZED_EVENT);     // 异步初始化 -- initial trigger watcher: Type.INITIALIZED
         childCache.start(StartMode.BUILD_INITIAL_CACHE);        // 同步初始化 -- initial not trigger watcher
    
         // Thread.sleep(2000);
         List<ChildData> childDataList= childCache.getCurrentData();
         System.out.println("start 节点初始化信息:");
         for(ChildData child:childDataList){
             System.out.println(child.getPath()+":"+new String(child.getData())+",version:"+child.getStat().getVersion());
         }
    
         System.out.println("Begin Case 1 : watchPath -- no trigger");
         // node data change
         // zkClient.setData().forPath(watchPath,"ddd-ddd".getBytes());
         // delete node    -- Note: 会导致watcher失效
         // zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath);
         // create node
         // zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"ddd".getBytes());
         // Thread.sleep(2000);
    
         System.out.println("Begin Case 2 : children -- trigger");
         // create children -- trigger CHILD_ADDED -- Note: on trigger "/01" CHILD_ADD, won't trigger "/01/02" CHILD
         zkClient.create().creatingParentsIfNeeded().forPath(watchPath+"/01/02","Hello".getBytes());
         // update children data -- trigger CHILD_UPDATED
         zkClient.setData().forPath(watchPath+"/01","d01".getBytes());
         // delete children -- trigger CHILD_REMOVED
         zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath+"/01");
    
         Thread.sleep(10000);
    
         if(childCache!=null)
             childCache.close();
     }
    
  4. TreeCache: 监听节点和该节点的所有子节点 (创建时可设置maxDepth

     @Test
     public void treeCacheWatchTest() throws Exception{
    
         String watchPath=zkPath+"/e";
         if(zkClient.checkExists().forPath(watchPath)==null){
             zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"eee".getBytes());
         }
         if(zkClient.checkExists().forPath(watchPath+"/0")==null){
             zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath+"/0","00".getBytes());
         }
    
         System.out.println("Add Listener...");
         TreeCache treeCache=new TreeCache(zkClient, watchPath);        // default:not cacheData,initial data will trigger watcher: NODE_ADDED
         treeCache.getListenable().addListener(new TreeCacheListener() {
             @Override
             public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                 System.out.println("Watch:");
                 System.out.println(event.getType());
                 ChildData childData=event.getData();
                 if(childData!=null)
                     System.out.println(childData.getPath()+":"+new String(childData.getData())+",version:"+childData.getStat().getVersion());
             }
         });
    
         treeCache.start();
    
         System.out.println("Initial Data...");
         ChildData childData=treeCache.getCurrentData(watchPath);
         if(childData!=null)
             System.out.println("Current:"+childData.getPath()+":"+new String(childData.getData())+",version:"+childData.getStat().getVersion());
         Map<String,ChildData> childMap=treeCache.getCurrentChildren(watchPath);
         if(childMap!=null){
             for(String key:childMap.keySet()){
                 System.out.println("Child:"+key+":"+childMap.get(key));
             }
         }
    
         System.out.println("Begin case 1 : watchPath -- trigger");
         // node data change -- trigger NODE_UPDATED
         zkClient.setData().forPath(watchPath,"eee-eee".getBytes());
         // delete node    -- trigger NODE_REMOVED 2 times for path '/test/e/0','/test/e'
         zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath);
         // create node -- trigger NODE_ADDED
         zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath,"eee".getBytes());
         Thread.sleep(2000);
    
         System.out.println("Begin Case 2 : children -- trigger");
         // create children -- trigger NODE_ADDED 2 times for path: '/test/e/01','/test/e/01/02'
         zkClient.create().creatingParentsIfNeeded().forPath(watchPath+"/01/02","Hello".getBytes());
         // update children data -- trigger NODE_UPDATED
         zkClient.setData().forPath(watchPath+"/01","d01".getBytes());
         // delete children -- trigger NODE_REMOVED 2 times for path: '/test/e/01/02','/test/e/01'
         zkClient.delete().deletingChildrenIfNeeded().forPath(watchPath+"/01");
    
         Thread.sleep(10000);
    
         if(treeCache!=null)
             treeCache.close();
     }
    

Demo: ACL

  1. init

     public class CuratorAclTest {
    
         public static String connectString="127.0.0.1:2181";
         public static String namespace="micro";
         public static String zkPath="/bb";
         private CuratorFramework zkClient=null;
    
         private ACL aclAdmin=null;
         private ACL aclRW=null;
         private ACL aclCD=null;
    
         /*
          * Perms.ADMIN    -- 可以修改节点权限(setAcl)
          * Perms.READ    -- 可读取节点(ls,get)
          * Perms.WRITE    -- 可修改节点内容 (set)
          * Perms.CREATE -- 可创建节点 (create)
          * Persm.DELETE -- 可删除节点 (delete)
          */
         @Before
         public void init() throws NoSuchAlgorithmException{
    
             Id id0=new Id("digest",DigestAuthenticationProvider.generateDigest("id00:admin"));
             Id id1=new Id("digest",DigestAuthenticationProvider.generateDigest("id01:12345"));
             Id id2=new Id("digest",DigestAuthenticationProvider.generateDigest("id02:12345"));
    
             aclAdmin=new ACL(Perms.ADMIN, id0);
             aclRW=new ACL(Perms.READ|Perms.WRITE,id1);
             aclCD=new ACL(Perms.CREATE|Perms.DELETE,id2);
         }
    
         @After
         public void close(){
             if(zkClient!=null){
                 zkClient.close();        
                 CuratorFrameworkState state=zkClient.getState();
                 System.out.println("Close zkClient :"+state.name());
             }
         }
    
         /*  ... */
     }
    
  2. ACL

    • aclProvider/setACL: same as zkClient cmd setAcl path acl

            @Test
        public void aclProviderTest() throws Exception{
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
            zkClient=CuratorFrameworkFactory.builder()
                    //.aclProvider(new DefaultACLProvider())    // default
                    .aclProvider(new ACLProvider() {    // Note: 在节点创建时触发调用,为新建的节点设置权限
                        @Override
                        public List<ACL> getDefaultAcl() {
                            System.out.println("setting default acl...");
                            return ZooDefs.Ids.OPEN_ACL_UNSAFE;
                        }
                        @Override
                        public List<ACL> getAclForPath(String path) {
                            System.out.println("setting acl...");
                            // 为zkPath节点和其以下的所有子节点设置acl
                            if(path.startsWith("/"+namespace+zkPath)){    // note: namespace+zkPath
                                System.out.println("setting aclAdmin,aclRW,aclCD...");
                                return Arrays.asList(aclAdmin,aclRW,aclCD);
                            }
                            else
                                return ZooDefs.Ids.OPEN_ACL_UNSAFE;
                        }
                    })
                    .connectString(connectString)
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(100000)
                    .connectionTimeoutMs(100000)
                    .namespace(namespace)
                    .build();
      
            zkClient.start();
      
            CuratorFrameworkState state=zkClient.getState();
            System.out.println("Init zkClient :"+state.name());
      
            // case 1 -- Create -zkPath --> Success
            aclCreateTest();
      
            // case 2 -- Read and Write -zkPath    --> NoAuthException!!
            // aclReadWriteTest();
      
            // case 3 -- Delete -zkPath --> NoAuthException!!
            aclDeleteTest();
      
            // case 4 -- Create -zkPath --> Success
            aclCreateTest();
      
            System.out.println("child case...");
      
            // case 5 -- Delete and Create -zkPath child --> NoAuthException!!
            // aclDeleteChildTest();
            // aclCreateChildTest();
      
            // case 6 -- Read and Write -zkPath child    --> NoAuthException!!
            // aclReadWriteChildTest();
        }
      
    • authorization: same as zkClient cmd addauth sechema auth ( eg: addauth digest id02:12345 )

        @Test
        public void authorizationTest() throws Exception{
            List<AuthInfo> authInfos = new ArrayList<AuthInfo>();
            authInfos.add(new AuthInfo("digest", "id00:admin".getBytes()));    // admin:修改权限认证
            // authInfos.add(new AuthInfo("digest", "id01:12345".getBytes()));    // rw:读写节点认证
            authInfos.add(new AuthInfo("digest", "id02:12345".getBytes()));    // cr:增删节点认证
      
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
            zkClient=CuratorFrameworkFactory.builder()
                    // Note: 
                    // 不设置aclProvider,等同.aclProvider(new DefaultACLProvider()),即使用ZooDefs.Ids.OPEN_ACL_UNSAFE
                    // 对于新创建的节点,acl为ZooDefs.Ids.OPEN_ACL_UNSAFE
                    // .authorization(authInfos)    // 附上访问权限信息AuthInfo,用于访问节点认证
                    .connectString(connectString)
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(100000)
                    .connectionTimeoutMs(100000)
                    .namespace(namespace)
                    .build();
      
            zkClient.start();
      
            CuratorFrameworkState state=zkClient.getState();
            System.out.println("Init zkClient :"+state.name());
      
            //get acl -- 无需权限
            getACLTest();
      
            // set acl -zkPath 
            // --> 拥有admin权限认证时才可
            setACLTest();
      
            // getChildACLTest();
      
            // case 1: read & write -zkPath
            // --> 需拥有rw权限
            aclReadWriteTest();
      
            // case 2: create -zkPath child    
            // --> 需拥有c权限才可创建,新创建的节点未设置acl,则使用默认acl{world,anyone}
            aclCreateChildTest();
      
            // case 3: read & write -zkPath child
            aclReadWriteChildTest();
            // case 4: delete -zkPath child
            aclDeleteChildTest();
      
            // case 4: delete -zkPath
            // --> 需拥有d权限才可删除
            aclDeleteTest();
        }
      
  3. Set/Get ACL

    • getACL (无需认权限证)

        public void getACLTest() throws Exception{
            if(zkClient.checkExists().forPath(zkPath)==null){
                System.out.println(zkPath+" not exist!");
                return;
            }
            System.out.println("list acl:");
            List<ACL> aclList=zkClient.getACL().forPath(zkPath);
            for(ACL acl:aclList)
                System.out.println(acl);
        }
      
        public void getChildACLTest() throws Exception{
            System.out.println("list child acl:");
            List<String> childList=zkClient.getChildren().forPath(zkPath);
            for(String child:childList){
                List<ACL> aclList=zkClient.getACL().forPath(zkPath+"/"+child);
                System.out.println(child+" acl: ");
                for(ACL acl:aclList)
                    System.out.println("\t"+acl);
            }
        }
      
    • setACL
        public void setACLTest() throws Exception{
            if(zkClient.checkExists().forPath(zkPath)!=null){
                System.out.println("Set ACL");
                Stat stat=zkClient.setACL().withACL(Arrays.asList(aclAdmin,aclRW,aclCD)).forPath(zkPath);
                System.out.println(stat.getVersion());
            }else{
                System.out.println("Create and Set ACL");
                zkClient.create().creatingParentsIfNeeded().withACL(Arrays.asList(aclAdmin,aclRW,aclCD)).forPath(zkPath,"access test".getBytes());
            }
            getACLTest();
        }
      
  4. CRUD : call to test

     /* Current Path */
     public void aclReadWriteTest() throws Exception{
         System.out.println("aclReadWriteTest....");
         if(zkClient.checkExists().forPath(zkPath)==null){
             System.out.println(zkPath+" not exist!");
             return;
         }
    
         //read
         System.out.println("read..."+zkPath);
         byte[] data=zkClient.getData().forPath(zkPath);
         if(data!=null)
             System.out.println(new String(data));
    
         //write
         System.out.println("write..."+zkPath);
         Stat stat=zkClient.setData().forPath(zkPath,"access test-acl".getBytes());
         System.out.println(stat.getVersion());
     }
    
     public void aclCreateTest() throws Exception{
         System.out.println("acl create test...");
         if(zkClient.checkExists().forPath(zkPath)!=null){
             System.out.println(zkPath+" already exist!");
         }else{
             //create
             System.out.println("create..."+zkPath);
             zkClient.create().creatingParentsIfNeeded().forPath(zkPath,"access test-new".getBytes());
         }
         //print acl
         System.out.println("list acl..."+zkPath);
         List<ACL> aclList=zkClient.getACL().forPath(zkPath);
         for(ACL acl:aclList){
             System.out.println(acl);
         }
     }
    
     public void aclDeleteTest() throws Exception{
         System.out.println("aclDeleteTest....");
         if(zkClient.checkExists().forPath(zkPath)==null){
             System.out.println(zkPath+" not exist!");
         }else{
             //delete
             System.out.println("delete..."+zkPath);
             zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath);
         }
     }
    
     /* Children CRUD */
    
     public void aclReadWriteChildTest() throws Exception{
         System.out.println("aclReadWriteChildTest....");
         String accessPath=zkPath+"/a";
    
         //read
         System.out.println("read..."+accessPath);
         byte[] data=zkClient.getData().forPath(accessPath);
         if(data!=null)
             System.out.println(new String(data));
    
         //write
         System.out.println("write..."+accessPath);
         Stat stat=zkClient.setData().forPath(accessPath,"aaa-acl".getBytes());
         System.out.println(stat.getVersion());
     }
    
     public void aclCreateChildTest() throws Exception{
         System.out.println("aclCreateChildTest....");
         String accessPath=zkPath+"/a";
         if(zkClient.checkExists().forPath(accessPath)!=null){
             System.out.println(accessPath+" already exist!");
         }else{
             //create
             System.out.println("create..."+accessPath);
             zkClient.create().creatingParentsIfNeeded().forPath(accessPath,"aaa-new".getBytes());
         }
         //print acl
         System.out.println("list acl..."+accessPath);
         List<ACL> aclList=zkClient.getACL().forPath(accessPath);
         for(ACL acl:aclList){
             System.out.println(acl);
         }
     }
    
     public void aclDeleteChildTest() throws Exception{
         System.out.println("aclDeleteChildTest....");
         String accessPath=zkPath+"/a";
         if(zkClient.checkExists().forPath(accessPath)==null){
             System.out.println(accessPath+" not exist!");
         }else{
             //delete
             System.out.println("delete..."+accessPath);
             zkClient.delete().deletingChildrenIfNeeded().forPath(accessPath);
         }
     }
    

一个应用示例

Client端监控文件增删,实现文件从Server端到Client端的同步

Summary

Two Parts:

  • Part 1: FileServer -> CRUD files
  • Part 2: ClientUser -> Auto download from FileServer / delete local file depends on the files updates on FileServer.

  • admin(/admin): FileServer

    • upload file: POST /images -> upload(MultipartFile photo)
      • save file with name {image.id}.{image.type} on local fileLocation
      • save file information on DB
      • add zk node: /{image.id}_{PERSISTENT_SEQUENTIAL}: ADD:{image.id}:{image.type}:{image.originalName}
    • delete file: DEL images/{id} -> delete(id)
      • delete file information on DB
      • add zk node: /{image.id}_{PERSISTENT_SEQUENTIAL}: DEL:{image.id}:{image.type}:{image.originalName}
    • list all files information: GET /images -> list()
    • get file information by id: GET /images/{id} -> get(id)
    • config:
      • fileLocation: /Users/cj/space/java/admin-uploads
      • zk
        • server: 127.0.0.1:2181
        • namespace: demo
  • user(/user): Client User

    • get local file: GET /images/{filename} -> get(filename)
    • zk listen /:CHILD_ADDED:
      • ADD:{filename}:{extension}:{originalName} -> download file from fileServer to local,then del this child node
      • DEL:{filename}:{extension}:{originalName} -> delete local file,then del this child node
    • config:
      • local fileLocation /Users/cj/space/java/user-downloads
      • remote fileServer http://localhost:8080/zookeeper-demo/admin/images
      • zk
        • server: 127.0.0.1:2181
        • namespace: demo
  • verify:

      # upload: POST /admin/images
      $ curl -i -F 'photo=@/Users/cj/Pictures/design/极光2.jpg' -X POST http://localhost:8080/zookeeper-demo/admin/images 
    
      # list: GET /admin/images
      $ curl -i -H "Content-Type: application/json" http://localhost:8080/zookeeper-demo/admin/images
    
      # get: GET /admin/images/{id}
      $ curl -i -H "Content-Type: application/json" http://localhost:8080/zookeeper-demo/admin/images/1d0fa647-9dbe-410a-8d9c-0e1c973a98e2
    
      # delete: DELETE /admin/images/{id}
      $ curl -i -H "Content-Type: application/json" -X DELETE http://localhost:8080/zookeeper-demo/admin/images/1d0fa647-9dbe-410a-8d9c-0e1c973a98e2
    
      # visit: 
      # /user/images/1d0fa647-9dbe-410a-8d9c-0e1c973a98e2.jpg
    
      # More (for test)
      # /light.jpg
      # /admin/images
      # /admin/images/test1/虫洞.gif
      # /admin/images/test2/极光1.jpg
    

Dependencies

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<!-- Curator (include zookeeper) -->
<!-- Note: need to change zookeeper version,the beta version zookeeper has issues -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
    <exclusions>
        <exclusion>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
    <exclusions>
        <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- FileUpload -->
<dependency>
    <groupId>commons-fileupload</groupId>
    <artifactId>commons-fileupload</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.5</version>
</dependency>

Admin(FileServer)

  1. application.yml

     server:
       port: 8080
       servlet:
         context-path: /zookeeper-demo
     zk:
       server: 127.0.0.1:2181
       namespace: demo
     admin:
       fileLocation: /Users/cj/space/java/admin-uploads
    
  2. ImageAdminController

    @RestController
    @RequestMapping("/admin/images")
    public class ImageAdminController {
    
     @Value("${admin.fileLocation}")
     private String fileLocation;
    
     @Autowired
     private ImageService imageService;
    
     @Resource(name="zkService")
     private ZkCuratorService zkService;
    
     @PostMapping
     public Object upload(@RequestParam(value="photo") MultipartFile photo) throws Exception{
    
         System.out.println("Name:"+photo.getName());                            // photo
         System.out.println("OriginalFilename:"+photo.getOriginalFilename());    // 极光2.jpg
         System.out.println("Size:"+photo.getSize());                            // 186316
         System.out.println("ContentType:"+photo.getContentType());                // image/jpeg
    
         Image image=new Image(UUID.randomUUID().toString(),
                 photo.getOriginalFilename(),"Active");
         String destFilePath=this.fileLocation+"/"+image.getId()+"."+image.getType();
         System.out.println("dest:"+destFilePath);
         FileUtils.copyToFile(photo.getInputStream(), new File(destFilePath));
         boolean result=this.imageService.save(image);
    
         // zookeeper
         String data="ADD:"+image.getId()+":"+image.getType()+":"+image.getOriginalName();
         String zkPath=zkService.create("/"+image.getId(), data.getBytes());
         System.out.println("create zkPath(for ADD):"+zkPath);
    
         return ResponseEntity.ok(result);
     }
    
     @DeleteMapping("/{id}")
     public Object delete(@PathVariable("id") String id) throws Exception{
         Image image=this.imageService.delete(id);
    
         // zookeeper
         if(image!=null){
             String data="DEL:"+image.getId()+":"+image.getType()+":"+image.getOriginalName();
             String zkPath=zkService.create("/"+image.getId(), data.getBytes());
             System.out.println("create zkPath(for DEL):"+zkPath);
         }
    
         return ResponseEntity.ok(image!=null);
     }
    
     @GetMapping
     public Object list(){
         return ResponseEntity.ok(this.imageService.list());
     }
    
     @GetMapping("/{id}")
     public Object get(@PathVariable("id")String id) throws IOException{
         Image image=this.imageService.get(id);
         if(image==null || !"Active".equals(image.getStatus()) 
                 || StringUtils.isEmpty(image.getType()) || "Unknow".equals(image.getType()))
             return ResponseEntity.ok("Not Available!");
    
         FileInputStream in=new FileInputStream(this.fileLocation+"/"+image.getId()+"."+image.getType());
         return ResponseEntity.ok()
                 .contentType(MediaType.IMAGE_JPEG)
                 .contentType(MediaType.IMAGE_PNG)
                 .contentType(MediaType.IMAGE_GIF)
                 .body(IOUtils.toByteArray(in));
     }
    
     /* ------ For Test: -------- */
     @GetMapping(value="/test1/{name}"
             ,produces={MediaType.IMAGE_JPEG_VALUE,MediaType.IMAGE_PNG_VALUE,MediaType.IMAGE_GIF_VALUE})
     public Object getImageByName1(@PathVariable("name") String filename) throws IOException{
         String filePath="/Users/cj/Pictures/design";
         FileInputStream in = new FileInputStream(filePath+"/"+filename);
         return IOUtils.toByteArray(in);    // or ResponseEntity.ok(IOUtils.toByteArray(in));
     }
     @GetMapping(value="/test2/{name}")
     public Object getImageByName2(@PathVariable("name") String filename) throws IOException{
         String filePath="/Users/cj/Pictures/design";
         FileInputStream in = new FileInputStream(filePath+"/"+filename);
         return ResponseEntity.ok()
                 .contentType(MediaType.IMAGE_JPEG)
                 .contentType(MediaType.IMAGE_PNG)
                 .contentType(MediaType.IMAGE_GIF)
                 .body(IOUtils.toByteArray(in));
     }
     @PostConstruct
     private void init() throws Exception{
         System.out.println("init:"+this.fileLocation);
         FileUtils.forceMkdir(new File(this.fileLocation));
    
         String id="1d0fa647-9dbe-410a-8d9c-0e1c973a98e2";
         Image image=new Image(id,"light.jpg","Active");
         FileUtils.copyFile(new File("/Users/cj/Pictures/design/极光1.jpg"), new File(this.fileLocation+"/"+id+".jpg"));
         this.imageService.save(image);
    
         // zookeeper
         String data="ADD:"+image.getId()+":"+image.getType()+":"+image.getOriginalName();
         String zkPath=zkService.create("/"+image.getId(), data.getBytes());
         System.out.println("create zkPath(for ADD):"+zkPath);
     }
     @PreDestroy
     private void clear() throws Exception{
         FileUtils.cleanDirectory(new File(this.fileLocation));
         // zookeeper
         zkService.delete("/");
     }
    }
    
  3. main

    @SpringBootApplication
    @Configuration
    public class App {
     public static void main( String[] args ){
         SpringApplication.run(App.class, args);
     }
    
     @Bean(name="zkService")
     public ZkCuratorService zkService(){
         return new ZkCuratorService();
     }
    }
    

User(ClientUser)

  1. application.yml

     server:
       port: 8090
       servlet:
         context-path: /zookeeper-demo
     zk:
       server: 127.0.0.1:2181
       namespace: demo
     user:
       fileLocation: /Users/cj/space/java/user-downloads
       fileServer: http://localhost:8080/zookeeper-demo/admin/images
    
  2. ImageUserController

    @RestController
    @RequestMapping("/user/images")
    public class ImageUserController {
    
     @Value("${user.fileLocation}")
     private String fileLocation;
    
     @Value("${user.fileServer}")
     private String fileServer;
    
     @Resource(name="zkClientService")
    //    @Resource(name="zkService")
     private ZkCuratorService zkService;
    
     @GetMapping("/{filename}")
     public Object get(@PathVariable("filename") String filename) throws IOException{
         FileInputStream in=new FileInputStream(this.fileLocation+"/"+filename);
         return ResponseEntity.ok()
                 .contentType(MediaType.IMAGE_JPEG)
                 .contentType(MediaType.IMAGE_PNG)
                 .contentType(MediaType.IMAGE_GIF)
                 .body(IOUtils.toByteArray(in));
     }
    
     @PostConstruct
     public void init() throws Exception{
         PathChildrenCacheListener listener=new PathChildrenCacheListener(){
             @Override
             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) 
                     throws Exception {
                 System.out.println("Get Event:"+event.getType());
    
                 if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
                     ChildData child=event.getData();
                     if(child==null)
                         return;
                     String data=new String(child.getData());
                     String array[]=data.split(":");
                     String filename= array[1]+"."+array[2];
                     System.out.println(child.getPath()+":"+data);
    
                     if("ADD".equals(array[0])){
                         try{
                             Thread.sleep(3000);        // ! for test and connect refused exception
                             FileUtils.copyURLToFile(new URL(fileServer+"/"+filename)
                                     , new File(fileLocation+"/"+filename));
                             zkService.delete(child.getPath());
                         }catch(ConnectException ex){
                             System.out.println(ex.getMessage());
                         }
                     }else if("DEL".equals(array[0])){
                         try{
                             FileUtils.forceDelete(new File(fileLocation+"/"+filename));
                         }catch(FileNotFoundException ex){
                             System.out.println("not exist:"+fileLocation+"/"+filename);
                         }
                         zkService.delete(child.getPath());
                     }
                 }
             }
         };
         zkService.addPathChildrenWatcher("/", true, listener);
     }
    
     @PreDestroy
     public void clear() throws Exception{
         FileUtils.cleanDirectory(new File(this.fileLocation));
         // zookeeper
         zkService.delete("/");
     }
    }
    
  3. main

    @SpringBootApplication
    @Configuration
    public class App {
     public static void main( String[] args ){
         SpringApplication.run(App.class, args);
     }
    
     @Bean(name="zkClientService")
     public ZkCuratorService zkClientService(){
         return new ZkCuratorService();
     }
    }
    

Common: Service & Entity

  1. ImageService

    @Service
    public class ImageService {
     private final ConcurrentMap<String,Image> images=new ConcurrentHashMap<String,Image>();
    
     public boolean save(Image image){
         return images.put(image.getId(),image)==null;
     }
     public Image delete(String id){
         Image image=images.get(id);
         if(image==null)
             return null;
         image.setStatus("InActive");
         images.replace(id, image);
         return image;
     }
     public Image get(String id){
         return images.get(id);
     }
     public Collection<Image> list(){
         return images.values();
     }   
    }
    
  2. ZkCuratorService

    public class ZkCuratorService {
    
     private CuratorFramework zkClient;
    
     @Value("${zk.server}")
     private String connectString;
    
     @Value("${zk.namespace}")
     private String namespace;
    
     private PathChildrenCache childCache;
    
     public CuratorFramework getZkClient() {
         return zkClient;
     }
     public void setZkClient(CuratorFramework zkClient) {
         this.zkClient = zkClient;
     }
    
     @PostConstruct
     public void init(){
         System.out.println("ZK Service init");
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
         zkClient=CuratorFrameworkFactory.builder()
                 .connectString(connectString)
                 .retryPolicy(retryPolicy)
                 .sessionTimeoutMs(10000)
                 .connectionTimeoutMs(10000)
                 .namespace(namespace)
                 .build();
         zkClient.start();
     }
     @PreDestroy
     public void close(){
         if(zkClient!=null)
             zkClient.close();
     }
     public void addPathChildrenWatcher(String watchPath,boolean createIfNotExist,PathChildrenCacheListener listener) throws Exception{
         if(createIfNotExist){
             if(zkClient.checkExists().forPath(watchPath)==null){
                 zkClient.create().creatingParentContainersIfNeeded().forPath(watchPath);
             }
         }
         childCache = new PathChildrenCache(zkClient, watchPath, true);
         System.out.println("Add Listener...");
         childCache.getListenable().addListener(listener);
         System.out.println("Start Cache...");
         childCache.start(StartMode.POST_INITIALIZED_EVENT);     // 异步初始化 -- initial trigger watcher: Type.INITIALIZED
     }
     public String create(String path,byte[] data) throws Exception{
         return zkClient.create()
                 .creatingParentsIfNeeded()
                 .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                 .forPath(path,data);
     }
     public void delete(String path) throws Exception{
         zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
     }
     public Stat update(String path,byte[] data) throws Exception{
         return zkClient.setData().forPath(path,data);
     }
     public Stat checkExists(String path) throws Exception{
         return zkClient.checkExists().forPath(path);
     }
    }
    
  3. Entity: Image

    public class Image {
     private String id;
     private String originalName;
     private String type;
     private String status;
    
     public Image(){}
    
     public Image(String id,String originalName,String status){
         this.id=id;
         this.originalName=originalName;
         this.status=status;
         int pos=originalName.lastIndexOf(".");
         this.type=(pos!=-1?originalName.substring(pos+1):"Unknow");
     }
    
     /* getter & setter .... */
    }
    

Reference

My demo: zookeeper-demo