diff --git a/third_party/acl/inc/acl/acl_rt.h b/third_party/acl/inc/acl/acl_rt.h index 33052829bfcd9ccb11d6df2ae2c44fa43124fcca..a01610ebe2a9bd4ef01efba01b521bd67106f8bb 100644 --- a/third_party/acl/inc/acl/acl_rt.h +++ b/third_party/acl/inc/acl/acl_rt.h @@ -903,6 +903,39 @@ ACL_FUNC_VISIBILITY aclError aclrtMemcpyAsync(void *dst, aclrtMemcpyKind kind, aclrtStream stream); +/** + * @ingroup AscendCL + * @brief Asynchronous memory replication between Host and Device, which would + * be synchronous if the memory is not allocated via calling an acl or rts api. + * + * @par Function + * After calling this interface, if the memory is allocated via calling an acl or rts api, + * be sure to call the aclrtSynchronizeStream interface to ensure that + * the task of memory replication has been completed. + * + * @par Restriction + * @li For on-chip Device-to-Device memory copy, + * both the source and destination addresses must be 64-byte aligned + * + * @param dst [IN] destination address pointer + * @param destMax [IN] Max length of destination address memory + * @param src [IN] source address pointer + * @param count [IN] the number of bytes to copy + * @param kind [IN] memcpy type + * @param stream [IN] asynchronous task stream + * + * @retval ACL_SUCCESS The function is successfully executed. 
+ * @retval OtherValues Failure + * + * @see aclrtSynchronizeStream + */ +ACL_FUNC_VISIBILITY aclError aclrtMemcpyAsyncWithCondition(void *dst, + size_t destMax, + const void *src, + size_t count, + aclrtMemcpyKind kind, + aclrtStream stream); + /** * @ingroup AscendCL * @brief synchronous memory replication of two-dimensional matrix between host and device diff --git a/third_party/acl/libs/acl.cpp b/third_party/acl/libs/acl.cpp index 4f24e6bf043cba7c53c7015e597f5c6e82164bd6..5166b1949f91c3c030265737c6a5003a193fc603 100644 --- a/third_party/acl/libs/acl.cpp +++ b/third_party/acl/libs/acl.cpp @@ -44,7 +44,9 @@ aclError aclrtMalloc(void **devPtr, size_t size, aclrtMemMallocPolicy policy){re aclError aclrtMallocAlign32(void **devPtr, size_t size, aclrtMemMallocPolicy policy){return 0;} aclError aclrtMemcpy(void *dst, size_t destMax, const void *src, size_t count, aclrtMemcpyKind kind){return 0;} aclError aclrtMemcpyAsync(void *dst, size_t destMax, const void *src, - size_t count, aclrtMemcpyKind kind, aclrtStream stream){return 0;} + size_t count, aclrtMemcpyKind kind, aclrtStream stream) { return 0; } +aclError aclrtMemcpyAsyncWithCondition(void *dst, size_t destMax, const void *src, + size_t count, aclrtMemcpyKind kind, aclrtStream stream) { return 0; } aclError aclrtMallocHost(void **hostPtr, size_t size){return 0;} aclError aclrtFreeHost(void *hostPtr){return 0;} aclError aclrtGetMemInfo(aclrtMemAttr attr, size_t *free, size_t *total){return 0;} diff --git a/torch_npu/csrc/aten/common/CopyKernel.cpp b/torch_npu/csrc/aten/common/CopyKernel.cpp index 80c60ed9a0ad70b30c4bc55ff1b9c3681ef63bff..42c0a164a4e58dbc5e2649327c4ea5561f90de17 100644 --- a/torch_npu/csrc/aten/common/CopyKernel.cpp +++ b/torch_npu/csrc/aten/common/CopyKernel.cpp @@ -11,6 +11,7 @@ #include "torch_npu/csrc/framework/StorageDescHelper.h" #include "torch_npu/csrc/aten/common/FormatCastHelper.h" #include "torch_npu/csrc/aten/common/InnerNpuNativeFunction.h" +#include 
"torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.h" #include "torch_npu/csrc/core/npu/THNPUCachingHostAllocator.h" #include "torch_npu/csrc/aten/NPUNativeFunctions.h" #include "torch_npu/csrc/aten/CustomFunctions.h" @@ -90,7 +91,7 @@ void copy_between_host_and_device( c10_npu::NPUStream stream = c10_npu::getCurrentNPUStream(); if (non_blocking) { - auto ret = CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch(dst, nbytes, src, nbytes, kind); + auto ret = c10_npu::queue::LaunchAsyncCopyTask(dst.data_ptr(), nbytes, src.data_ptr(), nbytes, kind); NPU_CHECK_ERROR(ret); ASCEND_LOGD("non_blocking copy without StreamSynchronize."); void* ptr = torch_npu::utils::is_npu(dst) ? get_base_data_ptr(src) : get_base_data_ptr(dst); diff --git a/torch_npu/csrc/aten/common/CopyMemoryKernel.cpp b/torch_npu/csrc/aten/common/CopyMemoryKernel.cpp index b8a76a644187f5864e0d0b671677534e3ac2e909..b01f201490f8fc3088868da1d06d249955c30342 100644 --- a/torch_npu/csrc/aten/common/CopyMemoryKernel.cpp +++ b/torch_npu/csrc/aten/common/CopyMemoryKernel.cpp @@ -51,10 +51,10 @@ at::Tensor& NPUNativeFunctions::copy_memory_(at::Tensor& self, const at::Tensor& // Designed for the gather of tensors, ignoring npu_format_ and // copying continuous memory between npu tensors. 
- auto ret = CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch( - self, + auto ret = c10_npu::queue::LaunchAsyncCopyTask( + self.data_ptr(), dst_size * self.itemsize(), - src, + src.data_ptr(), dst_size * self.itemsize(), ACL_MEMCPY_DEVICE_TO_DEVICE); NPU_CHECK_ERROR(ret); diff --git a/torch_npu/csrc/aten/common/ResizeNpu.h b/torch_npu/csrc/aten/common/ResizeNpu.h index ad422e1f3c61197f64da322ac9b067bd1e0f6101..d8645845bfe4cd9b23e211bb965b12c22a90ba98 100644 --- a/torch_npu/csrc/aten/common/ResizeNpu.h +++ b/torch_npu/csrc/aten/common/ResizeNpu.h @@ -83,8 +83,8 @@ static void storage_resize_npu( copy_size = static_cast(storage.nbytes()); } if (copy_size > 0) { - aclError error = CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch( - storage, + aclError error = c10_npu::queue::LaunchAsyncCopyTask( + const_cast(storage.data()), copy_size, old_data.get(), copy_size, diff --git a/torch_npu/csrc/aten/ops/op_api/CopyKernelOpApi.cpp b/torch_npu/csrc/aten/ops/op_api/CopyKernelOpApi.cpp index 8030bd42fe9a7fb9a4040071f1ceaba40e7e5e90..632a4ff1c07a50a8cab315d624284c83f19a8058 100644 --- a/torch_npu/csrc/aten/ops/op_api/CopyKernelOpApi.cpp +++ b/torch_npu/csrc/aten/ops/op_api/CopyKernelOpApi.cpp @@ -19,6 +19,7 @@ #include "torch_npu/csrc/framework/utils/CalcuOpUtil.h" #include "torch_npu/csrc/framework/contiguous/ContiguousOpt.h" #include "torch_npu/csrc/aten/common/InnerNpuNativeFunction.h" +#include "torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.h" #include "torch_npu/csrc/core/npu/THNPUCachingHostAllocator.h" #include "torch_npu/csrc/aten/NPUOpApiNativeFunctions.h" #include "torch_npu/csrc/aten/NPUNativeFunctions.h" @@ -41,7 +42,7 @@ void copy_between_host_and_device_opapi(at::Tensor& dst, const at::Tensor& src, c10_npu::NPUStream stream = c10_npu::getCurrentNPUStream(); if (non_blocking) { - auto ret = CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch(dst, nbytes, src, nbytes, kind); + auto ret = c10_npu::queue::LaunchAsyncCopyTask(dst.data_ptr(), nbytes, src.data_ptr(), 
nbytes, kind); NPU_CHECK_ERROR(ret); ASCEND_LOGD("non_blocking copy without StreamSynchronize."); void* ptr = torch_npu::utils::is_npu(dst) ? get_base_data_ptr(src) : get_base_data_ptr(dst); diff --git a/torch_npu/csrc/core/npu/THNPUCachingHostAllocator.cpp b/torch_npu/csrc/core/npu/THNPUCachingHostAllocator.cpp index 15645f966b65e2166d637a5c890493877d7ee903..a2933eaca89225b7cdd7e8a43da6cca21f1e1091 100644 --- a/torch_npu/csrc/core/npu/THNPUCachingHostAllocator.cpp +++ b/torch_npu/csrc/core/npu/THNPUCachingHostAllocator.cpp @@ -174,14 +174,32 @@ struct HostAllocator { return ACL_ERROR_NONE; } - aclError recordEvent(void *ptr, c10_npu::NPUStream stream) - { - std::lock_guard lock(mutex); + aclError recordEvent(void *ptr, c10_npu::NPUStream stream) { + std::lock_guard lock(mutex); - auto it = blocks.find(ptr); - if (it == blocks.end()) { - // Sync when host memory is allocated by malloc - aclError error = c10_npu::acl::AclrtSynchronizeStreamWithTimeout(stream); + const auto it = blocks.find(ptr); + if (it != blocks.end()) { + // Ensure this passed-in pointer is valid and safe to access. + Block &block = it->second; + AT_ASSERT(block.allocated, PTA_ERROR(ErrCode::VALUE)); + + block.streams.insert(stream); + return ACL_ERROR_NONE; + } + + // The remaining memory ptr does not correspond to a pinned host memory allocation. + // So we need to add a stream sync to ensure the host memory is still valid during the async memory copy. + static const auto exit_memcpy_async_with_condition = c10_npu::acl::IsExistMemcpyAsyncWithCondition(); + if (exit_memcpy_async_with_condition) { + // The new API "aclrtMemcpyAsyncWithCondition" can do sync for memory that is allocated by malloc. + static const auto task_queue_enable = c10_npu::option::OptionsManager::GetTaskQueueEnable(); + if (task_queue_enable) { + // Attention: when the task queue is enabled, emptying the queue via "stream()" is enough + // for "aclrtMemcpyAsyncWithCondition", no need to sync stream. 
+ stream.stream(); + } + } else { + const aclError error = c10_npu::acl::AclrtSynchronizeStreamWithTimeout(stream); if (error != ACL_ERROR_NONE) { CHECK_AND_THROW_FORCE_STOP(error); CHECK_AND_THROW_UCE_ERROR(error); @@ -189,13 +207,8 @@ struct HostAllocator { AT_ERROR("ACL stream synchronize failed."); return error; } - return ACL_ERROR_NONE; } - Block &block = it->second; - AT_ASSERT(block.allocated, PTA_ERROR(ErrCode::VALUE)); - - block.streams.insert(stream); return ACL_ERROR_NONE; } diff --git a/torch_npu/csrc/core/npu/interface/AclInterface.cpp b/torch_npu/csrc/core/npu/interface/AclInterface.cpp index e1437c92d258ecad4cfb6a52353cd061f5fb8a7d..6af6d2b3bf360869acb0e871bfc2c4126fb525c4 100644 --- a/torch_npu/csrc/core/npu/interface/AclInterface.cpp +++ b/torch_npu/csrc/core/npu/interface/AclInterface.cpp @@ -69,6 +69,7 @@ LOAD_FUNCTION(aclrtGetLastError) LOAD_FUNCTION(aclrtPeekAtLastError) LOAD_FUNCTION(aclrtSynchronizeDevice) LOAD_FUNCTION(aclrtSynchronizeDeviceWithTimeout) +LOAD_FUNCTION(aclrtMemcpyAsyncWithCondition) aclprofStepInfoPtr init_stepinfo() { typedef aclprofStepInfoPtr(*npdInitFunc)(); @@ -667,5 +668,33 @@ aclError AclrtSynchronizeDeviceWithTimeout(void) } } +bool IsExistMemcpyAsyncWithCondition() +{ + typedef aclError(*aclrtMemcpyAsyncWithConditionFunc)(void *, size_t, const void *, size_t, aclrtMemcpyKind, + aclrtStream); + static aclrtMemcpyAsyncWithConditionFunc func = + (aclrtMemcpyAsyncWithConditionFunc) GET_FUNC(aclrtMemcpyAsyncWithCondition); + return func != nullptr; +} + +aclError AclrtMemcpyAsyncWithCondition(void *dst, size_t destMax, const void *src, size_t count, aclrtMemcpyKind kind, + aclrtStream stream) +{ + typedef aclError(*aclrtMemcpyAsyncWithConditionFunc)(void *, size_t, const void *, size_t, aclrtMemcpyKind, + aclrtStream); + static aclrtMemcpyAsyncWithConditionFunc func = nullptr; + if (func == nullptr) { + func = (aclrtMemcpyAsyncWithConditionFunc) GET_FUNC(aclrtMemcpyAsyncWithCondition); + } + + aclError ret; + if (func != 
nullptr) { + ret = func(dst, destMax, src, count, kind, stream); + } else { + ret = aclrtMemcpyAsync(dst, destMax, src, count, kind, stream); + } + return ret; +} + } // namespace acl } // namespace c10 diff --git a/torch_npu/csrc/core/npu/interface/AclInterface.h b/torch_npu/csrc/core/npu/interface/AclInterface.h index af62ffedc58ee67d03ec3a47c4f93a9fb30251ba..65711ad972f125e5944d722b7307de7baaad1d7e 100644 --- a/torch_npu/csrc/core/npu/interface/AclInterface.h +++ b/torch_npu/csrc/core/npu/interface/AclInterface.h @@ -176,5 +176,10 @@ aclError AclStressDetect(int32_t deviceId, void *workspace, size_t workspaceSize aclError AclrtSynchronizeDeviceWithTimeout(void); +bool IsExistMemcpyAsyncWithCondition(); + +aclError AclrtMemcpyAsyncWithCondition(void *dst, size_t destMax, const void *src, size_t count, aclrtMemcpyKind kind, + aclrtStream stream); + } // namespace acl } // namespace c10_npu diff --git a/torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.cpp b/torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.cpp index 1281d6db5aa9312f9bd429f881da32916ad128ea..b45ee6c9a3ad3e4830ae046634887cf8d90d741a 100644 --- a/torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.cpp +++ b/torch_npu/csrc/core/npu/interface/AsyncTaskQueueInterface.cpp @@ -1,4 +1,5 @@ #include "AsyncTaskQueueInterface.h" +#include "torch_npu/csrc/core/npu/interface/AclInterface.h" #include "torch_npu/csrc/core/npu/NPUEventManager.h" #include "torch_npu/csrc/core/npu/register/OptionsManager.h" #include @@ -93,7 +94,7 @@ void AsyncCopyTask::LaunchCopyTask() { #endif } else { c10_npu::NPUStream stream = c10_npu::getCurrentNPUStream(); - NPU_CHECK_ERROR(aclrtMemcpyAsync( + NPU_CHECK_ERROR(c10_npu::acl::AclrtMemcpyAsyncWithCondition( copyParam_.dst, copyParam_.dstLen, copyParam_.src, diff --git a/torch_npu/csrc/framework/OpParamMaker.cpp b/torch_npu/csrc/framework/OpParamMaker.cpp index a30c1420087ab3d9a54851c469e1db15d5712f65..7ff928d6c4ce8c8886f4d3a960a968901ebcc2a8 100644 --- 
a/torch_npu/csrc/framework/OpParamMaker.cpp +++ b/torch_npu/csrc/framework/OpParamMaker.cpp @@ -421,15 +421,15 @@ int ExecFuncOpApi(c10_npu::queue::QueueParas *in, aclrtStream stream) int MemcopyAsyncFunc(c10_npu::queue::QueueParas *in, aclrtStream stream) { auto cur_paras = static_cast(in->paramVal); - aclError ret = - aclrtMemcpyAsync(cur_paras->dst, cur_paras->dstLen, cur_paras->src, cur_paras->srcLen, cur_paras->kind, stream); + aclError ret = c10_npu::acl::AclrtMemcpyAsyncWithCondition(cur_paras->dst, cur_paras->dstLen, cur_paras->src, + cur_paras->srcLen, cur_paras->kind, stream); if (ret != ACL_ERROR_NONE) { auto ret_temp = c10_npu::acl::AclrtPeekAtLastError(ACL_RT_THREAD_LEVEL); if (ret_temp != ACL_ERROR_NONE) { ret = ret_temp; } ASCEND_LOGE( - "aclrtMemcpyAsync error! ret = %d, dstLen = %zu, srcLen = %zu, kind = %d", + "aclrtMemcpyAsyncWithCondition error! ret = %d, dstLen = %zu, srcLen = %zu, kind = %d", ret, cur_paras->dstLen, cur_paras->srcLen, diff --git a/torch_npu/csrc/framework/utils/CalcuOpUtil.cpp b/torch_npu/csrc/framework/utils/CalcuOpUtil.cpp index 99ee4623575e97dedee07bbba48515ad840f43b1..3fc7826aa00b73fca14c7d45d1fc2ec13c10d8af 100644 --- a/torch_npu/csrc/framework/utils/CalcuOpUtil.cpp +++ b/torch_npu/csrc/framework/utils/CalcuOpUtil.cpp @@ -82,19 +82,6 @@ static std::map {"uint8", ACL_UINT8}, {"uint64", ACL_UINT64}, {"string", ACL_STRING}}; - -aclError AclrtMemcpyAsyncParamCheck(void *dst, size_t destMax, const void *src, - size_t count, aclrtMemcpyKind kind, - aclrtStream stream) { - auto ret = aclrtMemcpyAsync(dst, destMax, src, count, kind, stream); - return ret; -} - -aclError AclrtMemcpyParamCheck(void *dst, size_t destMax, const void *src, - size_t count, aclrtMemcpyKind kind) { - auto ret = aclrtMemcpy(dst, destMax, src, count, kind); - return ret; -} } // namespace namespace at_npu { @@ -187,49 +174,29 @@ aclError CalcuOpUtil::AclrtMemcpyWithModeSwitch( size_t dstMax, const StorageAndOffsetMemSizePair &src, size_t count, 
aclrtMemcpyKind kind) { - void *dst_ptr = static_cast( - static_cast(const_cast(dst.first->data())) + dst.second); - void *src_ptr = static_cast( - static_cast(const_cast(src.first->data())) + src.second); - return AclrtMemcpyParamCheck(dst_ptr, dstMax, const_cast(src_ptr), - count, kind); + void *dst_ptr = static_cast( + static_cast(const_cast(dst.first->data())) + dst.second); + void *src_ptr = static_cast( + static_cast(const_cast(src.first->data())) + src.second); + return aclrtMemcpy(dst_ptr, dstMax, const_cast(src_ptr), count, kind); } aclError CalcuOpUtil::AclrtMemcpyWithModeSwitch( const StorageAndOffsetMemSizePair &dst, size_t dstMax, const void *src, size_t count, aclrtMemcpyKind kind) { - void *dst_ptr = static_cast( - static_cast(const_cast(dst.first->data())) + dst.second); - return AclrtMemcpyParamCheck(dst_ptr, dstMax, src, count, kind); + void *dst_ptr = static_cast( + static_cast(const_cast(dst.first->data())) + dst.second); + return aclrtMemcpy(dst_ptr, dstMax, src, count, kind); } aclError CalcuOpUtil::AclrtMemcpyWithModeSwitch( void *dst, size_t dstMax, const StorageAndOffsetMemSizePair &src, size_t count, aclrtMemcpyKind kind) { - void *src_ptr = static_cast( - static_cast(const_cast(src.first->data())) + src.second); - return AclrtMemcpyParamCheck(dst, dstMax, const_cast(src_ptr), count, - kind); -} - -aclError CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch(const at::Tensor &dst, - size_t dstMax, - const at::Tensor &src, - size_t count, - aclrtMemcpyKind kind) { - aclError ret = c10_npu::queue::LaunchAsyncCopyTask( - dst.data_ptr(), dstMax, src.data_ptr(), count, kind); - return ret; -} - -aclError CalcuOpUtil::LaunchAsyncCopyTaskWithModeSwitch( - const c10::StorageImpl &dst, size_t dstMax, void *src, size_t count, - aclrtMemcpyKind kind) { - aclError ret = - c10_npu::queue::LaunchAsyncCopyTask(const_cast(dst.data()), dstMax, src, count, kind); - return ret; + void *src_ptr = static_cast( + static_cast(const_cast(src.first->data())) + 
src.second); + return aclrtMemcpy(dst, dstMax, const_cast(src_ptr), count, kind); } int64_t CalcuOpUtil::GetTensorNpuFormat(const at::Tensor &tensor) { diff --git a/torch_npu/csrc/framework/utils/CalcuOpUtil.h b/torch_npu/csrc/framework/utils/CalcuOpUtil.h index 61bdd9b2f3c7d816fc02e4eb0044cd0252bd0df7..3e6c5740e2d49490f27211233074fb08e14fb488 100644 --- a/torch_npu/csrc/framework/utils/CalcuOpUtil.h +++ b/torch_npu/csrc/framework/utils/CalcuOpUtil.h @@ -80,15 +80,6 @@ public: AclrtMemcpyWithModeSwitch(void *dst, size_t dstMax, const StorageAndOffsetMemSizePair &src, size_t count, aclrtMemcpyKind kind); - static aclError LaunchAsyncCopyTaskWithModeSwitch(const at::Tensor &dst, - size_t dstMax, - const at::Tensor &src, - size_t count, - aclrtMemcpyKind kind); - static aclError LaunchAsyncCopyTaskWithModeSwitch(const c10::StorageImpl &dst, - size_t dstMax, void *src, - size_t count, - aclrtMemcpyKind kind); static void CheckMemoryOverLaps(c10::ArrayRef inputs, c10::ArrayRef outputs);