diff --git a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h index 22e14c8204a69f35d8e233cd4a9506538bc1d613..d685b8b5a3e86d0d47d79f3891f283f318a216f7 100644 --- a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h @@ -236,9 +236,6 @@ private: std::mutex sendRecordMutex_; std::map sendRecord_; - mutable std::mutex retryCountMutex_; - std::map> retryCount_; // dev, isRetryTask, count - std::mutex sendSequenceMutex_; std::map sendSequence_; }; diff --git a/frameworks/libs/distributeddb/communicator/include/send_task_scheduler.h b/frameworks/libs/distributeddb/communicator/include/send_task_scheduler.h index d1b30194dd355deca7c8be1156214d133bfcd715..6d1a4a9abb8c560a96ac5e5466a121ac776df8e7 100644 --- a/frameworks/libs/distributeddb/communicator/include/send_task_scheduler.h +++ b/frameworks/libs/distributeddb/communicator/include/send_task_scheduler.h @@ -83,6 +83,17 @@ public: void InvalidSendTask(const std::string &target); void SetDeviceCommErrCode(const std::string &target, int deviceCommErrCode); + int32_t GetRetryCount(const std::string &dev, bool isRetryTask) const; + + bool IsRetryOutOfLimit(const std::string &target, bool isRetryTask); + + void ResetRetryCount(const std::string &dev, bool isRetryTask); + + void ResetRetryCount(const std::string &dev); + + void ResetRetryCount(); + + void IncreaseRetryCountIfNeed(const std::string &dev, bool isRetryTask); private: int ScheduleDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo); int ScheduleNoDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo); @@ -107,6 +118,8 @@ private: Priority lastSchedulePriority_ = Priority::LOW; std::map deviceCommErrCodeMap_; + + std::map> retryCount_; // dev, isRetryTask, count }; } diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp index 58b24ad7e539cae93f84c7bfcaba9cc86a65ec07..630386ac4866fc9e980ad5096024ff01bb4d874f 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp @@ -26,7 +26,6 @@ namespace DistributedDB { namespace { -constexpr int MAX_SEND_RETRY = 2; constexpr int RETRY_TIME_SPLIT = 4; inline std::string GetThreadId() { @@ -1114,8 +1113,6 @@ void CommunicatorAggregator::RetrySendTaskIfNeed(const std::string &target, uint if (IsRetryOutOfLimit(target, isRetryTask)) { LOGI("[CommAggr] Retry send task is out of limit! target is %.3s", target.c_str()); scheduler_.InvalidSendTask(target); - std::lock_guard autoLock(retryCountMutex_); - retryCount_[target][isRetryTask] = 0; } else { RetrySendTask(target, sendSequenceId, isRetryTask); if (sendSequenceId != GetSendSequenceId(target)) { @@ -1130,9 +1127,8 @@ void CommunicatorAggregator::RetrySendTask(const std::string &target, uint64_t s { int32_t currentRetryCount = 0; { - std::lock_guard autoLock(retryCountMutex_); - retryCount_[target][isRetryTask]++; - currentRetryCount = retryCount_[target][isRetryTask]; + scheduler_.IncreaseRetryCountIfNeed(target, isRetryTask); + currentRetryCount = scheduler_.GetRetryCount(target, isRetryTask); LOGI("[CommAggr] Target %.3s retry count is %" PRId32, target.c_str(), currentRetryCount); } TimerId timerId = 0u; @@ -1151,8 +1147,7 @@ void CommunicatorAggregator::RetrySendTask(const std::string &target, uint64_t s bool CommunicatorAggregator::IsRetryOutOfLimit(const std::string &target, bool isRetryTask) { - std::lock_guard autoLock(retryCountMutex_); - return retryCount_[target][isRetryTask] >= MAX_SEND_RETRY; + return scheduler_.IsRetryOutOfLimit(target, isRetryTask); } int32_t CommunicatorAggregator::GetNextRetryInterval(const std::string &target, int32_t currentRetryCount) @@ -1229,28 +1224,17 @@ int CommunicatorAggregator::ReTryDeliverAppLayerFrameOnCommunicatorNotFound(cons void CommunicatorAggregator::ResetRetryCount() { - std::lock_guard autoLock(retryCountMutex_); - retryCount_.clear(); + scheduler_.ResetRetryCount(); } void CommunicatorAggregator::ResetRetryCount(const std::string &dev) { - std::lock_guard autoLock(retryCountMutex_); - retryCount_[dev].clear(); + scheduler_.ResetRetryCount(dev); } int32_t CommunicatorAggregator::GetRetryCount(const std::string &dev, bool isRetryTask) const { - std::lock_guard autoLock(retryCountMutex_); - auto iter = retryCount_.find(dev); - if (iter == retryCount_.end()) { - return 0; - } - auto countIter = iter->second.find(isRetryTask); - if (countIter == iter->second.end()) { - return 0; - } - return countIter->second; + return scheduler_.GetRetryCount(dev, isRetryTask); } DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator) } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/src/send_task_scheduler.cpp b/frameworks/libs/distributeddb/communicator/src/send_task_scheduler.cpp index abdae7356d649f7772c9123111523c85f37148f8..79e423a4889aa3ae53dfefd3eacd3fafe249a0f5 100644 --- a/frameworks/libs/distributeddb/communicator/src/send_task_scheduler.cpp +++ b/frameworks/libs/distributeddb/communicator/src/send_task_scheduler.cpp @@ -25,6 +25,7 @@ namespace DistributedDB { static constexpr uint32_t MAX_CAPACITY = 67108864; // 64 M bytes static constexpr uint32_t EXTRA_CAPACITY_FOR_NORMAL_PRIORITY = 33554432; // 32 M bytes static constexpr uint32_t EXTRA_CAPACITY_FOR_HIGH_PRIORITY = 67108864; // 64 M bytes +static constexpr int MAX_SEND_RETRY = 2; SendTaskScheduler::~SendTaskScheduler() { @@ -299,6 +300,7 @@ void SendTaskScheduler::InvalidSendTask(const std::string &target) } for (auto &sendTask : taskGroupByPrio_[priority][target]) { sendTask.isValid = false; + retryCount_[target][sendTask.isRetryTask] = 0; LOGI("[Scheduler][InvalidSendTask] invalid frameId=%" PRIu32, sendTask.frameId); if ((deviceCommErrCodeMap_.count(target) == 0) || (deviceCommErrCodeMap_[target] == E_OK)) { continue; @@ -331,9 +333,10 @@ void SendTaskScheduler::SetDeviceCommErrCode(const std::string &target, int devi if (sendTask.isRetryTask) { continue; } - LOGI("[Scheduler][SetDeviceCommErrCode] Erase task that do not allow retries, target=%.3s", - target.c_str()); + LOGI("[Scheduler][SetDeviceCommErrCode] Erase task that do not allow retries, target=%.3s frameId=%" PRIu32, + target.c_str(), sendTask.frameId); sendTask.isValid = false; + retryCount_[target][sendTask.isRetryTask] = 0; if (sendTask.onEnd) { LOGI("[Scheduler][SetDeviceCommErrCode] On Send End, target=%.3s", target.c_str()); sendTask.onEnd(deviceCommErrCodeMap_[target], true); @@ -342,4 +345,60 @@ void SendTaskScheduler::SetDeviceCommErrCode(const std::string &target, int devi } } } + +int32_t SendTaskScheduler::GetRetryCount(const std::string &dev, bool isRetryTask) const +{ + std::lock_guard autoLock(overallMutex_); + auto iter = retryCount_.find(dev); + if (iter == retryCount_.end()) { + return 0; + } + auto countIter = iter->second.find(isRetryTask); + if (countIter == iter->second.end()) { + return 0; + } + return countIter->second; +} + +bool SendTaskScheduler::IsRetryOutOfLimit(const std::string &target, bool isRetryTask) +{ + std::lock_guard autoLock(overallMutex_); + return retryCount_[target][isRetryTask] >= MAX_SEND_RETRY; +} + +void SendTaskScheduler::ResetRetryCount(const std::string &dev, bool isRetryTask) +{ + std::lock_guard autoLock(overallMutex_); + retryCount_[dev][isRetryTask] = 0; +} + +void SendTaskScheduler::ResetRetryCount(const std::string &dev) +{ + std::lock_guard autoLock(overallMutex_); + retryCount_[dev].clear(); +} + +void SendTaskScheduler::ResetRetryCount() +{ + std::lock_guard autoLock(overallMutex_); + retryCount_.clear(); +} + +void SendTaskScheduler::IncreaseRetryCountIfNeed(const std::string &dev, bool isRetryTask) +{ + SendTask task; + SendTaskInfo taskInfo; + uint32_t totalLength = 0; + int errCode = ScheduleOutSendTask(task, taskInfo, totalLength); + if (errCode != E_OK) { + LOGE("[SendTaskScheduler] schedule %.3s task failed %d", dev.c_str(), errCode); + return; + } + if (!task.isValid) { + LOGI("[SendTaskScheduler] %.3s task has been invalid", dev.c_str()); + return; + } + std::lock_guard autoLock(overallMutex_); + retryCount_[dev][isRetryTask]++; +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp index 4cc6abafcbf5121223d39ef1fdc37bdadf569b80..90099e1513b2266ed9b3c6abf78f939b216e1934 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp @@ -1494,7 +1494,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, DoOnSendEndByTaskIfNeedTest002, Test aggregator->Finalize(); } -void TriggerSendMsg(const std::shared_ptr &aggregator, const OnSendEnd &onEnd) +void TriggerSendMsg(const std::shared_ptr &aggregator, const OnSendEnd &onEnd, + bool isRetryTask = true) { Message *msg = BuildRegedTinyMessage(); ASSERT_NE(msg, nullptr); @@ -1507,6 +1508,7 @@ void TriggerSendMsg(const std::shared_ptr &aggregator, c FrameType inType = FrameType::APPLICATION_MESSAGE; TaskConfig config; config.timeout = 1000; // timeout is 1000ms + config.isRetryTask = isRetryTask; error = aggregator->ScheduleSendTask(dstTarget, buffer, inType, config, onEnd); if (error == E_OK) { delete msg; @@ -1567,3 +1569,49 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, SendFailed001, TestSize.Level0) EXPECT_EQ(aggregator->GetRetryCount(DEVICE_NAME_A, false), 0); EXPECT_GT(count.load(), 1); } +<<<<<<< HEAD +======= + +/** + * @tc.name: SendFailed002 + * @tc.desc: Test retry send count. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBCommunicatorDeepTest, SendFailed002, TestSize.Level0) +{ + std::condition_variable cv; + std::mutex endMutex; + bool sendEnd = false; + OnSendEnd onEnd = [&cv, &endMutex, &sendEnd](int, bool) { + std::lock_guard autoLock(endMutex); + sendEnd = true; + cv.notify_all(); + }; + const std::shared_ptr statusAdapter = std::make_shared(); + ASSERT_NE(statusAdapter, nullptr); + auto adapterStub = std::make_shared(DEVICE_NAME_A); + IAdapter *adapterPtr = adapterStub.get(); + ASSERT_NE(adapterPtr, nullptr); + auto aggregator = std::make_shared(); + ASSERT_NE(aggregator, nullptr); + adapterStub->ForkSendBytes([adapterStub]() { + adapterStub->SimulateTriggerSendableCallback(DEVICE_NAME_B, -E_INTERNAL_ERROR); + return -E_WAIT_RETRY; + }); + EXPECT_EQ(aggregator->Initialize(adapterPtr, statusAdapter), E_OK); + ResFinalizer finalizer([aggregator]() { + aggregator->Finalize(); + }); + ASSERT_NO_FATAL_FAILURE(TriggerSendMsg(aggregator, onEnd, false)); + LOGI("[SendFailed002] Begin wait send end"); + std::unique_lock uniqueLock(endMutex); + cv.wait_for(uniqueLock, std::chrono::seconds(5), [&sendEnd]() { // wait max 5s + return sendEnd; + }); + LOGI("[SendFailed002] End wait send end"); + adapterStub->ForkSendBytes(nullptr); + EXPECT_EQ(aggregator->GetRetryCount(DEVICE_NAME_B, true), 0); + EXPECT_EQ(aggregator->GetRetryCount(DEVICE_NAME_B, false), 0); +} +>>>>>>> 148c820e5... reset retry count when invalid send task