From 30c334a2546cd874d306b43c484c5c5d1a188bfb Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 30 Sep 2025 10:57:11 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E6=9B=B4=E6=94=B9=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E5=8E=BB=E9=99=A4Flow=E5=AD=97=E6=A0=B7=EF=BC=9B=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0Step=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/queue.py | 11 +++++------ apps/llm/function.py | 1 + apps/models/__init__.py | 12 +++++++++++- apps/models/task.py | 30 ++++++++++++++++++++++++------ apps/routers/chat.py | 39 +++------------------------------------ 5 files changed, 44 insertions(+), 49 deletions(-) diff --git a/apps/common/queue.py b/apps/common/queue.py index a916c460..c0981527 100644 --- a/apps/common/queue.py +++ b/apps/common/queue.py @@ -56,22 +56,21 @@ class MessageQueue: # 如果使用了Flow flow = MessageFlow( appId=task.state.appId, - flowId=task.state.executorId, - flowName=task.state.executorName, - flowStatus=task.state.executorStatus, + executorId=task.state.executorId, + executorName=task.state.executorName, + executorStatus=task.state.executorStatus, stepId=task.state.stepId, stepName=task.state.stepName, - stepDescription=task.state.stepDescription, stepStatus=task.state.stepStatus, + stepType=task.state.stepType, ) else: flow = None message = MessageBase( event=event_type, - id=task.metadata.record_id, + id=task.metadata.id, conversationId=task.metadata.conversationId, - taskId=task.metadata.id, metadata=metadata, flow=flow, content=data, diff --git a/apps/llm/function.py b/apps/llm/function.py index 6725c46c..63e9ec1d 100644 --- a/apps/llm/function.py +++ b/apps/llm/function.py @@ -120,6 +120,7 @@ class JsonGenerator: autoescape=False, trim_blocks=True, lstrip_blocks=True, + extensions=["jinja2.ext.loopcontrols"], ) self._err_info = "" diff --git a/apps/models/__init__.py b/apps/models/__init__.py index b67b00ee..899b564b 100644 --- a/apps/models/__init__.py +++ 
b/apps/models/__init__.py @@ -14,7 +14,16 @@ from .record import FootNoteType, Record, RecordFootNote, RecordMetadata from .service import Service, ServiceACL, ServiceHashes from .session import Session, SessionActivity, SessionType from .tag import Tag -from .task import ExecutorCheckpoint, ExecutorHistory, ExecutorStatus, LanguageType, StepStatus, Task, TaskRuntime +from .task import ( + ExecutorCheckpoint, + ExecutorHistory, + ExecutorStatus, + LanguageType, + StepStatus, + StepType, + Task, + TaskRuntime, +) from .user import User, UserAppUsage, UserFavorite, UserFavoriteType, UserTag __all__ = [ @@ -57,6 +66,7 @@ __all__ = [ "SessionActivity", "SessionType", "StepStatus", + "StepType", "Tag", "Task", "TaskRuntime", diff --git a/apps/models/task.py b/apps/models/task.py index 5692fff8..86183666 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -44,14 +44,32 @@ class StepStatus(str, PyEnum): CANCELLED = "cancelled" +class StepType(str, PyEnum): + """步骤类型""" + + API = "api" + CHOICE = "choice" + CMD = "cmd" + CONVERT = "convert" + FACTS = "facts" + GRAPH = "graph" + LLM = "llm" + MCP = "mcp" + RAG = "rag" + SLOT = "slot" + SUMMARY = "summary" + SQL = "sql" + SUGGEST = "suggest" + + class Task(Base): """任务""" __tablename__ = "framework_task" userSub: Mapped[str] = mapped_column(String(255), ForeignKey("framework_user.sub")) # noqa: N815 """用户ID""" - conversationId: Mapped[uuid.UUID] = mapped_column( # noqa: N815 - UUID(as_uuid=True), ForeignKey("framework_conversation.id"), nullable=False, + conversationId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815 + UUID(as_uuid=True), ForeignKey("framework_conversation.id"), nullable=True, ) """对话ID""" checkpointId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815 @@ -124,12 +142,12 @@ class ExecutorCheckpoint(Base): """步骤名称""" stepStatus: Mapped[StepStatus] = mapped_column(Enum(StepStatus), nullable=False) # noqa: N815 """步骤状态""" + stepType: Mapped[StepType] = mapped_column(Enum(StepType), 
nullable=False) # noqa: N815 + """步骤类型""" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4) """检查点ID""" executorDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 """执行器描述""" - stepDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 - """步骤描述""" data: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) """步骤额外数据""" errorMessage: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 @@ -153,12 +171,12 @@ class ExecutorHistory(Base): """步骤ID""" stepName: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 """步骤名称""" + stepType: Mapped[StepType] = mapped_column(Enum(StepType), nullable=False) # noqa: N815 + """步骤类型""" stepStatus: Mapped[StepStatus] = mapped_column(Enum(StepStatus), nullable=False) # noqa: N815 """步骤状态""" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4) """执行器历史ID""" - stepDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 - """步骤描述""" inputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 """步骤输入数据""" outputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 6bfb853d..32e8c81c 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -3,7 +3,6 @@ import asyncio import logging -import uuid from collections.abc import AsyncGenerator from fastapi import APIRouter, Depends, HTTPException, Request, status @@ -15,16 +14,12 @@ from apps.dependency import verify_personal_token, verify_session from apps.models import ExecutorStatus from apps.scheduler.scheduler import Scheduler from apps.scheduler.scheduler.context import save_data -from apps.schemas.request_data import RequestData, RequestDataApp +from 
apps.schemas.request_data import RequestData from apps.schemas.response_data import ResponseData -from apps.schemas.task import TaskData from apps.services import ( Activity, - ConversationManager, FlowManager, QuestionBlacklistManager, - RecordManager, - TaskManager, UserBlacklistManager, ) @@ -40,34 +35,6 @@ router = APIRouter( ) -async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> TaskData: - """初始化Task""" - # 更改信息并刷新数据库 - if post_body.task_id is None: - conversation = await ConversationManager.get_conversation_by_conversation_id( - user_sub=user_sub, - conversation_id=post_body.conversation_id, - ) - if not conversation: - err = "[Chat] 用户没有权限访问该对话!" - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=err) - task_ids = await TaskManager.delete_tasks_by_conversation_id(post_body.conversation_id) - await RecordManager.update_record_flow_status_to_cancelled_by_task_ids(task_ids) - task = await TaskManager.init_new_task(user_sub=user_sub, session_id=session_id, post_body=post_body) - task.runtime.question = post_body.question - task.state.app_id = post_body.app.app_id if post_body.app else "" - else: - if not post_body.task_id: - err = "[Chat] task_id 不可为空!" 
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="task_id cannot be empty") - task = await TaskManager.get_task_data_by_task_id(post_body.task_id) - post_body.app = RequestDataApp(appId=task.state.app_id) - post_body.conversation_id = task.ids.conversation_id - post_body.language = task.language - post_body.question = task.runtime.question - return task - - async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) -> AsyncGenerator[str, None]: """进行实际问答,并从MQ中获取消息""" try: @@ -103,14 +70,14 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) # 获取最终答案 task = scheduler.task - if task.state.flow_status == ExecutorStatus.ERROR: + if task.state.executorStatus == ExecutorStatus.ERROR: _logger.error("[Chat] 生成答案失败") yield "data: [ERROR]\n\n" await Activity.remove_active(user_sub) return # 对结果进行敏感词检查 - if await WordsCheck().check(task.runtime.answer) != 1: + if await WordsCheck().check(task.runtime.fullAnswer) != 1: yield "data: [SENSITIVE]\n\n" _logger.info("[Chat] 答案包含敏感词!") await Activity.remove_active(user_sub) -- Gitee From 669a070b3c177fa7ca8dc2bdee012d24a31e4841 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 30 Sep 2025 10:59:39 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E4=BF=AE=E6=AD=A3Service?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/schemas/enum_var.py | 1 - apps/schemas/message.py | 40 ++++++++---------------------------- apps/schemas/request_data.py | 7 +++---- apps/schemas/scheduler.py | 18 ++++++++-------- apps/services/mcp_service.py | 25 ++++++++++++++-------- 5 files changed, 37 insertions(+), 54 deletions(-) diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index d5a8ba26..5606ed92 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -28,7 +28,6 @@ class EventType(str, Enum): INIT = "init" TEXT_ADD = "text.add" GRAPH = "graph" - DOCUMENT_ADD = "document.add" STEP_WAITING_FOR_START = 
"step.waiting_for_start" STEP_WAITING_FOR_PARAM = "step.waiting_for_param" FLOW_START = "flow.start" diff --git a/apps/schemas/message.py b/apps/schemas/message.py index 1e36cd27..3b2b1706 100644 --- a/apps/schemas/message.py +++ b/apps/schemas/message.py @@ -2,7 +2,6 @@ """队列中的消息结构""" import uuid -from datetime import UTC, datetime from typing import Any from pydantic import BaseModel, Field @@ -31,20 +30,16 @@ class HeartbeatData(BaseModel): class MessageFlow(BaseModel): """消息中有关Flow信息的部分""" - app_id: uuid.UUID = Field(description="插件ID", alias="appId") - flow_id: str = Field(description="Flow ID", alias="flowId") - flow_name: str = Field(description="Flow名称", alias="flowName") - flow_status: ExecutorStatus = Field(description="Flow状态", alias="flowStatus", default=ExecutorStatus.UNKNOWN) + app_id: uuid.UUID | None = Field(description="插件ID", alias="appId", default=None) + executor_id: str = Field(description="Flow ID", alias="executorId") + executor_name: str = Field(description="Flow名称", alias="executorName") + executor_status: ExecutorStatus = Field( + description="Flow状态", alias="executorStatus", default=ExecutorStatus.UNKNOWN, + ) step_id: uuid.UUID = Field(description="当前步骤ID", alias="stepId") step_name: str = Field(description="当前步骤名称", alias="stepName") - sub_step_id: str | None = Field(description="当前子步骤ID", alias="subStepId", default=None) - sub_step_name: str | None = Field(description="当前子步骤名称", alias="subStepName", default=None) + step_type: str = Field(description="当前步骤类型", alias="stepType") step_status: StepStatus = Field(description="当前步骤状态", alias="stepStatus") - step_description: str | None = Field( - description="当前步骤描述", - alias="stepDescription", - default=None, - ) class MessageMetadata(RecordMetadata): @@ -75,28 +70,11 @@ class TextAddContent(BaseModel): text: str = Field(min_length=1, description="流式生成的文本内容") -class DocumentAddContent(BaseModel): - """document.add消息的content""" - - document_id: str = Field(description="文档UUID", 
alias="documentId") - document_order: int = Field(description="文档在对话中的顺序,从1开始", alias="documentOrder") - document_author: str = Field(description="文档作者", alias="documentAuthor", default="") - document_name: str = Field(description="文档名称", alias="documentName") - document_abstract: str = Field(description="文档摘要", alias="documentAbstract", default="") - document_type: str = Field(description="文档MIME类型", alias="documentType", default="") - document_size: float = Field(ge=0, description="文档大小,单位是KB,保留两位小数", alias="documentSize", default=0) - created_at: float = Field( - description="文档创建时间,单位是秒", alias="createdAt", - default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3), - ) - - class MessageBase(HeartbeatData): """基础消息事件结构""" - id: str = Field(min_length=36, max_length=36) - conversation_id: uuid.UUID = Field(min_length=36, max_length=36, alias="conversationId") - task_id: uuid.UUID = Field(min_length=36, max_length=36, alias="taskId") + id: uuid.UUID = Field(min_length=36, max_length=36) + conversation_id: uuid.UUID | None = Field(min_length=36, max_length=36, alias="conversationId", default=None) flow: MessageFlow | None = None content: Any | None = Field(default=None, description="消息内容") metadata: MessageMetadata diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index b59918b9..7d1916b8 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -25,15 +25,14 @@ class RequestData(BaseModel): """POST /api/chat 请求的总的数据结构""" question: str = Field(max_length=2000, description="用户输入") - conversation_id: uuid.UUID = Field( - default=uuid.UUID("00000000-0000-0000-0000-000000000000"), alias="conversationId", description="聊天ID", + conversation_id: uuid.UUID | None = Field( + default=None, alias="conversationId", description="聊天ID", ) language: LanguageType = Field(default=LanguageType.CHINESE, description="语言") - files: list[str] = Field(default=[], description="文件列表") app: RequestDataApp | None = Field(default=None, 
description="应用") - debug: bool = Field(default=False, description="是否调试") task_id: str | None = Field(default=None, alias="taskId", description="任务ID") llm_id: str = Field(alias="llmId", description="大模型ID") + kb_ids: list[uuid.UUID] = Field(default=[], description="知识库ID列表") class PostTagData(BaseModel): diff --git a/apps/schemas/scheduler.py b/apps/schemas/scheduler.py index 2686b4d4..6b48db20 100644 --- a/apps/schemas/scheduler.py +++ b/apps/schemas/scheduler.py @@ -10,7 +10,6 @@ from apps.llm import Embedding, FunctionLLM, ReasoningLLM from apps.models import ExecutorHistory, LanguageType from .enum_var import CallOutputType -from .scheduler import ExecutorBackground class LLMConfig(BaseModel): @@ -36,6 +35,15 @@ class CallIds(BaseModel): session_id: str | None = Field(description="当前用户的Session ID") app_id: uuid.UUID | None = Field(description="当前应用的ID") user_sub: str = Field(description="当前用户的用户ID") + conversation_id: uuid.UUID | None = Field(description="当前对话的ID") + + +class ExecutorBackground(BaseModel): + """Executor的背景信息""" + + num: int = Field(description="对话记录最大数量", default=0) + conversation: list[dict[str, str]] = Field(description="对话记录", default=[]) + facts: list[str] = Field(description="当前Executor的背景信息", default=[]) class CallVars(BaseModel): @@ -50,14 +58,6 @@ class CallVars(BaseModel): language: LanguageType = Field(description="语言", default=LanguageType.CHINESE) -class ExecutorBackground(BaseModel): - """Executor的背景信息""" - - num: int = Field(description="对话记录最大数量", default=0) - conversation: list[dict[str, str]] = Field(description="对话记录", default=[]) - facts: list[str] = Field(description="当前Executor的背景信息", default=[]) - - class CallError(Exception): """Call错误""" diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index 6f48c953..14cd6230 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -9,7 +9,7 @@ from typing import Any import magic from fastapi import UploadFile from PIL import Image -from 
sqlalchemy import and_, or_, select +from sqlalchemy import and_, delete, or_, select from apps.common.postgres import postgres from apps.constants import ( @@ -25,6 +25,7 @@ from apps.models import ( MCPTools, MCPType, ) +from apps.models.app import AppMCP from apps.scheduler.pool.loader.mcp import MCPLoader from apps.scheduler.pool.mcp.pool import MCPPool from apps.schemas.enum_var import SearchType @@ -310,8 +311,15 @@ class MCPServiceManager: msg = "[MCPServiceManager] MCP服务未找到或无权限" raise ValueError(msg) - for user_id in db_service.activated: - await MCPServiceManager.deactive_mcpservice(user_sub=user_id, mcp_id=data.mcp_id) + # 查询所有激活该MCP服务的用户 + async with postgres.session() as session: + activated_users = (await session.scalars(select(MCPActivated).where( + MCPActivated.mcpId == data.mcp_id, + ))).all() + + # 为每个激活的用户取消激活 + for activated_user in activated_users: + await MCPServiceManager.deactive_mcpservice(user_sub=activated_user.userSub, mcp_id=data.mcp_id) mcp_config = MCPServerConfig( name=data.name, @@ -349,12 +357,11 @@ class MCPServiceManager: # 删除对应的mcp await MCPLoader.delete_mcp(mcp_id) - # 遍历所有应用,将其中的MCP依赖删除 - app_collection = MongoDB().get_collection("application") - await app_collection.update_many( - {"mcp_service": mcp_id}, - {"$pull": {"mcp_service": mcp_id}}, - ) + # 从PostgreSQL中删除所有应用对应该MCP服务的关联记录 + async with postgres.session() as session: + stmt = delete(AppMCP).where(AppMCP.mcpId == mcp_id) + await session.execute(stmt) + await session.commit() @staticmethod -- Gitee From 9044fa21246735e058cb7438f950a1403cca0dd0 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 30 Sep 2025 11:00:26 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E6=BC=94=E8=BF=9BQAExecutor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/call/core.py | 1 + apps/scheduler/call/facts/facts.py | 1 + apps/scheduler/call/llm/llm.py | 1 + apps/scheduler/call/rag/rag.py | 59 ++++---- apps/scheduler/call/rag/schema.py | 2 +- 
apps/scheduler/executor/agent.py | 6 +- apps/scheduler/executor/base.py | 9 +- apps/scheduler/executor/prompt.py | 194 +++++++++++++++----------- apps/scheduler/executor/qa.py | 140 ++++--------------- apps/scheduler/executor/step.py | 9 +- apps/scheduler/scheduler/context.py | 20 --- apps/scheduler/scheduler/scheduler.py | 143 +++++++++++-------- 12 files changed, 280 insertions(+), 305 deletions(-) diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index c927ae6c..cf116203 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -104,6 +104,7 @@ class CoreCall(BaseModel): session_id=executor.task.runtime.sessionId, user_sub=executor.task.metadata.userSub, app_id=executor.task.state.appId, + conversation_id=executor.task.metadata.conversationId, ), question=executor.question, step_data=history, diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py index 70bf8e63..a13bee73 100644 --- a/apps/scheduler/call/facts/facts.py +++ b/apps/scheduler/call/facts/facts.py @@ -83,6 +83,7 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): autoescape=False, trim_blocks=True, lstrip_blocks=True, + extensions=["jinja2.ext.loopcontrols"], ) # 提取事实信息 diff --git a/apps/scheduler/call/llm/llm.py b/apps/scheduler/call/llm/llm.py index 05c6e3da..8140fb04 100644 --- a/apps/scheduler/call/llm/llm.py +++ b/apps/scheduler/call/llm/llm.py @@ -64,6 +64,7 @@ class LLM(CoreCall, input_model=LLMInput, output_model=LLMOutput): autoescape=False, trim_blocks=True, lstrip_blocks=True, + extensions=["jinja2.ext.loopcontrols"], ) # 上下文信息 diff --git a/apps/scheduler/call/rag/rag.py b/apps/scheduler/call/rag/rag.py index 918e2565..c1e6e5b4 100644 --- a/apps/scheduler/call/rag/rag.py +++ b/apps/scheduler/call/rag/rag.py @@ -23,6 +23,7 @@ from apps.schemas.scheduler import ( CallOutputChunk, CallVars, ) +from apps.services import DocumentManager from .prompt import QUESTION_REWRITE from .schema import ( @@ 
-118,29 +119,37 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): return doc_chunk_list - #TODO:修改为正确的获取临时文档的逻辑 - async def _get_doc_info(self, doc_ids: list[str], data: RAGInput) -> AsyncGenerator[CallOutputChunk, None]: - """获取文档信息""" - doc_chunk_list = [] - # 处理指定文档ID的情况 + async def _get_temp_docs(self, conversation_id: uuid.UUID) -> list[str]: + """获取当前会话的临时文档""" + doc_ids = [] + # 从Conversation中获取刚上传的文档 + docs = await DocumentManager.get_unused_docs(conversation_id) + # 从最近10条Record中获取文档 + docs += await DocumentManager.get_used_docs(conversation_id, 10, "question") + doc_ids += [doc.id for doc in docs] + return doc_ids + + + async def _get_doc_info(self, doc_ids: list[str], data: RAGInput) -> list[DocItem]: + """获取文档信息,支持临时文档和知识库文档""" + doc_chunk_list: list[DocItem] = [] + + # 处理临时文档 if doc_ids: tmp_data = deepcopy(data) - tmp_data.kbIds = [ uuid.UUID("00000000-0000-0000-0000-000000000000") ] + tmp_data.kbIds = [uuid.UUID("00000000-0000-0000-0000-000000000000")] + tmp_data.docIds = doc_ids doc_chunk_list.extend(await self._fetch_doc_chunks(tmp_data)) # 处理知识库ID的情况 if data.kbIds: - doc_chunk_list.extend(await self._fetch_doc_chunks(data)) + kb_data = deepcopy(data) + # 知识库查询时不使用docIds,只使用kbIds + kb_data.docIds = None + doc_chunk_list.extend(await self._fetch_doc_chunks(kb_data)) - # 将文档分片转换为文本片段并返回 - for doc_chunk in doc_chunk_list: - for chunk in doc_chunk.chunks: - text = chunk.text.replace("\n", "") - yield CallOutputChunk( - type=CallOutputType.DATA, - content=text, - ) + return doc_chunk_list async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: @@ -155,8 +164,6 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): lstrip_blocks=True, ) tmpl = env.from_string(QUESTION_REWRITE[self._sys_vars.language]) - - # 仅使用当前问题,不使用历史问答数据 prompt = tmpl.render(question=data.query) # 使用_json方法直接获取JSON结果 @@ -169,18 +176,22 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): except 
Exception: _logger.exception("[RAG] 问题重写失败,使用原始问题") - # 获取文档片段 - doc_chunk_list = await self._fetch_doc_chunks(data) + # 获取临时文档ID列表 + if self._sys_vars.ids.conversation_id: + temp_doc_ids = await self._get_temp_docs(self._sys_vars.ids.conversation_id) + else: + temp_doc_ids = [] - corpus = [] - for doc_chunk in doc_chunk_list: - for chunk in doc_chunk.chunks: - corpus.extend([chunk.text.replace("\n", "")]) + # 合并传入的doc_ids和临时文档ID + all_doc_ids = list(set((data.docIds or []) + temp_doc_ids)) + + # 获取文档片段 + doc_chunk_list = await self._get_doc_info(all_doc_ids, data) yield CallOutputChunk( type=CallOutputType.DATA, content=RAGOutput( question=data.query, - corpus=corpus, + corpus=doc_chunk_list, ).model_dump(exclude_none=True, by_alias=True), ) diff --git a/apps/scheduler/call/rag/schema.py b/apps/scheduler/call/rag/schema.py index 69239acf..55894b7f 100644 --- a/apps/scheduler/call/rag/schema.py +++ b/apps/scheduler/call/rag/schema.py @@ -53,7 +53,7 @@ class RAGOutput(DataBase): """RAG工具的输出""" question: str = Field(description="用户输入") - corpus: list[str] = Field(description="知识库的语料列表") + corpus: list[DocItem] = Field(description="知识库的语料列表") class RAGInput(DataBase): diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 6d912e5e..cb37c69e 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -53,6 +53,8 @@ class MCPAgentExecutor(BaseExecutor): _logger.error(err) raise RuntimeError(err) self._user = user + # 获取历史 + await self._load_history() async def load_mcp(self) -> None: """加载MCP服务器列表""" @@ -168,7 +170,7 @@ class MCPAgentExecutor(BaseExecutor): return try: - output_data = await mcp_client.call_tool(mcp_tool.name, self._current_input) + output_data = await mcp_client.call_tool(mcp_tool.toolName, self._current_input) except anyio.ClosedResourceError as e: _logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcpId) # 停止当前用户MCP进程 @@ -418,7 +420,7 @@ class MCPAgentExecutor(BaseExecutor): ) 
await self.get_next_step() else: - mcp_tool = self.tools[self.task.state.toolId] + mcp_tool = self.tools[self.task.state.stepName] is_param_error = await self._planner.is_param_error( await self._host.assemble_memory(self.task.runtime, self.task.context), self.task.state.errorMessage, diff --git a/apps/scheduler/executor/base.py b/apps/scheduler/executor/base.py index 637e2cc7..a7e46534 100644 --- a/apps/scheduler/executor/base.py +++ b/apps/scheduler/executor/base.py @@ -26,7 +26,6 @@ class BaseExecutor(BaseModel, ABC): msg_queue: "MessageQueue" llm: "LLMConfig" - background: "ExecutorBackground" question: str model_config = ConfigDict( @@ -42,6 +41,14 @@ class BaseExecutor(BaseModel, ABC): async def _load_history(self, n: int = 3) -> None: """加载历史记录""" + # 不存在conversationId,为首个问题 + if not self.task.metadata.conversationId: + self.background = ExecutorBackground( + conversation=[], + facts=[], + num=n, + ) + return # 获取最后n+5条Record records = await RecordManager.query_record_by_conversation_id( self.task.metadata.userSub, self.task.metadata.conversationId, n + 5, diff --git a/apps/scheduler/executor/prompt.py b/apps/scheduler/executor/prompt.py index 33c72cad..cd50338f 100644 --- a/apps/scheduler/executor/prompt.py +++ b/apps/scheduler/executor/prompt.py @@ -133,39 +133,36 @@ GEN_RAG_ANSWER: dict[LanguageType, str] = { LanguageType.CHINESE: r""" 你是一个专业的智能助手,擅长基于提供的文档内容回答用户问题。 - - - 任务:结合背景信息全面回答用户提问,并为关键信息添加脚注引用。 - + 任务:全面结合上下文和文档内容,回答用户提问,并为关键信息添加脚注引用。 - - 背景信息: 标签中提供相关文档内容 + - 上下文:请参考先前的对话 + - 文档内容: 标签中提供相关文档内容 - 用户问题: 标签中提供具体问题 - 参考示例: 标签中展示期望的输出格式 1. 格式要求: - - 纯文本输出,不使用任何格式标记(标题、加粗、列表等) + - 输出不要包含任何XML标签 - 脚注格式:[[1]]、[[2]]、[[3]],数字对应文档ID - 脚注紧跟相关句子的标点符号后 2. 内容要求: - - 基于文档内容回答,不编造信息 + - 不编造信息,充分结合上下文和文档内容 - 如问题与文档无关,直接回答而不使用文档 - - 回答应结构清晰:背景→核心→扩展→总结 - - 充分引用文档内容,提供详细信息 + - 回答应结构清晰:背景→核心→扩展→总结,并且内容全面 3. 
引用规范: - - 文档ID从1开始按顺序递增 - 不得使用示例中的文档序号 - 关键信息必须添加脚注 + - 标注时选择关联性最强的切块对应的文档 - - + + 1 openEuler介绍文档 @@ -174,8 +171,8 @@ GEN_RAG_ANSWER: dict[LanguageType, str] = { openEuler社区的核心目标是面向服务器、云、边缘计算等场景,为用户提供一个稳定、安全、高效的操作系统平台,并且支持x86、ARM等多种硬件架构。 - - + + 2 社区发展报告 @@ -184,8 +181,8 @@ GEN_RAG_ANSWER: dict[LanguageType, str] = { 社区成员通过技术贡献、代码提交、文档编写、测试验证等多种方式,共同推动开源操作系统的发展,并为用户提供技术支持和社区服务。 - - + + openEuler社区的目标是什么?有哪些特色? @@ -205,115 +202,156 @@ GEN_RAG_ANSWER: dict[LanguageType, str] = { 形成了良性的开源生态系统。[[2]] - - {bac_info} - + + {% set __ctx_len = ctx_length|default(0) %} + {% set max_length = max_length if max_length is not none else (__ctx_len * 0.7)|int %} + {% set __total_len = 0 %} + {% set __stop = false %} + {% for item in documents %} + + {{item.doc_id}} + {{item.doc_name}} + {% for chunk in item.chunks %} + {% set __chunk_len = (chunk.text|length) %} + {% if (__total_len + __chunk_len) > max_length %} + {% set __stop = true %} + {% break %} + {% endif %} + {{chunk.text}} + {% set __total_len = __total_len + __chunk_len %} + {% endfor %} + + {% if __stop %} + {% break %} + {% endif %} + {% endfor %} + - {user_question} + {{user_question}} 请基于上述背景信息和用户问题,按照指令要求生成详细、结构清晰的回答: """, LanguageType.ENGLISH: r""" - You are a professional assistant who specializes in answering user questions based on provided document content. - - - Task: Answer user questions comprehensively based on background information and add footnote references to \ -key information. - + You are a professional intelligent assistant, adept at answering user questions + based on the provided documents. + Task: Combine the context and document content comprehensively to answer the + user's question, and add footnote citations for key information. 
- - Background information: Provided in tags with relevant document content - - User question: Provided in tags with specific questions - - Reference example: Shown in tags with expected output format + - Context: Please refer to the prior conversation + - Document content: Relevant content is provided within the tag + - User question: The specific question is provided within the tag + - Reference example: The expected output format is shown within the tag 1. Format requirements: - - Plain text output, no formatting marks (headings, bold, lists, etc.) + - Do not include any XML tags in the output - Footnote format: [[1]], [[2]], [[3]], with numbers corresponding to document IDs - - Footnotes placed right after punctuation of related sentences + - Place footnotes immediately after the punctuation of the related sentence 2. Content requirements: - - Answer based on document content, do not fabricate information - - If the question is unrelated to the document, answer directly without using the document - - Answers should be well-structured: background → core → expansion → summary - - Fully cite document content with detailed information + - Do not fabricate information; fully integrate context and document content + - If the question is unrelated to the documents, answer directly without using the documents + - The answer should be clearly structured: background → core → expansion → summary, and be comprehensive 3. Citation specifications: - - Document IDs start from 1 and increase sequentially - - Do not use document numbers from examples - - Key information must have footnotes + - Do not use the document numbers from the example + - Key information must include footnotes + - When annotating, select the document corresponding to the most relevant chunk - - + + 1 - openEuler Introduction Document + Introduction to openEuler - The openEuler community is an open-source operating system community dedicated to promoting the \ -development of Linux operating systems. 
The community was initiated by Huawei in 2019, aiming to build an open \ -and collaborative operating system ecosystem. + The openEuler community is an open-source operating system community dedicated + to advancing the Linux operating system. The community was initiated by Huawei + in 2019 to build an open and collaborative OS ecosystem. - The core goal of the openEuler community is to provide users with a stable, secure, and efficient \ -operating system platform for scenarios such as servers, clouds, and edge computing, supporting multiple hardware \ -architectures such as x86 and ARM. + The core goal of the openEuler community is to provide a stable, secure, and + efficient operating system platform for scenarios such as servers, cloud, and + edge computing, and it supports multiple hardware architectures including x86 + and ARM. - - + + 2 Community Development Report - Members of the openEuler community come from all over the world, including developers, users, \ -enterprise partners, and academic institutions. As of 2023, the community has over 300 enterprises and \ -organizations participating in contributions. + Members of the openEuler community come from around the world, including + developers, users, enterprise partners, and academic institutions. As of 2023, + over 300 enterprises and organizations have contributed to the community. - Community members contribute through various ways such as technical contributions, code \ -submissions, documentation writing, and testing verification, jointly promoting the development of \ -open-source operating systems and providing technical support and community services to users. + Community members jointly promote the development of open-source operating + systems and provide users with technical support and community services through + technical contributions, code submissions, documentation writing, and testing. - - + + What are the goals and characteristics of the openEuler community? 
- Please generate a detailed and well-structured answer based on the background information and user \ -question according to the instructions: + Please generate a detailed and well-structured answer based on the above background + information and user question: - The openEuler community is an open-source operating system community initiated by Huawei, dedicated \ -to promoting the development of Linux operating systems since its establishment in 2019. [[1]] + The openEuler community, initiated by Huawei in 2019, is an open-source operating + system community dedicated to advancing Linux development. [[1]] - The core goal of the community is to provide users with a stable, secure, and efficient operating \ -system platform for various application scenarios such as servers, clouds, and edge computing. [[1]] - At the same time, openEuler supports multiple hardware architectures such as x86 and ARM, offering \ -excellent cross-platform compatibility. [[1]] + Its core goal is to provide a stable, secure, and efficient operating system platform + for servers, cloud, and edge computing. [[1]] It also supports multiple hardware + architectures, including x86 and ARM, offering strong cross-platform compatibility. [[1]] - In terms of community building, openEuler gathers developers, users, enterprise partners, and \ -academic institutions from around the world. [[2]] - Currently, over 300 enterprises and organizations have participated in community contributions, \ -[[2]] jointly promoting the development of open-source operating systems through technical contributions, \ -code submissions, documentation writing, and testing verification. [[2]] + In terms of community building, openEuler brings together developers, users, enterprise + partners, and academic institutions worldwide. 
[[2]] To date, over 300 enterprises and + organizations have participated in contributions, jointly promoting open-source OS + development through technical contributions, code submissions, documentation, and + testing. [[2]] - This open collaboration model not only promotes technological innovation but also provides \ -comprehensive technical support and community services to users, forming a positive open-source ecosystem. [[2]] + This open collaboration model not only drives technological innovation but also provides + comprehensive technical support and community services to users, forming a healthy + open-source ecosystem. [[2]] - - {bac_info} - + + {# Compute default max length: prefer provided max_length, else use ctx_length*0.7 #} + {% set __ctx_len = ctx_length|default(0) %} + {% set max_length = max_length if max_length is not none else (__ctx_len * 0.7)|int %} + {% set __total_len = 0 %} + {% set __stop = false %} + {% for item in documents %} + + {{item.doc_id}} + {{item.doc_name}} + {% for chunk in item.chunks %} + {% set __chunk_len = (chunk.text|length) %} + {% if (__total_len + __chunk_len) > max_length %} + {% set __stop = true %} + {% break %} + {% endif %} + {{chunk.text}} + {% set __total_len = __total_len + __chunk_len %} + {% endfor %} + + {% if __stop %} + {% break %} + {% endif %} + {% endfor %} + - {user_question} + {{user_question}} - Please generate a detailed and well-structured answer based on the background information and user question \ -according to the instructions: + Please generate a detailed and well-structured answer based on the above background information and user question: """, } diff --git a/apps/scheduler/executor/qa.py b/apps/scheduler/executor/qa.py index 46b11922..338f7ade 100644 --- a/apps/scheduler/executor/qa.py +++ b/apps/scheduler/executor/qa.py @@ -1,25 +1,19 @@ """用于执行智能问答的Executor""" -import json import logging import uuid -from collections.abc import AsyncGenerator from datetime import UTC, datetime -from 
apps.llm import TokenCalculator -from apps.models import Document, ExecutorCheckpoint, ExecutorStatus, StepStatus +from apps.models import ExecutorCheckpoint, ExecutorStatus, StepStatus from apps.models.task import LanguageType -from apps.scheduler.call.rag.schema import DocItem, RAGInput +from apps.scheduler.call.rag.schema import DocItem, RAGOutput from apps.schemas.document import DocumentInfo from apps.schemas.enum_var import EventType, SpecialCallType from apps.schemas.flow import Step -from apps.schemas.message import DocumentAddContent, TextAddContent -from apps.schemas.record import RecordDocument +from apps.schemas.message import DocumentAddContent from apps.schemas.task import StepQueueItem -from apps.services import DocumentManager from .base import BaseExecutor -from .prompt import GEN_RAG_ANSWER from .step import StepExecutor _logger = logging.getLogger(__name__) @@ -101,81 +95,27 @@ class QAExecutor(BaseExecutor): appId=None, ) - async def _get_docs(self, conversation_id: uuid.UUID) -> tuple[list[RecordDocument] | list[Document], list[str]]: - """获取当前问答可供关联的文档""" - doc_ids = [] - # 从Conversation中获取刚上传的文档 - docs = await DocumentManager.get_unused_docs(conversation_id) - # 从最近10条Record中获取文档 - docs += await DocumentManager.get_used_docs(conversation_id, 10, "question") - doc_ids += [doc.id for doc in docs] - return docs, doc_ids - - async def _push_rag_text(self, content: str) -> None: - """推送RAG单个消息块""" - # 如果是换行 - if not content or not content.rstrip().rstrip("\n"): - return - await self._push_message( - event_type=EventType.TEXT_ADD.value, - data=TextAddContent(text=content).model_dump(exclude_none=True, by_alias=True), - ) - - async def _construct_prompt( - self, - doc_ids: list[str], - data: RAGInput, - ) -> AsyncGenerator[str, None]: - """获取RAG服务的结果""" - # 获取文档信息 - doc_chunk_list = await self._fetch_doc_chunks(data) - bac_info, doc_info_list = await self._assemble_doc_info( - doc_chunk_list=doc_chunk_list, max_tokens=8192, - ) - # 构建提示词 - 
prompt_template = GEN_RAG_ANSWER[self.task.runtime.language] - prompt = prompt_template.format(bac_info=bac_info, user_question=data.query) - # 计算token数 - input_tokens = TokenCalculator().calculate_token_length(messages=[{"role": "system", "content": prompt}]) - output_tokens = 0 - doc_cnt: int = 0 - for doc_info in doc_info_list: - doc_cnt = max(doc_cnt, doc_info.order) - yield ( - "data: " - + json.dumps( - { - "event_type": EventType.DOCUMENT_ADD.value, - "input_tokens": input_tokens, - "output_tokens": output_tokens, - "content": doc_info.model_dump(), - }, - ensure_ascii=False, - ) - + "\n\n" - ) - # 使用LLM的推理模型调用大模型 - messages = [ - {"role": "system", "content": prompt}, - {"role": "user", "content": data.query}, - ] - async def _assemble_doc_info( self, doc_chunk_list: list[DocItem], max_tokens: int, - ) -> tuple[str, list[DocumentInfo]]: + ) -> list[DocumentInfo]: """组装文档信息""" - bac_info = "" doc_info_list = [] doc_cnt = 0 doc_id_map = {} - remaining_tokens = round(max_tokens * 0.8) + # 预留tokens + _ = round(max_tokens * 0.8) for doc_chunk in doc_chunk_list: if doc_chunk.docId not in doc_id_map: doc_cnt += 1 # 创建DocumentInfo对象 + created_at_value = ( + doc_chunk.docCreatedAt.timestamp() + if isinstance(doc_chunk.docCreatedAt, datetime) + else doc_chunk.docCreatedAt + ) doc_info = DocumentInfo( id=doc_chunk.docId, order=doc_cnt, @@ -184,55 +124,15 @@ class QAExecutor(BaseExecutor): extension=doc_chunk.docExtension, abstract=doc_chunk.docAbstract, size=doc_chunk.docSize, - created_at=doc_chunk.docCreatedAt.timestamp() if isinstance(doc_chunk.docCreatedAt, datetime) else doc_chunk.docCreatedAt, + created_at=created_at_value, ) doc_info_list.append(doc_info) doc_id_map[doc_chunk.docId] = doc_cnt - doc_index = doc_id_map[doc_chunk.docId] - - if bac_info: - bac_info += "\n\n" - bac_info += f"""""" - - for chunk in doc_chunk.chunks: - if remaining_tokens <= 0: - break - chunk_text = chunk.text - chunk_text = TokenCalculator().get_k_tokens_words_from_content( - 
content=chunk_text, k=remaining_tokens, - ) - remaining_tokens -= TokenCalculator().calculate_token_length(messages=[ - {"role": "user", "content": ""}, - {"role": "user", "content": chunk_text}, - {"role": "user", "content": ""}, - ], pure_text=True) - bac_info += f""" - - {chunk_text} - - """ - bac_info += "" - return bac_info, doc_info_list - async def _push_rag_doc(self, doc: DocumentInfo, document_order: int = 1) -> None: - """推送RAG使用的文档信息""" - await self._push_message( - event_type=EventType.DOCUMENT_ADD.value, - data=DocumentAddContent( - documentId=str(doc.id), - documentOrder=doc.order, - documentAuthor=doc.author, - documentName=doc.name, - documentAbstract=doc.abstract, - documentType=doc.extension, - documentSize=doc.size, - createdAt=doc.created_at, - ).model_dump(exclude_none=True, by_alias=True), - ) + return doc_info_list async def run(self) -> None: """运行QA""" - # 运行RAG步骤 rag_exec = StepExecutor( msg_queue=self.msg_queue, task=self.task, @@ -246,9 +146,23 @@ class QAExecutor(BaseExecutor): question=self.question, llm=self.llm, ) + await rag_exec.init() + await rag_exec.run() + + # 解析并推送文档信息 + if first_chunk and isinstance(first_chunk.content, dict): + rag_output = RAGOutput.model_validate(first_chunk.content) + doc_chunk_list: list[DocItem] = [ + DocItem.model_validate(item) if not isinstance(item, DocItem) else item + for item in rag_output.corpus + ] + doc_info_list = await self._assemble_doc_info(doc_chunk_list, 8192) + for doc_info in doc_info_list: + await self._push_rag_doc(doc_info) # 保存答案 + full_answer = "" self.task.runtime.fullAnswer = full_answer self.task.runtime.fullTime = round(datetime.now(UTC).timestamp(), 2) - self.task.runtime.time diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index ce664d21..a6470955 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any from pydantic import ConfigDict -from apps.models import 
ExecutorHistory, StepStatus +from apps.models import ExecutorHistory, StepStatus, StepType from apps.scheduler.call.core import CoreCall from apps.scheduler.call.empty import Empty from apps.scheduler.call.facts.facts import FactsCall @@ -23,7 +23,7 @@ from apps.schemas.enum_var import ( SpecialCallType, ) from apps.schemas.message import TextAddContent -from apps.schemas.scheduler import CallError, CallOutputChunk +from apps.schemas.scheduler import CallError, CallOutputChunk, ExecutorBackground from apps.services import NodeManager from .base import BaseExecutor @@ -38,6 +38,7 @@ class StepExecutor(BaseExecutor): """工作流中步骤相关函数""" step: "StepQueueItem" + background: "ExecutorBackground" model_config = ConfigDict( arbitrary_types_allowed=True, @@ -90,7 +91,7 @@ class StepExecutor(BaseExecutor): # State写入ID和运行状态 self.task.state.stepId = self.step.step_id - self.task.state.stepDescription = self.step.step.description + self.task.state.stepType = StepType(self.step.step.type) self.task.state.stepName = self.step.step.name # 获取并验证Call类 @@ -264,7 +265,7 @@ class StepExecutor(BaseExecutor): executorStatus=self.task.state.executorStatus, stepId=self.step.step_id, stepName=self.step.step.name, - stepDescription=self.step.step.description, + stepType=StepType(self.step.step.type), stepStatus=self.task.state.stepStatus, inputData=self.obj.input, outputData=output_data, diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index 03438416..ddaddba8 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -23,26 +23,6 @@ logger = logging.getLogger(__name__) async def save_data(scheduler: "Scheduler") -> None: """保存当前Executor、Task、Record等的数据""" - # 构造RecordContent - used_docs = [] - order_to_id = {} - for docs in task.runtime.documents: - used_docs.append( - RecordGroupDocument( - _id=docs["id"], - author=docs.get("author", ""), - order=docs.get("order", 0), - name=docs["name"], - abstract=docs.get("abstract", 
""), - extension=docs.get("extension", ""), - size=docs.get("size", 0), - associated="answer", - created_at=docs.get("created_at", round(datetime.now(UTC).timestamp(), 3)), - ), - ) - if docs.get("order") is not None: - order_to_id[docs["order"]] = docs["id"] - foot_note_pattern = re.compile(r"\[\[(\d+)\]\]") footnote_list = [] offset = 0 diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index bdc5bc6f..55b312b9 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -11,8 +11,8 @@ from jinja2.sandbox import SandboxedEnvironment from apps.common.queue import MessageQueue from apps.llm import Embedding, FunctionLLM, JsonGenerator, ReasoningLLM -from apps.models import AppType, ExecutorStatus, Task, TaskRuntime, User -from apps.scheduler.executor import FlowExecutor, MCPAgentExecutor +from apps.models import AppType, Conversation, ExecutorStatus, Task, TaskRuntime, User +from apps.scheduler.executor import FlowExecutor, MCPAgentExecutor, QAExecutor from apps.scheduler.pool.pool import Pool from apps.schemas.enum_var import EventType from apps.schemas.message import ( @@ -20,11 +20,12 @@ from apps.schemas.message import ( InitContentFeature, ) from apps.schemas.request_data import RequestData -from apps.schemas.scheduler import ExecutorBackground, LLMConfig, TopFlow +from apps.schemas.scheduler import LLMConfig, TopFlow from apps.schemas.task import TaskData from apps.services import ( Activity, AppCenterManager, + ConversationManager, KnowledgeBaseManager, LLMManager, TaskManager, @@ -33,7 +34,7 @@ from apps.services import ( from .prompt import FLOW_SELECT -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) class Scheduler: @@ -64,14 +65,14 @@ class Scheduler: user = await UserManager.get_user(user_sub) if not user: err = f"[Scheduler] 用户 {user_sub} 不存在" - logger.error(err) + _logger.error(err) raise RuntimeError(err) self.user = user # 获取Task task = await 
TaskManager.get_task_data_by_task_id(task_id) if not task: - logger.info("[Scheduler] 新建任务") + _logger.info("[Scheduler] 新建任务") task = TaskData( metadata=Task( id=task_id, @@ -92,6 +93,7 @@ class Scheduler: autoescape=False, trim_blocks=True, lstrip_blocks=True, + extensions=["jinja2.ext.loopcontrols"], ) # LLM @@ -140,16 +142,16 @@ class Scheduler: is_active = await Activity.is_active(user_sub) if not is_active: - logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_sub) + _logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_sub) kill_event.set() break # 控制检查频率 await asyncio.sleep(check_interval) except asyncio.CancelledError: - logger.info("[Scheduler] 活动监控任务已取消") + _logger.info("[Scheduler] 活动监控任务已取消") except Exception: - logger.exception("[Scheduler] 活动监控过程中发生错误") + _logger.exception("[Scheduler] 活动监控过程中发生错误") kill_event.set() @@ -159,18 +161,18 @@ class Scheduler: reasoning_llm = await LLMManager.get_llm(reasoning_llm_id) if not reasoning_llm: err = "[Scheduler] 获取问答用大模型ID失败" - logger.error(err) + _logger.error(err) raise ValueError(err) reasoning_llm = ReasoningLLM(reasoning_llm) # 获取功能性的大模型信息 function_llm = None if not self.user.functionLLM: - logger.error("[Scheduler] 用户 %s 没有设置函数调用大模型,相关功能将被禁用", self.user.userSub) + _logger.error("[Scheduler] 用户 %s 没有设置函数调用大模型,相关功能将被禁用", self.user.userSub) else: function_llm = await LLMManager.get_llm(self.user.functionLLM) if not function_llm: - logger.error( + _logger.error( "[Scheduler] 用户 %s 设置的函数调用大模型ID %s 不存在,相关功能将被禁用", self.user.userSub, self.user.functionLLM, ) @@ -179,11 +181,11 @@ class Scheduler: embedding_llm = None if not self.user.embeddingLLM: - logger.error("[Scheduler] 用户 %s 没有设置向量模型,相关功能将被禁用", self.user.userSub) + _logger.error("[Scheduler] 用户 %s 没有设置向量模型,相关功能将被禁用", self.user.userSub) else: embedding_llm = await LLMManager.get_llm(self.user.embeddingLLM) if not embedding_llm: - logger.error( + _logger.error( "[Scheduler] 用户 %s 设置的向量模型ID %s 不存在,相关功能将被禁用", self.user.userSub, self.user.embeddingLLM, ) @@ 
-201,22 +203,22 @@ class Scheduler: """获取Top1 Flow""" if not self.llm.function: err = "[Scheduler] 未设置Function模型" - logger.error(err) + _logger.error(err) raise RuntimeError(err) # 获取所选应用的所有Flow if not self.post_body.app or not self.post_body.app.app_id: err = "[Scheduler] 未选择应用" - logger.error(err) + _logger.error(err) raise RuntimeError(err) flow_list = await Pool().get_flow_metadata(self.post_body.app.app_id) if not flow_list: err = "[Scheduler] 未找到应用中合法的Flow" - logger.error(err) + _logger.error(err) raise RuntimeError(err) - logger.info("[Scheduler] 选择应用 %s 最合适的Flow", self.post_body.app.app_id) + _logger.info("[Scheduler] 选择应用 %s 最合适的Flow", self.post_body.app.app_id) choices = [{ "name": flow.id, "description": f"{flow.name}, {flow.description}", @@ -239,8 +241,9 @@ class Scheduler: result = TopFlow.model_validate(result_str) return result.choice + async def create_new_conversation( - title: str, user_sub: str, app_id: uuid.UUID | None = None, + self, title: str, user_sub: str, app_id: uuid.UUID | None = None, *, debug: bool = False, ) -> Conversation: @@ -273,7 +276,7 @@ class Scheduler: async def run(self) -> None: """运行调度器""" # 如果是智能问答,直接执行 - logger.info("[Scheduler] 开始执行") + _logger.info("[Scheduler] 开始执行") # 创建用于通信的事件 kill_event = asyncio.Event() monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.metadata.userSub)) @@ -282,25 +285,17 @@ class Scheduler: if self.post_body.app and self.post_body.app.app_id: rag_method = False - if self.task.state.appId: - rag_method = False - if rag_method: kb_ids = await KnowledgeBaseManager.get_selected_kb(self.task.metadata.userSub) await self._push_init_message(3, is_flow=False) - rag_data = RAGQueryReq( - kbIds=kb_ids, - query=self.post_body.question, - tokensLimit=self.llm.reasoning.config.maxToken, - ) # 启动监控任务和主任务 - main_task = asyncio.create_task(push_rag_message( + main_task = asyncio.create_task(self._run_qa( self.task, self.queue, self.task.ids.user_sub, llm, history, doc_ids, rag_data)) 
else: # 查找对应的App元数据 app_data = await AppCenterManager.fetch_app_data_by_id(self.post_body.app.app_id) if not app_data: - logger.error("[Scheduler] App %s 不存在", self.post_body.app.app_id) + _logger.error("[Scheduler] App %s 不存在", self.post_body.app.app_id) await self.queue.close() return @@ -311,8 +306,6 @@ class Scheduler: else: # Agent 应用 is_flow = False - # 需要执行Flow - self.task = await push_init_message(self.task, self.queue, app_data.history_len, is_flow=is_flow) # 启动监控任务和主任务 main_task = asyncio.create_task(self._run_agent(self.queue, self.post_body, executor_background)) # 等待任一任务完成 @@ -321,20 +314,20 @@ class Scheduler: return_when=asyncio.FIRST_COMPLETED, ) - # 如果是监控任务触发,终止主任务 + # 如果用户手动终止,则cancel主任务 if kill_event.is_set(): - logger.warning("[Scheduler] 用户活动状态检测不活跃,正在终止工作流执行...") + _logger.warning("[Scheduler] 用户取消执行,正在终止...") main_task.cancel() if self.task.state.executorStatus in [ExecutorStatus.RUNNING, ExecutorStatus.WAITING]: self.task.state.executorStatus = ExecutorStatus.CANCELLED try: await main_task - logger.info("[Scheduler] 工作流执行已被终止") + _logger.info("[Scheduler] 工作流执行已被终止") except Exception: - logger.exception("[Scheduler] 终止工作流时发生错误") + _logger.exception("[Scheduler] 终止工作流时发生错误") # 更新Task,发送结束消息 - logger.info("[Scheduler] 发送结束消息") + _logger.info("[Scheduler] 发送结束消息") await self.queue.push_output(self.task, event_type=EventType.DONE.value, data={}) # 关闭Queue await self.queue.close() @@ -343,42 +336,48 @@ class Scheduler: async def _run_qa(self) -> None: - pass + qa_executor = QAExecutor( + task=self.task, + msg_queue=self.queue, + question=self.post_body.question, + llm=self.llm, + ) + _logger.info("[Scheduler] 开始智能问答") + await qa_executor.init() + await qa_executor.run() + self.task = qa_executor.task async def _run_flow(self) -> None: # 获取应用信息 if not self.post_body.app or not self.post_body.app.app_id: - logger.error("[Scheduler] 未选择应用") + _logger.error("[Scheduler] 未选择应用") return - logger.info("[Scheduler] 获取工作流元数据") + _logger.info("[Scheduler] 
获取工作流元数据") flow_info = await Pool().get_flow_metadata(self.post_body.app.app_id) # 如果flow_info为空,则直接返回 if not flow_info: - logger.error("[Scheduler] 未找到工作流元数据") + _logger.error("[Scheduler] 未找到工作流元数据") return # 如果用户选了特定的Flow - if self.post_body.app.flow_id: - logger.info("[Scheduler] 获取工作流定义") - flow_id = self.post_body.app.flow_id - flow_data = await Pool().get_flow(self.post_body.app.app_id, flow_id) + if not self.post_body.app.flow_id: + _logger.info("[Scheduler] 选择最合适的流") + flow_id = await self.get_top_flow() else: # 如果用户没有选特定的Flow,则根据语义选择一个Flow - logger.info("[Scheduler] 选择最合适的流") - flow_id = await self.get_top_flow() - logger.info("[Scheduler] 获取工作流定义") - flow_data = await Pool().get_flow(self.post_body.app.app_id, flow_id) + flow_id = self.post_body.app.flow_id + _logger.info("[Scheduler] 获取工作流定义") + flow_data = await Pool().get_flow(self.post_body.app.app_id, flow_id) # 如果flow_data为空,则直接返回 if not flow_data: - logger.error("[Scheduler] 未找到工作流定义") + _logger.error("[Scheduler] 未找到工作流定义") return # 初始化Executor - logger.info("[Scheduler] 初始化Executor") flow_exec = FlowExecutor( flow_id=flow_id, flow=flow_data, @@ -390,33 +389,53 @@ class Scheduler: ) # 开始运行 - logger.info("[Scheduler] 运行Executor") + _logger.info("[Scheduler] 运行工作流执行器") await flow_exec.init() await flow_exec.run() self.task = flow_exec.task - async def _run_agent( - self, queue: MessageQueue, post_body: RequestData, background: ExecutorBackground, - ) -> None: + async def _run_agent(self) -> None: """构造Executor并执行""" + # 获取应用信息 + if not self.post_body.app or not self.post_body.app.app_id: + _logger.error("[Scheduler] 未选择MCP应用") + return + # 初始化Executor agent_exec = MCPAgentExecutor( task=self.task, - msg_queue=queue, - question=post_body.question, - history_len=self.post_body.app.history_len if hasattr(self.post_body.app, 'history_len') else 3, - background=background, + msg_queue=self.queue, + question=self.post_body.question, agent_id=self.post_body.app.app_id, - params=self.post_body.params if 
hasattr(self.post_body, 'params') else {}, + params=self.post_body.app.params, + llm=self.llm, ) # 开始运行 - logger.info("[Scheduler] 运行Executor") + _logger.info("[Scheduler] 运行MCP执行器") + await agent_exec.init() await agent_exec.run() self.task = agent_exec.task async def _save_task(self) -> None: """保存Task""" - await TaskManager.save_task(self.task) + # 构造RecordContent + used_docs = [] + order_to_id = {} + for docs in self.task.runtime.documents: + used_docs.append( + RecordGroupDocument( + _id=docs["id"], + author=docs.get("author", ""), + order=docs.get("order", 0), + name=docs["name"], + abstract=docs.get("abstract", ""), + extension=docs.get("extension", ""), + size=docs.get("size", 0), + associated="answer", + created_at=docs.get("created_at", round(datetime.now(UTC).timestamp(), 3)), + ), + ) + if docs.get("order") is not None: + order_to_id[docs["order"]] = docs["id"] -- Gitee From 7094c1defd4d71fe2357d59fad0f987899c91c12 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 30 Sep 2025 17:39:04 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- manual/source/conf.py | 6 ++ manual/source/index.rst | 1 + manual/source/models/index.rst | 1 - manual/source/models/lance.rst | 21 ----- manual/source/service/activity.rst | 118 ++++++++++++++++++++++++++ manual/source/service/index.rst | 9 ++ manual/source/service/tag.rst | 130 +++++++++++++++++++++++++++++ pyproject.toml | 1 + 8 files changed, 265 insertions(+), 22 deletions(-) delete mode 100644 manual/source/models/lance.rst create mode 100644 manual/source/service/activity.rst create mode 100644 manual/source/service/index.rst create mode 100644 manual/source/service/tag.rst diff --git a/manual/source/conf.py b/manual/source/conf.py index 5b61cf42..0505fdb6 100644 --- a/manual/source/conf.py +++ b/manual/source/conf.py @@ -11,6 +11,7 @@ release = "0.10.0" extensions = [ "sphinx.ext.autodoc", 
"sphinxcontrib.autodoc_pydantic", + "sphinxcontrib.plantuml", ] templates_path = ["_templates"] @@ -26,3 +27,8 @@ autodoc_pydantic_model_show_field_summary = True autodoc_pydantic_field_show_default = True sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + +plantuml_output_format = "svg" +plantuml = ( + r"java -jar C:\Users\ObjectNF\scoop\apps\plantuml\current\plantuml.jar" +) diff --git a/manual/source/index.rst b/manual/source/index.rst index 676840dd..62bfa69b 100644 --- a/manual/source/index.rst +++ b/manual/source/index.rst @@ -8,3 +8,4 @@ openEuler Intelligence Framework 文档 pool/index models/index call/index + service/index diff --git a/manual/source/models/index.rst b/manual/source/models/index.rst index d93bd139..f0e0f2cd 100644 --- a/manual/source/models/index.rst +++ b/manual/source/models/index.rst @@ -10,5 +10,4 @@ .. toctree:: :maxdepth: 2 - lance mongo diff --git a/manual/source/models/lance.rst b/manual/source/models/lance.rst deleted file mode 100644 index 43bf10a4..00000000 --- a/manual/source/models/lance.rst +++ /dev/null @@ -1,21 +0,0 @@ -############## -LanceDB 数据库 -############## - -LanceDB 是一个基于矢量数据库的存储系统,用于存储和检索向量化的数据。 - -LanceDB 的详细介绍请参考 `LanceDB 官方文档 `__ ,其特性如下: - -- Serverless,无需安装和部署服务 -- 支持大规模数据存储和检索 - - -************* -LanceDB连接器 -************* - -.. autoclass:: apps.models.lance.LanceDB - :members: - :undoc-members: - :private-members: - :inherited-members: diff --git a/manual/source/service/activity.rst b/manual/source/service/activity.rst new file mode 100644 index 00000000..0de41d17 --- /dev/null +++ b/manual/source/service/activity.rst @@ -0,0 +1,118 @@ +############################ +Activity 模块(限流与并发) +############################ + +.. 
currentmodule:: apps.services.activity + +概述 +==== + +``Activity`` 模块负责用户请求的限流与全局并发控制,提供以下能力: + +- 单用户滑动窗口限流:限制单个用户在指定时间窗口内的请求数量 +- 全局并发限制:控制系统同时执行的任务总数 +- 活动状态管理:提供活跃任务的登记与释放 +- 数据持久化:使用 PostgreSQL 持久化活动记录 + +核心常量 +======== + +以下常量由 ``apps.constants`` 提供,可通过配置调整: + +- ``MAX_CONCURRENT_TASKS``:全局同时运行任务上限(默认 30) +- ``SLIDE_WINDOW_QUESTION_COUNT``:滑动窗口内单用户最大请求数(默认 5) +- ``SLIDE_WINDOW_TIME``:滑动窗口时间(秒,默认 15) + +数据模型 +======== + +使用 ``SessionActivity`` 记录活跃行为: + +- ``id``:主键 ID(自增) +- ``userSub``:用户实体 ID(外键关联) +- ``timestamp``:活动时间戳(UTC) + +API 参考 +======== + +Activity 类 +----------- + +.. autoclass:: Activity + :members: + :undoc-members: + :show-inheritance: + +设计说明 +======== + +- 滑动窗口限流:统计用户在最近 ``SLIDE_WINDOW_TIME`` 秒内的请求数量,超过 ``SLIDE_WINDOW_QUESTION_COUNT`` 即判定限流。 +- 全局并发控制:通过统计 ``SessionActivity`` 的记录数判断是否达到 ``MAX_CONCURRENT_TASKS`` 上限。 +- 并发安全:依赖数据库事务保障登记与释放的原子性。 +- 时间精度:所有时间戳使用 UTC。 + +注意事项 +======== + +1. 在调用活跃登记前应先进行可用性检查。 +2. 任务完成后需及时释放活跃记录以避免占用并发额度。 +3. 合理调整常量以适配不同吞吐需求。 + + + +流程图(is_active) +=================== + +.. uml:: + + @startuml + title Activity.is_active 流程 + start + :获取当前 UTC 时间; + :开启数据库会话; + :统计 userSub 在 [now - SLIDE_WINDOW_TIME, now] 的请求数; + if (count >= SLIDE_WINDOW_QUESTION_COUNT?) then (是) + :返回 True (达到限流); + stop + else (否) + :统计当前活跃任务数量 current_active; + if (current_active >= MAX_CONCURRENT_TASKS?) then (是) + :返回 True (达到并发上限); + else (否) + :返回 False (可执行); + endif + endif + stop + @enduml + + +时序图(set_active/remove_active) +=================================== + +.. 
uml:: + + @startuml + title 活跃任务登记与释放时序 + actor Client + participant "Activity" as Activity + database "PostgreSQL" as PG + + == set_active == + Client -> Activity: set_active(user_sub) + Activity -> PG: 查询当前活跃数量 + PG --> Activity: current_active + alt 达到并发上限 + Activity -> Client: 抛出 ActivityError + else 未达上限 + Activity -> PG: merge SessionActivity(userSub, timestamp) + Activity -> PG: commit + Activity -> Client: 完成 + end + + == remove_active == + Client -> Activity: remove_active(user_sub) + Activity -> PG: delete SessionActivity where userSub=user_sub + Activity -> PG: commit + Activity -> Client: 完成 + + @enduml diff --git a/manual/source/service/index.rst b/manual/source/service/index.rst new file mode 100644 index 00000000..5cb76ac3 --- /dev/null +++ b/manual/source/service/index.rst @@ -0,0 +1,9 @@ +######################## +服务模块文档(Service) +######################## + +.. toctree:: + :maxdepth: 2 + + activity + tag diff --git a/manual/source/service/tag.rst b/manual/source/service/tag.rst new file mode 100644 index 00000000..6f72c9f0 --- /dev/null +++ b/manual/source/service/tag.rst @@ -0,0 +1,130 @@ +######################## +Tag 模块(用户标签) +######################## + +.. currentmodule:: apps.services.tag + +概述 +==== + +``Tag`` 模块提供用户标签的增删改查能力,统一通过数据库会话访问 ``Tag`` 与 ``UserTag`` 两类模型,实现: + +- 获取全部标签 +- 按名称查询标签 +- 按用户 ``sub`` 查询其拥有的标签集合 +- 新增标签(若存在则合并) +- 按名称更新标签定义 +- 删除标签 + +依赖 +==== + +- 数据库会话:``apps.common.postgres.postgres.session`` (异步上下文) +- 数据模型:``apps.models.Tag``、``apps.models.UserTag`` +- 入参模型:``apps.schemas.request_data.PostTagData`` (字段:``tag``、``description``) + +API 参考 +======== + +TagManager 类 +-------------- + +.. 
autoclass:: TagManager + :members: + :undoc-members: + :show-inheritance: + +设计说明 +======== + +- 读取路径:查询 ``Tag`` 表,或先查 ``UserTag`` 再关联回 ``Tag`` 表。 +- 写入路径:采用 ``session.merge`` 以便在新增/幂等更新间取得平衡。 +- 时间戳:更新时写入 ``updatedAt = datetime.now(tz=UTC)``。 +- 异常处理:更新/删除时若目标标签不存在,记录错误并抛出 ``ValueError``。 + +流程图(get_tag_by_user_sub) +============================= + +.. uml:: + + @startuml + title 通过 user_sub 查询用户标签流程 + start + :传入 user_sub; + :开启数据库会话; + :查询 UserTag where userSub = user_sub; + if (是否存在 user_tags?) then (是) + :遍历 user_tags; + :按 tag id 逐个查询 Tag; + :若 Tag 存在则加入结果列表; + else (否) + :返回空列表; + stop + endif + :返回标签结果列表; + stop + @enduml + +流程图(add/update/delete) +============================ + +.. uml:: + + @startuml + title 标签写操作流程(add / update / delete) + start + :开启数据库会话; + partition add_tag { + :构造 Tag(name=data.tag, definition=data.description); + :session.merge(tag); + :commit; + } + partition update_tag_by_name { + :按名称查询 Tag; + if (是否存在?) then (否) + :logger.error 并抛出 ValueError; + stop + else (是) + :更新 definition, updatedAt; + :session.merge(tag); + :commit 并返回 tag; + endif + } + partition delete_tag { + :按名称查询 Tag; + if (是否存在?) then (否) + :logger.error 并抛出 ValueError; + stop + else (是) + :session.delete(tag); + :commit; + endif + } + stop + @enduml + +时序图(get_tag_by_name / add_tag) +=================================== + +.. 
uml:: + + @startuml + title 查询与新增标签时序 + actor Client + participant "TagManager" as TM + database "PostgreSQL" as PG + + == get_tag_by_name == + Client -> TM: get_tag_by_name(name) + TM -> PG: SELECT Tag WHERE name = :name LIMIT 1 + PG --> TM: Tag | None + TM -> Client: 返回 Tag | None + + == add_tag == + Client -> TM: add_tag(PostTagData) + TM -> PG: MERGE Tag(name, definition) + TM -> PG: COMMIT + TM -> Client: 完成 + @enduml + + diff --git a/pyproject.toml b/pyproject.toml index 857fa8a3..6aa0bd60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,4 +49,5 @@ dev = [ "ruff==0.12.5", "sphinx==8.2.3", "sphinx-rtd-theme==3.0.2", + "sphinxcontrib-plantuml>=0.31", ] -- Gitee From 716fe2f11fa171d5cd6922870ea74ff51ab4179a Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 30 Sep 2025 17:39:53 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E5=92=8C=E5=AF=BC=E5=85=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/postgres.py | 4 +--- apps/llm/embedding.py | 3 +-- apps/llm/providers/ollama.py | 2 +- apps/llm/providers/openai.py | 2 +- apps/models/base.py | 5 +++++ apps/models/llm.py | 4 ++-- apps/models/record.py | 2 +- apps/models/task.py | 16 ++++++++-------- apps/models/user.py | 4 +--- 9 files changed, 21 insertions(+), 21 deletions(-) diff --git a/apps/common/postgres.py b/apps/common/postgres.py index 74b0dcfb..6631714d 100644 --- a/apps/common/postgres.py +++ b/apps/common/postgres.py @@ -7,9 +7,7 @@ from contextlib import asynccontextmanager from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine -from apps.models import ( - Base, -) +from apps.models import Base from .config import config diff --git a/apps/llm/embedding.py b/apps/llm/embedding.py index 679683f7..61162a0b 100644 --- a/apps/llm/embedding.py +++ b/apps/llm/embedding.py @@ -103,7 +103,7 @@ class Embedding: embedding = 
await self.get_embedding(["测试文本"]) return len(embedding[0]) - async def _delete_vector(self) -> None: + async def delete_vector(self) -> None: """删除所有Vector表""" async with postgres.session() as session: await session.execute(text("DROP TABLE IF EXISTS framework_flow_vector")) @@ -148,7 +148,6 @@ class Embedding: async def init(self) -> None: """在使用Embedding前初始化数据库表等资源""" - await self._delete_vector() # 检测维度 dim = await self._get_embedding_dimension() await self._create_vector_table(dim) diff --git a/apps/llm/providers/ollama.py b/apps/llm/providers/ollama.py index 7d0578a7..e4935561 100644 --- a/apps/llm/providers/ollama.py +++ b/apps/llm/providers/ollama.py @@ -7,7 +7,7 @@ from typing import Any from ollama import AsyncClient, ChatResponse from typing_extensions import override -from apps.llm import TokenCalculator +from apps.llm.token import TokenCalculator from apps.models import LLMType from apps.schemas.llm import LLMChunk, LLMFunctions diff --git a/apps/llm/providers/openai.py b/apps/llm/providers/openai.py index d13e1d0b..8a6fdcc3 100644 --- a/apps/llm/providers/openai.py +++ b/apps/llm/providers/openai.py @@ -15,7 +15,7 @@ from openai.types.chat import ( ) from typing_extensions import override -from apps.llm import TokenCalculator +from apps.llm.token import TokenCalculator from apps.models import LLMType from apps.schemas.llm import LLMChunk, LLMFunctions diff --git a/apps/models/base.py b/apps/models/base.py index 5f4424f2..432e5647 100644 --- a/apps/models/base.py +++ b/apps/models/base.py @@ -1,8 +1,13 @@ """SQLAlchemy模型基类""" +from typing import Any, ClassVar + from sqlalchemy.orm import DeclarativeBase, MappedAsDataclass class Base(MappedAsDataclass, DeclarativeBase): """SQLAlchemy模型基类""" + # 生成文档时需要启动这个参数,否则会触发重复导入告警 + __table_args__: ClassVar[dict[str, Any]] = {"extend_existing": True} + diff --git a/apps/models/llm.py b/apps/models/llm.py index 6249f5b3..422721b5 100644 --- a/apps/models/llm.py +++ b/apps/models/llm.py @@ -55,12 +55,12 @@ class 
LLMData(Base): """LLM最大Token数量""" temperature: Mapped[float] = mapped_column(Float, default=0.7, nullable=False) """LLM温度""" - llmType: Mapped[list[LLMType]] = mapped_column(ARRAY(Enum(LLMType)), default=[], nullable=False) # noqa: N815 + llmType: Mapped[list[LLMType]] = mapped_column(ARRAY(Enum(LLMType)), default_factory=list, nullable=False) # noqa: N815 """LLM类型""" provider: Mapped[LLMProvider] = mapped_column( Enum(LLMProvider), default=None, nullable=True, ) - extraConfig: Mapped[dict[str, Any]] = mapped_column(JSONB, default={}, nullable=False) # noqa: N815 + extraConfig: Mapped[dict[str, Any]] = mapped_column(JSONB, default_factory=dict, nullable=False) # noqa: N815 """大模型API类型""" createdAt: Mapped[DateTime] = mapped_column( # noqa: N815 DateTime, diff --git a/apps/models/record.py b/apps/models/record.py index 7b3bec8d..2bc80d84 100644 --- a/apps/models/record.py +++ b/apps/models/record.py @@ -54,7 +54,7 @@ class RecordMetadata(Base): """问答对输入token数""" outputTokens: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # noqa: N815 """问答对输出token数""" - featureSwitch: Mapped[dict[str, Any]] = mapped_column(JSONB, default={}, nullable=False) # noqa: N815 + featureSwitch: Mapped[dict[str, Any]] = mapped_column(JSONB, default_factory=dict, nullable=False) # noqa: N815 """问答对功能开关""" diff --git a/apps/models/task.py b/apps/models/task.py index 86183666..61f12b78 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -107,13 +107,13 @@ class TaskRuntime(Base): """用户输入""" fullAnswer: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 """完整输出""" - fact: Mapped[list[str]] = mapped_column(JSONB, nullable=False, default=[]) + fact: Mapped[list[str]] = mapped_column(JSONB, nullable=False, default_factory=list) """记忆""" reasoning: Mapped[str] = mapped_column(Text, nullable=False, default="") """中间推理""" - filledSlot: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + filledSlot: 
Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815 """计划""" - document: Mapped[list[dict[uuid.UUID, Any]]] = mapped_column(JSONB, nullable=False, default={}) + document: Mapped[list[dict[uuid.UUID, Any]]] = mapped_column(JSONB, nullable=False, default_factory=dict) """关联文档""" language: Mapped[LanguageType] = mapped_column(Enum(LanguageType), nullable=False, default=LanguageType.CHINESE) """语言""" @@ -148,9 +148,9 @@ class ExecutorCheckpoint(Base): """检查点ID""" executorDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 """执行器描述""" - data: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) + data: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) """步骤额外数据""" - errorMessage: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + errorMessage: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815 """错误信息""" @@ -177,11 +177,11 @@ class ExecutorHistory(Base): """步骤状态""" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4) """执行器历史ID""" - inputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + inputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815 """步骤输入数据""" - outputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + outputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815 """步骤输出数据""" - extraData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + extraData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815 """步骤额外数据""" updatedAt: Mapped[datetime] = mapped_column( # noqa: N815 DateTime(timezone=True), nullable=False, 
default_factory=lambda: datetime.now(UTC), diff --git a/apps/models/user.py b/apps/models/user.py index b32a3cd5..278456a3 100644 --- a/apps/models/user.py +++ b/apps/models/user.py @@ -5,7 +5,7 @@ import uuid from datetime import UTC, datetime from hashlib import sha256 -from sqlalchemy import ARRAY, BigInteger, Boolean, DateTime, Enum, ForeignKey, Integer, String +from sqlalchemy import BigInteger, Boolean, DateTime, Enum, ForeignKey, Integer, String from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Mapped, mapped_column @@ -34,8 +34,6 @@ class User(Base): String(100), default_factory=lambda: sha256(str(uuid.uuid4()).encode()).hexdigest()[:16], nullable=False, ) """用户个人令牌""" - selectedKB: Mapped[list[uuid.UUID]] = mapped_column(ARRAY(UUID), default=[], nullable=False) # noqa: N815 - """用户选择的知识库的ID""" functionLLM: Mapped[str | None] = mapped_column(String(255), default=None, nullable=True) # noqa: N815 """用户选择的函数模型ID""" embeddingLLM: Mapped[str | None] = mapped_column(String(255), default=None, nullable=True) # noqa: N815 -- Gitee From ab17053f04049b193ffc02f0b0a2eaaa920f2641 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 30 Sep 2025 17:54:47 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E6=9B=B4=E6=96=B0Pool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/executor/flow.py | 2 + apps/scheduler/executor/qa.py | 1 - apps/scheduler/pool/loader/mcp.py | 77 ++++++++++++++++++++------- apps/scheduler/pool/loader/service.py | 49 ++++++++++++++--- apps/services/llm.py | 28 ++++++++++ 5 files changed, 131 insertions(+), 26 deletions(-) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index b15c6154..955dc65d 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -13,6 +13,7 @@ from apps.models import ( ExecutorStatus, LanguageType, StepStatus, + StepType, ) from apps.schemas.enum_var import EventType, SpecialCallType from 
apps.schemas.flow import Flow, Step @@ -89,6 +90,7 @@ class FlowExecutor(BaseExecutor): stepStatus=StepStatus.RUNNING, stepId=self.flow.basicConfig.startStep, stepName=self.flow.steps[self.flow.basicConfig.startStep].name, + stepType=StepType(self.flow.steps[self.flow.basicConfig.startStep].type), ) # 是否到达Flow结束终点(变量) self._reached_end: bool = False diff --git a/apps/scheduler/executor/qa.py b/apps/scheduler/executor/qa.py index 338f7ade..e529cfc8 100644 --- a/apps/scheduler/executor/qa.py +++ b/apps/scheduler/executor/qa.py @@ -150,7 +150,6 @@ class QAExecutor(BaseExecutor): await rag_exec.run() - # 解析并推送文档信息 if first_chunk and isinstance(first_chunk.content, dict): rag_output = RAGOutput.model_validate(first_chunk.content) diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py index 4e073a5a..2c9ea0d9 100644 --- a/apps/scheduler/pool/loader/mcp.py +++ b/apps/scheduler/pool/loader/mcp.py @@ -177,16 +177,11 @@ class MCPLoader(metaclass=SingletonMeta): async def cancel_all_installing_task() -> None: """取消正在安装的MCP模板任务""" template_path = MCP_PATH / "template" - logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path) + logger.info("[MCPLoader] 取消正在安装的MCP模板任务: %s", template_path) - # 遍历所有模板 - mcp_ids = [] - async for mcp_dir in template_path.iterdir(): - # 不是目录 - if not await mcp_dir.is_dir(): - logger.warning("[MCPLoader] 跳过非目录: %s", mcp_dir.as_posix()) - continue - mcp_ids.append(mcp_dir.name) + # 获取所有MCP ID + mcp_configs = await MCPLoader._get_all_mcp_configs() + mcp_ids = list(mcp_configs.keys()) async with postgres.session() as session: await session.execute( @@ -201,15 +196,14 @@ class MCPLoader(metaclass=SingletonMeta): @staticmethod - async def _init_all_template() -> None: + async def _get_all_mcp_configs() -> dict[str, MCPServerConfig]: """ - 初始化所有文件夹中的MCP模板 + 获取所有MCP模板的配置 - 遍历 ``template`` 目录下的所有MCP模板,并初始化。在Framework启动时进行此流程,确保所有MCP均可正常使用。 - 这一过程会与数据库内的条目进行对比,若发生修改,则重新创建数据库条目。 + :return: MCP ID到配置的映射 """ + mcp_configs = {} 
template_path = MCP_PATH / "template" - logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path) # 遍历所有模板 async for mcp_dir in template_path.iterdir(): @@ -224,12 +218,32 @@ class MCPLoader(metaclass=SingletonMeta): logger.warning("[MCPLoader] 跳过没有配置文件的MCP模板: %s", mcp_dir.as_posix()) continue - # 读取配置并加载 - config = await MCPLoader._load_config(config_path) + try: + # 读取配置 + config = await MCPLoader._load_config(config_path) + mcp_configs[mcp_dir.name] = config + except Exception as e: # noqa: BLE001 + logger.warning("[MCPLoader] 加载MCP配置失败: %s, 错误: %s", mcp_dir.name, str(e)) + continue + + return mcp_configs + + @staticmethod + async def _init_all_template() -> None: + """ + 初始化所有文件夹中的MCP模板 + + 遍历 ``template`` 目录下的所有MCP模板,并初始化。在Framework启动时进行此流程,确保所有MCP均可正常使用。 + 这一过程会与数据库内的条目进行对比,若发生修改,则重新创建数据库条目。 + """ + template_path = MCP_PATH / "template" + logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path) + mcp_configs = await MCPLoader._get_all_mcp_configs() + for mcp_id, config in mcp_configs.items(): # 初始化第一个MCP Server - logger.info("[MCPLoader] 初始化MCP模板: %s", mcp_dir.as_posix()) - await MCPLoader.init_one_template(mcp_dir.name, config) + logger.info("[MCPLoader] 初始化MCP模板: %s", mcp_id) + await MCPLoader.init_one_template(mcp_id, config) @staticmethod @@ -639,6 +653,33 @@ class MCPLoader(metaclass=SingletonMeta): await session.commit() + @staticmethod + async def set_vector(embedding_model: Embedding) -> None: + """ + 将MCP工具描述进行向量化并存入数据库 + + :param Embedding embedding_model: 嵌入模型 + :return: 无 + """ + try: + # 获取所有MCP配置 + mcp_configs = await MCPLoader._get_all_mcp_configs() + + for mcp_id, config in mcp_configs.items(): + try: + # 进行向量化 + await MCPLoader._insert_template_tool_vector(mcp_id, config, embedding_model) + logger.info("[MCPLoader] MCP工具向量化完成: %s", mcp_id) + except Exception as e: # noqa: BLE001 + logger.warning("[MCPLoader] MCP工具向量化失败: %s, 错误: %s", mcp_id, str(e)) + continue + + logger.info("[MCPLoader] 所有MCP工具向量化完成") + except Exception as e: + err = 
"[MCPLoader] MCP工具向量化失败" + logger.exception(err) + raise RuntimeError(err) from e + @staticmethod async def init() -> None: """ diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index def88146..671d25f1 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -6,7 +6,7 @@ import shutil import uuid from anyio import Path -from sqlalchemy import delete +from sqlalchemy import delete, select from apps.common.config import config from apps.common.postgres import postgres @@ -31,6 +31,23 @@ BASE_PATH = Path(config.deploy.data_dir) / "semantics" / "service" class ServiceLoader: """Service 加载器""" + @staticmethod + async def _load_all_services() -> list[tuple[Service, list[NodeInfo]]]: + """从数据库加载所有服务和对应的节点""" + async with postgres.session() as session: + # 查询所有服务 + services_query = select(Service) + services = list((await session.scalars(services_query)).all()) + + # 为每个服务查询对应的节点 + service_nodes = [] + for service in services: + nodes_query = select(NodeInfo).where(NodeInfo.serviceId == service.id) + nodes = list((await session.scalars(nodes_query)).all()) + service_nodes.append((service, nodes)) + + return service_nodes + @staticmethod async def load(service_id: uuid.UUID, hashes: dict[str, str]) -> None: """加载单个Service""" @@ -145,11 +162,29 @@ class ServiceLoader: await session.commit() + @staticmethod + async def set_vector(embedding_model: Embedding) -> None: + """将所有服务和节点的向量化数据存入数据库""" + service_nodes = await ServiceLoader._load_all_services() + + # 为每个服务调用现有的update_vector方法 + for service, nodes in service_nodes: + await ServiceLoader._update_vector( + nodes, + service.id, + service.description, + embedding_model, + ) @staticmethod - async def _update_vector(nodes: list[NodeInfo], metadata: ServiceMetadata, embedding_model: Embedding) -> None: + async def _update_vector( + nodes: list[NodeInfo], + service_id: uuid.UUID, + service_description: str, + embedding_model: Embedding, + ) -> 
None: """更新向量数据""" - service_vecs = await embedding_model.get_embedding([metadata.description]) + service_vecs = await embedding_model.get_embedding([service_description]) node_descriptions = [] for node in nodes: node_descriptions += [node.description] @@ -158,20 +193,20 @@ class ServiceLoader: async with postgres.session() as session: # 删除旧数据 await session.execute( - delete(embedding_model.ServicePoolVector).where(embedding_model.ServicePoolVector.id == metadata.id), + delete(embedding_model.ServicePoolVector).where(embedding_model.ServicePoolVector.id == service_id), ) await session.execute( - delete(embedding_model.NodePoolVector).where(embedding_model.NodePoolVector.serviceId == metadata.id), + delete(embedding_model.NodePoolVector).where(embedding_model.NodePoolVector.serviceId == service_id), ) # 插入新数据 session.add(embedding_model.ServicePoolVector( - id=metadata.id, + id=service_id, embedding=service_vecs[0], )) for vec in node_vecs: node_data = embedding_model.NodePoolVector( id=node.id, - serviceId=metadata.id, + serviceId=service_id, embedding=vec, ) session.add(node_data) diff --git a/apps/services/llm.py b/apps/services/llm.py index eab2d1b6..203474cf 100644 --- a/apps/services/llm.py +++ b/apps/services/llm.py @@ -6,7 +6,9 @@ import logging from sqlalchemy import select from apps.common.postgres import postgres +from apps.llm import Embedding from apps.models import LLMData, User +from apps.scheduler.pool.pool import Pool from apps.schemas.request_data import ( UpdateLLMReq, UpdateUserSelectedLLMReq, @@ -185,6 +187,10 @@ class LLMManager: req: UpdateUserSelectedLLMReq, ) -> None: """更新用户的默认LLM""" + # 检查embedding模型是否发生变化 + old_embedding_llm = None + new_embedding_llm = req.embeddingLLM + async with postgres.session() as session: user = (await session.scalars( select(User).where(User.userSub == user_sub), @@ -192,6 +198,28 @@ class LLMManager: if not user: err = f"[LLMManager] 用户 {user_sub} 不存在" raise ValueError(err) + + old_embedding_llm = 
user.embeddingLLM user.functionLLM = req.functionLLM user.embeddingLLM = req.embeddingLLM await session.commit() + + # 如果embedding模型发生变化,触发向量化过程 + if old_embedding_llm != new_embedding_llm and new_embedding_llm: + try: + # 获取新的embedding模型配置 + embedding_llm_config = await LLMManager.get_llm(new_embedding_llm) + if embedding_llm_config: + # 创建Embedding实例 + embedding_model = Embedding(embedding_llm_config) + await embedding_model.init() + + # 获取Pool实例并触发向量化 + pool = Pool() + await pool.set_vector(embedding_model) + + logger.info("[LLMManager] 用户 %s 的embedding模型已更新,向量化过程已完成", user_sub) + else: + logger.error("[LLMManager] 用户 %s 选择的embedding模型 %s 不存在", user_sub, new_embedding_llm) + except Exception: + logger.exception("[LLMManager] 用户 %s 的embedding模型向量化过程失败", user_sub) -- Gitee From 29471d66027a9cb61a1516ce5db43eac69eb2e47 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 30 Sep 2025 17:55:36 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E5=88=A0=E9=99=A4Version?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/schemas/flow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/schemas/flow.py b/apps/schemas/flow.py index 26152f0e..17007bdf 100644 --- a/apps/schemas/flow.py +++ b/apps/schemas/flow.py @@ -94,7 +94,6 @@ class MetadataBase(BaseModel): icon: str = Field(description="图标", default="") name: str = Field(description="元数据名称") description: str = Field(description="元数据描述") - version: str = Field(description="元数据版本") author: str = Field(description="创建者的用户名") hashes: dict[str, str] | None = Field(description="资源(App、Service等)下所有文件的hash值", default=None) -- Gitee