5.1 设置环境变量

1
Vim	/etc/profile

将我的路径改为你自己的

1
2
3
#flink114
export FLINK_HOME=/opt/module/flink114
export PATH=$FLINK_HOME/bin:$PATH

记得分发和更新

1
source /etc/profile

5.2 配置文件

cd 到你解压的 flink114 的目录下

1
cd /xxx/xxx/flink114/conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
high-availability: zookeeper
high-availability.storageDir: hdfs://mycluster/flink/ha
high-availability.zookeeper.quorum: master:2181,slave1:2181,slav2:2181
high-availability.cluster-id: /flink_cluster
high-availability.jobmanager.port: 6123

jobmanager.rpc.address: master
jobmanager.rpc.port: 6123

jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m

taskmanager.numberOfTaskSlots: 2

state.backend: filesystem
state.checkpoints.dir: hdfs://mycluster/flink/checkpoints
state.savepoints.dir: hdfs://mycluster/flink/savepoints

rest.port: 8081

5.2.2 编辑 zoo.cfg

在下方修改并添加

1
2
3
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

5.2.3 编辑 master

添加你的主机和从节点的hosts和他们的flink端口号

(主节点和失效后的备用节点)

1
2
master:8081
slave1:8081

5.2.4 编辑 workers

添加你的主机和从节点的hosts

(工作作业的节点)

1
2
slave1
slave2

5.3 启动Flink集群

启动JobManager
在JobManager节点上启动Flink的JobManager:

1
bin/jobmanager.sh start cluster

确保在多个节点上启动JobManager,以实现高可用性。例如,在jobmanager2节点上也启动JobManager:

1
bin/jobmanager.sh start cluster

启动TaskManager
在所有TaskManager节点上启动Flink的TaskManager:

1
bin/taskmanager.sh start

一键启动

1
bin/start-cluster.sh

5.3.3 验证高可用性配置

可以通过Flink的Web UI(默认地址为http://jobmanager1:8081)来查看集群的状态。如果JobManager节点发生故障,ZooKeeper会协调新的JobManager节点接管任务,从而实现高可用性。

flink高可用web截图

5.3.4 运行示例作业

可以通过Flink CLI提交一个示例作业来验证集群的正常运行:

1
bin/flink run examples/streaming/WordCount.jar