day03_BigData渐进学习_aclear_fire

1. 1. yum源配置补充

2.  [自动化部署脚本](#sh)
    ==============

3.  [自动化部署脚本](#installZk)
    =====================

4.  [Zookeeper结构化和命令](#zkDS)
    ========================

1.yum源配置补充

P.S.

centos关机命令:

1、本地yum仓库的安装配置 两种方式: a、每一台机器都配一个本地文件系统上的yum仓库 file:///packege/path/ b、在局域网内部配置一台节点(server-base)的本地文件系统yum仓库,然后将其发布到web服务器中,其他节点就可以通过http://server-base/pagekege/path/ 制作流程: 先挑选一台机器mini4,挂载一个系统光盘到本地目录/mnt/cdrom,然后启动一个httpd服务器,将/mnt/cdrom 软连接到httpd服务器的/var/www/html目录中 (cd /var/www/html; ln -s /mnt/cdrom ./centos ) 然后通过网页访问测试一下: http://mini4/centos 会看到光盘的目录内容 至此:网络版yum私有仓库已经建立完毕 剩下就是去各台yum的客户端配置这个http地址到repo配置文件中 无论哪种配置,都需要先将光盘挂在到本地文件目录中 mount -t iso9660 /dev/cdrom /mnt/cdrom 为了避免每次重启后都要手动mount,可以在/etc/fstab中加入一行挂载配置,即可自动挂载 vi /etc/fstab /dev/cdrom /mnt/cdrom iso9660 defaults 0 0 #这样每次开始 都会自动挂载光驱到 /mnt/cdrom 下面! 2、minimal安装的系统出现的问题:缺各种命令,安装软件时缺各种依赖 scp命令都没有:yum install -y openssh-clients #每台机器上都要安装才行,作用 在两台机器之间传递文件 #注意不一定都是 命令对应于依赖包名! #yum erase openssh-clients #卸载openssh-clients包


2.实现自动部署tomcat脚本

思路:dream //首先hadoop要进行免密登录!-> 安装wget 再解压 再安装. yum install -y wget #我们就不用进行输入 yes了!

!/bin/bash

SERVERS=”node-3.itcast.cn node-4.itcast.cn” #服务器名称
PASSWORD=123456
BASE_SERVER=172.16.203.100
auto_ssh_copy_id() {
expect -c “set timeout -1;
spawn ssh-copy-id $1;
expect {
(yes/no) {send — yes\\r;exp_continue;}
*assword:* {send — $2\\r;exp_continue;}
eof {exit 0;}
}”;

    #用户输入的交互过程,

}
ssh_copy_id_to_all() {
for SERVER in $SERVERS
do
auto_ssh_copy_id $SERVER $PASSWORD
done
}
ssh_copy_id_to_all
for SERVER in $SERVERS
do
scp install.sh root@$SERVER:/root
ssh root@$SERVER /root/install.sh
done

install_everyone.sh

!/bin/bash

BASE_SERVER=mini4 #可以下载软件的服务器在哪里
yum install -y wget
wget $BASE_SERVER/soft/jdk-7u45-linux-x64.tar.gz
tar -zxvf jdk-7u45-linux-x64.tar.gz -C /usr/local
cat >> /etc/profile << EOF
export JAVA_HOME=/usr/local/jdk1.7.0_45
export PATH=\$PATH:\$JAVA_HOME/bin
EOF

标准化输入 cat profile文件 追加export 两行到profile里面去。

原本cat 2.txt >> 1.txt

现在 输出追加到1.txt; << EOF “中间当做文件内容输入到 追加到 1.txt当中!” EOF

-- 对于高可用的客户端 寻找服务器端。 IPC(心跳程序,每秒钟进行检查,查看 当前是否宕机;还要记录宕机的状态信息->最好记录在数据库当中) 于是,我们希望有个这样的第三方进行解耦: 图一: dream 另一种工作场景: 保管数据(进行注册) 提供监听 图二:dream 替我们保管数据、提供监听:这样对于我们复杂的分布式就很简单(所以第三方自己就要很高的可靠性,zookeeper只要有半数以上结点存货即可 正常运行.) 图三: dream


注意:只是状态信息(描述性的信息放置在zk上面(但是如果读取的状态数据特别大的时候 上万个集群?阿里可能会有些问题。)) Zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务

  • zookeeper是为别的分布式程序服务的
  • Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
  • Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务……
  • 虽然说可以提供各种服务,但是zookeeper在底层其实只提供了两个功能:

管理(存储,读取)用户程序提交的数据; #注意只是状态存取和读取在zk,数据实际在自己 分布式服务期当中

并为用户程序提供数据节点监听服务; #监听 请求,返回;类似于一个中转站

图四:(hadoop集群很多协调服务的 感知,协调 storm框架也是用的zk) dream


自动化配置Zookeeper

1.注意在一开始的 Zk并没有主从之分,只是之后自己配置或者自己进行投票的逻辑进行配置的 PAXOS版本 ->Zab算法. (当发现没有主从的时候,开始投票;注意算法既看版本号也看id) 图一: [caption id=”attachment_1066” align=”aligncenter” width=”1008”]dream zookeeper集群结构[/caption] 开始启动了两台机器: mini1 mini2 :投票:1投票投自己。2也投票投自己。 两个人统计都读出一共投出两张票;没有人当选 leader;所以等到下一轮继续投票,发现leader没有,并且2的ID是大的。1投票就给了2,2投的票也给了2-》所以最后2 就得到两票-》当选leader 当然第三个服务器启动后:发现 有了leader所以 3就自动变成了follower 注意:1.对于大型的集群来说,最好不用这个,因为他们的主从是由投票来的,每次数据修改之后 还需要进行通知,每个机器还需要进行信息的更新。 一个leader要让所有的集群进行数据更新的话,延迟还是很大的。数据要求频繁不适合。 2.图一当中红色的部分 是我们需要给服务器配置的。 然而leader 和 follow是投票算法出来的。

——-开始搭建——-

简而言之配置

  1. apps/conf/zoo.cfg

            然后apps/zookeeper  拷贝过去
    
  2. 各个机器,进行编辑 /root/zkdata/myid

  3. 注意前提是 service iptables stop 2181 端口,关闭防火墙

    rm -rf src/ .txt .xml dist-maven/ docs 因为 这个是源码包 工程java+jar进行的程序;所以吧源码包删除即可。 因为 c语言(make && make install)是与平台相关的 编译。然而 javca 不是,他是运行在虚拟机啊上的。只要里面的 jdk是一直的即可。 注意:这是个集群 每个机器有自己的数据目录。然而给用户看的是一些状态描述。 spark storm hadoop zookeeper集群 都是在内网之中进行工作的。所以防火墙我们是统统关闭的。 server.1=aclear2:2888:3888 leader和follow的,3888是投票的端口。#.1 代表投票服务器编号。 myid还需要记录到 数据文件夹当中。 ——还有如果 service connect error 说明链接错误,没有奇数个服务器,进行链接了!失败了。 3、添加内容:

dataDir=/root/zkdata server.1=aclear2:2888:3888 server.2=aclear3:2888:3888 server.3=aclear5:2888:3888

4 启动(每台机器)

zkServer.sh start

5 查看集群状态

  • jps(查看进程)
  • sh status(查看集群状态,主从信息)

ZK结构和命令

4. zookeeper结构和命令 4.1. zookeeper特性 1、Zookeeper:一个leader,多个follower组成的集群 2、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的 3、分布式读写,更新请求转发,由leader实施 4、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行 5、数据更新原子性,一次数据更新要么成功,要么失败 6、实时性,在一定时间范围内,client能读到最新数据 4.2. zookeeper数据结构 1、层次化的目录结构,命名符合常规文件系统规范(见下图) 2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识 3、节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点,下一页详细讲解);;注意:暂时性的结点是 不让新建其子节点的!!! 4、客户端应用可以在节点上设置监视器(后续详细讲解) 4.3. 数据结构的图 dream 4.4. 节点类型 1、Znode有两种类型: 短暂(ephemeral)(断开连接自己删除) 持久(persistent)(断开连接不删除) 2、Znode有四种形式的目录节点(默认是persistent ) PERSISTENT PERSISTENT_SEQUENTIAL(持久序列/test0000000019 ) EPHEMERAL EPHEMERAL_SEQUENTIAL 3、创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护 4、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

—day03_07

区别 export b (只能在本进程和相应的子进程之中b 变量是有效的。) source 1.sh 于是这样我们就能进行响应父进程中使用b变量的! 因为在一开始执行启动ssh是不会source 环境变量的。只能进行手动:

之前配置免密登录

ssh-keygen

ssh-copy-id

!/bin/sh

echo “Start ZkServer…”
for i in 2 3 5
do
ssh aclear$i “source /etc/profile;/root/apps/zookeeper/bin/zkServer.sh start”
done

—day03_08 Zookeeper客户端

1.1. zookeeper命令行操作

运行 zkCli.sh –server 进入命令行工具 1、使用 ls 命令来查看当前 ZooKeeper 中所包含的内容: [zk: 202.115.36.251:2181(CONNECTED) 1] ls / 2、创建一个新的 znode ,使用 create /zk myData 。这个命令创建了一个新的 znode 节点“ zk ”以及与它关联的字符串: [zk: 202.115.36.251:2181(CONNECTED) 2] create /zk “myData“ 3、我们运行 get 命令来确认 znode 是否包含我们所创建的字符串: [zk: 202.115.36.251:2181(CONNECTED) 3] get /zk #监听这个节点的变化,当另外一个客户端改变/zk时,它会打出下面的 #WATCHER:: #WatchedEvent state:SyncConnected type:NodeDataChanged path:/zk [zk: localhost:2181(CONNECTED) 4] get /zk watch 4、下面我们通过 set 命令来对 zk 所关联的字符串进行设置: [zk: 202.115.36.251:2181(CONNECTED) 4] set /zk “zsl“ 5、下面我们将刚才创建的 znode 删除: [zk: 202.115.36.251:2181(CONNECTED) 5] delete /zk 6、删除节点:rmr #注意这个是删除 文件夹下的所有节点。 [zk: 202.115.36.251:2181(CONNECTED) 5] rmr /zk

1.2. zookeeper-api应用

1.2.1. 基本使用

org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话 它提供了表 1 所示几类主要方法 :

功能

描述

create

在本地目录树中创建一个节点

delete

删除一个节点

exists

测试本地是否存在目标节点

get/set data

从目标节点上读取 / 写数据

get/set ACL

获取 / 设置目标节点访问控制列表信息

getchildren

检索一个子节点上的列表

sync

等待要被传送的数据

表 1 : ZooKeeper API 描述 —-search.maven.org 注意:目前是结合 eclipse进行开发,学习。

代码测试:
/**
*
* @author acanprince method: 进行ZooKeeper 客户端的增删改查。监听存数据 因为咱们在j2ee上编写的是客户端
/
public class SimpleZkClient {
//注意在这个里面 , 才是正规的分隔符 然而 ; 不是分隔符! 这样就认成一个整体了
//zoo.cfg 配置是绑定在主机名上的。 不是绑定在IP上的。 所以 这里必须是 主机名称!IP会连接失败!
private static final String connectString = “aclear2:2181,aclear3:2181,aclear5:2181”;
private static final int sessionTimeout = 2000;
ZooKeeper zkClient = null;
//JUnit 库添加到路径中来。 然后就能分步测试
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// TODO 自动生成的方法存根
// 收到事件后的回调函数
System.out.println(event.getType() + “—-“ + event.getPath());
try {
//不加这个 h就只能监听一次的 根节点变化。
zkClient.getChildren(“/“, true);
} catch (KeeperException | InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
});
}
/\
*
* 数据的增删改查
* 返回创建节点的路径
* 参数:路径 数据 权限 类型
* @throws InterruptedException
* @throws KeeperException
*/
public void testCreated() throws KeeperException, InterruptedException {
String nodeCreated = zkClient.create(“/eclipse”, “helloZk”.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//此处的数据 如果上传文件的数据 也需要上传 byte数组的数据。
}
//判断是否存在
@Test
public void testExit() throws KeeperException, InterruptedException {
// TODO 自动生成的方法存根
Stat stat = zkClient.exists(“/eclipse”, false);
System.out.println(stat==null?”not exit”:”exit”);
}
//获取子节点 监听”/“ 结点变化,就会watch内部类被调用。
@Test
public void getChildren() throws KeeperException, InterruptedException {
//注意第二个参数 是watcher,置成true就是默认使用上方设置了的匿名内部类。
List children = zkClient.getChildren(“/“, true);
for (String child : children) {
System.out.println(“当前孩子节点是:” + child);
}
//注意图片 在mian主线程 有两个子线程: connect,egtchildren ; 一个listen()
Thread.sleep(Long.MAX_VALUE);
}
//获取znode的数据
@Test
public void getData() throws KeeperException, InterruptedException {
//如果集群很大 可能集群上的版本,和老的版本是不一致的。 但是我们现在是 小集群或者每次拿到都是最新的就行。
byte[] data = zkClient.getData(“/eclipse”, false, null);
System.out.println(new String(data));
}
//删除 znode;第二个参数:指定要删除的版本,-1表示删除所有版本!
@Test
public void deleteZnode() throws Exception {
zkClient.delete(“/eclipse”, -1);
}
@Test
public void setZnode() throws KeeperException, InterruptedException {
zkClient.setData(“/eclipse”, “Hello Znode”.getBytes(), -1);
byte[] data = zkClient.getData(“/eclipse”, false, null);
System.out.println(“当前数据是:” + new String(data));
}
}

——10客户端要知道 服务器到底有几台在线上和下线了。

dream

dream —-服务端添加: 有序号的临时性的结点 注意:Listen 上方是守护线程——即主线程退出子线程(listen也关闭了!)

public static void main(String args[]) {
System.out.println(“主线程开始了!”);
new Thread(new Runnable() {
@Override
public void run() {
// TODO 自动生成的方法存根
System.out.println(“子线程开始了”);
for (int i = 0; i < 10000; i++) {
}
System.out.println(“子线程执行结束…”);
}
}).start();
System.out.println(“主线程结束执行退出。”);
}

public static void main(String args[]) {
System.out.println(“主线程开始了!”);
Threa thread = new Thread(new Runnable() {
@Override
public void run() {
// TODO 自动生成的方法存根
System.out.println(“子线程开始了”);
for (int i = 0; i < 10000; i++) {
}
System.out.println(“子线程执行结束…”);
}
});
thread.setDaemon(true); //添加使 其为守护线程
thread.start(); //线程启动.
}

客户端:获取列表+监听。get父目录的children 这样能知道那些机器在线;负载均衡:感知获取当前连接的机器数 代码练习: 服务器端:

public class DistributedServer {
private static final String connectString = “aclear2:2181,aclear3:2181,aclear5:2181”;
private static final int sessionTimeout = 2000;
private static final String parentNode = “/servers/“;
private ZooKeeper zooKeeper = null;
/**
* 创建到zk的客户端连接
* @throws IOException
/
public void getConnect() throws IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// TODO 自动生成的方法存根
// 收到事件后的回调函数
System.out.println(event.getType() + “—-“ + event.getPath());
try {
//不加这个 h就只能监听一次的 根节点变化。
zooKeeper.getChildren(“/“, true);
} catch (KeeperException | InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
});
}
/\
*
* 注册服务器;上线连接
* @param hostname
* @throws KeeperException
* @throws InterruptedException
/
public void registerServer(String hostname) throws KeeperException, InterruptedException {
String create = zooKeeper.create(parentNode + “server”, hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + “ is online..” + create);
}
/\
*
* 业务功能
* @throws InterruptedException
*/
public void handleBusiness(String hostname) throws InterruptedException {
System.out.println(hostname + “start working…”);
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String args[]) throws IOException, KeeperException, InterruptedException {
//获取zk连接
DistributedServer server = new DistributedServer();
server.getConnect();
//利用Zk链接注册服务器信息
server.registerServer(args[0]);
//启动业务功能 因为ipc等要封装成一个 通用的业务服务器
//new Thread 执行业务线程。
server.handleBusiness(args[0]);
}
}

客户端: 学习:多线程的 volatile b 我们新建的对象的是放置在 堆内存当中的。当我们进行多线程的应用时:是有自己的线程栈控件,有些读 堆内存、有些写堆内存。 怕就怕 他们把b变量拷贝一份拿去进行,自行处理。修改时才返回修改堆内存物理地址上真正的b变流量。为了真正的每个线程每次都是 拿 真正的变量,就这样进行的。 注意以为是临时变量所以: 关闭服务器,结点就会消失————所以就实现了 宕机监听啦! dream ---- 监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑 监听器的注册是在获取数据的操作中实现: getData(path,watch?)监听的事件是:节点数据变化事件 getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件

1.1. zookeeper应用案例(分布式应用HA||分布式锁)

3.7.1 实现分布式应用的(主节点HA)及客户端动态更新主节点状态

某分布式系统中,主节点可以有多台,可以动态上下线 任意一台客户端都能实时感知到主节点服务器的上下线 dream A、客户端实现

public class AppClient { private String groupNode = “sgroup”; private ZooKeeper zk; private Stat stat = new Stat(); private volatile List serverList; / 连接zookeeper / public void connectZookeeper() throws Exception { zk = new ZooKeeper(“localhost:4180,localhost:4181,localhost:4182”, 5000, new Watcher() { public void process(WatchedEvent event) { // 如果发生了”/sgroup”节点下的子节点变化事件, 更新server列表, 并重新注册监听 if (event.getType() == EventType.NodeChildrenChanged && (“/“ + groupNode).equals(event.getPath())) { try { updateServerList(); } catch (Exception e) { e.printStackTrace(); } } } }); updateServerList(); } / 更新server列表 / private void updateServerList() throws Exception { List newServerList = new ArrayList(); // 获取并监听groupNode的子节点变化 // watch参数为true, 表示监听子节点变化事件. // 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册 List subList = zk.getChildren(“/“ + groupNode, true); for (String subNode : subList) { // 获取每个子节点下关联的server地址 byte[] data = zk.getData(“/“ + groupNode + “/“ + subNode, false, stat); newServerList.add(new String(data, “utf-8”)); } // 替换server列表 serverList = newServerList; System.out.println(“server list updated: “ + serverList); } /* client的工作逻辑写在这个方法中 此处不做任何处理, 只让client sleep / public void handle() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { AppClient ac = new AppClient(); ac.connectZookeeper(); ac.handle(); } }

  B、服务器端实现

public class AppServer { private String groupNode = “sgroup”; private String subNode = “sub”; / 连接zookeeper @param address server的地址 */ public void connectZookeeper(String address) throws Exception { ZooKeeper zk = new ZooKeeper( “localhost:4180,localhost:4181,localhost:4182”, 5000, new Watcher() { public void process(WatchedEvent event) { // 不做处理 } }); // 在”/sgroup”下创建子节点 // 子节点的类型设置为EPHEMERAL_SEQUENTIAL, 表明这是一个临时节点, 且在子节点的名称后面加上一串数字后缀 // 将server的地址数据关联到新创建的子节点上 String createdPath = zk.create(“/“ + groupNode + “/“ + subNode, address.getBytes(“utf-8”), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(“create: “ + createdPath); } / server的工作逻辑写在这个方法中 此处不做任何处理, 只让server sleep */ public void handle() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 在参数中指定server的地址 if (args.length == 0) { System.err.println(“The first argument must be server address”); System.exit(1); } AppServer as = new AppServer(); as.connectZookeeper(args[0]); as.handle(); } }

3.7.2分布式共享锁的简单实现

  • 客户端A

public class DistributedClient { // 超时时间 private static final int SESSION_TIMEOUT = 5000; // zookeeper server列表 private String hosts = “localhost:4180,localhost:4181,localhost:4182”; private String groupNode = “locks”; private String subNode = “sub”; private ZooKeeper zk; // 当前client创建的子节点 private String thisPath; // 当前client等待的子节点 private String waitPath; private CountDownLatch latch = new CountDownLatch(1); /* 连接zookeeper */ public void connectZookeeper() throws Exception { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { try { // 连接建立时, 打开latch, 唤醒wait在该latch上的线程 if (event.getState() == KeeperState.SyncConnected) { latch.countDown(); } // 发生了waitPath的删除事件 if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) { doSomething(); } } catch (Exception e) { e.printStackTrace(); } } }); // 等待连接建立 latch.await(); // 创建子节点 thisPath = zk.create(“/“ + groupNode + “/“ + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // wait一小会, 让结果更清晰一些 Thread.sleep(10); // 注意, 没有必要监听”/locks”的子节点的变化情况 List childrenNodes = zk.getChildren(“/“ + groupNode, false); // 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁 if (childrenNodes.size() == 1) { doSomething(); } else { String thisNode = thisPath.substring((“/“ + groupNode + “/“).length()); // 排序 Collections.sort(childrenNodes); int index = childrenNodes.indexOf(thisNode); if (index == -1) { // never happened } else if (index == 0) { // inddx == 0, 说明thisNode在列表中最小, 当前client获得锁 doSomething(); } else { // 获得排名比thisPath前1位的节点 this.waitPath = “/“ + groupNode + “/“ + childrenNodes.get(index - 1); // 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法 zk.getData(waitPath, true, new Stat()); } } } private void doSomething() throws Exception { try { System.out.println(“gain lock: “ + thisPath); Thread.sleep(2000); // do something } finally { System.out.println(“finished: “ + thisPath); // 将thisPath删除, 监听thisPath的client将获得通知 // 相当于释放锁 zk.delete(this.thisPath, -1); } } public static void main(String[] args) throws Exception { for (int i = 0; i < 10; i++) { new Thread() { public void run() { try { DistributedClient dl = new DistributedClient(); dl.connectZookeeper(); } catch (Exception e) { e.printStackTrace(); } } }.start(); } Thread.sleep(Long.MAX_VALUE); } }

  • 分布式多进程模式实现:

public class DistributedClientMy { // 超时时间 private static final int SESSION_TIMEOUT = 5000; // zookeeper server列表 private String hosts = “spark01:2181,spark02:2181,spark03:2181”; private String groupNode = “locks”; private String subNode = “sub”; private boolean haveLock = false; private ZooKeeper zk; // 当前client创建的子节点 private volatile String thisPath; / 连接zookeeper / public void connectZookeeper() throws Exception { zk = new ZooKeeper(“spark01:2181”, SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { try { // 子节点发生变化 if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(“/“ + groupNode)) { // thisPath是否是列表中的最小节点 List childrenNodes = zk.getChildren(“/“ + groupNode, true); String thisNode = thisPath.substring((“/“ + groupNode + “/“).length()); // 排序 Collections.sort(childrenNodes); if (childrenNodes.indexOf(thisNode) == 0) { doSomething(); thisPath = zk.create(“/“ + groupNode + “/“ + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } } } catch (Exception e) { e.printStackTrace(); } } }); // 创建子节点 thisPath = zk.create(“/“ + groupNode + “/“ + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // wait一小会, 让结果更清晰一些 Thread.sleep(new Random().nextInt(1000)); // 监听子节点的变化 List childrenNodes = zk.getChildren(“/“ + groupNode, true); // 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁 if (childrenNodes.size() == 1) { doSomething(); thisPath = zk.create(“/“ + groupNode + “/“ + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } } / 共享资源的访问逻辑写在这个方法中 / private void doSomething() throws Exception { try { System.out.println(“gain lock: “ + thisPath); Thread.sleep(2000); // do something } finally { System.out.println(“finished: “ + thisPath); // 将thisPath删除, 监听thisPath的client将获得通知 // 相当于释放锁 zk.delete(this.thisPath, -1); } } public static void main(String[] args) throws Exception { DistributedClientMy dl = new DistributedClientMy(); dl.connectZookeeper(); Thread.sleep(Long.MAX_VALUE); } }

(っ•̀ω•́)っ✎⁾⁾ 坚持技术学习、内容输出与分享,您的支持将鼓励我继续创作!(*/ω\*)
( • ̀ω•́ )✧如有疑问或需要技术讨论,请留言或发邮件到 aclearzhang@qq.com.(*・ω< ) 
  • 本文作者:: AClearZhang
  • 本文链接:: 1050.html
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 许可协议。转载请注明出处!