1. GraphRAG: 使用知识图谱增强的检索生成系统 #
GraphRAG 是一个结合知识图谱 (Knowledge Graph) 与 RAG(Retrieval-Augmented Generation, 检索增强生成)的智能问答/数据分析系统。它旨在提升复杂场景下的推理能力、可解释性和知识关联性。
通过整合 Neo4j 图数据库与大语言模型(如 DeepSeek),GraphRAG 支持结构化的知识检索、多跳推理和深度信息整合,极大弥补了传统 RAG 系统在长尾知识和推理链上的不足。
核心特点:
- 将文本文档抽取为知识图谱(实体、关系、属性),存储于 Neo4j
- 提供结构化子图检索与路径发现,支持多跳知识问答
- 利用大模型在子图上下文基础上生成高质量、可追溯的答案
- 适用于企业知识管理、学术检索、信息整合分析等多种场景
2. GraphRAG 架构 #
2.1 整体架构 #
GraphRAG 系统由三个核心模块组成:
- 知识图谱构建模块:将原始文档转换为结构化的知识图谱
- 图增强检索模块:基于图谱结构进行智能检索
- 推理与生成模块:结合图谱上下文生成回答
工作流程:
原始文档 → 实体提取 → 关系识别 → 知识图谱 → 图查询 → 子图提取 → 上下文构建 → LLM生成回答2.2 知识图谱构建模块 #
知识图谱构建是 GraphRAG 的基础,它将非结构化的文档转换为结构化的知识图谱。
构建流程:
- 实体提取:识别文档中的人物、组织、概念等实体
- 关系识别:确定实体之间的关联(如"工作在"、"属于"、"导致")
- 图谱存储:将实体和关系存储到图数据库中
前置知识:实体和关系
- 实体(Entity):现实世界中的对象,如"微软"、"AI"、"谷歌"
- 关系(Relationship):实体之间的关联,如"竞争"、"开发"、"属于"
- 属性(Property):实体的特征,如"微软"的"行业"="科技"
2.3 图增强检索模块 #
图增强检索是 GraphRAG 的核心,它利用图谱结构进行更智能的检索。
检索方式:
- 子图检索:查找与查询相关的子图结构
- 路径发现:寻找实体间的连接路径
- 社区检测:识别相关的实体群组
2.4 推理与生成模块 #
推理与生成模块结合图谱上下文进行回答生成,利用 LLM 的强大能力生成自然语言回答。
3. 环境配置 #
uv init
uv add python-dotenv neo4j langchain_deepseek langchain| 包名 | 作用简介 |
|---|---|
| python-dotenv | 加载 .env 文件中的环境变量,便于安全管理配置参数 |
| neo4j | Python 客户端,用于连接和操作 Neo4j 图数据库 |
| langchain_deepseek | DeepSeek 相关的 LangChain 集成,支持 AI/LLM 工作流 |
| langchain | 构建和编排大语言模型应用(如对话机器人、问答系统) |
4. 配置文件 #
4.1. config.py #
config.py
"""
1. 配置加载模块
负责加载 .env 环境变量并提供统一的配置读取方法。
"""
import os
from dotenv import load_dotenv
def load_config() -> dict:
load_dotenv()
return {
"neo4j_uri": os.getenv("NEO4J_URI", "bolt://localhost:7687"),
"neo4j_user": os.getenv("NEO4J_USER", "neo4j"),
"neo4j_password": os.getenv("NEO4J_PASSWORD", "12345678"),
"deepseek_api_key": os.getenv("DEEPSEEK_API_KEY", "sk-924916a162544410964397598709766a"),
"model_name": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
}
4.2. pipeline.py #
pipeline.py
"""
2. GraphRAG 业务管线。
"""
from config import load_config
class GraphRAG:
"""1. 封装检索与生成的完整 GraphRAG 系统。"""
def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
print(neo4j_uri, neo4j_user, neo4j_password, deepseek_api_key, model_name)
def build_rag_from_env() -> GraphRAG:
"""2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
cfg = load_config()
return GraphRAG(
neo4j_uri=cfg["neo4j_uri"],
neo4j_user=cfg["neo4j_user"],
neo4j_password=cfg["neo4j_password"],
deepseek_api_key=cfg["deepseek_api_key"],
model_name=cfg["model_name"],
)
4.3. main.py #
main.py
"""
+1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""
# 从 pipeline 模块导入 build_rag_from_env 函数
from pipeline import build_rag_from_env
# 定义主函数
def main():
# 调用 build_rag_from_env 函数,启动 GraphRAG 工作流
build_rag_from_env()
# 判断是否以主模块运行本文件
if __name__ == "__main__":
# 运行主函数
main()4.4 .env #
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=12345678
DEEPSEEK_API_KEY=sk-924916a162544410964397598709766a
DEEPSEEK_MODEL=deepseek-chat5. 连接Neo4j #
5.1. graph_store.py #
graph_store.py
# Neo4j 读写与检索逻辑。
"""
Neo4j 读写与检索逻辑。
"""
# 导入 Neo4j 官方驱动
from neo4j import GraphDatabase
# 定义 GraphStore 类,用于封装 Neo4j 的节点与关系操作
class GraphStore:
# 封装 Neo4j 的节点与关系操作。
"""封装 Neo4j 的节点与关系操作。"""
# 构造函数,初始化数据库连接
def __init__(self, uri: str, user: str, password: str):
# 使用传入的 uri、用户名和密码创建 Neo4j 数据库驱动
self.driver = GraphDatabase.driver(uri, auth=(user, password))
# 关闭数据库连接的方法
def close(self):
# 关闭数据库驱动,释放资源
"""关闭数据库连接。"""
self.driver.close()
5.2. pipeline.py #
pipeline.py
"""
2. GraphRAG 业务管线。
"""
from config import load_config
+from graph_store import GraphStore
class GraphRAG:
"""1. 封装检索与生成的完整 GraphRAG 系统。"""
def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
# 初始化图数据库连接
+ self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
+ def close(self):
+ """关闭数据库连接。"""
+ self.store.close()
def build_rag_from_env() -> GraphRAG:
"""2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
cfg = load_config()
return GraphRAG(
neo4j_uri=cfg["neo4j_uri"],
neo4j_user=cfg["neo4j_user"],
neo4j_password=cfg["neo4j_password"],
deepseek_api_key=cfg["deepseek_api_key"],
model_name=cfg["model_name"],
)
6. 实体提取 #
6.1. llm_client.py #
llm_client.py
"""
LLM 客户端与提取逻辑。
"""
# 导入类型提示相关模块
from typing import List, Dict, Any
# 导入 DeepSeek 的聊天模型
from langchain_deepseek import ChatDeepSeek
# 导入消息类型,用于与模型交互
from langchain_core.messages import HumanMessage, SystemMessage
# 导入 JSON 输出解析器
from langchain_core.output_parsers import JsonOutputParser
# 定义 LLMExtractor 类,负责实体和关系提取
class LLMExtractor:
"""负责实体和关系提取的封装。"""
# 构造方法,初始化 LLMExtractor 实例
def __init__(self, model_name: str = "deepseek-chat"):
# 初始化 ChatDeepSeek LLM 实例
self.llm = ChatDeepSeek(model=model_name)
# 初始化用于解析 JSON 输出的解析器
self.parser = JsonOutputParser()
# 定义内部方法 _invoke,接收消息列表作为参数
def _invoke(self, messages: List[Any]) -> Any:
# 使用 ChatDeepSeek 的 LLM 和 JSON 解析器组成链,保证输出可被解析为 JSON
chain = self.llm | self.parser
# 调用链的 invoke 方法,并传入消息列表,获取模型输出
return chain.invoke(messages)
# 定义 extract_entities 方法,从传入文本中提取实体
def extract_entities(self, text: str) -> List[Dict[str, Any]]:
"""从文本中提取实体列表。"""
# 构造用于实体抽取的 Prompt
prompt = f"""
请从以下文本中提取所有实体,包括人物、组织、地点、概念等。
返回 JSON 格式,每个实体包含 "name"(名称)和 "type"(类型)字段。
文本:
{text}
返回格式:
[
{{"name": "实体名称", "type": "实体类型"}},
...
]
"""
# 构造消息列表,系统消息设定角色,人类消息为抽取指令
messages = [
SystemMessage(content="你是一个专业的实体提取助手。"),
HumanMessage(content=prompt),
]
try:
# 调用内部方法与大模型交互获取抽取结果
result = self._invoke(messages)
# 若返回结果为列表,则直接返回
if isinstance(result, list):
return result
# 若返回结果为字典,且包含 entities 字段,则返回 entities
if isinstance(result, dict):
return result.get("entities", []) if "entities" in result else []
except Exception:
# 捕获异常,失败时返回空列表
pass
# 默认返回空列表
return [] 6.2. graph_store.py #
graph_store.py
# Neo4j 读写与检索逻辑。
"""
Neo4j 读写与检索逻辑。
"""
# 导入 Neo4j 官方驱动
from neo4j import GraphDatabase
# 定义 GraphStore 类,用于封装 Neo4j 的节点与关系操作
class GraphStore:
# 封装 Neo4j 的节点与关系操作。
"""封装 Neo4j 的节点与关系操作。"""
# 构造函数,初始化数据库连接
def __init__(self, uri: str, user: str, password: str):
# 使用传入的 uri、用户名和密码创建 Neo4j 数据库驱动
self.driver = GraphDatabase.driver(uri, auth=(user, password))
# 关闭数据库连接的方法
def close(self):
# 关闭数据库驱动,释放资源
"""关闭数据库连接。"""
self.driver.close()
# 静态方法,用于转义 Neo4j 标签中的反引号(label可能用于节点或关系类型)
+ @staticmethod
# 定义 _escape_label 方法,接收一个字符串 label,返回转义后的字符串
+ def _escape_label(label: str) -> str:
# 如果 label 为空,则返回默认实体类型 "Entity"
+ if not label:
+ return "Entity"
# 将 label 中的所有反引号替换为两个反引号,防止 Cypher 注入问题
+ return str(label).replace("`", "``")
# 批量插入或合并实体节点的方法
+ def upsert_entities(self, entities):
# 批量插入/合并实体。
+ """批量插入/合并实体。"""
# 创建一个新的数据库会话
+ with self.driver.session() as session:
# 遍历每一个实体
+ for entity in entities:
# 跳过不是字典或缺少必要字段的实体
+ if not isinstance(entity, dict) or "name" not in entity or "type" not in entity:
+ continue
# 对类型标签进行转义,防止 Cypher 注入错误
+ escaped_type = self._escape_label(entity["type"])
# 构造 Cypher 查询语句,用于根据 name 和 type 合并实体节点
+ query = f"""
+ MERGE (e:`{escaped_type}` {{name: $name}})
+ RETURN e
+ """
# 执行合并节点的查询,将实体名称传入参数
+ session.run(query, name=entity["name"])
6.3. main.py #
main.py
# 1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""
1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""
# 从 pipeline 模块导入 build_rag_from_env 函数
from pipeline import build_rag_from_env
# 定义主函数
def main():
# 调用 build_rag_from_env 函数,启动 GraphRAG 工作流
+ graphrag = build_rag_from_env()
+ try:
# 定义一个包含示例文档的列表(只有一个字符串,内容涉及微软、谷歌、OpenAI等公司)
+ documents = [
+ """
+ 微软是一家美国科技公司,由比尔·盖茨和保罗·艾伦于1975年创立。
+ 微软在人工智能领域与谷歌、OpenAI等公司存在竞争关系。
+ 微软开发了 Azure 云平台,谷歌开发了 Google Cloud。
+ """
+ ]
# 打印当前进入知识图谱构建阶段的信息
+ print("阶段1:构建知识图谱")
# 调用 graphrag 的 build_knowledge_graph 方法,传入示例文档,进行知识图谱的构建
+ graphrag.build_knowledge_graph(documents)
+ except Exception as e:
+ print(f"发生错误: {e}")
+ finally:
+ graphrag.close()
# 判断是否以主模块运行本文件
if __name__ == "__main__":
# 运行主函数
main()
6.4. pipeline.py #
pipeline.py
"""
2. GraphRAG 业务管线。
"""
from config import load_config
from graph_store import GraphStore
+from llm_client import LLMExtractor
class GraphRAG:
"""1. 封装检索与生成的完整 GraphRAG 系统。"""
def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
# 初始化图数据库连接
self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
# 初始化实体抽取器
+ self.extractor = LLMExtractor(model_name=model_name)
def close(self):
"""关闭数据库连接。"""
+ self.store.close()
+ def build_knowledge_graph(self, documents):
# 定义方法:从文档列表中构建知识图谱
+ """从文档列表构建知识图谱。"""
# 遍历每一个输入文档
+ for doc in documents:
# 调用 extractor 的 extract_entities 方法抽取当前文档中的实体关系等结构
+ entities = self.extractor.extract_entities(doc)
# 将实体写入(或更新)到图数据库
+ self.store.upsert_entities(entities)
def build_rag_from_env() -> GraphRAG:
"""2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
cfg = load_config()
return GraphRAG(
neo4j_uri=cfg["neo4j_uri"],
neo4j_user=cfg["neo4j_user"],
neo4j_password=cfg["neo4j_password"],
deepseek_api_key=cfg["deepseek_api_key"],
model_name=cfg["model_name"],
)
7. 关系识别 #
7.1. graph_store.py #
graph_store.py
# Neo4j 读写与检索逻辑。
"""
Neo4j 读写与检索逻辑。
"""
# 导入 Neo4j 官方驱动
from neo4j import GraphDatabase
# 定义 GraphStore 类,用于封装 Neo4j 的节点与关系操作
class GraphStore:
# 封装 Neo4j 的节点与关系操作。
"""封装 Neo4j 的节点与关系操作。"""
# 构造函数,初始化数据库连接
def __init__(self, uri: str, user: str, password: str):
# 使用传入的 uri、用户名和密码创建 Neo4j 数据库驱动
self.driver = GraphDatabase.driver(uri, auth=(user, password))
# 关闭数据库连接的方法
def close(self):
# 关闭数据库驱动,释放资源
"""关闭数据库连接。"""
self.driver.close()
# 静态方法,用于转义 Neo4j 标签中的反引号(label可能用于节点或关系类型)
@staticmethod
# 定义 _escape_label 方法,接收一个字符串 label,返回转义后的字符串
def _escape_label(label: str) -> str:
# 如果 label 为空,则返回默认实体类型 "Entity"
if not label:
return "Entity"
# 将 label 中的所有反引号替换为两个反引号,防止 Cypher 注入问题
return str(label).replace("`", "``")
# 批量插入或合并实体节点的方法
def upsert_entities(self, entities):
# 批量插入/合并实体。
"""批量插入/合并实体。"""
# 创建一个新的数据库会话
with self.driver.session() as session:
# 遍历每一个实体
for entity in entities:
# 跳过不是字典或缺少必要字段的实体
if not isinstance(entity, dict) or "name" not in entity or "type" not in entity:
continue
# 对类型标签进行转义,防止 Cypher 注入错误
escaped_type = self._escape_label(entity["type"])
# 构造 Cypher 查询语句,用于根据 name 和 type 合并实体节点
query = f"""
MERGE (e:`{escaped_type}` {{name: $name}})
RETURN e
"""
# 执行合并节点的查询,将实体名称传入参数
+ session.run(query, name=entity["name"])
# 定义 upsert_relations 方法,用于批量插入/合并关系
+ def upsert_relations(self, relations, entities):
# 方法说明:批量插入/合并关系
+ """批量插入/合并关系。"""
# 创建 Neo4j 数据库会话
+ with self.driver.session() as session:
# 遍历每一个关系字典
+ for relation in relations:
# 检查关系必须是字典,并且包含 "source"、"target" 和 "type" 这三个字段
+ if not isinstance(relation, dict) or not all(k in relation for k in ("source", "target", "type")):
+ continue
# 在实体列表中查找与该关系的 source 匹配的实体
+ source_entity = next((e for e in entities if isinstance(e, dict) and e.get("name") == relation["source"]), None)
# 在实体列表中查找与该关系的 target 匹配的实体
+ target_entity = next((e for e in entities if isinstance(e, dict) and e.get("name") == relation["target"]), None)
# 如果找不到对应的实体则跳过本次循环
+ if not (source_entity and target_entity):
+ continue
# 对源实体类型进行转义,防止 Cypher 注入
+ escaped_source_type = self._escape_label(source_entity["type"])
# 对目标实体类型进行转义,防止 Cypher 注入
+ escaped_target_type = self._escape_label(target_entity["type"])
# 对关系类型进行转义,防止 Cypher 注入
+ escaped_relation_type = self._escape_label(relation["type"])
# 构建 Cypher 查询语句,实现实体节点的匹配与关系的插入或合并
+ query = f"""
+ MATCH (s:`{escaped_source_type}` {{name: $source_name}})
+ MATCH (t:`{escaped_target_type}` {{name: $target_name}})
+ MERGE (s)-[r:`{escaped_relation_type}`]->(t)
+ RETURN r
+ """
# 执行 Cypher 查询并传递参数,实现关系的插入/合并
+ session.run(
+ query,
+ source_name=relation["source"],
+ target_name=relation["target"],
+ )
7.2. llm_client.py #
llm_client.py
"""
LLM 客户端与提取逻辑。
"""
# 导入类型提示相关模块
from typing import List, Dict, Any
# 导入 DeepSeek 的聊天模型
from langchain_deepseek import ChatDeepSeek
# 导入消息类型,用于与模型交互
from langchain_core.messages import HumanMessage, SystemMessage
# 导入 JSON 输出解析器
from langchain_core.output_parsers import JsonOutputParser
# 定义 LLMExtractor 类,负责实体和关系提取
class LLMExtractor:
"""负责实体和关系提取的封装。"""
# 构造方法,初始化 LLMExtractor 实例
def __init__(self, model_name: str = "deepseek-chat"):
# 初始化 ChatDeepSeek LLM 实例
self.llm = ChatDeepSeek(model=model_name)
# 初始化用于解析 JSON 输出的解析器
self.parser = JsonOutputParser()
# 定义内部方法 _invoke,接收消息列表作为参数
def _invoke(self, messages: List[Any]) -> Any:
# 使用 ChatDeepSeek 的 LLM 和 JSON 解析器组成链,保证输出可被解析为 JSON
chain = self.llm | self.parser
# 调用链的 invoke 方法,并传入消息列表,获取模型输出
return chain.invoke(messages)
# 定义 extract_entities 方法,从传入文本中提取实体
def extract_entities(self, text: str) -> List[Dict[str, Any]]:
"""从文本中提取实体列表。"""
# 构造用于实体抽取的 Prompt
prompt = f"""
请从以下文本中提取所有实体,包括人物、组织、地点、概念等。
返回 JSON 格式,每个实体包含 "name"(名称)和 "type"(类型)字段。
文本:
{text}
返回格式:
[
{{"name": "实体名称", "type": "实体类型"}},
...
]
"""
# 构造消息列表,系统消息设定角色,人类消息为抽取指令
messages = [
SystemMessage(content="你是一个专业的实体提取助手。"),
HumanMessage(content=prompt),
]
try:
# 调用内部方法与大模型交互获取抽取结果
result = self._invoke(messages)
# 若返回结果为列表,则直接返回
if isinstance(result, list):
return result
# 若返回结果为字典,且包含 entities 字段,则返回 entities
if isinstance(result, dict):
return result.get("entities", []) if "entities" in result else []
except Exception:
# 捕获异常,失败时返回空列表
pass
# 默认返回空列表
return []
# 定义 extract_relations 方法,用于从文本和实体列表中提取关系
+ def extract_relations(self, text: str, entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
# 方法说明:从文本和已知实体列表中抽取实体之间的关系列表
+ """从文本和实体列表中提取关系列表。"""
# 将所有实体以 "名称(类型)" 的格式拼接成字符串
+ entities_str = ", ".join([f"{e['name']}({e['type']})" for e in entities])
# 构造面向关系抽取的大模型 Prompt
+ prompt = f"""
+ 请从以下文本中提取实体之间的关系。
+ 文本:
+ {text}
+ 实体列表:
+ {entities_str}
+ 返回 JSON 格式,每个关系包含 "source"(源实体)、"target"(目标实体)和 "type"(关系类型)字段。
+ 返回格式:
+ [
+ {{"source": "源实体名称", "target": "目标实体名称", "type": "关系类型"}},
+ ...
+ ]
+ """
# 构建消息列表,包括系统消息和人类消息
+ messages = [
+ SystemMessage(content="你是一个专业的关系提取助手。"),
+ HumanMessage(content=prompt),
+ ]
+ try:
# 调用内部 _invoke 方法,与大模型交互获取关系抽取结果
+ result = self._invoke(messages)
# 如果结果类型为列表,则直接返回
+ if isinstance(result, list):
+ return result
# 如果结果为字典且包含 relations 字段,返回 relations 的值
+ if isinstance(result, dict):
+ return result.get("relations", []) if "relations" in result else []
+ except Exception:
# 如有异常,忽略并返回空列表
+ pass
# 默认返回空列表,表示未抽取到关系
+ return [] 7.3. pipeline.py #
pipeline.py
"""
2. GraphRAG 业务管线。
"""
from config import load_config
from graph_store import GraphStore
from llm_client import LLMExtractor
class GraphRAG:
"""1. 封装检索与生成的完整 GraphRAG 系统。"""
def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
# 初始化图数据库连接
self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
# 初始化实体抽取器
self.extractor = LLMExtractor(model_name=model_name)
def close(self):
"""关闭数据库连接。"""
self.store.close()
def build_knowledge_graph(self, documents):
# 定义方法:从文档列表中构建知识图谱
"""从文档列表构建知识图谱。"""
# 遍历每一个输入文档
for doc in documents:
# 调用 extractor 的 extract_entities 方法抽取当前文档中的实体关系等结构
entities = self.extractor.extract_entities(doc)
# 将实体写入(或更新)到图数据库
+ self.store.upsert_entities(entities)
# 调用 extractor 的 extract_relations 方法抽取当前文档中的实体关系等结构
+ relations = self.extractor.extract_relations(doc, entities)
# 将关系写入(或更新)到图数据库
+ self.store.upsert_relations(relations, entities)
def build_rag_from_env() -> GraphRAG:
"""2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
cfg = load_config()
return GraphRAG(
neo4j_uri=cfg["neo4j_uri"],
neo4j_user=cfg["neo4j_user"],
neo4j_password=cfg["neo4j_password"],
deepseek_api_key=cfg["deepseek_api_key"],
model_name=cfg["model_name"],
)
8. 子图提取 #
8.1. graph_store.py #
graph_store.py
# Neo4j 读写与检索逻辑。
"""
Neo4j 读写与检索逻辑。
"""
from neo4j import GraphDatabase
# 定义 GraphStore 类,用于封装 Neo4j 的节点与关系操作
class GraphStore:
# 封装 Neo4j 的节点与关系操作。
"""封装 Neo4j 的节点与关系操作。"""
# 构造函数,初始化数据库连接
def __init__(self, uri: str, user: str, password: str):
# 使用传入的 uri、用户名和密码创建 Neo4j 数据库驱动
self.driver = GraphDatabase.driver(uri, auth=(user, password))
# 关闭数据库连接的方法
def close(self):
# 关闭数据库驱动,释放资源
"""关闭数据库连接。"""
self.driver.close()
# 静态方法,用于转义 Neo4j 标签中的反引号(label可能用于节点或关系类型)
@staticmethod
# 定义 _escape_label 方法,接收一个字符串 label,返回转义后的字符串
def _escape_label(label: str) -> str:
# 如果 label 为空,则返回默认实体类型 "Entity"
if not label:
return "Entity"
# 将 label 中的所有反引号替换为两个反引号,防止 Cypher 注入问题
return str(label).replace("`", "``")
# 批量插入或合并实体节点的方法
def upsert_entities(self, entities):
# 批量插入/合并实体。
"""批量插入/合并实体。"""
# 创建一个新的数据库会话
with self.driver.session() as session:
# 遍历每一个实体
for entity in entities:
# 跳过不是字典或缺少必要字段的实体
if not isinstance(entity, dict) or "name" not in entity or "type" not in entity:
continue
# 对类型标签进行转义,防止 Cypher 注入错误
escaped_type = self._escape_label(entity["type"])
# 构造 Cypher 查询语句,用于根据 name 和 type 合并实体节点
query = f"""
MERGE (e:`{escaped_type}` {{name: $name}})
RETURN e
"""
# 执行合并节点的查询,将实体名称传入参数
session.run(query, name=entity["name"])
# 定义 upsert_relations 方法,用于批量插入/合并关系
def upsert_relations(self, relations, entities):
# 方法说明:批量插入/合并关系
"""批量插入/合并关系。"""
# 创建 Neo4j 数据库会话
with self.driver.session() as session:
# 遍历每一个关系字典
for relation in relations:
# 检查关系必须是字典,并且包含 "source"、"target" 和 "type" 这三个字段
if not isinstance(relation, dict) or not all(k in relation for k in ("source", "target", "type")):
continue
# 在实体列表中查找与该关系的 source 匹配的实体
source_entity = next((e for e in entities if isinstance(e, dict) and e.get("name") == relation["source"]), None)
# 在实体列表中查找与该关系的 target 匹配的实体
target_entity = next((e for e in entities if isinstance(e, dict) and e.get("name") == relation["target"]), None)
# 如果找不到对应的实体则跳过本次循环
if not (source_entity and target_entity):
continue
# 对源实体类型进行转义,防止 Cypher 注入
escaped_source_type = self._escape_label(source_entity["type"])
# 对目标实体类型进行转义,防止 Cypher 注入
escaped_target_type = self._escape_label(target_entity["type"])
# 对关系类型进行转义,防止 Cypher 注入
escaped_relation_type = self._escape_label(relation["type"])
# 构建 Cypher 查询语句,实现实体节点的匹配与关系的插入或合并
query = f"""
MATCH (s:`{escaped_source_type}` {{name: $source_name}})
MATCH (t:`{escaped_target_type}` {{name: $target_name}})
MERGE (s)-[r:`{escaped_relation_type}`]->(t)
RETURN r
"""
# 执行 Cypher 查询并传递参数,实现关系的插入/合并
session.run(
query,
source_name=relation["source"],
target_name=relation["target"],
+ )
# 定义方法:根据实体名称列表检索子图
+ def retrieve_subgraph(self, entity_names, max_depth: int = 2):
# 方法说明:按实体名称检索子图,返回节点集合与关系列表
+ """按实体名称检索子图,返回节点集合与关系列表。"""
# 如果输入实体名称为空,直接返回空集合和空列表
+ if not entity_names:
+ return set(), []
# 构造 Cypher 查询语句,查找起点为输入实体名称,限定路径深度的所有连接路径
+ query_cypher = f"""
+ MATCH path = (start)-[*1..{max_depth}]-(connected)
+ WHERE start.name IN $entity_names
+ RETURN DISTINCT
+ start.name AS start_name,
+ connected.name AS connected_name,
+ relationships(path) AS relations,
+ length(path) AS depth
+ ORDER BY depth
+ LIMIT 50
+ """
# 初始化节点名集合
+ nodes = set()
# 初始化关系字典列表
+ relations_list = []
# 创建数据库会话
+ with self.driver.session() as session:
# 执行 Cypher 查询,传入实体名称参数
+ result = session.run(query_cypher, entity_names=entity_names)
# 遍历查询结果
+ for record in result:
# 获取起始节点名称
+ start_name = record.get("start_name")
# 获取连接节点名称
+ connected_name = record.get("connected_name")
# 将起始节点名称加入节点集合
+ if start_name:
+ nodes.add(start_name)
# 将连接节点名称加入节点集合
+ if connected_name:
+ nodes.add(connected_name)
# 获取路径上的所有关系
+ rels = record.get("relations", [])
# 如有关系对象,处理第一个关系对象
+ if rels and len(rels) > 0:
+ rel = rels[0]
# 将关系信息添加到关系列表
+ relations_list.append(
+ {
+ "source": start_name or "Unknown", # 关系源节点
+ "target": connected_name or "Unknown", # 关系目标节点
+ "type": rel.type if hasattr(rel, "type") else "RELATED", # 关系类型
+ "depth": record.get("depth", 0), # 路径深度
+ }
+ )
# 返回节点名集合与关系列表
+ return nodes, relations_list
8.2. main.py #
main.py
# 1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""
1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""
# 从 pipeline 模块导入 build_rag_from_env 函数
from pipeline import build_rag_from_env
# 定义主函数
def main():
# 调用 build_rag_from_env 函数,启动 GraphRAG 工作流
graphrag = build_rag_from_env()
try:
# 定义一个包含示例文档的列表(只有一个字符串,内容涉及微软、谷歌、OpenAI等公司)
documents = [
"""
微软是一家美国科技公司,由比尔·盖茨和保罗·艾伦于1975年创立。
微软在人工智能领域与谷歌、OpenAI等公司存在竞争关系。
微软开发了 Azure 云平台,谷歌开发了 Google Cloud。
"""
]
# 打印当前进入知识图谱构建阶段的信息
print("阶段1:构建知识图谱")
# 调用 graphrag 的 build_knowledge_graph 方法,传入示例文档,进行知识图谱的构建
graphrag.build_knowledge_graph(documents)
+ print("\n阶段2:查询知识图谱")
+ query = "微软在AI领域的主要竞争对手有哪些?"
+ print(f"问题:{query}")
+ answer = graphrag.query(query)
+ print("\n回答:")
+ print(answer)
except Exception as e:
print(f"发生错误: {e}")
finally:
graphrag.close()
# 判断是否以主模块运行本文件
if __name__ == "__main__":
# 运行主函数
main()
8.3. pipeline.py #
pipeline.py
"""
2. GraphRAG 业务管线。
"""
from config import load_config
from graph_store import GraphStore
from llm_client import LLMExtractor
class GraphRAG:
"""1. 封装检索与生成的完整 GraphRAG 系统。"""
def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
# 初始化图数据库连接
self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
# 初始化实体抽取器
self.extractor = LLMExtractor(model_name=model_name)
def close(self):
"""关闭数据库连接。"""
self.store.close()
def build_knowledge_graph(self, documents):
# 定义方法:从文档列表中构建知识图谱
"""从文档列表构建知识图谱。"""
# 遍历每一个输入文档
for doc in documents:
# 调用 extractor 的 extract_entities 方法抽取当前文档中的实体关系等结构
entities = self.extractor.extract_entities(doc)
# 将实体写入(或更新)到图数据库
self.store.upsert_entities(entities)
# 调用 extractor 的 extract_relations 方法抽取当前文档中的实体关系等结构
relations = self.extractor.extract_relations(doc, entities)
# 将关系写入(或更新)到图数据库
self.store.upsert_relations(relations, entities)
+ def query(self, user_query: str, max_depth: int = 2) -> str:
+ """检索 + 生成组合。"""
# 检索知识图谱上下文
+ context = self.retrieve(user_query, max_depth)
+ print('context:', context)
# ---------- 检索阶段 ----------
# 定义 retrieve 方法,用于检索知识图谱上下文
+ def retrieve(self, query: str, max_depth: int = 2) -> str:
+ """检索知识图谱上下文。"""
# 调用实体抽取器从查询中抽取实体
+ entities = self.extractor.extract_entities(query)
# 从抽取的实体中提取实体名称列表
+ entity_names = [e["name"] for e in entities if isinstance(e, dict) and e.get("name")]
# 如果没有抽取到任何实体,返回提示信息
+ if not entity_names:
+ return "未找到相关实体"
# 使用实体名称和最大深度从图数据库检索子图,获取实体节点和关系列表
+ nodes, relations_list = self.store.retrieve_subgraph(entity_names, max_depth)
# 构建上下文信息的组成部分列表
+ context_parts = ["知识图谱信息:\n"]
# 添加相关实体的信息
+ context_parts.append("相关实体:")
# 遍历所有实体节点并添加到上下文信息
+ for node in nodes:
+ context_parts.append(f" - {node}")
# 添加实体关系部分头部
+ context_parts.append("\n实体关系:")
# 遍历(最多前20条)关系信息并格式化添加到上下文
+ for rel in relations_list[:20]:
+ context_parts.append(f" {rel['source']} --[{rel['type']}]--> {rel['target']}")
# 将所有上下文信息组成字符串并返回
+ return "\n".join(context_parts)
def build_rag_from_env() -> GraphRAG:
"""2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
cfg = load_config()
return GraphRAG(
neo4j_uri=cfg["neo4j_uri"],
neo4j_user=cfg["neo4j_user"],
neo4j_password=cfg["neo4j_password"],
deepseek_api_key=cfg["deepseek_api_key"],
model_name=cfg["model_name"],
)
9. LLM生成回答 #
9.1. llm_client.py #
llm_client.py
"""
LLM 客户端与提取逻辑。
"""
# 导入类型提示相关模块
from typing import List, Dict, Any
# 导入 DeepSeek 的聊天模型
from langchain_deepseek import ChatDeepSeek
# 导入消息类型,用于与模型交互
from langchain_core.messages import HumanMessage, SystemMessage
# 导入 JSON 输出解析器
from langchain_core.output_parsers import JsonOutputParser
# 定义 LLMExtractor 类,负责实体和关系提取
class LLMExtractor:
"""负责实体和关系提取的封装。"""
# 构造方法,初始化 LLMExtractor 实例
def __init__(self, model_name: str = "deepseek-chat"):
# 初始化 ChatDeepSeek LLM 实例
self.llm = ChatDeepSeek(model=model_name)
# 初始化用于解析 JSON 输出的解析器
self.parser = JsonOutputParser()
# 定义内部方法 _invoke,接收消息列表作为参数
def _invoke(self, messages: List[Any]) -> Any:
# 使用 ChatDeepSeek 的 LLM 和 JSON 解析器组成链,保证输出可被解析为 JSON
chain = self.llm | self.parser
# 调用链的 invoke 方法,并传入消息列表,获取模型输出
return chain.invoke(messages)
# 定义 extract_entities 方法,从传入文本中提取实体
def extract_entities(self, text: str) -> List[Dict[str, Any]]:
"""从文本中提取实体列表。"""
# 构造用于实体抽取的 Prompt
prompt = f"""
请从以下文本中提取所有实体,包括人物、组织、地点、概念等。
返回 JSON 格式,每个实体包含 "name"(名称)和 "type"(类型)字段。
文本:
{text}
返回格式:
[
{{"name": "实体名称", "type": "实体类型"}},
...
]
"""
# 构造消息列表,系统消息设定角色,人类消息为抽取指令
messages = [
SystemMessage(content="你是一个专业的实体提取助手。"),
HumanMessage(content=prompt),
]
try:
# 调用内部方法与大模型交互获取抽取结果
result = self._invoke(messages)
# 若返回结果为列表,则直接返回
if isinstance(result, list):
return result
# 若返回结果为字典,且包含 entities 字段,则返回 entities
if isinstance(result, dict):
return result.get("entities", []) if "entities" in result else []
except Exception:
# 捕获异常,失败时返回空列表
pass
# 默认返回空列表
return []
# 定义 extract_relations 方法,用于从文本和实体列表中提取关系
def extract_relations(self, text: str, entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
# 方法说明:从文本和已知实体列表中抽取实体之间的关系列表
"""从文本和实体列表中提取关系列表。"""
# 将所有实体以 "名称(类型)" 的格式拼接成字符串
entities_str = ", ".join([f"{e['name']}({e['type']})" for e in entities])
# 构造面向关系抽取的大模型 Prompt
prompt = f"""
请从以下文本中提取实体之间的关系。
文本:
{text}
实体列表:
{entities_str}
返回 JSON 格式,每个关系包含 "source"(源实体)、"target"(目标实体)和 "type"(关系类型)字段。
返回格式:
[
{{"source": "源实体名称", "target": "目标实体名称", "type": "关系类型"}},
...
]
"""
# 构建消息列表,包括系统消息和人类消息
messages = [
SystemMessage(content="你是一个专业的关系提取助手。"),
HumanMessage(content=prompt),
]
try:
# 调用内部 _invoke 方法,与大模型交互获取关系抽取结果
result = self._invoke(messages)
# 如果结果类型为列表,则直接返回
if isinstance(result, list):
return result
# 如果结果为字典且包含 relations 字段,返回 relations 的值
if isinstance(result, dict):
return result.get("relations", []) if "relations" in result else []
except Exception:
# 如有异常,忽略并返回空列表
pass
# 默认返回空列表,表示未抽取到关系
return []
+def build_generator(model_name: str = "deepseek-chat") -> ChatDeepSeek:
# 方法说明:用于答案生成的 LLM。
+ """用于答案生成的 LLM。"""
# 返回 ChatDeepSeek 实例,传入模型名称
+ return ChatDeepSeek(model=model_name) 9.2. pipeline.py #
pipeline.py
"""
2. GraphRAG 业务管线。
"""
+from langchain_core.messages import HumanMessage, SystemMessage
from config import load_config
from graph_store import GraphStore
+from llm_client import LLMExtractor,build_generator
class GraphRAG:
"""1. 封装检索与生成的完整 GraphRAG 系统。"""
def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
# 初始化图数据库连接
self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
# 初始化实体抽取器
self.extractor = LLMExtractor(model_name=model_name)
# 初始化生成器
+ self.generator_llm = build_generator(model_name=model_name)
def close(self):
"""关闭数据库连接。"""
self.store.close()
def build_knowledge_graph(self, documents):
# 定义方法:从文档列表中构建知识图谱
"""从文档列表构建知识图谱。"""
# 遍历每一个输入文档
for doc in documents:
# 调用 extractor 的 extract_entities 方法抽取当前文档中的实体关系等结构
entities = self.extractor.extract_entities(doc)
# 将实体写入(或更新)到图数据库
self.store.upsert_entities(entities)
# 调用 extractor 的 extract_relations 方法抽取当前文档中的实体关系等结构
relations = self.extractor.extract_relations(doc, entities)
# 将关系写入(或更新)到图数据库
self.store.upsert_relations(relations, entities)
# ---------- 生成阶段 ----------
# 定义 generate 方法,根据传入的查询和上下文生成回答
+ def generate(self, query: str, context: str) -> str:
# 方法注释:基于上下文生成回答。
+ """基于上下文生成回答。"""
# 构建 prompt,包含知识图谱信息和用户问题
+ prompt = f"""
+ 基于以下知识图谱信息回答问题。请提供准确、详细的回答。
+ 知识图谱信息:
+ {context}
+ 问题:
+ {query}
+ 请基于上述知识图谱信息回答问题。如果信息不足,请说明。
+ """
# 构造消息列表,包含系统消息和用户消息
+ messages = [
# 系统消息,设定助手的人设
+ SystemMessage(content="你是一个专业的问答助手,擅长基于知识图谱回答问题。"),
# 用户消息,实际传递 prompt
+ HumanMessage(content=prompt),
+ ]
# 调用 LLM(大语言模型)生成回答
+ response = self.generator_llm.invoke(messages)
# 返回生成的内容
+ return response.content
def query(self, user_query: str, max_depth: int = 2) -> str:
"""检索 + 生成组合。"""
# 检索知识图谱上下文
context = self.retrieve(user_query, max_depth)
# 基于上下文生成回答
+ return self.generate(user_query, context)
# ---------- 检索阶段 ----------
# 定义 retrieve 方法,用于检索知识图谱上下文
def retrieve(self, query: str, max_depth: int = 2) -> str:
"""检索知识图谱上下文。"""
# 调用实体抽取器从查询中抽取实体
entities = self.extractor.extract_entities(query)
# 从抽取的实体中提取实体名称列表
entity_names = [e["name"] for e in entities if isinstance(e, dict) and e.get("name")]
# 如果没有抽取到任何实体,返回提示信息
if not entity_names:
return "未找到相关实体"
# 使用实体名称和最大深度从图数据库检索子图,获取实体节点和关系列表
nodes, relations_list = self.store.retrieve_subgraph(entity_names, max_depth)
# 构建上下文信息的组成部分列表
context_parts = ["知识图谱信息:\n"]
# 添加相关实体的信息
context_parts.append("相关实体:")
# 遍历所有实体节点并添加到上下文信息
for node in nodes:
context_parts.append(f" - {node}")
# 添加实体关系部分头部
context_parts.append("\n实体关系:")
# 遍历(最多前20条)关系信息并格式化添加到上下文
for rel in relations_list[:20]:
context_parts.append(f" {rel['source']} --[{rel['type']}]--> {rel['target']}")
# 将所有上下文信息组成字符串并返回
return "\n".join(context_parts)
def build_rag_from_env() -> GraphRAG:
"""2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
cfg = load_config()
return GraphRAG(
neo4j_uri=cfg["neo4j_uri"],
neo4j_user=cfg["neo4j_user"],
neo4j_password=cfg["neo4j_password"],
deepseek_api_key=cfg["deepseek_api_key"],
model_name=cfg["model_name"],
)
10.执行流程 #
这是一个 GraphRAG(Graph-based Retrieval Augmented Generation) 系统,将知识图谱与大语言模型结合,实现基于图结构的问答系统。
10.1 类图 #
10.2 时序图 #
10.3 执行流程详细说明 #
Step 1: 初始化阶段
graphrag = build_rag_from_env()调用链路:
build_rag_from_env()→ 读取.env配置- 创建
GraphRAG实例,内部初始化:GraphStore: 连接 Neo4j 图数据库LLMExtractor: 初始化 DeepSeek LLM 用于实体/关系抽取generator_llm: 初始化生成器 LLM
Step 2: 构建知识图谱
print("阶段1:构建知识图谱")
# 调用 graphrag 的 build_knowledge_graph 方法,传入示例文档,进行知识图谱的构建
graphrag.build_knowledge_graph(documents)处理流程:
| 步骤 | 操作 | 说明 |
|---|---|---|
| 1 | extract_entities(doc) |
LLM 从文本提取实体(微软、比尔·盖茨、谷歌等) |
| 2 | upsert_entities() |
将实体作为节点写入 Neo4j |
| 3 | extract_relations(doc, entities) |
LLM 提取实体间关系(创立、竞争、开发等) |
| 4 | upsert_relations() |
将关系作为边写入 Neo4j |
示例数据转换:
文本: "微软是一家美国科技公司,由比尔·盖茨和保罗·艾伦于1975年创立..."
↓ 实体提取
[
{"name": "微软", "type": "组织"},
{"name": "比尔·盖茨", "type": "人物"},
{"name": "谷歌", "type": "组织"},
{"name": "OpenAI", "type": "组织"}
]
↓ 关系提取
[
{"source": "比尔·盖茨", "target": "微软", "type": "创立"},
{"source": "微软", "target": "谷歌", "type": "竞争"},
{"source": "微软", "target": "OpenAI", "type": "竞争"}
]Step 3: 查询知识图谱
print("\n阶段2:查询知识图谱")
query = "微软在AI领域的主要竞争对手有哪些?"
print(f"问题:{query}")
answer = graphrag.query(query)查询流程分为两个子阶段:
检索阶段 (Retrieve)
def retrieve(self, query: str, max_depth: int = 2) -> str:
"""检索知识图谱上下文。"""
# 调用实体抽取器从查询中抽取实体
entities = self.extractor.extract_entities(query)
# ... 从图数据库检索子图
nodes, relations_list = self.store.retrieve_subgraph(entity_names, max_depth)- 从用户问题中提取实体(如 "微软")
- 以这些实体为起点,在 Neo4j 中检索 2 跳以内的子图
- 构建上下文字符串
生成阶段 (Generate)
def generate(self, query: str, context: str) -> str:
prompt = f"""
基于以下知识图谱信息回答问题。请提供准确、详细的回答。
知识图谱信息:{context}
问题:{query}
"""
response = self.generator_llm.invoke(messages)
return response.content- 将检索到的图谱上下文 + 用户问题组成 Prompt
- 调用 LLM 生成最终回答
10.4 数据流图 #
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Documents │───▶│ LLMExtractor│───▶│ GraphStore │
│ (原始文本) │ │ (实体/关系 │ │ (Neo4j) │
└─────────────┘ │ 提取) │ └──────┬──────┘
└─────────────┘ │
│ 存储
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ User Query │───▶│ Retrieve │◀───│ 知识图谱 │
│ (用户问题) │ │ (子图检索) │ │ (节点+边) │
└─────────────┘ └──────┬──────┘ └─────────────┘
│
│ context
▼
┌─────────────┐
│ Generate │───▶ Answer
│ (LLM生成) │ (最终回答)
└─────────────┘10.5 核心设计模式 #
| 模式 | 应用位置 | 说明 |
|---|---|---|
| 工厂模式 | build_rag_from_env() |
根据环境变量创建 GraphRAG 实例 |
| 外观模式 | GraphRAG 类 |
统一封装检索和生成的复杂流程 |
| 策略模式 | LLMExtractor |
可替换不同的 LLM 模型 |
| 适配器模式 | GraphStore |
封装 Neo4j 驱动的底层操作 |