大数据之Azkaban部署
一、Azkaban概论
1.1 为什么需要工作流调度系统
- 一个完整的数据分析系统通常都是由大量任务单元组成:Shell脚本程序,Java程序,MapReduce程序,Hive脚本等;
- 各任务单元之间存在时间先后及前后依赖关系;
- 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行;
1.2 常见工作流调度系统
- 简单的任务调度:直接使用 Linux 的 Crontab 来定义;
注⚠️:Crontab不能处理具有依赖关系的任务调度! - 复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如 Ooize、Azkaban、 Airflow、DolphinScheduler 等。
1.3 Azkaban 与 Oozie 对比
Ooize 相比 Azkaban 是一个重量级的任务调度系统,功能全面,但配置使用
也更复杂。如果可以不在意某些功能的缺失,轻量级调度器 Azkaban 是很不错的候选对象。
二、Azkaban集群安装
2.1 集群模式安装
2.1.1 安装包准备
-
将 azkaban-db-3.84.4.tar.gz,azkaban-exec-server-3.84.4.tar.gz,azkaban-web-server-3.84.4.tar.gz 上传到 hadoop102 的/opt/software 路径;
-
新建/opt/module/azkaban 目录,并将所有 tar 包解压到这个目录下:
[xiaobai@hadoop102 azkaban]$ mkdir /opt /module/azkaban
[xiaobai@hadoop102 azkaban]$ tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/
[xiaobai@hadoop102 azkaban]$ tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
[xiaobai@hadoop102 azkaban]$ tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
- 进入/opt/module/azkaban 目录,修改名称:
[xiaobai@hadoop102 azkaban]$ mv azkaban-web-server-3.84.4/ azkaban-web
[xiaobai@hadoop102 azkaban]$ mv azkaban-exec-server-3.84.4/ azkaban-exec
[xiaobai@hadoop102 azkaban]$ ll
total 4
drwxr-xr-x. 2 xiaobai xiaobai 4096 Apr 18 2020 azkaban-db-3.84.4
drwxr-xr-x. 6 xiaobai xiaobai 55 Apr 18 2020 azkaban-exec
drwxr-xr-x. 6 xiaobai xiaobai 51 Apr 18 2020 azkaban-web
2.1.2 配置 MySQL
- 启动MySQL:
[xiaobai@hadoop102 azkaban]$ mysql -uroot -p******
- 登陆 MySQL,创建 Azkaban 数据库:
mysql> create database azkaban;
Query OK, 1 row affected (0.19 sec)
- 创建 azkaban 用户并赋予权限 设置密码有效长度 4 位及以上
mysql> set global validate_password_length=4;
Query OK, 0 rows affected (0.07 sec)
- 设置密码策略最低级别(ps: 仅测试使用!)
mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)
- 创建 Azkaban 用户,任何主机都可以访问 Azkaban, 密码******:
mysql> CREATE USER 'azkaban'@'%' IDENTIFIED BY '******';
Query OK, 0 rows affected (0.17 sec)
赋予 Azkaban 用户增删改查权限:
mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
Query OK, 0 rows affected (0.03 sec)
- 创建 Azkaban 表,完成后退出 MySQL:
mysql> use azkaban;
Database changed
mysql> source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql;
mysql> show tables;
+-----------------------------+
| Tables_in_azkaban |
+-----------------------------+
| QRTZ_BLOB_TRIGGERS |
| QRTZ_CALENDARS |
| QRTZ_CRON_TRIGGERS |
| QRTZ_FIRED_TRIGGERS |
| QRTZ_JOB_DETAILS |
| QRTZ_LOCKS |
| QRTZ_PAUSED_TRIGGER_GRPS |
| QRTZ_SCHEDULER_STATE |
| QRTZ_SIMPLE_TRIGGERS |
| QRTZ_SIMPROP_TRIGGERS |
| QRTZ_TRIGGERS |
| active_executing_flows |
| active_sla |
| execution_dependencies |
| execution_flows |
| execution_jobs |
| execution_logs |
| executor_events |
| executors |
| project_events |
| project_files |
| project_flow_files |
| project_flows |
| project_permissions |
| project_properties |
| project_versions |
| projects |
| properties |
| ramp |
| ramp_dependency |
| ramp_exceptional_flow_items |
| ramp_exceptional_job_items |
| ramp_items |
| triggers |
| validated_dependencies |
+-----------------------------+
35 rows in set (0.01 sec)
mysql> quit;
Bye
所需脚本为create-all-sql-3.84.4.sql
,如图:
- 更改 MySQL 包大小;防止 Azkaban 连接 MySQL 阻塞:
[xiaobai@hadoop102 azkaban]$ sudo vim /etc/my.cnf
max_allowed_packet=1024M
注⚠️: 要放在在[mysqld]下面哦!
- 重启 MySQL:
[xiaobai@hadoop102 azkaban]$ sudo systemctl restart mysqld
2.1.3 配置 Executor Server
Azkaban Executor Server 处理工作流和作业的实际执行。
- 在/opt/module/azkaban/azkaban-exec/conf目录下编辑 azkaban.properties:
[xiaobai@hadoop102 conf]$ vim azkaban.properties default.timezone.id=Asia/Shanghai
- 同步分发
azkaban-exec
:
[xiaobai@hadoop102 conf]$ xsync /opt/module/azkaban/azkaban-exec/
- 需进入
/opt/module/azkaban/azkaban-exec
路径,分别在三台机器上,启动executor server
:
[xiaobai@hadoop102 azkaban-exec]$ bin/start-exec.sh
[xiaobai@hadoop103 azkaban-exec]$ bin/start-exec.sh
[xiaobai@hadoop104 azkaban-exec]$ bin/start-exec.sh
停止命令:
[xiaobai@hadoop102 azkaban-exec]$ bin/shutdown-exec.sh
注⚠️:
若在/opt/module/azkaban/azkaban-exec
目录下出现 executor.port
文件:
或在azkaban数据库中的executors
表中出现我们所连接的主机名称及端口号,则说明启动成功!
- 激活 executor,如下status显示success,则说明激活成功!
[xiaobai@hadoop102 azkaban-exec]$ curl -G "hadoop102:12321/executor?action=activate" && echo
{"status":"success"}
[xiaobai@hadoop103 azkaban-exec]$ curl -G "hadoop102:12321/executor?action=activate" && echo
{"status":"success"}
[xiaobai@hadoop104 azkaban-exec]$ curl -G "hadoop102:12321/executor?action=activate" && echo
{"status":"success"}
如图,激活成功后active栏显示为1!
2.1.4 配置 Web Server
Azkaban Web Server 处理项目管理,身份验证,计划和执行触发。
- 在
/opt/module/azkaban/azkaban-web/conf
目录下编辑azkaban.properties
:
[xiaobai@hadoop102 conf]$ vim azkaban.properties
修改如下属性:
tips:
StaticRemainingFlowSize:正在排队的任务数;
CpuStatus:CPU占用情况:
MinimumFreeMemory:内存占用情况。
- 修改
azkaban-users.xml
文件,添加用户xiaobai
:
[xiaobai@hadoop102 conf]$ vim azkaban-users.xml
<user password="******" roles="admin" username="xiaobai"/>
- 需进入到 hadoop102 的
/opt/module/azkaban/azkaban-web
路径,启动web server
:
[xiaobai@hadoop102 azkaban-web]$ bin/start-web.sh
停止命令:
[xiaobai@hadoop102 azkaban-web]$ bin/shutdown-web.sh
- 访问
http://hadoop102:8081
,并用xiaobai
用户登录:
2.2 Work Flow 案例
2.2.1 HelloWorld
- 创建
command.job
文件:
jane@janedeMacBook-Pro Desktop % vim command.job
#command.job
type=command
command=echo 'hello world'
压缩command.job,在WebServer新建项目,点击Create Project==>Upload==>command.job⬇️
点击Execute Flow⬇️
点击Execute ⬇️
Continue⬇️
工作单元日志在Job List⬇️
2.2.2 作业依赖案例
需求:
job A和job B执行完之后才能执行jobC。
实现:
- 新建
first.project
文件,编辑内容如下:
azkaban-flow-version: 2.0
注⚠️: 该文件作用,是采用新的 Flow-API 方式解析 flow
文件。
- 新建
first.flow
文件,内容如下
nodes:
- name: jobA
type: command
config:
command: echo "hello world A"
- name: jobB
type: command
config:
command: echo "hello world B"
- name: jobC
type: command
#jobC依赖于jobA和jobB
dependsOn:
- jobA
- jobB
config:
command: echo "hello world C"
注⚠️:
dependsOn与name/type对齐 是数组类型!
1). Name: job 名称;
2). Type: job 类型;command 表示要执行作业的方式为命令;
3). Config: job 配置。
将文件压缩上传至Azkaban Web Client,点击Execute flow,可以看到jobA / joB / jobC之间的依赖关系:
查看job list:
2.2.3 自动失败重试案例
需求:
如果执行任务失败,需要重试 3 次,重试的时间间隔 10000ms。
实现:
编译配置流: autoretry.flow:
nodes:
- name: jobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 5000
tips:
retries: 重试次数;
retry.backoff: 重试的时间间隔
注⚠️:
可在 Flow 全局配置中添加任务失败重试配置,此时重试配置会应用到所有 Job:
config:
retries: 3
retry.backoff: 10000
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
2.2.4 手动失败重试案例
需求:
JobA=》JobB(依赖于 A)=》JobC=》JobD=》JobE=》JobF。生产环境,任何 Job 都 有可能挂掉,可以根据需求执行想要执行的 Job。
实现:
编译配置流handsretry.flow
:
nodes:
- name: JobA
type: command
config:
command: echo "This is JobA."
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo "This is JobB."
- name: JobC
type: command
dependsOn:
- JobB
config:
command: echo "This is JobC."
- name: JobD
type: command
dependsOn:
- JobC
config:
command: echo "This is JobD."
- name: JobE
type: command
dependsOn:
- JobD
config:
command: echo "This is JobE."
- name: JobF
type: command
dependsOn:
- JobE
config:
command: echo "This is JobF."
jobA-jobF执行图:
若是有失败的job,可在history==>flow里进行查看
之后点击prepare execution可接着从失败的job开始执行:
或在excute flow中手动将已经成功的job设为disable / children,再继续点击execute!
tips:
Enable 和 Disable 下面都分别有如下参数:
Parents: 该作业的上一个任务;
Ancestors: 该作业前的所有任务;
Children: 该作业后的一个任务;
Descendents: 该作业后的所有任务;
Enable All: 所有的任务。
三、Azkaban进阶
3.1 JavaProcess 作业类型案例
JavaProcess 类型可以运行一个自定义主类方法,type 类型为 javaprocess
,可用的配置为:
Xms:最小堆;
Xmx:最大堆;
classpath:类路径;
java.class:要运行的 Java 对象,其中必须包含 Main 方法 main.args:main 方法的参数。
🌰:
- 新建一个azkaban的maven工程;
- 创建包名: cpm.xiaobai;
- 创建
TestJavaProcess
类:
package com.xiaobai;
public class TestJavaProcess {
public static void main(String[] args) {
System.out.println("Java Process Test!");
}
}
- 打包jar包
azkaban-javaprocess-1.0-SNAPSHOT.jar
- 新建
javatest.flow
:
nodes:
- name: test_java
type: javaprocess
config:
Xms: 96M
Xmx: 200M
java.class: com.xiaobai.TestJavaProcess
- 将jar包/ flow包/ project文件一起打包成javatest.zip;
- 创建prohect–>upload–>executor:
3.2 条件工作流案例
条件工作流功能允许用户自定义执行条件来决定是否运行某些 Job。条件可以由当前 Job 的父 Job 输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定 Job 执行逻辑时获得更大的灵活性,例如,只要父 Job 之一成功,就可以运行当前 Job。
3.2.1 运行时参数案例
基本原理
- 父 Job 将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件;
- 子 Job 使用 ${jobName:param}来获取父 Job 输出的参数并定义执行条件。
支持的条件运算符
- == 等于
- != 不等于
- > 大于
- >= 大于等于
- < 小于
- <= 小于等于
- && 与
- || 或
- ! 非
🌰:
需求:
JobA 执行一个 shell 脚本。
JobB 执行一个 shell 脚本,但 JobB 不需要每天都执行,而只需要每个周一执行。
实现:
- 新建 JobA.sh:
#!/bin/bash
echo "do JobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
- 新建 JobB.sh:
#!/bin/bash
echo "do JobB"
- 新建
condition.flow
:
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${JobA:wk} == 1
将 JobA.sh、JobB.sh、condition.flow 和 frist.project 打包成 condition.zip
;在Azkaban Web Client执行:
JobA执行成功,JobB cancelled,要等到周一才执行jobB。
3.2.2 预定义宏案例
Azkaban 中预置了几个特殊的判断条件,称为预定义宏
。
预定义宏会根据所有父 Job 的完成情况进行判断,再决定是否执行;可用的预定义宏如下:
- all_success: 表示父 Job 全部
成功
才执行(默认); - all_done:表示父 Job 全部
完成
才执行; - all_failed:表示父 Job 全部失败才执行;
- one_success:表示父 Job 至少一个成功才执行;
- one_failed:表示父 Job 至少一个失败才执行。
需求:
JobA 执行一个 shell 脚本;
JobB 执行一个 shell 脚本;
JobC 执行一个 shell 脚本,要求 JobA、JobB 中有一个成功即可执行;
实现:
- 新建 JobA.sh:
#!/bin/bash
echo "do JobA"
- 新建 JobC.sh:
#!/bin/bash
echo "do JobC"
- 新建 macro.flow:
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: sh JobC.sh
condition: one_success
JobA.sh、JobC.sh、macro.flow、frist.project 文件,打包成 macro.zip;在Azkaban Web Client执行:
JobA success,JobB fail,JobC success!
3.3 定时执行案例
需求:
JobA 每间隔 1 分钟执行一次;
实现:
在执行工作流时候,选择左下角 Schedule
填写定时规则:
可在history中查看first的执行情况,每间隔一分钟执行一次:
scheduling中有next execution time可查看下一次执行时间和remove schedule选项移除掉这个定时任务:
3.4 邮件报警案例
3.4.1 登录邮箱开启SMTP
在邮箱的账户里面开启SMTP服务:
复制授权码:
3.4.2 默认邮件报警案例
Azkaban 默认支持通过邮件对失败的任务进行报警,配置方法如下:
- 在/opt/module/azkaban/azkaban-web/conf目录下修改
azkaban.properties
文件:
[xiaobai@hadoop102 conf]$ vim azkaban.properties
- 需重启
web-server
:
[xiaobai@hadoop102 azkaban-web]$ bin/shutdown-web.sh
[xiaobai@hadoop102 azkaban-web]$ bin/start-web.sh
3.5 电话报警案例
3.5.1 第三方告警平台集成
有时任务执行失败后邮件报警接收不及时,因此可能需要其他报警方式,比如电话报警。
此时,可与第三方告警平台进行集成,例如睿象云。
- 官网地址:
https://www.aiops.com/ - 集成告警平台,使用 Email 集成;
- 获取邮箱地址,后需将报警信息发送至该邮箱;
- 可配置分派策略/ 通知策略;
注⚠️: 若要使用睿象云集成方式,则不可用QQ邮箱,需修改邮箱地址。
3.5.2 Azkaban多Executor模式注意事项
Azkaban 多 Executor 模式是指,在集群中多个节点部署 Executor。在这种模式下, Azkaban web Server 会根据策略,选取其中一个 Executor 去执行任务。
为确保所选的 Executor 能够准确的执行任务,我们须在以下两种方案任选其一,推荐使 用方案二。
方案一:
指定特定的 Executor(hadoop102)去执行任务。
-
在 MySQL 中 azkaban 数据库 executors 表中,查询 hadoop102 上的 Executor 的 id;
-
在执行工作流程时加入 useExecutor 属性:
方案二⭐️:
在 Executor 所在所有节点部署任务所需脚本和应用。