导航菜单

  • 1.langchain.intro
  • 2.langchain.chat_models
  • 3.langchain.prompts
  • 4.langchain.example_selectors
  • 5.output_parsers
  • 6.Runnable
  • 7.memory
  • 8.document_loaders
  • 9.text_splitters
  • 10.embeddings
  • 11.tool
  • 12.retrievers
  • 13.optimize
  • 14.项目介绍
  • 15.启动HTTP
  • 16.数据与模型
  • 17.权限管理
  • 18.知识库管理
  • 19.设置
  • 20.文档管理
  • 21.聊天
  • 22.API文档
  • 23.RAG优化
  • 24.索引时优化
  • 25.检索前优化
  • 26.检索后优化
  • 27.系统优化
  • 28.GraphRAG
  • 29.图
  • 30.为什么选择图数据库
  • 31.什么是 Neo4j
  • 32.安装和连接 Neo4j
  • 33.Neo4j核心概念
  • 34.Cypher基础
  • 35.模式匹配
  • 36.数据CRUD操作
  • 37.GraphRAG
  • 38.查询和过滤
  • 39.结果处理和聚合
  • 40.语句组合
  • 41.子查询
  • 42.模式和约束
  • 43.日期时间处理
  • 44.Cypher内置函数
  • 45.Python操作Neo4j
  • 46.neo4j
  • 47.py2neo
  • 48.Streamlit
  • 49.Pandas
  • 50.graphRAG
  • 51.deepdoc
  • 52.deepdoc
  • 53.deepdoc
  • 55.deepdoc
  • 54.deepdoc
  • Pillow
  • 34.TextLoader
    • 34.1. 34.TextLoader.py
    • 34.2. document_loaders.py
  • 35.PDFLoader
    • 35.1. 35.PDFLoader.py
    • 35.2. document_loaders.py
  • 36.Docx2txtLoader
    • 36.1. 36.Docx2txtLoader.py
    • 36.2. document_loaders.py
  • 37.WebBaseLoader
    • 37.1. 37.WebBaseLoader.py
    • 37.2. document_loaders.py
  • 38.CSVLoader
    • 38.1. 38.CSVLoader.py
    • 38.2. example.csv
    • 38.3. document_loaders.py
  • 39.LineByLineLoader
    • 39.1. 39.BaseLoader.py
    • 39.2. sample_lines.txt
    • 39.3. document_loaders.py

34.TextLoader #

  • dataclasses
  • files

TextLoader 是一个用于从文本文件中加载数据的实用工具。它能自动检测文件的编码方式(如 UTF-8、GBK 等),将文件内容读取为一个或多个 Document 对象。每个 Document 通常包含文件的部分文本内容及相关元数据信息(如文件路径、分块编号等)。

主要要点:

  • 自动编码检测:通过 autodetect_encoding=True 让加载过程适应各种文本文件,无需手动指定编码。
  • 批量文档处理:load() 方法将读取的文本封装为 Document 列表,适合下游文本处理与问答应用。
  • 内容访问与展示:可以灵活访问每个文档的具体内容(如首 300 个字符进行预览),以及与文件相关的元数据,便于溯源和管理。

应用场景:适用于知识库构建、语料整理、分块检索、语义分析等文本型任务,是大语言模型与文档内容之间的桥梁。

34.1. 34.TextLoader.py #

34.TextLoader.py


# 从 langchain_community 库中导入 TextLoader 类
#from langchain_community.document_loaders import TextLoader
from smartchain.document_loaders import TextLoader
# 指定需要加载的文件路径,这里以当前目录下的 README.md 文件为例
file_path = "files/example.md"

# 创建 TextLoader 实例,并自动检测文件编码格式
loader = TextLoader(file_path, autodetect_encoding=True)

# 加载文件内容为 Document 列表
docs = loader.load()

# 打印加载成功的文件路径
print(f"成功加载文件: {file_path}")

# 打印加载到的 Document 数量
print(f"共 {len(docs)} 个 Document\n")

# 遍历每一个 Document,并输出相关信息
for i, doc in enumerate(docs, 1):
    # 打印 Document 的编号
    print(f"--- Document {i} ---")
    # 打印内容预览提示
    print("内容预览:")
    # 获取 Document 内容的前 300 个字符作为预览
    preview = doc.page_content[:300]
    # 如果有内容则输出内容预览,否则输出“空内容”
    print(preview if preview else "(空内容)")
    # 打印元数据提示
    print("\n元数据:")
    # 输出 Document 的元数据信息
    print(doc.metadata)

34.2. document_loaders.py #

smartchain/document_loaders.py

# 导入dataclass与field用于定义数据结构
from dataclasses import dataclass, field
# 导入os模块用于文件操作
import os
# 导入类型注解
from typing import Dict, Any


# 定义Document数据类,包含内容和元数据
@dataclass
class Document:
    """文档对象,包含内容和元数据"""

    # 文档正文内容字段,默认空字符串
    page_content: str = ""
    # 文档元数据字段,默认空字典
    metadata: Dict[str, Any] = field(default_factory=dict)


# 定义文本加载器类TextLoader
class TextLoader:
    """
    文本文件加载器,将本地文本读取为 Document 列表。

    Args:
        file_path: 待读取的文本文件路径
        encoding: 指定编码;若为 None 且 autodetect_encoding=True,则尝试多种常见编码
        autodetect_encoding: 是否自动探测常见编码(utf-8, gbk, latin-1)
    """

    # 构造方法,初始化文件路径、编码和自动检测开关
    def __init__(
        self,
        file_path,
        encoding=None,
        autodetect_encoding=False,
    ):
        # 存储文件路径
        self.file_path = file_path
        # 存储编码方式
        self.encoding = encoding
        # 存储是否自动检测编码
        self.autodetect_encoding = autodetect_encoding

    # 文件读取与编码判定的内部方法
    def _detect_and_read(self):
        """读取文件内容,支持编码探测"""
        # 检查文件是否存在,如果不存在则报错
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

        # 如果指定了编码,则直接用指定编码读取
        if self.encoding:
            with open(self.file_path, "r", encoding=self.encoding, errors="ignore") as f:
                return f.read()

        # 未指定编码时,尝试常见编码列表
        tried = []
        # 若开启自动检测,尝试三种编码,否则只尝试utf-8
        for enc in (["utf-8", "gbk", "latin-1"] if self.autodetect_encoding else ["utf-8"]):
            try:
                # 用当前编码尝试打开并读取文件
                with open(self.file_path, "r", encoding=enc) as f:
                    return f.read()
            except Exception as e:
                # 记录失败的编码及异常信息,继续下一个
                tried.append((enc, str(e)))
                continue

        # 所有编码都读取失败,抛出UnicodeDecodeError异常
        raise UnicodeDecodeError(
            "auto", b"", 0, 1, f"无法以常见编码读取文件,尝试过: {tried}"
        )

    # 主方法,加载文件为Document对象
    def load(self):
        """读取文件并返回 Document 列表"""
        # 读取文件内容
        content = self._detect_and_read()
        # 构建元数据,指明文件来源
        metadata = {"source": self.file_path}
        # 返回包含单个Document的列表
        return [Document(page_content=content, metadata=metadata)]

35.PDFLoader #

  • PyPDF2

PyPDFLoader 依赖 PyPDF2 库(如未安装需先完成 pip install PyPDF2),可以用于按页提取 PDF 文件中的文本,并以 Document 对象的列表形式返回,适用于多轮对话、检索增强问答(RAG)等场景。

使用示例说明:

  1. 导入 PyPDFLoader。
  2. 指定 PDF 文件路径,实例化加载器。
  3. 调用 .load() 方法批量加载 PDF,每页对应一个 Document,内容与元数据都能获取。
  4. 可遍历、查看部分内容与全部元数据,便于后续处理。

通过代码可以看到,如何逐页访问 PDF 内容,并便捷获取来源、页码等信息,为文档分块、问答检索等应用打好基础。

35.1. 35.PDFLoader.py #

35.PDFLoader.py

# uv add PyPDF2
# 导入 PyPDFLoader 类(从 smartchain 库)
from smartchain.document_loaders import PyPDFLoader

# 指定需要加载的 PDF 文件路径
file_path = "files/example.pdf"

# 创建 PyPDFLoader 实例,传入文件路径
loader = PyPDFLoader(file_path)

# 加载 PDF 文件内容,返回 Document 对象列表
docs = loader.load()

# 打印已成功加载的 PDF 文件路径
print(f"成功加载文件: {file_path}")

# 打印加载到的 Document 数量
print(f"共 {len(docs)} 个 Document\n")

# 遍历所有 Document,并输出相关信息
for i, doc in enumerate(docs, 1):
    # 打印当前 Document 的编号
    print(f"--- Document {i} ---")
    # 打印内容预览提示
    print("内容预览:")
    # 获取当前 Document 的前 300 个字符用于内容预览
    preview = doc.page_content[:300]
    # 如果内容非空则输出预览,否则输出“空内容”
    print(preview if preview else "(空内容)")
    # 打印元数据提示
    print("\n元数据:")
    # 输出当前 Document 的元数据信息
    print(doc.metadata)

35.2. document_loaders.py #

smartchain/document_loaders.py

from dataclasses import dataclass, field
import os
+from typing import Optional, List, Dict


@dataclass
class Document:
    """文档对象,包含内容和元数据"""

+   page_content: str = ""
+   metadata: Dict = field(default_factory=dict)


class TextLoader:
    """
    文本文件加载器,将本地文本读取为 Document 列表。

    Args:
        file_path: 待读取的文本文件路径
        encoding: 指定编码;若为 None 且 autodetect_encoding=True,则尝试多种常见编码
        autodetect_encoding: 是否自动探测常见编码(utf-8, gbk, latin-1)
    """

    def __init__(
        self,
+       file_path: str,
+       encoding: Optional[str] = None,
+       autodetect_encoding: bool = False,
    ):
        self.file_path = file_path
        self.encoding = encoding
        self.autodetect_encoding = autodetect_encoding

+   def _detect_and_read(self) -> str:
        """读取文件内容,支持编码探测"""
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

        # 指定编码
        if self.encoding:
            with open(self.file_path, "r", encoding=self.encoding, errors="ignore") as f:
                return f.read()

        # 自动探测常见编码
        tried = []
+       for enc in (["utf-8", "utf-8-sig", "gbk", "latin-1"] if self.autodetect_encoding else ["utf-8"]):
            try:
                with open(self.file_path, "r", encoding=enc) as f:
                    return f.read()
            except Exception as e:
                tried.append((enc, str(e)))
                continue

        raise UnicodeDecodeError(
            "auto", b"", 0, 1, f"无法以常见编码读取文件,尝试过: {tried}"
        )

+   def load(self) -> List[Document]:
        """读取文件并返回 Document 列表"""
        content = self._detect_and_read()
        metadata = {"source": self.file_path}
        return [Document(page_content=content, metadata=metadata)]

# 定义 PyPDFLoader 类
+class PyPDFLoader:
+   """
+   PDF 加载器:按页读取文本,返回 Document 列表。
+   依赖 PyPDF2。若未安装,请先 `pip install PyPDF2`。
+   """

    # 构造函数,接收 PDF 文件路径
+   def __init__(self, file_path: str):
        # 保存文件路径到实例变量
+       self.file_path = file_path

    # 加载 PDF 文件并返回 Document 对象列表
+   def load(self) -> List[Document]:
        # 检查文件是否存在,如果不存在则抛出异常
+       if not os.path.exists(self.file_path):
+           raise FileNotFoundError(f"文件不存在: {self.file_path}")
        # 尝试导入 PyPDF2 库
+       try:
+           import PyPDF2
        # 如果没有安装 PyPDF2,则抛出导入异常
+       except ImportError:
+           raise ImportError("需要安装 PyPDF2 才能使用 PyPDFLoader:pip install PyPDF2")

        # 初始化 Document 列表
+       docs: List[Document] = []
        # 以二进制只读模式打开 PDF 文件
+       with open(self.file_path, "rb") as f:
            # 创建 PDF 读取器对象
+           reader = PyPDF2.PdfReader(f)
            # 枚举每个页面
+           for idx, page in enumerate(reader.pages):
                # 提取当前页文本,如果无内容则使用空字符串
+               content = page.extract_text() or ""
                # 构建当前页的元数据信息
+               metadata = {
+                   "source": self.file_path,
+                   "page": idx + 1,
+                   "total_pages": len(reader.pages),
+               }
                # 创建 Document 对象并添加到列表
+               docs.append(Document(page_content=content, metadata=metadata))
        # 返回全部 Document 对象列表
+       return docs

36.Docx2txtLoader #

Docx2txtLoader 是一个用于从 .docx 文件(Microsoft Word 文档)中批量提取文本内容的加载器。它依赖 python-docx 库(如未安装需 pip install python-docx),可将 Word 文档按段落、页或整体进行文本抽取,并将内容和元数据封装为 Document 对象列表,方便知识库构建与文档问答场景。

主要特点:

  • 自动提取 Word 文档中全部可见文字内容,支持多种复杂结构。
  • 每个文档内容独立封装,便于下游分块、溯源等操作。
  • 元数据记录源文件路径,为后续检索和追踪提供支持。

应用场景
可用于笔记整理、合同分析、论文知识归档、问答系统知识接入等一切涉及 Word 文件的结构化知识抽取处理任务。

基本用法:

  1. 导入 Docx2txtLoader。
  2. 实例化时传入 .docx 文件路径。
  3. 调用 .load() 返回 Document 对象列表,即可进一步遍历、分块与分析。

36.1. 36.Docx2txtLoader.py #

python-docx

36.Docx2txtLoader.py

# uv add python-docx
#from langchain_community.document_loaders import Docx2txtLoader
# 导入 PyPDFLoader 类(从 smartchain 库)
from smartchain.document_loaders import Docx2txtLoader

# 指定需要加载的 PDF 文件路径
file_path = "files/DOCX.docx"

# 创建 PyPDFLoader 实例,传入文件路径
loader = Docx2txtLoader(file_path)

# 加载 PDF 文件内容,返回 Document 对象列表
docs = loader.load()

# 打印已成功加载的 PDF 文件路径
print(f"成功加载文件: {file_path}")

# 打印加载到的 Document 数量
print(f"共 {len(docs)} 个 Document\n")

# 遍历所有 Document,并输出相关信息
for i, doc in enumerate(docs, 1):
    # 打印当前 Document 的编号
    print(f"--- Document {i} ---")
    # 打印内容预览提示
    print("内容预览:")
    # 获取当前 Document 的前 300 个字符用于内容预览
    preview = doc.page_content[:300]
    # 如果内容非空则输出预览,否则输出“空内容”
    print(preview if preview else "(空内容)")
    # 打印元数据提示
    print("\n元数据:")
    # 输出当前 Document 的元数据信息
    print(doc.metadata)

36.2. document_loaders.py #

smartchain/document_loaders.py

from dataclasses import dataclass, field
import os
+from typing import Optional, List, Dict, Union
+from urllib.request import urlopen, Request
+from urllib.parse import urlparse
+import ssl


@dataclass
class Document:
    """文档对象,包含内容和元数据"""

    page_content: str = ""
    metadata: Dict = field(default_factory=dict)


class TextLoader:
    """
    文本文件加载器,将本地文本读取为 Document 列表。

    Args:
        file_path: 待读取的文本文件路径
        encoding: 指定编码;若为 None 且 autodetect_encoding=True,则尝试多种常见编码
        autodetect_encoding: 是否自动探测常见编码(utf-8, gbk, latin-1)
    """

    def __init__(
        self,
        file_path: str,
        encoding: Optional[str] = None,
        autodetect_encoding: bool = False,
    ):
        self.file_path = file_path
        self.encoding = encoding
        self.autodetect_encoding = autodetect_encoding

    def _detect_and_read(self) -> str:
        """读取文件内容,支持编码探测"""
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

        # 指定编码
        if self.encoding:
            with open(self.file_path, "r", encoding=self.encoding, errors="ignore") as f:
                return f.read()

        # 自动探测常见编码
        tried = []
        for enc in (["utf-8", "utf-8-sig", "gbk", "latin-1"] if self.autodetect_encoding else ["utf-8"]):
            try:
                with open(self.file_path, "r", encoding=enc) as f:
                    return f.read()
            except Exception as e:
                tried.append((enc, str(e)))
                continue

        raise UnicodeDecodeError(
            "auto", b"", 0, 1, f"无法以常见编码读取文件,尝试过: {tried}"
        )

    def load(self) -> List[Document]:
        """读取文件并返回 Document 列表"""
        content = self._detect_and_read()
        metadata = {"source": self.file_path}
        return [Document(page_content=content, metadata=metadata)]

# 定义 PyPDFLoader 类
class PyPDFLoader:
    """
    PDF 加载器:按页读取文本,返回 Document 列表。
    依赖 PyPDF2。若未安装,请先 `pip install PyPDF2`。
    """

    # 构造函数,接收 PDF 文件路径
    def __init__(self, file_path: str):
        # 保存文件路径到实例变量
        self.file_path = file_path

    # 加载 PDF 文件并返回 Document 对象列表
    def load(self) -> List[Document]:
        # 检查文件是否存在,如果不存在则抛出异常
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        # 尝试导入 PyPDF2 库
        try:
            import PyPDF2
        # 如果没有安装 PyPDF2,则抛出导入异常
        except ImportError:
            raise ImportError("需要安装 PyPDF2 才能使用 PyPDFLoader:pip install PyPDF2")

        # 初始化 Document 列表
        docs: List[Document] = []
        # 以二进制只读模式打开 PDF 文件
        with open(self.file_path, "rb") as f:
            # 创建 PDF 读取器对象
            reader = PyPDF2.PdfReader(f)
            # 枚举每个页面
            for idx, page in enumerate(reader.pages):
                # 提取当前页文本,如果无内容则使用空字符串
                content = page.extract_text() or ""
                # 构建当前页的元数据信息
                metadata = {
                    "source": self.file_path,
                    "page": idx + 1,
                    "total_pages": len(reader.pages),
                }
                # 创建 Document 对象并添加到列表
                docs.append(Document(page_content=content, metadata=metadata))
        # 返回全部 Document 对象列表
        return docs


+# 定义 Docx2txtLoader 类
+class Docx2txtLoader:
+    """
+    DOCX 加载器:读取 Word 文档文本,返回 Document 列表。
+    依赖 python-docx。若未安装,请先 `pip install python-docx`。
+    """
+
+    # 构造函数,接收 DOCX 文件路径
+    # 参数 file_path: DOCX 文件的路径
+    def __init__(self, file_path: str):
+        # 保存文件路径到实例属性
+        self.file_path = file_path
+
+    # 加载 DOCX 文件并返回 Document 对象列表
+    def load(self) -> List[Document]:
+        # 检查 DOCX 文件是否存在,若不存在则抛出 FileNotFoundError
+        if not os.path.exists(self.file_path):
+            raise FileNotFoundError(f"文件不存在: {self.file_path}")
+        # 尝试导入 python-docx 库中的 Document,如果未安装,则捕获异常
+        try:
+            from docx import Document as DocxDocument
+        # 如果没有安装 python-docx,则抛出导入异常,告知用户安装
+        except ImportError:
+            raise ImportError("需要安装 python-docx 才能使用 Docx2txtLoader:pip install python-docx")
+
+        # 打开 DOCX 文件,创建文档对象
+        doc = DocxDocument(self.file_path)
+        # 初始化段落文本列表,用于存储所有段落的内容
+        paragraphs = []
+        # 遍历文档中的每一个段落对象
+        for para in doc.paragraphs:
+            # 获取该段落的文本,去除首尾空白字符
+            text = para.text.strip()
+            # 判断段落内容是否非空,非空则添加到段落列表
+            if text:
+                paragraphs.append(text)
+        # 将所有段落文本用换行符拼接为一个整体内容字符串
+        content = "\n".join(paragraphs)
+        # 构建文档的元数据信息,标明来源文件路径
+        metadata = {
+            "source": self.file_path,
+        }
+        # 构造并返回包含 Document 对象的单元素列表
+        return [Document(page_content=content, metadata=metadata)]

37.WebBaseLoader #

  • beautifulsoup4
  • ssl
  • urllib

WebBaseLoader 是一个网页内容加载器,用于批量抓取指定网页(Web 页面)并将其内容封装为 Document 对象。它适用于需要快速采集网页正文内容用于下游文本分析或智能应用(如问答、摘要等)的场景。

基本用法:

  • 传入参数:

    • web_paths:一个字符串网页地址或者字符串列表,即待抓取的目标网页 URL。
    • (可选)其它参数:如请求头设置、超时设置、UA 伪装等,具体可查阅实现源码。
  • 执行流程:

    1. 实例化 WebBaseLoader,传入待加载的网址列表。
    2. 调用 .load() 方法,自动抓取每个网页内容。
    3. 返回值是 Document 对象的列表,每个元素包含网页正文和采集元数据(如 url)。

返回的 Document:

  • page_content:网页主体提取后的纯文本内容(如正文、主要文章)。
  • metadata:以字典形式给出,含以下常用字段:
    • source:采集的网页 url
    • title:网页标题(如实现支持)
    • 其它按实际 loader 提供

典型应用场景:

  • 快速采集新闻、博客、公告等网页主内容用于索引、搜索或智能问答。
  • 结合分词、向量化模块,实现网页知识库自动同步。

注意事项:

  • 若目标网页禁止爬虫,需合理设置 User-Agent 或遵循 robots 协议。
  • 特殊网页(需登录、动态加载内容)可能无法直接采集,可结合 Selenium、Playwright 等浏览器自动化工具补充。
  • 支持批量 url 采集,返回多个 Document。

代码解释:

  • 首先从 smartchain.document_loaders 导入 WebBaseLoader,指定目标 url 列表。
  • 调用 load() 方法,即可获取所有网页内容及元数据,作为 Document 对象集合。
  • 可遍历处理每一条 Document,访问其正文内容 page_content 和采集时的 metadata。

本模块适合需要高效采集网页文本,并与智能分析链路(如 embeddings、LLM 提问)集成使用的开发者。

37.1. 37.WebBaseLoader.py #

37.WebBaseLoader.py

# uv add beautifulsoup4 lxml
# 从 smartchain.document_loaders 导入 WebBaseLoader 类
from smartchain.document_loaders import WebBaseLoader

# 指定要加载的网页 URL
url = "https://www.example.com"

# 创建 WebBaseLoader 对象,并通过 load 方法加载网页内容为 docs
docs = WebBaseLoader(web_paths=[url]).load()

# 打印当前加载网页的 URL 以及所获得 Document 的总数
print(f"加载 {url},共 {len(docs)} 个 Document")

# 遍历所有加载得到的 Document 对象
for i, doc in enumerate(docs, 1):
    # 打印每个文档内容的前 120 个字符,以及该文档的元数据信息
    print(f"{doc.page_content[:120]} \n元数据:{doc.metadata}\n")

37.2. document_loaders.py #

smartchain/document_loaders.py

from dataclasses import dataclass, field
import os
+from typing import Optional, List, Dict, Union
+from urllib.request import urlopen, Request
+from urllib.parse import urlparse
+import ssl


@dataclass
class Document:
    """文档对象,包含内容和元数据"""

    page_content: str = ""
    metadata: Dict = field(default_factory=dict)


class TextLoader:
    """
    文本文件加载器,将本地文本读取为 Document 列表。

    Args:
        file_path: 待读取的文本文件路径
        encoding: 指定编码;若为 None 且 autodetect_encoding=True,则尝试多种常见编码
        autodetect_encoding: 是否自动探测常见编码(utf-8, gbk, latin-1)
    """

    def __init__(
        self,
        file_path: str,
        encoding: Optional[str] = None,
        autodetect_encoding: bool = False,
    ):
        self.file_path = file_path
        self.encoding = encoding
        self.autodetect_encoding = autodetect_encoding

    def _detect_and_read(self) -> str:
        """读取文件内容,支持编码探测"""
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

        # 指定编码
        if self.encoding:
            with open(self.file_path, "r", encoding=self.encoding, errors="ignore") as f:
                return f.read()

        # 自动探测常见编码
        tried = []
        for enc in (["utf-8", "utf-8-sig", "gbk", "latin-1"] if self.autodetect_encoding else ["utf-8"]):
            try:
                with open(self.file_path, "r", encoding=enc) as f:
                    return f.read()
            except Exception as e:
                tried.append((enc, str(e)))
                continue

        raise UnicodeDecodeError(
            "auto", b"", 0, 1, f"无法以常见编码读取文件,尝试过: {tried}"
        )

    def load(self) -> List[Document]:
        """读取文件并返回 Document 列表"""
        content = self._detect_and_read()
        metadata = {"source": self.file_path}
        return [Document(page_content=content, metadata=metadata)]

# 定义 PyPDFLoader 类
class PyPDFLoader:
    """
    PDF 加载器:按页读取文本,返回 Document 列表。
    依赖 PyPDF2。若未安装,请先 `pip install PyPDF2`。
    """

    # 构造函数,接收 PDF 文件路径
    def __init__(self, file_path: str):
        # 保存文件路径到实例变量
        self.file_path = file_path

    # 加载 PDF 文件并返回 Document 对象列表
    def load(self) -> List[Document]:
        # 检查文件是否存在,如果不存在则抛出异常
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        # 尝试导入 PyPDF2 库
        try:
            import PyPDF2
        # 如果没有安装 PyPDF2,则抛出导入异常
        except ImportError:
            raise ImportError("需要安装 PyPDF2 才能使用 PyPDFLoader:pip install PyPDF2")

        # 初始化 Document 列表
        docs: List[Document] = []
        # 以二进制只读模式打开 PDF 文件
        with open(self.file_path, "rb") as f:
            # 创建 PDF 读取器对象
            reader = PyPDF2.PdfReader(f)
            # 枚举每个页面
            for idx, page in enumerate(reader.pages):
                # 提取当前页文本,如果无内容则使用空字符串
                content = page.extract_text() or ""
                # 构建当前页的元数据信息
                metadata = {
                    "source": self.file_path,
                    "page": idx + 1,
                    "total_pages": len(reader.pages),
                }
                # 创建 Document 对象并添加到列表
                docs.append(Document(page_content=content, metadata=metadata))
        # 返回全部 Document 对象列表
        return docs


# 定义 Docx2txtLoader 类
class Docx2txtLoader:
    """
    DOCX 加载器:读取 Word 文档文本,返回 Document 列表。
    依赖 python-docx。若未安装,请先 `pip install python-docx`。
    """

    # 构造函数,接收 DOCX 文件路径
    # 参数 file_path: DOCX 文件的路径
    def __init__(self, file_path: str):
        # 保存文件路径到实例属性
        self.file_path = file_path

    # 加载 DOCX 文件并返回 Document 对象列表
    def load(self) -> List[Document]:
        # 检查 DOCX 文件是否存在,若不存在则抛出 FileNotFoundError
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        # 尝试导入 python-docx 库中的 Document,如果未安装,则捕获异常
        try:
            from docx import Document as DocxDocument
        # 如果没有安装 python-docx,则抛出导入异常,告知用户安装
        except ImportError:
            raise ImportError("需要安装 python-docx 才能使用 Docx2txtLoader:pip install python-docx")

        # 打开 DOCX 文件,创建文档对象
        doc = DocxDocument(self.file_path)
        # 初始化段落文本列表,用于存储所有段落的内容
        paragraphs = []
        # 遍历文档中的每一个段落对象
        for para in doc.paragraphs:
            # 获取该段落的文本,去除首尾空白字符
            text = para.text.strip()
            # 判断段落内容是否非空,非空则添加到段落列表
            if text:
                paragraphs.append(text)
        # 将所有段落文本用换行符拼接为一个整体内容字符串
        content = "\n".join(paragraphs)
        # 构建文档的元数据信息,标明来源文件路径
        metadata = {
            "source": self.file_path,
        }
        # 构造并返回包含 Document 对象的单元素列表
        return [Document(page_content=content, metadata=metadata)]

# 定义 WebBaseLoader 类
+class WebBaseLoader:
+   """
+   网页加载器:从网页 URL 读取文本内容,返回 Document 列表。
+   依赖 beautifulsoup4。若未安装,请先 `pip install beautifulsoup4`。
+   """

    # 构造函数,接收网页路径列表
    # 参数 web_paths: 网页 URL 列表,可以是单个 URL 字符串或 URL 列表
+   def __init__(self, web_paths: Union[str, List[str]]):
        # 如果传入的是单个字符串,则将其转为列表
+       if isinstance(web_paths, str):
+           self.web_paths = [web_paths]
+       else:
            # 如果本身就是列表,则直接赋值
+           self.web_paths = web_paths

    # 抓取单个网页内容的方法
+   def _scrape(self, url: str) -> Dict[str, str]:
+       """
+       抓取网页内容并解析 HTML。

+       Args:
+           url: 要抓取的网页 URL

+       Returns:
+           包含 'content'、'title'、'language' 的字典
+       """
        # 尝试导入 BeautifulSoup 库
+       try:
+           from bs4 import BeautifulSoup
        # 如果未安装 beautifulsoup4,则抛出异常
+       except ImportError:
+           raise ImportError("需要安装 beautifulsoup4 才能使用 WebBaseLoader:pip install beautifulsoup4")

        # 构造 HTTP 请求对象,设置 User-Agent
+       req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # 创建 SSL 上下文,关闭证书验证
+       ssl_context = ssl.create_default_context()
+       ssl_context.check_hostname = False
+       ssl_context.verify_mode = ssl.CERT_NONE
        # 打开 url 并读取响应内容
+       with urlopen(req, context=ssl_context) as response:
            # 读取原始 HTML 字节数据
+           html_content = response.read()
            # 尝试从响应中获取字符编码,没有则用 utf-8
+           encoding = response.headers.get_content_charset() or "utf-8"
            # 按指定编码解码 HTML 内容
+           html_text = html_content.decode(encoding, errors="ignore")

        # 用 BeautifulSoup 解析 HTML 文档
+       soup = BeautifulSoup(html_text, "html.parser")
        # 移除 script 和 style 标签(不需要其内容)
+       for script in soup(["script", "style"]):
+           script.decompose()

        # 查找网页标题
+       title_tag = soup.find("title")
        # 获取标题文本(若未找到则为空字符串)
+       title = title_tag.get_text().strip() if title_tag else ""
        # 查找 html 根标签
+       html_tag = soup.find("html")
        # 获取 lang 属性作为语言,没有则默认为 en
+       language = html_tag.get("lang", "en") if html_tag else "en"
        # 提取网页正文(所有可见文本,按换行分隔)
+       text_content = soup.get_text(separator="\n", strip=True)

        # 返回网页内容,标题,语言
+       return {
+           "content": text_content,
+           "title": title,
+           "language": language,
+       }

    # 加载所有网页并返回 Document 对象列表
+   def load(self) -> List[Document]:
+       """
+       加载所有指定的网页并返回 Document 列表。

+       Returns:
+           Document 对象列表,每个网页对应一个 Document
+       """
        # 新建存储所有 Document 的列表
+       docs: List[Document] = []
        # 遍历待抓取的每一个 URL
+       for url in self.web_paths:
            # 抓取网页内容和信息
+           scraped_data = self._scrape(url)
            # 构建元数据,包括来源 URL、标题和语言
+           metadata = {
+               "source": url,
+               "title": scraped_data["title"],
+               "language": scraped_data["language"],
+           }
            # 创建 Document 对象,加入结果列表
+           docs.append(
+               Document(
+                   page_content=scraped_data["content"],
+                   metadata=metadata,
+               )
+           )
        # 返回所有 Document 对象
+       return docs

38.CSVLoader #

使用 CSVLoader 类来加载和处理 CSV 文件,将每一行转换为结构化的 Document 对象。你可以通过最简单的方式直接读取本地的 CSV 文件,也可以传递自定义参数来适配不同格式的 CSV 数据。

CSVLoader 的主要功能如下:

  • 支持读取常见的逗号分隔(CSV)文件,自动识别列名和内容。
  • 每一行会生成一个 Document,内容为该行的数据(可以排除元数据字段)。
  • 可以指定哪些字段作为元数据(如 source、行号等),便于后续检索和分析。
  • 支持自定义分隔符、引号等 CSV 格式参数(通过 csv_args 参数传递)。
  • 兼容多种编码方式,解决读取中文文件乱码问题(如可指定 encoding="utf-8")。

38.1. 38.CSVLoader.py #

38.CSVLoader.py

#from langchain_community.document_loaders import CSVLoader
from smartchain.document_loaders import CSVLoader
file_path = "files/example.csv"

# 基本用法
docs = CSVLoader(file_path=file_path, encoding="utf-8").load()
print(f"共 {len(docs)} 个 Document")
for i, doc in enumerate(docs, 1):
    print(f"--- 文档 {i} ---\n{doc.page_content}\n元数据:{doc.metadata}\n")

# 自定义参数
docs_custom = CSVLoader(
    file_path=file_path,
    encoding="utf-8",
    csv_args={"delimiter": ",", "quotechar": '"'}
).load()
print(f"自定义参数: {len(docs_custom)} 个 Document")
for i, doc in enumerate(docs_custom[:2], 1):
    print(f"--- 文档 {i} ---\n{doc.page_content[:100]}...\n元数据:{doc.metadata}\n")

38.2. example.csv #

files/example.csv

姓名,年龄,城市,职业
张三,25,北京,工程师
李四,30,上海,设计师
王五,28,广州,产品经理
赵六,35,深圳,数据分析师

38.3. document_loaders.py #

smartchain/document_loaders.py

from dataclasses import dataclass, field
import os
+import csv
+from typing import Optional, List, Dict, Union, Sequence
from urllib.request import urlopen, Request
from urllib.parse import urlparse
import ssl


@dataclass
class Document:
    """文档对象,包含内容和元数据"""

    page_content: str = ""
    metadata: Dict = field(default_factory=dict)


class TextLoader:
    """
    文本文件加载器,将本地文本读取为 Document 列表。

    Args:
        file_path: 待读取的文本文件路径
        encoding: 指定编码;若为 None 且 autodetect_encoding=True,则尝试多种常见编码
        autodetect_encoding: 是否自动探测常见编码(utf-8, gbk, latin-1)
    """

    def __init__(
        self,
        file_path: str,
        encoding: Optional[str] = None,
        autodetect_encoding: bool = False,
    ):
        self.file_path = file_path
        self.encoding = encoding
        self.autodetect_encoding = autodetect_encoding

    def _detect_and_read(self) -> str:
        """读取文件内容,支持编码探测"""
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

        # 指定编码
        if self.encoding:
            with open(self.file_path, "r", encoding=self.encoding, errors="ignore") as f:
                return f.read()

        # 自动探测常见编码
        tried = []
        for enc in (["utf-8", "utf-8-sig", "gbk", "latin-1"] if self.autodetect_encoding else ["utf-8"]):
            try:
                with open(self.file_path, "r", encoding=enc) as f:
                    return f.read()
            except Exception as e:
                tried.append((enc, str(e)))
                continue

        raise UnicodeDecodeError(
            "auto", b"", 0, 1, f"无法以常见编码读取文件,尝试过: {tried}"
        )

    def load(self) -> List[Document]:
        """读取文件并返回 Document 列表"""
        content = self._detect_and_read()
        metadata = {"source": self.file_path}
        return [Document(page_content=content, metadata=metadata)]

# 定义 PyPDFLoader 类
class PyPDFLoader:
    """
    PDF 加载器:按页读取文本,返回 Document 列表。
    依赖 PyPDF2。若未安装,请先 `pip install PyPDF2`。
    """

    # 构造函数,接收 PDF 文件路径
    def __init__(self, file_path: str):
        # 保存文件路径到实例变量
        self.file_path = file_path

    # 加载 PDF 文件并返回 Document 对象列表
    def load(self) -> List[Document]:
        # 检查文件是否存在,如果不存在则抛出异常
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        # 尝试导入 PyPDF2 库
        try:
            import PyPDF2
        # 如果没有安装 PyPDF2,则抛出导入异常
        except ImportError:
            raise ImportError("需要安装 PyPDF2 才能使用 PyPDFLoader:pip install PyPDF2")

        # 初始化 Document 列表
        docs: List[Document] = []
        # 以二进制只读模式打开 PDF 文件
        with open(self.file_path, "rb") as f:
            # 创建 PDF 读取器对象
            reader = PyPDF2.PdfReader(f)
            # 枚举每个页面
            for idx, page in enumerate(reader.pages):
                # 提取当前页文本,如果无内容则使用空字符串
                content = page.extract_text() or ""
                # 构建当前页的元数据信息
                metadata = {
                    "source": self.file_path,
                    "page": idx + 1,
                    "total_pages": len(reader.pages),
                }
                # 创建 Document 对象并添加到列表
                docs.append(Document(page_content=content, metadata=metadata))
        # 返回全部 Document 对象列表
        return docs


# 定义 Docx2txtLoader 类
class Docx2txtLoader:
    """
    DOCX 加载器:读取 Word 文档文本,返回 Document 列表。
    依赖 python-docx。若未安装,请先 `pip install python-docx`。
    """

    # 构造函数,接收 DOCX 文件路径
    # 参数 file_path: DOCX 文件的路径
    def __init__(self, file_path: str):
        # 保存文件路径到实例属性
        self.file_path = file_path

    # 加载 DOCX 文件并返回 Document 对象列表
    def load(self) -> List[Document]:
        # 检查 DOCX 文件是否存在,若不存在则抛出 FileNotFoundError
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        # 尝试导入 python-docx 库中的 Document,如果未安装,则捕获异常
        try:
            from docx import Document as DocxDocument
        # 如果没有安装 python-docx,则抛出导入异常,告知用户安装
        except ImportError:
            raise ImportError("需要安装 python-docx 才能使用 Docx2txtLoader:pip install python-docx")

        # 打开 DOCX 文件,创建文档对象
        doc = DocxDocument(self.file_path)
        # 初始化段落文本列表,用于存储所有段落的内容
        paragraphs = []
        # 遍历文档中的每一个段落对象
        for para in doc.paragraphs:
            # 获取该段落的文本,去除首尾空白字符
            text = para.text.strip()
            # 判断段落内容是否非空,非空则添加到段落列表
            if text:
                paragraphs.append(text)
        # 将所有段落文本用换行符拼接为一个整体内容字符串
        content = "\n".join(paragraphs)
        # 构建文档的元数据信息,标明来源文件路径
        metadata = {
            "source": self.file_path,
        }
        # 构造并返回包含 Document 对象的单元素列表
        return [Document(page_content=content, metadata=metadata)]

# 定义 WebBaseLoader 类
class WebBaseLoader:
    """
    网页加载器:从网页 URL 读取文本内容,返回 Document 列表。
    依赖 beautifulsoup4。若未安装,请先 `pip install beautifulsoup4`。
    """

    # 构造函数,接收网页路径列表
    # 参数 web_paths: 网页 URL 列表,可以是单个 URL 字符串或 URL 列表
    def __init__(self, web_paths: Union[str, List[str]]):
        # 如果传入的是单个字符串,则将其转为列表
        if isinstance(web_paths, str):
            self.web_paths = [web_paths]
        else:
            # 如果本身就是列表,则直接赋值
            self.web_paths = web_paths

    # 抓取单个网页内容的方法
    def _scrape(self, url: str) -> Dict[str, str]:
        """
        抓取网页内容并解析 HTML。

        Args:
            url: 要抓取的网页 URL

        Returns:
            包含 'content'、'title'、'language' 的字典
        """
        # 尝试导入 BeautifulSoup 库
        try:
            from bs4 import BeautifulSoup
        # 如果未安装 beautifulsoup4,则抛出异常
        except ImportError:
            raise ImportError("需要安装 beautifulsoup4 才能使用 WebBaseLoader:pip install beautifulsoup4")

        # 构造 HTTP 请求对象,设置 User-Agent
        req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # 创建 SSL 上下文,关闭证书验证
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        # 打开 url 并读取响应内容
        with urlopen(req, context=ssl_context) as response:
            # 读取原始 HTML 字节数据
            html_content = response.read()
            # 尝试从响应中获取字符编码,没有则用 utf-8
            encoding = response.headers.get_content_charset() or "utf-8"
            # 按指定编码解码 HTML 内容
            html_text = html_content.decode(encoding, errors="ignore")

        # 用 BeautifulSoup 解析 HTML 文档
        soup = BeautifulSoup(html_text, "html.parser")
        # 移除 script 和 style 标签(不需要其内容)
        for script in soup(["script", "style"]):
            script.decompose()

        # 查找网页标题
        title_tag = soup.find("title")
        # 获取标题文本(若未找到则为空字符串)
        title = title_tag.get_text().strip() if title_tag else ""
        # 查找 html 根标签
        html_tag = soup.find("html")
        # 获取 lang 属性作为语言,没有则默认为 en
        language = html_tag.get("lang", "en") if html_tag else "en"
        # 提取网页正文(所有可见文本,按换行分隔)
        text_content = soup.get_text(separator="\n", strip=True)

        # 返回网页内容,标题,语言
        return {
            "content": text_content,
            "title": title,
            "language": language,
        }

    # 加载所有网页并返回 Document 对象列表
    def load(self) -> List[Document]:
        """
        加载所有指定的网页并返回 Document 列表。

        Returns:
            Document 对象列表,每个网页对应一个 Document
        """
        # 新建存储所有 Document 的列表
        docs: List[Document] = []
        # 遍历待抓取的每一个 URL
        for url in self.web_paths:
            # 抓取网页内容和信息
            scraped_data = self._scrape(url)
            # 构建元数据,包括来源 URL、标题和语言
            metadata = {
                "source": url,
                "title": scraped_data["title"],
                "language": scraped_data["language"],
            }
            # 创建 Document 对象,加入结果列表
            docs.append(
                Document(
                    page_content=scraped_data["content"],
                    metadata=metadata,
                )
            )
        # 返回所有 Document 对象
        return docs

# 定义 CSVLoader 类
+class CSVLoader:
+   """
+   CSV 加载器:读取 CSV 文件,将每一行转换为 Document 对象。
+   每行数据格式化为 "列名: 值\n列名: 值..." 的文本格式。
+   """

    # 构造函数,初始化类成员变量
+   def __init__(
+       self,
+       file_path,                # CSV 文件路径
+       source_column=None,       # 指定 source 的列名(可选)
+       metadata_columns=(),      # 指定作为元数据的列列表(可选)
+       csv_args=None,            # 传递给 csv.DictReader 的参数(可选)
+       encoding=None,            # 文件编码(可选)
+       autodetect_encoding=False # 是否自动检测编码(可选)
+   ):
        # 保存文件路径
+       self.file_path = file_path
        # 保存 source 列名称
+       self.source_column = source_column
        # 保存元数据列列表
+       self.metadata_columns = metadata_columns
        # 如果 csv_args 为 None,则用空字典
+       self.csv_args = csv_args or {}
        # 保存文件编码
+       self.encoding = encoding
        # 保存是否自动检测编码
+       self.autodetect_encoding = autodetect_encoding

    # 加载 CSV 文件并返回 Document 对象列表
+   def load(self):
+       """
+       加载 CSV 文件并返回 Document 列表。

+       Returns:
+           Document 对象列表,每行 CSV 对应一个 Document
+       """
        # 检查指定的文件路径是否存在
+       if not os.path.exists(self.file_path):
+           raise FileNotFoundError(f"文件不存在: {self.file_path}")

        # 初始化 Document 列表
+       docs = []

        # 尝试使用指定的编码打开文件
+       try:
            # 打开 CSV 文件,指定换行和编码
+           with open(self.file_path, newline="", encoding=self.encoding) as csvfile:
                # 调用内部方法读取文件并生成 Document 列表
+               docs = self._read_file(csvfile)
        # 捕获解码错误(编码不匹配时)
+       except UnicodeDecodeError as e:
            # 如果设置了自动检测编码
+           if self.autodetect_encoding:
                # 依次尝试常见的几种编码
+               encodings_to_try = ["utf-8", "utf-8-sig", "gbk", "latin-1"]
+               for enc in encodings_to_try:
+                   try:
                        # 按当前尝试的编码重新打开文件
+                       with open(self.file_path, newline="", encoding=enc) as csvfile:
                            # 读取文件
+                           docs = self._read_file(csvfile)
                            # 如果读取成功则结束尝试
+                           break
+                   except UnicodeDecodeError:
                        # 解码失败则继续尝试下一个编码
+                       continue
                # 如果所有编码都读取失败,抛出异常
+               if not docs:
+                   raise RuntimeError(f"无法以常见编码读取文件 {self.file_path}") from e
+           else:
                # 未设置自动检测编码,直接抛出运行时异常
+               raise RuntimeError(f"编码错误,无法读取文件 {self.file_path}") from e
        # 捕获其他打开或读取文件的异常
+       except Exception as e:
            # 抛出运行时异常,包含原始异常信息
+           raise RuntimeError(f"读取文件 {self.file_path} 时出错") from e

        # 返回 Document 列表
+       return docs

    # 内部方法:读取 CSV 文件内容并生成每行的 Document
+   def _read_file(self, csvfile):
+       """
+       从已打开的 CSV 文件中读取数据并生成 Document 对象。

+       Args:
+           csvfile: 已打开的 CSV 文件对象

+       Returns:
+           Document 对象列表
+       """
        # 初始化 Document 列表
+       docs = []
        # 使用 DictReader 处理 csv,每行是一个字典
+       csv_reader = csv.DictReader(csvfile, **self.csv_args)

        # 遍历每一行数据
+       for i, row in enumerate(csv_reader):
            # 确定 source 元数据
+           try:
                # 如果指定了 source 列名,则取该列的值
+               if self.source_column is not None:
+                   source = row[self.source_column]
+               else:
                    # 未指定时以文件路径为 source
+                   source = str(self.file_path)
+           except KeyError:
                # 指定的 source 列名不存在时报错
+               raise ValueError(
+                   f"源列 '{self.source_column}' 在 CSV 文件中不存在。"
+               )

            # 构造文档主体内容(排除元数据列)
+           content_parts = []
            # 遍历每列字段及其值
+           for k, v in row.items():
                # 跳过元数据列,只加入正文内容
+               if k not in self.metadata_columns:
                    # 去除列名两端空白
+                   key = k.strip() if k is not None else ""
                    # 若值为字符串则去两端空白,否则转为字符串
+                   if isinstance(v, str):
+                       value = v.strip()
+                   else:
+                       value = str(v) if v is not None else ""
                    # 拼接为 '列名: 值' 形式加入内容列表
+                   content_parts.append(f"{key}: {value}")

            # 用换行连接所有列内容,组成正文内容
+           content = "\n".join(content_parts)

            # 构建元数据字典,包括 source 和行号
+           metadata = {
+               "source": source,
+               "row": i,  # 行号,从0开始
+           }
            # 添加额外指定的元数据列
+           for col in self.metadata_columns:
+               try:
                    # 取出元数据列的值
+                   metadata[col] = row[col]
+               except KeyError:
                    # 元数据列不存在时报错
+                   raise ValueError(f"元数据列 '{col}' 在 CSV 文件中不存在。")

            # 创建 Document 对象并添加到文档列表
+           docs.append(Document(page_content=content, metadata=metadata))

        # 返回所有生成的 Document 对象列表
+       return docs

39.LineByLineLoader #

LineByLineLoader 是一种简单的文本文件加载器,用于将文本文件中的每一行内容作为一个独立的 Document 进行处理。这种加载方式通常适用于每行为独立语义单元的场景,比如日志、代码片段、FAQ 列表、逐行标注文本等。

其核心实现思路如下:

  • 通过 lazy_load() 方法,逐行读取文本文件内容,每行去除首尾空格后作为 page_content;
  • 针对每一行,都附加了元数据(metadata),如文件路径、当前的行号(从0开始)等,方便后续追溯来源;
  • 继承自 BaseLoader,因此也支持一次性全部加载(即使用 load() 方法)。

这种按行处理的加载方式,能够让用户对结构化或半结构化的文本文件实现快速解析和后续操作。

下面演示了如何使用 LineByLineLoader 读取文本文件,并输出每一行为一个 Document。你可以根据自身业务需要修改元数据内容,或实现更复杂的文本解析逻辑。

39.1. 39.BaseLoader.py #

39.BaseLoader.py

# 导入所需的 Document 类和 BaseLoader 基类
from smartchain.document_loaders import Document, BaseLoader

# 定义逐行文本加载器,每一行为一个 Document
class LineByLineLoader(BaseLoader):
    # 初始化函数,接收文件路径和编码
    def __init__(self, file_path: str, encoding: str = "utf-8"):
        # 保存文件路径到实例
        self.file_path = file_path
        # 保存文本编码方式到实例
        self.encoding = encoding

    # 懒加载方法,逐行读取文本内容
    def lazy_load(self):
        # 以指定编码打开文件进行读取
        with open(self.file_path, "r", encoding=self.encoding) as f:
            # 枚举每一行内容,i 为行号,line 为文本
            for i, line in enumerate(f):
                # 去除每行的首尾空白后生成 Document 并返回(yield)
                yield Document(
                    page_content=line.strip(),
                    metadata={"source": self.file_path, "line_number": i}
                )

# 指定示例文件路径
sample_file = "files/sample_lines.txt"
# 使用 lazy_load() 方法加载文件内容,每行为一个 Document 对象
docs1 = list(LineByLineLoader(sample_file).lazy_load())
print(f"lazy_load() {len(docs1)}:", [d.page_content for d in docs1])

# 使用 load() 方法加载文件内容(BaseLoader 提供的方法)
docs2 = LineByLineLoader(sample_file).load()
print(f"load() {len(docs2)}:", [d.page_content for d in docs2])
print(f"元数据示例: {docs2[0].metadata}")

39.2. sample_lines.txt #

files/sample_lines.txt

第一行内容
第二行内容
第三行内容
第四行内容

39.3. document_loaders.py #

smartchain/document_loaders.py

from dataclasses import dataclass, field
import os
import csv
+from typing import Optional, List, Dict, Union, Sequence, Iterator
+from abc import ABC, abstractmethod
from urllib.request import urlopen, Request
from urllib.parse import urlparse
import ssl


@dataclass
class Document:
    """文档对象,包含内容和元数据"""

    page_content: str = ""
    metadata: Dict = field(default_factory=dict)

# 定义 BaseLoader 抽象基类
+class BaseLoader(ABC):
+   """
+   文档加载器的抽象基类。

+   子类需要实现 `lazy_load()` 方法,使用生成器模式来避免一次性加载所有文档到内存中。
+   `load()` 方法已经提供,子类通常不需要重写它。
+   """

    # 加载所有文档并返回 Document 对象的列表
+   def load(self) -> List[Document]:
+       """
+       加载所有文档并返回 Document 列表。

+       Returns:
+           Document 对象列表
+       """
        # 调用 lazy_load() 方法并将结果转换为列表,再返回该列表
+       return list(self.lazy_load())

    # 将 lazy_load 定义为抽象方法,要求子类必须实现
+   @abstractmethod
+   def lazy_load(self) -> Iterator[Document]:
+       """
+       惰性加载方法,返回 Document 对象的迭代器。

+       子类必须实现此方法,使用生成器(yield)来逐个产生 Document 对象。
+       这样可以避免一次性加载所有文档到内存中。

+       Yields:
+           Document 对象
+       """
        # 若子类未实现此方法,则抛出 NotImplementedError 并提示必须重写
+       msg = f"{self.__class__.__name__} 必须实现 lazy_load() 方法"
+       raise NotImplementedError(msg)


class TextLoader:
    """
    文本文件加载器,将本地文本读取为 Document 列表。

    Args:
        file_path: 待读取的文本文件路径
        encoding: 指定编码;若为 None 且 autodetect_encoding=True,则尝试多种常见编码
        autodetect_encoding: 是否自动探测常见编码(utf-8, gbk, latin-1)
    """

    def __init__(
        self,
        file_path: str,
        encoding: Optional[str] = None,
        autodetect_encoding: bool = False,
    ):
        self.file_path = file_path
        self.encoding = encoding
        self.autodetect_encoding = autodetect_encoding

    def _detect_and_read(self) -> str:
        """读取文件内容,支持编码探测"""
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

        # 指定编码
        if self.encoding:
            with open(self.file_path, "r", encoding=self.encoding, errors="ignore") as f:
                return f.read()

        # 自动探测常见编码
        tried = []
        for enc in (["utf-8", "utf-8-sig", "gbk", "latin-1"] if self.autodetect_encoding else ["utf-8"]):
            try:
                with open(self.file_path, "r", encoding=enc) as f:
                    return f.read()
            except Exception as e:
                tried.append((enc, str(e)))
                continue

        raise UnicodeDecodeError(
            "auto", b"", 0, 1, f"无法以常见编码读取文件,尝试过: {tried}"
        )

    def load(self) -> List[Document]:
        """读取文件并返回 Document 列表"""
        content = self._detect_and_read()
        metadata = {"source": self.file_path}
        return [Document(page_content=content, metadata=metadata)]

# 定义 PyPDFLoader 类
class PyPDFLoader:
    """
    PDF 加载器:按页读取文本,返回 Document 列表。
    依赖 PyPDF2。若未安装,请先 `pip install PyPDF2`。
    """

    # 构造函数,接收 PDF 文件路径
    def __init__(self, file_path: str):
        # 保存文件路径到实例变量
        self.file_path = file_path

    # 加载 PDF 文件并返回 Document 对象列表
    def load(self) -> List[Document]:
        # 检查文件是否存在,如果不存在则抛出异常
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        # 尝试导入 PyPDF2 库
        try:
            import PyPDF2
        # 如果没有安装 PyPDF2,则抛出导入异常
        except ImportError:
            raise ImportError("需要安装 PyPDF2 才能使用 PyPDFLoader:pip install PyPDF2")

        # 初始化 Document 列表
        docs: List[Document] = []
        # 以二进制只读模式打开 PDF 文件
        with open(self.file_path, "rb") as f:
            # 创建 PDF 读取器对象
            reader = PyPDF2.PdfReader(f)
            # 枚举每个页面
            for idx, page in enumerate(reader.pages):
                # 提取当前页文本,如果无内容则使用空字符串
                content = page.extract_text() or ""
                # 构建当前页的元数据信息
                metadata = {
                    "source": self.file_path,
                    "page": idx + 1,
                    "total_pages": len(reader.pages),
                }
                # 创建 Document 对象并添加到列表
                docs.append(Document(page_content=content, metadata=metadata))
        # 返回全部 Document 对象列表
        return docs


# 定义 Docx2txtLoader 类
class Docx2txtLoader:
    """
    DOCX 加载器:读取 Word 文档文本,返回 Document 列表。
    依赖 python-docx。若未安装,请先 `pip install python-docx`。
    """

    # 构造函数,接收 DOCX 文件路径
    # 参数 file_path: DOCX 文件的路径
    def __init__(self, file_path: str):
        # 保存文件路径到实例属性
        self.file_path = file_path

    # 加载 DOCX 文件并返回 Document 对象列表
    def load(self) -> List[Document]:
        # 检查 DOCX 文件是否存在,若不存在则抛出 FileNotFoundError
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        # 尝试导入 python-docx 库中的 Document,如果未安装,则捕获异常
        try:
            from docx import Document as DocxDocument
        # 如果没有安装 python-docx,则抛出导入异常,告知用户安装
        except ImportError:
            raise ImportError("需要安装 python-docx 才能使用 Docx2txtLoader:pip install python-docx")

        # 打开 DOCX 文件,创建文档对象
        doc = DocxDocument(self.file_path)
        # 初始化段落文本列表,用于存储所有段落的内容
        paragraphs = []
        # 遍历文档中的每一个段落对象
        for para in doc.paragraphs:
            # 获取该段落的文本,去除首尾空白字符
            text = para.text.strip()
            # 判断段落内容是否非空,非空则添加到段落列表
            if text:
                paragraphs.append(text)
        # 将所有段落文本用换行符拼接为一个整体内容字符串
        content = "\n".join(paragraphs)
        # 构建文档的元数据信息,标明来源文件路径
        metadata = {
            "source": self.file_path,
        }
        # 构造并返回包含 Document 对象的单元素列表
        return [Document(page_content=content, metadata=metadata)]

# 定义 WebBaseLoader 类
class WebBaseLoader:
    """
    网页加载器:从网页 URL 读取文本内容,返回 Document 列表。
    依赖 beautifulsoup4。若未安装,请先 `pip install beautifulsoup4`。
    """

    # 构造函数,接收网页路径列表
    # 参数 web_paths: 网页 URL 列表,可以是单个 URL 字符串或 URL 列表
    def __init__(self, web_paths: Union[str, List[str]]):
        # 如果传入的是单个字符串,则将其转为列表
        if isinstance(web_paths, str):
            self.web_paths = [web_paths]
        else:
            # 如果本身就是列表,则直接赋值
            self.web_paths = web_paths

    # 抓取单个网页内容的方法
    def _scrape(self, url: str) -> Dict[str, str]:
        """
        抓取网页内容并解析 HTML。

        Args:
            url: 要抓取的网页 URL

        Returns:
            包含 'content'、'title'、'language' 的字典
        """
        # 尝试导入 BeautifulSoup 库
        try:
            from bs4 import BeautifulSoup
        # 如果未安装 beautifulsoup4,则抛出异常
        except ImportError:
            raise ImportError("需要安装 beautifulsoup4 才能使用 WebBaseLoader:pip install beautifulsoup4")

        # 构造 HTTP 请求对象,设置 User-Agent
        req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # 创建 SSL 上下文,关闭证书验证
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        # 打开 url 并读取响应内容
        with urlopen(req, context=ssl_context) as response:
            # 读取原始 HTML 字节数据
            html_content = response.read()
            # 尝试从响应中获取字符编码,没有则用 utf-8
            encoding = response.headers.get_content_charset() or "utf-8"
            # 按指定编码解码 HTML 内容
            html_text = html_content.decode(encoding, errors="ignore")

        # 用 BeautifulSoup 解析 HTML 文档
        soup = BeautifulSoup(html_text, "html.parser")
        # 移除 script 和 style 标签(不需要其内容)
        for script in soup(["script", "style"]):
            script.decompose()

        # 查找网页标题
        title_tag = soup.find("title")
        # 获取标题文本(若未找到则为空字符串)
        title = title_tag.get_text().strip() if title_tag else ""
        # 查找 html 根标签
        html_tag = soup.find("html")
        # 获取 lang 属性作为语言,没有则默认为 en
        language = html_tag.get("lang", "en") if html_tag else "en"
        # 提取网页正文(所有可见文本,按换行分隔)
        text_content = soup.get_text(separator="\n", strip=True)

        # 返回网页内容,标题,语言
        return {
            "content": text_content,
            "title": title,
            "language": language,
        }

    # 加载所有网页并返回 Document 对象列表
    def load(self) -> List[Document]:
        """
        加载所有指定的网页并返回 Document 列表。

        Returns:
            Document 对象列表,每个网页对应一个 Document
        """
        # 新建存储所有 Document 的列表
        docs: List[Document] = []
        # 遍历待抓取的每一个 URL
        for url in self.web_paths:
            # 抓取网页内容和信息
            scraped_data = self._scrape(url)
            # 构建元数据,包括来源 URL、标题和语言
            metadata = {
                "source": url,
                "title": scraped_data["title"],
                "language": scraped_data["language"],
            }
            # 创建 Document 对象,加入结果列表
            docs.append(
                Document(
                    page_content=scraped_data["content"],
                    metadata=metadata,
                )
            )
        # 返回所有 Document 对象
        return docs

# 定义 CSVLoader 类
class CSVLoader:
    """
    CSV 加载器:读取 CSV 文件,将每一行转换为 Document 对象。
    每行数据格式化为 "列名: 值\n列名: 值..." 的文本格式。
    """

    # 构造函数,初始化类成员变量
    def __init__(
        self,
        file_path,                # CSV 文件路径
        source_column=None,       # 指定 source 的列名(可选)
        metadata_columns=(),      # 指定作为元数据的列列表(可选)
        csv_args=None,            # 传递给 csv.DictReader 的参数(可选)
        encoding=None,            # 文件编码(可选)
        autodetect_encoding=False # 是否自动检测编码(可选)
    ):
        # 保存文件路径
        self.file_path = file_path
        # 保存 source 列名称
        self.source_column = source_column
        # 保存元数据列列表
        self.metadata_columns = metadata_columns
        # 如果 csv_args 为 None,则用空字典
        self.csv_args = csv_args or {}
        # 保存文件编码
        self.encoding = encoding
        # 保存是否自动检测编码
        self.autodetect_encoding = autodetect_encoding

    # 加载 CSV 文件并返回 Document 对象列表
    def load(self):
        """
        加载 CSV 文件并返回 Document 列表。

        Returns:
            Document 对象列表,每行 CSV 对应一个 Document
        """
        # 检查指定的文件路径是否存在
        if not os.path.exists(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

        # 初始化 Document 列表
        docs = []

        # 尝试使用指定的编码打开文件
        try:
            # 打开 CSV 文件,指定换行和编码
            with open(self.file_path, newline="", encoding=self.encoding) as csvfile:
                # 调用内部方法读取文件并生成 Document 列表
                docs = self._read_file(csvfile)
        # 捕获解码错误(编码不匹配时)
        except UnicodeDecodeError as e:
            # 如果设置了自动检测编码
            if self.autodetect_encoding:
                # 依次尝试常见的几种编码
                encodings_to_try = ["utf-8", "utf-8-sig", "gbk", "latin-1"]
                for enc in encodings_to_try:
                    try:
                        # 按当前尝试的编码重新打开文件
                        with open(self.file_path, newline="", encoding=enc) as csvfile:
                            # 读取文件
                            docs = self._read_file(csvfile)
                            # 如果读取成功则结束尝试
                            break
                    except UnicodeDecodeError:
                        # 解码失败则继续尝试下一个编码
                        continue
                # 如果所有编码都读取失败,抛出异常
                if not docs:
                    raise RuntimeError(f"无法以常见编码读取文件 {self.file_path}") from e
            else:
                # 未设置自动检测编码,直接抛出运行时异常
                raise RuntimeError(f"编码错误,无法读取文件 {self.file_path}") from e
        # 捕获其他打开或读取文件的异常
        except Exception as e:
            # 抛出运行时异常,包含原始异常信息
            raise RuntimeError(f"读取文件 {self.file_path} 时出错") from e

        # 返回 Document 列表
        return docs

    # 内部方法:读取 CSV 文件内容并生成每行的 Document
    def _read_file(self, csvfile):
        """
        从已打开的 CSV 文件中读取数据并生成 Document 对象。

        Args:
            csvfile: 已打开的 CSV 文件对象

        Returns:
            Document 对象列表
        """
        # 初始化 Document 列表
        docs = []
        # 使用 DictReader 处理 csv,每行是一个字典
        csv_reader = csv.DictReader(csvfile, **self.csv_args)

        # 遍历每一行数据
        for i, row in enumerate(csv_reader):
            # 确定 source 元数据
            try:
                # 如果指定了 source 列名,则取该列的值
                if self.source_column is not None:
                    source = row[self.source_column]
                else:
                    # 未指定时以文件路径为 source
                    source = str(self.file_path)
            except KeyError:
                # 指定的 source 列名不存在时报错
                raise ValueError(
                    f"源列 '{self.source_column}' 在 CSV 文件中不存在。"
                )

            # 构造文档主体内容(排除元数据列)
            content_parts = []
            # 遍历每列字段及其值
            for k, v in row.items():
                # 跳过元数据列,只加入正文内容
                if k not in self.metadata_columns:
                    # 去除列名两端空白
                    key = k.strip() if k is not None else ""
                    # 若值为字符串则去两端空白,否则转为字符串
                    if isinstance(v, str):
                        value = v.strip()
                    else:
                        value = str(v) if v is not None else ""
                    # 拼接为 '列名: 值' 形式加入内容列表
                    content_parts.append(f"{key}: {value}")

            # 用换行连接所有列内容,组成正文内容
            content = "\n".join(content_parts)

            # 构建元数据字典,包括 source 和行号
            metadata = {
                "source": source,
                "row": i,  # 行号,从0开始
            }
            # 添加额外指定的元数据列
            for col in self.metadata_columns:
                try:
                    # 取出元数据列的值
                    metadata[col] = row[col]
                except KeyError:
                    # 元数据列不存在时报错
                    raise ValueError(f"元数据列 '{col}' 在 CSV 文件中不存在。")

            # 创建 Document 对象并添加到文档列表
            docs.append(Document(page_content=content, metadata=metadata))

        # 返回所有生成的 Document 对象列表
        return docs

← 上一节 7.memory 下一节 9.text_splitters →

访问验证

请输入访问令牌

Token不正确,请重新输入