diff --git a/apps/llm/generator.py b/apps/llm/generator.py index da5933d5f2344972103235fc5bc7fdc78b1b32ba..7ae83eba57b03cf9bfa57ab490ea3088386c83b5 100644 --- a/apps/llm/generator.py +++ b/apps/llm/generator.py @@ -10,11 +10,10 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from jsonschema import Draft7Validator -from apps.models import LanguageType, LLMType +from apps.models import LLMType from apps.schemas.llm import LLMFunctions from .llm import LLM -from .prompt import JSON_GEN from .token import token_calculator _logger = logging.getLogger(__name__) @@ -66,24 +65,17 @@ class JsonGenerator: def _build_messages( self, - functions: list[dict[str, Any]], + prompt: str, conversation: list[dict[str, str]], - language: LanguageType = LanguageType.CHINESE, ) -> list[dict[str, str]]: - """构建messages,提取query并使用JSON_GEN模板格式化""" + """构建messages,使用传入的prompt替换最后一条用户消息""" if conversation[-1]["role"] == "user": - query = conversation[-1]["content"] + # 验证最后一项是用户消息 + pass else: err = "[JSONGenerator] 对话历史中最后一项必须是用户消息" raise RuntimeError(err) - template = self._env.from_string(JSON_GEN[language]) - prompt = template.render( - query=query, - use_xml_format=False, - functions=functions, - ) - messages = [*conversation[:-1], {"role": "user", "content": prompt}] # 计算Token数量 @@ -111,8 +103,8 @@ class JsonGenerator: else: break - # 重新构建 messages 并计算 token - messages = [*trimmed_conversation, {"role": "user", "content": prompt}] + # 重新构建 messages 并计算 token,添加原始最后一条用户消息以保持完整对话 + messages = [*trimmed_conversation, conversation[-1], {"role": "user", "content": prompt}] token_count = token_calculator.calculate_token_length(messages) _logger.info( @@ -126,8 +118,8 @@ class JsonGenerator: async def _single_trial( self, function: dict[str, Any], + prompt: str, conversation: list[dict[str, str]], - language: LanguageType = LanguageType.CHINESE, ) -> dict[str, Any]: """单次尝试,包含校验逻辑;function使用OpenAI标准Function格式""" if self._llm is None: @@ -140,10 +132,10 @@ class JsonGenerator: # 执行生成 if self._support_function_call: # 如果支持FunctionCall - result = await self._call_with_function(function, conversation, language) + result = await self._call_with_function(function, prompt, conversation) else: # 如果不支持FunctionCall - result = await self._call_without_function(function, conversation, language) + result = await self._call_without_function(function, prompt, conversation) # 校验结果 try: @@ -162,15 +154,15 @@ class JsonGenerator: async def _call_with_function( self, function: dict[str, Any], + prompt: str, conversation: list[dict[str, str]], - language: LanguageType = LanguageType.CHINESE, ) -> dict[str, Any]: """使用FunctionCall方式调用""" if self._llm is None: err = "[JSONGenerator] 未初始化,请先调用init()方法" raise RuntimeError(err) - messages = self._build_messages([function], conversation, language) + messages = self._build_messages(prompt, conversation) tool = LLMFunctions( name=function["name"], @@ -192,15 +184,15 @@ class JsonGenerator: async def _call_without_function( self, function: dict[str, Any], + prompt: str, conversation: list[dict[str, str]], - language: LanguageType = LanguageType.CHINESE, ) -> dict[str, Any]: """不使用FunctionCall方式调用""" if self._llm is None: err = "[JSONGenerator] 未初始化,请先调用init()方法" raise RuntimeError(err) - messages = self._build_messages([function], conversation, language) + messages = self._build_messages(prompt, conversation) # 使用LLM的call方法获取响应 full_response = "" @@ -220,8 +212,8 @@ class JsonGenerator: async def generate( self, function: dict[str, Any], + prompt: str, conversation: list[dict[str, str]] | None = None, - language: LanguageType = LanguageType.CHINESE, ) -> dict[str, Any]: """生成JSON;function使用OpenAI标准Function格式""" if self._llm is None: @@ -239,7 +231,7 @@ class JsonGenerator: count += 1 try: # 如果_single_trial没有抛出异常,直接返回结果,不进行重试 - return await self._single_trial(function, retry_conversation, language) + return await self._single_trial(function, prompt, retry_conversation) except JsonValidationError as e: _logger.exception( "[JSONGenerator] 第 %d/%d 次尝试失败", diff --git a/apps/llm/prompt.py b/apps/llm/prompt.py index 62689ccce929ca9f517ebee3367ea0d2b516e445..44b618d1ae89ad9a2ef2d173e4e03b572aa29c6f 100644 --- a/apps/llm/prompt.py +++ b/apps/llm/prompt.py @@ -8,27 +8,27 @@ from apps.models import LanguageType JSON_GEN: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( r""" - 你是一个智能助手,可以访问帮助回答用户查询的工具。 - 你的任务是使用可用的工具和背景信息来响应查询。 + 你是一个智能助手,可以调用帮助回答用户查询的函数。 + 你的任务是使用可用的函数和背景信息来响应查询。 - - 你可以访问能够帮助收集信息的工具 - - 逐步使用工具,每次使用都基于之前的结果{% if use_xml_format %} + - 你可以调用能够帮助收集信息的函数 + - 逐步调用函数,每次调用都基于之前的结果{% if use_xml_format %} - 用户的查询在 标签中提供 - - 使用 XML 样式的标签格式化工具调用,其中工具名称是根标签,每个参数是嵌套标签 - - 使用架构中指定的确切工具名称和参数名称 + - 使用 XML 样式的标签格式化函数调用,其中函数名称是根标签,每个参数是嵌套标签 + - 使用架构中指定的确切函数名称和参数名称 - 基本格式结构: - <工具名称> + <函数名称> <参数名称>值 - + - 参数类型: * 字符串:搜索文本 * 数字:10 * 布尔值:true * 数组(重复标签):项目1项目2 * 对象(嵌套标签):{% else %} - - 必须使用提供的工具来回答查询 - - 工具调用必须遵循工具架构中定义的JSON格式{% endif %} + - 必须调用提供的函数来回答查询 + - 函数调用必须遵循函数架构中定义的JSON格式{% endif %} {% if use_xml_format %} @@ -80,42 +80,42 @@ JSON_GEN: dict[LanguageType, str] = { {% endfor %} {% else %} - ## 可用工具 + ## 可用函数 {% for func in functions %} - **{{ func.name }}**: - - 工具描述:{{ func.description }} - - 工具参数Schema:{{ func.parameters }} + - 函数描述:{{ func.description }} + - 函数参数Schema:{{ func.parameters }} {% endfor %} - 请根据用户查询选择合适的工具来回答问题。你必须使用上述工具之一来处理查询。 + 请根据用户查询选择合适的函数来回答问题。你必须调用上述函数之一来处理查询。 **用户查询**: {{ query }}{% endif %} """, ), LanguageType.ENGLISH: dedent( r""" - You are an intelligent assistant with access to tools that help answer user queries. - Your task is to respond to queries using the available tools and background information. + You are an intelligent assistant with access to functions that help answer user queries. + Your task is to respond to queries using the available functions and background information. - - You have access to tools that can help gather information - - Use tools step-by-step, with each use informed by previous results{% if use_xml_format %} + - You have access to functions that can help gather information + - Call functions step-by-step, with each call informed by previous results{% if use_xml_format %} - The user's query is provided in the tags - - Format tool calls using XML-style tags where the tool name is the root tag \ + - Format function calls using XML-style tags where the function name is the root tag \ and each parameter is a nested tag - - Use the exact tool name and parameter names as specified in the schema + - Use the exact function name and parameter names as specified in the schema - Basic format structure: - + value - + - Parameter types: * String: search text * Number: 10 * Boolean: true * Array (repeat tags): item1item2 * Object (nest tags): value{% else %} - - You must use the provided tools to answer the query - - Tool calls must follow the JSON format defined in the tool schemas{% endif %} + - You must call the provided functions to answer the query + - Function calls must follow the JSON format defined in the function schemas{% endif %} {% if use_xml_format %} @@ -167,13 +167,13 @@ and each parameter is a nested tag {% endfor %} {% else %} - ## Available Tools + ## Available Functions {% for func in functions %} - **{{ func.name }}**: {{ func.description }} {% endfor %} - Please select the appropriate tool(s) from above to answer the user's query. - You must use one of the tools listed above to process the query. + Please select the appropriate function(s) from above to answer the user's query. + You must call one of the functions listed above to process the query. **User Query**: {{ query }}{% endif %} """, diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index c677d70573d57a8e066f2a9b4a15a10d9665542e..9c53725654e04a13fb56e43b34eca6beab3de49e 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -8,6 +8,7 @@ Core Call类是定义了所有Call都应具有的方法和参数的PyDantic类 import logging import uuid from collections.abc import AsyncGenerator +from pathlib import Path from typing import TYPE_CHECKING, Any, ClassVar, Self from pydantic import BaseModel, ConfigDict, Field @@ -82,6 +83,23 @@ class CoreCall(BaseModel): raise NotImplementedError(err) + def _load_prompt(self, prompt_id: str) -> str: + """ + 从Markdown文件加载提示词 + + :param prompt_id: 提示词ID,例如 "domain", "facts" 等 + :return: 提示词内容 + """ + # 自动获取language + language = self._sys_vars.language.value + + # 组装Prompt文件路径: prompt_id.language.md (例如: domain.en.md) + filename = f"{prompt_id}.{language}.md" + prompt_dir = Path(__file__).parent.parent.parent / "data" / "prompts" / "call" + prompt_file = prompt_dir / filename + return prompt_file.read_text(encoding="utf-8") + + @staticmethod def _assemble_call_vars(executor: "StepExecutor") -> CallVars: """组装CallVars""" diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py index e32642821c217a8a84da066fc68a222d4fec8d21..783e39309c872038c13d36ba7df11fa2b1781c98 100644 --- a/apps/scheduler/call/facts/facts.py +++ b/apps/scheduler/call/facts/facts.py @@ -16,7 +16,7 @@ from apps.schemas.scheduler import CallInfo, CallOutputChunk, CallVars from apps.services.tag import TagManager from apps.services.user_tag import UserTagManager -from .prompt import DOMAIN_FUNCTION, DOMAIN_PROMPT, FACTS_FUNCTION, FACTS_PROMPT +from .func import DOMAIN_FUNCTION, FACTS_FUNCTION from .schema import ( DomainGen, FactsGen, @@ -80,19 +80,17 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): """执行工具""" data = FactsInput(**input_data) - # 组装conversation消息 - facts_prompt = FACTS_PROMPT[self._sys_vars.language] - facts_conversation = [ - {"role": "system", "content": "You are a helpful assistant."}, - *data.message, - {"role": "user", "content": facts_prompt}, - ] + # 加载并组装facts提示词 + facts_prompt = self._load_prompt("facts") # 提取事实信息 facts_result = await json_generator.generate( function=FACTS_FUNCTION[self._sys_vars.language], - conversation=facts_conversation, - language=self._sys_vars.language, + conversation=[ + {"role": "system", "content": "You are a helpful assistant."}, + *data.message, + ], + prompt=facts_prompt, ) facts_obj = FactsGen.model_validate(facts_result) @@ -100,26 +98,25 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): all_tags = await TagManager.get_all_tag() tag_names = [tag.name for tag in all_tags] + # 加载domain提示词模板 + domain_prompt_template_str = self._load_prompt("domain") + # 使用jinja2渲染DOMAIN_PROMPT jinja_env = SandboxedEnvironment( loader=BaseLoader(), autoescape=False, ) - domain_prompt_template = jinja_env.from_string(DOMAIN_PROMPT[self._sys_vars.language]) + domain_prompt_template = jinja_env.from_string(domain_prompt_template_str) domain_prompt = domain_prompt_template.render(available_keywords=tag_names) - # 组装conversation消息 - domain_conversation = [ - {"role": "system", "content": "You are a helpful assistant."}, - *data.message, - {"role": "user", "content": domain_prompt}, - ] - # 更新用户画像 domain_result = await json_generator.generate( function=DOMAIN_FUNCTION[self._sys_vars.language], - conversation=domain_conversation, - language=self._sys_vars.language, + conversation=[ + {"role": "system", "content": "You are a helpful assistant."}, + *data.message, + ], + prompt=domain_prompt, ) domain_list = DomainGen.model_validate(domain_result) diff --git a/apps/scheduler/call/facts/prompt.py b/apps/scheduler/call/facts/func.py similarity index 31% rename from apps/scheduler/call/facts/prompt.py rename to apps/scheduler/call/facts/func.py index a2420f87e951a3c14d4d4cb2ac277cead093f953..d66be4f17b3a00692f6d2569a341a48a6d0114d9 100644 --- a/apps/scheduler/call/facts/prompt.py +++ b/apps/scheduler/call/facts/func.py @@ -1,154 +1,10 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """记忆提取工具的提示词""" -from textwrap import dedent from typing import Any from apps.models import LanguageType -DOMAIN_PROMPT: dict[LanguageType, str] = { - LanguageType.CHINESE: dedent( - r""" - # 任务说明 - 根据对话历史,从下面的"备选提示词列表"中选择最合适的标签。这些标签将用于内容推荐、用户画像构建和个性化服务。 - - ## 备选提示词列表 - {{ available_keywords }} - - ## 选择要求 - - 1. **精准匹配**:只能从备选提示词列表中选择,不要自创新标签 - 2. **话题相关性**:选择与对话主题直接相关的标签 - 3. **数量控制**:选择3-8个最相关的标签 - 4. **质量标准**: - - 避免选择重复或高度相似的标签 - - 优先选择具有区分度的标签 - - 按相关性从高到低排序 - 5. **输出格式**:返回JSON对象,包含keywords字段,值为字符串数组 - - ## 示例 - - 假设备选提示词列表包含: - ["北京", "上海", "天气", "气温", "Python", "Java", "装饰器", "设计模式", "餐厅", "美食"] - - **示例1:天气查询** - - 用户:"北京天气如何?" - - 助手:"北京今天晴。" - - 选择结果:["北京", "天气", "气温"] - - **示例2:如果对话内容与备选列表无关** - - 用户:"今天心情不错" - - 助手:"很高兴听到这个消息。" - - 选择结果:[](如果备选列表中没有相关标签,返回空数组) - """, - ), - LanguageType.ENGLISH: dedent( - r""" - # Task Description - Based on conversation history, select the most appropriate tags from the "Available Keywords List" below. \ -These tags will be used for content recommendation, user profiling, and personalized services. - - ## Available Keywords List - {available_keywords} - - ## Selection Requirements - - 1. **Exact Match**: Only select from the available keywords list, do not create new tags - 2. **Topic Relevance**: Select tags directly related to the conversation topic - 3. **Quantity Control**: Select 3-8 most relevant tags - 4. **Quality Standards**: - - Avoid selecting duplicate or highly similar tags - - Prioritize selecting distinctive tags - - Sort by relevance from high to low - 5. **Output Format**: Return JSON object containing keywords field with string array value - - ## Examples - - Assume the available keywords list contains: - ["Beijing", "Shanghai", "weather", "temperature", "Python", "Java", "decorator", "design pattern", \ -"restaurant", "food"] - - **Example 1: Weather Query** - - User: "What's the weather like in Beijing?" - - Assistant: "Beijing is sunny today." - - Selection result: ["Beijing", "weather", "temperature"] - - **Example 2: If conversation content is unrelated to the available list** - - User: "I'm feeling good today" - - Assistant: "Glad to hear that." - - Selection result: [] (return empty array if no relevant tags in the available list) - """, - ), -} - -FACTS_PROMPT: dict[str, str] = { - LanguageType.CHINESE: dedent( - r""" - # 任务说明 - 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 - - ## 关注的信息类型 - - 1. **实体**:对话中涉及到的实体。例如:姓名、地点、组织、事件等 - 2. **偏好**:对待实体的态度。例如喜欢、讨厌等 - 3. **关系**:用户与实体之间,或两个实体之间的关系。例如包含、并列、互斥等 - 4. **动作**:对实体产生影响的具体动作。例如查询、搜索、浏览、点击等 - - ## 提取要求 - - 1. 事实必须准确,只能从对话中提取 - 2. 事实必须清晰、简洁、易于理解,每条事实少于30个字 - 3. 输出格式:返回JSON对象,包含facts字段,值为字符串数组 - - ## 示例 - - **示例1:景点查询** - - 用户:"杭州西湖有哪些景点?" - - 助手:"西湖周围有许多著名的景点,包括苏堤、白堤、断桥、三潭印月等。" - - 提取结果:["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] - - **示例2:用户偏好** - - 用户:"我喜欢看科幻电影" - - 助手:"科幻电影确实很吸引人,比如《星际穿越》等。" - - 提取结果:["用户喜欢看科幻电影", "用户可能对《星际穿越》感兴趣"] - """, - ), - LanguageType.ENGLISH: dedent( - r""" - # Task Description - Extract key information from the conversation and organize it into unique, easily understandable facts, \ -including user preferences, relationships, entities, etc. - - ## Information Types to Focus On - - 1. **Entities**: Entities involved in the conversation. For example: names, locations, organizations, \ -events, etc. - 2. **Preferences**: Attitudes towards entities. For example: like, dislike, etc. - 3. **Relationships**: Relationships between users and entities, or between two entities. For example: \ -include, parallel, mutually exclusive, etc. - 4. **Actions**: Specific actions that affect entities. For example: query, search, browse, click, etc. - - ## Extraction Requirements - - 1. Facts must be accurate and can only be extracted from the conversation - 2. Facts must be clear, concise, and easy to understand, each fact less than 30 words - 3. Output format: Return JSON object containing facts field with string array value - - ## Examples - - **Example 1: Attraction Query** - - User: "What are the attractions in Hangzhou West Lake?" - - Assistant: "Notable attractions include Su Causeway, Bai Causeway, Broken Bridge, etc." - - Extraction result: ["Hangzhou West Lake has Su Causeway, Bai Causeway, Broken Bridge, etc."] - - **Example 2: User Preference** - - User: "I like watching sci-fi movies" - - Assistant: "Sci-fi movies are indeed attractive, such as Interstellar." - - Extraction result: ["User likes watching sci-fi movies", "User may be interested in Interstellar"] - """, - ), -} - DOMAIN_FUNCTION: dict[LanguageType, dict[str, Any]] = { LanguageType.CHINESE: { "name": "extract_domain", diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 2a96f920f479f4dd0663c8f098e5efd269fffb1f..b1b309a09f8b0cb6475c47e5f8de0fd497e56880 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -3,11 +3,9 @@ import logging import uuid -from typing import cast -import anyio from mcp.types import TextContent -from pydantic import Field +from pydantic import Field, ValidationError from apps.constants import AGENT_FINAL_STEP_NAME, AGENT_MAX_RETRY_TIMES, AGENT_MAX_STEPS from apps.models import ExecutorHistory, ExecutorStatus, MCPTools, StepStatus @@ -18,8 +16,9 @@ from apps.scheduler.mcp_agent.plan import MCPPlanner from apps.scheduler.pool.mcp.pool import mcp_pool from apps.schemas.enum_var import EventType from apps.schemas.flow import AgentAppMetadata -from apps.schemas.mcp import Step +from apps.schemas.mcp import MCPRiskConfirm, Step from apps.schemas.message import FlowParams +from apps.schemas.task import AgentHistoryExtra from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager from apps.services.user import UserManager @@ -40,12 +39,13 @@ class MCPAgentExecutor(BaseExecutor): async def init(self) -> None: """初始化MCP Agent""" # 初始化必要变量 - self._step_cnt = 0 - self._retry_times = 0 - self._mcp_list = [] - self._current_input = {} - self._current_tool = None - self._tool_list = {} + self._step_cnt: int = 0 + self._retry_times: int = 0 + self._mcp_list: list = [] + self._current_input: dict = {} + self._current_tool: MCPTools | None = None + self._current_goal: str = "" + self._tool_list: dict[str, MCPTools] = {} # 初始化MCP Host相关对象 self._planner = MCPPlanner(self.task) self._host = MCPHost(self.task) @@ -104,25 +104,19 @@ class MCPAgentExecutor(BaseExecutor): inputSchema={}, outputSchema={}, ) - def _validate_task_state(self) -> None: - """验证任务状态是否存在""" + async def get_tool_input_param(self, *, is_first: bool) -> None: + """获取工具输入参数""" if not self.task.state: err = "[MCPAgentExecutor] 任务状态不存在" _logger.error(err) raise RuntimeError(err) - async def get_tool_input_param(self, *, is_first: bool) -> None: - """获取工具输入参数""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) - if is_first: # 获取第一个输入参数 - self._current_tool = self._tool_list[state.stepName] + self._current_tool = self._tool_list[self.task.state.stepName] # 更新host的task引用以确保使用最新的context - self._host.task = self.task self._current_input = await self._host.get_first_input_params( - self._current_tool, self.task.runtime.userInput, + self._current_tool, self.task, ) else: # 获取后续输入参数 @@ -132,23 +126,14 @@ class MCPAgentExecutor(BaseExecutor): else: params = {} params_description = "" - self._current_tool = self._tool_list[state.stepName] + self._current_tool = self._tool_list[self.task.state.stepName] self._current_input = await self._host.fill_params( self._current_tool, - self.task.runtime.userInput, + self.task, self._current_input, - state.errorMessage, params, params_description, ) - self.task.state = state - - def _validate_current_tool(self) -> None: - """验证当前工具是否存在""" - if self._current_tool is None: - err = "[MCPAgentExecutor] 当前工具不存在" - _logger.error(err) - raise RuntimeError(err) def _get_error_message_str(self, error_message: dict | str | None) -> str: """将错误消息转换为字符串""" @@ -158,28 +143,42 @@ class MCPAgentExecutor(BaseExecutor): def _update_last_context_status(self, step_status: StepStatus, output_data: dict | None = None) -> None: """更新最后一个context的状态(如果是当前步骤)""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) - if len(self.task.context) and self.task.context[-1].stepId == state.stepId: + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: self.task.context[-1].stepStatus = step_status if output_data is not None: self.task.context[-1].outputData = output_data + def _set_step_error(self, error_msg: str, data: dict | None = None) -> None: + """设置步骤状态为ERROR""" + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + self.task.state.stepStatus = StepStatus.ERROR + self.task.state.errorMessage = { + "err_msg": error_msg, + "data": data if data is not None else self._current_input, + } + async def _add_error_to_context(self, step_status: StepStatus) -> None: """添加错误到context,先移除重复的步骤再添加""" self._remove_last_context_if_same_step() self.task.context.append( - self._create_executor_history(step_status=step_status), + self._create_executor_history(step_status=step_status, step_goal=self._current_goal), ) async def _handle_final_step(self) -> None: """处理最终步骤,设置状态并总结""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) - state.executorStatus = ExecutorStatus.SUCCESS - state.stepStatus = StepStatus.SUCCESS - self.task.state = state - await self._push_message(EventType.EXECUTOR_STOP, data={}) + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + self.task.state.executorStatus = ExecutorStatus.SUCCESS + self.task.state.stepStatus = StepStatus.SUCCESS await self.summarize() def _create_executor_history( @@ -187,42 +186,50 @@ class MCPAgentExecutor(BaseExecutor): step_status: StepStatus, input_data: dict | None = None, output_data: dict | None = None, - extra_data: dict | None = None, + step_goal: str = "", ) -> ExecutorHistory: """创建ExecutorHistory对象""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) - current_tool = cast("MCPTools", self._current_tool) + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + + # 使用AgentHistoryExtra数据结构保存extra_data + extra_data = AgentHistoryExtra(step_goal=step_goal).model_dump() + return ExecutorHistory( taskId=self.task.metadata.id, - stepId=state.stepId, - stepName=current_tool.toolName if self._current_tool else state.stepName, - stepType=str(current_tool.id) if self._current_tool else "", + stepId=self.task.state.stepId, + stepName=self._current_tool.toolName if self._current_tool else self.task.state.stepName, + stepType=str(self._current_tool.id) if self._current_tool else "", stepStatus=step_status, - executorId=state.executorId, - executorName=state.executorName, - executorStatus=state.executorStatus, + executorId=self.task.state.executorId, + executorName=self.task.state.executorName, + executorStatus=self.task.state.executorStatus, inputData=input_data or {}, outputData=output_data or {}, - extraData=extra_data or {}, + extraData=extra_data, ) def _remove_last_context_if_same_step(self) -> None: """如果最后一个context是当前步骤,则删除它""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) - if len(self.task.context) and self.task.context[-1].stepId == state.stepId: + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: del self.task.context[-1] async def _handle_step_error_and_continue(self) -> None: """处理步骤错误并继续下一步""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) - error_output = {"message": state.errorMessage} + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + error_output = {"message": self.task.state.errorMessage} # 先更新stepStatus - state.stepStatus = StepStatus.ERROR - self.task.state = state + self.task.state.stepStatus = StepStatus.ERROR await self._push_message( EventType.STEP_OUTPUT, @@ -230,7 +237,7 @@ class MCPAgentExecutor(BaseExecutor): ) # 更新或添加错误到context - if len(self.task.context) and self.task.context[-1].stepId == state.stepId: + if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: self._update_last_context_status(StepStatus.ERROR, error_output) else: self.task.context.append( @@ -238,90 +245,68 @@ class MCPAgentExecutor(BaseExecutor): step_status=StepStatus.ERROR, input_data=self._current_input, output_data=error_output, + step_goal=self._current_goal, ), ) await self.get_next_step() async def confirm_before_step(self) -> None: """确认前步骤""" - self._validate_task_state() - self._validate_current_tool() - state = cast("ExecutorCheckpoint", self.task.state) - current_tool = cast("MCPTools", self._current_tool) + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + if self._current_tool is None: + err = "[MCPAgentExecutor] 当前工具不存在" + _logger.error(err) + raise RuntimeError(err) - confirm_message = await self._planner.get_tool_risk(current_tool, self._current_input, "") + confirm_message = await self._planner.get_tool_risk(self._current_tool, self._current_input, "") # 先更新状态 - state.executorStatus = ExecutorStatus.WAITING - state.stepStatus = StepStatus.WAITING - self.task.state = state + self.task.state.executorStatus = ExecutorStatus.WAITING + self.task.state.stepStatus = StepStatus.WAITING await self._push_message( EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True), ) self.task.context.append( self._create_executor_history( - step_status=state.stepStatus, - extra_data=confirm_message.model_dump(exclude_none=True, by_alias=True), + step_status=self.task.state.stepStatus, + step_goal=self._current_goal, ), ) async def run_step(self) -> None: """执行步骤""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) - state.executorStatus = ExecutorStatus.RUNNING - state.stepStatus = StepStatus.RUNNING - self.task.state = state - self._validate_current_tool() - current_tool = cast("MCPTools", self._current_tool) - - mcp_client = await mcp_pool.get(current_tool.mcpId, self.task.metadata.userId) - if not mcp_client: - _logger.exception("[MCPAgentExecutor] MCP客户端不存在: %s", current_tool.mcpId) - state.stepStatus = StepStatus.ERROR - state.errorMessage = { - "err_msg": f"MCP客户端不存在: {current_tool.mcpId}", - "data": self._current_input, - } - self.task.state = state - return + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + self.task.state.executorStatus = ExecutorStatus.RUNNING + self.task.state.stepStatus = StepStatus.RUNNING + if self._current_tool is None: + err = "[MCPAgentExecutor] 当前工具不存在" + _logger.error(err) + raise RuntimeError(err) try: - output_data = await mcp_client.call_tool(current_tool.toolName, self._current_input) - except anyio.ClosedResourceError as e: - _logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", current_tool.mcpId) - # 停止当前用户MCP进程 - await mcp_pool.stop(current_tool.mcpId, self.task.metadata.userId) - state.stepStatus = StepStatus.ERROR - state.errorMessage = { - "err_msg": str(e), - "data": self._current_input, - } - self.task.state = state - return + mcp_client = await mcp_pool.get(self._current_tool.mcpId, self.task.metadata.userId) + output_data = await mcp_client.call_tool(self._current_tool.toolName, self._current_input) except Exception as e: - _logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误", state.stepName) - state.stepStatus = StepStatus.ERROR - state.errorMessage = { - "err_msg": str(e), - "data": self._current_input, - } - self.task.state = state + _logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误", self.task.state.stepName) + # 统一停止MCP进程,不再区分异常类型 + await mcp_pool.stop(self._current_tool.mcpId, self.task.metadata.userId) + self._set_step_error(str(e)) return - _logger.error("当前工具名称: %s, 输出参数: %s", state.stepName, output_data) + _logger.info("[MCPAgentExecutor] 当前工具名称: %s, 输出参数: %s", self.task.state.stepName, output_data) if output_data.isError: err = "" for output in output_data.content: if isinstance(output, TextContent): err += output.text - state.stepStatus = StepStatus.ERROR - state.errorMessage = { - "err_msg": err, - "data": {}, - } - self.task.state = state + self._set_step_error(err, {}) return message = "" @@ -333,8 +318,7 @@ class MCPAgentExecutor(BaseExecutor): } # 先更新状态为成功 - state.stepStatus = StepStatus.SUCCESS - self.task.state = state + self.task.state.stepStatus = StepStatus.SUCCESS await self._push_message(EventType.STEP_INPUT, self._current_input) await self._push_message(EventType.STEP_OUTPUT, output_data) @@ -343,55 +327,57 @@ class MCPAgentExecutor(BaseExecutor): step_status=StepStatus.SUCCESS, input_data=self._current_input, output_data=output_data, + step_goal=self._current_goal, ), ) async def generate_params_with_null(self) -> None: """生成参数补充""" - self._validate_task_state() - self._validate_current_tool() - state = cast("ExecutorCheckpoint", self.task.state) - current_tool = cast("MCPTools", self._current_tool) + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + if self._current_tool is None: + err = "[MCPAgentExecutor] 当前工具不存在" + _logger.error(err) + raise RuntimeError(err) params_with_null = await self._planner.get_missing_param( - current_tool, + self._current_tool, self._current_input, - state.errorMessage, + self.task.state.errorMessage, ) # TODO - error_msg = self._get_error_message_str(state.errorMessage) + error_msg = self._get_error_message_str(self.task.state.errorMessage) # 先更新状态 - state.executorStatus = ExecutorStatus.WAITING - state.stepStatus = StepStatus.PARAM - self.task.state = state + self.task.state.executorStatus = ExecutorStatus.WAITING + self.task.state.stepStatus = StepStatus.PARAM await self._push_message( EventType.STEP_WAITING_FOR_PARAM, data={"message": error_msg, "params": params_with_null}, ) self.task.context.append( self._create_executor_history( - step_status=state.stepStatus, - extra_data={ - "message": error_msg, - "params": params_with_null, - }, + step_status=self.task.state.stepStatus, + step_goal=self._current_goal, ), ) async def get_next_step(self) -> None: """获取下一步""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) if self._step_cnt < AGENT_MAX_STEPS: self._step_cnt += 1 - history = await self._host.assemble_memory(self.task.runtime, self.task.context) max_retry = 3 step = None for _ in range(max_retry): try: - step = await self._planner.create_next_step(history, list(self._tool_list.values())) + step = await self._planner.create_next_step(list(self._tool_list.values()), self.task) if step.tool_name in self._tool_list: break except Exception: @@ -401,77 +387,100 @@ class MCPAgentExecutor(BaseExecutor): tool_name=AGENT_FINAL_STEP_NAME, description=AGENT_FINAL_STEP_NAME, ) - state.stepId = uuid.uuid4() - state.stepName = step.tool_name - state.stepStatus = StepStatus.INIT + self.task.state.stepId = uuid.uuid4() + self.task.state.stepName = step.tool_name + self.task.state.stepStatus = StepStatus.INIT + # 保存步骤目标 + self._current_goal = step.description else: # 没有下一步了,结束流程 - state.stepName = AGENT_FINAL_STEP_NAME - self.task.state = state + self.task.state.stepName = AGENT_FINAL_STEP_NAME + self._current_goal = AGENT_FINAL_STEP_NAME async def error_handle_after_step(self) -> None: """步骤执行失败后的错误处理""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) - error_output = {"message": state.errorMessage} - state.stepStatus = StepStatus.ERROR - state.executorStatus = ExecutorStatus.ERROR - self.task.state = state + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) + error_output = {"message": self.task.state.errorMessage} + self.task.state.stepStatus = StepStatus.ERROR + self.task.state.executorStatus = ExecutorStatus.ERROR await self._push_message(EventType.STEP_OUTPUT, data=error_output) - await self._push_message(EventType.EXECUTOR_STOP, data={}) - await self._add_error_to_context(state.stepStatus) + await self._add_error_to_context(self.task.state.stepStatus) async def work(self) -> None: # noqa: C901, PLR0912 """执行当前步骤""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) - if state.stepStatus == StepStatus.INIT: + # 处理初始化状态 + if self.task.state.stepStatus == StepStatus.INIT: await self.get_tool_input_param(is_first=True) if not self._user.autoExecute: # 等待用户确认 await self.confirm_before_step() return - state.stepStatus = StepStatus.RUNNING - self.task.state = state - elif state.stepStatus in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: - if state.stepStatus == StepStatus.PARAM: - self._remove_last_context_if_same_step() - await self.get_tool_input_param(is_first=False) - elif state.stepStatus == StepStatus.WAITING: - if self.params: - self._remove_last_context_if_same_step() - else: - state.executorStatus = ExecutorStatus.CANCELLED - state.stepStatus = StepStatus.CANCELLED - self.task.state = state - - await self._push_message(EventType.STEP_OUTPUT, data={}) - await self._push_message(EventType.EXECUTOR_STOP, data={}) - self._update_last_context_status(StepStatus.CANCELLED) - return - max_retry = 5 - for i in range(max_retry): + self.task.state.stepStatus = StepStatus.RUNNING + + # 处理参数补充状态 + if self.task.state.stepStatus == StepStatus.PARAM: + self._remove_last_context_if_same_step() + await self.get_tool_input_param(is_first=False) + + # 处理等待状态 + if self.task.state.stepStatus == StepStatus.WAITING: + should_cancel = False + if self.params: + # 解析风险确认参数 + try: + risk_confirm = MCPRiskConfirm.model_validate(self.params) + if risk_confirm.confirm: + # 用户确认继续执行 + self._remove_last_context_if_same_step() + else: + # 用户拒绝执行 + should_cancel = True + except ValidationError as e: + _logger.warning("[MCPAgentExecutor] 解析风险确认参数失败: %s, 取消执行", e) + should_cancel = True + else: + # 没有参数则取消执行 + should_cancel = True + + if should_cancel: + self.task.state.executorStatus = ExecutorStatus.CANCELLED + self.task.state.stepStatus = StepStatus.CANCELLED + await self._push_message(EventType.STEP_OUTPUT, data={}) + self._update_last_context_status(StepStatus.CANCELLED) + return + + # 执行步骤(PARAM、WAITING、RUNNING 状态都需要执行) + if self.task.state.stepStatus in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: + for i in range(AGENT_MAX_RETRY_TIMES): if i != 0: await self.get_tool_input_param(is_first=True) await self.run_step() - if state.stepStatus == StepStatus.SUCCESS: + if self.task.state.stepStatus == StepStatus.SUCCESS: break - elif state.stepStatus == StepStatus.ERROR: - # 错误处理 + + # 处理错误状态 + elif self.task.state.stepStatus == StepStatus.ERROR: if self._retry_times >= AGENT_MAX_RETRY_TIMES: await self.error_handle_after_step() elif self._user.autoExecute: await self._handle_step_error_and_continue() else: - mcp_tool = self._tool_list[state.stepName] - error_msg = self._get_error_message_str(state.errorMessage) + mcp_tool = self._tool_list[self.task.state.stepName] + error_msg = self._get_error_message_str(self.task.state.errorMessage) is_param_error = await self._planner.is_param_error( - await self._host.assemble_memory(self.task.runtime, self.task.context), + self.task, error_msg, mcp_tool, - "", + self._current_goal, self._current_input, ) if is_param_error.is_param_error: @@ -479,14 +488,16 @@ class MCPAgentExecutor(BaseExecutor): await self.generate_params_with_null() else: await self._handle_step_error_and_continue() - elif state.stepStatus == StepStatus.SUCCESS: + + # 处理成功状态 + elif self.task.state.stepStatus == StepStatus.SUCCESS: await self.get_next_step() async def summarize(self) -> None: """总结""" thinking_started = False async for chunk in self._planner.generate_answer( - await self._host.assemble_memory(self.task.runtime, self.task.context), + self.task, self.llm, ): if chunk.reasoning_content: @@ -528,48 +539,50 @@ class MCPAgentExecutor(BaseExecutor): async def run(self) -> None: """执行MCP Agent的主逻辑""" - self._validate_task_state() - state = cast("ExecutorCheckpoint", self.task.state) + if not self.task.state: + err = "[MCPAgentExecutor] 任务状态不存在" + _logger.error(err) + raise RuntimeError(err) # 初始化MCP服务 await self.load_mcp() - if state.executorStatus == ExecutorStatus.INIT: + if self.task.state.executorStatus == ExecutorStatus.INIT: # 初始化状态 - state.executorId = str(uuid.uuid4()) - state.executorName = (await self._planner.get_flow_name()).flow_name + self.task.state.executorId = str(uuid.uuid4()) + self.task.state.executorName = (await self._planner.get_flow_name()).flow_name await self.get_next_step() - self.task.state = state - state.executorStatus = ExecutorStatus.RUNNING - self.task.state = state - - if state.stepName == AGENT_FINAL_STEP_NAME: - # 如果已经是最后一步,直接结束 - await self._handle_final_step() - return + self.task.state.executorStatus = ExecutorStatus.RUNNING try: - while state.executorStatus == ExecutorStatus.RUNNING: - await self.work() - - if state.stepName == AGENT_FINAL_STEP_NAME: + if self.task.state.stepName == AGENT_FINAL_STEP_NAME: # 如果已经是最后一步,直接结束 await self._handle_final_step() + else: + while self.task.state.executorStatus == ExecutorStatus.RUNNING: + await self.work() + if self.task.state.stepName == AGENT_FINAL_STEP_NAME: + # 如果已经是最后一步,直接结束 + await self._handle_final_step() + break except Exception as e: _logger.exception("[MCPAgentExecutor] 执行过程中发生错误") - state.executorStatus = ExecutorStatus.ERROR - state.errorMessage = { + self.task.state.executorStatus = ExecutorStatus.ERROR + self.task.state.errorMessage = { "err_msg": str(e), "data": {}, } - state.stepStatus = StepStatus.ERROR - self.task.state = state - error_output = {"message": state.errorMessage} + self.task.state.stepStatus = StepStatus.ERROR + error_output = {"message": self.task.state.errorMessage} await self._push_message(EventType.STEP_OUTPUT, data=error_output) - await self._push_message(EventType.EXECUTOR_STOP, data={}) - await self._add_error_to_context(state.stepStatus) + await self._add_error_to_context(self.task.state.stepStatus) finally: + # 统一推送EXECUTOR_STOP消息 + if self.task.state.executorStatus != ExecutorStatus.RUNNING: + await self._push_message(EventType.EXECUTOR_STOP, data={}) + + # 清理MCP客户端 for mcp_service in self._mcp_list: try: await mcp_pool.stop(mcp_service.id, self.task.metadata.userId) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index 9de9e7e42c77f5afbf9a9f7c552be0b6b1a66150..6630a4b6b87be1e5f2a30f3ede72ec5639e5540a 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -77,11 +77,11 @@ class FlowExecutor(BaseExecutor): # 尝试恢复State if ( - self.state - and self.state.executorStatus != ExecutorStatus.INIT + not self.task.state + or self.task.state.executorStatus == ExecutorStatus.INIT ): # 创建ExecutorState - self.state = ExecutorCheckpoint( + self.task.state = ExecutorCheckpoint( taskId=self.task.metadata.id, appId=self.post_body_app.app_id, executorId=str(self.flow_id), @@ -142,20 +142,25 @@ class FlowExecutor(BaseExecutor): async def _find_flow_next(self) -> list[StepQueueItem]: """在当前步骤执行前,尝试获取下一步""" + if not self.task.state: + err = "[FlowExecutor] 任务状态不存在" + logger.error(err) + raise RuntimeError(err) + # 如果当前步骤为结束,则直接返回 - if self.state.stepId == "end" or not self.state.stepId: + if self.task.state.stepId == "end" or not self.task.state.stepId: return [] if self.current_step.step.type == SpecialCallType.CHOICE.value: # 如果是choice节点,获取分支ID branch_id = self.task.context[-1].outputData["branch_id"] if branch_id: - next_steps = await self._find_next_id(str(self.state.stepId) + "." + branch_id) + next_steps = await self._find_next_id(str(self.task.state.stepId) + "." + branch_id) logger.info("[FlowExecutor] 分支ID:%s", branch_id) else: logger.warning("[FlowExecutor] 没有找到分支ID,返回空列表") return [] else: - next_steps = await self._find_next_id(self.state.stepId) + next_steps = await self._find_next_id(self.task.state.stepId) # 如果step没有任何出边,直接跳到end if not next_steps: return [ @@ -183,10 +188,15 @@ class FlowExecutor(BaseExecutor): """ logger.info("[FlowExecutor] 运行工作流") + if not self.task.state: + err = "[FlowExecutor] 任务状态不存在" + logger.error(err) + raise RuntimeError(err) + # 获取首个步骤 first_step = StepQueueItem( - step_id=self.state.stepId, - step=self.flow.steps[self.state.stepId], + step_id=self.task.state.stepId, + step=self.flow.steps[self.task.state.stepId], ) # 头插开始前的系统步骤,并执行 @@ -203,13 +213,13 @@ class FlowExecutor(BaseExecutor): # 插入首个步骤 self.step_queue.append(first_step) - self.state.executorStatus = ExecutorStatus.RUNNING + self.task.state.executorStatus = ExecutorStatus.RUNNING # 运行Flow(未达终点) is_error = False while not self._reached_end: # 如果当前步骤出错,执行错误处理步骤 - if self.state.stepStatus == StepStatus.ERROR: + if self.task.state.stepStatus == StepStatus.ERROR: logger.warning("[FlowExecutor] Executor出错,执行错误处理步骤") self.step_queue.clear() self.step_queue.appendleft( @@ -227,7 +237,7 @@ class FlowExecutor(BaseExecutor): params={ "user_prompt": FLOW_ERROR_PROMPT[self.task.runtime.language].replace( "{{ error_info }}", - self.state.errorMessage["err_msg"], + self.task.state.errorMessage["err_msg"], ), }, ), @@ -252,9 +262,9 @@ class FlowExecutor(BaseExecutor): # 更新Task状态 if is_error: - self.state.executorStatus = ExecutorStatus.ERROR + self.task.state.executorStatus = ExecutorStatus.ERROR else: - self.state.executorStatus = ExecutorStatus.SUCCESS + self.task.state.executorStatus = ExecutorStatus.SUCCESS # 尾插运行结束后的系统步骤 for step in FIXED_STEPS_AFTER_END: diff --git a/apps/scheduler/mcp/host.py b/apps/scheduler/mcp/host.py index 8ae8b5051d841363ad605960de920c6850f4e317..420193bbac2c76615586b5929b9517d5b8c30cd4 100644 --- a/apps/scheduler/mcp/host.py +++ b/apps/scheduler/mcp/host.py @@ -48,11 +48,11 @@ class MCPHost: logger.warning("用户 %s 未启用MCP %s", self._user_id, mcp_id) return None - # 获取MCP配置 + # 获取MCP配置(如果失败会抛出RuntimeError) try: return await mcp_pool.get(mcp_id, self._user_id) - except KeyError: - logger.warning("用户 %s 的MCP %s 没有运行中的实例,请检查环境", self._user_id, mcp_id) + except (KeyError, RuntimeError) as e: + logger.warning("获取MCP客户端失败: %s", e) return None @@ -161,12 +161,8 @@ class MCPHost: async def call_tool(self, tool: MCPTools, plan_item: MCPPlanItem) -> list[dict[str, Any]]: """调用工具""" - # 拿到Client + # 拿到Client(如果失败会抛出异常) client = await mcp_pool.get(tool.mcpId, self._user_id) - if client is None: - err = f"[MCPHost] MCP Server不合法: {tool.mcpId}" - logger.error(err) - raise ValueError(err) # 填充参数 params = await self._fill_params(tool, plan_item.instruction) diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py index afcd1f440f3baeb86472bfea5046ab7c1af15450..1c97ad28b9a21583438d34b6c216d2172484ef0d 100644 --- a/apps/scheduler/mcp/prompt.py +++ b/apps/scheduler/mcp/prompt.py @@ -394,7 +394,7 @@ MEMORY_TEMPLATE: dict[str, str] = { LanguageType.CHINESE: dedent( r""" {% if msg_type == "user" %} - 第{{ step_index }}步:{{ step_description }} + 第{{ step_index }}步:{{ step_goal }} 调用工具 `{{ step_name }}`,并提供参数 `{{ input_data | tojson }}`。 {% elif msg_type == "assistant" %} 第{{ step_index }}步执行完成。 @@ -406,7 +406,7 @@ MEMORY_TEMPLATE: dict[str, str] = { LanguageType.ENGLISH: dedent( r""" {% if msg_type == "user" %} - Step {{ step_index }}: {{ step_description }} + Step {{ step_index }}: {{ step_goal }} Called tool `{{ step_name }}` and provided parameters `{{ input_data | tojson }}` {% elif msg_type == "assistant" %} Step {{ step_index }} execution completed. diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py index 3c4c9da77c76de47ba87764a3ca4567d53edca8c..430a1a9c5f1e12ca57d98c2356977f7baaa51cf6 100644 --- a/apps/scheduler/mcp_agent/base.py +++ b/apps/scheduler/mcp_agent/base.py @@ -1,23 +1,77 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP基类""" +import json import logging +from typing import Any + +from jinja2 import BaseLoader +from jinja2.sandbox import SandboxedEnvironment from apps.models import LanguageType +from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE from apps.schemas.task import TaskData _logger = logging.getLogger(__name__) +_env = SandboxedEnvironment( + loader=BaseLoader, + autoescape=False, + trim_blocks=True, + lstrip_blocks=True, +) + + +def tojson_filter(value: dict[str, Any]) -> str: + """将字典转换为紧凑JSON字符串""" + return json.dumps(value, ensure_ascii=False, separators=(",", ":")) + + +_env.filters["tojson"] = tojson_filter class MCPBase: """MCP基类""" _user_id: str - task: TaskData _language: LanguageType + _goal: str def __init__(self, task: TaskData) -> None: """初始化MCP基类""" self._user_id = task.metadata.userId - self.task = task + self._goal = task.runtime.userInput self._language = task.runtime.language + + @staticmethod + async def assemble_memory(task: TaskData) -> list[dict[str, str]]: + """组装记忆""" + history = [] + template = MEMORY_TEMPLATE[task.runtime.language] + + for index, item in enumerate(task.context, start=1): + # 用户消息:步骤描述和工具调用 + step_goal = item.extraData["step_goal"] + user_content = _env.from_string(template).render( + msg_type="user", + step_index=index, + step_goal=step_goal, + step_name=item.stepName, + input_data=item.inputData, + ) + history.append({ + "role": "user", + "content": user_content, + }) + + # 助手消息:步骤执行结果 + assistant_content = _env.from_string(template).render( + msg_type="assistant", + step_index=index, + step_status=item.stepStatus.value, + output_data=item.outputData, + ) + history.append({ + "role": "assistant", + "content": assistant_content, + }) + return history diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index e7a33f277c71a61918c0a48ed89013533b739fe7..c3f06d4fc0fd7f841ee6827095be87ea7394e4a6 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -1,7 +1,6 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP宿主""" -import json import logging from typing import Any @@ -9,10 +8,10 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from apps.llm import json_generator -from apps.models import ExecutorHistory, LanguageType, MCPTools, TaskRuntime -from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE +from apps.models import LanguageType, MCPTools from apps.scheduler.mcp_agent.base import MCPBase -from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS +from apps.scheduler.mcp_agent.prompt import REPAIR_PARAMS, get_gen_params_prompt +from apps.schemas.task import TaskData _logger = logging.getLogger(__name__) _env = SandboxedEnvironment( @@ -21,14 +20,6 @@ _env = SandboxedEnvironment( trim_blocks=True, lstrip_blocks=True, ) - - -def tojson_filter(value: dict[str, Any]) -> str: - """将字典转换为紧凑JSON字符串""" - return json.dumps(value, ensure_ascii=False, separators=(",", ":")) - - -_env.filters["tojson"] = tojson_filter _LLM_QUERY_FIX = { LanguageType.CHINESE: "请生成修复之后的工具参数", LanguageType.ENGLISH: "Please generate the tool parameters after repair", @@ -38,27 +29,23 @@ _LLM_QUERY_FIX = { class MCPHost(MCPBase): """MCP宿主服务""" - @staticmethod - async def assemble_memory(runtime: TaskRuntime, context: list[ExecutorHistory]) -> str: - """组装记忆""" - return _env.from_string(MEMORY_TEMPLATE[runtime.language]).render( - context_list=context, - ) - async def get_first_input_params( - self, mcp_tool: MCPTools, current_goal: str, + self, mcp_tool: MCPTools, task: TaskData, ) -> dict[str, Any]: """填充工具参数""" + # 加载提示词模板 + prompt_template = get_gen_params_prompt(task.runtime.language) + # 更清晰的输入指令,这样可以调用generate - prompt = _env.from_string(GEN_PARAMS[self.task.runtime.language]).render( + prompt = _env.from_string(prompt_template).render( tool_name=mcp_tool.toolName, tool_description=mcp_tool.description, - goal=self.task.runtime.userInput, - current_goal=current_goal, + goal=task.runtime.userInput, + current_goal=task.runtime.userInput, input_schema=mcp_tool.inputSchema, - background_info=await self.assemble_memory(self.task.runtime, self.task.context), + background_info=await self.assemble_memory(task), ) - _logger.info("[MCPHost] 填充工具参数: %s", prompt) + _logger.info("[MCPHost] 填充工具参数: %s", mcp_tool.toolName) # 使用json_generator解析结果 function = { "name": mcp_tool.toolName, @@ -71,23 +58,24 @@ class MCPHost(MCPBase): {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}, ], - language=self._language, + prompt=task.runtime.language, ) - async def fill_params( # noqa: D102, PLR0913 + async def fill_params( self, mcp_tool: MCPTools, - current_goal: str, + task: TaskData, current_input: dict[str, Any], - error_message: str | dict = "", params: dict[str, Any] | None = None, params_description: str = "", ) -> dict[str, Any]: - llm_query = _LLM_QUERY_FIX[self._language] - prompt = _env.from_string(REPAIR_PARAMS[self._language]).render( + """填充并修复工具参数""" + llm_query = _LLM_QUERY_FIX[task.runtime.language] + error_message = task.state.errorMessage if task.state else {} + prompt = _env.from_string(REPAIR_PARAMS[task.runtime.language]).render( tool_name=mcp_tool.toolName, - goal=self.task.runtime.userInput, - current_goal=current_goal, + goal=task.runtime.userInput, + current_goal=task.runtime.userInput, tool_description=mcp_tool.description, input_schema=mcp_tool.inputSchema, input_params=current_input, @@ -109,5 +97,5 @@ class MCPHost(MCPBase): {"role": "user", "content": prompt}, {"role": "user", "content": llm_query}, ], - language=self._language, + language=task.runtime.language, ) diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index dd1ad17d1d5e46a378fff18858ec432c1a48759a..70a6715ebc2642b22456612c17b3ea5f2745fac6 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -33,6 +33,7 @@ from apps.schemas.mcp import ( Step, ToolRisk, ) +from apps.schemas.task import TaskData _env = SandboxedEnvironment( loader=BaseLoader, @@ -49,7 +50,7 @@ class MCPPlanner(MCPBase): async def get_flow_name(self) -> FlowName: """获取当前流程的名称""" template = _env.from_string(GENERATE_FLOW_NAME[self._language]) - prompt = template.render(goal=self.task.runtime.userInput) + prompt = template.render(goal=self._goal) result = await json_generator.generate( function=GET_FLOW_NAME_FUNCTION[self._language], @@ -61,26 +62,28 @@ class MCPPlanner(MCPBase): ) return FlowName.model_validate(result) - async def create_next_step(self, history: str, tools: list[MCPTools]) -> Step: + async def create_next_step(self, tools: list[MCPTools], task: TaskData) -> Step: """创建下一步的执行步骤""" - # 构建提示词 template = _env.from_string(GEN_STEP[self._language]) - prompt = template.render(goal=self.task.runtime.userInput, history=history, tools=tools) + prompt = template.render(goal=self._goal, tools=tools) - # 获取函数定义并动态设置tool_id的enum function = deepcopy(CREATE_NEXT_STEP_FUNCTION[self._language]) function["parameters"]["properties"]["tool_name"]["enum"] = [tool.toolName for tool in tools] + history = await self.assemble_memory(task) + + conversation = [ + {"role": "system", "content": "You are a helpful assistant."}, + *history, + {"role": "user", "content": prompt}, + ] + step = await json_generator.generate( function=function, - conversation=[ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": prompt}, - ], + conversation=conversation, language=self._language, ) logger.info("[MCPPlanner] 创建下一步的执行步骤: %s", step) - # 使用Step模型解析结果 return Step.model_validate(step) async def get_tool_risk( @@ -90,7 +93,6 @@ class MCPPlanner(MCPBase): additional_info: str = "", ) -> ToolRisk: """获取MCP工具的风险评估结果""" - # 构建提示词 template = _env.from_string(RISK_EVALUATE[self._language]) prompt = template.render( tool_name=tool.toolName, @@ -108,38 +110,40 @@ class MCPPlanner(MCPBase): language=self._language, ) - # 返回风险评估结果 return ToolRisk.model_validate(risk) async def is_param_error( self, - history: str, + task: TaskData, error_message: str, tool: MCPTools, - step_description: str, + step_goal: str, input_params: dict[str, Any], ) -> IsParamError: """判断错误信息是否是参数错误""" tmplate = _env.from_string(IS_PARAM_ERROR[self._language]) prompt = tmplate.render( - goal=self.task.runtime.userInput, - history=history, + goal=self._goal, step_id=tool.toolName, step_name=tool.toolName, - step_description=step_description, + step_goal=step_goal, input_params=input_params, error_message=error_message, ) + history = await self.assemble_memory(task) + + conversation = [ + {"role": "system", "content": "You are a helpful assistant."}, + *history, + {"role": "user", "content": prompt}, + ] + is_param_error = await json_generator.generate( function=IS_PARAM_ERROR_FUNCTION[self._language], - conversation=[ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": prompt}, - ], + conversation=conversation, language=self._language, ) - # 使用IsParamError模型解析结果 return IsParamError.model_validate(is_param_error) async def get_missing_param( @@ -157,11 +161,9 @@ class MCPPlanner(MCPBase): error_message=error_message, ) - # 获取函数定义并设置parameters为schema_with_null function = deepcopy(GET_MISSING_PARAMS_FUNCTION[self._language]) function["parameters"] = schema_with_null - # 解析为结构化数据 return await json_generator.generate( function=function, conversation=[ @@ -171,15 +173,22 @@ class MCPPlanner(MCPBase): language=self._language, ) - async def generate_answer(self, memory: str, llm: LLM) -> AsyncGenerator[LLMChunk, None]: - """生成最终回答,返回LLMChunk""" + async def generate_answer(self, task: TaskData, llm: LLM) -> AsyncGenerator[LLMChunk, None]: + """生成最终回答,返回LLMChunk""" template = _env.from_string(FINAL_ANSWER[self._language]) prompt = template.render( - memory=memory, - goal=self.task.runtime.userInput, + goal=self._goal, ) + + history = await self.assemble_memory(task) + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + *history, + {"role": "user", "content": prompt}, + ] + async for chunk in llm.call( - [{"role": "user", "content": prompt}], + messages, streaming=True, ): yield chunk diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index 390c4be4db1e9593baa55286818ed57f3c7af757..9a09b15d145669f97c1f112fe5720f106bda77c8 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -1,10 +1,26 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP相关的大模型Prompt""" +from pathlib import Path from textwrap import dedent from apps.models import LanguageType + +def _load_prompt(prompt_id: str, language: LanguageType) -> str: + """ + 从Markdown文件加载提示词 + + :param prompt_id: 提示词ID,例如 "gen_params" 等 + :param language: 语言类型 + :return: 提示词内容 + """ + # 组装Prompt文件路径: prompt_id.language.md (例如: gen_params.en.md) + filename = f"{prompt_id}.{language.value}.md" + prompt_dir = Path(__file__).parent.parent.parent / "data" / "prompts" / "system" / "mcp" + prompt_file = prompt_dir / filename + return prompt_file.read_text(encoding="utf-8") + GENERATE_FLOW_NAME: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( r""" @@ -98,12 +114,12 @@ CREATE_NEXT_STEP_FUNCTION: dict[LanguageType, dict] = { "properties": { "tool_name": { "type": "string", - "description": "工具名称", + "description": "工具名称,必须从可用工具列表中选择一个", "enum": [], }, "description": { "type": "string", - "description": "步骤描述", + "description": "步骤描述,清晰说明本步骤要做什么", }, }, "required": ["tool_name", "description"], @@ -126,12 +142,12 @@ CREATE_NEXT_STEP_FUNCTION: dict[LanguageType, dict] = { "properties": { "tool_name": { "type": "string", - "description": "Tool Name", + "description": "Tool name, must be selected from the available tools list", "enum": [], }, "description": { "type": "string", - "description": "Step description", + "description": "Step description, clearly explain what this step will do", }, }, "required": ["tool_name", "description"], @@ -148,77 +164,95 @@ CREATE_NEXT_STEP_FUNCTION: dict[LanguageType, dict] = { GEN_STEP: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( r""" - 根据用户目标、执行历史和可用工具,生成下一个执行步骤。 - - ## 任务要求 - - 作为计划生成器,你需要: - - **选择最合适的工具**:从可用工具集中选择当前阶段最适合的工具 - - **推进目标完成**:基于历史记录,制定能完成阶段性任务的步骤 - - **严格使用工具ID**:工具ID必须精确匹配可用工具列表中的ID - - **判断完成状态**:若目标已达成,选择`Final`工具结束流程 - - ## 示例 - - 假设用户需要扫描一个MySQL数据库(地址192.168.1.1,端口3306,使用root账户和password密码),分析其性能瓶颈并进行调优。 - 之前已经完成了端口扫描,确认了3306端口开放。现在需要选择下一步操作。 - 查看可用工具列表后,发现有MySQL性能分析工具(mcp_tool_1)、文件存储工具(mcp_tool_2)和结束工具(Final)。 - 此时应选择MySQL性能分析工具,并描述这一步要做什么:使用提供的数据库连接信息(192.168.1.1:3306,root/password)来扫描和分析数据库性能。 - - --- - - ## 当前任务 - - **目标**:{{goal}} - - **历史记录**: - {{history}} - - **可用工具**: - {% for tool in tools %} - - **{{tool.toolName}}**:{{tool.description}} - {% endfor %} + 你的任务是分析对话历史和用户目标,然后**调用`create_next_step`函数**来规划下一个执行步骤。 + + ## 重要提醒 + **你必须且只能调用名为`create_next_step`的函数**,该函数接受两个参数: + 1. `tool_name`: 从下方可用工具列表中选择一个工具名称 + 2. `description`: 清晰描述本步骤要完成的具体任务 + + ## 可用工具列表 + {% for tool in tools %} + - **工具名**: `{{tool.toolName}}` + **功能描述**: {{tool.description}} + {% endfor %} + + ## 调用规范 + - **函数名固定**: 必须调用`create_next_step`,不要使用工具名作为函数名 + - **tool_name的值**: 必须是上述可用工具列表中的某个工具名称 + - **description的值**: 描述本步骤的具体操作内容 + - **任务完成时**: 如果用户目标已经完成,将`tool_name`参数设为`Final` + + ## 错误示例❌(严禁模仿) + ``` + # 错误:直接使用工具名作为函数名或返回工具名的字典 + {'Final': '{"description": "任务完成"}'} + ``` + + ## 正确示例✅ + ``` + # 正确:调用create_next_step函数,tool_name参数的值才是工具名称 + create_next_step( + tool_name="Final", + description="已完成所有分析任务,可以结束流程" + ) + ``` + + --- + ## 当前任务 + **用户目标**: {{goal}} + + 请根据上方对话历史中已执行步骤的结果,**调用`create_next_step`函数**规划下一步。 + 记住:函数名必须是`create_next_step`,`tool_name`参数的值才是具体的工具名称。 """, - ), + ).strip(), LanguageType.ENGLISH: dedent( r""" - Generate the next execution step based on user goals, execution history, and available tools. - - ## Task Requirements - - As a plan generator, you need to: - - **Select the most appropriate tool**: Choose the best tool from available tools for the current stage - - **Advance goal completion**: Based on execution history, formulate steps to complete phased tasks - - **Strictly use tool IDs**: Tool ID must exactly match the ID in the available tools list - - **Determine completion status**: If goal is achieved, select `Final` tool to end the workflow - - ## Example - - Suppose the user needs to scan a MySQL database (at 192.168.1.1:3306, using root account with password), - analyze its performance bottlenecks, and optimize it. - Previously, a port scan was completed and confirmed that port 3306 is open. Now we need to select the - next action. - Looking at the available tools list, there is a MySQL performance analysis tool (mcp_tool_1), a file - storage tool (mcp_tool_2), and a final tool (Final). - At this point, we should select the MySQL performance analysis tool and describe what this step will do: - use the provided database connection information (192.168.1.1:3306, root/password) to scan and analyze - database performance. - - --- - - ## Current Task - - **Goal**: {{goal}} - - **Execution History**: - {{history}} - - **Available Tools**: - {% for tool in tools %} - - **{{tool.toolName}}**: {{tool.description}} - {% endfor %} + Your task is to analyze the conversation history and user goal, then **call the `create_next_step` \ +function** to plan the next execution step. + + ## Important Reminder + **You must and can only call the function named `create_next_step`**, which accepts two parameters: + 1. `tool_name`: Select a tool name from the available tools list below + 2. `description`: Clearly describe the specific task this step will accomplish + + ## Available Tools List + {% for tool in tools %} + - **Tool Name**: `{{tool.toolName}}` + **Description**: {{tool.description}} + {% endfor %} + + ## Calling Specifications + - **Fixed function name**: Must call `create_next_step`, do not use tool names as function names + - **tool_name value**: Must be one of the tool names from the available tools list above + - **description value**: Describe the specific operations of this step + - **When task completes**: If the user goal is achieved, set `tool_name` parameter to `Final` + + ## Incorrect Examples ❌ (Strictly Forbidden) + ``` + # Error: Using tool name as function name or returning a dictionary with tool name as key + {'Final': '{"description": "Task completed"}'} + ``` + + ## Correct Examples ✅ + ``` + # Correct: Call create_next_step function, the tool_name parameter value is the tool name + create_next_step( + tool_name="Final", + description="All analysis tasks completed, workflow can be ended" + ) + ``` + + --- + ## Current Task + **User Goal**: {{goal}} + + Please **call the `create_next_step` function** to plan the next step based on the results of \ +executed steps in the conversation history above. + Remember: The function name must be `create_next_step`, and the `tool_name` parameter value is \ +the specific tool name. """, - ), + ).strip(), } EVALUATE_TOOL_RISK_FUNCTION: dict[LanguageType, dict] = { @@ -398,25 +432,23 @@ IS_PARAM_ERROR_FUNCTION: dict[LanguageType, dict] = { IS_PARAM_ERROR: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( r""" - 判断以下工具执行失败是否由参数错误导致,并调用 check_parameter_error 工具返回结果。 + 判断以下工具执行失败是否由参数错误导致,并调用 check_parameter_error 工具返回结果。 **判断标准**: - **参数错误**:缺失必需参数、参数值不正确、参数格式/类型错误等 - **非参数错误**:权限问题、网络故障、系统异常、业务逻辑错误等 **示例**:当mysql_analyzer工具入参为 {"host": "192.0.0.1", ...},报错"host is not correct"时, - 错误明确指出host参数值不正确,属于**参数错误**,应调用工具返回 {"is_param_error": true} + 错误明确指出host参数值不正确,属于**参数错误**,应调用工具返回 {"is_param_error": true} --- - **背景信息**: - - 用户目标:{{goal}} - - 执行历史:{{history}} + **用户目标**:{{goal}} **当前失败步骤**(步骤{{step_id}}): - 工具:{{step_name}} - - 说明:{{step_instruction}} - - 入参:{{input_param}} + - 目标:{{step_goal}} + - 入参:{{input_params}} - 报错:{{error_message}} 请基于报错信息和上下文综合判断,调用 check_parameter_error 工具返回判断结果。 @@ -439,14 +471,12 @@ should call the tool to return {"is_param_error": true} --- - **Background**: - - User goal: {{goal}} - - Execution history: {{history}} + **User Goal**: {{goal}} **Current Failed Step** (Step {{step_id}}): - Tool: {{step_name}} - - Instruction: {{step_instruction}} - - Input: {{input_param}} + - Goal: {{step_goal}} + - Input: {{input_params}} - Error: {{error_message}} Please make a comprehensive judgment based on the error message and context, and call the \ @@ -587,157 +617,16 @@ for user to provide credentials again ), } -GEN_PARAMS: dict[LanguageType, str] = { - LanguageType.CHINESE: dedent( - r""" - 根据总体目标、阶段目标、工具信息和背景信息,生成符合schema的工具参数。 - - ## 任务要求 - - **严格遵循Schema**:生成的参数必须完全符合工具入参schema的类型和格式规范 - - **充分理解上下文**:全面分析总体目标、阶段目标和背景信息,提取所有可用的参数值 - - **匹配阶段目标**:确保生成的参数能够完成当前阶段目标 - - ## 示例 - - **工具**: - - **名称**:`mysql_analyzer` - - **描述**:分析MySQL数据库性能 - - **总体目标**:扫描MySQL数据库(IP: 192.168.1.1,端口: 3306,用户名: root,密码: password),\ -分析性能瓶颈并调优 - - **阶段目标**:连接MySQL数据库并分析性能 - - **参数Schema**: - ```json - { - "type": "object", - "properties": { - "host": {"type": "string", "description": "主机地址"}, - "port": {"type": "integer", "description": "端口号"}, - "username": {"type": "string", "description": "用户名"}, - "password": {"type": "string", "description": "密码"} - }, - "required": ["host", "port", "username", "password"] - } - ``` - - **背景信息**: - - **步骤1**:生成端口扫描命令 → 成功,输出:`{"command": "nmap -sS -p --open 192.168.1.1"}` - - **步骤2**:执行端口扫描 → 成功,确认3306端口开放 - **输出**: - ```json - { - "host": "192.168.1.1", - "port": 3306, - "username": "root", - "password": "password" - } - ``` +def get_gen_params_prompt(language: LanguageType) -> str: + """ + 获取GEN_PARAMS提示词 - --- + :param language: 语言类型 + :return: 提示词内容 + """ + return _load_prompt("gen_params", language) - ## 当前任务 - - **工具**: - - **名称**:`{{tool_name}}` - - **描述**:{{tool_description}} - - **总体目标**: - {{goal}} - - **阶段目标**: - {{current_goal}} - - **参数Schema**: - ```json - {{input_schema}} - ``` - - **背景信息**: - {{background_info}} - - **输出**: - """, - ).strip("\n"), - LanguageType.ENGLISH: dedent( - r""" - Generate tool parameters that conform to the schema based on the overall goal, phase goal, tool \ -information, and background context. - - ## Task Requirements - - **Strictly Follow Schema**: Generated parameters must fully conform to the tool input schema's type and \ -format specifications - - **Fully Understand Context**: Comprehensively analyze the overall goal, phase goal, and background \ -information to extract all available parameter values - - **Match Phase Goal**: Ensure the generated parameters can accomplish the current phase goal - - ## Example - - **Tool**: - - **Name**: `mysql_analyzer` - - **Description**: Analyze MySQL database performance - - **Overall Goal**: Scan MySQL database (IP: 192.168.1.1, Port: 3306, Username: root, Password: password), \ -analyze performance bottlenecks, and optimize - - **Phase Goal**: Connect to MySQL database and analyze performance - - **Parameter Schema**: - ```json - { - "type": "object", - "properties": { - "host": {"type": "string", "description": "Host address"}, - "port": {"type": "integer", "description": "Port number"}, - "username": {"type": "string", "description": "Username"}, - "password": {"type": "string", "description": "Password"} - }, - "required": ["host", "port", "username", "password"] - } - ``` - - **Background Information**: - - **Step 1**: Generate port scan command → Success, output: `{"command": "nmap -sS -p --open 192.168.1.1"}` - - **Step 2**: Execute port scan → Success, confirmed port 3306 is open - - **Output**: - ```json - { - "host": "192.168.1.1", - "port": 3306, - "username": "root", - "password": "password" - } - ``` - - --- - - ## Current Task - - **Tool**: - - **Name**: `{{tool_name}}` - - **Description**: {{tool_description}} - - **Overall Goal**: - {{goal}} - - **Phase Goal**: - {{current_goal}} - - **Parameter Schema**: - ```json - {{input_schema}} - ``` - - **Background Information**: - {{background_info}} - - **Output**: - """, - ).strip("\n"), -} REPAIR_PARAMS: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( @@ -962,14 +851,6 @@ FINAL_ANSWER: dict[LanguageType, str] = { # 用户目标 {{goal}} - # 计划执行情况 - 为了完成上述目标,你实施了以下计划: - - {{memory}} - - # 其他背景信息: - {{status}} - # 现在,请根据以上信息,向用户报告目标的完成情况: """, @@ -982,14 +863,6 @@ completion status to the user. # User Goal {{goal}} - # Plan Execution Status - To achieve the above goal, you implemented the following plan: - - {{memory}} - - # Additional Background Information: - {{status}} - # Now, based on the above information, report the goal completion status to the user: """, diff --git a/apps/scheduler/pool/mcp/pool.py b/apps/scheduler/pool/mcp/pool.py index de64646a8885aba891829c93da4a95ee7b5e975e..43045aeecc3a3414bc0433230d44e016f278b142 100644 --- a/apps/scheduler/pool/mcp/pool.py +++ b/apps/scheduler/pool/mcp/pool.py @@ -73,19 +73,22 @@ class _MCPPool: return result is not None - async def get(self, mcp_id: str, user_id: str) -> MCPClient | None: - """获取MCP客户端""" + async def get(self, mcp_id: str, user_id: str) -> MCPClient: + """获取MCP客户端,如果无法获取则抛出异常""" item = await self._get_from_dict(mcp_id, user_id) if item is None: # 检查用户是否已激活 if not await self._validate_user(mcp_id, user_id): - logger.warning("用户 %s 未激活MCP %s", user_id, mcp_id) - return None + err = f"用户 {user_id} 未激活MCP {mcp_id}" + logger.warning(err) + raise RuntimeError(err) # 初始化进程 item = await self.init_mcp(mcp_id, user_id) if item is None: - return None + err = f"初始化MCP {mcp_id} 失败(用户:{user_id})" + logger.error(err) + raise RuntimeError(err) if user_id not in self.pool: self.pool[user_id] = {} @@ -97,6 +100,14 @@ class _MCPPool: async def stop(self, mcp_id: str, user_id: str) -> None: """停止MCP客户端""" + if user_id not in self.pool: + logger.warning("[MCPPool] 用户 %s 不存在于池中,无法停止MCP %s", user_id, mcp_id) + return + + if mcp_id not in self.pool[user_id]: + logger.warning("[MCPPool] 用户 %s 的MCP %s 不存在于池中,无法停止", user_id, mcp_id) + return + await self.pool[user_id][mcp_id].stop() del self.pool[user_id][mcp_id] diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index 2c5be573bc910ede231570662dcf0595d5c9aa6a..c371cbbd8ae7f3de0a437329a87dd1d85c091d00 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -125,6 +125,12 @@ class Step(BaseModel): description: str = Field(description="步骤描述") +class MCPRiskConfirm(BaseModel): + """MCP工具风险确认""" + + confirm: bool = Field(description="是否确认") + + class UpdateMCPServiceRequest(BaseModel): """POST /api/mcpservice 请求数据结构""" diff --git a/apps/schemas/task.py b/apps/schemas/task.py index 8b74073d36ced56920c134748e6ce42b2dc31f85..999034199b95cd538151e56a2846eeff360aac3f 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -20,13 +20,20 @@ class TaskData(BaseModel): context: list[ExecutorHistory] = Field(description="执行历史") -class CheckpointExtra(BaseModel): +class AgentCheckpointExtra(BaseModel): """Executor额外数据""" current_input: dict[str, Any] = Field(description="当前输入数据", default={}) error_message: str = Field(description="错误信息", default="") retry_times: int = Field(description="当前步骤重试次数", default=0) + +class AgentHistoryExtra(BaseModel): + """执行器历史额外数据""" + + step_goal: str = Field(description="步骤目标") + + class StepQueueItem(BaseModel): """步骤栈中的元素""" diff --git a/assets/.config.example.toml b/data/.config.example.toml similarity index 100% rename from assets/.config.example.toml rename to data/.config.example.toml diff --git a/data/prompts/call/domain.en.md b/data/prompts/call/domain.en.md new file mode 100644 index 0000000000000000000000000000000000000000..56bfc3ae0186b1aaae19cdf8221785c375d722bf --- /dev/null +++ b/data/prompts/call/domain.en.md @@ -0,0 +1,97 @@ +# Task Description + +You are a domain tag extraction assistant. Your task is to analyze conversation history, +select the most relevant domain keywords from the available tags list, +and return results by calling the `extract_domain` function. + +## Available Keywords List + +{{ available_keywords }} + +## Selection Requirements + +1. **Exact Match**: Only select from the available list, do not create new tags +2. **Topic Relevance**: Select tags directly related to the conversation topic +3. **Quantity Control**: Select 3-8 most relevant tags +4. **Quality Standards**: Avoid duplicate or similar tags, prioritize + distinctive tags, sort by relevance +{% raw %}{% if use_xml_format %}{% endraw %} + +## Function Specification + +**Function Name**: `extract_domain` +**Function Description**: Extract domain keyword tags from conversation +**Function Parameter Schema**: + +```json +{ + "type": "object", + "properties": { + "keywords": { + "type": "array", + "items": {"type": "string"}, + "description": "List of keywords or tags" + } + }, + "required": ["keywords"] +} +``` + +## Output Format + +Use XML format to call the function, basic format: + +```xml + +keyword1 +keyword2 + +``` + +## Call Examples + +### Example 1: Normal Extraction (XML) + +- Conversation: User asks "What's the weather like in Beijing?", + Assistant replies "Beijing is sunny today" +- Available list contains: ["Beijing", "Shanghai", "weather", "temperature", + "Python", "Java"] +- Function call: + +```xml + +Beijing +weather +temperature + +``` + +### Example 2: No Relevant Tags (XML) + +- Conversation: User says "I'm feeling good today" +- If no relevant tags in the available list, return empty tags: + +```xml + + +``` + +Please use XML format to call the `extract_domain` function for tag +extraction.{% raw %}{% else %}{% endraw %} + +## Function Call Examples + +### Example 1: Normal Extraction + +- Conversation: User asks "What's the weather like in Beijing?", + Assistant replies "Beijing is sunny today" +- Available list contains: ["Beijing", "Shanghai", "weather", "temperature", + "Python", "Java"] +- Function call result: ["Beijing", "weather", "temperature"] + +### Example 2: No Relevant Tags + +- Conversation: User says "I'm feeling good today" +- If no relevant tags in the available list, return empty array: [] + +Please call the `extract_domain` function to complete tag extraction.{% raw %}{% endif %}{% endraw %} diff --git a/data/prompts/call/domain.zh.md b/data/prompts/call/domain.zh.md new file mode 100644 index 0000000000000000000000000000000000000000..e93a452dff9b78648b8739a0950059296bc88c12 --- /dev/null +++ b/data/prompts/call/domain.zh.md @@ -0,0 +1,90 @@ +# 任务说明 + +你是一个领域标签提取助手。你的任务是分析对话历史,从备选标签列表中选择最相关的领域关键词, +并通过调用 `extract_domain` 函数返回结果。 + +## 备选标签列表 + +{{ available_keywords }} + +## 选择要求 + +1. **精准匹配**:只能从备选列表中选择,不可自创标签 +2. **话题相关性**:选择与对话主题直接相关的标签 +3. **数量控制**:选择3-8个最相关的标签 +4. **质量标准**:避免重复或相似标签,优先选择具有区分度的标签,按相关性排序 +{% raw %}{% if use_xml_format %}{% endraw %} + +## 函数说明 + +**函数名称**:`extract_domain` +**函数描述**:从对话中提取领域关键词标签 +**函数参数Schema**: + +```json +{ + "type": "object", + "properties": { + "keywords": { + "type": "array", + "items": {"type": "string"}, + "description": "关键词或标签列表" + } + }, + "required": ["keywords"] +} +``` + +## 输出格式 + +使用XML格式调用函数,基本格式: + +```xml + +关键词1 +关键词2 + +``` + +## 调用示例 + +### 示例1:正常提取(XML) + +- 对话:用户询问"北京天气如何?",助手回复"北京今天晴" +- 备选列表包含:["北京", "上海", "天气", "气温", "Python", "Java"] +- 函数调用: + +```xml + +北京 +天气 +气温 + +``` + +### 示例2:无相关标签(XML) + +- 对话:用户说"今天心情不错" +- 如果备选列表中没有相关标签,返回空标签: + +```xml + + +``` + +请使用XML格式调用 `extract_domain` 函数完成标签提取。{% raw %}{% else %}{% endraw %} + +## 函数调用示例 + +### 示例1:正常提取 + +- 对话:用户询问"北京天气如何?",助手回复"北京今天晴" +- 备选列表包含:["北京", "上海", "天气", "气温", "Python", "Java"] +- 函数调用结果:["北京", "天气", "气温"] + +### 示例2:无相关标签 + +- 对话:用户说"今天心情不错" +- 如果备选列表中没有相关标签,返回空数组:[] + +请调用 `extract_domain` 函数完成标签提取。{% raw %}{% endif %}{% endraw %} diff --git a/data/prompts/call/facts.en.md b/data/prompts/call/facts.en.md new file mode 100644 index 0000000000000000000000000000000000000000..07ba3d1924104922bb7fe7f9144f534101891658 --- /dev/null +++ b/data/prompts/call/facts.en.md @@ -0,0 +1,101 @@ +# Task Description + +You are a fact extraction assistant. Your task is to extract key facts from the conversation +and return structured results by calling the `extract_facts` function. + +## Information Types to Focus On + +1. **Entities**: Names, locations, organizations, events, etc. +2. **Preferences**: Attitudes towards entities, such as like, dislike, etc. +3. **Relationships**: Relationships between users and entities, or between + entities +4. **Actions**: Actions affecting entities, such as query, search, browse, + click, etc. + +## Extraction Requirements + +1. Facts must be accurate and extracted only from the conversation +2. Facts must be clear and concise, each less than 30 words +3. Each fact should be independent and complete, easy to understand +{% if use_xml_format %} + +## Function Specification + +**Function Name**: `extract_facts` +**Function Description**: Extract key fact information from conversation +**Function Parameter Schema**: + +```json +{ + "type": "object", + "properties": { + "facts": { + "type": "array", + "items": {"type": "string"}, + "description": "Fact entries extracted from conversation" + } + }, + "required": ["facts"] +} +``` + +## Output Format + +Use XML format to call the function, basic format: + +```xml + +fact1 +fact2 + +``` + +## Call Examples + +### Example 1: Normal Extraction (XML) + +- Conversation: User asks "What are the attractions in Hangzhou West Lake?", + Assistant replies "Notable attractions include Su Causeway, Bai Causeway, + Broken Bridge, etc." +- Function call: + +```xml + +Hangzhou West Lake has Su Causeway, Bai Causeway, Broken Bridge, etc. + +``` + +### Example 2: Multiple Information Types (XML) + +- Conversation: User says "I work in Beijing and often go to Starbucks in + Sanlitun" +- Function call: + +```xml + +User works in Beijing +User often goes to Starbucks in Sanlitun + +``` + +Please use XML format to call the `extract_facts` function for fact +extraction.{% else %} + +## Function Call Examples + +### Example 1: Normal Extraction + +- Conversation: User asks "What are the attractions in Hangzhou West Lake?", + Assistant replies "Notable attractions include Su Causeway, Bai Causeway, + Broken Bridge, etc." +- Function call result: ["Hangzhou West Lake has Su Causeway, Bai Causeway, + Broken Bridge, etc."] + +### Example 2: Multiple Information Types + +- Conversation: User says "I work in Beijing and often go to Starbucks in + Sanlitun" +- Function call result: ["User works in Beijing", "User often goes to + Starbucks in Sanlitun"] + +Please call the `extract_facts` function to complete fact extraction.{% endif %} diff --git a/data/prompts/call/facts.zh.md b/data/prompts/call/facts.zh.md new file mode 100644 index 0000000000000000000000000000000000000000..1051af21b40b041dabb2f41c38e57afc54ff79e8 --- /dev/null +++ b/data/prompts/call/facts.zh.md @@ -0,0 +1,113 @@ +# 事实提取任务 + +## 角色 + +你是一个专业的事实信息提取助手,能够从对话中准确提取关键事实信息,并通过调用工具返回结构化结果。 + +## 任务目标 + +从对话中提取以下类型的关键信息: + +1. **实体**:姓名、地点、组织、事件等 +2. **偏好**:对实体的态度,如喜欢、讨厌等 +3. **关系**:用户与实体之间、实体与实体之间的关系 +4. **动作**:查询、搜索、浏览、点击等影响实体的动作 + +## 提取要求 + +1. 事实必须准确,仅从对话中提取 +2. 事实必须清晰简洁,每条少于30字 +3. 每条事实独立完整,易于理解 + +## 工具 + +你可以调用以下工具来完成事实提取任务。 + +{% raw %}{% if use_xml_format %}{% endraw %} +调用工具时,采用XML风格标签进行格式化。格式规范如下: + +```xml + +事实1 +事实2 + +``` + +格式样例(仅供参考):从对话中提取事实 + +```xml + +杭州西湖有苏堤、白堤、断桥、三潭印月等景点 + +``` + +{% raw %}{% endif %}{% endraw %} + +### extract_facts + +描述: 从对话中提取关键事实信息 + +JSON Schema: + +```json +{ + "type": "object", + "properties": { + "facts": { + "type": "array", + "items": {"type": "string"}, + "description": "从对话中提取的事实条目" + } + }, + "required": ["facts"] +} +``` + +## 调用示例 + +### 示例1:正常提取 + +- 对话:用户问"杭州西湖有哪些景点?",助手回复"西湖周围有苏堤、白堤、断桥、三潭印月等景点" +- 函数调用: + +{% raw %}{% if use_xml_format %}{% endraw %} + +```xml + +杭州西湖有苏堤、白堤、断桥、三潭印月等景点 + +``` + +{% raw %}{% else %}{% endraw %} + +```json +["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] +``` + +{% raw %}{% endif %}{% endraw %} + +### 示例2:多类型信息 + +- 对话:用户说"我在北京工作,经常去三里屯的星巴克" +- 函数调用: + +{% raw %}{% if use_xml_format %}{% endraw %} + +```xml + +用户在北京工作 +用户经常去三里屯的星巴克 + +``` + +{% raw %}{% else %}{% endraw %} + +```json +["用户在北京工作", "用户经常去三里屯的星巴克"] +``` + +{% raw %}{% endif %}{% endraw %} + +--- + +现在开始提取事实信息: diff --git a/data/prompts/system/mcp/gen_params.en.md b/data/prompts/system/mcp/gen_params.en.md new file mode 100644 index 0000000000000000000000000000000000000000..d573a5c0830e823a6548ef35442a8048915658b1 --- /dev/null +++ b/data/prompts/system/mcp/gen_params.en.md @@ -0,0 +1,52 @@ +# Tool Calling Task + +## Role + +You are a professional tool calling assistant capable of accurately calling specified tools to respond to user instructions by obtaining information (context, etc.). + +## User Instructions + +**Current Goal**: {{current_goal}} + +**Overall Goal (for reference)**: {{goal}} + +## Tools + +You can call these tools to respond to user instructions. + +{% raw %}{% if use_xml_format %}{% endraw %} +When calling tools, format using XML-style tags. Format specification: + +```xml + + Tool Name + + Parameters for calling the tool, must be in JSON format and conform to JSON Schema + + +``` + +Format example (for reference only): Get weather for Hangzhou + +```xml + + check_weather + + { + "city": "Hangzhou" + } + + +``` + +{% raw %}{% endif %}{% endraw %} + +### {{ tool_name }} + +Description: {{ tool_description }} + +JSON Schema: {{ input_schema }} + +--- + +Now begin responding to user instructions: diff --git a/data/prompts/system/mcp/gen_params.zh.md b/data/prompts/system/mcp/gen_params.zh.md new file mode 100644 index 0000000000000000000000000000000000000000..4d24766aa33213d968f795a729a4051d333a6bca --- /dev/null +++ b/data/prompts/system/mcp/gen_params.zh.md @@ -0,0 +1,52 @@ +# 工具调用任务 + +## 角色 + +你是一个专业的工具调用助手,能够通过获取信息(上下文等),准确调用指定工具以响应用户给你的指令。 + +## 用户指令 + +**当前目标**: {{current_goal}} + +**总体目标(供参考)**: {{goal}} + +## 工具 + +你可以调用这些工具,来响应用户的指令。 + +{% raw %}{% if use_xml_format %}{% endraw %} +调用工具时,采用XML风格标签进行格式化。格式规范如下: + +```xml + + 工具名称 + + 调用工具时的参数,必须为JSON格式,且符合JSON Schema + + +``` + +格式样例(仅供参考):获取杭州市的天气 + +```xml + + check_weather + + { + "city": "Hangzhou" + } + + +``` + +{% raw %}{% endif %}{% endraw %} + +### {{ tool_name }} + +描述: {{ tool_description }} + +JSON Schema:{{ input_schema }} + +--- + +现在开始响应用户指令: diff --git a/sample/README.md b/data/semantics/README.md similarity index 100% rename from sample/README.md rename to data/semantics/README.md diff --git a/sample/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/flow/4eb1dc31-23d2-42a3-9e8a-14e83460610b.yaml b/data/semantics/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/flow/4eb1dc31-23d2-42a3-9e8a-14e83460610b.yaml similarity index 100% rename from sample/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/flow/4eb1dc31-23d2-42a3-9e8a-14e83460610b.yaml rename to data/semantics/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/flow/4eb1dc31-23d2-42a3-9e8a-14e83460610b.yaml diff --git a/sample/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/metadata.yaml b/data/semantics/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/metadata.yaml similarity index 100% rename from sample/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/metadata.yaml rename to data/semantics/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/metadata.yaml diff --git a/sample/mcp/template/test_mcp/config.json b/data/semantics/mcp/template/test_mcp/config.json similarity index 100% rename from sample/mcp/template/test_mcp/config.json rename to data/semantics/mcp/template/test_mcp/config.json diff --git a/sample/mcp/template/test_mcp/project/README.md b/data/semantics/mcp/template/test_mcp/project/README.md similarity index 100% rename from sample/mcp/template/test_mcp/project/README.md rename to data/semantics/mcp/template/test_mcp/project/README.md diff --git a/sample/mcp/template/test_mcp/project/package.json b/data/semantics/mcp/template/test_mcp/project/package.json similarity index 100% rename from sample/mcp/template/test_mcp/project/package.json rename to data/semantics/mcp/template/test_mcp/project/package.json diff --git a/sample/service/test_service/metadata.yaml b/data/semantics/service/test_service/metadata.yaml similarity index 100% rename from sample/service/test_service/metadata.yaml rename to data/semantics/service/test_service/metadata.yaml diff --git a/sample/service/test_service/openapi/api.yaml b/data/semantics/service/test_service/openapi/api.yaml similarity index 100% rename from sample/service/test_service/openapi/api.yaml rename to data/semantics/service/test_service/openapi/api.yaml diff --git a/assets/tiktoken/9b5ad71b2ce5302211f9c61530b329a4922fc6a4 b/data/tiktoken_cache/9b5ad71b2ce5302211f9c61530b329a4922fc6a4 similarity index 100% rename from assets/tiktoken/9b5ad71b2ce5302211f9c61530b329a4922fc6a4 rename to data/tiktoken_cache/9b5ad71b2ce5302211f9c61530b329a4922fc6a4