diff --git a/frameworks/libs/distributeddb/BUILD.gn b/frameworks/libs/distributeddb/BUILD.gn index 4308a9b12c8b8a0ab05d0cdf2a262b229e04cb1d..ee9a8885322761fc6887b70b5709a9737746c874 100644 --- a/frameworks/libs/distributeddb/BUILD.gn +++ b/frameworks/libs/distributeddb/BUILD.gn @@ -118,6 +118,7 @@ ohos_shared_library("distributeddb") { "communicator/src/communicator.cpp", "communicator/src/communicator_aggregator.cpp", "communicator/src/communicator_linker.cpp", + "communicator/src/db_status_adapter.cpp", "communicator/src/frame_combiner.cpp", "communicator/src/frame_retainer.cpp", "communicator/src/header_converter.cpp", @@ -254,6 +255,7 @@ ohos_shared_library("distributeddb") { "syncer/src/single_ver_sync_task_context.cpp", "syncer/src/single_ver_syncer.cpp", "syncer/src/subscribe_manager.cpp", + "syncer/src/subscribe_recorder.cpp", "syncer/src/sync_config.cpp", "syncer/src/sync_engine.cpp", "syncer/src/sync_operation.cpp", diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 2b51e51e45d4375a010b8f3753aafba456c1242b..3fdb3f4b24500b0d61b0ae983612bf2ebb48ce39 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -70,6 +70,8 @@ public: static bool IsSameCipher(CipherType srcType, CipherType inputType); static bool CheckIsAlnumAndUnderscore(const std::string &text); + + static std::string GenerateHashLabel(const DBInfo &dbInfo); }; // Define short macro substitute for original long expression for convenience of using diff --git a/frameworks/libs/distributeddb/common/include/runtime_context.h b/frameworks/libs/distributeddb/common/include/runtime_context.h index 85c22557ed5b0b51ffe6569b6cad0dd04bfe0f6d..408427602ca672ba0ef891353af04719c38fd0e8 100644 --- a/frameworks/libs/distributeddb/common/include/runtime_context.h +++ b/frameworks/libs/distributeddb/common/include/runtime_context.h @@ -22,12 +22,14 @@ #include "auto_launch.h" #include "auto_launch_export.h" +#include "db_info_handle.h" #include "icommunicator_aggregator.h" #include "iprocess_system_api_adapter.h" #include "kv_store_observer.h" #include "kvdb_properties.h" #include "macro_utils.h" #include "notification_chain.h" +#include "query_sync_object.h" #include "types_export.h" namespace DistributedDB { @@ -139,6 +141,19 @@ public: virtual std::map GetPermissionCheckParam(const DBProperties &properties) = 0; virtual void StopTaskPool() = 0; + + virtual void SetDBInfoHandle(const std::shared_ptr &handle) = 0; + + virtual void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) = 0; + + virtual void RecordSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId, const QuerySyncObject &query) = 0; + + virtual void RemoveSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId) = 0; + + virtual void RemoveSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId, const QuerySyncObject &query) = 0; + + virtual void GetSubscribeQuery(const DBInfo &dbInfo, + std::map> &subscribeQuery) = 0; protected: RuntimeContext() = default; virtual ~RuntimeContext() {} diff --git a/frameworks/libs/distributeddb/common/src/db_common.cpp b/frameworks/libs/distributeddb/common/src/db_common.cpp index 9b51158ddca8d18a7a384cf5d725d05afb549883..359a93090b31fb767a64df66baa4ce4ab7b9f4c8 100644 --- a/frameworks/libs/distributeddb/common/src/db_common.cpp +++ b/frameworks/libs/distributeddb/common/src/db_common.cpp @@ -403,4 +403,13 @@ bool DBCommon::CheckIsAlnumAndUnderscore(const std::string &text) }); return iter == text.end(); } + +std::string DBCommon::GenerateHashLabel(const DBInfo &dbInfo) +{ + if (dbInfo.syncDualTupleMode) { + return DBCommon::TransferHashString(dbInfo.appId + "-" + dbInfo.storeId); + } else { + return DBCommon::TransferHashString(dbInfo.userId + "-" + dbInfo.appId + "-" + dbInfo.storeId); + } +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp b/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp index 8398c979b1f0b460c6b4364776d2c7b71687cc43..3f2af04c0cc1c601ea96bd334a79a824f13a81b2 100644 --- a/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp +++ b/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp @@ -31,7 +31,9 @@ RuntimeContextImpl::RuntimeContextImpl() timeTickMonitor_(nullptr), systemApiAdapter_(nullptr), lockStatusObserver_(nullptr), - currentSessionId_(1) + currentSessionId_(1), + dbStatusAdapter_(nullptr), + subscribeRecorder_(nullptr) { } @@ -58,6 +60,8 @@ RuntimeContextImpl::~RuntimeContextImpl() delete lockStatusObserver_; lockStatusObserver_ = nullptr; userChangeMonitor_ = nullptr; + dbStatusAdapter_ = nullptr; + subscribeRecorder_ = nullptr; } // Set the label of this process. @@ -94,6 +98,7 @@ int RuntimeContextImpl::SetCommunicatorAdapter(IAdapter *adapter) int RuntimeContextImpl::GetCommunicatorAggregator(ICommunicatorAggregator *&outAggregator) { outAggregator = nullptr; + const std::shared_ptr statusAdapter = GetDBStatusAdapter(); std::lock_guard lock(communicatorLock_); if (communicatorAggregator_ != nullptr) { outAggregator = communicatorAggregator_; @@ -111,7 +116,7 @@ int RuntimeContextImpl::GetCommunicatorAggregator(ICommunicatorAggregator *&outA return -E_OUT_OF_MEMORY; } - int errCode = communicatorAggregator_->Initialize(adapter_); + int errCode = communicatorAggregator_->Initialize(adapter_, statusAdapter); if (errCode != E_OK) { LOGE("CommunicatorAggregator init failed, err = %d!", errCode); RefObject::KillAndDecObjRef(communicatorAggregator_); @@ -733,4 +738,97 @@ void RuntimeContextImpl::StopTaskPool() taskPool_ = nullptr; } } + +void RuntimeContextImpl::SetDBInfoHandle(const std::shared_ptr &handle) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr) { + dbStatusAdapter->SetDBInfoHandle(handle); + } + std::shared_ptr subscribeRecorder = GetSubscribeRecorder(); + if (subscribeRecorder != nullptr) { + subscribeRecorder->RemoveAllSubscribe(); + } +} + +void RuntimeContextImpl::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr) { + dbStatusAdapter->NotifyDBInfos(devInfos, dbInfos); + } +} + +std::shared_ptr RuntimeContextImpl::GetDBStatusAdapter() +{ + std::lock_guard autoLock(statusAdapterMutex_); + if (dbStatusAdapter_ == nullptr) { + dbStatusAdapter_ = std::make_unique(); + } + if (dbStatusAdapter_ == nullptr) { + LOGE("[RuntimeContextImpl] DbStatusAdapter create failed!"); + } + return dbStatusAdapter_; +} + +void RuntimeContextImpl::RecordSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId, const QuerySyncObject &query) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr && !dbStatusAdapter->IsSupport(deviceId)) { + return; + } + std::shared_ptr subscribeRecorder = GetSubscribeRecorder(); + if (subscribeRecorder != nullptr) { + subscribeRecorder->RecordSubscribe(dbInfo, deviceId, query); + } +} + +void RuntimeContextImpl::RemoveSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr && !dbStatusAdapter->IsSupport(deviceId)) { + return; + } + std::shared_ptr subscribeRecorder = GetSubscribeRecorder(); + if (subscribeRecorder != nullptr) { + subscribeRecorder->RemoveSubscribe(dbInfo, deviceId); + } +} + +void RuntimeContextImpl::RemoveSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId, const QuerySyncObject &query) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr && !dbStatusAdapter->IsSupport(deviceId)) { + return; + } + std::shared_ptr subscribeRecorder = GetSubscribeRecorder(); + if (subscribeRecorder != nullptr) { + subscribeRecorder->RemoveSubscribe(dbInfo, deviceId, query); + } +} + +void RuntimeContextImpl::GetSubscribeQuery(const DBInfo &dbInfo, + std::map> &subscribeQuery) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr && !dbStatusAdapter->ExistDBInfoHandle()) { + return; + } + std::shared_ptr subscribeRecorder = GetSubscribeRecorder(); + if (subscribeRecorder != nullptr) { + subscribeRecorder->GetSubscribeQuery(dbInfo, subscribeQuery); + } +} + +std::shared_ptr RuntimeContextImpl::GetSubscribeRecorder() +{ + std::lock_guard autoLock(subscribeRecorderMutex_); + if (subscribeRecorder_ == nullptr) { + subscribeRecorder_ = std::make_unique(); + } + if (subscribeRecorder_ == nullptr) { + LOGE("[RuntimeContextImpl] SubscribeRecorder create failed!"); + } + return subscribeRecorder_; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/common/src/runtime_context_impl.h b/frameworks/libs/distributeddb/common/src/runtime_context_impl.h index e79d23eee3db91ab9ccfcff3a0139bb23817a405..7ec6ecf6604c03b5bc14c37eb0818a828b1a70f6 100644 --- a/frameworks/libs/distributeddb/common/src/runtime_context_impl.h +++ b/frameworks/libs/distributeddb/common/src/runtime_context_impl.h @@ -21,10 +21,12 @@ #include #include "auto_launch.h" +#include "db_status_adapter.h" #include "evloop/src/ievent.h" #include "evloop/src/ievent_loop.h" #include "icommunicator_aggregator.h" #include "lock_status_observer.h" +#include "subscribe_recorder.h" #include "task_pool.h" #include "time_tick_monitor.h" #include "user_change_monitor.h" @@ -127,6 +129,19 @@ public: std::map GetPermissionCheckParam(const DBProperties &properties) override; void StopTaskPool() override; + + void SetDBInfoHandle(const std::shared_ptr &handle) override; + + void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) override; + + void RecordSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId, const QuerySyncObject &query) override; + + void RemoveSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId) override; + + void RemoveSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId, const QuerySyncObject &query) override; + + void GetSubscribeQuery(const DBInfo &dbInfo, + std::map> &subscribeQuery) override; private: static constexpr int MAX_TP_THREADS = 10; // max threads of the task pool. static constexpr int MIN_TP_THREADS = 1; // min threads of the task pool. @@ -135,6 +150,8 @@ private: int PrepareLoop(IEventLoop *&loop); int PrepareTaskPool(); int AllocTimerId(IEvent *evTimer, TimerId &timerId); + std::shared_ptr GetDBStatusAdapter(); + std::shared_ptr GetSubscribeRecorder(); // Context fields mutable std::mutex labelMutex_; @@ -189,6 +206,12 @@ private: // Get map from this callback, use for run permission check in remote device mutable std::shared_mutex permissionConditionLock_; PermissionConditionCallback permissionConditionCallback_; + + mutable std::mutex statusAdapterMutex_; + std::shared_ptr dbStatusAdapter_; + + mutable std::mutex subscribeRecorderMutex_; + std::shared_ptr subscribeRecorder_; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h index c3a31cdc1271b90bd2d3680f78464a12bb4bee26..c709f748dfe1c21d7b79209cda5efc8df2e8e533 100644 --- a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h @@ -54,7 +54,7 @@ public: DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator); // See ICommunicatorAggregator for detail - int Initialize(IAdapter *inAdapter) override; + int Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) override; // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called. void Finalize() override; @@ -135,6 +135,12 @@ private: // Record the protocol version of remote target. void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version); + void OnRemoteDBStatusChange(const std::string &devInfo, const std::vector &dbInfos); + + void NotifyConnectChange(const std::string &srcTarget, const std::map &changedLabels); + + void RegDBChangeCallback(); + DECLARE_OBJECT_TAG(CommunicatorAggregator); static std::atomic isCommunicatorNotFoundFeedbackEnable_; @@ -175,6 +181,8 @@ private: OnConnectCallback onConnectHandle_; Finalizer onConnectFinalizer_; mutable std::mutex onConnectMutex_; + + std::shared_ptr dbStatusAdapter_; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h new file mode 100644 index 0000000000000000000000000000000000000000..8d58470f6b977ddbe9078b6e9edac830ac35ac00 --- /dev/null +++ b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DB_STATUS_ADAPTER_H +#define DB_STATUS_ADAPTER_H + +#include "db_info_handle.h" +#include +#include "macro_utils.h" + +namespace DistributedDB { +using RemoteDBChangeCallback = std::function &dbInfos)>; +using LocalDBChangeCallback = std::function; +class DBStatusAdapter { +public: + DBStatusAdapter(); + ~DBStatusAdapter() = default; + DISABLE_COPY_ASSIGN_MOVE(DBStatusAdapter); + + void SetDBInfoHandle(const std::shared_ptr &dbInfoHandle); + bool ExistDBInfoHandle() const; + bool IsSupport(const std::string &devInfo); + int GetDBInfos(std::vector &dbInfos) const; + void SetDBStatusChangeCallback(const RemoteDBChangeCallback &remote, const LocalDBChangeCallback &local); + void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos); + void TargetOffline(const std::string &device); +private: + std::shared_ptr GetDBInfoHandle() const; + bool LoadIntoCache(const DeviceInfos &devInfos, const std::vector &dbInfos, + std::vector &changeDbInfos); + void ClearAllCache(); + + static int GetLocalDeviceId(std::string &deviceId); + static bool IsLocalDeviceId(const std::string &deviceId); + static void MergeDBInfos(const std::vector &srcDbInfos, std::vector &dstDbInfos, + std::vector &changeDbInfos); + mutable std::mutex handleMutex_; + std::shared_ptr dbInfoHandle_ = nullptr; + + mutable std::mutex callbackMutex_; + RemoteDBChangeCallback remoteCallback_; + LocalDBChangeCallback localCallback_; + + mutable std::mutex localInfoMutex_; + std::vector localDBInfos_; + + mutable std::mutex remoteInfoMutex_; + std::map> remoteDBInfos_; + + mutable std::mutex supportMutex_; + std::map remoteSupportInfo_; +}; +} +#endif // DB_STATUS_ADAPTER_H diff --git a/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h index 06ae9e7c1e91e74188cf57540be44690eb309fe8..70ee63cf8f1b09bc74c00931df7a31a36c9f4145 100644 --- a/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h @@ -18,6 +18,7 @@ #include #include "communicator_type_define.h" +#include "db_status_adapter.h" #include "iadapter.h" #include "ref_object.h" @@ -32,7 +33,7 @@ public: // The caller is the owner of inAdapter and responsible for manage its lifecycle. // The ICommunicatorAggregator is only the user of inAdapter // If Initialize fail, the ICommunicatorAggregator will rollback what had done to inAdapter so it can be reuse. - virtual int Initialize(IAdapter *inAdapter) = 0; + virtual int Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) = 0; // Call this method after Initialize successfully and before destroy the ICommunicatorAggregator // Emphasize again : DO NOT CALL Finalize IF Initialize FAIL. diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp index 3cb9f9aab447e1014d37bdebba8e481af0ac4e28..95f01b098400c9388677b6a3c22f5f07c2b1b53b 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp @@ -50,7 +50,7 @@ CommunicatorAggregator::~CommunicatorAggregator() commLinker_ = nullptr; } -int CommunicatorAggregator::Initialize(IAdapter *inAdapter) +int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) { if (inAdapter == nullptr) { return -E_INVALID_ARGS; @@ -62,7 +62,7 @@ int CommunicatorAggregator::Initialize(IAdapter *inAdapter) scheduler_.Initialize(); int errCode; - commLinker_ = new (std::nothrow) CommunicatorLinker(this); + commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter); if (commLinker_ == nullptr) { errCode = -E_OUT_OF_MEMORY; goto ROLL_BACK; @@ -83,6 +83,8 @@ int CommunicatorAggregator::Initialize(IAdapter *inAdapter) shutdown_ = false; exclusiveThread_ = std::thread(&CommunicatorAggregator::SendDataRoutine, this); + dbStatusAdapter_ = statusAdapter; + RegDBChangeCallback(); return E_OK; ROLL_BACK: UnRegCallbackFromAdapter(); @@ -118,6 +120,7 @@ void CommunicatorAggregator::Finalize() commLinker_ = nullptr; retainer_.Finalize(); combiner_.Finalize(); + dbStatusAdapter_ = nullptr; } ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo) @@ -617,23 +620,7 @@ int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail."); return errCode; } - if (!commLinker_->IsRemoteTargetOnline(srcTarget)) { - LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str()); - for (const auto &entry : changedLabels) { - LOGW("[CommAggr][CommReceive] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second); - } - return E_OK; - } - // Do target change notify - std::lock_guard commMapLockGuard(commMapMutex_); - for (auto &entry : changedLabels) { - // Ignore nonactivated communicator - if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) { - LOGI("[CommAggr][CommReceive] label=%s, srcTarget=%s{private}, isOnline=%d.", - VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second); - commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second); - } - } + NotifyConnectChange(srcTarget, changedLabels); } return E_OK; } @@ -768,6 +755,7 @@ void CommunicatorAggregator::UnRegCallbackFromAdapter() adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr); adapterHandle_->RegTargetChangeCallback(nullptr, nullptr); adapterHandle_->RegSendableCallback(nullptr, nullptr); + dbStatusAdapter_->SetDBStatusChangeCallback(nullptr, nullptr); } void CommunicatorAggregator::GenerateLocalSourceId() @@ -879,5 +867,54 @@ std::shared_ptr CommunicatorAggregator::GetExtendHeaderHandl return adapterHandle_->GetExtendHeaderHandle(paramInfo); } +void CommunicatorAggregator::OnRemoteDBStatusChange(const std::string &devInfo, const std::vector &dbInfos) +{ + std::map changedLabels; + for (const auto &dbInfo: dbInfos) { + std::string label = DBCommon::GenerateHashLabel(dbInfo); + LabelType labelType(label.begin(), label.end()); + changedLabels[labelType] = dbInfo.isNeedSync; + } + if (commLinker_ != nullptr) { + commLinker_->UpdateOnlineLabels(devInfo, changedLabels); + } + NotifyConnectChange(devInfo, changedLabels); +} + +void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget, + const std::map &changedLabels) +{ + if (!commLinker_->IsRemoteTargetOnline(srcTarget)) { + LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str()); + for (const auto &entry : changedLabels) { + LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second); + } + } + // Do target change notify + std::lock_guard commMapLockGuard(commMapMutex_); + for (auto &entry : changedLabels) { + // Ignore nonactivated communicator + if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) { + LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.", + VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second); + commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second); + } + } +} + +void CommunicatorAggregator::RegDBChangeCallback() +{ + if (dbStatusAdapter_ != nullptr) { + dbStatusAdapter_->SetDBStatusChangeCallback( + [this](const std::string &devInfo, const std::vector &dbInfos) { + OnRemoteDBStatusChange(devInfo, dbInfos); + }, + [this](){ + if (commLinker_ != nullptr) { + (void)commLinker_->TriggerLabelExchangeEvent(); + } + }); + } +} DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator) } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp index ab51b370d34c2d2a7361f70f99c9418ff2392bd8..acde3d7324a3b0e57c19016c159215f22893cd23 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp @@ -14,7 +14,10 @@ */ #include "communicator_linker.h" + +#include #include "communicator_aggregator.h" +#include "db_common.h" #include "db_errno.h" #include "hash.h" #include "log_print.h" @@ -29,10 +32,12 @@ constexpr uint32_t RETRANSMIT_LIMIT = 20; // Currently we do at most 20 retransm constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5; // First 5 retransmission will be equal interval } -CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator) +CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator, + std::shared_ptr statusAdapter) : incSequenceId_(0), incAckTriggerId_(0) { aggregator_ = inAggregator; + statusAdapter_ = std::move(statusAdapter); RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator } @@ -40,6 +45,7 @@ CommunicatorLinker::~CommunicatorLinker() { RefObject::DecObjRef(aggregator_); // The linker no longer rely on CommunicatorAggregator aggregator_ = nullptr; + statusAdapter_ = nullptr; } void CommunicatorLinker::Initialize() @@ -75,6 +81,9 @@ int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set