导航菜单

  • 1.langchain.intro
  • 2.langchain.chat_models
  • 3.langchain.prompts
  • 4.langchain.example_selectors
  • 5.output_parsers
  • 6.Runnable
  • 7.memory
  • 8.document_loaders
  • 9.text_splitters
  • 10.embeddings
  • 11.tool
  • 12.retrievers
  • 13.optimize
  • 14.项目介绍
  • 15.启动HTTP
  • 16.数据与模型
  • 17.权限管理
  • 18.知识库管理
  • 19.设置
  • 20.文档管理
  • 21.聊天
  • 22.API文档
  • 23.RAG优化
  • 24.索引时优化
  • 25.检索前优化
  • 26.检索后优化
  • 27.系统优化
  • 28.GraphRAG
  • 29.图
  • 30.为什么选择图数据库
  • 31.什么是 Neo4j
  • 32.安装和连接 Neo4j
  • 33.Neo4j核心概念
  • 34.Cypher基础
  • 35.模式匹配
  • 36.数据CRUD操作
  • 37.GraphRAG
  • 38.查询和过滤
  • 39.结果处理和聚合
  • 40.语句组合
  • 41.子查询
  • 42.模式和约束
  • 43.日期时间处理
  • 44.Cypher内置函数
  • 45.Python操作Neo4j
  • 46.neo4j
  • 47.py2neo
  • 48.Streamlit
  • 49.Pandas
  • 50.graphRAG
  • 51.deepdoc
  • 52.deepdoc
  • 53.deepdoc
  • 55.deepdoc
  • 54.deepdoc
  • Pillow
  • 40.CharacterTextSplitter
    • 40.1. 40.CharacterTextSplitter.py
    • 40.2. text_splitters.py
  • 41.RecursiveCharacterTextSplitter
    • 41.1. 41.RecursiveCharacterTextSplitter.py
    • 41.2. text_splitters.py
  • 42.TokenTextSplitter
    • 42.1. 42.TokenTextSplitter.py
    • 42.2. text_splitters.py
  • 43. MarkdownTextSplitter
    • 43.1. 43.MarkdownTextSplitter.py
    • 43.2. text_splitters.py

40.CharacterTextSplitter #

字符型文本分割器(CharacterTextSplitter)是最基础的一类文本分割工具。其核心思想是将一段字符串按照指定的字符数切分成多个小块,且允许不同块之间存在“重叠区”。这种方式适合对没有特殊结构的文本进行粗粒度的快速处理。例如:长段文本、无明显语义段落的数据切分,用于后续如向量化与检索等场景。

其主要功能和参数包括:

  • separator:分隔符,默认可为空字符串,表示逐字符切分,也可以设置如换行等字符。
  • chunk_size:每个文本块允许的最大字符数。
  • chunk_overlap:相邻文本块之间的重叠区域长度。

使用流程为:将原始文本按照分隔符、块大小、重叠长度进行切分,生成适合作为语料片段的小块文本序列。这种切分方法适合简单的中文、英文或代码片段初步拆分,但不关心内容的句子、段落结构。对于更复杂的分割需求,推荐使用递归分割器等更高级功能。

40.1. 40.CharacterTextSplitter.py #

40.CharacterTextSplitter.py

# 导入字符型文本分割器
#from langchain_text_splitters import CharacterTextSplitter
from smartchain.text_splitters import CharacterTextSplitter

# 定义一个需要被切分的长文本
text = (
    "LangChain 文本分割示例。"
    "CharacterTextSplitter 会按固定字符数切分,并可重叠。"
    "适合在构建向量索引前先做简单分片。"
)

# 初始化分割器对象,每块20个字符,块与块之间重叠5个字符
splitter = CharacterTextSplitter(
    separator="",
    chunk_size=20,
    chunk_overlap=5,
)

# 执行文本切分操作,返回文本块列表
chunks = splitter.split_text(text)

# 打印原始文本长度
print(f"原文本长度: {len(text)} 字符")
# 打印切分后获得了多少块
print(f"切分得到 {len(chunks)} 块:")
# 逐块打印每一个分片和该分片的字符数
for i, c in enumerate(chunks, 1):
    print(f"片段{i} ({len(c)} 字符): {c}")

40.2. text_splitters.py #

smartchain/text_splitters.py

"""文本分割器模块,提供文本分割功能。"""

# 导入正则表达式模块
import re
# 导入抽象基类相关
from abc import ABC, abstractmethod
# 定义 TextSplitter 抽象基类
class TextSplitter(ABC):
    """
    文本分割器的抽象基类。
    子类需要实现 split_text() 方法来定义具体的分割逻辑。
    """

    # 构造函数,初始化分割器参数
    def __init__(
        self,
        chunk_size = 4000,         # 每个块的最大字符数,默认为4000
        chunk_overlap = 200,       # 块之间的重叠字符数,默认为200
        length_function = None,    # 计算文本长度的函数
        keep_separator = False,    # 是否保留分隔符,默认为False
        strip_whitespace = True,   # 是否去除首尾空白,默认为True
    ):
        """
        初始化文本分割器。

        Args:
            chunk_size: 每个块的最大字符数
            chunk_overlap: 块之间的重叠字符数
            length_function: 计算文本长度的函数,默认为 len
            keep_separator: 是否保留分隔符
            strip_whitespace: 是否去除首尾空白
        """
        # 检查 chunk_size 参数是否合法
        if chunk_size <= 0:
            raise ValueError(f"chunk_size 必须 > 0,当前值: {chunk_size}")
        # 检查 chunk_overlap 参数是否合法
        if chunk_overlap < 0:
            raise ValueError(f"chunk_overlap 必须 >= 0,当前值: {chunk_overlap}")
        # 检查 chunk_overlap 不得大于 chunk_size
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"chunk_overlap ({chunk_overlap}) 不能大于 chunk_size ({chunk_size})"
            )

        # 保存参数到成员变量
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function if length_function is not None else len
        self._keep_separator = keep_separator
        self._strip_whitespace = strip_whitespace

    # 抽象方法,子类必须实现此方法
    @abstractmethod
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 强制子类必须实现 split_text 方法
        raise NotImplementedError("子类必须实现 split_text() 方法")

    # 合并分割后的文本块,控制结果块大小和重叠
    def _merge_splits(self, splits, separator):
        """
        将分割后的文本块合并,确保每个块不超过 chunk_size。

        Args:
            splits: 分割后的文本块列表
            separator: 用于连接文本块的分隔符

        Returns:
            合并后的文本块列表
        """
        # 计算分隔符的长度
        separator_len = self._length_function(separator)
        # 定义用于存储最终文本块的列表
        docs = []
        # 正在构建的当前文档块
        current_doc = []
        # 当前文档块字符总长度
        total = 0

        # 遍历每个文本分块
        for d in splits:
            # 获取当前分块的长度
            len_ = self._length_function(d)
            # 计算如果加上当前块后,文档块的总长度
            total_with_sep = (
                total + len_ + (separator_len if len(current_doc) > 0 else 0)
            )

            # 如果添加当前块会超过设定的 chunk_size
            if total_with_sep > self._chunk_size:
                # 如果当前拼接块不为空,先拼接并记录
                if len(current_doc) > 0:
                    # 拼接当前文档块
                    doc = self._join_docs(current_doc, separator)
                    # 如果拼接结果不为空,则添加到结果列表
                    if doc is not None:
                        docs.append(doc)
                    # 如果拼接结果为空,则跳过后续处理
                    if doc is None:
                        continue

                    # 处理重叠逻辑,只保留最后 chunk_overlap 个字符
                    target_size = self._chunk_overlap  # 需要保留的字符数
                    current_size = 0                   # 当前累计保留长度
                    keep_docs = []                     # 记录需要保留的分块

                    # 从后往前遍历当前文档块,直到累计长度达到重叠数
                    for i in range(len(current_doc) - 1, -1, -1):
                        # 取出每个子块
                        item = current_doc[i]  
                        # 计算每个子块的长度
                        item_len = self._length_function(item)
                        # 计算分隔符的长度
                        sep_len = separator_len if len(keep_docs) > 0 else 0
                        # 如果累计长度加上子块长度和分隔符长度小于等于重叠字符数,则保留该分块
                        if current_size + item_len + sep_len <= target_size:
                            # 保留该分块,插入到 keep_docs 前面
                            keep_docs.insert(0, item)
                            current_size += item_len + sep_len
                        else:
                            # 一旦超过指定重叠字符数,则停止
                            break
                    # 更新当前文档块及总长度
                    current_doc = keep_docs
                    total = current_size

            # 把当前分块加入当前文档块
            current_doc.append(d)
            # 更新累计长度(已有分块需加分隔符长度)
            total += len_ + (separator_len if len(current_doc) > 1 else 0)

        # 合并处理最后剩余的一个文档块
        doc = self._join_docs(current_doc, separator)
        if doc is not None:
            docs.append(doc)

        # 返回处理后的所有文档块
        return docs

    # 用分隔符拼接分块
    def _join_docs(self, docs, separator):
        """
        使用分隔符连接文档块。

        Args:
            docs: 文档块列表
            separator: 分隔符

        Returns:
            连接后的文本,如果为空则返回 None
        """
        # 使用分隔符拼接各分块
        text = separator.join(docs)
        # 按需去除首尾空白符
        if self._strip_whitespace:
            text = text.strip()
        # 如拼接结果非空,返回文本,否则返回 None
        return text if text else None

# 定义按字符数处理的文本分割器
class CharacterTextSplitter(TextSplitter):
    """
    字符文本分割器:按字符数分割文本。
    可以指定分隔符,先按分隔符分割,然后再合并成指定大小的块。
    """

    # 构造函数,初始化字符型文本分割器
    def __init__(
        self,
        separator = "\n\n",         # 分隔符,默认双换行
        is_separator_regex = False, # 分隔符是否为正则表达式
        **kwargs,                   # 其它参数传递给父类
    ):
        """
        初始化字符文本分割器。

        Args:
            separator: 分隔符,默认为双换行符 "\n\n"
            is_separator_regex: 分隔符是否为正则表达式
            **kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
        """
        # 初始化父类参数
        super().__init__(**kwargs)
        # 保存分隔符
        self._separator = separator
        # 保存分隔符是否为正则表达式
        self._is_separator_regex = is_separator_regex

    # 重写文本分割方法
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 判断分隔符是否为正则表达式
        if self._is_separator_regex:
            # 直接使用正则表达式
            sep_pattern = self._separator
        else:
            # 普通字符要转义用于正则分割
            sep_pattern = re.escape(self._separator)

        # 使用分隔符(正则)分割文本
        splits = self._split_text_with_regex(text, sep_pattern)

        # 根据 self._keep_separator 决定合并分块时是否加分隔符
        merge_sep = "" if self._keep_separator else self._separator

        # 对分割结果做合并处理,返回
        return self._merge_splits(splits, merge_sep)

    # 使用正则表达式分割文本块
    def _split_text_with_regex(
        self, text, separator
    ):
        """
        使用正则表达式分割文本。

        Args:
            text: 要分割的文本
            separator: 分隔符(正则表达式)

        Returns:
            分割后的文本块列表(过滤掉空字符串)
        """
        # 如果分隔符不为空
        if separator:
            # 利用正则分割输入文本
            splits = re.split(separator, text)
        else:
            # 若分隔符为空,则将文本每个字符拆为一个子块
            splits = list(text)

        # 过滤掉空字符串,仅返回非空的块
        return [s for s in splits if s]

41.RecursiveCharacterTextSplitter #

递归字符文本分割器(RecursiveCharacterTextSplitter)是对基础字符分割器的高级扩展,其核心思想是“递归地”多级利用不同分隔符,对文本进行尽可能结构化且紧凑的分块切分。

它的主要机制如下:

  • 用户可提供一组分隔符(如:段落换行、普通换行、空格、无分隔),优先级从高到低。
  • 分割时,先用第一个分隔符切分。如果某些块仍超过 chunk_size 限制,则对这些超长块递归地用下一个分隔符继续分割。
  • 最后一个分隔符往往为空字符串,意味着在无法再细分时直接按字符截断,保证每个块最大不超过 chunk_size。
  • 可以选择是否在切分结果中保留分隔符内容(keep_separator)。

递归分割器特别适合对结构复杂、层级多、需细粒度控制的文本素材(如长文章、代码文件等)进行语义更相关的切片。它能更智能地尽量在合理的语义边界,比如段落或句子断点处切割文本,优先保留原始结构信息,同时解决极端情况下不得不打碎到字符级别的场景。

常用参数说明:

  • separators:分隔符列表,优先级高的在前,比如 ["\n\n", "\n", " ", ""]。
  • chunk_size:每块最大字符数。
  • chunk_overlap:相邻块的重叠区域长度。
  • keep_separator:是否在分块时保留分隔符本身(默认为True,更利于保留语义边界)。

41.1. 41.RecursiveCharacterTextSplitter.py #

41.RecursiveCharacterTextSplitter.py

# 导入递归字符型文本分割器
#from langchain_text_splitters import RecursiveCharacterTextSplitter
from smartchain.text_splitters import RecursiveCharacterTextSplitter

# 定义一个需要被切分的长文本
text = (
    "LangChain 文本分割示例。"
    "CharacterTextSplitter 会按固定字符数切分,并可重叠。"
    "适合在构建向量索引前先做简单分片。"
)

# 初始化分割器对象,每块20个字符,块与块之间重叠5个字符
splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_size=20,
    chunk_overlap=5,
)

# 执行文本切分操作,返回文本块列表
chunks = splitter.split_text(text)

# 打印原始文本长度
print(f"原文本长度: {len(text)} 字符")
# 打印切分后获得了多少块
print(f"切分得到 {len(chunks)} 块:")
# 逐块打印每一个分片和该分片的字符数
for i, c in enumerate(chunks, 1):
    print(f"片段{i} ({len(c)} 字符): {c}")

41.2. text_splitters.py #

smartchain/text_splitters.py

"""文本分割器模块,提供文本分割功能。"""

# 导入正则表达式模块
import re
# 导入抽象基类相关
from abc import ABC, abstractmethod
# 定义 TextSplitter 抽象基类
class TextSplitter(ABC):
    """
    文本分割器的抽象基类。
    子类需要实现 split_text() 方法来定义具体的分割逻辑。
    """

    # 构造函数,初始化分割器参数
    def __init__(
        self,
        chunk_size = 4000,         # 每个块的最大字符数,默认为4000
        chunk_overlap = 200,       # 块之间的重叠字符数,默认为200
        length_function = None,    # 计算文本长度的函数
        keep_separator = False,    # 是否保留分隔符,默认为False
        strip_whitespace = True,   # 是否去除首尾空白,默认为True
    ):
        """
        初始化文本分割器。

        Args:
            chunk_size: 每个块的最大字符数
            chunk_overlap: 块之间的重叠字符数
            length_function: 计算文本长度的函数,默认为 len
            keep_separator: 是否保留分隔符
            strip_whitespace: 是否去除首尾空白
        """
        # 检查 chunk_size 参数是否合法
        if chunk_size <= 0:
            raise ValueError(f"chunk_size 必须 > 0,当前值: {chunk_size}")
        # 检查 chunk_overlap 参数是否合法
        if chunk_overlap < 0:
            raise ValueError(f"chunk_overlap 必须 >= 0,当前值: {chunk_overlap}")
        # 检查 chunk_overlap 不得大于 chunk_size
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"chunk_overlap ({chunk_overlap}) 不能大于 chunk_size ({chunk_size})"
            )

        # 保存参数到成员变量
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function if length_function is not None else len
        self._keep_separator = keep_separator
        self._strip_whitespace = strip_whitespace

    # 抽象方法,子类必须实现此方法
    @abstractmethod
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 强制子类必须实现 split_text 方法
        raise NotImplementedError("子类必须实现 split_text() 方法")

    # 合并分割后的文本块,控制结果块大小和重叠
    def _merge_splits(self, splits, separator):
        """
        将分割后的文本块合并,确保每个块不超过 chunk_size。

        Args:
            splits: 分割后的文本块列表
            separator: 用于连接文本块的分隔符

        Returns:
            合并后的文本块列表
        """
        # 计算分隔符的长度
        separator_len = self._length_function(separator)
        # 定义用于存储最终文本块的列表
        docs = []
        # 正在构建的当前文档块
        current_doc = []
        # 当前文档块字符总长度
        total = 0

        # 遍历每个文本分块
        for d in splits:
            # 获取当前分块的长度
            len_ = self._length_function(d)
            # 计算如果加上当前块后,文档块的总长度
            total_with_sep = (
                total + len_ + (separator_len if len(current_doc) > 0 else 0)
            )

            # 如果添加当前块会超过设定的 chunk_size
            if total_with_sep > self._chunk_size:
                # 如果当前拼接块不为空,先拼接并记录
                if len(current_doc) > 0:
                    # 拼接当前文档块
                    doc = self._join_docs(current_doc, separator)
                    # 如果拼接结果不为空,则添加到结果列表
                    if doc is not None:
                        docs.append(doc)
                    # 如果拼接结果为空,则跳过后续处理
                    if doc is None:
                        continue

                    # 处理重叠逻辑,只保留最后 chunk_overlap 个字符
                    target_size = self._chunk_overlap  # 需要保留的字符数
                    current_size = 0                   # 当前累计保留长度
                    keep_docs = []                     # 记录需要保留的分块

                    # 从后往前遍历当前文档块,直到累计长度达到重叠数
                    for i in range(len(current_doc) - 1, -1, -1):
                        # 取出每个子块
                        item = current_doc[i]  
                        # 计算每个子块的长度
                        item_len = self._length_function(item)
                        # 计算分隔符的长度
                        sep_len = separator_len if len(keep_docs) > 0 else 0
                        # 如果累计长度加上子块长度和分隔符长度小于等于重叠字符数,则保留该分块
                        if current_size + item_len + sep_len <= target_size:
                            # 保留该分块,插入到 keep_docs 前面
                            keep_docs.insert(0, item)
                            current_size += item_len + sep_len
                        else:
                            # 一旦超过指定重叠字符数,则停止
                            break
                    # 更新当前文档块及总长度
                    current_doc = keep_docs
                    total = current_size

            # 把当前分块加入当前文档块
            current_doc.append(d)
            # 更新累计长度(已有分块需加分隔符长度)
            total += len_ + (separator_len if len(current_doc) > 1 else 0)

        # 合并处理最后剩余的一个文档块
        doc = self._join_docs(current_doc, separator)
        if doc is not None:
            docs.append(doc)

        # 返回处理后的所有文档块
        return docs

    # 用分隔符拼接分块
    def _join_docs(self, docs, separator):
        """
        使用分隔符连接文档块。

        Args:
            docs: 文档块列表
            separator: 分隔符

        Returns:
            连接后的文本,如果为空则返回 None
        """
        # 使用分隔符拼接各分块
        text = separator.join(docs)
        # 按需去除首尾空白符
        if self._strip_whitespace:
            text = text.strip()
        # 如拼接结果非空,返回文本,否则返回 None
        return text if text else None

# 定义按字符数处理的文本分割器
class CharacterTextSplitter(TextSplitter):
    """
    字符文本分割器:按字符数分割文本。
    可以指定分隔符,先按分隔符分割,然后再合并成指定大小的块。
    """

    # 构造函数,初始化字符型文本分割器
    def __init__(
        self,
        separator = "\n\n",         # 分隔符,默认双换行
        is_separator_regex = False, # 分隔符是否为正则表达式
        **kwargs,                   # 其它参数传递给父类
    ):
        """
        初始化字符文本分割器。

        Args:
            separator: 分隔符,默认为双换行符 "\n\n"
            is_separator_regex: 分隔符是否为正则表达式
            **kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
        """
        # 初始化父类参数
        super().__init__(**kwargs)
        # 保存分隔符
        self._separator = separator
        # 保存分隔符是否为正则表达式
        self._is_separator_regex = is_separator_regex

    # 重写文本分割方法
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 判断分隔符是否为正则表达式
        if self._is_separator_regex:
            # 直接使用正则表达式
            sep_pattern = self._separator
        else:
            # 普通字符要转义用于正则分割
            sep_pattern = re.escape(self._separator)

        # 使用分隔符(正则)分割文本
        splits = self._split_text_with_regex(text, sep_pattern)

        # 根据 self._keep_separator 决定合并分块时是否加分隔符
        merge_sep = "" if self._keep_separator else self._separator

        # 对分割结果做合并处理,返回
        return self._merge_splits(splits, merge_sep)

    # 使用正则表达式分割文本块
    def _split_text_with_regex(
        self, text, separator
    ):
        """
        使用正则表达式分割文本。

        Args:
            text: 要分割的文本
            separator: 分隔符(正则表达式)

        Returns:
            分割后的文本块列表(过滤掉空字符串)
        """
        # 如果分隔符不为空
        if separator:
            # 利用正则分割输入文本
            splits = re.split(separator, text)
        else:
            # 若分隔符为空,则将文本每个字符拆为一个子块
            splits = list(text)

        # 过滤掉空字符串,仅返回非空的块
        return [s for s in splits if s]


# 定义递归字符文本分割器
+class RecursiveCharacterTextSplitter(TextSplitter):
+   """
+   递归字符文本分割器:递归地尝试使用不同的分隔符来分割文本。

+   它会按照分隔符列表的顺序,尝试使用每个分隔符来分割文本。
+   如果某个片段仍然太大,会递归地使用下一个分隔符继续分割。
+   """

    # 构造函数,初始化递归字符文本分割器
+   def __init__(
+       self,
+       separators = None,          # 分隔符列表,默认为 ["\n\n", "\n", " ", ""]
+       keep_separator = True,     # 是否保留分隔符,默认为 True
+       is_separator_regex = False, # 分隔符是否为正则表达式
+       **kwargs,                   # 其它参数传递给父类
+   ):
+       """
+       初始化递归字符文本分割器。

+       Args:
+           separators: 分隔符列表,按优先级排序
+           keep_separator: 是否保留分隔符
+           is_separator_regex: 分隔符是否为正则表达式
+           **kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
+       """
        # 初始化父类参数
+       super().__init__(keep_separator=keep_separator, **kwargs)
        # 如果未提供分隔符,使用默认值
+       self._separators = separators if separators is not None else ["\n\n", "\n", " ", ""]
        # 保存分隔符是否为正则表达式
+       self._is_separator_regex = is_separator_regex

    # 递归分割文本的内部方法
+   def _split_text(self, text, separators):
+       """
+       递归地分割文本。

+       Args:
+           text: 要分割的文本
+           separators: 分隔符列表

+       Returns:
+           分割后的文本块列表
+       """
        # 存储最终结果
+       final_chunks = []

        # 确定要使用的分隔符
        # 默认使用最后一个分隔符(通常是空字符串,表示按字符分割)
+       separator = separators[-1]
        # 存储后续要使用的分隔符
+       new_separators = []

        # 遍历分隔符列表,找到第一个在文本中存在的分隔符
+       for i, _s in enumerate(separators):
            # 确定分隔符模式(正则或转义)
+           separator_ = _s if self._is_separator_regex else re.escape(_s)
            # 如果分隔符为空字符串,直接使用它
+           if not _s:
+               separator = _s
+               break
            # 如果文本中存在该分隔符
+           if re.search(separator_, text):
+               separator = _s
                # 保存后续要使用的分隔符
+               new_separators = separators[i + 1:]
+               break

        # 确定分隔符模式用于分割
+       separator_ = separator if self._is_separator_regex else re.escape(separator)
        # 使用分隔符分割文本,传递 keep_separator 参数
+       splits = self._split_text_with_regex(text, separator_, self._keep_separator)

        # 处理分割后的片段
+       good_splits = []  # 存储长度合适的片段
        # 确定合并时使用的分隔符
+       separator_ = "" if self._keep_separator else separator

        # 遍历每个分割片段
+       for s in splits:
            # 如果片段长度小于 chunk_size,加入 good_splits
+           if self._length_function(s) < self._chunk_size:
+               good_splits.append(s)
+           else:
                # 如果片段太大,先处理之前积累的 good_splits
+               if good_splits:
                    # 合并 good_splits
+                   merged_text = self._merge_splits(good_splits, separator_)
+                   final_chunks.extend(merged_text)
+                   good_splits = []

                # 如果还有后续分隔符,递归分割当前片段
+               if new_separators:
+                   other_info = self._split_text(s, new_separators)
+                   final_chunks.extend(other_info)
+               else:
                    # 如果没有后续分隔符,直接添加当前片段
+                   final_chunks.append(s)

        # 处理剩余的 good_splits
+       if good_splits:
+           merged_text = self._merge_splits(good_splits, separator_)
+           final_chunks.extend(merged_text)

+       return final_chunks

    # 使用正则表达式分割文本
+   def _split_text_with_regex(self, text, separator, keep_separator=False):
+       """
+       使用正则表达式分割文本。

+       Args:
+           text: 要分割的文本
+           separator: 分隔符(正则表达式)
+           keep_separator: 是否保留分隔符

+       Returns:
+           分割后的文本块列表(过滤掉空字符串)
+       """
        # 如果分隔符不为空
+       if separator:
            # 如果需要保留分隔符
+           if keep_separator:
                # 使用括号捕获分隔符,这样分隔符会保留在结果中
+               splits_ = re.split(f"({separator})", text)
                # 将分隔符和文本片段合并
+               splits = []
+               for i in range(0, len(splits_) - 1, 2):
+                   if i + 1 < len(splits_):
                        # 将文本片段和分隔符合并
+                       splits.append(splits_[i] + splits_[i + 1])
                # 处理最后一个片段(如果有)
+               if len(splits_) % 2 == 1:
+                   splits.append(splits_[-1])
+           else:
                # 不保留分隔符,直接分割
+               splits = re.split(separator, text)
+       else:
            # 如果分隔符为空,将文本转换为字符列表
+           splits = list(text)

        # 过滤掉空字符串
+       return [s for s in splits if s]

    # 分割文本的公共方法
+   def split_text(self, text):
+       """
+       将文本分割成多个块。

+       Args:
+           text: 要分割的文本

+       Returns:
+           文本块列表
+       """
        # 调用内部递归分割方法
+       return self._split_text(text, self._separators)

42.TokenTextSplitter #

TokenTextSplitter 是一种基于 token 数量对文本进行分割的分割器,适用于语义分割和大模型场景(如向量化前的智能分片等)。
与传统的按字符或分隔符切分不同,TokenTextSplitter 利用 tiktoken(OpenAI 官方分词库)对文本编码成 token,然后按照设定的 chunk_size 和 chunk_overlap 参数进行滑窗、切片,保证每一段都不会超过指定的 token 数且有重叠区域。这样可以更好地适应大语言模型处理文本的方式,有效避免出现内容中断或上下文不连续的情况。

典型用法如下所示:

  1. 通过 encoding_name 选择分词方案(如 "cl100k_base" 适用于 GPT-4/3.5)。
  2. 按 token 数切分,并设置重叠,便于后续高效向量化或上下文检索。
  3. 支持中文、英文等多语言文本。

注意:在实际应用中,合理设置 chunk_size 和 chunk_overlap 能有效提升 LLM 召回率与答案相关性。

42.1. 42.TokenTextSplitter.py #

42.TokenTextSplitter.py

# 导入递归字符型文本分割器
#from langchain_text_splitters import TokenTextSplitter

from smartchain.text_splitters import TokenTextSplitter
import tiktoken
# 定义一个需要被切分的长文本
text = (
    "LangChain 文本分割示例。"
    "CharacterTextSplitter 会按固定字符数切分,并可重叠。"
    "适合在构建向量索引前先做简单分片。"
)

# 初始化分割器对象,每块20个字符,块与块之间重叠5个字符
splitter = TokenTextSplitter(
    chunk_size=20,
    chunk_overlap=5,
    encoding_name="cl100k_base",
)

# 执行文本切分操作,返回文本块列表
chunks = splitter.split_text(text)
# 获取编码器
encoding = tiktoken.get_encoding("cl100k_base")

# 打印原始文本长度
print(f"原文本长度: {len(encoding.encode(text))} 字符")
# 打印切分后获得了多少块
print(f"切分得到 {len(chunks)} 块:")
# 逐块打印每一个分片和该分片的字符数
for i, c in enumerate(chunks, 1):
    print(f"片段{i} ({len(encoding.encode(c))} tokens): {c}")

42.2. text_splitters.py #

smartchain/text_splitters.py

"""文本分割器模块,提供文本分割功能。"""

+from dataclasses import dataclass
# 导入正则表达式模块
import re
# 导入抽象基类相关
from abc import ABC, abstractmethod
+import tiktoken
# 定义 TextSplitter 抽象基类
class TextSplitter(ABC):
    """
    文本分割器的抽象基类。
    子类需要实现 split_text() 方法来定义具体的分割逻辑。
    """

    # 构造函数,初始化分割器参数
    def __init__(
        self,
        chunk_size = 4000,         # 每个块的最大字符数,默认为4000
        chunk_overlap = 200,       # 块之间的重叠字符数,默认为200
        length_function = None,    # 计算文本长度的函数
        keep_separator = False,    # 是否保留分隔符,默认为False
        strip_whitespace = True,   # 是否去除首尾空白,默认为True
    ):
        """
        初始化文本分割器。

        Args:
            chunk_size: 每个块的最大字符数
            chunk_overlap: 块之间的重叠字符数
            length_function: 计算文本长度的函数,默认为 len
            keep_separator: 是否保留分隔符
            strip_whitespace: 是否去除首尾空白
        """
        # 保存参数到成员变量
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function if length_function is not None else len
        self._keep_separator = keep_separator
        self._strip_whitespace = strip_whitespace

    # 抽象方法,子类必须实现此方法
    @abstractmethod
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 强制子类必须实现 split_text 方法
        raise NotImplementedError("子类必须实现 split_text() 方法")

    # 合并分割后的文本块,控制结果块大小和重叠
    def _merge_splits(self, splits, separator):
        """
        将分割后的文本块合并,确保每个块不超过 chunk_size。

        Args:
            splits: 分割后的文本块列表
            separator: 用于连接文本块的分隔符

        Returns:
            合并后的文本块列表
        """
        # 计算分隔符的长度
        separator_len = self._length_function(separator)
        # 定义用于存储最终文本块的列表
        docs = []
        # 正在构建的当前文档块
        current_doc = []
        # 当前文档块字符总长度
        total = 0

        # 遍历每个文本分块
        for d in splits:
            # 获取当前分块的长度
            len_ = self._length_function(d)
            # 计算如果加上当前块后,文档块的总长度
            total_with_sep = (
                total + len_ + (separator_len if len(current_doc) > 0 else 0)
            )

            # 如果添加当前块会超过设定的 chunk_size
            if total_with_sep > self._chunk_size:
                # 如果当前拼接块不为空,先拼接并记录
                if len(current_doc) > 0:
                    # 拼接当前文档块
                    doc = self._join_docs(current_doc, separator)
                    # 如果拼接结果不为空,则添加到结果列表
                    if doc is not None:
                        docs.append(doc)
                    # 如果拼接结果为空,则跳过后续处理
                    if doc is None:
                        continue

                    # 处理重叠逻辑,只保留最后 chunk_overlap 个字符
                    target_size = self._chunk_overlap  # 需要保留的字符数
                    current_size = 0                   # 当前累计保留长度
                    keep_docs = []                     # 记录需要保留的分块

                    # 从后往前遍历当前文档块,直到累计长度达到重叠数
                    for i in range(len(current_doc) - 1, -1, -1):
                        # 取出每个子块
                        item = current_doc[i]  
                        # 计算每个子块的长度
                        item_len = self._length_function(item)
                        # 计算分隔符的长度
                        sep_len = separator_len if len(keep_docs) > 0 else 0
                        # 如果累计长度加上子块长度和分隔符长度小于等于重叠字符数,则保留该分块
                        if current_size + item_len + sep_len <= target_size:
                            # 保留该分块,插入到 keep_docs 前面
                            keep_docs.insert(0, item)
                            current_size += item_len + sep_len
                        else:
                            # 一旦超过指定重叠字符数,则停止
                            break
                    # 更新当前文档块及总长度
                    current_doc = keep_docs
                    total = current_size

            # 把当前分块加入当前文档块
            current_doc.append(d)
            # 更新累计长度(已有分块需加分隔符长度)
            total += len_ + (separator_len if len(current_doc) > 1 else 0)

        # 合并处理最后剩余的一个文档块
        doc = self._join_docs(current_doc, separator)
        if doc is not None:
            docs.append(doc)

        # 返回处理后的所有文档块
        return docs

    # 用分隔符拼接分块
    def _join_docs(self, docs, separator):
        """
        使用分隔符连接文档块。

        Args:
            docs: 文档块列表
            separator: 分隔符

        Returns:
            连接后的文本,如果为空则返回 None
        """
        # 使用分隔符拼接各分块
        text = separator.join(docs)
        # 按需去除首尾空白符
        if self._strip_whitespace:
            text = text.strip()
        # 如拼接结果非空,返回文本,否则返回 None
        return text if text else None

# 定义按字符数处理的文本分割器
class CharacterTextSplitter(TextSplitter):
    """
    字符文本分割器:按字符数分割文本。
    可以指定分隔符,先按分隔符分割,然后再合并成指定大小的块。
    """

    # 构造函数,初始化字符型文本分割器
    def __init__(
        self,
        separator = "\n\n",         # 分隔符,默认双换行
        is_separator_regex = False, # 分隔符是否为正则表达式
        **kwargs,                   # 其它参数传递给父类
    ):
        """
        初始化字符文本分割器。

        Args:
            separator: 分隔符,默认为双换行符 "\n\n"
            is_separator_regex: 分隔符是否为正则表达式
            **kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
        """
        # 初始化父类参数
        super().__init__(**kwargs)
        # 保存分隔符
        self._separator = separator
        # 保存分隔符是否为正则表达式
        self._is_separator_regex = is_separator_regex

    # 重写文本分割方法
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 判断分隔符是否为正则表达式
        if self._is_separator_regex:
            # 直接使用正则表达式
            sep_pattern = self._separator
        else:
            # 普通字符要转义用于正则分割
            sep_pattern = re.escape(self._separator)

        # 使用分隔符(正则)分割文本
        splits = self._split_text_with_regex(text, sep_pattern)

        # 根据 self._keep_separator 决定合并分块时是否加分隔符
        merge_sep = "" if self._keep_separator else self._separator

        # 对分割结果做合并处理,返回
        return self._merge_splits(splits, merge_sep)

    # 使用正则表达式分割文本块
    def _split_text_with_regex(
        self, text, separator
    ):
        """
        使用正则表达式分割文本。

        Args:
            text: 要分割的文本
            separator: 分隔符(正则表达式)

        Returns:
            分割后的文本块列表(过滤掉空字符串)
        """
        # 如果分隔符不为空
        if separator:
            # 利用正则分割输入文本
            splits = re.split(separator, text)
        else:
            # 若分隔符为空,则将文本每个字符拆为一个子块
            splits = list(text)

        # 过滤掉空字符串,仅返回非空的块
        return [s for s in splits if s]


# 定义递归字符文本分割器
class RecursiveCharacterTextSplitter(TextSplitter):
    """
    递归字符文本分割器:递归地尝试使用不同的分隔符来分割文本。

    它会按照分隔符列表的顺序,尝试使用每个分隔符来分割文本。
    如果某个片段仍然太大,会递归地使用下一个分隔符继续分割。
    """

    # 构造函数,初始化递归字符文本分割器
    def __init__(
        self,
        separators = None,          # 分隔符列表,默认为 ["\n\n", "\n", " ", ""]
        keep_separator = True,     # 是否保留分隔符,默认为 True
        is_separator_regex = False, # 分隔符是否为正则表达式
        **kwargs,                   # 其它参数传递给父类
    ):
        """
        初始化递归字符文本分割器。

        Args:
            separators: 分隔符列表,按优先级排序
            keep_separator: 是否保留分隔符
            is_separator_regex: 分隔符是否为正则表达式
            **kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
        """
        # 初始化父类参数
        super().__init__(keep_separator=keep_separator, **kwargs)
        # 如果未提供分隔符,使用默认值
        self._separators = separators if separators is not None else ["\n\n", "\n", " ", ""]
        # 保存分隔符是否为正则表达式
        self._is_separator_regex = is_separator_regex

    # 递归分割文本的内部方法
    def _split_text(self, text, separators):
        """
        递归地分割文本。

        Args:
            text: 要分割的文本
            separators: 分隔符列表

        Returns:
            分割后的文本块列表
        """
        # 存储最终结果
        final_chunks = []

        # 确定要使用的分隔符
        # 默认使用最后一个分隔符(通常是空字符串,表示按字符分割)
        separator = separators[-1]
        # 存储后续要使用的分隔符
        new_separators = []

        # 遍历分隔符列表,找到第一个在文本中存在的分隔符
        for i, _s in enumerate(separators):
            # 确定分隔符模式(正则或转义)
            separator_ = _s if self._is_separator_regex else re.escape(_s)
            # 如果分隔符为空字符串,直接使用它
            if not _s:
                separator = _s
                break
            # 如果文本中存在该分隔符
            if re.search(separator_, text):
                separator = _s
                # 保存后续要使用的分隔符
                new_separators = separators[i + 1:]
                break

        # 确定分隔符模式用于分割
        separator_ = separator if self._is_separator_regex else re.escape(separator)
        # 使用分隔符分割文本,传递 keep_separator 参数
        splits = self._split_text_with_regex(text, separator_, self._keep_separator)

        # 处理分割后的片段
        good_splits = []  # 存储长度合适的片段
        # 确定合并时使用的分隔符
        separator_ = "" if self._keep_separator else separator

        # 遍历每个分割片段
        for s in splits:
            # 如果片段长度小于 chunk_size,加入 good_splits
            if self._length_function(s) < self._chunk_size:
                good_splits.append(s)
            else:
                # 如果片段太大,先处理之前积累的 good_splits
                if good_splits:
                    # 合并 good_splits
                    merged_text = self._merge_splits(good_splits, separator_)
                    final_chunks.extend(merged_text)
                    good_splits = []

                # 如果还有后续分隔符,递归分割当前片段
                if new_separators:
                    other_info = self._split_text(s, new_separators)
                    final_chunks.extend(other_info)
                else:
                    # 如果没有后续分隔符,直接添加当前片段
                    final_chunks.append(s)

        # 处理剩余的 good_splits
        if good_splits:
            merged_text = self._merge_splits(good_splits, separator_)
            final_chunks.extend(merged_text)

        return final_chunks

    # 使用正则表达式分割文本
    def _split_text_with_regex(self, text, separator, keep_separator=False):
        """
        使用正则表达式分割文本。

        Args:
            text: 要分割的文本
            separator: 分隔符(正则表达式)
            keep_separator: 是否保留分隔符

        Returns:
            分割后的文本块列表(过滤掉空字符串)
        """
        # 如果分隔符不为空
        if separator:
            # 如果需要保留分隔符
            if keep_separator:
                # 使用括号捕获分隔符,这样分隔符会保留在结果中
                splits_ = re.split(f"({separator})", text)
                # 将分隔符和文本片段合并
                splits = []
                for i in range(0, len(splits_) - 1, 2):
                    if i + 1 < len(splits_):
                        # 将文本片段和分隔符合并
                        splits.append(splits_[i] + splits_[i + 1])
                # 处理最后一个片段(如果有)
                if len(splits_) % 2 == 1:
                    splits.append(splits_[-1])
            else:
                # 不保留分隔符,直接分割
                splits = re.split(separator, text)
        else:
            # 如果分隔符为空,将文本转换为字符列表
            splits = list(text)

        # 过滤掉空字符串
        return [s for s in splits if s]

    # 分割文本的公共方法
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 调用内部递归分割方法
        return self._split_text(text, self._separators)


# 定义 Tokenizer 数据类
# 引入 dataclass 装饰器并声明一个不可变的数据类 Tokenizer
+@dataclass(frozen=True)
+class Tokenizer:
    # 分块时 token 之间的重叠数量
+   chunk_overlap: int
    # 每个分块的 token 数量
+   tokens_per_chunk: int
    # 解码方法,接受 List[int] 返回字符串
+   decode: callable  
    # 编码方法,接受 str 返回 List[int]
+   encode: callable  


# 定义分词器分块函数
+def split_text_on_tokens(text, tokenizer: Tokenizer):
    # 使用分词器将文本切分为多个块
+   splits = []
    # 用分词器编码文本,得到 token id 列表
+   input_ids = tokenizer.encode(text)
    # 起始索引初始化为 0
+   start_idx = 0
    # 循环遍历输入 token
+   while start_idx < len(input_ids):
        # 计算当前块的结束索引,不超过总长度
+       cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
        # 获取当前分块的 token id 子列表
+       chunk_ids = input_ids[start_idx:cur_idx]
        # 若分块为空则结束循环
+       if not chunk_ids:
+           break
        # 解码分块为字符串
+       decoded = tokenizer.decode(chunk_ids)
        # 如果解码后字符串非空,则加入结果
+       if decoded:
+           splits.append(decoded)
        # 如果已到末尾,则跳出循环
+       if cur_idx == len(input_ids):
+           break
        # 下一步的起始索引,向前滑动 tokens_per_chunk - chunk_overlap
+       start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    # 返回所有切分好的文本块
+   return splits


# 定义 TokenTextSplitter 类,用于按 token 数量切分文本
+class TokenTextSplitter(TextSplitter):
+   """
+   Token 文本分割器:按 token 数分割文本。

+   依赖 tiktoken;chunk_size 与 chunk_overlap 都以 token 数计。
+   """

    # 构造函数,初始化编码方式等参数
+   def __init__(
+       self,
+       encoding_name,
+       **kwargs,
+   ):
+       """
+       Args:
+           **kwargs: 传递给基类的 chunk_size / chunk_overlap 等参数
+       """
        # 调用父类构造函数,传递相关参数
+       super().__init__(**kwargs)
        # 默认配置
+       self._tokenizer = tiktoken.get_encoding(encoding_name)

    # 按 token 数切分文本的主方法
+   def split_text(self, text):
+       """按 token 数切分文本"""

        # 定义内联的编码方法,传递特殊 token 设置
+       def _encode(x: str):
+           return self._tokenizer.encode(x)

        # 创建一个 Tokenizer 实例
+       tokenizer = Tokenizer(
+           chunk_overlap=self._chunk_overlap,
+           tokens_per_chunk=self._chunk_size,
+           decode=self._tokenizer.decode,
+           encode=_encode,
+       )
        # 使用分词器分块函数进行切分
+       return split_text_on_tokens(text=text, tokenizer=tokenizer)

43. MarkdownTextSplitter #

MarkdownTextSplitter 是专为 Markdown 格式文档打造的文本分割工具。
与常规 CharacterTextSplitter 不同,它会优先依据 Markdown 结构特征(如标题、代码块、分隔线和段落等)对文本进行递归式切分,从而尽量保持每个分片的语义完整。这能够更好地兼容知识库构建、向量索引及下游摘要、语义检索等场景,提升切片质量。

其核心特点有:

  • 多级分隔优先级:首先按照标题级别(如#、##),再按代码块(` )、分割线、段落双换行等切分,最后才是以字符粗粒度切断,最大限度避免割裂上下文。
  • 可自定义分隔符序列:通过调整separators顺序,可以灵活定义不同 Markdown 样式或自定义标记的优先级拆分方式。
  • 递归分割逻辑:当上一级分隔无法使分片大小合规时,会自动递归向下采用更细分隔规则,直到满足设定的chunk_size和chunk_overlap。

简单来说,如果你需要整理、搜索或向量化 Markdown 笔记、技术文档、博客等结构化文本,建议优先选择 MarkdownTextSplitter。它能够显著提升后续检索和内容聚合的表现,并更易用于多语种 Markdown 处理。

43.1. 43.MarkdownTextSplitter.py #

43.MarkdownTextSplitter.py

# 导入 TextSplitter 基类和 MarkdownTextSplitter
from smartchain.text_splitters import TextSplitter, MarkdownTextSplitter

# 定义一个 Markdown 文档示例
markdown_text = """# 第一章:介绍

这是第一章的内容。Markdown 是一种轻量级标记语言。

## 1.1 什么是 Markdown

Markdown 允许你使用易读易写的纯文本格式编写文档。

### 优点

- 简单易学
- 广泛支持
- 易于转换

## 1.2 基本语法

Markdown 支持多种语法元素。

``python
# 这是一个代码块示例
def hello():
    print("Hello, Markdown!")
``

---

# 第二章:高级特性

这是第二章的内容。

## 2.1 表格

Markdown 支持表格语法。

## 2.2 链接和图片

Markdown 支持链接和图片插入。

---

# 第三章:总结

这是最后一章的内容。
"""

# 创建 Markdown 文本分割器
splitter = MarkdownTextSplitter(
    chunk_size=100,      # 每个块最多 100 字符
    chunk_overlap=20,    # 块之间重叠 20 字符
)

# 分割 Markdown 文本
chunks = splitter.split_text(markdown_text)

# 打印结果
print("=" * 60)
print("Markdown 文本分割示例")
print("=" * 60)
print(f"原文本长度: {len(markdown_text)} 字符")
print(f"切分得到 {len(chunks)} 块:\n")

for i, chunk in enumerate(chunks, 1):
    print(f"--- 片段 {i} ({len(chunk)} 字符) ---")
    # 显示完整内容(如果太长则截断)
    if len(chunk) > 100:
        print(chunk[:100] + "...")
    else:
        print(chunk)
    print()

43.2. text_splitters.py #

smartchain/text_splitters.py

"""文本分割器模块,提供文本分割功能。"""

from dataclasses import dataclass
# 导入正则表达式模块
import re
# 导入抽象基类相关
from abc import ABC, abstractmethod
import tiktoken
# 定义 TextSplitter 抽象基类
class TextSplitter(ABC):
    """
    文本分割器的抽象基类。
    子类需要实现 split_text() 方法来定义具体的分割逻辑。
    """

    # 构造函数,初始化分割器参数
    def __init__(
        self,
        chunk_size = 4000,         # 每个块的最大字符数,默认为4000
        chunk_overlap = 200,       # 块之间的重叠字符数,默认为200
        length_function = None,    # 计算文本长度的函数
        keep_separator = False,    # 是否保留分隔符,默认为False
        strip_whitespace = True,   # 是否去除首尾空白,默认为True
    ):
        """
        初始化文本分割器。

        Args:
            chunk_size: 每个块的最大字符数
            chunk_overlap: 块之间的重叠字符数
            length_function: 计算文本长度的函数,默认为 len
            keep_separator: 是否保留分隔符
            strip_whitespace: 是否去除首尾空白
        """
        # 保存参数到成员变量
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function if length_function is not None else len
        self._keep_separator = keep_separator
        self._strip_whitespace = strip_whitespace

    # 抽象方法,子类必须实现此方法
    @abstractmethod
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 强制子类必须实现 split_text 方法
        raise NotImplementedError("子类必须实现 split_text() 方法")

    # 合并分割后的文本块,控制结果块大小和重叠
    def _merge_splits(self, splits, separator):
        """
        将分割后的文本块合并,确保每个块不超过 chunk_size。

        Args:
            splits: 分割后的文本块列表
            separator: 用于连接文本块的分隔符

        Returns:
            合并后的文本块列表
        """
        # 计算分隔符的长度
        separator_len = self._length_function(separator)
        # 定义用于存储最终文本块的列表
        docs = []
        # 正在构建的当前文档块
        current_doc = []
        # 当前文档块字符总长度
        total = 0

        # 遍历每个文本分块
        for d in splits:
            # 获取当前分块的长度
            len_ = self._length_function(d)
            # 计算如果加上当前块后,文档块的总长度
            total_with_sep = (
                total + len_ + (separator_len if len(current_doc) > 0 else 0)
            )

            # 如果添加当前块会超过设定的 chunk_size
            if total_with_sep > self._chunk_size:
                # 如果当前拼接块不为空,先拼接并记录
                if len(current_doc) > 0:
                    # 拼接当前文档块
                    doc = self._join_docs(current_doc, separator)
                    # 如果拼接结果不为空,则添加到结果列表
                    if doc is not None:
                        docs.append(doc)
                    # 如果拼接结果为空,则跳过后续处理
                    if doc is None:
                        continue

                    # 处理重叠逻辑,只保留最后 chunk_overlap 个字符
                    target_size = self._chunk_overlap  # 需要保留的字符数
                    current_size = 0                   # 当前累计保留长度
                    keep_docs = []                     # 记录需要保留的分块

                    # 从后往前遍历当前文档块,直到累计长度达到重叠数
                    for i in range(len(current_doc) - 1, -1, -1):
                        # 取出每个子块
                        item = current_doc[i]  
                        # 计算每个子块的长度
                        item_len = self._length_function(item)
                        # 计算分隔符的长度
                        sep_len = separator_len if len(keep_docs) > 0 else 0
                        # 如果累计长度加上子块长度和分隔符长度小于等于重叠字符数,则保留该分块
                        if current_size + item_len + sep_len <= target_size:
                            # 保留该分块,插入到 keep_docs 前面
                            keep_docs.insert(0, item)
                            current_size += item_len + sep_len
                        else:
                            # 一旦超过指定重叠字符数,则停止
                            break
                    # 更新当前文档块及总长度
                    current_doc = keep_docs
                    total = current_size

            # 把当前分块加入当前文档块
            current_doc.append(d)
            # 更新累计长度(已有分块需加分隔符长度)
            total += len_ + (separator_len if len(current_doc) > 1 else 0)

        # 合并处理最后剩余的一个文档块
        doc = self._join_docs(current_doc, separator)
        if doc is not None:
            docs.append(doc)

        # 返回处理后的所有文档块
        return docs

    # 用分隔符拼接分块
    def _join_docs(self, docs, separator):
        """
        使用分隔符连接文档块。

        Args:
            docs: 文档块列表
            separator: 分隔符

        Returns:
            连接后的文本,如果为空则返回 None
        """
        # 使用分隔符拼接各分块
        text = separator.join(docs)
        # 按需去除首尾空白符
        if self._strip_whitespace:
            text = text.strip()
        # 如拼接结果非空,返回文本,否则返回 None
        return text if text else None

# 定义按字符数处理的文本分割器
class CharacterTextSplitter(TextSplitter):
    """
    字符文本分割器:按字符数分割文本。
    可以指定分隔符,先按分隔符分割,然后再合并成指定大小的块。
    """

    # 构造函数,初始化字符型文本分割器
    def __init__(
        self,
        separator = "\n\n",         # 分隔符,默认双换行
        is_separator_regex = False, # 分隔符是否为正则表达式
        **kwargs,                   # 其它参数传递给父类
    ):
        """
        初始化字符文本分割器。

        Args:
            separator: 分隔符,默认为双换行符 "\n\n"
            is_separator_regex: 分隔符是否为正则表达式
            **kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
        """
        # 初始化父类参数
        super().__init__(**kwargs)
        # 保存分隔符
        self._separator = separator
        # 保存分隔符是否为正则表达式
        self._is_separator_regex = is_separator_regex

    # 重写文本分割方法
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 判断分隔符是否为正则表达式
        if self._is_separator_regex:
            # 直接使用正则表达式
            sep_pattern = self._separator
        else:
            # 普通字符要转义用于正则分割
            sep_pattern = re.escape(self._separator)

        # 使用分隔符(正则)分割文本
        splits = self._split_text_with_regex(text, sep_pattern)

        # 根据 self._keep_separator 决定合并分块时是否加分隔符
        merge_sep = "" if self._keep_separator else self._separator

        # 对分割结果做合并处理,返回
        return self._merge_splits(splits, merge_sep)

    # 使用正则表达式分割文本块
    def _split_text_with_regex(
        self, text, separator
    ):
        """
        使用正则表达式分割文本。

        Args:
            text: 要分割的文本
            separator: 分隔符(正则表达式)

        Returns:
            分割后的文本块列表(过滤掉空字符串)
        """
        # 如果分隔符不为空
        if separator:
            # 利用正则分割输入文本
            splits = re.split(separator, text)
        else:
            # 若分隔符为空,则将文本每个字符拆为一个子块
            splits = list(text)

        # 过滤掉空字符串,仅返回非空的块
        return [s for s in splits if s]


# 定义递归字符文本分割器
class RecursiveCharacterTextSplitter(TextSplitter):
    """
    递归字符文本分割器:递归地尝试使用不同的分隔符来分割文本。

    它会按照分隔符列表的顺序,尝试使用每个分隔符来分割文本。
    如果某个片段仍然太大,会递归地使用下一个分隔符继续分割。
    """

    # 构造函数,初始化递归字符文本分割器
    def __init__(
        self,
        separators = None,          # 分隔符列表,默认为 ["\n\n", "\n", " ", ""]
        keep_separator = True,     # 是否保留分隔符,默认为 True
        is_separator_regex = False, # 分隔符是否为正则表达式
        **kwargs,                   # 其它参数传递给父类
    ):
        """
        初始化递归字符文本分割器。

        Args:
            separators: 分隔符列表,按优先级排序
            keep_separator: 是否保留分隔符
            is_separator_regex: 分隔符是否为正则表达式
            **kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
        """
        # 初始化父类参数
        super().__init__(keep_separator=keep_separator, **kwargs)
        # 如果未提供分隔符,使用默认值
        self._separators = separators if separators is not None else ["\n\n", "\n", " ", ""]
        # 保存分隔符是否为正则表达式
        self._is_separator_regex = is_separator_regex

    # 递归分割文本的内部方法
    def _split_text(self, text, separators):
        """
        递归地分割文本。

        Args:
            text: 要分割的文本
            separators: 分隔符列表

        Returns:
            分割后的文本块列表
        """
        # 存储最终结果
        final_chunks = []

        # 确定要使用的分隔符
        # 默认使用最后一个分隔符(通常是空字符串,表示按字符分割)
        separator = separators[-1]
        # 存储后续要使用的分隔符
        new_separators = []

        # 遍历分隔符列表,找到第一个在文本中存在的分隔符
        for i, _s in enumerate(separators):
            # 确定分隔符模式(正则或转义)
            separator_ = _s if self._is_separator_regex else re.escape(_s)
            # 如果分隔符为空字符串,直接使用它
            if not _s:
                separator = _s
                break
            # 如果文本中存在该分隔符
            if re.search(separator_, text):
                separator = _s
                # 保存后续要使用的分隔符
                new_separators = separators[i + 1:]
                break

        # 确定分隔符模式用于分割
        separator_ = separator if self._is_separator_regex else re.escape(separator)
        # 使用分隔符分割文本,传递 keep_separator 参数
        splits = self._split_text_with_regex(text, separator_, self._keep_separator)

        # 处理分割后的片段
        good_splits = []  # 存储长度合适的片段
        # 确定合并时使用的分隔符
        separator_ = "" if self._keep_separator else separator

        # 遍历每个分割片段
        for s in splits:
            # 如果片段长度小于 chunk_size,加入 good_splits
            if self._length_function(s) < self._chunk_size:
                good_splits.append(s)
            else:
                # 如果片段太大,先处理之前积累的 good_splits
                if good_splits:
                    # 合并 good_splits
                    merged_text = self._merge_splits(good_splits, separator_)
                    final_chunks.extend(merged_text)
                    good_splits = []

                # 如果还有后续分隔符,递归分割当前片段
                if new_separators:
                    other_info = self._split_text(s, new_separators)
                    final_chunks.extend(other_info)
                else:
                    # 如果没有后续分隔符,直接添加当前片段
                    final_chunks.append(s)

        # 处理剩余的 good_splits
        if good_splits:
            merged_text = self._merge_splits(good_splits, separator_)
            final_chunks.extend(merged_text)

        return final_chunks

    # 使用正则表达式分割文本
    def _split_text_with_regex(self, text, separator, keep_separator=False):
        """
        使用正则表达式分割文本。

        Args:
            text: 要分割的文本
            separator: 分隔符(正则表达式)
            keep_separator: 是否保留分隔符

        Returns:
            分割后的文本块列表(过滤掉空字符串)
        """
        # 如果分隔符不为空
        if separator:
            # 如果需要保留分隔符
            if keep_separator:
                # 使用括号捕获分隔符,这样分隔符会保留在结果中
                splits_ = re.split(f"({separator})", text)
                # 将分隔符和文本片段合并
                splits = []
                for i in range(0, len(splits_) - 1, 2):
                    if i + 1 < len(splits_):
                        # 将文本片段和分隔符合并
                        splits.append(splits_[i] + splits_[i + 1])
                # 处理最后一个片段(如果有)
                if len(splits_) % 2 == 1:
                    splits.append(splits_[-1])
            else:
                # 不保留分隔符,直接分割
                splits = re.split(separator, text)
        else:
            # 如果分隔符为空,将文本转换为字符列表
            splits = list(text)

        # 过滤掉空字符串
        return [s for s in splits if s]

    # 分割文本的公共方法
    def split_text(self, text):
        """
        将文本分割成多个块。

        Args:
            text: 要分割的文本

        Returns:
            文本块列表
        """
        # 调用内部递归分割方法
        return self._split_text(text, self._separators)


# 定义 Markdown 文本分割器
+class MarkdownTextSplitter(RecursiveCharacterTextSplitter):
+   """
+   Markdown 文本分割器:专门用于分割 Markdown 文档。

+   使用 Markdown 特定的分隔符,优先按标题、代码块等结构分割。
+   继承自 RecursiveCharacterTextSplitter,使用递归分割策略。
+   """

    # 构造函数,初始化 Markdown 文本分割器
+   def __init__(self, **kwargs):
+       """
+       初始化 Markdown 文本分割器。

+       Args:
+           **kwargs: 传递给父类的参数(chunk_size, chunk_overlap 等)
+       """
        # Markdown 特定的分隔符,按优先级排序
        # 优先按标题分割,然后是代码块、水平分割线、段落等
+       separators = [
+           "\n\n# ",      # 一级标题(前面有两个换行)
+           "\n\n## ",     # 二级标题
+           "\n\n### ",    # 三级标题
+           "\n\n#### ",   # 四级标题
+           "\n\n##### ",  # 五级标题
+           "\n\n###### ", # 六级标题
+           "\n```",       # 代码块开始(前面有一个换行)
+           "\n---",       # 水平分割线
+           "\n***",       # 水平分割线(另一种格式)
+           "\n\n",        # 段落分隔(双换行)
+           "\n",          # 换行
+           " ",           # 空格
+           "",            # 字符级别(最后的分隔符)
+       ]

        # 调用父类构造函数,传入 Markdown 特定的分隔符
+       super().__init__(
+           separators=separators,
+           keep_separator=True,  # 保留分隔符以保持 Markdown 结构
+           **kwargs
+       )


# 定义 Tokenizer 数据类
# 引入 dataclass 装饰器并声明一个不可变的数据类 Tokenizer
@dataclass(frozen=True)
class Tokenizer:
    # 分块时 token 之间的重叠数量
    chunk_overlap: int
    # 每个分块的 token 数量
    tokens_per_chunk: int
    # 解码方法,接受 List[int] 返回字符串
    decode: callable  
    # 编码方法,接受 str 返回 List[int]
    encode: callable  


# 定义分词器分块函数
def split_text_on_tokens(text, tokenizer: Tokenizer):
    # 使用分词器将文本切分为多个块
    splits = []
    # 用分词器编码文本,得到 token id 列表
    input_ids = tokenizer.encode(text)
    # 起始索引初始化为 0
    start_idx = 0
    # 循环遍历输入 token
    while start_idx < len(input_ids):
        # 计算当前块的结束索引,不超过总长度
        cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
        # 获取当前分块的 token id 子列表
        chunk_ids = input_ids[start_idx:cur_idx]
        # 若分块为空则结束循环
        if not chunk_ids:
            break
        # 解码分块为字符串
        decoded = tokenizer.decode(chunk_ids)
        # 如果解码后字符串非空,则加入结果
        if decoded:
            splits.append(decoded)
        # 如果已到末尾,则跳出循环
        if cur_idx == len(input_ids):
            break
        # 下一步的起始索引,向前滑动 tokens_per_chunk - chunk_overlap
        start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    # 返回所有切分好的文本块
    return splits


# 定义 TokenTextSplitter 类,用于按 token 数量切分文本
class TokenTextSplitter(TextSplitter):
    """
    Token 文本分割器:按 token 数分割文本。

    依赖 tiktoken;chunk_size 与 chunk_overlap 都以 token 数计。
    """

    # 构造函数,初始化编码方式等参数
    def __init__(
        self,
        encoding_name,
        **kwargs,
    ):
        """
        Args:
            **kwargs: 传递给基类的 chunk_size / chunk_overlap 等参数
        """
        # 调用父类构造函数,传递相关参数
        super().__init__(**kwargs)
        # 默认配置
        self._tokenizer = tiktoken.get_encoding(encoding_name)

    # 按 token 数切分文本的主方法
    def split_text(self, text):
        """按 token 数切分文本"""

        # 定义内联的编码方法,传递特殊 token 设置
        def _encode(x: str):
            return self._tokenizer.encode(x)

        # 创建一个 Tokenizer 实例
        tokenizer = Tokenizer(
            chunk_overlap=self._chunk_overlap,
            tokens_per_chunk=self._chunk_size,
            decode=self._tokenizer.decode,
            encode=_encode,
        )
        # 使用分词器分块函数进行切分
        return split_text_on_tokens(text=text, tokenizer=tokenizer)

← 上一节 8.document_loaders 下一节 10.embeddings →

访问验证

请输入访问令牌

Token不正确,请重新输入