40.CharacterTextSplitter #
字符型文本分割器(CharacterTextSplitter)是最基础的一类文本分割工具。其核心思想是将一段字符串按照指定的字符数切分成多个小块,且允许不同块之间存在“重叠区”。这种方式适合对没有特殊结构的文本进行粗粒度的快速处理。例如:长段文本、无明显语义段落的数据切分,用于后续如向量化与检索等场景。
其主要功能和参数包括:
separator:分隔符,默认可为空字符串,表示逐字符切分,也可以设置如换行等字符。chunk_size:每个文本块允许的最大字符数。chunk_overlap:相邻文本块之间的重叠区域长度。
使用流程为:将原始文本按照分隔符、块大小、重叠长度进行切分,生成适合作为语料片段的小块文本序列。这种切分方法适合简单的中文、英文或代码片段初步拆分,但不关心内容的句子、段落结构。对于更复杂的分割需求,推荐使用递归分割器等更高级功能。
40.1. 40.CharacterTextSplitter.py #
40.CharacterTextSplitter.py
# 导入字符型文本分割器
#from langchain_text_splitters import CharacterTextSplitter
from smartchain.text_splitters import CharacterTextSplitter
# 定义一个需要被切分的长文本
text = (
"LangChain 文本分割示例。"
"CharacterTextSplitter 会按固定字符数切分,并可重叠。"
"适合在构建向量索引前先做简单分片。"
)
# 初始化分割器对象,每块20个字符,块与块之间重叠5个字符
splitter = CharacterTextSplitter(
separator="",
chunk_size=20,
chunk_overlap=5,
)
# 执行文本切分操作,返回文本块列表
chunks = splitter.split_text(text)
# 打印原始文本长度
print(f"原文本长度: {len(text)} 字符")
# 打印切分后获得了多少块
print(f"切分得到 {len(chunks)} 块:")
# 逐块打印每一个分片和该分片的字符数
for i, c in enumerate(chunks, 1):
print(f"片段{i} ({len(c)} 字符): {c}")40.2. text_splitters.py #
smartchain/text_splitters.py
"""文本分割器模块,提供文本分割功能。"""
# 导入正则表达式模块
import re
# 导入抽象基类相关
from abc import ABC, abstractmethod
# 定义 TextSplitter 抽象基类
class TextSplitter(ABC):
"""
文本分割器的抽象基类。
子类需要实现 split_text() 方法来定义具体的分割逻辑。
"""
# 构造函数,初始化分割器参数
def __init__(
self,
chunk_size = 4000, # 每个块的最大字符数,默认为4000
chunk_overlap = 200, # 块之间的重叠字符数,默认为200
length_function = None, # 计算文本长度的函数
keep_separator = False, # 是否保留分隔符,默认为False
strip_whitespace = True, # 是否去除首尾空白,默认为True
):
"""
初始化文本分割器。
Args:
chunk_size: 每个块的最大字符数
chunk_overlap: 块之间的重叠字符数
length_function: 计算文本长度的函数,默认为 len
keep_separator: 是否保留分隔符
strip_whitespace: 是否去除首尾空白
"""
# 检查 chunk_size 参数是否合法
if chunk_size <= 0:
raise ValueError(f"chunk_size 必须 > 0,当前值: {chunk_size}")
# 检查 chunk_overlap 参数是否合法
if chunk_overlap < 0:
raise ValueError(f"chunk_overlap 必须 >= 0,当前值: {chunk_overlap}")
# 检查 chunk_overlap 不得大于 chunk_size
if chunk_overlap > chunk_size:
raise ValueError(
f"chunk_overlap ({chunk_overlap}) 不能大于 chunk_size ({chunk_size})"
)
# 保存参数到成员变量
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function if length_function is not None else len
self._keep_separator = keep_separator
self._strip_whitespace = strip_whitespace
# 抽象方法,子类必须实现此方法
@abstractmethod
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 强制子类必须实现 split_text 方法
raise NotImplementedError("子类必须实现 split_text() 方法")
# 合并分割后的文本块,控制结果块大小和重叠
def _merge_splits(self, splits, separator):
"""
将分割后的文本块合并,确保每个块不超过 chunk_size。
Args:
splits: 分割后的文本块列表
separator: 用于连接文本块的分隔符
Returns:
合并后的文本块列表
"""
# 计算分隔符的长度
separator_len = self._length_function(separator)
# 定义用于存储最终文本块的列表
docs = []
# 正在构建的当前文档块
current_doc = []
# 当前文档块字符总长度
total = 0
# 遍历每个文本分块
for d in splits:
# 获取当前分块的长度
len_ = self._length_function(d)
# 计算如果加上当前块后,文档块的总长度
total_with_sep = (
total + len_ + (separator_len if len(current_doc) > 0 else 0)
)
# 如果添加当前块会超过设定的 chunk_size
if total_with_sep > self._chunk_size:
# 如果当前拼接块不为空,先拼接并记录
if len(current_doc) > 0:
# 拼接当前文档块
doc = self._join_docs(current_doc, separator)
# 如果拼接结果不为空,则添加到结果列表
if doc is not None:
docs.append(doc)
# 如果拼接结果为空,则跳过后续处理
if doc is None:
continue
# 处理重叠逻辑,只保留最后 chunk_overlap 个字符
target_size = self._chunk_overlap # 需要保留的字符数
current_size = 0 # 当前累计保留长度
keep_docs = [] # 记录需要保留的分块
# 从后往前遍历当前文档块,直到累计长度达到重叠数
for i in range(len(current_doc) - 1, -1, -1):
# 取出每个子块
item = current_doc[i]
# 计算每个子块的长度
item_len = self._length_function(item)
# 计算分隔符的长度
sep_len = separator_len if len(keep_docs) > 0 else 0
# 如果累计长度加上子块长度和分隔符长度小于等于重叠字符数,则保留该分块
if current_size + item_len + sep_len <= target_size:
# 保留该分块,插入到 keep_docs 前面
keep_docs.insert(0, item)
current_size += item_len + sep_len
else:
# 一旦超过指定重叠字符数,则停止
break
# 更新当前文档块及总长度
current_doc = keep_docs
total = current_size
# 把当前分块加入当前文档块
current_doc.append(d)
# 更新累计长度(已有分块需加分隔符长度)
total += len_ + (separator_len if len(current_doc) > 1 else 0)
# 合并处理最后剩余的一个文档块
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
# 返回处理后的所有文档块
return docs
# 用分隔符拼接分块
def _join_docs(self, docs, separator):
"""
使用分隔符连接文档块。
Args:
docs: 文档块列表
separator: 分隔符
Returns:
连接后的文本,如果为空则返回 None
"""
# 使用分隔符拼接各分块
text = separator.join(docs)
# 按需去除首尾空白符
if self._strip_whitespace:
text = text.strip()
# 如拼接结果非空,返回文本,否则返回 None
return text if text else None
# 定义按字符数处理的文本分割器
class CharacterTextSplitter(TextSplitter):
"""
字符文本分割器:按字符数分割文本。
可以指定分隔符,先按分隔符分割,然后再合并成指定大小的块。
"""
# 构造函数,初始化字符型文本分割器
def __init__(
self,
separator = "\n\n", # 分隔符,默认双换行
is_separator_regex = False, # 分隔符是否为正则表达式
**kwargs, # 其它参数传递给父类
):
"""
初始化字符文本分割器。
Args:
separator: 分隔符,默认为双换行符 "\n\n"
is_separator_regex: 分隔符是否为正则表达式
**kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
"""
# 初始化父类参数
super().__init__(**kwargs)
# 保存分隔符
self._separator = separator
# 保存分隔符是否为正则表达式
self._is_separator_regex = is_separator_regex
# 重写文本分割方法
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 判断分隔符是否为正则表达式
if self._is_separator_regex:
# 直接使用正则表达式
sep_pattern = self._separator
else:
# 普通字符要转义用于正则分割
sep_pattern = re.escape(self._separator)
# 使用分隔符(正则)分割文本
splits = self._split_text_with_regex(text, sep_pattern)
# 根据 self._keep_separator 决定合并分块时是否加分隔符
merge_sep = "" if self._keep_separator else self._separator
# 对分割结果做合并处理,返回
return self._merge_splits(splits, merge_sep)
# 使用正则表达式分割文本块
def _split_text_with_regex(
self, text, separator
):
"""
使用正则表达式分割文本。
Args:
text: 要分割的文本
separator: 分隔符(正则表达式)
Returns:
分割后的文本块列表(过滤掉空字符串)
"""
# 如果分隔符不为空
if separator:
# 利用正则分割输入文本
splits = re.split(separator, text)
else:
# 若分隔符为空,则将文本每个字符拆为一个子块
splits = list(text)
# 过滤掉空字符串,仅返回非空的块
return [s for s in splits if s]41.RecursiveCharacterTextSplitter #
递归字符文本分割器(RecursiveCharacterTextSplitter)是对基础字符分割器的高级扩展,其核心思想是“递归地”多级利用不同分隔符,对文本进行尽可能结构化且紧凑的分块切分。
它的主要机制如下:
- 用户可提供一组分隔符(如:段落换行、普通换行、空格、无分隔),优先级从高到低。
- 分割时,先用第一个分隔符切分。如果某些块仍超过
chunk_size限制,则对这些超长块递归地用下一个分隔符继续分割。 - 最后一个分隔符往往为空字符串,意味着在无法再细分时直接按字符截断,保证每个块最大不超过
chunk_size。 - 可以选择是否在切分结果中保留分隔符内容(
keep_separator)。
递归分割器特别适合对结构复杂、层级多、需细粒度控制的文本素材(如长文章、代码文件等)进行语义更相关的切片。它能更智能地尽量在合理的语义边界,比如段落或句子断点处切割文本,优先保留原始结构信息,同时解决极端情况下不得不打碎到字符级别的场景。
常用参数说明:
separators:分隔符列表,优先级高的在前,比如["\n\n", "\n", " ", ""]。chunk_size:每块最大字符数。chunk_overlap:相邻块的重叠区域长度。keep_separator:是否在分块时保留分隔符本身(默认为True,更利于保留语义边界)。
41.1. 41.RecursiveCharacterTextSplitter.py #
41.RecursiveCharacterTextSplitter.py
# 导入递归字符型文本分割器
#from langchain_text_splitters import RecursiveCharacterTextSplitter
from smartchain.text_splitters import RecursiveCharacterTextSplitter
# 定义一个需要被切分的长文本
text = (
"LangChain 文本分割示例。"
"CharacterTextSplitter 会按固定字符数切分,并可重叠。"
"适合在构建向量索引前先做简单分片。"
)
# 初始化分割器对象,每块20个字符,块与块之间重叠5个字符
splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", " ", ""],
chunk_size=20,
chunk_overlap=5,
)
# 执行文本切分操作,返回文本块列表
chunks = splitter.split_text(text)
# 打印原始文本长度
print(f"原文本长度: {len(text)} 字符")
# 打印切分后获得了多少块
print(f"切分得到 {len(chunks)} 块:")
# 逐块打印每一个分片和该分片的字符数
for i, c in enumerate(chunks, 1):
print(f"片段{i} ({len(c)} 字符): {c}")41.2. text_splitters.py #
smartchain/text_splitters.py
"""文本分割器模块,提供文本分割功能。"""
# 导入正则表达式模块
import re
# 导入抽象基类相关
from abc import ABC, abstractmethod
# 定义 TextSplitter 抽象基类
class TextSplitter(ABC):
"""
文本分割器的抽象基类。
子类需要实现 split_text() 方法来定义具体的分割逻辑。
"""
# 构造函数,初始化分割器参数
def __init__(
self,
chunk_size = 4000, # 每个块的最大字符数,默认为4000
chunk_overlap = 200, # 块之间的重叠字符数,默认为200
length_function = None, # 计算文本长度的函数
keep_separator = False, # 是否保留分隔符,默认为False
strip_whitespace = True, # 是否去除首尾空白,默认为True
):
"""
初始化文本分割器。
Args:
chunk_size: 每个块的最大字符数
chunk_overlap: 块之间的重叠字符数
length_function: 计算文本长度的函数,默认为 len
keep_separator: 是否保留分隔符
strip_whitespace: 是否去除首尾空白
"""
# 检查 chunk_size 参数是否合法
if chunk_size <= 0:
raise ValueError(f"chunk_size 必须 > 0,当前值: {chunk_size}")
# 检查 chunk_overlap 参数是否合法
if chunk_overlap < 0:
raise ValueError(f"chunk_overlap 必须 >= 0,当前值: {chunk_overlap}")
# 检查 chunk_overlap 不得大于 chunk_size
if chunk_overlap > chunk_size:
raise ValueError(
f"chunk_overlap ({chunk_overlap}) 不能大于 chunk_size ({chunk_size})"
)
# 保存参数到成员变量
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function if length_function is not None else len
self._keep_separator = keep_separator
self._strip_whitespace = strip_whitespace
# 抽象方法,子类必须实现此方法
@abstractmethod
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 强制子类必须实现 split_text 方法
raise NotImplementedError("子类必须实现 split_text() 方法")
# 合并分割后的文本块,控制结果块大小和重叠
def _merge_splits(self, splits, separator):
"""
将分割后的文本块合并,确保每个块不超过 chunk_size。
Args:
splits: 分割后的文本块列表
separator: 用于连接文本块的分隔符
Returns:
合并后的文本块列表
"""
# 计算分隔符的长度
separator_len = self._length_function(separator)
# 定义用于存储最终文本块的列表
docs = []
# 正在构建的当前文档块
current_doc = []
# 当前文档块字符总长度
total = 0
# 遍历每个文本分块
for d in splits:
# 获取当前分块的长度
len_ = self._length_function(d)
# 计算如果加上当前块后,文档块的总长度
total_with_sep = (
total + len_ + (separator_len if len(current_doc) > 0 else 0)
)
# 如果添加当前块会超过设定的 chunk_size
if total_with_sep > self._chunk_size:
# 如果当前拼接块不为空,先拼接并记录
if len(current_doc) > 0:
# 拼接当前文档块
doc = self._join_docs(current_doc, separator)
# 如果拼接结果不为空,则添加到结果列表
if doc is not None:
docs.append(doc)
# 如果拼接结果为空,则跳过后续处理
if doc is None:
continue
# 处理重叠逻辑,只保留最后 chunk_overlap 个字符
target_size = self._chunk_overlap # 需要保留的字符数
current_size = 0 # 当前累计保留长度
keep_docs = [] # 记录需要保留的分块
# 从后往前遍历当前文档块,直到累计长度达到重叠数
for i in range(len(current_doc) - 1, -1, -1):
# 取出每个子块
item = current_doc[i]
# 计算每个子块的长度
item_len = self._length_function(item)
# 计算分隔符的长度
sep_len = separator_len if len(keep_docs) > 0 else 0
# 如果累计长度加上子块长度和分隔符长度小于等于重叠字符数,则保留该分块
if current_size + item_len + sep_len <= target_size:
# 保留该分块,插入到 keep_docs 前面
keep_docs.insert(0, item)
current_size += item_len + sep_len
else:
# 一旦超过指定重叠字符数,则停止
break
# 更新当前文档块及总长度
current_doc = keep_docs
total = current_size
# 把当前分块加入当前文档块
current_doc.append(d)
# 更新累计长度(已有分块需加分隔符长度)
total += len_ + (separator_len if len(current_doc) > 1 else 0)
# 合并处理最后剩余的一个文档块
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
# 返回处理后的所有文档块
return docs
# 用分隔符拼接分块
def _join_docs(self, docs, separator):
"""
使用分隔符连接文档块。
Args:
docs: 文档块列表
separator: 分隔符
Returns:
连接后的文本,如果为空则返回 None
"""
# 使用分隔符拼接各分块
text = separator.join(docs)
# 按需去除首尾空白符
if self._strip_whitespace:
text = text.strip()
# 如拼接结果非空,返回文本,否则返回 None
return text if text else None
# 定义按字符数处理的文本分割器
class CharacterTextSplitter(TextSplitter):
"""
字符文本分割器:按字符数分割文本。
可以指定分隔符,先按分隔符分割,然后再合并成指定大小的块。
"""
# 构造函数,初始化字符型文本分割器
def __init__(
self,
separator = "\n\n", # 分隔符,默认双换行
is_separator_regex = False, # 分隔符是否为正则表达式
**kwargs, # 其它参数传递给父类
):
"""
初始化字符文本分割器。
Args:
separator: 分隔符,默认为双换行符 "\n\n"
is_separator_regex: 分隔符是否为正则表达式
**kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
"""
# 初始化父类参数
super().__init__(**kwargs)
# 保存分隔符
self._separator = separator
# 保存分隔符是否为正则表达式
self._is_separator_regex = is_separator_regex
# 重写文本分割方法
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 判断分隔符是否为正则表达式
if self._is_separator_regex:
# 直接使用正则表达式
sep_pattern = self._separator
else:
# 普通字符要转义用于正则分割
sep_pattern = re.escape(self._separator)
# 使用分隔符(正则)分割文本
splits = self._split_text_with_regex(text, sep_pattern)
# 根据 self._keep_separator 决定合并分块时是否加分隔符
merge_sep = "" if self._keep_separator else self._separator
# 对分割结果做合并处理,返回
return self._merge_splits(splits, merge_sep)
# 使用正则表达式分割文本块
def _split_text_with_regex(
self, text, separator
):
"""
使用正则表达式分割文本。
Args:
text: 要分割的文本
separator: 分隔符(正则表达式)
Returns:
分割后的文本块列表(过滤掉空字符串)
"""
# 如果分隔符不为空
if separator:
# 利用正则分割输入文本
splits = re.split(separator, text)
else:
# 若分隔符为空,则将文本每个字符拆为一个子块
splits = list(text)
# 过滤掉空字符串,仅返回非空的块
return [s for s in splits if s]
# 定义递归字符文本分割器
+class RecursiveCharacterTextSplitter(TextSplitter):
+ """
+ 递归字符文本分割器:递归地尝试使用不同的分隔符来分割文本。
+ 它会按照分隔符列表的顺序,尝试使用每个分隔符来分割文本。
+ 如果某个片段仍然太大,会递归地使用下一个分隔符继续分割。
+ """
# 构造函数,初始化递归字符文本分割器
+ def __init__(
+ self,
+ separators = None, # 分隔符列表,默认为 ["\n\n", "\n", " ", ""]
+ keep_separator = True, # 是否保留分隔符,默认为 True
+ is_separator_regex = False, # 分隔符是否为正则表达式
+ **kwargs, # 其它参数传递给父类
+ ):
+ """
+ 初始化递归字符文本分割器。
+ Args:
+ separators: 分隔符列表,按优先级排序
+ keep_separator: 是否保留分隔符
+ is_separator_regex: 分隔符是否为正则表达式
+ **kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
+ """
# 初始化父类参数
+ super().__init__(keep_separator=keep_separator, **kwargs)
# 如果未提供分隔符,使用默认值
+ self._separators = separators if separators is not None else ["\n\n", "\n", " ", ""]
# 保存分隔符是否为正则表达式
+ self._is_separator_regex = is_separator_regex
# 递归分割文本的内部方法
+ def _split_text(self, text, separators):
+ """
+ 递归地分割文本。
+ Args:
+ text: 要分割的文本
+ separators: 分隔符列表
+ Returns:
+ 分割后的文本块列表
+ """
# 存储最终结果
+ final_chunks = []
# 确定要使用的分隔符
# 默认使用最后一个分隔符(通常是空字符串,表示按字符分割)
+ separator = separators[-1]
# 存储后续要使用的分隔符
+ new_separators = []
# 遍历分隔符列表,找到第一个在文本中存在的分隔符
+ for i, _s in enumerate(separators):
# 确定分隔符模式(正则或转义)
+ separator_ = _s if self._is_separator_regex else re.escape(_s)
# 如果分隔符为空字符串,直接使用它
+ if not _s:
+ separator = _s
+ break
# 如果文本中存在该分隔符
+ if re.search(separator_, text):
+ separator = _s
# 保存后续要使用的分隔符
+ new_separators = separators[i + 1:]
+ break
# 确定分隔符模式用于分割
+ separator_ = separator if self._is_separator_regex else re.escape(separator)
# 使用分隔符分割文本,传递 keep_separator 参数
+ splits = self._split_text_with_regex(text, separator_, self._keep_separator)
# 处理分割后的片段
+ good_splits = [] # 存储长度合适的片段
# 确定合并时使用的分隔符
+ separator_ = "" if self._keep_separator else separator
# 遍历每个分割片段
+ for s in splits:
# 如果片段长度小于 chunk_size,加入 good_splits
+ if self._length_function(s) < self._chunk_size:
+ good_splits.append(s)
+ else:
# 如果片段太大,先处理之前积累的 good_splits
+ if good_splits:
# 合并 good_splits
+ merged_text = self._merge_splits(good_splits, separator_)
+ final_chunks.extend(merged_text)
+ good_splits = []
# 如果还有后续分隔符,递归分割当前片段
+ if new_separators:
+ other_info = self._split_text(s, new_separators)
+ final_chunks.extend(other_info)
+ else:
# 如果没有后续分隔符,直接添加当前片段
+ final_chunks.append(s)
# 处理剩余的 good_splits
+ if good_splits:
+ merged_text = self._merge_splits(good_splits, separator_)
+ final_chunks.extend(merged_text)
+ return final_chunks
# 使用正则表达式分割文本
+ def _split_text_with_regex(self, text, separator, keep_separator=False):
+ """
+ 使用正则表达式分割文本。
+ Args:
+ text: 要分割的文本
+ separator: 分隔符(正则表达式)
+ keep_separator: 是否保留分隔符
+ Returns:
+ 分割后的文本块列表(过滤掉空字符串)
+ """
# 如果分隔符不为空
+ if separator:
# 如果需要保留分隔符
+ if keep_separator:
# 使用括号捕获分隔符,这样分隔符会保留在结果中
+ splits_ = re.split(f"({separator})", text)
# 将分隔符和文本片段合并
+ splits = []
+ for i in range(0, len(splits_) - 1, 2):
+ if i + 1 < len(splits_):
# 将文本片段和分隔符合并
+ splits.append(splits_[i] + splits_[i + 1])
# 处理最后一个片段(如果有)
+ if len(splits_) % 2 == 1:
+ splits.append(splits_[-1])
+ else:
# 不保留分隔符,直接分割
+ splits = re.split(separator, text)
+ else:
# 如果分隔符为空,将文本转换为字符列表
+ splits = list(text)
# 过滤掉空字符串
+ return [s for s in splits if s]
# 分割文本的公共方法
+ def split_text(self, text):
+ """
+ 将文本分割成多个块。
+ Args:
+ text: 要分割的文本
+ Returns:
+ 文本块列表
+ """
# 调用内部递归分割方法
+ return self._split_text(text, self._separators)42.TokenTextSplitter #
TokenTextSplitter 是一种基于 token 数量对文本进行分割的分割器,适用于语义分割和大模型场景(如向量化前的智能分片等)。
与传统的按字符或分隔符切分不同,TokenTextSplitter 利用 tiktoken(OpenAI 官方分词库)对文本编码成 token,然后按照设定的 chunk_size 和 chunk_overlap 参数进行滑窗、切片,保证每一段都不会超过指定的 token 数且有重叠区域。这样可以更好地适应大语言模型处理文本的方式,有效避免出现内容中断或上下文不连续的情况。
典型用法如下所示:
- 通过
encoding_name选择分词方案(如"cl100k_base"适用于 GPT-4/3.5)。 - 按 token 数切分,并设置重叠,便于后续高效向量化或上下文检索。
- 支持中文、英文等多语言文本。
注意:在实际应用中,合理设置 chunk_size 和 chunk_overlap 能有效提升 LLM 召回率与答案相关性。
42.1. 42.TokenTextSplitter.py #
42.TokenTextSplitter.py
# 导入递归字符型文本分割器
#from langchain_text_splitters import TokenTextSplitter
from smartchain.text_splitters import TokenTextSplitter
import tiktoken
# 定义一个需要被切分的长文本
text = (
"LangChain 文本分割示例。"
"CharacterTextSplitter 会按固定字符数切分,并可重叠。"
"适合在构建向量索引前先做简单分片。"
)
# 初始化分割器对象,每块20个字符,块与块之间重叠5个字符
splitter = TokenTextSplitter(
chunk_size=20,
chunk_overlap=5,
encoding_name="cl100k_base",
)
# 执行文本切分操作,返回文本块列表
chunks = splitter.split_text(text)
# 获取编码器
encoding = tiktoken.get_encoding("cl100k_base")
# 打印原始文本长度
print(f"原文本长度: {len(encoding.encode(text))} 字符")
# 打印切分后获得了多少块
print(f"切分得到 {len(chunks)} 块:")
# 逐块打印每一个分片和该分片的字符数
for i, c in enumerate(chunks, 1):
print(f"片段{i} ({len(encoding.encode(c))} tokens): {c}")42.2. text_splitters.py #
smartchain/text_splitters.py
"""文本分割器模块,提供文本分割功能。"""
+from dataclasses import dataclass
# 导入正则表达式模块
import re
# 导入抽象基类相关
from abc import ABC, abstractmethod
+import tiktoken
# 定义 TextSplitter 抽象基类
class TextSplitter(ABC):
"""
文本分割器的抽象基类。
子类需要实现 split_text() 方法来定义具体的分割逻辑。
"""
# 构造函数,初始化分割器参数
def __init__(
self,
chunk_size = 4000, # 每个块的最大字符数,默认为4000
chunk_overlap = 200, # 块之间的重叠字符数,默认为200
length_function = None, # 计算文本长度的函数
keep_separator = False, # 是否保留分隔符,默认为False
strip_whitespace = True, # 是否去除首尾空白,默认为True
):
"""
初始化文本分割器。
Args:
chunk_size: 每个块的最大字符数
chunk_overlap: 块之间的重叠字符数
length_function: 计算文本长度的函数,默认为 len
keep_separator: 是否保留分隔符
strip_whitespace: 是否去除首尾空白
"""
# 保存参数到成员变量
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function if length_function is not None else len
self._keep_separator = keep_separator
self._strip_whitespace = strip_whitespace
# 抽象方法,子类必须实现此方法
@abstractmethod
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 强制子类必须实现 split_text 方法
raise NotImplementedError("子类必须实现 split_text() 方法")
# 合并分割后的文本块,控制结果块大小和重叠
def _merge_splits(self, splits, separator):
"""
将分割后的文本块合并,确保每个块不超过 chunk_size。
Args:
splits: 分割后的文本块列表
separator: 用于连接文本块的分隔符
Returns:
合并后的文本块列表
"""
# 计算分隔符的长度
separator_len = self._length_function(separator)
# 定义用于存储最终文本块的列表
docs = []
# 正在构建的当前文档块
current_doc = []
# 当前文档块字符总长度
total = 0
# 遍历每个文本分块
for d in splits:
# 获取当前分块的长度
len_ = self._length_function(d)
# 计算如果加上当前块后,文档块的总长度
total_with_sep = (
total + len_ + (separator_len if len(current_doc) > 0 else 0)
)
# 如果添加当前块会超过设定的 chunk_size
if total_with_sep > self._chunk_size:
# 如果当前拼接块不为空,先拼接并记录
if len(current_doc) > 0:
# 拼接当前文档块
doc = self._join_docs(current_doc, separator)
# 如果拼接结果不为空,则添加到结果列表
if doc is not None:
docs.append(doc)
# 如果拼接结果为空,则跳过后续处理
if doc is None:
continue
# 处理重叠逻辑,只保留最后 chunk_overlap 个字符
target_size = self._chunk_overlap # 需要保留的字符数
current_size = 0 # 当前累计保留长度
keep_docs = [] # 记录需要保留的分块
# 从后往前遍历当前文档块,直到累计长度达到重叠数
for i in range(len(current_doc) - 1, -1, -1):
# 取出每个子块
item = current_doc[i]
# 计算每个子块的长度
item_len = self._length_function(item)
# 计算分隔符的长度
sep_len = separator_len if len(keep_docs) > 0 else 0
# 如果累计长度加上子块长度和分隔符长度小于等于重叠字符数,则保留该分块
if current_size + item_len + sep_len <= target_size:
# 保留该分块,插入到 keep_docs 前面
keep_docs.insert(0, item)
current_size += item_len + sep_len
else:
# 一旦超过指定重叠字符数,则停止
break
# 更新当前文档块及总长度
current_doc = keep_docs
total = current_size
# 把当前分块加入当前文档块
current_doc.append(d)
# 更新累计长度(已有分块需加分隔符长度)
total += len_ + (separator_len if len(current_doc) > 1 else 0)
# 合并处理最后剩余的一个文档块
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
# 返回处理后的所有文档块
return docs
# 用分隔符拼接分块
def _join_docs(self, docs, separator):
"""
使用分隔符连接文档块。
Args:
docs: 文档块列表
separator: 分隔符
Returns:
连接后的文本,如果为空则返回 None
"""
# 使用分隔符拼接各分块
text = separator.join(docs)
# 按需去除首尾空白符
if self._strip_whitespace:
text = text.strip()
# 如拼接结果非空,返回文本,否则返回 None
return text if text else None
# 定义按字符数处理的文本分割器
class CharacterTextSplitter(TextSplitter):
"""
字符文本分割器:按字符数分割文本。
可以指定分隔符,先按分隔符分割,然后再合并成指定大小的块。
"""
# 构造函数,初始化字符型文本分割器
def __init__(
self,
separator = "\n\n", # 分隔符,默认双换行
is_separator_regex = False, # 分隔符是否为正则表达式
**kwargs, # 其它参数传递给父类
):
"""
初始化字符文本分割器。
Args:
separator: 分隔符,默认为双换行符 "\n\n"
is_separator_regex: 分隔符是否为正则表达式
**kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
"""
# 初始化父类参数
super().__init__(**kwargs)
# 保存分隔符
self._separator = separator
# 保存分隔符是否为正则表达式
self._is_separator_regex = is_separator_regex
# 重写文本分割方法
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 判断分隔符是否为正则表达式
if self._is_separator_regex:
# 直接使用正则表达式
sep_pattern = self._separator
else:
# 普通字符要转义用于正则分割
sep_pattern = re.escape(self._separator)
# 使用分隔符(正则)分割文本
splits = self._split_text_with_regex(text, sep_pattern)
# 根据 self._keep_separator 决定合并分块时是否加分隔符
merge_sep = "" if self._keep_separator else self._separator
# 对分割结果做合并处理,返回
return self._merge_splits(splits, merge_sep)
# 使用正则表达式分割文本块
def _split_text_with_regex(
self, text, separator
):
"""
使用正则表达式分割文本。
Args:
text: 要分割的文本
separator: 分隔符(正则表达式)
Returns:
分割后的文本块列表(过滤掉空字符串)
"""
# 如果分隔符不为空
if separator:
# 利用正则分割输入文本
splits = re.split(separator, text)
else:
# 若分隔符为空,则将文本每个字符拆为一个子块
splits = list(text)
# 过滤掉空字符串,仅返回非空的块
return [s for s in splits if s]
# 定义递归字符文本分割器
class RecursiveCharacterTextSplitter(TextSplitter):
"""
递归字符文本分割器:递归地尝试使用不同的分隔符来分割文本。
它会按照分隔符列表的顺序,尝试使用每个分隔符来分割文本。
如果某个片段仍然太大,会递归地使用下一个分隔符继续分割。
"""
# 构造函数,初始化递归字符文本分割器
def __init__(
self,
separators = None, # 分隔符列表,默认为 ["\n\n", "\n", " ", ""]
keep_separator = True, # 是否保留分隔符,默认为 True
is_separator_regex = False, # 分隔符是否为正则表达式
**kwargs, # 其它参数传递给父类
):
"""
初始化递归字符文本分割器。
Args:
separators: 分隔符列表,按优先级排序
keep_separator: 是否保留分隔符
is_separator_regex: 分隔符是否为正则表达式
**kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
"""
# 初始化父类参数
super().__init__(keep_separator=keep_separator, **kwargs)
# 如果未提供分隔符,使用默认值
self._separators = separators if separators is not None else ["\n\n", "\n", " ", ""]
# 保存分隔符是否为正则表达式
self._is_separator_regex = is_separator_regex
# 递归分割文本的内部方法
def _split_text(self, text, separators):
"""
递归地分割文本。
Args:
text: 要分割的文本
separators: 分隔符列表
Returns:
分割后的文本块列表
"""
# 存储最终结果
final_chunks = []
# 确定要使用的分隔符
# 默认使用最后一个分隔符(通常是空字符串,表示按字符分割)
separator = separators[-1]
# 存储后续要使用的分隔符
new_separators = []
# 遍历分隔符列表,找到第一个在文本中存在的分隔符
for i, _s in enumerate(separators):
# 确定分隔符模式(正则或转义)
separator_ = _s if self._is_separator_regex else re.escape(_s)
# 如果分隔符为空字符串,直接使用它
if not _s:
separator = _s
break
# 如果文本中存在该分隔符
if re.search(separator_, text):
separator = _s
# 保存后续要使用的分隔符
new_separators = separators[i + 1:]
break
# 确定分隔符模式用于分割
separator_ = separator if self._is_separator_regex else re.escape(separator)
# 使用分隔符分割文本,传递 keep_separator 参数
splits = self._split_text_with_regex(text, separator_, self._keep_separator)
# 处理分割后的片段
good_splits = [] # 存储长度合适的片段
# 确定合并时使用的分隔符
separator_ = "" if self._keep_separator else separator
# 遍历每个分割片段
for s in splits:
# 如果片段长度小于 chunk_size,加入 good_splits
if self._length_function(s) < self._chunk_size:
good_splits.append(s)
else:
# 如果片段太大,先处理之前积累的 good_splits
if good_splits:
# 合并 good_splits
merged_text = self._merge_splits(good_splits, separator_)
final_chunks.extend(merged_text)
good_splits = []
# 如果还有后续分隔符,递归分割当前片段
if new_separators:
other_info = self._split_text(s, new_separators)
final_chunks.extend(other_info)
else:
# 如果没有后续分隔符,直接添加当前片段
final_chunks.append(s)
# 处理剩余的 good_splits
if good_splits:
merged_text = self._merge_splits(good_splits, separator_)
final_chunks.extend(merged_text)
return final_chunks
# 使用正则表达式分割文本
def _split_text_with_regex(self, text, separator, keep_separator=False):
"""
使用正则表达式分割文本。
Args:
text: 要分割的文本
separator: 分隔符(正则表达式)
keep_separator: 是否保留分隔符
Returns:
分割后的文本块列表(过滤掉空字符串)
"""
# 如果分隔符不为空
if separator:
# 如果需要保留分隔符
if keep_separator:
# 使用括号捕获分隔符,这样分隔符会保留在结果中
splits_ = re.split(f"({separator})", text)
# 将分隔符和文本片段合并
splits = []
for i in range(0, len(splits_) - 1, 2):
if i + 1 < len(splits_):
# 将文本片段和分隔符合并
splits.append(splits_[i] + splits_[i + 1])
# 处理最后一个片段(如果有)
if len(splits_) % 2 == 1:
splits.append(splits_[-1])
else:
# 不保留分隔符,直接分割
splits = re.split(separator, text)
else:
# 如果分隔符为空,将文本转换为字符列表
splits = list(text)
# 过滤掉空字符串
return [s for s in splits if s]
# 分割文本的公共方法
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 调用内部递归分割方法
return self._split_text(text, self._separators)
# 定义 Tokenizer 数据类
# 引入 dataclass 装饰器并声明一个不可变的数据类 Tokenizer
+@dataclass(frozen=True)
+class Tokenizer:
# 分块时 token 之间的重叠数量
+ chunk_overlap: int
# 每个分块的 token 数量
+ tokens_per_chunk: int
# 解码方法,接受 List[int] 返回字符串
+ decode: callable
# 编码方法,接受 str 返回 List[int]
+ encode: callable
# 定义分词器分块函数
+def split_text_on_tokens(text, tokenizer: Tokenizer):
# 使用分词器将文本切分为多个块
+ splits = []
# 用分词器编码文本,得到 token id 列表
+ input_ids = tokenizer.encode(text)
# 起始索引初始化为 0
+ start_idx = 0
# 循环遍历输入 token
+ while start_idx < len(input_ids):
# 计算当前块的结束索引,不超过总长度
+ cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
# 获取当前分块的 token id 子列表
+ chunk_ids = input_ids[start_idx:cur_idx]
# 若分块为空则结束循环
+ if not chunk_ids:
+ break
# 解码分块为字符串
+ decoded = tokenizer.decode(chunk_ids)
# 如果解码后字符串非空,则加入结果
+ if decoded:
+ splits.append(decoded)
# 如果已到末尾,则跳出循环
+ if cur_idx == len(input_ids):
+ break
# 下一步的起始索引,向前滑动 tokens_per_chunk - chunk_overlap
+ start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
# 返回所有切分好的文本块
+ return splits
# 定义 TokenTextSplitter 类,用于按 token 数量切分文本
+class TokenTextSplitter(TextSplitter):
+ """
+ Token 文本分割器:按 token 数分割文本。
+ 依赖 tiktoken;chunk_size 与 chunk_overlap 都以 token 数计。
+ """
# 构造函数,初始化编码方式等参数
+ def __init__(
+ self,
+ encoding_name,
+ **kwargs,
+ ):
+ """
+ Args:
+ **kwargs: 传递给基类的 chunk_size / chunk_overlap 等参数
+ """
# 调用父类构造函数,传递相关参数
+ super().__init__(**kwargs)
# 默认配置
+ self._tokenizer = tiktoken.get_encoding(encoding_name)
# 按 token 数切分文本的主方法
+ def split_text(self, text):
+ """按 token 数切分文本"""
# 定义内联的编码方法,传递特殊 token 设置
+ def _encode(x: str):
+ return self._tokenizer.encode(x)
# 创建一个 Tokenizer 实例
+ tokenizer = Tokenizer(
+ chunk_overlap=self._chunk_overlap,
+ tokens_per_chunk=self._chunk_size,
+ decode=self._tokenizer.decode,
+ encode=_encode,
+ )
# 使用分词器分块函数进行切分
+ return split_text_on_tokens(text=text, tokenizer=tokenizer)43. MarkdownTextSplitter #
MarkdownTextSplitter 是专为 Markdown 格式文档打造的文本分割工具。
与常规 CharacterTextSplitter 不同,它会优先依据 Markdown 结构特征(如标题、代码块、分隔线和段落等)对文本进行递归式切分,从而尽量保持每个分片的语义完整。这能够更好地兼容知识库构建、向量索引及下游摘要、语义检索等场景,提升切片质量。
其核心特点有:
- 多级分隔优先级:首先按照标题级别(如
#、##),再按代码块(`)、分割线、段落双换行等切分,最后才是以字符粗粒度切断,最大限度避免割裂上下文。 - 可自定义分隔符序列:通过调整
separators顺序,可以灵活定义不同 Markdown 样式或自定义标记的优先级拆分方式。 - 递归分割逻辑:当上一级分隔无法使分片大小合规时,会自动递归向下采用更细分隔规则,直到满足设定的
chunk_size和chunk_overlap。
简单来说,如果你需要整理、搜索或向量化 Markdown 笔记、技术文档、博客等结构化文本,建议优先选择 MarkdownTextSplitter。它能够显著提升后续检索和内容聚合的表现,并更易用于多语种 Markdown 处理。
43.1. 43.MarkdownTextSplitter.py #
43.MarkdownTextSplitter.py
# 导入 TextSplitter 基类和 MarkdownTextSplitter
from smartchain.text_splitters import TextSplitter, MarkdownTextSplitter
# 定义一个 Markdown 文档示例
markdown_text = """# 第一章:介绍
这是第一章的内容。Markdown 是一种轻量级标记语言。
## 1.1 什么是 Markdown
Markdown 允许你使用易读易写的纯文本格式编写文档。
### 优点
- 简单易学
- 广泛支持
- 易于转换
## 1.2 基本语法
Markdown 支持多种语法元素。
``python
# 这是一个代码块示例
def hello():
print("Hello, Markdown!")
``
---
# 第二章:高级特性
这是第二章的内容。
## 2.1 表格
Markdown 支持表格语法。
## 2.2 链接和图片
Markdown 支持链接和图片插入。
---
# 第三章:总结
这是最后一章的内容。
"""
# 创建 Markdown 文本分割器
splitter = MarkdownTextSplitter(
chunk_size=100, # 每个块最多 100 字符
chunk_overlap=20, # 块之间重叠 20 字符
)
# 分割 Markdown 文本
chunks = splitter.split_text(markdown_text)
# 打印结果
print("=" * 60)
print("Markdown 文本分割示例")
print("=" * 60)
print(f"原文本长度: {len(markdown_text)} 字符")
print(f"切分得到 {len(chunks)} 块:\n")
for i, chunk in enumerate(chunks, 1):
print(f"--- 片段 {i} ({len(chunk)} 字符) ---")
# 显示完整内容(如果太长则截断)
if len(chunk) > 100:
print(chunk[:100] + "...")
else:
print(chunk)
print()
43.2. text_splitters.py #
smartchain/text_splitters.py
"""文本分割器模块,提供文本分割功能。"""
from dataclasses import dataclass
# 导入正则表达式模块
import re
# 导入抽象基类相关
from abc import ABC, abstractmethod
import tiktoken
# 定义 TextSplitter 抽象基类
class TextSplitter(ABC):
"""
文本分割器的抽象基类。
子类需要实现 split_text() 方法来定义具体的分割逻辑。
"""
# 构造函数,初始化分割器参数
def __init__(
self,
chunk_size = 4000, # 每个块的最大字符数,默认为4000
chunk_overlap = 200, # 块之间的重叠字符数,默认为200
length_function = None, # 计算文本长度的函数
keep_separator = False, # 是否保留分隔符,默认为False
strip_whitespace = True, # 是否去除首尾空白,默认为True
):
"""
初始化文本分割器。
Args:
chunk_size: 每个块的最大字符数
chunk_overlap: 块之间的重叠字符数
length_function: 计算文本长度的函数,默认为 len
keep_separator: 是否保留分隔符
strip_whitespace: 是否去除首尾空白
"""
# 保存参数到成员变量
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function if length_function is not None else len
self._keep_separator = keep_separator
self._strip_whitespace = strip_whitespace
# 抽象方法,子类必须实现此方法
@abstractmethod
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 强制子类必须实现 split_text 方法
raise NotImplementedError("子类必须实现 split_text() 方法")
# 合并分割后的文本块,控制结果块大小和重叠
def _merge_splits(self, splits, separator):
"""
将分割后的文本块合并,确保每个块不超过 chunk_size。
Args:
splits: 分割后的文本块列表
separator: 用于连接文本块的分隔符
Returns:
合并后的文本块列表
"""
# 计算分隔符的长度
separator_len = self._length_function(separator)
# 定义用于存储最终文本块的列表
docs = []
# 正在构建的当前文档块
current_doc = []
# 当前文档块字符总长度
total = 0
# 遍历每个文本分块
for d in splits:
# 获取当前分块的长度
len_ = self._length_function(d)
# 计算如果加上当前块后,文档块的总长度
total_with_sep = (
total + len_ + (separator_len if len(current_doc) > 0 else 0)
)
# 如果添加当前块会超过设定的 chunk_size
if total_with_sep > self._chunk_size:
# 如果当前拼接块不为空,先拼接并记录
if len(current_doc) > 0:
# 拼接当前文档块
doc = self._join_docs(current_doc, separator)
# 如果拼接结果不为空,则添加到结果列表
if doc is not None:
docs.append(doc)
# 如果拼接结果为空,则跳过后续处理
if doc is None:
continue
# 处理重叠逻辑,只保留最后 chunk_overlap 个字符
target_size = self._chunk_overlap # 需要保留的字符数
current_size = 0 # 当前累计保留长度
keep_docs = [] # 记录需要保留的分块
# 从后往前遍历当前文档块,直到累计长度达到重叠数
for i in range(len(current_doc) - 1, -1, -1):
# 取出每个子块
item = current_doc[i]
# 计算每个子块的长度
item_len = self._length_function(item)
# 计算分隔符的长度
sep_len = separator_len if len(keep_docs) > 0 else 0
# 如果累计长度加上子块长度和分隔符长度小于等于重叠字符数,则保留该分块
if current_size + item_len + sep_len <= target_size:
# 保留该分块,插入到 keep_docs 前面
keep_docs.insert(0, item)
current_size += item_len + sep_len
else:
# 一旦超过指定重叠字符数,则停止
break
# 更新当前文档块及总长度
current_doc = keep_docs
total = current_size
# 把当前分块加入当前文档块
current_doc.append(d)
# 更新累计长度(已有分块需加分隔符长度)
total += len_ + (separator_len if len(current_doc) > 1 else 0)
# 合并处理最后剩余的一个文档块
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
# 返回处理后的所有文档块
return docs
# 用分隔符拼接分块
def _join_docs(self, docs, separator):
"""
使用分隔符连接文档块。
Args:
docs: 文档块列表
separator: 分隔符
Returns:
连接后的文本,如果为空则返回 None
"""
# 使用分隔符拼接各分块
text = separator.join(docs)
# 按需去除首尾空白符
if self._strip_whitespace:
text = text.strip()
# 如拼接结果非空,返回文本,否则返回 None
return text if text else None
# 定义按字符数处理的文本分割器
class CharacterTextSplitter(TextSplitter):
"""
字符文本分割器:按字符数分割文本。
可以指定分隔符,先按分隔符分割,然后再合并成指定大小的块。
"""
# 构造函数,初始化字符型文本分割器
def __init__(
self,
separator = "\n\n", # 分隔符,默认双换行
is_separator_regex = False, # 分隔符是否为正则表达式
**kwargs, # 其它参数传递给父类
):
"""
初始化字符文本分割器。
Args:
separator: 分隔符,默认为双换行符 "\n\n"
is_separator_regex: 分隔符是否为正则表达式
**kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
"""
# 初始化父类参数
super().__init__(**kwargs)
# 保存分隔符
self._separator = separator
# 保存分隔符是否为正则表达式
self._is_separator_regex = is_separator_regex
# 重写文本分割方法
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 判断分隔符是否为正则表达式
if self._is_separator_regex:
# 直接使用正则表达式
sep_pattern = self._separator
else:
# 普通字符要转义用于正则分割
sep_pattern = re.escape(self._separator)
# 使用分隔符(正则)分割文本
splits = self._split_text_with_regex(text, sep_pattern)
# 根据 self._keep_separator 决定合并分块时是否加分隔符
merge_sep = "" if self._keep_separator else self._separator
# 对分割结果做合并处理,返回
return self._merge_splits(splits, merge_sep)
# 使用正则表达式分割文本块
def _split_text_with_regex(
self, text, separator
):
"""
使用正则表达式分割文本。
Args:
text: 要分割的文本
separator: 分隔符(正则表达式)
Returns:
分割后的文本块列表(过滤掉空字符串)
"""
# 如果分隔符不为空
if separator:
# 利用正则分割输入文本
splits = re.split(separator, text)
else:
# 若分隔符为空,则将文本每个字符拆为一个子块
splits = list(text)
# 过滤掉空字符串,仅返回非空的块
return [s for s in splits if s]
# 定义递归字符文本分割器
class RecursiveCharacterTextSplitter(TextSplitter):
"""
递归字符文本分割器:递归地尝试使用不同的分隔符来分割文本。
它会按照分隔符列表的顺序,尝试使用每个分隔符来分割文本。
如果某个片段仍然太大,会递归地使用下一个分隔符继续分割。
"""
# 构造函数,初始化递归字符文本分割器
def __init__(
self,
separators = None, # 分隔符列表,默认为 ["\n\n", "\n", " ", ""]
keep_separator = True, # 是否保留分隔符,默认为 True
is_separator_regex = False, # 分隔符是否为正则表达式
**kwargs, # 其它参数传递给父类
):
"""
初始化递归字符文本分割器。
Args:
separators: 分隔符列表,按优先级排序
keep_separator: 是否保留分隔符
is_separator_regex: 分隔符是否为正则表达式
**kwargs: 传递给父类的其他参数(chunk_size, chunk_overlap 等)
"""
# 初始化父类参数
super().__init__(keep_separator=keep_separator, **kwargs)
# 如果未提供分隔符,使用默认值
self._separators = separators if separators is not None else ["\n\n", "\n", " ", ""]
# 保存分隔符是否为正则表达式
self._is_separator_regex = is_separator_regex
# 递归分割文本的内部方法
def _split_text(self, text, separators):
"""
递归地分割文本。
Args:
text: 要分割的文本
separators: 分隔符列表
Returns:
分割后的文本块列表
"""
# 存储最终结果
final_chunks = []
# 确定要使用的分隔符
# 默认使用最后一个分隔符(通常是空字符串,表示按字符分割)
separator = separators[-1]
# 存储后续要使用的分隔符
new_separators = []
# 遍历分隔符列表,找到第一个在文本中存在的分隔符
for i, _s in enumerate(separators):
# 确定分隔符模式(正则或转义)
separator_ = _s if self._is_separator_regex else re.escape(_s)
# 如果分隔符为空字符串,直接使用它
if not _s:
separator = _s
break
# 如果文本中存在该分隔符
if re.search(separator_, text):
separator = _s
# 保存后续要使用的分隔符
new_separators = separators[i + 1:]
break
# 确定分隔符模式用于分割
separator_ = separator if self._is_separator_regex else re.escape(separator)
# 使用分隔符分割文本,传递 keep_separator 参数
splits = self._split_text_with_regex(text, separator_, self._keep_separator)
# 处理分割后的片段
good_splits = [] # 存储长度合适的片段
# 确定合并时使用的分隔符
separator_ = "" if self._keep_separator else separator
# 遍历每个分割片段
for s in splits:
# 如果片段长度小于 chunk_size,加入 good_splits
if self._length_function(s) < self._chunk_size:
good_splits.append(s)
else:
# 如果片段太大,先处理之前积累的 good_splits
if good_splits:
# 合并 good_splits
merged_text = self._merge_splits(good_splits, separator_)
final_chunks.extend(merged_text)
good_splits = []
# 如果还有后续分隔符,递归分割当前片段
if new_separators:
other_info = self._split_text(s, new_separators)
final_chunks.extend(other_info)
else:
# 如果没有后续分隔符,直接添加当前片段
final_chunks.append(s)
# 处理剩余的 good_splits
if good_splits:
merged_text = self._merge_splits(good_splits, separator_)
final_chunks.extend(merged_text)
return final_chunks
# 使用正则表达式分割文本
def _split_text_with_regex(self, text, separator, keep_separator=False):
"""
使用正则表达式分割文本。
Args:
text: 要分割的文本
separator: 分隔符(正则表达式)
keep_separator: 是否保留分隔符
Returns:
分割后的文本块列表(过滤掉空字符串)
"""
# 如果分隔符不为空
if separator:
# 如果需要保留分隔符
if keep_separator:
# 使用括号捕获分隔符,这样分隔符会保留在结果中
splits_ = re.split(f"({separator})", text)
# 将分隔符和文本片段合并
splits = []
for i in range(0, len(splits_) - 1, 2):
if i + 1 < len(splits_):
# 将文本片段和分隔符合并
splits.append(splits_[i] + splits_[i + 1])
# 处理最后一个片段(如果有)
if len(splits_) % 2 == 1:
splits.append(splits_[-1])
else:
# 不保留分隔符,直接分割
splits = re.split(separator, text)
else:
# 如果分隔符为空,将文本转换为字符列表
splits = list(text)
# 过滤掉空字符串
return [s for s in splits if s]
# 分割文本的公共方法
def split_text(self, text):
"""
将文本分割成多个块。
Args:
text: 要分割的文本
Returns:
文本块列表
"""
# 调用内部递归分割方法
return self._split_text(text, self._separators)
# 定义 Markdown 文本分割器
+class MarkdownTextSplitter(RecursiveCharacterTextSplitter):
+ """
+ Markdown 文本分割器:专门用于分割 Markdown 文档。
+ 使用 Markdown 特定的分隔符,优先按标题、代码块等结构分割。
+ 继承自 RecursiveCharacterTextSplitter,使用递归分割策略。
+ """
# 构造函数,初始化 Markdown 文本分割器
+ def __init__(self, **kwargs):
+ """
+ 初始化 Markdown 文本分割器。
+ Args:
+ **kwargs: 传递给父类的参数(chunk_size, chunk_overlap 等)
+ """
# Markdown 特定的分隔符,按优先级排序
# 优先按标题分割,然后是代码块、水平分割线、段落等
+ separators = [
+ "\n\n# ", # 一级标题(前面有两个换行)
+ "\n\n## ", # 二级标题
+ "\n\n### ", # 三级标题
+ "\n\n#### ", # 四级标题
+ "\n\n##### ", # 五级标题
+ "\n\n###### ", # 六级标题
+ "\n```", # 代码块开始(前面有一个换行)
+ "\n---", # 水平分割线
+ "\n***", # 水平分割线(另一种格式)
+ "\n\n", # 段落分隔(双换行)
+ "\n", # 换行
+ " ", # 空格
+ "", # 字符级别(最后的分隔符)
+ ]
# 调用父类构造函数,传入 Markdown 特定的分隔符
+ super().__init__(
+ separators=separators,
+ keep_separator=True, # 保留分隔符以保持 Markdown 结构
+ **kwargs
+ )
# 定义 Tokenizer 数据类
# 引入 dataclass 装饰器并声明一个不可变的数据类 Tokenizer
@dataclass(frozen=True)
class Tokenizer:
# 分块时 token 之间的重叠数量
chunk_overlap: int
# 每个分块的 token 数量
tokens_per_chunk: int
# 解码方法,接受 List[int] 返回字符串
decode: callable
# 编码方法,接受 str 返回 List[int]
encode: callable
# 定义分词器分块函数
def split_text_on_tokens(text, tokenizer: Tokenizer):
# 使用分词器将文本切分为多个块
splits = []
# 用分词器编码文本,得到 token id 列表
input_ids = tokenizer.encode(text)
# 起始索引初始化为 0
start_idx = 0
# 循环遍历输入 token
while start_idx < len(input_ids):
# 计算当前块的结束索引,不超过总长度
cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
# 获取当前分块的 token id 子列表
chunk_ids = input_ids[start_idx:cur_idx]
# 若分块为空则结束循环
if not chunk_ids:
break
# 解码分块为字符串
decoded = tokenizer.decode(chunk_ids)
# 如果解码后字符串非空,则加入结果
if decoded:
splits.append(decoded)
# 如果已到末尾,则跳出循环
if cur_idx == len(input_ids):
break
# 下一步的起始索引,向前滑动 tokens_per_chunk - chunk_overlap
start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
# 返回所有切分好的文本块
return splits
# 定义 TokenTextSplitter 类,用于按 token 数量切分文本
class TokenTextSplitter(TextSplitter):
"""
Token 文本分割器:按 token 数分割文本。
依赖 tiktoken;chunk_size 与 chunk_overlap 都以 token 数计。
"""
# 构造函数,初始化编码方式等参数
def __init__(
self,
encoding_name,
**kwargs,
):
"""
Args:
**kwargs: 传递给基类的 chunk_size / chunk_overlap 等参数
"""
# 调用父类构造函数,传递相关参数
super().__init__(**kwargs)
# 默认配置
self._tokenizer = tiktoken.get_encoding(encoding_name)
# 按 token 数切分文本的主方法
def split_text(self, text):
"""按 token 数切分文本"""
# 定义内联的编码方法,传递特殊 token 设置
def _encode(x: str):
return self._tokenizer.encode(x)
# 创建一个 Tokenizer 实例
tokenizer = Tokenizer(
chunk_overlap=self._chunk_overlap,
tokens_per_chunk=self._chunk_size,
decode=self._tokenizer.decode,
encode=_encode,
)
# 使用分词器分块函数进行切分
return split_text_on_tokens(text=text, tokenizer=tokenizer)