diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp deleted file mode 100644 index a584b80713f2b36dbfe515e99c9058c2516e9ec4..0000000000000000000000000000000000000000 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ /dev/null @@ -1,2163 +0,0 @@ -/* - * Copyright (c) 2023 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "cloud_syncer.h" - -#include -#include -#include - -#include "cloud_db_constant.h" -#include "cloud/cloud_storage_utils.h" -#include "cloud_sync_utils.h" -#include "db_errno.h" -#include "cloud/icloud_db.h" -#include "kv_store_errno.h" -#include "log_print.h" -#include "platform_specific.h" -#include "runtime_context.h" -#include "strategy_factory.h" -#include "storage_proxy.h" -#include "store_types.h" - -namespace DistributedDB { -namespace { - const TaskId INVALID_TASK_ID = 0u; - const int MAX_HEARTBEAT_FAILED_LIMIT = 2; - const int HEARTBEAT_PERIOD = 3; - const int MAX_DOWNLOAD_COMMIT_LIMIT = 5; -} - -CloudSyncer::CloudSyncer(std::shared_ptr storageProxy) - : currentTaskId_(INVALID_TASK_ID), - storageProxy_(std::move(storageProxy)), - queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT), - closed_(false), - timerId_(0u), - heartBeatCount_(0), - failedHeartBeatCount_(0), - syncCallbackCount_(0) -{ - if (storageProxy_ != nullptr) { - id_ = storageProxy_->GetIdentify(); - } -} - -int CloudSyncer::Sync(const std::vector &devices, SyncMode mode, - const std::vector &tables, const SyncProcessCallback &callback, int64_t waitTime) -{ - int errCode = CheckParamValid(devices, mode); - if (errCode != E_OK) { - return errCode; - } - if (cloudDB_.IsNotExistCloudDB()) { - return -E_CLOUD_ERROR; - } - if (closed_) { - return -E_DB_CLOSED; - } - CloudTaskInfo taskInfo; - taskInfo.mode = mode; - taskInfo.table = tables; - taskInfo.callback = callback; - taskInfo.timeout = waitTime; - taskInfo.devices = devices; - errCode = TryToAddSyncTask(std::move(taskInfo)); - if (errCode != E_OK) { - return errCode; - } - return TriggerSync(); -} - -void CloudSyncer::SetCloudDB(const std::shared_ptr &cloudDB) -{ - cloudDB_.SetCloudDB(cloudDB); - LOGI("[CloudSyncer] SetCloudDB finish"); -} - -void CloudSyncer::SetIAssetLoader(const std::shared_ptr &loader) -{ - cloudDB_.SetIAssetLoader(loader); - LOGI("[CloudSyncer] SetIAssetLoader finish"); -} - -void CloudSyncer::Close() -{ - closed_ = true; - CloudSyncer::TaskId currentTask; - { - std::lock_guard autoLock(contextLock_); - currentTask = currentContext_.currentTaskId; - } - // mark current task db_closed - SetTaskFailed(currentTask, -E_DB_CLOSED); - cloudDB_.Close(); - { - LOGD("[CloudSyncer] begin wait current task finished"); - std::unique_lock uniqueLock(contextLock_); - contextCv_.wait(uniqueLock, [this]() { - return currentContext_.currentTaskId == INVALID_TASK_ID; - }); - LOGD("[CloudSyncer] current task has been finished"); - } - - // copy all task from queue - std::vector infoList; - { - std::lock_guard autoLock(queueLock_); - for (const auto &item: cloudTaskInfos_) { - infoList.push_back(item.second); - } - taskQueue_.clear(); - cloudTaskInfos_.clear(); - } - // notify all DB_CLOSED - for (auto &info: infoList) { - info.status = ProcessStatus::FINISHED; - info.errCode = -E_DB_CLOSED; - ProcessNotifier notifier(this); - notifier.Init(info.table, info.devices); - notifier.NotifyProcess(info, {}, true); - LOGI("[CloudSyncer] finished taskId %" PRIu64 " errCode %d", info.taskId, info.errCode); - } - storageProxy_->Close(); - WaitAllSyncCallbackTaskFinish(); -} - -int CloudSyncer::TriggerSync() -{ - if (closed_) { - return -E_DB_CLOSED; - } - RefObject::IncObjRef(this); - int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() { - DoSyncIfNeed(); - RefObject::DecObjRef(this); - }); - if (errCode != E_OK) { - LOGW("[CloudSyncer] schedule sync task failed %d", errCode); - RefObject::DecObjRef(this); - } - return errCode; -} - -void CloudSyncer::DoSyncIfNeed() -{ - if (closed_) { - return; - } - // get taskId from queue - TaskId triggerTaskId; - { - std::lock_guard autoLock(queueLock_); - if (taskQueue_.empty()) { - return; - } - triggerTaskId = taskQueue_.front(); - } - // pop taskId in queue - if (PrepareSync(triggerTaskId) != E_OK) { - return; - } - // do sync logic - int errCode = DoSync(triggerTaskId); - // finished after sync - DoFinished(triggerTaskId, errCode, {}); - // do next task async - (void)TriggerSync(); -} - -int CloudSyncer::DoSync(TaskId taskId) -{ - std::lock_guard lock(syncMutex_); - CloudTaskInfo taskInfo; - { - std::lock_guard autoLock(queueLock_); - taskInfo = cloudTaskInfos_[taskId]; - } - int errCode = LockCloud(taskId); - if (errCode != E_OK) { - return errCode; - } - bool needUpload = true; - { - std::lock_guard autoLock(contextLock_); - needUpload = currentContext_.strategy->JudgeUpload(); - } - errCode = DoSyncInner(taskInfo, needUpload); - int unlockCode = UnlockCloud(); - if (errCode == E_OK) { - errCode = unlockCode; - } - return errCode; -} - -int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload) -{ - if (!needUpload) { - return E_OK; - } - int errCode = storageProxy_->StartTransaction(); - if (errCode != E_OK) { - LOGE("[CloudSyncer] start transaction Failed before doing upload."); - return errCode; - } - for (size_t i = 0; i < taskInfo.table.size(); ++i) { - size_t index = i + 1; - LOGD("[CloudSyncer] try upload table, index: %zu", index); - errCode = CheckTaskIdValid(taskInfo.taskId); - if (errCode != E_OK) { - LOGE("[CloudSyncer] task is invalid, abort sync"); - break; - } - { - std::lock_guard autoLock(contextLock_); - currentContext_.tableName = taskInfo.table[i]; - } - errCode = DoUpload(taskInfo.taskId, i == (taskInfo.table.size() - 1u)); - if (errCode != E_OK) { - LOGE("[CloudSyncer] upload failed %d", errCode); - break; - } - errCode = SaveCloudWaterMark(taskInfo.table[i]); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Can not save cloud water mark after uploading %d", errCode); - break; - } - } - if (errCode == E_OK) { - int commitErrorCode = storageProxy_->Commit(); - if (commitErrorCode != E_OK) { - LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode); - } - } else { - int rollBackErrorCode = storageProxy_->Rollback(); - if (rollBackErrorCode != E_OK) { - LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode); - } - } - return errCode; -} - -int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo, const bool needUpload) -{ - int errCode = E_OK; - for (size_t i = 0; i < taskInfo.table.size(); ++i) { - size_t index = i + 1; - LOGD("[CloudSyncer] try download table, index: %zu", index); - errCode = CheckTaskIdValid(taskInfo.taskId); - if (errCode != E_OK) { - LOGE("[CloudSyncer] task is invalid, abort sync"); - return errCode; - } - { - std::lock_guard autoLock(contextLock_); - currentContext_.tableName = taskInfo.table[i]; - } - errCode = DoDownload(taskInfo.taskId); - if (errCode != E_OK) { - LOGE("[CloudSyncer] download failed %d", errCode); - return errCode; - } - if (needUpload) { - continue; - } - errCode = SaveCloudWaterMark(taskInfo.table[i]); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode); - return errCode; - } - } - - return DoUploadInNeed(taskInfo, needUpload); -} - -void CloudSyncer::DoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo) -{ - { - std::lock_guard autoLock(queueLock_); - taskQueue_.remove(taskId); - } - std::shared_ptr notifier = nullptr; - { - // check current task is running or not - std::lock_guard autoLock(contextLock_); - if (currentContext_.currentTaskId != taskId) { // should not happen - LOGW("[CloudSyncer] taskId %" PRIu64 " not exist in context!", taskId); - return; - } - currentContext_.currentTaskId = INVALID_TASK_ID; - notifier = currentContext_.notifier; - currentContext_.notifier = nullptr; - currentContext_.strategy = nullptr; - currentContext_.tableName.clear(); - currentContext_.assetDownloadList.completeDownloadList.clear(); - currentContext_.assetDownloadList.downloadList.clear(); - currentContext_.assetFields.clear(); - currentContext_.assetsInfo.clear(); - currentContext_.cloudWaterMarks.clear(); - } - CloudTaskInfo info; - { - std::lock_guard autoLock(queueLock_); - if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen - LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId); - contextCv_.notify_one(); - return; - } - info = std::move(cloudTaskInfos_[taskId]); - cloudTaskInfos_.erase(taskId); - } - contextCv_.notify_one(); - if (info.errCode == E_OK) { - info.errCode = errCode; - } - LOGI("[CloudSyncer] finished taskId %" PRIu64 " errCode %d", taskId, info.errCode); - info.status = ProcessStatus::FINISHED; - if (notifier != nullptr) { - notifier->NotifyProcess(info, processInfo, true); - } -} - -static int SaveChangedDataByType(const VBucket &datum, ChangedData &changedData, const DataInfoWithLog &localInfo, - ChangeType type) -{ - int ret = E_OK; - std::vector cloudPkVals; - if (type == ChangeType::OP_DELETE) { - ret = GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey, cloudPkVals); - } else { - ret = GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals); - } - if (ret != E_OK) { - return ret; - } - changedData.primaryData[type].emplace_back(std::move(cloudPkVals)); - return E_OK; -} - -int CloudSyncer::FindDeletedListIndex(const std::vector> &deletedList, const Key &hashKey, - size_t &delIdx) -{ - for (std::pair pair : deletedList) { - if (pair.first == hashKey) { - delIdx = pair.second; - return E_OK; - } - } - return E_INTERNAL_ERROR; -} - -int CloudSyncer::SaveChangedData(SyncParam ¶m, int downIndex, const DataInfo &dataInfo, - WithoutRowIdData &withoutRowIdData, std::vector> &deletedList) -{ - OpType opType = param.downloadData.opType[downIndex]; - Key hashKey = dataInfo.localInfo.logInfo.hashKey; - if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) { - if (opType == OpType::INSERT) { - size_t delIdx; - int errCode = FindDeletedListIndex(deletedList, hashKey, delIdx); - if (errCode != E_OK) { - LOGE("[CloudSyncer] FindDeletedListIndex could not find delete item."); - return errCode; - } - param.changedData.primaryData[ChangeType::OP_DELETE].erase( - param.changedData.primaryData[ChangeType::OP_DELETE].begin() + delIdx); - (void)param.dupHashKeySet.insert(hashKey); - opType = OpType::UPDATE; - // only composite primary key needs to be processed. - if (!param.isSinglePrimaryKey) { - withoutRowIdData.updateData.push_back(std::make_tuple(downIndex, - param.changedData.primaryData[ChangeType::OP_UPDATE].size())); - } - } else if (opType == OpType::DELETE) { - std::pair pair{hashKey, static_cast( - param.changedData.primaryData[ChangeType::OP_DELETE].size())}; - deletedList.emplace_back(pair); - } else { - LOGW("[CloudSyncer] deletePrimaryKeySet ignore opType %d.", opType); - } - } - // INSERT: for no primary key or composite primary key situation - if (!param.isSinglePrimaryKey && opType == OpType::INSERT) { - withoutRowIdData.insertData.push_back(downIndex); - return E_OK; - } - switch (opType) { - // INSERT: only for single primary key situation - case OpType::INSERT: - return SaveChangedDataByType( - param.downloadData.data[downIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT); - case OpType::UPDATE: - if (NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) { - return SaveChangedDataByType(param.downloadData.data[downIndex], param.changedData, - dataInfo.localInfo, ChangeType::OP_UPDATE); - } - break; - case OpType::DELETE: - return SaveChangedDataByType(param.downloadData.data[downIndex], param.changedData, - dataInfo.localInfo, ChangeType::OP_DELETE); - default: - break; - } - return E_OK; -} - -static LogInfo GetCloudLogInfo(VBucket &datum) -{ - LogInfo cloudLogInfo = { 0 }; - cloudLogInfo.timestamp = (Timestamp)std::get(datum[CloudDbConstant::MODIFY_FIELD]); - cloudLogInfo.wTimestamp = (Timestamp)std::get(datum[CloudDbConstant::CREATE_FIELD]); - cloudLogInfo.flag = (std::get(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u; - cloudLogInfo.cloudGid = std::get(datum[CloudDbConstant::GID_FIELD]); - return cloudLogInfo; -} - -/** - * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db. -*/ -int CloudSyncer::UpdateChangedData(DownloadData &downloadData, const WithoutRowIdData &withoutRowIdData, - ChangedData &changedData) -{ - if (withoutRowIdData.insertData.empty() && withoutRowIdData.updateData.empty()) { - return E_OK; - } - for (size_t j : withoutRowIdData.insertData) { - VBucket &datum = downloadData.data[j]; - changedData.primaryData[ChangeType::OP_INSERT].push_back({datum[CloudDbConstant::ROW_ID_FIELD_NAME]}); - } - for (const auto &tuple : withoutRowIdData.updateData) { - size_t downloadIndex = std::get<0>(tuple); - size_t updateIndex = std::get<1>(tuple); - VBucket &datum = downloadData.data[downloadIndex]; - size_t size = changedData.primaryData[ChangeType::OP_UPDATE].size(); - if (updateIndex >= size) { - LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size); - return E_INTERNAL_ERROR; - } - if (changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) { - LOGE("[CloudSyncer] primary key value list should not be empty."); - return E_INTERNAL_ERROR; - } - // no primary key or composite primary key, the first element is rowid - changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] = datum[CloudDbConstant::ROW_ID_FIELD_NAME]; - } - return E_OK; -} - -static void TagAsset(AssetOpType flag, AssetStatus status, Asset &asset, Assets &res) -{ - if (asset.status == static_cast(AssetStatus::DELETE)) { - asset.flag = static_cast(AssetOpType::DELETE); - } else { - asset.flag = static_cast(flag); - } - asset.status = static_cast(status); - Timestamp timestamp; - if (OS::GetCurrentSysTimeInMicrosecond(timestamp) != E_OK) { - // We set timestamp to zero here so that once client access current asset, the difference between access time - // and zero will be infinity, therefore outnumber the threshold, and client will do sync again - timestamp = 0; - LOGW("Can not get current timestamp and set timestamp to zero"); - } - asset.timestamp = static_cast(timestamp / CloudDbConstant::TEN_THOUSAND); - asset.status = asset.flag == static_cast(AssetOpType::NO_CHANGE) ? - static_cast(AssetStatus::NORMAL) : asset.status; - res.push_back(asset); -} - -static void TagAssetWithNormalStatus(const bool isNormalStatus, AssetOpType flag, Asset &asset, Assets &res) -{ - if (isNormalStatus) { - TagAsset(flag, AssetStatus::NORMAL, asset, res); - return; - } - TagAsset(flag, AssetStatus::DOWNLOADING, asset, res); -} - -static void TagAssetsWithNormalStatus(const bool isNormalStatus, AssetOpType flag, Assets &assets, Assets &res) -{ - for (Asset &asset : assets) { - TagAssetWithNormalStatus(isNormalStatus, flag, asset, res); - } -} - -template -static bool IsDataContainField(const std::string &assetFieldName, const VBucket &data) -{ - auto assetIter = data.find(assetFieldName); - if (assetIter == data.end()) { - return false; - } - // When type of Assets is not Nil but a vector which size is 0, we think data is not contain this field. - if (assetIter->second.index() == TYPE_INDEX) { - if (std::get(assetIter->second).empty()) { - return false; - } - } - if (assetIter->second.index() != TYPE_INDEX) { - return false; - } - return true; -} - -// AssetOpType and AssetStatus will be tagged, assets to be changed will be returned -static Assets TagAsset(const std::string &assetFieldName, VBucket &coveredData, VBucket &beCoveredData, - bool setNormalStatus) -{ - Assets res = {}; - bool beCoveredHasAsset = IsDataContainField(assetFieldName, beCoveredData) || - IsDataContainField(assetFieldName, beCoveredData); - bool coveredHasAsset = IsDataContainField(assetFieldName, coveredData); - if (!beCoveredHasAsset) { - if (!coveredHasAsset) { - LOGD("[CloudSyncer] Both data do not contain certain asset field"); - return res; - } - TagAssetWithNormalStatus( - setNormalStatus, AssetOpType::INSERT, std::get(coveredData[assetFieldName]), res); - return res; - } - if (!coveredHasAsset) { - if (beCoveredData[assetFieldName].index() == TYPE_INDEX) { - TagAssetWithNormalStatus(setNormalStatus, AssetOpType::DELETE, - std::get(beCoveredData[assetFieldName]), res); - } else if (beCoveredData[assetFieldName].index() == TYPE_INDEX) { - TagAssetsWithNormalStatus(setNormalStatus, AssetOpType::DELETE, - std::get(beCoveredData[assetFieldName]), res); - } - return res; - } - Asset &covered = std::get(coveredData[assetFieldName]); - Asset beCovered; - if (beCoveredData[assetFieldName].index() == TYPE_INDEX) { - // This indicates that asset in cloudData is stored as Asset - beCovered = std::get(beCoveredData[assetFieldName]); - } else if (beCoveredData[assetFieldName].index() == TYPE_INDEX) { - // Stored as ASSETS, first element in assets will be the target asset - beCovered = (std::get(beCoveredData[assetFieldName]))[0]; - } else { - LOGE("[CloudSyncer] The type of data is neither Asset nor Assets"); - return res; - } - if (covered.name != beCovered.name) { - TagAssetWithNormalStatus(setNormalStatus, AssetOpType::INSERT, covered, res); - TagAssetWithNormalStatus(setNormalStatus, AssetOpType::DELETE, beCovered, res); - return res; - } - if (covered.hash != beCovered.hash) { - TagAssetWithNormalStatus(setNormalStatus, AssetOpType::UPDATE, covered, res); - } else { - Assets tmpAssets; - TagAssetWithNormalStatus(true, AssetOpType::NO_CHANGE, covered, tmpAssets); - } - return res; -} - -// AssetOpType and AssetStatus will be tagged, assets to be changed will be returned -// use VBucket rather than Type because we need to check whether it is empty -static Assets TagAssets(const std::string &assetFieldName, VBucket &coveredData, VBucket &beCoveredData, - bool setNormalStatus) -{ - Assets res = {}; - bool beCoveredHasAssets = IsDataContainField(assetFieldName, beCoveredData); - bool coveredHasAssets = IsDataContainField(assetFieldName, coveredData); - if (!beCoveredHasAssets) { - if (!coveredHasAssets) { - return res; - } - // all the element in assets will be set to INSERT - TagAssetsWithNormalStatus(setNormalStatus, - AssetOpType::INSERT, std::get(coveredData[assetFieldName]), res); - return res; - } - if (!coveredHasAssets) { - // all the element in assets will be set to DELETE - TagAssetsWithNormalStatus(setNormalStatus, - AssetOpType::DELETE, std::get(beCoveredData[assetFieldName]), res); - coveredData[assetFieldName] = res; - return res; - } - Assets &covered = std::get(coveredData[assetFieldName]); - Assets &beCovered = std::get(beCoveredData[assetFieldName]); - std::map coveredAssetsIndexMap = CloudStorageUtils::GenAssetsIndexMap(covered); - for (Asset &beCoveredAsset : beCovered) { - auto it = coveredAssetsIndexMap.find(beCoveredAsset.name); - if (it == coveredAssetsIndexMap.end()) { - TagAssetWithNormalStatus(setNormalStatus, AssetOpType::DELETE, beCoveredAsset, res); - std::get(coveredData[assetFieldName]).push_back(beCoveredAsset); - continue; - } - Asset &coveredAsset = covered[it->second]; - if (beCoveredAsset.hash != coveredAsset.hash) { - TagAssetWithNormalStatus(setNormalStatus, AssetOpType::UPDATE, coveredAsset, res); - } else { - TagAssetWithNormalStatus(setNormalStatus, AssetOpType::NO_CHANGE, coveredAsset, res); - } - // Erase element which has been handled, remaining element will be set to Insert - coveredAssetsIndexMap.erase(it); - // flag in Asset is defaultly set to NoChange, so we just go to next iteration - } - for (const auto &noHandledAssetKvPair : coveredAssetsIndexMap) { - TagAssetWithNormalStatus(setNormalStatus, AssetOpType::INSERT, covered[noHandledAssetKvPair.second], res); - } - return res; -} - -bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector &assetFields, VBucket &data) -{ - for (const auto &assetField : assetFields) { - if (assetField.type == TYPE_INDEX && data[assetField.colName].index() == TYPE_INDEX) { - if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get(data[assetField.colName]))) { - return true; - } - } - } - return false; -} - -bool CloudSyncer::IsDataContainAssets() -{ - std::lock_guard autoLock(contextLock_); - bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end()); - if (!hasTable) { - LOGE("[CloudSyncer] failed to get assetFields, because tableName doesn't exit in currentContext, %d.", - -E_INTERNAL_ERROR); - } - if (hasTable && currentContext_.assetFields[currentContext_.tableName].empty()) { - LOGI("[CloudSyncer] Current table do not contain assets, thereby we needn't download assets"); - return false; - } - return true; -} - -void CloudSyncer::IncSyncCallbackTaskCount() -{ - std::lock_guard autoLock(syncCallbackMutex_); - syncCallbackCount_++; -} - -void CloudSyncer::DecSyncCallbackTaskCount() -{ - { - std::lock_guard autoLock(syncCallbackMutex_); - syncCallbackCount_--; - } - syncCallbackCv_.notify_all(); -} - -void CloudSyncer::WaitAllSyncCallbackTaskFinish() -{ - std::unique_lock uniqueLock(syncCallbackMutex_); - LOGD("[CloudSyncer] Begin wait all callback task finish"); - syncCallbackCv_.wait(uniqueLock, [this]() { - return syncCallbackCount_ <= 0; - }); - LOGD("[CloudSyncer] End wait all callback task finish"); -} - -std::map CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData, - bool setNormalStatus) -{ - // Define a map to store the result - std::map res = {}; - std::vector assetFields; - { - std::lock_guard autoLock(contextLock_); - assetFields = currentContext_.assetFields[currentContext_.tableName]; - } - // For every column contain asset or assets, assetFields are in context - for (const Field &assetField : assetFields) { - Assets assets = TagAssetsInSingleCol(coveredData, beCoveredData, assetField, setNormalStatus); - if (!assets.empty()) { - res[assetField.colName] = assets; - } - } - return res; -} - -Assets CloudSyncer::TagAssetsInSingleCol( - VBucket &coveredData, VBucket &beCoveredData, const Field &assetField, bool setNormalStatus) -{ - // Define a list to store the tagged result - Assets assets = {}; - switch (assetField.type) { - case TYPE_INDEX: { - assets = TagAssets(assetField.colName, coveredData, beCoveredData, setNormalStatus); - break; - } - case TYPE_INDEX: { - assets = TagAsset(assetField.colName, coveredData, beCoveredData, setNormalStatus); - break; - } - default: - LOGW("[CloudSyncer] Meet an unexpected type %d", assetField.type); - break; - } - return assets; -} - -int CloudSyncer::FillCloudAssets( - const std::string &tableName, VBucket &normalAssets, VBucket &failedAssets) -{ - int ret = E_OK; - if (normalAssets.size() > 1) { - ret = storageProxy_->FillCloudAssetForDownload(tableName, normalAssets, true); - if (ret != E_OK) { - LOGE("[CloudSyncer] Can not fill normal cloud assets for download"); - return ret; - } - } - if (failedAssets.size() > 1) { - ret = storageProxy_->FillCloudAssetForDownload(tableName, failedAssets, false); - if (ret != E_OK) { - LOGE("[CloudSyncer] Can not fill abnormal assets for download"); - return ret; - } - } - return E_OK; -} - -int CloudSyncer::HandleDownloadResult(const std::string &tableName, DownloadCommitList &commitList, - uint32_t &successCount) -{ - successCount = 0; - int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE); - if (errCode != E_OK) { - LOGE("[CloudSyncer] start transaction Failed before handle download."); - return errCode; - } - errCode = storageProxy_->SetLogTriggerStatus(false); - if (errCode != E_OK) { - return errCode; - } - for (size_t i = 0; i < commitList.size(); i++) { - std::string gid = std::get<0>(commitList[i]); // 0 means gid is the first element in assetsInfo - // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i] - std::map assetsMap = std::get<1>(commitList[i]); - bool setAllNormal = std::get<2>(commitList[i]); // 2 means whether the download return is E_OK - VBucket normalAssets; - VBucket failedAssets; - normalAssets[CloudDbConstant::GID_FIELD] = gid; - failedAssets[CloudDbConstant::GID_FIELD] = gid; - for (auto &assetKvPair : assetsMap) { - Assets &assets = assetKvPair.second; - if (setAllNormal) { - normalAssets[assetKvPair.first] = std::move(assets); - } else { - failedAssets[assetKvPair.first] = std::move(assets); - } - } - errCode = FillCloudAssets(tableName, normalAssets, failedAssets); - if (errCode != E_OK) { - break; - } - successCount++; - } - errCode = storageProxy_->SetLogTriggerStatus(true); - if (errCode != E_OK) { - LOGE("Set log trigger true failed when handle download.%d", errCode); - successCount = 0; - storageProxy_->Rollback(); - return errCode; - } - errCode = storageProxy_->Commit(); - if (errCode != E_OK) { - successCount = 0; - } - return errCode; -} - -int CloudSyncer::CloudDbDownloadAssets(InnerProcessInfo &info, DownloadList &downloadList, bool willHandleResult, - const std::set &dupHashKeySet, ChangedData &changedAssets) -{ - int downloadStatus = E_OK; - DownloadCommitList commitList; - for (size_t i = 0; i < downloadList.size(); i++) { - DownloadItem downloadItem; - GetDownloadItem(downloadList, i, downloadItem); - std::map downloadAssets(downloadItem.assets); - CloudStorageUtils::EraseNoChangeAsset(downloadAssets); - if (downloadAssets.empty()) { // Download data (include deleting) - continue; - } - int errorCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, downloadAssets); - if (errorCode == -E_NOT_SET) { - info.downLoadInfo.failCount += (downloadList.size() - i); - info.downLoadInfo.successCount -= (downloadList.size() - i); - return -E_NOT_SET; - } - if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) { - changedAssets.primaryData[OpTypeToChangeType(downloadItem.strategy)].push_back( - downloadItem.primaryKeyValList); - } else if (downloadItem.strategy == OpType::INSERT) { - changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList); - } - if (!willHandleResult) { - continue; - } - CloudStorageUtils::MergeDownloadAsset(downloadAssets, downloadItem.assets); - // Process result of each asset - commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK)); - downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus; - if ((i + 1) % MAX_DOWNLOAD_COMMIT_LIMIT == 0 || i == (commitList.size() - 1)) { - int ret = CommitDownloadResult(info, commitList); - if (ret != E_OK) { - return ret; - } - } - } - if (!commitList.empty()) { - int ret = CommitDownloadResult(info, commitList); - if (ret != E_OK) { - return ret; - } - } - LOGD("Download status is %d", downloadStatus); - return downloadStatus; -} - -void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem) -{ - downloadItem.gid = std::get<0>(downloadList[i]); // 0 means gid is the first element in assetsInfo - downloadItem.prefix = std::get<1>(downloadList[i]); // 1 means primaryKey is the second element in assetsInfo - downloadItem.strategy = std::get<2>(downloadList[i]); // 2 means strategy is the third element in assetsInfo - // 3 means assets info [colName, assets] is the forth element in downloadList[i] - downloadItem.assets = std::get<3>(downloadList[i]); - downloadItem.hashKey = std::get<4>(downloadList[i]); // 4 means hash key - downloadItem.primaryKeyValList = std::get<5>(downloadList[i]); // 5 means primary key value list -} - -int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector &pKColNames, - const std::set &dupHashKeySet, ChangedData &changedAssets) -{ - if (!IsDataContainAssets()) { - return E_OK; - } - // update changed data info - if (!IsChngDataEmpty(changedAssets)) { - // changedData.primaryData should have no member inside - return -E_INVALID_ARGS; - } - changedAssets.tableName = info.tableName; - changedAssets.type = ChangedDataType::ASSET; - changedAssets.field = pKColNames; - // Get AssetDownloadList - DownloadList downloadList; - DownloadList completeDeletedList; - { - std::lock_guard autoLock(contextLock_); - downloadList = currentContext_.assetDownloadList.downloadList; - completeDeletedList = currentContext_.assetDownloadList.completeDownloadList; - } - // Download data (include deleting) will handle return Code in this situation - int ret = CloudDbDownloadAssets(info, downloadList, true, dupHashKeySet, changedAssets); - if (ret != E_OK) { - LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret); - return ret; - } - // Download data (include deleting), won't handle return Code in this situation - ret = CloudDbDownloadAssets(info, completeDeletedList, false, dupHashKeySet, changedAssets); - if (ret != E_OK) { - LOGE("[CloudSyncer] Can not download assets or can not handle download result for deleted record %d", ret); - } - return ret; -} - -std::map CloudSyncer::GetAssetsFromVBucket(VBucket &data) -{ - std::map assets; - std::vector fields; - { - std::lock_guard autoLock(contextLock_); - fields = currentContext_.assetFields[currentContext_.tableName]; - } - for (const auto &field : fields) { - if (data.find(field.colName) != data.end()) { - if (field.type == TYPE_INDEX && data[field.colName].index() == TYPE_INDEX) { - assets[field.colName] = { std::get(data[field.colName]) }; - } else if (field.type == TYPE_INDEX && data[field.colName].index() == TYPE_INDEX) { - assets[field.colName] = std::get(data[field.colName]); - } else { - Assets emptyAssets; - assets[field.colName] = emptyAssets; - } - } - } - return assets; -} - -int CloudSyncer::TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo) -{ - OpType strategyOpResult = OpType::NOT_HANDLE; - int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult); - if (errCode != E_OK) { - return errCode; - } - param.downloadData.opType[idx] = strategyOpResult; - if (!IsDataContainAssets()) { - return E_OK; - } - Key hashKey; - if (isExist) { - hashKey = dataInfo.localInfo.logInfo.hashKey; - } - return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo); -} - -int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, DataInfo &dataInfo, - VBucket &localAssetInfo) -{ - int ret = E_OK; - OpType strategy = param.downloadData.opType[idx]; - switch (strategy) { - case OpType::INSERT: - case OpType::UPDATE: - case OpType::DELETE: - ret = HandleTagAssets(hashKey, idx, param, dataInfo, localAssetInfo); - break; - case OpType::NOT_HANDLE: - case OpType::ONLY_UPDATE_GID: - case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data - // Save the asset info into context - std::map assetsMap = GetAssetsFromVBucket(param.downloadData.data[idx]); - { - std::lock_guard autoLock(contextLock_); - if (currentContext_.assetsInfo.find(param.tableName) == currentContext_.assetsInfo.end()) { - currentContext_.assetsInfo[param.tableName] = {}; - } - currentContext_.assetsInfo[param.tableName][dataInfo.cloudLogInfo.cloudGid] = assetsMap; - } - break; - } - default: - break; - } - return ret; -} - -int CloudSyncer::HandleTagAssets(const Key &hashKey, size_t idx, SyncParam ¶m, DataInfo &dataInfo, - VBucket &localAssetInfo) -{ - Type prefix; - std::vector pkVals; - OpType strategy = param.downloadData.opType[idx]; - bool isDelStrategy = (strategy == OpType::DELETE); - int ret = GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys : param.downloadData.data[idx], - param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals); - if (ret != E_OK) { - LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret); - return ret; - } - prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix; - if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX) { - LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil."); - return -E_INTERNAL_ERROR; - } - std::map assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo, - false); - if (isDelStrategy) { - param.assetsDownloadList.completeDownloadList.push_back( - std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey, pkVals)); - } else { - param.assetsDownloadList.downloadList.push_back( - std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey, pkVals)); - } - return ret; -} - -int CloudSyncer::SaveDatum(SyncParam ¶m, size_t idx, WithoutRowIdData &withoutRowIdData, - std::vector> &deletedList) -{ - int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames); - if (ret != E_OK) { - LOGE("[CloudSyncer] Invalid download data:%d", ret); - return ret; - } - ModifyCloudDataTime(param.downloadData.data[idx]); - DataInfo dataInfo; - VBucket localAssetInfo; - bool isExist = true; - ret = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[idx], dataInfo.localInfo, - localAssetInfo); - if (ret == -E_NOT_FOUND) { - isExist = false; - } else if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret); - return ret; - } - // Get cloudLogInfo from cloud data - dataInfo.cloudLogInfo = GetCloudLogInfo(param.downloadData.data[idx]); - // Tag datum to get opType - ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot tag status: %d.", ret); - return ret; - } - ret = SaveChangedData(param, idx, dataInfo, withoutRowIdData, deletedList); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot save changed data: %d.", ret); - } - return ret; -} - -int CloudSyncer::SaveData(SyncParam ¶m) -{ - if (!IsChngDataEmpty(param.changedData)) { - LOGE("[CloudSyncer] changedData.primaryData should have no member inside."); - return -E_INVALID_ARGS; - } - // Update download batch Info - param.info.downLoadInfo.batchIndex += 1; - param.info.downLoadInfo.total += param.downloadData.data.size(); - int ret = E_OK; - WithoutRowIdData withoutRowIdData; - AssetDownloadList assetsDownloadList; - param.assetsDownloadList = assetsDownloadList; - param.deletePrimaryKeySet.clear(); - param.dupHashKeySet.clear(); - std::vector> deletedList; - for (size_t i = 0; i < param.downloadData.data.size(); i++) { - ret = SaveDatum(param, i, withoutRowIdData, deletedList); - if (ret != E_OK) { - param.info.downLoadInfo.failCount += param.downloadData.data.size(); - LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret); - return ret; - } - } - // Save assetsMap into current context - { - std::lock_guard autoLock(contextLock_); - currentContext_.assetDownloadList = param.assetsDownloadList; - } - // save the data to the database by batch, downloadData will return rowid when insert data. - ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData); - if (ret != E_OK) { - param.info.downLoadInfo.failCount += param.downloadData.data.size(); - LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret); - return ret; - } - ret = UpdateChangedData(param.downloadData, withoutRowIdData, param.changedData); - if (ret != E_OK) { - param.info.downLoadInfo.failCount += param.downloadData.data.size(); - LOGE("[CloudSyncer] Cannot update changed data: %d.", ret); - return ret; - } - // Update downloadInfo - param.info.downLoadInfo.successCount += param.downloadData.data.size(); - // Get latest cloudWaterMark - VBucket &lastData = param.downloadData.data.back(); - param.cloudWaterMark = std::get(lastData[CloudDbConstant::CURSOR_FIELD]); - return E_OK; -} - -int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName) -{ - // Check Input and Context Validity - int ret = CheckTaskIdValid(taskId); - if (ret != E_OK) { - return ret; - } - { - std::lock_guard autoLock(queueLock_); - if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { - LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId); - return -E_INVALID_ARGS; - } - } - if (currentContext_.strategy == nullptr) { - LOGE("[CloudSyncer] Strategy has not been initialized"); - return -E_INVALID_ARGS; - } - ret = storageProxy_->CheckSchema(tableName); - if (ret != E_OK) { - LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret); - return ret; - } - return E_OK; -} - -bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData) -{ - { - std::lock_guard autoLock(contextLock_); - if (IsModeForcePush(currentContext_.currentTaskId)) { - return false; - } - } - // when there have no data been changed, it needn't notified - if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() && - changedData.primaryData[OP_DELETE].empty()) { - return false; - } - return true; -} - -int CloudSyncer::NotifyChangedData(ChangedData &&changedData) -{ - if (!NeedNotifyChangedData(changedData)) { - return E_OK; - } - std::string deviceName; - { - std::lock_guard autoLock(contextLock_); - std::vector devices = currentContext_.notifier->GetDevices(); - if (devices.empty()) { - LOGE("[CloudSyncer] CurrentContext do not contain device info"); - return -E_CLOUD_ERROR; - } - // We use first device name as the target of NotifyChangedData - deviceName = devices[0]; - } - int ret = storageProxy_->NotifyChangedData(deviceName, std::move(changedData)); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret); - } - return ret; -} - -void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m) -{ - CloudTaskInfo taskInfo; - { - std::lock_guard autoLock(queueLock_); - taskInfo = cloudTaskInfos_[taskId]; - } - std::lock_guard autoLock(contextLock_); - - if (currentContext_.strategy->JudgeUpload()) { - currentContext_.notifier->NotifyProcess(taskInfo, param.info); - } else { - if (param.isLastBatch) { - param.info.tableStatus = ProcessStatus::FINISHED; - } - if (taskInfo.table.back() == param.tableName && param.isLastBatch) { - currentContext_.notifier->UpdateProcess(param.info); - } else { - currentContext_.notifier->NotifyProcess(taskInfo, param.info); - } - } -} - -int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m) -{ - int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret); - return ret; - } - if (!IsModeForcePush(taskId)) { - param.changedData.tableName = param.info.tableName; - param.changedData.field = param.pkColNames; - param.changedData.type = ChangedDataType::DATA; - } - ret = SaveData(param); - if (ret != E_OK) { - LOGE("[CloudSyncer] cannot save data: %d.", ret); - int rollBackErrorCode = storageProxy_->Rollback(); - if (rollBackErrorCode != E_OK) { - LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode); - } else { - LOGI("[CloudSyncer] Roll back transaction success: %d.", ret); - } - return ret; - } - ret = storageProxy_->Commit(); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret); - } - return ret; -} - -int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m) -{ - ChangedData changedData; - param.changedData = changedData; - param.downloadData.opType.resize(param.downloadData.data.size()); - int ret = SaveDataInTransaction(taskId, param); - if (ret != E_OK) { - return ret; - } - // call OnChange to notify changedData object first time (without Assets) - ret = NotifyChangedData(std::move(param.changedData)); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret); - return ret; - } - // Begin downloading assets - ChangedData changedAssets; - ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets); - (void)NotifyChangedData(std::move(changedAssets)); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret); - return ret; - } - UpdateCloudWaterMark(param); - return E_OK; -} - -void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, - bool lastBatch) -{ - CloudTaskInfo taskInfo; - { - std::lock_guard autoLock(queueLock_); - taskInfo = cloudTaskInfos_[uploadParam.taskId]; - } - std::lock_guard autoLock(contextLock_); - if (uploadParam.lastTable && lastBatch) { - currentContext_.notifier->UpdateProcess(innerProcessInfo); - } else { - currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo); - } -} - -int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId) -{ - SyncParam param; - int ret = GetCurrentTableName(param.tableName); - if (ret != E_OK) { - LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret); - return ret; - } - param.info.tableName = param.tableName; - std::vector assetFields; - // only no primary key and composite primary key contains rowid. - ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot get primary column names: %d", ret); - return ret; - } - { - std::lock_guard autoLock(contextLock_); - currentContext_.assetFields[currentContext_.tableName] = assetFields; - } - param.isSinglePrimaryKey = IsSinglePrimaryKey(param.pkColNames); - param.cloudWaterMark = ""; - if (!IsModeForcePull(taskId)) { - ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret); - return ret; - } - } - return DoDownloadInner(taskId, param); -} - -int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m) -{ - // Query data by batch until reaching end and not more data need to be download - int ret = PreCheck(taskId, param.info.tableName); - if (ret != E_OK) { - return ret; - } - bool queryEnd = false; - while (!queryEnd) { - // Get cloud data after cloud water mark - param.info.tableStatus = ProcessStatus::PROCESSING; - DownloadData downloadData; - param.downloadData = downloadData; - param.isLastBatch = false; - ret = QueryCloudData(param.info.tableName, param.cloudWaterMark, param.downloadData); - if (ret == -E_QUERY_END) { - // Won't break here since downloadData may not be null - queryEnd = true; - param.isLastBatch = true; - } else if (ret != E_OK) { - std::lock_guard autoLock(contextLock_); - param.info.tableStatus = ProcessStatus::FINISHED; - currentContext_.notifier->UpdateProcess(param.info); - return ret; - } - if (param.downloadData.data.empty()) { - if (ret == E_OK) { - LOGD("[CloudSyncer] try to query cloud data use increment water mark"); - UpdateCloudWaterMark(param); - continue; - } - NotifyInEmptyDownload(taskId, param.info); - break; - } - // Save data in transaction, update cloud water mark, notify process and changed data - ret = SaveDataNotifyProcess(taskId, param); - if (ret != E_OK) { - std::lock_guard autoLock(contextLock_); - param.info.tableStatus = ProcessStatus::FINISHED; - currentContext_.notifier->UpdateProcess(param.info); - return ret; - } - (void)NotifyInDownload(taskId, param); - } - return E_OK; -} - -void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info) -{ - std::lock_guard autoLock(contextLock_); - if (currentContext_.strategy->JudgeUpload()) { - currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info); - } else { - info.tableStatus = FINISHED; - if (cloudTaskInfos_[taskId].table.back() == info.tableName) { - currentContext_.notifier->UpdateProcess(info); - } else { - currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info); - } - } -} - -int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark) -{ - int ret = PreCheck(taskId, tableName); - if (ret != E_OK) { - return ret; - } - { - std::lock_guard autoLock(queueLock_); - if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { - LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId); - return -E_INVALID_ARGS; - } - if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) || - (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) { - LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.", - static_cast(cloudTaskInfos_[taskId].mode)); - return -E_INVALID_ARGS; - } - } - - if (!IsModeForcePush(taskId)) { - ret = storageProxy_->GetLocalWaterMark(tableName, localMark); - if (ret != E_OK) { - LOGE("[CloudSyncer] Failed to get local water mark when upload, %d.", ret); - } - } - return ret; -} - -bool CloudSyncer::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData) -{ - return uploadData.insData.extend.empty() && uploadData.insData.record.empty() && - uploadData.updData.extend.empty() && uploadData.updData.record.empty() && - uploadData.delData.extend.empty() && uploadData.delData.record.empty(); -} - -int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo) -{ - int errCode = E_OK; - Info insertInfo; - Info updateInfo; - Info deleteInfo; - - if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) { - errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record, - uploadData.delData.extend, deleteInfo); - if (errCode != E_OK) { - return errCode; - } - innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount; - } - - if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) { - errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record, - uploadData.insData.extend, insertInfo); - if (errCode != E_OK) { - return errCode; - } - // we need to fill back gid after insert data to cloud. - int ret = storageProxy_->FillCloudGidAndAsset(OpType::INSERT, uploadData); - if (ret != E_OK) { - LOGE("[CloudSyncer] Failed to fill back when doing upload insData, %d.", ret); - return ret; - } - innerProcessInfo.upLoadInfo.successCount += insertInfo.successCount; - } - - if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) { - errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record, - uploadData.updData.extend, updateInfo); - if (errCode != E_OK) { - return errCode; - } - errCode = storageProxy_->FillCloudGidAndAsset(OpType::UPDATE, uploadData); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Failed to fill back when doing upload updData, %d.", errCode); - return errCode; - } - innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount; - } - bool lastBatch = innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total; - if (lastBatch) { - innerProcessInfo.tableStatus = ProcessStatus::FINISHED; - } - // After each batch upload successed, call NotifyProcess - NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch); - - // if batch upload successed, update local water mark - // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here. - errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode); - } - return errCode; -} - -int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam) -{ - int errCode = E_OK; - // if we use local cover cloud strategy, it won't update local water mark also. - if (IsModeForcePush(uploadParam.taskId)) { - return E_OK; - } - errCode = storageProxy_->PutLocalWaterMark(tableName, uploadParam.localMark); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode); - } - return errCode; -} - -int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable) -{ - std::string tableName; - int ret = GetCurrentTableName(tableName); - if (ret != E_OK) { - LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret); - return ret; - } - - Timestamp localMark = 0u; - ret = PreCheckUpload(taskId, tableName, localMark); - if (ret != E_OK) { - LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret); - return ret; - } - - int64_t count = 0; - ret = storageProxy_->GetUploadCount(tableName, localMark, IsModeForcePush(taskId), count); - if (ret != E_OK) { - // GetUploadCount will return E_OK when upload count is zero. - LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret); - return ret; - } - if (count == 0) { - LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero."); - InnerProcessInfo innerProcessInfo; - innerProcessInfo.tableName = tableName; - innerProcessInfo.upLoadInfo.total = 0; // count is zero - innerProcessInfo.tableStatus = ProcessStatus::FINISHED; - { - std::lock_guard autoLock(contextLock_); - if (lastTable) { - currentContext_.notifier->UpdateProcess(innerProcessInfo); - } else { - currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo); - } - } - return E_OK; - } - UploadParam param; - param.count = count; - param.localMark = localMark; - param.lastTable = lastTable; - param.taskId = taskId; - return DoUploadInner(tableName, param); -} - -void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData) -{ - if (!IsDataContainAssets()) { - return; - } - std::map> cloudAssets; - { - std::lock_guard autoLock(contextLock_); - cloudAssets = currentContext_.assetsInfo[currentContext_.tableName]; - } - // for delete scenario, assets should not appear in the records. Thereby we needn't tag the assests. - // for insert scenario, gid does not exist. Thereby, we needn't compare with cloud asset get in download procedure - for (size_t i = 0; i < uploadData.insData.extend.size(); i++) { - VBucket cloudAsset; // cloudAsset must be empty - (void)TagAssetsInSingleRecord(uploadData.insData.record[i], cloudAsset, true); - } - // for update scenario, assets shoulb be compared with asset get in download procedure. - for (size_t i = 0; i < uploadData.updData.extend.size(); i++) { - VBucket cloudAsset; - // gid must exist in UPDATE scenario, cause we have re-fill gid during download procedure - // But we need to check for safety - auto gidIter = uploadData.updData.extend[i].find(CloudDbConstant::GID_FIELD); - if (gidIter == uploadData.updData.extend[i].end()) { - LOGE("[CloudSyncer] Datum to be upload must contain gid"); - return; - } - // update data must contain gid, however, we could only pull data after water mark - // Therefore, we need to check whether we contain the data - std::string &gid = std::get(gidIter->second); - if (cloudAssets.find(gid) == cloudAssets.end()) { - // In this case, we directly upload data without compartion and tagging - std::vector assetFields; - { - std::lock_guard autoLock(contextLock_); - assetFields = currentContext_.assetFields[currentContext_.tableName]; - } - StatusToFlagForAssetsInRecord(assetFields, uploadData.updData.record[i]); - continue; - } - for (const auto &it : cloudAssets[gid]) { - cloudAsset[it.first] = it.second; - } - (void)TagAssetsInSingleRecord(uploadData.updData.record[i], cloudAsset, true); - } -} - -int CloudSyncer::PreProcessBatchUpload(TaskId taskId, const InnerProcessInfo &innerProcessInfo, - CloudSyncData &uploadData, Timestamp &localMark) -{ - // Precheck and calculate local water mark which would be updated if batch upload successed. - int ret = CheckTaskIdValid(taskId); - if (ret != E_OK) { - return ret; - } - ret = CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName, innerProcessInfo.upLoadInfo.total, taskId); - if (ret != E_OK) { - LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret); - return ret; - } - TagUploadAssets(uploadData); - // get local water mark to be updated in future. - ret = UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total, taskId, localMark); - if (ret != E_OK) { - LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret); - } - return ret; -} - -int CloudSyncer::SaveCloudWaterMark(const TableName &tableName) -{ - std::string cloudWaterMark; - { - std::lock_guard autoLock(contextLock_); - if (currentContext_.cloudWaterMarks.find(tableName) == currentContext_.cloudWaterMarks.end()) { - LOGD("[CloudSyncer] Not found water mark just return"); - return E_OK; - } - cloudWaterMark = currentContext_.cloudWaterMarks[tableName]; - } - int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Cannot set cloud water mark while Uploading, %d.", errCode); - } - return errCode; -} - -void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData) -{ - std::lock_guard autoLock(queueLock_); - uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH); -} - -bool CloudSyncer::IsModeForcePush(const TaskId taskId) -{ - std::lock_guard autoLock(queueLock_); - return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH; -} - -bool CloudSyncer::IsModeForcePull(const TaskId taskId) -{ - std::lock_guard autoLock(queueLock_); - return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL; -} - -int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam) -{ - ContinueToken continueStmtToken = nullptr; - CloudSyncData uploadData(tableName); - SetUploadDataFlag(uploadParam.taskId, uploadData); - - int ret = storageProxy_->GetCloudData(tableName, uploadParam.localMark, continueStmtToken, uploadData); - if ((ret != E_OK) && (ret != -E_UNFINISHED)) { - LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret); - return ret; - } - - InnerProcessInfo info; - info.tableName = tableName; - info.tableStatus = ProcessStatus::PROCESSING; - info.upLoadInfo.total = static_cast(uploadParam.count); - uint32_t batchIndex = 0; - bool getDataUnfinished = false; - - while (!CheckCloudSyncDataEmpty(uploadData)) { - getDataUnfinished = (ret == -E_UNFINISHED); - ret = PreProcessBatchUpload(uploadParam.taskId, info, uploadData, uploadParam.localMark); - if (ret != E_OK) { - break; - } - info.upLoadInfo.batchIndex = ++batchIndex; - - ret = DoBatchUpload(uploadData, uploadParam, info); - if (ret != E_OK) { - LOGE("[CloudSyncer] Failed to do upload, %d", ret); - info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount; - info.tableStatus = ProcessStatus::FINISHED; - { - std::lock_guard autoLock(contextLock_); - currentContext_.notifier->UpdateProcess(info); - } - break; - } - - uploadData = CloudSyncData(tableName); - - if (continueStmtToken == nullptr) { - break; - } - SetUploadDataFlag(uploadParam.taskId, uploadData); - - ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData); - if ((ret != E_OK) && (ret != -E_UNFINISHED)) { - LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret); - break; - } - } - - if (getDataUnfinished) { - storageProxy_->ReleaseContinueToken(continueStmtToken); - } - return ret; -} - -int CloudSyncer::PreHandleData(VBucket &datum, const std::vector &pkColNames) -{ - // type index of field in fields - std::vector> filedAndIndex = { - std::pair(CloudDbConstant::GID_FIELD, TYPE_INDEX), - std::pair(CloudDbConstant::CREATE_FIELD, TYPE_INDEX), - std::pair(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX), - std::pair(CloudDbConstant::DELETE_FIELD, TYPE_INDEX), - std::pair(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX) - }; - - for (size_t i = 0; i < filedAndIndex.size(); i++) { - if (datum.find(filedAndIndex[i].first) == datum.end()) { - LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", filedAndIndex[i].first.c_str()); - return -E_CLOUD_ERROR; - } - if (datum[filedAndIndex[i].first].index() != static_cast(filedAndIndex[i].second)) { - LOGE("[CloudSyncer] Cloud data's field: %s, doesn't has expected type.", filedAndIndex[i].first.c_str()); - return -E_CLOUD_ERROR; - } - } - - if (std::get(datum[CloudDbConstant::DELETE_FIELD])) { - RemoveDataExceptExtendInfo(datum, pkColNames); - } - std::lock_guard autoLock(contextLock_); - if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) { - LOGE("[CloudSyncer] Cloud data contain duplicate asset"); - return -E_CLOUD_ERROR; - } - return E_OK; -} - -int CloudSyncer::QueryCloudData(const std::string &tableName, std::string &cloudWaterMark, - DownloadData &downloadData) -{ - VBucket extend = { - {CloudDbConstant::CURSOR_FIELD, cloudWaterMark} - }; - int ret = cloudDB_.Query(tableName, extend, downloadData.data); - if (ret == -E_QUERY_END) { - LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded"); - return -E_QUERY_END; - } - if (ret == E_OK && downloadData.data.empty()) { - if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX) { - LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index()); - return -E_CLOUD_ERROR; - } - cloudWaterMark = std::get(extend[CloudDbConstant::CURSOR_FIELD]); - LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str()); - return ret; - } - if (ret != E_OK) { - LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret); - } - return ret; -} - -int CloudSyncer::CheckParamValid(const std::vector &devices, SyncMode mode) -{ - if (devices.size() != 1) { - LOGE("[CloudSyncer] invalid devices size %zu", devices.size()); - return -E_INVALID_ARGS; - } - for (const auto &dev: devices) { - if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) { - LOGE("[CloudSyncer] invalid device, size %zu", dev.size()); - return -E_INVALID_ARGS; - } - } - if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) { - LOGE("[CloudSyncer] not support mode %d", static_cast(mode)); - return -E_NOT_SUPPORT; - } - if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) { - LOGE("[CloudSyncer] invalid mode %d", static_cast(mode)); - return -E_INVALID_ARGS; - } - return E_OK; -} - -int CloudSyncer::CheckTaskIdValid(TaskId taskId) -{ - if (closed_) { - LOGE("[CloudSyncer] DB is closed."); - return -E_DB_CLOSED; - } - { - std::lock_guard autoLock(queueLock_); - if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { - LOGE("[CloudSyncer] not found task."); - return -E_INVALID_ARGS; - } - if (cloudTaskInfos_[taskId].errCode != E_OK) { - return cloudTaskInfos_[taskId].errCode; - } - } - std::lock_guard autoLock(contextLock_); - return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS; -} - -int CloudSyncer::GetCurrentTableName(std::string &tableName) -{ - std::lock_guard autoLock(contextLock_); - if (currentContext_.tableName.empty()) { - return -E_BUSY; - } - tableName = currentContext_.tableName; - return E_OK; -} - -int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo) -{ - if (closed_) { - LOGW("[CloudSyncer] syncer is closed, should not sync now"); - return -E_DB_CLOSED; - } - std::lock_guard autoLock(queueLock_); - int errCode = CheckQueueSizeWithNoLock(); - if (errCode != E_OK) { - return errCode; - } - do { - currentTaskId_++; - } while (currentTaskId_ == 0); - taskInfo.taskId = currentTaskId_; - taskQueue_.push_back(currentTaskId_); - cloudTaskInfos_[currentTaskId_] = taskInfo; - LOGI("[CloudSyncer] Add task ok, taskId %" PRIu64, cloudTaskInfos_[currentTaskId_].taskId); - return E_OK; -} - -int CloudSyncer::CheckQueueSizeWithNoLock() -{ - int32_t limit = queuedManualSyncLimit_; - if (taskQueue_.size() >= static_cast(limit)) { - LOGW("[CloudSyncer] too much sync task"); - return -E_BUSY; - } - return E_OK; -} - -int CloudSyncer::PrepareSync(TaskId taskId) -{ - std::vector tableNames; - std::vector devices; - SyncMode mode; - { - std::lock_guard autoLock(queueLock_); - if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { - LOGW("[CloudSyncer] Abort sync because syncer is closed"); - return -E_DB_CLOSED; - } - tableNames = cloudTaskInfos_[taskId].table; - mode = cloudTaskInfos_[taskId].mode; - devices = cloudTaskInfos_[taskId].devices; - } - { - // check current task is running or not - std::lock_guard autoLock(contextLock_); - if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) { - LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running"); - return -E_DB_CLOSED; - } - currentContext_.currentTaskId = taskId; - currentContext_.notifier = std::make_shared(this); - currentContext_.strategy = StrategyFactory::BuildSyncStrategy(mode); - currentContext_.notifier->Init(tableNames, devices); - LOGI("[CloudSyncer] exec taskId %" PRIu64, taskId); - } - std::lock_guard autoLock(queueLock_); - cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING; - return E_OK; -} - -int CloudSyncer::LockCloud(TaskId taskId) -{ - int period; - { - auto res = cloudDB_.Lock(); - if (res.first != E_OK) { - return res.first; - } - period = static_cast(res.second) / HEARTBEAT_PERIOD; - } - int errCode = StartHeartBeatTimer(period, taskId); - if (errCode != E_OK) { - UnlockCloud(); - } - return errCode; -} - -int CloudSyncer::UnlockCloud() -{ - FinishHeartBeatTimer(); - int errCode = cloudDB_.UnLock(); - WaitAllHeartBeatTaskExit(); - return errCode; -} - -int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId) -{ - if (timerId_ != 0u) { - LOGW("[CloudSyncer] HeartBeat timer has been start!"); - return E_OK; - } - TimerId timerId = 0; - int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) { - HeartBeat(timerId, taskId); - return E_OK; - }, nullptr, timerId); - if (errCode != E_OK) { - LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode); - return errCode; - } - timerId_ = timerId; - return E_OK; -} - -void CloudSyncer::FinishHeartBeatTimer() -{ - if (timerId_ == 0u) { - return; - } - RuntimeContext::GetInstance()->RemoveTimer(timerId_, true); - timerId_ = 0u; - LOGD("[CloudSyncer] Finish heartbeat timer ok"); -} - -void CloudSyncer::WaitAllHeartBeatTaskExit() -{ - std::unique_lock uniqueLock(heartbeatMutex_); - if (heartBeatCount_ <= 0) { - return; - } - LOGD("[CloudSyncer] Begin wait all heartbeat task exit"); - heartbeatCv_.wait(uniqueLock, [this]() { - return heartBeatCount_ <= 0; - }); - LOGD("[CloudSyncer] End wait all heartbeat task exit"); -} - -void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId) -{ - if (timerId_ != timerId) { - return; - } - { - std::lock_guard autoLock(heartbeatMutex_); - heartBeatCount_++; - } - int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() { - if (heartBeatCount_ >= HEARTBEAT_PERIOD) { - // heartbeat block twice should finish task now - SetTaskFailed(taskId, -E_CLOUD_ERROR); - } else { - int ret = cloudDB_.HeartBeat(); - if (ret != E_OK) { - HeartBeatFailed(taskId, ret); - } else { - failedHeartBeatCount_ = 0; - } - } - { - std::lock_guard autoLock(heartbeatMutex_); - heartBeatCount_--; - } - heartbeatCv_.notify_all(); - }); - if (errCode != E_OK) { - LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode); - { - std::lock_guard autoLock(heartbeatMutex_); - heartBeatCount_--; - } - heartbeatCv_.notify_all(); - } -} - -void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode) -{ - failedHeartBeatCount_++; - if (failedHeartBeatCount_ < MAX_HEARTBEAT_FAILED_LIMIT) { - return; - } - LOGW("[CloudSyncer] heartbeat failed too much times!"); - FinishHeartBeatTimer(); - SetTaskFailed(taskId, errCode); -} - -void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode) -{ - std::lock_guard autoLock(queueLock_); - if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { - return; - } - if (cloudTaskInfos_[taskId].errCode != E_OK) { - return; - } - cloudTaskInfos_[taskId].errCode = errCode; -} - -int CloudSyncer::CheckCloudSyncDataValid(CloudSyncData uploadData, const std::string &tableName, - const int64_t &count, TaskId &taskId) -{ - size_t insRecordLen = uploadData.insData.record.size(); - size_t insExtendLen = uploadData.insData.extend.size(); - size_t updRecordLen = uploadData.updData.record.size(); - size_t updExtendLen = uploadData.updData.extend.size(); - size_t delRecordLen = uploadData.delData.record.size(); - size_t delExtendLen = uploadData.delData.extend.size(); - - bool syncDataValid = (uploadData.tableName == tableName) && - ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) || - (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) || - (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen)); - if (!syncDataValid) { - LOGE("[CloudSyncer] upload data is empty but upload count is not zero or upload table name" - " is not the same as table name of sync data."); - return -E_INTERNAL_ERROR; - } - int64_t syncDataCount = static_cast(insRecordLen) + static_cast(updRecordLen) + - static_cast(delRecordLen); - if (syncDataCount > count) { - LOGE("[CloudSyncer] Size of a batch of sync data is greater than upload data size."); - return -E_INTERNAL_ERROR; - } - - return E_OK; -} - -int CloudSyncer::GetWaterMarkAndUpdateTime(std::vector& extend, Timestamp &waterMark) -{ - for (auto &extendData: extend) { - if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) { - LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist."); - return -E_INTERNAL_ERROR; - } - if (TYPE_INDEX != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) { - LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doestn't fit int64_t."); - return -E_INTERNAL_ERROR; - } - if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) { - LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist."); - return -E_INTERNAL_ERROR; - } - if (TYPE_INDEX != extendData.at(CloudDbConstant::CREATE_FIELD).index()) { - LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doestn't fit int64_t."); - return -E_INTERNAL_ERROR; - } - waterMark = std::max(int64_t(waterMark), std::get(extendData.at(CloudDbConstant::MODIFY_FIELD))); - int64_t modifyTime = - std::get(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND; - int64_t createTime = - std::get(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND; - extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime); - extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime); - } - return E_OK; -} - -// After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark; -int CloudSyncer::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, - TaskId taskId, Timestamp &waterMark) -{ - int ret = CheckCloudSyncDataValid(uploadData, uploadData.tableName, count, taskId); - if (ret != E_OK) { - LOGE("[CloudSyncer] Invalid Sync Data when get local water mark."); - return ret; - } - if (!uploadData.insData.extend.empty()) { - if (uploadData.insData.record.size() != uploadData.insData.extend.size()) { - LOGE("[CloudSyncer] Inconsistent size of inserted data."); - return -E_INTERNAL_ERROR; - } - ret = GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark); - if (ret != E_OK) { - return ret; - } - } - - if (!uploadData.updData.extend.empty()) { - if (uploadData.updData.record.size() != uploadData.updData.extend.size()) { - LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR); - return -E_INTERNAL_ERROR; - } - ret = GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark); - if (ret != E_OK) { - return ret; - } - } - - if (!uploadData.delData.extend.empty()) { - if (uploadData.delData.record.size() != uploadData.delData.extend.size()) { - LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR); - return -E_INTERNAL_ERROR; - } - ret = GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark); - if (ret != E_OK) { - return ret; - } - } - return E_OK; -} - -void CloudSyncer::ClearCloudSyncData(CloudSyncData &uploadData) -{ - std::vector().swap(uploadData.insData.record); - std::vector().swap(uploadData.insData.extend); - std::vector().swap(uploadData.insData.rowid); - std::vector().swap(uploadData.updData.record); - std::vector().swap(uploadData.updData.extend); - std::vector().swap(uploadData.delData.record); - std::vector().swap(uploadData.delData.extend); -} -int32_t CloudSyncer::GetCloudSyncTaskCount() -{ - std::lock_guard autoLock(queueLock_); - return taskQueue_.size(); -} - -int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector &tableNameList, - const RelationalSchemaObject &localSchema) -{ - std::lock_guard lock(syncMutex_); - std::string emptyString; - int index = 1; - for (const auto &tableName: tableNameList) { - LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index); - int ret = storageProxy_->SetCloudWaterMark(tableName, emptyString); - if (ret != E_OK) { - LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret); - return ret; - } - index++; - } - int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE); - if (errCode != E_OK) { - LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode); - return errCode; - } - - std::vector assets; - errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets); - if (errCode != E_OK) { - LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode); - storageProxy_->Rollback(); - return errCode; - } - - if (!assets.empty() && mode == FLAG_AND_DATA) { - errCode = cloudDB_.RemoveLocalAssets(assets); - if (errCode != E_OK) { - LOGE("[Storage Executor] failed to remove local assets, %d.", errCode); - storageProxy_->Rollback(); - return errCode; - } - } - - storageProxy_->Commit(); - return errCode; -} - -void CloudSyncer::ModifyCloudDataTime(VBucket &data) -{ - // data already check field modify_field and create_field - int64_t modifyTime = std::get(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND; - int64_t createTime = std::get(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND; - data[CloudDbConstant::MODIFY_FIELD] = modifyTime; - data[CloudDbConstant::CREATE_FIELD] = createTime; -} - -void CloudSyncer::UpdateCloudWaterMark(const SyncParam ¶m) -{ - bool isUpdateCloudCursor = true; - { - std::lock_guard autoLock(queueLock_); - isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor(); - } - // use the cursor of the last datum in data set to update cloud water mark - if (isUpdateCloudCursor) { - std::lock_guard autoLock(contextLock_); - currentContext_.cloudWaterMarks[param.info.tableName] = param.cloudWaterMark; - } -} - -int CloudSyncer::CommitDownloadResult(InnerProcessInfo &info, DownloadCommitList &commitList) -{ - uint32_t successCount = 0; - int ret = HandleDownloadResult(info.tableName, commitList, successCount); - info.downLoadInfo.failCount += (commitList.size() - successCount); - info.downLoadInfo.successCount -= (commitList.size() - successCount); - if (ret != E_OK) { - LOGE("Commit download result failed.%d", ret); - } - commitList.clear(); - return ret; -} - -std::string CloudSyncer::GetIdentify() const -{ - return id_; -} - -int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult) -{ - strategyOpResult = OpType::NOT_HANDLE; - if (!NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) { - // not handle same data - return E_OK; - } - { - std::lock_guard autoLock(contextLock_); - if (!currentContext_.strategy) { - LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR); - return -E_INTERNAL_ERROR; - } - strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, dataInfo.localInfo.logInfo, - dataInfo.cloudLogInfo, param.deletePrimaryKeySet); - } - return E_OK; -} -} // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h deleted file mode 100644 index e0d7cec9f92926df38425c46b45035ea99e5faf3..0000000000000000000000000000000000000000 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Copyright (c) 2023 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CLOUD_SYNCER_H -#define CLOUD_SYNCER_H -#include -#include -#include -#include - -#include "cloud_db_proxy.h" -#include "cloud/cloud_store_types.h" -#include "cloud/cloud_sync_strategy.h" -#include "cloud/icloud_syncer.h" -#include "cloud/process_notifier.h" -#include "data_transformer.h" -#include "db_common.h" -#include "cloud/icloud_db.h" -#include "ref_object.h" -#include "runtime_context.h" -#include "storage_proxy.h" -#include "store_observer.h" - -namespace DistributedDB { -using DownloadList = std::vector, Key, - std::vector>>; -using DownloadCommitList = std::vector, bool>>; -class CloudSyncer : public ICloudSyncer { -public: - explicit CloudSyncer(std::shared_ptr storageProxy); - ~CloudSyncer() override = default; - DISABLE_COPY_ASSIGN_MOVE(CloudSyncer); - - int Sync(const std::vector &devices, SyncMode mode, const std::vector &tables, - const SyncProcessCallback &callback, int64_t waitTime); - - void SetCloudDB(const std::shared_ptr &cloudDB); - - void SetIAssetLoader(const std::shared_ptr &loader); - - int CleanCloudData(ClearMode mode, const std::vector &tableNameList, - const RelationalSchemaObject &localSchema); - - int32_t GetCloudSyncTaskCount(); - - void Close(); - - void IncSyncCallbackTaskCount() override; - - void DecSyncCallbackTaskCount() override; - - std::string GetIdentify() const override; -protected: - struct DataInfo { - DataInfoWithLog localInfo; - LogInfo cloudLogInfo; - }; - struct AssetDownloadList { - // assets in following list will fill STATUS and timestamp after calling downloading - DownloadList downloadList = {}; - // assets in following list won't fill STATUS and timestamp after calling downloading - DownloadList completeDownloadList = {}; - }; - struct SyncParam { - DownloadData downloadData; - ChangedData changedData; - InnerProcessInfo info; - AssetDownloadList assetsDownloadList; - std::string cloudWaterMark; - std::vector pkColNames; - std::set deletePrimaryKeySet; - std::set dupHashKeySet; - std::string tableName; - bool isSinglePrimaryKey; - bool isLastBatch = false; - }; - struct TaskContext { - TaskId currentTaskId = 0u; - std::string tableName; - std::shared_ptr notifier; - std::shared_ptr strategy; - std::map> assetFields; - // should be cleared after each Download - AssetDownloadList assetDownloadList; - // store GID and assets, using in upload procedure - std::map>> assetsInfo; - std::map cloudWaterMarks; - }; - struct UploadParam { - int64_t count = 0; - TaskId taskId = 0u; - Timestamp localMark = 0u; - bool lastTable = false; - }; - struct WithoutRowIdData { - std::vector insertData = {}; - std::vector> updateData = {}; - }; - struct DownloadItem { - std::string gid; - Type prefix; - OpType strategy; - std::map assets; - Key hashKey; - std::vector primaryKeyValList; - }; - - int TriggerSync(); - - void DoSyncIfNeed(); - - int DoSync(TaskId taskId); - - int DoSyncInner(const CloudTaskInfo &taskInfo, const bool needUpload); - - int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload); - - void DoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo); - - virtual int DoDownload(CloudSyncer::TaskId taskId); - - int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m); - - void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info); - - int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark); - - int PreCheck(TaskId &taskId, const TableName &tableName); - - int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo); - - int CheckCloudSyncDataValid(CloudSyncData uploadData, const std::string &tableName, const int64_t &count, - TaskId &taskId); - - static bool CheckCloudSyncDataEmpty(const CloudSyncData &uploadData); - - int GetWaterMarkAndUpdateTime(std::vector& extend, Timestamp &waterMark); - - int UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, TaskId taskId, - Timestamp &waterMark); - - void ClearCloudSyncData(CloudSyncData &uploadData); - - int PreProcessBatchUpload(TaskId taskId, const InnerProcessInfo &innerProcessInfo, - CloudSyncData &uploadData, Timestamp &localMark); - - int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam); - - virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable); - - void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData); - - bool IsModeForcePush(const TaskId taskId); - - bool IsModeForcePull(const TaskId taskId); - - int DoUploadInner(const std::string &tableName, UploadParam &uploadParam); - - int PreHandleData(VBucket &datum, const std::vector &pkColNames); - - int QueryCloudData(const std::string &tableName, std::string &cloudWaterMark, DownloadData &downloadData); - - int CheckTaskIdValid(TaskId taskId); - - int GetCurrentTableName(std::string &tableName); - - int TryToAddSyncTask(CloudTaskInfo &&taskInfo); - - int CheckQueueSizeWithNoLock(); - - int PrepareSync(TaskId taskId); - - int LockCloud(TaskId taskId); - - int UnlockCloud(); - - int StartHeartBeatTimer(int period, TaskId taskId); - - void FinishHeartBeatTimer(); - - void WaitAllHeartBeatTaskExit(); - - void HeartBeat(TimerId timerId, TaskId taskId); - - void HeartBeatFailed(TaskId taskId, int errCode); - - void SetTaskFailed(TaskId taskId, int errCode); - - int SaveDatum(SyncParam ¶m, size_t idx, WithoutRowIdData &withoutRowIdData, - std::vector> &deletedList); - - int SaveData(SyncParam ¶m); - - void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m); - - int SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m); - - int FindDeletedListIndex(const std::vector> &deletedList, const Key &hashKey, - size_t &delIdx); - - int SaveChangedData(SyncParam ¶m, int dataIndex, const DataInfo &dataInfo, - WithoutRowIdData &withoutRowIdData, std::vector> &deletedList); - - int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m); - - void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch); - - bool NeedNotifyChangedData(const ChangedData &changedData); - - int NotifyChangedData(ChangedData &&changedData); - - std::map GetAssetsFromVBucket(VBucket &data); - - std::map TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData, - bool setNormalStatus); - - Assets TagAssetsInSingleCol(VBucket &coveredData, VBucket &beCoveredData, const Field &assetField, - bool setNormalStatus); - - int TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo); - - int HandleTagAssets(const Key &hashKey, size_t idx, SyncParam ¶m, DataInfo &dataInfo, VBucket &localAssetInfo); - - int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, DataInfo &dataInfo, - VBucket &localAssetInfo); - - void TagUploadAssets(CloudSyncData &uploadData); - - int FillCloudAssets(const std::string &tableName, VBucket &normalAssets, VBucket &failedAssets); - - int HandleDownloadResult(const std::string &tableName, DownloadCommitList &commitList, uint32_t &successCount); - - int DownloadAssets(InnerProcessInfo &info, const std::vector &pKColNames, - const std::set &dupHashKeySet, ChangedData &changedAssets); - - int CloudDbDownloadAssets(InnerProcessInfo &info, DownloadList &downloadList, bool willHandleResult, - const std::set &dupHashKeySet, ChangedData &changedAssets); - - void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem); - - bool IsDataContainAssets(); - - void ModifyCloudDataTime(VBucket &data); - - int SaveCloudWaterMark(const TableName &tableName); - - bool IsDataContainDuplicateAsset(const std::vector &assetFields, VBucket &data); - - int UpdateChangedData(DownloadData &downloadData, const WithoutRowIdData &withoutRowIdData, - ChangedData &changedData); - - void WaitAllSyncCallbackTaskFinish(); - - void UpdateCloudWaterMark(const SyncParam ¶m); - - int TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult); - - int CommitDownloadResult(InnerProcessInfo &info, DownloadCommitList &commitList); - - static int CheckParamValid(const std::vector &devices, SyncMode mode); - - std::mutex queueLock_; - TaskId currentTaskId_; - std::list taskQueue_; - std::map cloudTaskInfos_; - - std::mutex contextLock_; - TaskContext currentContext_; - std::condition_variable contextCv_; - std::mutex syncMutex_; // Clean Cloud Data and Sync are mutually exclusive - - CloudDBProxy cloudDB_; - - std::shared_ptr storageProxy_; - std::atomic queuedManualSyncLimit_; - - std::atomic closed_; - - std::atomic timerId_; - std::mutex heartbeatMutex_; - std::condition_variable heartbeatCv_; - int32_t heartBeatCount_; - std::atomic failedHeartBeatCount_; - - std::mutex syncCallbackMutex_; - std::condition_variable syncCallbackCv_; - int32_t syncCallbackCount_; - - std::string id_; -}; -} -#endif // CLOUD_SYNCER_H diff --git a/frameworks/libs/distributeddb/test/BUILD.gn b/frameworks/libs/distributeddb/test/BUILD.gn index 05d14f0b2242291a87965460c79d46186b1c8c26..4c3f8d156cb8188e8ff408c033202439ea3d46d4 100644 --- a/frameworks/libs/distributeddb/test/BUILD.gn +++ b/frameworks/libs/distributeddb/test/BUILD.gn @@ -854,7 +854,8 @@ distributeddb_unittest("DistributedDBCloudSaveCloudDataTest") { sources = [ "unittest/common/storage/cloud/distributeddb_cloud_save_cloud_data_test.cpp" ] } -distributeddb_unittest("DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest") { +distributeddb_unittest( + "DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest") { sources = [ "unittest/common/interfaces/distributeddb_cloud_interfaces_relational_remove_device_data_test.cpp" ] } @@ -862,6 +863,16 @@ distributeddb_unittest("DistributedDBCloudCheckSyncTest") { sources = [ "unittest/common/storage/cloud/distributeddb_cloud_check_sync_test.cpp" ] } +distributeddb_unittest( + "DistributedDBCloudTableCompoundPrimaryKeySyncTest") { + sources = [ "unittest/common/syncer/cloud/distributeddb_cloud_table_compound_primary_key_sync_test.cpp" ] +} + +distributeddb_unittest( + "DistributedDBCloudTableWithoutPrimaryKeySyncTest") { + sources = [ "unittest/common/syncer/cloud/distributeddb_cloud_table_without_primary_key_sync_test.cpp" ] +} + ############################################################################### group("unittest") { testonly = true @@ -882,6 +893,8 @@ group("unittest") { ":DistributedDBCloudSyncerDownloadTest", ":DistributedDBCloudSyncerProgressManagerTest", ":DistributedDBCloudSyncerUploadTest", + ":DistributedDBCloudTableCompoundPrimaryKeySyncTest", + ":DistributedDBCloudTableWithoutPrimaryKeySyncTest", ":DistributedDBCommonTest", ":DistributedDBCommunicatorDeepTest", ":DistributedDBCommunicatorProxyTest", diff --git a/frameworks/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.cpp index 6ab5678c19941849c0a6c0f2baf2b40245dbf2b3..8f3ec452b68b97d1b25a7504d4d30f99ec024e02 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.cpp @@ -803,6 +803,9 @@ static bool IsPrimaryDataEq( uint64_t type, DistributedDB::ChangedData &input, DistributedDB::ChangedData &expected) { for (size_t m = 0; m < input.primaryData[type].size(); m++) { + if (input.primaryData[type][m].size() != expected.primaryData[type][m].size()) { + return false; + } for (size_t k = 0; k < input.primaryData[type][m].size(); k++) { if (!IsPrimaryKeyEq(input.primaryData[type][m][k], expected.primaryData[type][m][k])) { return false; diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_table_compound_primary_key_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_table_compound_primary_key_sync_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..549f3b3b1c8240ba1d0cc30f2ce0c694bc87f393 --- /dev/null +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_table_compound_primary_key_sync_test.cpp @@ -0,0 +1,566 @@ +/* + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifdef RELATIONAL_STORE +#include +#include +#include "cloud/cloud_storage_utils.h" +#include "cloud_db_constant.h" +#include "distributeddb_data_generate_unit_test.h" +#include "distributeddb_tools_unit_test.h" +#include "process_system_api_adapter_impl.h" +#include "relational_store_instance.h" +#include "relational_store_manager.h" +#include "runtime_config.h" +#include "sqlite_relational_store.h" +#include "sqlite_relational_utils.h" +#include "store_observer.h" +#include "time_helper.h" +#include "virtual_asset_loader.h" +#include "virtual_cloud_data_translate.h" +#include "virtual_cloud_db.h" +#include "mock_asset_loader.h" + +using namespace testing::ext; +using namespace DistributedDB; +using namespace DistributedDBUnitTest; +using namespace std; + +namespace { + string g_storeID = "Relational_Store_SYNC"; + const string g_tableName = "worker"; + const string DEVICE_CLOUD = "cloud_dev"; + const string DB_SUFFIX = ".db"; + const int64_t g_syncWaitTime = 60; + const int g_arrayHalfSub = 2; + int g_syncIndex = 0; + string g_testDir; + string g_storePath; + std::mutex g_processMutex; + std::condition_variable g_processCondition; + std::shared_ptr g_virtualCloudDb; + std::shared_ptr g_virtualAssetLoader; + DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID); + RelationalStoreObserverUnitTest *g_observer = nullptr; + RelationalStoreDelegate *g_delegate = nullptr; + SyncProcess g_syncProcess; + using CloudSyncStatusCallback = std::function &onProcess)>; + const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL = + "CREATE TABLE IF NOT EXISTS " + g_tableName + "(" \ + "name TEXT," \ + "height REAL ," \ + "married BOOLEAN ," \ + "photo BLOB NOT NULL," \ + "asset BLOB," \ + "age INT," \ + "PRIMARY KEY (" \ + " name," \ + " age)" \ + ");"; + const std::vector g_cloudFiledWithOutPrimaryKey = { + {"name", TYPE_INDEX, true}, {"height", TYPE_INDEX}, + {"married", TYPE_INDEX}, {"photo", TYPE_INDEX, false, false}, + {"asset", TYPE_INDEX}, {"age", TYPE_INDEX, true} + }; + const std::vector g_tables = {g_tableName}; + const Asset g_cloudAsset1 = { + .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync", + .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC" + }; + const Asset g_cloudAsset2 = { + .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync", + .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "UPDATE" + }; + + void CreateUserDBAndTable(sqlite3 *&db) + { + EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK); + EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK); + } + + void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull) + { + std::vector photo(photoSize, 'v'); + std::vector record; + std::vector extend; + Timestamp now = TimeHelper::GetSysCurrentTime(); + for (int64_t i = begin; i < begin + count; ++i) { + VBucket data; + data.insert_or_assign("name", "Cloud" + std::to_string(i)); + data.insert_or_assign("height", 166.0); // 166.0 is random double value + data.insert_or_assign("married", false); + data.insert_or_assign("photo", photo); + data.insert_or_assign("age", 13L); // 13 is random int64_t value + Asset asset = g_cloudAsset1; + asset.name = asset.name + std::to_string(i); + assetIsNull ? data.insert_or_assign("asset", Nil()) : data.insert_or_assign("asset", asset); + record.push_back(data); + VBucket log; + log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i); + log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i); + log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false); + extend.push_back(log); + } + ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName, std::move(record), extend), DBStatus::OK); + LOGD("insert cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); + std::this_thread::sleep_for(std::chrono::milliseconds(count)); + } + + void UpdateCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull) + { + std::vector photo(photoSize, 'v'); + std::vector record; + std::vector extend; + Timestamp now = TimeHelper::GetSysCurrentTime(); + for (int64_t i = begin; i < begin + count; ++i) { + VBucket data; + data.insert_or_assign("name", "Cloud" + std::to_string(i)); + data.insert_or_assign("height", 188.0); // 188.0 is random double value + data.insert_or_assign("married", false); + data.insert_or_assign("photo", photo); + data.insert_or_assign("age", 13L); // 13 is random int64_t value + Asset asset = g_cloudAsset2; + asset.name = asset.name + std::to_string(i); + assetIsNull ? data.insert_or_assign("asset", Nil()) : data.insert_or_assign("asset", asset); + record.push_back(data); + VBucket log; + log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i); + log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i); + log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false); + log.insert_or_assign(CloudDbConstant::GID_FIELD, to_string(i)); + extend.push_back(log); + } + ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName, std::move(record), extend), DBStatus::OK); + LOGD("update cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); + std::this_thread::sleep_for(std::chrono::milliseconds(count)); + } + + void DeleteCloudTableRecordByGid(int64_t begin, int64_t count) { + for (int64_t i = begin; i < begin + count; ++i) { + VBucket data; + data.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i)); + ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName, data), DBStatus::OK); + } + LOGD("delete cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); + std::this_thread::sleep_for(std::chrono::milliseconds(count)); + } + + void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback, + std::vector &expectProcess) + { + g_syncIndex = 0; + callback = [&syncProcess, &expectProcess](const std::map &process) { + LOGI("devices size = %d", process.size()); + ASSERT_EQ(process.size(), 1u); + syncProcess = std::move(process.begin()->second); + ASSERT_EQ(process.begin()->first, DEVICE_CLOUD); + ASSERT_NE(syncProcess.tableProcess.empty(), true); + LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode); + std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) { + auto table1 = syncProcess.tableProcess.find(item); + if (table1 != syncProcess.tableProcess.end()) { + LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, " + "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u", + item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex, + table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount, + table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex, + table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount, + table1->second.upLoadInfo.failCount); + } + }); + if (expectProcess.empty()) { + if (syncProcess.process == FINISHED) { + g_processCondition.notify_one(); + } + return; + } + ASSERT_LE(static_cast(g_syncIndex), expectProcess.size()); + for (size_t i = 0; i < g_tables.size(); ++i) { + SyncProcess head = expectProcess[g_syncIndex]; + for (auto &expect : head.tableProcess) { + auto real = syncProcess.tableProcess.find(expect.first); + ASSERT_NE(real, syncProcess.tableProcess.end()); + EXPECT_EQ(expect.second.process, real->second.process); + EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex); + EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total); + EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount); + EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount); + EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex); + EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total); + EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount); + EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount); + } + } + g_syncIndex++; + if (syncProcess.process == FINISHED) { + g_processCondition.notify_one(); + } + }; + } + + int QueryCountCallback(void *data, int count, char **colValue, char **colName) + { + if (count != 1) { + return 0; + } + auto expectCount = reinterpret_cast(data); + EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal + return 0; + } + + void CheckDownloadResult(sqlite3 *&db, std::vector expectCounts, std::string keyStr = "Cloud") + { + for (size_t i = 0; i < g_tables.size(); ++i) { + string queryDownload = "select count(*) from " + g_tables[i] + " where name " + + " like '" + keyStr + "%'"; + EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback, + reinterpret_cast(expectCounts[i]), nullptr), SQLITE_OK); + } + } + + void CheckCloudTotalCount(std::vector expectCounts) + { + VBucket extend; + extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0); + for (size_t i = 0; i < g_tables.size(); ++i) { + int64_t realCount = 0; + std::vector data; + g_virtualCloudDb->Query(g_tables[i], extend, data); + for (size_t j = 0; j < data.size(); ++j) { + auto entry = data[j].find(CloudDbConstant::DELETE_FIELD); + if (entry != data[j].end() && std::get(entry->second)) { + continue; + } + realCount++; + } + EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data. + } + } + + void CheckLocalRecordNum(sqlite3 *&db, std::string tableName, int count) { + std::string sql = "select count(*) from " + tableName + ";"; + EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback, + reinterpret_cast(count), nullptr), SQLITE_OK); + } + + void GetCloudDbSchema(DataBaseSchema &dataBaseSchema) + { + TableSchema tableSchemaWithOutPrimaryKey = { + .name = g_tableName, + .fields = g_cloudFiledWithOutPrimaryKey + }; + dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey); + } + + void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime) + { + std::unique_lock lock(g_processMutex); + bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() { + return syncProcess.process == FINISHED; + }); + ASSERT_EQ(result, true); + LOGD("-------------------sync end--------------"); + } + + void callSync(const std::vector &tableNames, SyncMode mode, DBStatus dbStatus) + { + g_syncProcess = {}; + Query query = Query::Select().FromTable(tableNames); + std::vector expectProcess; + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus); + if (dbStatus == DBStatus::OK) { + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + } + } + + void CloseDb() + { + delete g_observer; + g_virtualCloudDb = nullptr; + if (g_delegate != nullptr) { + EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK); + g_delegate = nullptr; + } + } + + class DistributedDBCloudTableCompoundPrimaryKeySyncTest : public testing::Test { + public: + static void SetUpTestCase(void); + static void TearDownTestCase(void); + void SetUp(); + void TearDown(); + protected: + sqlite3 *db = nullptr; + }; + + void DistributedDBCloudTableCompoundPrimaryKeySyncTest::SetUpTestCase(void) + { + DistributedDBToolsUnitTest::TestDirInit(g_testDir); + g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX; + LOGI("The test db is:%s", g_testDir.c_str()); + RuntimeConfig::SetCloudTranslate(std::make_shared()); + } + + void DistributedDBCloudTableCompoundPrimaryKeySyncTest::TearDownTestCase(void) + {} + + void DistributedDBCloudTableCompoundPrimaryKeySyncTest::SetUp(void) + { + if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) { + LOGE("rm test db files error."); + } + DistributedDBToolsUnitTest::PrintTestCaseInfo(); + LOGD("Test dir is %s", g_testDir.c_str()); + db = RelationalTestUtils::CreateDataBase(g_storePath); + ASSERT_NE(db, nullptr); + CreateUserDBAndTable(db); + g_observer = new (std::nothrow) RelationalStoreObserverUnitTest(); + ASSERT_NE(g_observer, nullptr); + ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer }, + g_delegate), DBStatus::OK); + ASSERT_NE(g_delegate, nullptr); + ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName, CLOUD_COOPERATION), DBStatus::OK); + g_virtualCloudDb = make_shared(); + g_virtualAssetLoader = make_shared(); + g_syncProcess = {}; + ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK); + ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK); + // sync before setting cloud db schema,it should return SCHEMA_MISMATCH + Query query = Query::Select().FromTable(g_tables); + CloudSyncStatusCallback callback; + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), + DBStatus::SCHEMA_MISMATCH); + DataBaseSchema dataBaseSchema; + GetCloudDbSchema(dataBaseSchema); + ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK); + } + + void DistributedDBCloudTableCompoundPrimaryKeySyncTest::TearDown(void) + { + EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK); + if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) { + LOGE("rm test db files error."); + } + } + +/* + * @tc.name: CloudSyncTest001 + * @tc.desc: test data sync when cloud insert + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableCompoundPrimaryKeySyncTest, CloudSyncTest001, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::DATA; + changedDataForTable.field.push_back(std::string("rowid")); + changedDataForTable.field.push_back(std::string("name")); + changedDataForTable.field.push_back(std::string("age")); + for (int64_t i = 0; i < cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_INSERT].push_back({i + 1, + "Cloud" + to_string(i), 13L}); // 13 is expect age + } + g_observer->SetExpectedResult(changedDataForTable); + InsertCloudTableRecord(0, cloudCount, paddingSize, true); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest002 + * @tc.desc: test data sync when cloud update + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableCompoundPrimaryKeySyncTest, CloudSyncTest002, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + InsertCloudTableRecord(0, cloudCount, paddingSize, true); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::DATA; + changedDataForTable.field.push_back(std::string("rowid")); + changedDataForTable.field.push_back(std::string("name")); + changedDataForTable.field.push_back(std::string("age")); + for (int64_t i = 0; i < cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_UPDATE].push_back({i + 1, + "Cloud" + to_string(i), 13L}); // 13 is expect age + } + g_observer->SetExpectedResult(changedDataForTable); + UpdateCloudTableRecord(0, cloudCount, paddingSize, true); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest003 + * @tc.desc: test data sync when cloud delete + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableCompoundPrimaryKeySyncTest, CloudSyncTest003, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + InsertCloudTableRecord(0, cloudCount, paddingSize, true); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::DATA; + changedDataForTable.field.push_back(std::string("rowid")); + changedDataForTable.field.push_back(std::string("name")); + changedDataForTable.field.push_back(std::string("age")); + for (int64_t i = 0; i < cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_DELETE].push_back({i + 1, + "Cloud" + to_string(i), 13L}); // 13 is expect age + } + g_observer->SetExpectedResult(changedDataForTable); + DeleteCloudTableRecordByGid(0, cloudCount); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckCloudTotalCount({0L}); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckLocalRecordNum(db, g_tableName, 0); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest004 + * @tc.desc: test asset when cloud insert + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableCompoundPrimaryKeySyncTest, CloudSyncTest004, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::ASSET; + changedDataForTable.field.push_back(std::string("rowid")); + changedDataForTable.field.push_back(std::string("name")); + changedDataForTable.field.push_back(std::string("age")); + for (int64_t i = 0; i < cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_INSERT].push_back({i + 1, + "Cloud" + to_string(i), 13L}); // 13 is expect age + } + g_observer->SetExpectedResult(changedDataForTable); + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest005 + * @tc.desc: test asset sync when cloud insert + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableCompoundPrimaryKeySyncTest, CloudSyncTest005, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::ASSET; + changedDataForTable.field.push_back(std::string("rowid")); + changedDataForTable.field.push_back(std::string("name")); + changedDataForTable.field.push_back(std::string("age")); + for (int64_t i = 0; i < cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_UPDATE].push_back({i + 1, + "Cloud" + to_string(i), 13L}); // 13 is expect age + } + g_observer->SetExpectedResult(changedDataForTable); + UpdateCloudTableRecord(0, cloudCount, paddingSize, false); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest006 + * @tc.desc: test asset sync when cloud delete + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableCompoundPrimaryKeySyncTest, CloudSyncTest006, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::ASSET; + changedDataForTable.field.push_back(std::string("rowid")); + changedDataForTable.field.push_back(std::string("name")); + changedDataForTable.field.push_back(std::string("age")); + for (int64_t i = 0; i < cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_DELETE].push_back({i + 1, + "Cloud" + to_string(i), 13L}); // 13 is expect age + } + g_observer->SetExpectedResult(changedDataForTable); + DeleteCloudTableRecordByGid(0, cloudCount); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckCloudTotalCount({0L}); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckLocalRecordNum(db, g_tableName, 0); + CloseDb(); +} + +} +#endif // RELATIONAL_STORE \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_table_without_primary_key_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_table_without_primary_key_sync_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a09157e6ebe07a64cde688de8c4f19f86078fde2 --- /dev/null +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_table_without_primary_key_sync_test.cpp @@ -0,0 +1,544 @@ +/* + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifdef RELATIONAL_STORE +#include +#include +#include "cloud/cloud_storage_utils.h" +#include "cloud_db_constant.h" +#include "distributeddb_data_generate_unit_test.h" +#include "distributeddb_tools_unit_test.h" +#include "process_system_api_adapter_impl.h" +#include "relational_store_instance.h" +#include "relational_store_manager.h" +#include "runtime_config.h" +#include "sqlite_relational_store.h" +#include "sqlite_relational_utils.h" +#include "store_observer.h" +#include "time_helper.h" +#include "virtual_asset_loader.h" +#include "virtual_cloud_data_translate.h" +#include "virtual_cloud_db.h" +#include "mock_asset_loader.h" + +using namespace testing::ext; +using namespace DistributedDB; +using namespace DistributedDBUnitTest; +using namespace std; + +namespace { + string g_storeID = "Relational_Store_SYNC"; + const string g_tableName = "worker"; + const string DEVICE_CLOUD = "cloud_dev"; + const string DB_SUFFIX = ".db"; + const int64_t g_syncWaitTime = 60; + const int g_arrayHalfSub = 2; + int g_syncIndex = 0; + string g_testDir; + string g_storePath; + std::mutex g_processMutex; + std::condition_variable g_processCondition; + std::shared_ptr g_virtualCloudDb; + std::shared_ptr g_virtualAssetLoader; + DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID); + RelationalStoreObserverUnitTest *g_observer = nullptr; + RelationalStoreDelegate *g_delegate = nullptr; + SyncProcess g_syncProcess; + using CloudSyncStatusCallback = std::function &onProcess)>; + const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL = + "CREATE TABLE IF NOT EXISTS " + g_tableName + "(" \ + "name TEXT," \ + "height REAL ," \ + "married BOOLEAN ," \ + "photo BLOB NOT NULL," \ + "asset BLOB," \ + "age INT);"; + const std::vector g_cloudFiledWithOutPrimaryKey = { + {"name", TYPE_INDEX, false, true}, {"height", TYPE_INDEX}, + {"married", TYPE_INDEX}, {"photo", TYPE_INDEX, false, false}, + {"asset", TYPE_INDEX}, {"age", TYPE_INDEX} + }; + const std::vector g_tables = {g_tableName}; + const Asset g_cloudAsset1 = { + .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync", + .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC" + }; + const Asset g_cloudAsset2 = { + .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync", + .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "UPDATE" + }; + + void CreateUserDBAndTable(sqlite3 *&db) + { + EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK); + EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK); + } + + void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull) + { + std::vector photo(photoSize, 'v'); + std::vector record; + std::vector extend; + Timestamp now = TimeHelper::GetSysCurrentTime(); + for (int64_t i = begin; i < begin + count; ++i) { + VBucket data; + data.insert_or_assign("name", "Cloud" + std::to_string(i)); + data.insert_or_assign("height", 166.0); // 166.0 is random double value + data.insert_or_assign("married", false); + data.insert_or_assign("photo", photo); + data.insert_or_assign("age", 13L); // 13 is random int64_t value + Asset asset = g_cloudAsset1; + asset.name = asset.name + std::to_string(i); + assetIsNull ? data.insert_or_assign("asset", Nil()) : data.insert_or_assign("asset", asset); + record.push_back(data); + VBucket log; + log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i); + log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i); + log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false); + extend.push_back(log); + } + ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName, std::move(record), extend), DBStatus::OK); + LOGD("insert cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); + std::this_thread::sleep_for(std::chrono::milliseconds(count)); + } + + void UpdateCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull) + { + std::vector photo(photoSize, 'v'); + std::vector record; + std::vector extend; + Timestamp now = TimeHelper::GetSysCurrentTime(); + for (int64_t i = begin; i < begin + count; ++i) { + VBucket data; + data.insert_or_assign("name", "Cloud" + std::to_string(i)); + data.insert_or_assign("height", 188.0); // 188.0 is random double value + data.insert_or_assign("married", false); + data.insert_or_assign("photo", photo); + data.insert_or_assign("age", 13L); // 13 is random int64_t value + Asset asset = g_cloudAsset2; + asset.name = asset.name + std::to_string(i); + assetIsNull ? data.insert_or_assign("asset", Nil()) : data.insert_or_assign("asset", asset); + record.push_back(data); + VBucket log; + log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i); + log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i); + log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false); + log.insert_or_assign(CloudDbConstant::GID_FIELD, to_string(i)); + extend.push_back(log); + } + ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName, std::move(record), extend), DBStatus::OK); + LOGD("update cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); + std::this_thread::sleep_for(std::chrono::milliseconds(count)); + } + + void DeleteCloudTableRecordByGid(int64_t begin, int64_t count) { + for (int64_t i = begin; i < begin + count; ++i) { + VBucket data; + data.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i)); + ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName, data), DBStatus::OK); + } + LOGD("delete cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); + std::this_thread::sleep_for(std::chrono::milliseconds(count)); + } + + void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback, + std::vector &expectProcess) + { + g_syncIndex = 0; + callback = [&syncProcess, &expectProcess](const std::map &process) { + LOGI("devices size = %d", process.size()); + ASSERT_EQ(process.size(), 1u); + syncProcess = std::move(process.begin()->second); + ASSERT_EQ(process.begin()->first, DEVICE_CLOUD); + ASSERT_NE(syncProcess.tableProcess.empty(), true); + LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode); + std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) { + auto table1 = syncProcess.tableProcess.find(item); + if (table1 != syncProcess.tableProcess.end()) { + LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, " + "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u", + item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex, + table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount, + table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex, + table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount, + table1->second.upLoadInfo.failCount); + } + }); + if (expectProcess.empty()) { + if (syncProcess.process == FINISHED) { + g_processCondition.notify_one(); + } + return; + } + ASSERT_LE(static_cast(g_syncIndex), expectProcess.size()); + for (size_t i = 0; i < g_tables.size(); ++i) { + SyncProcess head = expectProcess[g_syncIndex]; + for (auto &expect : head.tableProcess) { + auto real = syncProcess.tableProcess.find(expect.first); + ASSERT_NE(real, syncProcess.tableProcess.end()); + EXPECT_EQ(expect.second.process, real->second.process); + EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex); + EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total); + EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount); + EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount); + EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex); + EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total); + EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount); + EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount); + } + } + g_syncIndex++; + if (syncProcess.process == FINISHED) { + g_processCondition.notify_one(); + } + }; + } + + int QueryCountCallback(void *data, int count, char **colValue, char **colName) + { + if (count != 1) { + return 0; + } + auto expectCount = reinterpret_cast(data); + EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal + return 0; + } + + void CheckDownloadResult(sqlite3 *&db, std::vector expectCounts, std::string keyStr = "Cloud") + { + for (size_t i = 0; i < g_tables.size(); ++i) { + string queryDownload = "select count(*) from " + g_tables[i] + " where name " + + " like '" + keyStr + "%'"; + EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback, + reinterpret_cast(expectCounts[i]), nullptr), SQLITE_OK); + } + } + + void CheckCloudTotalCount(std::vector expectCounts) + { + VBucket extend; + extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0); + for (size_t i = 0; i < g_tables.size(); ++i) { + int64_t realCount = 0; + std::vector data; + g_virtualCloudDb->Query(g_tables[i], extend, data); + for (size_t j = 0; j < data.size(); ++j) { + auto entry = data[j].find(CloudDbConstant::DELETE_FIELD); + if (entry != data[j].end() && std::get(entry->second)) { + continue; + } + realCount++; + } + EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data. + } + } + + void CheckLocalRecordNum(sqlite3 *&db, std::string tableName, int count) { + std::string sql = "select count(*) from " + tableName + ";"; + EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback, + reinterpret_cast(count), nullptr), SQLITE_OK); + } + + void GetCloudDbSchema(DataBaseSchema &dataBaseSchema) + { + TableSchema tableSchemaWithOutPrimaryKey = { + .name = g_tableName, + .fields = g_cloudFiledWithOutPrimaryKey + }; + dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey); + } + + void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime) + { + std::unique_lock lock(g_processMutex); + bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() { + return syncProcess.process == FINISHED; + }); + ASSERT_EQ(result, true); + LOGD("-------------------sync end--------------"); + } + + void callSync(const std::vector &tableNames, SyncMode mode, DBStatus dbStatus) + { + g_syncProcess = {}; + Query query = Query::Select().FromTable(tableNames); + std::vector expectProcess; + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus); + if (dbStatus == DBStatus::OK) { + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + } + } + + void CloseDb() + { + delete g_observer; + g_virtualCloudDb = nullptr; + if (g_delegate != nullptr) { + EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK); + g_delegate = nullptr; + } + } + + class DistributedDBCloudTableWithoutPrimaryKeySyncTest : public testing::Test { + public: + static void SetUpTestCase(void); + static void TearDownTestCase(void); + void SetUp(); + void TearDown(); + protected: + sqlite3 *db = nullptr; + }; + + void DistributedDBCloudTableWithoutPrimaryKeySyncTest::SetUpTestCase(void) + { + DistributedDBToolsUnitTest::TestDirInit(g_testDir); + g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX; + LOGI("The test db is:%s", g_testDir.c_str()); + RuntimeConfig::SetCloudTranslate(std::make_shared()); + } + + void DistributedDBCloudTableWithoutPrimaryKeySyncTest::TearDownTestCase(void) + {} + + void DistributedDBCloudTableWithoutPrimaryKeySyncTest::SetUp(void) + { + if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) { + LOGE("rm test db files error."); + } + DistributedDBToolsUnitTest::PrintTestCaseInfo(); + LOGD("Test dir is %s", g_testDir.c_str()); + db = RelationalTestUtils::CreateDataBase(g_storePath); + ASSERT_NE(db, nullptr); + CreateUserDBAndTable(db); + g_observer = new (std::nothrow) RelationalStoreObserverUnitTest(); + ASSERT_NE(g_observer, nullptr); + ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer }, + g_delegate), DBStatus::OK); + ASSERT_NE(g_delegate, nullptr); + ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName, CLOUD_COOPERATION), DBStatus::OK); + g_virtualCloudDb = make_shared(); + g_virtualAssetLoader = make_shared(); + g_syncProcess = {}; + ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK); + ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK); + // sync before setting cloud db schema,it should return SCHEMA_MISMATCH + Query query = Query::Select().FromTable(g_tables); + CloudSyncStatusCallback callback; + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), + DBStatus::SCHEMA_MISMATCH); + DataBaseSchema dataBaseSchema; + GetCloudDbSchema(dataBaseSchema); + ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK); + } + + void DistributedDBCloudTableWithoutPrimaryKeySyncTest::TearDown(void) + { + EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK); + if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) { + LOGE("rm test db files error."); + } + } + +/* + * @tc.name: CloudSyncTest001 + * @tc.desc: test data sync when cloud insert + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableWithoutPrimaryKeySyncTest, CloudSyncTest001, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::DATA; + changedDataForTable.field.push_back(std::string("rowid")); + for (int64_t i = 1; i <= cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_INSERT].push_back({i}); + } + g_observer->SetExpectedResult(changedDataForTable); + InsertCloudTableRecord(0, cloudCount, paddingSize, true); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest002 + * @tc.desc: test data sync when cloud update + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableWithoutPrimaryKeySyncTest, CloudSyncTest002, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + InsertCloudTableRecord(0, cloudCount, paddingSize, true); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::DATA; + changedDataForTable.field.push_back(std::string("rowid")); + for (int64_t i = 1; i <= cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_UPDATE].push_back({i}); + } + g_observer->SetExpectedResult(changedDataForTable); + UpdateCloudTableRecord(0, cloudCount, paddingSize, true); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest003 + * @tc.desc: test data sync when cloud delete + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableWithoutPrimaryKeySyncTest, CloudSyncTest003, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + InsertCloudTableRecord(0, cloudCount, paddingSize, true); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::DATA; + changedDataForTable.field.push_back(std::string("rowid")); + for (int64_t i = 1; i <= cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_DELETE].push_back({i}); + } + g_observer->SetExpectedResult(changedDataForTable); + DeleteCloudTableRecordByGid(0, cloudCount); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckCloudTotalCount({0L}); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckLocalRecordNum(db, g_tableName, 0); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest004 + * @tc.desc: test asset when cloud insert + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableWithoutPrimaryKeySyncTest, CloudSyncTest004, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::ASSET; + changedDataForTable.field.push_back(std::string("rowid")); + for (int64_t i = 1; i <= cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_INSERT].push_back({i}); + } + g_observer->SetExpectedResult(changedDataForTable); + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest005 + * @tc.desc: test asset sync when cloud update + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableWithoutPrimaryKeySyncTest, CloudSyncTest005, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::ASSET; + changedDataForTable.field.push_back(std::string("rowid")); + for (int64_t i = 1; i <= cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_UPDATE].push_back({i}); + } + g_observer->SetExpectedResult(changedDataForTable); + UpdateCloudTableRecord(0, cloudCount, paddingSize, false); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + CloseDb(); +} + +/* + * @tc.name: CloudSyncTest006 + * @tc.desc: test asset sync when cloud insert + * @tc.type: FUNC + * @tc.require: + * @tc.author: chenchaohao + */ +HWTEST_F(DistributedDBCloudTableWithoutPrimaryKeySyncTest, CloudSyncTest006, TestSize.Level0) +{ + int64_t cloudCount = 10; // 10 is random cloud count + int64_t paddingSize = 10; // 10 is padding size + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckDownloadResult(db, {cloudCount}); + CheckCloudTotalCount({cloudCount}); + + ChangedData changedDataForTable; + changedDataForTable.tableName = g_tableName; + changedDataForTable.type = ChangedDataType::ASSET; + changedDataForTable.field.push_back(std::string("rowid")); + for (int64_t i = 1; i <= cloudCount; ++i) { + changedDataForTable.primaryData[ChangeType::OP_DELETE].push_back({i}); + } + g_observer->SetExpectedResult(changedDataForTable); + DeleteCloudTableRecordByGid(0, cloudCount); + callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK); + CheckCloudTotalCount({0L}); + EXPECT_TRUE(g_observer->IsAllChangedDataEq()); + g_observer->ClearChangedData(); + CheckLocalRecordNum(db, g_tableName, 0); + CloseDb(); +} + +} +#endif // RELATIONAL_STORE \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp index 959e32c6bbc7ff553598712b4e67b30654ea2b51..0f3d94f6e8bbfcad93b73235889381134e646b59 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp @@ -180,6 +180,7 @@ DBStatus VirtualCloudDb::DeleteByGid(const std::string &tableName, VBucket &exte tableData.extend[g_modifiedField] = (int64_t)TimeHelper::GetSysCurrentTime() / CloudDbConstant::TEN_THOUSAND; tableData.extend[g_deleteField] = true; + tableData.extend[g_cursorField] = std::to_string(currentCursor_++); LOGD("[VirtualCloudDb] DeleteByGid, gid %s", std::get(extend[g_gidField]).c_str()); tableData.record.clear(); break;