最近的论文《搜索增强生成中的最佳实践》通过实证研究评估了各种增强RAG技术的效果,旨在汇聚一套RAG的最佳实践。
Wang Pipeline由Wang及其同事推荐的RAG管道。
我们将实现一些这些最佳实践,特别是那些旨在提高搜索质量的技术(句子分块、HyDE、反向打包)。
为了简洁起见,我们将省略那些专注于提高效率的技术(查询分类和摘要生成)。
我们还将实现一些未涉及但我个人认为有用且有趣的技术(元数据包含、复合多字段嵌入、查询扩展)。
最后,我们将进行一个简短的测试,以查看我们的搜索结果和生成的答案是否比基线有所改进。让我们开始吧!
概述
RAG旨在通过从外部知识库中检索信息来增强LLM(大语言模型)的生成答案。通过提供领域特定的信息,LLM可以快速适应其训练数据范围之外的用例;这比微调便宜得多,也更容易保持最新。
改善RAG质量的措施通常集中在两个方面:
- 提高知识库的质量和清晰度。
- 改进搜索查询的覆盖范围和具体性。
这两种措施将提高LLM访问相关事实和信息的几率,从而减少幻觉或依赖其自身可能过时或无关的知识。
这些方法的多样性难以在几句话中澄清。让我们直接进入实现,以便更清楚地理解。
Han Pipeline图1:作者使用的RAG管道。
目录
- 设置
- 文档的摄取、处理和嵌入
- 数据摄取
- 句子级别、基于令牌的分块
- 元数据包含与生成
- 复合多字段嵌入
- 附录
- 定义
猫咪休息
设置
所有代码可以在 Searchlabs仓库中找到。
首先,你需要以下内容:
- 一个Elastic云部署
- 一个LLM API - 我们在此笔记本中使用的是Azure OpenAI上的GPT-4o部署
- Python版本3.12.4或更高版本
我们将从main.ipynb笔记本运行所有代码。
继续git clone这个仓库,导航到supporting-blog-content/advanced-rag-techniques,然后运行以下命令:
代码语言:bash复制# 创建一个名为'rag_env'的新虚拟环境
python -m venv rag_env
# 激活虚拟环境(对于基于Unix的系统)
source rag_env/bin/activate
# (对于Windows)
.rag_envScriptsactivate
# 安装requirements.txt中列出的包
pip install -r requirements.txt完成后,创建一个.env文件并填写以下字段(参考.env.example)。感谢我的合著者Claude-3.5的有用评论。
代码语言:bash复制# Elastic Cloud: 在Elastic Cloud控制台的“Deployment”页面找到
ELASTIC_CLOUD_ENDPOINT=""
ELASTIC_CLOUD_ID=""
# Elastic Cloud: 在部署设置期间或在“Security”设置中创建
ELASTIC_USERNAME=""
ELASTIC_PASSWORD=""
# Elastic Cloud: 在Kibana或通过API创建的索引名称
ELASTIC_INDEX_NAME=""
# Azure AI Studio: 在Azure OpenAI资源的“Keys and Endpoint”部分找到
AZURE_OPENAI_KEY_1=""
AZURE_OPENAI_KEY_2=""
AZURE_OPENAI_REGION=""
AZURE_OPENAI_ENDPOINT=""
# Azure AI Studio: 在Azure OpenAI资源的“Deployments”部分找到
AZURE_OPENAI_DEPLOYMENT_NAME=""
# 使用BAAI/bge-small-en-v1.5,因为我认为它在资源效率和性能之间取得了良好的平衡。
HUGGINGFACE_EMBEDDING_MODEL="BAAI/bge-small-en-v1.5"接下来,我们将选择要摄取的文档,并将其放置在documents文件夹中。对于本文,我们将使用Elastic N.V. 2023年年度报告。这是一个相当具有挑战性和密集的文档,非常适合压力测试我们的RAG技术。
Han PipelineElastic 2023年年度报告
现在一切准备就绪,让我们开始进行摄取。打开main.ipynb并执行前两个单元格以导入所有包并初始化所有服务。
返回顶部
文档的摄取、处理和嵌入
数据摄取
- 个人注释:LlamaIndex的便利性让我惊叹不已。在没有LLMs和LlamaIndex的旧时代,摄取各种格式的文档是一个痛苦的过程,需要从各处收集晦涩的包。现在,它只需一个函数调用。真是太神奇了。
SimpleDirectoryReader将加载directory_path中的所有文档。对于.pdf文件,它返回一个文档对象列表,我将其转换为Python字典,因为我发现它们更容易处理。
# llamaindex_processor.py
from llama_index.core import SimpleDirectoryReader
class LlamaIndexProcessor:
def __init__(self):
pass
def load_documents(self, directory_path):
'''加载目录中的所有文档'''
reader = SimpleDirectoryReader(input_dir=directory_path)
return reader.load_data()
# main.ipynb
llamaindex_processor = LlamaIndexProcessor()
documents = llamaindex_processor.load_documents('./documents/')
documents = [dict(doc_obj) for doc_obj in documents]每个字典包含text字段中的关键内容。它还包含一些有用的元数据,例如页码、文件名、文件大小和类型。
{
'id_': '5f76f0b3-22d8-49a8-9942-c2bbab14f63f',
'metadata': {
'page_label': '5',
'file_name': 'Elastic_NV_Annual-Report-Fiscal-Year-2023.pdf',
'file_path': '/Users/han/Desktop/Projects/truckasaurus/documents/Elastic_NV_Annual-Report-Fiscal-Year-2023.pdf',
'file_type': 'application/pdf',
'file_size': 3724426,
'creation_date': '2024-07-27',
'last_modified_date': '2024-07-27'
},
'text': '目录n页码n第一部分n项目1. 业务 3n15 项目1A. 风险因素n项目1B. 未解决的员工意见 48n项目2. 物业 48n项目3. 法律诉讼 48n项目4. 矿山安全披露 48n第二部分n项目5. 登记人普通股的市场、相关股东事项和发行人股票购买 49n项目6. [保留] 49n项目7. 财务状况和经营成果的管理层讨论与分析 50n项目7A. 关于市场风险的定量和定性披露 64n项目8. 财务报表和补充数据 66n项目9. 关于会计和财务披露的会计师变更和分歧 100n100n101 项目9A. 控制和程序n项目9B. 其他信息n项目9C. 关于防止检查的外国司法管辖区的披露 101n第三部分n102n102n102n102 项目10. 董事、高级管理人员和公司治理n项目11. 高级管理人员薪酬n项目12. 某些受益所有人和管理层的证券持有情况和相关股东事项n项目13. 某些关系和相关交易及董事独立性n项目14. 主要会计师费用和服务 102n第四部分n103n105 项目15. 附件和财务报表附表n项目16. 10-K表格摘要n签名 106ni',
...
}返回顶部
句子级别、基于令牌的分块
首先,我们需要将文档减少到标准长度的块(以确保一致性和可管理性)。嵌入模型有唯一的令牌限制(它们可以处理的最大输入大小)。令牌是模型处理的基本文本单位。为了防止信息丢失(截断或遗漏内容),我们应提供不超过这些限制的文本(通过将较长的文本拆分为较小的段)。
分块对性能有显著影响。理想情况下,每个块都应代表一个自包含的信息块,捕捉到单个主题的上下文信息。分块方法包括基于词汇的分块,其中文档按词数拆分,以及语义分块,它使用LLM识别逻辑断点。
基于词汇的分块便宜、快速且简单,但有可能拆分句子,从而破坏上下文。语义分块变得缓慢且昂贵,特别是如果你处理像116页的Elastic年度报告这样的文档。
让我们选择一种折中的方法。句子级分块仍然简单,但比基于词汇的分块更有效地保留上下文,同时成本和速度显著降低。此外,我们将实现一个滑动窗口,以捕捉周围的一些上下文,缓解拆分段落的影响。
代码语言:python代码运行次数:0复制# chunker.py
import uuid
import re
class Chunker:
def __init__(self, tokenizer):
self.tokenizer = tokenizer
def split_into_sentences(self, text):
"""将文本拆分成句子。"""
return re.split(r'(?<=[.!?])s ', text)
def sentence_wise_tokenized_chunk_documents(self, documents, chunk_size=512, overlap=20, min_chunk_size=50):
'''
1. 将文本拆分成句子。
2. 使用提供的分词器方法进行分词。
3. 构建最多chunk_size限制的块。
4. 基于令牌创建重叠 - 保留上下文。
5. 只保留符合最小令牌大小要求的块。
'''
chunked_documents = []
for doc in documents:
sentences = self.split_into_sentences(doc['text'])
tokens = []
sentence_boundaries = [0]
# 分词所有句子并跟踪句子边界
for sentence in sentences:
sentence_tokens = self.tokenizer.encode(sentence, add_special_tokens=True)
tokens.extend(sentence_tokens)
sentence_boundaries.append(len(tokens))
# 创建块
chunk_start = 0
while chunk_start < len(tokens):
chunk_end = chunk_start chunk_size
# 找到块中合适的最后一个完整句子
sentence_end = next((i for i in sentence_boundaries if i > chunk_end), len(tokens))
chunk_end = min(chunk_end, sentence_end)
# 创建块
chunk_tokens = tokens[chunk_start:chunk_end]
# 检查块是否符合最小大小要求
if len(chunk_tokens) >= min_chunk_size:
# 为此块创建一个新的文档对象
chunk_doc = {
'id_': str(uuid.uuid4()),
'chunk': chunk_tokens,
'original_text': self.tokenizer.decode(chunk_tokens),
'chunk_index': len(chunked_documents),
'parent_id': doc['id_'],
'chunk_token_count': len(chunk_tokens)
}
# 从原始文档复制所有其他字段
for key, value in doc.items():
if key != 'text' and key not in chunk_doc:
chunk_doc[key] = value
chunked_documents.append(chunk_doc)
# 移动到下一个块开始,考虑重叠
chunk_start = max(chunk_start chunk_size - overlap, chunk_end - overlap)
return chunked_documents
# main.ipynb
# 初始化嵌入模型
HUGGINGFACE_EMBEDDING_MODEL = os.environ.get('HUGGINGFACE_EMBEDDING_MODEL')
embedder = EmbeddingModel(model_name=HUGGINGFACE_EMBEDDING_MODEL)
# 初始化分块器
chunker = Chunker(embedder.tokenizer)Chunker类接收嵌入模型的分词器来编码和解码文本。我们现在将构建每个512个令牌的块,重叠20个令牌。为此,我们将文本拆分成句子,对这些句子进行分词,然后将分词后的句子添加到当前块中,直到无法再添加而不超过令牌限制。
最后,将句子解码回原始文本进行嵌入,并将其存储在名为original_text的字段中。块存储在名为chunk的字段中。为了减少噪音(即无用的文档),我们将丢弃任何小于50个令牌的文档。
让我们在我们的文档上运行它:
代码语言:python代码运行次数:0复制chunked_documents = chunker.sentence_wise_tokenized_chunk_documents(documents, chunk_size=512)并得到类似这样的文本块:
代码语言:python代码运行次数:0复制print(chunked_documents[4]['original_text'])
[CLS] the aggregate market value of the ordinary shares held by non - affiliates of the registrant, based on the closing price of the shares of ordinary shares on the new york stock exchange on october 31, 2022 ( the last business day of the registrant ’ s second fiscal quarter ), was approximately $ 6. 1 billion. [SEP] [CLS] as of may 31, 2023, the registrant had 97, 390, 886 ordinary shares, par value €0. 01 per share, outstanding. [SEP] [CLS] documents incorporated by reference portions of the registrant ’ s definitive proxy statement relating to the registrant ’ s 2023 annual general meeting of shareholders are incorporated by reference into part iii of this annual ......返回顶部
元数据包含与生成
我们已经对文档进行了分块。现在是时候丰富数据了。我想生成或提取额外的元数据。这些额外的元数据可以用于影响和增强搜索性能。
我们将定义一个DocumentEnricher类,其作用是接收一个文档列表(Python字典)和一个处理函数列表。这些函数将在文档的original_text列上运行,并将其输出存储在新字段中。
首先,我们使用TextRank提取关键短语。TextRank是一种基于图的算法,通过根据单词之间的关系对它们的重要性进行排序,从文本中提取关键短语和句子。
接下来,我们使用GPT-4o生成潜在问题。
最后,我们使用Spacy提取实体。
由于每个文件的代码都相当冗长且复杂,我将在这里避免重复。如果你有兴趣,文件在下面的代码示例中标记。
让我们运行数据丰富化:
好的,我会根据你的需求对这篇文章进行优化,使其更加适合初学者阅读,并确保表达清晰流畅。
代码语言:python代码运行次数:0复制# documentenricher.py
from tqdm import tqdm
class DocumentEnricher:
def __init__(self):
pass
def enrich_document(self, documents, processors, text_col='text'):
for doc in tqdm(documents, desc="Enriching documents using processors: " str(processors)):
for (processor, field) in processors:
metadata = processor(doc[text_col])
if isinstance(metadata, list):
metadata = 'n'.join(metadata)
doc.update({field: metadata})
# main.ipynb
# 初始化处理器类
nltkprocessor = NLTKProcessor() # nltk_processor.py
entity_extractor = EntityExtractor() # entity_extractor.py
gpt4o = LLMProcessor(model='gpt-4o') # llm.py
# 初始化文档增强器
documentenricher = DocumentEnricher()
# 在文档中创建新的字段 - 这些是处理器函数的输出。
processors = [
(nltkprocessor.textrank_phrases, "keyphrases"),
(gpt4o.generate_questions, "potential_questions"),
(entity_extractor.extract_entities, "entities")
]
# .enrich_document() 将会直接修改 chunked_docs。
# 为了查看结果,我们将在接下来的几个单元中打印 chunked_docs!
documentenricher.enrich_document(chunked_docs, text_col='original_text', processors=processors)让我们看看结果:
TextRank 提取的关键词
这些关键词代表了段落的核心主题。如果查询与网络安全有关,这段内容的评分将会提高。
代码语言:javascript复制print(chunked_documents[25]['keyphrases'])
'elastic agent stop', 'agent stop malware', 'stop malware ransomware', 'malware ransomware environment', 'ransomware environment wide', 'environment wide visibility', 'wide visibility threat', 'visibility threat detection', 'sep cl key', 'cl key feature'GPT-4o 生成的潜在问题
这些潜在问题可能会直接匹配用户查询,从而提高评分。我们提示 GPT-4o 生成可以用当前段落信息回答的问题。
代码语言:javascript复制print(chunked_documents[25]['potential_questions'])
1. Elastic Agent 在网络安全方面的主要功能是什么?
2. 描述 Logstash 如何在 IT 环境中贡献数据管理。
3. 列出并解释文档中提到的 Logstash 的关键特性。
4. Elastic Agent 如何增强威胁检测中的环境可见性?
5. Logstash 提供哪些超越简单数据收集的功能?
6. 文档中如何建议 Elastic Agent 阻止恶意软件和勒索软件?
7. 能否识别 Elastic Agent 和 Logstash 在集成环境中的功能关系?
8. Elastic Agent 的高级威胁检测能力对组织安全政策有何影响?
9. 比较和对比 Elastic Agent 和 Logstash 的描述功能。
10. Logstash 的集中收集能力如何支持 Elastic Agent 的威胁检测能力?Spacy 提取的实体
这些实体类似于关键词,但捕捉组织和个人的名字,而关键词提取可能会遗漏这些信息。
代码语言:javascript复制print(chunked_documents[29]['entities'])
'appdynamics', 'apm data', 'azure sentinel', 'microsoft', 'mcafee', 'broadcom', 'cisco', 'dynatrace', 'coveo', 'lucidworks'返回顶部
复合多字段嵌入
现在我们已经用额外的元数据丰富了文档,可以利用这些信息创建更强大和上下文感知的嵌入。
让我们回顾一下当前的处理进展。我们在每个文档中有四个感兴趣的字段。
代码语言:javascript复制{
"chunk": "...",
"keyphrases": "...",
"potential_questions": "...",
"entities": "..."
}每个字段代表了文档上下文的不同视角,可能突出 LLM 需要关注的关键区域。
Han Pipeline元数据增强流程
计划是对每个字段进行嵌入,然后创建这些嵌入的加权和,称为复合嵌入。
希望这个复合嵌入能使系统更加上下文感知,并引入另一个可调超参数以控制搜索行为。
首先,让我们对每个字段进行嵌入,并使用我们在 main.ipynb 中定义的嵌入模型更新每个文档。
代码语言:javascript复制# 在 embedding_model.py 中定义的嵌入模型
embedder = EmbeddingModel(model_name=HUGGINGFACE_EMBEDDING_MODEL)
cols_to_embed = ['keyphrases', 'potential_questions', 'entities']
embedding_cols = []
for col in cols_to_embed:
# 处理文本输入
embedding_col = embedder.embed_documents_text_wise(chunked_documents, text_field=col)
embedding_cols.append(embedding_col)
# 处理 token 输入
embedding_col = embedder.embed_documents_token_wise(chunked_documents, token_field="chunk")
embedding_cols.append(embedding_col)每个嵌入函数返回嵌入的字段,该字段是原始输入字段加上 _embedding 后缀。
现在让我们定义复合嵌入的权重:
代码语言:javascript复制embedding_cols = [
'keyphrases_embedding',
'potential_questions_embedding',
'entities_embedding',
'chunk_embedding'
]
combination_weights = [
0.1,
0.15,
0.05,
0.7
]权重允许你根据使用情况和数据质量为每个组件分配优先级。直观上,这些权重的大小取决于每个组件的语义价值。由于 chunk 文本本身最为丰富,我分配了 70% 的权重。因为实体是最小的,只是组织或个人名称的列表,所以我分配了 5% 的权重。这些值的精确设置需要根据具体使用情况进行实证确定。
最后,让我们编写一个函数来应用这些权重,并创建我们的复合嵌入。同时删除所有组件嵌入以节省空间。
代码语言:javascript复制from tqdm import tqdm
def combine_embeddings(objects, embedding_cols, combination_weights, primary_embedding='primary_embedding'):
# 确保权重数量与嵌入列数量匹配
assert len(embedding_cols) == len(combination_weights), "嵌入列数量必须与权重数量匹配"
# 归一化权重使其总和为 1
weights = np.array(combination_weights) / np.sum(combination_weights)
for obj in tqdm(objects, desc="Combining embeddings"):
# 初始化复合嵌入
combined = np.zeros_like(obj[embedding_cols[0]])
# 计算加权和
for col, weight in zip(embedding_cols, weights):
combined = weight * np.array(obj[col])
# 将新的复合嵌入添加到对象中
obj.update({primary_embedding: combined.tolist()})
# 删除原始嵌入列
for col in embedding_cols:
obj.pop(col, None)
combine_embeddings(chunked_documents, embedding_cols, combination_weights)至此,我们完成了文档处理。现在我们有一个文档对象列表,它们的结构如下:
代码语言:javascript复制{
'id_': '7fe71686-5cd0-4831-9e79-998c6dbeae0c',
'chunk': [2312, 14613, ...],
'original_text': 'if an emerging growth company, indicate by check mark if the registrant has elected not to use the extended ...',
'chunk_index': 3,
'chunk_token_count': 399,
'metadata': {
'page_label': '3',
'file_name': 'Elastic_NV_Annual-Report-Fiscal-Year-2023.pdf',
...
},
'keyphrases': 'sep cl unkncheck mark registrantncl unk indicatenunk indicate checknindicate check marknprincipal executive officenaccelerate filer unkncompany unk emergenunk emerge growthnemerge growth company',
'potential_questions': '1. What are the different types of registrant statuses mentioned in the document?n2. Under what section of the Sarbanes-Oxley Act must registrants file a report on the effectiveness of their internal ...',
'entities': 'the effectiveness ofnsection 13nSEPnUNKnsection 21en1934n1933nu. s. c.nsection 404nsection 12nal',
'primary_embedding': [-0.3946287803351879, -0.17586839850991964, ...]
}索引到 Elastic
让我们将文档批量上传到 Elasticsearch。为此,我早在 elastic_helpers.py 中定义了一组 Elastic 辅助函数。这是一段很长的代码,所以我们只看函数调用。
es_bulk_indexer.bulk_upload_documents 适用于任何字典对象列表,利用 Elasticsearch 的动态映射。
# 初始化 Elasticsearch
ELASTIC_CLOUD_ID = os.environ.get('ELASTIC_CLOUD_ID')
ELASTIC_USERNAME = os.environ.get('ELASTIC_USERNAME')
ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD')
ELASTIC_CLOUD_AUTH = (ELASTIC_USERNAME, ELASTIC_PASSWORD)
es_bulk_indexer = ESBulkIndexer(cloud_id=ELASTIC_CLOUD_ID, credentials=ELASTIC_CLOUD_AUTH)
es_query_maker = ESQueryMaker(cloud_id=ELASTIC_CLOUD_ID, credentials=ELASTIC_CLOUD_AUTH)
# 定义索引名称
index_name = os.environ.get('ELASTIC_INDEX_NAME')
# 创建索引并批量上传
index_exists = es_bulk_indexer.check_index_existence(index_name=index_name)
if not index_exists:
logger.info(f"Creating new index: {index_name}")
es_bulk_indexer.create_es_index(es_configuration=BASIC_CONFIG, index_name=index_name)
success_count = es_bulk_indexer.bulk_upload_documents(
index_name=index_name,
documents=chunked_documents,
id_col='id_',
batch_size=32
)前往 Kibana 验证所有文档已被索引。应该有 224 个文档。对于如此大的文档来说,这个结果还不错!
Han Pipeline在 Kibana 中索引的年度报告文档


