airflow安装及使用入门(linux)

目录

airflow

概述

安装

安装python环境

安装Airflow

修改数据库为MySQL

修改执行器

配置邮件服务器

常用命令

airflow

概述

Airflow是一个以编程方式编写,安排和监视工作流的平台

主要用于任务调度的安排;使用Airflow将工作流编写任务的有向无环图(DAG)。Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务

安装

官网:Apache Airflow

安装python环境

1.首先安装anaconda/miniconda

下载地址:

可以从官网下载 :Anaconda | Anaconda Distribution

下载完成后在命令行中安装即可

 bash Anaconda3-2022.10-Linux-x86_64.sh

二者的区别:

conda是一种通用包管理系统,旨在构建和管理任何语言和任何类型的软件。举个例子:包管理与pip的使用类似,环境管理则允许用户方便地安装不同版本的python并可以快速切换。

Anaconda则是一个打包的集合,里面预装好了conda、某个版本的python、众多packages、科学计算工具等等,就是把很多常用的不常用的库都给你装好了。

Miniconda,顾名思义,它只包含最基本的内容——python与conda,以及相关的必须依赖项,对于空间要求严格的用户,Miniconda是一种选择。就只包含最基本的东西,其他的库得自己装。

随后加载环境变量配置文件,使其生效:

source ~/.bashrc

2.创建python3.8环境

首先配置国内镜像:

conda config --add channels Index of /anaconda/pkgs/free/ | 北京外国语大学开源软件镜像站 | BFSU Open Source Mirror

conda config --add channels Index of /anaconda/pkgs/main/ | 北京外国语大学开源软件镜像站 | BFSU Open Source Mirror

conda config --set show_channel_urls yes

创建python环境:

conda create --name airflow python=3.8

查看python版本:python -V

常用conda命令

禁止激活默认base环境:

conda config --set auto_activate_base false

创建环境:conda create -n env_name

查看所有环境:conda info --envs

删除一个环境:conda remove -n env_name --all

激活环境:conda activate airflow

退出当前环境:conda deactivate

安装Airflow

首先修改pip的源,下载速度更快:

pip install numpy -i Simple Index

然后创建pip的配置文件,使得修改的pip源生效:

sudo mkdir ~/.pip

sudo vim ~/.pip/pip.conf

添加以下内容:

[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host = https://pypi.tuna.tsinghua.edu.cn

安装airflow:pip install "apache-airflow==2.4.3"

出现上图所示标识,则说明安装的pip源已经生效;

初始化airflow:airflow db init

创建账号:

airflow users create \
--username admin \
--firstname why \
--lastname why \
--role Admin \
--email 3323929507@qq.com

密码:123456

查看版本:airflow version

启动airflow web服务:airflow webserver -p 8089 -D(为防止8080端口被占用,选用不常用的端口)4ddbf71e2f309bb860aab8e64b207a0a.png

启动airflow调度:airflow scheduler -D

启动之后可以看到许多默认的调度示例(如上);

启停脚本:

#!/bin/bash

case $1 in
"start"){
    echo " --------启动 airflow-------"
    ssh 你的服务器名称 "conda activate airflow;airflow webserver -p 8089 -D;airflow scheduler -D; conda deactivate"
};;
"stop"){
    echo " --------关闭 airflow-------"
    ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15 
};;
esac

修改数据库为MySQL

点击查看相关指导操作;

首先在MySQL中建库:

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

由于版本不匹配,可能出现ssl证书的问题,可以选择关闭:

查看ssl是否开启:SHOW VARIABLES LIKE '%ssl%';

如何关闭?

sudo vim /etc/my.cnf

加入以下内容:skip_ssl

除此之外还可能出现时间戳问题:1067 - Invalid default value for ‘update_at’

原因:字段 'update_at' 为 timestamp类型,取值范围是:1970-01-01 00:00:00 到 2037-12-31 23:59:59(UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

解决方法:在配置文件中写入:sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

或者在命令行:set session sql_mode='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

然后添加python连接的依赖:

先安装驱动:pip install mysql-connector-python

再修改配置文件:vim airflow/airflow.cfg

添加以下内容:sql_alchemy_conn = mysql+mysqlconnector://root:hadoop@服务器名称:3306/airflow_db

操作完成后重启数据库:sudo systemctl restart mysqld

注意:数据库必须重启,否则操作不生效

然后停止当前airflow服务,重新初始化后启动;创建账号进行登录

修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞

可以在配置文件中修改对应的调度器:vim airflow/airflow.cfg

选择LocalExecutor,KubernetesExecutor等均可;

然后重启airflow

eae72cb2b6d0456b2f98f66b81e45bbf.png

配置邮件服务器

修改airflow.cfg文件的内容:

主要参数:

smtp_host:SMTP服务器地址(在邮箱客户端可以查看)

port:SMTP服务器端口,163邮箱为25,QQ邮箱为465或587

修改代码,添加发送邮件的任务:

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta

default_args = {
    # 用户
    'owner': 'why',
    # 是否开启任务依赖
    'depends_on_past': True, 
    # 邮箱
    'email': ['why200011124@163.com'],
    # 启动时间
    'start_date':datetime(2023,1,19),
    # 出错是否发邮件报警
    'email_on_failure': False,
    # 重试是否发邮件报警
    'email_on_retry': False,
    # 重试次数
    'retries': 1,
    # 重试时间间隔
    'retry_delay': timedelta(minutes=5),
}
# 声明任务图
dag = DAG('test_email', default_args=default_args, schedule_interval=timedelta(days=1))
 
# 创建单个任务
t1 = BashOperator(
    # 任务id
    task_id='t1',
    # 任务命令
    bash_command='ssh hadoop102 "echo "task1" >> /home/why/data/test_airflow.txt"',
    # 重试次数
    retries=3,
    # 把任务添加进图中
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command='ssh hadoop102 "echo "task2" >> /home/why/data/test_airflow.txt"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='t3',
    bash_command='ssh hadoop102 "echo "task3" >> /home/why/data/test_airflow.txt"',
    retries=3,
    dag=dag)

email=EmailOperator(
   task_id="email",
   to="3323929507@qq.com",
    subject="test-subject",
    html_content="<h1>test-content</h1>",
    cc="why200011124@163.com",
   dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)

参数含义:

to:收件人地址

subject:邮件主题

html_content:邮件内容(html格式)

cc:抄送人地址

然后重启airflow即可;

常用命令

首先需要切入到airflow的conda环境下;

airflow --help:查看帮助信息;

airflow users -help:查看具体命令(users)的帮助信息:

airflow users list:查看所有用户

airflow webserver -p 8089 -D:启动web端

airflow scheduler -D:启动调度器;

airflow dags list:查看所有任务

airflow tasks list test --tree:查看单个任务(test)(以树形结构打印)