2021-06-04
Azkaban 是 LinkedIn 开源的一套批处理工作流调度系统。它的核心思路很朴素:把一组任务按依赖关系组织成 flow,上传成项目包,然后由 Azkaban 负责触发、执行、查看日志、处理失败和管理权限。
如果你有一批数据任务,比如抽取、清洗、汇总、导出,手工跑起来很容易乱。Azkaban 的价值就在这里:它把“先跑谁、后跑谁、失败怎么办、参数怎么传、日志在哪看”这些事情放到一个统一入口里。
它适合偏批处理的任务编排:
它不是实时流处理引擎,也不是容器编排平台。它更像一个工作流控制台,把脚本、SQL、Spark 提交命令串成一条能追踪的链路。
Azkaban 常见组件有三个:
WebServer:页面、API、项目管理、权限入口
Executor:真正执行 job 的服务
MySQL:保存项目、flow、执行记录、日志索引等元数据
用户在 Web 页面上传项目 zip。WebServer 解析项目,保存元数据。触发 flow 后,Executor 拉起 job,并把执行结果和日志写回。
简单理解:
WebServer 管入口
Executor 管执行
MySQL 管状态
这种拆分让界面管理和任务执行分开,任务重的时候可以扩 Executor。
Azkaban 的项目通常是一个 zip 包。里面可以放 .job 文件、脚本文件、配置文件。
一个简单项目结构:
demo-flow/
extract_users.job
transform_users.job
load_users.job
jobs/
extract_users.py
transform_users.py
load_users.py
每个 .job 文件描述一个任务。依赖关系通过 dependencies 串起来。
extract_users.job:
type=command
command=python3 jobs/extract_users.py --env ${env}
type=command 表示这就是一个普通命令任务。command 是实际执行的命令。
transform_users.job:
type=command
dependencies=extract_users
command=python3 jobs/transform_users.py --env ${env}
load_users.job:
type=command
dependencies=transform_users
command=python3 jobs/load_users.py --env ${env}
这样就形成了:
extract_users -> transform_users -> load_users
Azkaban 会先跑上游任务,上游成功后再跑下游。
任务脚本最好做到三点:
jobs/extract_users.py:
import argparse
import sys
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--env", default="dev")
return parser.parse_args()
def main():
args = parse_args()
print(f"extract users, env={args.env}")
try:
# 这里可以放数据库查询、文件拉取、接口请求等逻辑
print("extract finished")
except Exception as exc:
print(f"extract failed: {exc}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())
Azkaban 判断任务成功失败,最直观的方式就是看命令退出码。返回 0 表示成功,非 0 表示失败。
job 文件里可以使用变量:
type=command
command=python3 jobs/load_users.py --env ${env} --batch ${batch}
触发 flow 时传:
env=prod
batch=user_profile
脚本里用 argparse 接:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--env", required=True)
parser.add_argument("--batch", required=True)
args = parser.parse_args()
print(args.env, args.batch)
这样同一份项目包可以跑不同环境、不同批次,不用复制一堆几乎一样的 job。
普通 command job 可以配置重试:
type=command
command=python3 jobs/load_users.py --env ${env}
retries=2
retry.backoff=60000
如果任务偶发失败,比如网络抖动、远端服务短暂不可用,重试很有价值。
但不要把重试当万能药。数据质量错误、SQL 语法错误、权限错误,重试几次都没用。脚本里最好把错误打印清楚,让日志能直接定位问题。
flow 级别还可以配置失败处理策略,例如失败后停止、完成可继续的任务等。生产链路里更推荐保守一点:关键任务失败后停止下游,避免污染结果表。
.job 文件应该清楚、短小。
推荐:
type=command
dependencies=transform_users
command=python3 jobs/load_users.py --env ${env} --batch ${batch}
retries=1
不推荐:
type=command
command=一大串难以维护的 shell 拼接
复杂逻辑放进 Python、Shell 或 Spark 代码里,job 只负责描述入口和依赖。这样项目结构会干净很多。
手工压 zip 容易漏文件。可以写一个小脚本统一打包。
from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile
PROJECT_DIR = Path("demo-flow")
OUTPUT = Path("demo-flow.zip")
def build_zip(project_dir: Path, output: Path) -> None:
if output.exists():
output.unlink()
with ZipFile(output, "w", compression=ZIP_DEFLATED) as archive:
for path in project_dir.rglob("*"):
if path.is_file():
archive.write(path, path.relative_to(project_dir))
if __name__ == "__main__":
build_zip(PROJECT_DIR, OUTPUT)
print(f"created {OUTPUT}")
打出来的 zip 里,.job 文件应该在根目录或 Azkaban 能识别的位置。上传前可以先解压看一眼结构。
Azkaban 提供 AJAX API。可以用 Python requests 写一个简单客户端:登录、创建项目、上传 zip、执行 flow。
安装依赖:
pip install requests
客户端代码:
from pathlib import Path
import requests
class AzkabanClient:
def __init__(self, base_url: str, username: str, password: str):
self.base_url = base_url.rstrip("/")
self.username = username
self.password = password
self.session_id = ""
def login(self) -> None:
response = requests.post(
f"{self.base_url}/",
data={
"action": "login",
"username": self.username,
"password": self.password,
},
timeout=30,
)
response.raise_for_status()
payload = response.json()
if "session.id" not in payload:
raise RuntimeError(f"login failed: {payload}")
self.session_id = payload["session.id"]
def create_project(self, name: str, description: str) -> None:
response = requests.post(
f"{self.base_url}/manager",
data={
"session.id": self.session_id,
"action": "create",
"name": name,
"description": description,
},
timeout=30,
)
response.raise_for_status()
def upload_project(self, project: str, zip_path: Path) -> dict:
with zip_path.open("rb") as file:
response = requests.post(
f"{self.base_url}/manager",
data={
"session.id": self.session_id,
"ajax": "upload",
"project": project,
},
files={"file": (zip_path.name, file, "application/zip")},
timeout=120,
)
response.raise_for_status()
return response.json()
def execute_flow(self, project: str, flow: str, params: dict[str, str]) -> dict:
data = {
"session.id": self.session_id,
"ajax": "executeFlow",
"project": project,
"flow": flow,
}
for key, value in params.items():
data[f"flowOverride[{key}]"] = value
response = requests.get(
f"{self.base_url}/executor",
params=data,
timeout=30,
)
response.raise_for_status()
return response.json()
使用:
client = AzkabanClient(
base_url="http://azkaban.example.com:8081",
username="demo",
password="demo_password",
)
client.login()
try:
client.create_project("demo-flow", "Python demo workflow")
except requests.HTTPError:
print("project may already exist")
print(client.upload_project("demo-flow", Path("demo-flow.zip")))
result = client.execute_flow(
project="demo-flow",
flow="load_users",
params={
"env": "prod",
"batch": "user_profile",
},
)
print(result)
这样就能把“打包、上传、触发”串成一个脚本,减少手工操作。
老式 .job 项目里,Azkaban 会根据 job 依赖关系识别 flow。通常没有下游依赖的最终 job,会形成 flow 入口。
比如:
extract_users.job
transform_users.job 依赖 extract_users
load_users.job 依赖 transform_users
触发时 flow 名通常就是最终节点 load_users。
如果使用 Flow 2.0,可以用 YAML 描述更清楚的 flow。比如:
nodes:
- name: extract
type: command
config:
command: python3 jobs/extract_users.py --env ${env}
- name: transform
type: command
dependsOn:
- extract
config:
command: python3 jobs/transform_users.py --env ${env}
- name: load
type: command
dependsOn:
- transform
config:
command: python3 jobs/load_users.py --env ${env}
如果团队里已经有旧项目,用 .job 继续维护没问题。新项目可以考虑用 YAML,把结构写得更集中。
Azkaban 会收集任务日志,但日志内容还是脚本自己输出的。
推荐日志风格:
import logging
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s %(name)s - %(message)s",
)
logger = logging.getLogger("load_users")
def main():
logger.info("load started")
logger.info("read source rows: %s", 1024)
logger.info("write target rows: %s", 1024)
logger.info("load finished")
别只打印“失败了”。至少要打印:
日志写得清楚,值班同学会少掉很多猜谜环节。
Azkaban 里项目是重要边界。不要把所有任务塞进一个超级项目。
更合理的划分:
ods-user-flow
dwd-order-flow
ads-report-flow
risk-feature-flow
每个项目只给相关人权限。核心生产项目不要随便开放上传和执行权限。
项目命名也要稳定。脚本、监控、通知里都会引用项目名和 flow 名,频繁改名会制造额外麻烦。
调度系统只负责把任务拉起来,任务是否安全,取决于你的脚本设计。
比如写入结果表时,不要简单追加导致重复数据。可以按批次覆盖:
DELETE FROM target_table WHERE batch = '${batch}';
INSERT INTO target_table SELECT ...;
或者写入临时表后再交换:
写临时表
校验行数
替换目标分区
清理临时表
任务失败后重跑是常态。能重跑、不脏数据,是数据任务的基本素质。
Azkaban 不替代 Spark、Hive、Flink 这些计算引擎。它只是负责把命令组织起来。
Spark job 可以这样写:
type=command
command=spark-submit --master yarn --deploy-mode cluster jobs/user_profile.py --env ${env}
Hive SQL 可以这样写:
type=command
command=hive -f sql/build_user_profile.sql --hivevar env=${env}
Shell 也可以:
type=command
command=bash scripts/check_source_ready.sh ${batch}
统一入口是 Azkaban,真正干活的是你选择的计算或脚本工具。
不要在 job 里写一长串复杂命令。复杂逻辑进脚本,job 保持短。
不要把密码写进 .job 文件。用配置中心、环境变量或密钥管理。
不要一堆任务挤在一个项目里。项目边界越清楚,权限越好管。
不要忽略退出码。脚本失败必须返回非零值。
不要让任务不可重跑。数据任务要考虑重复执行后的结果。
不要只靠重试掩盖逻辑错误。重试适合临时故障,不适合脏数据和代码错误。
一个比较舒服的项目结构:
user-profile-flow/
extract_users.job
transform_users.job
load_users.job
jobs/
extract_users.py
transform_users.py
load_users.py
sql/
transform_users.sql
conf/
dev.properties
prod.properties