Elasticsearch python版




2022-10-28

blog_main_img

Elasticsearch 很容易被一句“能搜得很快”概括掉,但真正落到业务里,它更像一套检索工作台:字段怎么建、词怎么切、查询怎么拼、结果怎么排、Python 怎么把数据送进去,每一步都会影响最后的体验。

最近测试es和ck的功能,这里放上es的个人经验。。。

拿一个“商品内容检索”场景讲:既要能搜标题和简介,也要能按品牌、类目、价格、状态筛选,还要能给重点字段更高权重。这样写出来的套路,迁移到文章检索、知识库检索、后台订单搜索也比较顺手。

先看概念

Elasticsearch 里的 index 可以理解成一类文档的集合,document 是具体的一条数据,mapping 决定字段类型和分词策略,query DSL 负责描述检索条件。

它最拿手的是倒排索引:把文本切成词项,再记录每个词项命中了哪些文档。这样用户搜“降噪耳机”时,系统不用逐条扫描整段文本,而是从词项入口直接找候选文档,再做相关性计算。

但这里有个容易踩的点:Elasticsearch 不是把数据库查询语句换个壳。你不能等业务跑起来以后,再随手把所有字段都塞进去搜索。字段建模越随意,后面的排序、聚合、过滤、中文分词都会变得很难收拾。

建索引前,先想清楚字段的职责

比如商品检索可以先拆成几类字段:

title      参与全文搜索,权重要高
brief      参与全文搜索,权重略低
brand      精确过滤,也可以做聚合
category   精确过滤,也可以做聚合
tags       参与搜索,也能做标签筛选
price      范围筛选和排序
status     上架状态过滤
rank_score 业务排序分
sku        唯一标识

这里最关键的是分清 textkeyword

text 会走分词,适合标题、简介、正文这类需要全文检索的字段。

keyword 不分词,适合品牌、状态、类目、编号这类精确匹配字段。

数字字段不要用字符串凑合,否则范围查询和排序会出怪问题。

Elasticsearch 映射与分词

下面用 Python 创建一个索引。中文分词方案需要按集群安装情况选择;这里先用内置 standard 跑通流程,如果集群装了中文分词插件,再把 analyzer 换成对应名称。

from elasticsearch import Elasticsearch

es = Elasticsearch(
    "http://localhost:9200",
    basic_auth=("elastic", "change_me"),
)

index_name = "shop_product"

settings = {
    "analysis": {
        "analyzer": {
            "product_text_analyzer": {
                "type": "standard"
            }
        }
    }
}

mappings = {
    "properties": {
        "sku": {"type": "keyword"},
        "title": {
            "type": "text",
            "analyzer": "product_text_analyzer",
            "fields": {
                "raw": {"type": "keyword"}
            }
        },
        "brief": {
            "type": "text",
            "analyzer": "product_text_analyzer"
        },
        "brand": {"type": "keyword"},
        "category": {"type": "keyword"},
        "tags": {"type": "keyword"},
        "price": {"type": "scaled_float", "scaling_factor": 100},
        "status": {"type": "keyword"},
        "rank_score": {"type": "integer"}
    }
}

if not es.indices.exists(index=index_name):
    es.indices.create(
        index=index_name,
        settings=settings,
        mappings=mappings,
    )

title 这里多加了一个 raw 子字段。title 用来全文搜索,title.raw 可以用来精确排序或聚合。很多业务里,一个字段同时有“能搜”和“能精确处理”两种诉求,多字段映射就很实用。

写入数据:别一条条慢慢塞

Python 客户端提供了 bulk helpers,批量写入更适合业务初始化、同步任务、搜索索引重建。

from elasticsearch import helpers

products = [
    {
        "sku": "P10001",
        "title": "轻薄降噪蓝牙耳机",
        "brief": "通勤和办公都能用,佩戴负担低",
        "brand": "Acme",
        "category": "audio",
        "tags": ["wireless", "noise-control"],
        "price": 399.00,
        "status": "online",
        "rank_score": 86,
    },
    {
        "sku": "P10002",
        "title": "运动防汗入耳耳机",
        "brief": "稳固佩戴,适合健身训练",
        "brand": "Nova",
        "category": "audio",
        "tags": ["sport", "wireless"],
        "price": 269.00,
        "status": "online",
        "rank_score": 72,
    },
]

actions = [
    {
        "_index": index_name,
        "_id": product["sku"],
        "_source": product,
    }
    for product in products
]

helpers.bulk(es, actions)
es.indices.refresh(index=index_name)

上面 _id 直接用 sku,这样重复写入同一个商品时会覆盖旧文档,适合“搜索索引是业务库投影”的模式。如果你希望保留多份记录,就不要用业务编号当 _id

查询别混成一锅粥

一个好用的搜索接口,通常把条件拆成三层:

must 放真正参与相关性计算的匹配条件。

filter 放精确筛选,不参与评分,适合状态、类目、范围条件。

should 放加权偏好,比如品牌优先、标签加分、运营分加成。

Elasticsearch 查询评分

query_text = "轻薄 降噪"

query = {
    "bool": {
        "must": [
            {
                "multi_match": {
                    "query": query_text,
                    "fields": ["title^4", "brief^2", "tags"],
                    "operator": "and"
                }
            }
        ],
        "filter": [
            {"term": {"status": "online"}},
            {"term": {"category": "audio"}},
            {"range": {"price": {"gte": 100, "lte": 800}}},
        ],
        "should": [
            {"term": {"brand": {"value": "Acme", "boost": 1.8}}},
            {"range": {"rank_score": {"gte": 80, "boost": 1.4}}},
        ],
        "minimum_should_match": 0,
    }
}

response = es.search(
    index=index_name,
    query=query,
    size=10,
    sort=[
        {"_score": "desc"},
        {"rank_score": "desc"},
        {"sku": "asc"},
    ],
    highlight={
        "fields": {
            "title": {},
            "brief": {},
        }
    }
)

for hit in response["hits"]["hits"]:
    source = hit["_source"]
    print(hit["_score"], source["sku"], source["title"])

这个查询里,title^4 表示标题权重更高,brief^2 表示简介也参与但没标题那么重。filter 负责把不合格的文档挡掉,should 则用于“命中更好,不命中也可以”的业务偏好。

如果你把所有条件都塞进 must,相关性会被很多精确条件搅乱;如果把全文搜索也塞进 filter,相关性评分又发挥不出来。这个边界要有意识地守住。

分词效果要亲眼看

搜索不准,很多时候不是查询语句写错,而是词被切得不符合业务语言。可以用 analyze API 看某段文本到底被切成什么。

tokens = es.indices.analyze(
    index=index_name,
    analyzer="product_text_analyzer",
    text="轻薄降噪蓝牙耳机",
)

for token in tokens["tokens"]:
    print(token["token"])

如果分词结果太碎,可能需要调整分词器;如果品牌、型号被错误拆开,可能要用 keyword、同义词、词典或多字段策略处理。搜索质量不是靠一个查询参数救活的,它通常来自 mapping、analyzer、数据清洗和 query DSL 的组合拳。

聚合:搜索页左侧筛选栏怎么来

商品列表常见的品牌、类目、价格区间筛选,可以用聚合拿到。

response = es.search(
    index=index_name,
    query={
        "bool": {
            "filter": [
                {"term": {"status": "online"}},
                {"term": {"category": "audio"}},
            ]
        }
    },
    size=0,
    aggs={
        "brand_options": {
            "terms": {
                "field": "brand",
                "size": 10
            }
        },
        "price_ranges": {
            "range": {
                "field": "price",
                "ranges": [
                    {"to": 200},
                    {"from": 200, "to": 500},
                    {"from": 500}
                ]
            }
        }
    }
)

for bucket in response["aggregations"]["brand_options"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])

这里 size=0 表示不拿命中文档,只拿聚合结果。做筛选栏、报表入口、检索侧边栏时,这个写法非常常见。

深翻页别硬怼 from + size

浅翻页用 fromsize 很方便:

page = 2
page_size = 10

response = es.search(
    index=index_name,
    query=query,
    from_=(page - 1) * page_size,
    size=page_size,
)

但页数很深时,from + size 会让集群取出大量候选再丢弃前面的部分,代价会越来越高。更稳的做法是用 search_after,并且提供稳定排序字段。

first_page = es.search(
    index=index_name,
    query=query,
    size=10,
    sort=[
        {"rank_score": "desc"},
        {"sku": "asc"},
    ],
)

last_hit = first_page["hits"]["hits"][-1]
next_cursor = last_hit["sort"]

next_page = es.search(
    index=index_name,
    query=query,
    size=10,
    sort=[
        {"rank_score": "desc"},
        {"sku": "asc"},
    ],
    search_after=next_cursor,
)

这里 sku 作为稳定兜底排序字段,避免多个文档业务分相同导致顺序飘动。列表页如果只需要“继续加载”,search_after 比深页码更自然。
Elasticsearch Python 写入管线

更新文档:局部改字段,别重建全量数据

商品价格、状态、运营分经常会变,局部更新可以直接走 update API。

es.update(
    index=index_name,
    id="P10001",
    doc={
        "price": 379.00,
        "rank_score": 91,
    },
)

如果要根据已有字段计算新值,可以用脚本,但脚本逻辑要克制。复杂业务计算最好在业务侧完成,再把结果写进索引。

es.update(
    index=index_name,
    id="P10001",
    script={
        "source": "ctx._source.rank_score = params.score",
        "params": {
            "score": 95
        }
    },
)

用封装函数把查询写顺

业务代码里别把一坨 DSL 到处复制,可以做一个小构造器。参数越清晰,后面越好改。

def build_product_query(keyword, brand=None, category=None, price_min=None, price_max=None):
    filters = [{"term": {"status": "online"}}]

    if brand:
        filters.append({"term": {"brand": brand}})

    if category:
        filters.append({"term": {"category": category}})

    price_range = {}
    if price_min is not None:
        price_range["gte"] = price_min
    if price_max is not None:
        price_range["lte"] = price_max
    if price_range:
        filters.append({"range": {"price": price_range}})

    must = []
    if keyword:
        must.append(
            {
                "multi_match": {
                    "query": keyword,
                    "fields": ["title^4", "brief^2", "tags"],
                    "minimum_should_match": "70%"
                }
            }
        )

    return {
        "bool": {
            "must": must or [{"match_all": {}}],
            "filter": filters,
            "should": [
                {"range": {"rank_score": {"gte": 80, "boost": 1.2}}}
            ],
            "minimum_should_match": 0,
        }
    }


def search_products(keyword, brand=None, category=None, price_min=None, price_max=None, size=10):
    return es.search(
        index=index_name,
        query=build_product_query(
            keyword=keyword,
            brand=brand,
            category=category,
            price_min=price_min,
            price_max=price_max,
        ),
        size=size,
        sort=[
            {"_score": "desc"},
            {"rank_score": "desc"},
            {"sku": "asc"},
        ],
    )

这个封装看起来普通,但能挡掉很多维护痛苦:新增品牌筛选、调整字段权重、改价格范围逻辑,都集中在一个地方。