diff --git a/frameworks/libs/distributeddb/include/types_export.h b/frameworks/libs/distributeddb/include/types_export.h index d471aa1be0765ad072420f42bcae27447f097d5d..7bdac3a8b3e23f374987bfcf6919fe8eba01b872 100644 --- a/frameworks/libs/distributeddb/include/types_export.h +++ b/frameworks/libs/distributeddb/include/types_export.h @@ -244,5 +244,14 @@ struct StoreInfo { } }; using TranslateToDeviceIdCallback = std::function; + +struct DeviceSyncNotifyInfo { + std::string deviceId; +}; + +enum class DeviceSyncEvent : int { + REMOTE_PULL_STARTED = 0 +}; +using DeviceSyncNotifier = std::function; } // namespace DistributedDB #endif // DISTRIBUTEDDB_TYPES_EXPORT_H diff --git a/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h b/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h index 8c63e761a10deeafef252b7e01fd03e504fd8cec..24bb48050ed9781781223a3c881f14c738bb53c6 100644 --- a/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h +++ b/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h @@ -332,6 +332,15 @@ public: { return OK; } + + // Set a notifier callback, it will be called when event {@link DeviceSyncEvent} happened. + // If set nullptr, means unregister the notifier. + // If repeat set, subject to the last time. + DB_API virtual DBStatus SetDeviceSyncNotify([[gnu::unused]] DeviceSyncEvent event, + [[gnu::unused]] const DeviceSyncNotifier ¬ifier) + { + return OK; + } }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp index 1fdf381d191b4381f2c257df7b78ce00980655b3..defc53f1bcafe730ffe2db9ba1b43061522cdb2c 100644 --- a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp @@ -1437,4 +1437,13 @@ void KvStoreNbDelegateImpl::SetHandle(void *handle) dlHandle_ = handle; #endif } + +DBStatus KvStoreNbDelegateImpl::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + if (conn_ == nullptr) { + LOGE("%s", INVALID_CONNECTION); + return DB_ERROR; + } + return TransferDBErrno(conn_->SetDeviceSyncNotify(event, notifier)); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h index 88323d872134f0d8b431676f35ba65880ca594aa..ff3acc5c1ef94db274b9cf9b13cf8999f195efd6 100644 --- a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h +++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h @@ -201,6 +201,8 @@ public: DBStatus OperateDataStatus(uint32_t dataOperator) override; + DBStatus SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; + void SetHandle(void *handle); private: diff --git a/frameworks/libs/distributeddb/storage/include/ikvdb_connection.h b/frameworks/libs/distributeddb/storage/include/ikvdb_connection.h index b102ffb925a9cf13f587c1a7410d9d5c8f942328..3ad0172cc6d8c6c83263e1de6291f57e7046df5b 100644 --- a/frameworks/libs/distributeddb/storage/include/ikvdb_connection.h +++ b/frameworks/libs/distributeddb/storage/include/ikvdb_connection.h @@ -174,6 +174,8 @@ public: virtual int ClearCloudWatermark() = 0; virtual int OperateDataStatus(uint32_t dataOperator) = 0; + + virtual int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) = 0; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.cpp b/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.cpp index 957d0dc8d2a9841a9b241570caffcc5a6550fb31..ca3566fbce93031f2a512f7aaedb3b13e36d5a9f 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.cpp +++ b/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.cpp @@ -466,4 +466,10 @@ int GenericKvDBConnection::OperateDataStatus([[gnu::unused]] uint32_t dataOperat { return -E_NOT_SUPPORT; } + +int GenericKvDBConnection::SetDeviceSyncNotify([[gnu::unused]] DeviceSyncEvent event, + [[gnu::unused]] const DeviceSyncNotifier ¬ifier) +{ + return -E_NOT_SUPPORT; +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.h b/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.h index fe37fc7718f7714e3629a9ea73743980fd8ad7ef..194dc3f0677a2f084933de0a860c138f641a023e 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.h +++ b/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.h @@ -116,6 +116,8 @@ public: int ClearCloudWatermark() override; int OperateDataStatus(uint32_t dataOperator) override; + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; protected: // Get the stashed 'KvDB_ pointer' without ref. template diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.cpp b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.cpp index a624def667c87f0b99eab089e156f8b33faeaa72..20436b6e991d74a85cfcc7286094e1dd636bdc30 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.cpp +++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.cpp @@ -738,4 +738,9 @@ bool SyncAbleKvDB::CheckSchemaSupportForCloudSync() const return true; // default is valid } #endif + +int SyncAbleKvDB::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + return syncer_.SetDeviceSyncNotify(event, notifier); +} } diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.h b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.h index ae32b9bc3d9827b7bfc86a5a229805ea50664cf6..6d137fc95ded2cc73c337e766558a36181ba3c0f 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.h +++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.h @@ -119,6 +119,8 @@ public: void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback); #endif + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier); protected: virtual IKvDBSyncInterface *GetSyncInterface() = 0; diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp index 6c7b10775b54ed1d29ed761e1d81499e85261a7f..719ae4bf46ee26dc926345d1124788465594c793 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp +++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp @@ -457,4 +457,13 @@ int SyncAbleKvDBConnection::SetReceiveDataInterceptor(const DataInterceptor &int kvDB->SetReceiveDataInterceptor(interceptor); return E_OK; } + +int SyncAbleKvDBConnection::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + auto *kvDB = GetDB(); + if (kvDB == nullptr) { + return -E_INVALID_CONNECTION; + } + return kvDB->SetDeviceSyncNotify(event, notifier); +} } diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h index 4594b3dd9b4caaa021e6ad496fd88eca7c9c99d3..441e12efd214636a13301f5e1ec4cf72e1438e71 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h +++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h @@ -50,6 +50,8 @@ public: void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback) override; #endif + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; protected: int DisableManualSync(); diff --git a/frameworks/libs/distributeddb/syncer/include/isyncer.h b/frameworks/libs/distributeddb/syncer/include/isyncer.h index 426f83df086975b316e45bc2578bb087e0f279fc..cfa1108e4907fbd896111f456cd6fc842fb1dd98 100644 --- a/frameworks/libs/distributeddb/syncer/include/isyncer.h +++ b/frameworks/libs/distributeddb/syncer/include/isyncer.h @@ -142,6 +142,8 @@ public: virtual int32_t GetTaskCount() = 0; virtual bool ExchangeClosePending(bool expected) = 0; + + virtual int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) = 0; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h b/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h index 481d592d395924498b9111ef196dc7db11ba54f7..4ed470732fefa037fed68f588de3538a551a1abe 100644 --- a/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h +++ b/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h @@ -123,6 +123,8 @@ public: int32_t GetTaskCount() override; bool ExchangeClosePending(bool expected) override; + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; private: std::mutex syncerLock_; std::shared_ptr syncer_; diff --git a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp index 426d110a29064e16d1e2744746402f4ffcc9ea9a..7a104156cfdd2e93ab2d67e4821e51503d4fd265 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp @@ -1314,4 +1314,24 @@ bool GenericSyncer::ExchangeClosePending(bool expected) RefObject::DecObjRef(syncEngine); return res; } + +int GenericSyncer::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + if (event != DeviceSyncEvent::REMOTE_PULL_STARTED) { + LOGE("[GenericSyncer] Invalid device sync event[%d]", static_cast(event)); + return -E_INVALID_ARGS; + } + ISyncEngine *syncEngine = nullptr; + { + std::lock_guard lock(syncerLock_); + if (syncEngine_ == nullptr) { + return -E_NOT_INIT; + } + syncEngine = syncEngine_; + RefObject::IncObjRef(syncEngine); + } + syncEngine->SetRemotePullStartNotify(notifier); + RefObject::DecObjRef(syncEngine); + return E_OK; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h index 0f002124c182d33684ff782e1971a8c214ea47e8..b8f1c74f2b5a3bf49d5f4ccc40855508d1fd7863 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h @@ -120,6 +120,8 @@ public: int32_t GetTaskCount() override; bool ExchangeClosePending(bool expected) override; + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; protected: // trigger query auto sync or auto subscribe diff --git a/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h index 0c652a39858454c0ee5fbba07b85041be371c4e3..a599a68a398effa2f3342cf641837a13c1796586 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h +++ b/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h @@ -108,6 +108,8 @@ public: virtual int32_t GetRemoteQueryTaskCount() = 0; virtual bool ExchangeClosePending(bool expected) = 0; + + virtual void SetRemotePullStartNotify(const DeviceSyncNotifier ¬ifier) = 0; protected: ~ISyncEngine() override {}; }; diff --git a/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h index 0ad79138a6402fabb52626a0d97c841ff97f39a3..f1bc040dffb20649fb009c5106ba3b705aa6d39e 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h +++ b/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h @@ -196,6 +196,8 @@ public: virtual bool IsRetryTask() const = 0; virtual bool IsSavingTask(uint32_t timeout) const = 0; + + virtual void RegOnRemotePullStart(const std::function &callback) = 0; protected: virtual ~ISyncTaskContext() {}; }; diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync.cpp index 9d9d10afbaa9e5261da581f3f9c0e4e4f257f793..72537fd5c00759a8f27c114689fd261b8ee4b047 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync.cpp @@ -1024,7 +1024,6 @@ int SingleVerDataSync::DataRequestRecvInner(SingleVerSyncTaskContext *context, c return errCode; } SingleVerDataSyncUtils::UpdateSyncProcess(context, packet); - if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode pullEndWatermark = 0; errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime); diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp index 0b8e449df97826e9cdeb9d0dcaecedcf8baca511..d182f79e0903440c36cd87b5e6e2c46cbb673172 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp @@ -1023,6 +1023,7 @@ void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, Wate LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!"); return; } + context_->NotifyRemotePullStart(); if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) { LOGI("[StateMachine][AddPullResponseTarget] task is already running"); return; diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp index 4b29db84ba769a8f4ce834feb272051d56147f24..f8c4da533a27592224defd2004a4af54a8cefeeb 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp @@ -683,6 +683,9 @@ ISyncTaskContext *SyncEngine::GetSyncTaskContext(const DeviceSyncTarget &target, storage->DecRefCount(); }); context->RegOnSyncTask([this, context] { return ExecSyncTask(context); }); + context->RegOnRemotePullStart([this](const std::string &dev) { + NotifyRemotePullStart(dev); + }); return context; } @@ -1497,4 +1500,61 @@ void SyncEngine::CorrectTargetUserId(std::mapSetTargetUserId(newTargetUserId); syncTaskContextMap_[{targetDev, newTargetUserId}] = context; } + +void SyncEngine::SetRemotePullStartNotify(const DeviceSyncNotifier ¬ifier) +{ + ExtendInfo extendInfo = GetExtendInfo(); + std::lock_guard autoLock(pullStartNotifierMutex_); + pullStartNotifier_ = notifier; + LOGI("[SyncEngine] Set remote pull notify finished appId[%s] userId[%s] storeId[%s]", + DBCommon::StringMiddleMasking(extendInfo.appId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.userId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.storeId).c_str()); +} + +void SyncEngine::NotifyRemotePullStart(const std::string &dev) +{ + ExtendInfo extendInfo = GetExtendInfo(); + RefObject::IncObjRef(this); + int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, dev, extendInfo]() { + DeviceSyncNotifier notifier; + { + std::lock_guard autoLock(pullStartNotifierMutex_); + if (pullStartNotifier_ == nullptr) { + RefObject::DecObjRef(this); + return; + } + notifier = pullStartNotifier_; + } + notifier({dev}); + LOGI("[SyncEngine] Notifier remote pull start appId[%s] userId[%s] storeId[%s]", + DBCommon::StringMiddleMasking(extendInfo.appId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.userId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.storeId).c_str()); + RefObject::DecObjRef(this); + }); + if (errCode != E_OK) { + RefObject::DecObjRef(this); + } + LOGI("[SyncEngine] Schedule notifier remote pull errCode[%d] appId[%s] userId[%s] storeId[%s]", errCode, + DBCommon::StringMiddleMasking(extendInfo.appId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.userId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.storeId).c_str()); +} + +ExtendInfo SyncEngine::GetExtendInfo() const +{ + ExtendInfo extendInfo; + std::lock_guard storeLock(storageMutex_); + if (syncInterface_ == nullptr) { + LOGE("[SyncEngine] Storage is null"); + return extendInfo; + } + DBProperties properties = syncInterface_->GetDbProperties(); + extendInfo.appId = properties.GetStringProp(DBProperties::APP_ID, ""); + extendInfo.userId = properties.GetStringProp(DBProperties::USER_ID, ""); + extendInfo.storeId = properties.GetStringProp(DBProperties::STORE_ID, ""); + extendInfo.subUserId = properties.GetStringProp(DBProperties::SUB_USER, ""); + return extendInfo; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h index 474bca03004e17a9b9123c76607d13f1a9446c76..854c5bf373948581e494517f229b218189e18e97 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h @@ -141,6 +141,8 @@ public: int32_t GetRemoteQueryTaskCount() override; bool ExchangeClosePending(bool expected) override; + + void SetRemotePullStartNotify(const DeviceSyncNotifier ¬ifier) override; protected: // Create a context virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0; @@ -156,14 +158,18 @@ protected: void SetSyncInterface(ISyncInterface *syncInterface); ISyncTaskContext *GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode); + void NotifyRemotePullStart(const std::string &dev); + ExtendInfo GetExtendInfo() const; - std::mutex storageMutex_; + mutable std::mutex storageMutex_; ISyncInterface *syncInterface_; // Used to store all send sync task infos (such as pull sync response, and push sync request) std::map syncTaskContextMap_; std::mutex contextMapLock_; std::shared_ptr subManager_; std::function queryAutoSyncCallback_; + std::mutex pullStartNotifierMutex_; + DeviceSyncNotifier pullStartNotifier_; private: diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp index 6c18cf3c049e26cdc90a721789e7be98abddcf1c..f4401277f39f39b52a1d3ac430502e1aa31464b6 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp @@ -888,4 +888,24 @@ void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode) SetCommFailErrCode(errCode); } } + +void SyncTaskContext::RegOnRemotePullStart(const std::function &callback) +{ + std::lock_guard autoLock(remotePullMutex_); + remotePullNotifier_ = callback; +} + +void SyncTaskContext::NotifyRemotePullStart() +{ + std::function notifier; + { + std::lock_guard autoLock(remotePullMutex_); + if (remotePullNotifier_ == nullptr) { + LOGE("[SyncTaskContext] Notifier is null when remote pull start"); + return; + } + notifier = remotePullNotifier_; + } + notifier(deviceId_); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h index 7885b91fea90381835af6fa5a06d49b23309213a..4b3df0daa4f64120af2b73644a631af082e5c45b 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h @@ -224,6 +224,10 @@ public: int GetCommErrCode() const; void SetCommFailErrCode(int errCode); + + void RegOnRemotePullStart(const std::function &callback) override; + + void NotifyRemotePullStart(); protected: const static int KILL_WAIT_SECONDS = INT32_MAX; @@ -302,6 +306,9 @@ protected: volatile uint32_t negotiationCount_; volatile bool isAutoSubscribe_; + mutable std::mutex remotePullMutex_; + std::function remotePullNotifier_; + // For global ISyncTaskContext Set, used by CommErrCallback. static std::mutex synTaskContextSetLock_; static std::set synTaskContextSet_; diff --git a/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp index cabd4bf1056de0ec162be615f542062172e62f02..bb23c4056787fe8443f95485de90e66cff5f2582 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp @@ -297,4 +297,12 @@ bool SyncerProxy::ExchangeClosePending(bool expected) } return syncer_->ExchangeClosePending(expected); } + +int SyncerProxy::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + if (syncer_ == nullptr) { + return -E_NOT_INIT; + } + return syncer_->SetDeviceSyncNotify(event, notifier); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/BUILD.gn b/frameworks/libs/distributeddb/test/BUILD.gn index 1cdb63578fed66e6bdf1c255b5e451c440d67dc5..49290b71ded5217644b609d260435a29a676564f 100644 --- a/frameworks/libs/distributeddb/test/BUILD.gn +++ b/frameworks/libs/distributeddb/test/BUILD.gn @@ -963,6 +963,10 @@ distributeddb_unittest("DistributedDBKvCloudSyncTest") { [ "unittest/common/store_test/kv/distributeddb_kv_cloud_sync_test.cpp" ] } +distributeddb_unittest("DistributedDBKVNotifyTest") { + sources = [ "unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp" ] +} + config("tokenizer_config_unittest") { visibility = [ ":*" ] include_dirs = [ "//foundation/distributeddatamgr/kv_store/frameworks/libs/distributeddb/common/include" ] @@ -1238,6 +1242,7 @@ group("unittest") { ":DistributedDBJsonPrecheckUnitTest", ":DistributedDBKVCompressTest", ":DistributedDBKVDataStatusTest", + ":DistributedDBKVNotifyTest", ":DistributedDBKvCloudSyncTest", ":DistributedDBKvDeviceSyncTest", ":DistributedDBKvMultiUserSyncTest", @@ -1342,8 +1347,8 @@ group("distributeddatamgr_fuzztest") { "fuzztest/kvdelegatemanager_fuzzer:fuzztest", "fuzztest/kvstoreresultset_fuzzer:fuzztest", "fuzztest/nbdelegate_fuzzer:fuzztest", - "fuzztest/parseckeck_fuzzer:fuzztest", "fuzztest/parsecheckfield_fuzzer:fuzztest", + "fuzztest/parseckeck_fuzzer:fuzztest", "fuzztest/query_fuzzer:fuzztest", "fuzztest/querycompare_fuzzer:fuzztest", "fuzztest/queryin_fuzzer:fuzztest", diff --git a/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.cpp b/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.cpp index ba3041fb91e56c8c7f222a255a11973bd221820b..af120332fb21363a999a705bfdb4e1abba1a66c2 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.cpp @@ -124,13 +124,18 @@ KvStoreNbDelegate *KVGeneralUt::GetDelegate(const DistributedDB::StoreInfo &info } void KVGeneralUt::BlockPush(const StoreInfo &from, const StoreInfo &to, DBStatus expectRet) +{ + BlockDeviceSync(from, to, SyncMode::SYNC_MODE_PUSH_ONLY, expectRet); +} + +void KVGeneralUt::BlockDeviceSync(const StoreInfo &from, const StoreInfo &to, SyncMode mode, DBStatus expectRet) { auto fromStore = GetDelegate(from); ASSERT_NE(fromStore, nullptr); auto toDevice = GetDevice(to); ASSERT_FALSE(toDevice.empty()); std::map syncRet; - tool_.SyncTest(fromStore, {toDevice}, SyncMode::SYNC_MODE_PUSH_ONLY, syncRet); + tool_.SyncTest(fromStore, {toDevice}, mode, syncRet); for (const auto &item : syncRet) { EXPECT_EQ(item.second, expectRet); } diff --git a/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.h b/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.h index d80558cf99398253a45b42549c76cff095584d75..52eead0cc11f45452caa93c168199adb6f567366 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.h +++ b/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.h @@ -31,6 +31,7 @@ protected: void SetOption(const KvStoreNbDelegate::Option &option); KvStoreNbDelegate *GetDelegate(const StoreInfo &info) const; void BlockPush(const StoreInfo &from, const StoreInfo &to, DBStatus expectRet = DBStatus::OK); + void BlockDeviceSync(const StoreInfo &from, const StoreInfo &to, SyncMode mode, DBStatus expectRet); DBStatus SetCloud(KvStoreNbDelegate *&delegate, bool invalidSchema = false); static DataBaseSchema GetDataBaseSchema(bool invalidSchema); DBStatus GetDeviceEntries(KvStoreNbDelegate *delegate, const std::string &deviceId, bool isSelfDevice, diff --git a/frameworks/libs/distributeddb/test/unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..61f723916af305b26b741c3290474d9ad5370cda --- /dev/null +++ b/frameworks/libs/distributeddb/test/unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "kv_general_ut.h" + +namespace DistributedDB { +using namespace testing::ext; +class DistributedDBKVNotifyTest : public KVGeneralUt { +public: + void SetUp() override; +protected: + void NotifySyncTest(SyncMode mode, int expectCount); +}; + +void DistributedDBKVNotifyTest::SetUp() +{ + KVGeneralUt::SetUp(); + auto storeInfo1 = GetStoreInfo1(); + ASSERT_EQ(BasicUnitTest::InitDelegate(storeInfo1, "dev1"), E_OK); + auto storeInfo2 = GetStoreInfo2(); + ASSERT_EQ(BasicUnitTest::InitDelegate(storeInfo2, "dev2"), E_OK); +} + +void DistributedDBKVNotifyTest::NotifySyncTest(SyncMode mode, int expectCount) +{ + /** + * @tc.steps: step1. store1 register pull notify + * @tc.expected: step1. register ok. + */ + auto storeInfo1 = GetStoreInfo1(); + auto store1 = GetDelegate(storeInfo1); + ASSERT_NE(store1, nullptr); + std::atomic count = 0; + int ret = store1->SetDeviceSyncNotify(DeviceSyncEvent::REMOTE_PULL_STARTED, + [&count](const DeviceSyncNotifyInfo &info) { + EXPECT_EQ(info.deviceId, "dev2"); + count++; + }); + ASSERT_EQ(ret, OK); + /** + * @tc.steps: step2. store2 pull store1 + * @tc.expected: step2. pull ok and notify was triggered. + */ + auto storeInfo2 = GetStoreInfo2(); + BlockDeviceSync(storeInfo2, storeInfo1, mode, OK); + EXPECT_EQ(count, expectCount); +} + +/** + * @tc.name: NotifySync001 + * @tc.desc: Test notify when pull sync. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBKVNotifyTest, NotifySync001, TestSize.Level0) +{ + ASSERT_NO_FATAL_FAILURE(NotifySyncTest(SyncMode::SYNC_MODE_PULL_ONLY, 1)); +} + +/** + * @tc.name: NotifySync002 + * @tc.desc: Test notify when push pull sync. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBKVNotifyTest, NotifySync002, TestSize.Level0) +{ + ASSERT_NO_FATAL_FAILURE(NotifySyncTest(SyncMode::SYNC_MODE_PUSH_PULL, 1)); +} + +/** + * @tc.name: NotifySync003 + * @tc.desc: Test notify when push sync. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBKVNotifyTest, NotifySync003, TestSize.Level0) +{ + ASSERT_NO_FATAL_FAILURE(NotifySyncTest(SyncMode::SYNC_MODE_PUSH_ONLY, 0)); +} + +/** + * @tc.name: NotifySync005 + * @tc.desc: Test cancel notify when pull sync. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBKVNotifyTest, NotifySync004, TestSize.Level0) +{ + /** + * @tc.steps: step1. store1 register pull notify + * @tc.expected: step1. register ok. + */ + auto storeInfo = GetStoreInfo1(); + auto store = GetDelegate(storeInfo); + ASSERT_NE(store, nullptr); + std::atomic count = 0; + int ret = store->SetDeviceSyncNotify(DeviceSyncEvent::REMOTE_PULL_STARTED, + [&count](const DeviceSyncNotifyInfo &info) { + EXPECT_EQ(info.deviceId, "dev2"); + count++; + }); + ASSERT_EQ(ret, OK); + /** + * @tc.steps: step2. store2 pull store + * @tc.expected: step2. pull ok and notify was triggered. + */ + auto storeInfo2 = GetStoreInfo2(); + BlockDeviceSync(storeInfo2, storeInfo, SyncMode::SYNC_MODE_PULL_ONLY, OK); + EXPECT_EQ(count, 1); + /** + * @tc.steps: step3. store2 pull store after store cancel notify + * @tc.expected: step3. pull ok and notify was not triggered. + */ + count = 0; + ASSERT_EQ(store->SetDeviceSyncNotify(DeviceSyncEvent::REMOTE_PULL_STARTED, nullptr), OK); + BlockDeviceSync(storeInfo2, storeInfo, SyncMode::SYNC_MODE_PULL_ONLY, OK); + EXPECT_EQ(count, 0); +} +} \ No newline at end of file