2018-09-20
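Spark SQL is the most commonly used layer of the Spark stack. Many teams that process data with Spark don't write much RDD code directly; they mostly work in SQL.
If you have used databases such as MySQL, PostgreSQL, or SQL Server, Spark SQL is easy to pick up.
Its core value lies in:
This article walks through the common ways of writing Spark SQL, focusing on:
Spark SQL can be understood as Spark's interface for structured data processing.
You can use it to query:
For developers, the two most common ways of working are:
In many projects the second approach is more common, because reading the data and executing the SQL can be handled separately.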
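To illustrate the common syntax clearly, we'll use three logical tables:

- users: user table
- products: product table
- orders: order detail table

Their fields are as follows:

users

| Field | Meaning |
|---|---|
| user_id | User ID |
| user_name | User name |
| city | City |
| user_level | User level |

products

| Field | Meaning |
|---|---|
| product_id | Product ID |
| product_name | Product name |
| category | Category |
| price | Unit price |

orders

| Field | Meaning |
|---|---|
| order_id | Order ID |
| user_id | User ID |
| product_id | Product ID |
| quantity | Quantity purchased |
| amount | Order amount |
| status | Order status |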
The most common entry point to Spark SQL is the temporary view.
For example, in PySpark:
users_df.createOrReplaceTempView("users")
products_df.createOrReplaceTempView("products")
orders_df.createOrReplaceTempView("orders")
Once the views are registered, you can write SQL against them directly:
spark.sql("""
select *
from users
""").show()
If you use the Spark Shell or Scala, the code is similar:
usersDf.createOrReplaceTempView("users")
productsDf.createOrReplaceTempView("products")
ordersDf.createOrReplaceTempView("orders")
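Two concepts here are easy to confuse:

- createOrReplaceTempView: a session-scoped temporary view
- createOrReplaceGlobalTempView: a global temporary view, shared by all sessions of the same Spark application and queried through the `global_temp` database (for example `global_temp.users`)

In day-to-day development, a session-scoped temporary view is enough for most scenarios.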
select *
from orders;
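This is the simplest form, but in production jobs it's generally not advisable to rely on select * over the long term, because:
A more common approach is to list the columns explicitly: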
select
order_id,
user_id,
product_id,
amount,
status
from orders;
select
order_id,
user_id,
amount,
status
from orders
where amount >= 100
and status = 'paid';
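Common conditions include:

- Comparison: =, <>, >, <, >=, <=
- Ranges: between ... and ...
- Set membership: in (...)
- Null checks: is null, is not null
- Logical operators: and, or, not

For example: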
select *
from users
where city in ('Shanghai', 'Beijing')
and user_level is not null;
select
order_id,
user_id,
amount
from orders
order by amount desc
limit 10;
This pattern is commonly used for:
select distinct city
from users;
To deduplicate on several columns:
select distinct user_id, city
from users;
select
user_id as uid,
user_name as uname,
user_level as level_name
from users;
select
order_id,
quantity,
amount,
amount / quantity as unit_amount
from orders
where quantity > 0;
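Spark SQL lets you write all kinds of expressions directly in select, which is one reason it is well suited to metric calculations.
CASE WHEN: the go-to tool for conditional logic
CASE WHEN appears very often in Spark SQL and is commonly used for:
Example: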
select
user_id,
amount,
case
when amount >= 1000 then 'high'
when amount >= 300 then 'middle'
else 'low'
end as amount_tag
from orders;
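To make the evaluation order concrete, here is a plain-Python sketch (no Spark involved) of how the amount_tag CASE WHEN above is evaluated. Branches are checked top to bottom and the first match wins, so an amount of 1000 never reaches the 'middle' branch. The sketch also models SQL null handling, assuming a null amount is represented as None. A comparison with null is not true, so such a row falls through to else and is tagged 'low'. The function name `amount_tag` is hypothetical.

```python
from typing import Optional


def amount_tag(amount: Optional[float]) -> str:
    """Mimic: case when amount >= 1000 then 'high'
                   when amount >= 300 then 'middle'
                   else 'low' end"""
    # In SQL, `null >= 1000` is unknown (not true), so a null amount
    # skips every WHEN branch; None stands in for SQL null here.
    if amount is not None and amount >= 1000:
        return "high"
    if amount is not None and amount >= 300:
        return "middle"
    return "low"


for value in [1500, 1000, 300, 299.5, None]:
    print(value, amount_tag(value))
```

If null amounts should get their own label, put an explicit `when amount is null then ...` branch first.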
As another example, mapping order status codes to readable labels:
select
order_id,
status,
case
when status = 'paid' then 'Paid'
when status = 'cancelled' then 'Cancelled'
when status = 'refunded' then 'Refunded'
else 'Other'
end as status_name
from orders;
GROUP BY and HAVING
Aggregation is one of the most common kinds of operation in Spark SQL.
select
user_id,
count(*) as order_cnt,
sum(amount) as total_amount,
avg(amount) as avg_amount,
max(amount) as max_amount,
min(amount) as min_amount
from orders
group by user_id;
select
city,
count(*) as user_cnt
from users
group by city;
HAVING: filtering aggregated results
select
user_id,
sum(amount) as total_amount
from orders
group by user_id
having sum(amount) >= 1000;
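The difference between row filtering and group filtering is easiest to see in a plain-Python sketch of GROUP BY + HAVING, using hypothetical sample rows and no Spark. Totals are accumulated per user_id first, and only then is the threshold applied to each group's total. A WHERE condition would instead be applied inside the loop, to individual rows.

```python
from collections import defaultdict

# Hypothetical sample rows: (order_id, user_id, amount)
orders = [
    (1, "u1", 600.0),
    (2, "u1", 500.0),
    (3, "u2", 200.0),
    (4, "u3", 1200.0),
]


def total_amount_having(rows, threshold):
    """group by user_id, then keep groups whose sum(amount) >= threshold."""
    totals = defaultdict(float)
    for _order_id, user_id, amount in rows:  # a WHERE would filter here, per row
        totals[user_id] += amount
    # HAVING runs after aggregation, on the per-group results
    return {uid: total for uid, total in totals.items() if total >= threshold}


print(total_amount_having(orders, 1000))
```

Note that u1 passes even though none of its individual orders reaches 1000. That is exactly a case `where amount >= 1000` could not express.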
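The difference between WHERE and HAVING is crucial:

- WHERE: filters detail rows, before aggregation
- HAVING: filters aggregated results, after aggregation

JOIN is a high-frequency operation
At their core, many Spark SQL tasks are joins across multiple tables.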
select
o.order_id,
u.user_name,
p.product_name,
o.quantity,
o.amount
from orders o
join users u
on o.user_id = u.user_id
join products p
on o.product_id = p.product_id;
An inner join keeps only the rows that match on both sides.
select
u.user_id,
u.user_name,
o.order_id,
o.amount
from users u
left join orders o
on u.user_id = o.user_id;
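The practical difference between the two joins shows up for users who have no orders. Here is a plain-Python sketch with hypothetical data (no Spark). The inner join drops such a user entirely, while the left join keeps the user and fills the order columns with None, which stands in for SQL null.

```python
# Hypothetical sample data: (user_id, user_name) and (order_id, user_id, amount)
users = [("u1", "Alice"), ("u2", "Bob")]
orders = [(101, "u1", 50.0), (102, "u1", 80.0)]


def inner_join(users, orders):
    """users u join orders o on u.user_id = o.user_id"""
    return [
        (uid, name, oid, amount)
        for uid, name in users
        for oid, o_uid, amount in orders
        if o_uid == uid
    ]


def left_join(users, orders):
    """users u left join orders o on u.user_id = o.user_id"""
    result = []
    for uid, name in users:
        matches = [(oid, amount) for oid, o_uid, amount in orders if o_uid == uid]
        if matches:
            result.extend((uid, name, oid, amount) for oid, amount in matches)
        else:
            # No matching order: keep the user, fill the right side with null
            result.append((uid, name, None, None))
    return result


print(inner_join(users, orders))
print(left_join(users, orders))
```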
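A left join is suitable for:
The most error-prone situations in join queries are these:
So when writing joins, it's advisable to be explicit about:
Complex SQL rarely gets by without subqueries and CTEs.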
select *
from (
select
user_id,
sum(amount) as total_amount
from orders
group by user_id
) t
where t.total_amount >= 1000;
A CTE is a WITH clause. It is well suited to splitting complex logic into named steps.
with user_amount as (
select
user_id,
sum(amount) as total_amount
from orders
group by user_id
),
user_profile as (
select
u.user_id,
u.user_name,
u.city,
a.total_amount
from users u
left join user_amount a
on u.user_id = a.user_id
)
select *
from user_profile
where total_amount >= 1000;
The advantages of this style are obvious:
Window functions are one of the most practical parts of Spark SQL.
select
user_id,
order_id,
amount,
row_number() over (
partition by user_id
order by amount desc
) as rn
from orders;
This pattern is commonly used for:
with ranked_orders as (
select
user_id,
order_id,
amount,
row_number() over (
partition by user_id
order by amount desc, order_id asc
) as rn
from orders
)
select
user_id,
order_id,
amount
from ranked_orders
where rn = 1;
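row_number() gives every row a unique number, while rank() and dense_rank() give tied rows the same number. The following plain-Python sketch computes all three over one partition, using amounts only and no Spark. It shows why `where rn = 1` with row_number() returns exactly one row per user, whereas filtering on rank = 1 can return several rows when the top amounts tie. That is also why the query above adds order_id asc as a tie-breaker: without it, which of the tied rows gets rn = 1 is not guaranteed. The function name is hypothetical.

```python
def rank_within_partition(amounts):
    """Return (amount, row_number, rank, dense_rank) for one partition,
    ordered by amount desc (mirrors `order by amount desc`)."""
    # Python's stable sort fixes the order of ties; Spark does not,
    # unless the ORDER BY includes a tie-breaker column.
    ordered = sorted(amounts, reverse=True)
    result = []
    rank = dense_rank = 0
    previous = object()  # sentinel that equals no amount
    for position, amount in enumerate(ordered, start=1):
        if amount != previous:
            rank = position   # rank skips numbers after a tie
            dense_rank += 1   # dense_rank does not skip
            previous = amount
        result.append((amount, position, rank, dense_rank))
    return result


for row in rank_within_partition([500, 800, 800, 300]):
    print(row)
```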
select
user_id,
order_id,
amount,
sum(amount) over (
partition by user_id
order by amount desc, order_id asc
rows between unbounded preceding and current row
) as running_amount
from orders;
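The running total can be sketched in plain Python (no Spark, hypothetical rows). Sort within each user_id partition, then accumulate row by row, which is what a `rows between unbounded preceding and current row` frame describes. The sketch breaks amount ties by order_id asc, an assumption added so the result is deterministic. The explicit ROWS frame matters: as far as we know, Spark follows the SQL default for a window with ORDER BY and no frame, which is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Under that default, rows with equal amounts would share one running total.

```python
from itertools import accumulate, groupby

# Hypothetical rows: (user_id, order_id, amount)
orders = [
    ("u1", 1, 100.0),
    ("u1", 2, 300.0),
    ("u1", 3, 200.0),
    ("u2", 4, 50.0),
]


def running_amount(rows):
    """sum(amount) over (partition by user_id
                         order by amount desc, order_id asc
                         rows between unbounded preceding and current row)"""
    # Sort by partition key, then amount desc, then order_id asc
    ordered = sorted(rows, key=lambda r: (r[0], -r[2], r[1]))
    result = []
    for _user, group in groupby(ordered, key=lambda r: r[0]):
        group = list(group)
        totals = accumulate(r[2] for r in group)  # running sum within the partition
        result.extend((u, oid, amt, total) for (u, oid, amt), total in zip(group, totals))
    return result


for row in running_amount(orders):
    print(row)
```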
Besides row_number(), common window functions include:

- rank()
- dense_rank()
- sum() over (...)
- avg() over (...)
- count() over (...)

Know your null handling
Null values come up constantly in data development.
select *
from users
where city is null;
select *
from users
where city is not null;
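The reason to write `is null` rather than `= null` is SQL's three-valued logic. Any comparison with null yields unknown, and WHERE keeps a row only when its condition is true. In this plain-Python sketch (no Spark), None stands in both for SQL null and for the unknown truth value. `sql_eq` and `where` are hypothetical helper names.

```python
from typing import Optional


def sql_eq(a, b) -> Optional[bool]:
    """SQL `a = b`: unknown (None) if either side is null."""
    if a is None or b is None:
        return None
    return a == b


def where(rows, predicate):
    """WHERE keeps a row only when the predicate is true, not when unknown."""
    return [row for row in rows if predicate(row) is True]


cities = ["Shanghai", None, "Beijing"]
print(where(cities, lambda c: sql_eq(c, None)))  # `city = null`  -> []
print(where(cities, lambda c: c is None))        # `city is null` -> [None]
```

Spark SQL also has a null-safe equality operator, `<=>`, which returns true when both sides are null.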
select
user_id,
coalesce(city, 'unknown') as city_name
from users;
coalesce returns its first non-null argument.
For example:
select
user_id,
coalesce(city, 'unknown') as city_name,
coalesce(user_level, 'normal') as level_name
from users;
select
order_id,
coalesce(amount, 0) as safe_amount
from orders;
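A plain-Python sketch of coalesce (no Spark; None stands in for SQL null; the function and sample row are hypothetical) makes one detail explicit: only null is replaced. An empty string is a real value, so `coalesce('', 'unknown')` returns the empty string. If empty strings should also fall back to a default, they need separate handling.

```python
def coalesce(*values):
    """Return the first argument that is not None (SQL null), else None."""
    for value in values:
        if value is not None:
            return value
    return None


row = {"user_id": 1, "city": None, "user_level": "vip"}
print(coalesce(row["city"], "unknown"))       # unknown
print(coalesce(row["user_level"], "normal"))  # vip
print(repr(coalesce("", "unknown")))          # '' (empty string is not null)
```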
String processing is another common need in Spark SQL.
select
user_id,
concat(user_name, '_', city) as user_tag
from users;
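One pitfall with the query above: Spark's concat returns null if any argument is null, so every user with a null city gets a null user_tag. concat_ws skips null arguments instead. The plain-Python sketch below (no Spark; `sql_concat` and `sql_concat_ws` are hypothetical names) contrasts the two. In SQL, the usual fixes are `concat_ws('_', user_name, city)` or `concat(user_name, '_', coalesce(city, 'unknown'))`. Note that they give different output: concat_ws simply drops the missing part along with its separator.

```python
def sql_concat(*parts):
    """Like Spark `concat`: the result is null if any argument is null."""
    if any(p is None for p in parts):
        return None
    return "".join(parts)


def sql_concat_ws(sep, *parts):
    """Like Spark `concat_ws`: null arguments are skipped."""
    return sep.join(p for p in parts if p is not None)


print(sql_concat("alice", "_", "Shanghai"))  # alice_Shanghai
print(sql_concat("alice", "_", None))        # None
print(sql_concat_ws("_", "alice", None))     # alice
```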
select
product_name,
substr(product_name, 1, 4) as short_name,
regexp_replace(product_name, ' ', '') as clean_name
from products;
select
trim(user_name) as user_name_clean,
upper(city) as city_upper,
lower(city) as city_lower
from users;
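Common string functions include:

- concat
- substr
- length
- trim
- upper
- lower
- regexp_replace
- split

explode is very useful
In real business data, a row often carries an array field, or a field that stores a delimited string.
This is where explode comes in handy.
Suppose there is a field tags whose content looks like this: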
phone,electronics,hot
You can write:
select
product_id,
explode(split(tags, ',')) as tag
from product_tags;
After execution, one row is split into several rows.
For example, the original data:
product_id = 101, tags = phone,electronics,hot
becomes:
101 phone
101 electronics
101 hot
explode_outer
If you're worried that rows with a null or empty array will be dropped entirely, consider:
select
product_id,
explode_outer(split(tags, ',')) as tag
from product_tags;
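The difference between explode and explode_outer is about which rows survive. This plain-Python sketch (no Spark; the helpers and sample rows are hypothetical) shows it. split turns a null tags value into a null array. explode then emits nothing for a null or empty array, while explode_outer keeps the row with a null tag. Spark treats the split separator as a regular expression, but ',' has no special meaning there, so Python's str.split is equivalent for this case.

```python
from typing import List, Optional, Tuple


def spark_split(s: Optional[str], sep: str) -> Optional[List[str]]:
    """split(tags, ','): null in, null out (literal separator assumed)."""
    return None if s is None else s.split(sep)


def explode(rows, outer=False):
    """rows: (product_id, array or None) -> one output row per element."""
    out: List[Tuple[int, Optional[str]]] = []
    for product_id, items in rows:
        if items:  # non-null, non-empty array
            out.extend((product_id, item) for item in items)
        elif outer:
            # explode_outer keeps the row and emits a null element
            out.append((product_id, None))
        # plain explode drops rows whose array is null or empty
    return out


data = [(101, "phone,electronics,hot"), (102, None)]
arrays = [(pid, spark_split(tags, ",")) for pid, tags in data]
print(explode(arrays))              # product 102 disappears
print(explode(arrays, outer=True))  # product 102 kept with a null tag
```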
A lot of raw data carries JSON fields, and Spark SQL handles this scenario well.
Suppose the table event_log has a field payload:
{"page":"home","channel":"app","score":98}
You can extract values directly:
select
get_json_object(payload, '$.page') as page_name,
get_json_object(payload, '$.channel') as channel_name
from event_log;
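Two behaviours of get_json_object are worth keeping in mind. The result is always a string, so score comes back as "98", not as an int. And a malformed payload yields null rather than an error, which can leave a column silently empty. The rough plain-Python stand-in below covers only top-level `$.key` paths (no Spark; `get_json_field` is a hypothetical name). Treating a JSON null value as SQL null is an assumption in this sketch. When typed fields are needed, from_json with an explicit schema is the better fit.

```python
import json
from typing import Optional


def get_json_field(payload: Optional[str], key: str) -> Optional[str]:
    """Rough stand-in for get_json_object(payload, '$.<key>') on a top-level key.

    Returns a string, or None when the payload is null, is not valid JSON,
    or does not contain the key.
    """
    if payload is None:
        return None
    try:
        obj = json.loads(payload)
    except json.JSONDecodeError:
        return None  # malformed JSON -> null, not an error
    if not isinstance(obj, dict) or key not in obj:
        return None
    value = obj[key]
    if value is None:
        return None  # assumption: JSON null is treated as SQL null
    # Strings come back unquoted; other values as compact JSON text, e.g. 98 -> "98"
    return value if isinstance(value, str) else json.dumps(value, separators=(",", ":"))


payload = '{"page":"home","channel":"app","score":98}'
print(get_json_field(payload, "page"), get_json_field(payload, "score"))
```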
select
from_json(
payload,
'page string, channel string, score int'
) as payload_obj
from event_log;
Going a step further, extract the sub-fields:
with parsed_log as (
select
from_json(
payload,
'page string, channel string, score int'
) as payload_obj
from event_log
)
select
payload_obj.page,
payload_obj.channel,
payload_obj.score
from parsed_log;
UNION and UNION ALL
To combine several datasets with the same structure, you'll usually use one of these two operators.
UNION ALL
select user_id, amount
from orders_a
union all
select user_id, amount
from orders_b;
UNION ALL simply concatenates the results without removing duplicates.
UNION
select user_id, amount
from orders_a
union
select user_id, amount
from orders_b;
UNION removes duplicates, which usually makes it more expensive.
If the business logic does not require deduplication, prefer UNION ALL.
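A plain-Python sketch of the two operators, with hypothetical rows and no Spark: UNION ALL is plain concatenation, while UNION is concatenation followed by a distinct over whole rows. The distinct also removes duplicates within a single input, not only across inputs, and it is the extra step that costs more in Spark. Both operators match columns by position, not by name, so the two selects must list their columns in the same order. Unlike this sketch, Spark does not guarantee any row order in the result.

```python
orders_a = [("u1", 100.0), ("u2", 50.0)]
orders_b = [("u1", 100.0), ("u3", 20.0)]

# UNION ALL: concatenate, keep every row
union_all = orders_a + orders_b

# UNION: concatenate, then keep one copy of each distinct row
# (dict.fromkeys preserves first-seen order; Spark makes no order guarantee)
union = list(dict.fromkeys(orders_a + orders_b))

print(len(union_all), union_all)
print(len(union), union)
```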
Many detail tables contain duplicate rows. A common fix is to deduplicate with a window function.
with dedup_orders as (
select
*,
row_number() over (
partition by order_id
order by amount desc
) as rn
from orders
)
select *
from dedup_orders
where rn = 1;
This pattern is very common. The key points are:
Spark SQL doesn't only query data; it can also persist results.
create table user_order_summary as
select
user_id,
count(*) as order_cnt,
sum(amount) as total_amount
from orders
group by user_id;
insert overwrite table user_order_summary
select
user_id,
count(*) as order_cnt,
sum(amount) as total_amount
from orders
group by user_id;
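Table management can differ between environments:
The SQL syntax varies somewhat, but the overall approach is essentially the same.
Avoid select *: listing columns explicitly is usually more robust:
If a dimension table has duplicate keys, the join result can be multiplied many times over.
This is a very common source of problems in Spark SQL jobs.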
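A plain-Python sketch with hypothetical data (no Spark) shows how the fan-out happens. The users dimension accidentally contains u1 twice, so each of u1's orders matches two dimension rows, and any sum(amount) computed after the join is inflated. A cheap safeguard is to check key uniqueness on the dimension side before joining. In SQL that check is `select user_id, count(*) from users group by user_id having count(*) > 1`.

```python
from collections import Counter

# Hypothetical data: (order_id, user_id, amount) and a users dimension
# that accidentally contains u1 twice
orders = [(101, "u1", 50.0), (102, "u1", 80.0), (103, "u2", 10.0)]
users = [("u1", "Shanghai"), ("u1", "Beijing"), ("u2", "Shenzhen")]

# Inner join on user_id
joined = [
    (oid, uid, amount, city)
    for oid, uid, amount in orders
    for d_uid, city in users
    if d_uid == uid
]

# Keys that appear more than once on the dimension side
dup_keys = {k: n for k, n in Counter(u for u, _ in users).items() if n > 1}

print(len(orders), "orders ->", len(joined), "joined rows")
print("sum before:", sum(a for _, _, a in orders), "after:", sum(r[2] for r in joined))
print("duplicate keys:", dup_keys)
```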
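Rather than writing one extremely long SQL statement, split it into several WITH blocks by meaning.
To inspect the execution plan, you can use: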
explain
select
u.city,
sum(o.amount) as total_amount
from users u
join orders o
on u.user_id = o.user_id
group by u.city;
The execution plan helps you judge:
In some cases, you can explicitly hint that a small table should be broadcast:
select /*+ broadcast(u) */
o.order_id,
o.amount,
u.user_name
from orders o
join users u
on o.user_id = u.user_id;
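This kind of hint is commonly used for:
Whether the hint actually takes effect still needs to be confirmed in the execution plan.
If you keep only the most common patterns, these are the ones to remember: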
select *
from orders
where status = 'paid'
and amount >= 100;
select
user_id,
sum(amount) as total_amount
from orders
group by user_id;
select
o.order_id,
u.user_name
from orders o
left join users u
on o.user_id = u.user_id;
select
user_id,
order_id,
amount,
row_number() over (
partition by user_id
order by amount desc
) as rn
from orders;
select
user_id,
coalesce(city, 'unknown') as city_name
from users;
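Spark SQL's strength is not that its syntax is special. It is that it brings familiar SQL capabilities into Spark's distributed execution environment.
For most data development tasks, the truly high-frequency content falls into these categories:
Once you can write these fluently, many Spark data processing tasks follow naturally.
The key point of this article can be summed up in one sentence:
The core of Spark SQL is not memorizing every function, but mastering the common data processing patterns.
Below is a very common way of combining PySpark with Spark SQL: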
users_df.createOrReplaceTempView("users")
orders_df.createOrReplaceTempView("orders")
result_df = spark.sql("""
with user_amount as (
select
user_id,
count(*) as order_cnt,
sum(amount) as total_amount
from orders
where status = 'paid'
group by user_id
)
select
u.user_id,
u.user_name,
u.city,
a.order_cnt,
a.total_amount
from users u
left join user_amount a
on u.user_id = a.user_id
""")
result_df.show()
This code already covers several of Spark SQL's core capabilities:
Master these, and you will have a solid foundation for day-to-day Spark SQL development.