导航菜单

  • 1.langchain.intro
  • 2.langchain.chat_models
  • 3.langchain.prompts
  • 4.langchain.example_selectors
  • 5.output_parsers
  • 6.Runnable
  • 7.memory
  • 8.document_loaders
  • 9.text_splitters
  • 10.embeddings
  • 11.tool
  • 12.retrievers
  • 13.optimize
  • 14.项目介绍
  • 15.启动HTTP
  • 16.数据与模型
  • 17.权限管理
  • 18.知识库管理
  • 19.设置
  • 20.文档管理
  • 21.聊天
  • 22.API文档
  • 23.RAG优化
  • 24.索引时优化
  • 25.检索前优化
  • 26.检索后优化
  • 27.系统优化
  • 28.GraphRAG
  • 29.图
  • 30.为什么选择图数据库
  • 31.什么是 Neo4j
  • 32.安装和连接 Neo4j
  • 33.Neo4j核心概念
  • 34.Cypher基础
  • 35.模式匹配
  • 36.数据CRUD操作
  • 37.GraphRAG
  • 38.查询和过滤
  • 39.结果处理和聚合
  • 40.语句组合
  • 41.子查询
  • 42.模式和约束
  • 43.日期时间处理
  • 44.Cypher内置函数
  • 45.Python操作Neo4j
  • 46.neo4j
  • 47.py2neo
  • 48.Streamlit
  • 49.Pandas
  • 50.graphRAG
  • 51.deepdoc
  • 52.deepdoc
  • 53.deepdoc
  • 55.deepdoc
  • 54.deepdoc
  • Pillow
  • 1.本章目标
  • 2.目录结构
  • 3.创建知识库
    • 3.1. knowledgebase.py
    • 3.2. utils.py
    • 3.3. knowledgebase_service.py
    • 3.4. init.py
  • 4.知识库列表
    • 4.1. kb_list.html
    • 4.2. knowledgebase.py
    • 4.3. utils.py
    • 4.4. knowledgebase_service.py
    • 4.5. auth.py
  • 5.创建知识库
    • 5.1. knowledgebase.py
    • 5.2. utils.py
    • 5.3. kb_list.html
  • 6.删除知识库
    • 6.1. knowledgebase.py
    • 6.2. utils.py
    • 6.3. base_service.py
    • 6.4. knowledgebase_service.py
    • 6.5. kb_list.html
  • 7.更新知识库
    • 7.1. knowledgebase.py
    • 7.2. knowledgebase_service.py
    • 7.3. kb_list.html
  • 8.知识库封面
    • 8.1. storage_service.py
    • 8.2. knowledgebase.py
    • 8.3. config.py
    • 8.4. knowledgebase_service.py
    • 8.5. kb_list.html
    • 8.6 storage__init__.py
    • 8.7 storage\base.py
    • 8.8 storage\factory.py
    • 8.9 local_storage.py
    • 8.10. config.py
  • 9.保存封面到Minio
    • 9.1. factory.py
    • 9.2. minio_storage.py
    • 9.3. .env
    • 9.4. config.py
  • 10.搜索和排序
    • 10.1. knowledgebase.py
    • 10.2. knowledgebase_service.py
    • 10.3. kb_list.html

1.本章目标 #

本章将介绍如何在 rag-lite 项目中管理和扩展知识库。你将学习知识库的基本目录结构、核心模型与服务的作用,以及知识库存储扩展(如本地存储与 MinIO)的配置和使用方法。通过本章,你可以掌握知识库的基本操作流程和自定义能力,为后续集成与智能问答打下基础。

2.目录结构 #

# 项目根目录
rag-lite/
    # 应用目录
    ├── app/
        # 蓝图目录
        │   ├── blueprints/
        # 蓝图初始化文件
        │   │   ├── __init__.py
        # 认证相关蓝图
        │   │   ├── auth.py
        # 知识库相关蓝图
        │   │   ├── knowledgebase.py
        # 通用工具蓝图
        │   │   └── utils.py
        # 模型目录
        │   ├── models/
        # 模型初始化文件
        │   │   ├── __init__.py
        # 基础模型
        │   │   ├── base.py
        # 聊天消息模型
        │   │   ├── chat_message.py
        # 聊天会话模型
        │   │   ├── chat_session.py
        # 文档模型
        │   │   ├── document.py
        # 知识库模型
        │   │   ├── knowledgebase.py
        # 设置模型
        │   │   ├── settings.py
        # 用户模型
        │   │   └── user.py
        # 服务层目录
        │   ├── services/
            # 存储相关服务
            │   ├── storage/
                # 存储初始化文件
                │   │   ├── __init__.py
                # 存储基类
                │   │   ├── base.py
                # 存储工厂
                │   │   ├── factory.py
                # 本地存储实现
                │   │   ├── local_storage.py
                # minio 存储实现
                │   │   └── minio_storage.py
            # 服务层基类
            │   ├── base_service.py
            # 知识库服务
            │   ├── knowledgebase_service.py
            # 存储服务
            │   ├── storage_service.py
            # 用户服务
            │   └── user_service.py
        # 静态文件目录
        │   ├── static/
        # 模板文件目录
        │   ├── templates/
            # 公共页面模板
            │   ├── base.html
            # 主页模板
            │   ├── home.html
            # 知识库列表模板
            │   ├── kb_list.html
            # 登录页面模板
            │   ├── login.html
            # 注册页面模板
            │   └── register.html
        # 工具函数目录
        │   ├── utils/
            # 认证工具
            │   ├── auth.py
            # 数据库工具
            │   ├── db.py
            # 日志工具
            │   └── logger.py
        # 应用初始化文件
        │   ├── __init__.py
        # 应用配置文件
        │   └── config.py
    # 日志文件目录
    ├── logs/
        # 主日志文件
        │   └── rag_lite.log
    # 存储文件目录
    ├── storages/
        # 封面图片或其他资源
        │   └── covers/
    # 启动入口
    ├── main.py
    # 规划文档
    ├── plan.md
    # Python 构建配置文件
    └── pyproject.toml

3.创建知识库 #

3.1. knowledgebase.py #

app/blueprints/knowledgebase.py

# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""

# 导入Flask中的Blueprint和request
from flask import Blueprint,request
# 导入logging模块
import logging

# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 配置logger
logger = logging.getLogger(__name__)

# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)

# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
def api_create():
    # 设置接口功能描述
    """创建知识库"""
    # 从请求中解析JSON数据
    data = request.get_json()
    # 校验数据是否存在以及name键是否存在
    if not data or 'name' not in data:
        # 若校验失败,返回错误响应
        return error_response("name is required", 400)

    # 获取知识库名称
    name = data['name']
    # 获取用户id,可为空
    user_id = data.get('user_id')
    # 获取知识库描述,可为空
    description = data.get('description')
    # 获取分块大小,默认为512
    chunk_size = data.get('chunk_size', 512)
    # 获取分块重叠,默认为50
    chunk_overlap = data.get('chunk_overlap', 50)

    # 调用知识库服务创建知识库,返回信息字典
    kb_dict = kb_service.create(
        name=name,
        user_id=user_id,
        description=description,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )

    # 返回创建成功的知识库信息
    return success_response(kb_dict)

3.2. utils.py #

app/blueprints/utils.py

"""
路由工具函数
"""

# 导入Flask用于返回JSON响应
from flask import jsonify
# 导入装饰器工具,用来保持原函数信息
from functools import wraps
# 导入获取当前用户的工具函数
from app.utils.auth import get_current_user
# 导入日志模块
import logging
# 获取logger对象(当前模块名)
logger = logging.getLogger(__name__)

# 定义成功响应函数
def success_response(data=None, message="success"):
    """
    成功响应

    Args:
        data: 响应数据
        message: 响应消息

    Returns:
        JSON 响应
    """
    # 返回标准格式的JSON成功响应
    return jsonify({
        "code": 200,       # 状态码200,表示成功
        "message": message, # 响应消息
        "data": data        # 响应数据
    })

# 定义错误响应函数
def error_response(message: str, code: int = 400):
    """
    错误响应

    Args:
        message: 错误消息
        code: HTTP 状态码

    Returns:
        JSON 响应和状态码
    """
    # 返回标准格式的JSON错误响应,以及相应的HTTP状态码
    return jsonify({
        "code": code,        # 错误码,对应HTTP状态码
        "message": message,  # 错误消息
        "data": None         # 错误时无数据
    }), code

# 定义API错误处理装饰器
def handle_api_error(func):
    """
    API 错误处理装饰器

    使用示例:
        @handle_api_error
        def my_api():
            # API 逻辑
            return success_response(data)
    """
    # 保留原函数信息并定义包装器
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # 正常执行被装饰的API函数
            return func(*args, **kwargs)
        except ValueError as e:
            # 捕获ValueError,日志记录warning信息并返回400错误响应
            logger.warning(f"ValueError in {func.__name__}: {e}")
            return error_response(str(e), 400)
        except Exception as e:
            # 捕获其他所有异常,日志记录error信息并返回500错误响应
            logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
            return error_response(str(e), 500)
    # 返回包装后的函数
    return wrapper

3.3. knowledgebase_service.py #

app/services/knowledgebase_service.py


# 从基础服务导入BaseService类
from app.services.base_service import BaseService

# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase

# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
    """知识库服务"""
    # 定义创建知识库的方法
    def create(self, name: str, user_id: str, description: str = None, 
               chunk_size: int = 512, chunk_overlap: int = 50) -> dict:
        """
        创建知识库

        Args:
            name: 知识库名称
            user_id: 用户ID
            description: 描述
            chunk_size: 分块大小
            chunk_overlap: 分块重叠

        Returns:
            创建的知识库字典
        """
        # 启动数据库事务,上下文管理器自动处理提交或回滚
        with self.transaction() as session:
            # 先创建知识库对象
            kb = Knowledgebase(
                name=name,  # 设置知识库名称
                user_id=user_id,  # 设置用户ID
                description=description,  # 设置知识库描述
                chunk_size=chunk_size,  # 设置分块大小
                chunk_overlap=chunk_overlap  # 设置分块重叠
            )
            # 将知识库对象添加到session
            session.add(kb)
            # 刷新session,生成知识库ID
            session.flush()  # 刷新以获取 ID,但不提交
            # 刷新kb对象的数据库状态
            session.refresh(kb)
            # 转换kb对象为字典(在session内部,避免分离后出错)
            kb_dict = kb.to_dict()
            # 记录创建知识库的日志,包含ID
            self.logger.info(f"创建了知识库,ID: {kb.id}")
            # 返回知识库字典信息
            return kb_dict

# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()    

3.4. init.py #

app/init.py

# RAG Lite 应用模块说明
"""
RAG Lite Application
"""

# 导入操作系统相关模块
import os
# 从 Flask 包导入 Flask 应用对象
from flask import Flask
# 导入 Flask 跨域资源共享支持
from flask_cors import CORS
# 导入应用配置类
from app.config import Config
# 导入日志工具,用于获取日志记录器
from app.utils.logger import get_logger
# 导入数据库初始化函数
from app.utils.db import init_db
# 导入蓝图模块
+from app.blueprints import auth,knowledgebase
# 导入获取当前用户信息函数
from app.utils.auth import get_current_user
# 定义创建 Flask 应用的工厂函数
def create_app(config_class=Config):
    # 获取日志记录器,名称为当前模块名
    logger = get_logger(__name__)
    # 尝试初始化数据库
    try:
        # 输出日志,表示即将初始化数据库
        logger.info("初始化数据库...")
        # 执行数据库初始化函数
        init_db()
        # 输出日志,表示数据库初始化成功
        logger.info("数据库初始化成功")
    # 捕获任意异常
    except Exception as e:
        # 输出警告日志,提示数据库初始化失败,并输出异常信息
        logger.warning(f"数据库初始化失败: {e}")
        # 输出警告日志,提示检查数据库是否已存在,并建议手动创建数据表
        logger.warning("请确认数据库已存在,或手动创建数据表")

    # 创建 Flask 应用对象,并指定模板和静态文件目录
    base_dir = os.path.abspath(os.path.dirname(__file__))
    # 创建 Flask 应用对象,并指定模板和静态文件目录
    app = Flask(
        __name__,
        # 指定模板文件目录
        template_folder=os.path.join(base_dir, 'templates'),
        # 指定静态文件目录
        static_folder=os.path.join(base_dir, 'static')
    )
    # 从给定配置类加载配置信息到应用
    app.config.from_object(config_class)

    # 启用跨域请求支持
    CORS(app)

    # 记录应用创建日志信息
    logger.info("Flask 应用已创建")

    # 注册上下文处理器,使 current_user 在所有模板中可用
    @app.context_processor
    def inject_user():
        # 返回当前用户信息字典
        # 使用 get_current_user 获取当前用户信息,并将其添加到上下文字典中
        # 这样在模板中可以直接使用 current_user 变量
        return dict(current_user=get_current_user())

    # 注册蓝图
    app.register_blueprint(auth.bp)
    # 注册知识库蓝图
+   app.register_blueprint(knowledgebase.bp)
    # 定义首页路由
    @app.route('/')
    def index():
        return "Hello, World!"

    # 返回已配置的 Flask 应用对象
    return app

4.知识库列表 #

4.1. kb_list.html #

app/templates/kb_list.html

{% extends "base.html" %}

{% block title %}知识库管理 - RAG Lite{% endblock %}

{% block content %}
<style>
@media (min-width: 992px) {
    #kbList > div {
        flex: 0 0 20%;
        max-width: 20%;
    }
}
</style>
<div class="row">
    <div class="col-12">
        <nav aria-label="breadcrumb" class="mb-3">
            <ol class="breadcrumb">
                <li class="breadcrumb-item"><a href="/">首页</a></li>
                <li class="breadcrumb-item active">知识库管理</li>
            </ol>
        </nav>

        <div class="d-flex justify-content-between align-items-center mb-4">
            <h2><i class="bi bi-collection"></i> 知识库管理</h2>
        </div>
        <!-- 知识库列表 -->
        <div class="row" id="kbList">
            {% if kbs %}
                {% for kb in kbs %}
                <div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
                    <div class="card h-100">
                        <div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
                            <i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
                        </div>
                        <div class="card-body">
                            <h5 class="card-title">
                                <i class="bi bi-folder"></i> {{ kb.name }}
                            </h5>
                            <p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
                        </div>
                    </div>
                </div>
                {% endfor %}
            {% else %}
                <div class="col-12">
                    <div class="alert alert-info">
                        <i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
                    </div>
                </div>
            {% endif %}
        </div>

        <!-- 分页控件 -->
        {% if pagination and pagination.total > pagination.page_size %}
        <nav aria-label="知识库列表分页" class="mt-4">
            <ul class="pagination justify-content-center">
                {% set current_page = pagination.page %}
                {% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}

                <!-- 上一页 -->
                <li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}" 
                       {% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
                        <i class="bi bi-chevron-left"></i> 上一页
                    </a>
                </li>

                <!-- 页码 -->
                {% set start_page = [1, current_page - 2] | max %}
                {% set end_page = [total_pages, current_page + 2] | min %}

                {% if start_page > 1 %}
                <li class="page-item">
                    <a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
                </li>
                {% if start_page > 2 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                {% endif %}

                {% for page_num in range(start_page, end_page + 1) %}
                <li class="page-item {% if page_num == current_page %}active{% endif %}">
                    <a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
                        {{ page_num }}
                    </a>
                </li>
                {% endfor %}

                {% if end_page < total_pages %}
                {% if end_page < total_pages - 1 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                <li class="page-item">
                    <a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
                </li>
                {% endif %}

                <!-- 下一页 -->
                <li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
                       {% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
                        下一页 <i class="bi bi-chevron-right"></i>
                    </a>
                </li>
            </ul>
            <div class="text-center text-muted small mt-2">
                共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
            </div>
        </nav>
        {% endif %}
    </div>
</div>

{% endblock %}

{% block extra_js %}
<script>

</script>
{% endblock %}

4.2. knowledgebase.py #

app/blueprints/knowledgebase.py

# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""

# 导入Flask中的Blueprint和request
+from flask import Blueprint,request,render_template
# 导入logging模块
import logging

# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
+from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
+from app.blueprints.utils import (get_pagination_params)
# 配置logger
logger = logging.getLogger(__name__)

# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)

# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
+@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
def api_create():
    # 设置接口功能描述
    """创建知识库"""
    # 从请求中解析JSON数据
    data = request.get_json()
    # 校验数据是否存在以及name键是否存在
    if not data or 'name' not in data:
        # 若校验失败,返回错误响应
        return error_response("name is required", 400)

    # 获取知识库名称
    name = data['name']
    # 获取用户id,可为空
    user_id = data.get('user_id')
    # 获取知识库描述,可为空
    description = data.get('description')
    # 获取分块大小,默认为512
    chunk_size = data.get('chunk_size', 512)
    # 获取分块重叠,默认为50
    chunk_overlap = data.get('chunk_overlap', 50)

    # 调用知识库服务创建知识库,返回信息字典
    kb_dict = kb_service.create(
+       name=name,#知识库名称
+       user_id=user_id,#用户ID
+       description=description,#知识库描述
+       chunk_size=chunk_size,#分块大小
+       chunk_overlap=chunk_overlap,#分块重叠
    )

    # 返回创建成功的知识库信息
    return success_response(kb_dict)



# 注册'/kb'路由,处理GET请求,显示知识库列表页面
+@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
+@login_required
# 定义kb_list函数,渲染知识库列表页面
+def kb_list():
    # 设置本函数用途说明(文档字符串)
+   """知识库列表页面"""
    # 获取当前登录用户信息
+   current_user = get_current_user()
    # 获取分页参数(页码和每页大小),最大每页100
+   page, page_size = get_pagination_params(max_page_size=100)
    # 调用知识库服务,获取分页后的知识库列表结果
+   result = kb_service.list(
+       user_id=current_user['id'], # 用户ID
+       page=page, # 页码
+       page_size=page_size # 每页大小
+   )
    # 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
+   return render_template('kb_list.html', 
+                        kbs=result['items'],
+                        pagination=result)    

4.3. utils.py #

app/blueprints/utils.py

"""
路由工具函数
"""

# 导入Flask用于返回JSON响应
+from flask import jsonify,request
# 导入装饰器工具,用来保持原函数信息
from functools import wraps
# 导入类型提示工具
+from typing import Tuple, Optional
# 导入获取当前用户的工具函数
from app.utils.auth import get_current_user
# 导入日志模块
import logging
# 获取logger对象(当前模块名)
logger = logging.getLogger(__name__)

# 定义成功响应函数
def success_response(data=None, message="success"):
    """
    成功响应

    Args:
        data: 响应数据
        message: 响应消息

    Returns:
        JSON 响应
    """
    # 返回标准格式的JSON成功响应
    return jsonify({
        "code": 200,       # 状态码200,表示成功
        "message": message, # 响应消息
        "data": data        # 响应数据
    })

# 定义错误响应函数
def error_response(message: str, code: int = 400):
    """
    错误响应

    Args:
        message: 错误消息
        code: HTTP 状态码

    Returns:
        JSON 响应和状态码
    """
    # 返回标准格式的JSON错误响应,以及相应的HTTP状态码
    return jsonify({
        "code": code,        # 错误码,对应HTTP状态码
        "message": message,  # 错误消息
        "data": None         # 错误时无数据
    }), code

# 定义API错误处理装饰器
def handle_api_error(func):
    """
    API 错误处理装饰器

    使用示例:
        @handle_api_error
        def my_api():
            # API 逻辑
            return success_response(data)
    """
    # 保留原函数信息并定义包装器
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # 正常执行被装饰的API函数
            return func(*args, **kwargs)
        except ValueError as e:
            # 捕获ValueError,日志记录warning信息并返回400错误响应
            logger.warning(f"ValueError in {func.__name__}: {e}")
            return error_response(str(e), 400)
        except Exception as e:
            # 捕获其他所有异常,日志记录error信息并返回500错误响应
            logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
            return error_response(str(e), 500)
    # 返回包装后的函数
    return wrapper

# 定义获取分页参数的函数,允许指定最大每页数量
+def get_pagination_params(max_page_size: int = 1000) -> Tuple[int, int]:
+   """
+   获取分页参数

+   Args:
+       max_page_size: 最大每页数量

+   Returns:
+       (page, page_size) 元组
+   """
    # 获取请求中的 'page' 参数,默认为1,并将其转换为整数
+   page = int(request.args.get('page', 1))
    # 获取请求中的 'page_size' 参数,默认为10,并将其转换为整数
+   page_size = int(request.args.get('page_size', 10))

    # 保证 page 至少为1
+   page = max(1, page)
    # 保证 page_size 至少为1且不超过 max_page_size
+   page_size = max(1, min(page_size, max_page_size))

    # 返回分页的(page, page_size)元组
+   return page, page_size

4.4. knowledgebase_service.py #

app/services/knowledgebase_service.py

# 知识库服务
+"""
+知识库服务
+"""
# 导入类型提示工具
+from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService

# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase

# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
    """知识库服务"""
    # 定义创建知识库的方法
    def create(self, name: str, user_id: str, description: str = None, 
               chunk_size: int = 512, chunk_overlap: int = 50) -> dict:
        """
        创建知识库

        Args:
            name: 知识库名称
            user_id: 用户ID
            description: 描述
            chunk_size: 分块大小
            chunk_overlap: 分块重叠

        Returns:
            创建的知识库字典
        """
        # 启动数据库事务,上下文管理器自动处理提交或回滚
        with self.transaction() as session:
            # 先创建知识库对象
            kb = Knowledgebase(
                name=name,  # 设置知识库名称
                user_id=user_id,  # 设置用户ID
                description=description,  # 设置知识库描述
                chunk_size=chunk_size,  # 设置分块大小
                chunk_overlap=chunk_overlap  # 设置分块重叠
            )
            # 将知识库对象添加到session
            session.add(kb)
            # 刷新session,生成知识库ID
            session.flush()  # 刷新以获取 ID,但不提交
            # 刷新kb对象的数据库状态
            session.refresh(kb)
            # 转换kb对象为字典(在session内部,避免分离后出错)
            kb_dict = kb.to_dict()
            # 记录创建知识库的日志,包含ID
            self.logger.info(f"创建了知识库,ID: {kb.id}")
            # 返回知识库字典信息
            return kb_dict

    # 定义获取知识库列表的方法
+   def list(self, user_id: str = None, page: int = 1, page_size: int = 10) -> Dict:
+       """
+       获取知识库列表

+       Args:
+           user_id: 用户ID(可选)
+           page: 页码
+           page_size: 每页数量

+       Returns:
+           包含 items, total, page, page_size 的字典
+       """
        # 使用数据库会话
+       with self.session() as session:
            # 查询Knowledgebase表
+           query = session.query(Knowledgebase)
            # 如果指定了user_id,则筛选属于该用户的知识库
+           if user_id:
+               query = query.filter(Knowledgebase.user_id == user_id)
            # 统计总记录数
+           total = query.count()
            # 计算分页偏移量
+           offset = (page - 1) * page_size
            # 获取当前页的数据列表
+           kbs = query.offset(offset).limit(page_size).all()

            # 初始化知识库字典列表
+           items = []
            # 遍历查询结果,将每一项转为dict后添加到items列表
+           for kb in kbs:
+               kb_dict = kb.to_dict()
+               items.append(kb_dict)

            # 返回包含分页信息和数据条目的字典
+           return {
+               'items': items,
+               'total': total,
+               'page': page,
+               'page_size': page_size
+           }        

# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()    

4.5. auth.py #

app/utils/auth.py

# 认证工具
"""
认证工具
"""
# 从 Flask 导入 session 和 g(全局对象)
+from flask import session, redirect, url_for, request, g
# 导入装饰器工具,用来保持原函数信息
+from functools import wraps
# 导入日志库 logging
import logging
# 导入 user_service 服务
from app.services.user_service import user_service
# 获取当前模块的日志记录器实例
logger = logging.getLogger(__name__)

# 定义获取当前登录用户信息的函数
def get_current_user():
    """
    获取当前登录用户信息
    使用 Flask 的 g 对象缓存,避免重复查询

    Returns:
        用户信息字典,如果未登录则返回 None
    """
    # 如果 g 对象没有 current_user 属性
    if not hasattr(g, 'current_user'):
        # 如果会话中有 user_id 字段
        if 'user_id' in session:
            # 通过 user_id 获取用户信息并缓存到 g.current_user
            g.current_user = user_service.get_by_id(session['user_id'])
        else:
            # 如果没有登录,将 g.current_user 设为 None
            g.current_user = None
    # 返回当前用户信息
    return g.current_user


# 定义 API 登录认证装饰器
+def api_login_required(f):
+   """
+   API 登录装饰器
+   用于 API 端点,返回 JSON 错误响应而不是重定向
+   """
    # 保持被装饰函数的元信息
+   @wraps(f)
    # 定义装饰后的函数
+   def decorated_function(*args, **kwargs):
        # 判断 session 中是否没有 user_id,未登录状态
+       if 'user_id' not in session:
            # 延迟导入 jsonify 防止循环引用
+           from flask import jsonify
            # 返回未授权的 JSON 错误响应和 401 状态码
+           return jsonify({
+               "code": 401,
+               "message": "Unauthorized",
+               "data": None
+           }), 401
        # 如果已登录,正常执行原函数
+       return f(*args, **kwargs)
    # 返回包装后的函数
+   return decorated_function


# 定义 API 登录认证装饰器
+def api_login_required(f):
+   """
+   API 登录装饰器
+   用于 API 端点,返回 JSON 错误响应而不是重定向
+   """
    # 使用 wraps 保持原函数的元信息
+   @wraps(f)
+   def decorated_function(*args, **kwargs):
        # 如果 session 中没有 user_id,说明未登录
+       if 'user_id' not in session:
            # 延迟导入 jsonify,避免循环引用
+           from flask import jsonify
            # 返回未授权的 JSON 响应,状态码 401
+           return jsonify({
+               "code": 401,
+               "message": "Unauthorized",
+               "data": None
+           }), 401
        # 已登录则继续执行原函数
+       return f(*args, **kwargs)
    # 返回包装后的函数
+   return decorated_function    


+def login_required(f):
+   """
+   登录装饰器
+   需要登录才能访问的页面使用此装饰器
+   """
+   @wraps(f)
+   def decorated_function(*args, **kwargs):
+       if 'user_id' not in session:
            # 保存原始请求的URL,登录后可以重定向回去
+           return redirect(url_for('auth.login', next=request.url))
+       return f(*args, **kwargs)
+   return decorated_function    

5.创建知识库 #

5.1. knowledgebase.py #

app/blueprints/knowledgebase.py

# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""

# 导入Flask中的Blueprint和request
from flask import Blueprint,request,render_template
# 导入logging模块
import logging

# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
+from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
from app.blueprints.utils import (get_pagination_params)
# 配置logger
logger = logging.getLogger(__name__)

# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)

# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
    # 接口用途说明文档字符串
    """创建知识库"""
    # 获取当前用户,如未登录则返回错误响应
+   current_user, err = get_current_user_or_error()
+   if err:
+       return err
    # 检查请求是否为multipart/form-data(用于文件上传的表单方式)
+   if request.content_type and 'multipart/form-data' in request.content_type:
        # 从表单数据中获取知识库名称
+       name = request.form.get('name')
        # 如果未传入name参数,返回错误
+       if not name:
+           return error_response("name is required", 400)
        # 获取描述字段,没有则为None
+       description = request.form.get('description') or None
        # 获取分块大小,默认为512
+       chunk_size = int(request.form.get('chunk_size', 512))
        # 获取分块重叠,默认为50
+       chunk_overlap = int(request.form.get('chunk_overlap', 50))
+   else:
        # 如果是json请求数据(向后兼容旧用法)
+       data = request.get_json()
        # 判断是否存在name字段,不存在则报错
+       if not data or 'name' not in data:
+           return error_response("name is required", 400)
        # 获取知识库名称
+       name = data['name']
        # 获取描述
+       description = data.get('description')
        # 获取分块大小,默认为512
+       chunk_size = data.get('chunk_size', 512)
        # 获取分块重叠,默认为50
+       chunk_overlap = data.get('chunk_overlap', 50)
    # 调用知识库服务,创建知识库,返回知识库信息字典
    kb_dict = kb_service.create(
+       name=name,                # 知识库名称
+       user_id=current_user['id'],   # 用户ID
+       description=description,      # 知识库描述
+       chunk_size=chunk_size,        # 分块大小
+       chunk_overlap=chunk_overlap,  # 分块重叠
    )
    # 返回成功响应,包含知识库信息
    return success_response(kb_dict)


# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
    # 设置本函数用途说明(文档字符串)
    """知识库列表页面"""
    # 获取当前登录用户信息
    current_user = get_current_user()
    # 获取分页参数(页码和每页大小),最大每页100
    page, page_size = get_pagination_params(max_page_size=100)
    # 调用知识库服务,获取分页后的知识库列表结果
    result = kb_service.list(
        user_id=current_user['id'], # 用户ID
        page=page, # 页码
        page_size=page_size # 每页大小
    )
    # 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
    return render_template('kb_list.html', 
                         kbs=result['items'],
                         pagination=result)    

5.2. utils.py #

app/blueprints/utils.py

"""
路由工具函数
"""

# 导入Flask用于返回JSON响应
from flask import jsonify,request
# 导入装饰器工具,用来保持原函数信息
from functools import wraps
# 导入类型提示工具
from typing import Tuple, Optional
# 导入获取当前用户的工具函数
from app.utils.auth import get_current_user
# 导入日志模块
import logging
# 获取logger对象(当前模块名)
logger = logging.getLogger(__name__)

# 定义成功响应函数
def success_response(data=None, message="success"):
    """
    成功响应

    Args:
        data: 响应数据
        message: 响应消息

    Returns:
        JSON 响应
    """
    # 返回标准格式的JSON成功响应
    return jsonify({
        "code": 200,       # 状态码200,表示成功
        "message": message, # 响应消息
        "data": data        # 响应数据
    })

# 定义错误响应函数
def error_response(message: str, code: int = 400):
    """
    错误响应

    Args:
        message: 错误消息
        code: HTTP 状态码

    Returns:
        JSON 响应和状态码
    """
    # 返回标准格式的JSON错误响应,以及相应的HTTP状态码
    return jsonify({
        "code": code,        # 错误码,对应HTTP状态码
        "message": message,  # 错误消息
        "data": None         # 错误时无数据
    }), code

# 定义API错误处理装饰器
def handle_api_error(func):
    """
    API 错误处理装饰器

    使用示例:
        @handle_api_error
        def my_api():
            # API 逻辑
            return success_response(data)
    """
    # 保留原函数信息并定义包装器
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # 正常执行被装饰的API函数
            return func(*args, **kwargs)
        except ValueError as e:
            # 捕获ValueError,日志记录warning信息并返回400错误响应
            logger.warning(f"ValueError in {func.__name__}: {e}")
            return error_response(str(e), 400)
        except Exception as e:
            # 捕获其他所有异常,日志记录error信息并返回500错误响应
            logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
            return error_response(str(e), 500)
    # 返回包装后的函数
    return wrapper

# 定义获取分页参数的函数,允许指定最大每页数量
def get_pagination_params(max_page_size: int = 1000) -> Tuple[int, int]:
    """
    获取分页参数

    Args:
        max_page_size: 最大每页数量

    Returns:
        (page, page_size) 元组
    """
    # 获取请求中的 'page' 参数,默认为1,并将其转换为整数
    page = int(request.args.get('page', 1))
    # 获取请求中的 'page_size' 参数,默认为10,并将其转换为整数
    page_size = int(request.args.get('page_size', 10))

    # 保证 page 至少为1
    page = max(1, page)
    # 保证 page_size 至少为1且不超过 max_page_size
    page_size = max(1, min(page_size, max_page_size))

    # 返回分页的(page, page_size)元组
    return page, page_size

# 定义获取当前用户或返回错误的函数
+def get_current_user_or_error():
+   """
+   获取当前用户,如果未登录则返回错误响应

+   Returns:
+       如果成功返回 (user_dict, None),如果失败返回 (None, error_response)
+   """
    # 调用 get_current_user() 获取当前用户对象
+   current_user = get_current_user()
    # 如果没有获取到用户,则返回 (None, 错误响应)
+   if not current_user:
+       return None, error_response("Unauthorized", 401)
    # 如果获取到用户,则返回 (用户对象, None)
+   return current_user, None

5.3. kb_list.html #

app/templates/kb_list.html

{% extends "base.html" %}

{% block title %}知识库管理 - RAG Lite{% endblock %}

{% block content %}
<style>
@media (min-width: 992px) {
    #kbList > div {
        flex: 0 0 20%;
        max-width: 20%;
    }
}
</style>
<div class="row">
    <div class="col-12">
        <nav aria-label="breadcrumb" class="mb-3">
            <ol class="breadcrumb">
                <li class="breadcrumb-item"><a href="/">首页</a></li>
                <li class="breadcrumb-item active">知识库管理</li>
            </ol>
        </nav>

        <div class="d-flex justify-content-between align-items-center mb-4">
            <h2><i class="bi bi-collection"></i> 知识库管理</h2>
+           <button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
+               <i class="bi bi-plus-circle"></i> 创建知识库
+           </button>
        </div>
        <!-- 知识库列表 -->
        <div class="row" id="kbList">
            {% if kbs %}
                {% for kb in kbs %}
                <div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
                    <div class="card h-100">
                        <div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
                            <i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
                        </div>
                        <div class="card-body">
                            <h5 class="card-title">
                                <i class="bi bi-folder"></i> {{ kb.name }}
                            </h5>
                            <p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
                        </div>
                    </div>
                </div>
                {% endfor %}
            {% else %}
                <div class="col-12">
                    <div class="alert alert-info">
                        <i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
                    </div>
                </div>
            {% endif %}
        </div>

        <!-- 分页控件 -->
        {% if pagination and pagination.total > pagination.page_size %}
        <nav aria-label="知识库列表分页" class="mt-4">
            <ul class="pagination justify-content-center">
                {% set current_page = pagination.page %}
                {% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}

                <!-- 上一页 -->
                <li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}" 
                       {% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
                        <i class="bi bi-chevron-left"></i> 上一页
                    </a>
                </li>

                <!-- 页码 -->
                {% set start_page = [1, current_page - 2] | max %}
                {% set end_page = [total_pages, current_page + 2] | min %}

                {% if start_page > 1 %}
                <li class="page-item">
                    <a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
                </li>
                {% if start_page > 2 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                {% endif %}

                {% for page_num in range(start_page, end_page + 1) %}
                <li class="page-item {% if page_num == current_page %}active{% endif %}">
                    <a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
                        {{ page_num }}
                    </a>
                </li>
                {% endfor %}

                {% if end_page < total_pages %}
                {% if end_page < total_pages - 1 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                <li class="page-item">
                    <a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
                </li>
                {% endif %}

                <!-- 下一页 -->
                <li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
                       {% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
                        下一页 <i class="bi bi-chevron-right"></i>
                    </a>
                </li>
            </ul>
            <div class="text-center text-muted small mt-2">
                共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
            </div>
        </nav>
        {% endif %}
    </div>
</div>
+<!-- 创建知识库模态框 -->
+<div class="modal fade" id="createKbModal" tabindex="-1">
+   <div class="modal-dialog">
+       <div class="modal-content">
+           <div class="modal-header">
+               <h5 class="modal-title">创建知识库</h5>
+               <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
+           </div>
+           <form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
+               <div class="modal-body">
+                   <div class="mb-3">
+                       <label class="form-label">名称 <span class="text-danger">*</span></label>
+                       <input type="text" class="form-control" name="name" required>
+                   </div>
+                   <div class="mb-3">
+                       <label class="form-label">描述</label>
+                       <textarea class="form-control" name="description" rows="3"></textarea>
+                   </div>
+                   <div class="row">
+                       <div class="col-md-6 mb-3">
+                           <label class="form-label">分块大小</label>
+                           <input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
+                           <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
+                       </div>
+                       <div class="col-md-6 mb-3">
+                           <label class="form-label">分块重叠</label>
+                           <input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
+                           <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
+                       </div>
+                   </div>
+               </div>
+               <div class="modal-footer">
+                   <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
+                   <button type="submit" class="btn btn-primary">创建</button>
+               </div>
+           </form>
+       </div>
+   </div>
+</div>
{% endblock %}

{% block extra_js %}
<script>
+async function createKb(event) {
+   event.preventDefault();
+   const form = event.target;
+   const formData = new FormData(form);

+   try {
+       const response = await fetch('/api/v1/kb', {
+           method: 'POST',
+           body: formData  // 使用 FormData,不要设置 Content-Type,让浏览器自动设置
+       });

+       if (response.ok) {
+           location.reload();
+       } else {
+           const error = await response.json();
+           alert('创建失败: ' + error.message);
+       }
+   } catch (error) {
+       alert('创建失败: ' + error.message);
+   }
+}
</script>
{% endblock %}

6.删除知识库 #

6.1. knowledgebase.py #

app/blueprints/knowledgebase.py

# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""

# 导入Flask中的Blueprint和request
from flask import Blueprint,request,render_template
# 导入logging模块
import logging

# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
+from app.blueprints.utils import (get_pagination_params,check_ownership)
# 配置logger
logger = logging.getLogger(__name__)

# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)

# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
    # 接口用途说明文档字符串
    """创建知识库"""
    # 获取当前用户,如未登录则返回错误响应
    current_user, err = get_current_user_or_error()
    if err:
        return err
    # 检查请求是否为multipart/form-data(用于文件上传的表单方式)
    if request.content_type and 'multipart/form-data' in request.content_type:
        # 从表单数据中获取知识库名称
        name = request.form.get('name')
        # 如果未传入name参数,返回错误
        if not name:
            return error_response("name is required", 400)
        # 获取描述字段,没有则为None
        description = request.form.get('description') or None
        # 获取分块大小,默认为512
        chunk_size = int(request.form.get('chunk_size', 512))
        # 获取分块重叠,默认为50
        chunk_overlap = int(request.form.get('chunk_overlap', 50))
    else:
        # 如果是json请求数据(向后兼容旧用法)
        data = request.get_json()
        # 判断是否存在name字段,不存在则报错
        if not data or 'name' not in data:
            return error_response("name is required", 400)
        # 获取知识库名称
        name = data['name']
        # 获取描述
        description = data.get('description')
        # 获取分块大小,默认为512
        chunk_size = data.get('chunk_size', 512)
        # 获取分块重叠,默认为50
        chunk_overlap = data.get('chunk_overlap', 50)
    # 调用知识库服务,创建知识库,返回知识库信息字典
    kb_dict = kb_service.create(
        name=name,                # 知识库名称
        user_id=current_user['id'],   # 用户ID
        description=description,      # 知识库描述
        chunk_size=chunk_size,        # 分块大小
        chunk_overlap=chunk_overlap,  # 分块重叠
    )
    # 返回成功响应,包含知识库信息
    return success_response(kb_dict)


# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
    # 设置本函数用途说明(文档字符串)
    """知识库列表页面"""
    # 获取当前登录用户信息
    current_user = get_current_user()
    # 获取分页参数(页码和每页大小),最大每页100
    page, page_size = get_pagination_params(max_page_size=100)
    # 调用知识库服务,获取分页后的知识库列表结果
    result = kb_service.list(
        user_id=current_user['id'], # 用户ID
        page=page, # 页码
        page_size=page_size # 每页大小
    )
    # 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
    return render_template('kb_list.html', 
                         kbs=result['items'],
                         pagination=result)    


+@bp.route('/api/v1/kb/<kb_id>', methods=['DELETE'])
+@api_login_required
+@handle_api_error
+def api_delete(kb_id):
+   """删除知识库"""
+   current_user, err = get_current_user_or_error()
+   if err:
+       return err

+   kb_dict = kb_service.get_by_id(kb_id)
+   if not kb_dict:
+       return error_response("Knowledgebase not found", 404)

    # 验证用户是否有权限访问该知识库
+   has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
+   if not has_permission:
+       return err

+   success = kb_service.delete(kb_id)
+   if not success:
+       return error_response("Knowledgebase not found", 404)

+   return success_response()

6.2. utils.py #

app/blueprints/utils.py

"""
路由工具函数
"""

# 导入Flask用于返回JSON响应
from flask import jsonify,request
# 导入装饰器工具,用来保持原函数信息
from functools import wraps
# 导入类型提示工具
from typing import Tuple, Optional
# 导入获取当前用户的工具函数
from app.utils.auth import get_current_user
# 导入日志模块
import logging
# 获取logger对象(当前模块名)
logger = logging.getLogger(__name__)

# 定义成功响应函数
def success_response(data=None, message="success"):
    """
    成功响应

    Args:
        data: 响应数据
        message: 响应消息

    Returns:
        JSON 响应
    """
    # 返回标准格式的JSON成功响应
    return jsonify({
        "code": 200,       # 状态码200,表示成功
        "message": message, # 响应消息
        "data": data        # 响应数据
    })

# 定义错误响应函数
def error_response(message: str, code: int = 400):
    """
    错误响应

    Args:
        message: 错误消息
        code: HTTP 状态码

    Returns:
        JSON 响应和状态码
    """
    # 返回标准格式的JSON错误响应,以及相应的HTTP状态码
    return jsonify({
        "code": code,        # 错误码,对应HTTP状态码
        "message": message,  # 错误消息
        "data": None         # 错误时无数据
    }), code

# 定义API错误处理装饰器
def handle_api_error(func):
    """
    API 错误处理装饰器

    使用示例:
        @handle_api_error
        def my_api():
            # API 逻辑
            return success_response(data)
    """
    # 保留原函数信息并定义包装器
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # 正常执行被装饰的API函数
            return func(*args, **kwargs)
        except ValueError as e:
            # 捕获ValueError,日志记录warning信息并返回400错误响应
            logger.warning(f"ValueError in {func.__name__}: {e}")
            return error_response(str(e), 400)
        except Exception as e:
            # 捕获其他所有异常,日志记录error信息并返回500错误响应
            logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
            return error_response(str(e), 500)
    # 返回包装后的函数
    return wrapper

# 定义获取分页参数的函数,允许指定最大每页数量
def get_pagination_params(max_page_size: int = 1000) -> Tuple[int, int]:
    """
    获取分页参数

    Args:
        max_page_size: 最大每页数量

    Returns:
        (page, page_size) 元组
    """
    # 获取请求中的 'page' 参数,默认为1,并将其转换为整数
    page = int(request.args.get('page', 1))
    # 获取请求中的 'page_size' 参数,默认为10,并将其转换为整数
    page_size = int(request.args.get('page_size', 10))

    # 保证 page 至少为1
    page = max(1, page)
    # 保证 page_size 至少为1且不超过 max_page_size
    page_size = max(1, min(page_size, max_page_size))

    # 返回分页的(page, page_size)元组
    return page, page_size

# 定义获取当前用户或返回错误的函数
def get_current_user_or_error():
    """
    获取当前用户,如果未登录则返回错误响应

    Returns:
        如果成功返回 (user_dict, None),如果失败返回 (None, error_response)
    """
    # 调用 get_current_user() 获取当前用户对象
    current_user = get_current_user()
    # 如果没有获取到用户,则返回 (None, 错误响应)
    if not current_user:
        return None, error_response("Unauthorized", 401)
    # 如果获取到用户,则返回 (用户对象, None)
    return current_user, None

# 定义检查资源所有权的函数,判断当前用户是否为资源所有者
+def check_ownership(entity_user_id: str, current_user_id: str, 
+                  entity_name: str = "资源") -> Tuple[bool, Optional[Tuple]]:
    # 检查资源所属用户ID是否与当前用户ID相同
+   if entity_user_id != current_user_id:
        # 如果不同,返回False,并返回403未授权的错误响应
+       return False, error_response(f"Unauthorized to access this {entity_name}", 403)
    # 如果相同,则有权限,返回True和None
+   return True, None

6.3. base_service.py #

app/services/base_service.py

# 基础服务类
"""
基础服务类
"""
# 导入日志库
import logging
# 导入可选类型、泛型、类型变量和类型别名
from typing import Optional, TypeVar, Generic, Dict, Any
# 导入数据库会话和事务管理工具
from app.utils.db import db_session, db_transaction

# 创建日志记录器
logger = logging.getLogger(__name__)

# 定义泛型的类型变量T
T = TypeVar('T')


# 定义基础服务类,支持泛型
class BaseService(Generic[T]):
    # 基础服务类,提供通用的数据库操作方法

    # 初始化方法
    def __init__(self):
        # 初始化服务的日志记录器
        self.logger = logging.getLogger(self.__class__.__name__)

    # 数据库会话上下文管理器(只读)
    def session(self):
        """
        数据库会话上下文管理器(只读操作,不自动提交)

        使用示例:
            with self.session() as db:
                result = db.query(Model).all()
                # 不需要手动关闭 session
        """
        # 返回数据库会话
        return db_session()

    # 数据库事务上下文管理器(自动提交)
    def transaction(self):
        """
        数据库事务上下文管理器(自动提交,出错时回滚)

        使用示例:
            with self.transaction() as db:
                obj = Model(...)
                db.add(obj)
                # 自动提交,出错时自动回滚
        """
        # 返回数据库事务
        return db_transaction()

    def get_by_id(self, model_class: type, entity_id: str) -> Optional[T]:
        """
        根据ID获取实体(通用方法)

        Args:
            model_class: 模型类
            entity_id: 实体ID

        Returns:
            实体对象,如果不存在则返回 None
        """
        with self.session() as session:
            try:
                return session.query(model_class).filter(model_class.id == entity_id).first()
            except Exception as e:
                self.logger.error(f"Error getting {model_class.__name__} by id {entity_id}: {e}")
                return None    

6.4. knowledgebase_service.py #

app/services/knowledgebase_service.py

# 知识库服务
"""
知识库服务
"""
# 导入类型提示工具
from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService

# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase

# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
    """知识库服务"""
    # 定义创建知识库的方法
    def create(self, name: str, user_id: str, description: str = None, 
               chunk_size: int = 512, chunk_overlap: int = 50) -> dict:
        """
        创建知识库

        Args:
            name: 知识库名称
            user_id: 用户ID
            description: 描述
            chunk_size: 分块大小
            chunk_overlap: 分块重叠

        Returns:
            创建的知识库字典
        """
        # 启动数据库事务,上下文管理器自动处理提交或回滚
        with self.transaction() as session:
            # 先创建知识库对象
            kb = Knowledgebase(
                name=name,  # 设置知识库名称
                user_id=user_id,  # 设置用户ID
                description=description,  # 设置知识库描述
                chunk_size=chunk_size,  # 设置分块大小
                chunk_overlap=chunk_overlap  # 设置分块重叠
            )
            # 将知识库对象添加到session
            session.add(kb)
            # 刷新session,生成知识库ID
            session.flush()  # 刷新以获取 ID,但不提交
            # 刷新kb对象的数据库状态
            session.refresh(kb)
            # 转换kb对象为字典(在session内部,避免分离后出错)
            kb_dict = kb.to_dict()
            # 记录创建知识库的日志,包含ID
            self.logger.info(f"创建了知识库,ID: {kb.id}")
            # 返回知识库字典信息
            return kb_dict

    # 定义获取知识库列表的方法
    def list(self, user_id: str = None, page: int = 1, page_size: int = 10) -> Dict:
        """
        获取知识库列表

        Args:
            user_id: 用户ID(可选)
            page: 页码
            page_size: 每页数量

        Returns:
            包含 items, total, page, page_size 的字典
        """
        # 使用数据库会话
        with self.session() as session:
            # 查询Knowledgebase表
            query = session.query(Knowledgebase)
            # 如果指定了user_id,则筛选属于该用户的知识库
            if user_id:
                query = query.filter(Knowledgebase.user_id == user_id)
            # 统计总记录数
            total = query.count()
            # 计算分页偏移量
            offset = (page - 1) * page_size
            # 获取当前页的数据列表
            kbs = query.offset(offset).limit(page_size).all()

            # 初始化知识库字典列表
            items = []
            # 遍历查询结果,将每一项转为dict后添加到items列表
            for kb in kbs:
                kb_dict = kb.to_dict()
                items.append(kb_dict)

            # 返回包含分页信息和数据条目的字典
            return {
                'items': items,
                'total': total,
                'page': page,
                'page_size': page_size
+           }     

+   def delete(self, kb_id: str) -> bool:
+       """
+       删除知识库

+       Args:
+           kb_id: 知识库ID

+       Returns:
+           是否删除成功
+       """
+       with self.transaction() as session:
+           kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
+           if not kb:
+               return False
+           session.delete(kb)
+           self.logger.info(f"Deleted knowledgebase: {kb_id}")
+           return True    

+   def get_by_id(self, kb_id: str) -> Optional[dict]:
+       """根据ID获取知识库"""
+       with self.session() as session:
+           kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
+           if kb:
                # 在 session 内部转换为字典,避免对象从 session 分离后访问属性出错
+               return kb.to_dict()
+           return None               

# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()    

6.5. kb_list.html #

app/templates/kb_list.html

{% extends "base.html" %}

{% block title %}知识库管理 - RAG Lite{% endblock %}

{% block content %}
<style>
@media (min-width: 992px) {
    #kbList > div {
        flex: 0 0 20%;
        max-width: 20%;
    }
}
</style>
<div class="row">
    <div class="col-12">
        <nav aria-label="breadcrumb" class="mb-3">
            <ol class="breadcrumb">
                <li class="breadcrumb-item"><a href="/">首页</a></li>
                <li class="breadcrumb-item active">知识库管理</li>
            </ol>
        </nav>

        <div class="d-flex justify-content-between align-items-center mb-4">
            <h2><i class="bi bi-collection"></i> 知识库管理</h2>
            <button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
                <i class="bi bi-plus-circle"></i> 创建知识库
            </button>
        </div>
        <!-- 知识库列表 -->
        <div class="row" id="kbList">
            {% if kbs %}
                {% for kb in kbs %}
                <div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
                    <div class="card h-100">
                        <div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
                            <i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
                        </div>
                        <div class="card-body">
                            <h5 class="card-title">
                                <i class="bi bi-folder"></i> {{ kb.name }}
                            </h5>
                            <p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
                        </div>
+                       <div class="card-footer bg-transparent">
+                           <button class="btn btn-sm btn-danger" onclick="deleteKb('{{ kb.id }}', '{{ kb.name }}')">
+                               <i class="bi bi-trash"></i> 删除
+                           </button>
+                       </div>
                    </div>
                </div>
                {% endfor %}
            {% else %}
                <div class="col-12">
                    <div class="alert alert-info">
                        <i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
                    </div>
                </div>
            {% endif %}
        </div>

        <!-- 分页控件 -->
        {% if pagination and pagination.total > pagination.page_size %}
        <nav aria-label="知识库列表分页" class="mt-4">
            <ul class="pagination justify-content-center">
                {% set current_page = pagination.page %}
                {% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}

                <!-- 上一页 -->
                <li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}" 
                       {% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
                        <i class="bi bi-chevron-left"></i> 上一页
                    </a>
                </li>

                <!-- 页码 -->
                {% set start_page = [1, current_page - 2] | max %}
                {% set end_page = [total_pages, current_page + 2] | min %}

                {% if start_page > 1 %}
                <li class="page-item">
                    <a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
                </li>
                {% if start_page > 2 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                {% endif %}

                {% for page_num in range(start_page, end_page + 1) %}
                <li class="page-item {% if page_num == current_page %}active{% endif %}">
                    <a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
                        {{ page_num }}
                    </a>
                </li>
                {% endfor %}

                {% if end_page < total_pages %}
                {% if end_page < total_pages - 1 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                <li class="page-item">
                    <a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
                </li>
                {% endif %}

                <!-- 下一页 -->
                <li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
                       {% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
                        下一页 <i class="bi bi-chevron-right"></i>
                    </a>
                </li>
            </ul>
            <div class="text-center text-muted small mt-2">
                共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
            </div>
        </nav>
        {% endif %}
    </div>
</div>
<!-- 创建知识库模态框 -->
<div class="modal fade" id="createKbModal" tabindex="-1">
    <div class="modal-dialog">
        <div class="modal-content">
            <div class="modal-header">
                <h5 class="modal-title">创建知识库</h5>
                <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
            </div>
            <form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
                <div class="modal-body">
                    <div class="mb-3">
                        <label class="form-label">名称 <span class="text-danger">*</span></label>
                        <input type="text" class="form-control" name="name" required>
                    </div>
                    <div class="mb-3">
                        <label class="form-label">描述</label>
                        <textarea class="form-control" name="description" rows="3"></textarea>
                    </div>
                    <div class="row">
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块大小</label>
                            <input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
                            <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
                        </div>
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块重叠</label>
                            <input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
                            <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
                        </div>
                    </div>
                </div>
                <div class="modal-footer">
                    <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
                    <button type="submit" class="btn btn-primary">创建</button>
                </div>
            </form>
        </div>
    </div>
</div>
{% endblock %}

{% block extra_js %}
<script>
async function createKb(event) {
    event.preventDefault();
    const form = event.target;
    const formData = new FormData(form);

    try {
        const response = await fetch('/api/v1/kb', {
            method: 'POST',
            body: formData  // 使用 FormData,不要设置 Content-Type,让浏览器自动设置
        });

        if (response.ok) {
            location.reload();
        } else {
            const error = await response.json();
            alert('创建失败: ' + error.message);
        }
    } catch (error) {
        alert('创建失败: ' + error.message);
    }
}

+async function deleteKb(kbId, kbName) {
+   if (!confirm(`确定要删除知识库 "${kbName}" 吗?此操作不可恢复!`)) {
+       return;
+   }
+   try {
+       const response = await fetch(`/api/v1/kb/${kbId}`, {
+           method: 'DELETE'
+       });

+       if (response.ok) {
+           location.reload();
+       } else {
+           const error = await response.json();
+           alert('删除失败: ' + error.message);
+       }
+   } catch (error) {
+       alert('删除失败: ' + error.message);
+   }
+}
</script>
{% endblock %}

7.更新知识库 #

7.1. knowledgebase.py #

app/blueprints/knowledgebase.py

# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""

# 导入Flask中的Blueprint和request
from flask import Blueprint,request,render_template
# 导入logging模块
import logging

# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
from app.blueprints.utils import (get_pagination_params,check_ownership)
# 配置logger
logger = logging.getLogger(__name__)

# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)

# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
    # 接口用途说明文档字符串
    """创建知识库"""
    # 获取当前用户,如未登录则返回错误响应
    current_user, err = get_current_user_or_error()
    if err:
        return err
    # 检查请求是否为multipart/form-data(用于文件上传的表单方式)
    if request.content_type and 'multipart/form-data' in request.content_type:
        # 从表单数据中获取知识库名称
        name = request.form.get('name')
        # 如果未传入name参数,返回错误
        if not name:
            return error_response("name is required", 400)
        # 获取描述字段,没有则为None
        description = request.form.get('description') or None
        # 获取分块大小,默认为512
        chunk_size = int(request.form.get('chunk_size', 512))
        # 获取分块重叠,默认为50
        chunk_overlap = int(request.form.get('chunk_overlap', 50))
    else:
        # 如果是json请求数据(向后兼容旧用法)
        data = request.get_json()
        # 判断是否存在name字段,不存在则报错
        if not data or 'name' not in data:
            return error_response("name is required", 400)
        # 获取知识库名称
        name = data['name']
        # 获取描述
        description = data.get('description')
        # 获取分块大小,默认为512
        chunk_size = data.get('chunk_size', 512)
        # 获取分块重叠,默认为50
        chunk_overlap = data.get('chunk_overlap', 50)
    # 调用知识库服务,创建知识库,返回知识库信息字典
    kb_dict = kb_service.create(
        name=name,                # 知识库名称
        user_id=current_user['id'],   # 用户ID
        description=description,      # 知识库描述
        chunk_size=chunk_size,        # 分块大小
        chunk_overlap=chunk_overlap,  # 分块重叠
    )
    # 返回成功响应,包含知识库信息
    return success_response(kb_dict)


# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
    # 设置本函数用途说明(文档字符串)
    """知识库列表页面"""
    # 获取当前登录用户信息
    current_user = get_current_user()
    # 获取分页参数(页码和每页大小),最大每页100
    page, page_size = get_pagination_params(max_page_size=100)
    # 调用知识库服务,获取分页后的知识库列表结果
    result = kb_service.list(
        user_id=current_user['id'], # 用户ID
        page=page, # 页码
        page_size=page_size # 每页大小
    )
    # 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
    return render_template('kb_list.html', 
                         kbs=result['items'],
                         pagination=result)    

# 注册DELETE方法的API路由,用于删除知识库
@bp.route('/api/v1/kb/<kb_id>', methods=['DELETE'])
# 要求API登录
@api_login_required
# 处理API错误的装饰器
@handle_api_error
def api_delete(kb_id):
    """删除知识库"""
    # 获取当前用户信息,如果未登录则返回错误
    current_user, err = get_current_user_or_error()
    if err:
        return err

    # 根据知识库ID获取知识库信息
    kb_dict = kb_service.get_by_id(kb_id)
    # 如果知识库不存在,返回404错误
    if not kb_dict:
+       return error_response("未找到知识库", 404)

    # 验证当前用户是否拥有该知识库的操作权限
    has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
    if not has_permission:
        return err

    # 调用服务删除知识库
    success = kb_service.delete(kb_id)
    # 如果删除失败,返回404错误
    if not success:
+       return error_response("未找到知识库", 404)

    # 返回删除成功的响应
+   return success_response("知识库删除成功")


# 注册PUT方法的API路由,用于更新知识库
+@bp.route('/api/v1/kb/<kb_id>', methods=['PUT'])
# 要求API登录
+@api_login_required
# 处理API错误的装饰器
+@handle_api_error
+def api_update(kb_id):
+   """更新知识库(支持封面图片更新)"""
    # 获取当前用户信息,如果未登录则返回错误
+   current_user, err = get_current_user_or_error()
+   if err:
+       return err

    # 获取并校验知识库是否存在
+   kb_dict = kb_service.get_by_id(kb_id)
+   if not kb_dict:
+       return error_response("未找到知识库", 404)

    # 检查是否拥有操作该知识库的权限
+   has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
+   if not has_permission:
+       return err

    # 检查请求内容类型是否为multipart/form-data(用于文件上传)
+   if request.content_type and 'multipart/form-data' in request.content_type:
        # 从表单数据获取字段
+       name = request.form.get('name')
+       description = request.form.get('description') or None
+       chunk_size = request.form.get('chunk_size')
+       chunk_overlap = request.form.get('chunk_overlap')

        # 组装待更新的数据字典
+       update_data = {}
+       if name:
+           update_data['name'] = name
+       if description is not None:
+           update_data['description'] = description
+       if chunk_size:
+           update_data['chunk_size'] = int(chunk_size)
+       if chunk_overlap:
+           update_data['chunk_overlap'] = int(chunk_overlap)
+   else:
        # 如果不是form-data,则按JSON方式解析提交内容
+       data = request.get_json()
        # 请求体如果为空,返回400错误
+       if not data:
+           return error_response("请求体不能为空", 400)

        # 组装待更新的数据字典
+       update_data = {}
+       if 'name' in data:
+           update_data['name'] = data['name']
+       if 'description' in data:
+           update_data['description'] = data.get('description')
+       if 'chunk_size' in data:
+           update_data['chunk_size'] = data['chunk_size']
+       if 'chunk_overlap' in data:
+           update_data['chunk_overlap'] = data['chunk_overlap']

    # 调用服务进行知识库更新
+   updated_kb = kb_service.update(
+       kb_id=kb_id,
+       **update_data
+   )

    # 如果未找到知识库,返回404
+   if not updated_kb:
+       return error_response("未找到知识库", 404)

    # 返回更新成功的响应及最新知识库数据
+   return success_response(updated_kb, "知识库更新成功")

7.2. knowledgebase_service.py #

app/services/knowledgebase_service.py

# 知识库服务
"""
知识库服务
"""
# 导入类型提示工具
from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService

# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase

# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
    """知识库服务"""
    # 定义创建知识库的方法
    def create(self, name: str, user_id: str, description: str = None, 
               chunk_size: int = 512, chunk_overlap: int = 50) -> dict:
        """
        创建知识库

        Args:
            name: 知识库名称
            user_id: 用户ID
            description: 描述
            chunk_size: 分块大小
            chunk_overlap: 分块重叠

        Returns:
            创建的知识库字典
        """
        # 启动数据库事务,上下文管理器自动处理提交或回滚
        with self.transaction() as session:
            # 先创建知识库对象
            kb = Knowledgebase(
                name=name,  # 设置知识库名称
                user_id=user_id,  # 设置用户ID
                description=description,  # 设置知识库描述
                chunk_size=chunk_size,  # 设置分块大小
                chunk_overlap=chunk_overlap  # 设置分块重叠
            )
            # 将知识库对象添加到session
            session.add(kb)
            # 刷新session,生成知识库ID
            session.flush()  # 刷新以获取 ID,但不提交
            # 刷新kb对象的数据库状态
            session.refresh(kb)
            # 转换kb对象为字典(在session内部,避免分离后出错)
            kb_dict = kb.to_dict()
            # 记录创建知识库的日志,包含ID
            self.logger.info(f"创建了知识库,ID: {kb.id}")
            # 返回知识库字典信息
            return kb_dict

    # 定义获取知识库列表的方法
    def list(self, user_id: str = None, page: int = 1, page_size: int = 10) -> Dict:
        """
        获取知识库列表

        Args:
            user_id: 用户ID(可选)
            page: 页码
            page_size: 每页数量

        Returns:
            包含 items, total, page, page_size 的字典
        """
        # 使用数据库会话
        with self.session() as session:
            # 查询Knowledgebase表
            query = session.query(Knowledgebase)
            # 如果指定了user_id,则筛选属于该用户的知识库
            if user_id:
                query = query.filter(Knowledgebase.user_id == user_id)
            # 统计总记录数
            total = query.count()
            # 计算分页偏移量
            offset = (page - 1) * page_size
            # 获取当前页的数据列表
            kbs = query.offset(offset).limit(page_size).all()

            # 初始化知识库字典列表
            items = []
            # 遍历查询结果,将每一项转为dict后添加到items列表
            for kb in kbs:
                kb_dict = kb.to_dict()
                items.append(kb_dict)

            # 返回包含分页信息和数据条目的字典
            return {
                'items': items,
                'total': total,
                'page': page,
                'page_size': page_size
            }     

    def delete(self, kb_id: str) -> bool:
        """
        删除知识库

        Args:
            kb_id: 知识库ID

        Returns:
            是否删除成功
        """
        with self.transaction() as session:
            kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            if not kb:
                return False
            session.delete(kb)
            self.logger.info(f"Deleted knowledgebase: {kb_id}")
            return True    

    def get_by_id(self, kb_id: str) -> Optional[dict]:
        """根据ID获取知识库"""
        with self.session() as session:
            kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            if kb:
                # 在 session 内部转换为字典,避免对象从 session 分离后访问属性出错
                return kb.to_dict()
+           return None   
    # 定义 update 方法,用于更新知识库
+   def update(self, kb_id: str, **kwargs) -> Optional[dict]:
+       """
+       更新知识库

+       Args:
+           kb_id: 知识库ID
+           cover_image_data: 新的封面图片数据(可选)
+           cover_image_filename: 新的封面图片文件名(可选)
+           delete_cover: 是否删除封面图片(可选)
+           **kwargs: 要更新的字段(name, description, chunk_size, chunk_overlap 等)

+       Returns:
+           更新后的知识库字典,如果不存在则返回 None
+       """
        # 开启数据库事务
+       with self.transaction() as session:
            # 查询指定ID的知识库对象
+           kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            # 如果未找到知识库,则返回 None
+           if not kb:
+               return None
            # 遍历要更新的字段和值
+           for key, value in kwargs.items():
                # 判断知识库对象是否有该字段,且值不为 None
+               if hasattr(kb, key) and value is not None:
                    # 设置该字段的新值
+                   setattr(kb, key, value)

            # 刷新session,保证对象属性为最新状态
+           session.flush()
            # 刷新对象,避免未提交前读取到旧数据
+           session.refresh(kb)

            # 在事务内部将对象转为字典,避免 session 关闭后访问失败
+           kb_dict = kb.to_dict()

            # 如果本次更新包含 'cover_image' 字段,记录详细日志
+           if 'cover_image' in kwargs:
+               self.logger.info(f"更新知识库 {kb_id}, 封面图片={kb_dict.get('cover_image')}")
+           else:
                # 否则仅记录知识库ID
+               self.logger.info(f"更新知识库: {kb_id}")

            # 返回更新后的知识库字典
+           return kb_dict

# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()    

7.3. kb_list.html #

app/templates/kb_list.html

{% extends "base.html" %}

{% block title %}知识库管理 - RAG Lite{% endblock %}

{% block content %}
<style>
@media (min-width: 992px) {
    #kbList > div {
        flex: 0 0 20%;
        max-width: 20%;
    }
}
</style>
<div class="row">
    <div class="col-12">
        <nav aria-label="breadcrumb" class="mb-3">
            <ol class="breadcrumb">
                <li class="breadcrumb-item"><a href="/">首页</a></li>
                <li class="breadcrumb-item active">知识库管理</li>
            </ol>
        </nav>

        <div class="d-flex justify-content-between align-items-center mb-4">
            <h2><i class="bi bi-collection"></i> 知识库管理</h2>
            <button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
                <i class="bi bi-plus-circle"></i> 创建知识库
            </button>
        </div>
        <!-- 知识库列表 -->
        <div class="row" id="kbList">
            {% if kbs %}
                {% for kb in kbs %}
                <div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
                    <div class="card h-100">
                        <div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
                            <i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
                        </div>
                        <div class="card-body">
                            <h5 class="card-title">
                                <i class="bi bi-folder"></i> {{ kb.name }}
                            </h5>
                            <p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
                        </div>
                        <div class="card-footer bg-transparent">
+                           <button class="btn btn-sm btn-warning" 
+                                   data-kb-id="{{ kb.id }}"
+                                   data-kb-name="{{ kb.name }}"
+                                   data-kb-description="{{ kb.description or '' }}"
+                                   data-kb-chunk-size="{{ kb.chunk_size }}"
+                                   data-kb-chunk-overlap="{{ kb.chunk_overlap }}"
+                                   data-kb-cover-image="{{ kb.cover_image or '' }}"
+                                   onclick="editKbFromButton(this)">
+                               <i class="bi bi-pencil"></i> 编辑
+                           </button>
                            <button class="btn btn-sm btn-danger" onclick="deleteKb('{{ kb.id }}', '{{ kb.name }}')">
                                <i class="bi bi-trash"></i> 删除
                            </button>
                        </div>
                    </div>
                </div>
                {% endfor %}
            {% else %}
                <div class="col-12">
                    <div class="alert alert-info">
                        <i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
                    </div>
                </div>
            {% endif %}
        </div>

        <!-- 分页控件 -->
        {% if pagination and pagination.total > pagination.page_size %}
        <nav aria-label="知识库列表分页" class="mt-4">
            <ul class="pagination justify-content-center">
                {% set current_page = pagination.page %}
                {% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}

                <!-- 上一页 -->
                <li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}" 
                       {% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
                        <i class="bi bi-chevron-left"></i> 上一页
                    </a>
                </li>

                <!-- 页码 -->
                {% set start_page = [1, current_page - 2] | max %}
                {% set end_page = [total_pages, current_page + 2] | min %}

                {% if start_page > 1 %}
                <li class="page-item">
                    <a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
                </li>
                {% if start_page > 2 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                {% endif %}

                {% for page_num in range(start_page, end_page + 1) %}
                <li class="page-item {% if page_num == current_page %}active{% endif %}">
                    <a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
                        {{ page_num }}
                    </a>
                </li>
                {% endfor %}

                {% if end_page < total_pages %}
                {% if end_page < total_pages - 1 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                <li class="page-item">
                    <a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
                </li>
                {% endif %}

                <!-- 下一页 -->
                <li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
                       {% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
                        下一页 <i class="bi bi-chevron-right"></i>
                    </a>
                </li>
            </ul>
            <div class="text-center text-muted small mt-2">
                共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
            </div>
        </nav>
        {% endif %}
    </div>
</div>
<!-- 创建知识库模态框 -->
<div class="modal fade" id="createKbModal" tabindex="-1">
    <div class="modal-dialog">
        <div class="modal-content">
            <div class="modal-header">
                <h5 class="modal-title">创建知识库</h5>
                <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
            </div>
            <form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
                <div class="modal-body">
                    <div class="mb-3">
                        <label class="form-label">名称 <span class="text-danger">*</span></label>
                        <input type="text" class="form-control" name="name" required>
                    </div>
                    <div class="mb-3">
                        <label class="form-label">描述</label>
                        <textarea class="form-control" name="description" rows="3"></textarea>
                    </div>
                    <div class="row">
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块大小</label>
                            <input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
                            <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
                        </div>
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块重叠</label>
                            <input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
                            <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
                        </div>
                    </div>
                </div>
                <div class="modal-footer">
                    <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
                    <button type="submit" class="btn btn-primary">创建</button>
                </div>
            </form>
        </div>
    </div>
</div>

+<!-- 编辑知识库模态框 -->
+<div class="modal fade" id="editKbModal" tabindex="-1">
+   <div class="modal-dialog">
+       <div class="modal-content">
+           <div class="modal-header">
+               <h5 class="modal-title">编辑知识库</h5>
+               <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
+           </div>
+           <form id="editKbForm" onsubmit="updateKb(event)" enctype="multipart/form-data">
+               <input type="hidden" name="kb_id" id="editKbId">
+               <div class="modal-body">
+                   <div class="mb-3">
+                       <label class="form-label">名称 <span class="text-danger">*</span></label>
+                       <input type="text" class="form-control" name="name" id="editKbName" required>
+                   </div>
+                   <div class="mb-3">
+                       <label class="form-label">描述</label>
+                       <textarea class="form-control" name="description" id="editKbDescription" rows="3"></textarea>
+                   </div>
+                   <div class="row">
+                       <div class="col-md-6 mb-3">
+                           <label class="form-label">分块大小</label>
+                           <input type="number" class="form-control" name="chunk_size" id="editKbChunkSize" value="512" min="100" max="2000">
+                           <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
+                       </div>
+                       <div class="col-md-6 mb-3">
+                           <label class="form-label">分块重叠</label>
+                           <input type="number" class="form-control" name="chunk_overlap" id="editKbChunkOverlap" value="50" min="0" max="200">
+                           <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
+                       </div>
+                   </div>
+               </div>
+               <div class="modal-footer">
+                   <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
+                   <button type="submit" class="btn btn-primary">保存</button>
+               </div>
+           </form>
+       </div>
+   </div>
+</div>
{% endblock %}

{% block extra_js %}
<script>
async function createKb(event) {
    event.preventDefault();
    const form = event.target;
    const formData = new FormData(form);

    try {
        const response = await fetch('/api/v1/kb', {
            method: 'POST',
            body: formData  // 使用 FormData,不要设置 Content-Type,让浏览器自动设置
        });

        if (response.ok) {
            location.reload();
        } else {
            const error = await response.json();
            alert('创建失败: ' + error.message);
        }
    } catch (error) {
        alert('创建失败: ' + error.message);
    }
}

async function deleteKb(kbId, kbName) {
    if (!confirm(`确定要删除知识库 "${kbName}" 吗?此操作不可恢复!`)) {
        return;
    }
    try {
        const response = await fetch(`/api/v1/kb/${kbId}`, {
            method: 'DELETE'
        });

        if (response.ok) {
            location.reload();
        } else {
            const error = await response.json();
            alert('删除失败: ' + error.message);
        }
    } catch (error) {
        alert('删除失败: ' + error.message);
    }
}
+// 从按钮的 data 属性读取数据并编辑知识库
+function editKbFromButton(button) {
+   const kbId = button.getAttribute('data-kb-id');
+   const name = button.getAttribute('data-kb-name');
+   const description = button.getAttribute('data-kb-description') || '';
+   const chunkSize = parseInt(button.getAttribute('data-kb-chunk-size')) || 512;
+   const chunkOverlap = parseInt(button.getAttribute('data-kb-chunk-overlap')) || 50;

+   editKb(kbId, name, description, chunkSize, chunkOverlap);
+}

+// 编辑知识库
+function editKb(kbId, name, description, chunkSize, chunkOverlap) {
+   // 填充表单数据
+   document.getElementById('editKbId').value = kbId;
+   document.getElementById('editKbName').value = name;
+   document.getElementById('editKbDescription').value = description || '';
+   document.getElementById('editKbChunkSize').value = chunkSize;
+   document.getElementById('editKbChunkOverlap').value = chunkOverlap;

+   // 显示模态框
+   const modal = new bootstrap.Modal(document.getElementById('editKbModal'));
+   modal.show();
+}
+async function updateKb(event) {
+   event.preventDefault();
+   const form = event.target;
+   const formData = new FormData(form);
+   const kbId = formData.get('kb_id');

+   try {
+       const response = await fetch(`/api/v1/kb/${kbId}`, {
+           method: 'PUT',
+           body: formData
+       });

+       if (response.ok) {
+           location.reload();
+       } else {
+           const error = await response.json();
+           alert('更新失败: ' + error.message);
+       }
+   } catch (error) {
+       alert('更新失败: ' + error.message);
+   }
+}
</script>
{% endblock %}

8.知识库封面 #

8.1. storage_service.py #

app/services/storage_service.py

# 从 app.services.storage.factory 导入 StorageFactory 类
from app.services.storage.factory import StorageFactory

# 获取 StorageFactory 的单例实例,并赋值给 storage_service
storage_service = StorageFactory.get_instance()

8.2. knowledgebase.py #

app/blueprints/knowledgebase.py

# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""

# 导入Flask中的Blueprint和request
+from flask import Blueprint,request,render_template,send_file,abort
# 使用BytesIO将图片数据包装为文件流
+from io import BytesIO
# 导入logging模块
import logging
# 导入mimetypes、os模块用于类型判断
+import mimetypes
# 导入os模块用于路径操作
+import os
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
from app.blueprints.utils import (get_pagination_params,check_ownership)
# 导入存储服务
+from app.services.storage_service import storage_service
# 配置logger
logger = logging.getLogger(__name__)

# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)

# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
    # 接口用途说明文档字符串
    """创建知识库"""
    # 获取当前用户,如未登录则返回错误响应
    current_user, err = get_current_user_or_error()
    if err:
        return err
    # 检查请求是否为multipart/form-data(用于文件上传的表单方式)
    if request.content_type and 'multipart/form-data' in request.content_type:
        # 从表单数据中获取知识库名称
        name = request.form.get('name')
        # 如果未传入name参数,返回错误
        if not name:
            return error_response("name is required", 400)
        # 获取描述字段,没有则为None
        description = request.form.get('description') or None
        # 获取分块大小,默认为512
        chunk_size = int(request.form.get('chunk_size', 512))
        # 获取分块重叠,默认为50
        chunk_overlap = int(request.form.get('chunk_overlap', 50))
        # 设置封面图片数据变量初值为None
+       cover_image_data = None
        # 设置封面图片文件名变量初值为None
+       cover_image_filename = None
        # 判断请求中是否包含'cover_image'文件
+       if 'cover_image' in request.files:
            # 获取上传的封面图片文件对象
+           cover_file = request.files['cover_image']
            # 如果上传的文件存在且有文件名
+           if cover_file and cover_file.filename:
                # 读取文件内容为二进制数据
+               cover_image_data = cover_file.read()
                # 获取上传文件的文件名
+               cover_image_filename = cover_file.filename
                # 记录封面图片上传的信息到日志,包括文件名、字节大小和内容类型
+               logger.info(f"收到新知识库的封面图片上传: 文件名={cover_image_filename}, 大小={len(cover_image_data)} 字节, 内容类型={cover_file.content_type}")
    else:
        # 如果是json请求数据(向后兼容旧用法)
        data = request.get_json()
        # 判断是否存在name字段,不存在则报错
        if not data or 'name' not in data:
            return error_response("name is required", 400)
        # 获取知识库名称
        name = data['name']
        # 获取描述
        description = data.get('description')
        # 获取分块大小,默认为512
        chunk_size = data.get('chunk_size', 512)
        # 获取分块重叠,默认为50
        chunk_overlap = data.get('chunk_overlap', 50)
        # 设置封面图片数据变量初值为None
+       cover_image_data = None
        # 设置封面图片文件名变量初值为None
+       cover_image_filename = None
    # 调用知识库服务,创建知识库,返回知识库信息字典
    kb_dict = kb_service.create(
        name=name,                # 知识库名称
        user_id=current_user['id'],   # 用户ID
        description=description,      # 知识库描述
        chunk_size=chunk_size,        # 分块大小
        chunk_overlap=chunk_overlap,  # 分块重叠
+       cover_image_data=cover_image_data, # 封面图片数据
+       cover_image_filename=cover_image_filename # 封面图片文件名
    )
    # 返回成功响应,包含知识库信息
    return success_response(kb_dict)


# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
    # 设置本函数用途说明(文档字符串)
    """知识库列表页面"""
    # 获取当前登录用户信息
    current_user = get_current_user()
    # 获取分页参数(页码和每页大小),最大每页100
    page, page_size = get_pagination_params(max_page_size=100)
    # 调用知识库服务,获取分页后的知识库列表结果
    result = kb_service.list(
        user_id=current_user['id'], # 用户ID
        page=page, # 页码
        page_size=page_size # 每页大小
    )
    # 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
    return render_template('kb_list.html', 
                         kbs=result['items'],
                         pagination=result)    

# 注册DELETE方法的API路由,用于删除知识库
@bp.route('/api/v1/kb/<kb_id>', methods=['DELETE'])
# 要求API登录
@api_login_required
# 处理API错误的装饰器
@handle_api_error
def api_delete(kb_id):
    """删除知识库"""
    # 获取当前用户信息,如果未登录则返回错误
    current_user, err = get_current_user_or_error()
    if err:
        return err

    # 根据知识库ID获取知识库信息
    kb_dict = kb_service.get_by_id(kb_id)
    # 如果知识库不存在,返回404错误
    if not kb_dict:
        return error_response("未找到知识库", 404)

    # 验证当前用户是否拥有该知识库的操作权限
    has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
    if not has_permission:
        return err

    # 调用服务删除知识库
    success = kb_service.delete(kb_id)
    # 如果删除失败,返回404错误
    if not success:
        return error_response("未找到知识库", 404)

    # 返回删除成功的响应
    return success_response("知识库删除成功")


# 注册PUT方法的API路由(用于更新知识库)
@bp.route('/api/v1/kb/<kb_id>', methods=['PUT'])
# 要求API登录
@api_login_required
# 捕获API内部错误的装饰器
@handle_api_error
def api_update(kb_id):
    # 定义API用于更新知识库(含封面图片)
    """更新知识库(支持封面图片更新)"""
    # 获取当前登录用户信息,如果未登录则返回错误响应
    current_user, err = get_current_user_or_error()
    if err:
        return err

    # 获取指定ID的知识库记录,验证其是否存在
    kb_dict = kb_service.get_by_id(kb_id)
    if not kb_dict:
        return error_response("未找到知识库", 404)

    # 校验当前用户是否有操作该知识库的权限
    has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
    if not has_permission:
        return err

    # 判断请求内容类型是否为multipart/form-data(一般用于带文件上传的表单提交)
    if request.content_type and 'multipart/form-data' in request.content_type:
        # 从表单中获取普通字段
        name = request.form.get('name')
        description = request.form.get('description') or None
        chunk_size = request.form.get('chunk_size')
        chunk_overlap = request.form.get('chunk_overlap')

        # 初始化封面图片相关变量
+       cover_image_data = None
+       cover_image_filename = None
        # 获得delete_cover字段(类型字符串,需判断是否为'true')
+       delete_cover = request.form.get('delete_cover') == 'true'

        # 如果有上传封面图片,则读取文件内容
+       if 'cover_image' in request.files:
+           cover_file = request.files['cover_image']
+           if cover_file and cover_file.filename:
+               cover_image_data = cover_file.read()
+               cover_image_filename = cover_file.filename
                # 记录上传日志
+               logger.info(f"收到知识库 {kb_id} 的封面图片上传: 文件名={cover_image_filename}, 大小={len(cover_image_data)} 字节, 内容类型={cover_file.content_type}")

        # 构建待更新的数据
        update_data = {}
        if name:
            update_data['name'] = name
        if description is not None:
            update_data['description'] = description
        if chunk_size:
            update_data['chunk_size'] = int(chunk_size)
        if chunk_overlap:
            update_data['chunk_overlap'] = int(chunk_overlap)
    else:
        # 非表单上传,则按JSON结构解析请求内容
        data = request.get_json()
        # 如果请求体是空的,直接返回错误
        if not data:
            return error_response("请求体不能为空", 400)

        # 构建可更新的数据字典
        update_data = {}
        if 'name' in data:
            update_data['name'] = data['name']
        if 'description' in data:
            update_data['description'] = data.get('description')
        if 'chunk_size' in data:
            update_data['chunk_size'] = data['chunk_size']
        if 'chunk_overlap' in data:
            update_data['chunk_overlap'] = data['chunk_overlap']
        # JSON请求时,cover_image相关变量置空
+       cover_image_data = None
+       cover_image_filename = None
+       delete_cover = data.get('delete_cover', False)    

    # 调用服务更新知识库,传入各字段及封面参数
    updated_kb = kb_service.update(
+       kb_id=kb_id,                   # 知识库ID
+       cover_image_data=cover_image_data,   # 封面图片的二进制内容
+       cover_image_filename=cover_image_filename, # 封面图片文件名
+       delete_cover=delete_cover,           # 是否删除封面图片
+       **update_data                    # 其它可变字段
    )

    # 更新后如果找不到,返回404
    if not updated_kb:
        return error_response("未找到知识库", 404)

    # 更新成功后,将最新的知识库数据返回给前端
    return success_response(updated_kb, "知识库更新成功")


# 定义路由,获取指定知识库ID的封面图片,仅限登录用户访问
+@bp.route('/kb/<kb_id>/cover')
+@login_required
+def kb_cover(kb_id):
+   """获取知识库封面图片"""
    # 获取当前已登录用户的信息
+   current_user = get_current_user()
    # 根据知识库ID从知识库服务获取对应的知识库信息
+   kb = kb_service.get_by_id(kb_id)

    # 检查知识库是否存在
+   if not kb:
        # 如果知识库不存在,记录警告日志
+       logger.warning(f"知识库不存在: {kb_id}")
+       abort(404)

    # 检查是否有权限访问(只能查看自己的知识库封面)
+   if kb.get('user_id') != current_user['id']:
        # 如果不是当前用户的知识库,记录警告日志
+       logger.warning(f"用户 {current_user['id']} 尝试访问知识库 {kb_id} 的封面,但该知识库属于用户 {kb.get('user_id')}")
+       abort(403)

    # 获取知识库的封面图片路径
+   cover_path = kb.get('cover_image')
    # 检查是否有封面图片
+   if not cover_path:
        # 如果没有封面,记录调试日志
+       logger.debug(f"知识库 {kb_id} 没有封面图片")
+       abort(404)

+   try:
        # 通过存储服务下载封面图片数据
+       image_data = storage_service.download_file(cover_path)
        # 如果未能获取到图片数据,记录错误日志并返回404
+       if not image_data:
+           logger.error(f"从路径下载封面图片失败: {cover_path}")
+           abort(404)

        # 根据文件扩展名判断图片MIME类型
+       file_ext = os.path.splitext(cover_path)[1].lower()
        # 自定义映射,优先根据文件扩展名判断图片MIME类型
+       mime_type_map = {
+           '.jpg': 'image/jpeg',
+           '.jpeg': 'image/jpeg',
+           '.png': 'image/png',
+           '.gif': 'image/gif',
+           '.webp': 'image/webp'
+       }

        # 优先根据自定义映射获取MIME类型
+       mime_type = mime_type_map.get(file_ext)
+       if not mime_type:
            # 如果没有命中自定义映射,则使用mimetypes猜测类型
+           mime_type, _ = mimetypes.guess_type(cover_path)
+           if not mime_type:
                # 如果还未识别出类型,则默认用JPEG
+               mime_type = 'image/jpeg'

        # 通过send_file响应图片数据和MIME类型,不以附件形式发送
+       return send_file(
+           BytesIO(image_data),#图片数据
+           mimetype=mime_type,#MIME类型
+           as_attachment=False#不以附件形式发送
+       )
+   except FileNotFoundError as e:
        # 捕获文件未找到异常,记录错误日志
+       logger.error(f"封面图片文件未找到: {cover_path}, 错误: {e}")
+       abort(404)
+   except Exception as e:
        # 捕获其他未预期异常,记录错误日志(包含堆栈信息)
+       logger.error(f"提供知识库 {kb_id} 的封面图片时出错, 路径: {cover_path}, 错误: {e}", exc_info=True)
+       abort(404)

8.3. config.py #

app/config.py

"""
配置管理模块
"""

# 导入操作系统相关模块
import os
# 导入 Path,处理路径
from pathlib import Path
# 导入 dotenv,用于加载 .env 文件中的环境变量
from dotenv import load_dotenv

# 加载 .env 文件中的环境变量到系统环境变量
load_dotenv()

# 定义应用配置类
class Config:
    """应用配置类"""

    # 基础配置
    # 项目根目录路径(取上级目录)
    BASE_DIR = Path(__file__).parent.parent
    # 加载环境变量 SECRET_KEY,若未设置则使用默认开发密钥
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'

    # 应用配置
    # 读取应用监听的主机地址,默认为本地所有地址
    APP_HOST = os.environ.get('APP_HOST', '0.0.0.0')
    # 读取应用监听的端口,默认为 5000,类型为 int
    APP_PORT = int(os.environ.get('APP_PORT', 5000))
    # 读取 debug 模式配置,字符串转小写等于 'true' 则为 True(开启调试)
    APP_DEBUG = os.environ.get('APP_DEBUG', 'false').lower() == 'true'
    # 读取允许上传的最大文件大小,默认为 100MB,类型为 int
    MAX_FILE_SIZE = int(os.environ.get('MAX_FILE_SIZE', 104857600))  # 100MB
    # 允许上传的文件扩展名集合
    ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt', 'md'}
    # 允许上传的图片扩展名集合
+   ALLOWED_IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'webp'}
    # 允许上传的图片最大大小,默认为 5MB,类型为 int
+   MAX_IMAGE_SIZE = int(os.environ.get('MAX_IMAGE_SIZE', 5242880))  # 5MB

    # 日志配置
    # 日志目录,默认 './logs'
    LOG_DIR = os.environ.get('LOG_DIR', './logs')
    # 日志文件名,默认 'rag_lite.log'
    LOG_FILE = os.environ.get('LOG_FILE', 'rag_lite.log')
    # 日志等级,默认 'INFO'
    LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
    # 是否启用控制台日志,默认 True
    LOG_ENABLE_CONSOLE = os.environ.get('LOG_ENABLE_CONSOLE', 'true').lower() == 'true'
    # 是否启用文件日志,默认 True
    LOG_ENABLE_FILE = os.environ.get('LOG_ENABLE_FILE', 'true').lower() == 'true'

    # 数据库配置
    # 数据库主机地址,默认为 'localhost'
    DB_HOST = os.environ.get('DB_HOST', 'localhost')
    # 数据库端口号,默认为 3306
    DB_PORT = int(os.environ.get('DB_PORT', 3306))
    # 数据库用户名,默认为 'root'
    DB_USER = os.environ.get('DB_USER', 'root')
    # 数据库密码,默认为 'root'
    DB_PASSWORD = os.environ.get('DB_PASSWORD', 'root')
    # 数据库名称,默认为 'rag-lite'
    DB_NAME = os.environ.get('DB_NAME', 'rag-lite')
    # 数据库字符集,默认为 'utf8mb4'
    DB_CHARSET = os.environ.get('DB_CHARSET', 'utf8mb4')

    # 存储配置
+   STORAGE_TYPE = os.environ.get('STORAGE_TYPE', 'local')  # 'local' 或 'minio'
+   STORAGE_DIR = os.environ.get('STORAGE_DIR', './storages')

8.4. knowledgebase_service.py #

app/services/knowledgebase_service.py

# 知识库服务
"""
知识库服务
"""
+import os
+from app.config import Config
# 导入类型提示工具
from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService

# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase
# 导入存储服务
+from app.services.storage_service import storage_service
# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
    """知识库服务"""
    # 定义创建知识库的方法
    def create(self, name: str, user_id: str, description: str = None, 
+              chunk_size: int = 512, chunk_overlap: int = 50,
+              cover_image_data: bytes = None, cover_image_filename: str = None) -> dict:
        """
        创建知识库

        Args:
            name: 知识库名称
            user_id: 用户ID
            description: 描述
            chunk_size: 分块大小
            chunk_overlap: 分块重叠
+           cover_image_data: 封面图片数据(可选)
+           cover_image_filename: 封面图片文件名(可选)

        Returns:
            创建的知识库字典
        """
+       cover_image_path = None

        # 处理封面图片上传
+       if cover_image_data and cover_image_filename:
            # 验证文件类型
+           file_ext_without_dot = os.path.splitext(cover_image_filename)[1][1:].lower() if '.' in cover_image_filename else ''
+           if not file_ext_without_dot:
+               raise ValueError(f"文件名缺少扩展名: {cover_image_filename}")

+           if file_ext_without_dot not in Config.ALLOWED_IMAGE_EXTENSIONS:
+               raise ValueError(f"不支持的图片格式: {file_ext_without_dot}。支持的格式: {', '.join(Config.ALLOWED_IMAGE_EXTENSIONS)}")

            # 验证文件大小
+           if len(cover_image_data) == 0:
+               raise ValueError("上传的图片文件为空")

+           if len(cover_image_data) > Config.MAX_IMAGE_SIZE:
+               raise ValueError(f"图片文件大小超过限制 {Config.MAX_IMAGE_SIZE / 1024 / 1024}MB")
        # 启动数据库事务,上下文管理器自动处理提交或回滚
        with self.transaction() as session:
            # 先创建知识库对象
            kb = Knowledgebase(
                name=name,  # 设置知识库名称
                user_id=user_id,  # 设置用户ID
                description=description,  # 设置知识库描述
                chunk_size=chunk_size,  # 设置分块大小
                chunk_overlap=chunk_overlap  # 设置分块重叠
            )
            # 将知识库对象添加到session
            session.add(kb)
            # 刷新session,生成知识库ID
            session.flush()  # 刷新以获取 ID,但不提交

            # 上传封面图片(如果有)
+           if cover_image_data and cover_image_filename:
+               try:
                    # 构建封面图片路径(统一使用小写扩展名)
+                   file_ext_with_dot = os.path.splitext(cover_image_filename)[1].lower()
+                   cover_image_path = f"covers/{kb.id}{file_ext_with_dot}"

+                   self.logger.info(f"正在为新知识库 {kb.id} 上传封面图片: 文件名={cover_image_filename}, 路径={cover_image_path}, 大小={len(cover_image_data)} 字节")

                    # 上传到存储
+                   storage_service.upload_file(cover_image_path, cover_image_data)

                    # 验证文件是否成功上传
+                   if not storage_service.file_exists(cover_image_path):
+                       raise ValueError(f"上传后文件不存在: {cover_image_path}")

+                   self.logger.info(f"成功上传封面图片: {cover_image_path}")

                    # 更新知识库的封面路径
+                   kb.cover_image = cover_image_path
+                   session.flush()
+               except Exception as e:
+                   self.logger.error(f"上传知识库 {kb.id} 的封面图片时出错: {e}", exc_info=True)
                    # 如果上传失败,继续创建知识库,但不设置封面
+                   cover_image_path = None

            # 刷新kb对象的数据库状态
            session.refresh(kb)
            # 转换kb对象为字典(在session内部,避免分离后出错)
            kb_dict = kb.to_dict()
            # 记录创建知识库的日志,包含ID
            self.logger.info(f"创建了知识库,ID: {kb.id}")
            # 返回知识库字典信息
            return kb_dict

    # 定义获取知识库列表的方法
    def list(self, user_id: str = None, page: int = 1, page_size: int = 10) -> Dict:
        """
        获取知识库列表

        Args:
            user_id: 用户ID(可选)
            page: 页码
            page_size: 每页数量

        Returns:
            包含 items, total, page, page_size 的字典
        """
        # 使用数据库会话
        with self.session() as session:
            # 查询Knowledgebase表
            query = session.query(Knowledgebase)
            # 如果指定了user_id,则筛选属于该用户的知识库
            if user_id:
                query = query.filter(Knowledgebase.user_id == user_id)
            # 统计总记录数
            total = query.count()
            # 计算分页偏移量
            offset = (page - 1) * page_size
            # 获取当前页的数据列表
            kbs = query.offset(offset).limit(page_size).all()

            # 初始化知识库字典列表
            items = []
            # 遍历查询结果,将每一项转为dict后添加到items列表
            for kb in kbs:
                kb_dict = kb.to_dict()
                items.append(kb_dict)

            # 返回包含分页信息和数据条目的字典
            return {
                'items': items,
                'total': total,
                'page': page,
                'page_size': page_size
            }     

    def delete(self, kb_id: str) -> bool:
        """
        删除知识库

        Args:
            kb_id: 知识库ID

        Returns:
            是否删除成功
        """
        with self.transaction() as session:
            kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            if not kb:
                return False
            session.delete(kb)
            self.logger.info(f"Deleted knowledgebase: {kb_id}")
            return True    

    def get_by_id(self, kb_id: str) -> Optional[dict]:
        """根据ID获取知识库"""
        with self.session() as session:
            kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            if kb:
                # 在 session 内部转换为字典,避免对象从 session 分离后访问属性出错
                return kb.to_dict()
            return None   
    # 定义 update 方法,用于更新知识库
+   def update(self, kb_id: str, cover_image_data: bytes = None, 
+              cover_image_filename: str = None, delete_cover: bool = False, **kwargs) -> Optional[dict]:
        """
        更新知识库

        Args:
            kb_id: 知识库ID
            cover_image_data: 新的封面图片数据(可选)
            cover_image_filename: 新的封面图片文件名(可选)
            delete_cover: 是否删除封面图片(可选)
            **kwargs: 要更新的字段(name, description, chunk_size, chunk_overlap 等)

        Returns:
            更新后的知识库字典,如果不存在则返回 None
        """
        # 开启数据库事务
        with self.transaction() as session:
            # 查询指定ID的知识库对象
            kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            # 如果未找到知识库,则返回 None
            if not kb:
                return None

            # 处理封面图片更新
+           old_cover_path = kb.cover_image if kb.cover_image else None

+           if delete_cover:
                # 删除封面图片
+               if old_cover_path:
+                   try:
+                       storage_service.delete_file(old_cover_path)
+                       self.logger.info(f"已删除封面图片: {old_cover_path}")
+                   except Exception as e:
+                       self.logger.warning(f"删除封面图片时出错: {e}")
+               kwargs['cover_image'] = None
+           elif cover_image_data and cover_image_filename:
                # 上传新封面图片
                # 验证文件类型
+               file_ext_without_dot = os.path.splitext(cover_image_filename)[1][1:].lower() if '.' in cover_image_filename else ''
+               if not file_ext_without_dot:
+                   raise ValueError(f"文件名缺少扩展名: {cover_image_filename}")

+               if file_ext_without_dot not in Config.ALLOWED_IMAGE_EXTENSIONS:
+                   raise ValueError(f"不支持的图片格式: {file_ext_without_dot}。支持的格式: {', '.join(Config.ALLOWED_IMAGE_EXTENSIONS)}")

                # 验证文件大小
+               if len(cover_image_data) == 0:
+                   raise ValueError("上传的图片文件为空")

+               if len(cover_image_data) > Config.MAX_IMAGE_SIZE:
+                   raise ValueError(f"图片文件大小超过限制 {Config.MAX_IMAGE_SIZE / 1024 / 1024}MB")

+               try:
                    # 构建新封面图片路径(使用原始扩展名,保持大小写)
+                   file_ext_with_dot = os.path.splitext(cover_image_filename)[1]
                    # 统一使用小写扩展名,避免路径不一致的问题
+                   file_ext_with_dot = file_ext_with_dot.lower()
+                   new_cover_path = f"covers/{kb_id}{file_ext_with_dot}"

+                   self.logger.info(f"正在处理知识库 {kb_id} 的封面图片更新: 文件名={cover_image_filename}, 扩展名={file_ext_without_dot}, 大小={len(cover_image_data)} 字节, 新路径={new_cover_path}, 旧路径={old_cover_path}")

                    # 先上传新封面(确保上传成功后再删除旧封面)
+                   storage_service.upload_file(new_cover_path, cover_image_data)

                    # 验证文件是否成功上传
+                   if not storage_service.file_exists(new_cover_path):
+                       raise ValueError(f"上传后文件不存在: {new_cover_path}")

+                   self.logger.info(f"成功上传封面图片: {new_cover_path}")

                    # 删除旧封面(如果存在且与新封面路径不同)
+                   if old_cover_path and old_cover_path != new_cover_path:
+                       try:
+                           storage_service.delete_file(old_cover_path)
+                           self.logger.info(f"已删除旧封面图片: {old_cover_path}")
+                       except Exception as e:
+                           self.logger.warning(f"删除旧封面图片时出错: {e}")
                            # 继续执行,不因为删除旧文件失败而中断更新

                    # 更新数据库中的封面路径
+                   kwargs['cover_image'] = new_cover_path
+               except Exception as e:
+                   self.logger.error(f"上传知识库 {kb_id} 的封面图片时出错: {e}", exc_info=True)
+                   raise ValueError(f"上传封面图片失败: {str(e)}")    

            # 遍历要更新的字段和值
            for key, value in kwargs.items():
                # 判断知识库对象是否有该字段,且值不为 None
+               if hasattr(kb, key) and (key == 'cover_image' or value is not None):
                    # 设置该字段的新值
                    setattr(kb, key, value)

            # 刷新session,保证对象属性为最新状态
            session.flush()
            # 刷新对象,避免未提交前读取到旧数据
            session.refresh(kb)

            # 在事务内部将对象转为字典,避免 session 关闭后访问失败
            kb_dict = kb.to_dict()

            # 如果本次更新包含 'cover_image' 字段,记录详细日志
            if 'cover_image' in kwargs:
                self.logger.info(f"更新知识库 {kb_id}, 封面图片={kb_dict.get('cover_image')}")
            else:
                # 否则仅记录知识库ID
                self.logger.info(f"更新知识库: {kb_id}")

            # 返回更新后的知识库字典
            return kb_dict

# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()    

8.5. kb_list.html #

app/templates/kb_list.html

{% extends "base.html" %}

{% block title %}知识库管理 - RAG Lite{% endblock %}

{% block content %}
<style>
@media (min-width: 992px) {
    #kbList > div {
        flex: 0 0 20%;
        max-width: 20%;
    }
}
</style>
<div class="row">
    <div class="col-12">
        <nav aria-label="breadcrumb" class="mb-3">
            <ol class="breadcrumb">
                <li class="breadcrumb-item"><a href="/">首页</a></li>
                <li class="breadcrumb-item active">知识库管理</li>
            </ol>
        </nav>

        <div class="d-flex justify-content-between align-items-center mb-4">
            <h2><i class="bi bi-collection"></i> 知识库管理</h2>
            <button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
                <i class="bi bi-plus-circle"></i> 创建知识库
            </button>
        </div>
        <!-- 知识库列表 -->
        <div class="row" id="kbList">
            {% if kbs %}
                {% for kb in kbs %}
                <div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
                    <div class="card h-100">
+                       {% if kb.cover_image %}
+                       <img src="/kb/{{ kb.id }}/cover" class="card-img-top" alt="{{ kb.name|e }}" style="height: 150px; object-fit: scale-down;">
+                       {% else %}
                        <div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
                            <i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
                        </div>
+                       {% endif %}
                        <div class="card-body">
                            <h5 class="card-title">
                                <i class="bi bi-folder"></i> {{ kb.name }}
                            </h5>
                            <p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
                        </div>
                        <div class="card-footer bg-transparent">
                            <button class="btn btn-sm btn-warning" 
                                    data-kb-id="{{ kb.id }}"
                                    data-kb-name="{{ kb.name }}"
                                    data-kb-description="{{ kb.description or '' }}"
                                    data-kb-chunk-size="{{ kb.chunk_size }}"
                                    data-kb-chunk-overlap="{{ kb.chunk_overlap }}"
                                    data-kb-cover-image="{{ kb.cover_image or '' }}"
                                    onclick="editKbFromButton(this)">
                                <i class="bi bi-pencil"></i> 编辑
                            </button>
                            <button class="btn btn-sm btn-danger" onclick="deleteKb('{{ kb.id }}', '{{ kb.name }}')">
                                <i class="bi bi-trash"></i> 删除
                            </button>
                        </div>
                    </div>
                </div>
                {% endfor %}
            {% else %}
                <div class="col-12">
                    <div class="alert alert-info">
                        <i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
                    </div>
                </div>
            {% endif %}
        </div>

        <!-- 分页控件 -->
        {% if pagination and pagination.total > pagination.page_size %}
        <nav aria-label="知识库列表分页" class="mt-4">
            <ul class="pagination justify-content-center">
                {% set current_page = pagination.page %}
                {% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}

                <!-- 上一页 -->
                <li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}" 
                       {% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
                        <i class="bi bi-chevron-left"></i> 上一页
                    </a>
                </li>

                <!-- 页码 -->
                {% set start_page = [1, current_page - 2] | max %}
                {% set end_page = [total_pages, current_page + 2] | min %}

                {% if start_page > 1 %}
                <li class="page-item">
                    <a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
                </li>
                {% if start_page > 2 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                {% endif %}

                {% for page_num in range(start_page, end_page + 1) %}
                <li class="page-item {% if page_num == current_page %}active{% endif %}">
                    <a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
                        {{ page_num }}
                    </a>
                </li>
                {% endfor %}

                {% if end_page < total_pages %}
                {% if end_page < total_pages - 1 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                <li class="page-item">
                    <a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
                </li>
                {% endif %}

                <!-- 下一页 -->
                <li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
                       {% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
                        下一页 <i class="bi bi-chevron-right"></i>
                    </a>
                </li>
            </ul>
            <div class="text-center text-muted small mt-2">
                共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
            </div>
        </nav>
        {% endif %}
    </div>
</div>
<!-- 创建知识库模态框 -->
<div class="modal fade" id="createKbModal" tabindex="-1">
    <div class="modal-dialog">
        <div class="modal-content">
            <div class="modal-header">
                <h5 class="modal-title">创建知识库</h5>
                <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
            </div>
            <form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
                <div class="modal-body">
                    <div class="mb-3">
                        <label class="form-label">名称 <span class="text-danger">*</span></label>
                        <input type="text" class="form-control" name="name" required>
                    </div>
                    <div class="mb-3">
                        <label class="form-label">描述</label>
                        <textarea class="form-control" name="description" rows="3"></textarea>
                    </div>
+                   <div class="mb-3">
+                       <label class="form-label">封面图片(可选)</label>
+                       <input type="file" class="form-control" name="cover_image" accept="image/jpeg,image/png,image/gif,image/webp" id="coverImageInput">
+                       <div class="form-text">支持 JPG、PNG、GIF、WEBP 格式,最大 5MB</div>
+                       <div id="coverImagePreview" class="mt-2" style="display: none;">
+                           <img id="coverPreviewImg" src="" alt="封面预览" class="img-thumbnail" style="max-width: 200px; max-height: 200px;">
+                       </div>
+                   </div>
                    <div class="row">
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块大小</label>
                            <input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
                            <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
                        </div>
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块重叠</label>
                            <input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
                            <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
                        </div>
                    </div>
                </div>
                <div class="modal-footer">
                    <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
                    <button type="submit" class="btn btn-primary">创建</button>
                </div>
            </form>
        </div>
    </div>
</div>

<!-- 编辑知识库模态框 -->
<div class="modal fade" id="editKbModal" tabindex="-1">
    <div class="modal-dialog">
        <div class="modal-content">
            <div class="modal-header">
                <h5 class="modal-title">编辑知识库</h5>
                <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
            </div>
            <form id="editKbForm" onsubmit="updateKb(event)" enctype="multipart/form-data">
                <input type="hidden" name="kb_id" id="editKbId">
                <div class="modal-body">
                    <div class="mb-3">
                        <label class="form-label">名称 <span class="text-danger">*</span></label>
                        <input type="text" class="form-control" name="name" id="editKbName" required>
                    </div>
                    <div class="mb-3">
                        <label class="form-label">描述</label>
                        <textarea class="form-control" name="description" id="editKbDescription" rows="3"></textarea>
                    </div>
+                   <div class="mb-3">
+                       <label class="form-label">封面图片</label>
+                       <div id="editCoverPreview" class="mb-2">
+                           <img id="editCoverPreviewImg" src="" alt="当前封面" class="img-thumbnail" style="max-width: 200px; max-height: 200px; display: none;">
+                           <div id="editCoverNoImage" class="text-muted small" style="display: none;">暂无封面</div>
+                       </div>
+                       <input type="file" class="form-control" name="cover_image" accept="image/jpeg,image/png,image/gif,image/webp" id="editCoverImageInput">
+                       <div class="form-text">支持 JPG、PNG、GIF、WEBP 格式,最大 5MB。留空则不修改封面。</div>
+                       <div class="form-check mt-2">
+                           <input class="form-check-input" type="checkbox" name="delete_cover" id="editDeleteCover" value="true">
+                           <label class="form-check-label" for="editDeleteCover">
+                               删除封面图片
+                           </label>
+                       </div>
+                       <div id="editCoverNewPreview" class="mt-2" style="display: none;">
+                           <img id="editCoverNewPreviewImg" src="" alt="新封面预览" class="img-thumbnail" style="max-width: 200px; max-height: 200px;">
+                       </div>
+                   </div>
                    <div class="row">
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块大小</label>
                            <input type="number" class="form-control" name="chunk_size" id="editKbChunkSize" value="512" min="100" max="2000">
                            <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
                        </div>
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块重叠</label>
                            <input type="number" class="form-control" name="chunk_overlap" id="editKbChunkOverlap" value="50" min="0" max="200">
                            <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
                        </div>
                    </div>
                </div>
                <div class="modal-footer">
                    <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
                    <button type="submit" class="btn btn-primary">保存</button>
                </div>
            </form>
        </div>
    </div>
</div>
{% endblock %}

{% block extra_js %}
<script>
+// 异步函数,用于创建知识库
async function createKb(event) {
+   // 阻止表单默认提交
    event.preventDefault();
+   // 获取表单对象
    const form = event.target;
+   // 构造 FormData,收集表单数据
    const formData = new FormData(form);

    try {
+       // 发送 POST 请求到后端 API,提交表单数据
        const response = await fetch('/api/v1/kb', {
            method: 'POST',
+           // body为FormData,浏览器会自动设置Content-Type
+           body: formData  
        });

+       // 如果响应成功,刷新页面
        if (response.ok) {
            location.reload();
        } else {
+           // 否则获取错误信息并弹窗提示
            const error = await response.json();
            alert('创建失败: ' + error.message);
        }
    } catch (error) {
+       // 捕获异常并弹窗提示用户
        alert('创建失败: ' + error.message);
    }
}

+// 异步函数,用于删除知识库
async function deleteKb(kbId, kbName) {
+   // 弹窗确认是否删除知识库
    if (!confirm(`确定要删除知识库 "${kbName}" 吗?此操作不可恢复!`)) {
        return;
    }
    try {
+       // 发送 DELETE 请求到后端 API
        const response = await fetch(`/api/v1/kb/${kbId}`, {
            method: 'DELETE'
        });

+       // 如果响应成功,刷新页面
        if (response.ok) {
            location.reload();
        } else {
+           // 否则弹窗提示错误信息
            const error = await response.json();
            alert('删除失败: ' + error.message);
        }
    } catch (error) {
+       // 捕获异常并弹窗提示
        alert('删除失败: ' + error.message);
    }
}

+// 从按钮的 data 属性读取知识库数据,然后打开编辑界面
function editKbFromButton(button) {
+   // 获取知识库ID
    const kbId = button.getAttribute('data-kb-id');
+   // 获取知识库名称
    const name = button.getAttribute('data-kb-name');
+   // 获取描述,默认为空字符串
    const description = button.getAttribute('data-kb-description') || '';
+   // 获取分块大小,默认512
    const chunkSize = parseInt(button.getAttribute('data-kb-chunk-size')) || 512;
+   // 获取分块重叠,默认50
    const chunkOverlap = parseInt(button.getAttribute('data-kb-chunk-overlap')) || 50;
+   // 获取封面图片路径,默认为空
+   const coverImage = button.getAttribute('data-kb-cover-image') || '';

+   // 调用编辑函数,填充数据到表单
+   editKb(kbId, name, description, chunkSize, chunkOverlap,coverImage);
}

+// 编辑知识库时弹出模态框并初始化数据
+function editKb(kbId, name, description, chunkSize, chunkOverlap,coverImage) {
+   // 设置表单的知识库ID
    document.getElementById('editKbId').value = kbId;
+   // 设置知识库名称
    document.getElementById('editKbName').value = name;
+   // 设置描述
    document.getElementById('editKbDescription').value = description || '';
+   // 设置分块大小
    document.getElementById('editKbChunkSize').value = chunkSize;
+   // 设置分块重叠
    document.getElementById('editKbChunkOverlap').value = chunkOverlap;
+   // 初始化不勾选删除封面
+   document.getElementById('editDeleteCover').checked = false;
+   // 清空已选择的新封面文件
+   document.getElementById('editCoverImageInput').value = '';

+   // 获取当前封面预览图片元素
+   const previewImg = document.getElementById('editCoverPreviewImg');
+   // 获取“暂无封面”提示元素
+   const noImageDiv = document.getElementById('editCoverNoImage');
+   // 获取新封面预览div
+   const newPreview = document.getElementById('editCoverNewPreview');
+   // 获取新封面预览图片元素
+   const newPreviewImg = document.getElementById('editCoverNewPreviewImg');

+   // 隐藏新图片预览
+   if (newPreview) {
+       newPreview.style.display = 'none';
+   }
+   // 清空新图片预览src
+   if (newPreviewImg) {
+       newPreviewImg.src = '';
+   }

+   // 如果有旧封面,则显示
+   if (coverImage) {
+       previewImg.src = `/kb/${kbId}/cover`;
+       previewImg.style.display = 'block';
+       noImageDiv.style.display = 'none';
+   } else {
+       // 否则显示“暂无封面”
+       previewImg.style.display = 'none';
+       noImageDiv.style.display = 'block';
+   }

+   // 展示编辑知识库的模态框
    const modal = new bootstrap.Modal(document.getElementById('editKbModal'));
    modal.show();
}


+// 获取编辑时选择封面图片的input
+const editCoverImageInput = document.getElementById('editCoverImageInput');
+// 如果找到了input,则监听change事件
+if (editCoverImageInput) {
+   editCoverImageInput.addEventListener('change', function(e) {
+       // 获取用户选择的第一个文件
+       const file = e.target.files[0];
+       // 获取新封面预览容器
+       const newPreview = document.getElementById('editCoverNewPreview');
+       // 获取新封面预览图片元素
+       const newPreviewImg = document.getElementById('editCoverNewPreviewImg');
+       // 获取删除封面复选框
+       const deleteCheckbox = document.getElementById('editDeleteCover');

+       // 如果有选文件
+       if (file) {
+           // 定义允许的图片类型
+           const validTypes = ['image/jpeg', 'image/jpg', 'image/png', 'image/gif', 'image/webp'];
+           // 如果不符合要求的格式,弹窗提示并重置
+           if (!validTypes.includes(file.type)) {
+               alert('不支持的图片格式,请选择 JPG、PNG、GIF 或 WEBP 格式的图片');
+               e.target.value = '';
+               if (newPreview) {
+                   newPreview.style.display = 'none';
+               }
+               return;
+           }

+           // 如果图片超过5MB,弹窗提示并重置
+           if (file.size > 5 * 1024 * 1024) {
+               alert('图片文件大小超过 5MB 限制');
+               e.target.value = '';
+               if (newPreview) {
+                   newPreview.style.display = 'none';
+               }
+               return;
+           }

+           // 文件读取器,用于预览图片
+           const reader = new FileReader();
+           // 文件读取完成后显示预览
+           reader.onload = function(event) {
+               if (newPreviewImg) {
+                   newPreviewImg.src = event.target.result;
+               }
+               if (newPreview) {
+                   newPreview.style.display = 'block';
+               }
+               // 选择新图片时自动取消删除封面的选项
+               if (deleteCheckbox) {
+                   deleteCheckbox.checked = false; 
+               }
+           };
+           // 读取图片失败时弹窗提示
+           reader.onerror = function() {
+               alert('读取图片文件失败,请重试');
+               e.target.value = '';
+               if (newPreview) {
+                   newPreview.style.display = 'none';
+               }
+           };
+           // 以DataURL形式读取以便预览
+           reader.readAsDataURL(file);
+       } else {
+           // 未选择文件则隐藏新封面预览
+           if (newPreview) {
+               newPreview.style.display = 'none';
+           }
+       }
+   });
+}

+// 监听删除封面复选框的变化
+document.getElementById('editDeleteCover')?.addEventListener('change', function(e) {
+   // 获取封面图片上传input
+   const fileInput = document.getElementById('editCoverImageInput');
+   // 获取新封面预览div
+   const newPreview = document.getElementById('editCoverNewPreview');

+   // 如果选中“删除封面”,则清空图片和隐藏新封面预览
+   if (e.target.checked) {
+       fileInput.value = ''; // 清空文件选择
+       newPreview.style.display = 'none';
+   }
+});

+// 异步函数,用于更新知识库
async function updateKb(event) {
+   // 阻止表单默认提交
    event.preventDefault();
+   // 获取表单对象
    const form = event.target;
+   // 获取表单数据
    const formData = new FormData(form);
+   // 从formData中获取知识库ID
    const kbId = formData.get('kb_id');

    try {
+       // 发送PUT请求到后端API更新知识库
        const response = await fetch(`/api/v1/kb/${kbId}`, {
            method: 'PUT',
            body: formData
        });

+       // 如果更新成功,刷新页面
        if (response.ok) {
            location.reload();
        } else {
+           // 否则弹窗显示错误
            const error = await response.json();
            alert('更新失败: ' + error.message);
        }
    } catch (error) {
+       // 捕获异常弹窗显示
        alert('更新失败: ' + error.message);
    }
}
+// 获取封面图片文件输入框并监听change事件(当用户选择文件时触发)
+document.getElementById('coverImageInput')?.addEventListener('change', function(e) {
+   // 获取用户选中的第一个文件
+   const file = e.target.files[0];
+   // 获取用于显示预览的容器
+   const preview = document.getElementById('coverImagePreview');
+   // 获取显示图片的img标签
+   const previewImg = document.getElementById('coverPreviewImg');

+   // 如果用户选择了文件
+   if (file) {
+       // 创建文件读取器
+       const reader = new FileReader();
+       // 文件读取完成后回调
+       reader.onload = function(e) {
+           // 将img标签的src设置为读取得到的图片数据
+           previewImg.src = e.target.result;
+           // 显示预览容器
+           preview.style.display = 'block';
+       };
+       // 以DataURL的形式读取图片文件
+       reader.readAsDataURL(file);
+   } else {
+       // 如果没有选择文件,则隐藏预览
+       preview.style.display = 'none';
+   }
+});
</script>
{% endblock %}

8.6 storage__init__.py #

app\services\storage__init__.py

"""
存储服务模块
提供统一的存储接口,支持多种存储后端
"""
from app.services.storage.factory import StorageFactory

__all__ = [
    'StorageFactory'
]

8.7 storage\base.py #

app\services\storage\base.py

"""
存储服务抽象接口
"""
# 导入抽象基类和抽象方法装饰器
from abc import ABC, abstractmethod
# 导入可选类型提示
from typing import Optional

# 定义存储服务抽象接口类,继承自ABC
class StorageInterface(ABC):
    """存储服务抽象接口"""

    # 定义抽象方法:上传文件
    @abstractmethod
    def upload_file(self, file_path: str, file_data: bytes, 
                   content_type: str = 'application/octet-stream') -> str:
        """
        上传文件

        Args:
            file_path: 文件路径(相对路径)
            file_data: 文件数据(bytes)
            content_type: 内容类型

        Returns:
            文件路径
        """
        # 方法体由子类实现
        pass

    # 定义抽象方法:下载文件
    @abstractmethod
    def download_file(self, file_path: str) -> bytes:
        """
        下载文件

        Args:
            file_path: 文件路径

        Returns:
            文件数据(bytes)
        """
        # 方法体由子类实现
        pass

    # 定义抽象方法:删除文件
    @abstractmethod
    def delete_file(self, file_path: str) -> None:
        """
        删除文件

        Args:
            file_path: 文件路径
        """
        # 方法体由子类实现
        pass

    # 定义抽象方法:检查文件是否存在
    @abstractmethod
    def file_exists(self, file_path: str) -> bool:
        """
        检查文件是否存在

        Args:
            file_path: 文件路径

        Returns:
            是否存在
        """
        # 方法体由子类实现
        pass

    # 定义抽象方法:获取文件访问URL,可选
    @abstractmethod
    def get_file_url(self, file_path: str, expires_in: Optional[int] = None) -> Optional[str]:
        """
        获取文件访问URL(可选,某些存储后端支持)

        Args:
            file_path: 文件路径
            expires_in: URL过期时间(秒),None表示永久

        Returns:
            文件URL,如果不支持则返回None
        """
        # 方法体由子类实现
        pass

8.8 storage\factory.py #

app\services\storage\factory.py

"""
存储服务工厂
"""
# 导入日志模块
import logging
# 导入可选类型提示
from typing import Optional
# 导入存储服务接口
from app.services.storage.base import StorageInterface
# 导入本地存储服务实现
from app.services.storage.local_storage import LocalStorage
# 导入配置
from app.config import Config

# 获取logger实例
logger = logging.getLogger(__name__)


# 定义存储服务工厂类
class StorageFactory:
    """存储服务工厂"""

    # 定义用于实现单例的静态属性
    _instance: Optional[StorageInterface] = None

    # 定义创建存储服务实例的类方法
    @classmethod
    def create_storage(cls, storage_type: Optional[str] = None, **kwargs) -> StorageInterface:
        """
        创建存储服务实例

        Args:
            storage_type: 存储类型 ('local' 或 'minio'),如果为None则从配置读取
            **kwargs: 存储服务的初始化参数

        Returns:
            存储服务实例
        """
        # 如果没有传递storage_type,则优先从配置读取(默认local)
        if storage_type is None:
            storage_type = getattr(Config, 'STORAGE_TYPE', 'local')

        # 统一转为小写
        storage_type = storage_type.lower()

        # 判断存储类型,如果是local则返回本地存储实例
        if storage_type == 'local':
            # 获取可选的存储目录参数
            storage_dir = kwargs.get('storage_dir')
            return LocalStorage(storage_dir=storage_dir)  
        else:
            # 不支持的类型抛出异常
            raise ValueError(f"Unsupported storage type: {storage_type}")

    # 定义获取实例的类方法(用于单例懒加载)
    @classmethod
    def get_instance(cls) -> StorageInterface:
        """
        获取单例存储服务实例(懒加载)

        Returns:
            存储服务实例
        """
        # 如果还未创建实例,则创建一个
        if cls._instance is None:
            cls._instance = cls.create_storage()
        # 返回单例
        return cls._instance

8.9 local_storage.py #

app\services\storage\local_storage.py

"""
本地文件系统存储实现
"""
# 导入os模块
import os
# 导入日志模块
import logging
# 导入路径操作类
from pathlib import Path
# 导入可选类型提示
from typing import Optional
# 导入存储接口基类
from app.services.storage.base import StorageInterface
# 导入配置信息
from app.config import Config

# 获取logger实例
logger = logging.getLogger(__name__)


class LocalStorage(StorageInterface):
    """本地文件系统存储实现"""

    # 初始化本地存储
    def __init__(self, storage_dir: Optional[str] = None):
        """
        初始化本地存储

        Args:
            storage_dir: 存储目录,如果为None则使用配置中的值
        """
        # 如果没有传入存储目录,则用配置中的存储目录
        if storage_dir is None:
            storage_dir = Config.STORAGE_DIR

        # 判断路径是否为绝对路径
        if os.path.isabs(storage_dir):
            # 如果是绝对路径,直接用
            self.storage_dir = Path(storage_dir)
        else:
            # 如果是相对路径,则以项目根目录为基准
            base_dir = Path(__file__).parent.parent.parent.parent
            self.storage_dir = base_dir / storage_dir

        # 确保存储目录存在,创建多级目录
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        # 记录初始化日志
        logger.info(f"本地存储已初始化,目录:{self.storage_dir}")

    # 获取文件的完整路径
    def _get_full_path(self, file_path: str) -> Path:
        """获取文件的完整路径"""
        # 拼接存储目录和文件路径
        return self.storage_dir / file_path

    # 上传文件到本地存储
    def upload_file(self, file_path: str, file_data: bytes, 
                   content_type: str = 'application/octet-stream') -> str:
        """上传文件到本地存储"""
        try:
            # 获取文件完整路径
            full_path = self._get_full_path(file_path)
            # 创建父目录(如果不存在)
            full_path.parent.mkdir(parents=True, exist_ok=True)

            # 写入文件内容
            with open(full_path, 'wb') as f:
                f.write(file_data)

            # 记录日志
            logger.info(f"文件已上传:{file_path}")
            # 返回文件相对路径
            return file_path
        except Exception as e:
            # 上传失败,记录错误日志,继续抛出异常
            logger.error(f"上传文件出错:{e}")
            raise

    # 从本地存储下载文件
    def download_file(self, file_path: str) -> bytes:
        """从本地存储下载文件"""
        try:
            # 获取文件完整路径
            full_path = self._get_full_path(file_path)
            # 文件不存在则抛出异常
            if not full_path.exists():
                raise FileNotFoundError(f"文件不存在:{file_path}")

            # 读取文件内容
            with open(full_path, 'rb') as f:
                data = f.read()

            # 记录日志
            logger.info(f"文件已下载:{file_path}")
            # 返回文件数据
            return data
        except Exception as e:
            # 下载出现异常,记录日志并抛出异常
            logger.error(f"下载文件出错:{e}")
            raise

    # 删除本地存储文件
    def delete_file(self, file_path: str) -> None:
        """删除文件"""
        try:
            # 获取文件完整路径
            full_path = self._get_full_path(file_path)
            # 如果文件存在则删除
            if full_path.exists():
                full_path.unlink()
                # 记录日志
                logger.info(f"文件已删除:{file_path}")

                # 尝试删除父目录(为空才能删,主要防止目录冗余)
                try:
                    full_path.parent.rmdir()
                except OSError:
                    # 如果目录不为空,忽略该异常
                    pass
        except Exception as e:
            # 删除文件时出错
            logger.error(f"删除文件出错:{e}")
            raise

    # 检查文件是否存在
    def file_exists(self, file_path: str) -> bool:
        """检查文件是否存在"""
        # 获取文件完整路径
        full_path = self._get_full_path(file_path)
        # 判断文件是否存在
        return full_path.exists()

    # 获取文件URL,本地存储不支持直接返回None
    def get_file_url(self, file_path: str, expires_in: Optional[int] = None) -> Optional[str]:
        """本地存储不支持URL,返回None"""
        return None

8.10. config.py #

app/config.py

"""
配置管理模块
"""

# 导入操作系统相关模块
import os
# 导入 Path,处理路径
from pathlib import Path
# 导入 dotenv,用于加载 .env 文件中的环境变量
from dotenv import load_dotenv

# 加载 .env 文件中的环境变量到系统环境变量
load_dotenv()

# 定义应用配置类
class Config:
    """应用配置类"""

    # 基础配置
    # 项目根目录路径(取上级目录)
    BASE_DIR = Path(__file__).parent.parent
    # 加载环境变量 SECRET_KEY,若未设置则使用默认开发密钥
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'

    # 应用配置
    # 读取应用监听的主机地址,默认为本地所有地址
    APP_HOST = os.environ.get('APP_HOST', '0.0.0.0')
    # 读取应用监听的端口,默认为 5000,类型为 int
    APP_PORT = int(os.environ.get('APP_PORT', 5000))
    # 读取 debug 模式配置,字符串转小写等于 'true' 则为 True(开启调试)
    APP_DEBUG = os.environ.get('APP_DEBUG', 'false').lower() == 'true'
    # 读取允许上传的最大文件大小,默认为 100MB,类型为 int
    MAX_FILE_SIZE = int(os.environ.get('MAX_FILE_SIZE', 104857600))  # 100MB
    # 允许上传的文件扩展名集合
    ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt', 'md'}
    # 允许上传的图片扩展名集合
    ALLOWED_IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'webp'}
    # 允许上传的图片最大大小,默认为 5MB,类型为 int
    MAX_IMAGE_SIZE = int(os.environ.get('MAX_IMAGE_SIZE', 5242880))  # 5MB

    # 日志配置
    # 日志目录,默认 './logs'
    LOG_DIR = os.environ.get('LOG_DIR', './logs')
    # 日志文件名,默认 'rag_lite.log'
    LOG_FILE = os.environ.get('LOG_FILE', 'rag_lite.log')
    # 日志等级,默认 'INFO'
    LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
    # 是否启用控制台日志,默认 True
    LOG_ENABLE_CONSOLE = os.environ.get('LOG_ENABLE_CONSOLE', 'true').lower() == 'true'
    # 是否启用文件日志,默认 True
    LOG_ENABLE_FILE = os.environ.get('LOG_ENABLE_FILE', 'true').lower() == 'true'

    # 数据库配置
    # 数据库主机地址,默认为 'localhost'
    DB_HOST = os.environ.get('DB_HOST', 'localhost')
    # 数据库端口号,默认为 3306
    DB_PORT = int(os.environ.get('DB_PORT', 3306))
    # 数据库用户名,默认为 'root'
    DB_USER = os.environ.get('DB_USER', 'root')
    # 数据库密码,默认为 'root'
    DB_PASSWORD = os.environ.get('DB_PASSWORD', 'root')
    # 数据库名称,默认为 'rag-lite'
    DB_NAME = os.environ.get('DB_NAME', 'rag-lite')
    # 数据库字符集,默认为 'utf8mb4'
    DB_CHARSET = os.environ.get('DB_CHARSET', 'utf8mb4')

+   # 存储配置
+   STORAGE_TYPE = os.environ.get('STORAGE_TYPE', 'local')  # 'local' 或 'minio'
+   STORAGE_DIR = os.environ.get('STORAGE_DIR', './storage')

9.保存封面到Minio #

  • Minio
    # 说明:Windows PowerShell 中下载并运行 MinIO
    Invoke-WebRequest -Uri https://dl.min.io/server/minio/release/windows-amd64/minio.exe -OutFile minio.exe
    .\minio.exe server D:\minio-data --console-address ":9001"

9.1. factory.py #

app/services/storage/factory.py

"""
存储服务工厂
"""
# 导入日志模块
import logging
# 导入可选类型提示
from typing import Optional
# 导入存储服务接口
from app.services.storage.base import StorageInterface
# 导入本地存储服务实现
from app.services.storage.local_storage import LocalStorage
# 导入MinIO存储服务实现
+from app.services.storage.minio_storage import MinIOStorage
# 导入配置
from app.config import Config

# 获取logger实例
logger = logging.getLogger(__name__)


# 定义存储服务工厂类
class StorageFactory:
    """存储服务工厂"""

    # 定义用于实现单例的静态属性
    _instance: Optional[StorageInterface] = None

    # 定义创建存储服务实例的类方法
    @classmethod
    def create_storage(cls, storage_type: Optional[str] = None, **kwargs) -> StorageInterface:
        """
        创建存储服务实例

        Args:
            storage_type: 存储类型 ('local' 或 'minio'),如果为None则从配置读取
            **kwargs: 存储服务的初始化参数

        Returns:
            存储服务实例
        """
        # 如果没有传递storage_type,则优先从配置读取(默认local)
        if storage_type is None:
            storage_type = getattr(Config, 'STORAGE_TYPE', 'local')

        # 统一转为小写
        storage_type = storage_type.lower()

        # 判断存储类型,如果是local则返回本地存储实例
        if storage_type == 'local':
            # 获取可选的存储目录参数
            storage_dir = kwargs.get('storage_dir')
            return LocalStorage(storage_dir=storage_dir)
+       # 判断存储类型是否为 'minio'
+       elif storage_type == 'minio':
+           # 优先从参数中获取 endpoint,否则从配置中获取 MINIO_ENDPOINT,没有则为''
+           endpoint = kwargs.get('endpoint') or getattr(Config, 'MINIO_ENDPOINT', '')
+           # 优先从参数中获取 access_key,否则从配置中获取 MINIO_ACCESS_KEY,没有则为''
+           access_key = kwargs.get('access_key') or getattr(Config, 'MINIO_ACCESS_KEY', '')
+           # 优先从参数中获取 secret_key,否则从配置中获取 MINIO_SECRET_KEY,没有则为''
+           secret_key = kwargs.get('secret_key') or getattr(Config, 'MINIO_SECRET_KEY', '')
+           # 优先从参数中获取 bucket_name,否则从配置中获取 MINIO_BUCKET_NAME,默认值为 'rag-lite'
+           bucket_name = kwargs.get('bucket_name') or getattr(Config, 'MINIO_BUCKET_NAME', 'rag-lite')
+           # 优先从参数中获取 secure,否则从配置中获取 MINIO_SECURE,默认值为 False
+           secure = kwargs.get('secure', getattr(Config, 'MINIO_SECURE', False))
+           # 优先从参数中获取 region,否则从配置中获取 MINIO_REGION,默认值为 None
+           region = kwargs.get('region', getattr(Config, 'MINIO_REGION', None))
+           # 如果 endpoint、access_key 或 secret_key 其中任一为空,则抛出 ValueError 异常
+           if not endpoint or not access_key or not secret_key:
+               raise ValueError(
+                   "MinIO 存储需要提供 endpoint、access_key 和 secret_key,请在环境变量中配置或通过参数传递。"
+               )
+           # 创建并返回 MinIOStorage 实例
+           return MinIOStorage(
+               endpoint=endpoint,#MinIO 服务端点(如:localhost:9000)
+               access_key=access_key,#访问密钥
+               secret_key=secret_key,#秘密密钥
+               bucket_name=bucket_name,#存储桶名称
+               secure=secure,#是否使用HTTPS
+               region=region#区域(可选)
+           )    
        else:
            # 不支持的类型抛出异常
            raise ValueError(f"Unsupported storage type: {storage_type}")

    # 定义获取实例的类方法(用于单例懒加载)
    @classmethod
    def get_instance(cls) -> StorageInterface:
        """
        获取单例存储服务实例(懒加载)

        Returns:
            存储服务实例
        """
        # 如果还未创建实例,则创建一个
        if cls._instance is None:
            cls._instance = cls.create_storage()
        # 返回单例
        return cls._instance

9.2. minio_storage.py #

app/services/storage/minio_storage.py

# MinIO 对象存储实现的说明文档
"""
MinIO 对象存储实现
"""
# 导入日志模块
import logging
# 导入可选类型提示
from typing import Optional
# 导入内存字节流处理
from io import BytesIO
# 导入存储接口基类
from app.services.storage.base import StorageInterface

# 导入Minio类和异常
from minio import Minio
# 导入Minio异常
from minio.error import S3Error

# 获取日志记录器
logger = logging.getLogger(__name__)

# 定义 MinIOStorage 类,继承 StorageInterface
class MinIOStorage(StorageInterface):
    # MinIO 对象存储实现
    """MinIO 对象存储实现"""

    # 初始化方法,接受端点、密钥、桶名等参数
    def __init__(self, endpoint: str, access_key: str, secret_key: str, 
                 bucket_name: str, secure: bool = False, region: Optional[str] = None):
        # 初始化 MinIO 存储
        """
        初始化 MinIO 存储

        Args:
            endpoint: MinIO 服务端点(如:localhost:9000)
            access_key: 访问密钥
            secret_key: 秘密密钥
            bucket_name: 存储桶名称
            secure: 是否使用HTTPS
            region: 区域(可选)
        """
        # 创建 Minio 客户端实例
        self.client = Minio(
            endpoint,#MinIO 服务端点(如:localhost:9000)
            access_key=access_key,#访问密钥
            secret_key=secret_key,#秘密密钥
            secure=secure,#是否使用HTTPS
            region=region#区域(可选)
        )
        # 保存桶名称
        self.bucket_name = bucket_name

        # 检查桶是否存在,不存在则创建
        if not self.client.bucket_exists(bucket_name):
            # 创建存储桶
            self.client.make_bucket(bucket_name)
            # 记录桶创建日志
            logger.info(f"已创建桶: {bucket_name}")

        # 记录MinIO初始化完成日志
        logger.info(f"MinIO 存储初始化完成,桶名: {bucket_name}")

    # 上传文件到 MinIO
    def upload_file(self, file_path: str, file_data: bytes, 
                   content_type: str = 'application/octet-stream') -> str:
        # 上传文件到 MinIO
        """上传文件到 MinIO"""
        try:
            # 创建 BytesIO 流用于上传
            data_stream = BytesIO(file_data)
            # 使用 put_object 方法上传文件
            self.client.put_object(
                self.bucket_name,
                file_path,
                data_stream,
                length=len(file_data),
                content_type=content_type
            )
            # 上传成功,记录日志
            logger.info(f"已上传文件到 MinIO: {file_path}")
            # 返回文件路径
            return file_path
        # 捕获 S3Error 类型异常
        except S3Error as e:
            # 上传时报错,记录日志后抛出异常
            logger.error(f"上传文件到 MinIO 时报错: {e}")
            raise
        # 捕获其它异常
        except Exception as e:
            # 上传时出现异常,记录日志并抛出
            logger.error(f"上传文件到 MinIO 时发生异常: {e}")
            raise

    # 下载文件方法
    def download_file(self, file_path: str) -> bytes:
        # 从 MinIO 下载文件
        """从 MinIO 下载文件"""
        try:
            # 获取对象句柄
            response = self.client.get_object(self.bucket_name, file_path)
            # 读取数据
            data = response.read()
            # 关闭响应体
            response.close()
            # 释放连接
            response.release_conn()

            # 记录下载日志
            logger.info(f"已从 MinIO 下载文件: {file_path}")
            # 返回二进制数据
            return data
        # 捕获 S3Error 异常
        except S3Error as e:
            # 如果对象不存在,抛出文件未找到异常
            if e.code == 'NoSuchKey':
                raise FileNotFoundError(f"文件不存在: {file_path}")
            # 其它S3错误,记录日志后抛出
            logger.error(f"从 MinIO 下载文件时报错: {e}")
            raise
        # 捕获其它异常
        except Exception as e:
            # 下载时发生其它异常,记录并抛出
            logger.error(f"从 MinIO 下载文件时发生异常: {e}")
            raise

    # 删除文件方法
    def delete_file(self, file_path: str) -> None:
        # 从 MinIO 删除文件
        """从 MinIO 删除文件"""
        try:
            # 调用 Minio API 删除文件
            self.client.remove_object(self.bucket_name, file_path)
            # 记录删除日志
            logger.info(f"已从 MinIO 删除文件: {file_path}")
        # 捕获 S3Error 异常
        except S3Error as e:
            # 记录删除时报错日志
            logger.error(f"从 MinIO 删除文件时报错: {e}")
            raise
        # 捕获其它异常
        except Exception as e:
            # 删除文件发生异常,记录并抛出
            logger.error(f"从 MinIO 删除文件时发生异常: {e}")
            raise

    # 判断文件是否存在方法
    def file_exists(self, file_path: str) -> bool:
        # 检查文件是否存在于 MinIO
        """检查文件是否存在于 MinIO"""
        try:
            # 检查对象状态(存在则无异常)
            self.client.stat_object(self.bucket_name, file_path)
            # 存在返回 True
            return True
        # 捕获 S3Error 异常
        except S3Error as e:
            # 如果对象不存在,返回 False
            if e.code == 'NoSuchKey':
                return False
            # 其它 S3 错误抛出
            raise
        # 捕获其它异常
        except Exception as e:
            # 检查文件存在时异常,记录并抛出
            logger.error(f"在 MinIO 检查文件是否存在时发生异常: {e}")
            raise

    # 获取文件访问URL方法
    def get_file_url(self, file_path: str, expires_in: Optional[int] = None) -> Optional[str]:
        # 获取 MinIO 文件的预签名URL
        """获取 MinIO 文件的预签名URL"""
        try:
            # 如果未指定过期时间,默认为7天
            if expires_in is None:
                expires_in = 7 * 24 * 3600  # 默认7天

            # 生成预签名URL
            url = self.client.presigned_get_object(
                self.bucket_name,
                file_path,
                expires=expires_in
            )
            # 返回预签名URL
            return url
        # 捕获 S3Error 异常
        except S3Error as e:
            # 生成URL时报错,记录日志返回None
            logger.error(f"生成 MinIO 文件预签名URL时报错: {e}")
            return None
        # 捕获其它异常
        except Exception as e:
            # 发生其它异常,记录日志返回None
            logger.error(f"生成 MinIO 文件预签名URL时发生异常: {e}")
            return None

9.3. .env #

.env

# 应用配置
APP_HOST=0.0.0.0
APP_PORT=5000
APP_DEBUG=True
MAX_FILE_SIZE=104857600
SECRET_KEY=dev-secret-key-change-in-production


# 日志配置
LOG_DIR=./logs
LOG_FILE=rag_lite.log
LOG_LEVEL=INFO
LOG_ENABLE_FILE=True
LOG_ENABLE_CONSOLE=True

# 数据库配置
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=root
DB_NAME=rag-lite
DB_CHARSET=utf8mb4

# 存储配置
+STORAGE_TYPE=minio
+MINIO_ENDPOINT=127.0.0.1:9000
+MINIO_ACCESS_KEY=minioadmin
+MINIO_SECRET_KEY=minioadmin

9.4. config.py #

app/config.py

"""
配置管理模块
"""

# 导入操作系统相关模块
import os
# 导入 Path,处理路径
from pathlib import Path
# 导入 dotenv,用于加载 .env 文件中的环境变量
from dotenv import load_dotenv

# 加载 .env 文件中的环境变量到系统环境变量
load_dotenv()

# 定义应用配置类
class Config:
    """应用配置类"""

    # 基础配置
    # 项目根目录路径(取上级目录)
    BASE_DIR = Path(__file__).parent.parent
    # 加载环境变量 SECRET_KEY,若未设置则使用默认开发密钥
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'

    # 应用配置
    # 读取应用监听的主机地址,默认为本地所有地址
    APP_HOST = os.environ.get('APP_HOST', '0.0.0.0')
    # 读取应用监听的端口,默认为 5000,类型为 int
    APP_PORT = int(os.environ.get('APP_PORT', 5000))
    # 读取 debug 模式配置,字符串转小写等于 'true' 则为 True(开启调试)
    APP_DEBUG = os.environ.get('APP_DEBUG', 'false').lower() == 'true'
    # 读取允许上传的最大文件大小,默认为 100MB,类型为 int
    MAX_FILE_SIZE = int(os.environ.get('MAX_FILE_SIZE', 104857600))  # 100MB
    # 允许上传的文件扩展名集合
    ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt', 'md'}
    # 允许上传的图片扩展名集合
    ALLOWED_IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'webp'}
    # 允许上传的图片最大大小,默认为 5MB,类型为 int
    MAX_IMAGE_SIZE = int(os.environ.get('MAX_IMAGE_SIZE', 5242880))  # 5MB

    # 日志配置
    # 日志目录,默认 './logs'
    LOG_DIR = os.environ.get('LOG_DIR', './logs')
    # 日志文件名,默认 'rag_lite.log'
    LOG_FILE = os.environ.get('LOG_FILE', 'rag_lite.log')
    # 日志等级,默认 'INFO'
    LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
    # 是否启用控制台日志,默认 True
    LOG_ENABLE_CONSOLE = os.environ.get('LOG_ENABLE_CONSOLE', 'true').lower() == 'true'
    # 是否启用文件日志,默认 True
    LOG_ENABLE_FILE = os.environ.get('LOG_ENABLE_FILE', 'true').lower() == 'true'

    # 数据库配置
    # 数据库主机地址,默认为 'localhost'
    DB_HOST = os.environ.get('DB_HOST', 'localhost')
    # 数据库端口号,默认为 3306
    DB_PORT = int(os.environ.get('DB_PORT', 3306))
    # 数据库用户名,默认为 'root'
    DB_USER = os.environ.get('DB_USER', 'root')
    # 数据库密码,默认为 'root'
    DB_PASSWORD = os.environ.get('DB_PASSWORD', 'root')
    # 数据库名称,默认为 'rag-lite'
    DB_NAME = os.environ.get('DB_NAME', 'rag-lite')
    # 数据库字符集,默认为 'utf8mb4'
    DB_CHARSET = os.environ.get('DB_CHARSET', 'utf8mb4')

    # 存储配置
    STORAGE_TYPE = os.environ.get('STORAGE_TYPE', 'local')  # 'local' 或 'minio'
    STORAGE_DIR = os.environ.get('STORAGE_DIR', './storage')

    # MinIO 配置(当 STORAGE_TYPE='minio' 时使用)
+   MINIO_ENDPOINT = os.environ.get('MINIO_ENDPOINT', '')
+   MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY', '')
+   MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY', '')
+   MINIO_BUCKET_NAME = os.environ.get('MINIO_BUCKET_NAME', 'rag-lite')
+   MINIO_SECURE = os.environ.get('MINIO_SECURE', 'false').lower() == 'true'
+   MINIO_REGION = os.environ.get('MINIO_REGION', None)

10.搜索和排序 #

10.1. knowledgebase.py #

app/blueprints/knowledgebase.py

# 知识库相关路由(视图 + API)
"""
知识库相关路由(视图 + API)
"""

# 导入Flask中的Blueprint和request
from flask import Blueprint,request,render_template,send_file,abort
# 使用BytesIO将图片数据包装为文件流
from io import BytesIO
# 导入logging模块
import logging
# 导入mimetypes、os模块用于类型判断
import mimetypes
# 导入os模块用于路径操作
import os
# 导入自定义工具函数:异常处理装饰器、错误响应、成功响应
from app.blueprints.utils import (handle_api_error,error_response,success_response,get_current_user_or_error)
# 导入知识库服务
from app.services.knowledgebase_service import kb_service
# 导入认证工具函数:登录认证装饰器、获取当前用户、API登录认证装饰器
from app.utils.auth import login_required, get_current_user,api_login_required
# 导入分页工具函数
from app.blueprints.utils import (get_pagination_params,check_ownership)
# 导入存储服务
from app.services.storage_service import storage_service
# 配置logger
logger = logging.getLogger(__name__)

# 创建Blueprint实例,注册在Flask应用下
bp = Blueprint('knowledgebase', __name__)

# 定义路由:POST请求到/api/v1/kb
@bp.route('/api/v1/kb', methods=['POST'])
# 应用API登录认证装饰器
@api_login_required
# 应用自定义异常处理装饰器
@handle_api_error
# 定义创建知识库的视图函数
# 定义用于创建知识库的API接口
def api_create():
    # 接口用途说明文档字符串
    """创建知识库"""
    # 获取当前用户,如未登录则返回错误响应
    current_user, err = get_current_user_or_error()
    if err:
        return err
    # 检查请求是否为multipart/form-data(用于文件上传的表单方式)
    if request.content_type and 'multipart/form-data' in request.content_type:
        # 从表单数据中获取知识库名称
        name = request.form.get('name')
        # 如果未传入name参数,返回错误
        if not name:
            return error_response("name is required", 400)
        # 获取描述字段,没有则为None
        description = request.form.get('description') or None
        # 获取分块大小,默认为512
        chunk_size = int(request.form.get('chunk_size', 512))
        # 获取分块重叠,默认为50
        chunk_overlap = int(request.form.get('chunk_overlap', 50))
        # 设置封面图片数据变量初值为None
        cover_image_data = None
        # 设置封面图片文件名变量初值为None
        cover_image_filename = None
        # 判断请求中是否包含'cover_image'文件
        if 'cover_image' in request.files:
            # 获取上传的封面图片文件对象
            cover_file = request.files['cover_image']
            # 如果上传的文件存在且有文件名
            if cover_file and cover_file.filename:
                # 读取文件内容为二进制数据
                cover_image_data = cover_file.read()
                # 获取上传文件的文件名
                cover_image_filename = cover_file.filename
                # 记录封面图片上传的信息到日志,包括文件名、字节大小和内容类型
                logger.info(f"收到新知识库的封面图片上传: 文件名={cover_image_filename}, 大小={len(cover_image_data)} 字节, 内容类型={cover_file.content_type}")
    else:
        # 如果是json请求数据(向后兼容旧用法)
        data = request.get_json()
        # 判断是否存在name字段,不存在则报错
        if not data or 'name' not in data:
            return error_response("name is required", 400)
        # 获取知识库名称
        name = data['name']
        # 获取描述
        description = data.get('description')
        # 获取分块大小,默认为512
        chunk_size = data.get('chunk_size', 512)
        # 获取分块重叠,默认为50
        chunk_overlap = data.get('chunk_overlap', 50)
        # 设置封面图片数据变量初值为None
        cover_image_data = None
        # 设置封面图片文件名变量初值为None
        cover_image_filename = None
    # 调用知识库服务,创建知识库,返回知识库信息字典
    kb_dict = kb_service.create(
        name=name,                # 知识库名称
        user_id=current_user['id'],   # 用户ID
        description=description,      # 知识库描述
        chunk_size=chunk_size,        # 分块大小
        chunk_overlap=chunk_overlap,  # 分块重叠
        cover_image_data=cover_image_data, # 封面图片数据
        cover_image_filename=cover_image_filename # 封面图片文件名
    )
    # 返回成功响应,包含知识库信息
    return success_response(kb_dict)


# 注册'/kb'路由,处理GET请求,显示知识库列表页面
@bp.route('/kb')
# 要求登录用户才能访问该视图,用于Web页面
@login_required
# 定义kb_list函数,渲染知识库列表页面
def kb_list():
    # 设置本函数用途说明(文档字符串)
    """知识库列表页面"""
    # 获取当前登录用户信息
    current_user = get_current_user()
    # 获取分页参数(页码和每页大小),最大每页100
    page, page_size = get_pagination_params(max_page_size=100)
    # 获取搜索和排序参数
+   search = request.args.get('search', '').strip() or None
+   sort_by = request.args.get('sort_by', 'created_at')
+   sort_order = request.args.get('sort_order', 'desc')

    # 验证排序参数
+   if sort_by not in ['created_at', 'name', 'updated_at']:
+       sort_by = 'created_at'
+   if sort_order not in ['asc', 'desc']:
+       sort_order = 'desc'
    # 调用知识库服务,获取分页后的知识库列表结果
    result = kb_service.list(
        user_id=current_user['id'], # 用户ID
        page=page, # 页码
+       page_size=page_size, # 每页大小
+       search=search, # 搜索关键词 
+       sort_by=sort_by, # 排序字段
+       sort_order=sort_order # 排序方向

    )
    # 渲染知识库列表页面模板,传递数据,包括知识库列表、分页信息
    return render_template('kb_list.html', 
                         kbs=result['items'],
+                        pagination=result,
+                        search=search or '',
+                        sort_by=sort_by,
+                        sort_order=sort_order)    

# 注册DELETE方法的API路由,用于删除知识库
@bp.route('/api/v1/kb/<kb_id>', methods=['DELETE'])
# 要求API登录
@api_login_required
# 处理API错误的装饰器
@handle_api_error
def api_delete(kb_id):
    """删除知识库"""
    # 获取当前用户信息,如果未登录则返回错误
    current_user, err = get_current_user_or_error()
    if err:
        return err

    # 根据知识库ID获取知识库信息
    kb_dict = kb_service.get_by_id(kb_id)
    # 如果知识库不存在,返回404错误
    if not kb_dict:
        return error_response("未找到知识库", 404)

    # 验证当前用户是否拥有该知识库的操作权限
    has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
    if not has_permission:
        return err

    # 调用服务删除知识库
    success = kb_service.delete(kb_id)
    # 如果删除失败,返回404错误
    if not success:
        return error_response("未找到知识库", 404)

    # 返回删除成功的响应
    return success_response("知识库删除成功")


# 注册PUT方法的API路由(用于更新知识库)
@bp.route('/api/v1/kb/<kb_id>', methods=['PUT'])
# 要求API登录
@api_login_required
# 捕获API内部错误的装饰器
@handle_api_error
def api_update(kb_id):
    # 定义API用于更新知识库(含封面图片)
    """更新知识库(支持封面图片更新)"""
    # 获取当前登录用户信息,如果未登录则返回错误响应
    current_user, err = get_current_user_or_error()
    if err:
        return err

    # 获取指定ID的知识库记录,验证其是否存在
    kb_dict = kb_service.get_by_id(kb_id)
    if not kb_dict:
        return error_response("未找到知识库", 404)

    # 校验当前用户是否有操作该知识库的权限
    has_permission, err = check_ownership(kb_dict['user_id'], current_user['id'], "knowledgebase")
    if not has_permission:
        return err

    # 判断请求内容类型是否为multipart/form-data(一般用于带文件上传的表单提交)
    if request.content_type and 'multipart/form-data' in request.content_type:
        # 从表单中获取普通字段
        name = request.form.get('name')
        description = request.form.get('description') or None
        chunk_size = request.form.get('chunk_size')
        chunk_overlap = request.form.get('chunk_overlap')

        # 初始化封面图片相关变量
        cover_image_data = None
        cover_image_filename = None
        # 获得delete_cover字段(类型字符串,需判断是否为'true')
        delete_cover = request.form.get('delete_cover') == 'true'

        # 如果有上传封面图片,则读取文件内容
        if 'cover_image' in request.files:
            cover_file = request.files['cover_image']
            if cover_file and cover_file.filename:
                cover_image_data = cover_file.read()
                cover_image_filename = cover_file.filename
                # 记录上传日志
                logger.info(f"收到知识库 {kb_id} 的封面图片上传: 文件名={cover_image_filename}, 大小={len(cover_image_data)} 字节, 内容类型={cover_file.content_type}")

        # 构建待更新的数据
        update_data = {}
        if name:
            update_data['name'] = name
        if description is not None:
            update_data['description'] = description
        if chunk_size:
            update_data['chunk_size'] = int(chunk_size)
        if chunk_overlap:
            update_data['chunk_overlap'] = int(chunk_overlap)
    else:
        # 非表单上传,则按JSON结构解析请求内容
        data = request.get_json()
        # 如果请求体是空的,直接返回错误
        if not data:
            return error_response("请求体不能为空", 400)

        # 构建可更新的数据字典
        update_data = {}
        if 'name' in data:
            update_data['name'] = data['name']
        if 'description' in data:
            update_data['description'] = data.get('description')
        if 'chunk_size' in data:
            update_data['chunk_size'] = data['chunk_size']
        if 'chunk_overlap' in data:
            update_data['chunk_overlap'] = data['chunk_overlap']
        # JSON请求时,cover_image相关变量置空
        cover_image_data = None
        cover_image_filename = None
        delete_cover = data.get('delete_cover', False)    

    # 调用服务更新知识库,传入各字段及封面参数
    updated_kb = kb_service.update(
        kb_id=kb_id,                   # 知识库ID
        cover_image_data=cover_image_data,   # 封面图片的二进制内容
        cover_image_filename=cover_image_filename, # 封面图片文件名
        delete_cover=delete_cover,           # 是否删除封面图片
        **update_data                    # 其它可变字段
    )

    # 更新后如果找不到,返回404
    if not updated_kb:
        return error_response("未找到知识库", 404)

    # 更新成功后,将最新的知识库数据返回给前端
    return success_response(updated_kb, "知识库更新成功")


# 定义路由,获取指定知识库ID的封面图片,仅限登录用户访问
@bp.route('/kb/<kb_id>/cover')
@login_required
def kb_cover(kb_id):
    """获取知识库封面图片"""
    # 获取当前已登录用户的信息
    current_user = get_current_user()
    # 根据知识库ID从知识库服务获取对应的知识库信息
    kb = kb_service.get_by_id(kb_id)

    # 检查知识库是否存在
    if not kb:
        # 如果知识库不存在,记录警告日志
        logger.warning(f"知识库不存在: {kb_id}")
        abort(404)

    # 检查是否有权限访问(只能查看自己的知识库封面)
    if kb.get('user_id') != current_user['id']:
        # 如果不是当前用户的知识库,记录警告日志
        logger.warning(f"用户 {current_user['id']} 尝试访问知识库 {kb_id} 的封面,但该知识库属于用户 {kb.get('user_id')}")
        abort(403)

    # 获取知识库的封面图片路径
    cover_path = kb.get('cover_image')
    # 检查是否有封面图片
    if not cover_path:
        # 如果没有封面,记录调试日志
        logger.debug(f"知识库 {kb_id} 没有封面图片")
        abort(404)

    try:
        # 通过存储服务下载封面图片数据
        image_data = storage_service.download_file(cover_path)
        # 如果未能获取到图片数据,记录错误日志并返回404
        if not image_data:
            logger.error(f"从路径下载封面图片失败: {cover_path}")
            abort(404)

        # 根据文件扩展名判断图片MIME类型
        file_ext = os.path.splitext(cover_path)[1].lower()
        # 自定义映射,优先根据文件扩展名判断图片MIME类型
        mime_type_map = {
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.webp': 'image/webp'
        }

        # 优先根据自定义映射获取MIME类型
        mime_type = mime_type_map.get(file_ext)
        if not mime_type:
            # 如果没有命中自定义映射,则使用mimetypes猜测类型
            mime_type, _ = mimetypes.guess_type(cover_path)
            if not mime_type:
                # 如果还未识别出类型,则默认用JPEG
                mime_type = 'image/jpeg'

        # 通过send_file响应图片数据和MIME类型,不以附件形式发送
        return send_file(
            BytesIO(image_data),#图片数据
            mimetype=mime_type,#MIME类型
            as_attachment=False#不以附件形式发送
        )
    except FileNotFoundError as e:
        # 捕获文件未找到异常,记录错误日志
        logger.error(f"封面图片文件未找到: {cover_path}, 错误: {e}")
        abort(404)
    except Exception as e:
        # 捕获其他未预期异常,记录错误日志(包含堆栈信息)
        logger.error(f"提供知识库 {kb_id} 的封面图片时出错, 路径: {cover_path}, 错误: {e}", exc_info=True)
        abort(404)

10.2. knowledgebase_service.py #

app/services/knowledgebase_service.py

# 知识库服务
"""
知识库服务
"""
import os
from app.config import Config
# 导入类型提示工具
from typing import Optional, Dict
# 从基础服务导入BaseService类
from app.services.base_service import BaseService

# 从模型模块导入Knowledgebase类
from app.models.knowledgebase import Knowledgebase
# 导入存储服务
from app.services.storage_service import storage_service
# 定义KnowledgebaseService服务类,继承自BaseService,泛型参数为Knowledgebase
class KnowledgebaseService(BaseService[Knowledgebase]):
    """知识库服务"""
    # 定义创建知识库的方法
    def create(self, name: str, user_id: str, description: str = None, 
               chunk_size: int = 512, chunk_overlap: int = 50,
               cover_image_data: bytes = None, cover_image_filename: str = None) -> dict:
        """
        创建知识库

        Args:
            name: 知识库名称
            user_id: 用户ID
            description: 描述
            chunk_size: 分块大小
            chunk_overlap: 分块重叠
            cover_image_data: 封面图片数据(可选)
            cover_image_filename: 封面图片文件名(可选)

        Returns:
            创建的知识库字典
        """
        cover_image_path = None

        # 处理封面图片上传
        if cover_image_data and cover_image_filename:
            # 验证文件类型
            file_ext_without_dot = os.path.splitext(cover_image_filename)[1][1:].lower() if '.' in cover_image_filename else ''
            if not file_ext_without_dot:
                raise ValueError(f"文件名缺少扩展名: {cover_image_filename}")

            if file_ext_without_dot not in Config.ALLOWED_IMAGE_EXTENSIONS:
                raise ValueError(f"不支持的图片格式: {file_ext_without_dot}。支持的格式: {', '.join(Config.ALLOWED_IMAGE_EXTENSIONS)}")

            # 验证文件大小
            if len(cover_image_data) == 0:
                raise ValueError("上传的图片文件为空")

            if len(cover_image_data) > Config.MAX_IMAGE_SIZE:
                raise ValueError(f"图片文件大小超过限制 {Config.MAX_IMAGE_SIZE / 1024 / 1024}MB")
        # 启动数据库事务,上下文管理器自动处理提交或回滚
        with self.transaction() as session:
            # 先创建知识库对象
            kb = Knowledgebase(
                name=name,  # 设置知识库名称
                user_id=user_id,  # 设置用户ID
                description=description,  # 设置知识库描述
                chunk_size=chunk_size,  # 设置分块大小
                chunk_overlap=chunk_overlap  # 设置分块重叠
            )
            # 将知识库对象添加到session
            session.add(kb)
            # 刷新session,生成知识库ID
            session.flush()  # 刷新以获取 ID,但不提交

            # 上传封面图片(如果有)
            if cover_image_data and cover_image_filename:
                try:
                    # 构建封面图片路径(统一使用小写扩展名)
                    file_ext_with_dot = os.path.splitext(cover_image_filename)[1].lower()
                    cover_image_path = f"covers/{kb.id}{file_ext_with_dot}"

                    self.logger.info(f"正在为新知识库 {kb.id} 上传封面图片: 文件名={cover_image_filename}, 路径={cover_image_path}, 大小={len(cover_image_data)} 字节")

                    # 上传到存储
                    storage_service.upload_file(cover_image_path, cover_image_data)

                    # 验证文件是否成功上传
                    if not storage_service.file_exists(cover_image_path):
                        raise ValueError(f"上传后文件不存在: {cover_image_path}")

                    self.logger.info(f"成功上传封面图片: {cover_image_path}")

                    # 更新知识库的封面路径
                    kb.cover_image = cover_image_path
                    session.flush()
                except Exception as e:
                    self.logger.error(f"上传知识库 {kb.id} 的封面图片时出错: {e}", exc_info=True)
                    # 如果上传失败,继续创建知识库,但不设置封面
                    cover_image_path = None

            # 刷新kb对象的数据库状态
            session.refresh(kb)
            # 转换kb对象为字典(在session内部,避免分离后出错)
            kb_dict = kb.to_dict()
            # 记录创建知识库的日志,包含ID
            self.logger.info(f"创建了知识库,ID: {kb.id}")
            # 返回知识库字典信息
            return kb_dict

    # 定义获取知识库列表的方法
+   def list(self, user_id: str = None, page: int = 1, page_size: int = 10, 
+            search: str = None, sort_by: str = 'created_at', sort_order: str = 'desc') -> Dict:
        """
        获取知识库列表

        Args:
            user_id: 用户ID(可选)
            page: 页码
            page_size: 每页数量
+           search: 搜索关键词(搜索名称和描述)
+           sort_by: 排序字段(created_at, name, updated_at)
+           sort_order: 排序方向(asc, desc)


        Returns:
            包含 items, total, page, page_size 的字典
        """
        # 使用数据库会话
        with self.session() as session:
            # 查询Knowledgebase表
            query = session.query(Knowledgebase)
            # 如果指定了user_id,则筛选属于该用户的知识库
            if user_id:
                query = query.filter(Knowledgebase.user_id == user_id)
            # 如果提供了搜索关键词,则按名称和描述进行模糊搜索
+           if search:
                # 构造模糊搜索的SQL模式
+               search_pattern = f"%{search}%"
                # 通过SQLAlchemy的filter方法实现名称或描述字段的模糊查询
+               query = query.filter(
                    # 名称模糊匹配
+                   (Knowledgebase.name.like(search_pattern)) |
                    # 描述模糊匹配
+                   (Knowledgebase.description.like(search_pattern))
+               )

            # 排序逻辑处理
            # 初始化排序字段变量
+           sort_field = None
            # 如果排序字段为'name',按名称排序
+           if sort_by == 'name':
+               sort_field = Knowledgebase.name
            # 如果排序字段为'updated_at',按更新时间排序
+           elif sort_by == 'updated_at':
+               sort_field = Knowledgebase.updated_at
            # 否则默认按创建时间排序
+           else:
+               sort_field = Knowledgebase.created_at

            # 根据排序顺序(升序或降序)添加排序
+           if sort_order == 'asc':
                # 升序排列
+               query = query.order_by(sort_field.asc())
+           else:
                # 默认降序排列
+               query = query.order_by(sort_field.desc())    
            # 统计总记录数
            total = query.count()
            # 计算分页偏移量
            offset = (page - 1) * page_size
            # 获取当前页的数据列表
            kbs = query.offset(offset).limit(page_size).all()

            # 初始化知识库字典列表
            items = []
            # 遍历查询结果,将每一项转为dict后添加到items列表
            for kb in kbs:
                kb_dict = kb.to_dict()
                items.append(kb_dict)

            # 返回包含分页信息和数据条目的字典
            return {
                'items': items,
                'total': total,
                'page': page,
                'page_size': page_size
            }     

    def delete(self, kb_id: str) -> bool:
        """
        删除知识库

        Args:
            kb_id: 知识库ID

        Returns:
            是否删除成功
        """
        with self.transaction() as session:
            kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            if not kb:
                return False
            session.delete(kb)
            self.logger.info(f"Deleted knowledgebase: {kb_id}")
            return True    

    def get_by_id(self, kb_id: str) -> Optional[dict]:
        """根据ID获取知识库"""
        with self.session() as session:
            kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            if kb:
                # 在 session 内部转换为字典,避免对象从 session 分离后访问属性出错
                return kb.to_dict()
            return None   
    # 定义 update 方法,用于更新知识库
    def update(self, kb_id: str, cover_image_data: bytes = None, 
               cover_image_filename: str = None, delete_cover: bool = False, **kwargs) -> Optional[dict]:
        """
        更新知识库

        Args:
            kb_id: 知识库ID
            cover_image_data: 新的封面图片数据(可选)
            cover_image_filename: 新的封面图片文件名(可选)
            delete_cover: 是否删除封面图片(可选)
            **kwargs: 要更新的字段(name, description, chunk_size, chunk_overlap 等)

        Returns:
            更新后的知识库字典,如果不存在则返回 None
        """
        # 开启数据库事务
        with self.transaction() as session:
            # 查询指定ID的知识库对象
            kb = session.query(Knowledgebase).filter(Knowledgebase.id == kb_id).first()
            # 如果未找到知识库,则返回 None
            if not kb:
                return None

            # 处理封面图片更新
            old_cover_path = kb.cover_image if kb.cover_image else None

            if delete_cover:
                # 删除封面图片
                if old_cover_path:
                    try:
                        storage_service.delete_file(old_cover_path)
                        self.logger.info(f"已删除封面图片: {old_cover_path}")
                    except Exception as e:
                        self.logger.warning(f"删除封面图片时出错: {e}")
                kwargs['cover_image'] = None
            elif cover_image_data and cover_image_filename:
                # 上传新封面图片
                # 验证文件类型
                file_ext_without_dot = os.path.splitext(cover_image_filename)[1][1:].lower() if '.' in cover_image_filename else ''
                if not file_ext_without_dot:
                    raise ValueError(f"文件名缺少扩展名: {cover_image_filename}")

                if file_ext_without_dot not in Config.ALLOWED_IMAGE_EXTENSIONS:
                    raise ValueError(f"不支持的图片格式: {file_ext_without_dot}。支持的格式: {', '.join(Config.ALLOWED_IMAGE_EXTENSIONS)}")

                # 验证文件大小
                if len(cover_image_data) == 0:
                    raise ValueError("上传的图片文件为空")

                if len(cover_image_data) > Config.MAX_IMAGE_SIZE:
                    raise ValueError(f"图片文件大小超过限制 {Config.MAX_IMAGE_SIZE / 1024 / 1024}MB")

                try:
                    # 构建新封面图片路径(使用原始扩展名,保持大小写)
                    file_ext_with_dot = os.path.splitext(cover_image_filename)[1]
                    # 统一使用小写扩展名,避免路径不一致的问题
                    file_ext_with_dot = file_ext_with_dot.lower()
                    new_cover_path = f"covers/{kb_id}{file_ext_with_dot}"

                    self.logger.info(f"正在处理知识库 {kb_id} 的封面图片更新: 文件名={cover_image_filename}, 扩展名={file_ext_without_dot}, 大小={len(cover_image_data)} 字节, 新路径={new_cover_path}, 旧路径={old_cover_path}")

                    # 先上传新封面(确保上传成功后再删除旧封面)
                    storage_service.upload_file(new_cover_path, cover_image_data)

                    # 验证文件是否成功上传
                    if not storage_service.file_exists(new_cover_path):
                        raise ValueError(f"上传后文件不存在: {new_cover_path}")

                    self.logger.info(f"成功上传封面图片: {new_cover_path}")

                    # 删除旧封面(如果存在且与新封面路径不同)
                    if old_cover_path and old_cover_path != new_cover_path:
                        try:
                            storage_service.delete_file(old_cover_path)
                            self.logger.info(f"已删除旧封面图片: {old_cover_path}")
                        except Exception as e:
                            self.logger.warning(f"删除旧封面图片时出错: {e}")
                            # 继续执行,不因为删除旧文件失败而中断更新

                    # 更新数据库中的封面路径
                    kwargs['cover_image'] = new_cover_path
                except Exception as e:
                    self.logger.error(f"上传知识库 {kb_id} 的封面图片时出错: {e}", exc_info=True)
                    raise ValueError(f"上传封面图片失败: {str(e)}")    

            # 遍历要更新的字段和值
            for key, value in kwargs.items():
                # 判断知识库对象是否有该字段,且值不为 None
                if hasattr(kb, key) and (key == 'cover_image' or value is not None):
                    # 设置该字段的新值
                    setattr(kb, key, value)

            # 刷新session,保证对象属性为最新状态
            session.flush()
            # 刷新对象,避免未提交前读取到旧数据
            session.refresh(kb)

            # 在事务内部将对象转为字典,避免 session 关闭后访问失败
            kb_dict = kb.to_dict()

            # 如果本次更新包含 'cover_image' 字段,记录详细日志
            if 'cover_image' in kwargs:
                self.logger.info(f"更新知识库 {kb_id}, 封面图片={kb_dict.get('cover_image')}")
            else:
                # 否则仅记录知识库ID
                self.logger.info(f"更新知识库: {kb_id}")

            # 返回更新后的知识库字典
            return kb_dict

# 创建KnowledgebaseService的单例对象
kb_service = KnowledgebaseService()    

10.3. kb_list.html #

app/templates/kb_list.html

{% extends "base.html" %}

{% block title %}知识库管理 - RAG Lite{% endblock %}

{% block content %}
<style>
@media (min-width: 992px) {
    #kbList > div {
        flex: 0 0 20%;
        max-width: 20%;
    }
}
</style>
<div class="row">
    <div class="col-12">
        <nav aria-label="breadcrumb" class="mb-3">
            <ol class="breadcrumb">
                <li class="breadcrumb-item"><a href="/">首页</a></li>
                <li class="breadcrumb-item active">知识库管理</li>
            </ol>
        </nav>

        <div class="d-flex justify-content-between align-items-center mb-4">
            <h2><i class="bi bi-collection"></i> 知识库管理</h2>
            <button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#createKbModal">
                <i class="bi bi-plus-circle"></i> 创建知识库
            </button>
        </div>
+       <!-- 搜索和排序工具栏 -->
+       <div class="card mb-4">
+           <div class="card-body">
+               <form method="GET" action="/kb" id="searchForm" class="row g-3 align-items-end">
+                   <div class="col-md-6">
+                       <label for="searchInput" class="form-label">搜索</label>
+                       <div class="input-group">
+                           <span class="input-group-text"><i class="bi bi-search"></i></span>
+                           <input type="text" class="form-control" id="searchInput" name="search" 
+                                  placeholder="搜索知识库名称或描述..." value="{{ search }}">
+                           {% if search %}
+                           <button type="button" class="btn btn-outline-secondary" onclick="clearSearch()">
+                               <i class="bi bi-x"></i>
+                           </button>
+                           {% endif %}
+                       </div>
+                   </div>
+                   <div class="col-md-3">
+                       <label for="sortBySelect" class="form-label">排序字段</label>
+                       <select class="form-select" id="sortBySelect" name="sort_by" onchange="updateSearch()">
+                           <option value="created_at" {% if sort_by == 'created_at' %}selected{% endif %}>创建时间</option>
+                           <option value="name" {% if sort_by == 'name' %}selected{% endif %}>名称</option>
+                           <option value="updated_at" {% if sort_by == 'updated_at' %}selected{% endif %}>更新时间</option>
+                       </select>
+                   </div>
+                   <div class="col-md-2">
+                       <label for="sortOrderSelect" class="form-label">排序方向</label>
+                       <select class="form-select" id="sortOrderSelect" name="sort_order" onchange="updateSearch()">
+                           <option value="desc" {% if sort_order == 'desc' %}selected{% endif %}>降序</option>
+                           <option value="asc" {% if sort_order == 'asc' %}selected{% endif %}>升序</option>
+                       </select>
+                   </div>
+                   <div class="col-md-1">
+                       <button type="submit" class="btn btn-primary w-100">
+                           <i class="bi bi-search"></i> 搜索
+                       </button>
+                   </div>
+                   <input type="hidden" name="page" value="1">
+                   <input type="hidden" name="page_size" value="{{ pagination.page_size if pagination else 10 }}">
+               </form>
+           </div>
+       </div>
        <!-- 知识库列表 -->
        <div class="row" id="kbList">
            {% if kbs %}
                {% for kb in kbs %}
                <div class="col-12 col-sm-6 col-md-4 col-lg mb-4">
                    <div class="card h-100">
                        {% if kb.cover_image %}
                        <img src="/kb/{{ kb.id }}/cover" class="card-img-top" alt="{{ kb.name|e }}" style="height: 150px; object-fit: scale-down;">
                        {% else %}
                        <div class="card-img-top bg-light d-flex align-items-center justify-content-center" style="height: 150px;">
                            <i class="bi bi-folder" style="font-size: 3rem; color: #6c757d;"></i>
                        </div>
                        {% endif %}
                        <div class="card-body">
                            <h5 class="card-title">
                                <i class="bi bi-folder"></i> {{ kb.name }}
                            </h5>
                            <p class="card-text text-muted small">{{ kb.description or '无描述' }}</p>
                        </div>
                        <div class="card-footer bg-transparent">
                            <button class="btn btn-sm btn-warning" 
                                    data-kb-id="{{ kb.id }}"
                                    data-kb-name="{{ kb.name }}"
                                    data-kb-description="{{ kb.description or '' }}"
                                    data-kb-chunk-size="{{ kb.chunk_size }}"
                                    data-kb-chunk-overlap="{{ kb.chunk_overlap }}"
                                    data-kb-cover-image="{{ kb.cover_image or '' }}"
                                    onclick="editKbFromButton(this)">
                                <i class="bi bi-pencil"></i> 编辑
                            </button>
                            <button class="btn btn-sm btn-danger" onclick="deleteKb('{{ kb.id }}', '{{ kb.name }}')">
                                <i class="bi bi-trash"></i> 删除
                            </button>
                        </div>
                    </div>
                </div>
                {% endfor %}
            {% else %}
                <div class="col-12">
                    <div class="alert alert-info">
                        <i class="bi bi-info-circle"></i> 还没有知识库,点击上方按钮创建一个吧!
                    </div>
                </div>
            {% endif %}
        </div>

        <!-- 分页控件 -->
        {% if pagination and pagination.total > pagination.page_size %}
        <nav aria-label="知识库列表分页" class="mt-4">
            <ul class="pagination justify-content-center">
                {% set current_page = pagination.page %}
                {% set total_pages = (pagination.total + pagination.page_size - 1) // pagination.page_size %}

                <!-- 上一页 -->
                <li class="page-item {% if current_page <= 1 %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page - 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}" 
                       {% if current_page <= 1 %}tabindex="-1" aria-disabled="true"{% endif %}>
                        <i class="bi bi-chevron-left"></i> 上一页
                    </a>
                </li>

                <!-- 页码 -->
                {% set start_page = [1, current_page - 2] | max %}
                {% set end_page = [total_pages, current_page + 2] | min %}

                {% if start_page > 1 %}
                <li class="page-item">
                    <a class="page-link" href="?page=1&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">1</a>
                </li>
                {% if start_page > 2 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                {% endif %}

                {% for page_num in range(start_page, end_page + 1) %}
                <li class="page-item {% if page_num == current_page %}active{% endif %}">
                    <a class="page-link" href="?page={{ page_num }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">
                        {{ page_num }}
                    </a>
                </li>
                {% endfor %}

                {% if end_page < total_pages %}
                {% if end_page < total_pages - 1 %}
                <li class="page-item disabled">
                    <span class="page-link">...</span>
                </li>
                {% endif %}
                <li class="page-item">
                    <a class="page-link" href="?page={{ total_pages }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}">{{ total_pages }}</a>
                </li>
                {% endif %}

                <!-- 下一页 -->
                <li class="page-item {% if current_page >= total_pages %}disabled{% endif %}">
                    <a class="page-link" href="?page={{ current_page + 1 }}&page_size={{ pagination.page_size }}{% if search %}&search={{ search|urlencode }}{% endif %}&sort_by={{ sort_by }}&sort_order={{ sort_order }}"
                       {% if current_page >= total_pages %}tabindex="-1" aria-disabled="true"{% endif %}>
                        下一页 <i class="bi bi-chevron-right"></i>
                    </a>
                </li>
            </ul>
            <div class="text-center text-muted small mt-2">
                共 {{ pagination.total }} 个知识库{% if search %}(搜索: "{{ search }}"){% endif %},第 {{ current_page }} / {{ total_pages }} 页
            </div>
        </nav>
        {% endif %}
    </div>
</div>
<!-- 创建知识库模态框 -->
<div class="modal fade" id="createKbModal" tabindex="-1">
    <div class="modal-dialog">
        <div class="modal-content">
            <div class="modal-header">
                <h5 class="modal-title">创建知识库</h5>
                <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
            </div>
            <form id="createKbForm" onsubmit="createKb(event)" enctype="multipart/form-data">
                <div class="modal-body">
                    <div class="mb-3">
                        <label class="form-label">名称 <span class="text-danger">*</span></label>
                        <input type="text" class="form-control" name="name" required>
                    </div>
                    <div class="mb-3">
                        <label class="form-label">描述</label>
                        <textarea class="form-control" name="description" rows="3"></textarea>
                    </div>
                    <div class="mb-3">
                        <label class="form-label">封面图片(可选)</label>
                        <input type="file" class="form-control" name="cover_image" accept="image/jpeg,image/png,image/gif,image/webp" id="coverImageInput">
                        <div class="form-text">支持 JPG、PNG、GIF、WEBP 格式,最大 5MB</div>
                        <div id="coverImagePreview" class="mt-2" style="display: none;">
                            <img id="coverPreviewImg" src="" alt="封面预览" class="img-thumbnail" style="max-width: 200px; max-height: 200px;">
                        </div>
                    </div>
                    <div class="row">
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块大小</label>
                            <input type="number" class="form-control" name="chunk_size" value="512" min="100" max="2000">
                            <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
                        </div>
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块重叠</label>
                            <input type="number" class="form-control" name="chunk_overlap" value="50" min="0" max="200">
                            <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
                        </div>
                    </div>
                </div>
                <div class="modal-footer">
                    <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
                    <button type="submit" class="btn btn-primary">创建</button>
                </div>
            </form>
        </div>
    </div>
</div>

<!-- 编辑知识库模态框 -->
<div class="modal fade" id="editKbModal" tabindex="-1">
    <div class="modal-dialog">
        <div class="modal-content">
            <div class="modal-header">
                <h5 class="modal-title">编辑知识库</h5>
                <button type="button" class="btn-close" data-bs-dismiss="modal"></button>
            </div>
            <form id="editKbForm" onsubmit="updateKb(event)" enctype="multipart/form-data">
                <input type="hidden" name="kb_id" id="editKbId">
                <div class="modal-body">
                    <div class="mb-3">
                        <label class="form-label">名称 <span class="text-danger">*</span></label>
                        <input type="text" class="form-control" name="name" id="editKbName" required>
                    </div>
                    <div class="mb-3">
                        <label class="form-label">描述</label>
                        <textarea class="form-control" name="description" id="editKbDescription" rows="3"></textarea>
                    </div>
                    <div class="mb-3">
                        <label class="form-label">封面图片</label>
                        <div id="editCoverPreview" class="mb-2">
                            <img id="editCoverPreviewImg" src="" alt="当前封面" class="img-thumbnail" style="max-width: 200px; max-height: 200px; display: none;">
                            <div id="editCoverNoImage" class="text-muted small" style="display: none;">暂无封面</div>
                        </div>
                        <input type="file" class="form-control" name="cover_image" accept="image/jpeg,image/png,image/gif,image/webp" id="editCoverImageInput">
                        <div class="form-text">支持 JPG、PNG、GIF、WEBP 格式,最大 5MB。留空则不修改封面。</div>
                        <div class="form-check mt-2">
                            <input class="form-check-input" type="checkbox" name="delete_cover" id="editDeleteCover" value="true">
                            <label class="form-check-label" for="editDeleteCover">
                                删除封面图片
                            </label>
                        </div>
                        <div id="editCoverNewPreview" class="mt-2" style="display: none;">
                            <img id="editCoverNewPreviewImg" src="" alt="新封面预览" class="img-thumbnail" style="max-width: 200px; max-height: 200px;">
                        </div>
                    </div>
                    <div class="row">
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块大小</label>
                            <input type="number" class="form-control" name="chunk_size" id="editKbChunkSize" value="512" min="100" max="2000">
                            <div class="form-text">每个文本块的最大字符数,建议 512-1024</div>
                        </div>
                        <div class="col-md-6 mb-3">
                            <label class="form-label">分块重叠</label>
                            <input type="number" class="form-control" name="chunk_overlap" id="editKbChunkOverlap" value="50" min="0" max="200">
                            <div class="form-text">相邻块之间的重叠字符数,建议 50-100</div>
                        </div>
                    </div>
                </div>
                <div class="modal-footer">
                    <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
                    <button type="submit" class="btn btn-primary">保存</button>
                </div>
            </form>
        </div>
    </div>
</div>
{% endblock %}

{% block extra_js %}
<script>
// 异步函数,用于创建知识库
async function createKb(event) {
    // 阻止表单默认提交
    event.preventDefault();
    // 获取表单对象
    const form = event.target;
    // 构造 FormData,收集表单数据
    const formData = new FormData(form);

    try {
        // 发送 POST 请求到后端 API,提交表单数据
        const response = await fetch('/api/v1/kb', {
            method: 'POST',
            // body为FormData,浏览器会自动设置Content-Type
            body: formData  
        });

        // 如果响应成功,刷新页面
        if (response.ok) {
            location.reload();
        } else {
            // 否则获取错误信息并弹窗提示
            const error = await response.json();
            alert('创建失败: ' + error.message);
        }
    } catch (error) {
        // 捕获异常并弹窗提示用户
        alert('创建失败: ' + error.message);
    }
}

// 异步函数,用于删除知识库
async function deleteKb(kbId, kbName) {
    // 弹窗确认是否删除知识库
    if (!confirm(`确定要删除知识库 "${kbName}" 吗?此操作不可恢复!`)) {
        return;
    }
    try {
        // 发送 DELETE 请求到后端 API
        const response = await fetch(`/api/v1/kb/${kbId}`, {
            method: 'DELETE'
        });

        // 如果响应成功,刷新页面
        if (response.ok) {
            location.reload();
        } else {
            // 否则弹窗提示错误信息
            const error = await response.json();
            alert('删除失败: ' + error.message);
        }
    } catch (error) {
        // 捕获异常并弹窗提示
        alert('删除失败: ' + error.message);
    }
}

// 从按钮的 data 属性读取知识库数据,然后打开编辑界面
function editKbFromButton(button) {
    // 获取知识库ID
    const kbId = button.getAttribute('data-kb-id');
    // 获取知识库名称
    const name = button.getAttribute('data-kb-name');
    // 获取描述,默认为空字符串
    const description = button.getAttribute('data-kb-description') || '';
    // 获取分块大小,默认512
    const chunkSize = parseInt(button.getAttribute('data-kb-chunk-size')) || 512;
    // 获取分块重叠,默认50
    const chunkOverlap = parseInt(button.getAttribute('data-kb-chunk-overlap')) || 50;
    // 获取封面图片路径,默认为空
    const coverImage = button.getAttribute('data-kb-cover-image') || '';

    // 调用编辑函数,填充数据到表单
    editKb(kbId, name, description, chunkSize, chunkOverlap,coverImage);
}

// 编辑知识库时弹出模态框并初始化数据
function editKb(kbId, name, description, chunkSize, chunkOverlap,coverImage) {
    // 设置表单的知识库ID
    document.getElementById('editKbId').value = kbId;
    // 设置知识库名称
    document.getElementById('editKbName').value = name;
    // 设置描述
    document.getElementById('editKbDescription').value = description || '';
    // 设置分块大小
    document.getElementById('editKbChunkSize').value = chunkSize;
    // 设置分块重叠
    document.getElementById('editKbChunkOverlap').value = chunkOverlap;
    // 初始化不勾选删除封面
    document.getElementById('editDeleteCover').checked = false;
    // 清空已选择的新封面文件
    document.getElementById('editCoverImageInput').value = '';

    // 获取当前封面预览图片元素
    const previewImg = document.getElementById('editCoverPreviewImg');
    // 获取“暂无封面”提示元素
    const noImageDiv = document.getElementById('editCoverNoImage');
    // 获取新封面预览div
    const newPreview = document.getElementById('editCoverNewPreview');
    // 获取新封面预览图片元素
    const newPreviewImg = document.getElementById('editCoverNewPreviewImg');

    // 隐藏新图片预览
    if (newPreview) {
        newPreview.style.display = 'none';
    }
    // 清空新图片预览src
    if (newPreviewImg) {
        newPreviewImg.src = '';
    }

    // 如果有旧封面,则显示
    if (coverImage) {
        previewImg.src = `/kb/${kbId}/cover`;
        previewImg.style.display = 'block';
        noImageDiv.style.display = 'none';
    } else {
        // 否则显示“暂无封面”
        previewImg.style.display = 'none';
        noImageDiv.style.display = 'block';
    }

    // 展示编辑知识库的模态框
    const modal = new bootstrap.Modal(document.getElementById('editKbModal'));
    modal.show();
}


// 获取编辑时选择封面图片的input
const editCoverImageInput = document.getElementById('editCoverImageInput');
// 如果找到了input,则监听change事件
if (editCoverImageInput) {
    editCoverImageInput.addEventListener('change', function(e) {
        // 获取用户选择的第一个文件
        const file = e.target.files[0];
        // 获取新封面预览容器
        const newPreview = document.getElementById('editCoverNewPreview');
        // 获取新封面预览图片元素
        const newPreviewImg = document.getElementById('editCoverNewPreviewImg');
        // 获取删除封面复选框
        const deleteCheckbox = document.getElementById('editDeleteCover');

        // 如果有选文件
        if (file) {
            // 定义允许的图片类型
            const validTypes = ['image/jpeg', 'image/jpg', 'image/png', 'image/gif', 'image/webp'];
            // 如果不符合要求的格式,弹窗提示并重置
            if (!validTypes.includes(file.type)) {
                alert('不支持的图片格式,请选择 JPG、PNG、GIF 或 WEBP 格式的图片');
                e.target.value = '';
                if (newPreview) {
                    newPreview.style.display = 'none';
                }
                return;
            }

            // 如果图片超过5MB,弹窗提示并重置
            if (file.size > 5 * 1024 * 1024) {
                alert('图片文件大小超过 5MB 限制');
                e.target.value = '';
                if (newPreview) {
                    newPreview.style.display = 'none';
                }
                return;
            }

            // 文件读取器,用于预览图片
            const reader = new FileReader();
            // 文件读取完成后显示预览
            reader.onload = function(event) {
                if (newPreviewImg) {
                    newPreviewImg.src = event.target.result;
                }
                if (newPreview) {
                    newPreview.style.display = 'block';
                }
                // 选择新图片时自动取消删除封面的选项
                if (deleteCheckbox) {
                    deleteCheckbox.checked = false; 
                }
            };
            // 读取图片失败时弹窗提示
            reader.onerror = function() {
                alert('读取图片文件失败,请重试');
                e.target.value = '';
                if (newPreview) {
                    newPreview.style.display = 'none';
                }
            };
            // 以DataURL形式读取以便预览
            reader.readAsDataURL(file);
        } else {
            // 未选择文件则隐藏新封面预览
            if (newPreview) {
                newPreview.style.display = 'none';
            }
        }
    });
}

// 监听删除封面复选框的变化
document.getElementById('editDeleteCover')?.addEventListener('change', function(e) {
    // 获取封面图片上传input
    const fileInput = document.getElementById('editCoverImageInput');
    // 获取新封面预览div
    const newPreview = document.getElementById('editCoverNewPreview');

    // 如果选中“删除封面”,则清空图片和隐藏新封面预览
    if (e.target.checked) {
        fileInput.value = ''; // 清空文件选择
        newPreview.style.display = 'none';
    }
});

// 异步函数,用于更新知识库
async function updateKb(event) {
    // 阻止表单默认提交
    event.preventDefault();
    // 获取表单对象
    const form = event.target;
    // 获取表单数据
    const formData = new FormData(form);
    // 从formData中获取知识库ID
    const kbId = formData.get('kb_id');

    try {
        // 发送PUT请求到后端API更新知识库
        const response = await fetch(`/api/v1/kb/${kbId}`, {
            method: 'PUT',
            body: formData
        });

        // 如果更新成功,刷新页面
        if (response.ok) {
            location.reload();
        } else {
            // 否则弹窗显示错误
            const error = await response.json();
            alert('更新失败: ' + error.message);
        }
    } catch (error) {
        // 捕获异常弹窗显示
        alert('更新失败: ' + error.message);
    }
}
// 获取封面图片文件输入框并监听change事件(当用户选择文件时触发)
document.getElementById('coverImageInput')?.addEventListener('change', function(e) {
    // 获取用户选中的第一个文件
    const file = e.target.files[0];
    // 获取用于显示预览的容器
    const preview = document.getElementById('coverImagePreview');
    // 获取显示图片的img标签
    const previewImg = document.getElementById('coverPreviewImg');

    // 如果用户选择了文件
    if (file) {
        // 创建文件读取器
        const reader = new FileReader();
        // 文件读取完成后回调
        reader.onload = function(e) {
            // 将img标签的src设置为读取得到的图片数据
            previewImg.src = e.target.result;
            // 显示预览容器
            preview.style.display = 'block';
        };
        // 以DataURL的形式读取图片文件
        reader.readAsDataURL(file);
    } else {
        // 如果没有选择文件,则隐藏预览
        preview.style.display = 'none';
    }
});

+// 搜索和排序功能
+function updateSearch() {
+   document.getElementById('searchForm').submit();
+}

+function clearSearch() {
+   document.getElementById('searchInput').value = '';
+   // 移除搜索参数,保留排序参数
+   const url = new URL(window.location.href);
+   url.searchParams.delete('search');
+   url.searchParams.set('page', '1');
+   window.location.href = url.toString();
+}

+// 搜索框回车提交
+document.getElementById('searchInput')?.addEventListener('keypress', function(e) {
+   if (e.key === 'Enter') {
+       e.preventDefault();
+       updateSearch();
+   }
+});
</script>
{% endblock %}

← 上一节 17.权限管理 下一节 19.设置 →

访问验证

请输入访问令牌

Token不正确,请重新输入