From e80c7fadc14717e7007bb6f9aff358db58b91433 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E9=B8=BF=E5=AE=87?= Date: Tue, 25 Feb 2025 17:03:23 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20App=20/=20Service=20=E6=89=80=E6=9C=89?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E4=BB=A5=E6=9C=AC=E5=9C=B0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E4=B8=BA=E7=AC=AC=E4=B8=80=E5=85=AC=E6=B0=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 史鸿宇 --- apps/entities/appcenter.py | 11 ++++- apps/entities/collection.py | 2 + apps/entities/flow.py | 3 ++ apps/entities/pool.py | 2 - apps/manager/appcenter.py | 54 +++++++++++--------- apps/manager/service.py | 45 +++++++++++------ apps/routers/appcenter.py | 12 ++++- apps/scheduler/pool/loader/app.py | 71 ++++++++++++++------------- apps/scheduler/pool/loader/flow.py | 2 +- apps/scheduler/pool/loader/service.py | 37 +++++--------- 10 files changed, 137 insertions(+), 102 deletions(-) diff --git a/apps/entities/appcenter.py b/apps/entities/appcenter.py index eb02d6fb0..3baa849c1 100644 --- a/apps/entities/appcenter.py +++ b/apps/entities/appcenter.py @@ -37,6 +37,15 @@ class AppPermissionData(BaseModel): ) +class AppFlowInfo(BaseModel): + """应用工作流数据结构""" + + id: str = Field(..., description="工作流ID") + name: str = Field(..., description="工作流名称") + description: str = Field(..., description="工作流简介") + debug: bool = Field(..., description="是否经过调试") + + class AppData(BaseModel): """应用信息数据结构""" @@ -49,4 +58,4 @@ class AppData(BaseModel): history_len: int = Field(3, alias="dialogRounds", ge=1, le=10, description="对话轮次(1~10)") permission: AppPermissionData = Field( default_factory=lambda: AppPermissionData(authorizedUsers=None), description="权限配置") - workflows: list[dict] = Field(default=[], description="工作流ID,名称列表") + workflows: list[AppFlowInfo] = Field(default=[], description="工作流信息列表") diff --git a/apps/entities/collection.py b/apps/entities/collection.py index f542cd9da..4a74f0355 100644 --- a/apps/entities/collection.py +++ b/apps/entities/collection.py @@ -57,6 +57,8 @@ class User(BaseModel): conversations: list[str] = [] domains: list[UserDomainData] = [] app_usage: dict[str, AppUsageData] = {} + fav_apps: list[str] = [] + fav_services: list[str] = [] class Conversation(BaseModel): diff --git a/apps/entities/flow.py b/apps/entities/flow.py index 258d6690a..5382b12a1 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -11,6 +11,7 @@ from apps.entities.enum_var import ( MetadataType, PermissionType, ) +from apps.entities.pool import AppFlow class StepPos(BaseModel): @@ -134,10 +135,12 @@ class AppMetadata(MetadataBase): """App的元数据""" type: MetadataType = MetadataType.APP + published: bool = Field(description="是否发布", default=False) links: list[AppLink] = Field(description="相关链接", default=[]) first_questions: list[str] = Field(description="首次提问", default=[]) history_len: int = Field(description="对话轮次", default=3, le=10) permission: Optional[Permission] = Field(description="应用权限配置", default=None) + flows: list[AppFlow] = Field(description="Flow列表", default=[]) class ServiceApiSpec(BaseModel): diff --git a/apps/entities/pool.py b/apps/entities/pool.py index 45b3c3d63..0b4d103dd 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -38,7 +38,6 @@ class ServicePool(BaseData): author: str = Field(description="作者的用户ID") permission: Permission = Field(description="服务可见性配置", default=Permission()) - favorites: list[str] = Field(description="收藏此服务的用户列表", default=[]) hashes: dict[str, str] = Field(description="服务关联的 OpenAPI YAML 和元数据文件哈希") @@ -116,5 +115,4 @@ class AppPool(BaseData): history_len: int = Field(3, ge=1, le=10, description="对话轮次(1~10)") permission: Permission = Field(description="应用权限配置", default=Permission()) flows: list[AppFlow] = Field(description="Flow列表", default=[]) - favorites: list[str] = Field(description="收藏此应用的用户列表", default=[]) hashes: dict[str, str] = Field(description="关联文件的hash值", default={}) diff --git a/apps/manager/appcenter.py b/apps/manager/appcenter.py index a683e0bdc..1605e1e4a 100644 --- a/apps/manager/appcenter.py +++ b/apps/manager/appcenter.py @@ -63,6 +63,7 @@ class AppCenterManager: ) # 执行应用搜索 apps, total_apps = await AppCenterManager._search_apps_by_filter(filters, page, page_size) + fav_apps = await AppCenterManager._get_favorite_app_ids_by_user(user_sub) # 构建返回的应用卡片列表 return [ AppCenterCardItem( @@ -71,7 +72,7 @@ class AppCenterManager: name=app.name, description=app.description, author=app.author, - favorited=(user_sub in app.favorites), + favorited=(app.id in fav_apps), published=app.published, ) for app in apps @@ -111,6 +112,7 @@ class AppCenterManager: else base_filter ) apps, total_apps = await AppCenterManager._search_apps_by_filter(filters, page, page_size) + fav_apps = await AppCenterManager._get_favorite_app_ids_by_user(user_sub) return [ AppCenterCardItem( appId=app.id, @@ -118,7 +120,7 @@ class AppCenterManager: name=app.name, description=app.description, author=app.author, - favorited=(user_sub in app.favorites), + favorited=(app.id in fav_apps), published=app.published, ) for app in apps @@ -145,10 +147,10 @@ class AppCenterManager: :return: 应用列表,总应用数 """ try: - fav_app = await AppCenterManager._get_favorite_app_ids_by_user(user_sub) + fav_apps = await AppCenterManager._get_favorite_app_ids_by_user(user_sub) # 搜索条件 base_filter = { - "_id": {"$in": fav_app}, + "_id": {"$in": fav_apps}, "published": True, } filters: dict[str, Any] = ( @@ -258,12 +260,11 @@ class AppCenterManager: app_data = AppPool.model_validate(await app_collection.find_one({"_id": app_id})) if not app_data: return False + metadata.flows = app_data.flows + metadata.published = app_data.published app_loader = AppLoader.remote() await app_loader.save.remote(metadata, app_id) # type: ignore[attr-type] ray.kill(app_loader) - # 如果工作流ID列表不一致,则需要取消发布状态 - if {flow.id for flow in app_data.flows} != set(data.workflows): - await app_collection.update_one({"_id": app_id}, {"$set": {"published": False}}) return True except Exception as e: LOGGER.error(f"[AppCenterManager] Update app failed: {e}") @@ -298,26 +299,28 @@ class AppCenterManager: """ try: app_collection = MongoDB.get_collection("app") + user_collection = MongoDB.get_collection("user") db_data = await app_collection.find_one({"_id": app_id}) if not db_data: return AppCenterManager.ModFavAppFlag.NOT_FOUND + db_user = await user_collection.find_one({"_id": user_sub}) + if not db_user: + return AppCenterManager.ModFavAppFlag.BAD_REQUEST + user_data = User.model_validate(db_user) - app_data = AppPool.model_validate(db_data) - already_favorited = user_sub in app_data.favorites - + already_favorited = app_id in user_data.fav_apps if favorited == already_favorited: return AppCenterManager.ModFavAppFlag.BAD_REQUEST if favorited: - await app_collection.update_one( - {"_id": app_id}, - {"$addToSet": {"favorites": user_sub}}, - upsert=True, + await user_collection.update_one( + {"_id": user_sub}, + {"$addToSet": {"fav_apps": app_id}}, ) else: - await app_collection.update_one( - {"_id": app_id}, - {"$pull": {"favorites": user_sub}}, + await user_collection.update_one( + {"_id": user_sub}, + {"$pull": {"fav_apps": app_id}}, ) return AppCenterManager.ModFavAppFlag.SUCCESS except Exception as e: @@ -339,14 +342,21 @@ class AppCenterManager: return False if app_data.author != user_sub: return False + # 删除应用 app_loader = AppLoader.remote() await app_loader.delete.remote(app_id) # type: ignore[attr-type] ray.kill(app_loader) user_collection = MongoDB.get_collection("user") - await user_collection.update_one( - {"_id": user_sub}, + # 删除用户使用记录 + await user_collection.update_many( + {f"app_usage.{app_id}": {"$exists": True}}, {"$unset": {f"app_usage.{app_id}": ""}}, ) + # 删除用户收藏 + await user_collection.update_many( + {"fav_apps": {"$in": [app_id]}}, + {"$pull": {"fav_apps": app_id}}, + ) return True except Exception as e: LOGGER.error(f"[AppCenterManager] Delete app failed: {e}") @@ -466,9 +476,9 @@ class AppCenterManager: async def _get_favorite_app_ids_by_user(user_sub: str) -> list[str]: """获取用户收藏的应用ID""" try: - app_collection = MongoDB.get_collection("app") - cursor = app_collection.find({"favorites": {"$in": [user_sub]}}) - return [AppPool.model_validate(doc).id async for doc in cursor] + user_collection = MongoDB.get_collection("user") + user_data = User.model_validate(await user_collection.find_one({"_id": user_sub})) + return user_data.fav_apps except Exception as e: LOGGER.info(f"[AppCenterManager] Get favorite app ids by user_sub failed: {e}") return [] diff --git a/apps/manager/service.py b/apps/manager/service.py index ac264ee23..b056df283 100644 --- a/apps/manager/service.py +++ b/apps/manager/service.py @@ -12,6 +12,7 @@ from jsonschema import ValidationError from apps.common.config import config from apps.constants import LOGGER, SERVICE_DIR +from apps.entities.collection import User from apps.entities.enum_var import SearchType from apps.entities.file_type import OpenAPI from apps.entities.flow import ServiceApiConfig, ServiceMetadata @@ -35,6 +36,7 @@ class ServiceCenterManager: """获取所有服务列表""" filters = ServiceCenterManager._build_filters({}, search_type, keyword) if keyword else {} service_pools, total_count = await ServiceCenterManager._search_service(filters, page, page_size) + fav_service_ids = await ServiceCenterManager._get_favorite_service_ids_by_user(user_sub) services = [ ServiceCardItem( serviceId=service_pool.id, @@ -42,7 +44,7 @@ class ServiceCenterManager: name=service_pool.name, description=service_pool.description, author=service_pool.author, - favorited=(user_sub in service_pool.favorites), + favorited=(service_pool.id in fav_service_ids), ) for service_pool in service_pools ] @@ -60,6 +62,7 @@ class ServiceCenterManager: base_filter = {"author": user_sub} filters = ServiceCenterManager._build_filters(base_filter, search_type, keyword) if keyword else base_filter service_pools, total_count = await ServiceCenterManager._search_service(filters, page, page_size) + fav_service_ids = await ServiceCenterManager._get_favorite_service_ids_by_user(user_sub) services = [ ServiceCardItem( serviceId=service_pool.id, @@ -67,7 +70,7 @@ class ServiceCenterManager: name=service_pool.name, description=service_pool.description, author=service_pool.author, - favorited=(user_sub in service_pool.favorites), + favorited=(service_pool.id in fav_service_ids), ) for service_pool in service_pools ] @@ -211,12 +214,13 @@ class ServiceCenterManager: service_id: str, ) -> bool: """删除服务""" - # 验证用户权限 service_collection = MongoDB.get_collection("service") + user_collection = MongoDB.get_collection("user") db_service = await service_collection.find_one({"_id": service_id}) if not db_service: msg = "Service not found" raise ValueError(msg) + # 验证用户权限 service_pool_store = ServicePool.model_validate(db_service) if service_pool_store.author != user_sub: msg = "Permission denied" @@ -225,6 +229,11 @@ class ServiceCenterManager: service_loader = ServiceLoader.remote() await service_loader.delete.remote(service_id) # type: ignore[attr-type] ray.kill(service_loader) + # 删除用户收藏 + await user_collection.update_many( + {"fav_services": {"$in": [service_id]}}, + {"$pull": {"fav_services": service_id}}, + ) return True @staticmethod @@ -236,23 +245,29 @@ class ServiceCenterManager: ) -> bool: """修改收藏状态""" service_collection = MongoDB.get_collection("service") + user_collection = MongoDB.get_collection("user") db_service = await service_collection.find_one({"_id": service_id}) if not db_service: msg = "Service not found" raise ValueError(msg) - service_pool_store = ServicePool.model_validate(db_service) - already_favorited = user_sub in service_pool_store.favorites + db_user = await user_collection.find_one({"sub": user_sub}) + if not db_user: + msg = "User not found" + raise ValueError(msg) + user_data = User.model_validate(db_user) + already_favorited = service_id in user_data.fav_services if already_favorited == favorited: return False if favorited: - service_pool_store.favorites.append(user_sub) - service_pool_store.favorites = list(set(service_pool_store.favorites)) + await user_collection.update_one( + {"_id": user_sub}, + {"$addToSet": {"fav_services": service_id}}, + ) else: - service_pool_store.favorites.remove(user_sub) - await service_collection.update_one( - {"_id": service_id}, - {"$set": service_pool_store.model_dump(exclude_none=True, by_alias=True)}, - ) + await user_collection.update_one( + {"_id": user_sub}, + {"$pull": {"fav_services": service_id}}, + ) return True @staticmethod @@ -277,9 +292,9 @@ class ServiceCenterManager: @staticmethod async def _get_favorite_service_ids_by_user(user_sub: str) -> list[str]: """获取用户收藏的服务ID""" - service_collection = MongoDB.get_collection("service") - cursor = service_collection.find({"favorites": {"$in": [user_sub]}}) - return [ServicePool.model_validate(doc).id async for doc in cursor] + user_collection = MongoDB.get_collection("user") + user_data = User.model_validate(await user_collection.find_one({"_id": user_sub})) + return user_data.fav_services @staticmethod def _validate_service_data(data: dict[str, Any]) -> bool: diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index 4560a909c..dc78c1f86 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -10,7 +10,7 @@ from fastapi.responses import JSONResponse from apps.dependency.csrf import verify_csrf_token from apps.dependency.user import get_user, verify_user -from apps.entities.appcenter import AppPermissionData +from apps.entities.appcenter import AppFlowInfo, AppPermissionData from apps.entities.enum_var import SearchType from apps.entities.request_data import CreateAppRequest, ModFavAppRequest from apps.entities.response_data import ( @@ -182,7 +182,15 @@ async def get_application( result={}, ).model_dump(exclude_none=True, by_alias=True), ) - workflows = [{"id": flow.id, "name": flow.name, "debug":flow.debug} for flow in app_data.flows] + workflows = [ + AppFlowInfo( + id=flow.id, + name=flow.name, + description=flow.description, + debug=flow.debug, + ) + for flow in app_data.flows + ] return JSONResponse( status_code=status.HTTP_200_OK, content=GetAppPropertyRsp( diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index 996d2796f..35b7b2bc7 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -14,7 +14,7 @@ from sqlalchemy.dialects.postgresql import insert from apps.common.config import config from apps.constants import APP_DIR, FLOW_DIR, LOGGER from apps.entities.flow import AppMetadata, MetadataType, Permission -from apps.entities.pool import AppPool +from apps.entities.pool import AppFlow, AppPool from apps.entities.vector import AppPoolVector from apps.models.mongo import MongoDB from apps.models.postgres import PostgreSQL @@ -50,11 +50,25 @@ class AppLoader: flow_path = app_path / FLOW_DIR flow_loader = FlowLoader() - flows = [] + flow_ids = [app_flow.id for app_flow in metadata.flows] + metadata.flows = [] async for flow_file in flow_path.rglob("*.yaml"): + if flow_file.stem not in flow_ids: + LOGGER.warning(f"[AppLoader] 工作流 {flow_file} 不在元数据中") flow = await flow_loader.load(flow_file) - if flow: - flows.append(flow) + if not flow: + err = f"[AppLoader] 工作流 {flow_file} 加载失败" + LOGGER.error(err) + raise ValueError(err) + metadata.flows.append( + AppFlow( + _id=flow_file.stem, + name=flow.name, + description=flow.description, + path=str(flow_file), + debug=flow.debug, + ), + ) await self._update_db(metadata) @@ -65,13 +79,15 @@ class AppLoader: :param app_id: 应用 ID """ # 创建文件夹 - app_path = Path(config["SEMANTICS_DIR"]) / APP_DIR / app_id - if not await app_path.exists(): - await app_path.mkdir(parents=True, exist_ok=True) + app_path = pathlib.Path(config["SEMANTICS_DIR"]) / APP_DIR / app_id + if not app_path.exists(): + app_path.mkdir(parents=True, exist_ok=True) # 保存元数据 await MetadataLoader().save_one(MetadataType.APP, metadata, app_id) - # 保存工作流 - await self._update_db(metadata) + # 重新载入 + file_checker = FileChecker() + file_checker.diff_one(app_path) + await self.load(app_id, file_checker.hashes) async def delete(self, app_id: str) -> None: """删除App,并更新数据库 @@ -98,16 +114,16 @@ class AppLoader: async def _update_db(self, metadata: AppMetadata) -> None: """更新数据库""" if not metadata.hashes: - LOGGER.warning(f"[AppLoader] 应用 {metadata.id} 的哈希值为空") - # 重新计算哈希值 - metadata.hashes = FileChecker().check_one(pathlib.Path(config["SEMANTICS_DIR"]) / APP_DIR / metadata.id) + err = f"[AppLoader] 应用 {metadata.id} 的哈希值为空" + LOGGER.error(err) + raise ValueError(err) # 更新应用数据 try: app_collection = MongoDB.get_collection("app") - if not await app_collection.find_one({"_id": metadata.id}): - # 创建应用时需写入完整数据结构,自动初始化创建时间、flow列表、收藏列表和权限 - await app_collection.insert_one( - jsonable_encoder( + await app_collection.update_one( + {"_id": metadata.id}, + { + "$set": jsonable_encoder( AppPool( _id=metadata.id, icon=metadata.icon, @@ -118,28 +134,13 @@ class AppLoader: first_questions=metadata.first_questions, history_len=metadata.history_len, permission=metadata.permission if metadata.permission else Permission(), + flows=metadata.flows, hashes=metadata.hashes, ), ), - ) - else: - # 更新应用数据:部分映射 AppMetadata 到 AppPool,其他字段不更新 - await app_collection.update_one( - {"_id": metadata.id}, - { - "$set": { - "icon": metadata.icon, - "name": metadata.name, - "description": metadata.description, - "author": metadata.author, - "links": metadata.links, - "first_questions": metadata.first_questions, - "history_len": metadata.history_len, - "permission": metadata.permission if metadata.permission else Permission(), - "hashes": metadata.hashes, - }, - }, - ) + }, + upsert=True, + ) except Exception as e: err = f"[AppLoader] 更新 MongoDB 失败:{e}" LOGGER.error(err) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 7ccc2ead3..3140feb42 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -75,7 +75,7 @@ class FlowLoader: async def save(self, app_id: str, flow_id: str, flow: Flow) -> None: """保存工作流""" - flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" + flow_path = Path(config["SERVICE_DIR"]) / APP_DIR / app_id / FLOW_DIR / f"{flow_id}.yaml" if not await flow_path.parent.exists(): await flow_path.parent.mkdir(parents=True) if not await flow_path.exists(): diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index d4e59414c..335816469 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -72,8 +72,9 @@ class ServiceLoader: await openapi_loader.save_one.remote(openapi_path, data) # type: ignore[arg-type] ray.kill(openapi_loader) # 重新载入 - hashes = FileChecker().check_one(service_path) - await self.load(service_id, hashes) + file_checker = FileChecker() + file_checker.diff_one(service_path) + await self.load(service_id, file_checker.hashes) async def delete(self, service_id: str) -> None: """删除Service,并更新数据库""" @@ -100,9 +101,9 @@ class ServiceLoader: async def _update_db(self, nodes: list[NodePool], metadata: ServiceMetadata) -> None: """更新数据库""" if not metadata.hashes: - LOGGER.warning(f"[ServiceLoader] 服务 {metadata.id} 的哈希值为空") - # 重新计算哈希值 - metadata.hashes = FileChecker().check_one(pathlib.Path(config["SEMANTICS_DIR"]) / "service" / metadata.id) + err = f"[ServiceLoader] 服务 {metadata.id} 的哈希值为空" + LOGGER.error(err) + raise ValueError(err) # 更新MongoDB service_collection = MongoDB.get_collection("service") node_collection = MongoDB.get_collection("node") @@ -110,10 +111,10 @@ class ServiceLoader: # 先删除旧的节点 await node_collection.delete_many({"service_id": metadata.id}) # 插入或更新 Service - if not await service_collection.find_one({"_id": metadata.id}): - # 创建服务时需写入完整数据结构,自动初始化创建时间、收藏列表和权限 - await service_collection.insert_one( - jsonable_encoder( + await service_collection.update_one( + {"_id": metadata.id}, + { + "$set": jsonable_encoder( ServicePool( _id=metadata.id, name=metadata.name, @@ -123,21 +124,9 @@ class ServiceLoader: hashes=metadata.hashes, ), ), - ) - else: - # 更新服务数据:部分映射 ServiceMetadata 到 ServicePool,其他字段不更新 - await service_collection.update_one( - {"_id": metadata.id}, - { - "$set": { - "name": metadata.name, - "description": metadata.description, - "author": metadata.author, - "permission": metadata.permission if metadata.permission else Permission(), - "hashes": metadata.hashes, - }, - }, - ) + }, + upsert=True, + ) for node in nodes: await node_collection.insert_one(jsonable_encoder(node)) except Exception as e: -- Gitee