diff --git a/frameworks/libs/distributeddb/syncer/src/sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/sync_task_context.cpp index 01481034db348d15aed3828f6ec7d08bd684d628..905e894dca08701784b8043f710b627ed6fba962 100644 --- a/frameworks/libs/distributeddb/syncer/src/sync_task_context.cpp +++ b/frameworks/libs/distributeddb/syncer/src/sync_task_context.cpp @@ -536,15 +536,28 @@ void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId) int SyncTaskContext::TimeOut(TimerId id) { - int errCode = E_OK; if (!timeOutCallback_) { - return errCode; + return E_OK; } - if (IncUsedCount() == E_OK) { - errCode = timeOutCallback_(id); + int errCode = IncUsedCount(); + if (errCode != E_OK) { + LOGW("[SyncTaskContext][TimeOut] IncUsedCount failed! errCode=", errCode); + // if return is not E_OK, the timer will be removed + // we removed timer when context call StopTimer + return E_OK; + } + IncObjRef(this); + errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() { + timeOutCallback_(id); + SafeExit(); + DecObjRef(this); + }); + if (errCode != E_OK) { + LOGW("[SyncTaskContext][Timeout] Trigger Timeout Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode); SafeExit(); + DecObjRef(this); } - return errCode; + return E_OK; } void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam) diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp index 44a5353779d26b556dbd3d14af8ff75f0d3ab40b..c5d3841b9e96c4cb3dedbefbe951696d28bab5ba 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp @@ -42,6 +42,7 @@ namespace { const int THREE_HUNDRED = 300; const int WAIT_30_SECONDS = 30000; const int WAIT_40_SECONDS = 40000; + const int TIMEOUT_6_SECONDS = 6000; KvStoreDelegateManager g_mgr(APP_ID, USER_ID); KvStoreConfig g_config; @@ -1333,6 +1334,7 @@ HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Leve * @tc.steps: step1. deviceB set get data delay 40s */ g_deviceB->DelayGetSyncData(WAIT_40_SECONDS); + g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS); /** * @tc.steps: step2. deviceA call sync and wait diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp index 0f8bfe3d7c4df6af72b249f6b4cc12973ee9e352..e78ab129edefe2bf6d7d0a3551343c180da4e545 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp @@ -124,7 +124,7 @@ uint32_t VirtualCommunicator::GetCommunicatorMtuSize(const std::string &target) uint32_t VirtualCommunicator::GetTimeout() const { - return 5 * 1000; // 5 * 1000ms + return timeout_; } uint32_t VirtualCommunicator::GetTimeout(const std::string &target) const @@ -132,6 +132,11 @@ uint32_t VirtualCommunicator::GetTimeout(const std::string &target) const return GetTimeout(); } +void VirtualCommunicator::SetTimeout(uint32_t timeout) +{ + timeout_ = timeout; +} + int VirtualCommunicator::GetLocalIdentity(std::string &outTarget) const { outTarget = deviceId_; diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h index 7cde89e498145d7ec640188f415b30465ab05761..f202d1dad60b8c803240c504da624f7c6cbc74a6 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h @@ -49,6 +49,8 @@ public: uint32_t GetTimeout() const override; uint32_t GetTimeout(const std::string &target) const override; + void SetTimeout(uint32_t timeout); + int GetLocalIdentity(std::string &outTarget) const override; int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) override; @@ -97,6 +99,8 @@ private: std::mutex onAggregatorLock_; VirtualCommunicatorAggregator *communicatorAggregator_; + + uint32_t timeout_ = 5 * 1000; // 5 * 1000ms }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp index 2131336581026d60f122ef8efaf39b78fd673d8d..7e40b97c0bd716882010664fa9cc54e11c910ebb 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp @@ -143,7 +143,7 @@ ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::strin } { std::lock_guard lock(communicatorsLock_); - communicators_.insert(std::pair(deviceId, communicator)); + communicators_.insert(std::pair(deviceId, communicator)); } OnlineDevice(deviceId); return communicator; @@ -250,4 +250,12 @@ void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId) { userId_ = userId; } + +void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout) +{ + std::lock_guard lock(communicatorsLock_); + if (communicators_.find(deviceId) != communicators_.end()) { + communicators_[deviceId]->SetTimeout(timeout); + } +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.h index 22db8b586c7f60bce98f337dda3621bca0501baf..c1848a2cac9aae93f46d01890b30bc1c9742a384 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.h @@ -70,6 +70,8 @@ public: void SetCurrentUserId(const std::string &userId); + void SetTimeout(const std::string &deviceId, uint32_t timeout); + ~VirtualCommunicatorAggregator() {}; VirtualCommunicatorAggregator() {}; @@ -77,7 +79,7 @@ private: void CallSendEnd(int errCode, const OnSendEnd &onEnd); mutable std::mutex communicatorsLock_; - std::map communicators_; + std::map communicators_; std::string remoteDeviceId_ = "real_device"; std::mutex blockLock_; std::condition_variable conditionVar_;