微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

flink实战—flink on yarn

flink aide_941 54℃

flink实战—flink on yarn

 版权声明:原创文章 欢迎参考 请勿抄袭 https://blog.csdn.net/aA518189/article/details/83583392

flink  on  yarn  配置

如果我们打算把flink任务提交到yarn上,那么只需要一台flink客户机即可,但是需要有如下的配置

第一步:设置 HADOOP_CONF_DIR环境变量

export HADOOP_CONF_DIR=/usr/local/service/hadoop/etc/hadooop/

注意:如果在flink的配置文件中设置Hadoop的环境变量,可以不在环境变量设置,只需要在配置文件中添加:

env.yarn.conf.dir: /usr/local/service/hadoop/etc/hadoop

第二步:修改flink-conf.yaml配置文件

添加如下配置:其他的参数 默认即可

  1. #设置环境变量
  2. env.yarn.conf.dir: /usr/local/service/hadoop/etc/hadoop
  3. env.yarn.hadoop.conf.dir: /usr/local/service/hadoop/etc/hadooop
  4. env.java.home: /usr/local/jdk
  5. #设置检查点的存储目录
  6. state.checkpoints.dir: hdfs://127.0.0.1/flink_checkpoints
  7. #设置检查点保存的数据 默认是一个
  8. state.checkpoints.num-retained: 5

遇到的问题

如果提交jar到yarn上运行报如下错误:

java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties

解决方式:

在flink的lib包里面添加如下的两个jar包:jar的下载地址:https://download.csdn.net/download/aa518189/

flink 运行模式

Flink 和spark一样有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster。

实战开发主要使用Yarn Cluster模式,所以本文主要介绍yarn  模式下flink任务的执行和资源分配。

Yarn Cluster 模式

在图中可以看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是一样的。Flink 通过 Yarn 的接口实现了自己的 App Master。当在 Yarn 中部署了 Flink,Yarn 就会用自己的 Container 来启动 Flink 的 JobManager(也就是 App Master)和 TaskManager。flink on yarn模式下 TM和yarn的AppMaster在一个容器内,一起管理任务调度。

flink 任务提交到yarn上的全流程

第一步:

向资源管理器(ResourceManager)请求,要运行一个程序。中获取新的作业ID(jobId),以及程序资源存储路径

第二步

ResourceManager检查作业的输出说明,然后返回一个存放程序资源的路径以及jobId,这个路径在hdfs的tmp文件夹中,如果程序中没有指定输出目录或指定的输出目录已经存在,作业就不提交,错误返回给flink程序

就是这个路径存放程序资源,包括程序的jar包,job的配置文件等。

第三步

将作业资源(包括JAR、配置和信息)复制到HDFS。

第五步:

通过调用资源管理器上的submitApplication()方法提交作业。

第六步

资源管理器收到调用它的submitApplication()消息后,如果容器不够,任务会现在等待队列中等待,之后便将请求传递给调度器(Schedule),调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动应用程序master进程也就是MRAPPMaster。

flink作业的application master是一个Java应用程序,它的主类是MRAPPMaster他对作业进行初始化,通过创建多个薄记对象以保持对作业进度的跟踪,因为他将接受来自任务的进度和完成报告

第七步

MRAPPMaster根据配置信息,获知要启动多少个TaskManger,向ResourceManager请求容器

第八步

一旦资源管理器的调度器为任务分配了容器,MRAPPMaster(application master) 就通过与节点管理器NodeManager通信来启动容器向已获得容器的TaskManger发从启动命令,也就是主类为YarnChild程序

Flink在yarn上运行两种的模式

第一种:

一种是让 Yarn 直接启动 JobManager 和 TaskManager

在yarn上运行一个flink job

注意:一定要设置 yn  yim ytm 的数值,不然flink任务可能会得不到资源一直处于卡顿状态

  1. ./bin/flink run -m yarn-cluster -p 5 -yn 4 -yjm 1024 -ytm 1024 -ys 2 -yqu root.default./examples/batch/WordCount.jar

直接启动参数详解

  1. -c,——class <classname>类的程序入口点
  2. p,——parallelism <parallelism>运行程序。可选的要覆盖的标志中指定的默认值
  3. s,——fromSavepoint <savepointPath> 从指定的保存点恢复程序,从(例如hdfs:/ / / flink /保存点- 1537)
  4. yj,——yarnjar <arg>路径到Flink jar文件
  5. yjm,——yarnjobManagerMemory <arg> 每个JobManager Container内存大小 可选单位(默认:MB)
  6. yn,——容器<arg>要分配的容器数量(=taskManager数量)
  7. -ynm,——yarnname <arg>设置应用程序的自定义名称
  8. -yqu,——yarnqueue <arg>指定yarn队列。
  9. -ys,——yarn slot <arg>每个任务管理器的卡槽数(slot)
  10. ytm,——yarntaskManagerMemory <arg> 每一个TaskManager Container 内存大小可选单位(默认:MB)
  11. -yid,- yarnapplicationId <arg>附加到正在运行的yarn会话

第二种:

是在运行 Flink Workload 的时候启动 Flink 的模块。前者相当于让 Flink 的模块处于 Standby 的状态。这是因为 Flink 实现了 Yarn 的 Client,因此需要 Yarn 的一些配置和 Jar 包。在配置好环境变量后,只需简单的运行如下的脚本,Yarn 就会启动 Flink 的 JobManager 和 TaskManager。

先启动集群:

 ./bin/yarn-session.sh  -n  2 -jm  1024 -tm  1024 -s 2

上面的意思是:向 Yarn 申请 2 个 Container 启动 TaskManager(-n 2),每个 TaskManager 拥有两个 Task Slot(-s 2),并且向每个 TaskManager 的 Container 申请 1024 的内存

再提交任务

./bin/flink run com.demo.florian.WordCount  ./flink-demo-1.0-SNAPSHOT.jar

启动session的指令参数

  1.  必选
  2.       -n,–container     分配多少个yarn容器 (=taskmanager的数量)
  3. 可选
  4.       -d,–detached                    独立运行
  5.       -jm,–jobManagerMemory      JobManager的内存 [in MB]
  6.       -nm,–name                       在YARN上为一个自定义的应用设置一个名字
  7.       -q,–query                       显示yarn中可用的资源 (内存, cpu核数)
  8.       -qu,–queue                 指定YARN队列.
  9.       -s,–slots                  每个TaskManager使用的slots数量
  10.       -tm,–taskManagerMemory     每个TaskManager的内存 [in MB]
  11.       -z,–zookeeperNamespace      针对HA模式在zookeeper上创建NameSpace

run 提交任务的指令参数

  1. -c,–class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
  2. m,–jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址
  3. 使用这个参数可以指定一个不同于配置文件中的jobmanager
  4. p,–parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。

flink  on yarn 任务设置用户

通过上面的参数指令我们发现flink目前还没有提供设置用户的参数,但是实际开发 需要分用户,所有这个问题还是要解决的。既然参数指令中没有,于是尝试在代码中设置,添加如下代码

System.setProperty("HADOOP_USER_NAME","wangzhihua");

发现任务起来后还是hadoop用户,后来知道flink启动时使用当前用户作为任务的用户,所有我们可以更改flink的启动脚本bin/flink文件,设添加获取当前用户的功能:在flink的启动脚本中添加

export HADOOP_USER_NAME=wangzhihua    

但是 这么写死显然不方便,我们可以改成spark那种在启动脚本中传参的方式,我们知道 flink有一个  -yD 的参数指令可以指定k-v类型的数据,因此改成如下的方式:在flink的启动脚本中添加如下的代码

获取-yD指定的用户为flink当前用户

  1. IFS=‘ ‘ arr=($@)
  2. for(( i=0;i<${#arr[@]};i++)) do
  3. if [ “-yD” = ${arr[i]} ]; then
  4. let i++
  5. IFS=‘=’ arr2=(${arr[i]})
  6. if [ “name” = ${arr2[0]} ]; then
  7. export HADOOP_USER_NAME=${arr2[1]}
  8. fi
  9. fi
  10. done
  11. echo “HADOOP_USER_NAME: $HADOOP_USER_NAME”
  12. echo “USER: $USER”

使用:-yD name=wnagzhxxx(指定任务用户)

 ./flink run  -m yarn-cluster  -yn 5 -yjm 1024 -ytm 4096  -yD name=wangzhxxx -ys 4 -p 10 -ynm flink_sql_commmand -c  Data2Es  /data2es_test_eg-1.0-SNAPSHOT.jar  

slot和parallelism

1.slot是指taskmanager的并发执行能力

在hadoop 1.x 版本中也有slot的概念,有兴趣的读者可以了解一下

taskmanager.numberOfTaskSlots:3

每一个taskmanager中的分配3个TaskSlot,3个taskmanager一共有9个TaskSlot

2.parallelism是指taskmanager实际使用的并发能力

parallelism.default:1

运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲。设置合适的并行度才能提高效率。

3.parallelism是可配置、可指定的

1.可以通过修改$FLINK_HOME/conf/flink-conf.yaml文件的方式更改并行度

2.可以通过设置$FLINK_HOME/bin/flink 的-p参数修改并行度

3.可以通过设置executionEnvironmentk的方法修改并行度

4.可以通过设置flink的编程API修改过并行度

5.这些并行度设置优先级从低到高排序,排序为api>env>p>file.

6.设置合适的并行度,能提高运算效率

7.parallelism不能多与slot个数。

4.slot和parallelism总结

1.slot是静态的概念,是指taskmanager具有的并发执行能力

2.parallelism是动态的概念,是指程序运行时实际使用的并发能力

3.设置合适的parallelism能提高运算效率,太多了和太少了都不行

4.设置parallelism有多中方式,优先级为api>env>p>file

转载请注明:SuperIT » flink实战—flink on yarn

喜欢 (1)or分享 (0)