diff --git a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp index d8c6c720e705e4fc0bb5b57d17bc204f252b4aa5..b2b11a1c2be97c06b91b143b48e44c1b06ae4fd6 100644 --- a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp +++ b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp @@ -47,6 +47,7 @@ int CloudMetaData::GetLocalWaterMark(TableName tableName, LocalWaterMark &localM int CloudMetaData::GetCloudWaterMark(TableName tableName, CloudWaterMark &cloudMark) { + LOGE("===GetCloudWaterMark=%s", cloudMark.c_str()); std::lock_guard lock(cloudMetaMutex_); if (cloudMetaVals_.count(tableName) == 0) { int ret = ReadMarkFromMeta(tableName); @@ -60,6 +61,7 @@ int CloudMetaData::GetCloudWaterMark(TableName tableName, CloudWaterMark &cloudM int CloudMetaData::SetLocalWaterMark(TableName tableName, LocalWaterMark localMark) { + LOGE("===SetLocalWaterMark=%s", cloudMark.c_str()); std::lock_guard lock(cloudMetaMutex_); CloudWaterMark cloudMark = ""; auto iter = cloudMetaVals_.find(tableName); diff --git a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp index 4e5d7e4b7e102e2a7dbb406f81f5c10a7f8a863a..19643d1380c7d5f420e616ab68731d0edc46fbb7 100644 --- a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp +++ b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp @@ -134,8 +134,7 @@ int StorageProxy::Rollback() return errCode; } -int StorageProxy::GetUploadCount(const std::string &tableName, const LocalWaterMark &localMark, - const bool isCloudForcePush, int64_t &count) +int StorageProxy::GetUploadCount(const std::string &tableName, const LocalWaterMark &localMark, int64_t &count) { std::shared_lock readLock(storeMutex_); if (store_ == nullptr) { @@ -145,7 +144,7 @@ int StorageProxy::GetUploadCount(const std::string &tableName, const LocalWaterM LOGE("the transaction has not been started"); return -E_TRANSACT_STATE; } - return store_->GetUploadCount(tableName, localMark, isCloudForcePush, count); + return store_->GetUploadCount(tableName, localMark, count); } int StorageProxy::FillCloudGid(const CloudSyncData &data) @@ -222,20 +221,6 @@ int StorageProxy::PutCloudSyncData(const std::string &tableName, DownloadData &d return store_->PutCloudSyncData(tableName, downloadData); } -int StorageProxy::CleanCloudData(ClearMode mode, const std::vector &tableNameList, - std::vector &assets) -{ - std::shared_lock readLock(storeMutex_); - if (store_ == nullptr) { - return -E_INVALID_DB; - } - if (!transactionExeFlag_.load()) { - LOGE("the transaction has not been started"); - return -E_TRANSACT_STATE; - } - return store_->CleanCloudData(mode, tableNameList, assets); -} - int StorageProxy::ReleaseContinueToken(ContinueToken &continueStmtToken) { return store_->ReleaseCloudDataToken(continueStmtToken); @@ -268,8 +253,7 @@ int StorageProxy::CheckSchema(std::vector &tables) return E_OK; } -int StorageProxy::GetPrimaryColNamesWithAssetsFields(const TableName &tableName, std::vector &colNames, - std::vector &assetFields) +int StorageProxy::GetPrimaryColNames(const TableName &tableName, std::vector &colNames) { if (!colNames.empty()) { // output parameter should be empty @@ -291,9 +275,6 @@ int StorageProxy::GetPrimaryColNamesWithAssetsFields(const TableName &tableName, if (field.primary) { colNames.push_back(field.colName); } - if (field.type == TYPE_INDEX || field.type == TYPE_INDEX) { - assetFields.push_back(field); - } } if (colNames.empty()) { colNames.push_back(CloudDbConstant::ROW_ID_FIELD_NAME); @@ -311,21 +292,12 @@ int StorageProxy::NotifyChangedData(const std::string deviceName, ChangedData && return E_OK; } -int StorageProxy::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isFullReplace) -{ - std::shared_lock readLock(storeMutex_); - if (store_ == nullptr) { - return -E_INVALID_DB; - } - return store_->FillCloudAssetForDownload(tableName, asset, isFullReplace); -} - -int StorageProxy::FillCloudAssetForUpload(const CloudSyncData &data) +int StorageProxy::FillCloudAsset(const std::string &tableName, VBucket &asset, bool isFullReplace) { std::shared_lock readLock(storeMutex_); if (store_ == nullptr) { return -E_INVALID_DB; } - return store_->FillCloudAssetForUpload(data); + return store_->FillCloudAsset(tableName, asset, isFullReplace); } } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index 2ada2797f469410b6f6c8ca3b6202171dcc56b76..016dea40eab28ecc5fe780e9f882c063cfd2639d 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -16,15 +16,12 @@ #include #include -#include #include "cloud/cloud_db_constant.h" -#include "cloud/cloud_storage_utils.h" #include "db_errno.h" #include "cloud/icloud_db.h" #include "kv_store_errno.h" #include "log_print.h" -#include "platform_specific.h" #include "runtime_context.h" #include "strategy_factory.h" #include "storage_proxy.h" @@ -57,9 +54,6 @@ int CloudSyncer::Sync(const std::vector &devices, SyncMode mode, if (cloudDB_.IsNotExistCloudDB()) { return -E_CLOUD_ERROR; } - if (closed_) { - return -E_DB_CLOSED; - } CloudTaskInfo taskInfo; taskInfo.mode = mode; taskInfo.table = tables; @@ -80,13 +74,6 @@ int CloudSyncer::SetCloudDB(const std::shared_ptr &cloudDB) return E_OK; } -int CloudSyncer::SetIAssetLoader(const std::shared_ptr &loader) -{ - cloudDB_.SetIAssetLoader(loader); - LOGI("[CloudSyncer] SetIAssetLoader finish"); - return E_OK; -} - void CloudSyncer::Close() { closed_ = true; @@ -116,10 +103,9 @@ void CloudSyncer::Close() info.errCode = -E_DB_CLOSED; ProcessNotifier notifier; notifier.Init(info.table, info.devices); - notifier.NotifyProcess(info, {}, true); + notifier.NotifyProcess(info, {}); LOGI("[CloudSyncer] finished taskId %" PRIu64 " errCode %d", info.taskId, info.errCode); } - storageProxy_->Close(); } void CloudSyncer::ProcessNotifier::Init(const std::vector &tableName, @@ -160,17 +146,12 @@ void CloudSyncer::ProcessNotifier::UpdateProcess(const CloudSyncer::InnerProcess } } -void CloudSyncer::ProcessNotifier::NotifyProcess(const CloudTaskInfo &taskInfo, const InnerProcessInfo &process, - bool notifyWhenError) +void CloudSyncer::ProcessNotifier::NotifyProcess(const CloudTaskInfo &taskInfo, const InnerProcessInfo &process) { UpdateProcess(process); std::map currentProcess; { std::lock_guard autoLock(processMutex_); - if (!notifyWhenError && taskInfo.errCode != E_OK) { - LOGD("[ProcessNotifier] task has error, do not notify now"); - return; - } syncProcess_.errCode = TransferDBErrno(taskInfo.errCode); syncProcess_.process = taskInfo.status; for (const auto &device : devices_) { @@ -241,7 +222,6 @@ void CloudSyncer::DoSyncIfNeed() int CloudSyncer::DoSync(TaskId taskId) { - std::lock_guard lock(syncMutex_); CloudTaskInfo taskInfo; { std::lock_guard autoLock(queueLock_); @@ -251,12 +231,7 @@ int CloudSyncer::DoSync(TaskId taskId) if (errCode != E_OK) { return errCode; } - bool needUpload = true; - { - std::lock_guard autoLock(contextLock_); - needUpload = currentContext_.strategy->JudgeUpload(); - } - errCode = DoSyncInner(taskInfo, needUpload); + errCode = DoSyncInner(taskInfo); int unlockCode = UnlockCloud(); if (errCode == E_OK) { errCode = unlockCode; @@ -264,75 +239,54 @@ int CloudSyncer::DoSync(TaskId taskId) return errCode; } -int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload) +int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo) { int errCode = E_OK; - if (needUpload) { - errCode = storageProxy_->StartTransaction(); + for (size_t i = 0; i < taskInfo.table.size(); ++i) { + LOGD("[CloudSyncer] try download %zu th table", i); + errCode = CheckTaskIdValid(taskInfo.taskId); if (errCode != E_OK) { - LOGE("[CloudSyncer] start transaction Failed before doing upload."); + LOGD("[CloudSyncer] task is invalid, abort sync"); return errCode; } - for (size_t i = 0; i < taskInfo.table.size(); ++i) { - LOGD("[CloudSyncer] try upload table, index: %zu", i + 1); - errCode = CheckTaskIdValid(taskInfo.taskId); - if (errCode != E_OK) { - LOGE("[CloudSyncer] task is invalid, abort sync"); - break; - } - { - std::lock_guard autoLock(contextLock_); - currentContext_.tableName = taskInfo.table[i]; - } - errCode = DoUpload(taskInfo.taskId, i == (taskInfo.table.size() - 1u)); - if (errCode != E_OK) { - LOGE("[CloudSyncer] upload failed %d", errCode); - break; - } - errCode = SaveCloudWaterMark(taskInfo.table[i]); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Can not save cloud water mark after uploading %d", errCode); - return errCode; - } + { + std::lock_guard autoLock(contextLock_); + currentContext_.tableName = taskInfo.table[i]; } - if (errCode == E_OK) { - storageProxy_->Commit(); - } else { - storageProxy_->Rollback(); + errCode = DoDownload(taskInfo.taskId); + if (errCode != E_OK) { + LOGD("[CloudSyncer] download failed %d", errCode); + return errCode; } } - return errCode; -} - -int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo, const bool needUpload) -{ - int errCode = E_OK; + errCode = storageProxy_->StartTransaction(); + if (errCode != E_OK) { + LOGE("[CloudSyncer] start transaction Failed before doing upload."); + return errCode; + } for (size_t i = 0; i < taskInfo.table.size(); ++i) { - LOGD("[CloudSyncer] try download table, index: %zu", i + 1); + LOGD("[CloudSyncer] try upload %zu th table", i); errCode = CheckTaskIdValid(taskInfo.taskId); if (errCode != E_OK) { - LOGE("[CloudSyncer] task is invalid, abort sync"); - return errCode; + LOGD("[CloudSyncer] task is invalid, abort sync"); + break; } { std::lock_guard autoLock(contextLock_); currentContext_.tableName = taskInfo.table[i]; } - errCode = DoDownload(taskInfo.taskId); + errCode = DoUpload(taskInfo.taskId, i == (taskInfo.table.size() - 1u)); if (errCode != E_OK) { - LOGE("[CloudSyncer] download failed %d", errCode); - return errCode; - } - if (!needUpload) { - errCode = SaveCloudWaterMark(taskInfo.table[i]); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode); - return errCode; - } + LOGD("[CloudSyncer] upload failed %d", errCode); + break; } } - - return DoUploadInNeed(taskInfo, needUpload); + if (errCode == E_OK) { + storageProxy_->Commit(); + } else { + storageProxy_->Rollback(); + } + return errCode; } void CloudSyncer::DoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo) @@ -350,11 +304,6 @@ void CloudSyncer::DoFinished(TaskId taskId, int errCode, const InnerProcessInfo currentContext_.notifier = nullptr; currentContext_.strategy = nullptr; currentContext_.tableName.clear(); - currentContext_.assetDownloadList.completeDownloadList.clear(); - currentContext_.assetDownloadList.downloadList.clear(); - currentContext_.assetFields.clear(); - currentContext_.assetsInfo.clear(); - currentContext_.cloudWaterMarks.clear(); } CloudTaskInfo info; { @@ -374,7 +323,7 @@ void CloudSyncer::DoFinished(TaskId taskId, int errCode, const InnerProcessInfo LOGI("[CloudSyncer] finished taskId %" PRIu64 " errCode %d", taskId, info.errCode); info.status = ProcessStatus::FINISHED; if (notifier != nullptr) { - notifier->NotifyProcess(info, processInfo, true); + notifier->NotifyProcess(info, processInfo); } } @@ -432,28 +381,22 @@ static bool shouldSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogI return true; } -int CloudSyncer::SaveChangedData(SyncParam ¶m, int dataIndex, DataInfo &dataInfo, - std::vector &InsertDataNoPrimaryKeys) +int CloudSyncer::SaveChangedData(DownloadData &downloadData, + int dataIndex, DataInfoWithLog &localInfo, LogInfo &cloudLogInfo, ChangedData &changedData) { - // For no primary key situation, - if (param.downloadData.opType[dataIndex] == OpType::INSERT && param.changedData.field.size() == 1 && - param.changedData.field[0] == CloudDbConstant::ROW_ID_FIELD_NAME) { - InsertDataNoPrimaryKeys.push_back(dataIndex); - return E_OK; - } - switch (param.downloadData.opType[dataIndex]) { + switch (downloadData.opType[dataIndex]) { case OpType::INSERT: return SaveChangedtData( - param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT); + downloadData.data[dataIndex], changedData, localInfo, ChangeType::OP_INSERT); case OpType::UPDATE: - if (shouldSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) { + if (shouldSaveData(localInfo.logInfo, cloudLogInfo)) { return SaveChangedtData( - param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_UPDATE); + downloadData.data[dataIndex], changedData, localInfo, ChangeType::OP_UPDATE); } break; case OpType::DELETE: return SaveChangedtData( - param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_DELETE); + downloadData.data[dataIndex], changedData, localInfo, ChangeType::OP_DELETE); default: break; } @@ -489,515 +432,65 @@ static void UpdateChangedData( } } -static void TagAsset(AssetOpType flag, AssetStatus status, Asset &asset, Assets &res) -{ - asset.flag = static_cast(flag); - asset.status = static_cast(status); - Timestamp timestamp; - OS::GetCurrentSysTimeInMicrosecond(timestamp); - asset.timestamp = timestamp / CloudDbConstant::TEN_THOUSAND; - res.push_back(asset); -} - -static void TagAssets(AssetOpType flag, AssetStatus status, Assets &assets, Assets &res) -{ - for (Asset &asset : assets) { - TagAsset(flag, status, asset, res); - } -} - -template -static bool IsDataContainField(const std::string &assetFieldName, VBucket &data) -{ - auto assetIter = data.find(assetFieldName); - if (assetIter == data.end()) { - return false; - } - if (assetIter->second.index() == TYPE_INDEX) { - if (std::get(assetIter->second).empty()) { - return false; - } - } - if (assetIter->second.index() != TYPE_INDEX) { - return false; - } - return true; -} - -// AssetOpType and AssetStatus will be tagged, assets to be changed will be returned -static Assets TagAsset(const std::string &assetFieldName, VBucket &coveredData, VBucket &beCoveredData) -{ - Assets res = {}; - bool beCoveredHasAsset = IsDataContainField(assetFieldName, beCoveredData) || - IsDataContainField(assetFieldName, beCoveredData); - bool coveredHasAsset = IsDataContainField(assetFieldName, coveredData); - if (!beCoveredHasAsset) { - if (!coveredHasAsset) { - LOGD("Both data do not contain certain asset field"); - return res; - } - LOGD("coveredData will be regarded as insert"); - TagAsset(AssetOpType::INSERT, AssetStatus::DOWNLOADING, std::get(coveredData[assetFieldName]), res); - return res; - } - if (!coveredHasAsset) { - TagAsset(AssetOpType::DELETE, AssetStatus::DOWNLOADING, std::get(beCoveredData[assetFieldName]), res); - return res; - } - Asset &covered = std::get(coveredData[assetFieldName]); - Assets beCoveredDataInAssets; - Asset beCovered; - int ret = CloudStorageUtils::GetValueFromVBucket(assetFieldName, beCoveredData, beCoveredDataInAssets); - if (ret != E_OK) { - // This indicates that asset in cloudData is stored as Asset - beCovered = std::get(beCoveredData[assetFieldName]); - } else { - // This indicates that asset in cloudData is stored as ASSETS - // first element in assets will be the target asset - beCovered = beCoveredDataInAssets[0]; - } - if (covered.name != beCovered.name) { - TagAsset(AssetOpType::INSERT, AssetStatus::DOWNLOADING, covered, res); - TagAsset(AssetOpType::DELETE, AssetStatus::DOWNLOADING, beCovered, res); - return res; - } - if (covered.hash != beCovered.hash) { - TagAsset(AssetOpType::UPDATE, AssetStatus::DOWNLOADING, covered, res); - } - return res; -} - -static std::unordered_map GenAssetsMap(Assets &assets) -{ - std::unordered_map assetsMap; - for (size_t i = 0; i < assets.size(); i++) { - assetsMap[assets[i].name] = i; - } - return assetsMap; -} - -// AssetOpType and AssetStatus will be tagged, assets to be changed will be returned -// use VBucket rather than Type because we need to check whether it is empty -static Assets TagAssets(const std::string &assetFieldName, VBucket &coveredData, VBucket &beCoveredData, - bool WriteToCoveredData) +int CloudSyncer::SaveData(const TableName &tableName, DownloadData &downloadData, + Info &downloadInfo, CloudWaterMark &latestCloudWaterMark, ChangedData &changedData) { - Assets res = {}; - bool beCoveredHasAssets = IsDataContainField(assetFieldName, beCoveredData); - bool coveredHasAssets = IsDataContainField(assetFieldName, coveredData); - if (!beCoveredHasAssets) { - if (!coveredHasAssets) { - return res; - } - // all the element in assets will be set to INSERT - TagAssets(AssetOpType::INSERT, AssetStatus::DOWNLOADING, std::get(coveredData[assetFieldName]), res); - return res; - } - if (!coveredHasAssets) { - // all the element in assets will be set to DELETE - TagAssets(AssetOpType::DELETE, AssetStatus::DOWNLOADING, std::get(beCoveredData[assetFieldName]), res); - if (WriteToCoveredData) { - LOGD("Write assets to be deleted from beCoveredData to CoveredData"); - coveredData[assetFieldName] = res; - } - return res; - } - Assets &covered = std::get(coveredData[assetFieldName]); - Assets &beCovered = std::get(beCoveredData[assetFieldName]); - std::unordered_map CoveredAssetsMap = GenAssetsMap(covered); - for (Asset &beCoveredAsset : beCovered) { - auto it = CoveredAssetsMap.find(beCoveredAsset.name); - if (it == CoveredAssetsMap.end()) { - TagAsset(AssetOpType::DELETE, AssetStatus::DOWNLOADING, beCoveredAsset, res); - if (WriteToCoveredData) { - std::get(coveredData[assetFieldName]).push_back(beCoveredAsset); - } - continue; - } - Asset &coveredAsset = covered[it->second]; - if (beCoveredAsset.hash != coveredAsset.hash) { - TagAsset(AssetOpType::UPDATE, AssetStatus::DOWNLOADING, coveredAsset, res); - } - // Erase element which has been handled, remaining element will be set to Insert - CoveredAssetsMap.erase(it); - // flag in Asset is defaultly set to NoChange, so we just continue - continue; - } - for (auto &noHandledAssetKvPair : CoveredAssetsMap) { - TagAsset(AssetOpType::INSERT, AssetStatus::DOWNLOADING, covered[noHandledAssetKvPair.second], res); - } - return res; -} - -bool CloudSyncer::ShouldProcessAssets() -{ - std::lock_guard autoLock(contextLock_); - if (currentContext_.assetFields[currentContext_.tableName].empty()) { - LOGI("Current table do not contain assets, thereby we needn't download assets"); - return false; - } - return true; -} - -std::map CloudSyncer::TagAssetsInSingleRecord(VBucket &CoveredData, VBucket &BeCoveredData, - bool WriteToCoveredData) -{ - // Define a map to store the result - std::map res = {}; - std::vector assetFields; - { - std::lock_guard autoLock(contextLock_); - assetFields = currentContext_.assetFields[currentContext_.tableName]; - } - // For every column contain asset or assets, assetFields are in context - for (const Field &assetField : assetFields) { - res[assetField.colName] = TagAssetsInSingleCol(CoveredData, BeCoveredData, assetField, WriteToCoveredData); - } - return res; -} - -Assets CloudSyncer::TagAssetsInSingleCol( - VBucket &CoveredData, VBucket &BeCoveredData, const Field &assetField, bool WriteToCoveredData) -{ - // Define a list to store the tagged result - Assets assets = {}; - switch (assetField.type) { - case TYPE_INDEX: { - assets = TagAssets(assetField.colName, CoveredData, BeCoveredData, WriteToCoveredData); - break; - } - case TYPE_INDEX: { - assets = TagAsset(assetField.colName, CoveredData, BeCoveredData); - break; - } - default: - LOGW("Meet an unexpected type %d", assetField.type); - break; + if (!IsChngDataEmpty(changedData)) { + // changedData.primaryData should have no member inside + return -E_INVALID_ARGS; } - return assets; -} - -int CloudSyncer::FillCloudAssets( - const std::string &tableName, VBucket &normalAssets, VBucket &failedAssets) -{ + // Update download btach Info + downloadInfo.batchIndex += 1; + downloadInfo.total += downloadData.data.size(); + // Tag every datum in data set int ret = E_OK; - if (!normalAssets.empty() && normalAssets.size() > 1) { - ret = storageProxy_->FillCloudAssetForDownload(tableName, normalAssets, true); + std::vector InsertDataNoPrimaryKeys; + for (size_t i = 0; i < downloadData.data.size(); i++) { + ret = CheckDownloadDatum(downloadData.data[i]); if (ret != E_OK) { + LOGE("[CloudSyncer] Invalid download data:%d", ret); return ret; } - } - if (!failedAssets.empty() && failedAssets.size() > 1) { - ret = storageProxy_->FillCloudAssetForDownload(tableName, failedAssets, false); - if (ret != E_OK) { + DataInfoWithLog dataInfoWithLog; + VBucket assetInfo; + bool isExist = true; + ret = storageProxy_->GetInfoByPrimaryKeyOrGid(tableName, downloadData.data[i], dataInfoWithLog, assetInfo); + if (ret == -E_NOT_FOUND) { + isExist = false; + } else if (ret != E_OK) { + LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret); return ret; } - } - return E_OK; -} - -int CloudSyncer::HandleDownloadResult(const std::string &tableName, std::string gid, - std::map &DownloadResult, bool setAllNormal) -{ - VBucket normalAssets; - VBucket failedAssets; - normalAssets[CloudDbConstant::GID_FIELD] = gid; - failedAssets[CloudDbConstant::GID_FIELD] = gid; - for (auto &assetKvPair : DownloadResult) { - Assets &assets = assetKvPair.second; - Assets normalAssetsList = {}; - Assets failedAssetsList = {}; - for (Asset &asset : assets) { - // if success, write every attribute of asset into database - if (static_cast(asset.status) == AssetStatus::NORMAL || setAllNormal) { - normalAssetsList.push_back(asset); - continue; - } - // if fail, partially write cloud asset attribute into database - if (static_cast(asset.status) == AssetStatus::ABNORMAL) { - failedAssetsList.push_back(asset); - } - if (static_cast(asset.status) == AssetStatus::DOWNLOADING) { - LOGW("Asset status should not be DOWNLOADING after download operation"); - } - } - if (!normalAssetsList.empty()) { - normalAssets[assetKvPair.first] = normalAssetsList; - } - if (!failedAssetsList.empty()) { - failedAssets[assetKvPair.first] = failedAssetsList; - } - } - return FillCloudAssets(tableName, normalAssets, failedAssets); -} - -static ChangeType OpTypeToChangeType(OpType strategy) -{ - switch (strategy) { - case OpType::INSERT: - return OP_INSERT; - case OpType::DELETE: - return OP_DELETE; - case OpType::UPDATE: - return OP_UPDATE; - default: - return OP_BUTT; - } -} - -int CloudSyncer::CloudDbDownloadAssets(InnerProcessInfo &info, DownloadList &downloadList, bool willHandleResult, - ChangedData &changedAssets) -{ - for (auto &assetsDownloadInfo : downloadList) { - std::string gid = std::get<0>(assetsDownloadInfo); - Type primaryKey = std::get<1>(assetsDownloadInfo); - OpType strategy = std::get<2>(assetsDownloadInfo); - std::map assets = std::get<3>(assetsDownloadInfo); - // Download data (include deleting) - int errorCode = cloudDB_.Download(info.tableName, gid, primaryKey, assets); - if (!willHandleResult || assets.empty()) { - continue; - } - // Process result of each asset - if (errorCode != E_OK) { - // if not OK, update process info and handle download result seperately - info.downLoadInfo.failCount++; - info.downLoadInfo.successCount--; - int ret = HandleDownloadResult(info.tableName, gid, assets, false); - if (ret != E_OK) { - return ret; - } + // Get cloudLogInfo from cloud data + LogInfo cloudLogInfo = GetCloudLogInfo(downloadData.data[i]); + // Tag datum to get opType and save changed data + downloadData.opType[i] = + currentContext_.strategy->TagSyncDataStatus(isExist, dataInfoWithLog.logInfo, cloudLogInfo); + // For no primary key situation, + if (downloadData.opType[i] == OpType::INSERT && changedData.field.size() == 1 && + changedData.field[0] == CloudDbConstant::ROW_ID_FIELD_NAME) { + InsertDataNoPrimaryKeys.push_back(i); continue; } - // if success, update ChangedData && Write every attribute of asset into database - // All status will be set to NORMAL(0) - changedAssets.primaryData[OpTypeToChangeType(strategy)].push_back({primaryKey}); - int ret = HandleDownloadResult(info.tableName, gid, assets, true); - if (ret != E_OK) { - return ret; - } - } - return E_OK; -} - -int CloudSyncer::DownloadNotifyAssets(InnerProcessInfo &info, std::vector &pKColNames, - ChangedData &changedAssets) -{ - if (!ShouldProcessAssets()) { - return E_OK; - } - // update changed data info - if (!IsChngDataEmpty(changedAssets)) { - // changedData.primaryData should have no member inside - return -E_INVALID_ARGS; - } - changedAssets.tableName = info.tableName; - changedAssets.type = ChangedDataType::ASSET; - changedAssets.field = pKColNames; - // Get AssetDownloadList - DownloadList downloadList; - DownloadList completeDeletedList; - { - std::lock_guard autoLock(contextLock_); - downloadList = currentContext_.assetDownloadList.downloadList; - completeDeletedList = currentContext_.assetDownloadList.completeDownloadList; - } - // Download data (include deleting) will handle return Code in this situation - int ret = CloudDbDownloadAssets(info, downloadList, true, changedAssets); - if (ret != E_OK) { - return ret; - } - // Download data (include deleting), won't handle return Code in this situation - ret = CloudDbDownloadAssets(info, completeDeletedList, false, changedAssets); - if (ret != E_OK) { - return ret; - } - ret = NotifyChangedData(std::move(changedAssets)); - if (ret != E_OK) { - LOGE("Cannot notify changed data due to error %d", ret); - return ret; - } - return E_OK; -} - -std::map CloudSyncer::GetAssetsFromVBucket(VBucket &data) -{ - std::map assets; - std::vector fields; - { - std::lock_guard autoLock(contextLock_); - fields = currentContext_.assetFields[currentContext_.tableName]; - } - for (Field &field : fields) { - if (data.find(field.colName) != data.end()) { - if (field.type == TYPE_INDEX && data[field.colName].index() == TYPE_INDEX) { - assets[field.colName] = { std::get(data[field.colName]) }; - } else if (field.type == TYPE_INDEX && data[field.colName].index() == TYPE_INDEX) { - assets[field.colName] = std::get(data[field.colName]); - } else { - Assets emptyAssets; - assets[field.colName] = emptyAssets; - } - } - } - return assets; -} - -static bool IsCompositeKey(std::vector &pKColNames) -{ - if (pKColNames.empty()) { - return false; - } - if (pKColNames.size() > 1) { - return true; - } - return false; -} - -static bool IsNoPrimaryKey(std::vector &pKColNames) -{ - if (pKColNames.empty()) { - return true; - } - if (pKColNames.size() == 1 && pKColNames[0] == CloudDbConstant::ROW_ID_FIELD_NAME) { - return true; - } - return false; -} - -static bool IsSinglePrimaryKey(std::vector &pKColNames) -{ - if (IsCompositeKey(pKColNames) || IsNoPrimaryKey(pKColNames)) { - return false; - } - return true; -} - -void CloudSyncer::TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo) -{ - OpType strategy = - currentContext_.strategy->TagSyncDataStatus(isExist, dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo); - param.downloadData.opType[idx] = strategy; - if (!ShouldProcessAssets()) { - return; - } - std::map assetsMap; - Type prefix; - std::vector pKVals; - int ret = E_OK; - if (IsSinglePrimaryKey(param.pkColNames)) { - if (strategy == OpType::DELETE) { - ret = GetCloudPkVals( - dataInfo.localInfo.primaryKeys, param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pKVals); - } else { - ret = GetCloudPkVals(param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pKVals); - } - prefix = pKVals[0]; - } - // TODO: return ret; - switch (strategy) - { - case OpType::INSERT: - case OpType::UPDATE: - assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo, false); - param.assetsDownloadList.downloadList.push_back( - std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap)); - break; - case OpType::DELETE: - assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo, false); - param.assetsDownloadList.completeDownloadList.push_back( - std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap)); - break; - case OpType::NOT_HANDLE: - case OpType::ONLY_UPDATE_GID: - case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data - // Save the asset info into context - assetsMap = GetAssetsFromVBucket(param.downloadData.data[idx]); - if (!assetsMap.empty()) { - { - std::lock_guard autoLock(contextLock_); - if (currentContext_.assetsInfo.find(param.tableName) == currentContext_.assetsInfo.end()) { - currentContext_.assetsInfo[param.tableName] = {}; - } - currentContext_.assetsInfo[param.tableName][dataInfo.cloudLogInfo.cloudGid] = assetsMap; - } - } - break; - } - default: - break; - } - return; -} - -int CloudSyncer::SaveDatum(SyncParam ¶m, size_t idx, std::vector &InsertDataNoPrimaryKeys) -{ - int ret = CheckDownloadDatum(param.downloadData.data[idx]); - if (ret != E_OK) { - LOGE("[CloudSyncer] Invalid download data:%d", ret); - return ret; - } - ModifyCloudDataTime(param.downloadData.data[idx]); - DataInfo dataInfo; - VBucket localAssetInfo; - bool isExist = true; - ret = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[idx], dataInfo.localInfo, - localAssetInfo); - if (ret == -E_NOT_FOUND) { - isExist = false; - } else if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret); - return ret; - } - // Get cloudLogInfo from cloud data - dataInfo.cloudLogInfo = GetCloudLogInfo(param.downloadData.data[idx]); - // Tag datum to get opType - TagStatus(isExist, param, idx, dataInfo, localAssetInfo); - ret = SaveChangedData(param, idx, dataInfo, InsertDataNoPrimaryKeys); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot save changed data: %d.", ret); - return ret; - } - return E_OK; -} - -int CloudSyncer::SaveData(SyncParam ¶m) -{ - if (!IsChngDataEmpty(param.changedData)) { - // changedData.primaryData should have no member inside - return -E_INVALID_ARGS; - } - // Update download btach Info - param.info.downLoadInfo.batchIndex += 1; - param.info.downLoadInfo.total += param.downloadData.data.size(); - int ret = E_OK; - std::vector InsertDataNoPrimaryKeys; - AssetDownloadList assetsDownloadList; - param.assetsDownloadList = assetsDownloadList; - for (size_t i = 0; i < param.downloadData.data.size(); i++) { - ret = SaveDatum(param, i, InsertDataNoPrimaryKeys); + ret = SaveChangedData(downloadData, i, dataInfoWithLog, cloudLogInfo, changedData); if (ret != E_OK) { - LOGE("Cannot save datum due to error code %d", ret); + LOGE("[CloudSyncer] Cannot save changed data: %d.", ret); return ret; } } - // Save assetsMap into current context - { - std::lock_guard autoLock(contextLock_); - currentContext_.assetDownloadList = param.assetsDownloadList; - } // save the data to the database by batch - ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData); + ret = storageProxy_->PutCloudSyncData(tableName, downloadData); if (ret != E_OK) { - param.info.downLoadInfo.failCount += param.downloadData.data.size(); + downloadInfo.failCount += downloadData.data.size(); LOGE("[CloudSyncer] Cannot save the data to databse with error code: %d.", ret); return ret; } - UpdateChangedData(param.downloadData, InsertDataNoPrimaryKeys, param.changedData); + UpdateChangedData(downloadData, InsertDataNoPrimaryKeys, changedData); // Update downloadInfo - param.info.downLoadInfo.successCount += param.downloadData.data.size(); + downloadInfo.successCount += downloadData.data.size(); // Get latest cloudWaterMark - VBucket &lastData = param.downloadData.data.back(); - param.cloudWaterMark = std::get(lastData[CloudDbConstant::CURSOR_FIELD]); + VBucket &lastData = downloadData.data.back(); + latestCloudWaterMark = std::get(lastData[CloudDbConstant::CURSOR_FIELD]); return E_OK; } @@ -1016,7 +509,7 @@ int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableNam } } if (currentContext_.strategy == nullptr) { - LOGE("Strategy has not been initialized"); + LOGD("Strategy has not been initialized"); return -E_INVALID_ARGS; } ret = storageProxy_->CheckSchema(tableName); @@ -1027,14 +520,8 @@ int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableNam return E_OK; } -bool CloudSyncer::NeedNotifyChangedData(ChangedData &changedData) +static bool NeedNotifyChangedData(ChangedData &changedData) { - { - std::lock_guard autoLock(contextLock_); - if (IsModeForcePush(currentContext_.currentTaskId)) { - return false; - } - } // when there have no data been changed, it needn't notified if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() && @@ -1068,24 +555,24 @@ int CloudSyncer::NotifyChangedData(ChangedData &&changedData) return ret; } -int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m) +int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, const TableName &tableName, + DownloadData &downloadData, InnerProcessInfo &info, const std::vector &pkColNames) { int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE); if (ret != E_OK) { LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret); return ret; } - if (!IsModeForcePush(taskId)) { - param.changedData.tableName = param.info.tableName; - param.changedData.field = param.pkColNames; - param.changedData.type = ChangedDataType::DATA; - } - ret = SaveData(param); + CloudWaterMark newCloudWaterMark; + ChangedData changedData; + changedData.tableName = tableName; + changedData.field = pkColNames; + ret = SaveData(tableName, downloadData, info.downLoadInfo, newCloudWaterMark, changedData); if (ret != E_OK) { LOGE("[CloudSyncer] cannot save data: %d.", ret); { std::lock_guard autoLock(contextLock_); - currentContext_.notifier->UpdateProcess(param.info); + currentContext_.notifier->UpdateProcess(info); } int rollBackErrorCode = storageProxy_->Rollback(); if (rollBackErrorCode != E_OK) { @@ -1100,126 +587,91 @@ int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam &p LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret); return ret; } - return E_OK; -} - -int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m) -{ - ChangedData changedData; - param.changedData = changedData; - int ret = SaveDataInTransaction(taskId, param); - if (ret != E_OK) { - return ret; - } - // call OnChange to notify changedData object first time (without Assets) - ret = NotifyChangedData(std::move(param.changedData)); - if (ret != E_OK) { - LOGE("Cannot notify changed data due to error %d", ret); - return ret; - } - // Begin dowloading assets - ChangedData changedAssets; - ret = DownloadNotifyAssets(param.info, param.pkColNames, changedAssets); - if (ret != E_OK) { - LOGE("Someting wrong happened during assets downloading due to error %d", ret); - return ret; - } - bool isUpdateCloudCursor = true; { std::lock_guard autoLock(queueLock_); - currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info); - isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor(); + currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info); } // use the cursor of the last datum in data set to update cloud water mark - if (isUpdateCloudCursor) { - std::lock_guard autoLock(contextLock_); - currentContext_.cloudWaterMarks[param.info.tableName] = param.cloudWaterMark; + ret = storageProxy_->PutCloudWaterMark(tableName, newCloudWaterMark); + if (ret != E_OK) { + LOGE("[CloudSyncer] Cannot set cloud water mark while downloading, %d.", ret); + return ret; } - return E_OK; + // call OnChange to notify changedData object + return NotifyChangedData(std::move(changedData)); } -void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, - bool lastBatch) +void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo) { - CloudTaskInfo taskInfo; - { - std::lock_guard autoLock(queueLock_); - taskInfo = cloudTaskInfos_[uploadParam.taskId]; - } std::lock_guard autoLock(contextLock_); - if (uploadParam.lastTable && lastBatch) { + if (uploadParam.lastTable) { currentContext_.notifier->UpdateProcess(innerProcessInfo); } else { - currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo); + currentContext_.notifier->NotifyProcess(cloudTaskInfos_[uploadParam.taskId], innerProcessInfo); } } int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId) { - SyncParam param; - int ret = GetCurrentTableName(param.tableName); + TableName tableName; + int ret = GetCurrentTableName(tableName); if (ret != E_OK) { LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret); return ret; } - param.info.tableName = param.tableName; - std::vector assetFields; - ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot get primary column names: %d", ret); - return ret; - } - { - std::lock_guard autoLock(contextLock_); - currentContext_.assetFields[currentContext_.tableName] = assetFields; - } - - ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark); - if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret); - return ret; - } - return DoDownloadInner(taskId, param); -} - + InnerProcessInfo info; + info.tableName = tableName; -int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m) -{ - // Query data by batch until reaching end and not more data need to be download + std::vector colNames; + storageProxy_->GetPrimaryColNames(tableName, colNames); + CloudWaterMark cloudWaterMark; bool queryEnd = false; uint32_t retryCnt = 0; - int ret = E_OK; + bool isDataEmpty = false; + // Query data by batch until reaching end and not more data need to be download while (!queryEnd) { - ret = PreCheck(taskId, param.info.tableName); + ret = PreCheck(taskId, tableName); if (ret != E_OK) { return ret; } // Get cloud data after cloud water mark - param.info.tableStatus = ProcessStatus::PROCESSING; + info.tableStatus = ProcessStatus::PROCESSING; DownloadData downloadData; - param.downloadData = downloadData; - ret = QueryCloudData(param.info.tableName, param.cloudWaterMark, param.downloadData); + if (!isDataEmpty) { + ret = storageProxy_->GetCloudWaterMark(tableName, cloudWaterMark); + if (ret != E_OK) { + LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret); + return ret; + } + } + ret = QueryCloudData(tableName, downloadData, cloudWaterMark); if (ret == -E_QUERY_END) { // Won't break here since downloadData may not be null queryEnd = true; } else if (ret != E_OK) { return ret; } - if (param.downloadData.data.empty()) { - if (ret == E_OK && retryCnt >= CloudDbConstant::MAX_DOWNLOAD_RETRY_TIME) { - LOGE("Cloud Db send empty data but didn't return QUERY_END for too much time"); - return -E_CLOUD_ERROR; - } - if (ret == E_OK && retryCnt < CloudDbConstant::MAX_DOWNLOAD_RETRY_TIME) { - LOGW("Cloud Db return E_OK but send empty data, it should return QUERY_END, retry"); - retryCnt++; - continue; - } - NotifyInEmptyDownload(taskId, param.info); - break; + if (downloadData.data.empty()) { + isDataEmpty = true; + LOGE("=====data is empty, cloudWaterMark=%s", cloudWaterMark.c_str()); + continue; + // if (ret == E_OK && retryCnt >= CloudDbConstant::MAX_DOWNLOAD_RETRY_TIME) { + // LOGE("Cloud Db send empty data but didn't return QUERY_END for too much time"); + // return -E_CLOUD_ERROR; + // } + // if (ret == E_OK && retryCnt < CloudDbConstant::MAX_DOWNLOAD_RETRY_TIME) { + // LOGW("Cloud Db return E_OK but send empty data, it should return QUERY_END, retry"); + // retryCnt++; + // continue; + // } + // { + // std::lock_guard autoLock(queueLock_); + // currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info); + // } + // break; } // Save data in transaction, update cloud water mark, notify process and changed data - ret = SaveDataNotifyProcess(taskId, param); + ret = SaveDataNotifyProcess(taskId, tableName, downloadData, info, colNames); if (ret != E_OK) { return ret; } @@ -1228,22 +680,7 @@ int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m) return E_OK; } -void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info) -{ - std::lock_guard autoLock(contextLock_); - if (currentContext_.strategy->JudgeUpload()) { - currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info); - } else { - info.tableStatus = FINISHED; - if (cloudTaskInfos_[taskId].table.back() == info.tableName) { - currentContext_.notifier->UpdateProcess(info); - } else { - currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info); - } - } -} - -int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, LocalWaterMark &localMark) +int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName) { int ret = PreCheck(taskId, tableName); if (ret != E_OK) { @@ -1262,14 +699,7 @@ int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &ta return -E_INVALID_ARGS; } } - - if (!IsModeForcePush(taskId)) { - ret = storageProxy_->GetLocalWaterMark(tableName, localMark); - if (ret != E_OK) { - LOGE("[CloudSyncer] Failed to get local water mark when upload, %d.", ret); - } - } - return ret; + return E_OK; } bool CloudSyncer::CheckCloudSyncDataEmpty(CloudSyncData &uploadData) @@ -1285,16 +715,6 @@ int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadPar Info insertInfo; Info updateInfo; Info deleteInfo; - - if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) { - errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record, - uploadData.delData.extend, deleteInfo); - if (errCode != E_OK) { - return errCode; - } - innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount; - } - if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) { errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record, uploadData.insData.extend, insertInfo); @@ -1316,44 +736,32 @@ int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadPar if (errCode != E_OK) { return errCode; } - errCode = storageProxy_->FillCloudAssetForUpload(uploadData); + innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount; + } + + if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) { + errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record, + uploadData.delData.extend, deleteInfo); if (errCode != E_OK) { - LOGE("Cannot fill cloud asset during upload procedure"); return errCode; } - innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount; + innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount; } - bool lastBatch = innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total; - if (lastBatch) { + if (innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total) { innerProcessInfo.tableStatus = ProcessStatus::FINISHED; } // After each batch upload successed, call NotifyProcess - NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch); + NotifyInBatchUpload(uploadParam, innerProcessInfo); // if batch upload successed, update local water mark // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here. - errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam); + errCode = storageProxy_->PutLocalWaterMark(uploadData.tableName, uploadParam.localMark); if (errCode != E_OK) { LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode); } return errCode; } -int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam) -{ - int errCode = E_OK; - // if we use local cover cloud strategy, it won't update local water mark also. - if (IsModeForcePush(uploadParam.taskId)) { - return E_OK; - } - errCode = storageProxy_->PutLocalWaterMark(tableName, uploadParam.localMark); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode); - return errCode; - } - return E_OK; -} - int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable) { std::string tableName; @@ -1362,16 +770,21 @@ int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable) LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret); return ret; } - - LocalWaterMark localMark = 0u; - ret = PreCheckUpload(taskId, tableName, localMark); + ret = PreCheckUpload(taskId, tableName); if (ret != E_OK) { LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret); return ret; } + LocalWaterMark localMark; + ret = storageProxy_->GetLocalWaterMark(tableName, localMark); + if (ret != E_OK) { + LOGE("[CloudSyncer] Failed to get local water mark when upload, %d.", ret); + return ret; + } + int64_t count = 0; - ret = storageProxy_->GetUploadCount(tableName, localMark, IsModeForcePush(taskId), count); + ret = storageProxy_->GetUploadCount(tableName, localMark, count); if (ret != E_OK) { // GetUploadCount will return E_OK when upload count is zero. LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret); @@ -1401,93 +814,6 @@ int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable) return DoUploadInner(tableName, param); } -static AssetOpType StatusToFlag(AssetStatus status) { - switch (status) - { - case AssetStatus::INSERT: - return AssetOpType::INSERT; - case AssetStatus::DELETE: - return AssetOpType::DELETE; - case AssetStatus::UPDATE: - return AssetOpType::UPDATE; - case AssetStatus::NORMAL: - return AssetOpType::NO_CHANGE; - default: - LOGW("Unexpected Situation and won't be handled, Caller should ensure that current situation won't occur"); - return AssetOpType::NO_CHANGE; - } -} - -static void StatusToFlagForAsset(Asset &asset) -{ - asset.flag = static_cast(StatusToFlag(static_cast(asset.status))); -} - -static void StatusToFlagForAssets(Assets &assets) -{ - for (Asset &asset : assets) { - StatusToFlagForAsset(asset); - } -} - -static void StatusToFlagForAssetsInRecord(const std::vector &fields, VBucket &record) -{ - for (const Field &field : fields) { - if (field.type == TYPE_INDEX && record[field.colName].index() == TYPE_INDEX) { - StatusToFlagForAssets(std::get(record[field.colName])); - } - if (field.type == TYPE_INDEX && record[field.colName].index() == TYPE_INDEX) { - StatusToFlagForAsset(std::get(record[field.colName])); - } - } -} - -void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData) -{ - if (!ShouldProcessAssets()) { - return; - } - std::map> cloudAssets; - { - std::lock_guard autoLock(contextLock_); - cloudAssets = currentContext_.assetsInfo[currentContext_.tableName]; - } - // for delete scenario, assets should not appear in the records. Thereby we needn't tag the assests. - // for insert scenario, gid does not exist. Thereby, we needn't compare with cloud asset get in download procedure - for (size_t i = 0; i < uploadData.insData.extend.size(); i++) { - VBucket cloudAsset; // cloudAsset must be empty - (void)TagAssetsInSingleRecord(uploadData.insData.record[i], cloudAsset, true); - } - // for update scenario, assets shoulb be compared with asset get in download procedure. - for (size_t i = 0; i < uploadData.updData.extend.size(); i++) { - VBucket cloudAsset; - // gid must exist in UPDATE scenario, cause we have re-fill gid during download procedure - // But we need to check for safety - auto gidIter = uploadData.updData.extend[i].find(CloudDbConstant::GID_FIELD); - if (gidIter == uploadData.updData.extend[i].end()) { - LOGE("Datum to be upload must contain gid"); - return; - } - // update data must contain gid, however, we could only pull data after water mark - // Therefore, we need to check whether we contain the data - std::string &gid = std::get(gidIter->second); - if (cloudAssets.find(gid) == cloudAssets.end()) { - // In this case, we directly upload data without compartion and tagging - std::vector assetFields; - { - std::lock_guard autoLock(contextLock_); - assetFields = currentContext_.assetFields[currentContext_.tableName]; - } - StatusToFlagForAssetsInRecord(assetFields, uploadData.updData.record[i]); - continue; - } - for (auto &it : cloudAssets[gid]) { - cloudAsset[it.first] = it.second; - } - (void)TagAssetsInSingleRecord(uploadData.updData.record[i], cloudAsset, true); - } -} - int CloudSyncer::PreProcessBatchUpload(TaskId taskId, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo, LocalWaterMark &localMark) { @@ -1501,53 +827,19 @@ int CloudSyncer::PreProcessBatchUpload(TaskId taskId, CloudSyncData &uploadData, LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret); return ret; } - TagUploadAssets(uploadData); // get local water mark to be updated in future. - if (!IsModeForcePush(taskId)) { - ret = UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total, taskId, localMark); - if (ret != E_OK) { - LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret); - } + ret = CalculateLocalWaterMark(uploadData, innerProcessInfo.upLoadInfo.total, taskId, localMark); + if (ret != E_OK) { + LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret); + return ret; } return ret; } -int CloudSyncer::SaveCloudWaterMark(const TableName &tableName) -{ - CloudWaterMark cloudWaterMark; - { - std::lock_guard autoLock(contextLock_); - auto it = currentContext_.cloudWaterMarks.find(tableName); - if (it == currentContext_.cloudWaterMarks.end()) { - LOGD("[CloudSyncer] Not found water mark just return"); - return E_OK; - } - cloudWaterMark = currentContext_.cloudWaterMarks[tableName]; - } - int errCode = storageProxy_->PutCloudWaterMark(tableName, cloudWaterMark); - if (errCode != E_OK) { - LOGE("[CloudSyncer] Cannot set cloud water mark while Uploading, %d.", errCode); - } - return errCode; -} - -void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData) -{ - std::lock_guard autoLock(queueLock_); - uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH); -} - -bool CloudSyncer::IsModeForcePush(const TaskId taskId) -{ - std::lock_guard autoLock(queueLock_); - return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH; -} - int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam) { ContinueToken continueStmtToken = nullptr; CloudSyncData uploadData(tableName); - SetUploadDataFlag(uploadParam.taskId, uploadData); bool getDataUnfinished = false; int ret = storageProxy_->GetCloudData(tableName, uploadParam.localMark, continueStmtToken, uploadData); @@ -1556,27 +848,28 @@ int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &upload return ret; } - InnerProcessInfo info; - info.tableName = tableName; - info.tableStatus = ProcessStatus::PROCESSING; - info.upLoadInfo.total = uploadParam.count; + InnerProcessInfo innerProcessInfo; + innerProcessInfo.tableName = tableName; + innerProcessInfo.tableStatus = ProcessStatus::PROCESSING; + innerProcessInfo.upLoadInfo.total = uploadParam.count; uint32_t batchIndex = 0; while (!CheckCloudSyncDataEmpty(uploadData)) { getDataUnfinished = (ret == -E_UNFINISHED); - ret = PreProcessBatchUpload(uploadParam.taskId, uploadData, info, uploadParam.localMark); + ret = PreProcessBatchUpload(uploadParam.taskId, uploadData, innerProcessInfo, uploadParam.localMark); if (ret != E_OK) { goto RELEASE_EXIT; } - info.upLoadInfo.batchIndex = ++batchIndex; + innerProcessInfo.upLoadInfo.batchIndex = ++batchIndex; - ret = DoBatchUpload(uploadData, uploadParam, info); + ret = DoBatchUpload(uploadData, uploadParam, innerProcessInfo); if (ret != E_OK) { LOGE("[CloudSyncer] Failed to do upload, %d", ret); - info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount; + innerProcessInfo.upLoadInfo.failCount = + innerProcessInfo.upLoadInfo.total - innerProcessInfo.upLoadInfo.successCount; { std::lock_guard autoLock(contextLock_); - currentContext_.notifier->UpdateProcess(info); + currentContext_.notifier->UpdateProcess(innerProcessInfo); } goto RELEASE_EXIT; } @@ -1586,8 +879,6 @@ int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &upload if (continueStmtToken == nullptr) { break; } - SetUploadDataFlag(uploadParam.taskId, uploadData); - ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData); if ((ret != E_OK) && (ret != -E_UNFINISHED)) { LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret); @@ -1623,19 +914,20 @@ int CloudSyncer::CheckDownloadDatum(VBucket &datum) return E_OK; } -int CloudSyncer::QueryCloudData(const std::string &tableName, const CloudWaterMark &cloudWaterMark, - DownloadData &downloadData) +int CloudSyncer::QueryCloudData(const std::string &tableName, DownloadData &downloadData, + CloudWaterMark &cloudWaterMark) { VBucket extend = { {CloudDbConstant::CURSOR_FIELD, cloudWaterMark} }; - int ret = cloudDB_.Query(tableName, extend, downloadData.data); + ret = cloudDB_.Query(tableName, extend, downloadData.data); downloadData.opType.resize(downloadData.data.size()); if (ret == -E_QUERY_END) { LOGI("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded"); return -E_QUERY_END; } if (ret == E_OK) { + cloudWaterMark = std::get(extend[CloudDbConstant::CURSOR_FIELD]); LOGI("[CloudSyncer] Download data from cloud database success but still has data to download"); return E_OK; } @@ -1654,8 +946,8 @@ int CloudSyncer::CheckParamValid(const std::vector &devices, SyncMode return -E_INVALID_ARGS; } for (const auto &dev: devices) { - if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) { - LOGE("[CloudSyncer] invalid device, size %zu", dev.size()); + if (dev.size() == 0) { + LOGE("[CloudSyncer] invalid devices"); return -E_INVALID_ARGS; } } @@ -1921,38 +1213,25 @@ int CloudSyncer::CheckCloudSyncDataValid(CloudSyncData uploadData, const std::st return E_OK; } -int CloudSyncer::GetWaterMarkAndUpdateTime(std::vector& extend, LocalWaterMark &waterMark) +int CloudSyncer::GetWaterMarkInner(const std::vector& extend, LocalWaterMark &waterMark) { - for (auto &extendData: extend) { + for (const auto &extendData: extend) { if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) { LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist."); return -E_INTERNAL_ERROR; } if (TYPE_INDEX != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) { LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doestn't fit int64_t."); - return -E_INTERNAL_ERROR; - } - if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) { - LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist."); - return -E_INTERNAL_ERROR; - } - if (TYPE_INDEX != extendData.at(CloudDbConstant::CREATE_FIELD).index()) { - LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doestn't fit int64_t."); + return -E_INTERNAL_ERROR; } waterMark = std::max(int64_t(waterMark), std::get(extendData.at(CloudDbConstant::MODIFY_FIELD))); - int64_t modifyTime = - std::get(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND; - int64_t createTime = - std::get(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND; - extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime); - extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime); } return E_OK; } // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark; -int CloudSyncer::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, +int CloudSyncer::CalculateLocalWaterMark(CloudSyncData &uploadData, const int64_t &count, TaskId taskId, LocalWaterMark &waterMark) { int ret = E_OK; @@ -1966,7 +1245,7 @@ int CloudSyncer::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &coun LOGE("[CloudSyncer] Inconsistent size of inserted data."); return -E_INTERNAL_ERROR; } - ret = GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark); + ret = GetWaterMarkInner(uploadData.insData.extend, waterMark); if (ret != E_OK) { return ret; } @@ -1977,7 +1256,7 @@ int CloudSyncer::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &coun LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR); return -E_INTERNAL_ERROR; } - ret = GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark); + ret = GetWaterMarkInner(uploadData.updData.extend, waterMark); if (ret != E_OK) { return ret; } @@ -1988,7 +1267,7 @@ int CloudSyncer::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &coun LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR); return -E_INTERNAL_ERROR; } - ret = GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark); + ret = GetWaterMarkInner(uploadData.delData.extend, waterMark); if (ret != E_OK) { return ret; } @@ -2006,53 +1285,4 @@ void CloudSyncer::ClearCloudSyncData(CloudSyncData &uploadData) std::vector().swap(uploadData.delData.record); std::vector().swap(uploadData.delData.extend); } - -int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector &tableNameList) -{ - std::lock_guard lock(syncMutex_); - std::string emptyString; - int index = 1; - for (const auto &tableName: tableNameList) { - LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index); - int ret = storageProxy_->PutCloudWaterMark(tableName, emptyString); - if (ret != E_OK) { - LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret); - return ret; - } - index++; - } - int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE); - if (errCode != E_OK) { - LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode); - return errCode; - } - - std::vector assets; - errCode = storageProxy_->CleanCloudData(mode, tableNameList, assets); - if (errCode != E_OK) { - LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode); - storageProxy_->Rollback(); - return errCode; - } - - errCode = cloudDB_.RemoveLocalAssets(assets); - if (errCode != E_OK) { - LOGE("[Storage Executor] failed to remove local assets, %d.", errCode); - storageProxy_->Rollback(); - return errCode; - } - storageProxy_->Commit(); - - return errCode; -} - - -void CloudSyncer::ModifyCloudDataTime(VBucket &data) -{ - // data already check field modify_field and create_field - int64_t modifyTime = std::get(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND; - int64_t createTime = std::get(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND; - data[CloudDbConstant::MODIFY_FIELD] = modifyTime; - data[CloudDbConstant::CREATE_FIELD] = createTime; -} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h index 7f2d3df89dd9a5e614d1f1ede35631108d7f43e6..bd95cda1638d6599fc6c9677eb7c6d9baada4997 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h @@ -18,8 +18,6 @@ #include #include #include -#include - #include "cloud_db_proxy.h" #include "cloud/cloud_store_types.h" #include "cloud/cloud_sync_strategy.h" @@ -32,7 +30,6 @@ #include "store_observer.h" namespace DistributedDB { -using DownloadList = std::vector>>; class CloudSyncer : public RefObject { public: explicit CloudSyncer(std::shared_ptr storageProxy); @@ -44,10 +41,6 @@ public: int SetCloudDB(const std::shared_ptr &cloudDB); - int SetIAssetLoader(const std::shared_ptr &loader); - - int CleanCloudData(ClearMode mode, const std::vector &tableNameList); - void Close(); protected: using TaskId = uint64_t; @@ -61,40 +54,20 @@ protected: int64_t timeout; std::vector devices; }; - struct DataInfo { - DataInfoWithLog localInfo; - LogInfo cloudLogInfo; - }; struct InnerProcessInfo { std::string tableName; ProcessStatus tableStatus = ProcessStatus::PREPARED; Info downLoadInfo; Info upLoadInfo; }; - struct AssetDownloadList { - // assets in following list will fill STATUS and timestamp after calling downloading - DownloadList downloadList; - // assets in following list won't fill STATUS and timestamp after calling downloading - DownloadList completeDownloadList; - }; - struct SyncParam { - DownloadData downloadData; - ChangedData changedData; - InnerProcessInfo info; - AssetDownloadList assetsDownloadList; - CloudWaterMark cloudWaterMark; - std::vector pkColNames; - std::string tableName; - }; class ProcessNotifier { public: void Init(const std::vector &tableName, const std::vector &devices); void UpdateProcess(const InnerProcessInfo &process); - void NotifyProcess(const CloudTaskInfo &taskInfo, const InnerProcessInfo &process, - bool notifyWhenError = false); - + void NotifyProcess(const CloudTaskInfo &taskInfo, const InnerProcessInfo &process); + std::vector GetDevices() const; protected: std::mutex processMutex_; @@ -106,12 +79,6 @@ protected: std::string tableName; std::shared_ptr notifier; std::shared_ptr strategy; - std::map> assetFields; - // shoulb be cleared after each Download - AssetDownloadList assetDownloadList; - // store GID and assets, using in upload procedure - std::map>> assetsInfo; - std::map cloudWaterMarks; }; struct UploadParam { int64_t count = 0; @@ -126,19 +93,13 @@ protected: int DoSync(TaskId taskId); - int DoSyncInner(const CloudTaskInfo &taskInfo, const bool needUpload); - - int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload); + int DoSyncInner(const CloudTaskInfo &taskInfo); void DoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo); virtual int DoDownload(TaskId taskId); - int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m); - - void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info); - - int PreCheckUpload(TaskId &taskId, const TableName &tableName, LocalWaterMark &localMark); + int PreCheckUpload(TaskId &taskId, const TableName &tableName); int PreCheck(TaskId &taskId, const TableName &tableName); @@ -149,9 +110,9 @@ protected: bool CheckCloudSyncDataEmpty(CloudSyncData &uploadData); - int GetWaterMarkAndUpdateTime(std::vector& extend, LocalWaterMark &waterMark); + int GetWaterMarkInner(const std::vector& extend, LocalWaterMark &waterMark); - int UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, TaskId taskId, + int CalculateLocalWaterMark(CloudSyncData &uploadData, const int64_t &count, TaskId taskId, LocalWaterMark &waterMark); void ClearCloudSyncData(CloudSyncData &uploadData); @@ -159,19 +120,13 @@ protected: int PreProcessBatchUpload(TaskId taskId, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo, LocalWaterMark &localMark); - int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam); - virtual int DoUpload(TaskId taskId, bool lastTable); - void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData); - - bool IsModeForcePush(const TaskId taskId); - int DoUploadInner(const std::string &tableName, UploadParam &uploadParam); int CheckDownloadDatum(VBucket &datum); - int QueryCloudData(const std::string &tableName, const CloudWaterMark &cloudWaterMark, DownloadData &downloadData); + int QueryCloudData(const std::string &tableName, DownloadData &downloadData, CloudWaterMark &cloudWaterMark); int CheckTaskIdValid(TaskId taskId); @@ -199,56 +154,23 @@ protected: void SetTaskFailed(TaskId taskId, int errCode); - int SaveDatum(SyncParam ¶m, size_t idx, std::vector &InsertDataNoPrimaryKeys); - - int SaveData(SyncParam ¶m); + int SaveData(const TableName &tablename, DownloadData &downloadData, + Info &downloadInfo, CloudWaterMark &latestCloudWaterMark, ChangedData &changedData); - int SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m); + int SaveChangedData(DownloadData &downloadData, + int dataIndex, DataInfoWithLog &localLogInfo, LogInfo &cloudLogInfo, ChangedData &changedData); - int SaveChangedData(SyncParam ¶m, int dataIndex, DataInfo &dataInfo, - std::vector &InsertDataNoPrimaryKeys); + int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, const TableName &tableName, + DownloadData &downloadData, InnerProcessInfo &info, const std::vector &pkColNames); - int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m); - - void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch); - - bool NeedNotifyChangedData(ChangedData &changedData); + void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo); int NotifyChangedData(ChangedData &&changedData); - std::map GetAssetsFromVBucket(VBucket &data); - - std::map TagAssetsInSingleRecord(VBucket &CoveredData, VBucket &BeCoveredData, - bool WriteToCoveredData); - - Assets TagAssetsInSingleCol(VBucket &CoveredData, VBucket &BeCoveredData, const Field &assetField, - bool WriteToCoveredData); - - void TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo); - - void TagUploadAssets(CloudSyncData &uploadData); - - int FillCloudAssets(const std::string &tableName, VBucket &normalAssets, VBucket &failedAssets); - - int HandleDownloadResult(const std::string &tableName, std::string gid, - std::map &DownloadResult, bool setAllNormal); - - int DownloadNotifyAssets(InnerProcessInfo &info, std::vector &pKColNames, - ChangedData &changedAssets); - - int CloudDbDownloadAssets(InnerProcessInfo &info, DownloadList &downloadList, bool willHandleResult, - ChangedData &changedAssets); - - bool ShouldProcessAssets(); - static int CheckParamValid(const std::vector &devices, SyncMode mode); - void ModifyCloudDataTime(VBucket &data); - - int SaveCloudWaterMark(const TableName &tableName); - std::mutex queueLock_; - TaskId currentTaskId_; + std::atomic currentTaskId_; std::list taskQueue_; std::map cloudTaskInfos_; @@ -256,8 +178,6 @@ protected: TaskContext currentContext_; std::condition_variable contextCv_; - std::mutex syncMutex_; // Clean Cloud Data and Sync are mutually exclusive - CloudDBProxy cloudDB_; std::shared_ptr storageProxy_;