diff --git a/apps/scheduler/call/graph/graph.py b/apps/scheduler/call/graph/graph.py
index 893d1a3209d23fea078116735794993241271742..71443eb271d753b300a60ecc167d906ea496f787 100644
--- a/apps/scheduler/call/graph/graph.py
+++ b/apps/scheduler/call/graph/graph.py
@@ -41,10 +41,9 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput):
         """初始化Render Call,校验参数,读取option模板"""
         try:
             option_location = Path(__file__).parent / "option.json"
-            f = await Path(option_location).open(encoding="utf-8")
-            data = await f.read()
+            with open(option_location, encoding="utf-8") as f:
+                data = f.read()
             self._option_template = json.loads(data)
-            await f.aclose()
         except Exception as e:
             raise CallError(message=f"图表模板读取失败:{e!s}", data={}) from e

@@ -52,14 +51,14 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput):
         if not self.dataset_key:
             last_step_id = call_vars.history_order[-1]
             self.dataset_key = f"{last_step_id}/dataset"
-        data = self._extract_history_variables(self.dataset_key, call_vars.history)
+        data = self._extract_history_variables(
+            self.dataset_key, call_vars.history)

         return RenderInput(
             question=call_vars.question,
             data=data,
         )

-
     async def _exec(
         self, input_data: dict[str, Any], language: LanguageType = LanguageType.CHINESE
     ) -> AsyncGenerator[CallOutputChunk, None]:
@@ -96,9 +95,11 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput):
             self.tokens.output_tokens += style_obj.output_tokens
             add_style = llm_output.get("additional_style", "")

-            self._parse_options(column_num, llm_output["chart_type"], add_style, llm_output["scale_type"])
+            self._parse_options(
+                column_num, llm_output["chart_type"], add_style, llm_output["scale_type"])
         except Exception as e:
-            raise CallError(message=f"图表生成失败:{e!s}", data={"data": data}) from e
+            raise CallError(message=f"图表生成失败:{e!s}", data={
+                            "data": data}) from e

         yield CallOutputChunk(
             type=CallOutputType.DATA,
@@ -107,7 +108,6 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput):
             ).model_dump(exclude_none=True, by_alias=True),
         )

-
     @staticmethod
     def _separate_key_value(data: list[dict[str, Any]]) -> list[dict[str, Any]]:
         """
@@ -124,7 +124,6 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput):
             result.append({"type": key, "value": val})
         return result

-
     def _parse_options(
         self, column_num: int, chart_style: str, additional_style: str, scale_style: str
     ) -> None:
diff --git a/apps/scheduler/pool/loader/btdl.py b/apps/scheduler/pool/loader/btdl.py
index 072303b7fa7e0ebd1ca4ed8ef3258ff64431aed8..ae50a034d183f0f6a57036c1124b6139c889bdb0 100644
--- a/apps/scheduler/pool/loader/btdl.py
+++ b/apps/scheduler/pool/loader/btdl.py
@@ -14,6 +14,8 @@
 btdl_spec = []
 {"docker": ("描述", [{全局options}], {"cmd1名字": ("cmd1描述", "cmd1用法", [{cmd1选项}], [{cmd1参数}], "cmd1例子")})}
 """
+
+
 class BTDLLoader:
     """二进制描述文件 加载器"""

@@ -84,7 +86,8 @@ class BTDLLoader:
             new_item.pop("name")
             options.update({option_name: new_item})

-            id = hashlib.md5(f"o_{binary_name}_sub_{name}_{option_name}".encode()).hexdigest()
+            id = hashlib.md5(
+                f"o_{binary_name}_sub_{name}_{option_name}".encode()).hexdigest()
             option_docs.append(DocumentWrapper(
                 id=id,
                 data=new_item["description"],
@@ -118,7 +121,8 @@ class BTDLLoader:
             new_item.pop("name")
             arguments.update({argument_name: new_item})

-            id = hashlib.md5(f"a_{binary_name}_sub_{name}_{argument_name}".encode()).hexdigest()
+            id = hashlib.md5(
+                f"a_{binary_name}_sub_{name}_{argument_name}".encode()).hexdigest()
             arguments_docs.append(DocumentWrapper(
                 id=id,
data=new_item["description"], @@ -139,7 +143,8 @@ class BTDLLoader: examples = "以下是几组命令行,以及它的作用的示例:\n" for items in subcmd_spec["examples"]: - examples += "`{}`: {}\n".format(items["command"], items["description"]) + examples += "`{}`: {}\n".format( + items["command"], items["description"]) else: examples = "" @@ -167,7 +172,8 @@ class BTDLLoader: new_item.pop("name") result.update({name: new_item}) - id = hashlib.md5(f"g_{binary_name}_{name}".encode()).hexdigest() + id = hashlib.md5( + f"g_{binary_name}_{name}".encode()).hexdigest() result_doc.append(DocumentWrapper( id=id, data=new_item["description"], @@ -206,7 +212,8 @@ class BTDLLoader: sub_cmds_doc = [] for sub_cmd in cmd_spec["commands"]: sub_cmds.update(self._load_single_subcmd(key, sub_cmd)) - id = hashlib.md5(f"s_{key}_{sub_cmd['name']}".encode()).hexdigest() + id = hashlib.md5( + f"s_{key}_{sub_cmd['name']}".encode()).hexdigest() sub_cmds_doc.append(DocumentWrapper( id=id, data=sub_cmd["description"], diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 332528cc71b7115c4719f0dbc07f0abcb2b60967..26e1c2953f40b494a456c470366c7fecf5d70bd2 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -36,8 +36,8 @@ class FlowLoader: async def _load_yaml_file(self, flow_path: Path) -> dict[str, Any]: """从YAML文件加载工作流配置""" try: - async with aiofiles.open(flow_path, encoding="utf-8") as f: - return yaml.safe_load(await f.read()) + with open(flow_path, encoding="utf-8") as f: + return yaml.safe_load(f.read()) except Exception: logger.exception("[FlowLoader] 加载YAML文件失败:%s", flow_path) return {} @@ -207,10 +207,10 @@ class FlowLoader: await flow_path.parent.mkdir(parents=True) flow_dict = flow.model_dump(by_alias=True, exclude_none=True) - async with aiofiles.open(flow_path, mode="w", encoding="utf-8") as f: + with open(flow_path, mode="w", encoding="utf-8") as f: yaml.add_representer(str, yaml_str_presenter) yaml.add_representer(EdgeType, yaml_enum_presenter) - await f.write( + f.write( yaml.dump( flow_dict, allow_unicode=True, @@ -251,7 +251,9 @@ class FlowLoader: async def _update_db(self, app_id: str, metadata: AppFlow) -> None: # noqa: C901 """更新数据库""" + import time try: + st = time.time() app_collection = MongoDB.get_collection("app") # 获取当前的flows app_data = await app_collection.find_one({"_id": app_id}) @@ -260,6 +262,8 @@ class FlowLoader: logger.error(err) return app_obj = AppPool.model_validate(app_data) + en = time.time() + logger.error(f"[FlowLoader] MongoDB获取app耗时: {en-st} 秒") flows = app_obj.flows for flow in flows: @@ -267,7 +271,7 @@ class FlowLoader: flows.remove(flow) break flows.append(metadata) - + st = time.time() # 执行更新操作 await app_collection.update_one( filter={ @@ -280,10 +284,15 @@ class FlowLoader: }, upsert=True, ) + en = time.time() + logger.error(f"[FlowLoader] MongoDB更新app耗时: {en-st} 秒") + st = time.time() flow_path = BASE_PATH / app_id / "flow" / f"{metadata.id}.yaml" - async with aiofiles.open(flow_path, "rb") as f: - new_hash = sha256(await f.read()).hexdigest() - + with open(flow_path, "rb") as f: + new_hash = sha256(f.read()).hexdigest() + en = time.time() + logger.error(f"[FlowLoader] 计算flow文件hash耗时: {en-st} 秒") + st = time.time() key = f"hashes.flow/{metadata.id}.yaml" await app_collection.aggregate( [ @@ -292,10 +301,11 @@ class FlowLoader: "input": "$$ROOT", "value": new_hash}}}, ], ) + en = time.time() + logger.error(f"[FlowLoader] MongoDB更新flow文件hash耗时: {en-st} 秒") except Exception: logger.exception("[FlowLoader] 更新 MongoDB 失败") - 
         import time
         st = time.time()
         await VectorManager.delete_vectors(
             vector_type=VectorPoolType.FLOW,
@@ -306,7 +316,10 @@ class FlowLoader:
             # 不抛出异常,继续执行后续操作

         # 进行向量化
+        st = time.time()
         service_embedding = await Embedding.get_embedding([metadata.description])
+        en = time.time()
+        logger.error(f"[FlowLoader] 获取flow描述向量耗时: {en-st} 秒")
         st = time.time()
         flow_pool_vector_entity = FlowPoolVector(
             id=metadata.id,
@@ -326,10 +339,10 @@ class FlowLoader:
             await subflow_path.parent.mkdir(parents=True, exist_ok=True)

         flow_dict = flow.model_dump(by_alias=True, exclude_none=True)
-        async with aiofiles.open(subflow_path, mode="w", encoding="utf-8") as f:
+        with open(subflow_path, mode="w", encoding="utf-8") as f:
             yaml.add_representer(str, yaml_str_presenter)
             yaml.add_representer(EdgeType, yaml_enum_presenter)
-            await f.write(
+            f.write(
                 yaml.dump(
                     flow_dict,
                     allow_unicode=True,
@@ -360,8 +373,8 @@ class FlowLoader:
             return None

         try:
-            async with aiofiles.open(subflow_path, mode="r", encoding="utf-8") as f:
-                content = await f.read()
+            with open(subflow_path, mode="r", encoding="utf-8") as f:
+                content = f.read()
             flow_dict = yaml.safe_load(content)
             return Flow(**flow_dict)
         except Exception:
diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py
index be38141659bbaf5d5ef80456729bf8064bf230b6..f211ac2b7cf702976ad881532f2c7fa824723e6c 100644
--- a/apps/scheduler/pool/loader/mcp.py
+++ b/apps/scheduler/pool/loader/mcp.py
@@ -74,10 +74,8 @@ class MCPLoader(metaclass=SingletonMeta):
             err = f"MCP配置文件不存在: {config_path}"
             logger.error(err)
             raise FileNotFoundError(err)
-
-        f = await config_path.open("r", encoding="utf-8")
-        f_content = json.loads(await f.read())
-        await f.aclose()
+        with open(config_path, encoding="utf-8") as f:
+            f_content = json.loads(f.read())

         return MCPServerConfig.model_validate(f_content)

@@ -111,11 +109,10 @@ class MCPLoader(metaclass=SingletonMeta):

             # 重新保存config
             template_config = MCP_PATH / "template" / mcp_id / "config.json"
-            f = await template_config.open("w+", encoding="utf-8")
-            config_data = config.model_dump(
-                by_alias=True, exclude_none=True)
-            await f.write(json.dumps(config_data, indent=4, ensure_ascii=False))
-            await f.aclose()
+            with open(template_config, "w+", encoding="utf-8") as f:
+                config_data = config.model_dump(
+                    by_alias=True, exclude_none=True)
+                f.write(json.dumps(config_data, indent=4, ensure_ascii=False))
         else:
             logger.info(f"[Installer] SSE/StreamableHTTP方式的MCP模板,无需安装: {mcp_id}")  # noqa: T201

@@ -348,10 +345,9 @@ class MCPLoader(metaclass=SingletonMeta):
         config_path = MCP_PATH / "template" / mcp_id / "config.json"
         await Path.mkdir(config_path.parent, parents=True, exist_ok=True)

-        f = await config_path.open("w+", encoding="utf-8")
-        config_dict = config.model_dump(by_alias=True, exclude_none=True)
-        await f.write(json.dumps(config_dict, indent=4, ensure_ascii=False))
-        await f.aclose()
+        with open(config_path, "w+", encoding="utf-8") as f:
+            config_dict = config.model_dump(by_alias=True, exclude_none=True)
+            f.write(json.dumps(config_dict, indent=4, ensure_ascii=False))

     @staticmethod
     async def get_icon(mcp_id: str) -> str:
@@ -366,9 +362,8 @@ class MCPLoader(metaclass=SingletonMeta):
         if not await icon_path.exists():
             logger.warning("[MCPLoader] MCP模板图标不存在: %s", mcp_id)
             return ""
-        f = await icon_path.open("rb")
-        icon = await f.read()
-        await f.aclose()
+        with open(icon_path, "rb") as f:
+            icon = f.read()

         header = "data:image/png;base64,"
         return header + base64.b64encode(icon).decode("utf-8")
@@ -385,9 +380,8 @@ class MCPLoader(metaclass=SingletonMeta):
             err = f"MCP模板配置文件不存在: {mcp_id}"
             logger.error(err)
             raise FileNotFoundError(err)
-        f = await config_path.open("r", encoding="utf-8")
-        config = json.loads(await f.read())
-        await f.aclose()
+        with open(config_path, encoding="utf-8") as f:
+            config = json.loads(f.read())
         return MCPServerConfig.model_validate(config)

     @staticmethod
@@ -456,15 +450,14 @@ class MCPLoader(metaclass=SingletonMeta):
                 "--directory", str(user_path)+'/project'] + mcp_config.config.args
         user_config_path = user_path / "config.json"
         # 更新用户配置
-        f = await user_config_path.open("w", encoding="utf-8", errors="ignore")
-        await f.write(
-            json.dumps(
-                mcp_config.model_dump(by_alias=True, exclude_none=True),
-                indent=4,
-                ensure_ascii=False,
+        with open(user_config_path, "w+", encoding="utf-8") as f:
+            f.write(
+                json.dumps(
+                    mcp_config.model_dump(by_alias=True, exclude_none=True),
+                    indent=4,
+                    ensure_ascii=False,
+                )
             )
-        )
-        await f.aclose()
         # 更新数据库
         mcp_collection = MongoDB.get_collection("mcp")
         await mcp_collection.update_one(
diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py
index 91528d4d4a0d8fb56788e2628b2debc20ebce613..b99e4629ddd0fe53048b99a4bf901fb2ab93b321 100644
--- a/apps/scheduler/pool/loader/openapi.py
+++ b/apps/scheduler/pool/loader/openapi.py
@@ -30,10 +30,8 @@ class OpenAPILoader:
         if not await yaml_path.exists():
             msg = f"File not found: {yaml_path}"
             raise FileNotFoundError(msg)
-
-        f = await yaml_path.open(mode="r")
-        spec = yaml.safe_load(await f.read())
-        await f.aclose()
+        with open(yaml_path, mode="r", encoding="utf-8") as f:
+            spec = yaml.safe_load(f)

         return reduce_openapi_spec(spec)

diff --git a/apps/services/service.py b/apps/services/service.py
index 4e501a65d64f4987a635fc64160e2ef309d841ba..338c1b643d518e7a9628c4907b2afb4abcc4a13a 100644
--- a/apps/services/service.py
+++ b/apps/services/service.py
@@ -40,7 +40,8 @@ class ServiceCenterManager:
         page_size: int,
     ) -> tuple[list[ServiceCardItem], int]:
         """获取所有服务列表"""
-        filters = ServiceCenterManager._build_filters({}, search_type, keyword) if keyword else {}
+        filters = ServiceCenterManager._build_filters(
+            {}, search_type, keyword) if keyword else {}
         service_pools, total_count = await ServiceCenterManager._search_service(filters, page, page_size)
         fav_service_ids = await ServiceCenterManager._get_favorite_service_ids_by_user(user_sub)
         services = [
@@ -70,7+71,8 @@ class ServiceCenterManager:
             return [], 0
         keyword = user_sub
         base_filter = {"author": user_sub}
-        filters = ServiceCenterManager._build_filters(base_filter, search_type, keyword) if keyword else base_filter
+        filters = ServiceCenterManager._build_filters(
+            base_filter, search_type, keyword) if keyword else base_filter
         service_pools, total_count = await ServiceCenterManager._search_service(filters, page, page_size)
         fav_service_ids = await ServiceCenterManager._get_favorite_service_ids_by_user(user_sub)
         services = [
@@ -97,7 +99,8 @@ class ServiceCenterManager:
         """获取用户收藏的服务"""
         fav_service_ids = await ServiceCenterManager._get_favorite_service_ids_by_user(user_sub)
         base_filter = {"_id": {"$in": fav_service_ids}}
-        filters = ServiceCenterManager._build_filters(base_filter, search_type, keyword) if keyword else base_filter
+        filters = ServiceCenterManager._build_filters(
+            base_filter, search_type, keyword) if keyword else base_filter
         service_pools, total_count = await ServiceCenterManager._search_service(filters, page, page_size)
         services = [
             ServiceCardItem(
@@ -237,10 +240,11 @@ class ServiceCenterManager:
             msg = "Permission denied"
"Permission denied" raise InstancePermissionError(msg) service_path = ( - Path(Config().get_config().deploy.data_dir) / "semantics" / "service" / service_id / "openapi" / "api.yaml" + Path(Config().get_config().deploy.data_dir) / "semantics" / + "service" / service_id / "openapi" / "api.yaml" ) - async with await service_path.open() as f: - service_data = yaml.safe_load(await f.read()) + with open(service_path, "r", encoding="utf-8") as f: + service_data = yaml.safe_load(f) return service_pool_store.name, service_data @staticmethod @@ -267,10 +271,11 @@ class ServiceCenterManager: raise ServiceIDError(msg) metadata_path = ( - Path(Config().get_config().deploy.data_dir) / "semantics" / "service" / service_id / "metadata.yaml" + Path(Config().get_config().deploy.data_dir) / + "semantics" / "service" / service_id / "metadata.yaml" ) async with await metadata_path.open() as f: - metadata_data = yaml.safe_load(await f.read()) + metadata_data = yaml.safe_load(f.read()) return ServiceMetadata.model_validate(metadata_data) @staticmethod @@ -350,9 +355,11 @@ class ServiceCenterManager: skip = (page - 1) * page_size db_services = await service_collection.find(search_conditions).skip(skip).limit(page_size).to_list() if not db_services and total > 0: - logger.warning("[ServiceCenterManager] 没有找到符合条件的服务: %s", search_conditions) + logger.warning( + "[ServiceCenterManager] 没有找到符合条件的服务: %s", search_conditions) return [], -1 - service_pools = [ServicePool.model_validate(db_service) for db_service in db_services] + service_pools = [ServicePool.model_validate( + db_service) for db_service in db_services] return service_pools, total @staticmethod