diff --git a/torch_npu/profiler/analysis/prof_bean/_memory_use_bean.py b/torch_npu/profiler/analysis/prof_bean/_memory_use_bean.py index 0385af8d79ff38e5715a48582890ac8d8cc5cb0f..566fb7a6979d1ceebd9139273019306da9a768b2 100644 --- a/torch_npu/profiler/analysis/prof_bean/_memory_use_bean.py +++ b/torch_npu/profiler/analysis/prof_bean/_memory_use_bean.py @@ -35,79 +35,93 @@ class MemoryUseBean(CommonBean): def __init__(self, data: dict): super().__init__(data) self._constant_data = struct.unpack(self.CONSTANT_STRUCT, self._data.get(Constant.CONSTANT_BYTES)) + self._ptr = int(self._constant_data[MemoryEnum.PTR.value]) + self._stream_ptr = int(self._constant_data[MemoryEnum.STREAM_PTR.value]) + self._time_ns = ProfilerConfig().get_local_time( + ProfilerConfig().get_timestamp_from_syscnt(self._constant_data[MemoryEnum.TIME_NS.value])) + self._alloc_size = int(self._constant_data[MemoryEnum.ALLOC_SIZE.value]) + self._total_allocated = int(self._constant_data[MemoryEnum.TOTAL_ALLOCATED.value]) + self._total_reserved = int(self._constant_data[MemoryEnum.TOTAL_RESERVED.value]) + self._total_active = int(self._constant_data[MemoryEnum.TOTAL_ACTIVE.value]) + self._device_type = int(self._constant_data[MemoryEnum.DEVICE_TYPE.value]) + self._device_index = int(self._constant_data[MemoryEnum.DEVICE_INDEX.value]) + self._component_type = int(self._constant_data[MemoryEnum.COMPONENT_TYPE.value]) + self._data_type = int(self._constant_data[MemoryEnum.DATA_TYPE.value]) + self._allocator_type = int(self._constant_data[MemoryEnum.ALLOCATOR_TYPE.value]) + self._thread_id = int(self._constant_data[MemoryEnum.THREAD_ID.value]) + self._process_id = int(self._constant_data[MemoryEnum.PROCESS_ID.value]) @property def ptr(self) -> int: - return int(self._constant_data[MemoryEnum.PTR.value]) + return self._ptr @property def stream_ptr(self) -> int: - return int(self._constant_data[MemoryEnum.STREAM_PTR.value]) + return self._stream_ptr @property def time_ns(self) -> int: - time_ns = ProfilerConfig().get_timestamp_from_syscnt(self._constant_data[MemoryEnum.TIME_NS.value]) - return ProfilerConfig().get_local_time(time_ns) + return self._time_ns @property - def alloc_size(self) -> int: - return int(self._constant_data[MemoryEnum.ALLOC_SIZE.value]) / Constant.B_TO_KB + def alloc_size(self) -> float: + return self._alloc_size / Constant.B_TO_KB @property - def alloc_size_for_db(self) -> int: - return int(self._constant_data[MemoryEnum.ALLOC_SIZE.value]) + def alloc_size_for_db(self) -> float: + return self._alloc_size @property - def total_allocated(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_ALLOCATED.value]) / Constant.B_TO_MB + def total_allocated(self) -> float: + return self._total_allocated / Constant.B_TO_MB @property def total_allocated_for_db(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_ALLOCATED.value]) + return self._total_allocated @property - def total_reserved(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_RESERVED.value]) / Constant.B_TO_MB + def total_reserved(self) -> float: + return self._total_reserved / Constant.B_TO_MB @property def total_reserved_for_db(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_RESERVED.value]) + return self._total_reserved @property - def total_active(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_ACTIVE.value]) / Constant.B_TO_MB + def total_active(self) -> float: + return self._total_active / Constant.B_TO_MB @property def total_active_for_db(self) -> int: - return 
int(self._constant_data[MemoryEnum.TOTAL_ACTIVE.value]) + return self._total_active @property def device_type(self) -> int: - return int(self._constant_data[MemoryEnum.DEVICE_TYPE.value]) + return self._device_type @property def device_index(self) -> int: - return int(self._constant_data[MemoryEnum.DEVICE_INDEX.value]) + return self._device_index @property def component_type(self) -> int: - return int(self._constant_data[MemoryEnum.COMPONENT_TYPE.value]) + return self._component_type @property def data_type(self) -> int: - return int(self._constant_data[MemoryEnum.DATA_TYPE.value]) - + return self._data_type + @property def allocator_type(self) -> int: - return int(self._constant_data[MemoryEnum.ALLOCATOR_TYPE.value]) + return self._allocator_type @property def tid(self) -> int: - return int(self._constant_data[MemoryEnum.THREAD_ID.value]) + return self._thread_id @property def pid(self) -> int: - return int(self._constant_data[MemoryEnum.PROCESS_ID.value]) + return self._process_id def is_npu(self) -> bool: return self.device_type == self.NPU_ID diff --git a/torch_npu/profiler/analysis/prof_bean/_op_mark_bean.py b/torch_npu/profiler/analysis/prof_bean/_op_mark_bean.py index 34d95e97fd8582de82843c2b2fdade338bce4df4..54279591cc71a682cc21e615e311bab8f17e4b0e 100644 --- a/torch_npu/profiler/analysis/prof_bean/_op_mark_bean.py +++ b/torch_npu/profiler/analysis/prof_bean/_op_mark_bean.py @@ -64,7 +64,7 @@ class OpMarkBean: @property def name(self) -> str: if self.is_dequeue_start or self.is_dequeue_end: - return "Dequeue@" + str(self._origin_data[self.TLV_TYPE_DICT.get(Constant.NAME)]) + return "Dequeue@" + self._origin_name return "Enqueue" @property diff --git a/torch_npu/profiler/analysis/prof_common_func/_constant.py b/torch_npu/profiler/analysis/prof_common_func/_constant.py index 1a62c54d6f6af19bed262b7b5765d192e3fd3c0d..1ab47b23fad351c2eef30ee11cbb158600b0340b 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_constant.py +++ b/torch_npu/profiler/analysis/prof_common_func/_constant.py @@ -181,6 +181,9 @@ class Constant(object): FAIL = 1 # parser name + TASK_QUEUE_PARSER = "task_queue" + TORCH_OP_PARSER = "torch_op" + PYTHON_TRACE_PRE_PARSER = "python_trace_prepare" TRACE_PRE_PARSER = "trace_prepare" TREE_BUILD_PARSER = "build_tree" CANN_EXPORT_PARSER = "export" @@ -316,7 +319,7 @@ class DbConstant(): TABLE_OPERATOR_MEMORY = "OP_MEMORY" TABLE_NPU_OP_MEM = "NPU_OP_MEM" TABLE_META_DATA = "META_DATA" - + # rank device map table name TABLE_RANK_DEVICE_MAP = "RANK_DEVICE_MAP" # host info diff --git a/torch_npu/profiler/analysis/prof_common_func/_db_manager.py b/torch_npu/profiler/analysis/prof_common_func/_db_manager.py index 593a44dd781ddbfa5757d7ee121d567b6013d133..a790080841bad46d78e0ce5da1fab30bfbf4a68a 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_db_manager.py +++ b/torch_npu/profiler/analysis/prof_common_func/_db_manager.py @@ -1,6 +1,6 @@ import os import sqlite3 - +import time from ._constant import Constant, print_warn_msg, print_error_msg from ._file_manager import FileManager from ._singleton import Singleton @@ -33,7 +33,7 @@ class DbManager: def create_connect_db(cls, db_path: str) -> tuple: """ create and connect database - """ + """ if os.path.exists(db_path): FileManager.check_db_file_vaild(db_path) try: @@ -41,7 +41,7 @@ class DbManager: conn = sqlite3.connect(db_path, timeout=2147483, check_same_thread=False) except sqlite3.Error as err: return EmptyClass("emoty conn"), EmptyClass("empty curs") - + try: curs = conn.cursor() os.chmod(db_path, 
Constant.FILE_AUTHORITY) @@ -60,7 +60,7 @@ class DbManager: cur.close() except sqlite3.Error as err: raise RuntimeError(f"Falied to close db connection cursor") from err - + try: conn.close() except sqlite3.Error as err: @@ -121,6 +121,7 @@ class DbManager: """ insert data into certain table """ + # start_time = time.perf_counter_ns() index = 0 if not data: return @@ -130,6 +131,8 @@ class DbManager: if not cls.executemany_sql(conn, sql, data[index:index + cls.INSERT_SIZE]): raise RuntimeError("Failed to insert data into profiler db file") index += cls.INSERT_SIZE + # cost_time = (time.perf_counter_ns() - start_time) / (1000.0 * 1000.0) + # print_warn_msg(f"insert data into table {table_name} data len {len(data)} finish. cost time {cost_time} ms.") @classmethod def fetch_all_data(cls, cur: sqlite3.Cursor, sql: str) -> list: @@ -151,7 +154,7 @@ class DbManager: except sqlite3.Error as err: print_error_msg("SQLite Error: %s" % " ".join(err.args)) return [] - + @classmethod def fetch_one_data(cls, cur: sqlite3.Cursor, sql: str) -> list: """ diff --git a/torch_npu/profiler/analysis/prof_common_func/_log.py b/torch_npu/profiler/analysis/prof_common_func/_log.py index 0fecde48c41b465cf04eff26282a02911655c032..7054fb13eec798f993d5913e01be9c746b4dcf90 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_log.py +++ b/torch_npu/profiler/analysis/prof_common_func/_log.py @@ -23,7 +23,7 @@ class ProfilerLogger: BACKUP_COUNT: Number of backup files to keep """ - LOG_FORMAT = "[%(asctime)s] [%(levelname)s] [%(name)s:%(lineno)d] %(message)s" + LOG_FORMAT = "[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s:%(lineno)d] %(message)s" DATE_FORMAT = "%Y-%m-%d-%H:%M:%S" DEFAULT_LOGGER_NAME = "AscendProfiler" DEFAULT_LOG_LEVEL = logging.INFO diff --git a/torch_npu/profiler/analysis/prof_common_func/_trace_event_manager.py b/torch_npu/profiler/analysis/prof_common_func/_trace_event_manager.py index fcbff9cc0c34afdbc0c43a6f02b4f414327a019b..dd10cafaa74642abecd0681054452a8288608d5b 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_trace_event_manager.py +++ b/torch_npu/profiler/analysis/prof_common_func/_trace_event_manager.py @@ -52,9 +52,9 @@ class TraceEventManager: "tid": event.tid, "ts": convert_ns2us_str(event.ts), "cat": "async_task_queue"} @classmethod - def create_fwd_flow(cls, event: any) -> list: + def create_fwd_flow(cls, fwd_dict: dict) -> list: fwd_list = [] - for fwd_id, node in event.items(): + for fwd_id, node in fwd_dict.items(): if node.get('start') and node.get('end'): if node['start']['tid'] == node['end']['tid']: continue diff --git a/torch_npu/profiler/analysis/prof_common_func/_tree_builder.py b/torch_npu/profiler/analysis/prof_common_func/_tree_builder.py index 06ec8b7d6b03df281ad80ed29961412af94b9733..949ef78c3422202ad5cd12498059e61a3fb60e0f 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_tree_builder.py +++ b/torch_npu/profiler/analysis/prof_common_func/_tree_builder.py @@ -8,9 +8,9 @@ __all__ = [] class TreeBuilder: @classmethod - def build_tree(cls, event_list: list, enqueue_list: list) -> TorchOpNode: + def build_tree(cls, event_list: list, enqueue_list: list) -> list: all_node_list = [None] * (len(event_list) + 1) - event_list.extend(enqueue_list) + event_list = event_list + enqueue_list event_list.sort(key=lambda x: x.ts) root_node = TorchOpNode() last_node = root_node diff --git a/torch_npu/profiler/analysis/prof_config/_parser_config.py b/torch_npu/profiler/analysis/prof_config/_parser_config.py index 
986ff3bb30dab008bb35b2593142076a7790817d..0789ecd834c9c3e3ac3d0f967127869bf23da4e1 100644 --- a/torch_npu/profiler/analysis/prof_config/_parser_config.py +++ b/torch_npu/profiler/analysis/prof_config/_parser_config.py @@ -17,7 +17,13 @@ from ..prof_common_func._constant import Constant from ..prof_view.cann_parse._cann_analyze import CANNAnalyzeParser from ..prof_view.cann_parse._cann_export import CANNExportParser, CANNTimelineParser from ..prof_view._memory_prepare_parser import MemoryPrepareParser -from ..prof_view.prepare_parse._fwk_pre_parser import TracePreParser, TreeBuildParser +from ..prof_view.prepare_parse._fwk_pre_parser import ( + PythonTracePreParser, + TracePreParser, + TreeBuildParser, + TaskQueueParser, + TorchOpParser +) from ..prof_view._kernel_view_parser import KernelViewParser from ..prof_view._operator_view_parser import OperatorViewParser from ..prof_view.prepare_parse._relation_parser import RelationParser @@ -37,6 +43,9 @@ class ParserConfig: LEVEL_NONE_CONFIG = { Constant.Text: { Constant.TENSORBOARD_TRACE_HANDLER: [ + TorchOpParser, + TaskQueueParser, + PythonTracePreParser, TracePreParser, TreeBuildParser, CANNExportParser, @@ -52,6 +61,9 @@ class ParserConfig: }, Constant.Db: { Constant.TENSORBOARD_TRACE_HANDLER: [ + TorchOpParser, + TaskQueueParser, + PythonTracePreParser, CANNExportParser, CANNTimelineParser, CANNAnalyzeParser, @@ -65,6 +77,9 @@ class ParserConfig: COMMON_CONFIG = { Constant.Text: { Constant.TENSORBOARD_TRACE_HANDLER: [ + TorchOpParser, + TaskQueueParser, + PythonTracePreParser, TracePreParser, TreeBuildParser, CANNExportParser, @@ -80,13 +95,17 @@ class ParserConfig: IntegrateParser, CommunicationParser ], - Constant.EXPORT_CHROME_TRACE: [TracePreParser, TreeBuildParser, CANNExportParser, CANNTimelineParser, - TraceViewParser], - Constant.EXPORT_STACK: [TreeBuildParser, CANNExportParser, CANNTimelineParser, StackViewParser], + Constant.EXPORT_CHROME_TRACE: [TorchOpParser, TaskQueueParser, TracePreParser, TreeBuildParser, + CANNExportParser, CANNTimelineParser, TraceViewParser], + Constant.EXPORT_STACK: [TorchOpParser, TaskQueueParser, TreeBuildParser, CANNExportParser, CANNTimelineParser, + StackViewParser], Constant.EXPORT_MEMORY_TIMELINE: [MemoryTimelineParser] }, Constant.Db: { Constant.TENSORBOARD_TRACE_HANDLER: [ + TorchOpParser, + TaskQueueParser, + PythonTracePreParser, CANNExportParser, CANNTimelineParser, CANNAnalyzeParser, @@ -111,6 +130,9 @@ class ParserConfig: PARSER_NAME_MAP = { # text parser + TorchOpParser: Constant.TORCH_OP_PARSER, + TaskQueueParser: Constant.TASK_QUEUE_PARSER, + PythonTracePreParser: Constant.PYTHON_TRACE_PRE_PARSER, TracePreParser: Constant.TRACE_PRE_PARSER, TreeBuildParser: Constant.TREE_BUILD_PARSER, CANNExportParser: Constant.CANN_EXPORT_PARSER, diff --git a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py index e61ecc36900de08a5d2477166b54a66f6e48360e..267ce7207dc30028ae5832427ba6b661a477ae48 100644 --- a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py +++ b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py @@ -21,41 +21,53 @@ __all__ = [] class ParserDepsConfig: COMMON_CONFIG = { - Constant.TRACE_PRE_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, - Constant.TREE_BUILD_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: []}, + Constant.TORCH_OP_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: []}, + Constant.TASK_QUEUE_PARSER: {Constant.MODE: 
ConcurrentMode.PTHREAD, Constant.DEPS: []}, + Constant.PYTHON_TRACE_PRE_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, + Constant.DEPS: [Constant.TORCH_OP_PARSER, Constant.TASK_QUEUE_PARSER]}, + Constant.TRACE_PRE_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, + Constant.DEPS: [Constant.TORCH_OP_PARSER, Constant.TASK_QUEUE_PARSER, + Constant.PYTHON_TRACE_PRE_PARSER]}, + Constant.TREE_BUILD_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, + Constant.DEPS: [Constant.TORCH_OP_PARSER, Constant.TASK_QUEUE_PARSER]}, Constant.CANN_EXPORT_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, Constant.CANN_TIMELINE_PARSER: {Constant.MODE: ConcurrentMode.NON_BLOCKING | ConcurrentMode.PTHREAD, Constant.DEPS: []}, Constant.RELATION_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, - Constant.DEPS: [Constant.CANN_TIMELINE_PARSER]}, + Constant.DEPS: [Constant.TASK_QUEUE_PARSER, Constant.CANN_TIMELINE_PARSER]}, Constant.CANN_ANALYZE_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.CANN_TIMELINE_PARSER]}, Constant.OPERATOR_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_TIMELINE_PARSER, - Constant.RELATION_PARSER]}, + Constant.RELATION_PARSER, Constant.TORCH_OP_PARSER]}, Constant.TRACE_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.TRACE_PRE_PARSER, - Constant.CANN_TIMELINE_PARSER]}, + Constant.CANN_TIMELINE_PARSER, Constant.TASK_QUEUE_PARSER, + Constant.PYTHON_TRACE_PRE_PARSER]}, Constant.KERNEL_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_EXPORT_PARSER, Constant.RELATION_PARSER]}, Constant.TRACE_STEP_TIME_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_TIMELINE_PARSER, - Constant.RELATION_PARSER]}, + Constant.RELATION_PARSER, Constant.TORCH_OP_PARSER]}, Constant.MEMORY_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, - Constant.DEPS: [Constant.CANN_EXPORT_PARSER, Constant.MEMORY_PREPARE]}, + Constant.DEPS: [Constant.CANN_EXPORT_PARSER, Constant.MEMORY_PREPARE, + Constant.TORCH_OP_PARSER]}, Constant.INTEGRATE_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.CANN_EXPORT_PARSER]}, Constant.COMMUNICATION_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_ANALYZE_PARSER, Constant.RELATION_PARSER]}, Constant.STACK_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, - Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_TIMELINE_PARSER]}, + Constant.DEPS: [Constant.TASK_QUEUE_PARSER, Constant.TREE_BUILD_PARSER, + Constant.CANN_TIMELINE_PARSER, Constant.TORCH_OP_PARSER]}, Constant.MEMORY_PREPARE: {Constant.MODE: ConcurrentMode.PTHREAD, - Constant.DEPS: [Constant.TREE_BUILD_PARSER]}, + Constant.DEPS: [Constant.TASK_QUEUE_PARSER, Constant.TREE_BUILD_PARSER, + Constant.TORCH_OP_PARSER]}, Constant.DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, - Constant.DEPS: [Constant.CANN_EXPORT_PARSER, Constant.MEMORY_PREPARE, - Constant.TREE_BUILD_PARSER, Constant.CANN_ANALYZE_PARSER]}, + Constant.DEPS: [Constant.CANN_EXPORT_PARSER, Constant.MEMORY_PREPARE, Constant.TORCH_OP_PARSER, + Constant.TREE_BUILD_PARSER, Constant.CANN_ANALYZE_PARSER, Constant.TASK_QUEUE_PARSER, + Constant.PYTHON_TRACE_PRE_PARSER]}, Constant.MEMORY_TIMELINE_PARSER: {} } diff --git 
a/torch_npu/profiler/analysis/prof_parse/_fwk_cann_relation_parser.py b/torch_npu/profiler/analysis/prof_parse/_fwk_cann_relation_parser.py index dc7142738ab212ac58a5caeba14230a2f138a404..6e437f3c9ed2f19267781ca214e32066b66733a5 100644 --- a/torch_npu/profiler/analysis/prof_parse/_fwk_cann_relation_parser.py +++ b/torch_npu/profiler/analysis/prof_parse/_fwk_cann_relation_parser.py @@ -1,5 +1,5 @@ import os - +from collections import defaultdict from ._fwk_file_parser import FwkFileParser from .._profiler_config import ProfilerConfig from ..prof_bean._torch_op_node import TorchOpNode @@ -20,16 +20,14 @@ class FwkCANNRelationParser: def combine_kernel_dict(cls, acl_to_npu_dict: dict, dequeue_data_list: list): if not dequeue_data_list: return acl_to_npu_dict - kernel_dict = {} + kernel_dict = defaultdict(list) index = 0 - acl_start_time_list = sorted(list(acl_to_npu_dict.keys())) - for acl_start_time in acl_start_time_list: + for acl_start_time in sorted(acl_to_npu_dict.keys()): while index < len(dequeue_data_list): if dequeue_data_list[index].ts > acl_start_time: break if acl_start_time <= dequeue_data_list[index].ts + dequeue_data_list[index].dur: - kernel_dict.setdefault(dequeue_data_list[index].corr_id, []).extend( - acl_to_npu_dict.get(acl_start_time, [])) + kernel_dict[dequeue_data_list[index].corr_id].extend(acl_to_npu_dict.get(acl_start_time, [])) break index += 1 return kernel_dict @@ -48,7 +46,7 @@ class FwkCANNRelationParser: break index += 1 - def get_kernel_dict(self) -> dict: + def get_kernel_dict(self, dequeue_data: list) -> dict: acl_to_npu_dict = CANNFileParser(self._profiler_path).get_acl_to_npu_data() if not acl_to_npu_dict and ProfilerConfig().get_level() != Constant.LEVEL_NONE: error_msg = ( @@ -57,8 +55,7 @@ class FwkCANNRelationParser: ) print_error_msg(error_msg) return acl_to_npu_dict - dequeue_data_list = FwkFileParser(self._profiler_path).get_dequeue_data() - return self.combine_kernel_dict(acl_to_npu_dict, dequeue_data_list) + return self.combine_kernel_dict(acl_to_npu_dict, dequeue_data) def get_step_range(self, root_node: TorchOpNode, kernel_dict: dict): if not kernel_dict: @@ -69,10 +66,10 @@ class FwkCANNRelationParser: if not step_node_list: self.logger.warning("Get step range failed, the step node list is empty.") return [] - + # Gather flow events start time in each step node if not FwkFileParser(self._profiler_path).has_task_queue_data(): - acl_start_time_list = sorted(list(kernel_dict.keys())) + acl_start_time_list = sorted(kernel_dict.keys()) self._update_step_node_info(step_node_list, acl_start_time_list) # Get step range on device by flow events step_range = [] diff --git a/torch_npu/profiler/analysis/prof_parse/_fwk_file_parser.py b/torch_npu/profiler/analysis/prof_parse/_fwk_file_parser.py index f78e3821d6e353e12c75ba04909506c1bc6db27e..f6bb4f6814d254fb213dc9b7285709ac86c3fb2a 100644 --- a/torch_npu/profiler/analysis/prof_parse/_fwk_file_parser.py +++ b/torch_npu/profiler/analysis/prof_parse/_fwk_file_parser.py @@ -1,6 +1,7 @@ import os import re from collections import defaultdict +from typing import List, Tuple from ..prof_bean._torch_op_bean import TorchOpBean from ..prof_common_func._binary_decoder import BinaryDecoder @@ -13,7 +14,6 @@ from ..prof_common_func._trace_event_manager import TraceEventManager from ..prof_common_func._tree_builder import TreeBuilder from ..prof_common_func._log import ProfilerLogger from ..prof_config._fwk_file_parser_config import FwkFileParserConfig -from ._python_trace_parser import PythonTraceParser __all__ = [] @@ 
-28,7 +28,7 @@ class FwkFileParser: ProfilerLogger.init(self._profiler_path, "FwkFileParser") self.logger = ProfilerLogger.get_instance() - def get_file_data_by_tag(self, file_tag: int) -> list: + def get_file_data_by_tag(self, file_tag: FileTag) -> list: file_path = self._file_list.get(file_tag) if not file_path: return [] @@ -41,63 +41,7 @@ class FwkFileParser: else: return BinaryDecoder.decode(all_bytes, file_bean, struct_size) - def get_enqueue_data(self) -> list: - enqueue_data_list = [] - op_mark_data = self.get_file_data_by_tag(FileTag.OP_MARK) - if not op_mark_data: - self.logger.error("Get enqueue data failed, the op mark data is empty.") - return enqueue_data_list - op_mark_data.sort(key=lambda x: x.time_ns) - tid_op_dict = defaultdict(lambda: defaultdict(list)) - match_failed_num = 0 - for op_mark in op_mark_data: - if not op_mark.is_enqueue: - continue - if op_mark.is_enqueue_start: - tid_op_dict[op_mark.tid][op_mark.origin_name].append(op_mark) - continue - start_op_list = tid_op_dict.get(op_mark.tid, {}).get(op_mark.origin_name, []) - if not start_op_list: - match_failed_num += 1 - continue - start_op = start_op_list.pop() - op_mark.ts = start_op.time_ns - op_mark.dur = op_mark.time_ns - start_op.time_ns - enqueue_data_list.append(op_mark) - start_op_list.clear() - if match_failed_num: - self.logger.warning(f"{match_failed_num} enqueue data match failed.") - return enqueue_data_list - - def get_dequeue_data(self) -> list: - dequeue_data_list = [] - op_mark_data = self.get_file_data_by_tag(FileTag.OP_MARK) - if not op_mark_data: - self.logger.error("Get dequeue data failed, the op mark data is empty.") - return dequeue_data_list - op_mark_data.sort(key=lambda x: x.time_ns) - tid_op_dict = defaultdict(lambda: defaultdict(list)) - match_failed_num = 0 - for op_mark in op_mark_data: - if not op_mark.is_dequeue: - continue - if op_mark.is_dequeue_start: - tid_op_dict[op_mark.tid][op_mark.origin_name].append(op_mark) - continue - start_op_list = tid_op_dict.get(op_mark.tid, {}).get(op_mark.origin_name, []) - if not start_op_list: - match_failed_num += 1 - continue - start_op = start_op_list.pop() - op_mark.ts = start_op.time_ns - op_mark.dur = op_mark.time_ns - start_op.time_ns - dequeue_data_list.append(op_mark) - start_op_list.clear() - if match_failed_num: - self.logger.warning(f"{match_failed_num} enqueue data match failed.") - return dequeue_data_list - - def get_task_queue_data(self) -> any: + def get_task_queue_data(self) -> Tuple[List, List]: enqueue_data_list, dequeue_data_list = [], [] op_mark_data = self.get_file_data_by_tag(FileTag.OP_MARK) if not op_mark_data: @@ -140,20 +84,15 @@ class FwkFileParser: self.logger.warning(f"{dequeue_match_failed_num} dequeue data match failed.") return enqueue_data_list, dequeue_data_list - def get_torch_op_tree_node(self, only_fwk: bool = False) -> list: - torch_op_list = self.get_file_data_by_tag(FileTag.TORCH_OP) - if not torch_op_list: + def get_torch_op_tree_node(self, torch_op_data: list, enqueue_data: list = []) -> list: + if not torch_op_data: self.logger.error("Get torch op tree node failed, the torch op data is empty.") return [] - enqueue_data_list = [] - if not only_fwk: - enqueue_data_list = self.get_enqueue_data() - result_data = TreeBuilder.build_tree(torch_op_list, enqueue_data_list) + result_data = TreeBuilder.build_tree(torch_op_data, enqueue_data) return result_data - def get_fwk_trace_data(self): - torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) - enqueue_data_list, dequeue_data_list = 
self.get_task_queue_data() + def get_fwk_trace_data(self, torch_op_data: list, enqueue_data_list: list, dequeue_data_list: list, + python_trace_data: list) -> list: if torch_op_data: pid = torch_op_data[0].pid elif enqueue_data_list or dequeue_data_list: @@ -163,9 +102,9 @@ class FwkFileParser: return [] tid_dict = {} fwk_x_event_list = [None] * ( - len(torch_op_data) + len(enqueue_data_list) * 2 + len(dequeue_data_list) * 2) + len(torch_op_data) + len(enqueue_data_list) * 2 + len(dequeue_data_list) * 2 + len(python_trace_data)) index = 0 - fwd_dict = {} + fwd_dict = defaultdict(dict) correlation_id_name_dict = {} for torch_op in torch_op_data: self.filter_fwd_bwd_event(fwd_dict, torch_op) @@ -188,25 +127,19 @@ class FwkFileParser: index += 1 fwk_x_event_list[index] = TraceEventManager.create_task_queue_flow(Constant.FLOW_START_PH, enqueue_data) index += 1 + for python_trace in python_trace_data: + fwk_x_event_list[index] = TraceEventManager.create_x_event(python_trace, "python_function") + index += 1 other_event_list = TraceEventManager.create_m_event(pid, tid_dict) other_event_list.extend(TraceEventManager.create_fwd_flow(fwd_dict)) fwk_x_event_list.extend(other_event_list) - python_trace_data = self.get_python_trace_data(set(tid_dict.keys())) - if python_trace_data: - fwk_x_event_list.extend(python_trace_data) gc_record_data = self.get_gc_record_trace_data() if gc_record_data: fwk_x_event_list.extend(gc_record_data) return fwk_x_event_list - def get_python_trace_data(self, torch_tids: set) -> list: - trace_hash_data = self.get_file_data_by_tag(FileTag.PYTHON_TRACER_HASH) - func_call_data = self.get_file_data_by_tag(FileTag.PYTHON_TRACER_FUNC) - python_trace_parser = PythonTraceParser(torch_tids, trace_hash_data, func_call_data) - return python_trace_parser.get_python_trace_data() - @classmethod - def filter_fwd_bwd_event(cls, fwd_dict: dict, torch_op: TorchOpBean): + def filter_fwd_bwd_event(cls, fwd_dict: defaultdict, torch_op: TorchOpBean): seq_num = torch_op.args.get("Sequence number", -1) if seq_num < 0: return @@ -214,7 +147,7 @@ class FwkFileParser: mode = "start" if torch_op.args.get("Fwd thread id") == 0 else "end" if fwd_event.get(mode, {}).get("ts", -float('inf')) < torch_op.ts: node = {mode: {'pid': torch_op.pid, 'tid': torch_op.tid, 'ts': torch_op.ts}} - fwd_dict.setdefault(seq_num, {}).update(node) + fwd_dict[seq_num].update(node) def has_task_queue_data(self): return bool(self._file_list.get(FileTag.OP_MARK)) @@ -249,9 +182,8 @@ class FwkFileParser: start_connection_id += 1 - def get_fwk_api(self) -> dict: - torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) - enqueue_data_list, dequeue_data_list = self.get_task_queue_data() + def get_fwk_api(self, torch_op_data: list, enqueue_data_list: list, dequeue_data_list: list, + python_trace_data: list) -> dict: if torch_op_data: pid = torch_op_data[0].pid elif enqueue_data_list or dequeue_data_list: @@ -264,7 +196,6 @@ class FwkFileParser: fwd_bwd_dict = {} torch_op_idx = 0 mstx_mark_apis = [] - torch_tids = set() for torch_op in torch_op_data: api = [torch_op.ts, torch_op.end_ns, contact_2num(pid, torch_op.tid), [], torch_op.name, @@ -277,7 +208,6 @@ class FwkFileParser: torch_op_apis.append(api) self.filter_fwd_bwd_api(fwd_bwd_dict, torch_op, torch_op_idx) torch_op_idx += 1 - torch_tids.add(torch_op.tid) connection_ids = [] task_enqueues = [] @@ -288,7 +218,7 @@ class FwkFileParser: [dequeue_data.ts, dequeue_data.ts + dequeue_data.dur, contact_2num(pid, dequeue_data.tid), dequeue_data.corr_id, dequeue_data.name]) 
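# Aside (illustrative sketch, not part of the patch): the enqueue/dequeue lists that
# get_fwk_api and get_fwk_trace_data now receive are produced by get_task_queue_data,
# which pairs start/end op-mark events on a per-(tid, name) stack and stamps ts/dur on
# the end mark. A minimal self-contained version of that pairing, with OpMark as a
# hypothetical stand-in for the real op-mark bean, looks roughly like:
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class OpMark:              # hypothetical stand-in for the op-mark bean
    tid: int
    name: str
    time_ns: int
    is_start: bool
    corr_id: int
    ts: int = 0
    dur: int = 0

def pair_marks(op_marks):
    # Sort by timestamp, push start marks, and close each end mark against the most
    # recent matching start, recording ts/dur on the end mark.
    paired, stacks = [], defaultdict(list)
    for mark in sorted(op_marks, key=lambda m: m.time_ns):
        key = (mark.tid, mark.name)
        if mark.is_start:
            stacks[key].append(mark)
        elif stacks[key]:
            start = stacks[key].pop()
            mark.ts, mark.dur = start.time_ns, mark.time_ns - start.time_ns
            paired.append(mark)
    return paired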
correlation_id_name_dict[dequeue_data.corr_id] = dequeue_data.origin_name - torch_tids.add(dequeue_data.tid) + for enqueue_data in enqueue_data_list: name = enqueue_data.name if enqueue_data.corr_id in correlation_id_name_dict: @@ -298,30 +228,34 @@ class FwkFileParser: [enqueue_data.ts, enqueue_data.ts + enqueue_data.dur, contact_2num(pid, enqueue_data.tid), enqueue_data.corr_id, name]) connection_ids.append(enqueue_data.corr_id) - torch_tids.add(enqueue_data.tid) start_connection_id = max(connection_ids) + 1 if connection_ids else 0 self.update_fwd_bwd_connection_id(fwd_bwd_dict, torch_op_apis, start_connection_id) - trace_hash_data = self.get_file_data_by_tag(FileTag.PYTHON_TRACER_HASH) - func_call_data = self.get_file_data_by_tag(FileTag.PYTHON_TRACER_FUNC) - python_trace_parser = PythonTraceParser(torch_tids, trace_hash_data, func_call_data) - python_trace_apis = python_trace_parser.get_python_trace_api_data() + python_trace_apis = [None] * len(python_trace_data) + for i, event in enumerate(python_trace_data): + python_trace_apis[i] = [event.ts, event.ts + event.dur, contact_2num(event.pid, event.tid), event.name] + return {"torch_op": torch_op_apis, "task_enqueues": task_enqueues, "task_dequeues": task_dequeues, "python_trace": python_trace_apis, "mstx_op": mstx_mark_apis} - def get_first_fwk_op(self): - torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) + def get_first_fwk_op(self, torch_op_data: list): if not torch_op_data: return None return min(torch_op_data, key=lambda op: op.ts) - def get_torch_op_tids(self): - torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) + def get_torch_op_tids(self, torch_op_data: list = []): + if not torch_op_data: + torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) if not torch_op_data: return set() return {op.tid for op in torch_op_data} + def get_task_queue_tids(self, enqueue_data: list, dequeue_data: list): + enqueue_tids = {op.tid for op in enqueue_data} + dequeue_tids = {op.tid for op in dequeue_data} + return enqueue_tids | dequeue_tids + def get_gc_record_db_data(self): gc_events = self.get_file_data_by_tag(FileTag.GC_RECORD) if not gc_events: diff --git a/torch_npu/profiler/analysis/prof_parse/_python_trace_parser.py b/torch_npu/profiler/analysis/prof_parse/_python_trace_parser.py index d97c6786cea2991721581b829ea9f801378e4386..062bd4bc12658ba2b1b919f2680c39bfce40bd68 100644 --- a/torch_npu/profiler/analysis/prof_parse/_python_trace_parser.py +++ b/torch_npu/profiler/analysis/prof_parse/_python_trace_parser.py @@ -9,10 +9,10 @@ __all__ = [] MODULE_NAME_DELIMITER = "######" -class TraceTag(Enum): +class TraceTag(Enum): kPy_Call = 0 kPy_Return = 1 - kC_Call = 2 + kC_Call = 2 kC_Return = 3 @@ -58,7 +58,7 @@ class PyTraceEvent: @property def parent_id(self): return self._parent_id - + @parent_id.setter def parent_id(self, parent_id): self._parent_id = parent_id @@ -95,7 +95,7 @@ class PyTraceEvent: @property def dur(self): return self._end_time - self._start_time - + @property def params(self): return self._params @@ -131,27 +131,9 @@ class PythonTraceParser: self._hash_map = {} self._param_map = {} - def get_python_trace_data(self) -> list: - trace_event_list = self._gen_python_trace_event_data() - if not trace_event_list: - return [] - trace_data = [None] * len(trace_event_list) - for i, event in enumerate(trace_event_list): - trace_data[i] = TraceEventManager.create_x_event(event, "python_function") - return trace_data - - def get_python_trace_api_data(self) -> list: - trace_event_list = 
self._gen_python_trace_event_data() - if not trace_event_list: - return [] - trace_api_data = [None] * len(trace_event_list) - for i, event in enumerate(trace_event_list): - trace_api_data[i] = [event.ts, event.ts + event.dur, contact_2num(event.pid, event.tid), event.name] - return trace_api_data - def get_pycall_data(self) -> list: self._gen_param_map() - return self._gen_python_trace_event_data() + return self.gen_python_trace_event_data() def _group_tarce_data_by_tid(self): trace_data_by_tid = defaultdict(lambda: []) @@ -160,7 +142,7 @@ class PythonTraceParser: trace_data_by_tid[call_bean.tid].append(call_bean) return trace_data_by_tid - def _gen_python_trace_event_data(self): + def gen_python_trace_event_data(self): self._gen_hash_map() trace_event_by_tid = self._group_tarce_data_by_tid() trace_event_list = [] @@ -222,7 +204,7 @@ class PythonTraceParser: def _gen_hash_map(self): self._hash_map = {hash_bean.key: hash_bean.value for hash_bean in self._hash_data} - + def _gen_param_map(self): if self._param_data is not None: self._param_map = {param_bean.key: param_bean.params for param_bean in self._param_data} diff --git a/torch_npu/profiler/analysis/prof_view/_communication_parser.py b/torch_npu/profiler/analysis/prof_view/_communication_parser.py index e07f68b785b31eb509602a99a12760fad476a5f3..d0b9ab4bf79935fc1f0ab4abd5af7187b21f0093 100644 --- a/torch_npu/profiler/analysis/prof_view/_communication_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_communication_parser.py @@ -63,12 +63,14 @@ class CommunicationParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "CommunicationParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("CommunicationParser start.") try: self._init_step_list(deps_data) self.generate_view() except Exception as e: self.logger.error("Failed to generate communication.json or communication_matrix.json, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("CommunicationParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/_integrate_parser.py b/torch_npu/profiler/analysis/prof_view/_integrate_parser.py index 28472a241177ed4f8f13c7b090e02a98db1113c2..1c296c953423b69cfedd3b35d6f589f7bda523ff 100644 --- a/torch_npu/profiler/analysis/prof_view/_integrate_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_integrate_parser.py @@ -30,12 +30,14 @@ class IntegrateParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "IntegrateParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("IntegrateParser start.") try: ProfilerConfig().load_info(self._profiler_path) self.generate_view() except Exception as e: self.logger.error("Failed to generate data_preprocess.csv or l2_cache.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("IntegrateParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/_kernel_view_parser.py b/torch_npu/profiler/analysis/prof_view/_kernel_view_parser.py index ded9a612c6cfd98a7076fb749457e0c3da9aa44c..b913f5ac3c1a14cebed7b61cf5858126935ced34 100644 --- a/torch_npu/profiler/analysis/prof_view/_kernel_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_kernel_view_parser.py @@ -35,6 +35,7 @@ class KernelViewParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "KernelViewParser") 
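# Aside (illustrative, not part of the patch): each parser touched in this change gains
# the same "start."/"finish." logging around run(). The shape of that pattern, with a
# hypothetical ExampleParser and the standard logging module standing in for
# ProfilerLogger, is roughly:
import logging

logger = logging.getLogger("AscendProfiler")  # same default logger name as ProfilerLogger

class ExampleParser:
    SUCCESS, FAIL = 0, 1  # stand-ins for Constant.SUCCESS / Constant.FAIL

    def run(self, deps_data: dict):
        logger.info("ExampleParser start.")
        try:
            self.generate_view(deps_data)
        except Exception as e:
            logger.error("Failed to generate view, error: %s", str(e), exc_info=True)
            return self.FAIL, None
        logger.info("ExampleParser finish.")
        return self.SUCCESS, None

    def generate_view(self, deps_data: dict):
        # view generation would go here
        pass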
self.logger = ProfilerLogger.get_instance() + self.logger.info("KernelViewParser start.") try: ProfilerConfig().load_info(self._profiler_path) self._init_step_range(deps_data) @@ -42,6 +43,7 @@ class KernelViewParser(BaseParser): except Exception as e: self.logger.error("Failed to generate kernel_details.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("KernelViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/_memory_prepare_parser.py b/torch_npu/profiler/analysis/prof_view/_memory_prepare_parser.py index 4cb4ed35db5af171fbc98b0d92ab3eb884956f65..0ea896036e33f7b4aae6bd6bc8e44980126f41af 100644 --- a/torch_npu/profiler/analysis/prof_view/_memory_prepare_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_memory_prepare_parser.py @@ -40,8 +40,11 @@ class MemoryPrepareParser(BaseParser): self.pta_record_list = [] self.memory_data = dict() self._torch_op_node = [] + self._torch_op_data = [] self._incomplete_num = 0 self._is_malloc_workspace_in_dequeue_enabled = False + self._enqueue_data = [] + self._dequeue_data = [] self._dequeue_record_dict = defaultdict(list) # {(pid, tid): [dequeue_records]} self._enqueue_record_dict = {} # {corrid: enqueue} self._dequeue_pids = set() @@ -62,19 +65,26 @@ class MemoryPrepareParser(BaseParser): return left def run(self, deps_data: dict): + self.logger.info("MemoryPrepareParser start.") try: self._torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + self._enqueue_data = task_queue_data.get("enqueue_data", []) + self._dequeue_data = task_queue_data.get("dequeue_data", []) self.generate_view() except Exception as e: self.logger.error("Failed to generate pytorch memory data, error: %s", str(e), exc_info=True) return Constant.FAIL, {} if self._incomplete_num > 0: print_warn_msg(f"{self._incomplete_num} memory record(s) are incomplete.") + self.logger.info("MemoryPrepareParser finish.") return Constant.SUCCESS, {"pta_record_list": self.pta_record_list, "memory_data": self.memory_data} def generate_view(self) -> None: ProfilerConfig().load_info(self._profiler_path) self._init_torch_op() + self._init_queue_info() self._add_pta_memory_data() def _find_matched_torch_op_name(self, mem_start_ts: int, torch_ops: list) -> str: @@ -88,35 +98,30 @@ class MemoryPrepareParser(BaseParser): return matched_torch_op.name def _init_queue_info(self): - enqueue_records = FwkFileParser(self._profiler_path).get_enqueue_data() - for enqueue_record in enqueue_records: - self._enqueue_record_dict[enqueue_record.corr_id] = enqueue_record - dequeue_records = FwkFileParser(self._profiler_path).get_dequeue_data() - for dequeue_record in dequeue_records: - self._dequeue_pids.add(dequeue_record.pid) - self._dequeue_tids.add(dequeue_record.tid) - key = (dequeue_record.pid, dequeue_record.tid) - self._dequeue_record_dict.setdefault(key, []).append(dequeue_record) + self._enqueue_record_dict = {record.corr_id: record for record in self._enqueue_data} + for record in self._dequeue_data: + self._dequeue_pids.add(record.pid) + self._dequeue_tids.add(record.tid) + self._dequeue_record_dict[(record.pid, record.tid)].append(record) def _add_pta_memory_data(self): - self._init_queue_info() pta_memory_data = FwkFileParser(self._profiler_path).get_file_data_by_tag(FileTag.MEMORY) - npu_memory_dict = {} - torch_op_dict = {} - pta_memory_data 
= sorted(pta_memory_data, key=lambda x: x.time_ns) + npu_memory_dict = defaultdict(list) + torch_op_dict = defaultdict(list) + pta_memory_data.sort(key=lambda x: x.time_ns) for record in pta_memory_data: if record.is_npu(): if record.is_inner_allocator(): - npu_memory_dict.setdefault(record.pid, []).append(record) + npu_memory_dict[record.pid].append(record) self.pta_record_list.append(record) for torch_op in self._torch_op_node: - torch_op_dict.setdefault(torch_op.pid, []).append(torch_op) + torch_op_dict[torch_op.pid].append(torch_op) for pid_key, memory_records in npu_memory_dict.items(): torch_ops = torch_op_dict.get(pid_key, []) if not torch_ops: warn(f"Lack of torch ops to connect memory record, whose process id is {pid_key}") continue - torch_ops = sorted(torch_ops, key=lambda x: x.start_time) + torch_ops.sort(key=lambda x: x.start_time) memory_dict = defaultdict(list) for record in memory_records: memory_dict[record.ptr].append(record) @@ -236,7 +241,7 @@ class MemoryPrepareParser(BaseParser): active_duration_time, records[0].total_allocated_for_db, records[0].total_reserved_for_db, records[0].total_active_for_db, records[free_idx].total_allocated_for_db, records[free_idx].total_reserved_for_db, records[free_idx].total_active_for_db, records[0].stream_ptr, records[0].device_index] - ret_list.append(combine_data[:]) + ret_list.append(combine_data) return ret_list def _complete_record_entry(self, ptr_records: list, torch_ops: list) -> list: @@ -281,11 +286,11 @@ class MemoryPrepareParser(BaseParser): active_duration_time, records[0].total_allocated, records[0].total_reserved, records[0].total_active, records[free_idx].total_allocated, records[free_idx].total_reserved, records[free_idx].total_active, records[0].stream_ptr, records[0].device_tag] - ret_list.append(combine_data[:]) + ret_list.append(combine_data) return ret_list def _init_torch_op(self): if not ProfilerPathManager.get_cann_path(self._profiler_path): - self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) + self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(self._torch_op_data) if self._torch_op_node: self._torch_op_node = self._torch_op_node[1:] diff --git a/torch_npu/profiler/analysis/prof_view/_memory_view_parser.py b/torch_npu/profiler/analysis/prof_view/_memory_view_parser.py index 47255efd09dbdca635e4888fd575f311fbcff5ef..997ec642b67ac83f21952cd34decda0d2d706cf3 100644 --- a/torch_npu/profiler/analysis/prof_view/_memory_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_memory_view_parser.py @@ -34,6 +34,7 @@ class MemoryViewParser(BaseParser): self.ge_record_list = [] self.memory_data = [] self.component_list = [] + self._torch_op_data = [] @staticmethod def _get_data_from_file(file_set: set, file_type_bean: any, bean_list: bool = False) -> list: @@ -73,13 +74,16 @@ class MemoryViewParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "MemoryViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("MemoryViewParser start.") try: self.memory_data = deps_data.get(Constant.MEMORY_PREPARE, {}).get("memory_data", {}).get(Constant.Text, []) self.pta_record_list = deps_data.get(Constant.MEMORY_PREPARE, {}).get("pta_record_list", []) + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) self.generate_view() except Exception as e: self.logger.error("Failed to generate operator_memory.csv or memory_record.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + 
self.logger.info("MemoryViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: @@ -151,8 +155,8 @@ class MemoryViewParser(BaseParser): def _init_pta_data(self): if not ProfilerPathManager.get_cann_path(self._profiler_path): - torch_nop_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) - deps_data = {Constant.TREE_BUILD_PARSER: torch_nop_node} + torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(self._torch_op_data) + deps_data = {Constant.TREE_BUILD_PARSER: torch_op_node, Constant.TORCH_OP_PARSER: self._torch_op_data} _, pta_data = MemoryPrepareParser(Constant.MEMORY_PREPARE, self._param_dict).run(deps_data) self.memory_data = pta_data.get("memory_data", {}).get(Constant.Text, []) self.pta_record_list = pta_data.get("pta_record_list", []) diff --git a/torch_npu/profiler/analysis/prof_view/_operator_view_parser.py b/torch_npu/profiler/analysis/prof_view/_operator_view_parser.py index 7c10e9d4bf45c2881fb8bd04ae3c2b1124f578c5..a75625fa1c9c27be094179ac851a839a31ed78f6 100644 --- a/torch_npu/profiler/analysis/prof_view/_operator_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_operator_view_parser.py @@ -20,19 +20,23 @@ class OperatorViewParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) self._torch_op_node = [] + self._torch_op_data = [] self._root_node = None self._kernel_dict = {} def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "OperatorViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("OperatorViewParser start.") try: self._torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) self._kernel_dict = deps_data.get(Constant.RELATION_PARSER, {}) + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) self.generate_view() except Exception as e: self.logger.error("Failed to generate operator_details.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("OperatorViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: @@ -70,7 +74,7 @@ class OperatorViewParser(BaseParser): def _init_torch_op(self): if not ProfilerPathManager.get_cann_path(self._profiler_path): - self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) + self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(self._torch_op_data) if self._torch_op_node: self._root_node = self._torch_op_node[0] self._torch_op_node = self._torch_op_node[1:] diff --git a/torch_npu/profiler/analysis/prof_view/_stack_view_parser.py b/torch_npu/profiler/analysis/prof_view/_stack_view_parser.py index b4a85271d99034e55936d682e9b4748f6251cf11..07936e7e5566bbb536bd45a09862fbecb82cb8a1 100644 --- a/torch_npu/profiler/analysis/prof_view/_stack_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_stack_view_parser.py @@ -11,7 +11,6 @@ from ..prof_common_func._file_manager import FileManager from ..prof_common_func._log import ProfilerLogger from ..prof_parse._fwk_cann_relation_parser import FwkCANNRelationParser from ..prof_parse._fwk_file_parser import FwkFileParser -from ....utils._path_manager import PathManager __all__ = [] @@ -19,20 +18,26 @@ __all__ = [] class StackViewParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) + self._torch_op_data = [] self._torch_op_node = [] self._root_node = None + self._dequeue_data = [] self._kernel_dict = {} self._metric = 
param_dict.get("metric") def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "StackViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("StackViewParser start.") try: self._torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + self._dequeue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}).get("dequeue_data", []) self.generate_view() except Exception as e: self.logger.error("Failed to export stack, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("StackViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: @@ -70,14 +75,14 @@ class StackViewParser(BaseParser): def _init_data(self): if not ProfilerPathManager.get_cann_path(self._profiler_path): - self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) + self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(self._torch_op_data) if not self._torch_op_node: return self._root_node = self._torch_op_node[0] self._torch_op_node = self._torch_op_node[1:] if self._metric == Constant.METRIC_NPU_TIME: - self._kernel_dict = FwkCANNRelationParser(self._profiler_path).get_kernel_dict() + self._kernel_dict = FwkCANNRelationParser(self._profiler_path).get_kernel_dict(self._dequeue_data) if not FwkFileParser(self._profiler_path).has_task_queue_data(): for acl_ts in self._kernel_dict.keys(): TreeBuilder.update_tree_node_info(acl_ts, self._root_node) diff --git a/torch_npu/profiler/analysis/prof_view/_trace_step_time_parser.py b/torch_npu/profiler/analysis/prof_view/_trace_step_time_parser.py index bcdb7d2c6eb3092cfee64b681534cab1357ba89c..19fd2db8cd772402b10041655ae6a2884a9a8ac8 100644 --- a/torch_npu/profiler/analysis/prof_view/_trace_step_time_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_trace_step_time_parser.py @@ -50,6 +50,7 @@ class TraceStepTimeParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) + self.torch_op_data = [] self.step_range = [] @classmethod @@ -102,7 +103,7 @@ class TraceStepTimeParser(BaseParser): if cur_step[StepInfoIndex.ID.value] == step: first_task_start_ts = cur_step[StepInfoIndex.FIRST_TASK_TS.value] if step is None: - first_fwk_op = FwkFileParser(self._profiler_path).get_first_fwk_op() + first_fwk_op = FwkFileParser(self._profiler_path).get_first_fwk_op(self.torch_op_data) return (first_task_start_ts - convert_ns2us_float(first_fwk_op.ts)) if first_fwk_op else 0 return first_task_start_ts - cur_step[StepInfoIndex.FWK_START_TS.value] return 0 @@ -165,12 +166,15 @@ class TraceStepTimeParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "TraceStepTimeParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("TraceStepTimeParser start.") try: self._init_step_range(deps_data) + self.torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) self.generate_view() except Exception as e: self.logger.error("Failed to generate step_trace_time.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("TraceStepTimeParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/_trace_view_parser.py b/torch_npu/profiler/analysis/prof_view/_trace_view_parser.py index c5e572e1bcfeba5ecaa4c4e6db93b47c896392eb..2438bac3eccc365692f67d9214e34f2f13b041bc 100644 --- 
a/torch_npu/profiler/analysis/prof_view/_trace_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_trace_view_parser.py @@ -26,6 +26,10 @@ class TraceViewParser(BaseParser): self._output_path) else self._output_path self._trace_data = [] self._torch_op_node = [] + self._torch_op_data = [] + self._enqueue_data = [] + self._dequeue_data = [] + self._python_trace_data = [] self._root_node = None @staticmethod @@ -47,21 +51,29 @@ class TraceViewParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "TraceViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("TraceViewParser start.") try: ProfilerConfig().load_info(self._profiler_path) torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) if torch_op_node: self._root_node = torch_op_node[0] self._torch_op_node = torch_op_node[1:] + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + self._enqueue_data = task_queue_data.get("enqueue_data", []) + self._dequeue_data = task_queue_data.get("dequeue_data", []) + self._python_trace_data = deps_data.get(Constant.PYTHON_TRACE_PRE_PARSER, []) self.generate_view() except Exception as e: self.logger.error("Failed to generate trace_view.json, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("TraceViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: if not ProfilerPathManager.get_cann_path(self._profiler_path): - self._trace_data = FwkFileParser(self._profiler_path).get_fwk_trace_data() + self._trace_data = FwkFileParser(self._profiler_path).get_fwk_trace_data( + self._torch_op_data, self._enqueue_data, self._dequeue_data, self._python_trace_data) else: msprof_timeline_data = CANNFileParser(self._profiler_path).get_timeline_all_data() self._trace_data.extend( @@ -86,8 +98,7 @@ class TraceViewParser(BaseParser): flow_event_list.extend( TraceEventManager.create_torch_to_npu_flow(matched_torch_op.event, kernel)) return flow_event_list - dequeue_data_list = FwkFileParser(self._profiler_path).get_dequeue_data() - kernel_dict = FwkCANNRelationParser.combine_kernel_dict(acl_to_npu_dict, dequeue_data_list) + kernel_dict = FwkCANNRelationParser.combine_kernel_dict(acl_to_npu_dict, self._dequeue_data) for torch_op_node in self._torch_op_node: for corr_id in torch_op_node.corr_id_self: kernel_list = kernel_dict.get(corr_id, []) diff --git a/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_analyze.py b/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_analyze.py index 9c1916753f7845a005918d56b0ffceb17ebdcc00..8a1439b9636ad5a23f59c90ee17b05fc874f00f8 100644 --- a/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_analyze.py +++ b/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_analyze.py @@ -38,6 +38,7 @@ class CANNAnalyzeParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "CANNAnalyzeParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("CANNAnalyzeParser start.") try: ProfilerConfig().load_info(self._profiler_path) if not os.path.isdir(self._cann_path): @@ -63,4 +64,5 @@ class CANNAnalyzeParser(BaseParser): print_error_msg("Failed to analyze CANN Profiling data.") self.logger.error("Failed to analyze CANN Profiling data, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("CANNAnalyzeParser finish.") return Constant.SUCCESS, None diff --git 
a/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_export.py b/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_export.py index b2fa92ddbffacae8672d98afe788609db925e155..e1fe4a2c219b8dfe37031c8a096360a0ca9990d0 100644 --- a/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_export.py +++ b/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_export.py @@ -46,6 +46,7 @@ class CANNExportParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "CANNExportParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("CANNExportParser start.") try: ProfilerConfig().load_info(self._profiler_path) if not os.path.isdir(self._cann_path): @@ -73,6 +74,7 @@ class CANNExportParser(BaseParser): return Constant.FAIL, None end_time = datetime.utcnow() print_info_msg(f"CANN profiling data parsed in a total time of {end_time - start_time}") + self.logger.info("CANNExportParser finish.") return Constant.SUCCESS, None def _check_msprof_environment(self): @@ -138,6 +140,9 @@ class CANNTimelineParser(BaseParser): self._cann_path = ProfilerPathManager.get_cann_path(self._profiler_path) def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "CANNTimelineParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("CANNTimelineParser start.") if not os.path.isdir(self._cann_path): return Constant.SUCCESS, None ProfilerConfig().load_info(self._profiler_path) @@ -147,6 +152,7 @@ class CANNTimelineParser(BaseParser): if os.path.exists(output_path): for file_name in os.listdir(output_path): if file_name.endswith('.csv'): + self.logger.info("CANNTimelineParser finish.") return Constant.SUCCESS, None try: time.sleep(Constant.SLEEP_TIME) @@ -157,6 +163,7 @@ class CANNTimelineParser(BaseParser): while True: for file in os.listdir(self._cann_path): if re.match(patten, file) and os.path.isfile(os.path.join(self._cann_path, file)): + self.logger.info("CANNTimelineParser finish.") return Constant.SUCCESS, None try: time.sleep(Constant.SLEEP_TIME) diff --git a/torch_npu/profiler/analysis/prof_view/prepare_parse/_fwk_pre_parser.py b/torch_npu/profiler/analysis/prof_view/prepare_parse/_fwk_pre_parser.py index a54ec86d4063512977acd4d6314b34f7f1f3616e..731002c56bab2855e896c6579ac1bd3cf79c55fb 100644 --- a/torch_npu/profiler/analysis/prof_view/prepare_parse/_fwk_pre_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prepare_parse/_fwk_pre_parser.py @@ -17,8 +17,10 @@ import os from ...prof_common_func._constant import Constant from ...prof_common_func._file_manager import FileManager +from ...prof_common_func._file_tag import FileTag from ...prof_common_func._log import ProfilerLogger from ...prof_parse._fwk_file_parser import FwkFileParser +from ...prof_parse._python_trace_parser import PythonTraceParser from ...prof_view._base_parser import BaseParser __all__ = [] @@ -32,14 +34,22 @@ class TracePreParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "TracePreParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("TracePreParser start.") try: - fwk_trace_data = FwkFileParser(self._profiler_path).get_fwk_trace_data() + torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + enqueue_data = task_queue_data.get("enqueue_data", []) + dequeue_data = task_queue_data.get("dequeue_data", []) + python_trace_data = deps_data.get(Constant.PYTHON_TRACE_PRE_PARSER, []) + fwk_trace_data = 
FwkFileParser(self._profiler_path).get_fwk_trace_data( + torch_op_data, enqueue_data, dequeue_data, python_trace_data) trace_file_path = os.path.join(self._output_path, Constant.TRACE_VIEW_TEMP) if os.path.isdir( self._output_path) else self._output_path FileManager.create_prepare_trace_json_by_path(trace_file_path, fwk_trace_data) except Exception as e: self.logger.error("Failed to create prepare trace json, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("TracePreParser finish.") return Constant.SUCCESS, None @@ -47,13 +57,81 @@ class TreeBuildParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) - ProfilerLogger.init(self._profiler_path, "TracePreParser") - self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "TreeBuildParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("TreeBuildParser start.") + enqueue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}).get("enqueue_data", []) + torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) try: - torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node() + torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(torch_op_data, enqueue_data) except Exception as e: self.logger.error("Failed to build torch op tree, error: %s", str(e), exc_info=True) return Constant.FAIL, [] + self.logger.info("TreeBuildParser finish.") return Constant.SUCCESS, torch_op_node + + +class TaskQueueParser(BaseParser): + + def __init__(self, name: str, param_dict: dict): + super().__init__(name, param_dict) + + def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "TaskQueueParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("TaskQueueParser start.") + try: + enqueue_data, dequeue_data = FwkFileParser(self._profiler_path).get_task_queue_data() + except Exception as e: + self.logger.error("Failed to get task queue data, error: %s", str(e), exc_info=True) + return Constant.FAIL, {} + self.logger.info("TaskQueueParser finish.") + return Constant.SUCCESS, {"enqueue_data": enqueue_data, "dequeue_data": dequeue_data} + + +class TorchOpParser(BaseParser): + + def __init__(self, name: str, param_dict: dict): + super().__init__(name, param_dict) + + def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "TorchOpParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("TorchOpParser start.") + try: + torch_op_data = FwkFileParser(self._profiler_path).get_file_data_by_tag(FileTag.TORCH_OP) + except Exception as e: + self.logger.error("Failed to get torch op tree, error: %s", str(e), exc_info=True) + return Constant.FAIL, [] + self.logger.info("TorchOpParser finish.") + return Constant.SUCCESS, torch_op_data + + +class PythonTracePreParser(BaseParser): + + def __init__(self, name: str, param_dict: dict): + super().__init__(name, param_dict) + + def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "PythonTracePreParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("PythonTracePreParser start.") + try: + torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + enqueue_data = task_queue_data.get("enqueue_data", []) + dequeue_data = task_queue_data.get("dequeue_data", []) + fwk_file_parser = FwkFileParser(self._profiler_path) + torch_op_tids = 
+            torch_op_tids = fwk_file_parser.get_torch_op_tids(torch_op_data)
+            task_queue_tids = fwk_file_parser.get_task_queue_tids(enqueue_data, dequeue_data)
+            trace_hash_data = fwk_file_parser.get_file_data_by_tag(FileTag.PYTHON_TRACER_HASH)
+            func_call_data = fwk_file_parser.get_file_data_by_tag(FileTag.PYTHON_TRACER_FUNC)
+            python_trace_parser = PythonTraceParser(torch_op_tids | task_queue_tids, trace_hash_data, func_call_data)
+            python_tracer_data = python_trace_parser.gen_python_trace_event_data()
+        except Exception as e:
+            self.logger.error("Failed to get python tracer data, error: %s", str(e), exc_info=True)
+            return Constant.FAIL, []
+        self.logger.info("PythonTracePreParser finish.")
+        return Constant.SUCCESS, python_tracer_data
diff --git a/torch_npu/profiler/analysis/prof_view/prepare_parse/_relation_parser.py b/torch_npu/profiler/analysis/prof_view/prepare_parse/_relation_parser.py
index 27437eaa654bf55529ec7f6c7e7577d4c237d440..f48c50713beea2853db87cec0b9634a8fcecd5c4 100644
--- a/torch_npu/profiler/analysis/prof_view/prepare_parse/_relation_parser.py
+++ b/torch_npu/profiler/analysis/prof_view/prepare_parse/_relation_parser.py
@@ -23,13 +23,17 @@ __all__ = []
 class RelationParser(BaseParser):
     def __init__(self, name: str, param_dict: dict):
         super().__init__(name, param_dict)
+        self._dequeue_data = []
 
     def run(self, deps_data: dict):
         ProfilerLogger.init(self._profiler_path, "RelationParser")
         self.logger = ProfilerLogger.get_instance()
+        self.logger.info("RelationParser start.")
+        self._dequeue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}).get("dequeue_data", [])
         try:
-            kernel_dict = FwkCANNRelationParser(self._profiler_path).get_kernel_dict()
+            kernel_dict = FwkCANNRelationParser(self._profiler_path).get_kernel_dict(self._dequeue_data)
         except Exception as e:
             self.logger.error("Failed to get acl to npu flow dict, error: %s", str(e), exc_info=True)
             return Constant.FAIL, {}
+        self.logger.info("RelationParser finish.")
         return Constant.SUCCESS, kernel_dict
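Reviewer note (illustration, not part of the patch): the preparation parsers above now hand their results to downstream parsers exclusively through deps_data. The standalone Python sketch below shows the expected shape of that dictionary: the task-queue entry is a dict holding enqueue_data/dequeue_data lists, the torch-op entry is a flat list, and every consumer falls back to an empty container when a dependency did not run. The key strings are stand-ins for the real Constant.TASK_QUEUE_PARSER / Constant.TORCH_OP_PARSER / Constant.PYTHON_TRACE_PRE_PARSER values, and the payloads are dummies.

# Illustrative sketch only; parser-name keys and payloads are stand-ins,
# not the real Constant.* values or record types used by the profiler.
from typing import Any, Dict, List

def build_deps_data() -> Dict[str, Any]:
    # What TaskQueueParser / TorchOpParser / PythonTracePreParser are expected
    # to contribute after a successful run().
    return {
        "task_queue_parser": {"enqueue_data": ["enq0", "enq1"], "dequeue_data": ["deq0"]},
        "torch_op_parser": ["aten::add", "aten::mul"],
        "python_trace_pre_parser": [],
    }

def consume(deps_data: Dict[str, Any]) -> None:
    # Mirrors the unpacking pattern used by TracePreParser and RelationParser:
    # every lookup has an empty default so a missing dependency degrades gracefully.
    torch_op_data: List[Any] = deps_data.get("torch_op_parser", [])
    task_queue_data: Dict[str, Any] = deps_data.get("task_queue_parser", {})
    enqueue_data = task_queue_data.get("enqueue_data", [])
    dequeue_data = task_queue_data.get("dequeue_data", [])
    print(len(torch_op_data), len(enqueue_data), len(dequeue_data))

if __name__ == "__main__":
    consume(build_deps_data())  # prints: 2 2 1
    consume({})                 # prints: 0 0 0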
Error: %s", str(error), exc_info=True) return Constant.FAIL, "" + self.logger.info("BasicDbParser finish.") return Constant.SUCCESS, "" def get_cann_db_path(self): @@ -52,7 +54,7 @@ class BasicDbParser(BaseParser): continue return file_path return "" - + def create_ascend_db(self): if not TorchDb().create_connect_db(): raise RuntimeError(f"Failed to connect to db file: {TorchDb().get_db_path()}") @@ -64,7 +66,7 @@ class BasicDbParser(BaseParser): TableColumnsManager.TableColumns.get(DbConstant.TABLE_RANK_DEVICE_MAP)) TorchDb().insert_data_into_table(DbConstant.TABLE_RANK_DEVICE_MAP, [[ProfilerConfig().rank_id, ProfilerPathManager.get_device_id(self._cann_path)]]) - + def save_host_info_to_db(self): if TorchDb().judge_table_exist(DbConstant.TABLE_HOST_INFO): return diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_db_parser.py index 89cc322980ad7fab77e707897700e556c71bf3be..83cc061c6a96c4f439b6fb379e9dca15b1ad86b7 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_db_parser.py @@ -37,6 +37,7 @@ class DbParser(BaseParser): self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + self.logger.info("DbParser start.") ProfilerConfig().load_info(self._profiler_path) torch_db_path = DbConstant.DB_ASCEND_PYTORCH_PROFILER if ProfilerConfig().rank_id != -1: @@ -60,4 +61,5 @@ class DbParser(BaseParser): finally: TorchDb().close() AnalysisDb().close() + self.logger.info("DbParser finish.") return Constant.SUCCESS, "" diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_fwk_api_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_fwk_api_db_parser.py index 2ae2ac6474e707e27f1717298b9de3cac8d725b9..c50b4eb791297a14cea745f608a5cb1d0d987cd3 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_fwk_api_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_fwk_api_db_parser.py @@ -61,16 +61,26 @@ class FwkApiDbParser(BaseParser): self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + self.logger.info("FwkApiDbParser start.") try: self.init_db_connect() self.set_start_string_id() self.get_max_cann_id() - fwk_api_data = FwkFileParser(self._profiler_path).get_fwk_api() + torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + enqueue_data = task_queue_data.get("enqueue_data", []) + dequeue_data = task_queue_data.get("dequeue_data", []) + python_trace_data = deps_data.get(Constant.PYTHON_TRACE_PRE_PARSER, []) + fwk_api_data = FwkFileParser(self._profiler_path).get_fwk_api( + torch_op_data, enqueue_data, dequeue_data, python_trace_data) + self.logger.info("FwkApiDbParser get fwk api data.") self.get_api_data_for_db(fwk_api_data) + self.logger.info("FwkApiDbParser get api data for db.") self.save_api_data_to_db() except Exception as error: self.logger.error("Failed to generate framework api table, error: %s", str(error), exc_info=True) return Constant.FAIL, None + self.logger.info("FwkApiDbParser finish.") return Constant.SUCCESS, None def get_api_data_for_db(self, fwk_api_data: dict): diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py index a570e909e325fc36659bd1567b917246dd8147da..b464241f7c3e83ab87feb3d6ddd91c3870f08daa 100644 --- 
diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py
index a570e909e325fc36659bd1567b917246dd8147da..b464241f7c3e83ab87feb3d6ddd91c3870f08daa 100644
--- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py
+++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py
@@ -30,6 +30,7 @@ class GCRecordDbParser(BaseParser):
         self.logger = ProfilerLogger.get_instance()
 
     def run(self, deps_data: dict):
+        self.logger.info("GCRecordDbParser start.")
         try:
             self.init_db_connect()
             self._gc_record_data = FwkFileParser(self._profiler_path).get_gc_record_db_data()
@@ -37,6 +38,7 @@
         except Exception as error:
             self.logger.error("Failed to generate gc record table, error: %s", str(error), exc_info=True)
             return Constant.FAIL, None
+        self.logger.info("GCRecordDbParser finish.")
         return Constant.SUCCESS, None
 
     def init_db_connect(self) -> None:
diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_memory_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_memory_db_parser.py
index 34a5fc27f856530c83cb66ba93a63afe367aa746..64b6209655fb2d99ef68ab7262a8c1ad47a09d42 100644
--- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_memory_db_parser.py
+++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_memory_db_parser.py
@@ -60,6 +60,7 @@ class MemoryDbParser(BaseParser):
         self._pta_record_list = []
         self._ge_record_list = []
         self._record_list = []
+        self._torch_op_data = []
         ProfilerLogger.init(self._profiler_path, "MemoryDbParser")
         self.logger = ProfilerLogger.get_instance()
 
@@ -75,20 +76,23 @@ class MemoryDbParser(BaseParser):
         pta_ge_record_list[MemoryRecordTableRow.TOTAL_ACTIVATE.value] = last_record[MemoryRecordTableRow.TOTAL_ACTIVATE.value] + cur_record[MemoryRecordTableRow.TOTAL_ACTIVATE.value]
         pta_ge_record_list[MemoryRecordTableRow.STREAM_PTR.value] = cur_record[MemoryRecordTableRow.STREAM_PTR.value] if cur_record[MemoryRecordTableRow.STREAM_PTR.value] else last_record[MemoryRecordTableRow.STREAM_PTR.value]
         return [cur_record, pta_ge_record_list]
-    
+
     def run(self, deps_data: dict):
+        self.logger.info("MemoryDbParser start.")
         try:
             self.init_db_connect()
             self.set_start_string_id()
             self._pta_op_memory_data = deps_data.get(Constant.MEMORY_PREPARE, {}).get("memory_data", {}).get(Constant.Db, [])
             self._pta_memory_bean_list = deps_data.get(Constant.MEMORY_PREPARE, {}).get("pta_record_list", [])
+            self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, [])
             self.init_pta_memory_data()
             self.save_memory_data_to_db()
         except Exception as error:
             self.logger.error("Failed to generate memory_record table or op_memory table, error: %s", str(error), exc_info=True)
             return Constant.FAIL, None
+        self.logger.info("MemoryDbParser finish.")
         return Constant.SUCCESS, None
-    
+
     def init_db_connect(self):
         if not TorchDb().create_connect_db():
             raise RuntimeError(f"Failed to connect to db file: {TorchDb().get_db_path()}")
@@ -140,14 +144,16 @@ class MemoryDbParser(BaseParser):
         operator_value = namedtuple('operator_value', ['size', 'timestamp', 'total_allocate', 'total_reserve'])
         for mem_record in ge_mem_records:
             if mem_record[GeOpMemRecordsOri.SIZE.value] > 0:
-                record_key = operator_key(operator=mem_record[GeOpMemRecordsOri.NAME.value], addr=mem_record[GeOpMemRecordsOri.ADDR.value], device_id=mem_record[GeOpMemRecordsOri.DEVICE_ID.value])
+                record_key = operator_key(operator=mem_record[GeOpMemRecordsOri.NAME.value],
+                                          addr=mem_record[GeOpMemRecordsOri.ADDR.value], device_id=mem_record[GeOpMemRecordsOri.DEVICE_ID.value])
                 record_value = operator_value(size=mem_record[GeOpMemRecordsOri.SIZE.value],
                                               timestamp=mem_record[GeOpMemRecordsOri.TIME_STAMP.value],
                                               total_allocate=mem_record[GeOpMemRecordsOri.TOTAL_ALLOCATED.value],
                                               total_reserve=mem_record[GeOpMemRecordsOri.TOTAL_RESERVED.value])
                 allocated_datas[record_key] = record_value
             elif mem_record[GeOpMemRecordsOri.SIZE.value] < 0:
-                record_key = operator_key(operator=mem_record[GeOpMemRecordsOri.NAME.value], addr=mem_record[GeOpMemRecordsOri.ADDR.value], device_id=mem_record[GeOpMemRecordsOri.DEVICE_ID.value])
+                record_key = operator_key(operator=mem_record[GeOpMemRecordsOri.NAME.value],
+                                          addr=mem_record[GeOpMemRecordsOri.ADDR.value], device_id=mem_record[GeOpMemRecordsOri.DEVICE_ID.value])
                 record_value = operator_value(size=mem_record[GeOpMemRecordsOri.SIZE.value],
                                               timestamp=mem_record[GeOpMemRecordsOri.TIME_STAMP.value],
                                               total_allocate=mem_record[GeOpMemRecordsOri.TOTAL_ALLOCATED.value],
@@ -191,7 +197,7 @@ class MemoryDbParser(BaseParser):
                                               memory_bean.total_allocated_for_db, memory_bean.total_reserved_for_db,
                                               memory_bean.total_active_for_db, memory_bean.stream_ptr,
                                               memory_bean.device_index])
-    
+
     def get_pta_ge_record_list(self):
         """
         ge records are to be sorted firstly and pta records are already sorted,
@@ -238,7 +244,7 @@ class MemoryDbParser(BaseParser):
 
     def init_pta_memory_data(self):
         if not ProfilerPathManager.get_cann_path(self._profiler_path):
-            torch_nop_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True)
+            torch_nop_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(self._torch_op_data)
             deps_data = {Constant.TREE_BUILD_PARSER: torch_nop_node}
             _, pta_data = MemoryPrepareParser(Constant.MEMORY_PREPARE, self._param_dict).run(deps_data)
             self._pta_op_memory_data = pta_data.get("memory_data", {}).get(Constant.Db, [])
@@ -247,7 +253,7 @@ class MemoryDbParser(BaseParser):
     def save_strings_id(self):
         TorchDb().create_table_with_headers(DbConstant.TABLE_STRING_IDS, TableColumnsManager.TableColumns.get(DbConstant.TABLE_STRING_IDS))
         TorchDb().insert_data_into_table(DbConstant.TABLE_STRING_IDS, Str2IdManager().get_all_string_2_id_data())
-    
+
     def save_memory_data_to_db(self):
         self.get_ge_memory_data()
         self.save_memory_record_data_to_db()
diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_step_info_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_step_info_db_parser.py
index df3b8fea4f29c6ff1f32ca1290623b56a2ff60f0..3c6e5674eba6fc9aaa0c2d160b213d065c2d01cb 100644
--- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_step_info_db_parser.py
+++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_step_info_db_parser.py
@@ -31,12 +31,14 @@ class StepInfoDbParser(BaseParser):
         self.logger = ProfilerLogger.get_instance()
 
     def run(self, deps_data: dict):
+        self.logger.info("StepInfoDbParser start.")
         try:
             torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, [])
             step_range = self.get_step_range(torch_op_node[0] if torch_op_node else None)
         except Exception as error:
             self.logger.error("Failed to get step info from db, error: %s", str(error), exc_info=True)
             return Constant.FAIL, []
+        self.logger.info("StepInfoDbParser finish.")
         return Constant.SUCCESS, step_range
 
     def get_api_data_in_time_range(self, begin_ts, end_ts) -> list:
diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_trace_step_time_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_trace_step_time_db_parser.py
index db82064fdefbc32b0d034f8b34db9b8276e18208..4901bb0bfc5999d728d7a886bc5a5fc7d77eabaa 100644
--- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_trace_step_time_db_parser.py
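Reviewer note (illustrative only): in the MemoryDbParser hunks above, GE memory records are keyed by an (operator, addr, device_id) namedtuple so that positive-size records (allocations) can later be matched against negative-size records (frees) for the same address. The standalone sketch below shows that keying idea with dummy records; it is not the parser's actual matching code.

# Simplified, standalone illustration of the (operator, addr, device_id) keying
# used for GE memory records; the matching logic here is a sketch only.
from collections import namedtuple

operator_key = namedtuple('operator_key', ['operator', 'addr', 'device_id'])
operator_value = namedtuple('operator_value', ['size', 'timestamp', 'total_allocate', 'total_reserve'])

def pair_records(records):
    """Pair each positive-size (allocate) record with a later negative-size (free)
    record that shares the same operator name, address and device id."""
    allocated, pairs = {}, []
    for name, addr, device_id, size, ts in records:
        key = operator_key(operator=name, addr=addr, device_id=device_id)
        if size > 0:
            allocated[key] = operator_value(size=size, timestamp=ts, total_allocate=0, total_reserve=0)
        elif size < 0 and key in allocated:
            pairs.append((key, allocated.pop(key).timestamp, ts))
    return pairs

if __name__ == "__main__":
    demo = [("MatMul", 0x1000, 0, 512, 10), ("MatMul", 0x1000, 0, -512, 42)]
    print(pair_records(demo))  # one pair: allocated at ts=10, freed at ts=42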
+++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_trace_step_time_db_parser.py
@@ -37,6 +37,7 @@ class TraceStepTimeDbParser(BaseParser):
 
     def __init__(self, name: str, param_dict: dict):
         super().__init__(name, param_dict)
+        self.torch_op_data = []
         self.step_range = []
         self.compute_task_info = defaultdict(list)
         self.communication_op_info = defaultdict(list)
@@ -58,7 +59,7 @@
         if not first_task_start_ts:
             return 0
         if step_info.get(Constant.STEP_ID) is None:
-            first_fwk_op = FwkFileParser(self._profiler_path).get_first_fwk_op()
+            first_fwk_op = FwkFileParser(self._profiler_path).get_first_fwk_op(self.torch_op_data)
             return (first_task_start_ts - first_fwk_op.ts) if first_fwk_op else 0
         return first_task_start_ts - step_info.get(Constant.FWK_START_TS, 0)
 
@@ -71,13 +72,16 @@
             AnalysisDb().insert_data_into_table(DbConstant.TABLE_STEP_TRACE_TIME, step_trace_data)
 
     def run(self, deps_data: dict):
+        self.logger.info("TraceStepTimeDbParser start.")
         try:
+            self.torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, [])
            self._init_step_range(deps_data)
             self._init_task_info_from_db()
             self.generate_view()
         except Exception as error:
             self.logger.error("Failed to generate step_trace_time table, error: %s", str(error), exc_info=True)
             return Constant.FAIL, None
+        self.logger.info("TraceStepTimeDbParser finish.")
         return Constant.SUCCESS, None
 
     def generate_view(self) -> None:
@@ -137,7 +141,7 @@
             return
         if TorchDb().judge_table_exist(DbConstant.TABLE_COMPUTE_TASK_INFO):
             sql = """
-            SELECT 
+            SELECT
                 STRING_IDS.value,
                 task.startNs,
                 task.endNs,
@@ -160,14 +164,14 @@
                     connectionId
                 FROM COMMUNICATION_OP c
             )
-            SELECT 
+            SELECT
                 comm.opName,
                 comm.startNs,
                 comm.endNs,
                 t.deviceId
             FROM comm_info comm
             JOIN (
-                SELECT 
+                SELECT
                     connectionId,
                     deviceId
                 FROM TASK