From 22775e02bd5c85796f7ad93531c1e2a6005e196e Mon Sep 17 00:00:00 2001 From: wangqiang160 Date: Tue, 12 Nov 2024 20:31:20 +0800 Subject: [PATCH] opt memcpyasync by aclrtMemcpyAsyncWithCondition --- third_party/acl/inc/acl/acl_rt.h | 33 +++++++++++ third_party/acl/libs/acl.cpp | 4 +- torch_npu/csrc/aten/common/CopyKernel.cpp | 3 +- .../csrc/aten/common/CopyMemoryKernel.cpp | 6 +- torch_npu/csrc/aten/common/ResizeNpu.h | 4 +- .../csrc/aten/ops/op_api/CopyKernelOpApi.cpp | 3 +- .../core/npu/THNPUCachingHostAllocator.cpp | 37 +++++++++---- .../csrc/core/npu/interface/AclInterface.cpp | 29 ++++++++++ .../csrc/core/npu/interface/AclInterface.h | 5 ++ .../npu/interface/AsyncTaskQueueInterface.cpp | 3 +- torch_npu/csrc/framework/OpParamMaker.cpp | 6 +- .../csrc/framework/utils/CalcuOpUtil.cpp | 55 ++++--------------- torch_npu/csrc/framework/utils/CalcuOpUtil.h | 9 --- 13 files changed, 120 insertions(+), 77 deletions(-) diff --git a/third_party/acl/inc/acl/acl_rt.h b/third_party/acl/inc/acl/acl_rt.h index 33052829bf..a01610ebe2 100644 --- a/third_party/acl/inc/acl/acl_rt.h +++ b/third_party/acl/inc/acl/acl_rt.h @@ -903,6 +903,39 @@ ACL_FUNC_VISIBILITY aclError aclrtMemcpyAsync(void *dst, aclrtMemcpyKind kind, aclrtStream stream); +/** + * @ingroup AscendCL + * @brief Asynchronous memory replication between Host and Device, would + * be synchronous if memory is not allocated via calling acl or rts api. + * + * @par Function + * After calling this interface and memory is allocated via calling acl or rts api, + * be sure to call the aclrtSynchronizeStream interface to ensure that + * the task of memory replication has been completed + * + * @par Restriction + * @li For on-chip Device-to-Device memory copy, + * both the source and destination addresses must be 64-byte aligned + * + * @param dst [IN] destination address pointer + * @param destMax [IN] Max length of destination address memory + * @param src [IN] source address pointer + * @param count [IN] the number of byte to copy + * @param kind [IN] memcpy type + * @param stream [IN] asynchronized task stream + * + * @retval ACL_SUCCESS The function is successfully executed. + * @retval OtherValues Failure + * + * @see aclrtSynchronizeStream + */ +ACL_FUNC_VISIBILITY aclError aclrtMemcpyAsyncWithCondition(void *dst, + size_t destMax, + const void *src, + size_t count, + aclrtMemcpyKind kind, + aclrtStream stream); + /** * @ingroup AscendCL * @brief synchronous memory replication of two-dimensional matrix between host and device diff --git a/third_party/acl/libs/acl.cpp b/third_party/acl/libs/acl.cpp index 4f24e6bf04..5166b1949f 100644 --- a/third_party/acl/libs/acl.cpp +++ b/third_party/acl/libs/acl.cpp @@ -44,7 +44,9 @@ aclError aclrtMalloc(void **devPtr, size_t size, aclrtMemMallocPolicy policy){re aclError aclrtMallocAlign32(void **devPtr, size_t size, aclrtMemMallocPolicy policy){return 0;} aclError aclrtMemcpy(void *dst, size_t destMax, const void *src, size_t count, aclrtMemcpyKind kind){return 0;} aclError aclrtMemcpyAsync(void *dst, size_t destMax, const void *src, - size_t count, aclrtMemcpyKind kind, aclrtStream stream){return 0;} + size_t count, aclrtMemcpyKind kind, aclrtStream stream) { return 0; } +aclError aclrtMemcpyAsyncWithCondition(void *dst, size_t destMax, const void *src, + size_t count, aclrtMemcpyKind kind, aclrtStream stream) { return 0; } aclError aclrtMallocHost(void **hostPtr, size_t size){return 0;} aclError aclrtFreeHost(void *hostPtr){return 0;} aclError aclrtGetMemInfo(aclrtMemAttr attr, size_t *free, size_t *total){return 0;} diff --git a/torch_npu/csrc/aten/common/CopyKernel.cpp b/torch_npu/csrc/aten/common/CopyKernel.cpp index 80c60ed9a0..42c0a164a4 100644 --- a/torch_npu/csrc/aten/common/CopyKernel.cpp +++ b/torch_npu/csrc/aten/common/CopyKernel.cpp @@ -11,6 +11,7 @@ #include "torch_npu/csrc/framework/StorageDescHelper.h" #include "torch_npu/csrc/aten/common/FormatCastHelper.h" #include "torch_npu/csrc/aten/common/InnerNpuNativeFunction.h" +#include "torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.h" #include "torch_npu/csrc/core/npu/THNPUCachingHostAllocator.h" #include "torch_npu/csrc/aten/NPUNativeFunctions.h" #include "torch_npu/csrc/aten/CustomFunctions.h" @@ -90,7 +91,7 @@ void copy_between_host_and_device( c10_npu::NPUStream stream = c10_npu::getCurrentNPUStream(); if (non_blocking) { - auto ret = CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch(dst, nbytes, src, nbytes, kind); + auto ret = c10_npu::queue::LaunchAsyncCopyTask(dst.data_ptr(), nbytes, src.data_ptr(), nbytes, kind); NPU_CHECK_ERROR(ret); ASCEND_LOGD("non_blocking copy without StreamSynchronize."); void* ptr = torch_npu::utils::is_npu(dst) ? get_base_data_ptr(src) : get_base_data_ptr(dst); diff --git a/torch_npu/csrc/aten/common/CopyMemoryKernel.cpp b/torch_npu/csrc/aten/common/CopyMemoryKernel.cpp index b8a76a6441..b01f201490 100644 --- a/torch_npu/csrc/aten/common/CopyMemoryKernel.cpp +++ b/torch_npu/csrc/aten/common/CopyMemoryKernel.cpp @@ -51,10 +51,10 @@ at::Tensor& NPUNativeFunctions::copy_memory_(at::Tensor& self, const at::Tensor& // Designed for the gather of tensors, ignoring npu_format_ and // copying continuous memory between npu tensors. - auto ret = CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch( - self, + auto ret = c10_npu::queue::LaunchAsyncCopyTask( + self.data_ptr(), dst_size * self.itemsize(), - src, + src.data_ptr(), dst_size * self.itemsize(), ACL_MEMCPY_DEVICE_TO_DEVICE); NPU_CHECK_ERROR(ret); diff --git a/torch_npu/csrc/aten/common/ResizeNpu.h b/torch_npu/csrc/aten/common/ResizeNpu.h index ad422e1f3c..d8645845bf 100644 --- a/torch_npu/csrc/aten/common/ResizeNpu.h +++ b/torch_npu/csrc/aten/common/ResizeNpu.h @@ -83,8 +83,8 @@ static void storage_resize_npu( copy_size = static_cast(storage.nbytes()); } if (copy_size > 0) { - aclError error = CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch( - storage, + aclError error = c10_npu::queue::LaunchAsyncCopyTask( + const_cast(storage.data()), copy_size, old_data.get(), copy_size, diff --git a/torch_npu/csrc/aten/ops/op_api/CopyKernelOpApi.cpp b/torch_npu/csrc/aten/ops/op_api/CopyKernelOpApi.cpp index 8030bd42fe..632a4ff1c0 100644 --- a/torch_npu/csrc/aten/ops/op_api/CopyKernelOpApi.cpp +++ b/torch_npu/csrc/aten/ops/op_api/CopyKernelOpApi.cpp @@ -19,6 +19,7 @@ #include "torch_npu/csrc/framework/utils/CalcuOpUtil.h" #include "torch_npu/csrc/framework/contiguous/ContiguousOpt.h" #include "torch_npu/csrc/aten/common/InnerNpuNativeFunction.h" +#include "torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.h" #include "torch_npu/csrc/core/npu/THNPUCachingHostAllocator.h" #include "torch_npu/csrc/aten/NPUOpApiNativeFunctions.h" #include "torch_npu/csrc/aten/NPUNativeFunctions.h" @@ -41,7 +42,7 @@ void copy_between_host_and_device_opapi(at::Tensor& dst, const at::Tensor& src, c10_npu::NPUStream stream = c10_npu::getCurrentNPUStream(); if (non_blocking) { - auto ret = CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch(dst, nbytes, src, nbytes, kind); + auto ret = c10_npu::queue::LaunchAsyncCopyTask(dst.data_ptr(), nbytes, src.data_ptr(), nbytes, kind); NPU_CHECK_ERROR(ret); ASCEND_LOGD("non_blocking copy without StreamSynchronize."); void* ptr = torch_npu::utils::is_npu(dst) ? get_base_data_ptr(src) : get_base_data_ptr(dst); diff --git a/torch_npu/csrc/core/npu/THNPUCachingHostAllocator.cpp b/torch_npu/csrc/core/npu/THNPUCachingHostAllocator.cpp index 15645f966b..a2933eaca8 100644 --- a/torch_npu/csrc/core/npu/THNPUCachingHostAllocator.cpp +++ b/torch_npu/csrc/core/npu/THNPUCachingHostAllocator.cpp @@ -174,14 +174,32 @@ struct HostAllocator { return ACL_ERROR_NONE; } - aclError recordEvent(void *ptr, c10_npu::NPUStream stream) - { - std::lock_guard lock(mutex); + aclError recordEvent(void *ptr, c10_npu::NPUStream stream) { + std::lock_guard lock(mutex); - auto it = blocks.find(ptr); - if (it == blocks.end()) { - // Sync when host memory is allocated by malloc - aclError error = c10_npu::acl::AclrtSynchronizeStreamWithTimeout(stream); + const auto it = blocks.find(ptr); + if (it != blocks.end()) { + // Ensure this passed-in pointer is valid and safe to access. + Block &block = it->second; + AT_ASSERT(block.allocated, PTA_ERROR(ErrCode::VALUE)); + + block.streams.insert(stream); + return ACL_ERROR_NONE; + } + + // The remaining memory ptr do not correspond to a pinned host memory allocation. + // So we need to add stream sync to ensure the host memory is still valid during memory copy async. + static const auto exit_memcpy_async_with_condition = c10_npu::acl::IsExistMemcpyAsyncWithCondition(); + if (exit_memcpy_async_with_condition) { + // The new API "aclrtMemcpyAsyncWithCondition" can do sync for memory that is allocated by malloc. + static const auto task_queue_enable = c10_npu::option::OptionsManager::GetTaskQueueEnable(); + if (task_queue_enable) { + // Attention: when task queue is enabled,empty queue by "stream()" is enough + // for "aclrtMemcpyAsyncWithCondition", no need to sync stream. + stream.stream(); + } + } else { + const aclError error = c10_npu::acl::AclrtSynchronizeStreamWithTimeout(stream); if (error != ACL_ERROR_NONE) { CHECK_AND_THROW_FORCE_STOP(error); CHECK_AND_THROW_UCE_ERROR(error); @@ -189,13 +207,8 @@ struct HostAllocator { AT_ERROR("ACL stream synchronize failed."); return error; } - return ACL_ERROR_NONE; } - Block &block = it->second; - AT_ASSERT(block.allocated, PTA_ERROR(ErrCode::VALUE)); - - block.streams.insert(stream); return ACL_ERROR_NONE; } diff --git a/torch_npu/csrc/core/npu/interface/AclInterface.cpp b/torch_npu/csrc/core/npu/interface/AclInterface.cpp index e1437c92d2..6af6d2b3bf 100644 --- a/torch_npu/csrc/core/npu/interface/AclInterface.cpp +++ b/torch_npu/csrc/core/npu/interface/AclInterface.cpp @@ -69,6 +69,7 @@ LOAD_FUNCTION(aclrtGetLastError) LOAD_FUNCTION(aclrtPeekAtLastError) LOAD_FUNCTION(aclrtSynchronizeDevice) LOAD_FUNCTION(aclrtSynchronizeDeviceWithTimeout) +LOAD_FUNCTION(aclrtMemcpyAsyncWithCondition) aclprofStepInfoPtr init_stepinfo() { typedef aclprofStepInfoPtr(*npdInitFunc)(); @@ -667,5 +668,33 @@ aclError AclrtSynchronizeDeviceWithTimeout(void) } } +bool IsExistMemcpyAsyncWithCondition() +{ + typedef aclError(*aclrtMemcpyAsyncWithConditionFunc)(void *, size_t, const void *, size_t, aclrtMemcpyKind, + aclrtStream); + static aclrtMemcpyAsyncWithConditionFunc func = + (aclrtMemcpyAsyncWithConditionFunc) GET_FUNC(aclrtMemcpyAsyncWithCondition); + return func != nullptr; +} + +aclError AclrtMemcpyAsyncWithCondition(void *dst, size_t destMax, const void *src, size_t count, aclrtMemcpyKind kind, + aclrtStream stream) +{ + typedef aclError(*aclrtMemcpyAsyncWithConditionFunc)(void *, size_t, const void *, size_t, aclrtMemcpyKind, + aclrtStream); + static aclrtMemcpyAsyncWithConditionFunc func = nullptr; + if (func == nullptr) { + func = (aclrtMemcpyAsyncWithConditionFunc) GET_FUNC(aclrtMemcpyAsyncWithCondition); + } + + aclError ret; + if (func != nullptr) { + ret = func(dst, destMax, src, count, kind, stream); + } else { + ret = aclrtMemcpyAsync(dst, destMax, src, count, kind, stream); + } + return ret; +} + } // namespace acl } // namespace c10 diff --git a/torch_npu/csrc/core/npu/interface/AclInterface.h b/torch_npu/csrc/core/npu/interface/AclInterface.h index af62ffedc5..65711ad972 100644 --- a/torch_npu/csrc/core/npu/interface/AclInterface.h +++ b/torch_npu/csrc/core/npu/interface/AclInterface.h @@ -176,5 +176,10 @@ aclError AclStressDetect(int32_t deviceId, void *workspace, size_t workspaceSize aclError AclrtSynchronizeDeviceWithTimeout(void); +bool IsExistMemcpyAsyncWithCondition(); + +aclError AclrtMemcpyAsyncWithCondition(void *dst, size_t destMax, const void *src, size_t count, aclrtMemcpyKind kind, + aclrtStream stream); + } // namespace acl } // namespace c10_npu diff --git a/torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.cpp b/torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.cpp index 1281d6db5a..b45ee6c9a3 100644 --- a/torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.cpp +++ b/torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.cpp @@ -1,4 +1,5 @@ #include "AsyncTaskQueueInterface.h" +#include "torch_npu/csrc/core/npu/interface/AclInterface.h" #include "torch_npu/csrc/core/npu/NPUEventManager.h" #include "torch_npu/csrc/core/npu/register/OptionsManager.h" #include @@ -93,7 +94,7 @@ void AsyncCopyTask::LaunchCopyTask() { #endif } else { c10_npu::NPUStream stream = c10_npu::getCurrentNPUStream(); - NPU_CHECK_ERROR(aclrtMemcpyAsync( + NPU_CHECK_ERROR(c10_npu::acl::AclrtMemcpyAsyncWithCondition( copyParam_.dst, copyParam_.dstLen, copyParam_.src, diff --git a/torch_npu/csrc/framework/OpParamMaker.cpp b/torch_npu/csrc/framework/OpParamMaker.cpp index a30c142008..7ff928d6c4 100644 --- a/torch_npu/csrc/framework/OpParamMaker.cpp +++ b/torch_npu/csrc/framework/OpParamMaker.cpp @@ -421,15 +421,15 @@ int ExecFuncOpApi(c10_npu::queue::QueueParas *in, aclrtStream stream) int MemcopyAsyncFunc(c10_npu::queue::QueueParas *in, aclrtStream stream) { auto cur_paras = static_cast(in->paramVal); - aclError ret = - aclrtMemcpyAsync(cur_paras->dst, cur_paras->dstLen, cur_paras->src, cur_paras->srcLen, cur_paras->kind, stream); + aclError ret = c10_npu::acl::AclrtMemcpyAsyncWithCondition(cur_paras->dst, cur_paras->dstLen, cur_paras->src, + cur_paras->srcLen, cur_paras->kind, stream); if (ret != ACL_ERROR_NONE) { auto ret_temp = c10_npu::acl::AclrtPeekAtLastError(ACL_RT_THREAD_LEVEL); if (ret_temp != ACL_ERROR_NONE) { ret = ret_temp; } ASCEND_LOGE( - "aclrtMemcpyAsync error! ret = %d, dstLen = %zu, srcLen = %zu, kind = %d", + "aclrtMemcpyAsyncWithCondition error! ret = %d, dstLen = %zu, srcLen = %zu, kind = %d", ret, cur_paras->dstLen, cur_paras->srcLen, diff --git a/torch_npu/csrc/framework/utils/CalcuOpUtil.cpp b/torch_npu/csrc/framework/utils/CalcuOpUtil.cpp index 99ee462357..3fc7826aa0 100644 --- a/torch_npu/csrc/framework/utils/CalcuOpUtil.cpp +++ b/torch_npu/csrc/framework/utils/CalcuOpUtil.cpp @@ -82,19 +82,6 @@ static std::map {"uint8", ACL_UINT8}, {"uint64", ACL_UINT64}, {"string", ACL_STRING}}; - -aclError AclrtMemcpyAsyncParamCheck(void *dst, size_t destMax, const void *src, - size_t count, aclrtMemcpyKind kind, - aclrtStream stream) { - auto ret = aclrtMemcpyAsync(dst, destMax, src, count, kind, stream); - return ret; -} - -aclError AclrtMemcpyParamCheck(void *dst, size_t destMax, const void *src, - size_t count, aclrtMemcpyKind kind) { - auto ret = aclrtMemcpy(dst, destMax, src, count, kind); - return ret; -} } // namespace namespace at_npu { @@ -187,49 +174,29 @@ aclError CalcuOpUtil::AclrtMemcpyWithModeSwitch( size_t dstMax, const StorageAndOffsetMemSizePair &src, size_t count, aclrtMemcpyKind kind) { - void *dst_ptr = static_cast( - static_cast(const_cast(dst.first->data())) + dst.second); - void *src_ptr = static_cast( - static_cast(const_cast(src.first->data())) + src.second); - return AclrtMemcpyParamCheck(dst_ptr, dstMax, const_cast(src_ptr), - count, kind); + void *dst_ptr = static_cast( + static_cast(const_cast(dst.first->data())) + dst.second); + void *src_ptr = static_cast( + static_cast(const_cast(src.first->data())) + src.second); + return aclrtMemcpy(dst_ptr, dstMax, const_cast(src_ptr), count, kind); } aclError CalcuOpUtil::AclrtMemcpyWithModeSwitch( const StorageAndOffsetMemSizePair &dst, size_t dstMax, const void *src, size_t count, aclrtMemcpyKind kind) { - void *dst_ptr = static_cast( - static_cast(const_cast(dst.first->data())) + dst.second); - return AclrtMemcpyParamCheck(dst_ptr, dstMax, src, count, kind); + void *dst_ptr = static_cast( + static_cast(const_cast(dst.first->data())) + dst.second); + return aclrtMemcpy(dst_ptr, dstMax, src, count, kind); } aclError CalcuOpUtil::AclrtMemcpyWithModeSwitch( void *dst, size_t dstMax, const StorageAndOffsetMemSizePair &src, size_t count, aclrtMemcpyKind kind) { - void *src_ptr = static_cast( - static_cast(const_cast(src.first->data())) + src.second); - return AclrtMemcpyParamCheck(dst, dstMax, const_cast(src_ptr), count, - kind); -} - -aclError CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch(const at::Tensor &dst, - size_t dstMax, - const at::Tensor &src, - size_t count, - aclrtMemcpyKind kind) { - aclError ret = c10_npu::queue::LaunchAsyncCopyTask( - dst.data_ptr(), dstMax, src.data_ptr(), count, kind); - return ret; -} - -aclError CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch( - const c10::StorageImpl &dst, size_t dstMax, void *src, size_t count, - aclrtMemcpyKind kind) { - aclError ret = - c10_npu::queue::LaunchAsyncCopyTask(const_cast(dst.data()), dstMax, src, count, kind); - return ret; + void *src_ptr = static_cast( + static_cast(const_cast(src.first->data())) + src.second); + return aclrtMemcpy(dst, dstMax, const_cast(src_ptr), count, kind); } int64_t CalcuOpUtil::GetTensorNpuFormat(const at::Tensor &tensor) { diff --git a/torch_npu/csrc/framework/utils/CalcuOpUtil.h b/torch_npu/csrc/framework/utils/CalcuOpUtil.h index 61bdd9b2f3..3e6c5740e2 100644 --- a/torch_npu/csrc/framework/utils/CalcuOpUtil.h +++ b/torch_npu/csrc/framework/utils/CalcuOpUtil.h @@ -80,15 +80,6 @@ public: AclrtMemcpyWithModeSwitch(void *dst, size_t dstMax, const StorageAndOffsetMemSizePair &src, size_t count, aclrtMemcpyKind kind); - static aclError LaunchAsyncCopyTaskWithModeSwitch(const at::Tensor &dst, - size_t dstMax, - const at::Tensor &src, - size_t count, - aclrtMemcpyKind kind); - static aclError LaunchAsyncCopyTaskWithModeSwitch(const c10::StorageImpl &dst, - size_t dstMax, void *src, - size_t count, - aclrtMemcpyKind kind); static void CheckMemoryOverLaps(c10::ArrayRef inputs, c10::ArrayRef outputs); -- Gitee