1.本章目标 #
本章将介绍如何在 rag-lite 项目中管理和扩展知识库。你将学习知识库的基本目录结构、核心模型与服务的作用,以及知识库存储扩展(如本地存储与 MinIO)的配置和使用方法。通过本章,你可以掌握知识库的基本操作流程和自定义能力,为后续集成与智能问答打下基础。
2.目录结构 #
# 项目根目录
rag-lite/
# 应用目录
├── app/
# 蓝图目录
│ ├── blueprints/
# 蓝图初始化文件
│ │ ├── __init__.py
# 认证相关蓝图
│ │ ├── auth.py
# 知识库相关蓝图
│ │ ├── knowledgebase.py
# 通用工具蓝图
│ │ └── utils.py
# 模型目录
│ ├── models/
# 模型初始化文件
│ │ ├── __init__.py
# 基础模型
│ │ ├── base.py
# 聊天消息模型
│ │ ├── chat_message.py
# 聊天会话模型
│ │ ├── chat_session.py
# 文档模型
│ │ ├── document.py
# 知识库模型
│ │ ├── knowledgebase.py
# 设置模型
│ │ ├── settings.py
# 用户模型
│ │ └── user.py
# 服务层目录
│ ├── services/
# 存储相关服务
│ ├── storage/
# 存储初始化文件
│ │ ├── __init__.py
# 存储基类
│ │ ├── base.py
# 存储工厂
│ │ ├── factory.py
# 本地存储实现
│ │ ├── local_storage.py
# minio 存储实现
│ │ └── minio_storage.py
# 服务层基类
│ ├── base_service.py
# 知识库服务
│ ├── knowledgebase_service.py
# 存储服务
│ ├── storage_service.py
# 用户服务
│ └── user_service.py
# 静态文件目录
│ ├── static/
# 模板文件目录
│ ├── templates/
# 公共页面模板
│ ├── base.html
# 主页模板
│ ├── home.html
# 知识库列表模板
│ ├── kb_list.html
# 登录页面模板
│ ├── login.html
# 注册页面模板
│ └── register.html
# 工具函数目录
│ ├── utils/
# 认证工具
│ ├── auth.py
# 数据库工具
│ ├── db.py
# 日志工具
│ └── logger.py
# 应用初始化文件
│ ├── __init__.py
# 应用配置文件
│ └── config.py
# 日志文件目录
├── logs/
# 主日志文件
│ └── rag_lite.log
# 存储文件目录
├── storages/
# 封面图片或其他资源
│ └── covers/
# 启动入口
├── main.py
# 规划文档
├── plan.md
# Python 构建配置文件
└── pyproject.toml3.创建知识库 #
3.1. knowledgebase.py #
app/blueprints/knowledgebase.py
# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""
# 导入Flask中的Blueprint和request
from flask import Blueprint,request
# 导入logging模块
import logging
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 配置logger
logger = logging.getLogger(__name__)
# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)
# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
def api_create():
# 设置接口功能描述
"""创建知识库"""
# 从请求中解析JSON数据
data = request.get_json()
# 校验数据是否存在以及name键是否存在
if not data or 'name' not in data:
# 若校验失败,返回错误响应
return error_response("name is required", 400)
# 获取知识库名称
name = data['name']
# 获取用户id,可为空
user_id = data.get('user_id')
# 获取知识库描述,可为空
description = data.get('description')
# 获取分块大小,默认为512
chunk_size = data.get('chunk_size', 512)
# 获取分块重叠,默认为50
chunk_overlap = data.get('chunk_overlap', 50)
# 调用知识库服务创建知识库,返回信息字典
kb_dict = kb_service.create(
name=name,
user_id=user_id,
description=description,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
# 返回创建成功的知识库信息
return success_response(kb_dict)3.2. utils.py #
app/blueprints/utils.py
"""
路由工具函数
"""
# 导入Flask用于返回JSON响应
from flask import jsonify
# 导入装饰器工具,用来保持原函数信息
from functools import wraps
# 导入获取当前用户的工具函数
from app.utils.auth import get_current_user
# 导入日志模块
import logging
# 获取logger对象(当前模块名)
logger = logging.getLogger(__name__)
# 定义成功响应函数
def success_response(data=None, message="success"):
"""
成功响应
Args:
data: 响应数据
message: 响应消息
Returns:
JSON 响应
"""
# 返回标准格式的JSON成功响应
return jsonify({
"code": 200, # 状态码200,表示成功
"message": message, # 响应消息
"data": data # 响应数据
})
# 定义错误响应函数
def error_response(message: str, code: int = 400):
"""
错误响应
Args:
message: 错误消息
code: HTTP 状态码
Returns:
JSON 响应和状态码
"""
# 返回标准格式的JSON错误响应,以及相应的HTTP状态码
return jsonify({
"code": code, # 错误码,对应HTTP状态码
"message": message, # 错误消息
"data": None # 错误时无数据
}), code
# 定义API错误处理装饰器
def handle_api_error(func):
"""
API 错误处理装饰器
使用示例:
@handle_api_error
def my_api():
# API 逻辑
return success_response(data)
"""
# 保留原函数信息并定义包装器
@wraps(func)
def wrapper(*args, **kwargs):
try:
# 正常执行被装饰的API函数
return func(*args, **kwargs)
except ValueError as e:
# 捕获ValueError,日志记录warning信息并返回400错误响应
logger.warning(f"ValueError in {func.__name__}: {e}")
return error_response(str(e), 400)
except Exception as e:
# 捕获其他所有异常,日志记录error信息并返回500错误响应
logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
return error_response(str(e), 500)
# 返回包装后的函数
return wrapper
3.3. knowledgebase_service.py #
app/services/knowledgebase_service.py
# 从基础服务导入BaseService类
from app.services.base_service import BaseService
# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase
# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
"""知识库服务"""
# 定义创建知识库的方法
def create(self, name: str, user_id: str, description: str = None,
chunk_size: int = 512, chunk_overlap: int = 50) -> dict:
"""
创建知识库
Args:
name: 知识库名称
user_id: 用户ID
description: 描述
chunk_size: 分块大小
chunk_overlap: 分块重叠
Returns:
创建的知识库字典
"""
# 启动数据库事务,上下文管理器自动处理提交或回滚
with self.transaction() as session:
# 先创建知识库对象
kb = Knowledgebase(
name=name, # 设置知识库名称
user_id=user_id, # 设置用户ID
description=description, # 设置知识库描述
chunk_size=chunk_size, # 设置分块大小
chunk_overlap=chunk_overlap # 设置分块重叠
)
# 将知识库对象添加到session
session.add(kb)
# 刷新session,生成知识库ID
session.flush() # 刷新以获取 ID,但不提交
# 刷新kb对象的数据库状态
session.refresh(kb)
# 转换kb对象为字典(在session内部,避免分离后出错)
kb_dict = kb.to_dict()
# 记录创建知识库的日志,包含ID
self.logger.info(f"创建了知识库,ID: {kb.id}")
# 返回知识库字典信息
return kb_dict
# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()
3.4. init.py #
app/init.py
# RAG Lite 应用模块说明
"""
RAG Lite Application
"""
# 导入操作系统相关模块
import os
# 从 Flask 包导入 Flask 应用对象
from flask import Flask
# 导入 Flask 跨域资源共享支持
from flask_cors import CORS
# 导入应用配置类
from app.config import Config
# 导入日志工具,用于获取日志记录器
from app.utils.logger import get_logger
# 导入数据库初始化函数
from app.utils.db import init_db
# 导入蓝图模块
+from app.blueprints import auth,knowledgebase
# 导入获取当前用户信息函数
from app.utils.auth import get_current_user
# 定义创建 Flask 应用的工厂函数
def create_app(config_class=Config):
# 获取日志记录器,名称为当前模块名
logger = get_logger(__name__)
# 尝试初始化数据库
try:
# 输出日志,表示即将初始化数据库
logger.info("初始化数据库...")
# 执行数据库初始化函数
init_db()
# 输出日志,表示数据库初始化成功
logger.info("数据库初始化成功")
# 捕获任意异常
except Exception as e:
# 输出警告日志,提示数据库初始化失败,并输出异常信息
logger.warning(f"数据库初始化失败: {e}")
# 输出警告日志,提示检查数据库是否已存在,并建议手动创建数据表
logger.warning("请确认数据库已存在,或手动创建数据表")
# 创建 Flask 应用对象,并指定模板和静态文件目录
base_dir = os.path.abspath(os.path.dirname(__file__))
# 创建 Flask 应用对象,并指定模板和静态文件目录
app = Flask(
__name__,
# 指定模板文件目录
template_folder=os.path.join(base_dir, 'templates'),
# 指定静态文件目录
static_folder=os.path.join(base_dir, 'static')
)
# 从给定配置类加载配置信息到应用
app.config.from_object(config_class)
# 启用跨域请求支持
CORS(app)
# 记录应用创建日志信息
logger.info("Flask 应用已创建")
# 注册上下文处理器,使 current_user 在所有模板中可用
@app.context_processor
def inject_user():
# 返回当前用户信息字典
# 使用 get_current_user 获取当前用户信息,并将其添加到上下文字典中
# 这样在模板中可以直接使用 current_user 变量
return dict(current_user=get_current_user())
# 注册蓝图
app.register_blueprint(auth.bp)
# 注册知识库蓝图
+ app.register_blueprint(knowledgebase.bp)
# 定义首页路由
@app.route('/')
def index():
return "Hello, World!"
# 返回已配置的 Flask 应用对象
return app4.知识库列表 #
4.1. kb_list.html #
app/templates/kb_list.html
{% extends "base.html" %}
{% block title %}知识库管理 - RAG Lite{% endblock %}
{% block content %}
<style>
@media (min-width: 992px) {
#kbList > div {
flex: 0 0 20%;
max-width: 20%;
}
}
</style>
<div class="row">
<div class="col-12">
<nav aria-label="breadcrumb" class="mb-3">
<ol class="breadcrumb">
<li class="breadcrumb-item"><a href="/">首页</a></li>
<li class="breadcrumb-item active">知识库管理</li>
</ol>
</nav>
<div class="d-flex justify-content-between align-items-center mb-4">
<h2><i class="bi bi-collection"></i> 知识库管理</h2>
</div>
<!-- 知识库列表 -->
<div class="row" id="kbList">
{% if kbs %}
{% for kb in kbs %}
<div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
<div class="card h-100">
<div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
<i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
</div>
<div class="card-body">
<h5 class="card-title">
<i class="bi bi-folder"></i> {{ kb.name }}
</h5>
<p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
</div>
</div>
</div>
{% endfor %}
{% else %}
<div class="col-12">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
</div>
</div>
{% endif %}
</div>
<!-- 分页控件 -->
{% if pagination and pagination.total > pagination.page_size %}
<nav aria-label="知识库列表分页" class="mt-4">
<ul class="pagination justify-content-center">
{% set current_page = pagination.page %}
{% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}
<!-- 上一页 -->
<li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
<i class="bi bi-chevron-left"></i> 上一页
</a>
</li>
<!-- 页码 -->
{% set start_page = [1, current_page - 2] | max %}
{% set end_page = [total_pages, current_page + 2] | min %}
{% if start_page > 1 %}
<li class="page-item">
<a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
</li>
{% if start_page > 2 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
{% endif %}
{% for page_num in range(start_page, end_page + 1) %}
<li class="page-item {% if page_num == current_page %}active{% endif %}">
<a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
{{ page_num }}
</a>
</li>
{% endfor %}
{% if end_page < total_pages %}
{% if end_page < total_pages - 1 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
<li class="page-item">
<a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
</li>
{% endif %}
<!-- 下一页 -->
<li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
下一页 <i class="bi bi-chevron-right"></i>
</a>
</li>
</ul>
<div class="text-center text-muted small mt-2">
共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
</div>
</nav>
{% endif %}
</div>
</div>
{% endblock %}
{% block extra_js %}
<script>
</script>
{% endblock %}
4.2. knowledgebase.py #
app/blueprints/knowledgebase.py
# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""
# 导入Flask中的Blueprint和request
+from flask import Blueprint,request,render_template
# 导入logging模块
import logging
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
+from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
+from app.blueprints.utils import (get_pagination_params)
# 配置logger
logger = logging.getLogger(__name__)
# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)
# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
+@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
def api_create():
# 设置接口功能描述
"""创建知识库"""
# 从请求中解析JSON数据
data = request.get_json()
# 校验数据是否存在以及name键是否存在
if not data or 'name' not in data:
# 若校验失败,返回错误响应
return error_response("name is required", 400)
# 获取知识库名称
name = data['name']
# 获取用户id,可为空
user_id = data.get('user_id')
# 获取知识库描述,可为空
description = data.get('description')
# 获取分块大小,默认为512
chunk_size = data.get('chunk_size', 512)
# 获取分块重叠,默认为50
chunk_overlap = data.get('chunk_overlap', 50)
# 调用知识库服务创建知识库,返回信息字典
kb_dict = kb_service.create(
+ name=name,#知识库名称
+ user_id=user_id,#用户ID
+ description=description,#知识库描述
+ chunk_size=chunk_size,#分块大小
+ chunk_overlap=chunk_overlap,#分块重叠
)
# 返回创建成功的知识库信息
return success_response(kb_dict)
# 注册'/kb'路由,处理GET请求,显示知识库列表页面
+@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
+@login_required
# 定义kb_list函数,渲染知识库列表页面
+def kb_list():
# 设置本函数用途说明(文档字符串)
+ """知识库列表页面"""
# 获取当前登录用户信息
+ current_user = get_current_user()
# 获取分页参数(页码和每页大小),最大每页100
+ page, page_size = get_pagination_params(max_page_size=100)
# 调用知识库服务,获取分页后的知识库列表结果
+ result = kb_service.list(
+ user_id=current_user['id'], # 用户ID
+ page=page, # 页码
+ page_size=page_size # 每页大小
+ )
# 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
+ return render_template('kb_list.html',
+ kbs=result['items'],
+ pagination=result) 4.3. utils.py #
app/blueprints/utils.py
"""
路由工具函数
"""
# 导入Flask用于返回JSON响应
+from flask import jsonify,request
# 导入装饰器工具,用来保持原函数信息
from functools import wraps
# 导入类型提示工具
+from typing import Tuple, Optional
# 导入获取当前用户的工具函数
from app.utils.auth import get_current_user
# 导入日志模块
import logging
# 获取logger对象(当前模块名)
logger = logging.getLogger(__name__)
# 定义成功响应函数
def success_response(data=None, message="success"):
"""
成功响应
Args:
data: 响应数据
message: 响应消息
Returns:
JSON 响应
"""
# 返回标准格式的JSON成功响应
return jsonify({
"code": 200, # 状态码200,表示成功
"message": message, # 响应消息
"data": data # 响应数据
})
# 定义错误响应函数
def error_response(message: str, code: int = 400):
"""
错误响应
Args:
message: 错误消息
code: HTTP 状态码
Returns:
JSON 响应和状态码
"""
# 返回标准格式的JSON错误响应,以及相应的HTTP状态码
return jsonify({
"code": code, # 错误码,对应HTTP状态码
"message": message, # 错误消息
"data": None # 错误时无数据
}), code
# 定义API错误处理装饰器
def handle_api_error(func):
"""
API 错误处理装饰器
使用示例:
@handle_api_error
def my_api():
# API 逻辑
return success_response(data)
"""
# 保留原函数信息并定义包装器
@wraps(func)
def wrapper(*args, **kwargs):
try:
# 正常执行被装饰的API函数
return func(*args, **kwargs)
except ValueError as e:
# 捕获ValueError,日志记录warning信息并返回400错误响应
logger.warning(f"ValueError in {func.__name__}: {e}")
return error_response(str(e), 400)
except Exception as e:
# 捕获其他所有异常,日志记录error信息并返回500错误响应
logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
return error_response(str(e), 500)
# 返回包装后的函数
return wrapper
# 定义获取分页参数的函数,允许指定最大每页数量
+def get_pagination_params(max_page_size: int = 1000) -> Tuple[int, int]:
+ """
+ 获取分页参数
+ Args:
+ max_page_size: 最大每页数量
+ Returns:
+ (page, page_size) 元组
+ """
# 获取请求中的 'page' 参数,默认为1,并将其转换为整数
+ page = int(request.args.get('page', 1))
# 获取请求中的 'page_size' 参数,默认为10,并将其转换为整数
+ page_size = int(request.args.get('page_size', 10))
# 保证 page 至少为1
+ page = max(1, page)
# 保证 page_size 至少为1且不超过 max_page_size
+ page_size = max(1, min(page_size, max_page_size))
# 返回分页的(page, page_size)元组
+ return page, page_size4.4. knowledgebase_service.py #
app/services/knowledgebase_service.py
# 知识库服务
+"""
+知识库服务
+"""
# 导入类型提示工具
+from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService
# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase
# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
"""知识库服务"""
# 定义创建知识库的方法
def create(self, name: str, user_id: str, description: str = None,
chunk_size: int = 512, chunk_overlap: int = 50) -> dict:
"""
创建知识库
Args:
name: 知识库名称
user_id: 用户ID
description: 描述
chunk_size: 分块大小
chunk_overlap: 分块重叠
Returns:
创建的知识库字典
"""
# 启动数据库事务,上下文管理器自动处理提交或回滚
with self.transaction() as session:
# 先创建知识库对象
kb = Knowledgebase(
name=name, # 设置知识库名称
user_id=user_id, # 设置用户ID
description=description, # 设置知识库描述
chunk_size=chunk_size, # 设置分块大小
chunk_overlap=chunk_overlap # 设置分块重叠
)
# 将知识库对象添加到session
session.add(kb)
# 刷新session,生成知识库ID
session.flush() # 刷新以获取 ID,但不提交
# 刷新kb对象的数据库状态
session.refresh(kb)
# 转换kb对象为字典(在session内部,避免分离后出错)
kb_dict = kb.to_dict()
# 记录创建知识库的日志,包含ID
self.logger.info(f"创建了知识库,ID: {kb.id}")
# 返回知识库字典信息
return kb_dict
# 定义获取知识库列表的方法
+ def list(self, user_id: str = None, page: int = 1, page_size: int = 10) -> Dict:
+ """
+ 获取知识库列表
+ Args:
+ user_id: 用户ID(可选)
+ page: 页码
+ page_size: 每页数量
+ Returns:
+ 包含 items, total, page, page_size 的字典
+ """
# 使用数据库会话
+ with self.session() as session:
# 查询Knowledgebase表
+ query = session.query(Knowledgebase)
# 如果指定了user_id,则筛选属于该用户的知识库
+ if user_id:
+ query = query.filter(Knowledgebase.user_id == user_id)
# 统计总记录数
+ total = query.count()
# 计算分页偏移量
+ offset = (page - 1) * page_size
# 获取当前页的数据列表
+ kbs = query.offset(offset).limit(page_size).all()
# 初始化知识库字典列表
+ items = []
# 遍历查询结果,将每一项转为dict后添加到items列表
+ for kb in kbs:
+ kb_dict = kb.to_dict()
+ items.append(kb_dict)
# 返回包含分页信息和数据条目的字典
+ return {
+ 'items': items,
+ 'total': total,
+ 'page': page,
+ 'page_size': page_size
+ }
# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()
4.5. auth.py #
app/utils/auth.py
# 认证工具
"""
认证工具
"""
# 从 Flask 导入 session 和 g(全局对象)
+from flask import session, redirect, url_for, request, g
# 导入装饰器工具,用来保持原函数信息
+from functools import wraps
# 导入日志库 logging
import logging
# 导入 user_service 服务
from app.services.user_service import user_service
# 获取当前模块的日志记录器实例
logger = logging.getLogger(__name__)
# 定义获取当前登录用户信息的函数
def get_current_user():
"""
获取当前登录用户信息
使用 Flask 的 g 对象缓存,避免重复查询
Returns:
用户信息字典,如果未登录则返回 None
"""
# 如果 g 对象没有 current_user 属性
if not hasattr(g, 'current_user'):
# 如果会话中有 user_id 字段
if 'user_id' in session:
# 通过 user_id 获取用户信息并缓存到 g.current_user
g.current_user = user_service.get_by_id(session['user_id'])
else:
# 如果没有登录,将 g.current_user 设为 None
g.current_user = None
# 返回当前用户信息
return g.current_user
# 定义 API 登录认证装饰器
+def api_login_required(f):
+ """
+ API 登录装饰器
+ 用于 API 端点,返回 JSON 错误响应而不是重定向
+ """
# 保持被装饰函数的元信息
+ @wraps(f)
# 定义装饰后的函数
+ def decorated_function(*args, **kwargs):
# 判断 session 中是否没有 user_id,未登录状态
+ if 'user_id' not in session:
# 延迟导入 jsonify 防止循环引用
+ from flask import jsonify
# 返回未授权的 JSON 错误响应和 401 状态码
+ return jsonify({
+ "code": 401,
+ "message": "Unauthorized",
+ "data": None
+ }), 401
# 如果已登录,正常执行原函数
+ return f(*args, **kwargs)
# 返回包装后的函数
+ return decorated_function
# 定义 API 登录认证装饰器
+def api_login_required(f):
+ """
+ API 登录装饰器
+ 用于 API 端点,返回 JSON 错误响应而不是重定向
+ """
# 使用 wraps 保持原函数的元信息
+ @wraps(f)
+ def decorated_function(*args, **kwargs):
# 如果 session 中没有 user_id,说明未登录
+ if 'user_id' not in session:
# 延迟导入 jsonify,避免循环引用
+ from flask import jsonify
# 返回未授权的 JSON 响应,状态码 401
+ return jsonify({
+ "code": 401,
+ "message": "Unauthorized",
+ "data": None
+ }), 401
# 已登录则继续执行原函数
+ return f(*args, **kwargs)
# 返回包装后的函数
+ return decorated_function
+def login_required(f):
+ """
+ 登录装饰器
+ 需要登录才能访问的页面使用此装饰器
+ """
+ @wraps(f)
+ def decorated_function(*args, **kwargs):
+ if 'user_id' not in session:
# 保存原始请求的URL,登录后可以重定向回去
+ return redirect(url_for('auth.login', next=request.url))
+ return f(*args, **kwargs)
+ return decorated_function 5.创建知识库 #
5.1. knowledgebase.py #
app/blueprints/knowledgebase.py
# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""
# 导入Flask中的Blueprint和request
from flask import Blueprint,request,render_template
# 导入logging模块
import logging
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
+from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
from app.blueprints.utils import (get_pagination_params)
# 配置logger
logger = logging.getLogger(__name__)
# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)
# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
# 接口用途说明文档字符串
"""创建知识库"""
# 获取当前用户,如未登录则返回错误响应
+ current_user, err = get_current_user_or_error()
+ if err:
+ return err
# 检查请求是否为multipart/form-data(用于文件上传的表单方式)
+ if request.content_type and 'multipart/form-data' in request.content_type:
# 从表单数据中获取知识库名称
+ name = request.form.get('name')
# 如果未传入name参数,返回错误
+ if not name:
+ return error_response("name is required", 400)
# 获取描述字段,没有则为None
+ description = request.form.get('description') or None
# 获取分块大小,默认为512
+ chunk_size = int(request.form.get('chunk_size', 512))
# 获取分块重叠,默认为50
+ chunk_overlap = int(request.form.get('chunk_overlap', 50))
+ else:
# 如果是json请求数据(向后兼容旧用法)
+ data = request.get_json()
# 判断是否存在name字段,不存在则报错
+ if not data or 'name' not in data:
+ return error_response("name is required", 400)
# 获取知识库名称
+ name = data['name']
# 获取描述
+ description = data.get('description')
# 获取分块大小,默认为512
+ chunk_size = data.get('chunk_size', 512)
# 获取分块重叠,默认为50
+ chunk_overlap = data.get('chunk_overlap', 50)
# 调用知识库服务,创建知识库,返回知识库信息字典
kb_dict = kb_service.create(
+ name=name, # 知识库名称
+ user_id=current_user['id'], # 用户ID
+ description=description, # 知识库描述
+ chunk_size=chunk_size, # 分块大小
+ chunk_overlap=chunk_overlap, # 分块重叠
)
# 返回成功响应,包含知识库信息
return success_response(kb_dict)
# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
# 设置本函数用途说明(文档字符串)
"""知识库列表页面"""
# 获取当前登录用户信息
current_user = get_current_user()
# 获取分页参数(页码和每页大小),最大每页100
page, page_size = get_pagination_params(max_page_size=100)
# 调用知识库服务,获取分页后的知识库列表结果
result = kb_service.list(
user_id=current_user['id'], # 用户ID
page=page, # 页码
page_size=page_size # 每页大小
)
# 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
return render_template('kb_list.html',
kbs=result['items'],
pagination=result) 5.2. utils.py #
app/blueprints/utils.py
"""
路由工具函数
"""
# 导入Flask用于返回JSON响应
from flask import jsonify,request
# 导入装饰器工具,用来保持原函数信息
from functools import wraps
# 导入类型提示工具
from typing import Tuple, Optional
# 导入获取当前用户的工具函数
from app.utils.auth import get_current_user
# 导入日志模块
import logging
# 获取logger对象(当前模块名)
logger = logging.getLogger(__name__)
# 定义成功响应函数
def success_response(data=None, message="success"):
"""
成功响应
Args:
data: 响应数据
message: 响应消息
Returns:
JSON 响应
"""
# 返回标准格式的JSON成功响应
return jsonify({
"code": 200, # 状态码200,表示成功
"message": message, # 响应消息
"data": data # 响应数据
})
# 定义错误响应函数
def error_response(message: str, code: int = 400):
"""
错误响应
Args:
message: 错误消息
code: HTTP 状态码
Returns:
JSON 响应和状态码
"""
# 返回标准格式的JSON错误响应,以及相应的HTTP状态码
return jsonify({
"code": code, # 错误码,对应HTTP状态码
"message": message, # 错误消息
"data": None # 错误时无数据
}), code
# 定义API错误处理装饰器
def handle_api_error(func):
"""
API 错误处理装饰器
使用示例:
@handle_api_error
def my_api():
# API 逻辑
return success_response(data)
"""
# 保留原函数信息并定义包装器
@wraps(func)
def wrapper(*args, **kwargs):
try:
# 正常执行被装饰的API函数
return func(*args, **kwargs)
except ValueError as e:
# 捕获ValueError,日志记录warning信息并返回400错误响应
logger.warning(f"ValueError in {func.__name__}: {e}")
return error_response(str(e), 400)
except Exception as e:
# 捕获其他所有异常,日志记录error信息并返回500错误响应
logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
return error_response(str(e), 500)
# 返回包装后的函数
return wrapper
# 定义获取分页参数的函数,允许指定最大每页数量
def get_pagination_params(max_page_size: int = 1000) -> Tuple[int, int]:
"""
获取分页参数
Args:
max_page_size: 最大每页数量
Returns:
(page, page_size) 元组
"""
# 获取请求中的 'page' 参数,默认为1,并将其转换为整数
page = int(request.args.get('page', 1))
# 获取请求中的 'page_size' 参数,默认为10,并将其转换为整数
page_size = int(request.args.get('page_size', 10))
# 保证 page 至少为1
page = max(1, page)
# 保证 page_size 至少为1且不超过 max_page_size
page_size = max(1, min(page_size, max_page_size))
# 返回分页的(page, page_size)元组
return page, page_size
# 定义获取当前用户或返回错误的函数
+def get_current_user_or_error():
+ """
+ 获取当前用户,如果未登录则返回错误响应
+ Returns:
+ 如果成功返回 (user_dict, None),如果失败返回 (None, error_response)
+ """
# 调用 get_current_user() 获取当前用户对象
+ current_user = get_current_user()
# 如果没有获取到用户,则返回 (None, 错误响应)
+ if not current_user:
+ return None, error_response("Unauthorized", 401)
# 如果获取到用户,则返回 (用户对象, None)
+ return current_user, None5.3. kb_list.html #
app/templates/kb_list.html
{% extends "base.html" %}
{% block title %}知识库管理 - RAG Lite{% endblock %}
{% block content %}
<style>
@media (min-width: 992px) {
#kbList > div {
flex: 0 0 20%;
max-width: 20%;
}
}
</style>
<div class="row">
<div class="col-12">
<nav aria-label="breadcrumb" class="mb-3">
<ol class="breadcrumb">
<li class="breadcrumb-item"><a href="/">首页</a></li>
<li class="breadcrumb-item active">知识库管理</li>
</ol>
</nav>
<div class="d-flex justify-content-between align-items-center mb-4">
<h2><i class="bi bi-collection"></i> 知识库管理</h2>
+ <button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
+ <i class="bi bi-plus-circle"></i> 创建知识库
+ </button>
</div>
<!-- 知识库列表 -->
<div class="row" id="kbList">
{% if kbs %}
{% for kb in kbs %}
<div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
<div class="card h-100">
<div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
<i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
</div>
<div class="card-body">
<h5 class="card-title">
<i class="bi bi-folder"></i> {{ kb.name }}
</h5>
<p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
</div>
</div>
</div>
{% endfor %}
{% else %}
<div class="col-12">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
</div>
</div>
{% endif %}
</div>
<!-- 分页控件 -->
{% if pagination and pagination.total > pagination.page_size %}
<nav aria-label="知识库列表分页" class="mt-4">
<ul class="pagination justify-content-center">
{% set current_page = pagination.page %}
{% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}
<!-- 上一页 -->
<li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
<i class="bi bi-chevron-left"></i> 上一页
</a>
</li>
<!-- 页码 -->
{% set start_page = [1, current_page - 2] | max %}
{% set end_page = [total_pages, current_page + 2] | min %}
{% if start_page > 1 %}
<li class="page-item">
<a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
</li>
{% if start_page > 2 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
{% endif %}
{% for page_num in range(start_page, end_page + 1) %}
<li class="page-item {% if page_num == current_page %}active{% endif %}">
<a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
{{ page_num }}
</a>
</li>
{% endfor %}
{% if end_page < total_pages %}
{% if end_page < total_pages - 1 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
<li class="page-item">
<a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
</li>
{% endif %}
<!-- 下一页 -->
<li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
下一页 <i class="bi bi-chevron-right"></i>
</a>
</li>
</ul>
<div class="text-center text-muted small mt-2">
共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
</div>
</nav>
{% endif %}
</div>
</div>
+<!-- 创建知识库模态框 -->
+<div class="modal fade" id="createKbModal" tabindex="-1">
+ <div class="modal-dialog">
+ <div class="modal-content">
+ <div class="modal-header">
+ <h5 class="modal-title">创建知识库</h5>
+ <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
+ </div>
+ <form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
+ <div class="modal-body">
+ <div class="mb-3">
+ <label class="form-label">名称 <span class="text-danger">*</span></label>
+ <input type="text" class="form-control" name="name" required>
+ </div>
+ <div class="mb-3">
+ <label class="form-label">描述</label>
+ <textarea class="form-control" name="description" rows="3"></textarea>
+ </div>
+ <div class="row">
+ <div class="col-md-6 mb-3">
+ <label class="form-label">分块大小</label>
+ <input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
+ <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
+ </div>
+ <div class="col-md-6 mb-3">
+ <label class="form-label">分块重叠</label>
+ <input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
+ <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
+ </div>
+ </div>
+ </div>
+ <div class="modal-footer">
+ <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
+ <button type="submit" class="btn btn-primary">创建</button>
+ </div>
+ </form>
+ </div>
+ </div>
+</div>
{% endblock %}
{% block extra_js %}
<script>
+async function createKb(event) {
+ event.preventDefault();
+ const form = event.target;
+ const formData = new FormData(form);
+ try {
+ const response = await fetch('/api/v1/kb', {
+ method: 'POST',
+ body: formData // 使用 FormData,不要设置 Content-Type,让浏览器自动设置
+ });
+ if (response.ok) {
+ location.reload();
+ } else {
+ const error = await response.json();
+ alert('创建失败: ' + error.message);
+ }
+ } catch (error) {
+ alert('创建失败: ' + error.message);
+ }
+}
</script>
{% endblock %}6.删除知识库 #
6.1. knowledgebase.py #
app/blueprints/knowledgebase.py
# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""
# 导入Flask中的Blueprint和request
from flask import Blueprint,request,render_template
# 导入logging模块
import logging
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
+from app.blueprints.utils import (get_pagination_params,check_ownership)
# 配置logger
logger = logging.getLogger(__name__)
# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)
# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
# 接口用途说明文档字符串
"""创建知识库"""
# 获取当前用户,如未登录则返回错误响应
current_user, err = get_current_user_or_error()
if err:
return err
# 检查请求是否为multipart/form-data(用于文件上传的表单方式)
if request.content_type and 'multipart/form-data' in request.content_type:
# 从表单数据中获取知识库名称
name = request.form.get('name')
# 如果未传入name参数,返回错误
if not name:
return error_response("name is required", 400)
# 获取描述字段,没有则为None
description = request.form.get('description') or None
# 获取分块大小,默认为512
chunk_size = int(request.form.get('chunk_size', 512))
# 获取分块重叠,默认为50
chunk_overlap = int(request.form.get('chunk_overlap', 50))
else:
# 如果是json请求数据(向后兼容旧用法)
data = request.get_json()
# 判断是否存在name字段,不存在则报错
if not data or 'name' not in data:
return error_response("name is required", 400)
# 获取知识库名称
name = data['name']
# 获取描述
description = data.get('description')
# 获取分块大小,默认为512
chunk_size = data.get('chunk_size', 512)
# 获取分块重叠,默认为50
chunk_overlap = data.get('chunk_overlap', 50)
# 调用知识库服务,创建知识库,返回知识库信息字典
kb_dict = kb_service.create(
name=name, # 知识库名称
user_id=current_user['id'], # 用户ID
description=description, # 知识库描述
chunk_size=chunk_size, # 分块大小
chunk_overlap=chunk_overlap, # 分块重叠
)
# 返回成功响应,包含知识库信息
return success_response(kb_dict)
# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
# 设置本函数用途说明(文档字符串)
"""知识库列表页面"""
# 获取当前登录用户信息
current_user = get_current_user()
# 获取分页参数(页码和每页大小),最大每页100
page, page_size = get_pagination_params(max_page_size=100)
# 调用知识库服务,获取分页后的知识库列表结果
result = kb_service.list(
user_id=current_user['id'], # 用户ID
page=page, # 页码
page_size=page_size # 每页大小
)
# 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
return render_template('kb_list.html',
kbs=result['items'],
pagination=result)
+@bp.route('/api/v1/kb/<kb_id>', methods=['DELETE'])
+@api_login_required
+@handle_api_error
+def api_delete(kb_id):
+ """删除知识库"""
+ current_user, err = get_current_user_or_error()
+ if err:
+ return err
+ kb_dict = kb_service.get_by_id(kb_id)
+ if not kb_dict:
+ return error_response("Knowledgebase not found", 404)
# 验证用户是否有权限访问该知识库
+ has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
+ if not has_permission:
+ return err
+ success = kb_service.delete(kb_id)
+ if not success:
+ return error_response("Knowledgebase not found", 404)
+ return success_response()6.2. utils.py #
app/blueprints/utils.py
"""
路由工具函数
"""
# 导入Flask用于返回JSON响应
from flask import jsonify,request
# 导入装饰器工具,用来保持原函数信息
from functools import wraps
# 导入类型提示工具
from typing import Tuple, Optional
# 导入获取当前用户的工具函数
from app.utils.auth import get_current_user
# 导入日志模块
import logging
# 获取logger对象(当前模块名)
logger = logging.getLogger(__name__)
# 定义成功响应函数
def success_response(data=None, message="success"):
"""
成功响应
Args:
data: 响应数据
message: 响应消息
Returns:
JSON 响应
"""
# 返回标准格式的JSON成功响应
return jsonify({
"code": 200, # 状态码200,表示成功
"message": message, # 响应消息
"data": data # 响应数据
})
# 定义错误响应函数
def error_response(message: str, code: int = 400):
"""
错误响应
Args:
message: 错误消息
code: HTTP 状态码
Returns:
JSON 响应和状态码
"""
# 返回标准格式的JSON错误响应,以及相应的HTTP状态码
return jsonify({
"code": code, # 错误码,对应HTTP状态码
"message": message, # 错误消息
"data": None # 错误时无数据
}), code
# 定义API错误处理装饰器
def handle_api_error(func):
"""
API 错误处理装饰器
使用示例:
@handle_api_error
def my_api():
# API 逻辑
return success_response(data)
"""
# 保留原函数信息并定义包装器
@wraps(func)
def wrapper(*args, **kwargs):
try:
# 正常执行被装饰的API函数
return func(*args, **kwargs)
except ValueError as e:
# 捕获ValueError,日志记录warning信息并返回400错误响应
logger.warning(f"ValueError in {func.__name__}: {e}")
return error_response(str(e), 400)
except Exception as e:
# 捕获其他所有异常,日志记录error信息并返回500错误响应
logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
return error_response(str(e), 500)
# 返回包装后的函数
return wrapper
# 定义获取分页参数的函数,允许指定最大每页数量
def get_pagination_params(max_page_size: int = 1000) -> Tuple[int, int]:
"""
获取分页参数
Args:
max_page_size: 最大每页数量
Returns:
(page, page_size) 元组
"""
# 获取请求中的 'page' 参数,默认为1,并将其转换为整数
page = int(request.args.get('page', 1))
# 获取请求中的 'page_size' 参数,默认为10,并将其转换为整数
page_size = int(request.args.get('page_size', 10))
# 保证 page 至少为1
page = max(1, page)
# 保证 page_size 至少为1且不超过 max_page_size
page_size = max(1, min(page_size, max_page_size))
# 返回分页的(page, page_size)元组
return page, page_size
# 定义获取当前用户或返回错误的函数
def get_current_user_or_error():
"""
获取当前用户,如果未登录则返回错误响应
Returns:
如果成功返回 (user_dict, None),如果失败返回 (None, error_response)
"""
# 调用 get_current_user() 获取当前用户对象
current_user = get_current_user()
# 如果没有获取到用户,则返回 (None, 错误响应)
if not current_user:
return None, error_response("Unauthorized", 401)
# 如果获取到用户,则返回 (用户对象, None)
return current_user, None
# 定义检查资源所有权的函数,判断当前用户是否为资源所有者
+def check_ownership(entity_user_id: str, current_user_id: str,
+ entity_name: str = "资源") -> Tuple[bool, Optional[Tuple]]:
# 检查资源所属用户ID是否与当前用户ID相同
+ if entity_user_id != current_user_id:
# 如果不同,返回False,并返回403未授权的错误响应
+ return False, error_response(f"Unauthorized to access this {entity_name}", 403)
# 如果相同,则有权限,返回True和None
+ return True, None
6.3. base_service.py #
app/services/base_service.py
# 基础服务类
"""
基础服务类
"""
# 导入日志库
import logging
# 导入可选类型、泛型、类型变量和类型别名
from typing import Optional, TypeVar, Generic, Dict, Any
# 导入数据库会话和事务管理工具
from app.utils.db import db_session, db_transaction
# 创建日志记录器
logger = logging.getLogger(__name__)
# 定义泛型的类型变量T
T = TypeVar('T')
# 定义基础服务类,支持泛型
class BaseService(Generic[T]):
# 基础服务类,提供通用的数据库操作方法
# 初始化方法
def __init__(self):
# 初始化服务的日志记录器
self.logger = logging.getLogger(self.__class__.__name__)
# 数据库会话上下文管理器(只读)
def session(self):
"""
数据库会话上下文管理器(只读操作,不自动提交)
使用示例:
with self.session() as db:
result = db.query(Model).all()
# 不需要手动关闭 session
"""
# 返回数据库会话
return db_session()
# 数据库事务上下文管理器(自动提交)
def transaction(self):
"""
数据库事务上下文管理器(自动提交,出错时回滚)
使用示例:
with self.transaction() as db:
obj = Model(...)
db.add(obj)
# 自动提交,出错时自动回滚
"""
# 返回数据库事务
return db_transaction()
def get_by_id(self, model_class: type, entity_id: str) -> Optional[T]:
"""
根据ID获取实体(通用方法)
Args:
model_class: 模型类
entity_id: 实体ID
Returns:
实体对象,如果不存在则返回 None
"""
with self.session() as session:
try:
return session.query(model_class).filter(model_class.id == entity_id).first()
except Exception as e:
self.logger.error(f"Error getting {model_class.__name__} by id {entity_id}: {e}")
return None
6.4. knowledgebase_service.py #
app/services/knowledgebase_service.py
# 知识库服务
"""
知识库服务
"""
# 导入类型提示工具
from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService
# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase
# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
"""知识库服务"""
# 定义创建知识库的方法
def create(self, name: str, user_id: str, description: str = None,
chunk_size: int = 512, chunk_overlap: int = 50) -> dict:
"""
创建知识库
Args:
name: 知识库名称
user_id: 用户ID
description: 描述
chunk_size: 分块大小
chunk_overlap: 分块重叠
Returns:
创建的知识库字典
"""
# 启动数据库事务,上下文管理器自动处理提交或回滚
with self.transaction() as session:
# 先创建知识库对象
kb = Knowledgebase(
name=name, # 设置知识库名称
user_id=user_id, # 设置用户ID
description=description, # 设置知识库描述
chunk_size=chunk_size, # 设置分块大小
chunk_overlap=chunk_overlap # 设置分块重叠
)
# 将知识库对象添加到session
session.add(kb)
# 刷新session,生成知识库ID
session.flush() # 刷新以获取 ID,但不提交
# 刷新kb对象的数据库状态
session.refresh(kb)
# 转换kb对象为字典(在session内部,避免分离后出错)
kb_dict = kb.to_dict()
# 记录创建知识库的日志,包含ID
self.logger.info(f"创建了知识库,ID: {kb.id}")
# 返回知识库字典信息
return kb_dict
# 定义获取知识库列表的方法
def list(self, user_id: str = None, page: int = 1, page_size: int = 10) -> Dict:
"""
获取知识库列表
Args:
user_id: 用户ID(可选)
page: 页码
page_size: 每页数量
Returns:
包含 items, total, page, page_size 的字典
"""
# 使用数据库会话
with self.session() as session:
# 查询Knowledgebase表
query = session.query(Knowledgebase)
# 如果指定了user_id,则筛选属于该用户的知识库
if user_id:
query = query.filter(Knowledgebase.user_id == user_id)
# 统计总记录数
total = query.count()
# 计算分页偏移量
offset = (page - 1) * page_size
# 获取当前页的数据列表
kbs = query.offset(offset).limit(page_size).all()
# 初始化知识库字典列表
items = []
# 遍历查询结果,将每一项转为dict后添加到items列表
for kb in kbs:
kb_dict = kb.to_dict()
items.append(kb_dict)
# 返回包含分页信息和数据条目的字典
return {
'items': items,
'total': total,
'page': page,
'page_size': page_size
+ }
+ def delete(self, kb_id: str) -> bool:
+ """
+ 删除知识库
+ Args:
+ kb_id: 知识库ID
+ Returns:
+ 是否删除成功
+ """
+ with self.transaction() as session:
+ kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
+ if not kb:
+ return False
+ session.delete(kb)
+ self.logger.info(f"Deleted knowledgebase: {kb_id}")
+ return True
+ def get_by_id(self, kb_id: str) -> Optional[dict]:
+ """根据ID获取知识库"""
+ with self.session() as session:
+ kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
+ if kb:
# 在 session 内部转换为字典,避免对象从 session 分离后访问属性出错
+ return kb.to_dict()
+ return None
# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()
6.5. kb_list.html #
app/templates/kb_list.html
{% extends "base.html" %}
{% block title %}知识库管理 - RAG Lite{% endblock %}
{% block content %}
<style>
@media (min-width: 992px) {
#kbList > div {
flex: 0 0 20%;
max-width: 20%;
}
}
</style>
<div class="row">
<div class="col-12">
<nav aria-label="breadcrumb" class="mb-3">
<ol class="breadcrumb">
<li class="breadcrumb-item"><a href="/">首页</a></li>
<li class="breadcrumb-item active">知识库管理</li>
</ol>
</nav>
<div class="d-flex justify-content-between align-items-center mb-4">
<h2><i class="bi bi-collection"></i> 知识库管理</h2>
<button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
<i class="bi bi-plus-circle"></i> 创建知识库
</button>
</div>
<!-- 知识库列表 -->
<div class="row" id="kbList">
{% if kbs %}
{% for kb in kbs %}
<div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
<div class="card h-100">
<div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
<i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
</div>
<div class="card-body">
<h5 class="card-title">
<i class="bi bi-folder"></i> {{ kb.name }}
</h5>
<p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
</div>
+ <div class="card-footer bg-transparent">
+ <button class="btn btn-sm btn-danger" onclick="deleteKb('{{ kb.id }}', '{{ kb.name }}')">
+ <i class="bi bi-trash"></i> 删除
+ </button>
+ </div>
</div>
</div>
{% endfor %}
{% else %}
<div class="col-12">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
</div>
</div>
{% endif %}
</div>
<!-- 分页控件 -->
{% if pagination and pagination.total > pagination.page_size %}
<nav aria-label="知识库列表分页" class="mt-4">
<ul class="pagination justify-content-center">
{% set current_page = pagination.page %}
{% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}
<!-- 上一页 -->
<li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
<i class="bi bi-chevron-left"></i> 上一页
</a>
</li>
<!-- 页码 -->
{% set start_page = [1, current_page - 2] | max %}
{% set end_page = [total_pages, current_page + 2] | min %}
{% if start_page > 1 %}
<li class="page-item">
<a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
</li>
{% if start_page > 2 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
{% endif %}
{% for page_num in range(start_page, end_page + 1) %}
<li class="page-item {% if page_num == current_page %}active{% endif %}">
<a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
{{ page_num }}
</a>
</li>
{% endfor %}
{% if end_page < total_pages %}
{% if end_page < total_pages - 1 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
<li class="page-item">
<a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
</li>
{% endif %}
<!-- 下一页 -->
<li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
下一页 <i class="bi bi-chevron-right"></i>
</a>
</li>
</ul>
<div class="text-center text-muted small mt-2">
共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
</div>
</nav>
{% endif %}
</div>
</div>
<!-- 创建知识库模态框 -->
<div class="modal fade" id="createKbModal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">创建知识库</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
<div class="modal-body">
<div class="mb-3">
<label class="form-label">名称 <span class="text-danger">*</span></label>
<input type="text" class="form-control" name="name" required>
</div>
<div class="mb-3">
<label class="form-label">描述</label>
<textarea class="form-control" name="description" rows="3"></textarea>
</div>
<div class="row">
<div class="col-md-6 mb-3">
<label class="form-label">分块大小</label>
<input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
<div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
</div>
<div class="col-md-6 mb-3">
<label class="form-label">分块重叠</label>
<input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
<div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
</div>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="submit" class="btn btn-primary">创建</button>
</div>
</form>
</div>
</div>
</div>
{% endblock %}
{% block extra_js %}
<script>
async function createKb(event) {
event.preventDefault();
const form = event.target;
const formData = new FormData(form);
try {
const response = await fetch('/api/v1/kb', {
method: 'POST',
body: formData // 使用 FormData,不要设置 Content-Type,让浏览器自动设置
});
if (response.ok) {
location.reload();
} else {
const error = await response.json();
alert('创建失败: ' + error.message);
}
} catch (error) {
alert('创建失败: ' + error.message);
}
}
+async function deleteKb(kbId, kbName) {
+ if (!confirm(`确定要删除知识库 "${kbName}" 吗?此操作不可恢复!`)) {
+ return;
+ }
+ try {
+ const response = await fetch(`/api/v1/kb/${kbId}`, {
+ method: 'DELETE'
+ });
+ if (response.ok) {
+ location.reload();
+ } else {
+ const error = await response.json();
+ alert('删除失败: ' + error.message);
+ }
+ } catch (error) {
+ alert('删除失败: ' + error.message);
+ }
+}
</script>
{% endblock %}
7.更新知识库 #
7.1. knowledgebase.py #
app/blueprints/knowledgebase.py
# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""
# 导入Flask中的Blueprint和request
from flask import Blueprint,request,render_template
# 导入logging模块
import logging
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
from app.blueprints.utils import (get_pagination_params,check_ownership)
# 配置logger
logger = logging.getLogger(__name__)
# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)
# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
# 接口用途说明文档字符串
"""创建知识库"""
# 获取当前用户,如未登录则返回错误响应
current_user, err = get_current_user_or_error()
if err:
return err
# 检查请求是否为multipart/form-data(用于文件上传的表单方式)
if request.content_type and 'multipart/form-data' in request.content_type:
# 从表单数据中获取知识库名称
name = request.form.get('name')
# 如果未传入name参数,返回错误
if not name:
return error_response("name is required", 400)
# 获取描述字段,没有则为None
description = request.form.get('description') or None
# 获取分块大小,默认为512
chunk_size = int(request.form.get('chunk_size', 512))
# 获取分块重叠,默认为50
chunk_overlap = int(request.form.get('chunk_overlap', 50))
else:
# 如果是json请求数据(向后兼容旧用法)
data = request.get_json()
# 判断是否存在name字段,不存在则报错
if not data or 'name' not in data:
return error_response("name is required", 400)
# 获取知识库名称
name = data['name']
# 获取描述
description = data.get('description')
# 获取分块大小,默认为512
chunk_size = data.get('chunk_size', 512)
# 获取分块重叠,默认为50
chunk_overlap = data.get('chunk_overlap', 50)
# 调用知识库服务,创建知识库,返回知识库信息字典
kb_dict = kb_service.create(
name=name, # 知识库名称
user_id=current_user['id'], # 用户ID
description=description, # 知识库描述
chunk_size=chunk_size, # 分块大小
chunk_overlap=chunk_overlap, # 分块重叠
)
# 返回成功响应,包含知识库信息
return success_response(kb_dict)
# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
# 设置本函数用途说明(文档字符串)
"""知识库列表页面"""
# 获取当前登录用户信息
current_user = get_current_user()
# 获取分页参数(页码和每页大小),最大每页100
page, page_size = get_pagination_params(max_page_size=100)
# 调用知识库服务,获取分页后的知识库列表结果
result = kb_service.list(
user_id=current_user['id'], # 用户ID
page=page, # 页码
page_size=page_size # 每页大小
)
# 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
return render_template('kb_list.html',
kbs=result['items'],
pagination=result)
# 注册DELETE方法的API路由,用于删除知识库
@bp.route('/api/v1/kb/<kb_id>', methods=['DELETE'])
# 要求API登录
@api_login_required
# 处理API错误的装饰器
@handle_api_error
def api_delete(kb_id):
"""删除知识库"""
# 获取当前用户信息,如果未登录则返回错误
current_user, err = get_current_user_or_error()
if err:
return err
# 根据知识库ID获取知识库信息
kb_dict = kb_service.get_by_id(kb_id)
# 如果知识库不存在,返回404错误
if not kb_dict:
+ return error_response("未找到知识库", 404)
# 验证当前用户是否拥有该知识库的操作权限
has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
if not has_permission:
return err
# 调用服务删除知识库
success = kb_service.delete(kb_id)
# 如果删除失败,返回404错误
if not success:
+ return error_response("未找到知识库", 404)
# 返回删除成功的响应
+ return success_response("知识库删除成功")
# 注册PUT方法的API路由,用于更新知识库
+@bp.route('/api/v1/kb/<kb_id>', methods=['PUT'])
# 要求API登录
+@api_login_required
# 处理API错误的装饰器
+@handle_api_error
+def api_update(kb_id):
+ """更新知识库(支持封面图片更新)"""
# 获取当前用户信息,如果未登录则返回错误
+ current_user, err = get_current_user_or_error()
+ if err:
+ return err
# 获取并校验知识库是否存在
+ kb_dict = kb_service.get_by_id(kb_id)
+ if not kb_dict:
+ return error_response("未找到知识库", 404)
# 检查是否拥有操作该知识库的权限
+ has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
+ if not has_permission:
+ return err
# 检查请求内容类型是否为multipart/form-data(用于文件上传)
+ if request.content_type and 'multipart/form-data' in request.content_type:
# 从表单数据获取字段
+ name = request.form.get('name')
+ description = request.form.get('description') or None
+ chunk_size = request.form.get('chunk_size')
+ chunk_overlap = request.form.get('chunk_overlap')
# 组装待更新的数据字典
+ update_data = {}
+ if name:
+ update_data['name'] = name
+ if description is not None:
+ update_data['description'] = description
+ if chunk_size:
+ update_data['chunk_size'] = int(chunk_size)
+ if chunk_overlap:
+ update_data['chunk_overlap'] = int(chunk_overlap)
+ else:
# 如果不是form-data,则按JSON方式解析提交内容
+ data = request.get_json()
# 请求体如果为空,返回400错误
+ if not data:
+ return error_response("请求体不能为空", 400)
# 组装待更新的数据字典
+ update_data = {}
+ if 'name' in data:
+ update_data['name'] = data['name']
+ if 'description' in data:
+ update_data['description'] = data.get('description')
+ if 'chunk_size' in data:
+ update_data['chunk_size'] = data['chunk_size']
+ if 'chunk_overlap' in data:
+ update_data['chunk_overlap'] = data['chunk_overlap']
# 调用服务进行知识库更新
+ updated_kb = kb_service.update(
+ kb_id=kb_id,
+ **update_data
+ )
# 如果未找到知识库,返回404
+ if not updated_kb:
+ return error_response("未找到知识库", 404)
# 返回更新成功的响应及最新知识库数据
+ return success_response(updated_kb, "知识库更新成功")
7.2. knowledgebase_service.py #
app/services/knowledgebase_service.py
# 知识库服务
"""
知识库服务
"""
# 导入类型提示工具
from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService
# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase
# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
"""知识库服务"""
# 定义创建知识库的方法
def create(self, name: str, user_id: str, description: str = None,
chunk_size: int = 512, chunk_overlap: int = 50) -> dict:
"""
创建知识库
Args:
name: 知识库名称
user_id: 用户ID
description: 描述
chunk_size: 分块大小
chunk_overlap: 分块重叠
Returns:
创建的知识库字典
"""
# 启动数据库事务,上下文管理器自动处理提交或回滚
with self.transaction() as session:
# 先创建知识库对象
kb = Knowledgebase(
name=name, # 设置知识库名称
user_id=user_id, # 设置用户ID
description=description, # 设置知识库描述
chunk_size=chunk_size, # 设置分块大小
chunk_overlap=chunk_overlap # 设置分块重叠
)
# 将知识库对象添加到session
session.add(kb)
# 刷新session,生成知识库ID
session.flush() # 刷新以获取 ID,但不提交
# 刷新kb对象的数据库状态
session.refresh(kb)
# 转换kb对象为字典(在session内部,避免分离后出错)
kb_dict = kb.to_dict()
# 记录创建知识库的日志,包含ID
self.logger.info(f"创建了知识库,ID: {kb.id}")
# 返回知识库字典信息
return kb_dict
# 定义获取知识库列表的方法
def list(self, user_id: str = None, page: int = 1, page_size: int = 10) -> Dict:
"""
获取知识库列表
Args:
user_id: 用户ID(可选)
page: 页码
page_size: 每页数量
Returns:
包含 items, total, page, page_size 的字典
"""
# 使用数据库会话
with self.session() as session:
# 查询Knowledgebase表
query = session.query(Knowledgebase)
# 如果指定了user_id,则筛选属于该用户的知识库
if user_id:
query = query.filter(Knowledgebase.user_id == user_id)
# 统计总记录数
total = query.count()
# 计算分页偏移量
offset = (page - 1) * page_size
# 获取当前页的数据列表
kbs = query.offset(offset).limit(page_size).all()
# 初始化知识库字典列表
items = []
# 遍历查询结果,将每一项转为dict后添加到items列表
for kb in kbs:
kb_dict = kb.to_dict()
items.append(kb_dict)
# 返回包含分页信息和数据条目的字典
return {
'items': items,
'total': total,
'page': page,
'page_size': page_size
}
def delete(self, kb_id: str) -> bool:
"""
删除知识库
Args:
kb_id: 知识库ID
Returns:
是否删除成功
"""
with self.transaction() as session:
kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
if not kb:
return False
session.delete(kb)
self.logger.info(f"Deleted knowledgebase: {kb_id}")
return True
def get_by_id(self, kb_id: str) -> Optional[dict]:
"""根据ID获取知识库"""
with self.session() as session:
kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
if kb:
# 在 session 内部转换为字典,避免对象从 session 分离后访问属性出错
return kb.to_dict()
+ return None
# 定义 update 方法,用于更新知识库
+ def update(self, kb_id: str, **kwargs) -> Optional[dict]:
+ """
+ 更新知识库
+ Args:
+ kb_id: 知识库ID
+ cover_image_data: 新的封面图片数据(可选)
+ cover_image_filename: 新的封面图片文件名(可选)
+ delete_cover: 是否删除封面图片(可选)
+ **kwargs: 要更新的字段(name, description, chunk_size, chunk_overlap 等)
+ Returns:
+ 更新后的知识库字典,如果不存在则返回 None
+ """
# 开启数据库事务
+ with self.transaction() as session:
# 查询指定ID的知识库对象
+ kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
# 如果未找到知识库,则返回 None
+ if not kb:
+ return None
# 遍历要更新的字段和值
+ for key, value in kwargs.items():
# 判断知识库对象是否有该字段,且值不为 None
+ if hasattr(kb, key) and value is not None:
# 设置该字段的新值
+ setattr(kb, key, value)
# 刷新session,保证对象属性为最新状态
+ session.flush()
# 刷新对象,避免未提交前读取到旧数据
+ session.refresh(kb)
# 在事务内部将对象转为字典,避免 session 关闭后访问失败
+ kb_dict = kb.to_dict()
# 如果本次更新包含 'cover_image' 字段,记录详细日志
+ if 'cover_image' in kwargs:
+ self.logger.info(f"更新知识库 {kb_id}, 封面图片={kb_dict.get('cover_image')}")
+ else:
# 否则仅记录知识库ID
+ self.logger.info(f"更新知识库: {kb_id}")
# 返回更新后的知识库字典
+ return kb_dict
# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()
7.3. kb_list.html #
app/templates/kb_list.html
{% extends "base.html" %}
{% block title %}知识库管理 - RAG Lite{% endblock %}
{% block content %}
<style>
@media (min-width: 992px) {
#kbList > div {
flex: 0 0 20%;
max-width: 20%;
}
}
</style>
<div class="row">
<div class="col-12">
<nav aria-label="breadcrumb" class="mb-3">
<ol class="breadcrumb">
<li class="breadcrumb-item"><a href="/">首页</a></li>
<li class="breadcrumb-item active">知识库管理</li>
</ol>
</nav>
<div class="d-flex justify-content-between align-items-center mb-4">
<h2><i class="bi bi-collection"></i> 知识库管理</h2>
<button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
<i class="bi bi-plus-circle"></i> 创建知识库
</button>
</div>
<!-- 知识库列表 -->
<div class="row" id="kbList">
{% if kbs %}
{% for kb in kbs %}
<div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
<div class="card h-100">
<div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
<i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
</div>
<div class="card-body">
<h5 class="card-title">
<i class="bi bi-folder"></i> {{ kb.name }}
</h5>
<p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
</div>
<div class="card-footer bg-transparent">
+ <button class="btn btn-sm btn-warning"
+ data-kb-id="{{ kb.id }}"
+ data-kb-name="{{ kb.name }}"
+ data-kb-description="{{ kb.description or '' }}"
+ data-kb-chunk-size="{{ kb.chunk_size }}"
+ data-kb-chunk-overlap="{{ kb.chunk_overlap }}"
+ data-kb-cover-image="{{ kb.cover_image or '' }}"
+ onclick="editKbFromButton(this)">
+ <i class="bi bi-pencil"></i> 编辑
+ </button>
<button class="btn btn-sm btn-danger" onclick="deleteKb('{{ kb.id }}', '{{ kb.name }}')">
<i class="bi bi-trash"></i> 删除
</button>
</div>
</div>
</div>
{% endfor %}
{% else %}
<div class="col-12">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
</div>
</div>
{% endif %}
</div>
<!-- 分页控件 -->
{% if pagination and pagination.total > pagination.page_size %}
<nav aria-label="知识库列表分页" class="mt-4">
<ul class="pagination justify-content-center">
{% set current_page = pagination.page %}
{% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}
<!-- 上一页 -->
<li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
<i class="bi bi-chevron-left"></i> 上一页
</a>
</li>
<!-- 页码 -->
{% set start_page = [1, current_page - 2] | max %}
{% set end_page = [total_pages, current_page + 2] | min %}
{% if start_page > 1 %}
<li class="page-item">
<a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
</li>
{% if start_page > 2 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
{% endif %}
{% for page_num in range(start_page, end_page + 1) %}
<li class="page-item {% if page_num == current_page %}active{% endif %}">
<a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
{{ page_num }}
</a>
</li>
{% endfor %}
{% if end_page < total_pages %}
{% if end_page < total_pages - 1 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
<li class="page-item">
<a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
</li>
{% endif %}
<!-- 下一页 -->
<li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
下一页 <i class="bi bi-chevron-right"></i>
</a>
</li>
</ul>
<div class="text-center text-muted small mt-2">
共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
</div>
</nav>
{% endif %}
</div>
</div>
<!-- 创建知识库模态框 -->
<div class="modal fade" id="createKbModal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">创建知识库</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
<div class="modal-body">
<div class="mb-3">
<label class="form-label">名称 <span class="text-danger">*</span></label>
<input type="text" class="form-control" name="name" required>
</div>
<div class="mb-3">
<label class="form-label">描述</label>
<textarea class="form-control" name="description" rows="3"></textarea>
</div>
<div class="row">
<div class="col-md-6 mb-3">
<label class="form-label">分块大小</label>
<input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
<div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
</div>
<div class="col-md-6 mb-3">
<label class="form-label">分块重叠</label>
<input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
<div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
</div>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="submit" class="btn btn-primary">创建</button>
</div>
</form>
</div>
</div>
</div>
+<!-- 编辑知识库模态框 -->
+<div class="modal fade" id="editKbModal" tabindex="-1">
+ <div class="modal-dialog">
+ <div class="modal-content">
+ <div class="modal-header">
+ <h5 class="modal-title">编辑知识库</h5>
+ <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
+ </div>
+ <form id="editKbForm" onsubmit="updateKb(event)" enctype="multipart/form-data">
+ <input type="hidden" name="kb_id" id="editKbId">
+ <div class="modal-body">
+ <div class="mb-3">
+ <label class="form-label">名称 <span class="text-danger">*</span></label>
+ <input type="text" class="form-control" name="name" id="editKbName" required>
+ </div>
+ <div class="mb-3">
+ <label class="form-label">描述</label>
+ <textarea class="form-control" name="description" id="editKbDescription" rows="3"></textarea>
+ </div>
+ <div class="row">
+ <div class="col-md-6 mb-3">
+ <label class="form-label">分块大小</label>
+ <input type="number" class="form-control" name="chunk_size" id="editKbChunkSize" value="512" min="100" max="2000">
+ <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
+ </div>
+ <div class="col-md-6 mb-3">
+ <label class="form-label">分块重叠</label>
+ <input type="number" class="form-control" name="chunk_overlap" id="editKbChunkOverlap" value="50" min="0" max="200">
+ <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
+ </div>
+ </div>
+ </div>
+ <div class="modal-footer">
+ <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
+ <button type="submit" class="btn btn-primary">保存</button>
+ </div>
+ </form>
+ </div>
+ </div>
+</div>
{% endblock %}
{% block extra_js %}
<script>
async function createKb(event) {
event.preventDefault();
const form = event.target;
const formData = new FormData(form);
try {
const response = await fetch('/api/v1/kb', {
method: 'POST',
body: formData // 使用 FormData,不要设置 Content-Type,让浏览器自动设置
});
if (response.ok) {
location.reload();
} else {
const error = await response.json();
alert('创建失败: ' + error.message);
}
} catch (error) {
alert('创建失败: ' + error.message);
}
}
async function deleteKb(kbId, kbName) {
if (!confirm(`确定要删除知识库 "${kbName}" 吗?此操作不可恢复!`)) {
return;
}
try {
const response = await fetch(`/api/v1/kb/${kbId}`, {
method: 'DELETE'
});
if (response.ok) {
location.reload();
} else {
const error = await response.json();
alert('删除失败: ' + error.message);
}
} catch (error) {
alert('删除失败: ' + error.message);
}
}
+// 从按钮的 data 属性读取数据并编辑知识库
+function editKbFromButton(button) {
+ const kbId = button.getAttribute('data-kb-id');
+ const name = button.getAttribute('data-kb-name');
+ const description = button.getAttribute('data-kb-description') || '';
+ const chunkSize = parseInt(button.getAttribute('data-kb-chunk-size')) || 512;
+ const chunkOverlap = parseInt(button.getAttribute('data-kb-chunk-overlap')) || 50;
+ editKb(kbId, name, description, chunkSize, chunkOverlap);
+}
+// 编辑知识库
+function editKb(kbId, name, description, chunkSize, chunkOverlap) {
+ // 填充表单数据
+ document.getElementById('editKbId').value = kbId;
+ document.getElementById('editKbName').value = name;
+ document.getElementById('editKbDescription').value = description || '';
+ document.getElementById('editKbChunkSize').value = chunkSize;
+ document.getElementById('editKbChunkOverlap').value = chunkOverlap;
+ // 显示模态框
+ const modal = new bootstrap.Modal(document.getElementById('editKbModal'));
+ modal.show();
+}
+async function updateKb(event) {
+ event.preventDefault();
+ const form = event.target;
+ const formData = new FormData(form);
+ const kbId = formData.get('kb_id');
+ try {
+ const response = await fetch(`/api/v1/kb/${kbId}`, {
+ method: 'PUT',
+ body: formData
+ });
+ if (response.ok) {
+ location.reload();
+ } else {
+ const error = await response.json();
+ alert('更新失败: ' + error.message);
+ }
+ } catch (error) {
+ alert('更新失败: ' + error.message);
+ }
+}
</script>
{% endblock %}
8.知识库封面 #
8.1. storage_service.py #
app/services/storage_service.py
# 从 app.services.storage.factory 导入 StorageFactory 类
from app.services.storage.factory import StorageFactory
# 获取 StorageFactory 的单例实例,并赋值给 storage_service
storage_service = StorageFactory.get_instance()8.2. knowledgebase.py #
app/blueprints/knowledgebase.py
# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""
# 导入Flask中的Blueprint和request
+from flask import Blueprint,request,render_template,send_file,abort
# 使用BytesIO将图片数据包装为文件流
+from io import BytesIO
# 导入logging模块
import logging
# 导入mimetypes、os模块用于类型判断
+import mimetypes
# 导入os模块用于路径操作
+import os
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
from app.blueprints.utils import (get_pagination_params,check_ownership)
# 导入存储服务
+from app.services.storage_service import storage_service
# 配置logger
logger = logging.getLogger(__name__)
# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)
# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
# 接口用途说明文档字符串
"""创建知识库"""
# 获取当前用户,如未登录则返回错误响应
current_user, err = get_current_user_or_error()
if err:
return err
# 检查请求是否为multipart/form-data(用于文件上传的表单方式)
if request.content_type and 'multipart/form-data' in request.content_type:
# 从表单数据中获取知识库名称
name = request.form.get('name')
# 如果未传入name参数,返回错误
if not name:
return error_response("name is required", 400)
# 获取描述字段,没有则为None
description = request.form.get('description') or None
# 获取分块大小,默认为512
chunk_size = int(request.form.get('chunk_size', 512))
# 获取分块重叠,默认为50
chunk_overlap = int(request.form.get('chunk_overlap', 50))
# 设置封面图片数据变量初值为None
+ cover_image_data = None
# 设置封面图片文件名变量初值为None
+ cover_image_filename = None
# 判断请求中是否包含'cover_image'文件
+ if 'cover_image' in request.files:
# 获取上传的封面图片文件对象
+ cover_file = request.files['cover_image']
# 如果上传的文件存在且有文件名
+ if cover_file and cover_file.filename:
# 读取文件内容为二进制数据
+ cover_image_data = cover_file.read()
# 获取上传文件的文件名
+ cover_image_filename = cover_file.filename
# 记录封面图片上传的信息到日志,包括文件名、字节大小和内容类型
+ logger.info(f"收到新知识库的封面图片上传: 文件名={cover_image_filename}, 大小={len(cover_image_data)} 字节, 内容类型={cover_file.content_type}")
else:
# 如果是json请求数据(向后兼容旧用法)
data = request.get_json()
# 判断是否存在name字段,不存在则报错
if not data or 'name' not in data:
return error_response("name is required", 400)
# 获取知识库名称
name = data['name']
# 获取描述
description = data.get('description')
# 获取分块大小,默认为512
chunk_size = data.get('chunk_size', 512)
# 获取分块重叠,默认为50
chunk_overlap = data.get('chunk_overlap', 50)
# 设置封面图片数据变量初值为None
+ cover_image_data = None
# 设置封面图片文件名变量初值为None
+ cover_image_filename = None
# 调用知识库服务,创建知识库,返回知识库信息字典
kb_dict = kb_service.create(
name=name, # 知识库名称
user_id=current_user['id'], # 用户ID
description=description, # 知识库描述
chunk_size=chunk_size, # 分块大小
chunk_overlap=chunk_overlap, # 分块重叠
+ cover_image_data=cover_image_data, # 封面图片数据
+ cover_image_filename=cover_image_filename # 封面图片文件名
)
# 返回成功响应,包含知识库信息
return success_response(kb_dict)
# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
# 设置本函数用途说明(文档字符串)
"""知识库列表页面"""
# 获取当前登录用户信息
current_user = get_current_user()
# 获取分页参数(页码和每页大小),最大每页100
page, page_size = get_pagination_params(max_page_size=100)
# 调用知识库服务,获取分页后的知识库列表结果
result = kb_service.list(
user_id=current_user['id'], # 用户ID
page=page, # 页码
page_size=page_size # 每页大小
)
# 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
return render_template('kb_list.html',
kbs=result['items'],
pagination=result)
# 注册DELETE方法的API路由,用于删除知识库
@bp.route('/api/v1/kb/<kb_id>', methods=['DELETE'])
# 要求API登录
@api_login_required
# 处理API错误的装饰器
@handle_api_error
def api_delete(kb_id):
"""删除知识库"""
# 获取当前用户信息,如果未登录则返回错误
current_user, err = get_current_user_or_error()
if err:
return err
# 根据知识库ID获取知识库信息
kb_dict = kb_service.get_by_id(kb_id)
# 如果知识库不存在,返回404错误
if not kb_dict:
return error_response("未找到知识库", 404)
# 验证当前用户是否拥有该知识库的操作权限
has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
if not has_permission:
return err
# 调用服务删除知识库
success = kb_service.delete(kb_id)
# 如果删除失败,返回404错误
if not success:
return error_response("未找到知识库", 404)
# 返回删除成功的响应
return success_response("知识库删除成功")
# 注册PUT方法的API路由(用于更新知识库)
@bp.route('/api/v1/kb/<kb_id>', methods=['PUT'])
# 要求API登录
@api_login_required
# 捕获API内部错误的装饰器
@handle_api_error
def api_update(kb_id):
# 定义API用于更新知识库(含封面图片)
"""更新知识库(支持封面图片更新)"""
# 获取当前登录用户信息,如果未登录则返回错误响应
current_user, err = get_current_user_or_error()
if err:
return err
# 获取指定ID的知识库记录,验证其是否存在
kb_dict = kb_service.get_by_id(kb_id)
if not kb_dict:
return error_response("未找到知识库", 404)
# 校验当前用户是否有操作该知识库的权限
has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
if not has_permission:
return err
# 判断请求内容类型是否为multipart/form-data(一般用于带文件上传的表单提交)
if request.content_type and 'multipart/form-data' in request.content_type:
# 从表单中获取普通字段
name = request.form.get('name')
description = request.form.get('description') or None
chunk_size = request.form.get('chunk_size')
chunk_overlap = request.form.get('chunk_overlap')
# 初始化封面图片相关变量
+ cover_image_data = None
+ cover_image_filename = None
# 获得delete_cover字段(类型字符串,需判断是否为'true')
+ delete_cover = request.form.get('delete_cover') == 'true'
# 如果有上传封面图片,则读取文件内容
+ if 'cover_image' in request.files:
+ cover_file = request.files['cover_image']
+ if cover_file and cover_file.filename:
+ cover_image_data = cover_file.read()
+ cover_image_filename = cover_file.filename
# 记录上传日志
+ logger.info(f"收到知识库 {kb_id} 的封面图片上传: 文件名={cover_image_filename}, 大小={len(cover_image_data)} 字节, 内容类型={cover_file.content_type}")
# 构建待更新的数据
update_data = {}
if name:
update_data['name'] = name
if description is not None:
update_data['description'] = description
if chunk_size:
update_data['chunk_size'] = int(chunk_size)
if chunk_overlap:
update_data['chunk_overlap'] = int(chunk_overlap)
else:
# 非表单上传,则按JSON结构解析请求内容
data = request.get_json()
# 如果请求体是空的,直接返回错误
if not data:
return error_response("请求体不能为空", 400)
# 构建可更新的数据字典
update_data = {}
if 'name' in data:
update_data['name'] = data['name']
if 'description' in data:
update_data['description'] = data.get('description')
if 'chunk_size' in data:
update_data['chunk_size'] = data['chunk_size']
if 'chunk_overlap' in data:
update_data['chunk_overlap'] = data['chunk_overlap']
# JSON请求时,cover_image相关变量置空
+ cover_image_data = None
+ cover_image_filename = None
+ delete_cover = data.get('delete_cover', False)
# 调用服务更新知识库,传入各字段及封面参数
updated_kb = kb_service.update(
+ kb_id=kb_id, # 知识库ID
+ cover_image_data=cover_image_data, # 封面图片的二进制内容
+ cover_image_filename=cover_image_filename, # 封面图片文件名
+ delete_cover=delete_cover, # 是否删除封面图片
+ **update_data # 其它可变字段
)
# 更新后如果找不到,返回404
if not updated_kb:
return error_response("未找到知识库", 404)
# 更新成功后,将最新的知识库数据返回给前端
return success_response(updated_kb, "知识库更新成功")
# 定义路由,获取指定知识库ID的封面图片,仅限登录用户访问
+@bp.route('/kb/<kb_id>/cover')
+@login_required
+def kb_cover(kb_id):
+ """获取知识库封面图片"""
# 获取当前已登录用户的信息
+ current_user = get_current_user()
# 根据知识库ID从知识库服务获取对应的知识库信息
+ kb = kb_service.get_by_id(kb_id)
# 检查知识库是否存在
+ if not kb:
# 如果知识库不存在,记录警告日志
+ logger.warning(f"知识库不存在: {kb_id}")
+ abort(404)
# 检查是否有权限访问(只能查看自己的知识库封面)
+ if kb.get('user_id') != current_user['id']:
# 如果不是当前用户的知识库,记录警告日志
+ logger.warning(f"用户 {current_user['id']} 尝试访问知识库 {kb_id} 的封面,但该知识库属于用户 {kb.get('user_id')}")
+ abort(403)
# 获取知识库的封面图片路径
+ cover_path = kb.get('cover_image')
# 检查是否有封面图片
+ if not cover_path:
# 如果没有封面,记录调试日志
+ logger.debug(f"知识库 {kb_id} 没有封面图片")
+ abort(404)
+ try:
# 通过存储服务下载封面图片数据
+ image_data = storage_service.download_file(cover_path)
# 如果未能获取到图片数据,记录错误日志并返回404
+ if not image_data:
+ logger.error(f"从路径下载封面图片失败: {cover_path}")
+ abort(404)
# 根据文件扩展名判断图片MIME类型
+ file_ext = os.path.splitext(cover_path)[1].lower()
# 自定义映射,优先根据文件扩展名判断图片MIME类型
+ mime_type_map = {
+ '.jpg': 'image/jpeg',
+ '.jpeg': 'image/jpeg',
+ '.png': 'image/png',
+ '.gif': 'image/gif',
+ '.webp': 'image/webp'
+ }
# 优先根据自定义映射获取MIME类型
+ mime_type = mime_type_map.get(file_ext)
+ if not mime_type:
# 如果没有命中自定义映射,则使用mimetypes猜测类型
+ mime_type, _ = mimetypes.guess_type(cover_path)
+ if not mime_type:
# 如果还未识别出类型,则默认用JPEG
+ mime_type = 'image/jpeg'
# 通过send_file响应图片数据和MIME类型,不以附件形式发送
+ return send_file(
+ BytesIO(image_data),#图片数据
+ mimetype=mime_type,#MIME类型
+ as_attachment=False#不以附件形式发送
+ )
+ except FileNotFoundError as e:
# 捕获文件未找到异常,记录错误日志
+ logger.error(f"封面图片文件未找到: {cover_path}, 错误: {e}")
+ abort(404)
+ except Exception as e:
# 捕获其他未预期异常,记录错误日志(包含堆栈信息)
+ logger.error(f"提供知识库 {kb_id} 的封面图片时出错, 路径: {cover_path}, 错误: {e}", exc_info=True)
+ abort(404)8.3. config.py #
app/config.py
"""
配置管理模块
"""
# 导入操作系统相关模块
import os
# 导入 Path,处理路径
from pathlib import Path
# 导入 dotenv,用于加载 .env 文件中的环境变量
from dotenv import load_dotenv
# 加载 .env 文件中的环境变量到系统环境变量
load_dotenv()
# 定义应用配置类
class Config:
"""应用配置类"""
# 基础配置
# 项目根目录路径(取上级目录)
BASE_DIR = Path(__file__).parent.parent
# 加载环境变量 SECRET_KEY,若未设置则使用默认开发密钥
SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'
# 应用配置
# 读取应用监听的主机地址,默认为本地所有地址
APP_HOST = os.environ.get('APP_HOST', '0.0.0.0')
# 读取应用监听的端口,默认为 5000,类型为 int
APP_PORT = int(os.environ.get('APP_PORT', 5000))
# 读取 debug 模式配置,字符串转小写等于 'true' 则为 True(开启调试)
APP_DEBUG = os.environ.get('APP_DEBUG', 'false').lower() == 'true'
# 读取允许上传的最大文件大小,默认为 100MB,类型为 int
MAX_FILE_SIZE = int(os.environ.get('MAX_FILE_SIZE', 104857600)) # 100MB
# 允许上传的文件扩展名集合
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt', 'md'}
# 允许上传的图片扩展名集合
+ ALLOWED_IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'webp'}
# 允许上传的图片最大大小,默认为 5MB,类型为 int
+ MAX_IMAGE_SIZE = int(os.environ.get('MAX_IMAGE_SIZE', 5242880)) # 5MB
# 日志配置
# 日志目录,默认 './logs'
LOG_DIR = os.environ.get('LOG_DIR', './logs')
# 日志文件名,默认 'rag_lite.log'
LOG_FILE = os.environ.get('LOG_FILE', 'rag_lite.log')
# 日志等级,默认 'INFO'
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
# 是否启用控制台日志,默认 True
LOG_ENABLE_CONSOLE = os.environ.get('LOG_ENABLE_CONSOLE', 'true').lower() == 'true'
# 是否启用文件日志,默认 True
LOG_ENABLE_FILE = os.environ.get('LOG_ENABLE_FILE', 'true').lower() == 'true'
# 数据库配置
# 数据库主机地址,默认为 'localhost'
DB_HOST = os.environ.get('DB_HOST', 'localhost')
# 数据库端口号,默认为 3306
DB_PORT = int(os.environ.get('DB_PORT', 3306))
# 数据库用户名,默认为 'root'
DB_USER = os.environ.get('DB_USER', 'root')
# 数据库密码,默认为 'root'
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'root')
# 数据库名称,默认为 'rag-lite'
DB_NAME = os.environ.get('DB_NAME', 'rag-lite')
# 数据库字符集,默认为 'utf8mb4'
DB_CHARSET = os.environ.get('DB_CHARSET', 'utf8mb4')
# 存储配置
+ STORAGE_TYPE = os.environ.get('STORAGE_TYPE', 'local') # 'local' 或 'minio'
+ STORAGE_DIR = os.environ.get('STORAGE_DIR', './storages')
8.4. knowledgebase_service.py #
app/services/knowledgebase_service.py
# 知识库服务
"""
知识库服务
"""
+import os
+from app.config import Config
# 导入类型提示工具
from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService
# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase
# 导入存储服务
+from app.services.storage_service import storage_service
# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
"""知识库服务"""
# 定义创建知识库的方法
def create(self, name: str, user_id: str, description: str = None,
+ chunk_size: int = 512, chunk_overlap: int = 50,
+ cover_image_data: bytes = None, cover_image_filename: str = None) -> dict:
"""
创建知识库
Args:
name: 知识库名称
user_id: 用户ID
description: 描述
chunk_size: 分块大小
chunk_overlap: 分块重叠
+ cover_image_data: 封面图片数据(可选)
+ cover_image_filename: 封面图片文件名(可选)
Returns:
创建的知识库字典
"""
+ cover_image_path = None
# 处理封面图片上传
+ if cover_image_data and cover_image_filename:
# 验证文件类型
+ file_ext_without_dot = os.path.splitext(cover_image_filename)[1][1:].lower() if '.' in cover_image_filename else ''
+ if not file_ext_without_dot:
+ raise ValueError(f"文件名缺少扩展名: {cover_image_filename}")
+ if file_ext_without_dot not in Config.ALLOWED_IMAGE_EXTENSIONS:
+ raise ValueError(f"不支持的图片格式: {file_ext_without_dot}。支持的格式: {', '.join(Config.ALLOWED_IMAGE_EXTENSIONS)}")
# 验证文件大小
+ if len(cover_image_data) == 0:
+ raise ValueError("上传的图片文件为空")
+ if len(cover_image_data) > Config.MAX_IMAGE_SIZE:
+ raise ValueError(f"图片文件大小超过限制 {Config.MAX_IMAGE_SIZE / 1024 / 1024}MB")
# 启动数据库事务,上下文管理器自动处理提交或回滚
with self.transaction() as session:
# 先创建知识库对象
kb = Knowledgebase(
name=name, # 设置知识库名称
user_id=user_id, # 设置用户ID
description=description, # 设置知识库描述
chunk_size=chunk_size, # 设置分块大小
chunk_overlap=chunk_overlap # 设置分块重叠
)
# 将知识库对象添加到session
session.add(kb)
# 刷新session,生成知识库ID
session.flush() # 刷新以获取 ID,但不提交
# 上传封面图片(如果有)
+ if cover_image_data and cover_image_filename:
+ try:
# 构建封面图片路径(统一使用小写扩展名)
+ file_ext_with_dot = os.path.splitext(cover_image_filename)[1].lower()
+ cover_image_path = f"covers/{kb.id}{file_ext_with_dot}"
+ self.logger.info(f"正在为新知识库 {kb.id} 上传封面图片: 文件名={cover_image_filename}, 路径={cover_image_path}, 大小={len(cover_image_data)} 字节")
# 上传到存储
+ storage_service.upload_file(cover_image_path, cover_image_data)
# 验证文件是否成功上传
+ if not storage_service.file_exists(cover_image_path):
+ raise ValueError(f"上传后文件不存在: {cover_image_path}")
+ self.logger.info(f"成功上传封面图片: {cover_image_path}")
# 更新知识库的封面路径
+ kb.cover_image = cover_image_path
+ session.flush()
+ except Exception as e:
+ self.logger.error(f"上传知识库 {kb.id} 的封面图片时出错: {e}", exc_info=True)
# 如果上传失败,继续创建知识库,但不设置封面
+ cover_image_path = None
# 刷新kb对象的数据库状态
session.refresh(kb)
# 转换kb对象为字典(在session内部,避免分离后出错)
kb_dict = kb.to_dict()
# 记录创建知识库的日志,包含ID
self.logger.info(f"创建了知识库,ID: {kb.id}")
# 返回知识库字典信息
return kb_dict
# 定义获取知识库列表的方法
def list(self, user_id: str = None, page: int = 1, page_size: int = 10) -> Dict:
"""
获取知识库列表
Args:
user_id: 用户ID(可选)
page: 页码
page_size: 每页数量
Returns:
包含 items, total, page, page_size 的字典
"""
# 使用数据库会话
with self.session() as session:
# 查询Knowledgebase表
query = session.query(Knowledgebase)
# 如果指定了user_id,则筛选属于该用户的知识库
if user_id:
query = query.filter(Knowledgebase.user_id == user_id)
# 统计总记录数
total = query.count()
# 计算分页偏移量
offset = (page - 1) * page_size
# 获取当前页的数据列表
kbs = query.offset(offset).limit(page_size).all()
# 初始化知识库字典列表
items = []
# 遍历查询结果,将每一项转为dict后添加到items列表
for kb in kbs:
kb_dict = kb.to_dict()
items.append(kb_dict)
# 返回包含分页信息和数据条目的字典
return {
'items': items,
'total': total,
'page': page,
'page_size': page_size
}
def delete(self, kb_id: str) -> bool:
"""
删除知识库
Args:
kb_id: 知识库ID
Returns:
是否删除成功
"""
with self.transaction() as session:
kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
if not kb:
return False
session.delete(kb)
self.logger.info(f"Deleted knowledgebase: {kb_id}")
return True
def get_by_id(self, kb_id: str) -> Optional[dict]:
"""根据ID获取知识库"""
with self.session() as session:
kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
if kb:
# 在 session 内部转换为字典,避免对象从 session 分离后访问属性出错
return kb.to_dict()
return None
# 定义 update 方法,用于更新知识库
+ def update(self, kb_id: str, cover_image_data: bytes = None,
+ cover_image_filename: str = None, delete_cover: bool = False, **kwargs) -> Optional[dict]:
"""
更新知识库
Args:
kb_id: 知识库ID
cover_image_data: 新的封面图片数据(可选)
cover_image_filename: 新的封面图片文件名(可选)
delete_cover: 是否删除封面图片(可选)
**kwargs: 要更新的字段(name, description, chunk_size, chunk_overlap 等)
Returns:
更新后的知识库字典,如果不存在则返回 None
"""
# 开启数据库事务
with self.transaction() as session:
# 查询指定ID的知识库对象
kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
# 如果未找到知识库,则返回 None
if not kb:
return None
# 处理封面图片更新
+ old_cover_path = kb.cover_image if kb.cover_image else None
+ if delete_cover:
# 删除封面图片
+ if old_cover_path:
+ try:
+ storage_service.delete_file(old_cover_path)
+ self.logger.info(f"已删除封面图片: {old_cover_path}")
+ except Exception as e:
+ self.logger.warning(f"删除封面图片时出错: {e}")
+ kwargs['cover_image'] = None
+ elif cover_image_data and cover_image_filename:
# 上传新封面图片
# 验证文件类型
+ file_ext_without_dot = os.path.splitext(cover_image_filename)[1][1:].lower() if '.' in cover_image_filename else ''
+ if not file_ext_without_dot:
+ raise ValueError(f"文件名缺少扩展名: {cover_image_filename}")
+ if file_ext_without_dot not in Config.ALLOWED_IMAGE_EXTENSIONS:
+ raise ValueError(f"不支持的图片格式: {file_ext_without_dot}。支持的格式: {', '.join(Config.ALLOWED_IMAGE_EXTENSIONS)}")
# 验证文件大小
+ if len(cover_image_data) == 0:
+ raise ValueError("上传的图片文件为空")
+ if len(cover_image_data) > Config.MAX_IMAGE_SIZE:
+ raise ValueError(f"图片文件大小超过限制 {Config.MAX_IMAGE_SIZE / 1024 / 1024}MB")
+ try:
# 构建新封面图片路径(使用原始扩展名,保持大小写)
+ file_ext_with_dot = os.path.splitext(cover_image_filename)[1]
# 统一使用小写扩展名,避免路径不一致的问题
+ file_ext_with_dot = file_ext_with_dot.lower()
+ new_cover_path = f"covers/{kb_id}{file_ext_with_dot}"
+ self.logger.info(f"正在处理知识库 {kb_id} 的封面图片更新: 文件名={cover_image_filename}, 扩展名={file_ext_without_dot}, 大小={len(cover_image_data)} 字节, 新路径={new_cover_path}, 旧路径={old_cover_path}")
# 先上传新封面(确保上传成功后再删除旧封面)
+ storage_service.upload_file(new_cover_path, cover_image_data)
# 验证文件是否成功上传
+ if not storage_service.file_exists(new_cover_path):
+ raise ValueError(f"上传后文件不存在: {new_cover_path}")
+ self.logger.info(f"成功上传封面图片: {new_cover_path}")
# 删除旧封面(如果存在且与新封面路径不同)
+ if old_cover_path and old_cover_path != new_cover_path:
+ try:
+ storage_service.delete_file(old_cover_path)
+ self.logger.info(f"已删除旧封面图片: {old_cover_path}")
+ except Exception as e:
+ self.logger.warning(f"删除旧封面图片时出错: {e}")
# 继续执行,不因为删除旧文件失败而中断更新
# 更新数据库中的封面路径
+ kwargs['cover_image'] = new_cover_path
+ except Exception as e:
+ self.logger.error(f"上传知识库 {kb_id} 的封面图片时出错: {e}", exc_info=True)
+ raise ValueError(f"上传封面图片失败: {str(e)}")
# 遍历要更新的字段和值
for key, value in kwargs.items():
# 判断知识库对象是否有该字段,且值不为 None
+ if hasattr(kb, key) and (key == 'cover_image' or value is not None):
# 设置该字段的新值
setattr(kb, key, value)
# 刷新session,保证对象属性为最新状态
session.flush()
# 刷新对象,避免未提交前读取到旧数据
session.refresh(kb)
# 在事务内部将对象转为字典,避免 session 关闭后访问失败
kb_dict = kb.to_dict()
# 如果本次更新包含 'cover_image' 字段,记录详细日志
if 'cover_image' in kwargs:
self.logger.info(f"更新知识库 {kb_id}, 封面图片={kb_dict.get('cover_image')}")
else:
# 否则仅记录知识库ID
self.logger.info(f"更新知识库: {kb_id}")
# 返回更新后的知识库字典
return kb_dict
# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()
8.5. kb_list.html #
app/templates/kb_list.html
{% extends "base.html" %}
{% block title %}知识库管理 - RAG Lite{% endblock %}
{% block content %}
<style>
@media (min-width: 992px) {
#kbList > div {
flex: 0 0 20%;
max-width: 20%;
}
}
</style>
<div class="row">
<div class="col-12">
<nav aria-label="breadcrumb" class="mb-3">
<ol class="breadcrumb">
<li class="breadcrumb-item"><a href="/">首页</a></li>
<li class="breadcrumb-item active">知识库管理</li>
</ol>
</nav>
<div class="d-flex justify-content-between align-items-center mb-4">
<h2><i class="bi bi-collection"></i> 知识库管理</h2>
<button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
<i class="bi bi-plus-circle"></i> 创建知识库
</button>
</div>
<!-- 知识库列表 -->
<div class="row" id="kbList">
{% if kbs %}
{% for kb in kbs %}
<div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
<div class="card h-100">
+ {% if kb.cover_image %}
+ <img src="/kb/{{ kb.id }}/cover" class="card-img-top" alt="{{ kb.name|e }}" style="height: 150px; object-fit: scale-down;">
+ {% else %}
<div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
<i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
</div>
+ {% endif %}
<div class="card-body">
<h5 class="card-title">
<i class="bi bi-folder"></i> {{ kb.name }}
</h5>
<p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
</div>
<div class="card-footer bg-transparent">
<button class="btn btn-sm btn-warning"
data-kb-id="{{ kb.id }}"
data-kb-name="{{ kb.name }}"
data-kb-description="{{ kb.description or '' }}"
data-kb-chunk-size="{{ kb.chunk_size }}"
data-kb-chunk-overlap="{{ kb.chunk_overlap }}"
data-kb-cover-image="{{ kb.cover_image or '' }}"
onclick="editKbFromButton(this)">
<i class="bi bi-pencil"></i> 编辑
</button>
<button class="btn btn-sm btn-danger" onclick="deleteKb('{{ kb.id }}', '{{ kb.name }}')">
<i class="bi bi-trash"></i> 删除
</button>
</div>
</div>
</div>
{% endfor %}
{% else %}
<div class="col-12">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
</div>
</div>
{% endif %}
</div>
<!-- 分页控件 -->
{% if pagination and pagination.total > pagination.page_size %}
<nav aria-label="知识库列表分页" class="mt-4">
<ul class="pagination justify-content-center">
{% set current_page = pagination.page %}
{% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}
<!-- 上一页 -->
<li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
<i class="bi bi-chevron-left"></i> 上一页
</a>
</li>
<!-- 页码 -->
{% set start_page = [1, current_page - 2] | max %}
{% set end_page = [total_pages, current_page + 2] | min %}
{% if start_page > 1 %}
<li class="page-item">
<a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
</li>
{% if start_page > 2 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
{% endif %}
{% for page_num in range(start_page, end_page + 1) %}
<li class="page-item {% if page_num == current_page %}active{% endif %}">
<a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
{{ page_num }}
</a>
</li>
{% endfor %}
{% if end_page < total_pages %}
{% if end_page < total_pages - 1 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
<li class="page-item">
<a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
</li>
{% endif %}
<!-- 下一页 -->
<li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
下一页 <i class="bi bi-chevron-right"></i>
</a>
</li>
</ul>
<div class="text-center text-muted small mt-2">
共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
</div>
</nav>
{% endif %}
</div>
</div>
<!-- 创建知识库模态框 -->
<div class="modal fade" id="createKbModal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">创建知识库</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
<div class="modal-body">
<div class="mb-3">
<label class="form-label">名称 <span class="text-danger">*</span></label>
<input type="text" class="form-control" name="name" required>
</div>
<div class="mb-3">
<label class="form-label">描述</label>
<textarea class="form-control" name="description" rows="3"></textarea>
</div>
+ <div class="mb-3">
+ <label class="form-label">封面图片(可选)</label>
+ <input type="file" class="form-control" name="cover_image" accept="image/jpeg,image/png,image/gif,image/webp" id="coverImageInput">
+ <div class="form-text">支持 JPG、PNG、GIF、WEBP 格式,最大 5MB</div>
+ <div id="coverImagePreview" class="mt-2" style="display: none;">
+ <img id="coverPreviewImg" src="" alt="封面预览" class="img-thumbnail" style="max-width: 200px; max-height: 200px;">
+ </div>
+ </div>
<div class="row">
<div class="col-md-6 mb-3">
<label class="form-label">分块大小</label>
<input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
<div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
</div>
<div class="col-md-6 mb-3">
<label class="form-label">分块重叠</label>
<input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
<div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
</div>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="submit" class="btn btn-primary">创建</button>
</div>
</form>
</div>
</div>
</div>
<!-- 编辑知识库模态框 -->
<div class="modal fade" id="editKbModal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">编辑知识库</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<form id="editKbForm" onsubmit="updateKb(event)" enctype="multipart/form-data">
<input type="hidden" name="kb_id" id="editKbId">
<div class="modal-body">
<div class="mb-3">
<label class="form-label">名称 <span class="text-danger">*</span></label>
<input type="text" class="form-control" name="name" id="editKbName" required>
</div>
<div class="mb-3">
<label class="form-label">描述</label>
<textarea class="form-control" name="description" id="editKbDescription" rows="3"></textarea>
</div>
+ <div class="mb-3">
+ <label class="form-label">封面图片</label>
+ <div id="editCoverPreview" class="mb-2">
+ <img id="editCoverPreviewImg" src="" alt="当前封面" class="img-thumbnail" style="max-width: 200px; max-height: 200px; display: none;">
+ <div id="editCoverNoImage" class="text-muted small" style="display: none;">暂无封面</div>
+ </div>
+ <input type="file" class="form-control" name="cover_image" accept="image/jpeg,image/png,image/gif,image/webp" id="editCoverImageInput">
+ <div class="form-text">支持 JPG、PNG、GIF、WEBP 格式,最大 5MB。留空则不修改封面。</div>
+ <div class="form-check mt-2">
+ <input class="form-check-input" type="checkbox" name="delete_cover" id="editDeleteCover" value="true">
+ <label class="form-check-label" for="editDeleteCover">
+ 删除封面图片
+ </label>
+ </div>
+ <div id="editCoverNewPreview" class="mt-2" style="display: none;">
+ <img id="editCoverNewPreviewImg" src="" alt="新封面预览" class="img-thumbnail" style="max-width: 200px; max-height: 200px;">
+ </div>
+ </div>
<div class="row">
<div class="col-md-6 mb-3">
<label class="form-label">分块大小</label>
<input type="number" class="form-control" name="chunk_size" id="editKbChunkSize" value="512" min="100" max="2000">
<div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
</div>
<div class="col-md-6 mb-3">
<label class="form-label">分块重叠</label>
<input type="number" class="form-control" name="chunk_overlap" id="editKbChunkOverlap" value="50" min="0" max="200">
<div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
</div>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="submit" class="btn btn-primary">保存</button>
</div>
</form>
</div>
</div>
</div>
{% endblock %}
{% block extra_js %}
<script>
+// 异步函数,用于创建知识库
async function createKb(event) {
+ // 阻止表单默认提交
event.preventDefault();
+ // 获取表单对象
const form = event.target;
+ // 构造 FormData,收集表单数据
const formData = new FormData(form);
try {
+ // 发送 POST 请求到后端 API,提交表单数据
const response = await fetch('/api/v1/kb', {
method: 'POST',
+ // body为FormData,浏览器会自动设置Content-Type
+ body: formData
});
+ // 如果响应成功,刷新页面
if (response.ok) {
location.reload();
} else {
+ // 否则获取错误信息并弹窗提示
const error = await response.json();
alert('创建失败: ' + error.message);
}
} catch (error) {
+ // 捕获异常并弹窗提示用户
alert('创建失败: ' + error.message);
}
}
+// 异步函数,用于删除知识库
async function deleteKb(kbId, kbName) {
+ // 弹窗确认是否删除知识库
if (!confirm(`确定要删除知识库 "${kbName}" 吗?此操作不可恢复!`)) {
return;
}
try {
+ // 发送 DELETE 请求到后端 API
const response = await fetch(`/api/v1/kb/${kbId}`, {
method: 'DELETE'
});
+ // 如果响应成功,刷新页面
if (response.ok) {
location.reload();
} else {
+ // 否则弹窗提示错误信息
const error = await response.json();
alert('删除失败: ' + error.message);
}
} catch (error) {
+ // 捕获异常并弹窗提示
alert('删除失败: ' + error.message);
}
}
+// 从按钮的 data 属性读取知识库数据,然后打开编辑界面
function editKbFromButton(button) {
+ // 获取知识库ID
const kbId = button.getAttribute('data-kb-id');
+ // 获取知识库名称
const name = button.getAttribute('data-kb-name');
+ // 获取描述,默认为空字符串
const description = button.getAttribute('data-kb-description') || '';
+ // 获取分块大小,默认512
const chunkSize = parseInt(button.getAttribute('data-kb-chunk-size')) || 512;
+ // 获取分块重叠,默认50
const chunkOverlap = parseInt(button.getAttribute('data-kb-chunk-overlap')) || 50;
+ // 获取封面图片路径,默认为空
+ const coverImage = button.getAttribute('data-kb-cover-image') || '';
+ // 调用编辑函数,填充数据到表单
+ editKb(kbId, name, description, chunkSize, chunkOverlap,coverImage);
}
+// 编辑知识库时弹出模态框并初始化数据
+function editKb(kbId, name, description, chunkSize, chunkOverlap,coverImage) {
+ // 设置表单的知识库ID
document.getElementById('editKbId').value = kbId;
+ // 设置知识库名称
document.getElementById('editKbName').value = name;
+ // 设置描述
document.getElementById('editKbDescription').value = description || '';
+ // 设置分块大小
document.getElementById('editKbChunkSize').value = chunkSize;
+ // 设置分块重叠
document.getElementById('editKbChunkOverlap').value = chunkOverlap;
+ // 初始化不勾选删除封面
+ document.getElementById('editDeleteCover').checked = false;
+ // 清空已选择的新封面文件
+ document.getElementById('editCoverImageInput').value = '';
+ // 获取当前封面预览图片元素
+ const previewImg = document.getElementById('editCoverPreviewImg');
+ // 获取“暂无封面”提示元素
+ const noImageDiv = document.getElementById('editCoverNoImage');
+ // 获取新封面预览div
+ const newPreview = document.getElementById('editCoverNewPreview');
+ // 获取新封面预览图片元素
+ const newPreviewImg = document.getElementById('editCoverNewPreviewImg');
+ // 隐藏新图片预览
+ if (newPreview) {
+ newPreview.style.display = 'none';
+ }
+ // 清空新图片预览src
+ if (newPreviewImg) {
+ newPreviewImg.src = '';
+ }
+ // 如果有旧封面,则显示
+ if (coverImage) {
+ previewImg.src = `/kb/${kbId}/cover`;
+ previewImg.style.display = 'block';
+ noImageDiv.style.display = 'none';
+ } else {
+ // 否则显示“暂无封面”
+ previewImg.style.display = 'none';
+ noImageDiv.style.display = 'block';
+ }
+ // 展示编辑知识库的模态框
const modal = new bootstrap.Modal(document.getElementById('editKbModal'));
modal.show();
}
+// 获取编辑时选择封面图片的input
+const editCoverImageInput = document.getElementById('editCoverImageInput');
+// 如果找到了input,则监听change事件
+if (editCoverImageInput) {
+ editCoverImageInput.addEventListener('change', function(e) {
+ // 获取用户选择的第一个文件
+ const file = e.target.files[0];
+ // 获取新封面预览容器
+ const newPreview = document.getElementById('editCoverNewPreview');
+ // 获取新封面预览图片元素
+ const newPreviewImg = document.getElementById('editCoverNewPreviewImg');
+ // 获取删除封面复选框
+ const deleteCheckbox = document.getElementById('editDeleteCover');
+ // 如果有选文件
+ if (file) {
+ // 定义允许的图片类型
+ const validTypes = ['image/jpeg', 'image/jpg', 'image/png', 'image/gif', 'image/webp'];
+ // 如果不符合要求的格式,弹窗提示并重置
+ if (!validTypes.includes(file.type)) {
+ alert('不支持的图片格式,请选择 JPG、PNG、GIF 或 WEBP 格式的图片');
+ e.target.value = '';
+ if (newPreview) {
+ newPreview.style.display = 'none';
+ }
+ return;
+ }
+ // 如果图片超过5MB,弹窗提示并重置
+ if (file.size > 5 * 1024 * 1024) {
+ alert('图片文件大小超过 5MB 限制');
+ e.target.value = '';
+ if (newPreview) {
+ newPreview.style.display = 'none';
+ }
+ return;
+ }
+ // 文件读取器,用于预览图片
+ const reader = new FileReader();
+ // 文件读取完成后显示预览
+ reader.onload = function(event) {
+ if (newPreviewImg) {
+ newPreviewImg.src = event.target.result;
+ }
+ if (newPreview) {
+ newPreview.style.display = 'block';
+ }
+ // 选择新图片时自动取消删除封面的选项
+ if (deleteCheckbox) {
+ deleteCheckbox.checked = false;
+ }
+ };
+ // 读取图片失败时弹窗提示
+ reader.onerror = function() {
+ alert('读取图片文件失败,请重试');
+ e.target.value = '';
+ if (newPreview) {
+ newPreview.style.display = 'none';
+ }
+ };
+ // 以DataURL形式读取以便预览
+ reader.readAsDataURL(file);
+ } else {
+ // 未选择文件则隐藏新封面预览
+ if (newPreview) {
+ newPreview.style.display = 'none';
+ }
+ }
+ });
+}
+// 监听删除封面复选框的变化
+document.getElementById('editDeleteCover')?.addEventListener('change', function(e) {
+ // 获取封面图片上传input
+ const fileInput = document.getElementById('editCoverImageInput');
+ // 获取新封面预览div
+ const newPreview = document.getElementById('editCoverNewPreview');
+ // 如果选中“删除封面”,则清空图片和隐藏新封面预览
+ if (e.target.checked) {
+ fileInput.value = ''; // 清空文件选择
+ newPreview.style.display = 'none';
+ }
+});
+// 异步函数,用于更新知识库
async function updateKb(event) {
+ // 阻止表单默认提交
event.preventDefault();
+ // 获取表单对象
const form = event.target;
+ // 获取表单数据
const formData = new FormData(form);
+ // 从formData中获取知识库ID
const kbId = formData.get('kb_id');
try {
+ // 发送PUT请求到后端API更新知识库
const response = await fetch(`/api/v1/kb/${kbId}`, {
method: 'PUT',
body: formData
});
+ // 如果更新成功,刷新页面
if (response.ok) {
location.reload();
} else {
+ // 否则弹窗显示错误
const error = await response.json();
alert('更新失败: ' + error.message);
}
} catch (error) {
+ // 捕获异常弹窗显示
alert('更新失败: ' + error.message);
}
}
+// 获取封面图片文件输入框并监听change事件(当用户选择文件时触发)
+document.getElementById('coverImageInput')?.addEventListener('change', function(e) {
+ // 获取用户选中的第一个文件
+ const file = e.target.files[0];
+ // 获取用于显示预览的容器
+ const preview = document.getElementById('coverImagePreview');
+ // 获取显示图片的img标签
+ const previewImg = document.getElementById('coverPreviewImg');
+ // 如果用户选择了文件
+ if (file) {
+ // 创建文件读取器
+ const reader = new FileReader();
+ // 文件读取完成后回调
+ reader.onload = function(e) {
+ // 将img标签的src设置为读取得到的图片数据
+ previewImg.src = e.target.result;
+ // 显示预览容器
+ preview.style.display = 'block';
+ };
+ // 以DataURL的形式读取图片文件
+ reader.readAsDataURL(file);
+ } else {
+ // 如果没有选择文件,则隐藏预览
+ preview.style.display = 'none';
+ }
+});
</script>
{% endblock %}8.6 storage__init__.py #
app\services\storage__init__.py
"""
存储服务模块
提供统一的存储接口,支持多种存储后端
"""
from app.services.storage.factory import StorageFactory
__all__ = [
'StorageFactory'
]8.7 storage\base.py #
app\services\storage\base.py
"""
存储服务抽象接口
"""
# 导入抽象基类和抽象方法装饰器
from abc import ABC, abstractmethod
# 导入可选类型提示
from typing import Optional
# 定义存储服务抽象接口类,继承自ABC
class StorageInterface(ABC):
"""存储服务抽象接口"""
# 定义抽象方法:上传文件
@abstractmethod
def upload_file(self, file_path: str, file_data: bytes,
content_type: str = 'application/octet-stream') -> str:
"""
上传文件
Args:
file_path: 文件路径(相对路径)
file_data: 文件数据(bytes)
content_type: 内容类型
Returns:
文件路径
"""
# 方法体由子类实现
pass
# 定义抽象方法:下载文件
@abstractmethod
def download_file(self, file_path: str) -> bytes:
"""
下载文件
Args:
file_path: 文件路径
Returns:
文件数据(bytes)
"""
# 方法体由子类实现
pass
# 定义抽象方法:删除文件
@abstractmethod
def delete_file(self, file_path: str) -> None:
"""
删除文件
Args:
file_path: 文件路径
"""
# 方法体由子类实现
pass
# 定义抽象方法:检查文件是否存在
@abstractmethod
def file_exists(self, file_path: str) -> bool:
"""
检查文件是否存在
Args:
file_path: 文件路径
Returns:
是否存在
"""
# 方法体由子类实现
pass
# 定义抽象方法:获取文件访问URL,可选
@abstractmethod
def get_file_url(self, file_path: str, expires_in: Optional[int] = None) -> Optional[str]:
"""
获取文件访问URL(可选,某些存储后端支持)
Args:
file_path: 文件路径
expires_in: URL过期时间(秒),None表示永久
Returns:
文件URL,如果不支持则返回None
"""
# 方法体由子类实现
pass8.8 storage\factory.py #
app\services\storage\factory.py
"""
存储服务工厂
"""
# 导入日志模块
import logging
# 导入可选类型提示
from typing import Optional
# 导入存储服务接口
from app.services.storage.base import StorageInterface
# 导入本地存储服务实现
from app.services.storage.local_storage import LocalStorage
# 导入配置
from app.config import Config
# 获取logger实例
logger = logging.getLogger(__name__)
# 定义存储服务工厂类
class StorageFactory:
"""存储服务工厂"""
# 定义用于实现单例的静态属性
_instance: Optional[StorageInterface] = None
# 定义创建存储服务实例的类方法
@classmethod
def create_storage(cls, storage_type: Optional[str] = None, **kwargs) -> StorageInterface:
"""
创建存储服务实例
Args:
storage_type: 存储类型 ('local' 或 'minio'),如果为None则从配置读取
**kwargs: 存储服务的初始化参数
Returns:
存储服务实例
"""
# 如果没有传递storage_type,则优先从配置读取(默认local)
if storage_type is None:
storage_type = getattr(Config, 'STORAGE_TYPE', 'local')
# 统一转为小写
storage_type = storage_type.lower()
# 判断存储类型,如果是local则返回本地存储实例
if storage_type == 'local':
# 获取可选的存储目录参数
storage_dir = kwargs.get('storage_dir')
return LocalStorage(storage_dir=storage_dir)
else:
# 不支持的类型抛出异常
raise ValueError(f"Unsupported storage type: {storage_type}")
# 定义获取实例的类方法(用于单例懒加载)
@classmethod
def get_instance(cls) -> StorageInterface:
"""
获取单例存储服务实例(懒加载)
Returns:
存储服务实例
"""
# 如果还未创建实例,则创建一个
if cls._instance is None:
cls._instance = cls.create_storage()
# 返回单例
return cls._instance8.9 local_storage.py #
app\services\storage\local_storage.py
"""
本地文件系统存储实现
"""
# 导入os模块
import os
# 导入日志模块
import logging
# 导入路径操作类
from pathlib import Path
# 导入可选类型提示
from typing import Optional
# 导入存储接口基类
from app.services.storage.base import StorageInterface
# 导入配置信息
from app.config import Config
# 获取logger实例
logger = logging.getLogger(__name__)
class LocalStorage(StorageInterface):
"""本地文件系统存储实现"""
# 初始化本地存储
def __init__(self, storage_dir: Optional[str] = None):
"""
初始化本地存储
Args:
storage_dir: 存储目录,如果为None则使用配置中的值
"""
# 如果没有传入存储目录,则用配置中的存储目录
if storage_dir is None:
storage_dir = Config.STORAGE_DIR
# 判断路径是否为绝对路径
if os.path.isabs(storage_dir):
# 如果是绝对路径,直接用
self.storage_dir = Path(storage_dir)
else:
# 如果是相对路径,则以项目根目录为基准
base_dir = Path(__file__).parent.parent.parent.parent
self.storage_dir = base_dir / storage_dir
# 确保存储目录存在,创建多级目录
self.storage_dir.mkdir(parents=True, exist_ok=True)
# 记录初始化日志
logger.info(f"本地存储已初始化,目录:{self.storage_dir}")
# 获取文件的完整路径
def _get_full_path(self, file_path: str) -> Path:
"""获取文件的完整路径"""
# 拼接存储目录和文件路径
return self.storage_dir / file_path
# 上传文件到本地存储
def upload_file(self, file_path: str, file_data: bytes,
content_type: str = 'application/octet-stream') -> str:
"""上传文件到本地存储"""
try:
# 获取文件完整路径
full_path = self._get_full_path(file_path)
# 创建父目录(如果不存在)
full_path.parent.mkdir(parents=True, exist_ok=True)
# 写入文件内容
with open(full_path, 'wb') as f:
f.write(file_data)
# 记录日志
logger.info(f"文件已上传:{file_path}")
# 返回文件相对路径
return file_path
except Exception as e:
# 上传失败,记录错误日志,继续抛出异常
logger.error(f"上传文件出错:{e}")
raise
# 从本地存储下载文件
def download_file(self, file_path: str) -> bytes:
"""从本地存储下载文件"""
try:
# 获取文件完整路径
full_path = self._get_full_path(file_path)
# 文件不存在则抛出异常
if not full_path.exists():
raise FileNotFoundError(f"文件不存在:{file_path}")
# 读取文件内容
with open(full_path, 'rb') as f:
data = f.read()
# 记录日志
logger.info(f"文件已下载:{file_path}")
# 返回文件数据
return data
except Exception as e:
# 下载出现异常,记录日志并抛出异常
logger.error(f"下载文件出错:{e}")
raise
# 删除本地存储文件
def delete_file(self, file_path: str) -> None:
"""删除文件"""
try:
# 获取文件完整路径
full_path = self._get_full_path(file_path)
# 如果文件存在则删除
if full_path.exists():
full_path.unlink()
# 记录日志
logger.info(f"文件已删除:{file_path}")
# 尝试删除父目录(为空才能删,主要防止目录冗余)
try:
full_path.parent.rmdir()
except OSError:
# 如果目录不为空,忽略该异常
pass
except Exception as e:
# 删除文件时出错
logger.error(f"删除文件出错:{e}")
raise
# 检查文件是否存在
def file_exists(self, file_path: str) -> bool:
"""检查文件是否存在"""
# 获取文件完整路径
full_path = self._get_full_path(file_path)
# 判断文件是否存在
return full_path.exists()
# 获取文件URL,本地存储不支持直接返回None
def get_file_url(self, file_path: str, expires_in: Optional[int] = None) -> Optional[str]:
"""本地存储不支持URL,返回None"""
return None8.10. config.py #
app/config.py
"""
配置管理模块
"""
# 导入操作系统相关模块
import os
# 导入 Path,处理路径
from pathlib import Path
# 导入 dotenv,用于加载 .env 文件中的环境变量
from dotenv import load_dotenv
# 加载 .env 文件中的环境变量到系统环境变量
load_dotenv()
# 定义应用配置类
class Config:
"""应用配置类"""
# 基础配置
# 项目根目录路径(取上级目录)
BASE_DIR = Path(__file__).parent.parent
# 加载环境变量 SECRET_KEY,若未设置则使用默认开发密钥
SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'
# 应用配置
# 读取应用监听的主机地址,默认为本地所有地址
APP_HOST = os.environ.get('APP_HOST', '0.0.0.0')
# 读取应用监听的端口,默认为 5000,类型为 int
APP_PORT = int(os.environ.get('APP_PORT', 5000))
# 读取 debug 模式配置,字符串转小写等于 'true' 则为 True(开启调试)
APP_DEBUG = os.environ.get('APP_DEBUG', 'false').lower() == 'true'
# 读取允许上传的最大文件大小,默认为 100MB,类型为 int
MAX_FILE_SIZE = int(os.environ.get('MAX_FILE_SIZE', 104857600)) # 100MB
# 允许上传的文件扩展名集合
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt', 'md'}
# 允许上传的图片扩展名集合
ALLOWED_IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'webp'}
# 允许上传的图片最大大小,默认为 5MB,类型为 int
MAX_IMAGE_SIZE = int(os.environ.get('MAX_IMAGE_SIZE', 5242880)) # 5MB
# 日志配置
# 日志目录,默认 './logs'
LOG_DIR = os.environ.get('LOG_DIR', './logs')
# 日志文件名,默认 'rag_lite.log'
LOG_FILE = os.environ.get('LOG_FILE', 'rag_lite.log')
# 日志等级,默认 'INFO'
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
# 是否启用控制台日志,默认 True
LOG_ENABLE_CONSOLE = os.environ.get('LOG_ENABLE_CONSOLE', 'true').lower() == 'true'
# 是否启用文件日志,默认 True
LOG_ENABLE_FILE = os.environ.get('LOG_ENABLE_FILE', 'true').lower() == 'true'
# 数据库配置
# 数据库主机地址,默认为 'localhost'
DB_HOST = os.environ.get('DB_HOST', 'localhost')
# 数据库端口号,默认为 3306
DB_PORT = int(os.environ.get('DB_PORT', 3306))
# 数据库用户名,默认为 'root'
DB_USER = os.environ.get('DB_USER', 'root')
# 数据库密码,默认为 'root'
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'root')
# 数据库名称,默认为 'rag-lite'
DB_NAME = os.environ.get('DB_NAME', 'rag-lite')
# 数据库字符集,默认为 'utf8mb4'
DB_CHARSET = os.environ.get('DB_CHARSET', 'utf8mb4')
+ # 存储配置
+ STORAGE_TYPE = os.environ.get('STORAGE_TYPE', 'local') # 'local' 或 'minio'
+ STORAGE_DIR = os.environ.get('STORAGE_DIR', './storage')9.保存封面到Minio #
- Minio
# 说明:Windows PowerShell 中下载并运行 MinIO Invoke-WebRequest -Uri https://dl.min.io/server/minio/release/windows-amd64/minio.exe -OutFile minio.exe .\minio.exe server D:\minio-data --console-address ":9001"
9.1. factory.py #
app/services/storage/factory.py
"""
存储服务工厂
"""
# 导入日志模块
import logging
# 导入可选类型提示
from typing import Optional
# 导入存储服务接口
from app.services.storage.base import StorageInterface
# 导入本地存储服务实现
from app.services.storage.local_storage import LocalStorage
# 导入MinIO存储服务实现
+from app.services.storage.minio_storage import MinIOStorage
# 导入配置
from app.config import Config
# 获取logger实例
logger = logging.getLogger(__name__)
# 定义存储服务工厂类
class StorageFactory:
"""存储服务工厂"""
# 定义用于实现单例的静态属性
_instance: Optional[StorageInterface] = None
# 定义创建存储服务实例的类方法
@classmethod
def create_storage(cls, storage_type: Optional[str] = None, **kwargs) -> StorageInterface:
"""
创建存储服务实例
Args:
storage_type: 存储类型 ('local' 或 'minio'),如果为None则从配置读取
**kwargs: 存储服务的初始化参数
Returns:
存储服务实例
"""
# 如果没有传递storage_type,则优先从配置读取(默认local)
if storage_type is None:
storage_type = getattr(Config, 'STORAGE_TYPE', 'local')
# 统一转为小写
storage_type = storage_type.lower()
# 判断存储类型,如果是local则返回本地存储实例
if storage_type == 'local':
# 获取可选的存储目录参数
storage_dir = kwargs.get('storage_dir')
return LocalStorage(storage_dir=storage_dir)
+ # 判断存储类型是否为 'minio'
+ elif storage_type == 'minio':
+ # 优先从参数中获取 endpoint,否则从配置中获取 MINIO_ENDPOINT,没有则为''
+ endpoint = kwargs.get('endpoint') or getattr(Config, 'MINIO_ENDPOINT', '')
+ # 优先从参数中获取 access_key,否则从配置中获取 MINIO_ACCESS_KEY,没有则为''
+ access_key = kwargs.get('access_key') or getattr(Config, 'MINIO_ACCESS_KEY', '')
+ # 优先从参数中获取 secret_key,否则从配置中获取 MINIO_SECRET_KEY,没有则为''
+ secret_key = kwargs.get('secret_key') or getattr(Config, 'MINIO_SECRET_KEY', '')
+ # 优先从参数中获取 bucket_name,否则从配置中获取 MINIO_BUCKET_NAME,默认值为 'rag-lite'
+ bucket_name = kwargs.get('bucket_name') or getattr(Config, 'MINIO_BUCKET_NAME', 'rag-lite')
+ # 优先从参数中获取 secure,否则从配置中获取 MINIO_SECURE,默认值为 False
+ secure = kwargs.get('secure', getattr(Config, 'MINIO_SECURE', False))
+ # 优先从参数中获取 region,否则从配置中获取 MINIO_REGION,默认值为 None
+ region = kwargs.get('region', getattr(Config, 'MINIO_REGION', None))
+ # 如果 endpoint、access_key 或 secret_key 其中任一为空,则抛出 ValueError 异常
+ if not endpoint or not access_key or not secret_key:
+ raise ValueError(
+ "MinIO 存储需要提供 endpoint、access_key 和 secret_key,请在环境变量中配置或通过参数传递。"
+ )
+ # 创建并返回 MinIOStorage 实例
+ return MinIOStorage(
+ endpoint=endpoint,#MinIO 服务端点(如:localhost:9000)
+ access_key=access_key,#访问密钥
+ secret_key=secret_key,#秘密密钥
+ bucket_name=bucket_name,#存储桶名称
+ secure=secure,#是否使用HTTPS
+ region=region#区域(可选)
+ )
else:
# 不支持的类型抛出异常
raise ValueError(f"Unsupported storage type: {storage_type}")
# 定义获取实例的类方法(用于单例懒加载)
@classmethod
def get_instance(cls) -> StorageInterface:
"""
获取单例存储服务实例(懒加载)
Returns:
存储服务实例
"""
# 如果还未创建实例,则创建一个
if cls._instance is None:
cls._instance = cls.create_storage()
# 返回单例
return cls._instance9.2. minio_storage.py #
app/services/storage/minio_storage.py
# MinIO 对象存储实现的说明文档
"""
MinIO 对象存储实现
"""
# 导入日志模块
import logging
# 导入可选类型提示
from typing import Optional
# 导入内存字节流处理
from io import BytesIO
# 导入存储接口基类
from app.services.storage.base import StorageInterface
# 导入Minio类和异常
from minio import Minio
# 导入Minio异常
from minio.error import S3Error
# 获取日志记录器
logger = logging.getLogger(__name__)
# 定义 MinIOStorage 类,继承 StorageInterface
class MinIOStorage(StorageInterface):
# MinIO 对象存储实现
"""MinIO 对象存储实现"""
# 初始化方法,接受端点、密钥、桶名等参数
def __init__(self, endpoint: str, access_key: str, secret_key: str,
bucket_name: str, secure: bool = False, region: Optional[str] = None):
# 初始化 MinIO 存储
"""
初始化 MinIO 存储
Args:
endpoint: MinIO 服务端点(如:localhost:9000)
access_key: 访问密钥
secret_key: 秘密密钥
bucket_name: 存储桶名称
secure: 是否使用HTTPS
region: 区域(可选)
"""
# 创建 Minio 客户端实例
self.client = Minio(
endpoint,#MinIO 服务端点(如:localhost:9000)
access_key=access_key,#访问密钥
secret_key=secret_key,#秘密密钥
secure=secure,#是否使用HTTPS
region=region#区域(可选)
)
# 保存桶名称
self.bucket_name = bucket_name
# 检查桶是否存在,不存在则创建
if not self.client.bucket_exists(bucket_name):
# 创建存储桶
self.client.make_bucket(bucket_name)
# 记录桶创建日志
logger.info(f"已创建桶: {bucket_name}")
# 记录MinIO初始化完成日志
logger.info(f"MinIO 存储初始化完成,桶名: {bucket_name}")
# 上传文件到 MinIO
def upload_file(self, file_path: str, file_data: bytes,
content_type: str = 'application/octet-stream') -> str:
# 上传文件到 MinIO
"""上传文件到 MinIO"""
try:
# 创建 BytesIO 流用于上传
data_stream = BytesIO(file_data)
# 使用 put_object 方法上传文件
self.client.put_object(
self.bucket_name,
file_path,
data_stream,
length=len(file_data),
content_type=content_type
)
# 上传成功,记录日志
logger.info(f"已上传文件到 MinIO: {file_path}")
# 返回文件路径
return file_path
# 捕获 S3Error 类型异常
except S3Error as e:
# 上传时报错,记录日志后抛出异常
logger.error(f"上传文件到 MinIO 时报错: {e}")
raise
# 捕获其它异常
except Exception as e:
# 上传时出现异常,记录日志并抛出
logger.error(f"上传文件到 MinIO 时发生异常: {e}")
raise
# 下载文件方法
def download_file(self, file_path: str) -> bytes:
# 从 MinIO 下载文件
"""从 MinIO 下载文件"""
try:
# 获取对象句柄
response = self.client.get_object(self.bucket_name, file_path)
# 读取数据
data = response.read()
# 关闭响应体
response.close()
# 释放连接
response.release_conn()
# 记录下载日志
logger.info(f"已从 MinIO 下载文件: {file_path}")
# 返回二进制数据
return data
# 捕获 S3Error 异常
except S3Error as e:
# 如果对象不存在,抛出文件未找到异常
if e.code == 'NoSuchKey':
raise FileNotFoundError(f"文件不存在: {file_path}")
# 其它S3错误,记录日志后抛出
logger.error(f"从 MinIO 下载文件时报错: {e}")
raise
# 捕获其它异常
except Exception as e:
# 下载时发生其它异常,记录并抛出
logger.error(f"从 MinIO 下载文件时发生异常: {e}")
raise
# 删除文件方法
def delete_file(self, file_path: str) -> None:
# 从 MinIO 删除文件
"""从 MinIO 删除文件"""
try:
# 调用 Minio API 删除文件
self.client.remove_object(self.bucket_name, file_path)
# 记录删除日志
logger.info(f"已从 MinIO 删除文件: {file_path}")
# 捕获 S3Error 异常
except S3Error as e:
# 记录删除时报错日志
logger.error(f"从 MinIO 删除文件时报错: {e}")
raise
# 捕获其它异常
except Exception as e:
# 删除文件发生异常,记录并抛出
logger.error(f"从 MinIO 删除文件时发生异常: {e}")
raise
# 判断文件是否存在方法
def file_exists(self, file_path: str) -> bool:
# 检查文件是否存在于 MinIO
"""检查文件是否存在于 MinIO"""
try:
# 检查对象状态(存在则无异常)
self.client.stat_object(self.bucket_name, file_path)
# 存在返回 True
return True
# 捕获 S3Error 异常
except S3Error as e:
# 如果对象不存在,返回 False
if e.code == 'NoSuchKey':
return False
# 其它 S3 错误抛出
raise
# 捕获其它异常
except Exception as e:
# 检查文件存在时异常,记录并抛出
logger.error(f"在 MinIO 检查文件是否存在时发生异常: {e}")
raise
# 获取文件访问URL方法
def get_file_url(self, file_path: str, expires_in: Optional[int] = None) -> Optional[str]:
# 获取 MinIO 文件的预签名URL
"""获取 MinIO 文件的预签名URL"""
try:
# 如果未指定过期时间,默认为7天
if expires_in is None:
expires_in = 7 * 24 * 3600 # 默认7天
# 生成预签名URL
url = self.client.presigned_get_object(
self.bucket_name,
file_path,
expires=expires_in
)
# 返回预签名URL
return url
# 捕获 S3Error 异常
except S3Error as e:
# 生成URL时报错,记录日志返回None
logger.error(f"生成 MinIO 文件预签名URL时报错: {e}")
return None
# 捕获其它异常
except Exception as e:
# 发生其它异常,记录日志返回None
logger.error(f"生成 MinIO 文件预签名URL时发生异常: {e}")
return None
9.3. .env #
.env
# 应用配置
APP_HOST=0.0.0.0
APP_PORT=5000
APP_DEBUG=True
MAX_FILE_SIZE=104857600
SECRET_KEY=dev-secret-key-change-in-production
# 日志配置
LOG_DIR=./logs
LOG_FILE=rag_lite.log
LOG_LEVEL=INFO
LOG_ENABLE_FILE=True
LOG_ENABLE_CONSOLE=True
# 数据库配置
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=root
DB_NAME=rag-lite
DB_CHARSET=utf8mb4
# 存储配置
+STORAGE_TYPE=minio
+MINIO_ENDPOINT=127.0.0.1:9000
+MINIO_ACCESS_KEY=minioadmin
+MINIO_SECRET_KEY=minioadmin9.4. config.py #
app/config.py
"""
配置管理模块
"""
# 导入操作系统相关模块
import os
# 导入 Path,处理路径
from pathlib import Path
# 导入 dotenv,用于加载 .env 文件中的环境变量
from dotenv import load_dotenv
# 加载 .env 文件中的环境变量到系统环境变量
load_dotenv()
# 定义应用配置类
class Config:
"""应用配置类"""
# 基础配置
# 项目根目录路径(取上级目录)
BASE_DIR = Path(__file__).parent.parent
# 加载环境变量 SECRET_KEY,若未设置则使用默认开发密钥
SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'
# 应用配置
# 读取应用监听的主机地址,默认为本地所有地址
APP_HOST = os.environ.get('APP_HOST', '0.0.0.0')
# 读取应用监听的端口,默认为 5000,类型为 int
APP_PORT = int(os.environ.get('APP_PORT', 5000))
# 读取 debug 模式配置,字符串转小写等于 'true' 则为 True(开启调试)
APP_DEBUG = os.environ.get('APP_DEBUG', 'false').lower() == 'true'
# 读取允许上传的最大文件大小,默认为 100MB,类型为 int
MAX_FILE_SIZE = int(os.environ.get('MAX_FILE_SIZE', 104857600)) # 100MB
# 允许上传的文件扩展名集合
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt', 'md'}
# 允许上传的图片扩展名集合
ALLOWED_IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'webp'}
# 允许上传的图片最大大小,默认为 5MB,类型为 int
MAX_IMAGE_SIZE = int(os.environ.get('MAX_IMAGE_SIZE', 5242880)) # 5MB
# 日志配置
# 日志目录,默认 './logs'
LOG_DIR = os.environ.get('LOG_DIR', './logs')
# 日志文件名,默认 'rag_lite.log'
LOG_FILE = os.environ.get('LOG_FILE', 'rag_lite.log')
# 日志等级,默认 'INFO'
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
# 是否启用控制台日志,默认 True
LOG_ENABLE_CONSOLE = os.environ.get('LOG_ENABLE_CONSOLE', 'true').lower() == 'true'
# 是否启用文件日志,默认 True
LOG_ENABLE_FILE = os.environ.get('LOG_ENABLE_FILE', 'true').lower() == 'true'
# 数据库配置
# 数据库主机地址,默认为 'localhost'
DB_HOST = os.environ.get('DB_HOST', 'localhost')
# 数据库端口号,默认为 3306
DB_PORT = int(os.environ.get('DB_PORT', 3306))
# 数据库用户名,默认为 'root'
DB_USER = os.environ.get('DB_USER', 'root')
# 数据库密码,默认为 'root'
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'root')
# 数据库名称,默认为 'rag-lite'
DB_NAME = os.environ.get('DB_NAME', 'rag-lite')
# 数据库字符集,默认为 'utf8mb4'
DB_CHARSET = os.environ.get('DB_CHARSET', 'utf8mb4')
# 存储配置
STORAGE_TYPE = os.environ.get('STORAGE_TYPE', 'local') # 'local' 或 'minio'
STORAGE_DIR = os.environ.get('STORAGE_DIR', './storage')
# MinIO 配置(当 STORAGE_TYPE='minio' 时使用)
+ MINIO_ENDPOINT = os.environ.get('MINIO_ENDPOINT', '')
+ MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY', '')
+ MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY', '')
+ MINIO_BUCKET_NAME = os.environ.get('MINIO_BUCKET_NAME', 'rag-lite')
+ MINIO_SECURE = os.environ.get('MINIO_SECURE', 'false').lower() == 'true'
+ MINIO_REGION = os.environ.get('MINIO_REGION', None)10.搜索和排序 #
10.1. knowledgebase.py #
app/blueprints/knowledgebase.py
# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""
# 导入Flask中的Blueprint和request
from flask import Blueprint,request,render_template,send_file,abort
# 使用BytesIO将图片数据包装为文件流
from io import BytesIO
# 导入logging模块
import logging
# 导入mimetypes、os模块用于类型判断
import mimetypes
# 导入os模块用于路径操作
import os
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
from app.blueprints.utils import (get_pagination_params,check_ownership)
# 导入存储服务
from app.services.storage_service import storage_service
# 配置logger
logger = logging.getLogger(__name__)
# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)
# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
# 接口用途说明文档字符串
"""创建知识库"""
# 获取当前用户,如未登录则返回错误响应
current_user, err = get_current_user_or_error()
if err:
return err
# 检查请求是否为multipart/form-data(用于文件上传的表单方式)
if request.content_type and 'multipart/form-data' in request.content_type:
# 从表单数据中获取知识库名称
name = request.form.get('name')
# 如果未传入name参数,返回错误
if not name:
return error_response("name is required", 400)
# 获取描述字段,没有则为None
description = request.form.get('description') or None
# 获取分块大小,默认为512
chunk_size = int(request.form.get('chunk_size', 512))
# 获取分块重叠,默认为50
chunk_overlap = int(request.form.get('chunk_overlap', 50))
# 设置封面图片数据变量初值为None
cover_image_data = None
# 设置封面图片文件名变量初值为None
cover_image_filename = None
# 判断请求中是否包含'cover_image'文件
if 'cover_image' in request.files:
# 获取上传的封面图片文件对象
cover_file = request.files['cover_image']
# 如果上传的文件存在且有文件名
if cover_file and cover_file.filename:
# 读取文件内容为二进制数据
cover_image_data = cover_file.read()
# 获取上传文件的文件名
cover_image_filename = cover_file.filename
# 记录封面图片上传的信息到日志,包括文件名、字节大小和内容类型
logger.info(f"收到新知识库的封面图片上传: 文件名={cover_image_filename}, 大小={len(cover_image_data)} 字节, 内容类型={cover_file.content_type}")
else:
# 如果是json请求数据(向后兼容旧用法)
data = request.get_json()
# 判断是否存在name字段,不存在则报错
if not data or 'name' not in data:
return error_response("name is required", 400)
# 获取知识库名称
name = data['name']
# 获取描述
description = data.get('description')
# 获取分块大小,默认为512
chunk_size = data.get('chunk_size', 512)
# 获取分块重叠,默认为50
chunk_overlap = data.get('chunk_overlap', 50)
# 设置封面图片数据变量初值为None
cover_image_data = None
# 设置封面图片文件名变量初值为None
cover_image_filename = None
# 调用知识库服务,创建知识库,返回知识库信息字典
kb_dict = kb_service.create(
name=name, # 知识库名称
user_id=current_user['id'], # 用户ID
description=description, # 知识库描述
chunk_size=chunk_size, # 分块大小
chunk_overlap=chunk_overlap, # 分块重叠
cover_image_data=cover_image_data, # 封面图片数据
cover_image_filename=cover_image_filename # 封面图片文件名
)
# 返回成功响应,包含知识库信息
return success_response(kb_dict)
# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
# 设置本函数用途说明(文档字符串)
"""知识库列表页面"""
# 获取当前登录用户信息
current_user = get_current_user()
# 获取分页参数(页码和每页大小),最大每页100
page, page_size = get_pagination_params(max_page_size=100)
# 获取搜索和排序参数
+ search = request.args.get('search', '').strip() or None
+ sort_by = request.args.get('sort_by', 'created_at')
+ sort_order = request.args.get('sort_order', 'desc')
# 验证排序参数
+ if sort_by not in ['created_at', 'name', 'updated_at']:
+ sort_by = 'created_at'
+ if sort_order not in ['asc', 'desc']:
+ sort_order = 'desc'
# 调用知识库服务,获取分页后的知识库列表结果
result = kb_service.list(
user_id=current_user['id'], # 用户ID
page=page, # 页码
+ page_size=page_size, # 每页大小
+ search=search, # 搜索关键词
+ sort_by=sort_by, # 排序字段
+ sort_order=sort_order # 排序方向
)
# 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
return render_template('kb_list.html',
kbs=result['items'],
+ pagination=result,
+ search=search or '',
+ sort_by=sort_by,
+ sort_order=sort_order)
# 注册DELETE方法的API路由,用于删除知识库
@bp.route('/api/v1/kb/<kb_id>', methods=['DELETE'])
# 要求API登录
@api_login_required
# 处理API错误的装饰器
@handle_api_error
def api_delete(kb_id):
"""删除知识库"""
# 获取当前用户信息,如果未登录则返回错误
current_user, err = get_current_user_or_error()
if err:
return err
# 根据知识库ID获取知识库信息
kb_dict = kb_service.get_by_id(kb_id)
# 如果知识库不存在,返回404错误
if not kb_dict:
return error_response("未找到知识库", 404)
# 验证当前用户是否拥有该知识库的操作权限
has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
if not has_permission:
return err
# 调用服务删除知识库
success = kb_service.delete(kb_id)
# 如果删除失败,返回404错误
if not success:
return error_response("未找到知识库", 404)
# 返回删除成功的响应
return success_response("知识库删除成功")
# 注册PUT方法的API路由(用于更新知识库)
@bp.route('/api/v1/kb/<kb_id>', methods=['PUT'])
# 要求API登录
@api_login_required
# 捕获API内部错误的装饰器
@handle_api_error
def api_update(kb_id):
# 定义API用于更新知识库(含封面图片)
"""更新知识库(支持封面图片更新)"""
# 获取当前登录用户信息,如果未登录则返回错误响应
current_user, err = get_current_user_or_error()
if err:
return err
# 获取指定ID的知识库记录,验证其是否存在
kb_dict = kb_service.get_by_id(kb_id)
if not kb_dict:
return error_response("未找到知识库", 404)
# 校验当前用户是否有操作该知识库的权限
has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
if not has_permission:
return err
# 判断请求内容类型是否为multipart/form-data(一般用于带文件上传的表单提交)
if request.content_type and 'multipart/form-data' in request.content_type:
# 从表单中获取普通字段
name = request.form.get('name')
description = request.form.get('description') or None
chunk_size = request.form.get('chunk_size')
chunk_overlap = request.form.get('chunk_overlap')
# 初始化封面图片相关变量
cover_image_data = None
cover_image_filename = None
# 获得delete_cover字段(类型字符串,需判断是否为'true')
delete_cover = request.form.get('delete_cover') == 'true'
# 如果有上传封面图片,则读取文件内容
if 'cover_image' in request.files:
cover_file = request.files['cover_image']
if cover_file and cover_file.filename:
cover_image_data = cover_file.read()
cover_image_filename = cover_file.filename
# 记录上传日志
logger.info(f"收到知识库 {kb_id} 的封面图片上传: 文件名={cover_image_filename}, 大小={len(cover_image_data)} 字节, 内容类型={cover_file.content_type}")
# 构建待更新的数据
update_data = {}
if name:
update_data['name'] = name
if description is not None:
update_data['description'] = description
if chunk_size:
update_data['chunk_size'] = int(chunk_size)
if chunk_overlap:
update_data['chunk_overlap'] = int(chunk_overlap)
else:
# 非表单上传,则按JSON结构解析请求内容
data = request.get_json()
# 如果请求体是空的,直接返回错误
if not data:
return error_response("请求体不能为空", 400)
# 构建可更新的数据字典
update_data = {}
if 'name' in data:
update_data['name'] = data['name']
if 'description' in data:
update_data['description'] = data.get('description')
if 'chunk_size' in data:
update_data['chunk_size'] = data['chunk_size']
if 'chunk_overlap' in data:
update_data['chunk_overlap'] = data['chunk_overlap']
# JSON请求时,cover_image相关变量置空
cover_image_data = None
cover_image_filename = None
delete_cover = data.get('delete_cover', False)
# 调用服务更新知识库,传入各字段及封面参数
updated_kb = kb_service.update(
kb_id=kb_id, # 知识库ID
cover_image_data=cover_image_data, # 封面图片的二进制内容
cover_image_filename=cover_image_filename, # 封面图片文件名
delete_cover=delete_cover, # 是否删除封面图片
**update_data # 其它可变字段
)
# 更新后如果找不到,返回404
if not updated_kb:
return error_response("未找到知识库", 404)
# 更新成功后,将最新的知识库数据返回给前端
return success_response(updated_kb, "知识库更新成功")
# 定义路由,获取指定知识库ID的封面图片,仅限登录用户访问
@bp.route('/kb/<kb_id>/cover')
@login_required
def kb_cover(kb_id):
"""获取知识库封面图片"""
# 获取当前已登录用户的信息
current_user = get_current_user()
# 根据知识库ID从知识库服务获取对应的知识库信息
kb = kb_service.get_by_id(kb_id)
# 检查知识库是否存在
if not kb:
# 如果知识库不存在,记录警告日志
logger.warning(f"知识库不存在: {kb_id}")
abort(404)
# 检查是否有权限访问(只能查看自己的知识库封面)
if kb.get('user_id') != current_user['id']:
# 如果不是当前用户的知识库,记录警告日志
logger.warning(f"用户 {current_user['id']} 尝试访问知识库 {kb_id} 的封面,但该知识库属于用户 {kb.get('user_id')}")
abort(403)
# 获取知识库的封面图片路径
cover_path = kb.get('cover_image')
# 检查是否有封面图片
if not cover_path:
# 如果没有封面,记录调试日志
logger.debug(f"知识库 {kb_id} 没有封面图片")
abort(404)
try:
# 通过存储服务下载封面图片数据
image_data = storage_service.download_file(cover_path)
# 如果未能获取到图片数据,记录错误日志并返回404
if not image_data:
logger.error(f"从路径下载封面图片失败: {cover_path}")
abort(404)
# 根据文件扩展名判断图片MIME类型
file_ext = os.path.splitext(cover_path)[1].lower()
# 自定义映射,优先根据文件扩展名判断图片MIME类型
mime_type_map = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.webp': 'image/webp'
}
# 优先根据自定义映射获取MIME类型
mime_type = mime_type_map.get(file_ext)
if not mime_type:
# 如果没有命中自定义映射,则使用mimetypes猜测类型
mime_type, _ = mimetypes.guess_type(cover_path)
if not mime_type:
# 如果还未识别出类型,则默认用JPEG
mime_type = 'image/jpeg'
# 通过send_file响应图片数据和MIME类型,不以附件形式发送
return send_file(
BytesIO(image_data),#图片数据
mimetype=mime_type,#MIME类型
as_attachment=False#不以附件形式发送
)
except FileNotFoundError as e:
# 捕获文件未找到异常,记录错误日志
logger.error(f"封面图片文件未找到: {cover_path}, 错误: {e}")
abort(404)
except Exception as e:
# 捕获其他未预期异常,记录错误日志(包含堆栈信息)
logger.error(f"提供知识库 {kb_id} 的封面图片时出错, 路径: {cover_path}, 错误: {e}", exc_info=True)
abort(404)10.2. knowledgebase_service.py #
app/services/knowledgebase_service.py
# 知识库服务
"""
知识库服务
"""
import os
from app.config import Config
# 导入类型提示工具
from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService
# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase
# 导入存储服务
from app.services.storage_service import storage_service
# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
"""知识库服务"""
# 定义创建知识库的方法
def create(self, name: str, user_id: str, description: str = None,
chunk_size: int = 512, chunk_overlap: int = 50,
cover_image_data: bytes = None, cover_image_filename: str = None) -> dict:
"""
创建知识库
Args:
name: 知识库名称
user_id: 用户ID
description: 描述
chunk_size: 分块大小
chunk_overlap: 分块重叠
cover_image_data: 封面图片数据(可选)
cover_image_filename: 封面图片文件名(可选)
Returns:
创建的知识库字典
"""
cover_image_path = None
# 处理封面图片上传
if cover_image_data and cover_image_filename:
# 验证文件类型
file_ext_without_dot = os.path.splitext(cover_image_filename)[1][1:].lower() if '.' in cover_image_filename else ''
if not file_ext_without_dot:
raise ValueError(f"文件名缺少扩展名: {cover_image_filename}")
if file_ext_without_dot not in Config.ALLOWED_IMAGE_EXTENSIONS:
raise ValueError(f"不支持的图片格式: {file_ext_without_dot}。支持的格式: {', '.join(Config.ALLOWED_IMAGE_EXTENSIONS)}")
# 验证文件大小
if len(cover_image_data) == 0:
raise ValueError("上传的图片文件为空")
if len(cover_image_data) > Config.MAX_IMAGE_SIZE:
raise ValueError(f"图片文件大小超过限制 {Config.MAX_IMAGE_SIZE / 1024 / 1024}MB")
# 启动数据库事务,上下文管理器自动处理提交或回滚
with self.transaction() as session:
# 先创建知识库对象
kb = Knowledgebase(
name=name, # 设置知识库名称
user_id=user_id, # 设置用户ID
description=description, # 设置知识库描述
chunk_size=chunk_size, # 设置分块大小
chunk_overlap=chunk_overlap # 设置分块重叠
)
# 将知识库对象添加到session
session.add(kb)
# 刷新session,生成知识库ID
session.flush() # 刷新以获取 ID,但不提交
# 上传封面图片(如果有)
if cover_image_data and cover_image_filename:
try:
# 构建封面图片路径(统一使用小写扩展名)
file_ext_with_dot = os.path.splitext(cover_image_filename)[1].lower()
cover_image_path = f"covers/{kb.id}{file_ext_with_dot}"
self.logger.info(f"正在为新知识库 {kb.id} 上传封面图片: 文件名={cover_image_filename}, 路径={cover_image_path}, 大小={len(cover_image_data)} 字节")
# 上传到存储
storage_service.upload_file(cover_image_path, cover_image_data)
# 验证文件是否成功上传
if not storage_service.file_exists(cover_image_path):
raise ValueError(f"上传后文件不存在: {cover_image_path}")
self.logger.info(f"成功上传封面图片: {cover_image_path}")
# 更新知识库的封面路径
kb.cover_image = cover_image_path
session.flush()
except Exception as e:
self.logger.error(f"上传知识库 {kb.id} 的封面图片时出错: {e}", exc_info=True)
# 如果上传失败,继续创建知识库,但不设置封面
cover_image_path = None
# 刷新kb对象的数据库状态
session.refresh(kb)
# 转换kb对象为字典(在session内部,避免分离后出错)
kb_dict = kb.to_dict()
# 记录创建知识库的日志,包含ID
self.logger.info(f"创建了知识库,ID: {kb.id}")
# 返回知识库字典信息
return kb_dict
# 定义获取知识库列表的方法
+ def list(self, user_id: str = None, page: int = 1, page_size: int = 10,
+ search: str = None, sort_by: str = 'created_at', sort_order: str = 'desc') -> Dict:
"""
获取知识库列表
Args:
user_id: 用户ID(可选)
page: 页码
page_size: 每页数量
+ search: 搜索关键词(搜索名称和描述)
+ sort_by: 排序字段(created_at, name, updated_at)
+ sort_order: 排序方向(asc, desc)
Returns:
包含 items, total, page, page_size 的字典
"""
# 使用数据库会话
with self.session() as session:
# 查询Knowledgebase表
query = session.query(Knowledgebase)
# 如果指定了user_id,则筛选属于该用户的知识库
if user_id:
query = query.filter(Knowledgebase.user_id == user_id)
# 如果提供了搜索关键词,则按名称和描述进行模糊搜索
+ if search:
# 构造模糊搜索的SQL模式
+ search_pattern = f"%{search}%"
# 通过SQLAlchemy的filter方法实现名称或描述字段的模糊查询
+ query = query.filter(
# 名称模糊匹配
+ (Knowledgebase.name.like(search_pattern)) |
# 描述模糊匹配
+ (Knowledgebase.description.like(search_pattern))
+ )
# 排序逻辑处理
# 初始化排序字段变量
+ sort_field = None
# 如果排序字段为'name',按名称排序
+ if sort_by == 'name':
+ sort_field = Knowledgebase.name
# 如果排序字段为'updated_at',按更新时间排序
+ elif sort_by == 'updated_at':
+ sort_field = Knowledgebase.updated_at
# 否则默认按创建时间排序
+ else:
+ sort_field = Knowledgebase.created_at
# 根据排序顺序(升序或降序)添加排序
+ if sort_order == 'asc':
# 升序排列
+ query = query.order_by(sort_field.asc())
+ else:
# 默认降序排列
+ query = query.order_by(sort_field.desc())
# 统计总记录数
total = query.count()
# 计算分页偏移量
offset = (page - 1) * page_size
# 获取当前页的数据列表
kbs = query.offset(offset).limit(page_size).all()
# 初始化知识库字典列表
items = []
# 遍历查询结果,将每一项转为dict后添加到items列表
for kb in kbs:
kb_dict = kb.to_dict()
items.append(kb_dict)
# 返回包含分页信息和数据条目的字典
return {
'items': items,
'total': total,
'page': page,
'page_size': page_size
}
def delete(self, kb_id: str) -> bool:
"""
删除知识库
Args:
kb_id: 知识库ID
Returns:
是否删除成功
"""
with self.transaction() as session:
kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
if not kb:
return False
session.delete(kb)
self.logger.info(f"Deleted knowledgebase: {kb_id}")
return True
def get_by_id(self, kb_id: str) -> Optional[dict]:
"""根据ID获取知识库"""
with self.session() as session:
kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
if kb:
# 在 session 内部转换为字典,避免对象从 session 分离后访问属性出错
return kb.to_dict()
return None
# 定义 update 方法,用于更新知识库
def update(self, kb_id: str, cover_image_data: bytes = None,
cover_image_filename: str = None, delete_cover: bool = False, **kwargs) -> Optional[dict]:
"""
更新知识库
Args:
kb_id: 知识库ID
cover_image_data: 新的封面图片数据(可选)
cover_image_filename: 新的封面图片文件名(可选)
delete_cover: 是否删除封面图片(可选)
**kwargs: 要更新的字段(name, description, chunk_size, chunk_overlap 等)
Returns:
更新后的知识库字典,如果不存在则返回 None
"""
# 开启数据库事务
with self.transaction() as session:
# 查询指定ID的知识库对象
kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
# 如果未找到知识库,则返回 None
if not kb:
return None
# 处理封面图片更新
old_cover_path = kb.cover_image if kb.cover_image else None
if delete_cover:
# 删除封面图片
if old_cover_path:
try:
storage_service.delete_file(old_cover_path)
self.logger.info(f"已删除封面图片: {old_cover_path}")
except Exception as e:
self.logger.warning(f"删除封面图片时出错: {e}")
kwargs['cover_image'] = None
elif cover_image_data and cover_image_filename:
# 上传新封面图片
# 验证文件类型
file_ext_without_dot = os.path.splitext(cover_image_filename)[1][1:].lower() if '.' in cover_image_filename else ''
if not file_ext_without_dot:
raise ValueError(f"文件名缺少扩展名: {cover_image_filename}")
if file_ext_without_dot not in Config.ALLOWED_IMAGE_EXTENSIONS:
raise ValueError(f"不支持的图片格式: {file_ext_without_dot}。支持的格式: {', '.join(Config.ALLOWED_IMAGE_EXTENSIONS)}")
# 验证文件大小
if len(cover_image_data) == 0:
raise ValueError("上传的图片文件为空")
if len(cover_image_data) > Config.MAX_IMAGE_SIZE:
raise ValueError(f"图片文件大小超过限制 {Config.MAX_IMAGE_SIZE / 1024 / 1024}MB")
try:
# 构建新封面图片路径(使用原始扩展名,保持大小写)
file_ext_with_dot = os.path.splitext(cover_image_filename)[1]
# 统一使用小写扩展名,避免路径不一致的问题
file_ext_with_dot = file_ext_with_dot.lower()
new_cover_path = f"covers/{kb_id}{file_ext_with_dot}"
self.logger.info(f"正在处理知识库 {kb_id} 的封面图片更新: 文件名={cover_image_filename}, 扩展名={file_ext_without_dot}, 大小={len(cover_image_data)} 字节, 新路径={new_cover_path}, 旧路径={old_cover_path}")
# 先上传新封面(确保上传成功后再删除旧封面)
storage_service.upload_file(new_cover_path, cover_image_data)
# 验证文件是否成功上传
if not storage_service.file_exists(new_cover_path):
raise ValueError(f"上传后文件不存在: {new_cover_path}")
self.logger.info(f"成功上传封面图片: {new_cover_path}")
# 删除旧封面(如果存在且与新封面路径不同)
if old_cover_path and old_cover_path != new_cover_path:
try:
storage_service.delete_file(old_cover_path)
self.logger.info(f"已删除旧封面图片: {old_cover_path}")
except Exception as e:
self.logger.warning(f"删除旧封面图片时出错: {e}")
# 继续执行,不因为删除旧文件失败而中断更新
# 更新数据库中的封面路径
kwargs['cover_image'] = new_cover_path
except Exception as e:
self.logger.error(f"上传知识库 {kb_id} 的封面图片时出错: {e}", exc_info=True)
raise ValueError(f"上传封面图片失败: {str(e)}")
# 遍历要更新的字段和值
for key, value in kwargs.items():
# 判断知识库对象是否有该字段,且值不为 None
if hasattr(kb, key) and (key == 'cover_image' or value is not None):
# 设置该字段的新值
setattr(kb, key, value)
# 刷新session,保证对象属性为最新状态
session.flush()
# 刷新对象,避免未提交前读取到旧数据
session.refresh(kb)
# 在事务内部将对象转为字典,避免 session 关闭后访问失败
kb_dict = kb.to_dict()
# 如果本次更新包含 'cover_image' 字段,记录详细日志
if 'cover_image' in kwargs:
self.logger.info(f"更新知识库 {kb_id}, 封面图片={kb_dict.get('cover_image')}")
else:
# 否则仅记录知识库ID
self.logger.info(f"更新知识库: {kb_id}")
# 返回更新后的知识库字典
return kb_dict
# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()
10.3. kb_list.html #
app/templates/kb_list.html
{% extends "base.html" %}
{% block title %}知识库管理 - RAG Lite{% endblock %}
{% block content %}
<style>
@media (min-width: 992px) {
#kbList > div {
flex: 0 0 20%;
max-width: 20%;
}
}
</style>
<div class="row">
<div class="col-12">
<nav aria-label="breadcrumb" class="mb-3">
<ol class="breadcrumb">
<li class="breadcrumb-item"><a href="/">首页</a></li>
<li class="breadcrumb-item active">知识库管理</li>
</ol>
</nav>
<div class="d-flex justify-content-between align-items-center mb-4">
<h2><i class="bi bi-collection"></i> 知识库管理</h2>
<button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
<i class="bi bi-plus-circle"></i> 创建知识库
</button>
</div>
+ <!-- 搜索和排序工具栏 -->
+ <div class="card mb-4">
+ <div class="card-body">
+ <form method="GET" action="/kb" id="searchForm" class="row g-3 align-items-end">
+ <div class="col-md-6">
+ <label for="searchInput" class="form-label">搜索</label>
+ <div class="input-group">
+ <span class="input-group-text"><i class="bi bi-search"></i></span>
+ <input type="text" class="form-control" id="searchInput" name="search"
+ placeholder="搜索知识库名称或描述..." value="{{ search }}">
+ {% if search %}
+ <button type="button" class="btn btn-outline-secondary" onclick="clearSearch()">
+ <i class="bi bi-x"></i>
+ </button>
+ {% endif %}
+ </div>
+ </div>
+ <div class="col-md-3">
+ <label for="sortBySelect" class="form-label">排序字段</label>
+ <select class="form-select" id="sortBySelect" name="sort_by" onchange="updateSearch()">
+ <option value="created_at" {% if sort_by == 'created_at' %}selected{% endif %}>创建时间</option>
+ <option value="name" {% if sort_by == 'name' %}selected{% endif %}>名称</option>
+ <option value="updated_at" {% if sort_by == 'updated_at' %}selected{% endif %}>更新时间</option>
+ </select>
+ </div>
+ <div class="col-md-2">
+ <label for="sortOrderSelect" class="form-label">排序方向</label>
+ <select class="form-select" id="sortOrderSelect" name="sort_order" onchange="updateSearch()">
+ <option value="desc" {% if sort_order == 'desc' %}selected{% endif %}>降序</option>
+ <option value="asc" {% if sort_order == 'asc' %}selected{% endif %}>升序</option>
+ </select>
+ </div>
+ <div class="col-md-1">
+ <button type="submit" class="btn btn-primary w-100">
+ <i class="bi bi-search"></i> 搜索
+ </button>
+ </div>
+ <input type="hidden" name="page" value="1">
+ <input type="hidden" name="page_size" value="{{ pagination.page_size if pagination else 10 }}">
+ </form>
+ </div>
+ </div>
<!-- 知识库列表 -->
<div class="row" id="kbList">
{% if kbs %}
{% for kb in kbs %}
<div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
<div class="card h-100">
{% if kb.cover_image %}
<img src="/kb/{{ kb.id }}/cover" class="card-img-top" alt="{{ kb.name|e }}" style="height: 150px; object-fit: scale-down;">
{% else %}
<div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
<i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
</div>
{% endif %}
<div class="card-body">
<h5 class="card-title">
<i class="bi bi-folder"></i> {{ kb.name }}
</h5>
<p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
</div>
<div class="card-footer bg-transparent">
<button class="btn btn-sm btn-warning"
data-kb-id="{{ kb.id }}"
data-kb-name="{{ kb.name }}"
data-kb-description="{{ kb.description or '' }}"
data-kb-chunk-size="{{ kb.chunk_size }}"
data-kb-chunk-overlap="{{ kb.chunk_overlap }}"
data-kb-cover-image="{{ kb.cover_image or '' }}"
onclick="editKbFromButton(this)">
<i class="bi bi-pencil"></i> 编辑
</button>
<button class="btn btn-sm btn-danger" onclick="deleteKb('{{ kb.id }}', '{{ kb.name }}')">
<i class="bi bi-trash"></i> 删除
</button>
</div>
</div>
</div>
{% endfor %}
{% else %}
<div class="col-12">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
</div>
</div>
{% endif %}
</div>
<!-- 分页控件 -->
{% if pagination and pagination.total > pagination.page_size %}
<nav aria-label="知识库列表分页" class="mt-4">
<ul class="pagination justify-content-center">
{% set current_page = pagination.page %}
{% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}
<!-- 上一页 -->
<li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
<i class="bi bi-chevron-left"></i> 上一页
</a>
</li>
<!-- 页码 -->
{% set start_page = [1, current_page - 2] | max %}
{% set end_page = [total_pages, current_page + 2] | min %}
{% if start_page > 1 %}
<li class="page-item">
<a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
</li>
{% if start_page > 2 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
{% endif %}
{% for page_num in range(start_page, end_page + 1) %}
<li class="page-item {% if page_num == current_page %}active{% endif %}">
<a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
{{ page_num }}
</a>
</li>
{% endfor %}
{% if end_page < total_pages %}
{% if end_page < total_pages - 1 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
<li class="page-item">
<a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
</li>
{% endif %}
<!-- 下一页 -->
<li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
<a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
{% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
下一页 <i class="bi bi-chevron-right"></i>
</a>
</li>
</ul>
<div class="text-center text-muted small mt-2">
共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
</div>
</nav>
{% endif %}
</div>
</div>
<!-- 创建知识库模态框 -->
<div class="modal fade" id="createKbModal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">创建知识库</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
<div class="modal-body">
<div class="mb-3">
<label class="form-label">名称 <span class="text-danger">*</span></label>
<input type="text" class="form-control" name="name" required>
</div>
<div class="mb-3">
<label class="form-label">描述</label>
<textarea class="form-control" name="description" rows="3"></textarea>
</div>
<div class="mb-3">
<label class="form-label">封面图片(可选)</label>
<input type="file" class="form-control" name="cover_image" accept="image/jpeg,image/png,image/gif,image/webp" id="coverImageInput">
<div class="form-text">支持 JPG、PNG、GIF、WEBP 格式,最大 5MB</div>
<div id="coverImagePreview" class="mt-2" style="display: none;">
<img id="coverPreviewImg" src="" alt="封面预览" class="img-thumbnail" style="max-width: 200px; max-height: 200px;">
</div>
</div>
<div class="row">
<div class="col-md-6 mb-3">
<label class="form-label">分块大小</label>
<input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
<div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
</div>
<div class="col-md-6 mb-3">
<label class="form-label">分块重叠</label>
<input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
<div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
</div>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="submit" class="btn btn-primary">创建</button>
</div>
</form>
</div>
</div>
</div>
<!-- 编辑知识库模态框 -->
<div class="modal fade" id="editKbModal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">编辑知识库</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<form id="editKbForm" onsubmit="updateKb(event)" enctype="multipart/form-data">
<input type="hidden" name="kb_id" id="editKbId">
<div class="modal-body">
<div class="mb-3">
<label class="form-label">名称 <span class="text-danger">*</span></label>
<input type="text" class="form-control" name="name" id="editKbName" required>
</div>
<div class="mb-3">
<label class="form-label">描述</label>
<textarea class="form-control" name="description" id="editKbDescription" rows="3"></textarea>
</div>
<div class="mb-3">
<label class="form-label">封面图片</label>
<div id="editCoverPreview" class="mb-2">
<img id="editCoverPreviewImg" src="" alt="当前封面" class="img-thumbnail" style="max-width: 200px; max-height: 200px; display: none;">
<div id="editCoverNoImage" class="text-muted small" style="display: none;">暂无封面</div>
</div>
<input type="file" class="form-control" name="cover_image" accept="image/jpeg,image/png,image/gif,image/webp" id="editCoverImageInput">
<div class="form-text">支持 JPG、PNG、GIF、WEBP 格式,最大 5MB。留空则不修改封面。</div>
<div class="form-check mt-2">
<input class="form-check-input" type="checkbox" name="delete_cover" id="editDeleteCover" value="true">
<label class="form-check-label" for="editDeleteCover">
删除封面图片
</label>
</div>
<div id="editCoverNewPreview" class="mt-2" style="display: none;">
<img id="editCoverNewPreviewImg" src="" alt="新封面预览" class="img-thumbnail" style="max-width: 200px; max-height: 200px;">
</div>
</div>
<div class="row">
<div class="col-md-6 mb-3">
<label class="form-label">分块大小</label>
<input type="number" class="form-control" name="chunk_size" id="editKbChunkSize" value="512" min="100" max="2000">
<div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
</div>
<div class="col-md-6 mb-3">
<label class="form-label">分块重叠</label>
<input type="number" class="form-control" name="chunk_overlap" id="editKbChunkOverlap" value="50" min="0" max="200">
<div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
</div>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="submit" class="btn btn-primary">保存</button>
</div>
</form>
</div>
</div>
</div>
{% endblock %}
{% block extra_js %}
<script>
// 异步函数,用于创建知识库
async function createKb(event) {
// 阻止表单默认提交
event.preventDefault();
// 获取表单对象
const form = event.target;
// 构造 FormData,收集表单数据
const formData = new FormData(form);
try {
// 发送 POST 请求到后端 API,提交表单数据
const response = await fetch('/api/v1/kb', {
method: 'POST',
// body为FormData,浏览器会自动设置Content-Type
body: formData
});
// 如果响应成功,刷新页面
if (response.ok) {
location.reload();
} else {
// 否则获取错误信息并弹窗提示
const error = await response.json();
alert('创建失败: ' + error.message);
}
} catch (error) {
// 捕获异常并弹窗提示用户
alert('创建失败: ' + error.message);
}
}
// 异步函数,用于删除知识库
async function deleteKb(kbId, kbName) {
// 弹窗确认是否删除知识库
if (!confirm(`确定要删除知识库 "${kbName}" 吗?此操作不可恢复!`)) {
return;
}
try {
// 发送 DELETE 请求到后端 API
const response = await fetch(`/api/v1/kb/${kbId}`, {
method: 'DELETE'
});
// 如果响应成功,刷新页面
if (response.ok) {
location.reload();
} else {
// 否则弹窗提示错误信息
const error = await response.json();
alert('删除失败: ' + error.message);
}
} catch (error) {
// 捕获异常并弹窗提示
alert('删除失败: ' + error.message);
}
}
// 从按钮的 data 属性读取知识库数据,然后打开编辑界面
function editKbFromButton(button) {
// 获取知识库ID
const kbId = button.getAttribute('data-kb-id');
// 获取知识库名称
const name = button.getAttribute('data-kb-name');
// 获取描述,默认为空字符串
const description = button.getAttribute('data-kb-description') || '';
// 获取分块大小,默认512
const chunkSize = parseInt(button.getAttribute('data-kb-chunk-size')) || 512;
// 获取分块重叠,默认50
const chunkOverlap = parseInt(button.getAttribute('data-kb-chunk-overlap')) || 50;
// 获取封面图片路径,默认为空
const coverImage = button.getAttribute('data-kb-cover-image') || '';
// 调用编辑函数,填充数据到表单
editKb(kbId, name, description, chunkSize, chunkOverlap,coverImage);
}
// 编辑知识库时弹出模态框并初始化数据
function editKb(kbId, name, description, chunkSize, chunkOverlap,coverImage) {
// 设置表单的知识库ID
document.getElementById('editKbId').value = kbId;
// 设置知识库名称
document.getElementById('editKbName').value = name;
// 设置描述
document.getElementById('editKbDescription').value = description || '';
// 设置分块大小
document.getElementById('editKbChunkSize').value = chunkSize;
// 设置分块重叠
document.getElementById('editKbChunkOverlap').value = chunkOverlap;
// 初始化不勾选删除封面
document.getElementById('editDeleteCover').checked = false;
// 清空已选择的新封面文件
document.getElementById('editCoverImageInput').value = '';
// 获取当前封面预览图片元素
const previewImg = document.getElementById('editCoverPreviewImg');
// 获取“暂无封面”提示元素
const noImageDiv = document.getElementById('editCoverNoImage');
// 获取新封面预览div
const newPreview = document.getElementById('editCoverNewPreview');
// 获取新封面预览图片元素
const newPreviewImg = document.getElementById('editCoverNewPreviewImg');
// 隐藏新图片预览
if (newPreview) {
newPreview.style.display = 'none';
}
// 清空新图片预览src
if (newPreviewImg) {
newPreviewImg.src = '';
}
// 如果有旧封面,则显示
if (coverImage) {
previewImg.src = `/kb/${kbId}/cover`;
previewImg.style.display = 'block';
noImageDiv.style.display = 'none';
} else {
// 否则显示“暂无封面”
previewImg.style.display = 'none';
noImageDiv.style.display = 'block';
}
// 展示编辑知识库的模态框
const modal = new bootstrap.Modal(document.getElementById('editKbModal'));
modal.show();
}
// 获取编辑时选择封面图片的input
const editCoverImageInput = document.getElementById('editCoverImageInput');
// 如果找到了input,则监听change事件
if (editCoverImageInput) {
editCoverImageInput.addEventListener('change', function(e) {
// 获取用户选择的第一个文件
const file = e.target.files[0];
// 获取新封面预览容器
const newPreview = document.getElementById('editCoverNewPreview');
// 获取新封面预览图片元素
const newPreviewImg = document.getElementById('editCoverNewPreviewImg');
// 获取删除封面复选框
const deleteCheckbox = document.getElementById('editDeleteCover');
// 如果有选文件
if (file) {
// 定义允许的图片类型
const validTypes = ['image/jpeg', 'image/jpg', 'image/png', 'image/gif', 'image/webp'];
// 如果不符合要求的格式,弹窗提示并重置
if (!validTypes.includes(file.type)) {
alert('不支持的图片格式,请选择 JPG、PNG、GIF 或 WEBP 格式的图片');
e.target.value = '';
if (newPreview) {
newPreview.style.display = 'none';
}
return;
}
// 如果图片超过5MB,弹窗提示并重置
if (file.size > 5 * 1024 * 1024) {
alert('图片文件大小超过 5MB 限制');
e.target.value = '';
if (newPreview) {
newPreview.style.display = 'none';
}
return;
}
// 文件读取器,用于预览图片
const reader = new FileReader();
// 文件读取完成后显示预览
reader.onload = function(event) {
if (newPreviewImg) {
newPreviewImg.src = event.target.result;
}
if (newPreview) {
newPreview.style.display = 'block';
}
// 选择新图片时自动取消删除封面的选项
if (deleteCheckbox) {
deleteCheckbox.checked = false;
}
};
// 读取图片失败时弹窗提示
reader.onerror = function() {
alert('读取图片文件失败,请重试');
e.target.value = '';
if (newPreview) {
newPreview.style.display = 'none';
}
};
// 以DataURL形式读取以便预览
reader.readAsDataURL(file);
} else {
// 未选择文件则隐藏新封面预览
if (newPreview) {
newPreview.style.display = 'none';
}
}
});
}
// 监听删除封面复选框的变化
document.getElementById('editDeleteCover')?.addEventListener('change', function(e) {
// 获取封面图片上传input
const fileInput = document.getElementById('editCoverImageInput');
// 获取新封面预览div
const newPreview = document.getElementById('editCoverNewPreview');
// 如果选中“删除封面”,则清空图片和隐藏新封面预览
if (e.target.checked) {
fileInput.value = ''; // 清空文件选择
newPreview.style.display = 'none';
}
});
// 异步函数,用于更新知识库
async function updateKb(event) {
// 阻止表单默认提交
event.preventDefault();
// 获取表单对象
const form = event.target;
// 获取表单数据
const formData = new FormData(form);
// 从formData中获取知识库ID
const kbId = formData.get('kb_id');
try {
// 发送PUT请求到后端API更新知识库
const response = await fetch(`/api/v1/kb/${kbId}`, {
method: 'PUT',
body: formData
});
// 如果更新成功,刷新页面
if (response.ok) {
location.reload();
} else {
// 否则弹窗显示错误
const error = await response.json();
alert('更新失败: ' + error.message);
}
} catch (error) {
// 捕获异常弹窗显示
alert('更新失败: ' + error.message);
}
}
// 获取封面图片文件输入框并监听change事件(当用户选择文件时触发)
document.getElementById('coverImageInput')?.addEventListener('change', function(e) {
// 获取用户选中的第一个文件
const file = e.target.files[0];
// 获取用于显示预览的容器
const preview = document.getElementById('coverImagePreview');
// 获取显示图片的img标签
const previewImg = document.getElementById('coverPreviewImg');
// 如果用户选择了文件
if (file) {
// 创建文件读取器
const reader = new FileReader();
// 文件读取完成后回调
reader.onload = function(e) {
// 将img标签的src设置为读取得到的图片数据
previewImg.src = e.target.result;
// 显示预览容器
preview.style.display = 'block';
};
// 以DataURL的形式读取图片文件
reader.readAsDataURL(file);
} else {
// 如果没有选择文件,则隐藏预览
preview.style.display = 'none';
}
});
+// 搜索和排序功能
+function updateSearch() {
+ document.getElementById('searchForm').submit();
+}
+function clearSearch() {
+ document.getElementById('searchInput').value = '';
+ // 移除搜索参数,保留排序参数
+ const url = new URL(window.location.href);
+ url.searchParams.delete('search');
+ url.searchParams.set('page', '1');
+ window.location.href = url.toString();
+}
+// 搜索框回车提交
+document.getElementById('searchInput')?.addEventListener('keypress', function(e) {
+ if (e.key === 'Enter') {
+ e.preventDefault();
+ updateSearch();
+ }
+});
</script>
{% endblock %}