From 174bab5d30a89450a090b0c669c93c9fec36fbb5 Mon Sep 17 00:00:00 2001
From: z30057876
Date: Tue, 21 Jan 2025 19:40:15 +0800
Subject: [PATCH] Update data structures, adapt FunctionCall
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 apps/entities/pool.py                  |  5 +-
 apps/llm/function.py                   | 74 +++++++++++++++++----
 apps/manager/conversation.py           |  4 +-
 apps/manager/task.py                   | 10 +--
 apps/routers/conversation.py           | 34 +++++-----
 apps/routers/record.py                 | 20 +++---
 apps/scheduler/pool/loader/app.py      | 29 ++++++++
 apps/scheduler/pool/loader/flow.py     | 89 +++++++++++---------------
 apps/scheduler/pool/loader/metadata.py | 36 +++++++++--
 apps/scheduler/pool/loader/openapi.py  | 35 ++++------
 apps/scheduler/pool/loader/service.py  | 40 ++++++++++++
 apps/scheduler/pool/pool.py            | 10 +--
 sample/apps/test_app/flows/flow.yaml   |  6 +-
 13 files changed, 258 insertions(+), 134 deletions(-)
 create mode 100644 apps/scheduler/pool/loader/app.py
 create mode 100644 apps/scheduler/pool/loader/service.py

diff --git a/apps/entities/pool.py b/apps/entities/pool.py
index 1d06e674e..8ce0782f5 100644
--- a/apps/entities/pool.py
+++ b/apps/entities/pool.py
@@ -50,12 +50,13 @@ class NodePool(PoolBase):
     """
 
     id: str = Field(description="Node的ID")
+    service_id: str = Field(description="Node所属的Service ID")
     type: CallType = Field(description="Call的类型")
-    service: str = Field(description="服务名称")
-    meta_call: Optional[str] = Field(description="基类Call的ID", default=None)
+    base_node_id: Optional[str] = Field(description="基类Node的ID", default=None)
     input_schema: dict[str, Any] = Field(description="输入参数的schema", default={})
     output_schema: dict[str, Any] = Field(description="输出参数的schema", default={})
     params: dict[str, Any] = Field(description="参数", default={})
+    params_schema: dict[str, Any] = Field(description="参数的schema", default={})
 
     path: str = Field(description="Node的路径;包括Node的作用域等")
 
diff --git a/apps/llm/function.py b/apps/llm/function.py
index 039581fce..c25acf44c 100644
--- a/apps/llm/function.py
+++ b/apps/llm/function.py
@@ -28,21 +28,30 @@ class FunctionLLM:
         - vllm
         """
         if config["SCHEDULER_BACKEND"] == "sglang":
-            self._client = sglang.RuntimeEndpoint(config["SCHEDULER_URL"], api_key=config["SCHEDULER_API_KEY"])
+            if not config["SCHEDULER_API_KEY"]:
+                self._client = sglang.RuntimeEndpoint(config["SCHEDULER_URL"])
+            else:
+                self._client = sglang.RuntimeEndpoint(config["SCHEDULER_URL"], api_key=config["SCHEDULER_API_KEY"])
             self._client.chat_template = get_chat_template("chatml")
             sglang.set_default_backend(self._client)
-        if config["SCHEDULER_BACKEND"] == "vllm":
-            self._client = openai.AsyncOpenAI(
-                base_url=config["SCHEDULER_URL"],
-                api_key=config["SCHEDULER_API_KEY"],
-            )
+        if config["SCHEDULER_BACKEND"] == "vllm" or config["SCHEDULER_BACKEND"] == "openai":
+            if not config["SCHEDULER_API_KEY"]:
+                self._client = openai.AsyncOpenAI(base_url=config["SCHEDULER_URL"])
+            else:
+                self._client = openai.AsyncOpenAI(
+                    base_url=config["SCHEDULER_URL"],
+                    api_key=config["SCHEDULER_API_KEY"],
+                )
         if config["SCHEDULER_BACKEND"] == "ollama":
-            self._client = ollama.AsyncClient(
-                host=config["SCHEDULER_URL"],
-                headers={
-                    # "Authorization": f"Bearer {config['SCHEDULER_API_KEY']}",
-                },
-            )
+            if not config["SCHEDULER_API_KEY"]:
+                self._client = ollama.AsyncClient(host=config["SCHEDULER_URL"])
+            else:
+                self._client = ollama.AsyncClient(
+                    host=config["SCHEDULER_URL"],
+                    headers={
+                        "Authorization": f"Bearer {config['SCHEDULER_API_KEY']}",
+                    },
+                )
 
     @staticmethod
     @sglang.function
@@ -107,6 +116,47 @@ class FunctionLLM:
 
         return result
 
 
+    async def _call_openai(self, messages: list[dict[str, Any]], schema: dict[str, Any], max_tokens: int, temperature: float) -> str:
+        """调用openai模型生成JSON
+
+        :param messages: 历史消息列表
+        :param schema: 输出JSON Schema
+        :param max_tokens: 最大Token长度
+        :param temperature: 大模型温度
+        :return: 生成的JSON
+        """
+        model = config["SCHEDULER_MODEL"]
+        if not model:
+            err_msg = "未设置FunctionCall所用模型!"
+            raise ValueError(err_msg)
+
+        param = {
+            "model": model,
+            "messages": messages,
+            "max_tokens": max_tokens,
+            "temperature": temperature,
+        }
+
+        if schema:
+            tool_data = {
+                "type": "function",
+                "function": {
+                    "name": "output",
+                    "description": "Call the function to get the output",
+                    "parameters": schema,
+                },
+            }
+            param["tools"] = [tool_data]
+            param["tool_choice"] = "required"
+
+        response = await self._client.chat.completions.create(**param)  # type: ignore[]
+        try:
+            ans = response.choices[0].message.tool_calls[0].function.arguments or ""
+        except IndexError:
+            ans = ""
+        return ans
+
+
     async def _call_ollama(self, messages: list[dict[str, Any]], schema: dict[str, Any], max_tokens: int, temperature: float) -> str:
         """调用ollama模型生成JSON
diff --git a/apps/manager/conversation.py b/apps/manager/conversation.py
index 185f91afe..5d5a2825e 100644
--- a/apps/manager/conversation.py
+++ b/apps/manager/conversation.py
@@ -38,12 +38,14 @@ class ConversationManager:
             return None
 
     @staticmethod
-    async def add_conversation_by_user_sub(user_sub: str) -> Optional[Conversation]:
+    async def add_conversation_by_user_sub(user_sub: str, app_id: str, *, is_debug: bool) -> Optional[Conversation]:
         """通过用户ID查询历史记录"""
         conversation_id = str(uuid.uuid4())
         conv = Conversation(
             _id=conversation_id,
             user_sub=user_sub,
+            app_id=app_id,
+            is_debug=is_debug,
         )
         try:
             async with MongoDB.get_session() as session, await session.start_transaction():
diff --git a/apps/manager/task.py b/apps/manager/task.py
index eb9ee384f..51a0d0055 100644
--- a/apps/manager/task.py
+++ b/apps/manager/task.py
@@ -106,9 +106,9 @@ class TaskManager:
         # 创建新的Record,缺失的数据延迟关联
         new_record = RecordData(
             id=str(uuid.uuid4()),
-            conversation_id=post_body.conversation_id,
-            group_id=str(uuid.uuid4()) if not post_body.group_id else post_body.group_id,
-            task_id="",
+            conversationId=post_body.conversation_id,
+            groupId=str(uuid.uuid4()) if not post_body.group_id else post_body.group_id,
+            taskId="",
             content=RecordContent(
                 question=post_body.question,
                 answer="",
@@ -119,7 +119,7 @@
                 time=0,
                 feature=post_body.features.model_dump(by_alias=True),
             ),
-            created_at=round(datetime.now(timezone.utc).timestamp(), 3),
+            createdAt=round(datetime.now(timezone.utc).timestamp(), 3),
         )
 
         if not task:
@@ -240,7 +240,7 @@ class TaskManager:
         try:
             async for history in flow_context_collection.find({"task_id": task_id}):
                 history_obj = FlowHistory.model_validate(history)
-                flow_context[history_obj.step_name] = history_obj
+                flow_context[history_obj.step_id] = history_obj
 
             return flow_context
         except Exception as e:
diff --git a/apps/routers/conversation.py b/apps/routers/conversation.py
index 90382ae47..83e8ef72d 100644
--- a/apps/routers/conversation.py
+++ b/apps/routers/conversation.py
@@ -22,6 +22,8 @@ from apps.entities.response_data import (
     ConversationListItem,
     ConversationListMsg,
     ConversationListRsp,
+    DeleteConversationMsg,
+    DeleteConversationRsp,
     ResponseData,
     UpdateConversationRsp,
 )
@@ -54,7 +56,7 @@ async def create_new_conversation(user_sub: str, conv_list: list[Conversation])
 
     # 新建对话
     if create_new:
-        new_conv = await ConversationManager.add_conversation_by_user_sub(user_sub)
+        new_conv = await ConversationManager.add_conversation_by_user_sub(user_sub, app_id, is_debug=False)
         if not new_conv:
             err = "Create new conversation failed."
             raise RuntimeError(err)
@@ -71,10 +73,10 @@ async def get_conversation_list(user_sub: Annotated[str, Depends(get_user)]): #
     # 把已有对话转换为列表
     result_conversations = [
         ConversationListItem(
-            conversation_id=conv.id,
+            conversationId=conv.id,
             title=conv.title,
-            doc_count=await DocumentManager.get_doc_count(user_sub, conv.id),
-            created_time=datetime.fromtimestamp(conv.created_at, tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
+            docCount=await DocumentManager.get_doc_count(user_sub, conv.id),
+            createdTime=datetime.fromtimestamp(conv.created_at, tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
         )
         for conv in conversations
     ]
@@ -90,10 +92,10 @@ async def get_conversation_list(user_sub: Annotated[str, Depends(get_user)]): #
 
     if new_conv:
         result_conversations.append(ConversationListItem(
-            conversation_id=new_conv.id,
+            conversationId=new_conv.id,
             title=new_conv.title,
-            doc_count=0,
-            created_time=datetime.fromtimestamp(new_conv.created_at, tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
+            docCount=0,
+            createdTime=datetime.fromtimestamp(new_conv.created_at, tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
         ))
 
     return JSONResponse(status_code=status.HTTP_200_OK,
@@ -129,19 +131,19 @@ async def add_conversation(user_sub: Annotated[str, Depends(get_user)]): # noqa
     return JSONResponse(status_code=status.HTTP_200_OK, content=AddConversationRsp(
         code=status.HTTP_200_OK,
         message="success",
-        result=AddConversationMsg(conversation_id=new_conv.id),
+        result=AddConversationMsg(conversationId=new_conv.id),
     ).model_dump(exclude_none=True, by_alias=True))
 
 
 @router.put("", response_model=UpdateConversationRsp, dependencies=[Depends(verify_csrf_token)])
 async def update_conversation(  # noqa: ANN201
     post_body: ModifyConversationData,
-    conversation_id: Annotated[str, Query()],
+    conversationId: Annotated[str, Query()],  # noqa: N803
     user_sub: Annotated[str, Depends(get_user)],
 ):
     """更新特定Conversation的数据"""
     # 判断Conversation是否合法
-    conv = await ConversationManager.get_conversation_by_conversation_id(user_sub, conversation_id)
+    conv = await ConversationManager.get_conversation_by_conversation_id(user_sub, conversationId)
     if not conv or conv.user_sub != user_sub:
         LOGGER.error("Conversation: conversation_id not found.")
         return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content=ResponseData(
@@ -153,7 +155,7 @@ async def update_conversation(  # noqa: ANN201
     # 更新Conversation数据
     change_status = await ConversationManager.update_conversation_by_conversation_id(
         user_sub,
-        conversation_id,
+        conversationId,
         {
             "title": post_body.title,
         },
@@ -171,10 +173,10 @@ async def update_conversation(  # noqa: ANN201
         code=status.HTTP_200_OK,
         message="success",
         result=ConversationListItem(
-            conversation_id=conv.id,
+            conversationId=conv.id,
             title=conv.title,
-            doc_count=await DocumentManager.get_doc_count(user_sub, conv.id),
-            created_time=datetime.fromtimestamp(conv.created_at, tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
+            docCount=await DocumentManager.get_doc_count(user_sub, conv.id),
+            createdTime=datetime.fromtimestamp(conv.created_at, tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
         ),
     ).model_dump(exclude_none=True, by_alias=True),
 )
@@ -208,8 +210,8 @@ async def delete_conversation(request: Request, post_body: DeleteConversationDat
 
         deleted_conversation.append(conversation_id)
 
-    return JSONResponse(status_code=status.HTTP_200_OK, content=ResponseData(
+    return JSONResponse(status_code=status.HTTP_200_OK, content=DeleteConversationRsp(
         code=status.HTTP_200_OK,
         message="success",
-        result={"conversation_id_list": deleted_conversation},
+        result=DeleteConversationMsg(conversationIdList=deleted_conversation),
     ).model_dump(exclude_none=True, by_alias=True))
diff --git a/apps/routers/record.py b/apps/routers/record.py
index c95b4636d..361adc749 100644
--- a/apps/routers/record.py
+++ b/apps/routers/record.py
@@ -56,16 +56,16 @@ async def get_record(conversation_id: str, user_sub: Annotated[str, Depends(get_
 
         tmp_record = RecordData(
             id=record.record_id,
-            group_id=record_group.id,
-            task_id=record_group.task_id,
-            conversation_id=conversation_id,
+            groupId=record_group.id,
+            taskId=record_group.task_id,
+            conversationId=conversation_id,
             content=record_data,
             metadata=record.metadata if record.metadata else RecordMetadata(
                 input_tokens=0,
                 output_tokens=0,
                 time=0,
             ),
-            created_at=record.created_at,
+            createdAt=record.created_at,
         )
 
         # 获得Record关联的文档
@@ -76,17 +76,15 @@ async def get_record(conversation_id: str, user_sub: Annotated[str, Depends(get_
         if flow_list:
             tmp_record.flow = RecordFlow(
                 id=flow_list[0].id,
-                record_id=record.record_id,
-                plugin_id=flow_list[0].plugin_id,
-                flow_id=flow_list[0].flow_id,
-                step_num=len(flow_list),
+                recordId=record.record_id,
+                flowId=flow_list[0].flow_id,
+                stepNum=len(flow_list),
                 steps=[],
             )
             for flow in flow_list:
                 tmp_record.flow.steps.append(RecordFlowStep(
-                    step_name=flow.step_name,
-                    step_status=flow.status,
-                    step_order=flow.step_order,
+                    stepId=flow.step_id,
+                    stepStatus=flow.status,
                     input=flow.input_data ,
                     output=flow.output_data,
                 ))
diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py
new file mode 100644
index 000000000..4738cfd8c
--- /dev/null
+++ b/apps/scheduler/pool/loader/app.py
@@ -0,0 +1,29 @@
+"""App加载器
+
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+"""
+from pathlib import Path
+from typing import Any
+
+from apps.common.config import config
+from apps.entities.enum_var import MetadataType
+from apps.scheduler.pool.loader.metadata import MetadataLoader
+
+
+class AppLoader:
+    """应用加载器"""
+
+    @classmethod
+    async def load(cls, app_dir: str) -> None:
+        """从文件系统中加载应用
+
+        :param app_dir: 应用目录
+        """
+        path = Path(config["SERVICE_DIR"]) / app_dir
+        metadata = await MetadataLoader.load(path / "metadata.yaml")
+
+
+    @classmethod
+    async def save(cls, metadata_type: MetadataType, metadata: dict[str, Any], resource_id: str) -> None:
+        """保存应用"""
+        await MetadataLoader.save(metadata_type, metadata, resource_id)
diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py
index a254546ba..703a99b29 100644
--- a/apps/scheduler/pool/loader/flow.py
+++ b/apps/scheduler/pool/loader/flow.py
@@ -1,60 +1,45 @@
+"""Flow加载器
+
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+"""
+from pathlib import Path
+
+import yaml
+
+from apps.common.config import config
+from apps.entities.flow import Flow
+
+
 class FlowLoader:
     """工作流加载器"""
 
-    @staticmethod
-    async def generate(flow_id: str) -> None:
-        """从数据库中加载工作流"""
-        pass
+    @classmethod
+    def load(cls, app_id: str, flow_id: str) -> Flow:
+        """从文件系统中加载【单个】工作流"""
+        flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml"
+        with flow_path.open(encoding="utf-8") as f:
+            flow_yaml = yaml.safe_load(f)
 
-    @staticmethod
-    def load() -> None:
-        """执行工作流加载"""
-        pass
+        if "name" not in flow_yaml:
+            err = f"工作流名称不能为空:{flow_path!s}"
+            raise ValueError(err)
+
+        if "::" in flow_yaml["id"]:
+            err = f"工作流ID包含非法字符:{flow_path!s}"
+            raise ValueError(err)
 
+        try:
+            # 检查Flow格式,并转换为Flow对象
+            flow = Flow.model_validate(flow_yaml)
+        except Exception as e:
+            err = f"工作流格式错误:{e!s}; 文件路径:{flow_path!s}"
+            raise ValueError(err) from e
 
+        return flow
 
-def _load_flow(self) -> list[dict[str, Any]]:
-    flow_path = self._plugin_location / FLOW_DIR
-    flows = []
-    if flow_path.is_dir():
-        for current_flow_path in flow_path.iterdir():
-            LOGGER.info(f"载入Flow: {current_flow_path}")
-
-            with Path(current_flow_path).open(encoding="utf-8") as f:
-                flow_yaml = yaml.safe_load(f)
-
-            if "/" in flow_yaml["id"]:
-                err = "Flow名称包含非法字符!"
-                raise ValueError(err)
-
-            if "on_error" in flow_yaml:
-                error_step = Step(name="error", **flow_yaml["on_error"])
-            else:
-                error_step = Step(
-                    name="error",
-                    call_type="llm",
-                    params={
-                        "user_prompt": "当前工具执行发生错误,原始错误信息为:{data}. 请向用户展示错误信息,并给出可能的解决方案。\n\n背景信息:{context}",
-                    },
-                )
-
-            steps = {}
-            for step in flow_yaml["steps"]:
-                steps[step["name"]] = Step(**step)
-
-            if "next_flow" not in flow_yaml:
-                next_flow = None
-            else:
-                next_flow = []
-                for next_flow_item in flow_yaml["next_flow"]:
-                    next_flow.append(NextFlow(
-                        id=next_flow_item["id"],
-                        question=next_flow_item["question"],
-                    ))
-            flows.append({
-                "id": flow_yaml["id"],
-                "description": flow_yaml["description"],
-                "data": Flow(on_error=error_step, steps=steps, next_flow=next_flow),
-            })
-    return flows
\ No newline at end of file
+
+    @classmethod
+    def save(cls, app_id: str, flow_id: str, flow: Flow) -> None:
+        """保存工作流"""
+        pass
diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py
index d4568b64b..6e51c09cd 100644
--- a/apps/scheduler/pool/loader/metadata.py
+++ b/apps/scheduler/pool/loader/metadata.py
@@ -7,6 +7,7 @@ from typing import Any, Union
 
 import yaml
 
+from apps.common.config import config
 from apps.constants import LOGGER
 from apps.entities.enum_var import MetadataType
 from apps.entities.flow import (
@@ -18,9 +19,9 @@ from apps.entities.flow import (
 class MetadataLoader:
     """元数据加载器"""
 
-    @staticmethod
-    async def load(file_path: Path) -> Union[AppMetadata, ServiceMetadata]:
-        """检查metadata.yaml是否正确"""
+    @classmethod
+    async def load(cls, file_path: Path) -> Union[AppMetadata, ServiceMetadata]:
+        """加载【单个】元数据"""
         # 检查yaml格式
         try:
             metadata_dict = yaml.safe_load(file_path.read_text())
@@ -49,7 +50,28 @@ class MetadataLoader:
 
         return metadata
 
-    @staticmethod
-    async def save(metadata: dict[str, Any], file_path: Path) -> None:
-        """将元数据保存到文件"""
-        pass
+    @classmethod
+    async def save(cls, metadata_type: MetadataType, metadata: dict[str, Any], resource_id: str) -> None:
+        """保存【单个】元数据"""
+        class_dict = {
+            MetadataType.APP: AppMetadata,
+            MetadataType.SERVICE: ServiceMetadata,
+        }
+
+        # 检查资源路径
+        if metadata_type == MetadataType.APP:
+            resource_path = Path(config["SERVICE_DIR"]) / "app" / resource_id / "metadata.yaml"
+        elif metadata_type == MetadataType.SERVICE:
+            resource_path = Path(config["SERVICE_DIR"]) / "service" / resource_id / "metadata.yaml"
+
+        # 保存元数据
+        try:
+            metadata_class: type[Union[AppMetadata, ServiceMetadata]] = class_dict[metadata_type]
+            data = metadata_class(**metadata)
+        except Exception as e:
+            err = f"metadata.yaml格式错误: {e}"
+            LOGGER.error(err)
+            raise RuntimeError(err) from e
+
+        yaml_data = data.model_dump(by_alias=True, exclude_none=True)
+        resource_path.write_text(yaml.safe_dump(yaml_data))
diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py
index 13291edec..14601c20b 100644
--- a/apps/scheduler/pool/loader/openapi.py
+++ b/apps/scheduler/pool/loader/openapi.py
@@ -8,51 +8,42 @@ from typing import Any
 
 import yaml
 
 from apps.constants import LOGGER
+from apps.entities.pool import NodePool
 from apps.scheduler.openapi import (
     ReducedOpenAPISpec,
     reduce_openapi_spec,
 )
-from apps.scheduler.pool.util import get_bytes_hash
 
 
 class OpenAPILoader:
     """OpenAPI文档载入器"""
 
-    @staticmethod
-    def _load_spec(yaml_path: str) -> tuple[str, ReducedOpenAPISpec]:
+    @classmethod
+    def load(cls, yaml_path: str) -> ReducedOpenAPISpec:
         """从本地磁盘加载OpenAPI文档"""
         path = Path(yaml_path)
         if not path.exists():
             msg = f"File not found: {yaml_path}"
             raise FileNotFoundError(msg)
 
-        with path.open(mode="rb") as f:
-            content = f.read()
-            hash_value = get_bytes_hash(content)
-            spec = yaml.safe_load(content)
-            return hash_value, reduce_openapi_spec(spec)
+        with path.open(mode="r") as f:
+            spec = yaml.safe_load(f)
+
+        return reduce_openapi_spec(spec)
 
     @classmethod
-    def _process_spec(cls, spec: ReducedOpenAPISpec) -> dict[str, Any]:
-        """处理OpenAPI文档"""
-        pass
+    def _process_spec(cls, spec: ReducedOpenAPISpec) -> list[NodePool]:
+        """将OpenAPI文档拆解为Node"""
+
 
-    @staticmethod
-    async def load_one(yaml_path: str) -> None:
+    @classmethod
+    async def load_one(cls, yaml_path: str) -> None:
         """加载单个OpenAPI文档,可以直接指定路径"""
         try:
-            hash_val, spec_raw = OpenAPILoader._load_spec(yaml_path)
+            spec = cls.load(yaml_path)
         except Exception as e:
             err = f"加载OpenAPI文档失败:{e}"
             LOGGER.error(msg=err)
             raise RuntimeError(err) from e
-
-
-
-    @classmethod
-    def load(cls) -> ReducedOpenAPISpec:
-        """执行OpenAPI文档的加载"""
-        pass
-
diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py
new file mode 100644
index 000000000..d5ef4254b
--- /dev/null
+++ b/apps/scheduler/pool/loader/service.py
@@ -0,0 +1,40 @@
+"""加载配置文件夹的Service部分
+
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+"""
+from pathlib import Path
+from typing import Any
+
+from apps.common.config import config
+from apps.models.mongo import MongoDB
+from apps.scheduler.pool.loader.metadata import MetadataLoader
+
+
+class ServiceLoader:
+    """Service 加载器"""
+
+    _collection = MongoDB.get_collection("service")
+
+
+    @classmethod
+    async def load(cls, service_dir: Path) -> None:
+        """加载单个Service"""
+        service_path = Path(config["SERVICE_DIR"]) / "service" / service_dir
+        # 载入元数据
+        metadata = await MetadataLoader.load(service_path / "metadata.yaml")
+        # 载入OpenAPI文档
+
+
+
+
+
+    @classmethod
+    async def save(cls) -> dict[str, Any]:
+        """保存单个Service"""
+        pass
+
+
+    @classmethod
+    async def load_all(cls) -> dict[str, Any]:
+        """加载所有Service"""
+        pass
diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py
index 7eae3b3b2..1f4040e96 100644
--- a/apps/scheduler/pool/pool.py
+++ b/apps/scheduler/pool/pool.py
@@ -1,24 +1,24 @@
-"""配置池,包含语义接口、应用等的载入和保存
+"""资源池,包含语义接口、应用等的载入和保存
 
 Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 """
 
 
 class Pool:
-    """配置池"""
+    """资源池"""
 
     @classmethod
     def load(cls) -> None:
-        """加载配置池"""
+        """加载全部文件系统内的资源"""
         pass
 
     @classmethod
     def save(cls, *, is_deletion: bool = False) -> None:
-        """保存配置池"""
+        """保存【单个】资源"""
         pass
 
     @classmethod
     def get_flow(cls, app_id: str, flow_id: str) -> None:
-        """获取Flow"""
+        """获取【单个】Flow完整数据"""
         pass
 
diff --git a/sample/apps/test_app/flows/flow.yaml b/sample/apps/test_app/flows/flow.yaml
index 5d558dc40..5f52807af 100644
--- a/sample/apps/test_app/flows/flow.yaml
+++ b/sample/apps/test_app/flows/flow.yaml
@@ -13,8 +13,9 @@ on_error:
     错误信息:{{ error.message }}
 
 # 各个节点定义
-nodes:
+steps:
   - id: query_data # 节点的Pool ID
+    node: api
     name: 查询数据 # 节点名称
     description: 从API中查询测试数据 # 节点描述
     pos: # 节点在画布上的位置
@@ -23,6 +24,7 @@ nodes:
     params: # 节点的参数
      endpoint: GET /api/test # API Endpoint名称
   - id: check_data
+    node: check_data
     name: 判断数据
     description: 判断工具的返回值是否包含有效数据
     pos: # 节点在画布上的位置
@@ -36,6 +38,7 @@ nodes:
       - branch: invalid
         description: 返回值不存在有效数据
   - id: gen_reply
+    node: llm
     name: 生成回复
     description: 使用大模型生成回复
     pos: # 节点在画布上的位置
@@ -59,6 +62,7 @@ nodes:
         使用自然语言解释这一信息,并展示为Markdown列表。
 
   - id: format_output
+    node: format_output
     name: 格式化输出
     description: 按照特定格式输出
     pos: # 节点在画布上的位置
-- 
Gitee
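Note on the FunctionCall adaptation above: for the vllm/openai backends, the new _call_openai() forces the model to call a single "output" tool whose parameters are the desired JSON Schema, then reads the JSON back from the first tool call's arguments. The sketch below is a minimal, self-contained illustration of that same pattern against any OpenAI-compatible endpoint; the base_url, api_key and model name are placeholders for illustration, not values taken from this repository's configuration.

import asyncio
import json

import openai


async def main() -> None:
    # Placeholder endpoint and model: point these at an OpenAI-compatible server.
    client = openai.AsyncOpenAI(base_url="http://127.0.0.1:8000/v1", api_key="EMPTY")
    schema = {
        "type": "object",
        "properties": {
            "city": {"type": "string"},
            "temperature": {"type": "number"},
        },
        "required": ["city", "temperature"],
    }
    response = await client.chat.completions.create(
        model="example-model",
        messages=[{"role": "user", "content": "Beijing is 3 degrees Celsius today. Extract the city and temperature."}],
        max_tokens=256,
        temperature=0.1,
        # Same shape as _call_openai: wrap the schema in one "output" tool and force a call.
        tools=[{
            "type": "function",
            "function": {
                "name": "output",
                "description": "Call the function to get the output",
                "parameters": schema,
            },
        }],
        tool_choice="required",
    )
    arguments = response.choices[0].message.tool_calls[0].function.arguments or ""
    print(json.loads(arguments))


if __name__ == "__main__":
    asyncio.run(main())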