ZooKeeper、CuratorFramework、Watcher、ConnectionStateListener

news/2024/7/24 6:53:57 标签: zookeeper, 分布式, 云原生

ZooKeeper

Zookeeper的作用是提供一个分布式的协调服务,它可以让分布式系统中的各个节点之间进行通信和协调,从而保证整个系统的一致性和可靠性。Zookeeper的核心是一个分布式的文件系统,它可以存储和管理分布式系统中的各种信息,如配置信息、元数据、状态信息等。

常用的Zookeeper API包括:

1. 创建节点:create()
2. 获取节点数据:getData()
3. 设置节点数据:setData()
4. 删除节点:delete()
5. 判断节点是否存在:exists()
6. 获取子节点列表:getChildren()
7. 监听节点变化:Watcher机制

CuratorFramework

CuratorFramework是一个ZooKeeper客户端框架,它封装了ZooKeeper API,提供了更加简单易用的接口,同时也提供了一些高级特性,如分布式锁、选举等。

CuratorFramework常用方法包括:

1. create():创建节点
2. delete():删除节点
3. setData():设置节点数据
4. getData():获取节点数据
5. checkExists():检查节点是否存在
6. getChildren():获取子节点列表
7. sync():同步节点数据

Watcher

Watcher是ZooKeeper中的一个机制,用于监听节点的变化。当节点发生变化时,Watcher会触发相应的事件,通知客户端进行相应的处理


Watcher与CuratorFramework结合使用的示例代码

public class CuratorWatcherDemo {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String ZK_PATH = "/test";

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(3, 1000));
        client.start();

        // 创建节点
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(ZK_PATH, "init".getBytes());

        // 注册Watcher
        client.getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("Watcher triggered, type=" + event.getType() + ", path=" + event.getPath());
            }
        }).forPath(ZK_PATH);

        // 修改节点数据
        client.setData().forPath(ZK_PATH, "update".getBytes());

        // 删除节点
        client.delete().deletingChildrenIfNeeded().forPath(ZK_PATH);

        client.close();
    }
}

创建了一个CuratorFramework客户端,并使用它创建了一个节点。然后,注册了一个Watcher,用于监听节点的变化。接着,修改了节点的数据,并最终删除了节点。在这个过程中,Watcher会被触发,并输出相应的日志。


ConnectionStateListener

ConnectionStateListener 是 ZooKeeper 客户端连接状态的监听器,用于监听客户端与 ZooKeeper 服务器之间的连接状态变化。当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,ConnectionStateListener 会触发相应的事件,我们可以在事件处理方法中编写相应的业务逻辑。

ConnectionStateListener 常用方法有:

1. void stateChanged(CuratorFramework client, ConnectionState newState):当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,会触发该方法。其中,client 表示 CuratorFramework 客户端对象,newState 表示新的连接状态。

2. void addListener(ConnectionStateListener listener):添加连接状态监听器。

3. void removeListener(ConnectionStateListener listener):移除连接状态监听器。

ConnectionStateListener 与 CuratorFramework 结合使用示例代码:

public class MyConnectionStateListener implements ConnectionStateListener {

    private CuratorFramework client;

    public MyConnectionStateListener(CuratorFramework client) {
        this.client = client;
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.CONNECTED) {
            System.out.println("连接成功");
        } else if (newState == ConnectionState.RECONNECTED) {
            System.out.println("重新连接成功");
        } else if (newState == ConnectionState.LOST) {
            System.out.println("连接丢失");
        } else if (newState == ConnectionState.SUSPENDED) {
            System.out.println("连接挂起");
        } else if (newState == ConnectionState.READ_ONLY) {
            System.out.println("只读连接");
        }
    }
}

public class CuratorFrameworkTest {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
        client.start();
        client.getConnectionStateListenable().addListener(new MyConnectionStateListener(client));
        Thread.sleep(Integer.MAX_VALUE);
    }
}

MyConnectionStateListener 类,实现了 ConnectionStateListener 接口,并在 stateChanged 方法中编写了相应的业务逻辑。然后,在 CuratorFrameworkTest 类中,创建了一个 CuratorFramework 客户端对象,并添加了 MyConnectionStateListener 监听器。最后,让主线程休眠,保证程序不会退出。当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,MyConnectionStateListener 监听器会触发相应的事件,并输出相应的日志信息。


http://www.niftyadmin.cn/n/5050336.html

相关文章

协议-TCP协议-基础概念03-Keep live保活机制-TCP RST-TCP连接

Keep live保活机制-TCP RST-TCP连接 参考来源: 《极客专栏-网络排查案例课》 Keep live保活机制 定时发送心跳探测包; 对于心跳回复包有超时限制; 要打开这个 TCP Keep-alive 特性,你需要使用 setsockopt() 系统调用&#xff0…

拼多多商品详情数据接口

拼多多商品详情接口的具体内容。获取拼多多商品详情,可以参考如下方式: item_get_app-根据ID取商品详情原数据接口包括:标题,价格,促销价,优惠券,库存,销量,详情图片&am…

Flink-CDC——MySQL、SqlSqlServer、Oracle等数据库开启日志方法

文章目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载…

泡泡玛特城市乐园开园在即,知名潮玩IP落地北京朝阳

今年以来,文旅产业成为亮眼消费赛道,大IP主题乐园再次受到市场关注。优质IP可以为园区引流,帮助乐园摆脱门票经济,平衡收入结构。 国内“潮玩第一股”泡泡玛特近日宣布,国内首个潮玩行业沉浸式IP主题乐园——泡泡玛特…

oracle 根据分号分割为多个列

oracle 没有split 函数,因此没法直接使用,但是时间上会遇到需要分割的时候,可以使用正则表达式 SELECT REGEXP_SUBSTR(administration, [^;], 1, 1) AS SKILL1, REGEXP_SUBSTR(administration, [^;], 1, 2) AS SKILL2, REGEXP_SUBSTR(admini…

LLM - Make Causal Mask 构造因果关系掩码

目录 一.引言 二.make_causal_mask 1.完整代码 2.Torch.full 3.torch.view 4.torch.masked_fill_ 5.past_key_values_length 6.Test Main 三.总结 一.引言 Causal Mask 主要用于限定模型的可视范围,防止模型看到未来的数据。在具体应用中,Caus…

C/S架构学习之UDP服务器

UDP服务器的实现流程:一、创建用户数据报套接字(socket函数):通信域选择IPV4网络协议、套接字类型选择数据报式; int sockfd socket(AF_INET,SOCK_DGRAM,0); 二、填充服务器的网络信息结构体:1.定义网络信…

蓝桥杯 题库 简单 每日十题 day10

01 最少砝码 最少砝码 问题描述 你有一架天平。现在你要设计一套砝码,使得利用这些砝码 可以出任意小于等于N的正整数重量。那么这套砝码最少需要包含多少个砝码? 注意砝码可以放在天平两边。 输入格式 输入包含一个正整数N。 输出格式 输出一个整数代表…