diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index 603ea65b66238628c60679449925892b85d6b189..4db38587cecfd96fb909fa69f55371c1f39b3cb5 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -2,21 +2,34 @@
"""MCP Agent执行器"""
import logging
-
+import traceback
+import uuid
from pydantic import Field
-
+from typing import Any
+from apps.llm.patterns.rewrite import QuestionRewrite
from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.executor.base import BaseExecutor
from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus
from apps.scheduler.mcp_agent.host import MCPHost
from apps.scheduler.mcp_agent.plan import MCPPlanner
+from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector
from apps.scheduler.pool.mcp.client import MCPClient
-from apps.schemas.mcp import MCPCollection, MCPTool
-from apps.schemas.task import ExecutorState, StepQueueItem
+from apps.schemas.mcp import (
+ GoalEvaluationResult,
+ RestartStepIndex,
+ ToolRisk,
+ ErrorType,
+ ToolExcutionErrorType,
+ MCPPlan,
+ MCPCollection,
+ MCPTool
+)
+from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem
from apps.schemas.message import param
from apps.services.task import TaskManager
from apps.services.appcenter import AppCenterManager
from apps.services.mcp_service import MCPServiceManager
+from apps.services.user import UserManager
logger = logging.getLogger(__name__)
@@ -31,7 +44,9 @@ class MCPAgentExecutor(BaseExecutor):
mcp_client: dict[str, MCPClient] = Field(
description="MCP客户端列表,key为mcp_id", default={}
)
- tool_list: list[MCPTool] = Field(description="MCP工具列表", default=[])
+ tools: dict[str, MCPTool] = Field(
+ description="MCP工具列表,key为tool_id", default={}
+ )
params: param | None = Field(
default=None, description="流执行过程中的参数补充", alias="params"
)
@@ -65,10 +80,372 @@ class MCPAgentExecutor(BaseExecutor):
self.mcp_list.append(mcp_service)
self.mcp_client[mcp_id] = await MCPHost.get_client(self.task.ids.user_sub, mcp_id)
- self.tool_list.extend(mcp_service.tools)
+ for tool in mcp_service.tools:
+ self.tools[tool.id] = tool
+
+ async def plan(self, is_replan: bool = False, start_index: int | None = None) -> None:
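+        """生成执行计划;is_replan 为 True 时根据报错信息从 start_index 起重新规划后续步骤"""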
+ if is_replan:
+ error_message = "之前的计划遇到以下报错\n\n"+self.task.state.error_message
+ else:
+ error_message = "初始化计划"
+        tools = await MCPSelector.select_top_tool(
+ self.task.runtime.question, list(self.tools.values()),
+ additional_info=error_message, top_n=40)
+ if is_replan:
+ logger.info("[MCPAgentExecutor] 重新规划流程")
+            if start_index is None:
+                restart_index = await MCPPlanner.get_replan_start_step_index(
+                    self.task.runtime.question,
+                    self.task.state.error_message,
+                    self.task.runtime.temporary_plans,
+                    reasoning_llm=self.resoning_llm)
+                start_index = restart_index.start_index
+            current_plan = MCPPlan(plans=self.task.runtime.temporary_plans.plans[start_index:])
+ error_message = self.task.state.error_message
+ temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question,
+ is_replan=is_replan,
+ error_message=error_message,
+ current_plan=current_plan,
+ tool_list=tools,
+ max_steps=self.max_steps-start_index-1,
+ reasoning_llm=self.resoning_llm
+ )
+            self.msg_queue.push_output(
+                self.task,
+                EventType.STEP_CANCEL,
+                data={}
+            )
+            if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
+                self.task.context[-1].step_status = StepStatus.CANCELLED
+            self.task.runtime.temporary_plans.plans = \
+                self.task.runtime.temporary_plans.plans[:start_index] + temporary_plans.plans
+            self.task.state.step_index = start_index
+ else:
+ start_index = 0
+            self.task.runtime.temporary_plans = await MCPPlanner.create_plan(
+                self.task.runtime.question, tool_list=tools,
+                max_steps=self.max_steps, reasoning_llm=self.resoning_llm)
+ for i in range(start_index, len(self.task.runtime.temporary_plans.plans)):
+ self.task.runtime.temporary_plans.plans[i].step_id = str(uuid.uuid4())
+
+    async def get_tool_input_param(self, is_first: bool) -> None:
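+        """根据是否为首次执行,生成或修复当前步骤的工具入参,并写入 task.state.current_input"""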
+ if is_first:
+ # 获取第一个输入参数
+            self.task.state.current_input = await MCPHost._get_first_input_params(
+                self.tools[self.task.state.step_id], self.task.runtime.question, self.task)
+ else:
+ # 获取后续输入参数
+ if isinstance(self.params, param):
+ params = self.params.content
+ params_description = self.params.description
+ else:
+ params = {}
+ params_description = ""
+            self.task.state.current_input = await MCPHost._fill_params(
+                self.tools[self.task.state.step_id], self.task.state.current_input,
+                self.task.state.error_message, params, params_description)
+
+ async def reset_step_to_index(self, start_index: int) -> None:
+ """重置步骤到开始"""
+ logger.info("[MCPAgentExecutor] 重置步骤到索引 %d", start_index)
+ if self.task.runtime.temporary_plans:
+ self.task.state.flow_status = FlowStatus.RUNNING
+ self.task.state.step_id = self.task.runtime.temporary_plans.plans[start_index].step_id
+            self.task.state.step_index = start_index
+ self.task.state.step_name = self.task.runtime.temporary_plans.plans[start_index].tool
+ self.task.state.step_description = self.task.runtime.temporary_plans.plans[start_index].content
+ self.task.state.step_status = StepStatus.RUNNING
+ self.task.state.retry_times = 0
+ else:
+ self.task.state.flow_status = FlowStatus.SUCCESS
+ self.task.state.step_id = FINAL_TOOL_ID
+
+ async def confirm_before_step(self) -> None:
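+        """对当前步骤做风险评估,向用户推送确认消息并将流程置为等待状态"""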
+ logger.info("[MCPAgentExecutor] 等待用户确认步骤 %d", self.task.state.step_index)
+ # 发送确认消息
+        confirm_message = await MCPPlanner.get_tool_risk(
+            self.tools[self.task.state.step_id], self.task.state.current_input, "", self.resoning_llm)
+ self.msg_queue.push_output(self.task, EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(
+ exclude_none=True, by_alias=True))
+ self.msg_queue.push_output(self.task, EventType.FLOW_STOP, {})
+ self.task.state.flow_status = FlowStatus.WAITING
+ self.task.state.step_status = StepStatus.WAITING
+ self.task.context.append(
+ FlowStepHistory(
+ step_id=self.task.state.step_id,
+ step_name=self.task.state.step_name,
+ step_description=self.task.state.step_description,
+ step_status=self.task.state.step_status,
+ flow_id=self.task.state.flow_id,
+ flow_name=self.task.state.flow_name,
+ flow_status=self.task.state.flow_status,
+ input_data={},
+ output_data={},
+ ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True),
+ )
+ )
+
+ async def run_step(self):
+ """执行步骤"""
+ self.task.state.flow_status = FlowStatus.RUNNING
+ self.task.state.step_status = StepStatus.RUNNING
+ logger.info("[MCPAgentExecutor] 执行步骤 %d", self.task.state.step_index)
+ # 获取MCP客户端
+ mcp_tool = self.tools[self.task.state.step_id]
+        mcp_client = self.mcp_client.get(mcp_tool.mcp_id)
+ if not mcp_client:
+ logger.error("[MCPAgentExecutor] MCP客户端未找到: %s", mcp_tool.mcp_id)
+ self.task.state.flow_status = FlowStatus.ERROR
+ error = "[MCPAgentExecutor] MCP客户端未找到: {}".format(mcp_tool.mcp_id)
+ raise Exception(error)
+ try:
+ output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input)
+ self.msg_queue.push_output(
+ self.task,
+ EventType.STEP_INPUT,
+ self.task.state.current_input
+ )
+ self.msg_queue.push_output(
+ self.task,
+ EventType.STEP_OUTPUT,
+ output_params
+ )
+ self.task.context.append(
+ FlowStepHistory(
+ step_id=self.task.state.step_id,
+ step_name=self.task.state.step_name,
+ step_description=self.task.state.step_description,
+ step_status=StepStatus.SUCCESS,
+ flow_id=self.task.state.flow_id,
+ flow_name=self.task.state.flow_name,
+ flow_status=self.task.state.flow_status,
+ input_data=self.task.state.current_input,
+ output_data=output_params,
+ )
+ )
+ self.task.state.step_status = StepStatus.SUCCESS
+ except Exception as e:
+            logger.warning("[MCPAgentExecutor] 执行步骤 %s 失败: %s", mcp_tool.name, str(e))
+            self.task.state.error_message = traceback.format_exc()
+ self.task.state.step_status = StepStatus.ERROR
+
+ async def generate_params_with_null(self) -> None:
+ """生成参数补充"""
+ mcp_tool = self.tools[self.task.state.step_id]
+ params_with_null = await MCPPlanner.get_missing_param(
+ mcp_tool,
+ self.task.state.current_input,
+ self.task.state.error_message,
+ self.resoning_llm
+ )
+ self.msg_queue.push_output(
+ self.task,
+ EventType.STEP_WAITING_FOR_PARAM,
+ data={
+ "message": "当运行产生如下报错:\n" + self.task.state.error_message,
+ "params": params_with_null
+ }
+ )
+ self.task.state.flow_status = FlowStatus.WAITING
+ self.task.state.step_status = StepStatus.PARAM
+ self.task.context.append(
+ FlowStepHistory(
+ step_id=self.task.state.step_id,
+ step_name=self.task.state.step_name,
+ step_description=self.task.state.step_description,
+ step_status=self.task.state.step_status,
+ flow_id=self.task.state.flow_id,
+ flow_name=self.task.state.flow_name,
+ flow_status=self.task.state.flow_status,
+ input_data={},
+ output_data={},
+ ex_data={
+ "message": "当运行产生如下报错:\n" + self.task.state.error_message,
+ "params": params_with_null
+ }
+ )
+ )
+
+ async def get_next_step(self) -> None:
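+        """推进到计划中的下一步;若已到 Final 步骤或没有后续步骤,则将流程置为成功结束"""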
+ self.task.state.step_index += 1
+        if self.task.state.step_index < len(self.task.runtime.temporary_plans.plans):
+ if self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id == FINAL_TOOL_ID:
+ # 最后一步
+ self.task.state.flow_status = FlowStatus.SUCCESS
+ self.task.state.step_status = StepStatus.SUCCESS
+ self.msg_queue.push_output(
+ self.task,
+ EventType.FLOW_SUCCESS,
+ data={}
+ )
+ return
+ self.task.state.step_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id
+ self.task.state.step_name = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
+ self.task.state.step_description = self.task.runtime.temporary_plans.plans[self.task.state.step_index].content
+ self.task.state.step_status = StepStatus.INIT
+ self.task.state.current_input = {}
+ self.msg_queue.push_output(
+ self.task,
+ EventType.STEP_INIT,
+ data={}
+ )
+ else:
+ # 没有下一步了,结束流程
+ self.task.state.flow_status = FlowStatus.SUCCESS
+ self.task.state.step_status = StepStatus.SUCCESS
+ self.msg_queue.push_output(
+ self.task,
+ EventType.FLOW_SUCCESS,
+ data={}
+ )
+ return
+
+ async def error_handle_after_step(self) -> None:
+ """步骤执行失败后的错误处理"""
+ self.task.state.step_status = StepStatus.ERROR
+ self.task.state.flow_status = FlowStatus.ERROR
+ self.msg_queue.push_output(
+ self.task,
+ EventType.FLOW_FAILED,
+ data={}
+ )
+ if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
+ del self.task.context[-1]
+ self.task.context.append(
+ FlowStepHistory(
+ step_id=self.task.state.step_id,
+ step_name=self.task.state.step_name,
+ step_description=self.task.state.step_description,
+ step_status=self.task.state.step_status,
+ flow_id=self.task.state.flow_id,
+ flow_name=self.task.state.flow_name,
+ flow_status=self.task.state.flow_status,
+ input_data={},
+ output_data={},
+ )
+ )
+
+ async def work(self) -> None:
+ """执行当前步骤"""
+ if self.task.state.step_status == StepStatus.INIT:
+            await self.get_tool_input_param(is_first=True)
+ user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
+ if not user_info.auto_execute:
+ # 等待用户确认
+ await self.confirm_before_step()
+ return
+            self.task.state.step_status = StepStatus.RUNNING
+ elif self.task.state.step_status in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]:
+            if len(self.task.context) and self.task.context[-1].step_status == StepStatus.PARAM:
+                if self.task.context[-1].step_id == self.task.state.step_id:
+                    del self.task.context[-1]
+ elif self.task.state.step_status == StepStatus.WAITING:
+                if self.params and self.params.content:
+ if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
+ del self.task.context[-1]
+ else:
+ self.task.state.flow_status = FlowStatus.CANCELLED
+ self.task.state.step_status = StepStatus.CANCELLED
+ self.msg_queue.push_output(
+ self.task,
+ EventType.STEP_CANCEL,
+ data={}
+ )
+ self.msg_queue.push_output(
+ self.task,
+ EventType.FLOW_CANCEL,
+ data={}
+ )
+ if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
+ self.task.context[-1].step_status = StepStatus.CANCELLED
+ if self.task.state.step_status == StepStatus.PARAM:
+            await self.get_tool_input_param(is_first=False)
+ max_retry = 5
+ for i in range(max_retry):
+ if i != 0:
+                await self.get_tool_input_param(is_first=False)
+ await self.run_step()
+ if self.task.state.step_status == StepStatus.SUCCESS:
+ break
+ elif self.task.state.step_status == StepStatus.ERROR:
+ # 错误处理
+ if self.task.state.retry_times >= 3:
+ await self.error_handle_after_step()
+ else:
+ user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
+ mcp_tool = self.tools[self.task.state.step_id]
+ error_type = await MCPPlanner.get_tool_execute_error_type(
+ self.task.runtime.question,
+ self.task.runtime.temporary_plans,
+ mcp_tool,
+ self.task.state.current_input,
+ self.task.state.error_message,
+ self.resoning_llm
+ )
+ if error_type.type == ErrorType.DECORRECT_PLAN or user_info.auto_execute:
+ await self.plan(is_replan=True)
+                        await self.reset_step_to_index(self.task.state.step_index)
+ elif error_type.type == ErrorType.MISSING_PARAM:
+ await self.generate_params_with_null()
+ elif self.task.state.step_status == StepStatus.SUCCESS:
+ await self.get_next_step()
+
+ async def summarize(self) -> None:
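+        """基于计划与执行记忆流式生成最终回答,并逐段推送到消息队列"""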
+ async for chunk in MCPPlanner.generate_answer(
+ self.task.runtime.question,
+ self.task.runtime.temporary_plans,
+ (await MCPHost.assemble_memory(self.task)),
+ self.resoning_llm
+ ):
+ self.msg_queue.push_output(
+ self.task,
+ EventType.TEXT_ADD,
+ data=chunk
+ )
+ self.task.runtime.answer += chunk
async def run(self) -> None:
"""执行MCP Agent的主逻辑"""
# 初始化MCP服务
self.load_state()
self.load_mcp()
+ if self.task.state.flow_status == FlowStatus.INIT:
+ # 初始化状态
+ self.task.state.flow_id = str(uuid.uuid4())
+ self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm)
+            await self.plan(is_replan=False)
+            await self.reset_step_to_index(0)
+ TaskManager.save_task(self.task.id, self.task)
+ self.task.state.flow_status = FlowStatus.RUNNING
+ self.msg_queue.push_output(
+ self.task,
+ EventType.FLOW_START,
+ data={}
+ )
+ try:
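+            # 主循环:按计划逐步执行,直到步骤执行完毕或流程状态不再为 RUNNING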
+            while self.task.state.step_index < len(self.task.runtime.temporary_plans.plans) and \
+                    self.task.state.flow_status == FlowStatus.RUNNING:
+                await self.work()
+ TaskManager.save_task(self.task.id, self.task)
+ except Exception as e:
+ logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e))
+ self.task.state.flow_status = FlowStatus.ERROR
+ self.task.state.error_message = str(e)
+ self.task.state.step_status = StepStatus.ERROR
+ self.msg_queue.push_output(
+ self.task,
+ EventType.STEP_ERROR,
+ data={}
+ )
+ self.msg_queue.push_output(
+ self.task,
+ EventType.FLOW_FAILED,
+ data={}
+ )
+ if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
+ del self.task.context[-1]
+ self.task.context.append(
+ FlowStepHistory(
+ step_id=self.task.state.step_id,
+ step_name=self.task.state.step_name,
+ step_description=self.task.state.step_description,
+ step_status=self.task.state.step_status,
+ flow_id=self.task.state.flow_id,
+ flow_name=self.task.state.flow_name,
+ flow_status=self.task.state.flow_status,
+ input_data={},
+ output_data={},
+ )
+ )
diff --git a/apps/scheduler/mcp/select.py b/apps/scheduler/mcp/select.py
index 2ff5034471c5e9c38f166c6187b76dfb4596f734..f3d6e0d4ae3aea9508d48f33499ac2bcf3de98a9 100644
--- a/apps/scheduler/mcp/select.py
+++ b/apps/scheduler/mcp/select.py
@@ -39,7 +39,6 @@ class MCPSelector:
sql += f"'{mcp_id}', "
return sql.rstrip(", ") + ")"
-
async def _get_top_mcp_by_embedding(
self,
query: str,
@@ -72,7 +71,6 @@ class MCPSelector:
}])
return llm_mcp_list
-
async def _get_mcp_by_llm(
self,
query: str,
@@ -100,7 +98,6 @@ class MCPSelector:
# 使用小模型提取JSON
return await self._call_function_mcp(result, mcp_ids)
-
async def _call_reasoning(self, prompt: str) -> str:
"""调用大模型进行推理"""
logger.info("[MCPHelper] 调用推理大模型")
@@ -116,7 +113,6 @@ class MCPSelector:
self.output_tokens += llm.output_tokens
return result
-
async def _call_function_mcp(self, reasoning_result: str, mcp_ids: list[str]) -> MCPSelectResult:
"""调用结构化输出小模型提取JSON"""
logger.info("[MCPHelper] 调用结构化输出小模型")
@@ -136,7 +132,6 @@ class MCPSelector:
raise
return result
-
async def select_top_mcp(
self,
query: str,
@@ -153,7 +148,6 @@ class MCPSelector:
# 通过LLM选择最合适的
return await self._get_mcp_by_llm(query, llm_mcp_list, mcp_list)
-
@staticmethod
async def select_top_tool(query: str, mcp_list: list[str], top_n: int = 10) -> list[MCPTool]:
"""选择最合适的工具"""
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index 3217f539388d428009fc38e787ab0577cb73f51d..ced175ef9b56135caf8c67c4fa71c42406683ec6 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -60,7 +60,7 @@ class MCPHost:
context_list=task.context,
)
- async def _get_first_input_params(schema: dict[str, Any], query: str) -> dict[str, Any]:
+ async def _get_first_input_params(mcp_tool: MCPTool, query: str, task: Task) -> dict[str, Any]:
"""填充工具参数"""
# 更清晰的输入·指令,这样可以调用generate
llm_query = rf"""
@@ -74,13 +74,13 @@ class MCPHost:
llm_query,
[
{"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": await MCPHost.assemble_memory()},
+ {"role": "user", "content": await MCPHost.assemble_memory(task)},
],
- schema,
+ mcp_tool.input_schema,
)
return await json_generator.generate()
- async def _fill_params(mcp_tool: MCPTool, schema: dict[str, Any],
+ async def _fill_params(mcp_tool: MCPTool,
current_input: dict[str, Any],
error_message: str = "", params: dict[str, Any] = {},
params_description: str = "") -> dict[str, Any]:
@@ -88,7 +88,7 @@ class MCPHost:
prompt = _env.from_string(REPAIR_PARAMS).render(
tool_name=mcp_tool.name,
tool_description=mcp_tool.description,
- input_schema=schema,
+ input_schema=mcp_tool.input_schema,
current_input=current_input,
error_message=error_message,
params=params,
@@ -101,7 +101,7 @@ class MCPHost:
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt},
],
- schema,
+ mcp_tool.input_schema,
)
return await json_generator.generate()
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index 13e7a98dc334dc55d74ebd174f644c66fe382c09..91d293fbfe07fe6c241acef1b4f37345a5e9f853 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -9,38 +9,36 @@ from apps.llm.function import JsonGenerator
from apps.scheduler.mcp_agent.prompt import (
EVALUATE_GOAL,
GENERATE_FLOW_NAME,
+ GET_REPLAN_START_STEP_INDEX,
CREATE_PLAN,
RECREATE_PLAN,
RISK_EVALUATE,
+ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS,
GET_MISSING_PARAMS,
FINAL_ANSWER
)
from apps.schemas.mcp import (
GoalEvaluationResult,
+ RestartStepIndex,
ToolRisk,
+ ToolExcutionErrorType,
MCPPlan,
MCPTool
)
from apps.scheduler.slot.slot import Slot
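+
+# 模块级共享的沙箱 Jinja2 环境,供 MCPPlanner 各静态方法渲染提示词模板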
+_env = SandboxedEnvironment(
+ loader=BaseLoader,
+ autoescape=True,
+ trim_blocks=True,
+ lstrip_blocks=True,
+)
+
class MCPPlanner:
"""MCP 用户目标拆解与规划"""
-
- def __init__(self, user_goal: str, resoning_llm: ReasoningLLM = None) -> None:
- """初始化MCP规划器"""
- self.user_goal = user_goal
- self._env = SandboxedEnvironment(
- loader=BaseLoader,
- autoescape=True,
- trim_blocks=True,
- lstrip_blocks=True,
- )
- self.resoning_llm = resoning_llm or ReasoningLLM()
- self.input_tokens = 0
- self.output_tokens = 0
-
- async def get_resoning_result(self, prompt: str) -> str:
+ @staticmethod
+ async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取推理结果"""
# 调用推理大模型
message = [
@@ -48,7 +46,7 @@ class MCPPlanner:
{"role": "user", "content": prompt},
]
result = ""
- async for chunk in self.resoning_llm.call(
+ async for chunk in resoning_llm.call(
message,
streaming=False,
temperature=0.07,
@@ -56,12 +54,10 @@ class MCPPlanner:
):
result += chunk
- # 保存token用量
- self.input_tokens += self.resoning_llm.input_tokens
- self.output_tokens += self.resoning_llm.output_tokens
return result
- async def _parse_result(self, result: str, schema: dict[str, Any]) -> str:
+ @staticmethod
+ async def _parse_result(result: str, schema: dict[str, Any]) -> str:
"""解析推理结果"""
json_generator = JsonGenerator(
result,
@@ -74,126 +70,210 @@ class MCPPlanner:
json_result = await json_generator.generate()
return json_result
- async def evaluate_goal(self, tool_list: list[MCPTool]) -> GoalEvaluationResult:
+ @staticmethod
+ async def evaluate_goal(
+            user_goal: str, tool_list: list[MCPTool],
+ resoning_llm: ReasoningLLM = ReasoningLLM()) -> GoalEvaluationResult:
"""评估用户目标的可行性"""
# 获取推理结果
- result = await self._get_reasoning_evaluation(tool_list)
+        result = await MCPPlanner._get_reasoning_evaluation(user_goal, tool_list, resoning_llm)
# 解析为结构化数据
- evaluation = await self._parse_evaluation_result(result)
+ evaluation = await MCPPlanner._parse_evaluation_result(result)
# 返回评估结果
return evaluation
- async def _get_reasoning_evaluation(self, tool_list: list[MCPTool]) -> str:
+ @staticmethod
+ async def _get_reasoning_evaluation(
+            goal: str, tool_list: list[MCPTool],
+ resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取推理大模型的评估结果"""
- template = self._env.from_string(EVALUATE_GOAL)
+ template = _env.from_string(EVALUATE_GOAL)
prompt = template.render(
- goal=self.user_goal,
+ goal=goal,
tools=tool_list,
)
- result = await self.get_resoning_result(prompt)
+ result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
return result
- async def _parse_evaluation_result(self, result: str) -> GoalEvaluationResult:
+ @staticmethod
+ async def _parse_evaluation_result(result: str) -> GoalEvaluationResult:
"""将推理结果解析为结构化数据"""
schema = GoalEvaluationResult.model_json_schema()
- evaluation = await self._parse_result(result, schema)
+ evaluation = await MCPPlanner._parse_result(result, schema)
# 使用GoalEvaluationResult模型解析结果
return GoalEvaluationResult.model_validate(evaluation)
- async def get_flow_name(self) -> str:
+    @staticmethod
+    async def get_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取当前流程的名称"""
- result = await self._get_reasoning_flow_name()
+ result = await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm)
return result
- async def _get_reasoning_flow_name(self) -> str:
+ @staticmethod
+ async def _get_reasoning_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取推理大模型的流程名称"""
- template = self._env.from_string(GENERATE_FLOW_NAME)
- prompt = template.render(goal=self.user_goal)
- result = await self.get_resoning_result(prompt)
+ template = _env.from_string(GENERATE_FLOW_NAME)
+ prompt = template.render(goal=user_goal)
+ result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
return result
- async def create_plan(self, tool_list: list[MCPTool], max_steps: int = 6) -> MCPPlan:
+ @staticmethod
+ async def get_replan_start_step_index(
+ user_goal: str, error_message: str, current_plan: MCPPlan | None = None,
+ history: str = "",
+            reasoning_llm: ReasoningLLM = ReasoningLLM()) -> RestartStepIndex:
+ """获取重新规划的步骤索引"""
+ # 获取推理结果
+ template = _env.from_string(GET_REPLAN_START_STEP_INDEX)
+ prompt = template.render(
+ goal=user_goal,
+ error_message=error_message,
+ current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
+ history=history,
+ )
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+ # 解析为结构化数据
+ schema = RestartStepIndex.model_json_schema()
+ schema["properties"]["start_index"]["maximum"] = len(current_plan.plans) - 1
+ schema["properties"]["start_index"]["minimum"] = 0
+ restart_index = await MCPPlanner._parse_result(result, schema)
+ # 使用RestartStepIndex模型解析结果
+ return RestartStepIndex.model_validate(restart_index)
+
+ @staticmethod
+ async def create_plan(
+ user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None,
+ tool_list: list[MCPTool] = [],
+ max_steps: int = 6, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan:
"""规划下一步的执行流程,并输出"""
# 获取推理结果
- result = await self._get_reasoning_plan(tool_list, max_steps)
+        result = await MCPPlanner._get_reasoning_plan(
+            user_goal, is_replan, error_message, current_plan, tool_list, max_steps, reasoning_llm)
# 解析为结构化数据
- return await self._parse_plan_result(result, max_steps)
+ return await MCPPlanner._parse_plan_result(result, max_steps)
+ @staticmethod
async def _get_reasoning_plan(
- self, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan = MCPPlan(),
+ user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None,
tool_list: list[MCPTool] = [],
- max_steps: int = 10) -> str:
+ max_steps: int = 10, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取推理大模型的结果"""
# 格式化Prompt
if is_replan:
- template = self._env.from_string(RECREATE_PLAN)
+ template = _env.from_string(RECREATE_PLAN)
prompt = template.render(
- current_plan=current_plan,
+ current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
error_message=error_message,
- goal=self.user_goal,
+ goal=user_goal,
tools=tool_list,
max_num=max_steps,
)
else:
- template = self._env.from_string(CREATE_PLAN)
+ template = _env.from_string(CREATE_PLAN)
prompt = template.render(
- goal=self.user_goal,
+ goal=user_goal,
tools=tool_list,
max_num=max_steps,
)
- result = await self.get_resoning_result(prompt)
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
return result
- async def _parse_plan_result(self, result: str, max_steps: int) -> MCPPlan:
+ @staticmethod
+ async def _parse_plan_result(result: str, max_steps: int) -> MCPPlan:
"""将推理结果解析为结构化数据"""
# 格式化Prompt
schema = MCPPlan.model_json_schema()
schema["properties"]["plans"]["maxItems"] = max_steps
- plan = await self._parse_result(result, schema)
+ plan = await MCPPlanner._parse_result(result, schema)
# 使用Function模型解析结果
return MCPPlan.model_validate(plan)
- async def get_tool_risk(self, tool: MCPTool, input_parm: dict[str, Any], additional_info: str = "") -> ToolRisk:
+ @staticmethod
+ async def get_tool_risk(
+ tool: MCPTool, input_parm: dict[str, Any],
+ additional_info: str = "", resoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolRisk:
"""获取MCP工具的风险评估结果"""
# 获取推理结果
- result = await self._get_reasoning_risk(tool, input_parm, additional_info)
+ result = await MCPPlanner._get_reasoning_risk(tool, input_parm, additional_info, resoning_llm)
# 解析为结构化数据
- risk = await self._parse_risk_result(result)
+ risk = await MCPPlanner._parse_risk_result(result)
# 返回风险评估结果
return risk
- async def _get_reasoning_risk(self, tool: MCPTool, input_param: dict[str, Any], additional_info: str) -> str:
+ @staticmethod
+ async def _get_reasoning_risk(
+ tool: MCPTool, input_param: dict[str, Any],
+ additional_info: str, resoning_llm: ReasoningLLM) -> str:
"""获取推理大模型的风险评估结果"""
- template = self._env.from_string(RISK_EVALUATE)
+ template = _env.from_string(RISK_EVALUATE)
prompt = template.render(
tool_name=tool.name,
tool_description=tool.description,
input_param=input_param,
additional_info=additional_info,
)
- result = await self.get_resoning_result(prompt)
+ result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
return result
- async def _parse_risk_result(self, result: str) -> ToolRisk:
+ @staticmethod
+ async def _parse_risk_result(result: str) -> ToolRisk:
"""将推理结果解析为结构化数据"""
schema = ToolRisk.model_json_schema()
- risk = await self._parse_result(result, schema)
+ risk = await MCPPlanner._parse_result(result, schema)
# 使用ToolRisk模型解析结果
return ToolRisk.model_validate(risk)
+ @staticmethod
+ async def _get_reasoning_tool_execute_error_type(
+ user_goal: str, current_plan: MCPPlan,
+ tool: MCPTool, input_param: dict[str, Any],
+ error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ """获取推理大模型的工具执行错误类型"""
+ template = _env.from_string(TOOL_EXECUTE_ERROR_TYPE_ANALYSIS)
+ prompt = template.render(
+ goal=user_goal,
+ current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
+ tool_name=tool.name,
+ tool_description=tool.description,
+ input_param=input_param,
+ error_message=error_message,
+ )
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+ return result
+
+ @staticmethod
+ async def _parse_tool_execute_error_type_result(result: str) -> ToolExcutionErrorType:
+ """将推理结果解析为工具执行错误类型"""
+ schema = ToolExcutionErrorType.model_json_schema()
+ error_type = await MCPPlanner._parse_result(result, schema)
+ # 使用ToolExcutionErrorType模型解析结果
+ return ToolExcutionErrorType.model_validate(error_type)
+
+ @staticmethod
+ async def get_tool_execute_error_type(
+ user_goal: str, current_plan: MCPPlan,
+ tool: MCPTool, input_param: dict[str, Any],
+ error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolExcutionErrorType:
+ """获取MCP工具执行错误类型"""
+ # 获取推理结果
+ result = await MCPPlanner._get_reasoning_tool_execute_error_type(
+ user_goal, current_plan, tool, input_param, error_message, reasoning_llm)
+ error_type = await MCPPlanner._parse_tool_execute_error_type_result(result)
+ # 返回工具执行错误类型
+ return error_type
+
+ @staticmethod
async def get_missing_param(
- self, tool: MCPTool, schema: dict[str, Any],
+ tool: MCPTool,
input_param: dict[str, Any],
- error_message: str) -> list[str]:
+ error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> list[str]:
"""获取缺失的参数"""
- slot = Slot(schema=schema)
+ slot = Slot(schema=tool.input_schema)
+ template = _env.from_string(GET_MISSING_PARAMS)
schema_with_null = slot.add_null_to_basic_types()
- template = self._env.from_string(GET_MISSING_PARAMS)
prompt = template.render(
tool_name=tool.name,
tool_description=tool.description,
@@ -201,26 +281,26 @@ class MCPPlanner:
schema=schema_with_null,
error_message=error_message,
)
- result = await self.get_resoning_result(prompt)
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
# 解析为结构化数据
- input_param_with_null = await self._parse_result(result, schema_with_null)
+ input_param_with_null = await MCPPlanner._parse_result(result, schema_with_null)
return input_param_with_null
- async def generate_answer(self, plan: MCPPlan, memory: str) -> AsyncGenerator[str, None]:
+ @staticmethod
+ async def generate_answer(
+            user_goal: str, plan: MCPPlan, memory: str,
+            resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[str, None]:
"""生成最终回答"""
- template = self._env.from_string(FINAL_ANSWER)
+ template = _env.from_string(FINAL_ANSWER)
prompt = template.render(
- plan=plan,
+ plan=plan.model_dump(exclude_none=True, by_alias=True),
memory=memory,
- goal=self.user_goal,
+ goal=user_goal,
)
- async for chunk in self.resoning_llm.call(
+ async for chunk in resoning_llm.call(
[{"role": "user", "content": prompt}],
streaming=False,
temperature=0.07,
):
yield chunk
-
- self.input_tokens = self.resoning_llm.input_tokens
- self.output_tokens = self.resoning_llm.output_tokens
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index 9cbc2f5bcb8e643bf9a844a527c740862fe9d180..b5bc085c642eb1cf800a3a37684e594e041a535b 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -62,6 +62,62 @@ MCP_SELECT = dedent(r"""
### 请一步一步思考:
""")
+TOOL_SELECT = dedent(r"""
+ 你是一个乐于助人的智能助手。
+ 你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。
+ ## 选择MCP工具时的注意事项:
+ 1. 确保充分理解当前目标,选择实现目标所需的MCP工具。
+ 2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。
+ 3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。
+ 必须按照以下格式生成选择结果,不要输出任何其他内容:
+ ```json
+ {
+ "tool_ids": ["工具ID1", "工具ID2", ...]
+ }
+ ```
+
+ # 示例
+ ## 目标
+ 调优mysql性能
+ ## MCP工具列表
+
+ - mcp_tool_1 MySQL链接池工具;用于优化MySQL链接池
+ - mcp_tool_2 MySQL性能调优工具;用于分析MySQL性能瓶颈
+ - mcp_tool_3 MySQL查询优化工具;用于优化MySQL查询语句
+ - mcp_tool_4 MySQL索引优化工具;用于优化MySQL索引
+ - mcp_tool_5 文件存储工具;用于存储文件
+ - mcp_tool_6 mongoDB工具;用于操作MongoDB数据库
+
+ ## 附加信息
+ 1. 当前MySQL数据库的版本是8.0.26
+ 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf,并含有以下配置项
+ ```json
+ {
+ "max_connections": 1000,
+ "innodb_buffer_pool_size": "1G",
+ "query_cache_size": "64M"
+    }
+    ```
+    ## 输出
+ ```json
+ {
+ "tool_ids": ["mcp_tool_1", "mcp_tool_2", "mcp_tool_3", "mcp_tool_4"]
+ }
+ ```
+ # 现在开始!
+ ## 目标
+ {{goal}}
+ ## MCP工具列表
+
+ {% for tool in tools %}
+ - {{tool.id}} {{tool.name}};{{tool.description}}
+ {% endfor %}
+
+ ## 附加信息
+ {{additional_info}}
+ # 输出
+ """
+ )
+
EVALUATE_GOAL = dedent(r"""
你是一个计划评估器。
请根据用户的目标和当前的工具集合以及一些附加信息,判断基于当前的工具集合,是否能够完成用户的目标。
@@ -76,18 +132,18 @@ EVALUATE_GOAL = dedent(r"""
```
# 样例
- ## 目标
- 我需要扫描当前mysql数据库,分析性能瓶颈,并调优
+ # 目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
- ## 工具集合
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
+ # 工具集合
+ 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
- - mysql_analyzer分析MySQL数据库性能
- - performance_tuner调优数据库性能
- - Final结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+ - mysql_analyzer 分析MySQL数据库性能
+ - performance_tuner 调优数据库性能
+ - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
- ## 附加信息
+ # 附加信息
1. 当前MySQL数据库的版本是8.0.26
2. 当前MySQL数据库的配置文件路径是/etc/my.cnf
@@ -100,17 +156,17 @@ EVALUATE_GOAL = dedent(r"""
```
# 目标
- {{ goal }}
+ {{goal}}
# 工具集合
- {% for tool in tools %}
- - {{ tool.id }}{{tool.name}};{{ tool.description }}
- {% endfor %}
+    {% for tool in tools %}
+    - {{tool.id}} {{tool.name}};{{tool.description}}
+    {% endfor %}
# 附加信息
- {{ additional_info }}
+ {{additional_info}}
""")
GENERATE_FLOW_NAME = dedent(r"""
@@ -123,15 +179,79 @@ GENERATE_FLOW_NAME = dedent(r"""
4. 流程名称应该尽量简短,小于20个字或者单词。
5. 只输出流程名称,不要输出其他内容。
# 样例
- ## 目标
- 我需要扫描当前mysql数据库,分析性能瓶颈,并调优
- ## 输出
+ # 目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
+ # 输出
扫描MySQL数据库并分析性能瓶颈,进行调优
# 现在开始生成流程名称:
# 目标
- {{ goal }}
+ {{goal}}
# 输出
""")
+GET_REPLAN_START_STEP_INDEX = dedent(r"""
+ 你是一个智能助手,你的任务是根据用户的目标、报错信息和当前计划和历史,获取重新规划的步骤起始索引。
+
+ # 样例
+ # 目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
+ # 报错信息
+ 执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。
+ # 当前计划
+ ```json
+ {
+ "plans": [
+ {
+ "step_id": "step_1",
+ "content": "生成端口扫描命令",
+ "tool": "command_generator",
+ "instruction": "生成端口扫描命令:扫描
+ },
+ {
+ "step_id": "step_2",
+ "content": "在执行Result[0]生成的命令",
+ "tool": "command_executor",
+ "instruction": "执行端口扫描命令"
+ }
+ ]
+    }
+    ```
+ # 历史
+ [
+ {
+ id: "0",
+ task_id: "task_1",
+ flow_id: "flow_1",
+ flow_name: "MYSQL性能调优",
+ flow_status: "RUNNING",
+ step_id: "step_1",
+ step_name: "生成端口扫描命令",
+ step_description: "生成端口扫描命令:扫描当前MySQL数据库的端口",
+ step_status: "FAILED",
+ input_data: {
+ "command": "nmap -p 3306
+ "target": "localhost"
+ },
+ output_data: {
+ "error": "- bash: curl: command not found"
+ }
+ }
+ ]
+ # 输出
+ {
+ "start_index": 0,
+ "reasoning": "当前计划的第一步就失败了,报错信息显示curl命令未找到,可能是因为没有安装curl工具,因此需要从第一步重新规划。"
+ }
+ # 现在开始获取重新规划的步骤起始索引:
+ # 目标
+ {{goal}}
+ # 报错信息
+ {{error_message}}
+ # 当前计划
+ {{current_plan}}
+ # 历史
+ {{history}}
+ # 输出
+ """)
+
CREATE_PLAN = dedent(r"""
你是一个计划生成器。
请分析用户的目标,并生成一个计划。你后续将根据这个计划,一步一步地完成用户的目标。
@@ -163,40 +283,38 @@ CREATE_PLAN = dedent(r"""
}
```
- - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。\
-思考过程应放置在 XML标签中。
+ - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。
+思考过程应放置在 XML标签中。
- 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。
- - 计划不得多于{{ max_num }}条,且每条计划内容应少于150字。
+ - 计划不得多于{{max_num}}条,且每条计划内容应少于150字。
# 工具
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
+ 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
- {% for tool in tools %}
- - {{ tool.id }}{{tool.name}};{{ tool.description }}
- {% endfor %}
+    {% for tool in tools %}
+    - {{tool.id}} {{tool.name}};{{tool.description}}
+    {% endfor %}
# 样例
- ## 目标
+ # 目标
- 在后台运行一个新的alpine:latest容器,将主机/root文件夹挂载至/data,并执行top命令。
+ 在后台运行一个新的alpine: latest容器,将主机/root文件夹挂载至/data,并执行top命令。
- ## 计划
+ # 计划
- 1. 这个目标需要使用Docker来完成,首先需要选择合适的MCP Server
+ 1. 这个目标需要使用Docker来完成, 首先需要选择合适的MCP Server
2. 目标可以拆解为以下几个部分:
- - 运行alpine:latest容器
+ - 运行alpine: latest容器
- 挂载主机目录
- 在后台运行
- 执行top命令
- 3. 需要先选择MCP Server,然后生成Docker命令,最后执行命令
-
-
- ```json
+ 3. 需要先选择MCP Server, 然后生成Docker命令, 最后执行命令
+ ```json
{
"plans": [
{
@@ -225,7 +343,7 @@ CREATE_PLAN = dedent(r"""
# 现在开始生成计划:
- ## 目标
+ # 目标
{{goal}}
@@ -263,26 +381,24 @@ RECREATE_PLAN = dedent(r"""
}
```
- - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。\
-思考过程应放置在 XML标签中。
+ - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。
+思考过程应放置在 XML标签中。
- 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。
- - 计划不得多于{{ max_num }}条,且每条计划内容应少于150字。
+ - 计划不得多于{{max_num}}条,且每条计划内容应少于150字。
# 样例
- ## 目标
+ # 目标
请帮我扫描一下192.168.1.1的这台机器的端口,看看有哪些端口开放。
- ## 工具
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
+ # 工具
+ 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
- - command_generator生成命令行指令
- - tool_selector选择合适的工具
- - command_executor执行命令行指令
- - Final结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
-
-
- ## 当前计划
+ - command_generator 生成命令行指令
+ - tool_selector 选择合适的工具
+ - command_executor 执行命令行指令
+ - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+ # 当前计划
```json
{
"plans": [
@@ -304,25 +420,23 @@ RECREATE_PLAN = dedent(r"""
]
}
```
- ## 运行报错
- 执行端口扫描命令时,出现了错误:`-bash: curl: command not found`。
- ## 重新生成的计划
+ # 运行报错
+ 执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。
+ # 重新生成的计划
- 1. 这个目标需要使用网络扫描工具来完成,首先需要选择合适的网络扫描工具
+ 1. 这个目标需要使用网络扫描工具来完成, 首先需要选择合适的网络扫描工具
2. 目标可以拆解为以下几个部分:
- 生成端口扫描命令
- 执行端口扫描命令
- 3.但是在执行端口扫描命令时,出现了错误:`-bash: curl: command not found`。
+ 3.但是在执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。
4.我将计划调整为:
- 需要先生成一个命令,查看当前机器支持哪些网络扫描工具
- 执行这个命令,查看当前机器支持哪些网络扫描工具
- 然后从中选择一个网络扫描工具
- 基于选择的网络扫描工具,生成端口扫描命令
- 执行端口扫描命令
-
-
- ```json
+ ```json
{
"plans": [
{
@@ -367,19 +481,19 @@ RECREATE_PLAN = dedent(r"""
# 工具
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
+ 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
- {% for tool in tools %}
- - {{ tool.id }}{{tool.name}};{{ tool.description }}
- {% endfor %}
+    {% for tool in tools %}
+    - {{tool.id}} {{tool.name}};{{tool.description}}
+    {% endfor %}
# 当前计划
- {{ current_plan }}
+ {{current_plan}}
# 运行报错
- {{ error_message }}
+ {{error_message}}
# 重新生成的计划
""")
@@ -393,18 +507,18 @@ RISK_EVALUATE = dedent(r"""
}
```
# 样例
- ## 工具名称
+ # 工具名称
mysql_analyzer
- ## 工具描述
+ # 工具描述
分析MySQL数据库性能
- ## 工具入参
+ # 工具入参
{
"host": "192.0.0.1",
"port": 3306,
"username": "root",
"password": "password"
}
- ## 附加信息
+ # 附加信息
1. 当前MySQL数据库的版本是8.0.26
2. 当前MySQL数据库的配置文件路径是/etc/my.cnf,并含有以下配置项
```ini
@@ -412,7 +526,7 @@ RISK_EVALUATE = dedent(r"""
innodb_buffer_pool_size=1G
innodb_log_file_size=256M
```
- ## 输出
+ # 输出
```json
{
"risk": "中",
@@ -421,35 +535,35 @@ RISK_EVALUATE = dedent(r"""
```
# 工具
- {{ tool_name }}
- {{ tool_description }}
+ {{tool_name}}
+ {{tool_description}}
# 工具入参
- {{ input_param }}
+ {{input_param}}
# 附加信息
- {{ additional_info }}
+ {{additional_info}}
# 输出
"""
)
# 根据当前计划和报错信息决定下一步执行,具体计划有需要用户补充工具入参、重计划当前步骤、重计划接下来的所有计划
-JUDGE_NEXT_STEP = dedent(r"""
+TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
你是一个计划决策器。
- 你的任务是根据当前计划、当前使用的工具、工具入参和工具运行报错,决定下一步执行的操作。
+ 你的任务是根据用户目标、当前计划、当前使用的工具、工具入参和工具运行报错,决定下一步执行的操作。
请根据以下规则进行判断:
- 1. 仅通过补充工具入参来解决问题的,返回 fill_params;
- 2. 需要重计划当前步骤的,返回 replan_current_step;
- 3. 需要重计划接下来的所有计划的,返回 replan_all_steps;
+ 1. 仅通过补充工具入参来解决问题的,返回 missing_param;
+    2. 需要重计划当前步骤的,返回 decorrect_plan;
+    3. 推理过程必须清晰明了,能够让人理解你的判断依据,并且不超过100字。
你的输出要以json格式返回,格式如下:
```json
{
- "next_step": "fill_params/replan_current_step/replan_all_steps",
- "reason": "你的判断依据"
+ "error_type": "missing_param/decorrect_plan,
+ "reason": "你的推理过程"
}
```
- 注意:
- reason字段必须清晰明了,能够让人理解你的判断依据,并且不超过50个中文字或者100个英文单词。
# 样例
- ## 当前计划
+ # 用户目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
+ # 当前计划
{"plans": [
{
"content": "生成端口扫描命令",
@@ -467,38 +581,40 @@ JUDGE_NEXT_STEP = dedent(r"""
"instruction": ""
}
]}
- ## 当前使用的工具
+ # 当前使用的工具
- command_executor
- 执行命令行指令
+ command_executor
+ 执行命令行指令
- ## 工具入参
+ # 工具入参
{
"command": "nmap -sS -p--open 192.168.1.1"
}
- ## 工具运行报错
- 执行端口扫描命令时,出现了错误:`-bash: nmap: command not found`。
- ## 输出
+ # 工具运行报错
+ 执行端口扫描命令时,出现了错误:`- bash: nmap: command not found`。
+ # 输出
```json
{
- "next_step": "replan_all_steps",
- "reason": "当前工具执行报错,提示nmap命令未找到,需要增加command_generator和command_executor的步骤,生成nmap安装命令并执行,之后再生成端口扫描命令并执行。"
+ "error_type": "decorrect_plan",
+ "reason": "当前计划的第二步执行失败,报错信息显示nmap命令未找到,可能是因为没有安装nmap工具,因此需要重计划当前步骤。"
}
```
+ # 用户目标
+ {{goal}}
# 当前计划
- {{ current_plan }}
+ {{current_plan}}
# 当前使用的工具
- {{ tool_name }}
- {{ tool_description }}
+ {{tool_name}}
+ {{tool_description}}
# 工具入参
- {{ input_param }}
+ {{input_param}}
# 工具运行报错
- {{ error_message }}
+ {{error_message}}
# 输出
"""
- )
+ )
# 获取缺失的参数的json结构体
GET_MISSING_PARAMS = dedent(r"""
你是一个工具参数获取器。
@@ -570,10 +686,10 @@ GET_MISSING_PARAMS = dedent(r"""
}
```
# 工具
- < tool >
- < name > {{tool_name}} < /name >
- < description > {{tool_description}} < /description >
- < / tool >
+
+ {{tool_name}}
+ {{tool_description}}
+
# 工具入参
{{input_param}}
# 工具入参schema(部分字段允许为null)
@@ -588,12 +704,12 @@ REPAIR_PARAMS = dedent(r"""
你的任务是根据当前的工具信息、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。
# 样例
- ## 工具信息
+ # 工具信息
- mysql_analyzer
- 分析MySQL数据库性能
+ mysql_analyzer
+ 分析MySQL数据库性能
- ## 工具入参的schema
+ # 工具入参的schema
{
"type": "object",
"properties": {
@@ -616,21 +732,21 @@ REPAIR_PARAMS = dedent(r"""
},
"required": ["host", "port", "username", "password"]
}
- ## 工具当前的入参
+ # 工具当前的入参
{
"host": "192.0.0.1",
"port": 3306,
"username": "root",
"password": "password"
}
- ## 工具的报错
+ # 工具的报错
执行端口扫描命令时,出现了错误:`password is not correct`。
- ## 补充的参数
+ # 补充的参数
{
"username": "admin",
"password": "admin123"
}
- ## 补充的参数描述
+ # 补充的参数描述
用户希望使用admin用户和admin123密码来连接MySQL数据库。
# 输出
```json
@@ -643,8 +759,8 @@ REPAIR_PARAMS = dedent(r"""
```
# 工具
- {{tool_name}}
- {{tool_description}}
+ {{tool_name}}
+ {{tool_description}}
# 工具入参scheme
{{input_schema}}
@@ -664,17 +780,17 @@ FINAL_ANSWER = dedent(r"""
# 用户目标
- {{ goal }}
+ {{goal}}
# 计划执行情况
为了完成上述目标,你实施了以下计划:
- {{ memory }}
+ {{memory}}
# 其他背景信息:
- {{ status }}
+ {{status}}
# 现在,请根据以上信息,向用户报告目标的完成情况:
diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py
index 37d1e752a55b78c95d5ff48782267924bdff65d7..933527c38bb94a05632b2586c8b49f4487bbd3fd 100644
--- a/apps/scheduler/mcp_agent/select.py
+++ b/apps/scheduler/mcp_agent/select.py
@@ -2,7 +2,7 @@
"""选择MCP Server及其工具"""
import logging
-import uuid
+import random
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from typing import AsyncGenerator
@@ -13,176 +13,94 @@ from apps.common.mongo import MongoDB
from apps.llm.embedding import Embedding
from apps.llm.function import FunctionLLM
from apps.llm.reasoning import ReasoningLLM
-from apps.scheduler.mcp.prompt import (
- MCP_SELECT,
-)
+from apps.llm.token import TokenCalculator
+from apps.scheduler.mcp_agent.prompt import TOOL_SELECT
from apps.schemas.mcp import (
MCPCollection,
MCPSelectResult,
MCPTool,
+ MCPToolIdsSelectResult
)
-
+from apps.common.config import Config
logger = logging.getLogger(__name__)
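+
+# 模块级共享的沙箱 Jinja2 环境,用于渲染 TOOL_SELECT 提示词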
+_env = SandboxedEnvironment(
+ loader=BaseLoader,
+ autoescape=True,
+ trim_blocks=True,
+ lstrip_blocks=True,
+)
-class MCPSelector:
- """MCP选择器"""
-
- def __init__(self, resoning_llm: ReasoningLLM = None) -> None:
- """初始化助手类"""
- self.resoning_llm = resoning_llm or ReasoningLLM()
- self.input_tokens = 0
- self.output_tokens = 0
-
- @staticmethod
- def _assemble_sql(mcp_list: list[str]) -> str:
- """组装SQL"""
- sql = "("
- for mcp_id in mcp_list:
- sql += f"'{mcp_id}', "
- return sql.rstrip(", ") + ")"
-
- async def _get_top_mcp_by_embedding(
- self,
- query: str,
- mcp_list: list[str],
- ) -> list[dict[str, str]]:
- """通过向量检索获取Top5 MCP Server"""
- logger.info("[MCPHelper] 查询MCP Server向量: %s, %s", query, mcp_list)
- mcp_table = await LanceDB().get_table("mcp")
- query_embedding = await Embedding.get_embedding([query])
- mcp_vecs = await (await mcp_table.search(
- query=query_embedding,
- vector_column_name="embedding",
- )).where(f"id IN {MCPSelector._assemble_sql(mcp_list)}").limit(5).to_list()
-
- # 拿到名称和description
- logger.info("[MCPHelper] 查询MCP Server名称和描述: %s", mcp_vecs)
- mcp_collection = MongoDB().get_collection("mcp")
- llm_mcp_list: list[dict[str, str]] = []
- for mcp_vec in mcp_vecs:
- mcp_id = mcp_vec["id"]
- mcp_data = await mcp_collection.find_one({"_id": mcp_id})
- if not mcp_data:
- logger.warning("[MCPHelper] 查询MCP Server名称和描述失败: %s", mcp_id)
- continue
- mcp_data = MCPCollection.model_validate(mcp_data)
- llm_mcp_list.extend([{
- "id": mcp_id,
- "name": mcp_data.name,
- "description": mcp_data.description,
- }])
- return llm_mcp_list
-
- async def _get_mcp_by_llm(
- self,
- query: str,
- mcp_list: list[dict[str, str]],
- mcp_ids: list[str],
- ) -> MCPSelectResult:
- """通过LLM选择最合适的MCP Server"""
- # 初始化jinja2环境
- env = SandboxedEnvironment(
- loader=BaseLoader,
- autoescape=True,
- trim_blocks=True,
- lstrip_blocks=True,
- )
- template = env.from_string(MCP_SELECT)
- # 渲染模板
- mcp_prompt = template.render(
- mcp_list=mcp_list,
- goal=query,
- )
-
- # 调用大模型进行推理
- result = await self._call_reasoning(mcp_prompt)
-
- # 使用小模型提取JSON
- return await self._call_function_mcp(result, mcp_ids)
-
- async def _call_reasoning(self, prompt: str) -> AsyncGenerator[str, None]:
- """调用大模型进行推理"""
- logger.info("[MCPHelper] 调用推理大模型")
- message = [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": prompt},
- ]
- async for chunk in self.resoning_llm.call(message):
- yield chunk
-
- async def _call_function_mcp(self, reasoning_result: str, mcp_ids: list[str]) -> MCPSelectResult:
- """调用结构化输出小模型提取JSON"""
- logger.info("[MCPHelper] 调用结构化输出小模型")
- llm = FunctionLLM()
- message = [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": reasoning_result},
- ]
- schema = MCPSelectResult.model_json_schema()
- # schema中加入选项
- schema["properties"]["mcp_id"]["enum"] = mcp_ids
- result = await llm.call(messages=message, schema=schema)
- try:
- result = MCPSelectResult.model_validate(result)
- except Exception:
- logger.exception("[MCPHelper] 解析MCP Select Result失败")
- raise
- return result
-
- async def select_top_mcp(
- self,
- query: str,
- mcp_list: list[str],
- ) -> MCPSelectResult:
- """
- 选择最合适的MCP Server
+FINAL_TOOL_ID = "FIANL"
+SUMMARIZE_TOOL_ID = "SUMMARIZE"
- 先通过Embedding选择Top5,然后通过LLM选择Top 1
- """
- # 通过向量检索获取Top5
- llm_mcp_list = await self._get_top_mcp_by_embedding(query, mcp_list)
- # 通过LLM选择最合适的
- return await self._get_mcp_by_llm(query, llm_mcp_list, mcp_list)
+class MCPSelector:
+ """MCP选择器"""
@staticmethod
- async def select_top_tool(query: str, mcp_list: list[str], top_n: int = 10) -> list[MCPTool]:
+ async def select_top_tool(
+ goal: str, tool_list: list[MCPTool],
+ additional_info: str | None = None, top_n: int | None = None) -> list[MCPTool]:
"""选择最合适的工具"""
- tool_vector = await LanceDB().get_table("mcp_tool")
- query_embedding = await Embedding.get_embedding([query])
- tool_vecs = await (await tool_vector.search(
- query=query_embedding,
- vector_column_name="embedding",
- )).where(f"mcp_id IN {MCPSelector._assemble_sql(mcp_list)}").limit(top_n).to_list()
-
- # 拿到工具
- tool_collection = MongoDB().get_collection("mcp")
- llm_tool_list = []
-
- for tool_vec in tool_vecs:
- # 到MongoDB里找对应的工具
- logger.info("[MCPHelper] 查询MCP Tool名称和描述: %s", tool_vec["mcp_id"])
- tool_data = await tool_collection.aggregate([
- {"$match": {"_id": tool_vec["mcp_id"]}},
- {"$unwind": "$tools"},
- {"$match": {"tools.id": tool_vec["id"]}},
- {"$project": {"_id": 0, "tools": 1}},
- {"$replaceRoot": {"newRoot": "$tools"}},
- ])
- async for tool in tool_data:
- tool_obj = MCPTool.model_validate(tool)
- llm_tool_list.append(tool_obj)
- llm_tool_list.append(
- MCPTool(
- id="00000000-0000-0000-0000-000000000000",
- name="Final",
- description="It is the final step, indicating the end of the plan execution.")
- )
- llm_tool_list.append(
- MCPTool(
- id="00000000-0000-0000-0000-000000000001",
- name="Chat",
- description="It is a chat tool to communicate with the user.")
- )
- return llm_tool_list
+ random.shuffle(tool_list)
+ max_tokens = Config().get_config().function_call.max_tokens
+ template = _env.from_string(TOOL_SELECT)
+ if TokenCalculator.calculate_token_length(
+ messages=[{"role": "user", "content": template.render(
+ goal=goal, tools=[], additional_info=additional_info
+ )}],
+ pure_text=True) > max_tokens:
+ logger.warning("[MCPSelector] 工具选择模板长度超过最大令牌数,无法进行选择")
+ return []
+ llm = FunctionLLM()
+ current_index = 0
+ tool_ids = []
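+        # 按 token 上限将候选工具分批渲染进提示词,逐批由 FunctionLLM 选出工具ID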
+ while current_index < len(tool_list):
+ index = current_index
+ sub_tools = []
+ while index < len(tool_list):
+ tool = tool_list[index]
+ tokens = TokenCalculator.calculate_token_length(
+ messages=[{"role": "user", "content": template.render(
+ goal=goal, tools=[tool],
+ additional_info=additional_info
+ )}],
+ pure_text=True
+ )
+                if tokens > max_tokens:
+                    # 单个工具渲染后即超出 token 上限,跳过该工具,避免死循环
+                    index += 1
+                    continue
+ sub_tools.append(tool)
+
+ tokens = TokenCalculator.calculate_token_length(messages=[{"role": "user", "content": template.render(
+ goal=goal, tools=sub_tools, additional_info=additional_info)}, ], pure_text=True)
+ if tokens > max_tokens:
+ del sub_tools[-1]
+ break
+ else:
+ index += 1
+ current_index = index
+ if sub_tools:
+ message = [
+ {"role": "system", "content": "You are a helpful assistant."},
+ {"role": "user", "content": template.render(tools=sub_tools)},
+ ]
+ schema = MCPToolIdsSelectResult.model_json_schema()
+ schema["properties"]["tool_ids"]["enum"] = [tool.id for tool in sub_tools]
+ result = await llm.call(messages=message, schema=schema)
+ try:
+ result = MCPToolIdsSelectResult.model_validate(result)
+ tool_ids.extend(result.tool_ids)
+ except Exception:
+ logger.exception("[MCPSelector] 解析MCP工具ID选择结果失败")
+ continue
+ mcp_tools = [tool for tool in tool_list if tool.id in tool_ids]
+
+ if top_n is not None:
+ mcp_tools = mcp_tools[:top_n]
+ mcp_tools.append(MCPTool(id=FINAL_TOOL_ID, name="Final",
+ description="终止", mcp_id=FINAL_TOOL_ID, input_schema={}))
+ # mcp_tools.append(MCPTool(id=SUMMARIZE_TOOL_ID, name="Summarize",
+ # description="总结工具", mcp_id=SUMMARIZE_TOOL_ID, input_schema={}))
+ return mcp_tools
diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py
index f6325369355489cff2577a15d24817b8d54e26dd..b81448475c690f2772442f808ff629b139b6039f 100644
--- a/apps/scheduler/scheduler/scheduler.py
+++ b/apps/scheduler/scheduler/scheduler.py
@@ -4,7 +4,9 @@
import asyncio
import logging
from datetime import UTC, datetime
-
+from apps.llm.reasoning import ReasoningLLM
+from apps.schemas.config import LLMConfig
+from apps.llm.patterns.rewrite import QuestionRewrite
from apps.common.config import Config
from apps.common.mongo import MongoDB
from apps.common.queue import MessageQueue
@@ -67,8 +69,8 @@ class Scheduler:
except Exception as e:
logger.error(f"[Scheduler] 活动监控过程中发生错误: {e}")
- async def run(self) -> None: # noqa: PLR0911
- """运行调度器"""
+ async def get_llm_use_in_chat_with_rag(self) -> LLM:
+ """获取RAG大模型"""
try:
# 获取当前会话使用的大模型
llm_id = await LLMManager.get_llm_id_by_conversation_id(
@@ -97,14 +99,25 @@ class Scheduler:
logger.exception("[Scheduler] 获取大模型失败")
await self.queue.close()
return
+
+ async def get_kb_ids_use_in_chat_with_rag(self) -> list[str]:
+ """获取知识库ID列表"""
try:
- # 获取当前会话使用的知识库
kb_ids = await KnowledgeBaseManager.get_kb_ids_by_conversation_id(
- self.task.ids.user_sub, self.task.ids.conversation_id)
+ self.task.ids.user_sub, self.task.ids.conversation_id,
+ )
+ if not kb_ids:
+ logger.error("[Scheduler] 获取知识库ID失败")
+ await self.queue.close()
+ return []
+ return kb_ids
except Exception:
logger.exception("[Scheduler] 获取知识库ID失败")
await self.queue.close()
- return
+ return []
+
+ async def run(self) -> None: # noqa: PLR0911
+ """运行调度器"""
try:
# 获取当前问答可供关联的文档
docs, doc_ids = await get_docs(self.task.ids.user_sub, self.post_body)
@@ -114,13 +127,18 @@ class Scheduler:
return
history, _ = await get_context(self.task.ids.user_sub, self.post_body, 3)
# 已使用文档
-
# 如果是智能问答,直接执行
logger.info("[Scheduler] 开始执行")
# 创建用于通信的事件
kill_event = asyncio.Event()
monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.ids.user_sub))
if not self.post_body.app or self.post_body.app.app_id == "":
+ llm = await self.get_llm_use_in_chat_with_rag()
+ kb_ids = await self.get_kb_ids_use_in_chat_with_rag()
+ if not llm:
+ logger.error("[Scheduler] 获取大模型失败")
+ await self.queue.close()
+ return
self.task = await push_init_message(self.task, self.queue, 3, is_flow=False)
rag_data = RAGQueryReq(
kbIds=kb_ids,
@@ -199,6 +217,27 @@ class Scheduler:
if not app_metadata:
logger.error("[Scheduler] 未找到Agent应用")
return
+ llm = await LLMManager.get_llm_by_id(
+ self.task.ids.user_sub, app_metadata.llm_id,
+ )
+ if not llm:
+ logger.error("[Scheduler] 获取大模型失败")
+ await self.queue.close()
+ return
+            reasoning_llm = ReasoningLLM(
+ LLMConfig(
+ endpoint=llm.openai_base_url,
+ key=llm.openai_api_key,
+ model=llm.model_name,
+ max_tokens=llm.max_tokens,
+ )
+ )
+ if background.conversation:
+ try:
+ question_obj = QuestionRewrite()
+                    post_body.question = await question_obj.generate(
+                        history=background.conversation, question=post_body.question, llm=reasoning_llm)
+ except Exception:
+ logger.exception("[Scheduler] 问题重写失败")
if app_metadata.app_type == AppType.FLOW.value:
logger.info("[Scheduler] 获取工作流元数据")
flow_info = await Pool().get_flow_metadata(app_info.app_id)
@@ -229,8 +268,6 @@ class Scheduler:
# 初始化Executor
logger.info("[Scheduler] 初始化Executor")
- logger.error(f"{flow_data}")
- logger.error(f"{self.task}")
flow_exec = FlowExecutor(
flow_id=flow_id,
flow=flow_data,
@@ -258,6 +295,7 @@ class Scheduler:
servers_id=servers_id,
background=background,
agent_id=app_info.app_id,
+ params=post_body.app.params
)
# 开始运行
logger.info("[Scheduler] 运行Executor")
diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py
index 3fb65028799b91652257f994df6887f38acb7631..3bcabd5795657cc1c495189963b5f734b8beb8eb 100644
--- a/apps/schemas/enum_var.py
+++ b/apps/schemas/enum_var.py
@@ -15,6 +15,7 @@ class SlotType(str, Enum):
class StepStatus(str, Enum):
"""步骤状态"""
UNKNOWN = "unknown"
+ INIT = "init"
WAITING = "waiting"
RUNNING = "running"
SUCCESS = "success"
@@ -55,12 +56,15 @@ class EventType(str, Enum):
STEP_WAITING_FOR_START = "step.waiting_for_start"
STEP_WAITING_FOR_PARAM = "step.waiting_for_param"
FLOW_START = "flow.start"
+ STEP_INIT = "step.init"
STEP_INPUT = "step.input"
STEP_OUTPUT = "step.output"
+ STEP_CANCEL = "step.cancel"
+ STEP_ERROR = "step.error"
FLOW_STOP = "flow.stop"
FLOW_FAILED = "flow.failed"
FLOW_SUCCESS = "flow.success"
- FLOW_CANCELLED = "flow.cancelled"
+ FLOW_CANCEL = "flow.cancel"
DONE = "done"
diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py
index 368865ac526edf2f6f6075a7409cdd277a70de2a..21c403d4a7a8a4a744a16231b8427e2e736f8020 100644
--- a/apps/schemas/mcp.py
+++ b/apps/schemas/mcp.py
@@ -111,6 +111,13 @@ class GoalEvaluationResult(BaseModel):
reason: str = Field(description="评估原因")
+class RestartStepIndex(BaseModel):
+ """MCP重新规划的步骤索引"""
+
+ start_index: int = Field(description="重新规划的起始步骤索引")
+ reasoning: str = Field(description="重新规划的原因")
+
+
class Risk(str, Enum):
"""MCP工具风险类型"""
@@ -126,6 +133,20 @@ class ToolRisk(BaseModel):
reason: str = Field(description="风险原因", default="")
+class ErrorType(str, Enum):
+ """MCP工具错误类型"""
+
+ MISSING_PARAM = "missing_param"
+ DECORRECT_PLAN = "decorrect_plan"
+
+
+class ToolExcutionErrorType(BaseModel):
+ """MCP工具执行错误"""
+
+ type: ErrorType = Field(description="错误类型", default=ErrorType.MISSING_PARAM)
+ reason: str = Field(description="错误原因", default="")
+
+
class MCPSelectResult(BaseModel):
"""MCP选择结果"""
@@ -138,6 +159,12 @@ class MCPToolSelectResult(BaseModel):
name: str = Field(description="工具名称")
+class MCPToolIdsSelectResult(BaseModel):
+ """MCP工具ID选择结果"""
+
+ tool_ids: list[str] = Field(description="工具ID列表")
+
+
class MCPPlanItem(BaseModel):
"""MCP 计划"""
step_id: str = Field(description="步骤的ID", default="")
diff --git a/apps/schemas/message.py b/apps/schemas/message.py
index e7341324266c26447b9d36195141e09addc4520d..1f46ff578e55e30813761af3deab5b6d6dbfa018 100644
--- a/apps/schemas/message.py
+++ b/apps/schemas/message.py
@@ -84,7 +84,7 @@ class FlowStartContent(BaseModel):
"""flow.start消息的content"""
question: str = Field(description="用户问题")
- params: dict[str, Any] = Field(description="预先提供的参数")
+ params: dict[str, Any] | None = Field(description="预先提供的参数", default=None)
class MessageBase(HeartbeatData):
@@ -95,5 +95,5 @@ class MessageBase(HeartbeatData):
conversation_id: str = Field(min_length=36, max_length=36, alias="conversationId")
task_id: str = Field(min_length=36, max_length=36, alias="taskId")
flow: MessageFlow | None = None
- content: dict[str, Any] = {}
+ content: Any | None = Field(default=None, description="消息内容")
metadata: MessageMetadata
diff --git a/apps/schemas/pool.py b/apps/schemas/pool.py
index 27e16b370ec83acc11e1f435ac1da296fe2a9560..7df6dab8d98c6e1098271ce2adb93a5045de5fba 100644
--- a/apps/schemas/pool.py
+++ b/apps/schemas/pool.py
@@ -110,3 +110,6 @@ class AppPool(BaseData):
flows: list[AppFlow] = Field(description="Flow列表", default=[])
hashes: dict[str, str] = Field(description="关联文件的hash值", default={})
mcp_service: list[str] = Field(default=[], alias="mcpService", description="MCP服务id列表")
+ llm_id: str = Field(
+ default="empty", alias="llmId", description="应用使用的大模型ID(如果有的话)"
+ )
diff --git a/apps/schemas/task.py b/apps/schemas/task.py
index eccc95a567be95d1270ffb7797313d07b6467318..336bfedc508e7558e92c73fac55e9e44c8092f3f 100644
--- a/apps/schemas/task.py
+++ b/apps/schemas/task.py
@@ -30,6 +30,7 @@ class FlowStepHistory(BaseModel):
step_status: StepStatus = Field(description="当前步骤状态")
input_data: dict[str, Any] = Field(description="当前Step执行的输入", default={})
output_data: dict[str, Any] = Field(description="当前Step执行后的结果", default={})
+ ex_data: dict[str, Any] | None = Field(description="额外数据", default=None)
created_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3))
@@ -43,14 +44,13 @@ class ExecutorState(BaseModel):
flow_status: FlowStatus = Field(description="Flow状态", default=FlowStatus.INIT)
# 任务级数据
step_id: str = Field(description="当前步骤ID", default="")
+ step_index: int = Field(description="当前步骤索引", default=0)
step_name: str = Field(description="当前步骤名称", default="")
step_status: StepStatus = Field(description="当前步骤状态", default=StepStatus.UNKNOWN)
step_description: str = Field(description="当前步骤描述", default="")
app_id: str = Field(description="应用ID", default="")
current_input: dict[str, Any] = Field(description="当前输入数据", default={})
- params: dict[str, Any] = Field(description="补充的参数", default={})
- params_description: str = Field(description="补充的参数描述", default="")
- error_info: str = Field(description="错误信息", default="")
+ error_message: str = Field(description="错误信息", default="")
retry_times: int = Field(description="当前步骤重试次数", default=0)