diff --git a/apps/llm/generator.py b/apps/llm/generator.py
index da5933d5f2344972103235fc5bc7fdc78b1b32ba..7ae83eba57b03cf9bfa57ab490ea3088386c83b5 100644
--- a/apps/llm/generator.py
+++ b/apps/llm/generator.py
@@ -10,11 +10,10 @@ from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from jsonschema import Draft7Validator
-from apps.models import LanguageType, LLMType
+from apps.models import LLMType
from apps.schemas.llm import LLMFunctions
from .llm import LLM
-from .prompt import JSON_GEN
from .token import token_calculator
_logger = logging.getLogger(__name__)
@@ -66,24 +65,17 @@ class JsonGenerator:
def _build_messages(
self,
- functions: list[dict[str, Any]],
+ prompt: str,
conversation: list[dict[str, str]],
- language: LanguageType = LanguageType.CHINESE,
) -> list[dict[str, str]]:
- """构建messages,提取query并使用JSON_GEN模板格式化"""
+ """构建messages,使用传入的prompt替换最后一条用户消息"""
if conversation[-1]["role"] == "user":
- query = conversation[-1]["content"]
+ # 验证最后一项是用户消息
+ pass
else:
err = "[JSONGenerator] 对话历史中最后一项必须是用户消息"
raise RuntimeError(err)
- template = self._env.from_string(JSON_GEN[language])
- prompt = template.render(
- query=query,
- use_xml_format=False,
- functions=functions,
- )
-
messages = [*conversation[:-1], {"role": "user", "content": prompt}]
# 计算Token数量
@@ -111,8 +103,8 @@ class JsonGenerator:
else:
break
- # 重新构建 messages 并计算 token
- messages = [*trimmed_conversation, {"role": "user", "content": prompt}]
+ # 重新构建 messages 并计算 token,添加原始最后一条用户消息以保持完整对话
+ messages = [*trimmed_conversation, conversation[-1], {"role": "user", "content": prompt}]
token_count = token_calculator.calculate_token_length(messages)
_logger.info(
@@ -126,8 +118,8 @@ class JsonGenerator:
async def _single_trial(
self,
function: dict[str, Any],
+ prompt: str,
conversation: list[dict[str, str]],
- language: LanguageType = LanguageType.CHINESE,
) -> dict[str, Any]:
"""单次尝试,包含校验逻辑;function使用OpenAI标准Function格式"""
if self._llm is None:
@@ -140,10 +132,10 @@ class JsonGenerator:
# 执行生成
if self._support_function_call:
# 如果支持FunctionCall
- result = await self._call_with_function(function, conversation, language)
+ result = await self._call_with_function(function, prompt, conversation)
else:
# 如果不支持FunctionCall
- result = await self._call_without_function(function, conversation, language)
+ result = await self._call_without_function(function, prompt, conversation)
# 校验结果
try:
@@ -162,15 +154,15 @@ class JsonGenerator:
async def _call_with_function(
self,
function: dict[str, Any],
+ prompt: str,
conversation: list[dict[str, str]],
- language: LanguageType = LanguageType.CHINESE,
) -> dict[str, Any]:
"""使用FunctionCall方式调用"""
if self._llm is None:
err = "[JSONGenerator] 未初始化,请先调用init()方法"
raise RuntimeError(err)
- messages = self._build_messages([function], conversation, language)
+ messages = self._build_messages(prompt, conversation)
tool = LLMFunctions(
name=function["name"],
@@ -192,15 +184,15 @@ class JsonGenerator:
async def _call_without_function(
self,
function: dict[str, Any],
+ prompt: str,
conversation: list[dict[str, str]],
- language: LanguageType = LanguageType.CHINESE,
) -> dict[str, Any]:
"""不使用FunctionCall方式调用"""
if self._llm is None:
err = "[JSONGenerator] 未初始化,请先调用init()方法"
raise RuntimeError(err)
- messages = self._build_messages([function], conversation, language)
+ messages = self._build_messages(prompt, conversation)
# 使用LLM的call方法获取响应
full_response = ""
@@ -220,8 +212,8 @@ class JsonGenerator:
async def generate(
self,
function: dict[str, Any],
+ prompt: str,
conversation: list[dict[str, str]] | None = None,
- language: LanguageType = LanguageType.CHINESE,
) -> dict[str, Any]:
"""生成JSON;function使用OpenAI标准Function格式"""
if self._llm is None:
@@ -239,7 +231,7 @@ class JsonGenerator:
count += 1
try:
# 如果_single_trial没有抛出异常,直接返回结果,不进行重试
- return await self._single_trial(function, retry_conversation, language)
+ return await self._single_trial(function, prompt, retry_conversation)
except JsonValidationError as e:
_logger.exception(
"[JSONGenerator] 第 %d/%d 次尝试失败",
diff --git a/apps/llm/prompt.py b/apps/llm/prompt.py
index 62689ccce929ca9f517ebee3367ea0d2b516e445..44b618d1ae89ad9a2ef2d173e4e03b572aa29c6f 100644
--- a/apps/llm/prompt.py
+++ b/apps/llm/prompt.py
@@ -8,27 +8,27 @@ from apps.models import LanguageType
JSON_GEN: dict[LanguageType, str] = {
LanguageType.CHINESE: dedent(
r"""
- 你是一个智能助手,可以访问帮助回答用户查询的工具。
- 你的任务是使用可用的工具和背景信息来响应查询。
+ 你是一个智能助手,可以调用帮助回答用户查询的函数。
+ 你的任务是使用可用的函数和背景信息来响应查询。
- - 你可以访问能够帮助收集信息的工具
- - 逐步使用工具,每次使用都基于之前的结果{% if use_xml_format %}
+ - 你可以调用能够帮助收集信息的函数
+ - 逐步调用函数,每次调用都基于之前的结果{% if use_xml_format %}
- 用户的查询在 标签中提供
- - 使用 XML 样式的标签格式化工具调用,其中工具名称是根标签,每个参数是嵌套标签
- - 使用架构中指定的确切工具名称和参数名称
+ - 使用 XML 样式的标签格式化函数调用,其中函数名称是根标签,每个参数是嵌套标签
+ - 使用架构中指定的确切函数名称和参数名称
- 基本格式结构:
- <工具名称>
+ <函数名称>
<参数名称>值参数名称>
- 工具名称>
+ 函数名称>
- 参数类型:
* 字符串:搜索文本
* 数字:10
* 布尔值:true
* 数组(重复标签):项目1项目2
* 对象(嵌套标签):值{% else %}
- - 必须使用提供的工具来回答查询
- - 工具调用必须遵循工具架构中定义的JSON格式{% endif %}
+ - 必须调用提供的函数来回答查询
+ - 函数调用必须遵循函数架构中定义的JSON格式{% endif %}
{% if use_xml_format %}
@@ -80,42 +80,42 @@ JSON_GEN: dict[LanguageType, str] = {
{% endfor %}
{% else %}
- ## 可用工具
+ ## 可用函数
{% for func in functions %}
- **{{ func.name }}**:
- - 工具描述:{{ func.description }}
- - 工具参数Schema:{{ func.parameters }}
+ - 函数描述:{{ func.description }}
+ - 函数参数Schema:{{ func.parameters }}
{% endfor %}
- 请根据用户查询选择合适的工具来回答问题。你必须使用上述工具之一来处理查询。
+ 请根据用户查询选择合适的函数来回答问题。你必须调用上述函数之一来处理查询。
**用户查询**: {{ query }}{% endif %}
""",
),
LanguageType.ENGLISH: dedent(
r"""
- You are an intelligent assistant with access to tools that help answer user queries.
- Your task is to respond to queries using the available tools and background information.
+ You are an intelligent assistant with access to functions that help answer user queries.
+ Your task is to respond to queries using the available functions and background information.
- - You have access to tools that can help gather information
- - Use tools step-by-step, with each use informed by previous results{% if use_xml_format %}
+ - You have access to functions that can help gather information
+ - Call functions step-by-step, with each call informed by previous results{% if use_xml_format %}
- The user's query is provided in the tags
- - Format tool calls using XML-style tags where the tool name is the root tag \
+ - Format function calls using XML-style tags where the function name is the root tag \
and each parameter is a nested tag
- - Use the exact tool name and parameter names as specified in the schema
+ - Use the exact function name and parameter names as specified in the schema
- Basic format structure:
-
+
value
-
+
- Parameter types:
* String: search text
* Number: 10
* Boolean: true
* Array (repeat tags): item1item2
* Object (nest tags): value{% else %}
- - You must use the provided tools to answer the query
- - Tool calls must follow the JSON format defined in the tool schemas{% endif %}
+ - You must call the provided functions to answer the query
+ - Function calls must follow the JSON format defined in the function schemas{% endif %}
{% if use_xml_format %}
@@ -167,13 +167,13 @@ and each parameter is a nested tag
{% endfor %}
{% else %}
- ## Available Tools
+ ## Available Functions
{% for func in functions %}
- **{{ func.name }}**: {{ func.description }}
{% endfor %}
- Please select the appropriate tool(s) from above to answer the user's query.
- You must use one of the tools listed above to process the query.
+ Please select the appropriate function(s) from above to answer the user's query.
+ You must call one of the functions listed above to process the query.
**User Query**: {{ query }}{% endif %}
""",
diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py
index c677d70573d57a8e066f2a9b4a15a10d9665542e..9c53725654e04a13fb56e43b34eca6beab3de49e 100644
--- a/apps/scheduler/call/core.py
+++ b/apps/scheduler/call/core.py
@@ -8,6 +8,7 @@ Core Call类是定义了所有Call都应具有的方法和参数的PyDantic类
import logging
import uuid
from collections.abc import AsyncGenerator
+from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Self
from pydantic import BaseModel, ConfigDict, Field
@@ -82,6 +83,23 @@ class CoreCall(BaseModel):
raise NotImplementedError(err)
+ def _load_prompt(self, prompt_id: str) -> str:
+ """
+ 从Markdown文件加载提示词
+
+ :param prompt_id: 提示词ID,例如 "domain", "facts" 等
+ :return: 提示词内容
+ """
+ # 自动获取language
+ language = self._sys_vars.language.value
+
+ # 组装Prompt文件路径: prompt_id.language.md (例如: domain.en.md)
+ filename = f"{prompt_id}.{language}.md"
+ prompt_dir = Path(__file__).parent.parent.parent / "data" / "prompts" / "call"
+ prompt_file = prompt_dir / filename
+ return prompt_file.read_text(encoding="utf-8")
+
+
@staticmethod
def _assemble_call_vars(executor: "StepExecutor") -> CallVars:
"""组装CallVars"""
diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py
index e32642821c217a8a84da066fc68a222d4fec8d21..783e39309c872038c13d36ba7df11fa2b1781c98 100644
--- a/apps/scheduler/call/facts/facts.py
+++ b/apps/scheduler/call/facts/facts.py
@@ -16,7 +16,7 @@ from apps.schemas.scheduler import CallInfo, CallOutputChunk, CallVars
from apps.services.tag import TagManager
from apps.services.user_tag import UserTagManager
-from .prompt import DOMAIN_FUNCTION, DOMAIN_PROMPT, FACTS_FUNCTION, FACTS_PROMPT
+from .func import DOMAIN_FUNCTION, FACTS_FUNCTION
from .schema import (
DomainGen,
FactsGen,
@@ -80,19 +80,17 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput):
"""执行工具"""
data = FactsInput(**input_data)
- # 组装conversation消息
- facts_prompt = FACTS_PROMPT[self._sys_vars.language]
- facts_conversation = [
- {"role": "system", "content": "You are a helpful assistant."},
- *data.message,
- {"role": "user", "content": facts_prompt},
- ]
+ # 加载并组装facts提示词
+ facts_prompt = self._load_prompt("facts")
# 提取事实信息
facts_result = await json_generator.generate(
function=FACTS_FUNCTION[self._sys_vars.language],
- conversation=facts_conversation,
- language=self._sys_vars.language,
+ conversation=[
+ {"role": "system", "content": "You are a helpful assistant."},
+ *data.message,
+ ],
+ prompt=facts_prompt,
)
facts_obj = FactsGen.model_validate(facts_result)
@@ -100,26 +98,25 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput):
all_tags = await TagManager.get_all_tag()
tag_names = [tag.name for tag in all_tags]
+ # 加载domain提示词模板
+ domain_prompt_template_str = self._load_prompt("domain")
+
# 使用jinja2渲染DOMAIN_PROMPT
jinja_env = SandboxedEnvironment(
loader=BaseLoader(),
autoescape=False,
)
- domain_prompt_template = jinja_env.from_string(DOMAIN_PROMPT[self._sys_vars.language])
+ domain_prompt_template = jinja_env.from_string(domain_prompt_template_str)
domain_prompt = domain_prompt_template.render(available_keywords=tag_names)
- # 组装conversation消息
- domain_conversation = [
- {"role": "system", "content": "You are a helpful assistant."},
- *data.message,
- {"role": "user", "content": domain_prompt},
- ]
-
# 更新用户画像
domain_result = await json_generator.generate(
function=DOMAIN_FUNCTION[self._sys_vars.language],
- conversation=domain_conversation,
- language=self._sys_vars.language,
+ conversation=[
+ {"role": "system", "content": "You are a helpful assistant."},
+ *data.message,
+ ],
+ prompt=domain_prompt,
)
domain_list = DomainGen.model_validate(domain_result)
diff --git a/apps/scheduler/call/facts/prompt.py b/apps/scheduler/call/facts/func.py
similarity index 31%
rename from apps/scheduler/call/facts/prompt.py
rename to apps/scheduler/call/facts/func.py
index a2420f87e951a3c14d4d4cb2ac277cead093f953..d66be4f17b3a00692f6d2569a341a48a6d0114d9 100644
--- a/apps/scheduler/call/facts/prompt.py
+++ b/apps/scheduler/call/facts/func.py
@@ -1,154 +1,10 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""记忆提取工具的提示词"""
-from textwrap import dedent
from typing import Any
from apps.models import LanguageType
-DOMAIN_PROMPT: dict[LanguageType, str] = {
- LanguageType.CHINESE: dedent(
- r"""
- # 任务说明
- 根据对话历史,从下面的"备选提示词列表"中选择最合适的标签。这些标签将用于内容推荐、用户画像构建和个性化服务。
-
- ## 备选提示词列表
- {{ available_keywords }}
-
- ## 选择要求
-
- 1. **精准匹配**:只能从备选提示词列表中选择,不要自创新标签
- 2. **话题相关性**:选择与对话主题直接相关的标签
- 3. **数量控制**:选择3-8个最相关的标签
- 4. **质量标准**:
- - 避免选择重复或高度相似的标签
- - 优先选择具有区分度的标签
- - 按相关性从高到低排序
- 5. **输出格式**:返回JSON对象,包含keywords字段,值为字符串数组
-
- ## 示例
-
- 假设备选提示词列表包含:
- ["北京", "上海", "天气", "气温", "Python", "Java", "装饰器", "设计模式", "餐厅", "美食"]
-
- **示例1:天气查询**
- - 用户:"北京天气如何?"
- - 助手:"北京今天晴。"
- - 选择结果:["北京", "天气", "气温"]
-
- **示例2:如果对话内容与备选列表无关**
- - 用户:"今天心情不错"
- - 助手:"很高兴听到这个消息。"
- - 选择结果:[](如果备选列表中没有相关标签,返回空数组)
- """,
- ),
- LanguageType.ENGLISH: dedent(
- r"""
- # Task Description
- Based on conversation history, select the most appropriate tags from the "Available Keywords List" below. \
-These tags will be used for content recommendation, user profiling, and personalized services.
-
- ## Available Keywords List
- {available_keywords}
-
- ## Selection Requirements
-
- 1. **Exact Match**: Only select from the available keywords list, do not create new tags
- 2. **Topic Relevance**: Select tags directly related to the conversation topic
- 3. **Quantity Control**: Select 3-8 most relevant tags
- 4. **Quality Standards**:
- - Avoid selecting duplicate or highly similar tags
- - Prioritize selecting distinctive tags
- - Sort by relevance from high to low
- 5. **Output Format**: Return JSON object containing keywords field with string array value
-
- ## Examples
-
- Assume the available keywords list contains:
- ["Beijing", "Shanghai", "weather", "temperature", "Python", "Java", "decorator", "design pattern", \
-"restaurant", "food"]
-
- **Example 1: Weather Query**
- - User: "What's the weather like in Beijing?"
- - Assistant: "Beijing is sunny today."
- - Selection result: ["Beijing", "weather", "temperature"]
-
- **Example 2: If conversation content is unrelated to the available list**
- - User: "I'm feeling good today"
- - Assistant: "Glad to hear that."
- - Selection result: [] (return empty array if no relevant tags in the available list)
- """,
- ),
-}
-
-FACTS_PROMPT: dict[str, str] = {
- LanguageType.CHINESE: dedent(
- r"""
- # 任务说明
- 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。
-
- ## 关注的信息类型
-
- 1. **实体**:对话中涉及到的实体。例如:姓名、地点、组织、事件等
- 2. **偏好**:对待实体的态度。例如喜欢、讨厌等
- 3. **关系**:用户与实体之间,或两个实体之间的关系。例如包含、并列、互斥等
- 4. **动作**:对实体产生影响的具体动作。例如查询、搜索、浏览、点击等
-
- ## 提取要求
-
- 1. 事实必须准确,只能从对话中提取
- 2. 事实必须清晰、简洁、易于理解,每条事实少于30个字
- 3. 输出格式:返回JSON对象,包含facts字段,值为字符串数组
-
- ## 示例
-
- **示例1:景点查询**
- - 用户:"杭州西湖有哪些景点?"
- - 助手:"西湖周围有许多著名的景点,包括苏堤、白堤、断桥、三潭印月等。"
- - 提取结果:["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"]
-
- **示例2:用户偏好**
- - 用户:"我喜欢看科幻电影"
- - 助手:"科幻电影确实很吸引人,比如《星际穿越》等。"
- - 提取结果:["用户喜欢看科幻电影", "用户可能对《星际穿越》感兴趣"]
- """,
- ),
- LanguageType.ENGLISH: dedent(
- r"""
- # Task Description
- Extract key information from the conversation and organize it into unique, easily understandable facts, \
-including user preferences, relationships, entities, etc.
-
- ## Information Types to Focus On
-
- 1. **Entities**: Entities involved in the conversation. For example: names, locations, organizations, \
-events, etc.
- 2. **Preferences**: Attitudes towards entities. For example: like, dislike, etc.
- 3. **Relationships**: Relationships between users and entities, or between two entities. For example: \
-include, parallel, mutually exclusive, etc.
- 4. **Actions**: Specific actions that affect entities. For example: query, search, browse, click, etc.
-
- ## Extraction Requirements
-
- 1. Facts must be accurate and can only be extracted from the conversation
- 2. Facts must be clear, concise, and easy to understand, each fact less than 30 words
- 3. Output format: Return JSON object containing facts field with string array value
-
- ## Examples
-
- **Example 1: Attraction Query**
- - User: "What are the attractions in Hangzhou West Lake?"
- - Assistant: "Notable attractions include Su Causeway, Bai Causeway, Broken Bridge, etc."
- - Extraction result: ["Hangzhou West Lake has Su Causeway, Bai Causeway, Broken Bridge, etc."]
-
- **Example 2: User Preference**
- - User: "I like watching sci-fi movies"
- - Assistant: "Sci-fi movies are indeed attractive, such as Interstellar."
- - Extraction result: ["User likes watching sci-fi movies", "User may be interested in Interstellar"]
- """,
- ),
-}
-
DOMAIN_FUNCTION: dict[LanguageType, dict[str, Any]] = {
LanguageType.CHINESE: {
"name": "extract_domain",
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index 2a96f920f479f4dd0663c8f098e5efd269fffb1f..b1b309a09f8b0cb6475c47e5f8de0fd497e56880 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -3,11 +3,9 @@
import logging
import uuid
-from typing import cast
-import anyio
from mcp.types import TextContent
-from pydantic import Field
+from pydantic import Field, ValidationError
from apps.constants import AGENT_FINAL_STEP_NAME, AGENT_MAX_RETRY_TIMES, AGENT_MAX_STEPS
from apps.models import ExecutorHistory, ExecutorStatus, MCPTools, StepStatus
@@ -18,8 +16,9 @@ from apps.scheduler.mcp_agent.plan import MCPPlanner
from apps.scheduler.pool.mcp.pool import mcp_pool
from apps.schemas.enum_var import EventType
from apps.schemas.flow import AgentAppMetadata
-from apps.schemas.mcp import Step
+from apps.schemas.mcp import MCPRiskConfirm, Step
from apps.schemas.message import FlowParams
+from apps.schemas.task import AgentHistoryExtra
from apps.services.appcenter import AppCenterManager
from apps.services.mcp_service import MCPServiceManager
from apps.services.user import UserManager
@@ -40,12 +39,13 @@ class MCPAgentExecutor(BaseExecutor):
async def init(self) -> None:
"""初始化MCP Agent"""
# 初始化必要变量
- self._step_cnt = 0
- self._retry_times = 0
- self._mcp_list = []
- self._current_input = {}
- self._current_tool = None
- self._tool_list = {}
+ self._step_cnt: int = 0
+ self._retry_times: int = 0
+ self._mcp_list: list = []
+ self._current_input: dict = {}
+ self._current_tool: MCPTools | None = None
+ self._current_goal: str = ""
+ self._tool_list: dict[str, MCPTools] = {}
# 初始化MCP Host相关对象
self._planner = MCPPlanner(self.task)
self._host = MCPHost(self.task)
@@ -104,25 +104,19 @@ class MCPAgentExecutor(BaseExecutor):
inputSchema={}, outputSchema={},
)
- def _validate_task_state(self) -> None:
- """验证任务状态是否存在"""
+ async def get_tool_input_param(self, *, is_first: bool) -> None:
+ """获取工具输入参数"""
if not self.task.state:
err = "[MCPAgentExecutor] 任务状态不存在"
_logger.error(err)
raise RuntimeError(err)
- async def get_tool_input_param(self, *, is_first: bool) -> None:
- """获取工具输入参数"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
-
if is_first:
# 获取第一个输入参数
- self._current_tool = self._tool_list[state.stepName]
+ self._current_tool = self._tool_list[self.task.state.stepName]
# 更新host的task引用以确保使用最新的context
- self._host.task = self.task
self._current_input = await self._host.get_first_input_params(
- self._current_tool, self.task.runtime.userInput,
+ self._current_tool, self.task,
)
else:
# 获取后续输入参数
@@ -132,23 +126,14 @@ class MCPAgentExecutor(BaseExecutor):
else:
params = {}
params_description = ""
- self._current_tool = self._tool_list[state.stepName]
+ self._current_tool = self._tool_list[self.task.state.stepName]
self._current_input = await self._host.fill_params(
self._current_tool,
- self.task.runtime.userInput,
+ self.task,
self._current_input,
- state.errorMessage,
params,
params_description,
)
- self.task.state = state
-
- def _validate_current_tool(self) -> None:
- """验证当前工具是否存在"""
- if self._current_tool is None:
- err = "[MCPAgentExecutor] 当前工具不存在"
- _logger.error(err)
- raise RuntimeError(err)
def _get_error_message_str(self, error_message: dict | str | None) -> str:
"""将错误消息转换为字符串"""
@@ -158,28 +143,42 @@ class MCPAgentExecutor(BaseExecutor):
def _update_last_context_status(self, step_status: StepStatus, output_data: dict | None = None) -> None:
"""更新最后一个context的状态(如果是当前步骤)"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
- if len(self.task.context) and self.task.context[-1].stepId == state.stepId:
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId:
self.task.context[-1].stepStatus = step_status
if output_data is not None:
self.task.context[-1].outputData = output_data
+ def _set_step_error(self, error_msg: str, data: dict | None = None) -> None:
+ """设置步骤状态为ERROR"""
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ self.task.state.stepStatus = StepStatus.ERROR
+ self.task.state.errorMessage = {
+ "err_msg": error_msg,
+ "data": data if data is not None else self._current_input,
+ }
+
async def _add_error_to_context(self, step_status: StepStatus) -> None:
"""添加错误到context,先移除重复的步骤再添加"""
self._remove_last_context_if_same_step()
self.task.context.append(
- self._create_executor_history(step_status=step_status),
+ self._create_executor_history(step_status=step_status, step_goal=self._current_goal),
)
async def _handle_final_step(self) -> None:
"""处理最终步骤,设置状态并总结"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
- state.executorStatus = ExecutorStatus.SUCCESS
- state.stepStatus = StepStatus.SUCCESS
- self.task.state = state
- await self._push_message(EventType.EXECUTOR_STOP, data={})
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ self.task.state.executorStatus = ExecutorStatus.SUCCESS
+ self.task.state.stepStatus = StepStatus.SUCCESS
await self.summarize()
def _create_executor_history(
@@ -187,42 +186,50 @@ class MCPAgentExecutor(BaseExecutor):
step_status: StepStatus,
input_data: dict | None = None,
output_data: dict | None = None,
- extra_data: dict | None = None,
+ step_goal: str = "",
) -> ExecutorHistory:
"""创建ExecutorHistory对象"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
- current_tool = cast("MCPTools", self._current_tool)
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+
+ # 使用AgentHistoryExtra数据结构保存extra_data
+ extra_data = AgentHistoryExtra(step_goal=step_goal).model_dump()
+
return ExecutorHistory(
taskId=self.task.metadata.id,
- stepId=state.stepId,
- stepName=current_tool.toolName if self._current_tool else state.stepName,
- stepType=str(current_tool.id) if self._current_tool else "",
+ stepId=self.task.state.stepId,
+ stepName=self._current_tool.toolName if self._current_tool else self.task.state.stepName,
+ stepType=str(self._current_tool.id) if self._current_tool else "",
stepStatus=step_status,
- executorId=state.executorId,
- executorName=state.executorName,
- executorStatus=state.executorStatus,
+ executorId=self.task.state.executorId,
+ executorName=self.task.state.executorName,
+ executorStatus=self.task.state.executorStatus,
inputData=input_data or {},
outputData=output_data or {},
- extraData=extra_data or {},
+ extraData=extra_data,
)
def _remove_last_context_if_same_step(self) -> None:
"""如果最后一个context是当前步骤,则删除它"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
- if len(self.task.context) and self.task.context[-1].stepId == state.stepId:
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId:
del self.task.context[-1]
async def _handle_step_error_and_continue(self) -> None:
"""处理步骤错误并继续下一步"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
- error_output = {"message": state.errorMessage}
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ error_output = {"message": self.task.state.errorMessage}
# 先更新stepStatus
- state.stepStatus = StepStatus.ERROR
- self.task.state = state
+ self.task.state.stepStatus = StepStatus.ERROR
await self._push_message(
EventType.STEP_OUTPUT,
@@ -230,7 +237,7 @@ class MCPAgentExecutor(BaseExecutor):
)
# 更新或添加错误到context
- if len(self.task.context) and self.task.context[-1].stepId == state.stepId:
+ if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId:
self._update_last_context_status(StepStatus.ERROR, error_output)
else:
self.task.context.append(
@@ -238,90 +245,68 @@ class MCPAgentExecutor(BaseExecutor):
step_status=StepStatus.ERROR,
input_data=self._current_input,
output_data=error_output,
+ step_goal=self._current_goal,
),
)
await self.get_next_step()
async def confirm_before_step(self) -> None:
"""确认前步骤"""
- self._validate_task_state()
- self._validate_current_tool()
- state = cast("ExecutorCheckpoint", self.task.state)
- current_tool = cast("MCPTools", self._current_tool)
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ if self._current_tool is None:
+ err = "[MCPAgentExecutor] 当前工具不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
- confirm_message = await self._planner.get_tool_risk(current_tool, self._current_input, "")
+ confirm_message = await self._planner.get_tool_risk(self._current_tool, self._current_input, "")
# 先更新状态
- state.executorStatus = ExecutorStatus.WAITING
- state.stepStatus = StepStatus.WAITING
- self.task.state = state
+ self.task.state.executorStatus = ExecutorStatus.WAITING
+ self.task.state.stepStatus = StepStatus.WAITING
await self._push_message(
EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True),
)
self.task.context.append(
self._create_executor_history(
- step_status=state.stepStatus,
- extra_data=confirm_message.model_dump(exclude_none=True, by_alias=True),
+ step_status=self.task.state.stepStatus,
+ step_goal=self._current_goal,
),
)
async def run_step(self) -> None:
"""执行步骤"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
- state.executorStatus = ExecutorStatus.RUNNING
- state.stepStatus = StepStatus.RUNNING
- self.task.state = state
- self._validate_current_tool()
- current_tool = cast("MCPTools", self._current_tool)
-
- mcp_client = await mcp_pool.get(current_tool.mcpId, self.task.metadata.userId)
- if not mcp_client:
- _logger.exception("[MCPAgentExecutor] MCP客户端不存在: %s", current_tool.mcpId)
- state.stepStatus = StepStatus.ERROR
- state.errorMessage = {
- "err_msg": f"MCP客户端不存在: {current_tool.mcpId}",
- "data": self._current_input,
- }
- self.task.state = state
- return
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ self.task.state.executorStatus = ExecutorStatus.RUNNING
+ self.task.state.stepStatus = StepStatus.RUNNING
+ if self._current_tool is None:
+ err = "[MCPAgentExecutor] 当前工具不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
try:
- output_data = await mcp_client.call_tool(current_tool.toolName, self._current_input)
- except anyio.ClosedResourceError as e:
- _logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", current_tool.mcpId)
- # 停止当前用户MCP进程
- await mcp_pool.stop(current_tool.mcpId, self.task.metadata.userId)
- state.stepStatus = StepStatus.ERROR
- state.errorMessage = {
- "err_msg": str(e),
- "data": self._current_input,
- }
- self.task.state = state
- return
+ mcp_client = await mcp_pool.get(self._current_tool.mcpId, self.task.metadata.userId)
+ output_data = await mcp_client.call_tool(self._current_tool.toolName, self._current_input)
except Exception as e:
- _logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误", state.stepName)
- state.stepStatus = StepStatus.ERROR
- state.errorMessage = {
- "err_msg": str(e),
- "data": self._current_input,
- }
- self.task.state = state
+ _logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误", self.task.state.stepName)
+ # 统一停止MCP进程,不再区分异常类型
+ await mcp_pool.stop(self._current_tool.mcpId, self.task.metadata.userId)
+ self._set_step_error(str(e))
return
- _logger.error("当前工具名称: %s, 输出参数: %s", state.stepName, output_data)
+ _logger.info("[MCPAgentExecutor] 当前工具名称: %s, 输出参数: %s", self.task.state.stepName, output_data)
if output_data.isError:
err = ""
for output in output_data.content:
if isinstance(output, TextContent):
err += output.text
- state.stepStatus = StepStatus.ERROR
- state.errorMessage = {
- "err_msg": err,
- "data": {},
- }
- self.task.state = state
+ self._set_step_error(err, {})
return
message = ""
@@ -333,8 +318,7 @@ class MCPAgentExecutor(BaseExecutor):
}
# 先更新状态为成功
- state.stepStatus = StepStatus.SUCCESS
- self.task.state = state
+ self.task.state.stepStatus = StepStatus.SUCCESS
await self._push_message(EventType.STEP_INPUT, self._current_input)
await self._push_message(EventType.STEP_OUTPUT, output_data)
@@ -343,55 +327,57 @@ class MCPAgentExecutor(BaseExecutor):
step_status=StepStatus.SUCCESS,
input_data=self._current_input,
output_data=output_data,
+ step_goal=self._current_goal,
),
)
async def generate_params_with_null(self) -> None:
"""生成参数补充"""
- self._validate_task_state()
- self._validate_current_tool()
- state = cast("ExecutorCheckpoint", self.task.state)
- current_tool = cast("MCPTools", self._current_tool)
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ if self._current_tool is None:
+ err = "[MCPAgentExecutor] 当前工具不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
params_with_null = await self._planner.get_missing_param(
- current_tool,
+ self._current_tool,
self._current_input,
- state.errorMessage,
+ self.task.state.errorMessage,
)
# TODO
- error_msg = self._get_error_message_str(state.errorMessage)
+ error_msg = self._get_error_message_str(self.task.state.errorMessage)
# 先更新状态
- state.executorStatus = ExecutorStatus.WAITING
- state.stepStatus = StepStatus.PARAM
- self.task.state = state
+ self.task.state.executorStatus = ExecutorStatus.WAITING
+ self.task.state.stepStatus = StepStatus.PARAM
await self._push_message(
EventType.STEP_WAITING_FOR_PARAM, data={"message": error_msg, "params": params_with_null},
)
self.task.context.append(
self._create_executor_history(
- step_status=state.stepStatus,
- extra_data={
- "message": error_msg,
- "params": params_with_null,
- },
+ step_status=self.task.state.stepStatus,
+ step_goal=self._current_goal,
),
)
async def get_next_step(self) -> None:
"""获取下一步"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
if self._step_cnt < AGENT_MAX_STEPS:
self._step_cnt += 1
- history = await self._host.assemble_memory(self.task.runtime, self.task.context)
max_retry = 3
step = None
for _ in range(max_retry):
try:
- step = await self._planner.create_next_step(history, list(self._tool_list.values()))
+ step = await self._planner.create_next_step(list(self._tool_list.values()), self.task)
if step.tool_name in self._tool_list:
break
except Exception:
@@ -401,77 +387,100 @@ class MCPAgentExecutor(BaseExecutor):
tool_name=AGENT_FINAL_STEP_NAME,
description=AGENT_FINAL_STEP_NAME,
)
- state.stepId = uuid.uuid4()
- state.stepName = step.tool_name
- state.stepStatus = StepStatus.INIT
+ self.task.state.stepId = uuid.uuid4()
+ self.task.state.stepName = step.tool_name
+ self.task.state.stepStatus = StepStatus.INIT
+ # 保存步骤目标
+ self._current_goal = step.description
else:
# 没有下一步了,结束流程
- state.stepName = AGENT_FINAL_STEP_NAME
- self.task.state = state
+ self.task.state.stepName = AGENT_FINAL_STEP_NAME
+ self._current_goal = AGENT_FINAL_STEP_NAME
async def error_handle_after_step(self) -> None:
"""步骤执行失败后的错误处理"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
- error_output = {"message": state.errorMessage}
- state.stepStatus = StepStatus.ERROR
- state.executorStatus = ExecutorStatus.ERROR
- self.task.state = state
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
+ error_output = {"message": self.task.state.errorMessage}
+ self.task.state.stepStatus = StepStatus.ERROR
+ self.task.state.executorStatus = ExecutorStatus.ERROR
await self._push_message(EventType.STEP_OUTPUT, data=error_output)
- await self._push_message(EventType.EXECUTOR_STOP, data={})
- await self._add_error_to_context(state.stepStatus)
+ await self._add_error_to_context(self.task.state.stepStatus)
async def work(self) -> None: # noqa: C901, PLR0912
"""执行当前步骤"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
- if state.stepStatus == StepStatus.INIT:
+ # 处理初始化状态
+ if self.task.state.stepStatus == StepStatus.INIT:
await self.get_tool_input_param(is_first=True)
if not self._user.autoExecute:
# 等待用户确认
await self.confirm_before_step()
return
- state.stepStatus = StepStatus.RUNNING
- self.task.state = state
- elif state.stepStatus in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]:
- if state.stepStatus == StepStatus.PARAM:
- self._remove_last_context_if_same_step()
- await self.get_tool_input_param(is_first=False)
- elif state.stepStatus == StepStatus.WAITING:
- if self.params:
- self._remove_last_context_if_same_step()
- else:
- state.executorStatus = ExecutorStatus.CANCELLED
- state.stepStatus = StepStatus.CANCELLED
- self.task.state = state
-
- await self._push_message(EventType.STEP_OUTPUT, data={})
- await self._push_message(EventType.EXECUTOR_STOP, data={})
- self._update_last_context_status(StepStatus.CANCELLED)
- return
- max_retry = 5
- for i in range(max_retry):
+ self.task.state.stepStatus = StepStatus.RUNNING
+
+ # 处理参数补充状态
+ if self.task.state.stepStatus == StepStatus.PARAM:
+ self._remove_last_context_if_same_step()
+ await self.get_tool_input_param(is_first=False)
+
+ # 处理等待状态
+ if self.task.state.stepStatus == StepStatus.WAITING:
+ should_cancel = False
+ if self.params:
+ # 解析风险确认参数
+ try:
+ risk_confirm = MCPRiskConfirm.model_validate(self.params)
+ if risk_confirm.confirm:
+ # 用户确认继续执行
+ self._remove_last_context_if_same_step()
+ else:
+ # 用户拒绝执行
+ should_cancel = True
+ except ValidationError as e:
+ _logger.warning("[MCPAgentExecutor] 解析风险确认参数失败: %s, 取消执行", e)
+ should_cancel = True
+ else:
+ # 没有参数则取消执行
+ should_cancel = True
+
+ if should_cancel:
+ self.task.state.executorStatus = ExecutorStatus.CANCELLED
+ self.task.state.stepStatus = StepStatus.CANCELLED
+ await self._push_message(EventType.STEP_OUTPUT, data={})
+ self._update_last_context_status(StepStatus.CANCELLED)
+ return
+
+ # 执行步骤(PARAM、WAITING、RUNNING 状态都需要执行)
+ if self.task.state.stepStatus in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]:
+ for i in range(AGENT_MAX_RETRY_TIMES):
if i != 0:
await self.get_tool_input_param(is_first=True)
await self.run_step()
- if state.stepStatus == StepStatus.SUCCESS:
+ if self.task.state.stepStatus == StepStatus.SUCCESS:
break
- elif state.stepStatus == StepStatus.ERROR:
- # 错误处理
+
+ # 处理错误状态
+ elif self.task.state.stepStatus == StepStatus.ERROR:
if self._retry_times >= AGENT_MAX_RETRY_TIMES:
await self.error_handle_after_step()
elif self._user.autoExecute:
await self._handle_step_error_and_continue()
else:
- mcp_tool = self._tool_list[state.stepName]
- error_msg = self._get_error_message_str(state.errorMessage)
+ mcp_tool = self._tool_list[self.task.state.stepName]
+ error_msg = self._get_error_message_str(self.task.state.errorMessage)
is_param_error = await self._planner.is_param_error(
- await self._host.assemble_memory(self.task.runtime, self.task.context),
+ self.task,
error_msg,
mcp_tool,
- "",
+ self._current_goal,
self._current_input,
)
if is_param_error.is_param_error:
@@ -479,14 +488,16 @@ class MCPAgentExecutor(BaseExecutor):
await self.generate_params_with_null()
else:
await self._handle_step_error_and_continue()
- elif state.stepStatus == StepStatus.SUCCESS:
+
+ # 处理成功状态
+ elif self.task.state.stepStatus == StepStatus.SUCCESS:
await self.get_next_step()
async def summarize(self) -> None:
"""总结"""
thinking_started = False
async for chunk in self._planner.generate_answer(
- await self._host.assemble_memory(self.task.runtime, self.task.context),
+ self.task,
self.llm,
):
if chunk.reasoning_content:
@@ -528,48 +539,50 @@ class MCPAgentExecutor(BaseExecutor):
async def run(self) -> None:
"""执行MCP Agent的主逻辑"""
- self._validate_task_state()
- state = cast("ExecutorCheckpoint", self.task.state)
+ if not self.task.state:
+ err = "[MCPAgentExecutor] 任务状态不存在"
+ _logger.error(err)
+ raise RuntimeError(err)
# 初始化MCP服务
await self.load_mcp()
- if state.executorStatus == ExecutorStatus.INIT:
+ if self.task.state.executorStatus == ExecutorStatus.INIT:
# 初始化状态
- state.executorId = str(uuid.uuid4())
- state.executorName = (await self._planner.get_flow_name()).flow_name
+ self.task.state.executorId = str(uuid.uuid4())
+ self.task.state.executorName = (await self._planner.get_flow_name()).flow_name
await self.get_next_step()
- self.task.state = state
- state.executorStatus = ExecutorStatus.RUNNING
- self.task.state = state
-
- if state.stepName == AGENT_FINAL_STEP_NAME:
- # 如果已经是最后一步,直接结束
- await self._handle_final_step()
- return
+ self.task.state.executorStatus = ExecutorStatus.RUNNING
try:
- while state.executorStatus == ExecutorStatus.RUNNING:
- await self.work()
-
- if state.stepName == AGENT_FINAL_STEP_NAME:
+ if self.task.state.stepName == AGENT_FINAL_STEP_NAME:
# 如果已经是最后一步,直接结束
await self._handle_final_step()
+ else:
+ while self.task.state.executorStatus == ExecutorStatus.RUNNING:
+ await self.work()
+ if self.task.state.stepName == AGENT_FINAL_STEP_NAME:
+ # 如果已经是最后一步,直接结束
+ await self._handle_final_step()
+ break
except Exception as e:
_logger.exception("[MCPAgentExecutor] 执行过程中发生错误")
- state.executorStatus = ExecutorStatus.ERROR
- state.errorMessage = {
+ self.task.state.executorStatus = ExecutorStatus.ERROR
+ self.task.state.errorMessage = {
"err_msg": str(e),
"data": {},
}
- state.stepStatus = StepStatus.ERROR
- self.task.state = state
- error_output = {"message": state.errorMessage}
+ self.task.state.stepStatus = StepStatus.ERROR
+ error_output = {"message": self.task.state.errorMessage}
await self._push_message(EventType.STEP_OUTPUT, data=error_output)
- await self._push_message(EventType.EXECUTOR_STOP, data={})
- await self._add_error_to_context(state.stepStatus)
+ await self._add_error_to_context(self.task.state.stepStatus)
finally:
+ # 统一推送EXECUTOR_STOP消息
+ if self.task.state.executorStatus != ExecutorStatus.RUNNING:
+ await self._push_message(EventType.EXECUTOR_STOP, data={})
+
+ # 清理MCP客户端
for mcp_service in self._mcp_list:
try:
await mcp_pool.stop(mcp_service.id, self.task.metadata.userId)
diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py
index 9de9e7e42c77f5afbf9a9f7c552be0b6b1a66150..6630a4b6b87be1e5f2a30f3ede72ec5639e5540a 100644
--- a/apps/scheduler/executor/flow.py
+++ b/apps/scheduler/executor/flow.py
@@ -77,11 +77,11 @@ class FlowExecutor(BaseExecutor):
# 尝试恢复State
if (
- self.state
- and self.state.executorStatus != ExecutorStatus.INIT
+ not self.task.state
+ or self.task.state.executorStatus == ExecutorStatus.INIT
):
# 创建ExecutorState
- self.state = ExecutorCheckpoint(
+ self.task.state = ExecutorCheckpoint(
taskId=self.task.metadata.id,
appId=self.post_body_app.app_id,
executorId=str(self.flow_id),
@@ -142,20 +142,25 @@ class FlowExecutor(BaseExecutor):
async def _find_flow_next(self) -> list[StepQueueItem]:
"""在当前步骤执行前,尝试获取下一步"""
+ if not self.task.state:
+ err = "[FlowExecutor] 任务状态不存在"
+ logger.error(err)
+ raise RuntimeError(err)
+
# 如果当前步骤为结束,则直接返回
- if self.state.stepId == "end" or not self.state.stepId:
+ if self.task.state.stepId == "end" or not self.task.state.stepId:
return []
if self.current_step.step.type == SpecialCallType.CHOICE.value:
# 如果是choice节点,获取分支ID
branch_id = self.task.context[-1].outputData["branch_id"]
if branch_id:
- next_steps = await self._find_next_id(str(self.state.stepId) + "." + branch_id)
+ next_steps = await self._find_next_id(str(self.task.state.stepId) + "." + branch_id)
logger.info("[FlowExecutor] 分支ID:%s", branch_id)
else:
logger.warning("[FlowExecutor] 没有找到分支ID,返回空列表")
return []
else:
- next_steps = await self._find_next_id(self.state.stepId)
+ next_steps = await self._find_next_id(self.task.state.stepId)
# 如果step没有任何出边,直接跳到end
if not next_steps:
return [
@@ -183,10 +188,15 @@ class FlowExecutor(BaseExecutor):
"""
logger.info("[FlowExecutor] 运行工作流")
+ if not self.task.state:
+ err = "[FlowExecutor] 任务状态不存在"
+ logger.error(err)
+ raise RuntimeError(err)
+
# 获取首个步骤
first_step = StepQueueItem(
- step_id=self.state.stepId,
- step=self.flow.steps[self.state.stepId],
+ step_id=self.task.state.stepId,
+ step=self.flow.steps[self.task.state.stepId],
)
# 头插开始前的系统步骤,并执行
@@ -203,13 +213,13 @@ class FlowExecutor(BaseExecutor):
# 插入首个步骤
self.step_queue.append(first_step)
- self.state.executorStatus = ExecutorStatus.RUNNING
+ self.task.state.executorStatus = ExecutorStatus.RUNNING
# 运行Flow(未达终点)
is_error = False
while not self._reached_end:
# 如果当前步骤出错,执行错误处理步骤
- if self.state.stepStatus == StepStatus.ERROR:
+ if self.task.state.stepStatus == StepStatus.ERROR:
logger.warning("[FlowExecutor] Executor出错,执行错误处理步骤")
self.step_queue.clear()
self.step_queue.appendleft(
@@ -227,7 +237,7 @@ class FlowExecutor(BaseExecutor):
params={
"user_prompt": FLOW_ERROR_PROMPT[self.task.runtime.language].replace(
"{{ error_info }}",
- self.state.errorMessage["err_msg"],
+ self.task.state.errorMessage["err_msg"],
),
},
),
@@ -252,9 +262,9 @@ class FlowExecutor(BaseExecutor):
# 更新Task状态
if is_error:
- self.state.executorStatus = ExecutorStatus.ERROR
+ self.task.state.executorStatus = ExecutorStatus.ERROR
else:
- self.state.executorStatus = ExecutorStatus.SUCCESS
+ self.task.state.executorStatus = ExecutorStatus.SUCCESS
# 尾插运行结束后的系统步骤
for step in FIXED_STEPS_AFTER_END:
diff --git a/apps/scheduler/mcp/host.py b/apps/scheduler/mcp/host.py
index 8ae8b5051d841363ad605960de920c6850f4e317..420193bbac2c76615586b5929b9517d5b8c30cd4 100644
--- a/apps/scheduler/mcp/host.py
+++ b/apps/scheduler/mcp/host.py
@@ -48,11 +48,11 @@ class MCPHost:
logger.warning("用户 %s 未启用MCP %s", self._user_id, mcp_id)
return None
- # 获取MCP配置
+ # 获取MCP配置(如果失败会抛出RuntimeError)
try:
return await mcp_pool.get(mcp_id, self._user_id)
- except KeyError:
- logger.warning("用户 %s 的MCP %s 没有运行中的实例,请检查环境", self._user_id, mcp_id)
+ except (KeyError, RuntimeError) as e:
+ logger.warning("获取MCP客户端失败: %s", e)
return None
@@ -161,12 +161,8 @@ class MCPHost:
async def call_tool(self, tool: MCPTools, plan_item: MCPPlanItem) -> list[dict[str, Any]]:
"""调用工具"""
- # 拿到Client
+ # 拿到Client(如果失败会抛出异常)
client = await mcp_pool.get(tool.mcpId, self._user_id)
- if client is None:
- err = f"[MCPHost] MCP Server不合法: {tool.mcpId}"
- logger.error(err)
- raise ValueError(err)
# 填充参数
params = await self._fill_params(tool, plan_item.instruction)
diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py
index afcd1f440f3baeb86472bfea5046ab7c1af15450..1c97ad28b9a21583438d34b6c216d2172484ef0d 100644
--- a/apps/scheduler/mcp/prompt.py
+++ b/apps/scheduler/mcp/prompt.py
@@ -394,7 +394,7 @@ MEMORY_TEMPLATE: dict[str, str] = {
LanguageType.CHINESE: dedent(
r"""
{% if msg_type == "user" %}
- 第{{ step_index }}步:{{ step_description }}
+ 第{{ step_index }}步:{{ step_goal }}
调用工具 `{{ step_name }}`,并提供参数 `{{ input_data | tojson }}`。
{% elif msg_type == "assistant" %}
第{{ step_index }}步执行完成。
@@ -406,7 +406,7 @@ MEMORY_TEMPLATE: dict[str, str] = {
LanguageType.ENGLISH: dedent(
r"""
{% if msg_type == "user" %}
- Step {{ step_index }}: {{ step_description }}
+ Step {{ step_index }}: {{ step_goal }}
Called tool `{{ step_name }}` and provided parameters `{{ input_data | tojson }}`
{% elif msg_type == "assistant" %}
Step {{ step_index }} execution completed.
diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py
index 3c4c9da77c76de47ba87764a3ca4567d53edca8c..430a1a9c5f1e12ca57d98c2356977f7baaa51cf6 100644
--- a/apps/scheduler/mcp_agent/base.py
+++ b/apps/scheduler/mcp_agent/base.py
@@ -1,23 +1,77 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP基类"""
+import json
import logging
+from typing import Any
+
+from jinja2 import BaseLoader
+from jinja2.sandbox import SandboxedEnvironment
from apps.models import LanguageType
+from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE
from apps.schemas.task import TaskData
_logger = logging.getLogger(__name__)
+_env = SandboxedEnvironment(
+ loader=BaseLoader,
+ autoescape=False,
+ trim_blocks=True,
+ lstrip_blocks=True,
+)
+
+
+def tojson_filter(value: dict[str, Any]) -> str:
+ """将字典转换为紧凑JSON字符串"""
+ return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
+
+
+_env.filters["tojson"] = tojson_filter
class MCPBase:
"""MCP基类"""
_user_id: str
- task: TaskData
_language: LanguageType
+ _goal: str
def __init__(self, task: TaskData) -> None:
"""初始化MCP基类"""
self._user_id = task.metadata.userId
- self.task = task
+ self._goal = task.runtime.userInput
self._language = task.runtime.language
+
+ @staticmethod
+ async def assemble_memory(task: TaskData) -> list[dict[str, str]]:
+ """组装记忆"""
+ history = []
+ template = MEMORY_TEMPLATE[task.runtime.language]
+
+ for index, item in enumerate(task.context, start=1):
+ # 用户消息:步骤描述和工具调用
+ step_goal = item.extraData["step_goal"]
+ user_content = _env.from_string(template).render(
+ msg_type="user",
+ step_index=index,
+ step_goal=step_goal,
+ step_name=item.stepName,
+ input_data=item.inputData,
+ )
+ history.append({
+ "role": "user",
+ "content": user_content,
+ })
+
+ # 助手消息:步骤执行结果
+ assistant_content = _env.from_string(template).render(
+ msg_type="assistant",
+ step_index=index,
+ step_status=item.stepStatus.value,
+ output_data=item.outputData,
+ )
+ history.append({
+ "role": "assistant",
+ "content": assistant_content,
+ })
+ return history
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index e7a33f277c71a61918c0a48ed89013533b739fe7..c3f06d4fc0fd7f841ee6827095be87ea7394e4a6 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -1,7 +1,6 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP宿主"""
-import json
import logging
from typing import Any
@@ -9,10 +8,10 @@ from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from apps.llm import json_generator
-from apps.models import ExecutorHistory, LanguageType, MCPTools, TaskRuntime
-from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE
+from apps.models import LanguageType, MCPTools
from apps.scheduler.mcp_agent.base import MCPBase
-from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS
+from apps.scheduler.mcp_agent.prompt import REPAIR_PARAMS, get_gen_params_prompt
+from apps.schemas.task import TaskData
_logger = logging.getLogger(__name__)
_env = SandboxedEnvironment(
@@ -21,14 +20,6 @@ _env = SandboxedEnvironment(
trim_blocks=True,
lstrip_blocks=True,
)
-
-
-def tojson_filter(value: dict[str, Any]) -> str:
- """将字典转换为紧凑JSON字符串"""
- return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
-
-
-_env.filters["tojson"] = tojson_filter
_LLM_QUERY_FIX = {
LanguageType.CHINESE: "请生成修复之后的工具参数",
LanguageType.ENGLISH: "Please generate the tool parameters after repair",
@@ -38,27 +29,23 @@ _LLM_QUERY_FIX = {
class MCPHost(MCPBase):
"""MCP宿主服务"""
- @staticmethod
- async def assemble_memory(runtime: TaskRuntime, context: list[ExecutorHistory]) -> str:
- """组装记忆"""
- return _env.from_string(MEMORY_TEMPLATE[runtime.language]).render(
- context_list=context,
- )
-
async def get_first_input_params(
- self, mcp_tool: MCPTools, current_goal: str,
+ self, mcp_tool: MCPTools, task: TaskData,
) -> dict[str, Any]:
"""填充工具参数"""
+ # 加载提示词模板
+ prompt_template = get_gen_params_prompt(task.runtime.language)
+
# 更清晰的输入指令,这样可以调用generate
- prompt = _env.from_string(GEN_PARAMS[self.task.runtime.language]).render(
+ prompt = _env.from_string(prompt_template).render(
tool_name=mcp_tool.toolName,
tool_description=mcp_tool.description,
- goal=self.task.runtime.userInput,
- current_goal=current_goal,
+ goal=task.runtime.userInput,
+ current_goal=task.runtime.userInput,
input_schema=mcp_tool.inputSchema,
- background_info=await self.assemble_memory(self.task.runtime, self.task.context),
+ background_info=await self.assemble_memory(task),
)
- _logger.info("[MCPHost] 填充工具参数: %s", prompt)
+ _logger.info("[MCPHost] 填充工具参数: %s", mcp_tool.toolName)
# 使用json_generator解析结果
function = {
"name": mcp_tool.toolName,
@@ -71,23 +58,24 @@ class MCPHost(MCPBase):
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt},
],
- language=self._language,
+ prompt=task.runtime.language,
)
- async def fill_params( # noqa: D102, PLR0913
+ async def fill_params(
self,
mcp_tool: MCPTools,
- current_goal: str,
+ task: TaskData,
current_input: dict[str, Any],
- error_message: str | dict = "",
params: dict[str, Any] | None = None,
params_description: str = "",
) -> dict[str, Any]:
- llm_query = _LLM_QUERY_FIX[self._language]
- prompt = _env.from_string(REPAIR_PARAMS[self._language]).render(
+ """填充并修复工具参数"""
+ llm_query = _LLM_QUERY_FIX[task.runtime.language]
+ error_message = task.state.errorMessage if task.state else {}
+ prompt = _env.from_string(REPAIR_PARAMS[task.runtime.language]).render(
tool_name=mcp_tool.toolName,
- goal=self.task.runtime.userInput,
- current_goal=current_goal,
+ goal=task.runtime.userInput,
+ current_goal=task.runtime.userInput,
tool_description=mcp_tool.description,
input_schema=mcp_tool.inputSchema,
input_params=current_input,
@@ -109,5 +97,5 @@ class MCPHost(MCPBase):
{"role": "user", "content": prompt},
{"role": "user", "content": llm_query},
],
- language=self._language,
+ language=task.runtime.language,
)
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index dd1ad17d1d5e46a378fff18858ec432c1a48759a..70a6715ebc2642b22456612c17b3ea5f2745fac6 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -33,6 +33,7 @@ from apps.schemas.mcp import (
Step,
ToolRisk,
)
+from apps.schemas.task import TaskData
_env = SandboxedEnvironment(
loader=BaseLoader,
@@ -49,7 +50,7 @@ class MCPPlanner(MCPBase):
async def get_flow_name(self) -> FlowName:
"""获取当前流程的名称"""
template = _env.from_string(GENERATE_FLOW_NAME[self._language])
- prompt = template.render(goal=self.task.runtime.userInput)
+ prompt = template.render(goal=self._goal)
result = await json_generator.generate(
function=GET_FLOW_NAME_FUNCTION[self._language],
@@ -61,26 +62,28 @@ class MCPPlanner(MCPBase):
)
return FlowName.model_validate(result)
- async def create_next_step(self, history: str, tools: list[MCPTools]) -> Step:
+ async def create_next_step(self, tools: list[MCPTools], task: TaskData) -> Step:
"""创建下一步的执行步骤"""
- # 构建提示词
template = _env.from_string(GEN_STEP[self._language])
- prompt = template.render(goal=self.task.runtime.userInput, history=history, tools=tools)
+ prompt = template.render(goal=self._goal, tools=tools)
- # 获取函数定义并动态设置tool_id的enum
function = deepcopy(CREATE_NEXT_STEP_FUNCTION[self._language])
function["parameters"]["properties"]["tool_name"]["enum"] = [tool.toolName for tool in tools]
+ history = await self.assemble_memory(task)
+
+ conversation = [
+ {"role": "system", "content": "You are a helpful assistant."},
+ *history,
+ {"role": "user", "content": prompt},
+ ]
+
step = await json_generator.generate(
function=function,
- conversation=[
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": prompt},
- ],
+ conversation=conversation,
language=self._language,
)
logger.info("[MCPPlanner] 创建下一步的执行步骤: %s", step)
- # 使用Step模型解析结果
return Step.model_validate(step)
async def get_tool_risk(
@@ -90,7 +93,6 @@ class MCPPlanner(MCPBase):
additional_info: str = "",
) -> ToolRisk:
"""获取MCP工具的风险评估结果"""
- # 构建提示词
template = _env.from_string(RISK_EVALUATE[self._language])
prompt = template.render(
tool_name=tool.toolName,
@@ -108,38 +110,40 @@ class MCPPlanner(MCPBase):
language=self._language,
)
- # 返回风险评估结果
return ToolRisk.model_validate(risk)
async def is_param_error(
self,
- history: str,
+ task: TaskData,
error_message: str,
tool: MCPTools,
- step_description: str,
+ step_goal: str,
input_params: dict[str, Any],
) -> IsParamError:
"""判断错误信息是否是参数错误"""
tmplate = _env.from_string(IS_PARAM_ERROR[self._language])
prompt = tmplate.render(
- goal=self.task.runtime.userInput,
- history=history,
+ goal=self._goal,
step_id=tool.toolName,
step_name=tool.toolName,
- step_description=step_description,
+ step_goal=step_goal,
input_params=input_params,
error_message=error_message,
)
+ history = await self.assemble_memory(task)
+
+ conversation = [
+ {"role": "system", "content": "You are a helpful assistant."},
+ *history,
+ {"role": "user", "content": prompt},
+ ]
+
is_param_error = await json_generator.generate(
function=IS_PARAM_ERROR_FUNCTION[self._language],
- conversation=[
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": prompt},
- ],
+ conversation=conversation,
language=self._language,
)
- # 使用IsParamError模型解析结果
return IsParamError.model_validate(is_param_error)
async def get_missing_param(
@@ -157,11 +161,9 @@ class MCPPlanner(MCPBase):
error_message=error_message,
)
- # 获取函数定义并设置parameters为schema_with_null
function = deepcopy(GET_MISSING_PARAMS_FUNCTION[self._language])
function["parameters"] = schema_with_null
- # 解析为结构化数据
return await json_generator.generate(
function=function,
conversation=[
@@ -171,15 +173,22 @@ class MCPPlanner(MCPBase):
language=self._language,
)
- async def generate_answer(self, memory: str, llm: LLM) -> AsyncGenerator[LLMChunk, None]:
- """生成最终回答,返回LLMChunk"""
+ async def generate_answer(self, task: TaskData, llm: LLM) -> AsyncGenerator[LLMChunk, None]:
+ """生成最终回答,返回LLMChunk"""
template = _env.from_string(FINAL_ANSWER[self._language])
prompt = template.render(
- memory=memory,
- goal=self.task.runtime.userInput,
+ goal=self._goal,
)
+
+ history = await self.assemble_memory(task)
+ messages = [
+ {"role": "system", "content": "You are a helpful assistant."},
+ *history,
+ {"role": "user", "content": prompt},
+ ]
+
async for chunk in llm.call(
- [{"role": "user", "content": prompt}],
+ messages,
streaming=True,
):
yield chunk
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index 390c4be4db1e9593baa55286818ed57f3c7af757..9a09b15d145669f97c1f112fe5720f106bda77c8 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -1,10 +1,26 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP相关的大模型Prompt"""
+from pathlib import Path
from textwrap import dedent
from apps.models import LanguageType
+
+def _load_prompt(prompt_id: str, language: LanguageType) -> str:
+ """
+ 从Markdown文件加载提示词
+
+ :param prompt_id: 提示词ID,例如 "gen_params" 等
+ :param language: 语言类型
+ :return: 提示词内容
+ """
+ # 组装Prompt文件路径: prompt_id.language.md (例如: gen_params.en.md)
+ filename = f"{prompt_id}.{language.value}.md"
+ prompt_dir = Path(__file__).parent.parent.parent / "data" / "prompts" / "system" / "mcp"
+ prompt_file = prompt_dir / filename
+ return prompt_file.read_text(encoding="utf-8")
+
GENERATE_FLOW_NAME: dict[LanguageType, str] = {
LanguageType.CHINESE: dedent(
r"""
@@ -98,12 +114,12 @@ CREATE_NEXT_STEP_FUNCTION: dict[LanguageType, dict] = {
"properties": {
"tool_name": {
"type": "string",
- "description": "工具名称",
+ "description": "工具名称,必须从可用工具列表中选择一个",
"enum": [],
},
"description": {
"type": "string",
- "description": "步骤描述",
+ "description": "步骤描述,清晰说明本步骤要做什么",
},
},
"required": ["tool_name", "description"],
@@ -126,12 +142,12 @@ CREATE_NEXT_STEP_FUNCTION: dict[LanguageType, dict] = {
"properties": {
"tool_name": {
"type": "string",
- "description": "Tool Name",
+ "description": "Tool name, must be selected from the available tools list",
"enum": [],
},
"description": {
"type": "string",
- "description": "Step description",
+ "description": "Step description, clearly explain what this step will do",
},
},
"required": ["tool_name", "description"],
@@ -148,77 +164,95 @@ CREATE_NEXT_STEP_FUNCTION: dict[LanguageType, dict] = {
GEN_STEP: dict[LanguageType, str] = {
LanguageType.CHINESE: dedent(
r"""
- 根据用户目标、执行历史和可用工具,生成下一个执行步骤。
-
- ## 任务要求
-
- 作为计划生成器,你需要:
- - **选择最合适的工具**:从可用工具集中选择当前阶段最适合的工具
- - **推进目标完成**:基于历史记录,制定能完成阶段性任务的步骤
- - **严格使用工具ID**:工具ID必须精确匹配可用工具列表中的ID
- - **判断完成状态**:若目标已达成,选择`Final`工具结束流程
-
- ## 示例
-
- 假设用户需要扫描一个MySQL数据库(地址192.168.1.1,端口3306,使用root账户和password密码),分析其性能瓶颈并进行调优。
- 之前已经完成了端口扫描,确认了3306端口开放。现在需要选择下一步操作。
- 查看可用工具列表后,发现有MySQL性能分析工具(mcp_tool_1)、文件存储工具(mcp_tool_2)和结束工具(Final)。
- 此时应选择MySQL性能分析工具,并描述这一步要做什么:使用提供的数据库连接信息(192.168.1.1:3306,root/password)来扫描和分析数据库性能。
-
- ---
-
- ## 当前任务
-
- **目标**:{{goal}}
-
- **历史记录**:
- {{history}}
-
- **可用工具**:
- {% for tool in tools %}
- - **{{tool.toolName}}**:{{tool.description}}
- {% endfor %}
+ 你的任务是分析对话历史和用户目标,然后**调用`create_next_step`函数**来规划下一个执行步骤。
+
+ ## 重要提醒
+ **你必须且只能调用名为`create_next_step`的函数**,该函数接受两个参数:
+ 1. `tool_name`: 从下方可用工具列表中选择一个工具名称
+ 2. `description`: 清晰描述本步骤要完成的具体任务
+
+ ## 可用工具列表
+ {% for tool in tools %}
+ - **工具名**: `{{tool.toolName}}`
+ **功能描述**: {{tool.description}}
+ {% endfor %}
+
+ ## 调用规范
+ - **函数名固定**: 必须调用`create_next_step`,不要使用工具名作为函数名
+ - **tool_name的值**: 必须是上述可用工具列表中的某个工具名称
+ - **description的值**: 描述本步骤的具体操作内容
+ - **任务完成时**: 如果用户目标已经完成,将`tool_name`参数设为`Final`
+
+ ## 错误示例❌(严禁模仿)
+ ```
+ # 错误:直接使用工具名作为函数名或返回工具名的字典
+ {'Final': '{"description": "任务完成"}'}
+ ```
+
+ ## 正确示例✅
+ ```
+ # 正确:调用create_next_step函数,tool_name参数的值才是工具名称
+ create_next_step(
+ tool_name="Final",
+ description="已完成所有分析任务,可以结束流程"
+ )
+ ```
+
+ ---
+ ## 当前任务
+ **用户目标**: {{goal}}
+
+ 请根据上方对话历史中已执行步骤的结果,**调用`create_next_step`函数**规划下一步。
+ 记住:函数名必须是`create_next_step`,`tool_name`参数的值才是具体的工具名称。
""",
- ),
+ ).strip(),
LanguageType.ENGLISH: dedent(
r"""
- Generate the next execution step based on user goals, execution history, and available tools.
-
- ## Task Requirements
-
- As a plan generator, you need to:
- - **Select the most appropriate tool**: Choose the best tool from available tools for the current stage
- - **Advance goal completion**: Based on execution history, formulate steps to complete phased tasks
- - **Strictly use tool IDs**: Tool ID must exactly match the ID in the available tools list
- - **Determine completion status**: If goal is achieved, select `Final` tool to end the workflow
-
- ## Example
-
- Suppose the user needs to scan a MySQL database (at 192.168.1.1:3306, using root account with password),
- analyze its performance bottlenecks, and optimize it.
- Previously, a port scan was completed and confirmed that port 3306 is open. Now we need to select the
- next action.
- Looking at the available tools list, there is a MySQL performance analysis tool (mcp_tool_1), a file
- storage tool (mcp_tool_2), and a final tool (Final).
- At this point, we should select the MySQL performance analysis tool and describe what this step will do:
- use the provided database connection information (192.168.1.1:3306, root/password) to scan and analyze
- database performance.
-
- ---
-
- ## Current Task
-
- **Goal**: {{goal}}
-
- **Execution History**:
- {{history}}
-
- **Available Tools**:
- {% for tool in tools %}
- - **{{tool.toolName}}**: {{tool.description}}
- {% endfor %}
+ Your task is to analyze the conversation history and user goal, then **call the `create_next_step` \
+function** to plan the next execution step.
+
+ ## Important Reminder
+ **You must and can only call the function named `create_next_step`**, which accepts two parameters:
+ 1. `tool_name`: Select a tool name from the available tools list below
+ 2. `description`: Clearly describe the specific task this step will accomplish
+
+ ## Available Tools List
+ {% for tool in tools %}
+ - **Tool Name**: `{{tool.toolName}}`
+ **Description**: {{tool.description}}
+ {% endfor %}
+
+ ## Calling Specifications
+ - **Fixed function name**: Must call `create_next_step`, do not use tool names as function names
+ - **tool_name value**: Must be one of the tool names from the available tools list above
+ - **description value**: Describe the specific operations of this step
+ - **When task completes**: If the user goal is achieved, set `tool_name` parameter to `Final`
+
+ ## Incorrect Examples ❌ (Strictly Forbidden)
+ ```
+ # Error: Using tool name as function name or returning a dictionary with tool name as key
+ {'Final': '{"description": "Task completed"}'}
+ ```
+
+ ## Correct Examples ✅
+ ```
+ # Correct: Call create_next_step function, the tool_name parameter value is the tool name
+ create_next_step(
+ tool_name="Final",
+ description="All analysis tasks completed, workflow can be ended"
+ )
+ ```
+
+ ---
+ ## Current Task
+ **User Goal**: {{goal}}
+
+ Please **call the `create_next_step` function** to plan the next step based on the results of \
+executed steps in the conversation history above.
+ Remember: The function name must be `create_next_step`, and the `tool_name` parameter value is \
+the specific tool name.
""",
- ),
+ ).strip(),
}
EVALUATE_TOOL_RISK_FUNCTION: dict[LanguageType, dict] = {
@@ -398,25 +432,23 @@ IS_PARAM_ERROR_FUNCTION: dict[LanguageType, dict] = {
IS_PARAM_ERROR: dict[LanguageType, str] = {
LanguageType.CHINESE: dedent(
r"""
- 判断以下工具执行失败是否由参数错误导致,并调用 check_parameter_error 工具返回结果。
+ 判断以下工具执行失败是否由参数错误导致,并调用 check_parameter_error 工具返回结果。
**判断标准**:
- **参数错误**:缺失必需参数、参数值不正确、参数格式/类型错误等
- **非参数错误**:权限问题、网络故障、系统异常、业务逻辑错误等
**示例**:当mysql_analyzer工具入参为 {"host": "192.0.0.1", ...},报错"host is not correct"时,
- 错误明确指出host参数值不正确,属于**参数错误**,应调用工具返回 {"is_param_error": true}
+ 错误明确指出host参数值不正确,属于**参数错误**,应调用工具返回 {"is_param_error": true}
---
- **背景信息**:
- - 用户目标:{{goal}}
- - 执行历史:{{history}}
+ **用户目标**:{{goal}}
**当前失败步骤**(步骤{{step_id}}):
- 工具:{{step_name}}
- - 说明:{{step_instruction}}
- - 入参:{{input_param}}
+ - 目标:{{step_goal}}
+ - 入参:{{input_params}}
- 报错:{{error_message}}
请基于报错信息和上下文综合判断,调用 check_parameter_error 工具返回判断结果。
@@ -439,14 +471,12 @@ should call the tool to return {"is_param_error": true}
---
- **Background**:
- - User goal: {{goal}}
- - Execution history: {{history}}
+ **User Goal**: {{goal}}
**Current Failed Step** (Step {{step_id}}):
- Tool: {{step_name}}
- - Instruction: {{step_instruction}}
- - Input: {{input_param}}
+ - Goal: {{step_goal}}
+ - Input: {{input_params}}
- Error: {{error_message}}
Please make a comprehensive judgment based on the error message and context, and call the \
@@ -587,157 +617,16 @@ for user to provide credentials again
),
}
-GEN_PARAMS: dict[LanguageType, str] = {
- LanguageType.CHINESE: dedent(
- r"""
- 根据总体目标、阶段目标、工具信息和背景信息,生成符合schema的工具参数。
-
- ## 任务要求
- - **严格遵循Schema**:生成的参数必须完全符合工具入参schema的类型和格式规范
- - **充分理解上下文**:全面分析总体目标、阶段目标和背景信息,提取所有可用的参数值
- - **匹配阶段目标**:确保生成的参数能够完成当前阶段目标
-
- ## 示例
-
- **工具**:
- - **名称**:`mysql_analyzer`
- - **描述**:分析MySQL数据库性能
-
- **总体目标**:扫描MySQL数据库(IP: 192.168.1.1,端口: 3306,用户名: root,密码: password),\
-分析性能瓶颈并调优
-
- **阶段目标**:连接MySQL数据库并分析性能
-
- **参数Schema**:
- ```json
- {
- "type": "object",
- "properties": {
- "host": {"type": "string", "description": "主机地址"},
- "port": {"type": "integer", "description": "端口号"},
- "username": {"type": "string", "description": "用户名"},
- "password": {"type": "string", "description": "密码"}
- },
- "required": ["host", "port", "username", "password"]
- }
- ```
-
- **背景信息**:
- - **步骤1**:生成端口扫描命令 → 成功,输出:`{"command": "nmap -sS -p --open 192.168.1.1"}`
- - **步骤2**:执行端口扫描 → 成功,确认3306端口开放
- **输出**:
- ```json
- {
- "host": "192.168.1.1",
- "port": 3306,
- "username": "root",
- "password": "password"
- }
- ```
+def get_gen_params_prompt(language: LanguageType) -> str:
+ """
+ 获取GEN_PARAMS提示词
- ---
+ :param language: 语言类型
+ :return: 提示词内容
+ """
+ return _load_prompt("gen_params", language)
- ## 当前任务
-
- **工具**:
- - **名称**:`{{tool_name}}`
- - **描述**:{{tool_description}}
-
- **总体目标**:
- {{goal}}
-
- **阶段目标**:
- {{current_goal}}
-
- **参数Schema**:
- ```json
- {{input_schema}}
- ```
-
- **背景信息**:
- {{background_info}}
-
- **输出**:
- """,
- ).strip("\n"),
- LanguageType.ENGLISH: dedent(
- r"""
- Generate tool parameters that conform to the schema based on the overall goal, phase goal, tool \
-information, and background context.
-
- ## Task Requirements
- - **Strictly Follow Schema**: Generated parameters must fully conform to the tool input schema's type and \
-format specifications
- - **Fully Understand Context**: Comprehensively analyze the overall goal, phase goal, and background \
-information to extract all available parameter values
- - **Match Phase Goal**: Ensure the generated parameters can accomplish the current phase goal
-
- ## Example
-
- **Tool**:
- - **Name**: `mysql_analyzer`
- - **Description**: Analyze MySQL database performance
-
- **Overall Goal**: Scan MySQL database (IP: 192.168.1.1, Port: 3306, Username: root, Password: password), \
-analyze performance bottlenecks, and optimize
-
- **Phase Goal**: Connect to MySQL database and analyze performance
-
- **Parameter Schema**:
- ```json
- {
- "type": "object",
- "properties": {
- "host": {"type": "string", "description": "Host address"},
- "port": {"type": "integer", "description": "Port number"},
- "username": {"type": "string", "description": "Username"},
- "password": {"type": "string", "description": "Password"}
- },
- "required": ["host", "port", "username", "password"]
- }
- ```
-
- **Background Information**:
- - **Step 1**: Generate port scan command → Success, output: `{"command": "nmap -sS -p --open 192.168.1.1"}`
- - **Step 2**: Execute port scan → Success, confirmed port 3306 is open
-
- **Output**:
- ```json
- {
- "host": "192.168.1.1",
- "port": 3306,
- "username": "root",
- "password": "password"
- }
- ```
-
- ---
-
- ## Current Task
-
- **Tool**:
- - **Name**: `{{tool_name}}`
- - **Description**: {{tool_description}}
-
- **Overall Goal**:
- {{goal}}
-
- **Phase Goal**:
- {{current_goal}}
-
- **Parameter Schema**:
- ```json
- {{input_schema}}
- ```
-
- **Background Information**:
- {{background_info}}
-
- **Output**:
- """,
- ).strip("\n"),
-}
REPAIR_PARAMS: dict[LanguageType, str] = {
LanguageType.CHINESE: dedent(
@@ -962,14 +851,6 @@ FINAL_ANSWER: dict[LanguageType, str] = {
# 用户目标
{{goal}}
- # 计划执行情况
- 为了完成上述目标,你实施了以下计划:
-
- {{memory}}
-
- # 其他背景信息:
- {{status}}
-
# 现在,请根据以上信息,向用户报告目标的完成情况:
""",
@@ -982,14 +863,6 @@ completion status to the user.
# User Goal
{{goal}}
- # Plan Execution Status
- To achieve the above goal, you implemented the following plan:
-
- {{memory}}
-
- # Additional Background Information:
- {{status}}
-
# Now, based on the above information, report the goal completion status to the user:
""",
diff --git a/apps/scheduler/pool/mcp/pool.py b/apps/scheduler/pool/mcp/pool.py
index de64646a8885aba891829c93da4a95ee7b5e975e..43045aeecc3a3414bc0433230d44e016f278b142 100644
--- a/apps/scheduler/pool/mcp/pool.py
+++ b/apps/scheduler/pool/mcp/pool.py
@@ -73,19 +73,22 @@ class _MCPPool:
return result is not None
- async def get(self, mcp_id: str, user_id: str) -> MCPClient | None:
- """获取MCP客户端"""
+ async def get(self, mcp_id: str, user_id: str) -> MCPClient:
+ """获取MCP客户端,如果无法获取则抛出异常"""
item = await self._get_from_dict(mcp_id, user_id)
if item is None:
# 检查用户是否已激活
if not await self._validate_user(mcp_id, user_id):
- logger.warning("用户 %s 未激活MCP %s", user_id, mcp_id)
- return None
+ err = f"用户 {user_id} 未激活MCP {mcp_id}"
+ logger.warning(err)
+ raise RuntimeError(err)
# 初始化进程
item = await self.init_mcp(mcp_id, user_id)
if item is None:
- return None
+ err = f"初始化MCP {mcp_id} 失败(用户:{user_id})"
+ logger.error(err)
+ raise RuntimeError(err)
if user_id not in self.pool:
self.pool[user_id] = {}
@@ -97,6 +100,14 @@ class _MCPPool:
async def stop(self, mcp_id: str, user_id: str) -> None:
"""停止MCP客户端"""
+ if user_id not in self.pool:
+ logger.warning("[MCPPool] 用户 %s 不存在于池中,无法停止MCP %s", user_id, mcp_id)
+ return
+
+ if mcp_id not in self.pool[user_id]:
+ logger.warning("[MCPPool] 用户 %s 的MCP %s 不存在于池中,无法停止", user_id, mcp_id)
+ return
+
await self.pool[user_id][mcp_id].stop()
del self.pool[user_id][mcp_id]
diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py
index 2c5be573bc910ede231570662dcf0595d5c9aa6a..c371cbbd8ae7f3de0a437329a87dd1d85c091d00 100644
--- a/apps/schemas/mcp.py
+++ b/apps/schemas/mcp.py
@@ -125,6 +125,12 @@ class Step(BaseModel):
description: str = Field(description="步骤描述")
+class MCPRiskConfirm(BaseModel):
+ """MCP工具风险确认"""
+
+ confirm: bool = Field(description="是否确认")
+
+
class UpdateMCPServiceRequest(BaseModel):
"""POST /api/mcpservice 请求数据结构"""
diff --git a/apps/schemas/task.py b/apps/schemas/task.py
index 8b74073d36ced56920c134748e6ce42b2dc31f85..999034199b95cd538151e56a2846eeff360aac3f 100644
--- a/apps/schemas/task.py
+++ b/apps/schemas/task.py
@@ -20,13 +20,20 @@ class TaskData(BaseModel):
context: list[ExecutorHistory] = Field(description="执行历史")
-class CheckpointExtra(BaseModel):
+class AgentCheckpointExtra(BaseModel):
"""Executor额外数据"""
current_input: dict[str, Any] = Field(description="当前输入数据", default={})
error_message: str = Field(description="错误信息", default="")
retry_times: int = Field(description="当前步骤重试次数", default=0)
+
+class AgentHistoryExtra(BaseModel):
+ """执行器历史额外数据"""
+
+ step_goal: str = Field(description="步骤目标")
+
+
class StepQueueItem(BaseModel):
"""步骤栈中的元素"""
diff --git a/assets/.config.example.toml b/data/.config.example.toml
similarity index 100%
rename from assets/.config.example.toml
rename to data/.config.example.toml
diff --git a/data/prompts/call/domain.en.md b/data/prompts/call/domain.en.md
new file mode 100644
index 0000000000000000000000000000000000000000..56bfc3ae0186b1aaae19cdf8221785c375d722bf
--- /dev/null
+++ b/data/prompts/call/domain.en.md
@@ -0,0 +1,97 @@
+# Task Description
+
+You are a domain tag extraction assistant. Your task is to analyze conversation history,
+select the most relevant domain keywords from the available tags list,
+and return results by calling the `extract_domain` function.
+
+## Available Keywords List
+
+{{ available_keywords }}
+
+## Selection Requirements
+
+1. **Exact Match**: Only select from the available list, do not create new tags
+2. **Topic Relevance**: Select tags directly related to the conversation topic
+3. **Quantity Control**: Select 3-8 most relevant tags
+4. **Quality Standards**: Avoid duplicate or similar tags, prioritize
+ distinctive tags, sort by relevance
+{% raw %}{% if use_xml_format %}{% endraw %}
+
+## Function Specification
+
+**Function Name**: `extract_domain`
+**Function Description**: Extract domain keyword tags from conversation
+**Function Parameter Schema**:
+
+```json
+{
+ "type": "object",
+ "properties": {
+ "keywords": {
+ "type": "array",
+ "items": {"type": "string"},
+ "description": "List of keywords or tags"
+ }
+ },
+ "required": ["keywords"]
+}
+```
+
+## Output Format
+
+Use XML format to call the function, basic format:
+
+```xml
+
+keyword1
+keyword2
+
+```
+
+## Call Examples
+
+### Example 1: Normal Extraction (XML)
+
+- Conversation: User asks "What's the weather like in Beijing?",
+ Assistant replies "Beijing is sunny today"
+- Available list contains: ["Beijing", "Shanghai", "weather", "temperature",
+ "Python", "Java"]
+- Function call:
+
+```xml
+
+Beijing
+weather
+temperature
+
+```
+
+### Example 2: No Relevant Tags (XML)
+
+- Conversation: User says "I'm feeling good today"
+- If no relevant tags in the available list, return empty tags:
+
+```xml
+
+
+```
+
+Please use XML format to call the `extract_domain` function for tag
+extraction.{% raw %}{% else %}{% endraw %}
+
+## Function Call Examples
+
+### Example 1: Normal Extraction
+
+- Conversation: User asks "What's the weather like in Beijing?",
+ Assistant replies "Beijing is sunny today"
+- Available list contains: ["Beijing", "Shanghai", "weather", "temperature",
+ "Python", "Java"]
+- Function call result: ["Beijing", "weather", "temperature"]
+
+### Example 2: No Relevant Tags
+
+- Conversation: User says "I'm feeling good today"
+- If no relevant tags in the available list, return empty array: []
+
+Please call the `extract_domain` function to complete tag extraction.{% raw %}{% endif %}{% endraw %}
diff --git a/data/prompts/call/domain.zh.md b/data/prompts/call/domain.zh.md
new file mode 100644
index 0000000000000000000000000000000000000000..e93a452dff9b78648b8739a0950059296bc88c12
--- /dev/null
+++ b/data/prompts/call/domain.zh.md
@@ -0,0 +1,90 @@
+# 任务说明
+
+你是一个领域标签提取助手。你的任务是分析对话历史,从备选标签列表中选择最相关的领域关键词,
+并通过调用 `extract_domain` 函数返回结果。
+
+## 备选标签列表
+
+{{ available_keywords }}
+
+## 选择要求
+
+1. **精准匹配**:只能从备选列表中选择,不可自创标签
+2. **话题相关性**:选择与对话主题直接相关的标签
+3. **数量控制**:选择3-8个最相关的标签
+4. **质量标准**:避免重复或相似标签,优先选择具有区分度的标签,按相关性排序
+{% raw %}{% if use_xml_format %}{% endraw %}
+
+## 函数说明
+
+**函数名称**:`extract_domain`
+**函数描述**:从对话中提取领域关键词标签
+**函数参数Schema**:
+
+```json
+{
+ "type": "object",
+ "properties": {
+ "keywords": {
+ "type": "array",
+ "items": {"type": "string"},
+ "description": "关键词或标签列表"
+ }
+ },
+ "required": ["keywords"]
+}
+```
+
+## 输出格式
+
+使用XML格式调用函数,基本格式:
+
+```xml
+
+关键词1
+关键词2
+
+```
+
+## 调用示例
+
+### 示例1:正常提取(XML)
+
+- 对话:用户询问"北京天气如何?",助手回复"北京今天晴"
+- 备选列表包含:["北京", "上海", "天气", "气温", "Python", "Java"]
+- 函数调用:
+
+```xml
+
+北京
+天气
+气温
+
+```
+
+### 示例2:无相关标签(XML)
+
+- 对话:用户说"今天心情不错"
+- 如果备选列表中没有相关标签,返回空标签:
+
+```xml
+
+
+```
+
+请使用XML格式调用 `extract_domain` 函数完成标签提取。{% raw %}{% else %}{% endraw %}
+
+## 函数调用示例
+
+### 示例1:正常提取
+
+- 对话:用户询问"北京天气如何?",助手回复"北京今天晴"
+- 备选列表包含:["北京", "上海", "天气", "气温", "Python", "Java"]
+- 函数调用结果:["北京", "天气", "气温"]
+
+### 示例2:无相关标签
+
+- 对话:用户说"今天心情不错"
+- 如果备选列表中没有相关标签,返回空数组:[]
+
+请调用 `extract_domain` 函数完成标签提取。{% raw %}{% endif %}{% endraw %}
diff --git a/data/prompts/call/facts.en.md b/data/prompts/call/facts.en.md
new file mode 100644
index 0000000000000000000000000000000000000000..07ba3d1924104922bb7fe7f9144f534101891658
--- /dev/null
+++ b/data/prompts/call/facts.en.md
@@ -0,0 +1,101 @@
+# Task Description
+
+You are a fact extraction assistant. Your task is to extract key facts from the conversation
+and return structured results by calling the `extract_facts` function.
+
+## Information Types to Focus On
+
+1. **Entities**: Names, locations, organizations, events, etc.
+2. **Preferences**: Attitudes towards entities, such as like, dislike, etc.
+3. **Relationships**: Relationships between users and entities, or between
+ entities
+4. **Actions**: Actions affecting entities, such as query, search, browse,
+ click, etc.
+
+## Extraction Requirements
+
+1. Facts must be accurate and extracted only from the conversation
+2. Facts must be clear and concise, each less than 30 words
+3. Each fact should be independent and complete, easy to understand
+{% if use_xml_format %}
+
+## Function Specification
+
+**Function Name**: `extract_facts`
+**Function Description**: Extract key fact information from conversation
+**Function Parameter Schema**:
+
+```json
+{
+ "type": "object",
+ "properties": {
+ "facts": {
+ "type": "array",
+ "items": {"type": "string"},
+ "description": "Fact entries extracted from conversation"
+ }
+ },
+ "required": ["facts"]
+}
+```
+
+## Output Format
+
+Use XML format to call the function, basic format:
+
+```xml
+
+fact1
+fact2
+
+```
+
+## Call Examples
+
+### Example 1: Normal Extraction (XML)
+
+- Conversation: User asks "What are the attractions in Hangzhou West Lake?",
+ Assistant replies "Notable attractions include Su Causeway, Bai Causeway,
+ Broken Bridge, etc."
+- Function call:
+
+```xml
+
+Hangzhou West Lake has Su Causeway, Bai Causeway, Broken Bridge, etc.
+
+```
+
+### Example 2: Multiple Information Types (XML)
+
+- Conversation: User says "I work in Beijing and often go to Starbucks in
+ Sanlitun"
+- Function call:
+
+```xml
+
+User works in Beijing
+User often goes to Starbucks in Sanlitun
+
+```
+
+Please use XML format to call the `extract_facts` function for fact
+extraction.{% else %}
+
+## Function Call Examples
+
+### Example 1: Normal Extraction
+
+- Conversation: User asks "What are the attractions in Hangzhou West Lake?",
+ Assistant replies "Notable attractions include Su Causeway, Bai Causeway,
+ Broken Bridge, etc."
+- Function call result: ["Hangzhou West Lake has Su Causeway, Bai Causeway,
+ Broken Bridge, etc."]
+
+### Example 2: Multiple Information Types
+
+- Conversation: User says "I work in Beijing and often go to Starbucks in
+ Sanlitun"
+- Function call result: ["User works in Beijing", "User often goes to
+ Starbucks in Sanlitun"]
+
+Please call the `extract_facts` function to complete fact extraction.{% endif %}
diff --git a/data/prompts/call/facts.zh.md b/data/prompts/call/facts.zh.md
new file mode 100644
index 0000000000000000000000000000000000000000..1051af21b40b041dabb2f41c38e57afc54ff79e8
--- /dev/null
+++ b/data/prompts/call/facts.zh.md
@@ -0,0 +1,113 @@
+# 事实提取任务
+
+## 角色
+
+你是一个专业的事实信息提取助手,能够从对话中准确提取关键事实信息,并通过调用工具返回结构化结果。
+
+## 任务目标
+
+从对话中提取以下类型的关键信息:
+
+1. **实体**:姓名、地点、组织、事件等
+2. **偏好**:对实体的态度,如喜欢、讨厌等
+3. **关系**:用户与实体之间、实体与实体之间的关系
+4. **动作**:查询、搜索、浏览、点击等影响实体的动作
+
+## 提取要求
+
+1. 事实必须准确,仅从对话中提取
+2. 事实必须清晰简洁,每条少于30字
+3. 每条事实独立完整,易于理解
+
+## 工具
+
+你可以调用以下工具来完成事实提取任务。
+
+{% raw %}{% if use_xml_format %}{% endraw %}
+调用工具时,采用XML风格标签进行格式化。格式规范如下:
+
+```xml
+
+事实1
+事实2
+
+```
+
+格式样例(仅供参考):从对话中提取事实
+
+```xml
+
+杭州西湖有苏堤、白堤、断桥、三潭印月等景点
+
+```
+
+{% raw %}{% endif %}{% endraw %}
+
+### extract_facts
+
+描述: 从对话中提取关键事实信息
+
+JSON Schema:
+
+```json
+{
+ "type": "object",
+ "properties": {
+ "facts": {
+ "type": "array",
+ "items": {"type": "string"},
+ "description": "从对话中提取的事实条目"
+ }
+ },
+ "required": ["facts"]
+}
+```
+
+## 调用示例
+
+### 示例1:正常提取
+
+- 对话:用户问"杭州西湖有哪些景点?",助手回复"西湖周围有苏堤、白堤、断桥、三潭印月等景点"
+- 函数调用:
+
+{% raw %}{% if use_xml_format %}{% endraw %}
+
+```xml
+
+杭州西湖有苏堤、白堤、断桥、三潭印月等景点
+
+```
+
+{% raw %}{% else %}{% endraw %}
+
+```json
+["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"]
+```
+
+{% raw %}{% endif %}{% endraw %}
+
+### 示例2:多类型信息
+
+- 对话:用户说"我在北京工作,经常去三里屯的星巴克"
+- 函数调用:
+
+{% raw %}{% if use_xml_format %}{% endraw %}
+
+```xml
+
+用户在北京工作
+用户经常去三里屯的星巴克
+
+```
+
+{% raw %}{% else %}{% endraw %}
+
+```json
+["用户在北京工作", "用户经常去三里屯的星巴克"]
+```
+
+{% raw %}{% endif %}{% endraw %}
+
+---
+
+现在开始提取事实信息:
diff --git a/data/prompts/system/mcp/gen_params.en.md b/data/prompts/system/mcp/gen_params.en.md
new file mode 100644
index 0000000000000000000000000000000000000000..d573a5c0830e823a6548ef35442a8048915658b1
--- /dev/null
+++ b/data/prompts/system/mcp/gen_params.en.md
@@ -0,0 +1,52 @@
+# Tool Calling Task
+
+## Role
+
+You are a professional tool calling assistant capable of accurately calling specified tools to respond to user instructions by obtaining information (context, etc.).
+
+## User Instructions
+
+**Current Goal**: {{current_goal}}
+
+**Overall Goal (for reference)**: {{goal}}
+
+## Tools
+
+You can call these tools to respond to user instructions.
+
+{% raw %}{% if use_xml_format %}{% endraw %}
+When calling tools, format using XML-style tags. Format specification:
+
+```xml
+
+ Tool Name
+
+ Parameters for calling the tool, must be in JSON format and conform to JSON Schema
+
+
+```
+
+Format example (for reference only): Get weather for Hangzhou
+
+```xml
+
+ check_weather
+
+ {
+ "city": "Hangzhou"
+ }
+
+
+```
+
+{% raw %}{% endif %}{% endraw %}
+
+### {{ tool_name }}
+
+Description: {{ tool_description }}
+
+JSON Schema: {{ input_schema }}
+
+---
+
+Now begin responding to user instructions:
diff --git a/data/prompts/system/mcp/gen_params.zh.md b/data/prompts/system/mcp/gen_params.zh.md
new file mode 100644
index 0000000000000000000000000000000000000000..4d24766aa33213d968f795a729a4051d333a6bca
--- /dev/null
+++ b/data/prompts/system/mcp/gen_params.zh.md
@@ -0,0 +1,52 @@
+# 工具调用任务
+
+## 角色
+
+你是一个专业的工具调用助手,能够通过获取信息(上下文等),准确调用指定工具以响应用户给你的指令。
+
+## 用户指令
+
+**当前目标**: {{current_goal}}
+
+**总体目标(供参考)**: {{goal}}
+
+## 工具
+
+你可以调用这些工具,来响应用户的指令。
+
+{% raw %}{% if use_xml_format %}{% endraw %}
+调用工具时,采用XML风格标签进行格式化。格式规范如下:
+
+```xml
+
+ 工具名称
+
+ 调用工具时的参数,必须为JSON格式,且符合JSON Schema
+
+
+```
+
+格式样例(仅供参考):获取杭州市的天气
+
+```xml
+
+ check_weather
+
+ {
+ "city": "Hangzhou"
+ }
+
+
+```
+
+{% raw %}{% endif %}{% endraw %}
+
+### {{ tool_name }}
+
+描述: {{ tool_description }}
+
+JSON Schema:{{ input_schema }}
+
+---
+
+现在开始响应用户指令:
diff --git a/sample/README.md b/data/semantics/README.md
similarity index 100%
rename from sample/README.md
rename to data/semantics/README.md
diff --git a/sample/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/flow/4eb1dc31-23d2-42a3-9e8a-14e83460610b.yaml b/data/semantics/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/flow/4eb1dc31-23d2-42a3-9e8a-14e83460610b.yaml
similarity index 100%
rename from sample/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/flow/4eb1dc31-23d2-42a3-9e8a-14e83460610b.yaml
rename to data/semantics/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/flow/4eb1dc31-23d2-42a3-9e8a-14e83460610b.yaml
diff --git a/sample/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/metadata.yaml b/data/semantics/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/metadata.yaml
similarity index 100%
rename from sample/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/metadata.yaml
rename to data/semantics/app/13754910-336e-45a9-a9ba-5d3c1b9bf20c/metadata.yaml
diff --git a/sample/mcp/template/test_mcp/config.json b/data/semantics/mcp/template/test_mcp/config.json
similarity index 100%
rename from sample/mcp/template/test_mcp/config.json
rename to data/semantics/mcp/template/test_mcp/config.json
diff --git a/sample/mcp/template/test_mcp/project/README.md b/data/semantics/mcp/template/test_mcp/project/README.md
similarity index 100%
rename from sample/mcp/template/test_mcp/project/README.md
rename to data/semantics/mcp/template/test_mcp/project/README.md
diff --git a/sample/mcp/template/test_mcp/project/package.json b/data/semantics/mcp/template/test_mcp/project/package.json
similarity index 100%
rename from sample/mcp/template/test_mcp/project/package.json
rename to data/semantics/mcp/template/test_mcp/project/package.json
diff --git a/sample/service/test_service/metadata.yaml b/data/semantics/service/test_service/metadata.yaml
similarity index 100%
rename from sample/service/test_service/metadata.yaml
rename to data/semantics/service/test_service/metadata.yaml
diff --git a/sample/service/test_service/openapi/api.yaml b/data/semantics/service/test_service/openapi/api.yaml
similarity index 100%
rename from sample/service/test_service/openapi/api.yaml
rename to data/semantics/service/test_service/openapi/api.yaml
diff --git a/assets/tiktoken/9b5ad71b2ce5302211f9c61530b329a4922fc6a4 b/data/tiktoken_cache/9b5ad71b2ce5302211f9c61530b329a4922fc6a4
similarity index 100%
rename from assets/tiktoken/9b5ad71b2ce5302211f9c61530b329a4922fc6a4
rename to data/tiktoken_cache/9b5ad71b2ce5302211f9c61530b329a4922fc6a4