Azkaban——轻量级任务调度工具




2021-06-04

blog_main_img

Azkaban 是 LinkedIn 开源的一套批处理工作流调度系统。它的核心思路很朴素:把一组任务按依赖关系组织成 flow,上传成项目包,然后由 Azkaban 负责触发、执行、查看日志、处理失败和管理权限。

如果你有一批数据任务,比如抽取、清洗、汇总、导出,手工跑起来很容易乱。Azkaban 的价值就在这里:它把“先跑谁、后跑谁、失败怎么办、参数怎么传、日志在哪看”这些事情放到一个统一入口里。

Azkaban 适合什么场景

它适合偏批处理的任务编排:

  • 数据仓库离线链路
  • Hive、Spark、Shell、Python 任务
  • 多任务依赖执行
  • 人工触发或周期触发
  • 任务失败后查看日志并重跑
  • 项目级权限隔离

它不是实时流处理引擎,也不是容器编排平台。它更像一个工作流控制台,把脚本、SQL、Spark 提交命令串成一条能追踪的链路。

架构先看懂

Azkaban 常见组件有三个:

WebServer:页面、API、项目管理、权限入口
Executor:真正执行 job 的服务
MySQL:保存项目、flow、执行记录、日志索引等元数据

Azkaban 架构组件

用户在 Web 页面上传项目 zip。WebServer 解析项目,保存元数据。触发 flow 后,Executor 拉起 job,并把执行结果和日志写回。

简单理解:

WebServer 管入口
Executor 管执行
MySQL 管状态

这种拆分让界面管理和任务执行分开,任务重的时候可以扩 Executor。

项目包长什么样

Azkaban 的项目通常是一个 zip 包。里面可以放 .job 文件、脚本文件、配置文件。

一个简单项目结构:

demo-flow/
  extract_users.job
  transform_users.job
  load_users.job
  jobs/
    extract_users.py
    transform_users.py
    load_users.py

每个 .job 文件描述一个任务。依赖关系通过 dependencies 串起来。

Azkaban job flow 依赖关系

一个最小 job

extract_users.job

type=command
command=python3 jobs/extract_users.py --env ${env}

type=command 表示这就是一个普通命令任务。command 是实际执行的命令。

transform_users.job

type=command
dependencies=extract_users
command=python3 jobs/transform_users.py --env ${env}

load_users.job

type=command
dependencies=transform_users
command=python3 jobs/load_users.py --env ${env}

这样就形成了:

extract_users -> transform_users -> load_users

Azkaban 会先跑上游任务,上游成功后再跑下游。

Python 任务怎么写

任务脚本最好做到三点:

  • 参数从命令行传入
  • 日志直接输出到 stdout / stderr
  • 失败时用非零退出码

jobs/extract_users.py

import argparse
import sys


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", default="dev")
    return parser.parse_args()


def main():
    args = parse_args()
    print(f"extract users, env={args.env}")

    try:
        # 这里可以放数据库查询、文件拉取、接口请求等逻辑
        print("extract finished")
    except Exception as exc:
        print(f"extract failed: {exc}", file=sys.stderr)
        return 1

    return 0


if __name__ == "__main__":
    raise SystemExit(main())

Azkaban 判断任务成功失败,最直观的方式就是看命令退出码。返回 0 表示成功,非 0 表示失败。

参数传递:别把环境写死

job 文件里可以使用变量:

type=command
command=python3 jobs/load_users.py --env ${env} --batch ${batch}

触发 flow 时传:

env=prod
batch=user_profile

脚本里用 argparse 接:

import argparse


parser = argparse.ArgumentParser()
parser.add_argument("--env", required=True)
parser.add_argument("--batch", required=True)
args = parser.parse_args()

print(args.env, args.batch)

这样同一份项目包可以跑不同环境、不同批次,不用复制一堆几乎一样的 job。

失败重试和失败策略

普通 command job 可以配置重试:

type=command
command=python3 jobs/load_users.py --env ${env}
retries=2
retry.backoff=60000

如果任务偶发失败,比如网络抖动、远端服务短暂不可用,重试很有价值。

但不要把重试当万能药。数据质量错误、SQL 语法错误、权限错误,重试几次都没用。脚本里最好把错误打印清楚,让日志能直接定位问题。

flow 级别还可以配置失败处理策略,例如失败后停止、完成可继续的任务等。生产链路里更推荐保守一点:关键任务失败后停止下游,避免污染结果表。

job 文件不要写成配置垃圾桶

.job 文件应该清楚、短小。

推荐:

type=command
dependencies=transform_users
command=python3 jobs/load_users.py --env ${env} --batch ${batch}
retries=1

不推荐:

type=command
command=一大串难以维护的 shell 拼接

复杂逻辑放进 Python、Shell 或 Spark 代码里,job 只负责描述入口和依赖。这样项目结构会干净很多。

用 Python 生成项目 zip

手工压 zip 容易漏文件。可以写一个小脚本统一打包。

from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile


PROJECT_DIR = Path("demo-flow")
OUTPUT = Path("demo-flow.zip")


def build_zip(project_dir: Path, output: Path) -> None:
    if output.exists():
        output.unlink()

    with ZipFile(output, "w", compression=ZIP_DEFLATED) as archive:
        for path in project_dir.rglob("*"):
            if path.is_file():
                archive.write(path, path.relative_to(project_dir))


if __name__ == "__main__":
    build_zip(PROJECT_DIR, OUTPUT)
    print(f"created {OUTPUT}")

打出来的 zip 里,.job 文件应该在根目录或 Azkaban 能识别的位置。上传前可以先解压看一眼结构。

用 API 上传并触发 flow

Azkaban 提供 AJAX API。可以用 Python requests 写一个简单客户端:登录、创建项目、上传 zip、执行 flow。

Azkaban Python API 发布流程

安装依赖:

pip install requests

客户端代码:

from pathlib import Path

import requests


class AzkabanClient:
    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url.rstrip("/")
        self.username = username
        self.password = password
        self.session_id = ""

    def login(self) -> None:
        response = requests.post(
            f"{self.base_url}/",
            data={
                "action": "login",
                "username": self.username,
                "password": self.password,
            },
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()

        if "session.id" not in payload:
            raise RuntimeError(f"login failed: {payload}")

        self.session_id = payload["session.id"]

    def create_project(self, name: str, description: str) -> None:
        response = requests.post(
            f"{self.base_url}/manager",
            data={
                "session.id": self.session_id,
                "action": "create",
                "name": name,
                "description": description,
            },
            timeout=30,
        )
        response.raise_for_status()

    def upload_project(self, project: str, zip_path: Path) -> dict:
        with zip_path.open("rb") as file:
            response = requests.post(
                f"{self.base_url}/manager",
                data={
                    "session.id": self.session_id,
                    "ajax": "upload",
                    "project": project,
                },
                files={"file": (zip_path.name, file, "application/zip")},
                timeout=120,
            )

        response.raise_for_status()
        return response.json()

    def execute_flow(self, project: str, flow: str, params: dict[str, str]) -> dict:
        data = {
            "session.id": self.session_id,
            "ajax": "executeFlow",
            "project": project,
            "flow": flow,
        }

        for key, value in params.items():
            data[f"flowOverride[{key}]"] = value

        response = requests.get(
            f"{self.base_url}/executor",
            params=data,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

使用:

client = AzkabanClient(
    base_url="http://azkaban.example.com:8081",
    username="demo",
    password="demo_password",
)

client.login()

try:
    client.create_project("demo-flow", "Python demo workflow")
except requests.HTTPError:
    print("project may already exist")

print(client.upload_project("demo-flow", Path("demo-flow.zip")))

result = client.execute_flow(
    project="demo-flow",
    flow="load_users",
    params={
        "env": "prod",
        "batch": "user_profile",
    },
)

print(result)

这样就能把“打包、上传、触发”串成一个脚本,减少手工操作。

Flow 名字从哪里来

老式 .job 项目里,Azkaban 会根据 job 依赖关系识别 flow。通常没有下游依赖的最终 job,会形成 flow 入口。

比如:

extract_users.job
transform_users.job 依赖 extract_users
load_users.job 依赖 transform_users

触发时 flow 名通常就是最终节点 load_users

如果使用 Flow 2.0,可以用 YAML 描述更清楚的 flow。比如:

nodes:
  - name: extract
    type: command
    config:
      command: python3 jobs/extract_users.py --env ${env}

  - name: transform
    type: command
    dependsOn:
      - extract
    config:
      command: python3 jobs/transform_users.py --env ${env}

  - name: load
    type: command
    dependsOn:
      - transform
    config:
      command: python3 jobs/load_users.py --env ${env}

如果团队里已经有旧项目,用 .job 继续维护没问题。新项目可以考虑用 YAML,把结构写得更集中。

日志排查:让脚本自己会说话

Azkaban 会收集任务日志,但日志内容还是脚本自己输出的。

推荐日志风格:

import logging


logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(name)s - %(message)s",
)

logger = logging.getLogger("load_users")


def main():
    logger.info("load started")
    logger.info("read source rows: %s", 1024)
    logger.info("write target rows: %s", 1024)
    logger.info("load finished")

别只打印“失败了”。至少要打印:

  • 输入参数
  • 读了多少数据
  • 写了多少数据
  • 关键 SQL 或任务名
  • 异常堆栈

日志写得清楚,值班同学会少掉很多猜谜环节。

权限和项目边界

Azkaban 里项目是重要边界。不要把所有任务塞进一个超级项目。

更合理的划分:

ods-user-flow
dwd-order-flow
ads-report-flow
risk-feature-flow

每个项目只给相关人权限。核心生产项目不要随便开放上传和执行权限。

项目命名也要稳定。脚本、监控、通知里都会引用项目名和 flow 名,频繁改名会制造额外麻烦。

任务幂等:数据链路的保命习惯

调度系统只负责把任务拉起来,任务是否安全,取决于你的脚本设计。

比如写入结果表时,不要简单追加导致重复数据。可以按批次覆盖:

DELETE FROM target_table WHERE batch = '${batch}';
INSERT INTO target_table SELECT ...;

或者写入临时表后再交换:

写临时表
校验行数
替换目标分区
清理临时表

任务失败后重跑是常态。能重跑、不脏数据,是数据任务的基本素质。

和 Spark、Hive、Shell 的关系

Azkaban 不替代 Spark、Hive、Flink 这些计算引擎。它只是负责把命令组织起来。

Spark job 可以这样写:

type=command
command=spark-submit --master yarn --deploy-mode cluster jobs/user_profile.py --env ${env}

Hive SQL 可以这样写:

type=command
command=hive -f sql/build_user_profile.sql --hivevar env=${env}

Shell 也可以:

type=command
command=bash scripts/check_source_ready.sh ${batch}

统一入口是 Azkaban,真正干活的是你选择的计算或脚本工具。

常见坑

不要在 job 里写一长串复杂命令。复杂逻辑进脚本,job 保持短。

不要把密码写进 .job 文件。用配置中心、环境变量或密钥管理。

不要一堆任务挤在一个项目里。项目边界越清楚,权限越好管。

不要忽略退出码。脚本失败必须返回非零值。

不要让任务不可重跑。数据任务要考虑重复执行后的结果。

不要只靠重试掩盖逻辑错误。重试适合临时故障,不适合脏数据和代码错误。

一套推荐落地方式

一个比较舒服的项目结构:

user-profile-flow/
  extract_users.job
  transform_users.job
  load_users.job
  jobs/
    extract_users.py
    transform_users.py
    load_users.py
  sql/
    transform_users.sql
  conf/
    dev.properties
    prod.properties