sparksql的常见用法




2018-09-20

blog_main_img

Spark SQL 是 Spark 体系里最常用的一层能力。 很多团队用 Spark 做数据处理,并不直接写大量 RDD 代码,而是更多使用SQL的方式

Spark SQL 常用语法实战:把常见查询写法一次讲清楚

如果你已经接触过 MySQL、PostgreSQL、SQL Server 这类数据库,Spark SQL 的上手门槛并不高。
它的核心价值在于:

  • 保留了 SQL 的表达力
  • 可以直接运行在 Spark 的分布式执行引擎上
  • 很适合做清洗、聚合、宽表拼接、指标计算、明细分析

这篇文章围绕 Spark SQL 的常用写法展开,重点覆盖:

  • 临时视图怎么用
  • 常见查询怎么写
  • 聚合、连接、窗口函数怎么写
  • 数组、JSON、空值处理怎么写
  • 写 Spark SQL 时有哪些实用建议

1. Spark SQL 是什么

Spark SQL 可以理解成 Spark 提供的一套结构化数据处理接口。
你可以用它来查询:

  • Hive 表
  • Parquet 文件
  • ORC 文件
  • JSON 文件
  • CSV 文件
  • DataFrame 注册出来的视图

对于开发者来说,最常见的工作方式有两种:

  1. 直接写 SQL 查询
  2. 用 DataFrame 读数据,再注册成视图,然后写 SQL

很多项目里,第二种方式更常见,因为读取数据和执行 SQL 可以拆开处理。


2. 准备一组示例数据

为了把常见语法讲清楚,下面准备三张逻辑表:

  • users:用户表
  • products:商品表
  • orders:订单明细表

字段如下:

2.1 users

字段 含义
user_id 用户编号
user_name 用户名
city 城市
user_level 用户等级

2.2 products

字段 含义
product_id 商品编号
product_name 商品名
category 类目
price 单价

2.3 orders

字段 含义
order_id 订单编号
user_id 用户编号
product_id 商品编号
quantity 购买数量
amount 订单金额
status 订单状态

3. 把 DataFrame 注册成临时视图

Spark SQL 最常见的入口就是临时视图。

例如在 PySpark 里:

users_df.createOrReplaceTempView("users")
products_df.createOrReplaceTempView("products")
orders_df.createOrReplaceTempView("orders")

注册完成后,就可以直接写 SQL:

spark.sql("""
select *
from users
""").show()

如果你用的是 Spark Shell 或 Scala,写法也类似:

usersDf.createOrReplaceTempView("users")
productsDf.createOrReplaceTempView("products")
ordersDf.createOrReplaceTempView("orders")

这里有两个容易混淆的概念:

  • createOrReplaceTempView:会话级临时视图
  • createOrReplaceGlobalTempView:全局临时视图

日常开发中,大多数场景用临时视图就够了。


4. 最基础的查询写法

4.1 查询全部字段

select *
from orders;

这是最简单的写法,但在正式任务里通常不建议长期依赖 select *,原因是:

  • 字段不够明确
  • 上游表结构变动会带来不必要影响
  • 读取字段过多时会增加开销

更常见的写法是显式列出字段:

select
    order_id,
    user_id,
    product_id,
    amount,
    status
from orders;

4.2 条件过滤

select
    order_id,
    user_id,
    amount,
    status
from orders
where amount >= 100
  and status = 'paid';

常见条件包括:

  • 比较运算:=, <>, >, <, >=, <=
  • 区间判断:between ... and ...
  • 集合判断:in (...)
  • 空值判断:is null, is not null
  • 多条件组合:and, or, not

例如:

select *
from users
where city in ('Shanghai', 'Beijing')
  and user_level is not null;

4.3 排序与取前几行

select
    order_id,
    user_id,
    amount
from orders
order by amount desc
limit 10;

这个写法常用于:

  • 查看金额最高的订单
  • 观察样本数据
  • 检查计算结果是否符合预期

5. 去重、别名与表达式

5.1 去重

select distinct city
from users;

如果需要按多个字段去重:

select distinct user_id, city
from users;

5.2 字段别名

select
    user_id as uid,
    user_name as uname,
    user_level as level_name
from users;

5.3 直接写计算表达式

select
    order_id,
    quantity,
    amount,
    amount / quantity as unit_amount
from orders
where quantity > 0;

Spark SQL 支持在 select 中直接写大量表达式,这也是它非常适合做指标计算的原因之一。


6. CASE WHEN:做条件判断最常用

CASE WHEN 在 Spark SQL 里出现频率很高,经常用于:

  • 分类打标
  • 业务映射
  • 规则判断
  • 指标分层

示例:

select
    user_id,
    amount,
    case
        when amount >= 1000 then 'high'
        when amount >= 300 then 'middle'
        else 'low'
    end as amount_tag
from orders;

再比如对订单状态做中文映射:

select
    order_id,
    status,
    case
        when status = 'paid' then '已支付'
        when status = 'cancelled' then '已取消'
        when status = 'refunded' then '已退款'
        else '其他状态'
    end as status_name
from orders;

7. 聚合查询:GROUP BYHAVING

聚合是 Spark SQL 里最常见的一类操作。

7.1 按用户统计订单金额

select
    user_id,
    count(*) as order_cnt,
    sum(amount) as total_amount,
    avg(amount) as avg_amount,
    max(amount) as max_amount,
    min(amount) as min_amount
from orders
group by user_id;

7.2 按城市统计用户数

select
    city,
    count(*) as user_cnt
from users
group by city;

7.3 使用 HAVING 过滤聚合结果

select
    user_id,
    sum(amount) as total_amount
from orders
group by user_id
having sum(amount) >= 1000;

WHEREHAVING 的区别非常关键:

  • WHERE:过滤明细行
  • HAVING:过滤聚合结果

8. 多表连接:JOIN 是高频语法

Spark SQL 的很多核心任务,本质上都是多表拼接。

8.1 内连接

select
    o.order_id,
    u.user_name,
    p.product_name,
    o.quantity,
    o.amount
from orders o
join users u
    on o.user_id = u.user_id
join products p
    on o.product_id = p.product_id;

内连接只保留关联成功的数据。

8.2 左连接

select
    u.user_id,
    u.user_name,
    o.order_id,
    o.amount
from users u
left join orders o
    on u.user_id = o.user_id;

左连接适合:

  • 保留主表全部数据
  • 查看哪些记录没有匹配上
  • 做标签补充

8.3 连接时的常见注意点

连接查询里最容易出问题的是这几类情况:

  • 连接键类型不一致
  • 一对多关系导致结果放大
  • 字段重名没有加表别名
  • 维表重复导致明细被意外膨胀

所以在写连接 SQL 时,建议明确:

  • 连接键是什么
  • 主表是谁
  • 连接后预期行数会不会变化

9. 子查询与 CTE

复杂 SQL 往往离不开子查询和 CTE。

9.1 子查询

select *
from (
    select
        user_id,
        sum(amount) as total_amount
    from orders
    group by user_id
) t
where t.total_amount >= 1000;

9.2 CTE 写法

CTE 就是 WITH 子句,适合把复杂逻辑拆开写。

with user_amount as (
    select
        user_id,
        sum(amount) as total_amount
    from orders
    group by user_id
),
user_profile as (
    select
        u.user_id,
        u.user_name,
        u.city,
        a.total_amount
    from users u
    left join user_amount a
        on u.user_id = a.user_id
)
select *
from user_profile
where total_amount >= 1000;

这种写法的优点很明显:

  • 可读性更高
  • 分层更清晰
  • 更适合维护复杂业务逻辑

10. 窗口函数:做排名、组内统计特别方便

窗口函数是 Spark SQL 里非常实用的一块内容。

10.1 组内排名

select
    user_id,
    order_id,
    amount,
    row_number() over (
        partition by user_id
        order by amount desc
    ) as rn
from orders;

这个写法常用于:

  • 每个用户金额最高的一笔订单
  • 每个类目金额最高的商品
  • 每个分组取前 N 条记录

10.2 每组取金额最高的一条

with ranked_orders as (
    select
        user_id,
        order_id,
        amount,
        row_number() over (
            partition by user_id
            order by amount desc, order_id asc
        ) as rn
    from orders
)
select
    user_id,
    order_id,
    amount
from ranked_orders
where rn = 1;

10.3 累计求和

select
    user_id,
    order_id,
    amount,
    sum(amount) over (
        partition by user_id
        order by amount desc
        rows between unbounded preceding and current row
    ) as running_amount
from orders;

除了 row_number(),常见窗口函数还包括:

  • rank()
  • dense_rank()
  • sum() over (...)
  • avg() over (...)
  • count() over (...)

11. 空值处理:null 相关写法要熟

空值处理在数据开发里非常常见。

11.1 判断空值

select *
from users
where city is null;
select *
from users
where city is not null;

11.2 用默认值填充

select
    user_id,
    coalesce(city, 'unknown') as city_name
from users;

coalesce 的作用是返回第一个非空值。

例如:

select
    user_id,
    coalesce(city, 'unknown') as city_name,
    coalesce(user_level, 'normal') as level_name
from users;

11.3 避免空值参与计算带来歧义

select
    order_id,
    coalesce(amount, 0) as safe_amount
from orders;

12. 字符串处理函数

字符串处理也是 Spark SQL 的常见需求。

12.1 拼接字符串

select
    user_id,
    concat(user_name, '_', city) as user_tag
from users;

12.2 截取与替换

select
    product_name,
    substr(product_name, 1, 4) as short_name,
    regexp_replace(product_name, ' ', '') as clean_name
from products;

12.3 大小写转换与去空格

select
    trim(user_name) as user_name_clean,
    upper(city) as city_upper,
    lower(city) as city_lower
from users;

常见字符串函数包括:

  • concat
  • substr
  • length
  • trim
  • upper
  • lower
  • regexp_replace
  • split

13. 数组与拆分:explode 很实用

实际业务里,经常遇到一行里带一个数组字段,或者某个字段里存着分隔字符串。
这时 explode 很好用。

13.1 拆分字符串再展开

假设有一个字段 tags,内容类似:

phone,electronics,hot

可以这样写:

select
    product_id,
    explode(split(tags, ',')) as tag
from product_tags;

执行后,一行会被拆成多行。

例如原始数据:

product_id = 101, tags = phone,electronics,hot

可能会变成:

101  phone
101  electronics
101  hot

13.2 explode_outer

如果担心空数组或空字段导致数据直接丢掉,可以考虑:

select
    product_id,
    explode_outer(split(tags, ',')) as tag
from product_tags;

14. JSON 处理:Spark SQL 也很常见

很多原始数据都带 JSON 字段,Spark SQL 对这类场景支持不错。

14.1 提取 JSON 字段

假设表 event_log 中有个字段 payload

{"page":"home","channel":"app","score":98}

可以直接取值:

select
    get_json_object(payload, '$.page') as page_name,
    get_json_object(payload, '$.channel') as channel_name
from event_log;

14.2 把 JSON 解析成结构化字段

select
    from_json(
        payload,
        'page string, channel string, score int'
    ) as payload_obj
from event_log;

再进一步取子字段:

with parsed_log as (
    select
        from_json(
            payload,
            'page string, channel string, score int'
        ) as payload_obj
    from event_log
)
select
    payload_obj.page,
    payload_obj.channel,
    payload_obj.score
from parsed_log;

15. UNIONUNION ALL

如果要合并多份结构一致的数据,通常会用到这两个语法。

15.1 UNION ALL

select user_id, amount
from orders_a

union all

select user_id, amount
from orders_b;

UNION ALL 会直接拼接结果,不去重。

15.2 UNION

select user_id, amount
from orders_a

union

select user_id, amount
from orders_b;

UNION 会去重,代价通常更高。

如果业务不要求去重,优先考虑 UNION ALL


16. 去重保留一条:这是高频需求

很多明细表会出现重复数据,常见处理方式是配合窗口函数去重。

with dedup_orders as (
    select
        *,
        row_number() over (
            partition by order_id
            order by amount desc
        ) as rn
    from orders
)
select *
from dedup_orders
where rn = 1;

这个模式很常见,关键点在于:

  • 用什么字段定义“重复”
  • 用什么排序规则决定“保留哪一条”

17. 创建结果表与写入结果

Spark SQL 不只是查数据,也可以落结果。

17.1 建表查询

create table user_order_summary as
select
    user_id,
    count(*) as order_cnt,
    sum(amount) as total_amount
from orders
group by user_id;

17.2 插入已有表

insert overwrite table user_order_summary
select
    user_id,
    count(*) as order_cnt,
    sum(amount) as total_amount
from orders
group by user_id;

不同环境里,表管理方式可能不同:

  • 有的基于 Hive Metastore
  • 有的基于 Lakehouse 体系
  • 有的直接查外部文件

SQL 写法会有一些差别,但整体思路基本一致。


18. 写 Spark SQL 时的几个实用建议

18.1 少用 select *

显式列字段通常更稳:

  • 可读性更好
  • 更方便排查问题
  • 有助于减少无用列读取

18.2 大表连接时看清主键关系

如果维表有重复,连接结果可能被放大很多倍。
这是 Spark SQL 任务里非常常见的问题来源。

18.3 复杂逻辑尽量拆成 CTE

与其写一条极长 SQL,不如按语义拆成多段 WITH

18.4 关注执行计划

可以用:

explain
select
    u.city,
    sum(o.amount) as total_amount
from users u
join orders o
    on u.user_id = o.user_id
group by u.city;

执行计划能帮助你判断:

  • 有没有多余扫描
  • 有没有不必要的 shuffle
  • 连接策略是否合理

18.5 小表可以考虑广播

在部分场景里,可以显式提示广播小表:

select /*+ broadcast(u) */
    o.order_id,
    o.amount,
    u.user_name
from orders o
join users u
    on o.user_id = u.user_id;

这类写法常用于:

  • 明细大表关联小维表
  • 减少 shuffle 成本

是否生效还要结合执行计划来看。


19. 一组很常用的 Spark SQL 片段

如果只保留最常用的几类写法,基本可以记住下面这些:

19.1 过滤

select *
from orders
where status = 'paid'
  and amount >= 100;

19.2 聚合

select
    user_id,
    sum(amount) as total_amount
from orders
group by user_id;

19.3 连接

select
    o.order_id,
    u.user_name
from orders o
left join users u
    on o.user_id = u.user_id;

19.4 排名

select
    user_id,
    order_id,
    amount,
    row_number() over (
        partition by user_id
        order by amount desc
    ) as rn
from orders;

19.5 空值处理

select
    user_id,
    coalesce(city, 'unknown') as city_name
from users;

20. 总结

Spark SQL 的优势不在“语法有多特别”,而在于它把熟悉的 SQL 能力放进了 Spark 的分布式执行环境里。
对于大多数数据开发任务,真正高频的内容就是这几类:

  • 过滤
  • 聚合
  • 连接
  • 窗口函数
  • 空值处理
  • 数组与 JSON 解析

如果你已经能把这几类写顺手,很多 Spark 数据处理任务就能比较自然地展开。

可以用一句话概括这篇文章的重点:

Spark SQL 的核心,不是记住所有函数,而是熟练掌握常见数据处理模式。


21. 附:一段可直接运行的示例结构

下面是一段很常见的 PySpark + Spark SQL 组合写法:

users_df.createOrReplaceTempView("users")
orders_df.createOrReplaceTempView("orders")

result_df = spark.sql("""
with user_amount as (
    select
        user_id,
        count(*) as order_cnt,
        sum(amount) as total_amount
    from orders
    where status = 'paid'
    group by user_id
)
select
    u.user_id,
    u.user_name,
    u.city,
    a.order_cnt,
    a.total_amount
from users u
left join user_amount a
    on u.user_id = a.user_id
""")

result_df.show()

这段代码里已经包含了 Spark SQL 最核心的几种能力:

  • 临时视图
  • 条件过滤
  • 聚合
  • CTE
  • 左连接

把这些内容掌握扎实,Spark SQL 的日常开发就已经具备不错的基础了。