diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_test.cpp index da4bcbfc9f524e4a5c4b34925cd785a3adb2ef82..b754bb9049110a97eda2b0c741aa67d2adb47eba 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_test.cpp @@ -976,8 +976,7 @@ HWTEST_F(DistributedDBSingleVerP2PSyncTest, DeviceOfflineSync001, TestSize.Level item.value.clear(); Key hashKey; DistributedDBToolsUnitTest::CalcHash(key3, hashKey); - g_deviceC->GetData(hashKey, item); - EXPECT_TRUE((item.flag & VirtualDataItem::DELETE_FLAG) == 1); + EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND); item.value.clear(); g_deviceC->GetData(key4, item); EXPECT_TRUE(item.value == value4); @@ -2795,6 +2794,56 @@ HWTEST_F(DistributedDBSingleVerP2PSyncTest, RemoveDeviceData001, TestSize.Level1 thread2.join(); } +/** + * @tc.name: RemoveDeviceData002 + * @tc.desc: test remove device data before sync + * @tc.type: FUNC + * @tc.require: AR000D487B + * @tc.author: zhuwentao + */ +HWTEST_F(DistributedDBSingleVerP2PSyncTest, RemoveDeviceData002, TestSize.Level1) +{ + ASSERT_TRUE(g_kvDelegatePtr != nullptr); + /** + * @tc.steps: step1. sync deviceB data to A and check data + * * @tc.expected: step1. interface return ok + */ + Key key1 = {'1'}; + Key key2 = {'2'}; + Value value = {'1'}; + Timestamp currentTime; + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK); + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK); + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK); + Value actualValue; + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK); + EXPECT_EQ(actualValue, value); + actualValue.clear(); + EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK); + EXPECT_EQ(actualValue, value); + /** + * @tc.steps: step2. call RemoveDeviceData + * * @tc.expected: step2. interface return ok + */ + g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId()); + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND); + EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND); + /** + * @tc.steps: step3. sync to device A again and check data + * * @tc.expected: step3. sync ok + */ + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK); + actualValue.clear(); + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK); + EXPECT_EQ(actualValue, value); + actualValue.clear(); + EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK); + EXPECT_EQ(actualValue, value); +} + + /** * @tc.name: CalculateSyncData001 * @tc.desc: Test sync data whose device never synced before @@ -2889,7 +2938,7 @@ HWTEST_F(DistributedDBSingleVerP2PSyncTest, CalculateSyncData004, TestSize.Level /** * @tc.name: CalculateSyncData005 - * @tc.desc: Test CalculateSyncData and close db Concurrently + * @tc.desc: Test CalculateSyncData and rekey Concurrently * @tc.type: FUNC * @tc.require: AR000HI2JS * @tc.author: zhuwentao @@ -2904,19 +2953,24 @@ HWTEST_F(DistributedDBSingleVerP2PSyncTest, CalculateSyncData005, TestSize.Level std::thread thread1([]() { if (g_kvDelegatePtr != nullptr) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); - g_mgr.CloseKvStore(g_kvDelegatePtr); - g_kvDelegatePtr = nullptr; + CipherPassword passwd; // random password + vector passwdBuffer(10, 45); // 10 and 45 as random password. + passwd.SetValue(passwdBuffer.data(), passwdBuffer.size()); + g_kvDelegatePtr->Rekey(passwd); } }); std::thread thread2([&dataSize, &key1, &value1]() { if (g_kvDelegatePtr != nullptr) { dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B); } - uint32_t expectedDataSize = (key1.size() + value1.size()); - uint32_t externalSize = 70u; - uint32_t serialHeadLen = 8u; - ASSERT_GE(static_cast(dataSize), expectedDataSize); - ASSERT_LE(static_cast(dataSize), serialHeadLen + expectedDataSize + externalSize); + if (dataSize > 0) + { + uint32_t expectedDataSize = (key1.size() + value1.size()); + uint32_t externalSize = 70u; + uint32_t serialHeadLen = 8u; + ASSERT_GE(static_cast(dataSize), expectedDataSize); + ASSERT_LE(static_cast(dataSize), serialHeadLen + expectedDataSize + externalSize); + } }); thread1.join(); thread2.join(); @@ -3023,4 +3077,253 @@ HWTEST_F(DistributedDBSingleVerP2PSyncTest, DeviceOfflineSyncTask002, TestSize.L g_kvDelegatePtr = nullptr; } ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK); +} + +/** + * @tc.name: RebuildSync001 + * @tc.desc: rebuild db and sync again + * @tc.type: FUNC + * @tc.require: + * @tc.author: zhuwentao + */ +HWTEST_F(DistributedDBSingleVerP2PSyncTest, RebuildSync001, TestSize.Level3) +{ + ASSERT_TRUE(g_kvDelegatePtr != nullptr); + /** + * @tc.steps: step1. sync deviceB data to A and check data + * * @tc.expected: step1. interface return ok + */ + Key key1 = {'1'}; + Key key2 = {'2'}; + Value value = {'1'}; + Timestamp currentTime; + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK); + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK); + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK); + + Value actualValue; + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK); + EXPECT_EQ(actualValue, value); + actualValue.clear(); + EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK); + EXPECT_EQ(actualValue, value); + /** + * @tc.steps: step2. delete db and rebuild + * * @tc.expected: step2. interface return ok + */ + if (g_kvDelegatePtr != nullptr) { + g_mgr.CloseKvStore(g_kvDelegatePtr); + g_kvDelegatePtr = nullptr; + } + ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK); + KvStoreNbDelegate::Option option; + g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback); + ASSERT_TRUE(g_kvDelegateStatus == OK); + ASSERT_TRUE(g_kvDelegatePtr != nullptr); + /** + * @tc.steps: step3. sync to device A again + * * @tc.expected: step3. sync ok + */ + value = {'2'}; + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK); + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK); + /** + * @tc.steps: step4. check data in device A + * * @tc.expected: step4. check ok + */ + actualValue.clear(); + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK); + EXPECT_EQ(actualValue, value); +} + +/** + * @tc.name: RebuildSync002 + * @tc.desc: test clear remote data when receive data + * @tc.type: FUNC + * @tc.require: AR000D487B + * @tc.author: zhuwentao + */ +HWTEST_F(DistributedDBSingleVerP2PSyncTest, RebuildSync002, TestSize.Level1) +{ + ASSERT_TRUE(g_kvDelegatePtr != nullptr); + std::vector devices; + devices.push_back(g_deviceB->GetDeviceId()); + /** + * @tc.steps: step1. device A SET_WIPE_POLICY + * * @tc.expected: step1. interface return ok + */ + int pragmaData = 2; // 2 means enable + PragmaData input = static_cast(&pragmaData); + EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK); + /** + * @tc.steps: step2. sync deviceB data to A and check data + * * @tc.expected: step2. interface return ok + */ + Key key1 = {'1'}; + Key key2 = {'2'}; + Key key3 = {'3'}; + Key key4 = {'4'}; + Value value = {'1'}; + Timestamp currentTime; + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK); + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK); + EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK); + /** + * @tc.steps: step3. deviceA call pull sync + * @tc.expected: step3. sync should return OK. + */ + std::map result; + ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK); + + /** + * @tc.expected: step4. onComplete should be called, check data + */ + ASSERT_TRUE(result.size() == devices.size()); + for (const auto &pair : result) { + EXPECT_TRUE(pair.second == OK); + } + Value actualValue; + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK); + EXPECT_EQ(actualValue, value); + EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK); + EXPECT_EQ(actualValue, value); + /** + * @tc.steps: step5. device B rebuild and put some data + * * @tc.expected: step5. rebuild ok + */ + if (g_deviceB != nullptr) { + delete g_deviceB; + g_deviceB = nullptr; + } + g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B); + ASSERT_TRUE(g_deviceB != nullptr); + VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface(); + ASSERT_TRUE(syncInterfaceB != nullptr); + ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK); + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK); + (void)OS::GetCurrentSysTimeInMicrosecond(currentTime); + EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK); + /** + * @tc.steps: step6. sync to device A again and check data + * * @tc.expected: step6. sync ok + */ + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK); + EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK); + EXPECT_EQ(actualValue, value); + EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK); + EXPECT_EQ(actualValue, value); + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND); + EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND); +} + +/** + * @tc.name: RebuildSync003 + * @tc.desc: test clear history data when receive ack + * @tc.type: FUNC + * @tc.require: AR000D487B + * @tc.author: zhuwentao + */ +HWTEST_F(DistributedDBSingleVerP2PSyncTest, RebuildSync003, TestSize.Level1) +{ + ASSERT_TRUE(g_kvDelegatePtr != nullptr); + /** + * @tc.steps: step1. sync deviceB data to A and check data + * * @tc.expected: step1. interface return ok + */ + Key key1 = {'1'}; + Key key2 = {'2'}; + Key key3 = {'3'}; + Key key4 = {'4'}; + Value value = {'1'}; + EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp + EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp + EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK); + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK); + Value actualValue; + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK); + EXPECT_EQ(actualValue, value); + EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK); + EXPECT_EQ(actualValue, value); + VirtualDataItem item; + EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK); + EXPECT_EQ(item.value, value); + /** + * @tc.steps: step2. device B sync to device A,but make it failed + * * @tc.expected: step2. interface return ok + */ + EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp + g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE); + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK); + /** + * @tc.steps: step3. device B set delay send time + * * @tc.expected: step3. interface return ok + */ + std::set delayDevice = {DEVICE_B}; + g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice);// delay 3000ms one time + /** + * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s + * * @tc.expected: step4. interface return ok + */ + std::thread thread1([]() { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // wait 1s + g_deviceB->SetClearRemoteStaleData(true); + if (g_kvDelegatePtr != nullptr) { + g_mgr.CloseKvStore(g_kvDelegatePtr); + g_kvDelegatePtr = nullptr; + } + ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK); + KvStoreNbDelegate::Option option; + g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback); + ASSERT_TRUE(g_kvDelegateStatus == OK); + ASSERT_TRUE(g_kvDelegatePtr != nullptr); + std::map result; + std::vector devices = {g_deviceB->GetDeviceId()}; + g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE); + ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK); + }); + /** + * @tc.steps: step5. device B sync to A, make it clear history data and check data + * * @tc.expected: step5. interface return ok + */ + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK); + thread1.join(); + EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND); + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK); + EXPECT_EQ(actualValue, value); + EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK); + EXPECT_EQ(actualValue, value); + g_communicatorAggregator->ResetSendDelayInfo(); +} + +/** + * @tc.name: GetSyncDataFail001 + * @tc.desc: test get sync data failed when sync + * @tc.type: FUNC + * @tc.require: AR000D487B + * @tc.author: zhuwentao + */ + HWTEST_F(DistributedDBSingleVerP2PSyncTest, GetSyncDataFail001, TestSize.Level1) +{ + /** + * @tc.steps: step1. device B set get data errCode control and put some data + * * @tc.expected: step1. interface return ok + */ + g_deviceB->SetGetDataErrCode(1, -E_BUSY, true); + Key key1 = {'1'}; + Value value = {'1'}; + EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp + /** + * @tc.steps: step2. device B sync to device A and check data + * * @tc.expected: step2. interface return ok + */ + EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK); + Value actualValue; + EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND); + g_deviceB->ResetDataControl(); } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp index f4c46fa1cfb7357da519371fb5a6422e4e0e260c..d0a0bff3ad9a1397423090988e1584eeed89027e 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp @@ -271,4 +271,12 @@ int GenericVirtualDevice::RemoteQuery(const std::string &device, const RemoteCon } return TransferDBErrno(errCode); } + +void GenericVirtualDevice::SetClearRemoteStaleData(bool isStaleData) +{ + if (context_ != nullptr) { + static_cast(context_)->EnableClearRemoteStaleData(isStaleData); + LOGD("set clear remote stale data mark"); + } +} } // DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h index 8c226e440b8c33b2a7f7be208eafe1ea33180c98..04929441cbc09a737e0e7ac5f8e1e72e7ff8f638 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h @@ -20,6 +20,7 @@ #include "ikvdb_sync_interface.h" #include "meta_data.h" #include "remote_executor.h" +#include "single_ver_kv_sync_task_context.h" #include "subscribe_manager.h" #include "sync_task_context.h" #include "store_types.h" @@ -45,6 +46,7 @@ public: virtual int Sync(SyncMode mode, const Query &query, const SyncOperation::UserCallback &callBack, bool wait); virtual int RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout, std::shared_ptr &result); + void SetClearRemoteStaleData(bool isStaleData); protected: ICommunicator *communicateHandle_; VirtualCommunicatorAggregator *communicatorAggregator_; diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.cpp index b272d6e19b46e09045f23742ffb978ff3ee3d050..ac691252f403288758dfbbdb7e1760d3d28baea3 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.cpp @@ -81,6 +81,18 @@ void KvVirtualDevice::DelayGetSyncData(uint64_t milliDelayTime) syncInterface->DelayGetSyncData(milliDelayTime); } +void KvVirtualDevice::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl) +{ + VirtualSingleVerSyncDBInterface *syncInterface = static_cast(storage_); + syncInterface->SetGetDataErrCode(whichTime, errCode, isGetDataControl); +} + +void KvVirtualDevice::ResetDataControl() +{ + VirtualSingleVerSyncDBInterface *syncInterface = static_cast(storage_); + syncInterface->ResetDataControl(); +} + int KvVirtualDevice::Subscribe(QuerySyncObject query, bool wait, int id) { auto operation = new (std::nothrow) SyncOperation(id, {remoteDeviceId_}, SUBSCRIBE_QUERY, nullptr, wait); diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.h index b95e5402958ffedd3eb1d88fb4b1ea0ec0236025..b905510d5126e732773254c605db80724237c051 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.h @@ -34,6 +34,8 @@ public: int Commit(); void SetSaveDataDelayTime(uint64_t milliDelayTime); void DelayGetSyncData(uint64_t milliDelayTime); + void SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl); + void ResetDataControl(); int Subscribe(QuerySyncObject query, bool wait, int id); int UnSubscribe(QuerySyncObject query, bool wait, int id); diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp index f3dc31a4b91122f0e3345a66fb2eb9d94d13cd94..84361b0953a9fe8b6f6e40c600417049955df6b1 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp @@ -87,12 +87,17 @@ int VirtualCommunicator::GetRemoteCommunicatorVersion(const std::string &deviceI return E_OK; } -void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Message *inMsg) const +void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Message *inMsg) { std::lock_guard lock(onMessageLock_); - if (isEnable_ && onMessage_ && (srcTarget != deviceId_)) { + if (isEnable_ && onMessage_ && (srcTarget != deviceId_) && ((inMsg->GetMessageId() != dropMsgId_) || + ((inMsg->GetMessageId() == dropMsgId_) && (dropMsgTimes_ == 0)))) { onMessage_(srcTarget, inMsg); } else { + LOGD("drop msg from dev=%s, localDev=%s", srcTarget.c_str(), deviceId_.c_str()); + if (dropMsgTimes_ > 0) { + dropMsgTimes_--; + } delete inMsg; inMsg = nullptr; } @@ -114,7 +119,7 @@ void VirtualCommunicator::CallbackOnConnect(const std::string &target, bool isCo uint32_t VirtualCommunicator::GetCommunicatorMtuSize() const { - return 5 * 1024 * 1024; // 5 * 1024 * 1024B + return mtuSize_; } uint32_t VirtualCommunicator::GetCommunicatorMtuSize(const std::string &target) const @@ -122,6 +127,11 @@ uint32_t VirtualCommunicator::GetCommunicatorMtuSize(const std::string &target) return GetCommunicatorMtuSize(); } +void VirtualCommunicator::SetCommunicatorMtuSize(uint32_t mtuSize) +{ + mtuSize_ = mtuSize; +} + uint32_t VirtualCommunicator::GetTimeout() const { return timeout_; @@ -211,4 +221,13 @@ int VirtualCommunicator::TranslateMsg(const Message *inMsg, Message *&outMsg) buffer = nullptr; return errCode; } + +void VirtualCommunicator::SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes) +{ + dropMsgId_ = msgid; + dropMsgTimes_ = dropTimes; + if (msgid == UNKNOW_MESSAGE) { + dropMsgTimes_ = 0; + } +} } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h index f202d1dad60b8c803240c504da624f7c6cbc74a6..e79af68791f79bb8a0143da14c4ccc6eaf94c597 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h @@ -27,6 +27,7 @@ #include "icommunicator.h" #include "ref_object.h" #include "serial_buffer.h" +#include "sync_types.h" namespace DistributedDB { class VirtualCommunicatorAggregator; @@ -46,6 +47,7 @@ public: uint32_t GetCommunicatorMtuSize() const override; uint32_t GetCommunicatorMtuSize(const std::string &target) const override; + void SetCommunicatorMtuSize(uint32_t mtuSize); uint32_t GetTimeout() const override; uint32_t GetTimeout(const std::string &target) const override; @@ -59,7 +61,7 @@ public: int GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const override; - void CallbackOnMessage(const std::string &srcTarget, Message *inMsg) const; + void CallbackOnMessage(const std::string &srcTarget, Message *inMsg); void CallbackOnConnect(const std::string &target, bool isConnect) const; @@ -77,6 +79,8 @@ public: bool IsDeviceOnline(const std::string &device) const override; + void SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes = 1); + private: int TimeSync(); int DataSync(); @@ -101,6 +105,9 @@ private: VirtualCommunicatorAggregator *communicatorAggregator_; uint32_t timeout_ = 5 * 1000; // 5 * 1000ms + MessageId dropMsgId_ = MessageId::UNKNOW_MESSAGE; + uint32_t dropMsgTimes_ = 0; + uint32_t mtuSize_ = 5 * 1024 * 1024; // 5 * 1024 * 1024B }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp index 7e40b97c0bd716882010664fa9cc54e11c910ebb..0bc301134eae2c8635df496a1887250195842948 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.cpp @@ -189,13 +189,29 @@ void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget msg->SetTarget(srcTarget); RefObject::IncObjRef(communicator); auto onDispatch = onDispatch_; - std::thread thread([communicator, srcTarget, dstTarget, msg, onDispatch]() { + bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (msg->GetMessageId() == delayMessageId_) && + (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0)); + uint32_t sendDelayTime = sendDelayTime_; + std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() { + if (isNeedDelay) { + LOGD("begin to delay message for %" PRIu32 "ms dstTarget %s", sendDelayTime, dstTarget.c_str()); + std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime)); + } if (onDispatch) { onDispatch(dstTarget, msg); } communicator->CallbackOnMessage(srcTarget, msg); RefObject::DecObjRef(communicator); }); + if ((skipTimes_ == 0) && delayTimes_ > 0 && (inMsg->GetMessageId() == delayMessageId_) && + (delayDevices_.count(dstTarget) > 0)) { + delayTimes_--; + } + LOGI("before skipTimes_=%u", skipTimes_); + if (skipTimes_ > 0 && (inMsg->GetMessageId() == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) { + skipTimes_--; + } + LOGI("skipTimes_=%u", skipTimes_); thread.detach(); CallSendEnd(E_OK, onEnd); } else { @@ -258,4 +274,40 @@ void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint communicators_[deviceId]->SetTimeout(timeout); } } + +void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid, + uint32_t dropTimes) +{ + std::lock_guard lock(communicatorsLock_); + if (communicators_.find(deviceId) != communicators_.end()) { + communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes); + } +} + +void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize) +{ + std::lock_guard lock(communicatorsLock_); + if (communicators_.find(deviceId) != communicators_.end()) { + communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize); + } +} + +void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId, + uint32_t delayTimes, uint32_t skipTimes, std::set &delayDevices) +{ + sendDelayTime_ = sendDelayTime; + delayMessageId_ = delayMessageId; + delayTimes_ = delayTimes; + delayDevices_ = delayDevices; + skipTimes_ = skipTimes; +} + +void VirtualCommunicatorAggregator::ResetSendDelayInfo() +{ + sendDelayTime_ = 0; + delayMessageId_ = INVALID_MESSAGE_ID; + delayTimes_ = 0; + skipTimes_ = 0; + delayDevices_.clear(); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.h index c1848a2cac9aae93f46d01890b30bc1c9742a384..5c5a0d5054213d961c1386e16ce4451b497229ea 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator_aggregator.h @@ -17,6 +17,7 @@ #define VIRTUAL_ICOMMUNICATORAGGREGATOR_H #include +#include #include "icommunicator_aggregator.h" #include "virtual_communicator.h" @@ -72,6 +73,14 @@ public: void SetTimeout(const std::string &deviceId, uint32_t timeout); + void SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid, uint32_t dropTimes = 1); + + void SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize); + + void SetSendDelayInfo(uint32_t sendDelayTime, uint32_t DelayMessageId, uint32_t delayTimes, uint32_t skipTimes, + std::set &delayDevices); + void ResetSendDelayInfo(); + ~VirtualCommunicatorAggregator() {}; VirtualCommunicatorAggregator() {}; @@ -89,6 +98,12 @@ private: OnConnectCallback onConnect_; std::function onDispatch_; std::string userId_; + + uint32_t sendDelayTime_ = 0; + uint32_t delayMessageId_ = INVALID_MESSAGE_ID; + uint32_t delayTimes_ = 0; // ms + uint32_t skipTimes_ = 0; + std::set delayDevices_; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.cpp index 8926e51ac08e3bffbedeec6d4ce2d2f724957eca..ee6e958cf138aafa5b0ac38250552d6b0e293f78 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.cpp @@ -15,6 +15,7 @@ #ifdef RELATIONAL_STORE #include "db_common.h" #include "generic_single_ver_kv_entry.h" +#include "platform_specific.h" #include "relational_remote_query_continue_token.h" #include "runtime_context.h" #include "virtual_relational_ver_sync_db_interface.h" @@ -62,6 +63,12 @@ namespace { } } +VirtualRelationalVerSyncDBInterface::VirtualRelationalVerSyncDBInterface() +{ + (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_); + LOGD("virtual device init db createTime"); +} + int VirtualRelationalVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &object, const std::vector &entries, const std::string &deviceName) { @@ -163,6 +170,7 @@ void VirtualRelationalVerSyncDBInterface::SetSchemaInfo(const RelationalSchemaOb int VirtualRelationalVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const { + outTime = dbCreateTime_; return E_OK; } diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.h index a7ca8f479b25a76808da79a77a67248f6614beeb..07f9a87c9beb19397b91e763e96cda7f4f5a04c5 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.h @@ -37,7 +37,7 @@ struct VirtualRowData { class VirtualRelationalVerSyncDBInterface : public RelationalDBSyncInterface { public: - VirtualRelationalVerSyncDBInterface() = default; + VirtualRelationalVerSyncDBInterface(); ~VirtualRelationalVerSyncDBInterface() override = default; int PutSyncDataWithQuery(const QueryObject &query, const std::vector &entries, @@ -133,6 +133,7 @@ private: RelationalDBProperties rdbProperties_; SecurityOption secOption_; bool permitCreateDistributedTable_ = true; + uint64_t dbCreateTime_; }; } #endif diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_single_ver_sync_db_Interface.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_single_ver_sync_db_Interface.cpp index 259868e48723deb6b7ec9259b5a96a430c2a0b70..1c30be15e558a375d713c541b56b15dfd43af247 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_single_ver_sync_db_Interface.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_single_ver_sync_db_Interface.cpp @@ -23,6 +23,7 @@ #include "generic_single_ver_kv_entry.h" #include "intercepted_data_impl.h" #include "log_print.h" +#include "platform_specific.h" #include "query_object.h" #include "securec.h" @@ -57,6 +58,13 @@ namespace { return errCode; } } + +VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface() +{ + (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_); + LOGD("virtual device init db createTime"); +} + int VirtualSingleVerSyncDBInterface::GetInterfaceType() const { return SYNC_SVD; @@ -173,6 +181,15 @@ int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceN { std::lock_guard autoLock(deviceDataLock_); deviceData_.erase(deviceName); + uint32_t devId = 0; + if (deviceMapping_.find(deviceName) != deviceMapping_.end()) { + devId = deviceMapping_[deviceName]; + } + for (auto &item : dbData_) { + if (item.deviceId_ == devId && devId > 0) { + item.flag = VirtualDataItem::DELETE_FLAG; + } + } LOGD("RemoveDeviceData FINISH"); return E_OK; } @@ -182,6 +199,9 @@ int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem auto iter = std::find_if(dbData_.begin(), dbData_.end(), [key](const VirtualDataItem& item) { return item.key == key; }); if (iter != dbData_.end()) { + if (iter->flag == VirtualDataItem::DELETE_FLAG) { + return -E_NOT_FOUND; + } dataItem.key = iter->key; dataItem.value = iter->value; dataItem.timestamp = iter->timestamp; @@ -218,7 +238,13 @@ int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector &dataItems, ContinueToken &continueStmtToken) const { - std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_)); + if (getDataDelayTime_ > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_)); + } + int errCode = DataControl(); + if (errCode != E_OK) { + return errCode; + } for (const auto &data : dbData_) { if (data.isLocal) { if (data.writeTimestamp >= begin && data.writeTimestamp < end) { @@ -248,6 +274,11 @@ int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector& dataItems, const std::string &deviceName) { + if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) { + availableDeviceId_++; + deviceMapping_[deviceName] = availableDeviceId_; + LOGD("put deviceName=%s into device map", deviceName.c_str()); + } for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) { LOGD("PutSyncData"); auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(), @@ -262,6 +293,7 @@ int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector& d dbDataIter->writeTimestamp = iter->writeTimestamp; dbDataIter->flag = iter->flag; dbDataIter->isLocal = false; + dbDataIter->deviceId_ = deviceMapping_[deviceName]; } else { LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp); VirtualDataItem dataItem; @@ -271,6 +303,7 @@ int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector& d dataItem.writeTimestamp = iter->writeTimestamp; dataItem.flag = iter->flag; dataItem.isLocal = false; + dataItem.deviceId_ = deviceMapping_[deviceName]; dbData_.push_back(dataItem); } } @@ -312,6 +345,7 @@ void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const { + outTime = dbCreateTime_; return E_OK; } @@ -319,7 +353,13 @@ int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncT const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken, std::vector &entries) const { - std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_)); + if (getDataDelayTime_ > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_)); + } + int errCode = DataControl(); + if (errCode != E_OK) { + return errCode; + } const auto &startKey = query.GetPrefixKey(); Key endKey = startKey; endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX); @@ -448,4 +488,34 @@ void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime) { getDataDelayTime_ = milliDelayTime; } + +void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl) +{ + countDown_ = whichTime; + expectedErrCode_ = errCode; + isGetDataControl_ = isGetDataControl; +} + +int VirtualSingleVerSyncDBInterface::DataControl() const +{ + static int getDataTimes = 0; + if (countDown_ == -1) { // init -1 + getDataTimes = 0; + } + if (isGetDataControl_ && countDown_ > 0) { + getDataTimes++; + } + if (isGetDataControl_ && countDown_ == getDataTimes) { + LOGD("virtual device get data failed = %d", expectedErrCode_); + getDataTimes = 0; + return expectedErrCode_; + } + return E_OK; +} + +void VirtualSingleVerSyncDBInterface::ResetDataControl() +{ + countDown_ = -1; + expectedErrCode_ = E_OK; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_single_ver_sync_db_Interface.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_single_ver_sync_db_Interface.h index efb16751af2dfc4f062bc44651edf7c4086ac33b..d938ab238d4ab379ed434919bb475d24bbc355d1 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_single_ver_sync_db_Interface.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_single_ver_sync_db_Interface.h @@ -32,11 +32,13 @@ struct VirtualDataItem { Timestamp writeTimestamp = 0; uint64_t flag = 0; bool isLocal = true; + uint32_t deviceId_ = 0; // 0: means local static const uint64_t DELETE_FLAG = 0x01; static const uint64_t LOCAL_FLAG = 0x02; }; class VirtualSingleVerSyncDBInterface : public SingleVerKvDBSyncInterface { public: + VirtualSingleVerSyncDBInterface(); int GetInterfaceType() const override; void IncRefCount() override; @@ -131,6 +133,9 @@ public: void SetDbProperties(KvDBProperties &kvDBProperties); void DelayGetSyncData(uint32_t milliDelayTime); + + void SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl); + void ResetDataControl(); private: int GetSyncData(Timestamp begin, Timestamp end, uint32_t blockSize, std::vector& dataItems, ContinueToken& continueStmtToken) const; @@ -140,8 +145,12 @@ private: int PutSyncData(std::vector& dataItems, const std::string &deviceName); + int DataControl() const; + mutable std::map, std::vector> metadata_; std::vector dbData_; + std::map deviceMapping_; // key: deviceName, value: deviceId + uint32_t availableDeviceId_ = 0; std::string schema_; SchemaObject schemaObj_; KvDBProperties properties_; @@ -153,6 +162,11 @@ private: std::map> deviceData_; std::vector identifier_; uint64_t getDataDelayTime_ = 0; + uint64_t dbCreateTime_; + + int countDown_ = -1; + int expectedErrCode_ = E_OK; + bool isGetDataControl_ = true; // control get data: true, control save data : false }; } // namespace DistributedDB