使用 LangGraph 构建生产级的自适应智能体 RAG

作者:Piyush Agnihotri
发布日期:2025年7月21日
原文:https://ai.plainenglish.io/building-agentic-rag-with-langgraph-mastering-adaptive-rag-for-production-c2c4578c836a
源码:https://github.com/piyushagni5/langgraph-ai/tree/main/agentic-rag/agentic-rag-systems


构建能够智能判断何时检索文档、何时搜索网页或直接生成回答的智能 RAG 系统

自适应 RAG 是一种先进的检索增强生成(Retrieval-Augmented Generation)策略,它智能地将动态查询分析与自我纠正机制相结合,以优化响应的准确性。

目录:

  1. 引言
  2. 自适应 RAG
    2.1 理解自适应 RAG 工作流程
  3. 我们要构建什么?
  4. 项目结构
    4.1 使用 UV 设置环境
  5. 实现指南
    步骤 1:定义状态管理系统和基本常量
    步骤 2:定义聊天模型和嵌入模型
    步骤 3:构建文档摄入管道
    步骤 4:构建查询路由链
    步骤 5:创建文档检索…

1. 引言

检索增强生成(RAG)已经彻底改变了我们构建能够访问外部知识并进行推理的 AI 系统的方式。然而,随着应用变得越来越复杂,传统 RAG 方法的局限性也日益凸显。如今,我们正在见证从简单的线性 RAG 管道向智能、自适应系统的演进,这些系统能够根据查询的复杂性和上下文动态调整其检索和生成策略。

在这份全面的指南中,我们将探讨智能体 RAG (Agentic RAG) 系统,特别关注使用 LangGraph 和 Google Gemini 实现自适应 RAG。但在我们深入实现之前,让我们先了解一下自适应 RAG。

2. 自适应 RAG

自适应 RAG 是一种先进的 RAG 策略,它智能地结合了(1)动态查询分析和(2)主动/自我纠正机制

自适应 RAG 代表了最复杂的演进,它解决了一个根本性的洞见:并非所有查询都是生而平等的。研究表明,现实世界中的查询在复杂程度上表现出巨大差异:

  • 简单查询:“巴黎是哪个国家的首都?”——可以直接由大语言模型(LLM)回答
  • 多跳查询:“占领马拉科夫的人是什么时候来到菲利普斯堡所在地区的?”——需要四个推理步骤

不同检索增强LLM问答方法的概念比较。(A) 针对一个查询,这种单步方法检索相关文档然后生成答案。然而,对于需要多步推理的复杂查询可能不足。(B) 这种多步方法迭代地检索文档并生成中间答案,功能强大但对于简单查询来说效率低下,因为它需要多次访问LLM和检索器。(c) 我们的自适应方法可以根据我们分类器确定的查询复杂性,为检索增强LLM选择最合适的策略,范围从迭代式、单步式甚至无检索方法。| 图片来源:Soyeong Jeong et al., 2024

核心问题:正如比较图所示:

  • (A) 单步方法:对简单查询高效,但不足以应对复杂的多跳推理
  • (B) 多步方法:对复杂查询功能强大,但对简单查询引入了不必要的计算开销
  • (C) 自适应方法:使用查询复杂性分类器将每个查询路由到最合适的策略

2.1 理解自适应 RAG 工作流程

观察实现图,我们可以看到自适应 RAG 是如何通过一个基于图的状态管理系统来编排一个智能决策过程的:

图片来源:langchain-ai

1. 查询路由与分类

系统从一个经过训练的复杂性分类器开始,该分类器分析传入的问题。这不仅仅是简单的关键词匹配;它是一个复杂的评估,用以确定:

  • 查询是否根本不需要检索(参数化知识足够)
  • 如果需要检索,需要何种程度的复杂性
  • 最佳策略范围从无检索、单步到多跳方法

2. 动态知识获取策略

基于复杂性分类,系统智能地在以下选项之间路由:

  • 基于索引的检索:用于可从现有知识库回答的查询
  • 网页搜索:用于需要新鲜信息或当本地检索失败时的查询
  • 无检索:用于可直接从模型的参数化知识回答的查询

3. 多阶段质量保证

系统在多个决策点实施全面的评估:

  • 文档相关性评估:使用置信度评分来评估检索质量
  • 幻觉检测:验证生成的答案是否基于检索到的证据
  • 答案质量评估:确保响应充分解决了原始问题

3. 我们要构建什么?

在本文中,我们探讨开发一个先进的检索增强生成(RAG)系统,该系统能够就信息检索和路由做出智能决策。此实现展示了能够智能路由用户问题的复杂查询分析机制,建立强大的评估框架以确保响应质量,并创建能够在多个信息源之间无缝切换的自适应架构。

这份全面的指南提出了对原始 LangChain 实现 的重构方法,优先考虑增强代码可读性、改进可维护性和卓越的开发者体验。该实现受到 Marco 的 GitHub 仓库 的启发,该仓库本身也参考了 mistralai 的 GitHub 仓库 的工作。我们将使用以下技术实现一个自适应 RAG 系统:

  • LangGraph 用于编排复杂、有状态的工作流程
  • Google Gemini 作为我们的主要语言模型
  • 向量数据库 用于高效的文档检索
  • 网页搜索集成 用于实时信息访问
  • 全面的评估框架 用于质量保证

自适应 RAG 工作流程

4. 项目结构

在深入实现之前,让我们了解一下我们将要构建的完整项目结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
building-adaptive-rag/
├── src/ # 源代码
│ ├── workflow/ # 核心工作流逻辑
│ │ ├── chains/ # LLM 处理链
│ │ │ ├── answer_grader.py
│ │ │ ├── generation.py
│ │ │ ├── hallucination_grader.py
│ │ │ ├── retrieval_grader.py
│ │ │ └── router.py
│ │ ├── nodes/ # 工作流节点
│ │ │ ├── generate.py
│ │ │ ├── grade_documents.py
│ │ │ ├── retrieve.py
│ │ │ └── web_search.py
│ │ ├── consts.py # 节点常量
│ │ ├── graph.py # 主要工作流编排
│ │ └── state.py # 状态管理
│ ├── cli/ # 命令行界面
│ │ └── main.py # 交互式 CLI
│ └── models/ # 模型配置
│ └── model.py # LLM 和嵌入模型
├── data/ # 数据处理
│ └── ingestion.py # 文档摄入和向量存储
├── assets/ # 静态文件和图片
│ ├── LangChain-logo.png
│ └── Langgraph Adaptive Rag.png
├── tests/ # 测试文件
│ ├── __init__.py
│ └── test_chains.py # 链测试套件
├── .env # 环境变量
├── .gitignore
├── main.py # 应用入口点
├── README.md
└── requirements.txt

这种结构遵循一个清晰、模块化的架构,其中每个组件都有特定的职责。workflow 目录包含所有核心逻辑,chains 处理 LLM 操作,nodes 管理工作流步骤,而主要的 graph.py 则编排整个系统。

4.1 使用 UV 设置环境

让我们首先使用 UV 来设置我们的开发环境,UV 是一个快速的 Python 包管理器。首先,确保你已经安装了 UV:

1
curl -LsSf <https://astral.sh/uv/install.sh> | sh

现在,让我们创建我们的项目目录并设置虚拟环境:

1
2
3
4
5
mkdir building-adaptive-rag
cd building-adaptive-rag

uv venv --python 3.10
source .venv/bin/activate # 在 Windows 上是: .venv\Scripts\activate

创建一个 requirements.txt 文件,包含所有必要的依赖项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
beautifulsoup4
langchain-community
tiktoken
langchainhub
langchain
langgraph
tavily-python
langchain-openai
python-dotenv
black
isort
pytest
langchain-chroma
langchain-tavily==0.1.5
langchain_aws
langchain_google_genai

使用 UV 安装所有依赖项:

1
uv pip install -r requirements.txt

通过创建一个 .env 文件来设置你的环境变量:

1
2
3
4
5
6
GOOGLE_API_KEY=your_google_api_key_here
TAVILY_API_KEY=your_tavily_api_key_here # 用于网页搜索功能
LANGCHAIN_API_KEY=your_langchain_api_key_here # 可选,用于追踪
LANGCHAIN_TRACING_V2=true # 可选
LANGCHAIN_ENDPOINT=https://api.smith.langchain.com # 可选
LANGCHAIN_PROJECT=agentic-rag # 可选

5. 实现指南

现在,让我们一步步构建我们的智能体 RAG 系统。我将按照最有助于理解系统流程的顺序来解释每个组件。

步骤 1:定义状态管理系统和基本常量

我们从系统的基础——状态管理开始。这至关重要,因为它定义了信息如何在我们的图中流动。

创建 src/workflow/state.py:

1
2
3
4
5
6
7
8
9
from typing import List, TypedDict


class GraphState(TypedDict):
"""包含查询、文档和控制标志的工作流状态对象。"""
question: str # 用户的原始查询
generation: str # LLM生成的响应
web_search: bool # 用于网页搜索需求的控制标志
documents: List[str] # 检索到的文档上下文

这个 GraphState 类作为流经我们图工作流中每个节点的中央数据结构。question 字段持有用户的输入查询,generation 存储 LLM 的响应,web_search 是一个布尔标志,决定我们是否需要搜索网页以获取额外信息,documents 包含从本地和网页源检索到的所有文档。

通过使用 TypedDict,我们确保了类型安全,同时保持了我们动态工作流所需的灵活性。

创建 src/workflow/consts.py:

1
2
3
4
5
# 工作流节点标识符
RETRIEVE = "retrieve"
GRADE_DOCUMENTS = "grade_documents"
GENERATE = "generate"
WEBSEARCH = "websearch"

这些常量定义了我们图节点的名称,并有助于在整个代码库中保持一致性。拥有集中的常量使得重构更加容易,并减少了在工作流定义中引用节点名称时出现拼写错误的风险。

步骤 2:定义聊天模型和嵌入模型

创建 src/models/model.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from dotenv import load_dotenv
# from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# from langchain_aws import ChatBedrock
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings

load_dotenv()

### 聊天模型
# llm_model = ChatOpenAI(temperature=0)
# llm_model = ChatBedrock(model_id="anthropic.claude-sonnet-4-20250514-v1:0", region_name="us-west-2", temperature=0)
llm_model = ChatGoogleGenerativeAI(
model="gemini-2.0-flash",
temperature=0,
)

### 嵌入模型
embed_model = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")

步骤 3:构建文档摄入管道

创建 data/ingestion.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import os
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from src.models.model import embed_model

load_dotenv()

def create_vectorstore():
"""创建或加载用于文档检索的向量存储。"""
chroma_path = "./chroma_langchain_db"

if os.path.exists(chroma_path):
print("正在加载现有向量存储...")
vectorstore = Chroma(
persist_directory=chroma_path,
embedding_function=embed_model,
collection_name="rag-chroma",
)
return vectorstore.as_retriever()

print("正在创建新的向量存储...")
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=250,
chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

vectorstore = Chroma.from_documents(
documents=doc_splits,
collection_name="rag-chroma",
embedding=embed_model,
persist_directory=chroma_path,
)

print("向量存储已创建!")
return vectorstore.as_retriever()

retriever = create_vectorstore()

这个摄入管道构成了我们本地知识库的骨干。我们首先加载环境变量,然后定义一个包含关于 AI 智能体、提示工程和对抗性攻击高质量内容的精选 URL 列表。

WebBaseLoader 从这些 URL 获取内容并将其加载到文档对象中。然后我们使用 RecursiveCharacterTextSplitter 将这些文档分解成更小、易于管理的 250 个 token 的块,这对于嵌入和检索是最佳的。分割器使用 tiktoken 编码来确保准确的 token 计数。

最后,我们创建一个 Chroma 向量存储,它将在本地持久化我们的嵌入,使用 Google 的 text-embedding-004 模型 来获得高质量的语义表示。

步骤 4:构建查询路由链

创建 graph/chains/router.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
from src.models.model import llm_model

class RouteQuery(BaseModel):
"""将用户查询路由到最相关的数据源。"""

datasource: Literal["vectorstore", "websearch"] = Field(
...,
description="根据用户问题选择将其路由到网页搜索或向量存储。",
)

llm = llm_model

structured_llm_router = llm.with_structured_output(RouteQuery)

system = """您是一位专家,擅长将用户问题路由到向量存储或网页搜索。
向量存储包含与智能体、提示工程和对抗性攻击相关的文档。
对于这些主题的问题,请使用向量存储。对于所有其他问题,请使用网页搜索。"""

route_prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("human", "{question}"),
]
)

question_router = route_prompt | structured_llm_router

查询路由器是我们系统的第一个决策点,它决定了回答用户问题的最佳来源。我们定义了一个 RouteQuery Pydantic 模型,将路由器的输出限制为 “vectorstore” 或 “websearch”,确保对 LLM 的决策进行可靠的解析。

系统提示明确定义了我们本地知识库的范围,指示路由器对关于智能体、提示工程和对抗性攻击的问题使用向量存储,而将其他所有内容路由到网页搜索。

这种智能路由避免了对我们拥有全面本地知识的主题进行不必要的网页搜索。

步骤 5:创建文档检索评分器

创建 src/workflow/chains/retrieval_grader.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from src.models.model import llm_model

class GradeDocuments(BaseModel):
"""用于对检索到的文档进行相关性检查的二元评分。"""

binary_score: str = Field(
description="文档与问题相关,'yes' 或 'no'"
)

llm = llm_model
structured_llm_grader = llm.with_structured_output(GradeDocuments)

system = """您是一位评估员,正在评估检索到的文档与用户问题的相关性。\n
如果文档包含与问题相关的关键词或语义含义,请将其评为相关。\n
给出一个二元分数 'yes' 或 'no' 来表明文档是否与问题相关。"""

grade_prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("human", "检索到的文档: \n\n {document} \n\n 用户问题: {question}"),
]
)

retrieval_grader = grade_prompt | structured_llm_grader

检索评分器 作为一个质量控制机制,评估检索到的文档是否真的与用户的问题相关。这个组件至关重要,因为仅凭向量相似性并不能保证相关性;文档可能在语义上相似,但在上下文中可能不合适。

GradeDocuments 模型确保我们从 LLM 得到一个清晰的二元决策。系统提示指示评分器寻找明确的关键词和语义含义,提供全面的相关性评估。这个评分步骤防止了不相关的文档污染我们的生成过程,并在本地文档不足时触发网页搜索。

步骤 6:构建幻觉检测系统

创建 src/workflow/chains/hallucination_grader.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableSequence
from pydantic import BaseModel, Field
from src.models.model import llm_model

class GradeHallucinations(BaseModel):
"""用于判断生成答案中是否存在幻觉的二元评分。"""

binary_score: bool = Field(
description="答案是否基于事实,'yes' 或 'no'"
)

llm = llm_model
structured_llm_grader = llm.with_structured_output(GradeHallucinations)

system = """您是一位评估员,正在评估 LLM 的生成内容是否基于/得到一组检索到的事实的支持。\n
给出一个二元分数 'yes' 或 'no'。'Yes' 表示答案是基于/得到这组事实的支持。"""

hallucination_prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("human", "一组事实: \n\n {documents} \n\n LLM 生成内容: {generation}"),
]
)

hallucination_grader: RunnableSequence = hallucination_prompt | structured_llm_grader

幻觉评分器 可能是确保我们 RAG 系统可靠性最关键的组件。它将生成的响应与检索到的文档进行比较,以验证信息是否基于事实。

这可以防止系统生成听起来合理但实际上不正确的响应。评分器使用布尔值分数来表示生成内容是否得到所提供事实的支持。

当检测到幻觉时,我们的系统可以触发重新生成或寻求额外信息,确保用户收到准确可靠的响应。

步骤 7:创建答案质量评分器

创建 src/workflow/chains/answer_grader.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableSequence
from pydantic import BaseModel, Field
from src.models.model import llm_model

class GradeAnswer(BaseModel):

binary_score: bool = Field(
description="答案是否解决了问题,'yes' 或 'no'"
)

llm = llm_model
structured_llm_grader = llm.with_structured_output(GradeAnswer)

system = """您是一位评估员,正在评估一个答案是否解决了一个问题。\n
给出一个二元分数 'yes' 或 'no'。'Yes' 表示答案解决了问题。"""
answer_prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("human", "用户问题: \n\n {question} \n\n LLM 生成内容: {generation}"),
]
)

answer_grader: RunnableSequence = answer_prompt | structured_llm_grader

答案评分器评估生成的响应是否真正解决了用户的问题。即使一个响应在事实上是准确的,它也可能没有直接回答用户所问的问题。

这个组件确保我们的系统提供既准确又与特定查询相关的响应。评分器检查生成内容是否解决了问题,如果没解决,系统可以触发额外的检索或网页搜索以找到更合适的信息。

步骤 8:构建生成链

创建 src/workflow/chains/generation.py:

1
2
3
4
5
6
7
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from src.models.model import llm_model

llm = llm_model
prompt = hub.pull("rlm/rag-prompt")
generation_chain = prompt | llm | StrOutputParser()

生成链负责创建对用户问题的实际响应。我们利用 LangChain Hub 中一个经过验证的 RAG 提示,该提示已为检索增强生成任务进行了优化。这个提示模板知道如何有效地将检索到的上下文与用户的问题相结合,以生成连贯、信息丰富的响应。

该链使用 StrOutputParser 确保我们得到干净的字符串输出,可以被后续组件轻松处理。

步骤 9:实现检索节点

创建 src/workflow/nodes/retrieve.py:

1
2
3
4
5
6
7
8
9
10
11
from typing import Any, Dict
from src.workflow.state import GraphState
from data.ingestion import retriever


def retrieve(state: GraphState) -> Dict[str, Any]:
"""从向量存储中检索文档。"""
print("---RETRIEVE---")
question = state["question"]
documents = retriever.invoke(question)
return {"documents": documents, "question": question}

检索节点是我们工作流程的第一步,它从我们的本地向量存储中获取相关文档。它从状态中获取用户的问题,并使用我们预先配置的检索器来查找语义上最相似的文档。

该函数返回检索到的文档和原始问题,为工作流程中的下一个节点更新状态。此节点代表了传统的 RAG 检索步骤,但通过我们后续的评分和决策过程得到了增强。

步骤 10:创建文档评分节点

创建 src/workflow/nodes/grade_documents.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from typing import Any, Dict
from src.workflow.chains.retrieval_grader import retrieval_grader
from src.workflow.state import GraphState


def grade_documents(state: GraphState) -> Dict[str, Any]:
"""
确定检索到的文档是否与问题相关。
如果任何文档不相关,我们将设置一个标志以运行网页搜索。

Args:
state (dict): 当前图状态

Returns:
state (dict): 过滤掉不相关文档并更新 web_search 状态
"""

print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
question = state["question"]
documents = state["documents"]

filtered_docs = []
web_search = False
for d in documents:
score = retrieval_grader.invoke(
{"question": question, "document": d.page_content}
)
grade = score.binary_score
if grade.lower() == "yes":
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(d)
else:
print("---GRADE: DOCUMENT NOT RELEVANT---")
web_search = True
continue
return {"documents": filtered_docs, "question": question, "web_search": web_search}

文档评分节点通过评估每个检索到的文档与用户问题的相关性来实现我们的质量控制机制。它遍历所有检索到的文档,并使用我们的检索评分器来评估其相关性。被认为是相关的文档会被添加到过滤后的列表中,而不相关的文档则被丢弃。

重要的是,如果发现任何文档不相关,该节点会将 web_search 标志设置为 True,这表明我们需要来自外部源的额外信息。这种自适应行为确保了即使用户在本地知识不足的情况下也能得到全面的答案。

步骤 11:构建网页搜索节点

创建 src/workflow/nodes/web_search.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from typing import Any, Dict
from dotenv import load_dotenv
from langchain.schema import Document
from langchain_tavily import TavilySearch
from src.workflow.state import GraphState

load_dotenv()

web_search_tool = TavilySearch(max_results=3)

def web_search(state: GraphState) -> Dict[str, Any]:
print("---WEB SEARCH---")
question = state["question"]

# 初始化文档 - 这是缺失的部分!
documents = state.get("documents", []) # 获取现有文档或空列表

tavily_results = web_search_tool.invoke({"query": question})["results"]
joined_tavily_result = "\n".join(
[tavily_result["content"] for tavily_result in tavily_results]
)
web_results = Document(page_content=joined_tavily_result)

# 将网页结果添加到现有文档中(如果文档为空则创建新列表)
if documents:
documents.append(web_results)
else:
documents = [web_results]

return {"documents": documents, "question": question}

网页搜索节点通过查询互联网获取当前和全面的信息,将我们系统的知识扩展到本地向量存储之外。它使用 Tavily,一个为 AI 应用优化的搜索 API,来查找最相关的网页结果。

该节点检索最多 3 个结果,将其内容合并成一个单独的文档,并将其添加到我们的文档收集中。这种混合方法确保我们的系统既能处理使用本地知识的领域特定查询,也能处理需要来自网络的当前信息的通用查询。

步骤 12:实现生成节点

创建 src/workflow/nodes/generate.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
from typing import Any, Dict
from src.workflow.chains.generation import generation_chain
from src.workflow.state import GraphState


def generate(state: GraphState) -> Dict[str, Any]:
"""使用文档和问题生成答案。"""
print("---GENERATE---")
question = state["question"]
documents = state["documents"]
generation = generation_chain.invoke({"context": documents, "question": question})
return {"documents": documents, "question": question, "generation": generation}

生成节点是我们系统创建对用户问题的实际响应的地方。它接收用户的问题和所有收集到的文档(来自本地检索和网页搜索),并使用我们的生成链来创建一个全面的答案。该节点在状态中维护所有上下文信息,同时添加生成的响应,使后续节点能够评估生成的质量和准确性。

步骤 13:构建完整的图工作流

创建 src/workflow/graph.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from dotenv import load_dotenv
from langgraph.graph import END, StateGraph
from src.workflow.chains.answer_grader import answer_grader
from src.workflow.chains.hallucination_grader import hallucination_grader
from src.workflow.chains.router import RouteQuery, question_router
from src.workflow.consts import GENERATE, GRADE_DOCUMENTS, RETRIEVE, WEBSEARCH
from src.workflow.nodes.generate import generate
from src.workflow.nodes.grade_documents import grade_documents
from src.workflow.nodes.retrieve import retrieve
from src.workflow.nodes.web_search import web_search
from src.workflow.state import GraphState

load_dotenv()


def decide_to_generate(state):
"""路由到网页搜索或生成。"""
print("---ASSESS DOCUMENTS---")
return WEBSEARCH if state["web_search"] else GENERATE


def grade_generation_grounded_in_documents_and_question(state):
"""评估答案质量和事实依据。"""
print("---CHECK HALLUCINATIONS---")
question = state["question"]
documents = state["documents"]
generation = state["generation"]

# 检查是否基于事实
score = hallucination_grader.invoke({"documents": documents, "generation": generation})

if score.binary_score:
# 检查是否有用
score = answer_grader.invoke({"question": question, "generation": generation})
return "useful" if score.binary_score else "not useful"
else:
return "not supported"


def route_question(state: GraphState) -> str:
"""将问题路由到向量存储或网页搜索。"""
print("---ROUTE QUESTION---")
source: RouteQuery = question_router.invoke({"question": state["question"]})
return WEBSEARCH if source.datasource == WEBSEARCH else RETRIEVE


# 构建工作流
workflow = StateGraph(GraphState)
workflow.add_node(RETRIEVE, retrieve)
workflow.add_node(GRADE_DOCUMENTS, grade_documents)
workflow.add_node(GENERATE, generate)
workflow.add_node(WEBSEARCH, web_search)

workflow.set_conditional_entry_point(
route_question,
{WEBSEARCH: WEBSEARCH, RETRIEVE: RETRIEVE},
)

workflow.add_edge(RETRIEVE, GRADE_DOCUMENTS)
workflow.add_conditional_edges(
GRADE_DOCUMENTS,
decide_to_generate,
{WEBSEARCH: WEBSEARCH, GENERATE: GENERATE},
)
workflow.add_conditional_edges(
GENERATE,
grade_generation_grounded_in_documents_and_question,
{"not supported": GENERATE, "useful": END, "not useful": WEBSEARCH},
)
workflow.add_edge(WEBSEARCH, GENERATE)

app = workflow.compile()

# 导出图可视化
app.get_graph().draw_mermaid_png(output_file_path="graph.png")

这是我们智能体 RAG 系统的核心,我们将所有组件编排成一个连贯的工作流。

  • 图从一个条件入口点开始,根据路由器的决定将问题路由到网页搜索或本地检索。
  • 工作流包括三个关键的决策函数,实现了我们的自适应逻辑。decide_to_generate 函数根据文档相关性分数决定是继续生成还是搜索网页。
  • grade_generation_grounded_in_documents_and_question 函数通过检查幻觉和答案相关性来实现我们的自我纠正机制,可能会触发重新生成或额外搜索。
  • route_question 函数处理初始查询路由。

编译后的工作流创建了一个强大的系统,可以根据每一步信息的质量调整其行为,确保响应可靠且全面。

自适应 RAG 工作流

步骤 14:创建主应用程序入口点

创建 src/cli/main.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from dotenv import load_dotenv
from src.workflow.graph import app

load_dotenv()


def format_response(result):
"""从工作流结果中提取响应。"""
if isinstance(result, dict) and "generation" in result:
return result["generation"]
elif isinstance(result, dict) and "answer" in result:
return result["answer"]
else:
return str(result)


def main():
"""自适应 RAG 系统的 CLI。"""
print("Adaptive RAG System")
print("输入 'quit' 退出。\n")

while True:
try:
question = input("Question: ").strip()

if question.lower() in ['quit', 'exit', 'q', '']:
break

print("正在处理...")
result = None
for output in app.stream({"question": question}):
for key, value in output.items():
result = value

if result:
print(f"\nAnswer: {format_response(result)}")
else:
print("没有生成响应。")

except KeyboardInterrupt:
break
except Exception as e:
print(f"错误: {str(e)}")


if __name__ == "__main__":
main()

主应用程序文件为测试和运行我们的智能体 RAG 系统提供了一个简单的入口点。它加载环境变量,导入我们编译好的图应用程序,并通过终端演示高级 RAG 聊天机器人的工作方式。

创建 main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from src.cli.main import main

if __name__ == "__main__":
main()```

### 步骤 15:全面的测试套件

**创建 `tests/test_chains.py`**:

``` python
from pprint import pprint
import pytest
from dotenv import load_dotenv

load_dotenv()

from src.workflow.chains.generation import generation_chain
from src.workflow.chains.hallucination_grader import (GradeHallucinations, hallucination_grader)
from src.workflow.chains.retrieval_grader import GradeDocuments, retrieval_grader
from src.workflow.chains.router import RouteQuery, question_router
from data.ingestion import retriever


def test_retrival_grader_answer_yes() -> None:
"""测试检索评分器识别相关文档。"""
question = "什么是检索增强生成?"
docs = retriever.invoke(question)

doc_txt = docs[1].page_content

res: GradeDocuments = retrieval_grader.invoke(
{"question": question, "document": doc_txt}
)

assert res.binary_score == "yes"


def test_retrival_grader_answer_no() -> None:
"""测试检索评分器识别不相关文档。"""
question = "什么是检索增强生成?"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content

res: GradeDocuments = retrieval_grader.invoke(
{"question": "如何用蘑菇煮意大利面", "document": doc_txt}
)

assert res.binary_score == "no"


def test_generation_chain() -> None:
"""测试生成链产生答案。"""
question = "语言模型是如何工作的?"
docs = retriever.invoke(question)
generation = generation_chain.invoke({"context": docs, "question": question})
pprint(generation)


def test_hallucination_grader_answer_yes() -> None:
"""测试幻觉评分器识别基于事实的内容。"""
question = "向量数据库有什么好处?"
docs = retriever.invoke(question)

generation = generation_chain.invoke({"context": docs, "question": question})
res: GradeHallucinations = hallucination_grader.invoke(
{"documents": docs, "generation": generation}
)
assert res.binary_score


def test_hallucination_grader_answer_no() -> None:
"""测试幻觉评分器识别幻觉内容。"""
question = "向量数据库有什么好处?"
docs = retriever.invoke(question)

res: GradeHallucinations = hallucination_grader.invoke(
{
"documents": docs,
"generation": "要烤一个完美的巧克力蛋糕,你需要将烤箱预热到350度",
}
)
assert not res.binary_score


def test_router_to_vectorstore() -> None:
"""测试路由器将AI查询导向向量存储。"""
question = "RAG和微调有什么区别?"

res: RouteQuery = question_router.invoke({"question": question})
assert res.datasource == "vectorstore"


def test_router_to_websearch() -> None:
"""测试路由器将通用查询导向网页搜索。"""
question = "东京现在天气如何?"

res: RouteQuery = question_router.invoke({"question": question})
assert res.datasource == "websearch"

这个全面的测试套件独立验证我们智能体 RAG 系统的每个组件。

  • 测试涵盖了文档相关性评分的正反两种情况,确保我们的评分器能正确识别相关和不相关的文档。
  • 幻觉检测测试验证了我们的系统能够区分基于事实的响应和虚构的响应。
  • 路由器测试确认了问题能根据其内容被正确地路由到向量存储或网页搜索。

这些测试将确保每个组件在集成到完整工作流之前都能独立正常工作。

6. 运行系统

要运行你的智能体 RAG 系统:

运行主应用程序:

1
2
3
4
# 确保你处于虚拟环境中
source .venv/bin/activate

python main.py

预期的界面和响应:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
(building-adaptive-rag) piyushagni5@Piyushs-MacBook-Pro building-adaptive-rag % python main.py
正在加载现有向量存储...
Adaptive RAG System
输入 'quit' 退出。

Question: what is agent memory?
Processing...
---ROUTE QUESTION---
---RETRIEVE---
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS DOCUMENTS---
---WEB SEARCH---
---GENERATE---
---CHECK HALLUCINATIONS---

Answer: Agent memory enables AI agents to maintain persistent states, learn from interactions, and develop long-term relationships with users. It allows AI agents to store and recall past experiences to improve decision-making, perception, and overall performance. Agent memory includes short-term memory for in-context learning and long-term memory for retaining information over extended periods.
Question:

让我们根据执行跟踪和代码分析来理解幕后发生了什么。

分步工作流执行

  1. ---ROUTE QUESTION---: 路由器确定“什么是智能体记忆?”应首先发送到向量存储 (RETRIEVE) 而不是网页搜索。
  2. ---RETRIEVE---: 系统从向量存储中检索了 4 个文档
  3. ---CHECK DOCUMENT RELEVANCE TO QUESTION---: 每个文档都进行了相关性评分:文档 1 → 相关,文档 2 → 相关,文档 3 → 不相关,文档 4 → 不相关
  4. ---ASSESS DOCUMENTS---: 由于某些文档不相关,系统设置了 web_search = True
  5. ---WEB SEARCH---: 系统使用 Tavily 进行了网页搜索,并将结果添加到了相关的向量存储文档中。
  6. ---GENERATE---: 最终答案是使用相关的向量存储文档和网页搜索结果生成的。
  7. ---CHECK HALLUCINATIONS---: 系统验证了答案是基于检索到的来源的。

所以,如果我们总结所有步骤,我们可以说最终结果使用了两种来源。最终答案是以下内容的组合:

  • 来自向量存储的 2 个相关文档
  • 网页搜索结果(因为一些向量存储文档不相关而被添加)

让我们看看系统如何组合来源。在 web_search 节点中,你可以看到关键逻辑:

1
2
3
4
5
# 将网页结果添加到现有文档中(如果文档为空则创建新列表)
if documents:
documents.append(web_results) # ← 将网页结果添加到现有的相关文档中
else:
documents = [web_results]

然后在 generate.py 中,系统使用所有文档生成答案:

1
2
generation = generation_chain.invoke({"context": documents, "question": question})
最终答案来源于:
  • 50% 向量存储(2 个关于智能体记忆的相关文档)
  • 50% 网页搜索(关于智能体记忆的新鲜网页结果)

这种自适应行为是 RAG 系统的关键特性。它智能地用最新的网页搜索数据来补充不足或部分相关的向量存储结果,确保提供最全面、最准确的响应。

执行测试套件

最后,我们可以运行测试脚本来验证所有节点和功能是否正常工作。在运行测试套件之前,请确保你的虚拟环境已激活:

1
python -m pytest tests/ -v

预期响应

7. 结论

使用 LangGraph 构建智能体 RAG 系统代表了对传统 RAG 方法的重大进步。通过融入智能决策、自我反思和自适应策略,我们创建了能够推理信息质量、调整检索策略并在必要时自我纠正的系统。

这个实现为需要可靠、准确和智能信息检索与生成的生产级应用程序提供了坚实的基础。模块化设计允许根据特定的用例和需求轻松扩展和定制。

8. 局限性/后续步骤

为了进一步增强此系统,请考虑:

  • LLM 回退状态:添加一个 llm_fallback 状态来处理与网页搜索和向量存储内容都无关的查询。例如,当用户说“嗨”或提出一般的对话性问题时,系统应路由到直接的 LLM 响应,而不是尝试检索或网页搜索。
  • 增强的路由器 (router.py):
    → 在 RouteQuery 模型中添加 “llm_fallback“ 作为第三个路由选项
    → 更新系统提示,为何时使用这三种路由提供明确的指导:向量存储(用于文档特定查询)、网页搜索(用于当前信息需求)和 LLM 回退(用于一般对话、问候或超出存储文档和网页搜索范围的查询)