1. RAGFlow Excel解析 #
1.1 什么是Excel解析器 #
RAGFlow的Excel解析器使用openpyxl库来读取Excel文件(.xlsx格式)。它可以将Excel表格转换为结构化的数据,方便后续处理和分块。
主要功能:
- 读取Excel文件的所有工作表
- 识别表头和数据行
- 将每行数据转换为文本格式
- 支持分块处理
1.2 基础使用示例 #
下面是一个完整的使用示例:
# 导入必要的库
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.abspath('.'))
# 导入RAGFlow的Excel解析器
from rag.app.table import Excel, chunk
# 定义回调函数(用于显示进度,可选)
def dummy(prog=None, msg=""):
if msg:
print(f"进度: {msg}")
# 定义Excel文件路径(请替换为你的Excel文件路径)
excel_file = './data/table_only2.xlsx'
# 方法一:直接解析(返回DataFrame列表)
print("方法一:直接解析")
excel_parser = Excel()
# 解析Excel文件
# 返回的是pandas DataFrame列表,每个DataFrame对应一个工作表
dataframes = excel_parser(excel_file, callback=dummy)
# 查看解析结果
print(f"共解析 {len(dataframes)} 个工作表")
for idx, df in enumerate(dataframes, 1):
print(f"\n 工作表 {idx} ")
print(f"行数: {len(df)}")
print(f"列数: {len(df.columns)}")
print(f"表头: {list(df.columns)}")
print(f"前3行数据:")
print(df.head(3))
# 方法二:解析并自动分块
print("\n方法二:解析并分块")
data_chunk = chunk(excel_file, callback=dummy)
# 查看分块结果
# 每个chunk对应一行数据,格式为:表头1:值1,表头2:值2,...
print(f"\n共生成 {len(data_chunk)} 个chunk")
for idx, data in enumerate(data_chunk[:5], 1): # 只显示前5个
print(f"\n Chunk {idx} ")
print(data['content_with_weight'])代码说明:
- 直接解析:使用
Excel类直接解析,返回pandas DataFrame列表 - 解析并分块:使用
chunk函数,自动将每行数据转换为chunk - 结果格式:每个chunk包含
content_with_weight(内容)和metadata(元数据)
1.3 重要注意事项 #
1.3.1 表头必须是第一行 #
RAGFlow的Excel解析器默认第一行为表头。如果第一行不是表头,解析会出现混乱。
示例:正确的Excel格式
| 姓名 | 年龄 | 城市 |
||||
| 张三 | 25 | 北京 |
| 李四 | 30 | 上海 |示例:错误的Excel格式(第一行不是表头)
| 员工信息表 | | |
||||
| 姓名 | 年龄 | 城市 |
||||
| 张三 | 25 | 北京 |如果遇到这种情况,需要使用自定义解析器(见后面章节)。
1.3.2 公式处理 #
对于包含公式的单元格,openpyxl默认读取公式本身,而不是计算结果。如果需要读取计算结果,需要使用data_only=True参数。
示例:处理公式
# 导入必要的库
from openpyxl import load_workbook
# 方法一:读取公式(默认)
wb = load_workbook('example.xlsx')
sheet = wb.active
cell_value = sheet['A1'].value # 如果A1是公式,返回公式本身,如 "=SUM(B1:B10)"
# 方法二:读取计算结果
wb = load_workbook('example.xlsx', data_only=True)
sheet = wb.active
cell_value = sheet['A1'].value # 返回计算结果,如 100注意:data_only=True需要Excel文件之前被打开并保存过,否则可能无法获取计算结果。
2. Excel解析的常见问题 #
2.1 问题一:第一行不是表头 #
如果Excel文件的第一行不是表头(比如是标题行),使用基础解析器会出现问题。
问题示例:
# 导入必要的库
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.abspath('.'))
# 导入RAGFlow的Excel解析器
from rag.app.table import chunk
# 定义回调函数
def dummy(prog=None, msg=""):
if msg:
print(f"进度: {msg}")
# 假设Excel文件第一行是标题,不是表头
excel_file = './data/table_only.xlsx'
# 使用基础解析器解析
data_chunk = chunk(excel_file, callback=dummy)
# 查看结果(可能会出现混乱)
for idx, data in enumerate(data_chunk[:3], 1):
print(f"\n Chunk {idx} ")
print(data['content_with_weight'])解决方案:使用自定义解析器,自动检测表头位置(见后面章节)。
2.2 问题二:合并单元格 #
Excel中经常有合并单元格的情况,基础解析器可能无法正确处理。
解决方案:自定义解析器可以:
- 检测合并单元格
- 将合并单元格的值填充到所有相关单元格
- 保持数据的完整性
2.3 问题三:多行表头 #
有些Excel表格有多行表头,需要合并处理。
示例:多行表头
| 基本信息 | 联系方式 |
| 姓名 | 年龄 | 电话 | 邮箱 |
|||||
| 张三 | 25 | 138 | zs@ |解决方案:自定义解析器可以自动检测并合并多行表头。
3. 自定义Excel解析器 #
3.1 为什么需要自定义解析器 #
虽然RAGFlow提供了基础Excel解析器,但在实际应用中,你可能需要:
- 自动检测表头:不要求第一行必须是表头
- 处理合并单元格:正确填充合并单元格的值
- 处理多行表头:自动合并多行表头
- 提取表格标题:识别表格标题行
- 表格总结:使用LLM对表格进行总结
- 按行分块:将表格的每一行作为独立的chunk
3.2 自定义解析器的实现思路 #
我们的自定义解析器将实现以下功能:
- 读取Excel文件:使用openpyxl读取所有工作表
- 处理合并单元格:检测并填充合并单元格的值
- 检测表头:自动识别表头位置
- 提取标题:识别表格标题行
- 合并多行表头:将多行表头合并为一行
- 表格转换:转换为HTML、键值对等格式
- 表格总结:使用LLM生成表格总结
- 分块处理:生成Document列表
3.3 完整实现示例 #
下面是一个完整的自定义Excel解析器实现:
# 导入必要的库
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.abspath('.'))
# 导入openpyxl相关
import openpyxl
from openpyxl import load_workbook
# 导入LangChain相关
from langchain_core.documents import Document
# 导入工具函数(需要自己实现或从utils导入)
from utils import table_to_summary
def is_header_row(row):
"""
判断一行是否是表头行
参数:
row: 一行数据(列表)
返回:
bool: 如果是表头行返回True,否则返回False
"""
# 空行不算表头
if not row:
return False
# 遍历行中的每个单元格
for cell in row:
# 如果单元格不是字符串,肯定不是表头
if not isinstance(cell, str):
return False
# 如果单元格包含数字,可能不是表头
# 但像"2023年"、"第1章"这样的还是可能是表头
if cell and any(c.isdigit() for c in cell.strip()):
# 特殊情况处理:像"2023年"、"第1章"这样的可能是表头
if not (cell.strip().endswith('年') or
(cell.strip().startswith('第') and cell.strip().endswith(('章','节')))):
return False
return True
def merge_headers(table_data):
"""
处理多行表头合并
参数:
table_data: 表格数据(二维列表)
返回:
list: 合并表头后的表格数据
"""
# 如果表格为空,直接返回
if not table_data:
return table_data
# 步骤1: 检测表头行
header_rows = []
for row in table_data:
if is_header_row(row):
header_rows.append(row)
else:
# 遇到非表头行,停止检测
break
# 如果没有检测到多行表头,直接返回原数据
if len(header_rows) <= 1:
return table_data
# 步骤2: 合并表头行
merged_header = []
num_cols = len(header_rows[0]) # 保持列数不变
# 遍历每一列
for col_idx in range(num_cols):
# 收集该列的所有表头单元格(去除空值)
column_headers = []
for row in header_rows:
if col_idx < len(row) and row[col_idx] and str(row[col_idx]).strip():
cell = str(row[col_idx]).strip()
# 避免重复
if cell not in column_headers:
column_headers.append(cell)
# 合并策略:
# - 如果该列所有表头相同,只保留一个
# - 否则用"/"连接不同的表头
if len(column_headers) == 1:
merged_header.append(column_headers[0])
elif column_headers:
merged_header.append("/".join(column_headers))
else:
merged_header.append("") # 保持列数
# 步骤3: 构建新表格:合并后的表头 + 剩余数据行
new_table = [merged_header] + table_data[len(header_rows):]
return new_table
def process_table(data):
"""
处理表格数据:
1. 删除整行和整列为空的记录
2. 统计一行中非空记录数最大的数量
3. 判断标题行
参数:
data: 原始表格数据(二维列表)
返回:
tuple: (处理后的表格数据, 标题文本)
"""
# 步骤1: 删除整行为空的记录
def is_empty_row(row):
"""判断一行是否为空"""
return all(cell is None or str(cell).strip() == '' for cell in row)
# 过滤掉空行
data = [row for row in data if not is_empty_row(row)]
# 如果过滤后没有数据,返回None
if not data:
return None, ''
# 步骤2: 删除整列为空的记录
# 找出非空列索引
non_empty_cols = set()
for row in data:
for col_idx, cell in enumerate(row):
if cell and str(cell).strip():
non_empty_cols.add(col_idx)
# 只保留非空列
data = [[cell for col_idx, cell in enumerate(row) if col_idx in non_empty_cols]
for row in data]
# 步骤3: 统计每行非空记录数
non_empty_counts = [sum(1 for cell in row if cell and str(cell).strip()) for row in data]
max_non_empty = max(non_empty_counts) if non_empty_counts else 0
# 步骤4: 判断标题行(检查前3行)
title_row_indices = []
for i in range(min(3, len(data))):
row = data[i]
# 计算非空且不重复的单元格数量
unique_non_empty = len({cell for cell in row if cell and str(cell).strip()})
# 如果非空单元格数量少于最大值的一半,可能是标题行
if unique_non_empty < max_non_empty / 2:
title_row_indices.append(i)
# 步骤5: 分离标题行和数据行
titles = []
data_rows = [row for i, row in enumerate(data)
if i not in title_row_indices]
# 提取标题文本
if title_row_indices:
# 获取所有标题行内容
title_rows = [data[i] for i in title_row_indices]
for row in title_rows:
# 去除空白字符
stripped = [s.strip() for s in row if s.strip()]
# 去重(保持顺序)
unique_words = list(dict.fromkeys(stripped))
# 合并
titles.append(" ".join(unique_words))
# 步骤6: 合并多行表头
processed_data = merge_headers(data_rows)
return processed_data, " ".join(titles) if titles else ''
def list_to_html_table(data):
"""
将表格列表转换为HTML表格
参数:
data: 二维列表,第一行为表头
返回:
str: HTML格式的表格字符串
"""
# 如果数据为空或没有表头,返回空字符串
if not data or len(data) < 1:
return ""
html = ["<table border='1'>"]
# 添加表头
html.append(" <tr>")
for header in data[0]:
html.append(f" <th>{header}</th>")
html.append(" </tr>")
# 添加数据行
for row in data[1:]:
html.append(" <tr>")
for cell in row:
html.append(f" <td>{cell}</td>")
html.append(" </tr>")
html.append("</table>")
return "\n".join(html)
def list_to_key_value(data):
"""
将表格数据转换为键值对格式(用于明细检索)
参数:
data: 二维列表,第一行为表头
返回:
list: 键值对格式的字符串列表
"""
# 如果数据为空或没有数据行,返回空列表
if not data or len(data) < 2:
return []
# 获取表头
headers = data[0]
result = []
# 遍历数据行
for row in data[1:]:
kv_pairs = []
# 将每行的数据转换为"表头:值"的格式
for i in range(min(len(headers), len(row))):
kv_pairs.append(f"{headers[i]}:{row[i]}")
result.append(",".join(kv_pairs))
return result
def read_and_process_excel(file_path, data_only=False):
"""
读取并处理Excel文件
参数:
file_path: Excel文件路径
data_only: 是否只读取数据(不读取公式)
返回:
list: 工作表列表,每个元素包含工作表名称和表格数据
"""
# 打开Excel文件
workbook = openpyxl.load_workbook(file_path, data_only=data_only)
results = []
# 遍历所有工作表
for sheet_name in workbook.sheetnames:
sheet = workbook[sheet_name]
data = []
# 创建一个字典来存储单元格值
cell_values = {}
# 步骤1: 遍历所有单元格,获取初始值
for row in sheet.iter_rows():
for cell in row:
cell_values[cell.coordinate] = cell.value
# 步骤2: 处理合并单元格
# 遍历所有合并单元格
for merged_range in sheet.merged_cells.ranges:
# 获取合并区域的最小和最大行列
min_col, min_row = merged_range.min_col, merged_range.min_row
max_col, max_row = merged_range.max_col, merged_range.max_row
# 获取合并单元格的值(通常位于合并区域的左上角)
value = sheet.cell(row=min_row, column=min_col).value
# 将值填充到所有合并区域的单元格中
for row in range(min_row, max_row + 1):
for col in range(min_col, max_col + 1):
# 将行列号转换为单元格坐标(如A1, B2)
cell_coordinate = openpyxl.utils.get_column_letter(col) + str(row)
cell_values[cell_coordinate] = value
# 步骤3: 构建表格数据
max_row = sheet.max_row
max_col = sheet.max_column
for row in range(1, max_row + 1):
row_values = []
for col in range(1, max_col + 1):
# 获取单元格坐标
cell_coordinate = openpyxl.utils.get_column_letter(col) + str(row)
# 获取单元格值,如果为空则返回空字符串
cell_value = cell_values.get(cell_coordinate, None)
# 将值转换为字符串,并去除换行符
row_values.append(str(cell_value).replace('\n', '') if cell_value is not None else '')
data.append(row_values)
# 保存工作表信息
results.append({
"name": sheet_name,
"table": data
})
return results
class MyExcel(object):
"""
自定义Excel解析器
功能:
1. 自动检测表头位置
2. 处理合并单元格
3. 处理多行表头
4. 提取表格标题
5. 表格总结和分块
"""
def __call__(self, fnm):
"""
解析Excel文件
参数:
fnm: Excel文件路径
返回:
list: 表格数据列表,每个元素包含工作表名称、标题和数据
"""
# 步骤1: 读取并处理Excel文件
# data_only=True表示只读取数据,不读取公式
excel_result = read_and_process_excel(fnm, data_only=True)
table_datas = []
# 步骤2: 处理每个工作表
for table in excel_result:
table_name = table['name']
# 如果表格数据少于2行,跳过(至少需要表头和数据行)
if len(table['table']) < 2:
continue
# 步骤3: 处理表格数据,提取表格标题和合并多行的表头
data, title = process_table(table['table'])
# 如果处理后的数据为空,跳过
if not data:
continue
# 保存表格信息
table_datas.append({
"name": table_name, # 工作表名称
"title": title, # 表格标题
"data": data # 处理后的表格数据
})
return table_datas
def chunk(self, fnm, llm=None):
"""
解析Excel文件并生成Document列表
参数:
fnm: Excel文件路径
llm: 语言模型实例(用于表格总结)
返回:
list: LangChain Document列表
"""
# 步骤1: 解析Excel文件
table_datas = self(fnm)
# 如果没有提供LLM,创建一个(需要从model导入)
if llm is None:
from model import RagLLM
llm = RagLLM()
docs = []
# 步骤2: 处理每个表格
for table_info in table_datas:
# 步骤2.1: 将表格数据转换为HTML格式
table_html = list_to_html_table(table_info["data"])
# 步骤2.2: 将表格数据转换为键值对格式(用于明细检索)
table_lines = list_to_key_value(table_info["data"])
# 步骤2.3: 构建表格标题(工作表名称 + 表格标题)
table_caption = f'{table_info["name"]} {table_info["title"]}'
# 步骤2.4: 构建完整的表格内容(标题 + HTML)
table_chunk = f"{table_caption} {table_html}"
# 步骤2.5: 使用LLM对表格进行总结
table_summary = table_to_summary(table_chunk, llm)
# 步骤2.6: 创建表格总结Document
# page_content是总结,metadata中保存原始表格内容
doc = Document(
page_content=table_summary,
metadata={
"source": "table",
"content": table_chunk # 原始表格内容保存在metadata中
}
)
docs.append(doc)
# 步骤2.7: 为表格的每一行创建明细Document(用于精确检索)
for row in table_lines:
doc = Document(
page_content=f"{table_caption} 明细项:{row}",
metadata={
"source": "table-row",
"content": ""
}
)
docs.append(doc)
return docs
# 使用示例
if __name__ == "__main__":
# 定义Excel文件路径
excel_file = './data/table_only.xlsx'
# 创建自定义解析器实例
my_excel_parser = MyExcel()
# 解析Excel文件
print("正在解析Excel文件...")
table_datas = my_excel_parser(excel_file)
print(f"共解析 {len(table_datas)} 个表格")
for idx, table_info in enumerate(table_datas, 1):
print(f"\n 表格 {idx} ")
print(f"工作表: {table_info['name']}")
print(f"标题: {table_info['title']}")
print(f"数据行数: {len(table_info['data']) - 1}") # 减去表头行
# 解析并分块
print("\n正在生成Document列表...")
from model import RagLLM
llm = RagLLM()
documents = my_excel_parser.chunk(excel_file, llm=llm)
# 查看结果统计
table_count = sum(1 for doc in documents if doc.metadata['source'] == 'table')
table_row_count = sum(1 for doc in documents if doc.metadata['source'] == 'table-row')
print(f"\n解析完成!")
print(f"表格总结chunks: {table_count} 个")
print(f"表格行chunks: {table_row_count} 个")
print(f"总计: {len(documents)} 个Document")
# 查看示例结果
print("\n========== 示例结果 ==========")
for idx, doc in enumerate(documents[:3], 1):
print(f"\n Document {idx} ({doc.metadata['source']}) ")
content_preview = doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content
print(f"内容: {content_preview}")3.4 关键功能说明 #
3.4.1 合并单元格处理 #
合并单元格是Excel中的常见情况。我们的实现:
- 检测合并单元格:使用
sheet.merged_cells.ranges获取所有合并区域 - 获取合并值:从合并区域的左上角单元格获取值
- 填充值:将值填充到合并区域的所有单元格
3.4.2 表头检测 #
表头检测的逻辑:
- 判断规则:表头行通常都是文本,不包含数字(特殊情况除外)
- 多行表头:连续的表头行会被合并
- 标题行:非空单元格数量少于数据行的行可能是标题行
3.4.3 表格转换 #
表格可以转换为多种格式:
- HTML格式:用于显示和保存结构化信息
- 键值对格式:用于精确检索("表头:值"的格式)
- 总结格式:使用LLM生成的表格总结
4. 完整使用示例 #
下面是一个完整的示例,演示如何使用自定义解析器:
# 导入必要的库
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.abspath('.'))
# 导入自定义解析器(假设已经保存为my_excel_parser.py)
from my_excel_parser import MyExcel
from model import RagLLM
# 定义Excel文件路径
excel_file = './data/table_only.xlsx'
# 创建LLM实例(用于表格总结)
llm = RagLLM()
# 创建自定义解析器实例
my_excel_parser = MyExcel()
# 解析Excel文件并生成Document列表
print("开始解析Excel文件...")
documents = my_excel_parser.chunk(excel_file, llm=llm)
# 查看结果统计
table_count = sum(1 for doc in documents if doc.metadata['source'] == 'table')
table_row_count = sum(1 for doc in documents if doc.metadata['source'] == 'table-row')
print(f"\n解析完成!")
print(f"表格总结chunks: {table_count} 个")
print(f"表格行chunks: {table_row_count} 个")
print(f"总计: {len(documents)} 个Document")
# 查看示例结果
print("\n========== 示例结果 ==========")
for idx, doc in enumerate(documents[:5], 1):
print(f"\n Document {idx} ({doc.metadata['source']}) ")
content_preview = doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content
print(f"内容: {content_preview}")
if doc.metadata.get('content'):
print(f"原始内容: {doc.metadata['content'][:100]}...")
# 可以保存到向量数据库
# from langchain_chroma import Chroma
# from model import RagEmbedding
#
# embedding_model = RagEmbedding()
# vector_db = Chroma.from_documents(
# documents,
# embedding_model.get_embedding_fun(),
# collection_name="my_excel_collection"
# )5. 总结与最佳实践 #
5.1 Excel解析 vs 其他格式解析 #
| 特性 | Excel解析 | PDF解析 | Word解析 | ||-||-| | 数据格式 | 结构化表格 | 渲染后的图片 | 结构化文档 | | OCR需求 | 不需要 | 需要 | 不需要 | | 表头处理 | 需要识别 | 布局识别 | 直接读取 | | 合并单元格 | 需要处理 | 表格识别 | 直接读取 | | 公式处理 | 需要选择 | 不适用 | 不适用 |
5.2 常见问题处理 #
问题1:第一行不是表头
- 解决方案:使用自定义解析器,自动检测表头位置
问题2:合并单元格处理不当
- 解决方案:在读取时检测合并单元格,填充值到所有相关单元格
问题3:多行表头
- 解决方案:检测连续的表头行,合并为一行
问题4:公式读取问题
- 解决方案:使用
data_only=True读取计算结果,但需要文件之前被打开过
5.3 性能优化建议 #
- 批量处理:对于大量Excel文件,考虑并行处理
- 缓存结果:解析结果可以缓存,避免重复解析
- 选择性读取:如果只需要特定工作表,可以只读取需要的部分