From 842104a97f5ffb8096a9ab4d7a16e224cbf95981 Mon Sep 17 00:00:00 2001 From: zxstty Date: Tue, 5 Aug 2025 14:41:59 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=AE=8C=E5=96=84app=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E7=9A=84=E6=95=B0=E6=8D=AE=E6=95=B0=E6=8D=AE=E7=BB=93=E6=9E=84?= =?UTF-8?q?&record=E7=9A=84rask=E5=8F=AF=E4=BB=A5=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E4=B8=BAnone?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/appcenter.py | 15 ++++++++++++--- apps/routers/mcp_service.py | 2 ++ apps/schemas/appcenter.py | 10 +++++++++- apps/schemas/record.py | 4 ++-- apps/services/mcp_service.py | 6 ++++++ 5 files changed, 31 insertions(+), 6 deletions(-) diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index 0ec4db915..df540eaf2 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -9,7 +9,7 @@ from fastapi.responses import JSONResponse from apps.dependency.user import get_user, verify_user from apps.exceptions import InstancePermissionError -from apps.schemas.appcenter import AppFlowInfo, AppPermissionData +from apps.schemas.appcenter import AppFlowInfo, AppMcpServiceInfo, AppPermissionData from apps.schemas.enum_var import AppFilterType, AppType from apps.schemas.request_data import CreateAppRequest, ModFavAppRequest from apps.schemas.response_data import ( @@ -25,7 +25,7 @@ from apps.schemas.response_data import ( ResponseData, ) from apps.services.appcenter import AppCenterManager - +from apps.services.mcp_service import MCPServiceManager logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/app", @@ -214,6 +214,15 @@ async def get_application( ) for flow in app_data.flows ] + mcp_service = [] + if app_data.mcp_service: + for service in app_data.mcp_service: + mcp_collection = await MCPServiceManager.get_mcp_service(service) + mcp_service.append(AppMcpServiceInfo( + id=mcp_collection.id, + name=mcp_collection.name, + description=mcp_collection.description, + )) return JSONResponse( status_code=status.HTTP_200_OK, content=GetAppPropertyRsp( @@ -234,7 +243,7 @@ async def get_application( authorizedUsers=app_data.permission.users, ), workflows=workflows, - mcpService=app_data.mcp_service, + mcpService=mcp_service, ), ).model_dump(exclude_none=True, by_alias=True), ) diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py index de484e78b..848780e21 100644 --- a/apps/routers/mcp_service.py +++ b/apps/routers/mcp_service.py @@ -53,6 +53,7 @@ async def get_mcpservice_list( ] = SearchType.ALL, keyword: Annotated[str | None, Query(..., alias="keyword", description="搜索关键字")] = None, page: Annotated[int, Query(..., alias="page", ge=1, description="页码")] = 1, + is_active: Annotated[bool | None, Query(None, alias="isActive", description="是否激活")] = None, ) -> JSONResponse: """获取服务列表""" try: @@ -61,6 +62,7 @@ async def get_mcpservice_list( user_sub, keyword, page, + is_active ) except Exception as e: err = f"[MCPServiceCenter] 获取MCP服务列表失败: {e}" diff --git a/apps/schemas/appcenter.py b/apps/schemas/appcenter.py index a89f39df1..a65fffb2d 100644 --- a/apps/schemas/appcenter.py +++ b/apps/schemas/appcenter.py @@ -50,6 +50,14 @@ class AppFlowInfo(BaseModel): debug: bool = Field(..., description="是否经过调试") +class AppMcpServiceInfo(BaseModel): + """应用关联的MCP服务信息""" + + id: str = Field(..., description="MCP服务ID") + name: str = Field(..., description="MCP服务名称") + description: str = Field(..., description="MCP服务简介") + + class AppData(BaseModel): """应用信息数据结构""" @@ -64,4 +72,4 @@ class AppData(BaseModel): permission: AppPermissionData = Field( default_factory=lambda: AppPermissionData(authorizedUsers=None), description="权限配置") workflows: list[AppFlowInfo] = Field(default=[], description="工作流信息列表") - mcp_service: list[str] = Field(default=[], alias="mcpService", description="MCP服务id列表") + mcp_service: list[AppMcpServiceInfo] = Field(default=[], alias="mcpService", description="MCP服务id列表") diff --git a/apps/schemas/record.py b/apps/schemas/record.py index 6a394375b..144a6c572 100644 --- a/apps/schemas/record.py +++ b/apps/schemas/record.py @@ -130,8 +130,8 @@ class Record(RecordData): user_sub: str key: dict[str, Any] = {} - task_id: str - content: str + task_id: str | None = Field(default=None, description="任务ID") + content: str = Field(default="", description="Record内容,已加密") comment: RecordComment = Field(default=RecordComment()) flow: FlowHistory = Field( default=FlowHistory(), description="Flow执行历史信息") diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index 2c84a2114..ba510350f 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -78,6 +78,7 @@ class MCPServiceManager: user_sub: str, keyword: str | None, page: int, + is_active: bool | None = None, ) -> list[MCPServiceCardItem]: """ 获取所有MCP服务列表 @@ -89,6 +90,11 @@ class MCPServiceManager: :return: MCP服务列表 """ filters = MCPServiceManager._build_filters(search_type, keyword) + if is_active is not None: + if is_active: + filters["activated"] = {"$in": [user_sub]} + else: + filters["activated"] = {"$nin": [user_sub]} mcpservice_pools = await MCPServiceManager._search_mcpservice(filters, page) return [ MCPServiceCardItem( -- Gitee From a537bf5ea71f958fdaa47b72997ea4b8d9381b9e Mon Sep 17 00:00:00 2001 From: zxstty Date: Tue, 5 Aug 2025 14:58:39 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=8E=BB=E9=99=A4chat=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E7=9A=84new=5Ftask=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/chat.py | 2 +- apps/schemas/request_data.py | 1 - apps/services/record.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 06bc2dd78..f92efd453 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -41,7 +41,7 @@ async def init_task(post_body: RequestData, user_sub: str) -> Task: post_body.group_id = str(uuid.uuid4()) # 更改信息并刷新数据库 - if post_body.new_task: + if post_body.task_id is None: conversation = await ConversationManager.get_conversation_by_conversation_id( user_sub=user_sub, conversation_id=post_body.conversation_id, diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index 8719c2e98..8d053e1cc 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -48,7 +48,6 @@ class RequestData(BaseModel): app: RequestDataApp | None = Field(default=None, description="应用") debug: bool = Field(default=False, description="是否调试") task_id: str | None = Field(default=None, alias="taskId", description="任务ID") - new_task: bool = Field(default=True, description="是否新建任务") class QuestionBlacklistRequest(BaseModel): diff --git a/apps/services/record.py b/apps/services/record.py index 6b61f91ec..cf8373b04 100644 --- a/apps/services/record.py +++ b/apps/services/record.py @@ -142,7 +142,7 @@ class RecordManager: record_group_collection = MongoDB().get_collection("record_group") try: await record_group_collection.update_many( - {"records.flow.flow_id": {"$in": task_ids}, "records.flow.flow_status": {"$nin": [FlowStatus.ERROR.value, FlowStatus.SUCCESS.value]}}, + {"records.task_id": {"$in": task_ids}, "records.flow.flow_status": {"$nin": [FlowStatus.ERROR.value, FlowStatus.SUCCESS.value]}}, {"$set": {"records.$[elem].flow.flow_status": FlowStatus.CANCELLED}}, array_filters=[{"elem.flow.flow_id": {"$in": task_ids}}], ) -- Gitee From ed0d8fcbb1377b7aa9be9d467b2ffd5efe31816f Mon Sep 17 00:00:00 2001 From: zxstty Date: Wed, 6 Aug 2025 11:15:21 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dtask=E5=92=8Cscheduler?= =?UTF-8?q?=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/chat.py | 14 +++-- apps/routers/mcp_service.py | 2 +- apps/scheduler/executor/agent.py | 81 +++++++++++++-------------- apps/scheduler/executor/base.py | 6 +- apps/scheduler/scheduler/context.py | 1 - apps/scheduler/scheduler/scheduler.py | 20 ++----- apps/schemas/record.py | 2 +- apps/services/activity.py | 4 +- apps/services/task.py | 2 +- 9 files changed, 62 insertions(+), 70 deletions(-) diff --git a/apps/routers/chat.py b/apps/routers/chat.py index f92efd453..5e0354758 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -34,7 +34,7 @@ router = APIRouter( ) -async def init_task(post_body: RequestData, user_sub: str) -> Task: +async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> Task: """初始化Task""" # 生成group_id if not post_body.group_id: @@ -51,7 +51,9 @@ async def init_task(post_body: RequestData, user_sub: str) -> Task: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=err) task_ids = await TaskManager.delete_tasks_by_conversation_id(post_body.conversation_id) await RecordManager.update_record_flow_status_to_cancelled_by_task_ids(task_ids) - task = await TaskManager.init_new_task(user_sub=user_sub, conversation_id=post_body.conversation_id, post_body=post_body) + task = await TaskManager.init_new_task(user_sub=user_sub, session_id=session_id, post_body=post_body) + task.runtime.question = post_body.question + task.ids.group_id = post_body.group_id else: if not post_body.task_id: err = "[Chat] task_id 不可为空!" @@ -60,7 +62,7 @@ async def init_task(post_body: RequestData, user_sub: str) -> Task: return task -async def chat_generator(post_body: RequestData, user_sub: str) -> AsyncGenerator[str, None]: +async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) -> AsyncGenerator[str, None]: """进行实际问答,并从MQ中获取消息""" try: await Activity.set_active(user_sub) @@ -72,7 +74,7 @@ async def chat_generator(post_body: RequestData, user_sub: str) -> AsyncGenerato await Activity.remove_active(user_sub) return - task = await init_task(post_body, user_sub) + task = await init_task(post_body, user_sub, session_id) # 创建queue;由Scheduler进行关闭 queue = MessageQueue() @@ -80,6 +82,7 @@ async def chat_generator(post_body: RequestData, user_sub: str) -> AsyncGenerato # 在单独Task中运行Scheduler,拉齐queue.get的时机 scheduler = Scheduler(task, queue, post_body) + logger.info(f"[Chat] 用户是否活跃: {await Activity.is_active(user_sub)}") scheduler_task = asyncio.create_task(scheduler.run()) # 处理每一条消息 @@ -130,6 +133,7 @@ async def chat_generator(post_body: RequestData, user_sub: str) -> AsyncGenerato async def chat( post_body: RequestData, user_sub: Annotated[str, Depends(get_user)], + session_id: Annotated[str, Depends(get_session)], ) -> StreamingResponse: """LLM流式对话接口""" # 问题黑名单检测 @@ -142,7 +146,7 @@ async def chat( if await Activity.is_active(user_sub): raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests") - res = chat_generator(post_body, user_sub) + res = chat_generator(post_body, user_sub, session_id) return StreamingResponse( content=res, media_type="text/event-stream", diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py index 848780e21..82fa72de1 100644 --- a/apps/routers/mcp_service.py +++ b/apps/routers/mcp_service.py @@ -53,7 +53,7 @@ async def get_mcpservice_list( ] = SearchType.ALL, keyword: Annotated[str | None, Query(..., alias="keyword", description="搜索关键字")] = None, page: Annotated[int, Query(..., alias="page", ge=1, description="页码")] = 1, - is_active: Annotated[bool | None, Query(None, alias="isActive", description="是否激活")] = None, + is_active: Annotated[bool | None, Query(..., alias="isActive", description="是否激活")] = None, ) -> JSONResponse: """获取服务列表""" try: diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 4db38587c..fc799fa16 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -28,7 +28,6 @@ from apps.schemas.message import param from apps.services.task import TaskManager from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager -from apps.services.task import TaskManager from apps.services.user import UserManager logger = logging.getLogger(__name__) @@ -55,6 +54,12 @@ class MCPAgentExecutor(BaseExecutor): description="推理大模型", ) + async def update_tokens(self) -> None: + """更新令牌数""" + self.task.tokens.input_tokens = self.resoning_llm.input_tokens + self.task.tokens.output_tokens = self.resoning_llm.output_tokens + await TaskManager.save_task(self.task.id, self.task) + async def load_state(self) -> None: """从数据库中加载FlowExecutor的状态""" logger.info("[FlowExecutor] 加载Executor状态") @@ -108,8 +113,8 @@ class MCPAgentExecutor(BaseExecutor): max_steps=self.max_steps-start_index-1, reasoning_llm=self.resoning_llm ) - self.msg_queue.push_output( - self.task, + self.update_tokens() + self.push_message( EventType.STEP_CANCEL, data={} ) @@ -123,7 +128,7 @@ class MCPAgentExecutor(BaseExecutor): for i in range(start_index, len(self.task.runtime.temporary_plans.plans)): self.task.runtime.temporary_plans.plans[i].step_id = str(uuid.uuid4()) - async def get_tool_input_param(self, is_first: bool) -> dict[str, Any]: + async def get_tool_input_param(self, is_first: bool) -> None: if is_first: # 获取第一个输入参数 self.task.state.current_input = await MCPHost._get_first_input_params(self.tools[self.task.state.step_id], self.task.runtime.question, self.task) @@ -156,9 +161,10 @@ class MCPAgentExecutor(BaseExecutor): logger.info("[MCPAgentExecutor] 等待用户确认步骤 %d", self.task.state.step_index) # 发送确认消息 confirm_message = await MCPPlanner.get_tool_risk(self.tools[self.task.state.step_id], self.task.state.current_input, "", self.resoning_llm) - self.msg_queue.push_output(self.task, EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( + self.update_tokens() + self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( exclude_none=True, by_alias=True)) - self.msg_queue.push_output(self.task, EventType.FLOW_STOP, {}) + self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.WAITING self.task.context.append( @@ -188,16 +194,15 @@ class MCPAgentExecutor(BaseExecutor): logger.error("[MCPAgentExecutor] MCP客户端未找到: %s", mcp_tool.mcp_id) self.task.state.flow_status = FlowStatus.ERROR error = "[MCPAgentExecutor] MCP客户端未找到: {}".format(mcp_tool.mcp_id) - raise Exception(error) + self.task.state.error_message = error try: output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) - self.msg_queue.push_output( - self.task, + self.update_tokens() + self.push_message( EventType.STEP_INPUT, self.task.state.current_input ) - self.msg_queue.push_output( - self.task, + self.push_message( EventType.STEP_OUTPUT, output_params ) @@ -216,7 +221,7 @@ class MCPAgentExecutor(BaseExecutor): ) self.task.state.step_status = StepStatus.SUCCESS except Exception as e: - logging.warning("[MCPAgentExecutor] 执行步骤 %s 失败: %s", mcp_tool.name, str(e)) + logger.warning("[MCPAgentExecutor] 执行步骤 %s 失败: %s", mcp_tool.name, str(e)) import traceback self.task.state.error_message = traceback.format_exc() self.task.state.step_status = StepStatus.ERROR @@ -230,14 +235,18 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.error_message, self.resoning_llm ) - self.msg_queue.push_output( - self.task, + self.update_tokens() + self.push_message( EventType.STEP_WAITING_FOR_PARAM, data={ "message": "当运行产生如下报错:\n" + self.task.state.error_message, "params": params_with_null } ) + self.push_message( + EventType.FLOW_STOP, + data={} + ) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.PARAM self.task.context.append( @@ -265,8 +274,7 @@ class MCPAgentExecutor(BaseExecutor): # 最后一步 self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_status = StepStatus.SUCCESS - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_SUCCESS, data={} ) @@ -276,8 +284,7 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.step_description = self.task.runtime.temporary_plans.plans[self.task.state.step_index].content self.task.state.step_status = StepStatus.INIT self.task.state.current_input = {} - self.msg_queue.push_output( - self.task, + self.push_message( EventType.STEP_INIT, data={} ) @@ -285,8 +292,7 @@ class MCPAgentExecutor(BaseExecutor): # 没有下一步了,结束流程 self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_status = StepStatus.SUCCESS - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_SUCCESS, data={} ) @@ -296,8 +302,7 @@ class MCPAgentExecutor(BaseExecutor): """步骤执行失败后的错误处理""" self.task.state.step_status = StepStatus.ERROR self.task.state.flow_status = FlowStatus.ERROR - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_FAILED, data={} ) @@ -320,13 +325,13 @@ class MCPAgentExecutor(BaseExecutor): async def work(self) -> None: """执行当前步骤""" if self.task.state.step_status == StepStatus.INIT: - self.get_tool_input_param(is_first=True) + await self.get_tool_input_param(is_first=True) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) if not user_info.auto_execute: # 等待用户确认 await self.confirm_before_step() return - self.step.state.step_status = StepStatus.RUNNING + self.task.state.step_status = StepStatus.RUNNING elif self.task.state.step_status in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: if self.task.context[-1].step_status == StepStatus.PARAM: if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: @@ -338,24 +343,22 @@ class MCPAgentExecutor(BaseExecutor): else: self.task.state.flow_status = FlowStatus.CANCELLED self.task.state.step_status = StepStatus.CANCELLED - self.msg_queue.push_output( - self.task, + self.push_message( EventType.STEP_CANCEL, data={} ) - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_CANCEL, data={} ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.CANCELLED if self.task.state.step_status == StepStatus.PARAM: - self.get_tool_input_param(is_first=False) - max_retry = 5 + await self.get_tool_input_param(is_first=False) + max_retry = 5 for i in range(max_retry): if i != 0: - self.get_tool_input_param(is_first=False) + await self.get_tool_input_param(is_first=False) await self.run_step() if self.task.state.step_status == StepStatus.SUCCESS: break @@ -389,8 +392,7 @@ class MCPAgentExecutor(BaseExecutor): (await MCPHost.assemble_memory(self.task)), self.resoning_llm ): - self.msg_queue.push_output( - self.task, + self.push_message( EventType.TEXT_ADD, data=chunk ) @@ -405,32 +407,29 @@ class MCPAgentExecutor(BaseExecutor): # 初始化状态 self.task.state.flow_id = str(uuid.uuid4()) self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) - self.task.runtime.temporary_plans = await self.plan(is_replan=False) + await self.plan(is_replan=False) self.reset_step_to_index(0) TaskManager.save_task(self.task.id, self.task) self.task.state.flow_status = FlowStatus.RUNNING - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_START, data={} ) try: while self.task.state.step_index < len(self.task.runtime.temporary_plans) and \ self.task.state.flow_status == FlowStatus.RUNNING: - self.work() + await self.work() TaskManager.save_task(self.task.id, self.task) except Exception as e: logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) self.task.state.step_status = StepStatus.ERROR - self.msg_queue.push_output( - self.task, + self.push_message( EventType.STEP_ERROR, data={} ) - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_FAILED, data={} ) diff --git a/apps/scheduler/executor/base.py b/apps/scheduler/executor/base.py index cf2f4e683..8dcb99c71 100644 --- a/apps/scheduler/executor/base.py +++ b/apps/scheduler/executor/base.py @@ -49,10 +49,8 @@ class BaseExecutor(BaseModel, ABC): question=self.question, params=self.task.runtime.filled, ).model_dump(exclude_none=True, by_alias=True) - elif event_type == EventType.FLOW_STOP.value: - data = {} elif event_type == EventType.TEXT_ADD.value and isinstance(data, str): - data=TextAddContent(text=data).model_dump(exclude_none=True, by_alias=True) + data = TextAddContent(text=data).model_dump(exclude_none=True, by_alias=True) if data is None: data = {} @@ -62,7 +60,7 @@ class BaseExecutor(BaseModel, ABC): await self.msg_queue.push_output( self.task, event_type=event_type, - data=data, # type: ignore[arg-type] + data=data, # type: ignore[arg-type] ) @abstractmethod diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index 3b26f42f2..dc35d4bd0 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -158,7 +158,6 @@ async def save_data(task: Task, user_sub: str, post_body: RequestData) -> None: facts=task.runtime.facts, data={}, ) - try: # 加密Record数据 encrypt_data, encrypt_config = Security.encrypt(record_content.model_dump_json(by_alias=True)) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index b81448475..13eb7ee0b 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -56,7 +56,6 @@ class Scheduler: while not kill_event.is_set(): # 检查用户活动状态 is_active = await Activity.is_active(user_sub) - if not is_active: logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_sub) kill_event.set() @@ -78,8 +77,7 @@ class Scheduler: ) if not llm_id: logger.error("[Scheduler] 获取大模型ID失败") - await self.queue.close() - return + return None if llm_id == "empty": llm = LLM( _id="empty", @@ -89,16 +87,16 @@ class Scheduler: model_name=Config().get_config().llm.model, max_tokens=Config().get_config().llm.max_tokens, ) + return llm else: llm = await LLMManager.get_llm_by_id(self.task.ids.user_sub, llm_id) if not llm: logger.error("[Scheduler] 获取大模型失败") - await self.queue.close() - return + return None + return llm except Exception: logger.exception("[Scheduler] 获取大模型失败") - await self.queue.close() - return + return None async def get_kb_ids_use_in_chat_with_rag(self) -> list[str]: """获取知识库ID列表""" @@ -106,10 +104,6 @@ class Scheduler: kb_ids = await KnowledgeBaseManager.get_kb_ids_by_conversation_id( self.task.ids.user_sub, self.task.ids.conversation_id, ) - if not kb_ids: - logger.error("[Scheduler] 获取知识库ID失败") - await self.queue.close() - return [] return kb_ids except Exception: logger.exception("[Scheduler] 获取知识库ID失败") @@ -135,10 +129,6 @@ class Scheduler: if not self.post_body.app or self.post_body.app.app_id == "": llm = await self.get_llm_use_in_chat_with_rag() kb_ids = await self.get_kb_ids_use_in_chat_with_rag() - if not llm: - logger.error("[Scheduler] 获取大模型失败") - await self.queue.close() - return self.task = await push_init_message(self.task, self.queue, 3, is_flow=False) rag_data = RAGQueryReq( kbIds=kb_ids, diff --git a/apps/schemas/record.py b/apps/schemas/record.py index 144a6c572..dbc06b106 100644 --- a/apps/schemas/record.py +++ b/apps/schemas/record.py @@ -94,7 +94,7 @@ class RecordData(BaseModel): id: str group_id: str = Field(alias="groupId") conversation_id: str = Field(alias="conversationId") - task_id: str = Field(alias="taskId") + task_id: str | None = Field(default=None, alias="taskId") document: list[RecordDocument] = [] flow: RecordFlow | None = None content: RecordContent diff --git a/apps/services/activity.py b/apps/services/activity.py index 299a49a64..88142b9ee 100644 --- a/apps/services/activity.py +++ b/apps/services/activity.py @@ -3,11 +3,13 @@ import uuid from datetime import UTC, datetime - +import logging from apps.common.mongo import MongoDB from apps.constants import SLIDE_WINDOW_QUESTION_COUNT, SLIDE_WINDOW_TIME from apps.exceptions import ActivityError +logger = logging.getLogger(__name__) + class Activity: """用户活动控制,限制单用户同一时间只能提问一个问题""" diff --git a/apps/services/task.py b/apps/services/task.py index 2f75a8c3a..eec4e197a 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -115,7 +115,6 @@ class TaskManager: @staticmethod async def init_new_task( - cls, user_sub: str, session_id: str | None = None, post_body: RequestData | None = None, @@ -180,6 +179,7 @@ class TaskManager: task_ids.append(task["_id"]) if task_ids: await task_collection.delete_many({"conversation_id": conversation_id}) + return task_ids except Exception: logger.exception("[TaskManager] 删除ConversationID的Task信息失败") return [] -- Gitee