diff --git a/apps/common/queue.py b/apps/common/queue.py index 3b28be4d726d4583a8f142590c1d1edf4d4883db..e2d4cbe22d10ad988d49c0ac7c3da391177f4eaf 100644 --- a/apps/common/queue.py +++ b/apps/common/queue.py @@ -65,22 +65,20 @@ class MessageQueue: if not history_ids: # 如果new_history为空,则说明是第一次执行,创建一个空值 flow = MessageFlow( - plugin_id=tcb.flow_state.plugin_id, - flow_id=tcb.flow_state.name, - step_name="start", - step_status=StepStatus.RUNNING, - step_progress="", + appId=tcb.flow_state.plugin_id, + flowId=tcb.flow_state.name, + stepId="start", + stepStatus=StepStatus.RUNNING, ) else: # 如果new_history不为空,则说明是继续执行,使用最后一个FlowHistory history = tcb.flow_context[tcb.flow_state.step_name] flow = MessageFlow( - plugin_id=history.plugin_id, - flow_id=history.flow_id, - step_name=history.step_name, - step_status=history.status, - step_progress=history.step_order, + appId=history.plugin_id, + flowId=history.flow_id, + stepId=history.step_name, + stepStatus=history.status, ) else: flow = None diff --git a/apps/entities/collection.py b/apps/entities/collection.py index 9b1e08ed784e9abc3015d1db6ee30ce05515a44a..3f727790bad7424619dff82394c0df41c340a3f0 100644 --- a/apps/entities/collection.py +++ b/apps/entities/collection.py @@ -62,6 +62,8 @@ class Conversation(BaseModel): user_sub: str title: str = NEW_CHAT created_at: float = Field(default_factory=lambda: round(datetime.now(tz=timezone.utc).timestamp(), 3)) + is_debug: bool = False + app_id: str tasks: list[str] = [] unused_docs: list[str] = [] record_groups: list[str] = [] diff --git a/apps/entities/enum_var.py b/apps/entities/enum_var.py index bb3cdfebd1f1bc57689f453fa17c7aac0f051743..f87ee9fa31dcbca129aed61156d6ad71780ca2ce 100644 --- a/apps/entities/enum_var.py +++ b/apps/entities/enum_var.py @@ -87,3 +87,11 @@ class EdgeType(str, Enum): NORMAL = "normal" LOOP = "loop" + + +class SaveType(str, Enum): + """检查类型""" + + APP = "app" + SERVICE = "service" + FLOW = "flow" diff --git a/apps/entities/flow.py b/apps/entities/flow.py index 1920fbb27d6c8302d73dc9de0c4fecca29aaca36..bf7ab7533cb441288ddb500c36a3caa3c648f7de 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -8,17 +8,18 @@ from pydantic import BaseModel, Field from apps.entities.enum_var import ( AppPermissionType, - MetadataType, EdgeType, + MetadataType, ) -class NodePos(BaseModel): +class StepPos(BaseModel): """节点在画布上的位置""" x: int = Field(description="节点在画布上的X坐标") y: int = Field(description="节点在画布上的Y坐标") + class Edge(BaseModel): """Flow中Edge的数据""" @@ -28,14 +29,15 @@ class Edge(BaseModel): edge_type: Optional[EdgeType] = Field(description="边的类型", alias="type") -class Node(BaseModel): - """Flow中Node的数据""" +class Step(BaseModel): + """Flow中Step的数据""" - id: str = Field(description="节点的ID;与NodePool中的ID对应") - name: str = Field(description="节点名称") - description: str = Field(description="节点描述") - pos: NodePos = Field(description="节点在画布上的位置", default=NodePos(x=0, y=0)) - params: dict[str, Any] = Field(description="用户手动指定的节点参数", default={}) + id: str = Field(description="Step的ID") + node: str = Field(description="Step的Node ID") + name: str = Field(description="Step的名称") + description: str = Field(description="Step的描述") + pos: StepPos = Field(description="Step在画布上的位置", default=StepPos(x=0, y=0)) + params: dict[str, Any] = Field(description="用户手动指定的Node参数", default={}) class NextFlow(BaseModel): @@ -54,11 +56,11 @@ class FlowError(BaseModel): class Flow(BaseModel): """Flow(工作流)的数据格式""" - + name: str = Field(description="Flow的名称") description: str = Field(description="Flow的描述") on_error: FlowError = FlowError(use_llm=True) - nodes: list[Node] = Field(description="节点列表", default=[]) + steps: list[Step] = Field(description="节点列表", default=[]) edges: list[Edge] = Field(description="边列表", default=[]) next_flow: Optional[list[NextFlow]] = None diff --git a/apps/entities/message.py b/apps/entities/message.py index 94e626a869bf11afc47cf25ed34bc3d53e310bb2..9988f6d1ab9f008301bc079decbe1afb9f03e4ef 100644 --- a/apps/entities/message.py +++ b/apps/entities/message.py @@ -21,11 +21,10 @@ class HeartbeatData(BaseModel): class MessageFlow(BaseModel): """消息中有关Flow信息的部分""" - plugin_id: str = Field(description="插件ID") - flow_id: str = Field(description="Flow ID") - step_name: str = Field(description="当前步骤名称") - step_status: StepStatus = Field(description="当前步骤状态") - step_progress: str = Field(description="当前步骤进度,例如1/4") + app_id: str = Field(description="插件ID", alias="appId") + flow_id: str = Field(description="Flow ID", alias="flowId") + step_id: str = Field(description="当前步骤ID", alias="stepId") + step_status: StepStatus = Field(description="当前步骤状态", alias="stepStatus") class MessageMetadata(RecordMetadata): @@ -37,17 +36,17 @@ class MessageMetadata(RecordMetadata): class InitContentFeature(BaseModel): """init消息的feature""" - max_tokens: int = Field(description="最大生成token数", ge=0) - context_num: int = Field(description="上下文消息数量", le=10, ge=0) - enable_feedback: bool = Field(description="是否启用反馈") - enable_regenerate: bool = Field(description="是否启用重新生成") + max_tokens: int = Field(description="最大生成token数", ge=0, alias="maxTokens") + context_num: int = Field(description="上下文消息数量", le=10, ge=0, alias="contextNum") + enable_feedback: bool = Field(description="是否启用反馈", alias="enableFeedback") + enable_regenerate: bool = Field(description="是否启用重新生成", alias="enableRegenerate") class InitContent(BaseModel): """init消息的content""" feature: InitContentFeature = Field(description="问答功能开关") - created_at: float = Field(description="创建时间") + created_at: float = Field(description="创建时间", alias="createdAt") class TextAddContent(BaseModel): @@ -59,18 +58,18 @@ class TextAddContent(BaseModel): class DocumentAddContent(BaseModel): """document.add消息的content""" - document_id: str = Field(min_length=36, max_length=36, description="文档UUID") - document_name: str = Field(description="文档名称") - document_type: str = Field(description="文档MIME类型") - document_size: float = Field(ge=0, description="文档大小,单位是KB,保留两位小数") + document_id: str = Field(min_length=36, max_length=36, description="文档UUID", alias="documentId") + document_name: str = Field(description="文档名称", alias="documentName") + document_type: str = Field(description="文档MIME类型", alias="documentType") + document_size: float = Field(ge=0, description="文档大小,单位是KB,保留两位小数", alias="documentSize") class SuggestContent(BaseModel): """suggest消息的content""" - plugin_id: str = Field(description="插件ID") - flow_id: str = Field(description="Flow ID") - flow_description: str = Field(description="Flow描述") + app_id: str = Field(description="插件ID", alias="appId") + flow_id: str = Field(description="Flow ID", alias="flowId") + flow_description: str = Field(description="Flow描述", alias="flowDescription") question: str = Field(description="用户问题") @@ -84,14 +83,14 @@ class FlowStartContent(BaseModel): class StepInputContent(BaseModel): """step.input消息的content""" - call_type: str = Field(description="Call类型") + call_type: str = Field(description="Call类型", alias="callType") params: dict[str, Any] = Field(description="Step最后输入的参数") class StepOutputContent(BaseModel): """step.output消息的content""" - call_type: str = Field(description="Call类型") + call_type: str = Field(description="Call类型", alias="callType") message: str = Field(description="LLM大模型输出的自然语言文本") output: dict[str, Any] = Field(description="Step输出的结构化数据") @@ -107,9 +106,9 @@ class MessageBase(HeartbeatData): """基础消息事件结构""" id: str = Field(min_length=36, max_length=36) - group_id: str = Field(min_length=36, max_length=36) - conversation_id: str = Field(min_length=36, max_length=36) - task_id: str = Field(min_length=36, max_length=36) + group_id: str = Field(min_length=36, max_length=36, alias="groupId") + conversation_id: str = Field(min_length=36, max_length=36, alias="conversationId") + task_id: str = Field(min_length=36, max_length=36, alias="taskId") flow: Optional[MessageFlow] = None content: dict[str, Any] = {} metadata: MessageMetadata diff --git a/apps/entities/pool.py b/apps/entities/pool.py index e5386276bb47904136f227e9e2d0712c761f0c5f..1d06e674e5a158b256165758f5e488df9d607d62 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -2,10 +2,10 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ +from datetime import datetime, timezone from typing import Any, Optional -from pydantic import BaseModel, Field -from datetime import datetime, timezone +from pydantic import BaseModel, Field from apps.entities.enum_var import CallType @@ -49,7 +49,9 @@ class NodePool(PoolBase): 2. Python Node的路径格式样例:“tune::call.tune.CheckSystem” """ + id: str = Field(description="Node的ID") type: CallType = Field(description="Call的类型") + service: str = Field(description="服务名称") meta_call: Optional[str] = Field(description="基类Call的ID", default=None) input_schema: dict[str, Any] = Field(description="输入参数的schema", default={}) output_schema: dict[str, Any] = Field(description="输出参数的schema", default={}) diff --git a/apps/entities/record.py b/apps/entities/record.py index e720bb9cbdfdef4601e6712a8c182a47d75b67f7..7fc2c105bc2c21075a45891ce8b13599c842d31c 100644 --- a/apps/entities/record.py +++ b/apps/entities/record.py @@ -30,9 +30,8 @@ class RecordDocument(Document): class RecordFlowStep(BaseModel): """Record表子项:flow的单步数据结构""" - step_name: str - step_status: StepStatus - step_order: str + step_name: str = Field(alias="stepName") + step_status: StepStatus = Field(alias="stepStatus") input: dict[str, Any] output: dict[str, Any] @@ -41,10 +40,9 @@ class RecordFlow(BaseModel): """Flow的执行信息""" id: str - record_id: str - plugin_id: str - flow_id: str - step_num: int + record_id: str = Field(alias="recordId") + flow_id: str = Field(alias="flowId") + step_num: int = Field(alias="stepNum") steps: list[RecordFlowStep] @@ -52,11 +50,12 @@ class RecordData(BaseModel): """GET /api/record/{conversation_id} Result内元素数据结构""" id: str - group_id: str - conversation_id: str - task_id: str + app_id: str = Field(alias="appId") + group_id: str = Field(alias="groupId") + conversation_id: str = Field(alias="conversationId") + task_id: str = Field(alias="taskId") document: list[RecordDocument] = [] flow: Optional[RecordFlow] = None content: RecordContent metadata: RecordMetadata - created_at: float + created_at: float = Field(alias="createdAt") diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index 515a6f6a39c85f7503fbd8554028312a665d3d52..50926e73d4b44079be07fe9e851bc8496bdbdf47 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -301,6 +301,3 @@ class NodeParameterPutMsg(BaseModel): class NodeParameterPutRsp(ResponseData): """PUT /api/flow/node/parameter 返回数据结构""" result:NodeParameterPutMsg - - - \ No newline at end of file diff --git a/apps/entities/task.py b/apps/entities/task.py index a3a1ae184ba643667830f1e9eeb090904a1a33d8..62e8c34f93c00ab66810cc3dc54f01cd566f10bf 100644 --- a/apps/entities/task.py +++ b/apps/entities/task.py @@ -21,9 +21,7 @@ class FlowHistory(BaseModel): id: str = Field(default_factory=lambda: str(uuid.uuid4()), alias="_id") task_id: str = Field(description="任务ID") flow_id: str = Field(description="FlowID") - plugin_id: str = Field(description="插件ID") step_name: str = Field(description="当前步骤名称") - step_order: str = Field(description="当前步骤进度") status: StepStatus = Field(description="当前步骤状态") input_data: dict[str, Any] = Field(description="当前Step执行的输入", default={}) output_data: dict[str, Any] = Field(description="当前Step执行后的结果", default={}) @@ -39,7 +37,7 @@ class ExecutorState(BaseModel): status: StepStatus = Field(description="执行器状态") # 附加信息 step_name: str = Field(description="当前步骤名称") - plugin_id: str = Field(description="插件ID") + app_id: str = Field(description="应用ID") # 运行时数据 thought: str = Field(description="大模型的思考内容", default="") slot_data: dict[str, Any] = Field(description="待使用的参数", default={}) diff --git a/apps/manager/session.py b/apps/manager/session.py index 4098203f9871b0b2653dfd9d9738502a0d561eb2..cee4c08e80d03385eefcd9a592d8c128a1119c21 100644 --- a/apps/manager/session.py +++ b/apps/manager/session.py @@ -78,13 +78,8 @@ class SessionManager: except Exception as e: LOGGER.error(f"Read session error: {e}") - # if not ip: - # session_id = SessionManager.create_session(session_ip) - # return session_id - # elif ip != session_ip: - # session_id = SessionManager.create_session(session_ip) - # return session_id - # else: + if not ip or ip != session_ip: + return await SessionManager.create_session(session_ip) return session_id @staticmethod diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index c1278e3c3ee67ad8b3564aff31cc0b51c55be076..7eae3b3b26f712c8c94f2408a2531f5f3a0afb1c 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -1,343 +1,24 @@ -"""数据池 +"""配置池,包含语义接口、应用等的载入和保存 Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -import hashlib -import hmac -import json -import pickle -from threading import Lock -from typing import Any, ClassVar, Optional -import chromadb -from langchain_community.agent_toolkits.openapi.spec import ReducedOpenAPISpec -from rank_bm25 import BM25Okapi -from sqlalchemy import Engine, create_engine -from sqlalchemy.orm import sessionmaker +class Pool: + """配置池""" -from apps.common.config import config -from apps.common.singleton import Singleton -from apps.constants import LOGGER -from apps.entities.flow import Flow -from apps.entities.plugin import PluginData -from apps.scheduler.pool.entities import Base, CallItem, FlowItem, PluginItem -from apps.scheduler.vector import DocumentWrapper, VectorDB + @classmethod + def load(cls) -> None: + """加载配置池""" + pass -class Pool(metaclass=Singleton): - """数据池""" + @classmethod + def save(cls, *, is_deletion: bool = False) -> None: + """保存配置池""" + pass - write_lock: Lock = Lock() - relation_db: Engine - flow_collection: chromadb.Collection - plugin_collection: chromadb.Collection - flow_pool: ClassVar[dict[str, Any]] = {} - call_pool: ClassVar[dict[str, Any]] = {} - - def __init__(self) -> None: - """初始化内存中的SQLite数据库和内存中的ChromaDB""" - with self.write_lock: - # Init SQLite - self.relation_db = create_engine("sqlite:///:memory:") - Base.metadata.create_all(self.relation_db) - - # Init ChromaDB - self.create_collection() - - @staticmethod - def serialize_data(origin_data) -> tuple[bytes, str]: # noqa: ANN001 - """使用Pickle序列化数据 - - 为保证数据不被篡改,使用HMAC对数据进行签名 - """ - data = pickle.dumps(origin_data) - hmac_obj = hmac.new(key=bytes.fromhex(config["PICKLE_KEY"]), msg=data, digestmod=hashlib.sha256) - signature = hmac_obj.hexdigest() - return data, signature - - @staticmethod - def deserialize_data(data: bytes, signature: str): # noqa: ANN205 - """反序列化数据 - - 使用HMAC对数据进行签名验证 - """ - hmac_obj = hmac.new(key=bytes.fromhex(config["PICKLE_KEY"]), msg=data, digestmod=hashlib.sha256) - current_signature = hmac_obj.hexdigest() - if current_signature != signature: - err = "Pickle data has been modified!" - raise AssertionError(err) - - return pickle.loads(data) # noqa: S301 - - def create_collection(self) -> None: - """创建ChromaDB的Collection""" - flow_collection = VectorDB.get_collection("flow") - if flow_collection is None: - err = "Create flow collection failed!" - raise RuntimeError(err) - self.flow_collection = flow_collection - - plugin_collection = VectorDB.get_collection("plugin") - if plugin_collection is None: - err = "Create plugin collection failed!" - raise RuntimeError(err) - self.plugin_collection = plugin_collection - - def add_plugin(self, plugin_id: str, metadata: dict, spec: Optional[ReducedOpenAPISpec] = None) -> None: - """载入单个Plugin""" - spec_data, signature = self.serialize_data(spec) - - auth = json.dumps(metadata["auth"]) if "auth" in metadata else "{}" - - plugin = PluginItem( - id=plugin_id, - show_name=metadata["name"], - description=metadata["description"], - auth=auth, - spec=spec_data, - signature=signature, - ) - with self.write_lock: - try: - with sessionmaker(bind=self.relation_db)() as session: - session.add(plugin) - session.commit() - except Exception as e: - LOGGER.error(f"Import plugin failed: {e!s}") - - doc = DocumentWrapper( - data=metadata["description"], - id=plugin_id, - ) - - with self.write_lock: - VectorDB.add_docs(self.plugin_collection, [doc]) - - def add_flows(self, plugin: str, flows: list[dict[str, Any]]) -> None: - """载入单个Flow""" - docs = [] - flow_rows = [] - - # 此处,flow在向量数据库中名字加上了plugin前缀,防止ID冲突 - for item in flows: - current_row = FlowItem( - plugin=plugin, - name=item["id"], - description=item["description"], - ) - flow_rows.append(current_row) - - doc = DocumentWrapper( - id=plugin + "/" + item["id"], - data=item["description"], - metadata={ - "plugin": plugin, - }, - ) - docs.append(doc) - with self.write_lock: - self.flow_pool[plugin + "/" + item["id"]] = item["data"] - - with self.write_lock: - try: - with sessionmaker(bind=self.relation_db)() as session: - session.add_all(flow_rows) - session.commit() - except Exception as e: - LOGGER.error(f"Import flow failed: {e!s}") - VectorDB.add_docs(self.flow_collection, docs) - - def add_calls(self, plugin: Optional[str], calls: list[Any]) -> None: - """载入单个Call""" - call_metadata = [] - - for item in calls: - current_metadata = CallItem( - plugin=plugin, - name=str(item.name), - description=str(item.description), - ) - call_metadata.append(current_metadata) - - with self.write_lock: - call_prefix = "" - if plugin is not None: - call_prefix += plugin + "/" - self.call_pool[call_prefix + str(item.name)] = item - - with self.write_lock, sessionmaker(bind=self.relation_db)() as session: - try: - session.add_all(call_metadata) - session.commit() - except Exception as e: - LOGGER.error(f"Import plugin {plugin} call failed: {e!s}") - - def clean_db(self) -> None: - """清空SQLite和ChromaDB""" - try: - with self.write_lock: - Base.metadata.drop_all(bind=self.relation_db) - Base.metadata.create_all(bind=self.relation_db) - - VectorDB.delete_collection("flow") - VectorDB.delete_collection("plugin") - self.create_collection() - - Pool.flow_pool = {} - Pool.call_pool = {} - except Exception as e: - LOGGER.error(f"Clean DB failed: {e!s}") - - - def get_plugin_list(self) -> list[PluginData]: - """从数据库中获取所有插件信息""" - try: - with sessionmaker(bind=self.relation_db)() as session: - result = session.query(PluginItem).all() - except Exception as e: - LOGGER.error(f"Get Plugin from DB failed: {e!s}") - return [] - - plugin_list: list[PluginData] = [PluginData( - id=str(item.id), - name=str(item.show_name), - description=str(item.description), - auth=json.loads(str(item.auth)), - ) - for item in result - ] - - return plugin_list - - def get_flow(self, name: str, plugin: str) -> tuple[Optional[FlowItem], Optional[Flow]]: - """查找Flow名对应的元数据和Step""" - try: - with sessionmaker(bind=self.relation_db)() as session: - result = session.query(FlowItem).filter_by(name=name, plugin=plugin).first() - except Exception as e: - LOGGER.error(f"Get Flow from DB failed: {e!s}") - return None, None - - return result, self.flow_pool.get(plugin + "/" + name, None) - - - def get_plugin(self, name: str) -> Optional[PluginItem]: - """查找Plugin名对应的元数据""" - try: - with sessionmaker(bind=self.relation_db)() as session: - result = session.query(PluginItem).filter_by(id=name).first() - except Exception as e: - LOGGER.error(f"Get Plugin from DB failed: {e!s}") - return None - - return result - - def get_k_plugins(self, question: str, top_k: int = 5) -> list[PluginItem]: - """查找k个最符合条件的Plugin,返回数据""" - result = self.plugin_collection.query( - query_texts=[question], - n_results=top_k, - ) - - ids = result.get("ids", None) - if ids is None: - LOGGER.error(f"Vector search failed: {result}") - return [] - - result_list = [] - with sessionmaker(bind=self.relation_db)() as session: - for current_id in ids[0]: - try: - result_item = session.query(PluginItem).filter_by(name=current_id).first() - if result_item is None: - continue - result_list.append(result_item) - except Exception as e: - LOGGER.error(f"Get data from VectorDB failed: {e!s}") - - return result_list - - def get_k_flows(self, question: str, plugin_list: list[str], top_k: int = 5) -> list[FlowItem]: - """查找k个最符合条件的Flow,返回数据""" - result = self.flow_collection.query( - query_texts=[question], - n_results=top_k * 4, - where=Pool._construct_vector_query(plugin_list), - ) - - ids = result.get("ids", None) - docs = result.get("documents", None) - if ids is None or docs is None: - LOGGER.error(f"Vector search failed: {result}") - return [] - - # 使用bm25s进行重排,此处list有序;考虑到文字可能很短,因此直接用字符作为token - corpus = [list(item) for item in docs[0]] - question_tokens = list(question) - retriever = BM25Okapi(corpus) - corpus_ids = list(range(len(corpus))) - results = retriever.get_top_n(question_tokens, corpus_ids, top_k) - retrieved_ids = [ids[0][i] for i in results] - - result_list = [] - with sessionmaker(bind=self.relation_db)() as session: - for current_id in retrieved_ids: - plugin_name, flow_name = current_id.split("/") - try: - result_item = session.query(FlowItem).filter_by(name=flow_name, plugin=plugin_name).first() - if result_item is None: - continue - result_list.append(result_item) - except Exception as e: - LOGGER.error(f"Get data from VectorDB failed: {e!s}") - - return result_list - - @staticmethod - def _construct_vector_query(plugin_list: list[str]) -> dict[str, Any]: - constraint = {} - if len(plugin_list) == 0: - return {} - if len(plugin_list) == 1: - constraint["plugin"] = { - "$eq": plugin_list[0], - } - else: - constraint["$or"] = [] - for plugin in plugin_list: - constraint["$or"].append({ - "plugin": { - "$eq": plugin, - }, - }) - return constraint - - - def get_call(self, name: str, plugin: str) -> tuple[Optional[CallItem], Optional[Any]]: - """从Call Pool里面拿出对应的Call类 - - :param name: - :param plugin: - :return: - """ - if plugin: - try: - with sessionmaker(bind=self.relation_db)() as session: - call_item = session.query(CallItem).filter_by(name=name).filter_by(plugin=plugin).first() - if call_item: - return call_item, self.call_pool.get(name, None) - except Exception as e: - LOGGER.error(f"Get Call from DB failed: {e!s}") - return None, None - - try: - with sessionmaker(bind=self.relation_db)() as session: - call_item = session.query(CallItem).filter_by(name=name).filter_by(plugin=None).first() - if call_item: - return call_item, self.call_pool.get(name, None) - except Exception as e: - LOGGER.error(f"Get Call from DB failed: {e!s}") - return None, None - - return None, None + @classmethod + def get_flow(cls, app_id: str, flow_id: str) -> None: + """获取Flow""" + pass diff --git a/apps/scheduler/pool/util.py b/apps/scheduler/pool/util.py index 1beb7930828e9254fe0daa97cf2b27af2d369f05..11a75c538f6e127afd847adb520e239e34bb1166 100644 --- a/apps/scheduler/pool/util.py +++ b/apps/scheduler/pool/util.py @@ -2,14 +2,14 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -import hashlib +from hashlib import sha256, shake_128 -def get_bytes_hash(s: bytes) -> str: +def get_short_hash(s: bytes) -> str: """获取字节串的哈希值""" - return hashlib.sha256(s).hexdigest() + return shake_128(s).hexdigest(8) -def get_str_hash(s: str) -> str: - """获取字符串的哈希值""" - return get_bytes_hash(s.encode("utf-8")) +def get_long_hash(s: bytes) -> str: + """获取字节串的哈希值""" + return sha256(s).hexdigest()