September 28, 2020

通过 Faiss 和 USE 给 ElasticSearch 增加向量搜索

通过 Faiss 和 USE 给 ElasticSearch 增加向量搜索

工作中用到,翻译的,原文在此:https://link.zhihu.com/?target=https%3A//blog.onebar.io/building-a-semantic-search-engine-using-open-source-components-e15af5ed7885

题图的标注很有趣: “Finding text by its meaning is not easier for the computer than finding Waldo on this picture is for you”

当我们开始创建第一个产品模型时我们就知道搜索部分对我们来说是最难的部分。我们的方案是集中精力在前段 UI 上先,把真正信息检索的部分扔给 ElasticSearch 去做。 ElasticSearch 是一个强大的、可伸缩并且久经考验的方案。针对纷繁复杂的场景它提供了各种不同的应对方案,但说到底,它底层还是一个基于 TF/IDF 词频计算的关键词搜索方案。当用户确切的了解自己想要的东西并且能准确的输入关键词时,ElasticSearch 工作的非常棒!但稍微复杂些的场景下它就有些力不从心,比如:

  • 试着考虑以下两个句子的相似度:

    • "How much time off can I take?"
    • "How many paid vacation days do I get per year?"
  • 或者下面酱紫的:

    • "Does my insurance cover teeth implants?"
    • "Do we have a dental plan?"
  • 甚至是多语种的:

    • “Wie bestelle ich ein Mittagessen?”
    • "How can I order food?"

以上的几个例子对基于词频统计的 ElasticSearch 来说基本就挂了,但对于最新的 NLP 技术来讲却恰好是后者的用武之地。感谢开源社区的贡献,我们有足够的开源包、预训练模型、论文等参考。本文会选用其中的几种工具作为 pipeline 来实现一个更聪明的搜索引擎。

基于关键词的搜索

ElasticSearch 以数字的方式存储了每篇文章中每个句子的每个词(ASCII or UTF 编码)。然后做倒排序以方便的找到它们,用户检索某个关键词的时候 ES 负责利用各种复杂的算法,在所有的 document 中查找该关键词对应的内容。这些个算法某种成都上也照顾到了一点点的词义,但更多的精力还是放在这些关键词出现的频率、关键词之间的距离这些问题上面。ASCII 编码包含了对人类来说足够的信息,但很不幸,这些个信息对计算机来说无异于天书。

向量表示

针对这个问题的解决方案之一是替换掉简单的 ASCII/UTF 编码,寻找一种不仅仅可以标示某一个句子包含了某一个单词这样简单的信息,而且还能标示它本身的意思的方式。比如,我们可以寻求一种可以编码 which other words our word is frequently used together with (represent by the probable context) 的方式,翻译成人话就是我们在什么上下文中使用某个单词决定了这个单词的意思,上下文 决定 意思,就这么简单。这个思路里面假设相同的上下文用到的单词也相同,以此思考并诉诸数学,我们甚至可以找到编码整个句子的方式,想象是不是有点激动?

上图中如果我们简单的比较两个向量编码可以发现两个向量完全不同,但如果我们去考虑两个向量的距离的话就会发现相同的词的距离近,也就是说两个类似的词一般会用在类似的上下文中,反过来看,用在类似上下文中的词它们的意思就相近,记住这句话。

简言之就是,确实有这样的标示方式存在,这种方式被称作 word(or sentence) embedding,几乎是机器学习 NLP 方向的基石。有很多种方法被用作分析海量的文本,把词或者整个句子向量化,映射到一个 N 维的向量空间中,在这个向量空间,你可以简单方便的计算这些向量之间的距离,距离近就意味着在这个特定的向量空间中这两个向量代表的词或者句子意思相近。在众多的这种算法中,我们选用来自 Google 的 USE( UniversalSentenceEncoder )算法,该算法开源并可以方便的下载到。接下来我们将采用 USE 方法作为核心来构建本文的搜索引擎实力。

思路简介

本示例中我们将创建一个跟 ElasticSearch 平行的服务,用于完成向量搜索。每次用户查询我们都会分别发给 ES 和向量搜索方案,以此来有效利用两种方案,优化最终的结果。这个方案下我们不仅可以获得词义搜索结果而不仅仅是单纯的关键词匹配结果;而且在词义搜索没有找到合适结果的情况下回退到关键词搜索方案。以下列出了用到的开源库:

  • Tensorflow and Simple Tensorflow Serving
  • Pre-trained Universal Sentence Encoder
  • Docker
  • Python3
  • FAISS by Facebook
  • ElasticSearch
  • gRPC for the API

Docker

我们需要把方案拆分成几个不同的组件,Docker 用来做不同组件之间的隔离是最容易上手了。同时我们还需要运行几个不同的 NLP 处理模块,而这些模块各自还依赖各自的奇奇怪怪的依赖,在同一个环境中配置比较困难,上 Docker 可以有效解决这个问题。

句子编码器

Universal Sentence Encoder 网络是众多可以拿来生成向量表示的网络之一,在我们内部的测试中它获得了最好的表现(截止 2019年7月,鉴于 NLP 这几年的猥琐发育,目前时间点上可能已经有了更好的方式也未可知),而且还有个加分项是它支持多语言,也就是说这个预训练模型是基于多语言语料训练的,当然,聪明如你肯定想到了那岂不是可以用来做翻译?是的。

Universal Sentence Encoder 可以通过 Tensorflow Hub 的方式获得,你可以很方便的下载到本地或者直接在先上体验以下。但如果是生产环境的话,还是建议部署到一个单独的服务上去,以方便更新、缩容扩容等操作。部署 Tensorflow 模型的最主流方式大概就是用官方的 Tensorflow Serving tool 了,通过这个方式我们可以把 TF 的模型 dump 出来,然后启动一个多线程的、面向生产环境的 REST API 服务。Tensorflow Serving 本身以 Docker image 的方式提供,所以我们需要做的就是把 image pull 下来,然后给他指定对应模型的路径就好了。

不幸的是,在部署 USE 的过程中我们发现 USE 用到了一个自定义的 TF 操作,sentencepiece, TF Serving 本身需要所有的自定义操作组件都以静态链接库的方式链接到 tf_serving 库,然后 blah blah... 总之我们建议你用另外一个叫 Simple Tensorflow Serving 的东西来跑 USE 模型。步骤如下:

$ git clone https://github.com/tobegit3hub/simple_tensorflow_serving
$ git clone https://github.com/google/sentencepiece
$ mkdir simple_tensorflow_serving/tf_sentencepiece
# Copy the correct version of sentencepiece library into simple_tensorflow_serving
$ cp sentencepiece/tensorflow/tf_sentencepiece/_sentencepiece_processor_ops.so.1.13.1 simple_tensorflow_serving/tf_sentencepiece/sentencepiece_processor_ops.so
# Pin the desired tensorflow version to 1.13.1
# Note that sed command is written for Mac OS, you may need to tweak it to work in Linux
$ sed -i.back s'/tensorflow.*$/tensorflow==1.13.1/g' simple_tensorflow_serving/requirements.txt
# Create a folder to store the USE model
$ mkdir simple_tensorflow_serving/models/use

然后通过以下的 Python 代码段下载 USE 模型,放到 simple_tensorflow_serving/models 目录下:

import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.saved_model import simple_save

export_dir = "./models/use/00000001"
with tf.Session(graph=tf.Graph()) as sess:
    module = hub.Module("https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/1")
    text_input = tf.placeholder(dtype=tf.string, shape=[None])

    sess.run([tf.global_variables_initializer(), tf.tables_initializer()])

    embeddings = module(text_input)

    simple_save(sess,
        export_dir,
        inputs={'text': text_input},
        outputs={'embeddings': embeddings},
        legacy_init_op=tf.tables_initializer())

最后如下模样修改一下 simple_tensorflow_serving/Dockerfile :

FROM python:3.6

RUN apt-get update -y
RUN apt-get install -y unzip wget
RUN apt-get install -y default-jdk

RUN wget http://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/8/h2o-3.18.0.8.zip
RUN unzip ./h2o-3.18.0.8.zip
RUN mv h2o-3.18.0.8/h2o.jar /tmp/

ADD ./requirements.txt /
RUN pip3 install -r /requirements.txt

ADD . /simple_tensorflow_serving/
WORKDIR /simple_tensorflow_serving/
RUN cp ./third_party/openscoring/openscoring-server-executable-1.4-SNAPSHOT.jar /tmp/

RUN python ./setup.py install

# Modify the CMD to include the path to our USE model and a shared sentencepiece library.
# We also hardcode the serving port here, but you can tweak it to your convenience

CMD ["simple_tensorflow_serving", "--port=8501", "--model_base_path=/simple_tensorflow_serving/models/use/", "--custom_op_paths=/simple_tensorflow_serving/tf_sentencepiece/"]

创建 Docker 镜像:

$ docker build . --tag use:latest

运行:

$ docker run -it use:latest

编码器客户端逻辑

每隔几个月就会有大公司发布新的 NLP 模型,所以我们的 pipeline 支持方便的切换不同的 NLP 模型就变得非常重要。为此我们设计了如下的一个 interface 用来跟 Simple Tensorflow Serving 的 REST 接口通信,具体代码如下:


import logging
from abc import ABC, abstractmethod
from typing import Iterable, List

import requests
import numpy as np


logger = logging.getLogger(__name__)


class Encoder(ABC):
    """A shared encoder interface.
    Each client must provide an encode() method and a FEATURE_SIZE
    constant indicating the size of encoded vectors"""

    FEATURE_SIZE: int

    @abstractmethod
    def encode(self, data: Iterable[str]) -> np.array:
        pass


class UniversalEncoderError(Exception):
    pass


class UniversalEncoder(Encoder):
    """
        Requests-based client for the Universal Sentence Encoder TF model
    """

    # Length of returned vectors
    FEATURE_SIZE = 512
    BATCH_SIZE = 32

    def __init__(self, host, port):
        self.server_url = "http://{host}:{port}".format(
            host=host,
            port=port,
        )

    @staticmethod
    def _sanitize_input(sentence: str):
        return sentence.replace('\n', '').lower().strip()[:1000]

    def encode(self, data: Iterable[str]) -> np.array:
        logger.debug(f"Encode request: {data}")
        data = [self._sanitize_input(sentence) for sentence in data]
        all_vectors: List[List[float]] = []
        
        for i in range(0, len(data), self.BATCH_SIZE):
            batch = data[i:i+self.BATCH_SIZE]
            response = requests.post(
                url=self.server_url,
                json={
                    "model_name": "default",
                    "model_version": "00000001",
                    "data": {
                        "text": batch
                    }
                }
            )
            if not response.ok:
                raise UniversalEncoderError(
                    f"Bad response from encoder: {response}"
                )
            all_vectors += response.json()['embeddings']
            
        return np.array(all_vectors).astype(np.float32)
        

创建索引

如何编码上面已经介绍完了,接下来我们说说具体如何创建一个索引并实际的检索它。ElasticSearch 里面是通过 Trie-Tree 倒排序的方式来快速的定位一个关键词和与之对应的包含该关键词的文档列表的。在关键词对关键词快速检索场景下,Trie 树是一个非常高效的数据结构,但不幸的是在向量空间中并没有与之对应的东西,因为向量空间中无法做到精确的关键词对关键词匹配。取而代之的是,我们采用 K 近邻方式来完成一个针对向量的检索,检索到的记录可以是记录的标题或者文档的其他字段。因为距离本身就是一个衡量相关性的标准,所以没有必要再找一个额外的指标来了。

Facebook FAISS

有众多的方式方法可以帮你完成向量化空间的索引工作, OneBar 选择的是 FAISS,来自 Facebook AI Research 部门并以 MIT 协议开源。

Faiss is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that possibly do not fit in RAM. It also contains supporting code for evaluation and parameter tuning. Faiss is written in C++ with complete wrappers for Python/numpy. Some of the most useful algorithms are implemented on the GPU.

使用 FAISS 和 Encoder 的代码示例如下:

import faiss
from encoder import UniversalEncoder

# Don't forget to start the encoder container locally and open the 8501 port

encoder = UniversalEncoder(host='localhost', port=8501)
data = [
    'What color is chameleon?',
    'When is the festival of colors?',
    'When is the next music festival?',
    'How far is the moon?',
    'How far is the sun?',
    'What happens when the sun goes down?',
    'What we do in the shadows?',
    'What is the meaning of all this?',
    'What is the meaning of Russel\'s paradox?',
    'How are you doing?'
]
encoded_data = encoder.encode(data)

# We're going to use a regular "Flat Inner Product" index here
# as we're not storing millions of rows.
# We'll also wrap it in the IndexIDMap just to demonstrate
# how you can store actual document IDs right in the index
# and get them back along with the found vectors when searching

index = faiss.IndexIDMap(faiss.IndexFlatIP(encoder.FEATURE_SIZE))
index.add_with_ids(encoded_data, np.array(range(0, len(data))))

def search(query):
    query_vector = encoder.encode([query])
    k = 1
    top_k = index.search(query_vector, k)
    return [
        data[_id] for _id in top_k[1].tolist()[0]
    ]
    
# Examples:
    
print(search("When is Holi?"))

# >>> ['When is the festival of colors?']

print(search("How far is the Earth satelite?"))

# >>> ['How far is the moon?']

print(search("How far is the shiny yellow thing?"))

# >>> ['How far is the sun?']

print(search("What is the meaning of life?"))

# >>> ['What is the meaning of all this?']

print(search("Are we vampires?"))

# >>> ['What we do in the shadows?']

print(search("как оно?"))

# >>> ['How are you doing?']

注意,我们这里采用了 faiss.IndexFlatIP 索引方式,这种方式只适合小数据且不支持优化,在这种索引上面进行搜索复杂度是 O(N),所以如果有百万级别的向量数据的话,会变的非常慢。针对大数组数据, FAISS 允许你先进行聚合操作,然后只需要在结果之上进行查询即可。这个方法把搜索的速度提高了 K 倍(K是聚合的操作次数),但缺点就是可能会丧失部分精度。详细信息可以参考 wiki。

OneBar 自己写了一个 FAISS 的 wrapper,针对每个客户端创建不同的索引。一个索引包含标题,其他的包含句子形式存在的,该标题对应的描述等信息。用户检索时,会返回每个符合的索引,其中包含标题的索引优先级适当提高一点。

数据持久化

基于句子编码的检索非常消耗系统资源。在一颗 CPU 上编码 1000 条句子大约需要 5 分钟的时间!如果你的场景需要存储海量的句子,并且经常需要重新创建索引的话,每次都把全量数据重新创建索引的话也没什么必要。幸运的是 FAISS 内置了一种可以针对磁盘上的索引进行 dump/load 操作的方法。


state_dir = './search_data/'

faiss.write_index(
    index,
    os.path.join(state_dir, "faiss.index")
)

index = faiss.read_index(
    os.path.join(state_dir, "faiss.index")
)

通过这个方式你就可以按照需要设计处理流程,在合适的时候把服务状态写到磁盘并在需要的时候重新 load 回来。

gRPC

设计上我们希望我们的语义索引可以以一种独立服务的方式提供服务,换句话说最好能支持并发访问,但同时我们有不希望去处理麻烦的同步逻辑。提供一个 gRPC API 索引服务是一个比较可行的方式。gRPC 提供了一种标准的架构方式,可以方便的对外提供多线程对并发的服务,下面是一个最小化的 gRPC 接口示例:


syntax = "proto3";

message TopKDocumentsRequest {
    string query = 1;
    int32 k = 2;
}

message DocumentsWithinDistanceRequest {
    string query = 1;
    double distance = 2;
}

message DocumentMatch {
    int64 id = 1;
    double distance = 2;
}

message SearchResponse {
   enum Error {
       NO_ERROR = 0;
       INDEX_IS_NOT_READY = 1;
       UNKNOWN_ERROR = 2;
   }
   repeated DocumentMatch matches = 1;
   Error error = 2;
}

message SyncRequest {
    oneof target {
        int64 documentId = 1;
        bool all = 2;
    }
    bool delete = 3;
}

message SyncResponse {
    enum Error {
        NO_ERROR = 0;
        ANOTHER_SYNC_IN_PROGRESS = 1;
        INDEX_IS_NOT_READY = 2;
        UNKNOWN_ERROR = 3;
    }
    Error error = 1;
}

service Victor {
    rpc FindTopKDocuments(TopKDocumentsRequest) returns (SearchResponse);
    rpc FindDocumentsWithinDistance(DocumentsWithinDistanceRequest) returns (SearchResponse);
    rpc Sync(SyncRequest) returns (SyncResponse);
}

把该文件放到一个子目录下( ./protos ) ,编译:

$ python -m grpc_tools.protoc -I./protos/ --python_out=./protos/ --grpc_python_out=./protos/ ./protos/victor.proto

接下来就可以根据自己的需要继承并实现 victorpb2grpc.VictorService 里面的方法。根据不同的场景,可以选择创建不同的索引来支持。

ElasticSearch

跳过安装配置不讲,在我们的架构中,每个 document 都会被创建两份索引:ElasticSearch 创建一份,我们自己组建的语义搜索创建一份。原先的 ElasticSearch 还会负责 “简单” 的搜索,基于向量的语义搜索负责更模糊的匹配。

前端和最终的结果展现

最终我们得到了组件好的 pipeline,再加上一个前端就齐活儿了,这个前端负责接收用户的检索请求,在向量搜索和 ElasticSearch 上分别得到结果,结果最终排序,然后给到用户。最终结果的一个简单的排序方法是为向量距离设定一个阈值,把超过这个阈值的 ES 搜索结果作为最终的结果。当然还有更多、更复杂的方式可以来做精排这个事情,但这里就不再展开说了。逻辑如下:

# Connect to the semantic-search service and run the query
con = get_victor_connection()
response = con.FindTopKProblems(
    victor_pb2.TopKProblemsRequest(
        query=search_query,
        # In our implementation -1 means "return all matches"
        k=-1,
    ),
)

# Because we're using Cosine Similarity to find closes vectors,
# the resulting distance will always be in the range from -1 to 1.
# This allows us to easily define a confidence threshold and 
# consider anything above this threshold to be a
# "High Confidence Result"

CONFIDENCE_THRESHOLD = 0.65

# A custom scoring function we're goint to pass to ElasticSearch
# More information here: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-function-score-query.html

scroring_functions = [
    {
        "filter": {"term": {"document_id": match.id}},
        # This function will re-shape elastic search results in a way
        # that those, that scored well in similarity search, will move higher
        "weight": math.pow(
            document_count, match.distance
        )
    } for match in response.matches
]

high_confidence_docs = [
    match.id for match in response.matches
    if match.distance > CONFIDENCE_THRESHOLD
]

elastic_query = {
    'bool': {
        'should': [{
            "match": {
                "title": search_query,
            }
        },
        #
        # You can include any extra fields that you want to
        # query for your documents here
        # ...
        #
        #
        # The "terms" part of the query would ensure that anything that
        # has been found by Victor will always be returned by ElasticSearch
        {
            "terms": {
                "document_id": high_confidence_docs,
                # You can add extra boost to high confidence results
                "boost": 10,
            }
        }]
    }
}

params = {
    'query': {
        "function_score": {
            "query": elastic_query
            "functions": scoring_functions
        }
    }
}

# We use https://github.com/elastic/elasticsearch-dsl-py
# It's a python wrapper around narive ES DSL

from elasticsearch_dsl import Search
from our_project.search import Document

search = Search.from_dict(params)
search = search.index(Document.get_index_name())

search.execute()

最终结果差不多长这样

总结

上面的示例挺吸引人的,但实际使用的话还是有不少的限制:

  • 针对整句做编码操作是一个非常重的活。 USE 针对短句的编码比较稳定可靠,如果长度也都差不多就更好。
  • 编码器在正常书写的句子上工作的很好,但遇到特定的短语、缩写这些情况就力不从心。这也是为什么我们把 ElasticSearch 作为兜底方案的原因,因为 ElasticSearch 针对不常见的写法和不完整拼写等场景都支持的很好。