From 25e6baddd552704b036caa7d01d58544e9334997 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Fri, 21 Feb 2025 16:04:18 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E8=BD=BD=E7=B3=BB=E7=BB=9FCall?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .dockerignore | 5 +- apps/common/config.py | 2 +- apps/entities/pool.py | 4 +- apps/main.py | 19 ++-- apps/models/postgres.py | 2 + apps/scheduler/pool/check.py | 10 +- apps/scheduler/pool/loader/app.py | 11 ++- apps/scheduler/pool/loader/call.py | 96 ++++++++++--------- apps/scheduler/pool/loader/metadata.py | 12 +-- apps/scheduler/pool/loader/openapi.py | 7 ++ apps/scheduler/pool/loader/service.py | 6 +- apps/scheduler/pool/pool.py | 50 ++++------ apps/scheduler/pool/util.py | 15 --- assets/.env.example | 5 +- .../euler_copilot/configs/framework/.env | 2 +- deploy/chart/euler_copilot/configs/rag/.env | 2 +- 16 files changed, 120 insertions(+), 128 deletions(-) delete mode 100644 apps/scheduler/pool/util.py diff --git a/.dockerignore b/.dockerignore index 859ebc26..728281bc 100644 --- a/.dockerignore +++ b/.dockerignore @@ -6,4 +6,7 @@ Dockerfile .vscode *.bak .gitignore -.git/ \ No newline at end of file +.git/ +deploy/ +docs/ +.DS_Store diff --git a/apps/common/config.py b/apps/common/config.py index 66dd553d..699fda48 100644 --- a/apps/common/config.py +++ b/apps/common/config.py @@ -85,7 +85,7 @@ class ConfigModel(BaseModel): SCHEDULER_MAX_TOKENS: int = Field(description="参数猜解最大Token数", default=8192) SCHEDULER_TEMPERATURE: float = Field(description="参数猜解温度", default=0.7) # 插件位置 - SERVICE_DIR: Optional[str] = Field(description="插件路径", default=None) + SEMANTICS_DIR: Optional[str] = Field(description="语义配置路径", default=None) # SQL接口路径 SQL_URL: str = Field(description="Chat2DB接口路径") diff --git a/apps/entities/pool.py b/apps/entities/pool.py index 810b6358..fc35cfce 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -73,8 +73,8 @@ class NodePool(BaseData): service_id: Optional[str] = Field(description="Node所属的Service ID", default=None) call_id: str = Field(description="所使用的Call的ID") - path: str = Field(description="Node的路径") - known_params: dict[str, Any] = Field(description="已知的用于Call部分的参数", default={}) + path: Optional[str] = Field(description="Node的路径", default=None) + known_params: Optional[dict[str, Any]] = Field(description="已知的用于Call部分的参数", default=None) class AppFlow(BaseData): diff --git a/apps/main.py b/apps/main.py index 37f8309e..5278d1fe 100644 --- a/apps/main.py +++ b/apps/main.py @@ -33,8 +33,7 @@ from apps.routers import ( record, service, ) - -# from apps.scheduler.pool.loader import Loader +from apps.scheduler.pool.pool import Pool # 定义FastAPI app app = FastAPI(docs_url=None, redoc_url=None) @@ -69,8 +68,6 @@ scheduler.start() scheduler.add_job(DeleteUserCron.delete_user, "cron", hour=3) # 包装Ray - - @serve.deployment(ray_actor_options={"num_gpus": 0}) @serve.ingress(app) class FastAPIWrapper: @@ -79,10 +76,16 @@ class FastAPIWrapper: # 运行 if __name__ == "__main__": - # 初始化 - WordsCheck.init() - # Loader.init() - # 启动Ray + # 初始化Ray ray.init(dashboard_host="0.0.0.0", num_cpus=4) # noqa: S104 + + # 初始化必要资源 + WordsCheck.init() + + pool_actor = Pool.remote() + ray.get(pool_actor.init.remote()) # type: ignore[] + ray.kill(pool_actor) + + # 启动FastAPI serve.start(http_options=HTTPOptions(host="0.0.0.0", port=8002)) # noqa: S104 serve.run(FastAPIWrapper.bind(), blocking=True) diff --git a/apps/models/postgres.py b/apps/models/postgres.py index f76fd74e..461d398c 100644 --- a/apps/models/postgres.py +++ b/apps/models/postgres.py @@ -37,6 +37,8 @@ class PostgreSQL: @classmethod async def get_session(cls) -> AsyncSession: """获取异步会话""" + if not cls._is_inited: + await cls.init() return async_sessionmaker(cls._engine, class_=AsyncSession, expire_on_commit=False)() diff --git a/apps/scheduler/pool/check.py b/apps/scheduler/pool/check.py index 6270b334..9a2954e9 100644 --- a/apps/scheduler/pool/check.py +++ b/apps/scheduler/pool/check.py @@ -2,12 +2,12 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ +from hashlib import sha256 from pathlib import Path from apps.common.config import config from apps.entities.enum_var import MetadataType from apps.models.mongo import MongoDB -from apps.scheduler.pool.util import get_long_hash class FileChecker: @@ -16,7 +16,7 @@ class FileChecker: def __init__(self) -> None: """初始化文件检查器""" self._hashes = {} - self._dir_path = Path(config["SERVICE_DIR"]) + self._dir_path = Path(config["SEMANTICS_DIR"]) def check_one(self, path: Path) -> None: @@ -30,7 +30,7 @@ class FileChecker: for file in path.iterdir(): if file.is_file(): - self._hashes[str(file.relative_to(self._dir_path))] = get_long_hash(file.read_bytes()) + self._hashes[str(file.relative_to(self._dir_path))] = sha256(file.read_bytes()).hexdigest() elif file.is_dir(): self.check_one(file) @@ -46,10 +46,10 @@ class FileChecker: """生成更新列表和删除列表""" if check_type == MetadataType.APP: collection = MongoDB.get_collection("app") - self._dir_path = Path(config["SERVICE_DIR"]) / "app" + self._dir_path = Path(config["SEMANTICS_DIR"]) / "app" elif check_type == MetadataType.SERVICE: collection = MongoDB.get_collection("service") - self._dir_path = Path(config["SERVICE_DIR"]) / "service" + self._dir_path = Path(config["SEMANTICS_DIR"]) / "service" changed_list = [] deleted_list = [] diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index 4738cfd8..75573d6b 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -2,9 +2,12 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from pathlib import Path +from typing import Any + +from anyio import Path from apps.common.config import config +from apps.entities.flow import MetadataType from apps.scheduler.pool.loader.metadata import MetadataLoader @@ -12,13 +15,13 @@ class AppLoader: """应用加载器""" @classmethod - def load(cls, app_dir: str) -> None: + async def load(cls, app_dir: str) -> None: """从文件系统中加载应用 :param app_dir: 应用目录 """ - path = Path(config["SERVICE_DIR"]) / app_dir - metadata = MetadataLoader.load(path / "metadata.yaml") + path = Path(config["SEMANTICS_DIR"]) / app_dir + metadata = await MetadataLoader.load(path / "metadata.yaml") @classmethod diff --git a/apps/scheduler/pool/loader/call.py b/apps/scheduler/pool/loader/call.py index e6f6f4c9..3332e5aa 100644 --- a/apps/scheduler/pool/loader/call.py +++ b/apps/scheduler/pool/loader/call.py @@ -4,9 +4,10 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ import importlib import sys +from hashlib import shake_128 from pathlib import Path -from pydantic import BaseModel +from sqlalchemy.dialects.postgresql import insert import apps.scheduler.call as system_call from apps.common.config import config @@ -19,11 +20,14 @@ from apps.entities.pool import ( from apps.entities.vector import NodePoolVector from apps.models.mongo import MongoDB from apps.models.postgres import PostgreSQL -from apps.scheduler.pool.util import get_short_hash class CallLoader: - """Call 加载器""" + """Call 加载器 + + 系统Call放在apps.scheduler.call下 + 用户Call放在call下 + """ @staticmethod def _check_class(user_cls) -> bool: # noqa: ANN001 @@ -34,28 +38,28 @@ class CallLoader: flag = False if not hasattr(user_cls, "description") or not isinstance(user_cls.description, str): flag = False - if not hasattr(user_cls, "params") or not issubclass(user_cls.params, BaseModel): + if not hasattr(user_cls, "output_schema") or not isinstance(user_cls.output_schema, dict): + flag = False + if not hasattr(user_cls, "params_schema") or not isinstance(user_cls.params_schema, dict): flag = False if not hasattr(user_cls, "init") or not callable(user_cls.init): flag = False - if not hasattr(user_cls, "call") or not callable(user_cls.call): + if not callable(user_cls) or not callable(user_cls.__call__): flag = False - if not flag: - LOGGER.info(msg=f"类{user_cls.__name__}不符合Call标准要求。") - return flag - @classmethod - async def _load_system_call(cls) -> tuple[list[CallPool], list[NodePool]]: + + async def _load_system_call(self) -> tuple[list[CallPool], list[NodePool]]: """加载系统Call""" call_metadata = [] node_metadata = [] + # 检查合法性 for call_id in system_call.__all__: call_cls = getattr(system_call, call_id) - if not cls._check_class(call_cls): - err = f"类{call_cls.__name__}不符合Call标准要求。" + if not self._check_class(call_cls): + err = f"系统类{call_cls.__name__}不符合Call标准要求。" LOGGER.info(msg=err) continue @@ -65,7 +69,7 @@ class CallLoader: type=CallType.SYSTEM, name=call_cls.name, description=call_cls.description, - path=call_id, + path=f"apps.scheduler.call.{call_id}", ), ) @@ -75,19 +79,18 @@ class CallLoader: call_id=call_id, name=call_cls.name, description=call_cls.description, - path=call_id, ), ) return call_metadata, node_metadata - @classmethod - async def _load_single_call_dir(cls, call_name: str) -> tuple[list[CallPool], list[NodePool]]: + + async def _load_single_call_dir(self, call_name: str) -> tuple[list[CallPool], list[NodePool]]: """加载单个Call package""" call_metadata = [] node_metadata = [] - call_dir = Path(config["SERVICE_DIR"]) / CALL_DIR / call_name + call_dir = Path(config["SEMANTICS_DIR"]) / CALL_DIR / call_name if not (call_dir / "__init__.py").exists(): LOGGER.info(msg=f"模块{call_dir}不存在__init__.py文件,尝试自动创建。") try: @@ -103,9 +106,11 @@ class CallLoader: err = f"载入模块call.{call_name}失败:{e}。" raise RuntimeError(err) from e + sys.modules["call." + call_name] = call_package + # 已载入包,处理包中每个工具 if not hasattr(call_package, "__all__"): - err = f"包call.{call_name}不符合模块要求,无法处理。" + err = f"模块call.{call_name}不符合模块要求,无法处理。" LOGGER.info(msg=err) raise ValueError(err) @@ -113,24 +118,24 @@ class CallLoader: try: call_cls = getattr(call_package, call_id) except Exception as e: - err = f"载入工具{call_name}.{call_id}失败:{e};跳过载入。" + err = f"载入工具call.{call_name}.{call_id}失败:{e};跳过载入。" LOGGER.info(msg=err) continue - if not cls._check_class(call_cls): - err = f"工具{call_name}.{call_id}不符合标准要求;跳过载入。" + if not self._check_class(call_cls): + err = f"工具call.{call_name}.{call_id}不符合标准要求;跳过载入。" LOGGER.info(msg=err) continue cls_path = f"{call_package.service}::call.{call_name}.{call_id}" - cls_hash = get_short_hash(cls_path.encode()) + cls_hash = shake_128(cls_path.encode()).hexdigest(8) call_metadata.append( CallPool( _id=cls_hash, type=CallType.PYTHON, name=call_cls.name, description=call_cls.description, - path=cls_path, + path=f"call.{call_name}.{call_id}", ), ) node_metadata.append( @@ -139,16 +144,14 @@ class CallLoader: call_id=cls_hash, name=call_cls.name, description=call_cls.description, - path=cls_path, ), ) return call_metadata, node_metadata - @classmethod - async def _load_all_user_call(cls) -> tuple[list[CallPool], list[NodePool]]: + async def _load_all_user_call(self) -> tuple[list[CallPool], list[NodePool]]: """加载Python Call""" - call_dir = Path(config["SERVICE_DIR"]) / CALL_DIR + call_dir = Path(config["SEMANTICS_DIR"]) / CALL_DIR call_metadata = [] node_metadata = [] @@ -169,7 +172,7 @@ class CallLoader: continue # 载入包 try: - call_metadata, node_metadata = await CallLoader._load_single_call_dir(call_file.name) + call_metadata, node_metadata = await self._load_single_call_dir(call_file.name) call_metadata.extend(call_metadata) node_metadata.extend(node_metadata) @@ -185,8 +188,7 @@ class CallLoader: # 更新数据库 - @staticmethod - async def _update_db(call_metadata: list[CallPool], node_metadata: list[NodePool]) -> None: + async def _update_db(self, call_metadata: list[CallPool], node_metadata: list[NodePool]) -> None: """更新数据库;call和node下标一致""" # 更新MongoDB call_collection = MongoDB.get_collection("call") @@ -208,16 +210,18 @@ class CallLoader: session = await PostgreSQL.get_session() node_vecs = await PostgreSQL.get_embedding(node_descriptions) for i, data in enumerate(node_vecs): - node_vec = NodePoolVector( - _id=node_metadata[i].id, + insert_stmt = insert(NodePoolVector).values( + id=node_metadata[i].id, embedding=data, + ).on_conflict_do_update( + index_elements=["id"], + set_={"embedding": data}, ) - session.add(node_vec) + await session.execute(insert_stmt) await session.commit() - @staticmethod - async def init() -> None: + async def load(self) -> None: """初始化Call信息""" # 清空collection call_collection = MongoDB.get_collection("call") @@ -230,27 +234,32 @@ class CallLoader: # 载入所有已知的Call信息 try: - sys_call_metadata, sys_node_metadata = await CallLoader._load_system_call() + sys_call_metadata, sys_node_metadata = await self._load_system_call() except Exception as e: err = f"载入系统Call信息失败:{e};停止运行。" LOGGER.error(msg=err) raise RuntimeError(err) from e - user_call_metadata, user_node_metadata = await CallLoader._load_all_user_call() + try: + user_call_metadata, user_node_metadata = await self._load_all_user_call() + except Exception as e: + err = f"载入用户Call信息失败:{e};只可使用基本功能。" + LOGGER.error(msg=err) + user_call_metadata = [] + user_node_metadata = [] # 合并Call元数据 call_metadata = sys_call_metadata + user_call_metadata node_metadata = sys_node_metadata + user_node_metadata # 更新数据库 - await CallLoader._update_db(call_metadata, node_metadata) + await self._update_db(call_metadata, node_metadata) - @staticmethod - async def load_one(call_name: str) -> None: + async def load_one(self, call_name: str) -> None: """加载单个Call""" try: - call_metadata, node_metadata = await CallLoader._load_single_call_dir(call_name) + call_metadata, node_metadata = await self._load_single_call_dir(call_name) except Exception as e: err = f"载入Call信息失败:{e}。" LOGGER.error(msg=err) @@ -258,11 +267,10 @@ class CallLoader: # 有数据时更新数据库 if call_metadata: - await CallLoader._update_db(call_metadata, node_metadata) + await self._update_db(call_metadata, node_metadata) - @staticmethod - async def get() -> list[CallPool]: + async def get(self) -> list[CallPool]: """获取当前已知的所有Python Call元数据""" call_collection = MongoDB.get_collection("call") result: list[CallPool] = [] diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py index 1038ac90..3b087f47 100644 --- a/apps/scheduler/pool/loader/metadata.py +++ b/apps/scheduler/pool/loader/metadata.py @@ -20,8 +20,8 @@ class MetadataLoader: """元数据加载器""" @classmethod - async def load(cls, file_path: Path) -> Union[AppMetadata, ServiceMetadata]: - """加载【单个】元数据""" + async def load_one(cls, file_path: Path) -> Union[AppMetadata, ServiceMetadata]: + """加载单个元数据""" # 检查yaml格式 try: metadata_dict = yaml.safe_load(await file_path.read_text()) @@ -51,8 +51,8 @@ class MetadataLoader: @classmethod - async def save(cls, metadata_type: MetadataType, metadata: dict[str, Any], resource_id: str) -> None: - """保存【单个】元数据""" + async def save_one(cls, metadata_type: MetadataType, metadata: dict[str, Any], resource_id: str) -> None: + """保存单个元数据""" class_dict = { MetadataType.APP: AppMetadata, MetadataType.SERVICE: ServiceMetadata, @@ -60,9 +60,9 @@ class MetadataLoader: # 检查资源路径 if metadata_type == MetadataType.APP: - resource_path = Path(config["SERVICE_DIR"]) / "app" / resource_id / "metadata.yaml" + resource_path = Path(config["SEMANTICS_DIR"]) / "app" / resource_id / "metadata.yaml" elif metadata_type == MetadataType.SERVICE: - resource_path = Path(config["SERVICE_DIR"]) / "service" / resource_id / "metadata.yaml" + resource_path = Path(config["SEMANTICS_DIR"]) / "service" / resource_id / "metadata.yaml" # 保存元数据 try: diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py index a5ff7d0f..9f72f701 100644 --- a/apps/scheduler/pool/loader/openapi.py +++ b/apps/scheduler/pool/loader/openapi.py @@ -62,6 +62,7 @@ class OpenAPILoader: return nodes + @classmethod async def load_one(cls, yaml_folder: Path, service_metadata: ServiceMetadata) -> list[NodePool]: """加载单个OpenAPI文档,可以直接指定路径""" @@ -75,3 +76,9 @@ class OpenAPILoader: service_id = yaml_folder.parent.name return cls._process_spec(service_id, spec, service_metadata) + + + @classmethod + async def save_one(cls, nodes: list[NodePool]) -> None: + """保存单个OpenAPI文档""" + pass diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index 63ef4a4f..4ad9182a 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -24,9 +24,9 @@ class ServiceLoader: @classmethod - async def load_one(cls, service_dir: Path) -> None: + async def load_one(cls, service_id: str) -> None: """加载单个Service""" - service_path = Path(config["SERVICE_DIR"]) / "service" / service_dir + service_path = Path(config["SEMANTICS_DIR"]) / "service" / service_id # 载入元数据 metadata = await MetadataLoader.load(service_path / "metadata.yaml") if not isinstance(metadata, ServiceMetadata): @@ -57,7 +57,7 @@ class ServiceLoader: node_vecs = await PostgreSQL.get_embedding(node_descriptions) for i, data in enumerate(node_vecs): node_vec = NodePoolVector( - _id=nodes[i].id, + id=nodes[i].id, embedding=data, ) session.add(node_vec) diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index a772d732..db79c9db 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -2,50 +2,34 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from apps.scheduler.pool.check import FileChecker - +import ray -from apps.entities.flow_topology import DependencyItem, FlowItem, NodeItem, PositionItem +from apps.entities.enum_var import MetadataType +from apps.entities.flow_topology import FlowItem +from apps.scheduler.pool.check import FileChecker +from apps.scheduler.pool.loader import CallLoader +@ray.remote class Pool: """资源池""" - @classmethod - def load(cls) -> None: + async def init(self) -> None: """加载全部文件系统内的资源""" - pass + # 加载Call + await CallLoader().load() + + # 加载Services + checker = FileChecker() + changed_service, deleted_service = await checker.diff(MetadataType.SERVICE) + # 加载App + changed_app, deleted_app = await checker.diff(MetadataType.APP) - @classmethod - def save(cls, *, is_deletion: bool = False) -> None: + def save(self, *, is_deletion: bool = False) -> None: """保存【单个】资源""" pass - @classmethod - def get_flow(cls, app_id: str, flow_id: str) -> FlowItem: - ret = FlowItem( - { - "flowId": flow_id, - "nodes": [ - (NodeItem){ - "name": "test", - "node_id": "test", - "type": "test", - "parameters": {}, - "position": (PositionItem){ - "x": 0, - "y": 0, - }, - "editable": true, - "enable": true, - "description":"", - } - ], - "edeges": [], - "editable": true, - "enable": true, - ) - return FlowItem + def get_flow(self, app_id: str, flow_id: str) -> FlowItem: pass diff --git a/apps/scheduler/pool/util.py b/apps/scheduler/pool/util.py deleted file mode 100644 index 11a75c53..00000000 --- a/apps/scheduler/pool/util.py +++ /dev/null @@ -1,15 +0,0 @@ -"""工具函数 - -Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. -""" -from hashlib import sha256, shake_128 - - -def get_short_hash(s: bytes) -> str: - """获取字节串的哈希值""" - return shake_128(s).hexdigest(8) - - -def get_long_hash(s: bytes) -> str: - """获取字节串的哈希值""" - return sha256(s).hexdigest() diff --git a/assets/.env.example b/assets/.env.example index 71443fe1..39bffab2 100644 --- a/assets/.env.example +++ b/assets/.env.example @@ -29,9 +29,6 @@ WORDS_LIST= # logging LOG= -# Vectorize -VECTORIZE_HOST= - # RAG RAG_HOST= @@ -83,7 +80,7 @@ SCHEDULER_MAX_TOKENS=8192 SCHEDULER_TEMPERATURE=0.7 # 插件 -SERVICE_DIR= +SEMANTICS_DIR= # SQL SQL_URL= diff --git a/deploy/chart/euler_copilot/configs/framework/.env b/deploy/chart/euler_copilot/configs/framework/.env index 99e48bca..bf15c85b 100644 --- a/deploy/chart/euler_copilot/configs/framework/.env +++ b/deploy/chart/euler_copilot/configs/framework/.env @@ -34,7 +34,7 @@ SESSION_TTL=30 LOG="stdout" # Embedding -EMBEDDING_URL={{ .Values.models.embedding.url }} +EMBEDDING_URL={{ .Values.models.embedding.url }}/v1/embeddings EMBEDDING_KEY={{ .Values.models.embedding.key }} EMBEDDING_MODEL={{ .Values.models.embedding.name }} diff --git a/deploy/chart/euler_copilot/configs/rag/.env b/deploy/chart/euler_copilot/configs/rag/.env index ed3c3b23..031fc8f1 100644 --- a/deploy/chart/euler_copilot/configs/rag/.env +++ b/deploy/chart/euler_copilot/configs/rag/.env @@ -24,7 +24,7 @@ REDIS_PWD=${redis-password} TASK_RETRY_TIME=3 # Embedding Service -EMBEDDING_ENDPOINT={{ .Values.models.embedding.url }}/embeddings +EMBEDDING_ENDPOINT={{ .Values.models.embedding.url }}/v1/embeddings EMBEDDING_API_KEY={{ .Values.models.embedding.key }} EMBEDDING_MODEL_NAME={{ .Values.models.embedding.name }} -- Gitee