diff --git a/services/distributeddataservice/libs/distributeddb/common/include/db_types.h b/services/distributeddataservice/libs/distributeddb/common/include/db_types.h index c413fcdadade3ba4b541dba5cccc60a5935d4a47..131c95180ba7a3918f4fdc29556912f9160cb064 100755 --- a/services/distributeddataservice/libs/distributeddb/common/include/db_types.h +++ b/services/distributeddataservice/libs/distributeddb/common/include/db_types.h @@ -53,6 +53,7 @@ struct DataItem { // Only use for query sync and subscribe. ATTENTION!!! this flag should not write into mainDB. // Mark the changed row data does not match with query sync(or subscribe) condition. static constexpr uint64_t REMOTE_DEVICE_DATA_MISS_QUERY = 0x10; + static constexpr uint64_t UPDATE_FLAG = 0X20; }; struct PragmaPublishInfo { @@ -129,6 +130,7 @@ struct SyncTimeRange { TimeStamp deleteBeginTime = 0; TimeStamp endTime = static_cast(INT64_MAX); TimeStamp deleteEndTime = static_cast(INT64_MAX); + TimeStamp lastQueryTime = 0; bool IsValid() const { return (beginTime <= endTime && deleteBeginTime <= deleteEndTime); diff --git a/services/distributeddataservice/libs/distributeddb/common/src/relational/relational_schema_object.cpp b/services/distributeddataservice/libs/distributeddb/common/src/relational/relational_schema_object.cpp index 7f4db0870ba378159a3bbbc472d2b202632fb2cd..099d0c899f4400446abfebc8d0a00e9732437c53 100644 --- a/services/distributeddataservice/libs/distributeddb/common/src/relational/relational_schema_object.cpp +++ b/services/distributeddataservice/libs/distributeddb/common/src/relational/relational_schema_object.cpp @@ -590,7 +590,7 @@ RelationalSyncOpinion RelationalSchemaObject::MakeLocalSyncOpinion(const Relatio } if (remoteType != SchemaType::RELATIVE) { - LOGW("[RelationalSchema][opinion] Not support sync with schema type: local-type=[%d] remote-type=[%d]", + LOGW("[RelationalSchema][opinion] Not support sync with schema type: local-type=[%s] remote-type=[%s]", SchemaUtils::SchemaTypeString(localType).c_str(), SchemaUtils::SchemaTypeString(remoteType).c_str()); return {}; } diff --git a/services/distributeddataservice/libs/distributeddb/interfaces/src/relational/relational_store_sqlite_ext.cpp b/services/distributeddataservice/libs/distributeddb/interfaces/src/relational/relational_store_sqlite_ext.cpp index 92421716da3213b66520650538dacc77c11af4f7..b22eb7479bae4c9d187f5044a672a2fb3aeaff21 100644 --- a/services/distributeddataservice/libs/distributeddb/interfaces/src/relational/relational_store_sqlite_ext.cpp +++ b/services/distributeddataservice/libs/distributeddb/interfaces/src/relational/relational_store_sqlite_ext.cpp @@ -328,7 +328,7 @@ int GetCurrentMaxTimeStamp(sqlite3 *db, TimeStamp &maxTimestamp) ResetStatement(getTimeStmt); continue; } - TimeStamp tableMaxTimestamp = sqlite3_column_int64(getTimeStmt, 0); + auto tableMaxTimestamp = static_cast(sqlite3_column_int64(getTimeStmt, 0)); maxTimestamp = (maxTimestamp > tableMaxTimestamp) ? maxTimestamp : tableMaxTimestamp; ResetStatement(getTimeStmt); } diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/relational_sync_able_storage.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/relational_sync_able_storage.cpp index a96a60b07b4d5a720991dc2685b53d2aeae51e96..cf5b70167d91bd2bb9e976701e2bd11abd77cc07 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/relational_sync_able_storage.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/relational_sync_able_storage.cpp @@ -296,8 +296,7 @@ int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector &da token->FinishGetData(); errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED; } - } while (errCode == -E_UNFINISHED && - CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen())); + } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen())); ERROR: if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened. @@ -385,7 +384,7 @@ int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std: const std::string &deviceName) { int errCode = E_OK; - LOGD("[SQLiteSingleVerNaturalStore::SaveSyncData] Get write handle."); + LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle."); auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM); if (handle == nullptr) { return errCode; diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_query_helper.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_query_helper.cpp index 1b2427073f65194962853c86d6461119bcf833aa..5b19ce55d9ea1416d5d5502b402c13f2c3674dd9 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_query_helper.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_query_helper.cpp @@ -123,6 +123,11 @@ std::string GetFlagClauseForRDB() { return "WHERE (b.flag&0x03=0x02)"; } + +std::string GetMissQueryFlagClauseForRDB() +{ + return "WHERE (b.flag&0x23=0x22)"; +} } SqliteQueryHelper::SqliteQueryHelper(const QueryObjInfo &info) @@ -897,7 +902,7 @@ int SqliteQueryHelper::GetSubscribeSql(const std::string &subscribeId, TriggerMo return errCode; } -int SqliteQueryHelper::GetRelationalSyncDataFullSql(std::string &sql, const std::vector &fieldNames) +int SqliteQueryHelper::GetRelationalMissQuerySql(const std::vector &fieldNames, std::string &sql) { if (!isValid_) { return -E_INVALID_QUERY_FORMAT; @@ -909,7 +914,7 @@ int SqliteQueryHelper::GetRelationalSyncDataFullSql(std::string &sql, const std: } sql = GetSelectAndFromClauseForRDB(tableName_, fieldNames); - sql += GetFlagClauseForRDB(); + sql += GetMissQueryFlagClauseForRDB(); sql += GetTimeRangeClauseForRDB(); sql += "ORDER BY " + DBConstant::TIMESTAMP_ALIAS + " ASC;"; return E_OK; @@ -946,11 +951,11 @@ int SqliteQueryHelper::GetRelationalSyncDataQuerySql(std::string &sql, bool hasS return errCode; } -int SqliteQueryHelper::GetRelationalFullStatement(sqlite3 *dbHandle, uint64_t beginTime, uint64_t endTime, +int SqliteQueryHelper::GetRelationalMissQueryStatement(sqlite3 *dbHandle, uint64_t beginTime, uint64_t endTime, const std::vector &fieldNames, sqlite3_stmt *&statement) { std::string sql; - int errCode = GetRelationalSyncDataFullSql(sql, fieldNames); + int errCode = GetRelationalMissQuerySql(fieldNames, sql); if (errCode != E_OK) { LOGE("[Query] Get SQL fail!"); return -E_INVALID_QUERY_FORMAT; diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_query_helper.h b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_query_helper.h index 7944ccb79d5fd8e5759e2f59aee16089e7391345..fbfcc9820c83f662298f1cb16518967256cf9203 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_query_helper.h +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_query_helper.h @@ -92,8 +92,8 @@ public: return tableName_; } - int GetRelationalSyncDataFullSql(std::string &sql, const std::vector &fieldNames); - int GetRelationalFullStatement(sqlite3 *dbHandle, uint64_t beginTime, uint64_t endTime, + int GetRelationalMissQuerySql(const std::vector &fieldNames, std::string &sql); + int GetRelationalMissQueryStatement(sqlite3 *dbHandle, uint64_t beginTime, uint64_t endTime, const std::vector &fieldNames, sqlite3_stmt *&statement); int GetRelationalSyncDataQuerySql(std::string &sql, bool hasSubQuery, const std::vector &fieldNames); int GetRelationalQueryStatement(sqlite3 *dbHandle, uint64_t beginTime, uint64_t endTime, diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_natural_store.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_natural_store.cpp index 257df72096bed198b832ab2636d02086045243db..9856f72fed29686cbe1dff1b8ef4053ebfffbfab 100755 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_natural_store.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_natural_store.cpp @@ -1352,7 +1352,7 @@ int SQLiteSingleVerNaturalStore::Rekey(const CipherPassword &passwd) return errCode; } LOGI("Stop the syncer for rekey"); - StopSyncer(); + StopSyncer(true); std::this_thread::sleep_for(std::chrono::milliseconds(5)); // wait for 5 ms errCode = storageEngine_->TryToDisable(true, OperatePerm::REKEY_MONOPOLIZE_PERM); if (errCode != E_OK) { @@ -1439,7 +1439,7 @@ int SQLiteSingleVerNaturalStore::Import(const std::string &filePath, const Ciphe if (errCode != E_OK) { return errCode; } - StopSyncer(); + StopSyncer(true); std::this_thread::sleep_for(std::chrono::milliseconds(5)); // wait for 5 ms std::unique_ptr operation; diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_continue_token.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_continue_token.cpp index 1d5016ca03f648ecc7eda0320bd0584c709603f2..3060169d1e498efa17fbfaa08ee6b20fce821c10 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_continue_token.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_continue_token.cpp @@ -44,8 +44,10 @@ int SQLiteSingleVerRelationalContinueToken::GetStatement(sqlite3 *db, sqlite3_st return errCode; } - if (!queryObj_.IsQueryOnlyByKey()) { // If query only by key, no need to deal with REMOTE_DEVICE_DATA_MISS_QUERY. - errCode = GetFullStatement(db, fullStmt); + // if lastQueryTime equals 0, that means never sync before, need not to send miss query data. + // if queryObj is empty, that means to send all data now, need not to send miss query data. + if (timeRange_.lastQueryTime != 0 && !queryObj_.Empty()) { + errCode = GetMissQueryStatement(db, fullStmt); } if (errCode != E_OK) { SQLiteUtils::ResetStatement(queryStmt, true, errCode); @@ -61,6 +63,7 @@ void SQLiteSingleVerRelationalContinueToken::SetNextBeginTime(const DataItem &th } if (!isGettingDeletedData_) { timeRange_.beginTime = nextBeginTime; + timeRange_.lastQueryTime = std::max(nextBeginTime, timeRange_.lastQueryTime); return; } if ((theLastItem.flag & DataItem::DELETE_FLAG) != 0) { // The last one could be non-deleted. @@ -98,14 +101,14 @@ int SQLiteSingleVerRelationalContinueToken::GetQuerySyncStatement(sqlite3 *db, s return helper.GetRelationalQueryStatement(db, timeRange_.beginTime, timeRange_.endTime, fieldNames_, stmt); } -int SQLiteSingleVerRelationalContinueToken::GetFullStatement(sqlite3 *db, sqlite3_stmt *&stmt) +int SQLiteSingleVerRelationalContinueToken::GetMissQueryStatement(sqlite3 *db, sqlite3_stmt *&stmt) { int errCode = E_OK; SqliteQueryHelper helper = queryObj_.GetQueryHelper(errCode); if (errCode != E_OK) { return errCode; } - return helper.GetRelationalFullStatement(db, timeRange_.beginTime, timeRange_.endTime, fieldNames_, stmt); + return helper.GetRelationalMissQueryStatement(db, timeRange_.lastQueryTime + 1, INT64_MAX, fieldNames_, stmt); } int SQLiteSingleVerRelationalContinueToken::GetDeletedDataStmt(sqlite3 *db, sqlite3_stmt *&stmt) const diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_continue_token.h b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_continue_token.h index 88f298aa5caaf7d5e535b0de6e0f92c4eabaae92..04d3ed37bdee25e88763ba9b0c04b84f744ff26a 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_continue_token.h +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_continue_token.h @@ -41,7 +41,7 @@ private: std::string GetDeletedDataSQL() const; int GetQuerySyncStatement(sqlite3 *db, sqlite3_stmt *&stmt); int GetDeletedDataStmt(sqlite3 *db, sqlite3_stmt *&stmt) const; - int GetFullStatement(sqlite3 *db, sqlite3_stmt *&stmt); + int GetMissQueryStatement(sqlite3 *db, sqlite3_stmt *&stmt); static const unsigned int MAGIC_BEGIN = 0x600D0AC7; // for token guard static const unsigned int MAGIC_END = 0x0AC7600D; // for token guard diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp index 1a7a640ce763c50a1870af787ed8741b2fcc8cfe..ad66f016aef6fa85226955bacfcc79c166aa8cdc 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp @@ -19,15 +19,6 @@ #include "db_common.h" namespace DistributedDB { -namespace { - void ResetAllStatements(std::initializer_list stmts, bool finalize, int &errCode) - { - for (sqlite3_stmt *stmt : stmts) { - SQLiteUtils::ResetStatement(stmt, finalize, errCode); - } - } -} - SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable) : SQLiteStorageExecutor(dbHandle, writable, false) {} @@ -181,10 +172,12 @@ int UpgradeFields(sqlite3 *db, const std::vector &tables, std::vect return a.GetColumnId()< b.GetColumnId(); }); int errCode = E_OK; - for (auto table : tables) { - for (auto field : fields) { - std::string alterSql = "ALTER TABLE " + table + " ADD " + field.GetFieldName() + " " + - field.GetDataType() + ";"; + for (const auto &table : tables) { + for (const auto &field : fields) { + std::string alterSql = "ALTER TABLE " + table + " ADD " + field.GetFieldName() + " " + field.GetDataType(); + alterSql += field.IsNotNull() ? " NOT NULL" : ""; + alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : ""; + alterSql += ";"; errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql); if (errCode != E_OK) { LOGE("Alter table failed. %d", errCode); @@ -464,6 +457,7 @@ static int GetLogData(sqlite3_stmt *logStatement, LogInfo &logInfo) logInfo.wTimeStamp = static_cast(sqlite3_column_int64(logStatement, 4)); // 4 means w_timestamp index logInfo.flag = static_cast(sqlite3_column_int64(logStatement, 5)); // 5 means flag index logInfo.flag &= (~DataItem::LOCAL_FLAG); + logInfo.flag &= (~DataItem::UPDATE_FLAG); return SQLiteUtils::GetColumnBlobValue(logStatement, 6, logInfo.hashKey); // 6 means hashKey index } @@ -474,7 +468,7 @@ static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen) static const size_t maxOrigDevLength = 40; size_t devLength = std::max(maxOrigDevLength, item.origDev.size()); size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) + - Parcel::GetVectorCharLen(item.value) + devLength + appendLen); + Parcel::GetVectorCharLen(item.value) + devLength + appendLen); return dataSize; } @@ -832,59 +826,108 @@ int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataIte return DeleteSyncLog(item, rmLogStmt); } +int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataPre(const DataItem &dataItem, DataItem &itemGet) +{ + if (saveStmt_.queryStmt == nullptr) { + return -E_INVALID_ARGS; + } + int errCode = SQLiteUtils::BindBlobToStatement(saveStmt_.queryStmt, 1, dataItem.hashKey); // 1 index for hashkey + if (errCode != E_OK) { + return errCode; + } + errCode = SQLiteUtils::BindTextToStatement(saveStmt_.queryStmt, 2, dataItem.dev); // 2 index for devices + if (errCode != E_OK) { + return errCode; + } + + LogInfo logInfoGet; + errCode = SQLiteUtils::StepWithRetry(saveStmt_.queryStmt, isMemDb_); + if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { + errCode = -E_NOT_FOUND; + } else { + errCode = GetLogData(saveStmt_.queryStmt, logInfoGet); + } + itemGet.timeStamp = logInfoGet.timestamp; + SQLiteUtils::ResetStatement(saveStmt_.queryStmt, false, errCode); + return errCode; +} + +int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem, bool &isDefeated) +{ + if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) { + isDefeated = false; // no need to slove conflict except miss query data + return E_OK; + } + + DataItem itemGet; + int errCode = GetSyncDataPre(dataItem, itemGet); + if (errCode != E_OK && errCode != -E_NOT_FOUND) { + LOGE("Failed to get raw data. %d", errCode); + return errCode; + } + isDefeated = (dataItem.timeStamp <= itemGet.timeStamp); // defeated if item timestamp is earlier then raw data + return E_OK; +} + +int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const std::vector &fieldInfos, + const std::string &deviceName, DataItem &item, TimeStamp &maxTimestamp) +{ + item.dev = deviceName; + bool isDefeated = false; + int errCode = CheckDataConflictDefeated(item, isDefeated); + if (errCode != E_OK) { + LOGE("check data conflict failed. %d", errCode); + return errCode; + } + + if (isDefeated) { + LOGD("Data was defeated."); + return E_OK; + } + if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) { + return ProcessMissQueryData(item, saveStmt_.rmDataStmt, saveStmt_.rmLogStmt); + } + int64_t rowid = -1; + errCode = SaveSyncDataItem(item, saveStmt_.saveDataStmt, saveStmt_.rmDataStmt, fieldInfos, rowid); + if (errCode == E_OK || errCode == -E_NOT_FOUND) { + errCode = SaveSyncLog(saveStmt_.saveLogStmt, saveStmt_.queryStmt, item, maxTimestamp, rowid); + } + return errCode; +} + int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(const QueryObject &object, std::vector &dataItems, const std::string &deviceName, TimeStamp &maxTimestamp) { - sqlite3_stmt *saveDataStmt = nullptr; - int errCode = PrepareForSavingData(object, saveDataStmt); + int errCode = PrepareForSavingData(object, saveStmt_.saveDataStmt); if (errCode != E_OK) { return errCode; } - sqlite3_stmt *saveLogStmt = nullptr; - sqlite3_stmt *queryStmt = nullptr; - errCode = PrepareForSavingLog(object, deviceName, saveLogStmt, queryStmt); + errCode = PrepareForSavingLog(object, deviceName, saveStmt_.saveLogStmt, saveStmt_.queryStmt); if (errCode != E_OK) { - SQLiteUtils::ResetStatement(saveDataStmt, true, errCode); + SQLiteUtils::ResetStatement(saveStmt_.saveDataStmt, true, errCode); return errCode; } - std::map colInfos = table_.GetFields(); std::vector fieldInfos; - for (const auto &col: colInfos) { + for (const auto &col: table_.GetFields()) { fieldInfos.push_back(col.second); } - sqlite3_stmt *rmDataStmt = nullptr; - sqlite3_stmt *rmLogStmt = nullptr; + for (auto &item : dataItems) { if (item.neglect) { // Do not save this record if it is neglected continue; } - item.dev = deviceName; - if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) { - errCode = ProcessMissQueryData(item, rmDataStmt, rmLogStmt); - if (errCode != E_OK) { - break; - } - continue; - } - int64_t rowid = -1; - errCode = SaveSyncDataItem(item, saveDataStmt, rmDataStmt, fieldInfos, rowid); - if (errCode != E_OK && errCode != -E_NOT_FOUND) { - LOGE("Save sync dataitem failed:%d.", errCode); - break; - } - errCode = SaveSyncLog(saveLogStmt, queryStmt, item, maxTimestamp, rowid); + errCode = SaveSyncDataItem(fieldInfos, deviceName, item, maxTimestamp); if (errCode != E_OK) { - LOGE("Save sync log failed:%d.", errCode); break; } maxTimestamp = std::max(item.timeStamp, maxTimestamp); // Need not reset rmDataStmt and rmLogStmt here. - ResetAllStatements({ saveDataStmt, saveLogStmt, queryStmt }, false, errCode); + saveStmt_.ResetStatements(false); } if (errCode == -E_NOT_FOUND) { errCode = E_OK; } - ResetAllStatements({ saveDataStmt, saveLogStmt, queryStmt, rmDataStmt, rmLogStmt }, true, errCode); + saveStmt_.ResetStatements(true); return errCode; } @@ -929,49 +972,81 @@ int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *s } } - errCode = DataTransformer::SerializeDataItem(data, table_.GetFieldInfos(), dataItem); + errCode = DataTransformer::SerializeDataItem(data, + isGettingDeletedData ? std::vector() : table_.GetFieldInfos(), dataItem); if (errCode != E_OK) { LOGE("relational data value transfer to kv fail"); } return errCode; } -int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(std::vector &dataItems, size_t &dataTotalSize, - const Key &cursorHashKey, sqlite3_stmt *fullStmt, size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) +int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item) { - int errCode = E_OK; - while (true) { - DataItem item; - errCode = SQLiteUtils::StepWithRetry(fullStmt, isMemDb_); - if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { - errCode = GetDataItemForSync(fullStmt, item, false); - if (errCode != E_OK) { - break; - } - } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { - errCode = -E_FINISHED; - break; - } else { - LOGE("Get full data failed:%d.", errCode); - break; - } - if (item.hashKey == cursorHashKey) { - break; - } - item.value = {}; - item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY; + int errCode = GetDataItemForSync(fullStmt, item, false); + if (errCode != E_OK) { + return errCode; + } + item.value = {}; + item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY; + return errCode; +} +namespace { +int StepNext(bool isMemDB, sqlite3_stmt *stmt, TimeStamp ×tamp) +{ + if (stmt == nullptr) { + return -E_INVALID_ARGS; + } + int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB); + if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { + timestamp = INT64_MAX; + errCode = E_OK; + } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { + timestamp = static_cast(sqlite3_column_int64(stmt, 3)); // 3 means timestamp index + errCode = E_OK; + } + return errCode; +} + +int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize, + std::vector &dataItems, DataItem &&item) +{ + // If one record is over 4M, ignore it. + if (item.value.size() > DBConstant::MAX_VALUE_SIZE) { + overLongSize++; + } else { // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item. dataTotalSize += GetDataItemSerialSize(item, appendLength); - if ((dataTotalSize > dataSizeInfo.blockSize && !dataItems.empty()) || - dataItems.size() >= dataSizeInfo.packetSize) { - errCode = -E_UNFINISHED; - break; + if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) { + return -E_UNFINISHED; } else { - dataItems.push_back(std::move(item)); + dataItems.push_back(item); } } - return errCode; + return E_OK; +} +} + +int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData, + sqlite3_stmt *queryStmt, DataItem &item, TimeStamp &queryTime) +{ + if (!isFirstTime) { // For the first time, never step before, can get nothing + int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData); + if (errCode != E_OK) { + return errCode; + } + } + return StepNext(isMemDb_, queryStmt, queryTime); +} + +int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item, + TimeStamp &missQueryTime) +{ + int errCode = GetMissQueryData(fullStmt, item); + if (errCode != E_OK) { + return errCode; + } + return StepNext(isMemDb_, fullStmt, missQueryTime); } int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector &dataItems, size_t appendLength, @@ -988,45 +1063,40 @@ int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector DBConstant::MAX_VALUE_SIZE) { - overLongSize++; - continue; + if (!isFirstTime) { + errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item)); + if (errCode != E_OK) { + break; + } } - // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item. - dataTotalSize += GetDataItemSerialSize(item, appendLength); - if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) { - errCode = -E_UNFINISHED; + isFirstTime = false; + if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) { + errCode = -E_FINISHED; break; - } else { - dataItems.push_back(std::move(item)); } } while (true); if (overLongSize != 0) { @@ -1201,5 +1271,26 @@ int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableI SQLiteUtils::ResetStatement(stmt, true, errCode); return errCode; } + +int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataStmt::ResetStatements(bool isNeedFinalize) +{ + int errCode = E_OK; + if (saveDataStmt != nullptr) { + SQLiteUtils::ResetStatement(saveDataStmt, isNeedFinalize, errCode); + } + if (saveLogStmt != nullptr) { + SQLiteUtils::ResetStatement(saveLogStmt, isNeedFinalize, errCode); + } + if (queryStmt != nullptr) { + SQLiteUtils::ResetStatement(queryStmt, isNeedFinalize, errCode); + } + if (rmDataStmt != nullptr) { + SQLiteUtils::ResetStatement(rmDataStmt, isNeedFinalize, errCode); + } + if (rmLogStmt != nullptr) { + SQLiteUtils::ResetStatement(rmLogStmt, isNeedFinalize, errCode); + } + return errCode; +} } // namespace DistributedDB #endif diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.h b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.h index 9f4d170fadee612219d3f2be7f9ae740a8d3ca07..bbb63d72b7bae10b0d180d2b7a21c779e73cd0ae 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.h +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.h @@ -71,11 +71,28 @@ public: int CheckQueryObjectLegal(const TableInfo &table, QueryObject &query); private: + struct SaveSyncDataStmt { + sqlite3_stmt *saveDataStmt = nullptr; + sqlite3_stmt *saveLogStmt = nullptr; + sqlite3_stmt *queryStmt = nullptr; + sqlite3_stmt *rmDataStmt = nullptr; + sqlite3_stmt *rmLogStmt = nullptr; + + int ResetStatements(bool isNeedFinalize); + }; + int PrepareForSyncDataByTime(TimeStamp begin, TimeStamp end, sqlite3_stmt *&statement, bool getDeletedData) const; int GetDataItemForSync(sqlite3_stmt *statement, DataItem &dataItem, bool isGettingDeletedData) const; + int GetSyncDataPre(const DataItem &dataItem, DataItem &itemGet); + + int CheckDataConflictDefeated(const DataItem &item, bool &isDefeated); + + int SaveSyncDataItem(const std::vector &fieldInfos, const std::string &deviceName, DataItem &item, + TimeStamp &maxTimestamp); + int SaveSyncDataItems(const QueryObject &object, std::vector &dataItems, const std::string &deviceName, TimeStamp &timeStamp); int SaveSyncDataItem(const DataItem &dataItem, sqlite3_stmt *&saveDataStmt, sqlite3_stmt *&rmDataStmt, @@ -93,12 +110,16 @@ private: int DeleteSyncLog(const DataItem &item, sqlite3_stmt *&rmLogStmt); int ProcessMissQueryData(const DataItem &item, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt); - int GetMissQueryData(std::vector &dataItems, size_t &dataTotalSize, const Key &cursorHashKey, - sqlite3_stmt *fullStmt, size_t appendLength, const DataSizeSpecInfo &dataSizeInfo); + int GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item); + int GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData, sqlite3_stmt *queryStmt, DataItem &item, + TimeStamp &queryTime); + int GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item, TimeStamp &missQueryTime); void SetTableInfo(const TableInfo &tableInfo); // When put or get sync data, must call the func first. std::string baseTblName_; TableInfo table_; // Always operating table, user table when get, device table when put. + + SaveSyncDataStmt saveStmt_; }; } // namespace DistributedDB #endif diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_utils.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_utils.cpp index a65a533d93c24a728c78f22a5078d56ae45ff6ac..5a77599c93139b953e32505c7fe6f41ea2268863 100755 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_utils.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_utils.cpp @@ -1361,7 +1361,7 @@ std::string GetUpdateTrigger(const TableInfo table) updateTrigger += "ON " + table.GetTableName() + "\n"; updateTrigger += "BEGIN\n"; updateTrigger += "\t UPDATE " + DBConstant::RELATIONAL_PREFIX + table.GetTableName() + "_log"; - updateTrigger += " SET timestamp=get_sys_time(0), device='" + table.GetDevId() + "'"; + updateTrigger += " SET timestamp=get_sys_time(0), device='" + table.GetDevId() + "', flag=0x22"; updateTrigger += " where hash_key=calc_hash(old." + table.GetPrimaryKey() + ") and flag&0x02=0x02;\n"; updateTrigger += "END;"; return updateTrigger; diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.cpp index 4200046d94c9e428da50e767097e3f3ec761b738..2daf61d3288341bb78305c06dc1a921940bf7def 100755 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.cpp @@ -26,6 +26,7 @@ const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1; SyncAbleKvDB::SyncAbleKvDB() : started_(false), + closed_(false), isSyncModuleActiveCheck_(false), isSyncNeedActive_(true), notifyChain_(nullptr), @@ -73,7 +74,7 @@ void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData void SyncAbleKvDB::Close() { - StopSyncer(); + StopSyncer(true); } // Start a sync action. @@ -149,13 +150,19 @@ void SyncAbleKvDB::ReSetSyncModuleActive() // Start syncer void SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive) +{ + std::unique_lock lock(syncerOperateLock_); + StartSyncerWithNoLock(isCheckSyncActive, isNeedActive); + closed_ = false; +} + +void SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive) { IKvDBSyncInterface *syncInterface = GetSyncInterface(); if (syncInterface == nullptr) { LOGF("KvDB got null sync interface."); return; } - std::unique_lock lock(syncerOperateLock_); if (!isCheckSyncActive) { SetSyncModuleActive(); isNeedActive = GetSyncModuleActive(); @@ -181,16 +188,22 @@ void SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive) } // Stop syncer -void SyncAbleKvDB::StopSyncer() +void SyncAbleKvDB::StopSyncer(bool isClosed) { std::unique_lock lock(syncerOperateLock_); + StopSyncerWithNoLock(isClosed); +} + +void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosed) +{ ReSetSyncModuleActive(); syncer_.Close(); if (started_) { started_ = false; } + closed_ = isClosed; if (userChangeListerner_ != nullptr) { - userChangeListerner_->Drop(true); + userChangeListerner_->Drop(false); userChangeListerner_ = nullptr; } } @@ -199,28 +212,27 @@ void SyncAbleKvDB::UserChangeHandle() { bool isNeedChange; bool isNeedActive = true; - { - IKvDBSyncInterface *syncInterface = GetSyncInterface(); - if (syncInterface == nullptr) { - LOGF("KvDB got null sync interface."); - return; - } - std::unique_lock lock(syncerOperateLock_); - std::string userId = syncInterface->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, ""); - std::string appId = syncInterface->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, ""); - std::string storeId = syncInterface->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, ""); - isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(userId, appId, storeId); - isNeedChange = (isNeedActive != isSyncNeedActive_) ? true : false; + IKvDBSyncInterface *syncInterface = GetSyncInterface(); + if (syncInterface == nullptr) { + LOGF("KvDB got null sync interface."); + return; + } + std::unique_lock lock(syncerOperateLock_); + if (closed_) { + LOGI("kvDB is already closed"); + return; } + std::string userId = syncInterface->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, ""); + std::string appId = syncInterface->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, ""); + std::string storeId = syncInterface->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, ""); + isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(userId, appId, storeId); + isNeedChange = (isNeedActive != isSyncNeedActive_) ? true : false; // non_active to active or active to non_active if (isNeedChange) { - StopSyncer(); // will drop userChangeListerner; - { - std::unique_lock lock(syncerOperateLock_); - isSyncModuleActiveCheck_ = true; - isSyncNeedActive_ = isNeedActive; - } - StartSyncer(true, isNeedActive); + StopSyncerWithNoLock(); // will drop userChangeListerner; + isSyncModuleActiveCheck_ = true; + isSyncNeedActive_ = isNeedActive; + StartSyncerWithNoLock(true, isNeedActive); } } @@ -228,7 +240,7 @@ void SyncAbleKvDB::ChangeUserListerner() { // only active to non_active call, put into USER_NON_ACTIVE_EVENT listerner from USER_ACTIVE_TO_NON_ACTIVE_EVENT if (userChangeListerner_ != nullptr) { - userChangeListerner_->Drop(true); + userChangeListerner_->Drop(false); userChangeListerner_ = nullptr; } if (userChangeListerner_ == nullptr) { diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.h b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.h index 3d565a5cca5e32145a36a2134d6badb1dc65e10c..14b0bcd75a03b2ffed80c51d4c399c4079bbc267 100755 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.h +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.h @@ -94,8 +94,12 @@ protected: // Start syncer void StartSyncer(bool isCheckSyncActive = false, bool isNeedActive = true); + void StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive); + // Stop syncer - void StopSyncer(); + void StopSyncer(bool isClosed = false); + + void StopSyncerWithNoLock(bool isClosed = false); void UserChangeHandle(); @@ -113,6 +117,7 @@ private: SyncerProxy syncer_; std::atomic started_; + std::atomic closed_; std::atomic isSyncModuleActiveCheck_; std::atomic isSyncNeedActive_; mutable std::shared_mutex notifyChainLock_; diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/meta_data.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/meta_data.cpp index 0742cafa840fe37f6bd8913432deae42ff520169..8e56882befb00df2d326ad20730aa222797621f7 100755 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/meta_data.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/meta_data.cpp @@ -428,6 +428,23 @@ int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify, return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark); } +int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, TimeStamp &timeStamp) +{ + QueryWaterMark queryWaterMark; + int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark); + if (errCode != E_OK) { + return errCode; + } + timeStamp = queryWaterMark.lastQueryTime; + return E_OK; +} + +int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, + const TimeStamp &timeStamp) +{ + return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timeStamp); +} + int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift) { DeleteWaterMark deleteWaterMark; diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/meta_data.h b/services/distributeddataservice/libs/distributeddb/syncer/src/meta_data.h index ff14a90c16c90f9066fbda51727c471b193d14b4..16d7cbb207086098b38956bb40c28962b1bc1b82 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/meta_data.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/meta_data.h @@ -38,7 +38,7 @@ struct MetaDataValue { class Metadata { public: Metadata(); - ~Metadata(); + virtual ~Metadata(); int Initialize(ISyncInterface *storage); @@ -82,6 +82,11 @@ public: int GetRecvQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId, WaterMark &waterMark); + virtual int SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, + const TimeStamp &timeStamp); + + virtual int GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, TimeStamp &timeStamp); + int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark); // the deleteSync's sendWatermark will increase by the device watermark diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/query_sync_water_mark_helper.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/query_sync_water_mark_helper.cpp index 7884d46b0ab45646d2674117d0ceafb115f4c0ec..3a6a5cc653eb6581646668775b396122919182e3 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/query_sync_water_mark_helper.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/query_sync_water_mark_helper.cpp @@ -28,7 +28,7 @@ namespace { const int MAX_CACHE_ITEMS = 200; const uint32_t MAX_STORE_ITEMS = 100000; // WaterMark Version - constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0; + constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0; constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0; // Prefix Key in db const std::string QUERY_SYNC_PREFIX_KEY = "querySync"; @@ -193,6 +193,21 @@ int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIden return SetRecvQueryWaterMarkWithoutLock(deviceId, cacheKey, waterMark); } +int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify, + const std::string &deviceId, const TimeStamp &timeStamp) +{ + std::string cacheKey; + GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey); + std::lock_guard autoLock(queryWaterMarkLock_); + QueryWaterMark queryWaterMark; + int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark); + if (errCode != E_OK) { + return errCode; + } + queryWaterMark.lastQueryTime = timeStamp; + return UpdateCacheAndSave(cacheKey, deviceId, queryWaterMark); +} + int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &deviceId, const std::string &cacheKey, const WaterMark &waterMark) { @@ -291,6 +306,7 @@ int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &quer parcel.WriteUInt64(queryWaterMark.recvWaterMark); parcel.WriteUInt64(queryWaterMark.lastUsedTime); parcel.WriteString(queryWaterMark.sql); + parcel.WriteUInt64(queryWaterMark.lastQueryTime); if (parcel.IsError()) { LOGE("[Meta] Parcel error when serialize queryWaterMark"); return -E_PARSE_FAIL; @@ -307,6 +323,9 @@ int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWate parcel.ReadUInt64(queryWaterMark.recvWaterMark); parcel.ReadUInt64(queryWaterMark.lastUsedTime); parcel.ReadString(queryWaterMark.sql); + if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) { + parcel.ReadUInt64(queryWaterMark.lastQueryTime); + } if (parcel.IsError()) { LOGE("[Meta] Parcel error when deserialize queryWaterMark"); return -E_PARSE_FAIL; @@ -322,6 +341,7 @@ uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterM length += Parcel::GetUInt64Len(); // recvWaterMark length += Parcel::GetUInt64Len(); // lastUsedTime length += Parcel::GetStringLen(queryWaterMark.sql); + length += Parcel::GetUInt64Len(); // lastQueryTime return length; } diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/query_sync_water_mark_helper.h b/services/distributeddataservice/libs/distributeddb/syncer/src/query_sync_water_mark_helper.h index 11a95850b21037d2337920b923d0049eec7f8afb..609bfeab33df4db95e102e8f5d65c73b29e8110b 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/query_sync_water_mark_helper.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/query_sync_water_mark_helper.h @@ -26,11 +26,12 @@ namespace DistributedDB { struct QueryWaterMark { - uint32_t version = 0; + uint32_t version = 0; // start with 103 WaterMark sendWaterMark = 0; WaterMark recvWaterMark = 0; TimeStamp lastUsedTime = 0; // use for delete data std::string sql; // for analyze sql from logs + TimeStamp lastQueryTime = 0; // use for miss query scene add in 106 }; struct DeleteWaterMark { @@ -76,6 +77,9 @@ public: int SetRecvQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId, const WaterMark &waterMark); + int SetLastQueryTime(const std::string &queryIdentify, + const std::string &deviceId, const TimeStamp &timeStamp); + int GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark); int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark); diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_message_schedule.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_message_schedule.cpp index b866028f0828437be053dd53622e64b8fbfb8349..ff2a3b3656e2621a78cf77934b97068e332131ca 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_message_schedule.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_message_schedule.cpp @@ -193,8 +193,7 @@ Message *SingleVerDataMessageSchedule::GetMsgFromMap(bool &isNeedHandle) uint64_t packetId = packet->GetPacketId(); if (sequenceId < expectedSequenceId_) { uint64_t revisePacketId = finishedPacketId_ - (expectedSequenceId_ - 1 - sequenceId); - LOGI("[DataMsgSchedule] msg seqId=%llu less than exSeqId=%llu,pacId=%llu,revisePacId=%llu,label=%s,dev=%s", - sequenceId, expectedSequenceId_, packetId, revisePacketId, label_.c_str(), STR_MASK(deviceId_)); + LOGI("[DataMsgSchedule] drop msg because seqId less than exSeqId"); if (packetId < revisePacketId) { delete msg; continue; @@ -205,8 +204,7 @@ Message *SingleVerDataMessageSchedule::GetMsgFromMap(bool &isNeedHandle) } if (sequenceId == expectedSequenceId_) { if (packetId < finishedPacketId_) { - LOGI("[DataMsgSchedule] drop msg seqId=%llu,packetId=%llu,label=%s,dev=%s", sequenceId, packetId, - label_.c_str(), STR_MASK(deviceId_)); + LOGI("[DataMsgSchedule] drop msg because packetId less than finishedPacketId"); delete msg; continue; } diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.cpp index 5f0e7066b8046bb69c0f1686e07a07b151a9cc1e..185253f0a6541881d0df77696822bc89771eb6ac 100755 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.cpp @@ -166,13 +166,25 @@ int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const LOGI("[DataSync] ignore ack,sessionId is different"); return E_OK; } + TimeStamp lastQueryTime = 0; if (reSendMap_.count(sequenceId) != 0) { + lastQueryTime = reSendMap_[sequenceId].end; reSendMap_.erase(sequenceId); windowSize_++; } else { LOGI("[DataSync] ack seqId not in map"); return E_OK; } + if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) { + TimeStamp dbLastQueryTime = 0; + int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime); + if (errCode == E_OK && dbLastQueryTime < lastQueryTime) { + errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime); + } + if (errCode != E_OK) { + return errCode; + } + } if (!isAllDataHasSent_) { return InnerSyncStart(context); } else if (reSendMap_.size() == 0) { @@ -324,10 +336,15 @@ int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vec } else { WaterMark deletedStartMark = 0; GetLocalDeleteSyncWaterMark(context, deletedStartMark); - - QuerySyncObject queryObj = context->GetQuery(); - errCode = storage_->GetSyncData(queryObj, SyncTimeRange{ startMark, deletedStartMark, endMark, endMark}, - syncDataSizeInfo, token, outData); + TimeStamp lastQueryTimeStamp = 0; + errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), + context->GetDeleteSyncId(), lastQueryTimeStamp); + if (errCode == E_OK) { + QuerySyncObject queryObj = context->GetQuery(); + errCode = storage_->GetSyncData(queryObj, + SyncTimeRange{ startMark, deletedStartMark, endMark, endMark, lastQueryTimeStamp}, + syncDataSizeInfo, token, outData); + } } context->SetContinueToken(token); if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) { diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.h b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.h index 54d24f678478b68116441d28d18c403e12b3bde6..657dd3c0236c699ba9664788b1819ae11bb53f3f 100755 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.h @@ -220,7 +220,7 @@ protected: virtual int RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context); - void UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context); + virtual void UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context); void FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet, DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode); @@ -233,8 +233,6 @@ protected: int ControlCmdStartCheck(SingleVerSyncTaskContext *context); - void FillControlRequestPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context); - int SendControlPacket(const ControlRequestPacket *packet, SingleVerSyncTaskContext *context); int ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message); diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_sync_state_machine.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_sync_state_machine.cpp index 2136dc6f80eb1d996d7312218717000601b34f25..c92ca4f48ee53116c9d9ad2f5f929fc98069708e 100755 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_sync_state_machine.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_sync_state_machine.cpp @@ -1093,10 +1093,17 @@ void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int er PushPullDataRequestEvokeErrHandle(); break; case -E_BUSY: - case -E_SECURITY_OPTION_CHECK_ERROR: - case -E_INVALID_QUERY_FORMAT: - case -E_INVALID_QUERY_FIELD: + case -E_DISTRIBUTED_SCHEMA_CHANGED: + case -E_DISTRIBUTED_SCHEMA_NOT_FOUND: + case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND: + case -E_FEEDBACK_UNKNOWN_MESSAGE: case -E_INTERCEPT_DATA_FAIL: + case -E_INVALID_QUERY_FIELD: + case -E_INVALID_QUERY_FORMAT: + case -E_MAX_LIMITS: + case -E_NOT_REGISTER: + case -E_NOT_SUPPORT: + case -E_SECURITY_OPTION_CHECK_ERROR: context_->SetTaskErrCode(errCode); SwitchStateAndStep(Event::INNER_ERR_EVENT); break; @@ -1135,17 +1142,19 @@ void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handl context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED); } break; - case -E_EKEYREVOKED: - case -E_SECURITY_OPTION_CHECK_ERROR: case -E_BUSY: - case -E_INVALID_QUERY_FORMAT: - case -E_INVALID_QUERY_FIELD: - case -E_FEEDBACK_UNKNOWN_MESSAGE: + case -E_DISTRIBUTED_SCHEMA_CHANGED: + case -E_DISTRIBUTED_SCHEMA_NOT_FOUND: + case -E_EKEYREVOKED: case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND: - case -E_NOT_SUPPORT: + case -E_FEEDBACK_UNKNOWN_MESSAGE: case -E_INTERCEPT_DATA_FAIL: - case -E_NOT_REGISTER: + case -E_INVALID_QUERY_FIELD: + case -E_INVALID_QUERY_FORMAT: case -E_MAX_LIMITS: + case -E_NOT_REGISTER: + case -E_NOT_SUPPORT: + case -E_SECURITY_OPTION_CHECK_ERROR: if (handleError) { context_->SetTaskErrCode(errCode); } diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_sync_state_machine.h b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_sync_state_machine.h index eeb06e057fc4a1f84f98d611047c647543d2d800..44aca53fd788b587f7484d0eb566e80f39956658 100755 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_sync_state_machine.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_sync_state_machine.h @@ -129,6 +129,8 @@ protected: int TimeMarkSyncRecv(const Message *inMsg); + void DataAckRecvErrCodeHandle(int errCode, bool handleError); + private: // Used to init sync state machine switchbables static void InitStateSwitchTables(); @@ -206,8 +208,6 @@ private: SyncType GetSyncType(uint32_t messageId) const; - void DataAckRecvErrCodeHandle(int errCode, bool handleError); - void JumpStatusAfterAbilitySync(int mode); void ControlAckRecvErrCodeHandle(int errCode); diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.cpp index 8a89423c2105065026b1b9278783d212a41789b1..68a2711d14b06584c495f683929b0e20632ef685 100755 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.cpp @@ -938,4 +938,27 @@ END: SQLiteUtils::ResetStatement(stmt, true, errCode); return errCode; } + +int RelationalTestUtils::CheckTableRecords(sqlite3 *db, const std::string &table) +{ + if (db == nullptr || table.empty()) { + return -E_INVALID_ARGS; + } + int count = -1; + std::string sql = "select count(1) from " + table + ";"; + + sqlite3_stmt *stmt = nullptr; + int errCode = SQLiteUtils::GetStatement(db, sql, stmt); + if (errCode != E_OK) { + goto END; + } + + errCode = SQLiteUtils::StepWithRetry(stmt); + if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { + count = sqlite3_column_int(stmt, 0); + } +END: + SQLiteUtils::ResetStatement(stmt, true, errCode); + return count; +} } // namespace DistributedDBUnitTest diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.h index bad74974fd7c2207b6b2ee198e7cd4a30c63fb72..d87bdcc5b7b0939a727cb90f2fe3214cb391181d 100755 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/common/distributeddb_tools_unit_test.h @@ -305,6 +305,7 @@ public: static int ExecSql(sqlite3 *db, const std::string &sql); static void CreateDeviceTable(sqlite3 *db, const std::string &table, const std::string &device); static int CheckSqlResult(sqlite3 *db, const std::string &sql, bool &result); + static int CheckTableRecords(sqlite3 *db, const std::string &table); }; } // namespace DistributedDBUnitTest diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/interfaces/distributeddb_interfaces_relational_test.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/interfaces/distributeddb_interfaces_relational_test.cpp index c96e31d1bc283ce7164662031d02469855b90b09..7f9d9be34e8a3982afca516d42a58b93743a1f16 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/interfaces/distributeddb_interfaces_relational_test.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/interfaces/distributeddb_interfaces_relational_test.cpp @@ -420,6 +420,10 @@ void TableModifyTest(const std::string &modifySql, DBStatus expect) EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK); EXPECT_EQ(RelationalTestUtils::ExecSql(db, NORMAL_CREATE_TABLE_SQL), SQLITE_OK); + RelationalTestUtils::CreateDeviceTable(db, "sync_data", "DEVICE_A"); + RelationalTestUtils::CreateDeviceTable(db, "sync_data", "DEVICE_B"); + RelationalTestUtils::CreateDeviceTable(db, "sync_data", "DEVICE_C"); + /** * @tc.steps:step2. Open store * @tc.expected: step2. return OK @@ -466,7 +470,7 @@ void TableModifyTest(const std::string &modifySql, DBStatus expect) */ HWTEST_F(DistributedDBInterfacesRelationalTest, RelationalTableModifyTest001, TestSize.Level1) { - TableModifyTest("ALTER TABLE sync_data ADD COLUMN add_field INTEGER;", OK); + TableModifyTest("ALTER TABLE sync_data ADD COLUMN add_field INTEGER NOT NULL DEFAULT 123;", OK); } /** @@ -556,6 +560,18 @@ HWTEST_F(DistributedDBInterfacesRelationalTest, RelationalTableModifyTest004, Te EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK); } +/** + * @tc.name: RelationalTableModifyTest005 + * @tc.desc: Test modify distributed table with compatible upgrade + * @tc.type: FUNC + * @tc.require: AR000GK58F + * @tc.author: lianhuix + */ +HWTEST_F(DistributedDBInterfacesRelationalTest, RelationalTableModifyTest005, TestSize.Level1) +{ + TableModifyTest("ALTER TABLE sync_data ADD COLUMN add_field STRING NOT NULL DEFAULT 'asdf';", OK); +} + /** * @tc.name: RelationalRemoveDeviceDataTest001 * @tc.desc: Test remove device data @@ -729,4 +745,4 @@ HWTEST_F(DistributedDBInterfacesRelationalTest, RelationalOpenStorePressureTest0 std::this_thread::sleep_for(std::chrono::microseconds(100 * u(e))); } openStoreThread.join(); -} \ No newline at end of file +} diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/storage/distributeddb_relational_get_data_test.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/storage/distributeddb_relational_get_data_test.cpp index b715fa7f8d8f4762058a96ef2439c086f62cd9af..16ca8a6a8aef1ec2d255fb57fffacc730fca6769 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/storage/distributeddb_relational_get_data_test.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/storage/distributeddb_relational_get_data_test.cpp @@ -190,6 +190,7 @@ int PutBatchData(uint32_t totalCount, uint32_t valueSize) if (errCode != SQLITE_OK) { goto ERROR; } + EXPECT_EQ(sqlite3_exec(db, "BEGIN IMMEDIATE TRANSACTION", nullptr, nullptr, nullptr), SQLITE_OK); errCode = SQLiteUtils::GetStatement(db, sql, stmt); if (errCode != E_OK) { goto ERROR; @@ -208,6 +209,11 @@ int PutBatchData(uint32_t totalCount, uint32_t valueSize) } ERROR: + if (errCode == E_OK) { + EXPECT_EQ(sqlite3_exec(db, "COMMIT TRANSACTION", nullptr, nullptr, nullptr), SQLITE_OK); + } else { + EXPECT_EQ(sqlite3_exec(db, "ROLLBACK TRANSACTION", nullptr, nullptr, nullptr), SQLITE_OK); + } SQLiteUtils::ResetStatement(stmt, true, errCode); errCode = SQLiteUtils::MapSQLiteErrno(errCode); sqlite3_close(db); @@ -236,6 +242,9 @@ void DistributedDBRelationalGetDataTest::TearDownTestCase(void) void DistributedDBRelationalGetDataTest::SetUp(void) { DistributedDBToolsUnitTest::PrintTestCaseInfo(); + if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) { + LOGE("rm test db files error."); + } CreateDBAndTable(); } @@ -245,9 +254,7 @@ void DistributedDBRelationalGetDataTest::TearDown(void) EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK); g_delegate = nullptr; } - if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) { - LOGE("rm test db files error."); - } + return; } @@ -563,19 +570,14 @@ HWTEST_F(DistributedDBRelationalGetDataTest, GetQuerySyncData2, TestSize.Level1) std::vector entries; EXPECT_EQ(store->GetSyncData(queryObj, SyncTimeRange {}, DataSizeSpecInfo {}, token, entries), E_OK); EXPECT_EQ(token, nullptr); - EXPECT_EQ(entries.size(), RECORD_COUNT); // expect 98 records. in addition to that, there are 2 miss query data. size_t expectCount = 98; // expect 98 records. - size_t count = 0; + EXPECT_EQ(entries.size(), expectCount); for (auto iter = entries.begin(); iter != entries.end(); ++iter) { - if (((*iter)->GetFlag() & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) { - count++; - } auto nextOne = std::next(iter, 1); if (nextOne != entries.end()) { EXPECT_LT((*iter)->GetTimestamp(), (*nextOne)->GetTimestamp()); } } - EXPECT_EQ(count, expectCount); SingleVerKvEntry::Release(entries); /** @@ -587,14 +589,10 @@ HWTEST_F(DistributedDBRelationalGetDataTest, GetQuerySyncData2, TestSize.Level1) queryObj.SetSchema(store->GetSchemaInfo()); EXPECT_EQ(store->GetSyncData(queryObj, SyncTimeRange {}, DataSizeSpecInfo {}, token, entries), E_OK); - EXPECT_EQ(entries.size(), RECORD_COUNT); // expect 2 records. in addition to that, there are 98 miss query data. EXPECT_EQ(token, nullptr); expectCount = 2; // expect 2 records. - count = 0; + EXPECT_EQ(entries.size(), expectCount); for (auto iter = entries.begin(); iter != entries.end(); ++iter) { - if (((*iter)->GetFlag() & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) { - count++; - } auto nextOne = std::next(iter, 1); if (nextOne != entries.end()) { EXPECT_LT((*iter)->GetTimestamp(), (*nextOne)->GetTimestamp()); @@ -883,7 +881,6 @@ HWTEST_F(DistributedDBRelationalGetDataTest, MissQuery1, TestSize.Level1) "INSERT INTO " + tableName + " VALUES(NULL, 4);", "INSERT INTO " + tableName + " VALUES(NULL, 5);", }; - const size_t RECORD_COUNT = sqls.size(); for (const auto &sql : sqls) { ASSERT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK); } @@ -895,10 +892,12 @@ HWTEST_F(DistributedDBRelationalGetDataTest, MissQuery1, TestSize.Level1) auto store = GetRelationalStore(); ASSERT_NE(store, nullptr); ContinueToken token = nullptr; + SyncTimeRange timeRange; QueryObject query(Query::Select(tableName).EqualTo("value", 2).Or().EqualTo("value", 3).Or().EqualTo("value", 4)); std::vector entries; - EXPECT_EQ(store->GetSyncData(query, SyncTimeRange {}, DataSizeSpecInfo {}, token, entries), E_OK); - EXPECT_EQ(entries.size(), RECORD_COUNT); + EXPECT_EQ(store->GetSyncData(query, timeRange, DataSizeSpecInfo {}, token, entries), E_OK); + timeRange.lastQueryTime = (*(entries.rbegin()))->GetTimestamp(); + EXPECT_EQ(entries.size(), 3U); // 3 for test /** * @tc.steps: step4. Put data into "data" table from deviceA for 10 times. @@ -943,8 +942,8 @@ HWTEST_F(DistributedDBRelationalGetDataTest, MissQuery1, TestSize.Level1) * @tc.expected: Succeed and the count is right. */ query = QueryObject(Query::Select(tableName).EqualTo("value", 2).Or().EqualTo("value", 3).Or().EqualTo("value", 4)); - EXPECT_EQ(store->GetSyncData(query, SyncTimeRange {}, DataSizeSpecInfo {}, token, entries), E_OK); - EXPECT_EQ(entries.size(), RECORD_COUNT); + EXPECT_EQ(store->GetSyncData(query, timeRange, DataSizeSpecInfo {}, token, entries), E_OK); + EXPECT_EQ(entries.size(), 3U); // 3 for test, 1 query data and 2 miss query data. /** * @tc.steps: step8. Put data into "data" table from deviceA for 10 times. @@ -1222,13 +1221,13 @@ HWTEST_F(DistributedDBRelationalGetDataTest, CompatibleData2, TestSize.Level1) ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option {}, g_delegate), DBStatus::OK); ASSERT_NE(g_delegate, nullptr); ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName), DBStatus::OK); - + sqlite3 *db = nullptr; ASSERT_EQ(sqlite3_open(g_storePath.c_str(), &db), SQLITE_OK); - + auto store = GetRelationalStore(); ASSERT_NE(store, nullptr); - + /** * @tc.steps: step1. Create distributed table from deviceA. * @tc.expected: Succeed, return OK. @@ -1236,7 +1235,7 @@ HWTEST_F(DistributedDBRelationalGetDataTest, CompatibleData2, TestSize.Level1) const DeviceID deviceID = "deviceA"; ASSERT_EQ(E_OK, SQLiteUtils::CreateSameStuTable(db, store->GetSchemaInfo().GetTable(g_tableName), DBCommon::GetDistributedTableName(deviceID, g_tableName))); - + /** * @tc.steps: step2. Alter "data" table and create distributed table again. * @tc.expected: Succeed. @@ -1247,19 +1246,91 @@ HWTEST_F(DistributedDBRelationalGetDataTest, CompatibleData2, TestSize.Level1) "ALTER TABLE " + g_tableName + " ADD COLUMN blob_type BLOB DEFAULT 123 not null;"; ASSERT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK); ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName), DBStatus::OK); - + /** * @tc.steps: step3. Check deviceA's distributed table. * @tc.expected: The create sql is correct. */ std::string expectSql = "CREATE TABLE naturalbase_rdb_aux_data_" "265a9c8c3c690cdfdac72acfe7a50f748811802635d987bb7d69dc602ed3794f(key integer NOT NULL PRIMARY KEY," - "value integer, integer_type integer, text_type text, real_type real, blob_type blob)"; + "value integer, integer_type integer NOT NULL DEFAULT 123, text_type text NOT NULL DEFAULT 'high_version', " + "real_type real NOT NULL DEFAULT 123.123456, blob_type blob NOT NULL DEFAULT 123)"; sql = "SELECT sql FROM sqlite_master WHERE tbl_name='" + DBConstant::RELATIONAL_PREFIX + g_tableName + "_" + DBCommon::TransferStringToHex(DBCommon::TransferHashString(deviceID)) + "';"; EXPECT_EQ(GetOneText(db, sql), expectSql); - + sqlite3_close(db); RefObject::DecObjRef(g_store); } -#endif \ No newline at end of file + +std::vector PrepareEntries(std::vector dataItems) +{ + std::vector entries; + int errCode = E_OK; + for (auto &item : dataItems) { + auto entry = new (std::nothrow) GenericSingleVerKvEntry(); + if (entry == nullptr) { + errCode = -E_OUT_OF_MEMORY; + LOGE("GetKvEntries failed, errCode:%d", errCode); + SingleVerKvEntry::Release(entries); + break; + } + entry->SetEntryData(std::move(item)); + entries.push_back(entry); + } + return entries; +} + +/** + * @tc.name: CompatibleData2 + * @tc.desc: Check compatibility. + * @tc.type: FUNC + * @tc.require: AR000GK58H + * @tc.author: lidongwei + */ +HWTEST_F(DistributedDBRelationalGetDataTest, PutSyncDataConflictDataTest001, TestSize.Level1) +{ + const DeviceID deviceID_A = "deviceA"; + const DeviceID deviceID_B = "deviceB"; + sqlite3 *db = RelationalTestUtils::CreateDataBase(g_storePath); + RelationalTestUtils::CreateDeviceTable(db, g_tableName, deviceID_B); + + DBStatus status = g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option {}, g_delegate); + EXPECT_EQ(status, DBStatus::OK); + ASSERT_NE(g_delegate, nullptr); + EXPECT_EQ(g_delegate->CreateDistributedTable(g_tableName), DBStatus::OK); + + auto store = const_cast(GetRelationalStore()); + ASSERT_NE(store, nullptr); + + RelationalTestUtils::ExecSql(db, "INSERT OR REPLACE INTO " + g_tableName + " (key,value) VALUES (1001,'VAL_1');"); + RelationalTestUtils::ExecSql(db, "INSERT OR REPLACE INTO " + g_tableName + " (key,value) VALUES (1002,'VAL_2');"); + RelationalTestUtils::ExecSql(db, "INSERT OR REPLACE INTO " + g_tableName + " (key,value) VALUES (1003,'VAL_3');"); + + DataSizeSpecInfo sizeInfo {MTU_SIZE, 50}; + ContinueToken token = nullptr; + QueryObject query(Query::Select(g_tableName)); + std::vector entries; + int errCode = store->GetSyncData(query, {}, sizeInfo, token, entries); + EXPECT_EQ(errCode, E_OK); + + errCode = store->PutSyncDataWithQuery(query, entries, deviceID_B); + EXPECT_EQ(errCode, E_OK); + GenericSingleVerKvEntry::Release(entries); + + QueryObject query2(Query::Select(g_tableName).EqualTo("key", 1001)); + std::vector entries2; + store->GetSyncData(query2, {}, sizeInfo, token, entries2); + + errCode = store->PutSyncDataWithQuery(query, entries2, deviceID_B); + EXPECT_EQ(errCode, E_OK); + GenericSingleVerKvEntry::Release(entries2); + + RefObject::DecObjRef(g_store); + + std::string deviceTable = DBCommon::GetDistributedTableName(deviceID_B, g_tableName); + EXPECT_EQ(RelationalTestUtils::CheckTableRecords(db, deviceTable), 3); + sqlite3_close_v2(db); +} + +#endif diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp index 711adc11ba56d0562f066341671f09ce3db43baf..f292d7878737ea4d62c618cbe35066de84a88c82 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp @@ -19,10 +19,14 @@ #include "message.h" #include "mock_auto_launch.h" #include "mock_communicator.h" +#include "mock_meta_data.h" #include "mock_single_ver_data_sync.h" #include "mock_single_ver_state_machine.h" #include "mock_sync_task_context.h" #include "virtual_single_ver_sync_db_Interface.h" +#ifdef DATA_SYNC_CHECK_003 +#include "virtual_relational_ver_sync_db_interface.h" +#endif using namespace testing::ext; using namespace testing; @@ -164,6 +168,32 @@ HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck004, TestSize.Level1) delete message; } +/** + * @tc.name: StateMachineCheck005 + * @tc.desc: Test machine recv errCode. + * @tc.type: FUNC + * @tc.require: AR000CCPOM + * @tc.author: zhangqiquan + */ +HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck005, TestSize.Level1) +{ + MockSingleVerStateMachine stateMachine; + MockSyncTaskContext syncTaskContext; + MockCommunicator communicator; + VirtualSingleVerSyncDBInterface dbSyncInterface; + Init(stateMachine, syncTaskContext, communicator, dbSyncInterface); + EXPECT_CALL(stateMachine, SwitchStateAndStep(_)).WillRepeatedly(Return()); + EXPECT_CALL(syncTaskContext, GetRequestSessionId()).WillRepeatedly(Return(0u)); + + std::initializer_list testCode = {-E_DISTRIBUTED_SCHEMA_CHANGED, -E_DISTRIBUTED_SCHEMA_NOT_FOUND}; + for (int errCode : testCode) { + stateMachine.DataRecvErrCodeHandle(0, errCode); + EXPECT_EQ(syncTaskContext.GetTaskErrCode(), errCode); + stateMachine.CallDataAckRecvErrCodeHandle(errCode, true); + EXPECT_EQ(syncTaskContext.GetTaskErrCode(), errCode); + } +} + /** * @tc.name: DataSyncCheck001 * @tc.desc: Test dataSync recv error ack. @@ -197,7 +227,45 @@ HWTEST_F(DistributedDBMockSyncModuleTest, DataSyncCheck002, TestSize.Level1) EXPECT_EQ(dataSync.AckPacketIdCheck(message), true); delete message; } +#ifdef DATA_SYNC_CHECK_003 +/** + * @tc.name: DataSyncCheck003 + * @tc.desc: Test dataSync recv notify ack. + * @tc.type: FUNC + * @tc.require: AR000CCPOM + * @tc.author: zhangqiquan + */ +HWTEST_F(DistributedDBMockSyncModuleTest, DataSyncCheck003, TestSize.Level1) +{ + MockSingleVerDataSync mockDataSync; + MockSyncTaskContext mockSyncTaskContext; + auto mockMetadata = std::make_shared(); + SyncTimeRange dataTimeRange = {1, 0, 1, 0}; + mockDataSync.CallUpdateSendInfo(dataTimeRange, &mockSyncTaskContext); + VirtualRelationalVerSyncDBInterface storage; + MockCommunicator communicator; + std::shared_ptr metadata = std::static_pointer_cast(mockMetadata); + mockDataSync.Initialize(&storage, &communicator, metadata, "deviceId"); + + DistributedDB::Message *message = new(std::nothrow) DistributedDB::Message(); + ASSERT_TRUE(message != nullptr); + DataAckPacket packet; + message->SetSequenceId(1); + message->SetCopiedObject(packet); + mockSyncTaskContext.SetQuerySync(true); + + EXPECT_CALL(*mockMetadata, GetLastQueryTime(_, _, _)).WillOnce(Return(E_OK)); + EXPECT_CALL(*mockMetadata, SetLastQueryTime(_, _, _)).WillOnce([&dataTimeRange](const std::string &queryIdentify, + const std::string &deviceId, const TimeStamp &timeStamp) { + EXPECT_EQ(timeStamp, dataTimeRange.endTime); + return E_OK; + }); + EXPECT_CALL(mockSyncTaskContext, SetOperationStatus(_)).WillOnce(Return()); + EXPECT_EQ(mockDataSync.TryContinueSync(&mockSyncTaskContext, message), -E_FINISHED); + delete message; +} +#endif /** * @tc.name: AutoLaunchCheck001 * @tc.desc: Test autoLaunch close connection. diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_multi_user_test.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_multi_user_test.cpp index 10000da8aeee73b1453a5eeff6eea55b3512dba1..4e8f7c454cebb7e4e6a8f9543d38579fee6d1397 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_multi_user_test.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_multi_user_test.cpp @@ -549,4 +549,79 @@ HWTEST_F(DistributedDBSingleVerMultiUserTest, MultiUser005, TestSize.Level0) */ EXPECT_TRUE(KvStoreDelegateManager::NotifyUserChanged() == OK); CloseStore(); +} + +/** + * @tc.name: MultiUser006 + * @tc.desc: test NotifyUserChanged and close db concurrently + * @tc.type: FUNC + * @tc.require: AR000E8S2T + * @tc.author: zhuwentao + */ +HWTEST_F(DistributedDBSingleVerMultiUserTest, MultiUser006, TestSize.Level0) +{ + /** + * @tc.steps: step1. set SyncActivationCheckCallback and only userId1 can active + */ + g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback2); + /** + * @tc.steps: step2. openstore1 in dual tuple sync mode and openstore2 in normal sync mode + * @tc.expected: step2. only user2 sync mode is active + */ + + OpenStore1(true); + OpenStore2(false); + /** + * @tc.steps: step3. set SyncActivationCheckCallback and only userId2 can active + */ + g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback1); + /** + * @tc.steps: step4. call NotifyUserChanged and close db concurrently + * @tc.expected: step4. return OK + */ + thread subThread([&]() { + EXPECT_TRUE(KvStoreDelegateManager::NotifyUserChanged() == OK); + }); + subThread.detach(); + ASSERT_EQ(g_mgr1.CloseKvStore(g_kvDelegatePtr1), OK); + g_kvDelegatePtr1 = nullptr; + CloseStore(); +} + +/** + * @tc.name: MultiUser007 + * @tc.desc: test NotifyUserChanged and rekey db concurrently + * @tc.type: FUNC + * @tc.require: AR000E8S2T + * @tc.author: zhuwentao + */ +HWTEST_F(DistributedDBSingleVerMultiUserTest, MultiUser007, TestSize.Level0) +{ + /** + * @tc.steps: step1. set SyncActivationCheckCallback and only userId1 can active + */ + g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback2); + /** + * @tc.steps: step2. openstore1 in dual tuple sync mode and openstore2 in normal sync mode + * @tc.expected: step2. only user2 sync mode is active + */ + + OpenStore1(true); + OpenStore2(false); + /** + * @tc.steps: step3. set SyncActivationCheckCallback and only userId2 can active + */ + g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback1); + /** + * @tc.steps: step2. call NotifyUserChanged and close db concurrently + * @tc.expected: step2. return OK + */ + CipherPassword passwd; + thread subThread([&]() { + EXPECT_TRUE(KvStoreDelegateManager::NotifyUserChanged() == OK); + }); + subThread.detach(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + EXPECT_TRUE(g_kvDelegatePtr1->Rekey(passwd) == OK); + CloseStore(); } \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_meta_data.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_meta_data.h new file mode 100644 index 0000000000000000000000000000000000000000..4be11ce3da80d1f21dda0200c71eb85cb1ab9472 --- /dev/null +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_meta_data.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MOCK_META_DATA_H +#define MOCK_META_DATA_H + +#include +#include "meta_data.h" + +namespace DistributedDB { +class MockMetadata : public Metadata { +public: + MOCK_METHOD3(SetLastQueryTime, int(const std::string &, const std::string &, const TimeStamp &)); + + MOCK_METHOD3(GetLastQueryTime, int(const std::string &, const std::string &, TimeStamp &)); +}; +} // namespace DistributedDB +#endif // #define MOCK_META_DATA_H \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_single_ver_data_sync.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_single_ver_data_sync.h index 2775130e2feee23ee54e55c7e285f57f951129ce..2c74476e54ba5b8361e7f9eadf317b1ec125649d 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_single_ver_data_sync.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_single_ver_data_sync.h @@ -32,6 +32,11 @@ public: return SingleVerDataSync::PullRequestStart(context); } + void CallUpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context) + { + SingleVerDataSync::UpdateSendInfo(dataTimeRange, context); + } + MOCK_METHOD1(RemoveDeviceDataIfNeed, int(SingleVerSyncTaskContext *)); }; } // namespace DistributedDB diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_single_ver_state_machine.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_single_ver_state_machine.h index 45711c719a7935638ce78cda649baaef06da3ac3..b8dc940cc8791ef02c1202e28a539b0108d4c1a3 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_single_ver_state_machine.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_single_ver_state_machine.h @@ -37,6 +37,11 @@ public: return SingleVerSyncStateMachine::TimeMarkSyncRecv(inMsg); } + void CallDataAckRecvErrCodeHandle(int errCode, bool handleError) + { + SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(errCode, handleError); + } + MOCK_METHOD1(SwitchStateAndStep, void(uint8_t)); MOCK_METHOD0(PrepareNextSyncTask, int(void)); diff --git a/services/distributeddataservice/test/moduletest/common/distributeddb/src/distributeddb_nb_create_test.cpp b/services/distributeddataservice/test/moduletest/common/distributeddb/src/distributeddb_nb_create_test.cpp index 24520b4684a7e351334f6cfb4c262d3dca16ee53..6d7d95c2c5531675a0b9676e74b2ceda2bddb6ca 100755 --- a/services/distributeddataservice/test/moduletest/common/distributeddb/src/distributeddb_nb_create_test.cpp +++ b/services/distributeddataservice/test/moduletest/common/distributeddb/src/distributeddb_nb_create_test.cpp @@ -1195,7 +1195,7 @@ HWTEST_F(DistributeddbNbCreateTest, MemoryDb003, TestSize.Level0) EXPECT_EQ(manager->CloseKvStore(delegate), OK); delegate = nullptr; - EXPECT_EQ(manager->DeleteKvStore(STORE_ID_1), NOT_FOUND); + EXPECT_EQ(manager->DeleteKvStore(STORE_ID_1), INVALID_ARGS); ReleaseManager(manager); }