作者:Piyush Agnihotri 发布日期:2025年7月21日 原文:https://ai.plainenglish.io/building-agentic-rag-with-langgraph-mastering-adaptive-rag-for-production-c2c4578c836a 源码:https://github.com/piyushagni5/langgraph-ai/tree/main/agentic-rag/agentic-rag-systems
构建能够智能判断何时检索文档、何时搜索网页或直接生成回答的智能 RAG 系统
自适应 RAG 是一种先进的检索增强生成(Retrieval-Augmented Generation)策略,它智能地将动态查询分析与自我纠正机制相结合,以优化响应的准确性。
目录:
引言
自适应 RAG 2.1 理解自适应 RAG 工作流程
我们要构建什么?
项目结构 4.1 使用 UV 设置环境
实现指南 步骤 1:定义状态管理系统和基本常量 步骤 2:定义聊天模型和嵌入模型 步骤 3:构建文档摄入管道 步骤 4:构建查询路由链 步骤 5:创建文档检索…
1. 引言 检索增强生成(RAG)已经彻底改变了我们构建能够访问外部知识并进行推理的 AI 系统的方式。然而,随着应用变得越来越复杂,传统 RAG 方法的局限性也日益凸显。如今,我们正在见证从简单的线性 RAG 管道向智能、自适应系统的演进,这些系统能够根据查询的复杂性和上下文动态调整其检索和生成策略。
在这份全面的指南中,我们将探讨智能体 RAG (Agentic RAG) 系统,特别关注使用 LangGraph 和 Google Gemini 实现自适应 RAG。但在我们深入实现之前,让我们先了解一下自适应 RAG。
2. 自适应 RAG 自适应 RAG 是一种先进的 RAG 策略,它智能地结合了(1)动态查询分析和(2)主动/自我纠正机制 。
自适应 RAG 代表了最复杂的演进,它解决了一个根本性的洞见:并非所有查询都是生而平等的 。研究表明,现实世界中的查询在复杂程度上表现出巨大差异:
简单查询 :“巴黎是哪个国家的首都?”——可以直接由大语言模型(LLM)回答
多跳查询 :“占领马拉科夫的人是什么时候来到菲利普斯堡所在地区的?”——需要四个推理步骤
核心问题 :正如比较图所示:
(A) 单步方法 :对简单查询高效,但不足以应对复杂的多跳推理
(B) 多步方法 :对复杂查询功能强大,但对简单查询引入了不必要的计算开销
(C) 自适应方法 :使用查询复杂性分类器将每个查询路由到最合适的策略
2.1 理解自适应 RAG 工作流程 观察实现图,我们可以看到自适应 RAG 是如何通过一个基于图的状态管理系统来编排一个智能决策过程的:
1. 查询路由与分类 系统从一个经过训练的复杂性分类器开始,该分类器分析传入的问题。这不仅仅是简单的关键词匹配;它是一个复杂的评估,用以确定:
查询是否根本不需要检索(参数化知识足够)
如果需要检索,需要何种程度的复杂性
最佳策略范围从无检索、单步到多跳方法
2. 动态知识获取策略 基于复杂性分类,系统智能地在以下选项之间路由:
基于索引的检索 :用于可从现有知识库回答的查询
网页搜索 :用于需要新鲜信息或当本地检索失败时的查询
无检索 :用于可直接从模型的参数化知识回答的查询
3. 多阶段质量保证 系统在多个决策点实施全面的评估:
文档相关性评估 :使用置信度评分来评估检索质量
幻觉检测 :验证生成的答案是否基于检索到的证据
答案质量评估 :确保响应充分解决了原始问题
3. 我们要构建什么? 在本文中,我们探讨开发一个先进的检索增强生成(RAG)系统,该系统能够就信息检索和路由做出智能决策。此实现展示了能够智能路由用户问题的复杂查询分析机制,建立强大的评估框架以确保响应质量,并创建能够在多个信息源之间无缝切换的自适应架构。
这份全面的指南提出了对原始 LangChain 实现 的重构方法,优先考虑增强代码可读性、改进可维护性和卓越的开发者体验。该实现受到 Marco 的 GitHub 仓库 的启发,该仓库本身也参考了 mistralai 的 GitHub 仓库 的工作。我们将使用以下技术实现一个自适应 RAG 系统:
LangGraph 用于编排复杂、有状态的工作流程
Google Gemini 作为我们的主要语言模型
向量数据库 用于高效的文档检索
网页搜索集成 用于实时信息访问
全面的评估框架 用于质量保证
4. 项目结构 在深入实现之前,让我们了解一下我们将要构建的完整项目结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 building-adaptive-rag/ ├── src/ # 源代码 │ ├── workflow/ # 核心工作流逻辑 │ │ ├── chains/ # LLM 处理链 │ │ │ ├── answer_grader.py │ │ │ ├── generation.py │ │ │ ├── hallucination_grader.py │ │ │ ├── retrieval_grader.py │ │ │ └── router.py │ │ ├── nodes/ # 工作流节点 │ │ │ ├── generate.py │ │ │ ├── grade_documents.py │ │ │ ├── retrieve.py │ │ │ └── web_search.py │ │ ├── consts.py # 节点常量 │ │ ├── graph.py # 主要工作流编排 │ │ └── state.py # 状态管理 │ ├── cli/ # 命令行界面 │ │ └── main.py # 交互式 CLI │ └── models/ # 模型配置 │ └── model.py # LLM 和嵌入模型 ├── data/ # 数据处理 │ └── ingestion.py # 文档摄入和向量存储 ├── assets/ # 静态文件和图片 │ ├── LangChain-logo.png │ └── Langgraph Adaptive Rag.png ├── tests/ # 测试文件 │ ├── __init__.py │ └── test_chains.py # 链测试套件 ├── .env # 环境变量 ├── .gitignore ├── main.py # 应用入口点 ├── README.md └── requirements.txt
这种结构遵循一个清晰、模块化的架构,其中每个组件都有特定的职责。workflow 目录包含所有核心逻辑,chains 处理 LLM 操作,nodes 管理工作流步骤,而主要的 graph.py 则编排整个系统。
4.1 使用 UV 设置环境 让我们首先使用 UV 来设置我们的开发环境,UV 是一个快速的 Python 包管理器。首先,确保你已经安装了 UV:
1 curl -LsSf <https://astral.sh/uv/install.sh> | sh
现在,让我们创建我们的项目目录并设置虚拟环境:
1 2 3 4 5 mkdir building-adaptive-rag cd building-adaptive-rag uv venv --python 3.10 source .venv/bin/activate # 在 Windows 上是: .venv\Scripts\activate
创建一个 requirements.txt 文件,包含所有必要的依赖项:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 beautifulsoup4 langchain-community tiktoken langchainhub langchain langgraph tavily-python langchain-openai python-dotenv black isort pytest langchain-chroma langchain-tavily==0.1.5 langchain_aws langchain_google_genai
使用 UV 安装所有依赖项:
1 uv pip install -r requirements.txt
通过创建一个 .env 文件来设置你的环境变量:
1 2 3 4 5 6 GOOGLE_API_KEY=your_google_api_key_here TAVILY_API_KEY=your_tavily_api_key_here # 用于网页搜索功能 LANGCHAIN_API_KEY=your_langchain_api_key_here # 可选,用于追踪 LANGCHAIN_TRACING_V2=true # 可选 LANGCHAIN_ENDPOINT=https://api.smith.langchain.com # 可选 LANGCHAIN_PROJECT=agentic-rag # 可选
5. 实现指南 现在,让我们一步步构建我们的智能体 RAG 系统。我将按照最有助于理解系统流程的顺序来解释每个组件。
步骤 1:定义状态管理系统和基本常量 我们从系统的基础——状态管理开始。这至关重要,因为它定义了信息如何在我们的图中流动。
创建 src/workflow/state.py :
1 2 3 4 5 6 7 8 9 from typing import List , TypedDictclass GraphState (TypedDict ): """包含查询、文档和控制标志的工作流状态对象。""" question: str generation: str web_search: bool documents: List [str ]
这个 GraphState 类作为流经我们图工作流中每个节点的中央数据结构。question 字段持有用户的输入查询,generation 存储 LLM 的响应,web_search 是一个布尔标志,决定我们是否需要搜索网页以获取额外信息,documents 包含从本地和网页源检索到的所有文档。
通过使用 TypedDict,我们确保了类型安全,同时保持了我们动态工作流所需的灵活性。
创建 src/workflow/consts.py :
1 2 3 4 5 RETRIEVE = "retrieve" GRADE_DOCUMENTS = "grade_documents" GENERATE = "generate" WEBSEARCH = "websearch"
这些常量定义了我们图节点的名称,并有助于在整个代码库中保持一致性。拥有集中的常量使得重构更加容易,并减少了在工作流定义中引用节点名称时出现拼写错误的风险。
步骤 2:定义聊天模型和嵌入模型 创建 src/models/model.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from dotenv import load_dotenvfrom langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings load_dotenv() llm_model = ChatGoogleGenerativeAI( model="gemini-2.0-flash" , temperature=0 , ) embed_model = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004" )
步骤 3:构建文档摄入管道 创建 data/ingestion.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import osfrom dotenv import load_dotenvfrom langchain.text_splitter import RecursiveCharacterTextSplitterfrom langchain_chroma import Chromafrom langchain_community.document_loaders import WebBaseLoaderfrom src.models.model import embed_modelload_dotenv() def create_vectorstore (): """创建或加载用于文档检索的向量存储。""" chroma_path = "./chroma_langchain_db" if os.path.exists(chroma_path): print ("正在加载现有向量存储..." ) vectorstore = Chroma( persist_directory=chroma_path, embedding_function=embed_model, collection_name="rag-chroma" , ) return vectorstore.as_retriever() print ("正在创建新的向量存储..." ) urls = [ "https://lilianweng.github.io/posts/2023-06-23-agent/" , "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/" , "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/" , ] docs = [WebBaseLoader(url).load() for url in urls] docs_list = [item for sublist in docs for item in sublist] text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=250 , chunk_overlap=0 ) doc_splits = text_splitter.split_documents(docs_list) vectorstore = Chroma.from_documents( documents=doc_splits, collection_name="rag-chroma" , embedding=embed_model, persist_directory=chroma_path, ) print ("向量存储已创建!" ) return vectorstore.as_retriever() retriever = create_vectorstore()
这个摄入管道构成了我们本地知识库的骨干。我们首先加载环境变量,然后定义一个包含关于 AI 智能体、提示工程和对抗性攻击高质量内容的精选 URL 列表。
WebBaseLoader 从这些 URL 获取内容并将其加载到文档对象中。然后我们使用 RecursiveCharacterTextSplitter 将这些文档分解成更小、易于管理的 250 个 token 的块,这对于嵌入和检索是最佳的。分割器使用 tiktoken 编码来确保准确的 token 计数。
最后,我们创建一个 Chroma 向量存储,它将在本地持久化我们的嵌入,使用 Google 的 text-embedding-004 模型 来获得高质量的语义表示。
步骤 4:构建查询路由链 创建 graph/chains/router.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from typing import Literal from langchain_core.prompts import ChatPromptTemplatefrom pydantic import BaseModel, Fieldimport sysimport ossys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))) from src.models.model import llm_modelclass RouteQuery (BaseModel ): """将用户查询路由到最相关的数据源。""" datasource: Literal ["vectorstore" , "websearch" ] = Field( ..., description="根据用户问题选择将其路由到网页搜索或向量存储。" , ) llm = llm_model structured_llm_router = llm.with_structured_output(RouteQuery) system = """您是一位专家,擅长将用户问题路由到向量存储或网页搜索。 向量存储包含与智能体、提示工程和对抗性攻击相关的文档。 对于这些主题的问题,请使用向量存储。对于所有其他问题,请使用网页搜索。""" route_prompt = ChatPromptTemplate.from_messages( [ ("system" , system), ("human" , "{question}" ), ] ) question_router = route_prompt | structured_llm_router
查询路由器是我们系统的第一个决策点,它决定了回答用户问题的最佳来源。我们定义了一个 RouteQuery Pydantic 模型,将路由器的输出限制为 “vectorstore” 或 “websearch”,确保对 LLM 的决策进行可靠的解析。
系统提示明确定义了我们本地知识库的范围,指示路由器对关于智能体、提示工程和对抗性攻击的问题使用向量存储,而将其他所有内容路由到网页搜索。
这种智能路由避免了对我们拥有全面本地知识的主题进行不必要的网页搜索。
步骤 5:创建文档检索评分器 创建 src/workflow/chains/retrieval_grader.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from langchain_core.prompts import ChatPromptTemplatefrom pydantic import BaseModel, Fieldfrom src.models.model import llm_modelclass GradeDocuments (BaseModel ): """用于对检索到的文档进行相关性检查的二元评分。""" binary_score: str = Field( description="文档与问题相关,'yes' 或 'no'" ) llm = llm_model structured_llm_grader = llm.with_structured_output(GradeDocuments) system = """您是一位评估员,正在评估检索到的文档与用户问题的相关性。\n 如果文档包含与问题相关的关键词或语义含义,请将其评为相关。\n 给出一个二元分数 'yes' 或 'no' 来表明文档是否与问题相关。""" grade_prompt = ChatPromptTemplate.from_messages( [ ("system" , system), ("human" , "检索到的文档: \n\n {document} \n\n 用户问题: {question}" ), ] ) retrieval_grader = grade_prompt | structured_llm_grader
检索评分器 作为一个质量控制机制,评估检索到的文档是否真的与用户的问题相关。这个组件至关重要,因为仅凭向量相似性并不能保证相关性;文档可能在语义上相似,但在上下文中可能不合适。
GradeDocuments 模型确保我们从 LLM 得到一个清晰的二元决策。系统提示指示评分器寻找明确的关键词和语义含义,提供全面的相关性评估。这个评分步骤防止了不相关的文档污染我们的生成过程,并在本地文档不足时触发网页搜索。
步骤 6:构建幻觉检测系统 创建 src/workflow/chains/hallucination_grader.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from langchain_core.prompts import ChatPromptTemplatefrom langchain_core.runnables import RunnableSequencefrom pydantic import BaseModel, Fieldfrom src.models.model import llm_modelclass GradeHallucinations (BaseModel ): """用于判断生成答案中是否存在幻觉的二元评分。""" binary_score: bool = Field( description="答案是否基于事实,'yes' 或 'no'" ) llm = llm_model structured_llm_grader = llm.with_structured_output(GradeHallucinations) system = """您是一位评估员,正在评估 LLM 的生成内容是否基于/得到一组检索到的事实的支持。\n 给出一个二元分数 'yes' 或 'no'。'Yes' 表示答案是基于/得到这组事实的支持。""" hallucination_prompt = ChatPromptTemplate.from_messages( [ ("system" , system), ("human" , "一组事实: \n\n {documents} \n\n LLM 生成内容: {generation}" ), ] ) hallucination_grader: RunnableSequence = hallucination_prompt | structured_llm_grader
幻觉评分器 可能是确保我们 RAG 系统可靠性最关键的组件。它将生成的响应与检索到的文档进行比较,以验证信息是否基于事实。
这可以防止系统生成听起来合理但实际上不正确的响应。评分器使用布尔值分数来表示生成内容是否得到所提供事实的支持。
当检测到幻觉时,我们的系统可以触发重新生成或寻求额外信息,确保用户收到准确可靠的响应。
步骤 7:创建答案质量评分器 创建 src/workflow/chains/answer_grader.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from langchain_core.prompts import ChatPromptTemplatefrom langchain_core.runnables import RunnableSequencefrom pydantic import BaseModel, Fieldfrom src.models.model import llm_modelclass GradeAnswer (BaseModel ): binary_score: bool = Field( description="答案是否解决了问题,'yes' 或 'no'" ) llm = llm_model structured_llm_grader = llm.with_structured_output(GradeAnswer) system = """您是一位评估员,正在评估一个答案是否解决了一个问题。\n 给出一个二元分数 'yes' 或 'no'。'Yes' 表示答案解决了问题。""" answer_prompt = ChatPromptTemplate.from_messages( [ ("system" , system), ("human" , "用户问题: \n\n {question} \n\n LLM 生成内容: {generation}" ), ] ) answer_grader: RunnableSequence = answer_prompt | structured_llm_grader
答案评分器评估生成的响应是否真正解决了用户的问题。即使一个响应在事实上是准确的,它也可能没有直接回答用户所问的问题。
这个组件确保我们的系统提供既准确又与特定查询相关的响应。评分器检查生成内容是否解决了问题,如果没解决,系统可以触发额外的检索或网页搜索以找到更合适的信息。
步骤 8:构建生成链 创建 src/workflow/chains/generation.py :
1 2 3 4 5 6 7 from langchain import hubfrom langchain_core.output_parsers import StrOutputParserfrom src.models.model import llm_modelllm = llm_model prompt = hub.pull("rlm/rag-prompt" ) generation_chain = prompt | llm | StrOutputParser()
生成链负责创建对用户问题的实际响应。我们利用 LangChain Hub 中一个经过验证的 RAG 提示,该提示已为检索增强生成任务进行了优化。这个提示模板知道如何有效地将检索到的上下文与用户的问题相结合,以生成连贯、信息丰富的响应。
该链使用 StrOutputParser 确保我们得到干净的字符串输出,可以被后续组件轻松处理。
步骤 9:实现检索节点 创建 src/workflow/nodes/retrieve.py :
1 2 3 4 5 6 7 8 9 10 11 from typing import Any , Dict from src.workflow.state import GraphStatefrom data.ingestion import retrieverdef retrieve (state: GraphState ) -> Dict [str , Any ]: """从向量存储中检索文档。""" print ("---RETRIEVE---" ) question = state["question" ] documents = retriever.invoke(question) return {"documents" : documents, "question" : question}
检索节点是我们工作流程的第一步,它从我们的本地向量存储中获取相关文档。它从状态中获取用户的问题,并使用我们预先配置的检索器来查找语义上最相似的文档。
该函数返回检索到的文档和原始问题,为工作流程中的下一个节点更新状态。此节点代表了传统的 RAG 检索步骤,但通过我们后续的评分和决策过程得到了增强。
步骤 10:创建文档评分节点 创建 src/workflow/nodes/grade_documents.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 from typing import Any , Dict from src.workflow.chains.retrieval_grader import retrieval_graderfrom src.workflow.state import GraphStatedef grade_documents (state: GraphState ) -> Dict [str , Any ]: """ 确定检索到的文档是否与问题相关。 如果任何文档不相关,我们将设置一个标志以运行网页搜索。 Args: state (dict): 当前图状态 Returns: state (dict): 过滤掉不相关文档并更新 web_search 状态 """ print ("---CHECK DOCUMENT RELEVANCE TO QUESTION---" ) question = state["question" ] documents = state["documents" ] filtered_docs = [] web_search = False for d in documents: score = retrieval_grader.invoke( {"question" : question, "document" : d.page_content} ) grade = score.binary_score if grade.lower() == "yes" : print ("---GRADE: DOCUMENT RELEVANT---" ) filtered_docs.append(d) else : print ("---GRADE: DOCUMENT NOT RELEVANT---" ) web_search = True continue return {"documents" : filtered_docs, "question" : question, "web_search" : web_search}
文档评分节点通过评估每个检索到的文档与用户问题的相关性来实现我们的质量控制机制。它遍历所有检索到的文档,并使用我们的检索评分器来评估其相关性。被认为是相关的文档会被添加到过滤后的列表中,而不相关的文档则被丢弃。
重要的是,如果发现任何文档不相关,该节点会将 web_search 标志设置为 True,这表明我们需要来自外部源的额外信息。这种自适应行为确保了即使用户在本地知识不足的情况下也能得到全面的答案。
步骤 11:构建网页搜索节点 创建 src/workflow/nodes/web_search.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from typing import Any , Dict from dotenv import load_dotenvfrom langchain.schema import Documentfrom langchain_tavily import TavilySearchfrom src.workflow.state import GraphStateload_dotenv() web_search_tool = TavilySearch(max_results=3 ) def web_search (state: GraphState ) -> Dict [str , Any ]: print ("---WEB SEARCH---" ) question = state["question" ] documents = state.get("documents" , []) tavily_results = web_search_tool.invoke({"query" : question})["results" ] joined_tavily_result = "\n" .join( [tavily_result["content" ] for tavily_result in tavily_results] ) web_results = Document(page_content=joined_tavily_result) if documents: documents.append(web_results) else : documents = [web_results] return {"documents" : documents, "question" : question}
网页搜索节点通过查询互联网获取当前和全面的信息,将我们系统的知识扩展到本地向量存储之外。它使用 Tavily,一个为 AI 应用优化的搜索 API,来查找最相关的网页结果。
该节点检索最多 3 个结果,将其内容合并成一个单独的文档,并将其添加到我们的文档收集中。这种混合方法确保我们的系统既能处理使用本地知识的领域特定查询,也能处理需要来自网络的当前信息的通用查询。
步骤 12:实现生成节点 创建 src/workflow/nodes/generate.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 from typing import Any , Dict from src.workflow.chains.generation import generation_chainfrom src.workflow.state import GraphStatedef generate (state: GraphState ) -> Dict [str , Any ]: """使用文档和问题生成答案。""" print ("---GENERATE---" ) question = state["question" ] documents = state["documents" ] generation = generation_chain.invoke({"context" : documents, "question" : question}) return {"documents" : documents, "question" : question, "generation" : generation}
生成节点是我们系统创建对用户问题的实际响应的地方。它接收用户的问题和所有收集到的文档(来自本地检索和网页搜索),并使用我们的生成链来创建一个全面的答案。该节点在状态中维护所有上下文信息,同时添加生成的响应,使后续节点能够评估生成的质量和准确性。
步骤 13:构建完整的图工作流 创建 src/workflow/graph.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 from dotenv import load_dotenvfrom langgraph.graph import END, StateGraphfrom src.workflow.chains.answer_grader import answer_graderfrom src.workflow.chains.hallucination_grader import hallucination_graderfrom src.workflow.chains.router import RouteQuery, question_routerfrom src.workflow.consts import GENERATE, GRADE_DOCUMENTS, RETRIEVE, WEBSEARCHfrom src.workflow.nodes.generate import generatefrom src.workflow.nodes.grade_documents import grade_documentsfrom src.workflow.nodes.retrieve import retrievefrom src.workflow.nodes.web_search import web_searchfrom src.workflow.state import GraphStateload_dotenv() def decide_to_generate (state ): """路由到网页搜索或生成。""" print ("---ASSESS DOCUMENTS---" ) return WEBSEARCH if state["web_search" ] else GENERATE def grade_generation_grounded_in_documents_and_question (state ): """评估答案质量和事实依据。""" print ("---CHECK HALLUCINATIONS---" ) question = state["question" ] documents = state["documents" ] generation = state["generation" ] score = hallucination_grader.invoke({"documents" : documents, "generation" : generation}) if score.binary_score: score = answer_grader.invoke({"question" : question, "generation" : generation}) return "useful" if score.binary_score else "not useful" else : return "not supported" def route_question (state: GraphState ) -> str : """将问题路由到向量存储或网页搜索。""" print ("---ROUTE QUESTION---" ) source: RouteQuery = question_router.invoke({"question" : state["question" ]}) return WEBSEARCH if source.datasource == WEBSEARCH else RETRIEVE workflow = StateGraph(GraphState) workflow.add_node(RETRIEVE, retrieve) workflow.add_node(GRADE_DOCUMENTS, grade_documents) workflow.add_node(GENERATE, generate) workflow.add_node(WEBSEARCH, web_search) workflow.set_conditional_entry_point( route_question, {WEBSEARCH: WEBSEARCH, RETRIEVE: RETRIEVE}, ) workflow.add_edge(RETRIEVE, GRADE_DOCUMENTS) workflow.add_conditional_edges( GRADE_DOCUMENTS, decide_to_generate, {WEBSEARCH: WEBSEARCH, GENERATE: GENERATE}, ) workflow.add_conditional_edges( GENERATE, grade_generation_grounded_in_documents_and_question, {"not supported" : GENERATE, "useful" : END, "not useful" : WEBSEARCH}, ) workflow.add_edge(WEBSEARCH, GENERATE) app = workflow.compile () app.get_graph().draw_mermaid_png(output_file_path="graph.png" )
这是我们智能体 RAG 系统的核心,我们将所有组件编排成一个连贯的工作流。
图从一个条件入口点开始,根据路由器的决定将问题路由到网页搜索或本地检索。
工作流包括三个关键的决策函数,实现了我们的自适应逻辑。decide_to_generate 函数根据文档相关性分数决定是继续生成还是搜索网页。
grade_generation_grounded_in_documents_and_question 函数通过检查幻觉和答案相关性来实现我们的自我纠正机制,可能会触发重新生成或额外搜索。
route_question 函数处理初始查询路由。
编译后的工作流创建了一个强大的系统,可以根据每一步信息的质量调整其行为,确保响应可靠且全面。
步骤 14:创建主应用程序入口点 创建 src/cli/main.py :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from dotenv import load_dotenvfrom src.workflow.graph import appload_dotenv() def format_response (result ): """从工作流结果中提取响应。""" if isinstance (result, dict ) and "generation" in result: return result["generation" ] elif isinstance (result, dict ) and "answer" in result: return result["answer" ] else : return str (result) def main (): """自适应 RAG 系统的 CLI。""" print ("Adaptive RAG System" ) print ("输入 'quit' 退出。\n" ) while True : try : question = input ("Question: " ).strip() if question.lower() in ['quit' , 'exit' , 'q' , '' ]: break print ("正在处理..." ) result = None for output in app.stream({"question" : question}): for key, value in output.items(): result = value if result: print (f"\nAnswer: {format_response(result)} " ) else : print ("没有生成响应。" ) except KeyboardInterrupt: break except Exception as e: print (f"错误: {str (e)} " ) if __name__ == "__main__" : main()
主应用程序文件为测试和运行我们的智能体 RAG 系统提供了一个简单的入口点。它加载环境变量,导入我们编译好的图应用程序,并通过终端演示高级 RAG 聊天机器人的工作方式。
创建 main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 from src.cli.main import mainif __name__ == "__main__" : main()``` **创建 `tests/test_chains.py`**: ``` python from pprint import pprintimport pytestfrom dotenv import load_dotenvload_dotenv() from src.workflow.chains.generation import generation_chainfrom src.workflow.chains.hallucination_grader import (GradeHallucinations, hallucination_grader)from src.workflow.chains.retrieval_grader import GradeDocuments, retrieval_graderfrom src.workflow.chains.router import RouteQuery, question_routerfrom data.ingestion import retrieverdef test_retrival_grader_answer_yes () -> None : """测试检索评分器识别相关文档。""" question = "什么是检索增强生成?" docs = retriever.invoke(question) doc_txt = docs[1 ].page_content res: GradeDocuments = retrieval_grader.invoke( {"question" : question, "document" : doc_txt} ) assert res.binary_score == "yes" def test_retrival_grader_answer_no () -> None : """测试检索评分器识别不相关文档。""" question = "什么是检索增强生成?" docs = retriever.invoke(question) doc_txt = docs[1 ].page_content res: GradeDocuments = retrieval_grader.invoke( {"question" : "如何用蘑菇煮意大利面" , "document" : doc_txt} ) assert res.binary_score == "no" def test_generation_chain () -> None : """测试生成链产生答案。""" question = "语言模型是如何工作的?" docs = retriever.invoke(question) generation = generation_chain.invoke({"context" : docs, "question" : question}) pprint(generation) def test_hallucination_grader_answer_yes () -> None : """测试幻觉评分器识别基于事实的内容。""" question = "向量数据库有什么好处?" docs = retriever.invoke(question) generation = generation_chain.invoke({"context" : docs, "question" : question}) res: GradeHallucinations = hallucination_grader.invoke( {"documents" : docs, "generation" : generation} ) assert res.binary_score def test_hallucination_grader_answer_no () -> None : """测试幻觉评分器识别幻觉内容。""" question = "向量数据库有什么好处?" docs = retriever.invoke(question) res: GradeHallucinations = hallucination_grader.invoke( { "documents" : docs, "generation" : "要烤一个完美的巧克力蛋糕,你需要将烤箱预热到350度" , } ) assert not res.binary_score def test_router_to_vectorstore () -> None : """测试路由器将AI查询导向向量存储。""" question = "RAG和微调有什么区别?" res: RouteQuery = question_router.invoke({"question" : question}) assert res.datasource == "vectorstore" def test_router_to_websearch () -> None : """测试路由器将通用查询导向网页搜索。""" question = "东京现在天气如何?" res: RouteQuery = question_router.invoke({"question" : question}) assert res.datasource == "websearch"
这个全面的测试套件独立验证我们智能体 RAG 系统的每个组件。
测试涵盖了文档相关性评分的正反两种情况,确保我们的评分器能正确识别相关和不相关的文档。
幻觉检测测试验证了我们的系统能够区分基于事实的响应和虚构的响应。
路由器测试确认了问题能根据其内容被正确地路由到向量存储或网页搜索。
这些测试将确保每个组件在集成到完整工作流之前都能独立正常工作。
6. 运行系统 要运行你的智能体 RAG 系统:
运行主应用程序:
1 2 3 4 # 确保你处于虚拟环境中 source .venv/bin/activate python main.py
预期的界面和响应:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 (building-adaptive-rag) piyushagni5@Piyushs-MacBook-Pro building-adaptive-rag % python main.py 正在加载现有向量存储... Adaptive RAG System 输入 'quit' 退出。 Question: what is agent memory? Processing... ---ROUTE QUESTION--- ---RETRIEVE--- ---CHECK DOCUMENT RELEVANCE TO QUESTION--- ---GRADE: DOCUMENT RELEVANT--- ---GRADE: DOCUMENT RELEVANT--- ---GRADE: DOCUMENT NOT RELEVANT--- ---GRADE: DOCUMENT RELEVANT--- ---ASSESS DOCUMENTS--- ---WEB SEARCH--- ---GENERATE--- ---CHECK HALLUCINATIONS--- Answer: Agent memory enables AI agents to maintain persistent states, learn from interactions, and develop long-term relationships with users. It allows AI agents to store and recall past experiences to improve decision-making, perception, and overall performance. Agent memory includes short-term memory for in-context learning and long-term memory for retaining information over extended periods. Question:
让我们根据执行跟踪和代码分析来理解幕后发生了什么。
分步工作流执行
---ROUTE QUESTION--- : 路由器确定“什么是智能体记忆? ”应首先发送到向量存储 (RETRIEVE ) 而不是网页搜索。
---RETRIEVE--- : 系统从向量存储中检索了 4 个文档 。
---CHECK DOCUMENT RELEVANCE TO QUESTION--- : 每个文档都进行了相关性评分:文档 1 → 相关,文档 2 → 相关,文档 3 → 不相关,文档 4 → 不相关
---ASSESS DOCUMENTS--- : 由于某些文档不相关,系统设置了 web_search = True。
---WEB SEARCH--- : 系统使用 Tavily 进行了网页搜索,并将结果添加到了相关的向量存储文档中。
---GENERATE--- : 最终答案是使用相关的向量存储文档和网页搜索结果生成的。
---CHECK HALLUCINATIONS--- : 系统验证了答案是基于检索到的来源的。
所以,如果我们总结所有步骤,我们可以说最终结果使用了两种来源。最终答案是以下内容的组合:
来自向量存储的 2 个相关文档
网页搜索结果(因为一些向量存储文档不相关而被添加)
让我们看看系统如何组合来源 。在 web_search 节点中,你可以看到关键逻辑:
1 2 3 4 5 if documents: documents.append(web_results) else : documents = [web_results]
然后在 generate.py 中,系统使用所有文档生成答案:
1 2 generation = generation_chain.invoke({"context" : documents, "question" : question}) 最终答案来源于:
50% 向量存储(2 个关于智能体记忆的相关文档)
50% 网页搜索(关于智能体记忆的新鲜网页结果)
这种自适应行为是 RAG 系统的关键特性。它智能地用最新的网页搜索数据来补充不足或部分相关的向量存储结果,确保提供最全面、最准确的响应。
执行测试套件 :
最后,我们可以运行测试脚本来验证所有节点和功能是否正常工作。在运行测试套件之前,请确保你的虚拟环境已激活:
1 python -m pytest tests/ -v
预期响应 :
7. 结论 使用 LangGraph 构建智能体 RAG 系统代表了对传统 RAG 方法的重大进步。通过融入智能决策、自我反思和自适应策略,我们创建了能够推理信息质量、调整检索策略并在必要时自我纠正的系统。
这个实现为需要可靠、准确和智能信息检索与生成的生产级应用程序提供了坚实的基础。模块化设计允许根据特定的用例和需求轻松扩展和定制。
8. 局限性/后续步骤 为了进一步增强此系统,请考虑:
LLM 回退状态 :添加一个 llm_fallback 状态来处理与网页搜索和向量存储内容都无关的查询。例如,当用户说“嗨”或提出一般的对话性问题时,系统应路由到直接的 LLM 响应,而不是尝试检索或网页搜索。
增强的路由器 (router.py): → 在 RouteQuery 模型中添加 “llm_fallback“ 作为第三个路由选项 → 更新系统提示,为何时使用这三种路由提供明确的指导:向量存储 (用于文档特定查询)、网页搜索 (用于当前信息需求)和 LLM 回退 (用于一般对话、问候或超出存储文档和网页搜索范围的查询)