From cecd45f8187436d7aafc3c511c142bd98f9818f7 Mon Sep 17 00:00:00 2001 From: liaoyonghuang Date: Wed, 21 May 2025 09:50:19 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E7=AB=AF=E7=AB=AF=E3=80=91=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E8=8E=B7=E5=8F=96=E6=BA=90=E7=AB=AFuserId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: liaoyonghuang --- .../include/communicator_aggregator.h | 4 +- .../communicator/include/frame_retainer.h | 2 + .../communicator/include/message.h | 11 ++ .../communicator/src/communicator.cpp | 6 +- .../communicator/src/communicator.h | 2 +- .../src/communicator_aggregator.cpp | 39 ++++--- .../communicator/src/frame_retainer.cpp | 4 +- .../include/iprocess_communicator.h | 1 + .../syncer/src/device/isync_task_context.h | 2 +- .../single_ver_sync_task_context.cpp | 7 +- .../singlever/single_ver_sync_task_context.h | 2 +- .../syncer/src/device/sync_engine.cpp | 106 ++++++++++-------- .../syncer/src/device/sync_engine.h | 10 +- .../distributeddb_mock_sync_module_test.cpp | 18 +-- ...buteddb_single_ver_p2p_sync_check_test.cpp | 2 +- .../syncer/distributeddb_time_sync_test.cpp | 14 +-- .../common/syncer/generic_virtual_device.cpp | 3 +- .../common/syncer/generic_virtual_device.h | 1 + .../unittest/common/syncer/mock_sync_engine.h | 4 +- .../virtual_communicator_aggregator.cpp | 1 + 20 files changed, 135 insertions(+), 104 deletions(-) diff --git a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h index cfd849de530..7005f476ae0 100644 --- a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h @@ -120,7 +120,7 @@ private: // Function with suffix NoMutex should be called with mutex in the caller int TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, SerialBuffer *&inFrameBuffer, - const LabelType &toLabel, const std::string &userId = ""); + const LabelType &toLabel, const UserInfo &userInfo); // Auxiliary function for cutting short primary function int RegCallbackToAdapter(); @@ -164,7 +164,7 @@ private: uint64_t IncreaseSendSequenceId(const std::string &target); int GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, const DataUserInfoProc &userInfoProc, - const std::string &device, std::string &userId); + const std::string &device, UserInfo &userInfo); DECLARE_OBJECT_TAG(CommunicatorAggregator); diff --git a/frameworks/libs/distributeddb/communicator/include/frame_retainer.h b/frameworks/libs/distributeddb/communicator/include/frame_retainer.h index 4a7c80e9fac..854bc0923ef 100644 --- a/frameworks/libs/distributeddb/communicator/include/frame_retainer.h +++ b/frameworks/libs/distributeddb/communicator/include/frame_retainer.h @@ -29,12 +29,14 @@ class SerialBuffer; // Forward Declarations struct FrameInfo { SerialBuffer *buffer = nullptr; std::string srcTarget; + std::string senderUser; LabelType commLabel; uint32_t frameId = 0u; }; struct RetainWork { SerialBuffer *buffer = nullptr; + std::string senderUser; uint32_t frameId = 0u; uint32_t remainTime = 0u; // in second }; diff --git a/frameworks/libs/distributeddb/communicator/include/message.h b/frameworks/libs/distributeddb/communicator/include/message.h index 416e083169c..ec52a076b1c 100644 --- a/frameworks/libs/distributeddb/communicator/include/message.h +++ b/frameworks/libs/distributeddb/communicator/include/message.h @@ -142,6 +142,11 @@ public: target_ = inTarget; } + void SetSenderUserId(const std::string &userId) + { + senderUserId_ = userId; + } + void SetPriority(Priority inPriority) { prio_ = inPriority; @@ -185,6 +190,11 @@ public: return target_; } + std::string GetSenderUserId() const + { + return senderUserId_; + } + Priority GetPriority() const { return prio_; @@ -212,6 +222,7 @@ private: // Field carry supplemental info std::string target_; + std::string senderUserId_; Priority prio_ = Priority::LOW; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator.cpp index f8ebfa3cbf3..2171b170aa9 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator.cpp @@ -146,13 +146,15 @@ int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg return errCode; } -void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf) +void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf, + const std::string &senderUser) { std::lock_guard messageHandleLockGuard(messageHandleMutex_); - if (srcTarget.size() != 0 && inBuf != nullptr && onMessageHandle_) { + if (!srcTarget.empty() && inBuf != nullptr && onMessageHandle_) { int error = E_OK; // if error is not E_OK, null pointer will be returned Message *message = ProtocolProto::ToMessage(inBuf, error); + message->SetSenderUserId(senderUser); delete inBuf; inBuf = nullptr; // message is not nullptr if error is E_OK or error is E_NOT_REGISTER. diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.h b/frameworks/libs/distributeddb/communicator/src/communicator.h index 41ede23628e..d58ac48a57d 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator.h +++ b/frameworks/libs/distributeddb/communicator/src/communicator.h @@ -56,7 +56,7 @@ public: const OnSendEnd &onEnd) override; // Call by CommunicatorAggregator directly - void OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf); + void OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf, const std::string &senderUser); // Call by CommunicatorAggregator directly void OnConnectChange(const std::string &target, bool isConnect); diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp index 9d7c9cd4162..804a66b2a34 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp @@ -276,7 +276,7 @@ void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, co // Do Redeliver, the communicator is responsible to deal with the frame std::list framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel); for (auto &entry : framesToRedeliver) { - commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer); + commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer, entry.senderUser); } } @@ -678,20 +678,20 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &recei SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc) { LabelType toLabel = inResult.GetCommLabel(); - std::string userId; + UserInfo userInfo; if (receiveBytesInfo.headLength != 0) { - int ret = GetDataUserId(inResult, toLabel, userInfoProc, receiveBytesInfo.srcTarget, userId); - if ((ret != E_OK) || (userId.empty())) { + int ret = GetDataUserId(inResult, toLabel, userInfoProc, receiveBytesInfo.srcTarget, userInfo); + if (ret != E_OK || userInfo.receiveUser.empty() || userInfo.senderUser.empty()) { LOGE("[CommAggr][AppReceive] get data user id err, ret=%d", ret); delete inFrameBuffer; inFrameBuffer = nullptr; - return ret; + return ret != E_OK ? ret : -E_NO_TRUSTED_USER; } } { std::lock_guard commMapLockGuard(commMapMutex_); int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(receiveBytesInfo.srcTarget, inFrameBuffer, toLabel, - userId); + userInfo); if (errCode == E_OK) { // Attention: Here is equal to E_OK return E_OK; } @@ -701,7 +701,7 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &recei { std::lock_guard onCommLackLockGuard(onCommLackMutex_); if (onCommLackHandle_) { - errCode = onCommLackHandle_(toLabel, userId); + errCode = onCommLackHandle_(toLabel, userInfo.receiveUser); LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread } else { LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently."); @@ -710,7 +710,7 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &recei // Here we have to lock commMapMutex_ and search communicator again. std::lock_guard commMapLockGuard(commMapMutex_); int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(receiveBytesInfo.srcTarget, inFrameBuffer, toLabel, - userId); + userInfo); if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK. LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel)); return E_OK; @@ -723,13 +723,14 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &recei return errCode; // The caller will display errCode in log } // Do Retention, the retainer is responsible to deal with the frame - retainer_.RetainFrame(FrameInfo{inFrameBuffer, receiveBytesInfo.srcTarget, toLabel, inResult.GetFrameId()}); + retainer_.RetainFrame(FrameInfo{inFrameBuffer, receiveBytesInfo.srcTarget, userInfo.senderUser, toLabel, + inResult.GetFrameId()}); inFrameBuffer = nullptr; return E_OK; } int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, - const DataUserInfoProc &userInfoProc, const std::string &device, std::string &userId) + const DataUserInfoProc &userInfoProc, const std::string &device, UserInfo &userInfo) { if (userInfoProc.processCommunicator == nullptr) { LOGE("[CommAggr][GetDataUserId] processCommunicator is nullptr"); @@ -744,8 +745,8 @@ int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const Lab LOGE("[CommAggr][GetDataUserId] userId dismatched, drop packet"); return ret; } - if (userInfos.size() >= 1) { - userId = userInfos[0].receiveUser; + if (!userInfos.empty()) { + userInfo = userInfos[0]; } else { LOGW("[CommAggr][GetDataUserId] userInfos is empty"); } @@ -753,11 +754,13 @@ int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const Lab } int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, - SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const std::string &userId) + SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo) { // Ignore nonactivated communicator, which is regarded as inexistent - if (commMap_[userId].count(toLabel) != 0 && commMap_[userId].at(toLabel).second) { - commMap_[userId].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer); + const std::string &senderUser = userInfo.senderUser; + const std::string &receiveUser = userInfo.receiveUser; + if (commMap_[receiveUser].count(toLabel) != 0 && commMap_[receiveUser].at(toLabel).second) { + commMap_[receiveUser].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer, senderUser); // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return. inFrameBuffer = nullptr; return E_OK; @@ -770,7 +773,7 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s communicator = entry.second.first; isEmpty = userCommMap.first.empty(); LOGW("[CommAggr][TryDeliver] Found communicator of %s, but required user is %s", - userCommMap.first.c_str(), userId.c_str()); + userCommMap.first.c_str(), receiveUser.c_str()); break; } } @@ -778,8 +781,8 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s break; } } - if (communicator != nullptr && (userId.empty() || isEmpty)) { - communicator->OnBufferReceive(srcTarget, inFrameBuffer); + if (communicator != nullptr && (receiveUser.empty() || isEmpty)) { + communicator->OnBufferReceive(srcTarget, inFrameBuffer, senderUser); inFrameBuffer = nullptr; return E_OK; } diff --git a/frameworks/libs/distributeddb/communicator/src/frame_retainer.cpp b/frameworks/libs/distributeddb/communicator/src/frame_retainer.cpp index fc0eb0bba34..c51fec9edb3 100644 --- a/frameworks/libs/distributeddb/communicator/src/frame_retainer.cpp +++ b/frameworks/libs/distributeddb/communicator/src/frame_retainer.cpp @@ -85,7 +85,7 @@ void FrameRetainer::RetainFrame(const FrameInfo &inFrame) if (inFrame.buffer == nullptr) { return; // Never gonna happen } - RetainWork work{inFrame.buffer, inFrame.frameId, MAX_RETAIN_TIME}; + RetainWork work{inFrame.buffer, inFrame.senderUser, inFrame.frameId, MAX_RETAIN_TIME}; if (work.buffer->GetSize() > MAX_RETAIN_FRAME_SIZE) { LOGE("[Retainer][Retain] Frame size=%u over limit=%u.", work.buffer->GetSize(), MAX_RETAIN_FRAME_SIZE); delete work.buffer; @@ -140,7 +140,7 @@ std::list FrameRetainer::FetchFramesForSpecificCommunicator(const Lab for (auto &entry : fetchOrder) { RetainWork &work = perLabel[entry.second][entry.first]; LogRetainInfo("[Retainer][Fetch] FETCH-OUT", inCommLabel, entry.second, entry.first, work); - outFrameList.emplace_back(FrameInfo{work.buffer, entry.second, inCommLabel, work.frameId}); + outFrameList.emplace_back(FrameInfo{work.buffer, entry.second, work.senderUser, inCommLabel, work.frameId}); // Update statistics totalSizeByByte_ -= work.buffer->GetSize(); totalRetainFrames_--; diff --git a/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h b/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h index 80893c447a9..d6cc41343e0 100644 --- a/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h +++ b/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h @@ -60,6 +60,7 @@ struct DataUserInfo { struct UserInfo { std::string receiveUser; + std::string senderUser; }; class ExtendHeaderHandle { diff --git a/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h index 62a84de9c35..2ef33d7d596 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h +++ b/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h @@ -35,7 +35,7 @@ public: enum TASK_EXEC_STATUS { INIT, RUNNING, FAILED, FINISHED }; // Initialize the context - virtual int Initialize(const std::string &deviceId, ISyncInterface *syncInterface, + virtual int Initialize(const DeviceSyncTarget &target, ISyncInterface *syncInterface, const std::shared_ptr &metadata, ICommunicator *communicator) = 0; // Add a sync task target with the operation to the queue diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp index 6edadebe7bf..e72dfa41585 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp @@ -39,10 +39,10 @@ SingleVerSyncTaskContext::~SingleVerSyncTaskContext() subManager_ = nullptr; } -int SingleVerSyncTaskContext::Initialize(const std::string &deviceId, ISyncInterface *syncInterface, +int SingleVerSyncTaskContext::Initialize(const DeviceSyncTarget &target, ISyncInterface *syncInterface, const std::shared_ptr &metadata, ICommunicator *communicator) { - if (deviceId.empty() || syncInterface == nullptr || metadata == nullptr || + if (target.device.empty() || syncInterface == nullptr || metadata == nullptr || communicator == nullptr) { LOGE("[SingleVerSyncTaskContext] [Initialize] parameter is invalid."); return -E_INVALID_ARGS; @@ -52,7 +52,8 @@ int SingleVerSyncTaskContext::Initialize(const std::string &deviceId, ISyncInter LOGE("[SingleVerSyncTaskContext] [Initialize] stateMachine_ is nullptr."); return -E_OUT_OF_MEMORY; } - deviceId_ = deviceId; + deviceId_ = target.device; + targetUserId_ = target.userId; std::vector dbIdentifier = syncInterface->GetIdentifier(); dbIdentifier.resize(3); // only show 3 bytes syncActionName_ = DBDfxAdapter::SYNC_ACTION + "_" + diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h index 0cb30f5165a..66efa6acfbc 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h @@ -40,7 +40,7 @@ public: DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncTaskContext); // Init SingleVerSyncTaskContext - int Initialize(const std::string &deviceId, ISyncInterface *syncInterface, + int Initialize(const DeviceSyncTarget &target, ISyncInterface *syncInterface, const std::shared_ptr &metadata, ICommunicator *communicator) override; // Add a sync task target with the operation to the queue diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp index 00e50851095..6c90856569a 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp @@ -323,13 +323,14 @@ int SyncEngine::InitComunicator(const ISyncInterface *syncInterface) int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation) { int errCode = E_OK; + std::string targetUserId = GetTargetUserId(deviceId); ISyncTaskContext *context = nullptr; { std::lock_guard lock(contextMapLock_); - context = FindSyncTaskContext(deviceId); + context = FindSyncTaskContext({deviceId, targetUserId}); if (context == nullptr) { if (!IsKilled()) { - context = GetSyncTaskContext(deviceId, errCode); + context = GetSyncTaskContext({deviceId, targetUserId}, errCode); } if (context == nullptr) { return errCode; @@ -417,7 +418,7 @@ int SyncEngine::DealMsgUtilQueueEmpty() // it will deal with the first message in queue, we should increase object reference counts and sure that resources // could be prevented from destroying by other threads. do { - ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), errCode); + ISyncTaskContext *nextContext = GetContextForMsg({inMsg->GetTarget(), inMsg->GetSenderUserId()}, errCode); if (errCode != E_OK) { break; } @@ -434,12 +435,12 @@ int SyncEngine::DealMsgUtilQueueEmpty() return errCode; } -ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int &errCode) +ISyncTaskContext *SyncEngine::GetContextForMsg(const DeviceSyncTarget &target, int &errCode) { ISyncTaskContext *context = nullptr; { std::lock_guard lock(contextMapLock_); - context = FindSyncTaskContext(targetDev); + context = FindSyncTaskContext(target); if (context != nullptr) { // LCOV_EXCL_BR_LINE if (context->IsKilled()) { errCode = -E_OBJ_IS_KILLED; @@ -450,7 +451,7 @@ ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int errCode = -E_OBJ_IS_KILLED; return nullptr; } - context = GetSyncTaskContext(targetDev, errCode); + context = GetSyncTaskContext(target, errCode); if (context == nullptr) { return nullptr; } @@ -544,7 +545,7 @@ int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message } int errCode = E_OK; - ISyncTaskContext *nextContext = GetContextForMsg(targetDev, errCode); + ISyncTaskContext *nextContext = GetContextForMsg({targetDev, inMsg->GetSenderUserId()}, errCode); if (errCode != E_OK) { return errCode; } @@ -598,9 +599,9 @@ int SyncEngine::GetMsgSize(const Message *inMsg) const } } -ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId) +ISyncTaskContext *SyncEngine::FindSyncTaskContext(const DeviceSyncTarget &target) { - auto iter = syncTaskContextMap_.find(deviceId); + auto iter = syncTaskContextMap_.find(target); if (iter != syncTaskContextMap_.end()) { ISyncTaskContext *context = iter->second; return context; @@ -608,24 +609,29 @@ ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId) return nullptr; } -ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId) +std::vector SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId) { - ISyncTaskContext *context = nullptr; + std::vector contexts; std::lock_guard lock(contextMapLock_); - context = FindSyncTaskContext(deviceId); - if (context == nullptr) { - LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId)); - return nullptr; - } - if (context->IsKilled()) { // LCOV_EXCL_BR_LINE - LOGI("[SyncEngine] context is killing"); - return nullptr; + for (const auto &iter : syncTaskContextMap_) { + if (iter.first.device != deviceId) { + continue; + } + if (iter.second == nullptr) { + LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId)); + return {}; + } + if (iter.second->IsKilled()) { // LCOV_EXCL_BR_LINE + LOGI("[SyncEngine] context is killing"); + return {}; + } + RefObject::IncObjRef(iter.second); + contexts.push_back(iter.second); } - RefObject::IncObjRef(context); - return context; + return contexts; } -ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode) +ISyncTaskContext *SyncEngine::GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode) { auto storage = GetAndIncSyncInterface(); if (storage == nullptr) { @@ -639,19 +645,19 @@ ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, in LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!"); return nullptr; } - errCode = context->Initialize(deviceId, storage, metadata_, communicatorProxy_); + errCode = context->Initialize(target, storage, metadata_, communicatorProxy_); if (errCode != E_OK) { - LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(deviceId)); + LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(target.device)); RefObject::DecObjRef(context); storage->DecRefCount(); context = nullptr; return nullptr; } - syncTaskContextMap_.insert(std::pair(deviceId, context)); + syncTaskContextMap_.insert(std::pair(target, context)); // IncRef for SyncEngine to make sure SyncEngine is valid when context access RefObject::IncObjRef(this); - context->OnLastRef([this, deviceId, storage]() { - LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(deviceId)); + context->OnLastRef([this, target, storage]() { + LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(target.device)); RefObject::DecObjRef(this); storage->DecRefCount(); }); @@ -665,8 +671,6 @@ int SyncEngine::ExecSyncTask(ISyncTaskContext *context) return -E_OBJ_IS_KILLED; } auto timeout = GetTimeout(context->GetDeviceId()); - std::string targetUserId = GetTargetUserId(context->GetDeviceId()); - context->SetTargetUserId(targetUserId); AutoLock lockGuard(context); int status = context->GetTaskExecStatus(); if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) { @@ -680,7 +684,7 @@ int SyncEngine::ExecSyncTask(ISyncTaskContext *context) context->ClearSyncOperation(); continue; } - if (targetUserId.empty()) { + if (context->GetTargetUserId().empty()) { LOGE("[SyncEngine] No target user found."); context->SetTaskErrCode(-E_NO_TRUSTED_USER); context->SetOperationStatus(SyncOperation::OP_FAILED); @@ -869,33 +873,37 @@ void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterfa static_cast(storage)->GetDBInfo(dbInfo); RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, deviceId); // get context and Inc context if context is not nullptr - ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId); - { - std::lock_guard lock(communicatorProxyLock_); - if (communicatorProxy_ == nullptr) { - return; + std::vector contexts = GetSyncTaskContextAndInc(deviceId); + for (const auto &context : contexts) { + { + std::lock_guard lock(communicatorProxyLock_); + if (communicatorProxy_ == nullptr) { + return; + } + if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE + LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId)); + RefObject::DecObjRef(context); + return; + } } - if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE - LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId)); + // means device is offline, clear local subscribe + subManager_->ClearLocalSubscribeQuery(deviceId); + // clear sync task + if (context != nullptr) { + context->ClearAllSyncTask(); RefObject::DecObjRef(context); - return; } } - // means device is offline, clear local subscribe - subManager_->ClearLocalSubscribeQuery(deviceId); - // clear sync task - if (context != nullptr) { - context->ClearAllSyncTask(); - RefObject::DecObjRef(context); - } } void SyncEngine::ClearAllSyncTaskByDevice(const std::string &deviceId) { - ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId); - if (context != nullptr) { - context->ClearAllSyncTask(); - RefObject::DecObjRef(context); + std::vector contexts = GetSyncTaskContextAndInc(deviceId); + for (const auto &context : contexts) { + if (context != nullptr) { + context->ClearAllSyncTask(); + RefObject::DecObjRef(context); + } } } diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h index f81b7a474cb..cbed36ed923 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h @@ -142,8 +142,8 @@ protected: virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0; // Find SyncTaskContext from the map - ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId); - ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId); + ISyncTaskContext *FindSyncTaskContext(const DeviceSyncTarget &target); + std::vector GetSyncTaskContextAndInc(const std::string &deviceId); void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); @@ -151,12 +151,12 @@ protected: ISyncInterface *GetAndIncSyncInterface(); void SetSyncInterface(ISyncInterface *syncInterface); - ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode); + ISyncTaskContext *GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode); std::mutex storageMutex_; ISyncInterface *syncInterface_; // Used to store all send sync task infos (such as pull sync response, and push sync request) - std::map syncTaskContextMap_; + std::map syncTaskContextMap_; std::mutex contextMapLock_; std::shared_ptr subManager_; std::function queryAutoSyncCallback_; @@ -201,7 +201,7 @@ private: // Handle message in order. int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg); - ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode); + ISyncTaskContext *GetContextForMsg(const DeviceSyncTarget &target, int &errCode); ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode, std::string userId = ""); diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp index a7d883f53b9..5a31e4a6a73 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp @@ -108,7 +108,7 @@ void Init(MockSingleVerStateMachine &stateMachine, MockSyncTaskContext &syncTask { std::shared_ptr metadata = std::make_shared(); ASSERT_EQ(metadata->Initialize(&dbSyncInterface), E_OK); - (void)syncTaskContext.Initialize("device", &dbSyncInterface, metadata, &communicator); + (void)syncTaskContext.Initialize({"device", ""}, &dbSyncInterface, metadata, &communicator); (void)stateMachine.Initialize(&syncTaskContext, &dbSyncInterface, metadata, &communicator); } @@ -117,7 +117,7 @@ void Init(MockSingleVerStateMachine &stateMachine, MockSyncTaskContext *syncTask { std::shared_ptr metadata = std::make_shared(); ASSERT_EQ(metadata->Initialize(dbSyncInterface), E_OK); - (void)syncTaskContext->Initialize("device", dbSyncInterface, metadata, &communicator); + (void)syncTaskContext->Initialize({"device", ""}, dbSyncInterface, metadata, &communicator); (void)stateMachine.Initialize(syncTaskContext, dbSyncInterface, metadata, &communicator); } @@ -934,7 +934,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncDataSync003, TestSize.Level1) const std::string deviceId = "deviceId"; dataSync.Initialize(&storage, &communicator, metadata, deviceId); syncTaskContext.SetRemoteSoftwareVersion(SOFTWARE_VERSION_CURRENT); - syncTaskContext.Initialize(deviceId, &storage, metadata, &communicator); + syncTaskContext.Initialize({deviceId, ""}, &storage, metadata, &communicator); syncTaskContext.EnableClearRemoteStaleData(true); /** @@ -1410,7 +1410,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncEngineTest004, TestSize.Level0) auto *enginePtr = new (std::nothrow) MockSyncEngine(); ASSERT_NE(enginePtr, nullptr); int errCode = E_OK; - auto *context = enginePtr->CallGetSyncTaskContext("dev", errCode); + auto *context = enginePtr->CallGetSyncTaskContext({"dev", "user"}, errCode); EXPECT_EQ(context, nullptr); EXPECT_EQ(errCode, -E_INVALID_DB); RefObject::KillAndDecObjRef(enginePtr); @@ -1727,7 +1727,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncTaskContextCheck001, TestSize.Leve MockCommunicator communicator; VirtualSingleVerSyncDBInterface dbSyncInterface; std::shared_ptr metadata = std::make_shared(); - (void)syncTaskContext.Initialize("device", &dbSyncInterface, metadata, &communicator); + (void)syncTaskContext.Initialize({"device", ""}, &dbSyncInterface, metadata, &communicator); syncTaskContext.SetLastFullSyncTaskStatus(SyncOperation::Status::OP_FINISHED_ALL); syncTaskContext.CallSetSyncMode(static_cast(SyncModeType::PUSH)); EXPECT_EQ(syncTaskContext.CallIsCurrentSyncTaskCanBeSkipped(), true); @@ -1880,7 +1880,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncTaskContextCheck005, TestSize.Leve VirtualSingleVerSyncDBInterface dbSyncInterface; std::shared_ptr metadata = std::make_shared(); ASSERT_EQ(metadata->Initialize(&dbSyncInterface), E_OK); - (void)context->Initialize("device", &dbSyncInterface, metadata, &communicator); + (void)context->Initialize({"device", ""}, &dbSyncInterface, metadata, &communicator); (void)stateMachine.Initialize(context, &dbSyncInterface, metadata, &communicator); for (int i = 0; i < 100; ++i) { // 100 sync target @@ -1936,7 +1936,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncTaskContextCheck006, TestSize.Leve VirtualSingleVerSyncDBInterface dbSyncInterface; std::shared_ptr metadata = std::make_shared(); ASSERT_EQ(metadata->Initialize(&dbSyncInterface), E_OK); - (void)context->Initialize("device", &dbSyncInterface, metadata, communicator); + (void)context->Initialize({"device", ""}, &dbSyncInterface, metadata, communicator); /** * @tc.steps: step2. add sync target into context */ @@ -1976,7 +1976,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncTaskContextCheck007, TestSize.Leve VirtualRelationalVerSyncDBInterface dbSyncInterface; std::shared_ptr metadata = std::make_shared(); ASSERT_EQ(metadata->Initialize(&dbSyncInterface), E_OK); - (void)context->Initialize("device", &dbSyncInterface, metadata, &communicator); + (void)context->Initialize({"device", ""}, &dbSyncInterface, metadata, &communicator); (void)stateMachine.Initialize(context, &dbSyncInterface, metadata, &communicator); /** * @tc.steps: step2. prepare table and query @@ -2398,7 +2398,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SingleVerDataSyncUtils001, TestSize.Le MockCommunicator communicator; VirtualSingleVerSyncDBInterface dbSyncInterface; std::shared_ptr metadata = std::make_shared(); - (void)context.Initialize("device", &dbSyncInterface, metadata, &communicator); + (void)context.Initialize({"device", ""}, &dbSyncInterface, metadata, &communicator); std::vector data; for (int i = 0; i < 2; ++i) { // loop 2 times diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp index 989450edf74..19a066635c9 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp @@ -2144,7 +2144,7 @@ HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt002, TestSize.L * @tc.require: * @tc.author: zhangqiquan */ -HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt001, TestSize.Level1) +HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt001, TestSize.Level0) { /** * @tc.steps: step1. record packet which send to B diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_time_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_time_sync_test.cpp index 33ace07dd8a..3d5c2607ee6 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_time_sync_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_time_sync_test.cpp @@ -163,7 +163,7 @@ HWTEST_F(DistributedDBTimeSyncTest, NormalSync001, TestSize.Level0) /** * @tc.steps: step3. Register the OnMessageCallback to virtual communicator */ - g_syncTaskContext->Initialize(DEVICE_B, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); + g_syncTaskContext->Initialize({DEVICE_B, ""}, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); g_virtualCommunicator->SetTimeSync(g_timeSyncA.get(), g_timeSyncB.get(), DEVICE_A, g_syncTaskContext); /** @@ -203,7 +203,7 @@ HWTEST_F(DistributedDBTimeSyncTest, NormalSync002, TestSize.Level0) /** * @tc.steps: step2. Register the OnMessageCallback to virtual communicator */ - g_syncTaskContext->Initialize(DEVICE_B, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); + g_syncTaskContext->Initialize({DEVICE_B, ""}, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); g_virtualCommunicator->SetTimeSync(g_timeSyncA.get(), g_timeSyncB.get(), DEVICE_A, g_syncTaskContext); /** * @tc.steps: step3. Fetch timeOffset value @@ -253,7 +253,7 @@ HWTEST_F(DistributedDBTimeSyncTest, NormalSync003, TestSize.Level0) /** * @tc.steps: step3. Register the OnMessageCallback to virtual communicator */ - g_syncTaskContext->Initialize(DEVICE_B, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); + g_syncTaskContext->Initialize({DEVICE_B, ""}, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); g_virtualCommunicator->SetTimeSync(g_timeSyncA.get(), g_timeSyncB.get(), DEVICE_A, g_syncTaskContext); /** * @tc.steps: step4. Fetch timeOffset value @@ -290,7 +290,7 @@ HWTEST_F(DistributedDBTimeSyncTest, NetDisconnetSyncTest001, TestSize.Level0) errCode = g_timeSyncB->Initialize(g_virtualCommunicator, g_metadataB, g_syncInterfaceB, DEVICE_A, ""); EXPECT_TRUE(errCode == E_OK); - g_syncTaskContext->Initialize(DEVICE_B, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); + g_syncTaskContext->Initialize({DEVICE_B, ""}, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); g_virtualCommunicator->SetTimeSync(g_timeSyncA.get(), g_timeSyncB.get(), DEVICE_A, g_syncTaskContext); /** * @tc.steps: step2. Disable the virtual communicator @@ -395,7 +395,7 @@ HWTEST_F(DistributedDBTimeSyncTest, InvalidMessgeTest002, TestSize.Level0) // initialize timeSyncB errCode = g_timeSyncB->Initialize(g_virtualCommunicator, g_metadataB, g_syncInterfaceB, DEVICE_A, ""); EXPECT_TRUE(errCode == E_OK); - g_syncTaskContext->Initialize(DEVICE_B, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); + g_syncTaskContext->Initialize({DEVICE_B, ""}, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); g_virtualCommunicator->SetTimeSync(g_timeSyncA.get(), g_timeSyncB.get(), DEVICE_A, g_syncTaskContext); Message *msg = new (std::nothrow) Message(); @@ -459,7 +459,7 @@ HWTEST_F(DistributedDBTimeSyncTest, SyncTimeout001, TestSize.Level2) * @tc.steps: step1. Initialize the syncTaskContext * @tc.expected: step1. Initialize syncTaskContext successfully */ - errCode = g_syncTaskContext->Initialize(DEVICE_B, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); + errCode = g_syncTaskContext->Initialize({DEVICE_B, ""}, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); EXPECT_TRUE(errCode == E_OK); /** * @tc.steps: step2. Start the time syc task invoking StartSync() method @@ -488,7 +488,7 @@ HWTEST_F(DistributedDBTimeSyncTest, CheckRemoteVersion001, TestSize.Level0) * @tc.steps: step1. Initialize the syncTaskContext * @tc.expected: step1. Initialize syncTaskContext successfully */ - errCode = g_syncTaskContext->Initialize(DEVICE_B, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); + errCode = g_syncTaskContext->Initialize({DEVICE_B, ""}, g_syncInterfaceA, g_metadataA, g_virtualCommunicator); EXPECT_EQ(errCode, E_OK); /** * @tc.steps: step2. Check remote version diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp index b68cae6e725..fc2806f7f54 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp @@ -27,6 +27,7 @@ GenericVirtualDevice::GenericVirtualDevice(std::string deviceId) metadata_(nullptr), deviceId_(std::move(deviceId)), remoteDeviceId_("real_device"), + targetUserId_("targetUser"), context_(nullptr), onRemoteDataChanged_(nullptr), subManager_(nullptr), @@ -110,7 +111,7 @@ int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregato } communicateHandle_->RegOnMessageCallback( std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {}); - context_->Initialize(remoteDeviceId_, storage_, metadata_, communicateHandle_); + context_->Initialize({remoteDeviceId_, targetUserId_}, storage_, metadata_, communicateHandle_); context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY); context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this)); diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h index 462dd694848..62dd5989334 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h @@ -56,6 +56,7 @@ protected: std::shared_ptr metadata_; std::string deviceId_; std::string remoteDeviceId_; + std::string targetUserId_; SyncTaskContext *context_; std::function onRemoteDataChanged_; diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h index 1545ea636e1..72a537b9567 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h @@ -29,9 +29,9 @@ public: subManager_ = std::make_shared(); } - ISyncTaskContext *CallGetSyncTaskContext(const std::string &deviceId, int &errCode) + ISyncTaskContext * CallGetSyncTaskContext(const DeviceSyncTarget &target, int &errCode) { - return SyncEngine::GetSyncTaskContext(deviceId, errCode); + return SyncEngine::GetSyncTaskContext(target, errCode); } }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp index ddc3d9c5c92..50d3ef2f529 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp @@ -211,6 +211,7 @@ void VirtualCommunicatorAggregator::DispatchMessageInner(const std::string &srcT uint32_t messageId = inMsg->GetMessageId(); Message *msg = const_cast(inMsg); msg->SetTarget(srcTarget); + msg->SetSenderUserId(communicator->GetTargetUserId({})); RefObject::IncObjRef(communicator); auto onDispatch = onDispatch_; bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) && -- Gitee