帮助与文档

我们为你提供丰富、详尽的使用指南以及产品文档

Airflow on QingCloud AppCenter 用户手册

1.描述

Airflow on QingCloud AppCenter 将 Airflow 通过云应用的形式在 QingCloud AppCenter 部署,具有如下特性:

  • 目前提供单机版和集群版(后续开发中)2个版本,分别满足不同作业数量级别的需求。
  • 单机版提供本地执行器,采用伪分布模式,通过在本地用 python 的多进程库从而达到以多进程的方式调度和运行作业的效果。如果只有适量的作业数量,可以采取这种方式。
  • 集群版提供分布式执行器,采用分布式模式,集群按照计划和负载均衡调度作业,并在作业集群上接收和运行作业。作为高度可伸缩的分布式架构,集群版适合作业数量大系统负载重的需求。
  • Airflow on QingCloud AppCenter 支持 Airflow v1.8.0 版本。
  • 提供实时监控、健康检查、日志自动清理等功能,方便用户运维。
  • Airflow on QingCloud AppCenter 集成了开源离线同步工具 DataX3.0,方便编写各种异构数据源之间稳定高效的数据同步 job。
  • 一键部署,开箱即用。

2.简介

  • AirflowAirbnb 开源的 data pipeline 调度和监控工作流的平台,用于用来创建、监控和调度 data pipeline(ETL),目前项目仍在 Apache Software Foundation 孵化中。

  • Airflow 将 workflow 编排为 tasks 组成的 DAGs(Directed Acyclic Graph) 即有向无环图,调度器在一组workers上按照指定的依赖关系执行tasks。

  • Airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,基于 Web 的用户界面可以可视化管道的依赖关系、监控进度、触发任务等。

  • 这个平台拥有和 Hive、Presto、MySQL、HDFS、Postgres 和 S3 交互的能力,并且提供了钩子使得系统拥有很好的扩展性。

  • 不同于其它调度器使用XML或者text文件方式定义工作流,airflow 通过 python 文件定义工作流,用户可以通过代码完全自定义自己的工作流。

类似的产品有:Azkaban (LinkedIn)、 Luigui (Spotify) 和 Oozie (Yahoo)

3.部署

目前 Airflow on QingCloud AppCenter 提供单机版版本,后续会尽快推出集群版:

  • 单机版版本号为: Airflow1.8.0-Standalone-V1.0.0

对于单机版,对 Airflow 进行一个适度的安装,包括如下组件:
Airflow Web 服务器
Airflow 调度器
元数据库(PostgreSQL)
上述两个组件( Web 服务器和调度器)运行于一个单机上,数据库可以选择运行在单机之上,或者选择运行在一个远程的依赖数据库集群中。

对于集群版(开发中),用于可扩展的生产环境中,Airflow 含有以下组件:
一个元数据库(PostgreSQL)
一组Airflow工作节点
一个调节器(RabbitMQ)
一个Airflow Web服务器

两个版本的创建步骤类似,以下以单机版为例具体说明创建步骤。

第1步:基本设置

第1步: 基本设置 根据自己的需求填写 应用名称应用描述,选择版本为单机版(Airflow1.8.0-Standalone-V1.0.0)。

第2步:Airflow节点设置

第2步: Airflow节点设置 CPU,内存,节点类型,磁盘大小根据自己实际需求进行选择即可,生产环境建议磁盘使用超高性能型。

第3步:网络设置

第3步: 网络设置 出于安全考虑,所有的集群都需要部署在私有网络中,选择自己创建的网络中。

第4步:依赖服务设置(可选)

第4步: 依赖服务设置 该步骤可选(若不设置则使用自带的postgresql服务)。
Airflow 的 metadata 存储在 PostgreSQL 数据库中,需要选择依赖的数据库服务器集群。

第5步:Airflow 参数设置

第5步: 环境参数设置 界面提供的参数大部分和 Airflow 性能相关,如果需要调整相关参数,可以按照自己的实际需求配置和调整相关参数,修改参数会导致 Airflow 服务重启,具体可以参考参数说明。

对于 executor 参数设置的说明:
单机版可以用于测试开发环境,需要设置 executor 为 SequentialExecutor ,表示单进程顺序执行 DAG 中的任务,通常只用于测试。
当单机版中参数 executor 设置为 LocalExecutor,表示多进程本地执行,可用于适量作业的生产环境。
集群版(开发中)中参数 executor设置为 CeleryExecutor ,表示使用 celery 作为执行器,可以分布式地多机跑任务,可用于作业数量大的生产环境。

第6步: 用户协议

阅读并同意青云 AppCenter 用户协议之后即可开始部署应用。

4.集群使用

4.1查看集群信息

在集群创建完毕后,可以在控制台 Appcenter -> 集群列表 标签下看到目前已经创建的集群信息。

集群列表 集群列表

集群的详细信息 集群信息

集群基础资源监控信息 基础资源监控信息

4.2 修改配置参数

点击配置参数可以修改 Airflow 参数,修改参数会导致 Airflow 服务重启,参数具体含义可以参考 Airflow 参数说明。 参数配置

4.3 扩容集群

点击 扩容集群 , 可以在集群性能不足时提高集群的配置: 集群扩容

5.Airflow 的使用

5.1 登录 Airflow web UI

Airflow 提供了一个基于 Web 的用户界面让用户可以可视化管道的依赖关系、监控进度、触发任务等操作。
部署好Airflow on QingCloud AppCenter 之后,连入内网,输入如下 url 进入 Airflow web UI 界面。
http://[airflow所在的IP地址]:8080/admin/
Airflow web login界面

默认的用户名密码是admin/admin,登录之后的界面如下所示。 Airflow web 界面

5.2 Dag的开发测试

Airflow on QingCloud AppCenter 提供用户登录主机的权限,在控制台上,Appcenter –>集群列表,找到自己的 Airflow 集群列表,进入节点 tab 页,点集群ID列右下角的 VNC图标。
以用户名airflow,密码airflow.2018!登录系统。 VNC login

Airflow on QingCloud AppCenter 开发测试环境下完成 DAG 文件的开发测试步骤如下。

  • 编写任务脚本(.py)文件
  • 测试任务脚本(command)
  • WebUI 自查

DAG (Directed Acyclic Graph)有向非循环图,表示的是一些任务的集合,描述了任务之间的依赖关系,以及整个 DAG 的一些属性, 比如起止时间,执行周期,重试策略等等。
Airflow 采用 Python 定义 DAG,通常一个 .py 文件就是一个 DAG 。

在部署的时候,如果参数是否载入示例选择是,在 web 界面会出现 airflow 内置的示例dag,通过学习这些 DAG 的源码可掌握 operator、调度、任务依赖等知识,能快速入门 Airflow DAG开发。

Airflow on QingCloud AppCenter 中,DAGS 文件存放的目录是/data/airflow/dags目录,为了方便用户使用,初始化的时候在该目录下放置了一个 hello_world.py 的示例Dag文件。

在该目录下编写自己的DAG文件定义自己的数据流逻辑代码,编码完成后,airflow 提供一系列的命令支持开发测试。

#进入DAGS目录
root@i-gpp3y0no:/data/airflow/dags# cd /
root@i-gpp3y0no:/# cd /data/airflow/dags
root@i-gpp3y0no:/data/airflow/dags# ls
hello_world.py  hello_world.pyc

#测试 python 代码的正确性
root@i-gpp3y0no:/data/airflow/dags# python hello_world.py

#查看 DAG 是否成功加载
root@i-gpp3y0no:/data/airflow/dags# airflow list_dags

#查看 DAG 的下 task 的树形结构是否正确
root@i-gpp3y0no:/data/airflow/dags# airflow list_tasks hello_world_dag --tree
<Task(PythonOperator): hello_task>
    <Task(BashOperator): date_task>
<Task(BashOperator): sleep_task>
    <Task(BashOperator): date_task>

#测试 DAG 的下 task 运行是否正确,不需要依赖满足
root@i-gpp3y0no:/data/airflow/dags# airflow test hello_world_dag hello_task 20180111
root@i-gpp3y0no:/data/airflow/dags# airflow test hello_world_dag sleep_task 20180111
root@i-gpp3y0no:/data/airflow/dags# airflow test hello_world_dag date_task 20180111

# 对dag进行某段时间内的完整测试,会自动解析依赖关系,按依赖顺序执行
root@i-gpp3y0no:/data/airflow/dags# airflow backfill hello_world_dag -s 2018-01-01 -e 2018-01-11

在 DAGS 目录 /data/airflow/dags 下增加任何新的 dag文件,需要重启 Airflow 的 Webserver 服务才能在 Web 界面上显示。
这个操作可以通过自定义服务–>重启Airflow Web服务 实现。 VNC login

关于 Airflow 的 DAG 开发请参考官方文档 获取更多相关知识。

5.3 Dag 的运维管理

以用户名 airflow ,密码 airflow.2018! 登录系统,在 Airflow 的DAG目录下编写自己的 DAG 文件,如果开发的 DAG 通过测试,就可以在生产环境下运行自定义的 DAG 了。
在开发测试环境下准备好 DAG 文件之后,需要把开发好的 DAG 文件上传到生产环境下的 DAGS 目录。
上传之后,重启集群或者重启 Airflow web 服务,然后再刷新 Airflow web UI 界面,就可以看到新加的 DAG 文件已经被加载成功了。 Airflow web on/off键

为了启动 DAG Run,首先打开工作流(off键),然后单击 Trigger Dag 按钮(Links 第一个按钮),最后单击 Graph View 按钮(Links 第三个按钮)以查看运行进度: Airflow web progress

点击 Airflow 的树形图(Tree View)迅速进掌握运行的状态。
垂直列着的方格表示的是一个 DAG 在一天里运行的所有任务。所有的方块都是绿色表示运行全部成功! Airflow web tree

完成后,你可以单击 hello_task,然后单击 View Log 查看日志。如果一切都按预期工作,日志应该显示一些行。

关于 Airflow 的运维管理请参考官方文档 获取更多相关知识。

5.4 日志管理

Airflow on QingCloud AppCenter 日志目录存放在/data/airflow/logs,可以以用户名 airflow ,密码 airflow.2018! 登录系统,进入该目录查看相关日志。
Airflow on QingCloud AppCenter 集成了 Datax,因此也保存了 datax 的日志,存放目录是/data/airflow/dataxjob/log。

6.Airflow 与 Datax 集成的使用场景

为了方便数据迁移的任务调度,Airflow on QingCloud AppCenter 集成了开源离线同步工具 Datax3.0, 并提供了几种数据迁移的常用使用场景下的 samples。
登录 Airflow Web 的用户界面,可以看到默认提供的 Airflow 与 Datax 集成的几种 job samples,如下图所示。 Airflow Datax Samples

默认 Datax Job 的配置文件路径是/data/airflow/dataxjob,用户可以在此目录下查看到默认提供的几种 Datax Job 的配置文件。

注意: datax job配置文件中的数据库配置为测试 DB (并非可用),用户可以改为自己的测试 DB.

6.1 本地文件到数据库

进入 Datax Job 的配置文件路径/data/airflow/dataxjob,查看文件 csv2pg_test.json,该文件配置了 Datax 的 txtfilereader 本地文件读取配置,本地测试数据目录存在放/data/testdata/,同时设置了以 Postgresql 数据库作为目标数据库的 postgresqlwriter 配置。
该配置文件定义了一个完整的从本地文件导入数据到 postgresql 数据库的一个 Datax Job,可以执行以下 Datax 的命令单独执行该导数据的 Job。

python /home/airflow/datax/bin/datax.py /data/airflow/dataxjob/csv2pg_test.json

进入 Airflow 的 Dags 目录查看 Airflow 调用 Datax job的配置,配置文件路径是/data/airflow/dags/csv2pg.py。

登录 Airflow Web 的用户界面,也可以查看到该 Airflow 的 Dag 已经显示在页面上,直接在页面上点击运行即可根据 Dag 的配置参数调度该 Job,该样例默认设置是一天运行一次。

6.2 FTP 文件到数据库

进入 Datax Job 的配置文件路径/data/dataxjob,查看文件 ftpcsv2pg_test.json,该文件配置了 Datax 的 ftpreader ftp 文件读取配置,ftp 测试数据目录存在放一个远程 ftp 的测试服务器上,同时设置了以 Postgresql 数据库作为目标数据库的 postgresqlwriter 配置。
该配置文件定义了一个完整的从远程 FTP 文件导入数据到 postgresql 数据库的一个 Datax Job,可以执行以下 Datax 的命令单独执行该导数据的 Job。

python /home/airflow/datax/bin/datax.py /data/airflow/dataxjob/ftpcsv2pg_test.json

进入 Airflow 的 Dags 目录查看 Airflow 调用 Datax job的配置,配置文件路径是/data/airflow/dags/ftpcsv2pg.py。

登录 Airflow Web 的用户界面,也可以查看到该 Airflow 的 Dag 已经显示在页面上,直接在页面上点击运行即可根据 Dag 的配置参数调度该Job,该样例默认设置是一天运行一次。

6.3 oracle 数据库到postgresql 数据库

进入 Datax Job 的配置文件路径/data/dataxjob,查看文件 oracle2pg_test.json,该文件配置了 Datax 的 oraclereader oracle 数据库读取配置,测试数据的数据源存在放一个远程的 Oracle 测试服务器上,同时设置了以 Postgresql 数据库作为目标数据库的 postgresqlwriter 配置。
该配置文件定义了一个完整的从远程 oralce 数据库导入数据到 postgresql 数据库的一个Datax Job,可以执行以下 Datax 的命令单独调用该导数据的 Job。

python /home/airflow/datax/bin/datax.py /data/airflow/dataxjob/oracle2pg_test.json

进入 Airflow 的 Dags 目录查看 Airflow 调用 Datax job 的配置,配置文件路径是/data/airflow/dags/oracle2pg.py。

登录 Airflow Web 的用户界面,也可以查看到该 Airflow 的 Dag 已经显示在页面上,直接在页面上点击运行即可根据 Dag 的配置参数调度该Job,该样例默认设置是一天运行一次。

6.4 sqlserver 数据库到postgresql 数据库

进入 Datax Job 的配置文件路径/data/dataxjob,查看文件 sqlserver2pg_test.json,该文件配置了 Datax 的 sqlserverreader sqlserver 数据库读取配置,测试数据的数据源存在放一个远程的 sqlserver 测试服务器上,同时设置了以 Postgresql 数据库作为目标数据库的 postgresqlwriter 配置。
该配置文件定义了一个完整的从远程 sqlserver 导入数据到 postgresql 数据库的一个Datax Job,可以执行以下 Datax 的命令单独调用该导数据的 Job。

python /home/airflow/datax/bin/datax.py /data/airflow/dataxjob/sqlserver2pg_test.json

进入 Airflow 的 Dags 目录查看 Airflow 调用 Datax job 的配置,配置文件路径是/data/airflow/dags/sqlserver2pg.py。

登录 Airflow Web 的用户界面,也可以查看到该 Airflow 的 Dag 已经显示在页面上,直接在页面上点击运行即可根据 Dag 的配置参数调度该Job,该样例默认设置是一天运行一次。

关于 Datax 的更多细节请参考官方文档 获取更多相关知识。

关于 Airflow on QingCloud AppCenter 的介绍就到这里 ,希望您在 Qingcloud 上使用愉快!