导航菜单

  • 1.langchain.intro
  • 2.langchain.chat_models
  • 3.langchain.prompts
  • 4.langchain.example_selectors
  • 5.output_parsers
  • 6.Runnable
  • 7.memory
  • 8.document_loaders
  • 9.text_splitters
  • 10.embeddings
  • 11.tool
  • 12.retrievers
  • 13.optimize
  • 14.项目介绍
  • 15.启动HTTP
  • 16.数据与模型
  • 17.权限管理
  • 18.知识库管理
  • 19.设置
  • 20.文档管理
  • 21.聊天
  • 22.API文档
  • 23.RAG优化
  • 24.索引时优化
  • 25.检索前优化
  • 26.检索后优化
  • 27.系统优化
  • 28.GraphRAG
  • 29.图
  • 30.为什么选择图数据库
  • 31.什么是 Neo4j
  • 32.安装和连接 Neo4j
  • 33.Neo4j核心概念
  • 34.Cypher基础
  • 35.模式匹配
  • 36.数据CRUD操作
  • 37.Python操作Neo4j
  • 38.GraphRAG
  • 39.查询和过滤
  • 40.结果处理和聚合
  • 41.语句组合
  • 42.子查询
  • 43.模式和约束
  • 44.日期时间处理
  • 45.Cypher内置函数
  • 46.py2neo
  • 47.Streamlit
  • 48.Pandas
  • 49.graphRAG
  • 50.deepdoc
  • 51.deepdoc
  • 52.deepdoc
  • 53.deepdoc
  • 54.deepdoc
  • Pillow
  • 1. GraphRAG: 使用知识图谱增强的检索生成系统
  • 2. GraphRAG 架构
    • 2.1 整体架构
    • 2.2 知识图谱构建模块
    • 2.3 图增强检索模块
    • 2.4 推理与生成模块
  • 3. 环境配置
  • 4. 配置文件
    • 4.1. config.py
    • 4.2. pipeline.py
    • 4.3. main.py
    • 4.4 .env
  • 5. 连接Neo4j
    • 5.1. graph_store.py
    • 5.2. pipeline.py
  • 6. 实体提取
    • 6.1. llm_client.py
    • 6.2. graph_store.py
    • 6.3. main.py
    • 6.4. pipeline.py
  • 7. 关系识别
    • 7.1. graph_store.py
    • 7.2. llm_client.py
    • 7.3. pipeline.py
  • 8. 子图提取
    • 8.1. graph_store.py
    • 8.2. main.py
    • 8.3. pipeline.py
  • 9. LLM生成回答
    • 9.1. llm_client.py
    • 9.2. pipeline.py
  • 10.执行流程
    • 10.1 类图
    • 10.2 时序图
    • 10.3 执行流程详细说明
    • 10.4 数据流图
    • 10.5 核心设计模式

1. GraphRAG: 使用知识图谱增强的检索生成系统 #

GraphRAG 是一个结合知识图谱 (Knowledge Graph) 与 RAG(Retrieval-Augmented Generation, 检索增强生成)的智能问答/数据分析系统。它旨在提升复杂场景下的推理能力、可解释性和知识关联性。
通过整合 Neo4j 图数据库与大语言模型(如 DeepSeek),GraphRAG 支持结构化的知识检索、多跳推理和深度信息整合,极大弥补了传统 RAG 系统在长尾知识和推理链上的不足。

核心特点:

  • 将文本文档抽取为知识图谱(实体、关系、属性),存储于 Neo4j
  • 提供结构化子图检索与路径发现,支持多跳知识问答
  • 利用大模型在子图上下文基础上生成高质量、可追溯的答案
  • 适用于企业知识管理、学术检索、信息整合分析等多种场景

2. GraphRAG 架构 #

2.1 整体架构 #

GraphRAG 系统由三个核心模块组成:

  1. 知识图谱构建模块:将原始文档转换为结构化的知识图谱
  2. 图增强检索模块:基于图谱结构进行智能检索
  3. 推理与生成模块:结合图谱上下文生成回答

工作流程:

原始文档 → 实体提取 → 关系识别 → 知识图谱 → 图查询 → 子图提取 → 上下文构建 → LLM生成回答

2.2 知识图谱构建模块 #

知识图谱构建是 GraphRAG 的基础,它将非结构化的文档转换为结构化的知识图谱。

构建流程:

  1. 实体提取:识别文档中的人物、组织、概念等实体
  2. 关系识别:确定实体之间的关联(如"工作在"、"属于"、"导致")
  3. 图谱存储:将实体和关系存储到图数据库中

前置知识:实体和关系

  • 实体(Entity):现实世界中的对象,如"微软"、"AI"、"谷歌"
  • 关系(Relationship):实体之间的关联,如"竞争"、"开发"、"属于"
  • 属性(Property):实体的特征,如"微软"的"行业"="科技"

2.3 图增强检索模块 #

图增强检索是 GraphRAG 的核心,它利用图谱结构进行更智能的检索。

检索方式:

  • 子图检索:查找与查询相关的子图结构
  • 路径发现:寻找实体间的连接路径
  • 社区检测:识别相关的实体群组

2.4 推理与生成模块 #

推理与生成模块结合图谱上下文进行回答生成,利用 LLM 的强大能力生成自然语言回答。

3. 环境配置 #

uv init
uv add python-dotenv neo4j langchain_deepseek langchain
包名 作用简介
python-dotenv 加载 .env 文件中的环境变量,便于安全管理配置参数
neo4j Python 客户端,用于连接和操作 Neo4j 图数据库
langchain_deepseek DeepSeek 相关的 LangChain 集成,支持 AI/LLM 工作流
langchain 构建和编排大语言模型应用(如对话机器人、问答系统)

4. 配置文件 #

4.1. config.py #

config.py

"""
1. 配置加载模块

负责加载 .env 环境变量并提供统一的配置读取方法。
"""

import os
from dotenv import load_dotenv

def load_config() -> dict:
    load_dotenv()
    return {
        "neo4j_uri": os.getenv("NEO4J_URI", "bolt://localhost:7687"),
        "neo4j_user": os.getenv("NEO4J_USER", "neo4j"),
        "neo4j_password": os.getenv("NEO4J_PASSWORD", "12345678"),
        "deepseek_api_key": os.getenv("DEEPSEEK_API_KEY", "sk-924916a162544410964397598709766a"),
        "model_name": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
    }

4.2. pipeline.py #

pipeline.py

"""
2. GraphRAG 业务管线。
"""

from config import load_config

class GraphRAG:
    """1. 封装检索与生成的完整 GraphRAG 系统。"""

    def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
        print(neo4j_uri, neo4j_user, neo4j_password, deepseek_api_key, model_name)

def build_rag_from_env() -> GraphRAG:
    """2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
    cfg = load_config()
    return GraphRAG(
        neo4j_uri=cfg["neo4j_uri"],
        neo4j_user=cfg["neo4j_user"],
        neo4j_password=cfg["neo4j_password"],
        deepseek_api_key=cfg["deepseek_api_key"],
        model_name=cfg["model_name"],
    )

4.3. main.py #

main.py

"""
+1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""

# 从 pipeline 模块导入 build_rag_from_env 函数
from pipeline import build_rag_from_env

# 定义主函数
def main():
    # 调用 build_rag_from_env 函数,启动 GraphRAG 工作流
    build_rag_from_env()

# 判断是否以主模块运行本文件
if __name__ == "__main__":
    # 运行主函数
    main()

4.4 .env #

NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=12345678
DEEPSEEK_API_KEY=sk-924916a162544410964397598709766a
DEEPSEEK_MODEL=deepseek-chat

5. 连接Neo4j #

5.1. graph_store.py #

graph_store.py

# Neo4j 读写与检索逻辑。
"""
Neo4j 读写与检索逻辑。
"""

# 导入 Neo4j 官方驱动
from neo4j import GraphDatabase

# 定义 GraphStore 类,用于封装 Neo4j 的节点与关系操作
class GraphStore:
    # 封装 Neo4j 的节点与关系操作。
    """封装 Neo4j 的节点与关系操作。"""

    # 构造函数,初始化数据库连接
    def __init__(self, uri: str, user: str, password: str):
        # 使用传入的 uri、用户名和密码创建 Neo4j 数据库驱动
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    # 关闭数据库连接的方法
    def close(self):
        # 关闭数据库驱动,释放资源
        """关闭数据库连接。"""
        self.driver.close()

5.2. pipeline.py #

pipeline.py

"""
2. GraphRAG 业务管线。
"""

from config import load_config
+from graph_store import GraphStore
class GraphRAG:
    """1. 封装检索与生成的完整 GraphRAG 系统。"""

    def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
         # 初始化图数据库连接
+       self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
+   def close(self):
+       """关闭数据库连接。"""
+       self.store.close()    

def build_rag_from_env() -> GraphRAG:
    """2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
    cfg = load_config()
    return GraphRAG(
        neo4j_uri=cfg["neo4j_uri"],
        neo4j_user=cfg["neo4j_user"],
        neo4j_password=cfg["neo4j_password"],
        deepseek_api_key=cfg["deepseek_api_key"],
        model_name=cfg["model_name"],
    )

6. 实体提取 #

6.1. llm_client.py #

llm_client.py

"""
LLM 客户端与提取逻辑。
"""

# 导入类型提示相关模块
from typing import List, Dict, Any
# 导入 DeepSeek 的聊天模型
from langchain_deepseek import ChatDeepSeek
# 导入消息类型,用于与模型交互
from langchain_core.messages import HumanMessage, SystemMessage
# 导入 JSON 输出解析器
from langchain_core.output_parsers import JsonOutputParser

# 定义 LLMExtractor 类,负责实体和关系提取
class LLMExtractor:
    """负责实体和关系提取的封装。"""

    # 构造方法,初始化 LLMExtractor 实例
    def __init__(self, model_name: str = "deepseek-chat"):
        # 初始化 ChatDeepSeek LLM 实例
        self.llm = ChatDeepSeek(model=model_name)
        # 初始化用于解析 JSON 输出的解析器
        self.parser = JsonOutputParser()
    # 定义内部方法 _invoke,接收消息列表作为参数
    def _invoke(self, messages: List[Any]) -> Any:
        # 使用 ChatDeepSeek 的 LLM 和 JSON 解析器组成链,保证输出可被解析为 JSON
        chain = self.llm | self.parser
        # 调用链的 invoke 方法,并传入消息列表,获取模型输出
        return chain.invoke(messages)
    # 定义 extract_entities 方法,从传入文本中提取实体
    def extract_entities(self, text: str) -> List[Dict[str, Any]]:
        """从文本中提取实体列表。"""
        # 构造用于实体抽取的 Prompt
        prompt = f"""
        请从以下文本中提取所有实体,包括人物、组织、地点、概念等。
        返回 JSON 格式,每个实体包含 "name"(名称)和 "type"(类型)字段。

        文本:
        {text}

        返回格式:
        [
            {{"name": "实体名称", "type": "实体类型"}},
            ...
        ]
        """
        # 构造消息列表,系统消息设定角色,人类消息为抽取指令
        messages = [
            SystemMessage(content="你是一个专业的实体提取助手。"),
            HumanMessage(content=prompt),
        ]
        try:
            # 调用内部方法与大模型交互获取抽取结果
            result = self._invoke(messages)
            # 若返回结果为列表,则直接返回
            if isinstance(result, list):
                return result
            # 若返回结果为字典,且包含 entities 字段,则返回 entities
            if isinstance(result, dict):
                return result.get("entities", []) if "entities" in result else []
        except Exception:
            # 捕获异常,失败时返回空列表
            pass
        # 默认返回空列表
        return []    

6.2. graph_store.py #

graph_store.py

# Neo4j 读写与检索逻辑。
"""
Neo4j 读写与检索逻辑。
"""

# 导入 Neo4j 官方驱动
from neo4j import GraphDatabase

# 定义 GraphStore 类,用于封装 Neo4j 的节点与关系操作
class GraphStore:
    # 封装 Neo4j 的节点与关系操作。
    """封装 Neo4j 的节点与关系操作。"""

    # 构造函数,初始化数据库连接
    def __init__(self, uri: str, user: str, password: str):
        # 使用传入的 uri、用户名和密码创建 Neo4j 数据库驱动
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    # 关闭数据库连接的方法
    def close(self):
        # 关闭数据库驱动,释放资源
        """关闭数据库连接。"""
        self.driver.close()
    # 静态方法,用于转义 Neo4j 标签中的反引号(label可能用于节点或关系类型)
+   @staticmethod
    # 定义 _escape_label 方法,接收一个字符串 label,返回转义后的字符串
+   def _escape_label(label: str) -> str:
        # 如果 label 为空,则返回默认实体类型 "Entity"
+       if not label:
+           return "Entity"
        # 将 label 中的所有反引号替换为两个反引号,防止 Cypher 注入问题
+       return str(label).replace("`", "``")

    # 批量插入或合并实体节点的方法
+   def upsert_entities(self, entities):
        # 批量插入/合并实体。
+       """批量插入/合并实体。"""
        # 创建一个新的数据库会话
+       with self.driver.session() as session:
            # 遍历每一个实体
+           for entity in entities:
                # 跳过不是字典或缺少必要字段的实体
+               if not isinstance(entity, dict) or "name" not in entity or "type" not in entity:
+                   continue
                # 对类型标签进行转义,防止 Cypher 注入错误
+               escaped_type = self._escape_label(entity["type"])
                # 构造 Cypher 查询语句,用于根据 name 和 type 合并实体节点
+               query = f"""
+               MERGE (e:`{escaped_type}` {{name: $name}})
+               RETURN e
+               """
                # 执行合并节点的查询,将实体名称传入参数
+               session.run(query, name=entity["name"])     

6.3. main.py #

main.py

# 1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""
1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""

# 从 pipeline 模块导入 build_rag_from_env 函数
from pipeline import build_rag_from_env

# 定义主函数
def main():
    # 调用 build_rag_from_env 函数,启动 GraphRAG 工作流
+  graphrag = build_rag_from_env()
+  try:
        # 定义一个包含示例文档的列表(只有一个字符串,内容涉及微软、谷歌、OpenAI等公司)
+       documents = [
+           """
+           微软是一家美国科技公司,由比尔·盖茨和保罗·艾伦于1975年创立。
+           微软在人工智能领域与谷歌、OpenAI等公司存在竞争关系。
+           微软开发了 Azure 云平台,谷歌开发了 Google Cloud。
+           """
+       ]
        # 打印当前进入知识图谱构建阶段的信息
+       print("阶段1:构建知识图谱")
        # 调用 graphrag 的 build_knowledge_graph 方法,传入示例文档,进行知识图谱的构建
+       graphrag.build_knowledge_graph(documents)
+  except Exception as e:
+       print(f"发生错误: {e}")
+  finally:
+       graphrag.close()

# 判断是否以主模块运行本文件
if __name__ == "__main__":
    # 运行主函数
    main()

6.4. pipeline.py #

pipeline.py

"""
2. GraphRAG 业务管线。
"""

from config import load_config
from graph_store import GraphStore
+from llm_client import LLMExtractor
class GraphRAG:
    """1. 封装检索与生成的完整 GraphRAG 系统。"""

    def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
         # 初始化图数据库连接
        self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
        # 初始化实体抽取器
+       self.extractor = LLMExtractor(model_name=model_name)
    def close(self):
        """关闭数据库连接。"""
+       self.store.close()
+   def build_knowledge_graph(self, documents):
        # 定义方法:从文档列表中构建知识图谱
+       """从文档列表构建知识图谱。"""
        # 遍历每一个输入文档
+       for doc in documents:
            # 调用 extractor 的 extract_entities 方法抽取当前文档中的实体关系等结构
+           entities = self.extractor.extract_entities(doc)  
            # 将实体写入(或更新)到图数据库
+           self.store.upsert_entities(entities)     

def build_rag_from_env() -> GraphRAG:
    """2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
    cfg = load_config()
    return GraphRAG(
        neo4j_uri=cfg["neo4j_uri"],
        neo4j_user=cfg["neo4j_user"],
        neo4j_password=cfg["neo4j_password"],
        deepseek_api_key=cfg["deepseek_api_key"],
        model_name=cfg["model_name"],
    )

7. 关系识别 #

7.1. graph_store.py #

graph_store.py

# Neo4j 读写与检索逻辑。
"""
Neo4j 读写与检索逻辑。
"""

# 导入 Neo4j 官方驱动
from neo4j import GraphDatabase

# 定义 GraphStore 类,用于封装 Neo4j 的节点与关系操作
class GraphStore:
    # 封装 Neo4j 的节点与关系操作。
    """封装 Neo4j 的节点与关系操作。"""

    # 构造函数,初始化数据库连接
    def __init__(self, uri: str, user: str, password: str):
        # 使用传入的 uri、用户名和密码创建 Neo4j 数据库驱动
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    # 关闭数据库连接的方法
    def close(self):
        # 关闭数据库驱动,释放资源
        """关闭数据库连接。"""
        self.driver.close()
    # 静态方法,用于转义 Neo4j 标签中的反引号(label可能用于节点或关系类型)
    @staticmethod
    # 定义 _escape_label 方法,接收一个字符串 label,返回转义后的字符串
    def _escape_label(label: str) -> str:
        # 如果 label 为空,则返回默认实体类型 "Entity"
        if not label:
            return "Entity"
        # 将 label 中的所有反引号替换为两个反引号,防止 Cypher 注入问题
        return str(label).replace("`", "``")

    # 批量插入或合并实体节点的方法
    def upsert_entities(self, entities):
        # 批量插入/合并实体。
        """批量插入/合并实体。"""
        # 创建一个新的数据库会话
        with self.driver.session() as session:
            # 遍历每一个实体
            for entity in entities:
                # 跳过不是字典或缺少必要字段的实体
                if not isinstance(entity, dict) or "name" not in entity or "type" not in entity:
                    continue
                # 对类型标签进行转义,防止 Cypher 注入错误
                escaped_type = self._escape_label(entity["type"])
                # 构造 Cypher 查询语句,用于根据 name 和 type 合并实体节点
                query = f"""
                MERGE (e:`{escaped_type}` {{name: $name}})
                RETURN e
                """
                # 执行合并节点的查询,将实体名称传入参数
+               session.run(query, name=entity["name"])  
     # 定义 upsert_relations 方法,用于批量插入/合并关系
+   def upsert_relations(self, relations, entities):
        # 方法说明:批量插入/合并关系
+       """批量插入/合并关系。"""
        # 创建 Neo4j 数据库会话
+       with self.driver.session() as session:
            # 遍历每一个关系字典
+           for relation in relations:
                # 检查关系必须是字典,并且包含 "source"、"target" 和 "type" 这三个字段
+               if not isinstance(relation, dict) or not all(k in relation for k in ("source", "target", "type")):
+                   continue
                # 在实体列表中查找与该关系的 source 匹配的实体
+               source_entity = next((e for e in entities if isinstance(e, dict) and e.get("name") == relation["source"]), None)
                # 在实体列表中查找与该关系的 target 匹配的实体
+               target_entity = next((e for e in entities if isinstance(e, dict) and e.get("name") == relation["target"]), None)
                # 如果找不到对应的实体则跳过本次循环
+               if not (source_entity and target_entity):
+                   continue
                # 对源实体类型进行转义,防止 Cypher 注入
+               escaped_source_type = self._escape_label(source_entity["type"])
                # 对目标实体类型进行转义,防止 Cypher 注入
+               escaped_target_type = self._escape_label(target_entity["type"])
                # 对关系类型进行转义,防止 Cypher 注入
+               escaped_relation_type = self._escape_label(relation["type"])
                # 构建 Cypher 查询语句,实现实体节点的匹配与关系的插入或合并
+               query = f"""
+               MATCH (s:`{escaped_source_type}` {{name: $source_name}})
+               MATCH (t:`{escaped_target_type}` {{name: $target_name}})
+               MERGE (s)-[r:`{escaped_relation_type}`]->(t)
+               RETURN r
+               """
                # 执行 Cypher 查询并传递参数,实现关系的插入/合并
+               session.run(
+                   query,
+                   source_name=relation["source"],
+                   target_name=relation["target"],
+               )                        

7.2. llm_client.py #

llm_client.py

"""
LLM 客户端与提取逻辑。
"""

# 导入类型提示相关模块
from typing import List, Dict, Any
# 导入 DeepSeek 的聊天模型
from langchain_deepseek import ChatDeepSeek
# 导入消息类型,用于与模型交互
from langchain_core.messages import HumanMessage, SystemMessage
# 导入 JSON 输出解析器
from langchain_core.output_parsers import JsonOutputParser

# 定义 LLMExtractor 类,负责实体和关系提取
class LLMExtractor:
    """负责实体和关系提取的封装。"""

    # 构造方法,初始化 LLMExtractor 实例
    def __init__(self, model_name: str = "deepseek-chat"):
        # 初始化 ChatDeepSeek LLM 实例
        self.llm = ChatDeepSeek(model=model_name)
        # 初始化用于解析 JSON 输出的解析器
        self.parser = JsonOutputParser()
    # 定义内部方法 _invoke,接收消息列表作为参数
    def _invoke(self, messages: List[Any]) -> Any:
        # 使用 ChatDeepSeek 的 LLM 和 JSON 解析器组成链,保证输出可被解析为 JSON
        chain = self.llm | self.parser
        # 调用链的 invoke 方法,并传入消息列表,获取模型输出
        return chain.invoke(messages)
    # 定义 extract_entities 方法,从传入文本中提取实体
    def extract_entities(self, text: str) -> List[Dict[str, Any]]:
        """从文本中提取实体列表。"""
        # 构造用于实体抽取的 Prompt
        prompt = f"""
        请从以下文本中提取所有实体,包括人物、组织、地点、概念等。
        返回 JSON 格式,每个实体包含 "name"(名称)和 "type"(类型)字段。

        文本:
        {text}

        返回格式:
        [
            {{"name": "实体名称", "type": "实体类型"}},
            ...
        ]
        """
        # 构造消息列表,系统消息设定角色,人类消息为抽取指令
        messages = [
            SystemMessage(content="你是一个专业的实体提取助手。"),
            HumanMessage(content=prompt),
        ]
        try:
            # 调用内部方法与大模型交互获取抽取结果
            result = self._invoke(messages)
            # 若返回结果为列表,则直接返回
            if isinstance(result, list):
                return result
            # 若返回结果为字典,且包含 entities 字段,则返回 entities
            if isinstance(result, dict):
                return result.get("entities", []) if "entities" in result else []
        except Exception:
            # 捕获异常,失败时返回空列表
            pass
        # 默认返回空列表
        return []    
    # 定义 extract_relations 方法,用于从文本和实体列表中提取关系
+   def extract_relations(self, text: str, entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # 方法说明:从文本和已知实体列表中抽取实体之间的关系列表
+       """从文本和实体列表中提取关系列表。"""
        # 将所有实体以 "名称(类型)" 的格式拼接成字符串
+       entities_str = ", ".join([f"{e['name']}({e['type']})" for e in entities])
        # 构造面向关系抽取的大模型 Prompt
+       prompt = f"""
+       请从以下文本中提取实体之间的关系。

+       文本:
+       {text}

+       实体列表:
+       {entities_str}

+       返回 JSON 格式,每个关系包含 "source"(源实体)、"target"(目标实体)和 "type"(关系类型)字段。

+       返回格式:
+       [
+           {{"source": "源实体名称", "target": "目标实体名称", "type": "关系类型"}},
+           ...
+       ]
+       """
        # 构建消息列表,包括系统消息和人类消息
+       messages = [
+           SystemMessage(content="你是一个专业的关系提取助手。"),
+           HumanMessage(content=prompt),
+       ]
+       try:
            # 调用内部 _invoke 方法,与大模型交互获取关系抽取结果
+           result = self._invoke(messages)
            # 如果结果类型为列表,则直接返回
+           if isinstance(result, list):
+               return result
            # 如果结果为字典且包含 relations 字段,返回 relations 的值
+           if isinstance(result, dict):
+               return result.get("relations", []) if "relations" in result else []
+       except Exception:
            # 如有异常,忽略并返回空列表
+           pass
        # 默认返回空列表,表示未抽取到关系
+       return []      

7.3. pipeline.py #

pipeline.py

"""
2. GraphRAG 业务管线。
"""

from config import load_config
from graph_store import GraphStore
from llm_client import LLMExtractor
class GraphRAG:
    """1. 封装检索与生成的完整 GraphRAG 系统。"""

    def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
         # 初始化图数据库连接
        self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
        # 初始化实体抽取器
        self.extractor = LLMExtractor(model_name=model_name)
    def close(self):
        """关闭数据库连接。"""
        self.store.close()
    def build_knowledge_graph(self, documents):
        # 定义方法:从文档列表中构建知识图谱
        """从文档列表构建知识图谱。"""
        # 遍历每一个输入文档
        for doc in documents:
            # 调用 extractor 的 extract_entities 方法抽取当前文档中的实体关系等结构
            entities = self.extractor.extract_entities(doc)  
            # 将实体写入(或更新)到图数据库
+           self.store.upsert_entities(entities) 
            # 调用 extractor 的 extract_relations 方法抽取当前文档中的实体关系等结构
+           relations = self.extractor.extract_relations(doc, entities)
            # 将关系写入(或更新)到图数据库
+           self.store.upsert_relations(relations, entities)   

def build_rag_from_env() -> GraphRAG:
    """2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
    cfg = load_config()
    return GraphRAG(
        neo4j_uri=cfg["neo4j_uri"],
        neo4j_user=cfg["neo4j_user"],
        neo4j_password=cfg["neo4j_password"],
        deepseek_api_key=cfg["deepseek_api_key"],
        model_name=cfg["model_name"],
    )

8. 子图提取 #

8.1. graph_store.py #

graph_store.py

# Neo4j 读写与检索逻辑。
"""
Neo4j 读写与检索逻辑。
"""

from neo4j import GraphDatabase

# 定义 GraphStore 类,用于封装 Neo4j 的节点与关系操作
class GraphStore:
    # 封装 Neo4j 的节点与关系操作。
    """封装 Neo4j 的节点与关系操作。"""

    # 构造函数,初始化数据库连接
    def __init__(self, uri: str, user: str, password: str):
        # 使用传入的 uri、用户名和密码创建 Neo4j 数据库驱动
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    # 关闭数据库连接的方法
    def close(self):
        # 关闭数据库驱动,释放资源
        """关闭数据库连接。"""
        self.driver.close()
    # 静态方法,用于转义 Neo4j 标签中的反引号(label可能用于节点或关系类型)
    @staticmethod
    # 定义 _escape_label 方法,接收一个字符串 label,返回转义后的字符串
    def _escape_label(label: str) -> str:
        # 如果 label 为空,则返回默认实体类型 "Entity"
        if not label:
            return "Entity"
        # 将 label 中的所有反引号替换为两个反引号,防止 Cypher 注入问题
        return str(label).replace("`", "``")

    # 批量插入或合并实体节点的方法
    def upsert_entities(self, entities):
        # 批量插入/合并实体。
        """批量插入/合并实体。"""
        # 创建一个新的数据库会话
        with self.driver.session() as session:
            # 遍历每一个实体
            for entity in entities:
                # 跳过不是字典或缺少必要字段的实体
                if not isinstance(entity, dict) or "name" not in entity or "type" not in entity:
                    continue
                # 对类型标签进行转义,防止 Cypher 注入错误
                escaped_type = self._escape_label(entity["type"])
                # 构造 Cypher 查询语句,用于根据 name 和 type 合并实体节点
                query = f"""
                MERGE (e:`{escaped_type}` {{name: $name}})
                RETURN e
                """
                # 执行合并节点的查询,将实体名称传入参数
                session.run(query, name=entity["name"])  
     # 定义 upsert_relations 方法,用于批量插入/合并关系
    def upsert_relations(self, relations, entities):
        # 方法说明:批量插入/合并关系
        """批量插入/合并关系。"""
        # 创建 Neo4j 数据库会话
        with self.driver.session() as session:
            # 遍历每一个关系字典
            for relation in relations:
                # 检查关系必须是字典,并且包含 "source"、"target" 和 "type" 这三个字段
                if not isinstance(relation, dict) or not all(k in relation for k in ("source", "target", "type")):
                    continue
                # 在实体列表中查找与该关系的 source 匹配的实体
                source_entity = next((e for e in entities if isinstance(e, dict) and e.get("name") == relation["source"]), None)
                # 在实体列表中查找与该关系的 target 匹配的实体
                target_entity = next((e for e in entities if isinstance(e, dict) and e.get("name") == relation["target"]), None)
                # 如果找不到对应的实体则跳过本次循环
                if not (source_entity and target_entity):
                    continue
                # 对源实体类型进行转义,防止 Cypher 注入
                escaped_source_type = self._escape_label(source_entity["type"])
                # 对目标实体类型进行转义,防止 Cypher 注入
                escaped_target_type = self._escape_label(target_entity["type"])
                # 对关系类型进行转义,防止 Cypher 注入
                escaped_relation_type = self._escape_label(relation["type"])
                # 构建 Cypher 查询语句,实现实体节点的匹配与关系的插入或合并
                query = f"""
                MATCH (s:`{escaped_source_type}` {{name: $source_name}})
                MATCH (t:`{escaped_target_type}` {{name: $target_name}})
                MERGE (s)-[r:`{escaped_relation_type}`]->(t)
                RETURN r
                """
                # 执行 Cypher 查询并传递参数,实现关系的插入/合并
                session.run(
                    query,
                    source_name=relation["source"],
                    target_name=relation["target"],
+               )        
     # 定义方法:根据实体名称列表检索子图
+   def retrieve_subgraph(self, entity_names, max_depth: int = 2):
        # 方法说明:按实体名称检索子图,返回节点集合与关系列表
+       """按实体名称检索子图,返回节点集合与关系列表。"""
        # 如果输入实体名称为空,直接返回空集合和空列表
+       if not entity_names:
+           return set(), []

        # 构造 Cypher 查询语句,查找起点为输入实体名称,限定路径深度的所有连接路径
+       query_cypher = f"""
+       MATCH path = (start)-[*1..{max_depth}]-(connected)
+       WHERE start.name IN $entity_names
+       RETURN DISTINCT
+           start.name AS start_name,
+           connected.name AS connected_name,
+           relationships(path) AS relations,
+           length(path) AS depth
+       ORDER BY depth
+       LIMIT 50
+       """

        # 初始化节点名集合
+       nodes = set()
        # 初始化关系字典列表
+       relations_list = []

        # 创建数据库会话
+       with self.driver.session() as session:
            # 执行 Cypher 查询,传入实体名称参数
+           result = session.run(query_cypher, entity_names=entity_names)
            # 遍历查询结果
+           for record in result:
                # 获取起始节点名称
+               start_name = record.get("start_name")
                # 获取连接节点名称
+               connected_name = record.get("connected_name")
                # 将起始节点名称加入节点集合
+               if start_name:
+                   nodes.add(start_name)
                # 将连接节点名称加入节点集合
+               if connected_name:
+                   nodes.add(connected_name)

                # 获取路径上的所有关系
+               rels = record.get("relations", [])
                # 如有关系对象,处理第一个关系对象
+               if rels and len(rels) > 0:
+                   rel = rels[0]
                    # 将关系信息添加到关系列表
+                   relations_list.append(
+                       {
+                           "source": start_name or "Unknown",  # 关系源节点
+                           "target": connected_name or "Unknown",  # 关系目标节点
+                           "type": rel.type if hasattr(rel, "type") else "RELATED",  # 关系类型
+                           "depth": record.get("depth", 0),  # 路径深度
+                       }
+                   )

        # 返回节点名集合与关系列表
+       return nodes, relations_list                               

8.2. main.py #

main.py

# 1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""
1. 命令行运行入口,用于演示完整的 GraphRAG 流程。
"""

# 从 pipeline 模块导入 build_rag_from_env 函数
from pipeline import build_rag_from_env

# 定义主函数
def main():
    # 调用 build_rag_from_env 函数,启动 GraphRAG 工作流
   graphrag = build_rag_from_env()
   try:
        # 定义一个包含示例文档的列表(只有一个字符串,内容涉及微软、谷歌、OpenAI等公司)
        documents = [
            """
            微软是一家美国科技公司,由比尔·盖茨和保罗·艾伦于1975年创立。
            微软在人工智能领域与谷歌、OpenAI等公司存在竞争关系。
            微软开发了 Azure 云平台,谷歌开发了 Google Cloud。
            """
        ]
        # 打印当前进入知识图谱构建阶段的信息
        print("阶段1:构建知识图谱")
        # 调用 graphrag 的 build_knowledge_graph 方法,传入示例文档,进行知识图谱的构建
        graphrag.build_knowledge_graph(documents)
+       print("\n阶段2:查询知识图谱")
+       query = "微软在AI领域的主要竞争对手有哪些?"
+       print(f"问题:{query}")
+       answer = graphrag.query(query)
+       print("\n回答:")
+       print(answer)
   except Exception as e:
        print(f"发生错误: {e}")
   finally:
        graphrag.close()

# 判断是否以主模块运行本文件
if __name__ == "__main__":
    # 运行主函数
    main()

8.3. pipeline.py #

pipeline.py

"""
2. GraphRAG 业务管线。
"""

from config import load_config
from graph_store import GraphStore
from llm_client import LLMExtractor
class GraphRAG:
    """1. 封装检索与生成的完整 GraphRAG 系统。"""

    def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
         # 初始化图数据库连接
        self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
        # 初始化实体抽取器
        self.extractor = LLMExtractor(model_name=model_name)
    def close(self):
        """关闭数据库连接。"""
        self.store.close()
    def build_knowledge_graph(self, documents):
        # 定义方法:从文档列表中构建知识图谱
        """从文档列表构建知识图谱。"""
        # 遍历每一个输入文档
        for doc in documents:
            # 调用 extractor 的 extract_entities 方法抽取当前文档中的实体关系等结构
            entities = self.extractor.extract_entities(doc)  
            # 将实体写入(或更新)到图数据库
            self.store.upsert_entities(entities) 
            # 调用 extractor 的 extract_relations 方法抽取当前文档中的实体关系等结构
            relations = self.extractor.extract_relations(doc, entities)
            # 将关系写入(或更新)到图数据库
            self.store.upsert_relations(relations, entities)   
+   def query(self, user_query: str, max_depth: int = 2) -> str:
+       """检索 + 生成组合。"""
        # 检索知识图谱上下文
+       context = self.retrieve(user_query, max_depth)    
+       print('context:', context) 
         # ---------- 检索阶段 ----------
    # 定义 retrieve 方法,用于检索知识图谱上下文
+   def retrieve(self, query: str, max_depth: int = 2) -> str:
+       """检索知识图谱上下文。"""
        # 调用实体抽取器从查询中抽取实体
+       entities = self.extractor.extract_entities(query)
        # 从抽取的实体中提取实体名称列表
+       entity_names = [e["name"] for e in entities if isinstance(e, dict) and e.get("name")]
        # 如果没有抽取到任何实体,返回提示信息
+       if not entity_names:
+           return "未找到相关实体"

        # 使用实体名称和最大深度从图数据库检索子图,获取实体节点和关系列表
+       nodes, relations_list = self.store.retrieve_subgraph(entity_names, max_depth)

        # 构建上下文信息的组成部分列表
+       context_parts = ["知识图谱信息:\n"]
        # 添加相关实体的信息
+       context_parts.append("相关实体:")
        # 遍历所有实体节点并添加到上下文信息
+       for node in nodes:
+           context_parts.append(f"  - {node}")

        # 添加实体关系部分头部
+       context_parts.append("\n实体关系:")
        # 遍历(最多前20条)关系信息并格式化添加到上下文
+       for rel in relations_list[:20]:
+           context_parts.append(f"  {rel['source']} --[{rel['type']}]--> {rel['target']}")

        # 将所有上下文信息组成字符串并返回
+       return "\n".join(context_parts)           

def build_rag_from_env() -> GraphRAG:
    """2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
    cfg = load_config()
    return GraphRAG(
        neo4j_uri=cfg["neo4j_uri"],
        neo4j_user=cfg["neo4j_user"],
        neo4j_password=cfg["neo4j_password"],
        deepseek_api_key=cfg["deepseek_api_key"],
        model_name=cfg["model_name"],
    )

9. LLM生成回答 #

9.1. llm_client.py #

llm_client.py

"""
LLM 客户端与提取逻辑。
"""

# 导入类型提示相关模块
from typing import List, Dict, Any
# 导入 DeepSeek 的聊天模型
from langchain_deepseek import ChatDeepSeek
# 导入消息类型,用于与模型交互
from langchain_core.messages import HumanMessage, SystemMessage
# 导入 JSON 输出解析器
from langchain_core.output_parsers import JsonOutputParser

# 定义 LLMExtractor 类,负责实体和关系提取
class LLMExtractor:
    """负责实体和关系提取的封装。"""

    # 构造方法,初始化 LLMExtractor 实例
    def __init__(self, model_name: str = "deepseek-chat"):
        # 初始化 ChatDeepSeek LLM 实例
        self.llm = ChatDeepSeek(model=model_name)
        # 初始化用于解析 JSON 输出的解析器
        self.parser = JsonOutputParser()
    # 定义内部方法 _invoke,接收消息列表作为参数
    def _invoke(self, messages: List[Any]) -> Any:
        # 使用 ChatDeepSeek 的 LLM 和 JSON 解析器组成链,保证输出可被解析为 JSON
        chain = self.llm | self.parser
        # 调用链的 invoke 方法,并传入消息列表,获取模型输出
        return chain.invoke(messages)
    # 定义 extract_entities 方法,从传入文本中提取实体
    def extract_entities(self, text: str) -> List[Dict[str, Any]]:
        """从文本中提取实体列表。"""
        # 构造用于实体抽取的 Prompt
        prompt = f"""
        请从以下文本中提取所有实体,包括人物、组织、地点、概念等。
        返回 JSON 格式,每个实体包含 "name"(名称)和 "type"(类型)字段。

        文本:
        {text}

        返回格式:
        [
            {{"name": "实体名称", "type": "实体类型"}},
            ...
        ]
        """
        # 构造消息列表,系统消息设定角色,人类消息为抽取指令
        messages = [
            SystemMessage(content="你是一个专业的实体提取助手。"),
            HumanMessage(content=prompt),
        ]
        try:
            # 调用内部方法与大模型交互获取抽取结果
            result = self._invoke(messages)
            # 若返回结果为列表,则直接返回
            if isinstance(result, list):
                return result
            # 若返回结果为字典,且包含 entities 字段,则返回 entities
            if isinstance(result, dict):
                return result.get("entities", []) if "entities" in result else []
        except Exception:
            # 捕获异常,失败时返回空列表
            pass
        # 默认返回空列表
        return []    
    # 定义 extract_relations 方法,用于从文本和实体列表中提取关系
    def extract_relations(self, text: str, entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # 方法说明:从文本和已知实体列表中抽取实体之间的关系列表
        """从文本和实体列表中提取关系列表。"""
        # 将所有实体以 "名称(类型)" 的格式拼接成字符串
        entities_str = ", ".join([f"{e['name']}({e['type']})" for e in entities])
        # 构造面向关系抽取的大模型 Prompt
        prompt = f"""
        请从以下文本中提取实体之间的关系。

        文本:
        {text}

        实体列表:
        {entities_str}

        返回 JSON 格式,每个关系包含 "source"(源实体)、"target"(目标实体)和 "type"(关系类型)字段。

        返回格式:
        [
            {{"source": "源实体名称", "target": "目标实体名称", "type": "关系类型"}},
            ...
        ]
        """
        # 构建消息列表,包括系统消息和人类消息
        messages = [
            SystemMessage(content="你是一个专业的关系提取助手。"),
            HumanMessage(content=prompt),
        ]
        try:
            # 调用内部 _invoke 方法,与大模型交互获取关系抽取结果
            result = self._invoke(messages)
            # 如果结果类型为列表,则直接返回
            if isinstance(result, list):
                return result
            # 如果结果为字典且包含 relations 字段,返回 relations 的值
            if isinstance(result, dict):
                return result.get("relations", []) if "relations" in result else []
        except Exception:
            # 如有异常,忽略并返回空列表
            pass
        # 默认返回空列表,表示未抽取到关系
        return []      

+def build_generator(model_name: str = "deepseek-chat") -> ChatDeepSeek:
        # 方法说明:用于答案生成的 LLM。
+       """用于答案生成的 LLM。"""
        # 返回 ChatDeepSeek 实例,传入模型名称
+       return ChatDeepSeek(model=model_name) 

9.2. pipeline.py #

pipeline.py

"""
2. GraphRAG 业务管线。
"""
+from langchain_core.messages import HumanMessage, SystemMessage
from config import load_config
from graph_store import GraphStore
+from llm_client import LLMExtractor,build_generator

class GraphRAG:
    """1. 封装检索与生成的完整 GraphRAG 系统。"""

    def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str, deepseek_api_key: str, model_name: str = "deepseek-chat"):
         # 初始化图数据库连接
        self.store = GraphStore(neo4j_uri, neo4j_user, neo4j_password)
        # 初始化实体抽取器
        self.extractor = LLMExtractor(model_name=model_name)
        # 初始化生成器
+       self.generator_llm = build_generator(model_name=model_name)
    def close(self):
        """关闭数据库连接。"""
        self.store.close()
    def build_knowledge_graph(self, documents):
        # 定义方法:从文档列表中构建知识图谱
        """从文档列表构建知识图谱。"""
        # 遍历每一个输入文档
        for doc in documents:
            # 调用 extractor 的 extract_entities 方法抽取当前文档中的实体关系等结构
            entities = self.extractor.extract_entities(doc)  
            # 将实体写入(或更新)到图数据库
            self.store.upsert_entities(entities) 
            # 调用 extractor 的 extract_relations 方法抽取当前文档中的实体关系等结构
            relations = self.extractor.extract_relations(doc, entities)
            # 将关系写入(或更新)到图数据库
            self.store.upsert_relations(relations, entities)   
     # ---------- 生成阶段 ----------
    # 定义 generate 方法,根据传入的查询和上下文生成回答
+   def generate(self, query: str, context: str) -> str:
        # 方法注释:基于上下文生成回答。
+       """基于上下文生成回答。"""
        # 构建 prompt,包含知识图谱信息和用户问题
+       prompt = f"""
+       基于以下知识图谱信息回答问题。请提供准确、详细的回答。

+       知识图谱信息:
+       {context}

+       问题:
+       {query}

+       请基于上述知识图谱信息回答问题。如果信息不足,请说明。
+       """
        # 构造消息列表,包含系统消息和用户消息
+       messages = [
            # 系统消息,设定助手的人设
+           SystemMessage(content="你是一个专业的问答助手,擅长基于知识图谱回答问题。"),
            # 用户消息,实际传递 prompt
+           HumanMessage(content=prompt),
+       ]
        # 调用 LLM(大语言模型)生成回答
+       response = self.generator_llm.invoke(messages)
        # 返回生成的内容
+       return response.content        
    def query(self, user_query: str, max_depth: int = 2) -> str:
        """检索 + 生成组合。"""
        # 检索知识图谱上下文
        context = self.retrieve(user_query, max_depth)    
        # 基于上下文生成回答
+       return self.generate(user_query, context)
         # ---------- 检索阶段 ----------
    # 定义 retrieve 方法,用于检索知识图谱上下文
    def retrieve(self, query: str, max_depth: int = 2) -> str:
        """检索知识图谱上下文。"""
        # 调用实体抽取器从查询中抽取实体
        entities = self.extractor.extract_entities(query)
        # 从抽取的实体中提取实体名称列表
        entity_names = [e["name"] for e in entities if isinstance(e, dict) and e.get("name")]
        # 如果没有抽取到任何实体,返回提示信息
        if not entity_names:
            return "未找到相关实体"

        # 使用实体名称和最大深度从图数据库检索子图,获取实体节点和关系列表
        nodes, relations_list = self.store.retrieve_subgraph(entity_names, max_depth)

        # 构建上下文信息的组成部分列表
        context_parts = ["知识图谱信息:\n"]
        # 添加相关实体的信息
        context_parts.append("相关实体:")
        # 遍历所有实体节点并添加到上下文信息
        for node in nodes:
            context_parts.append(f"  - {node}")

        # 添加实体关系部分头部
        context_parts.append("\n实体关系:")
        # 遍历(最多前20条)关系信息并格式化添加到上下文
        for rel in relations_list[:20]:
            context_parts.append(f"  {rel['source']} --[{rel['type']}]--> {rel['target']}")

        # 将所有上下文信息组成字符串并返回
        return "\n".join(context_parts)           

def build_rag_from_env() -> GraphRAG:
    """2. 使用环境变量创建 GraphRAG 实例的便捷工厂。"""
    cfg = load_config()
    return GraphRAG(
        neo4j_uri=cfg["neo4j_uri"],
        neo4j_user=cfg["neo4j_user"],
        neo4j_password=cfg["neo4j_password"],
        deepseek_api_key=cfg["deepseek_api_key"],
        model_name=cfg["model_name"],
    )

10.执行流程 #

这是一个 GraphRAG(Graph-based Retrieval Augmented Generation) 系统,将知识图谱与大语言模型结合,实现基于图结构的问答系统。

10.1 类图 #

classDiagram class GraphRAG { -GraphStore store -LLMExtractor extractor -ChatDeepSeek generator_llm +__init__(neo4j_uri, neo4j_user, neo4j_password, deepseek_api_key, model_name) +close() +build_knowledge_graph(documents) +retrieve(query, max_depth) str +generate(query, context) str +query(user_query, max_depth) str } class GraphStore { -driver +__init__(uri, user, password) +close() +_escape_label(label) str +upsert_entities(entities) +upsert_relations(relations, entities) +retrieve_subgraph(entity_names, max_depth) } class LLMExtractor { -ChatDeepSeek llm -JsonOutputParser parser +__init__(model_name) +_invoke(messages) Any +extract_entities(text) List +extract_relations(text, entities) List } class Config { +load_config() dict } GraphRAG --> GraphStore : 使用 GraphRAG --> LLMExtractor : 使用 GraphRAG --> ChatDeepSeek : 使用 LLMExtractor --> ChatDeepSeek : 使用 Config ..> GraphRAG : 提供配置

10.2 时序图 #

sequenceDiagram participant Main as main.py participant Config as config.py participant Pipeline as GraphRAG participant Extractor as LLMExtractor participant Store as GraphStore participant LLM as DeepSeek API participant Neo4j as Neo4j DB Note over Main: 程序启动 %% 初始化阶段 Main->>Config: load_config() Config-->>Main: 返回配置字典 Main->>Pipeline: 创建 GraphRAG 实例 Pipeline->>Store: 创建 GraphStore (连接Neo4j) Store->>Neo4j: 建立数据库连接 Pipeline->>Extractor: 创建 LLMExtractor %% 阶段1:构建知识图谱 Note over Main,Neo4j: 阶段1:构建知识图谱 Main->>Pipeline: build_knowledge_graph(documents) loop 遍历每个文档 Pipeline->>Extractor: extract_entities(doc) Extractor->>LLM: 调用API提取实体 LLM-->>Extractor: 返回实体列表 JSON Extractor-->>Pipeline: entities Pipeline->>Store: upsert_entities(entities) Store->>Neo4j: MERGE 节点 Pipeline->>Extractor: extract_relations(doc, entities) Extractor->>LLM: 调用API提取关系 LLM-->>Extractor: 返回关系列表 JSON Extractor-->>Pipeline: relations Pipeline->>Store: upsert_relations(relations, entities) Store->>Neo4j: MERGE 关系 end %% 阶段2:查询知识图谱 Note over Main,Neo4j: 阶段2:查询知识图谱 Main->>Pipeline: query("微软在AI领域的主要竞争对手有哪些?") Pipeline->>Pipeline: retrieve(user_query) Pipeline->>Extractor: extract_entities(query) Extractor->>LLM: 提取查询中的实体 LLM-->>Extractor: ["微软", "AI"] Extractor-->>Pipeline: entity_names Pipeline->>Store: retrieve_subgraph(entity_names, max_depth=2) Store->>Neo4j: MATCH 查询子图 Neo4j-->>Store: 返回节点和关系 Store-->>Pipeline: nodes, relations_list Pipeline->>Pipeline: 构建 context 上下文字符串 Pipeline->>Pipeline: generate(query, context) Pipeline->>LLM: 基于上下文生成回答 LLM-->>Pipeline: 生成的回答 Pipeline-->>Main: 返回 answer Main->>Main: print(answer) %% 清理阶段 Main->>Pipeline: close() Pipeline->>Store: close() Store->>Neo4j: 关闭连接

10.3 执行流程详细说明 #

Step 1: 初始化阶段

   graphrag = build_rag_from_env()

调用链路:

  1. build_rag_from_env() → 读取 .env 配置
  2. 创建 GraphRAG 实例,内部初始化:
    • GraphStore: 连接 Neo4j 图数据库
    • LLMExtractor: 初始化 DeepSeek LLM 用于实体/关系抽取
    • generator_llm: 初始化生成器 LLM

Step 2: 构建知识图谱

        print("阶段1:构建知识图谱")
        # 调用 graphrag 的 build_knowledge_graph 方法,传入示例文档,进行知识图谱的构建
        graphrag.build_knowledge_graph(documents)

处理流程:

步骤 操作 说明
1 extract_entities(doc) LLM 从文本提取实体(微软、比尔·盖茨、谷歌等)
2 upsert_entities() 将实体作为节点写入 Neo4j
3 extract_relations(doc, entities) LLM 提取实体间关系(创立、竞争、开发等)
4 upsert_relations() 将关系作为边写入 Neo4j

示例数据转换:

文本: "微软是一家美国科技公司,由比尔·盖茨和保罗·艾伦于1975年创立..."

↓ 实体提取

[
  {"name": "微软", "type": "组织"},
  {"name": "比尔·盖茨", "type": "人物"},
  {"name": "谷歌", "type": "组织"},
  {"name": "OpenAI", "type": "组织"}
]

↓ 关系提取

[
  {"source": "比尔·盖茨", "target": "微软", "type": "创立"},
  {"source": "微软", "target": "谷歌", "type": "竞争"},
  {"source": "微软", "target": "OpenAI", "type": "竞争"}
]

Step 3: 查询知识图谱

        print("\n阶段2:查询知识图谱")
        query = "微软在AI领域的主要竞争对手有哪些?"
        print(f"问题:{query}")
        answer = graphrag.query(query)

查询流程分为两个子阶段:

检索阶段 (Retrieve)

    def retrieve(self, query: str, max_depth: int = 2) -> str:
        """检索知识图谱上下文。"""
        # 调用实体抽取器从查询中抽取实体
        entities = self.extractor.extract_entities(query)
        # ... 从图数据库检索子图
        nodes, relations_list = self.store.retrieve_subgraph(entity_names, max_depth)
  • 从用户问题中提取实体(如 "微软")
  • 以这些实体为起点,在 Neo4j 中检索 2 跳以内的子图
  • 构建上下文字符串

生成阶段 (Generate)

    def generate(self, query: str, context: str) -> str:
        prompt = f"""
        基于以下知识图谱信息回答问题。请提供准确、详细的回答。
        知识图谱信息:{context}
        问题:{query}
        """
        response = self.generator_llm.invoke(messages)
        return response.content
  • 将检索到的图谱上下文 + 用户问题组成 Prompt
  • 调用 LLM 生成最终回答

10.4 数据流图 #

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Documents │───▶│ LLMExtractor│───▶│  GraphStore │
│  (原始文本)  │    │ (实体/关系  │    │  (Neo4j)    │
└─────────────┘    │   提取)     │    └──────┬──────┘
                   └─────────────┘           │
                                             │ 存储
                                             ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ User Query  │───▶│   Retrieve  │◀───│ 知识图谱    │
│  (用户问题)  │    │  (子图检索) │    │  (节点+边)  │
└─────────────┘    └──────┬──────┘    └─────────────┘
                          │
                          │ context
                          ▼
                   ┌─────────────┐
                   │   Generate  │───▶ Answer
                   │ (LLM生成)   │    (最终回答)
                   └─────────────┘

10.5 核心设计模式 #

模式 应用位置 说明
工厂模式 build_rag_from_env() 根据环境变量创建 GraphRAG 实例
外观模式 GraphRAG 类 统一封装检索和生成的复杂流程
策略模式 LLMExtractor 可替换不同的 LLM 模型
适配器模式 GraphStore 封装 Neo4j 驱动的底层操作
← 上一节 37.Python操作Neo4j 下一节 39.查询和过滤 →

访问验证

请输入访问令牌

Token不正确,请重新输入