From 21fee69cd853075b4e83ae11df37b08300de786b Mon Sep 17 00:00:00 2001 From: Vinoth Date: Wed, 19 Aug 2020 10:19:36 +0800 Subject: [PATCH] Transactional recovery to fix issues in standby and update from core --- .../storage/mot/core/src/storage/table.cpp | 83 +++-- .../storage/mot/core/src/storage/table.h | 3 +- .../system/checkpoint/checkpoint_manager.cpp | 17 +- .../system/checkpoint/checkpoint_manager.h | 10 +- .../system/common/commit_sequence_number.cpp | 2 +- .../system/common/commit_sequence_number.h | 4 + .../src/system/recovery/recovery_manager.cpp | 74 +++- .../src/system/recovery/recovery_manager.h | 59 +++- .../core/src/system/recovery/recovery_ops.cpp | 322 ++++++++++++------ .../recovery/transaction_buffer_iterator.cpp | 3 +- .../recovery/transaction_buffer_iterator.h | 4 +- .../mot/core/src/system/transaction/txn.cpp | 21 +- .../mot/core/src/system/transaction/txn.h | 23 +- .../transaction_logger/redo_log_writer.cpp | 16 +- .../storage/mot/fdw_adapter/src/mot_fdw.cpp | 17 +- .../mot/fdw_adapter/src/mot_fdw_xlog.cpp | 3 +- .../multi_standby_single/failover_mot.sh | 3 +- .../inc_build_failover_mot.sh | 25 +- .../inc_build_reconnect_mot.sh | 15 +- src/test/ha/util.sh | 29 ++ 20 files changed, 534 insertions(+), 199 deletions(-) diff --git a/src/gausskernel/storage/mot/core/src/storage/table.cpp b/src/gausskernel/storage/mot/core/src/storage/table.cpp index 17f8255692..35be7565a8 100644 --- a/src/gausskernel/storage/mot/core/src/storage/table.cpp +++ b/src/gausskernel/storage/mot/core/src/storage/table.cpp @@ -24,6 +24,7 @@ #include #include +#include #include "table.h" #include "mot_engine.h" #include "utilities.h" @@ -127,7 +128,9 @@ bool Table::InitRowPool(bool local) void Table::ClearThreadMemoryCache() { for (int i = 0; i < m_numIndexes; i++) { - m_indexes[i]->ClearThreadMemoryCache(); + if (m_indexes[i] != nullptr) { + m_indexes[i]->ClearThreadMemoryCache(); + } } if (m_rowPool != nullptr) { @@ -189,9 +192,13 @@ bool Table::UpdatePrimaryIndex(Index* index, TxnManager* txn, uint32_t tid) { if (this->m_primaryIndex) { if (txn == nullptr) { - DeletePrimaryIndex(this->m_primaryIndex); + if (DeletePrimaryIndex(this->m_primaryIndex) != RC_OK) { + return false; + } } else { - txn->DropIndex(this->m_primaryIndex); + if (txn->DropIndex(this->m_primaryIndex) != RC_OK) { + return false; + } } } else { if (m_numIndexes == 0) { @@ -449,7 +456,13 @@ RC Table::InsertRow(Row* row, TxnManager* txn) // set primary key row->SetRowId(txn->GetSurrogateKey()); + mot_vector cleanupKeys; key = txn->GetTxnKey(ix); + if (key == nullptr) { + MOT_REPORT_ERROR(MOT_ERROR_OOM, "Insert Row", "Failed to create primary key"); + return RC_MEMORY_ALLOCATION_ERROR; + } + cleanupKeys.push_back(key); if (ix->IsFakePrimary()) { surrogateprimaryKey = htobe64(row->GetRowId()); row->SetSurrogateKey(surrogateprimaryKey); @@ -464,10 +477,19 @@ RC Table::InsertRow(Row* row, TxnManager* txn) for (uint16_t i = 1; i < numIndexes; i++) { ix = GetSecondaryIndex(i); key = txn->GetTxnKey(ix); + if (key == nullptr) { + MOT_REPORT_ERROR( + MOT_ERROR_OOM, "Insert Row", "Failed to create key for secondary index %s", ix->GetName().c_str()); + std::for_each(cleanupKeys.begin(), cleanupKeys.end(), [](Key*& key) { MOTCurrTxn->DestroyTxnKey(key); }); + MOTCurrTxn->Rollback(); + return RC_MEMORY_ALLOCATION_ERROR; + } + cleanupKeys.push_back(key); ix->BuildKey(this, row, key); txn->GetNextInsertItem()->SetItem(row, ix, key); } + cleanupKeys.clear(); // subsequent call to insert row takes care of key cleanup return txn->InsertRow(row); } @@ -675,21 +697,23 @@ bool Table::ModifyColumnSize(const uint32_t& id, const uint64_t& size) } // column size is uint64_t but tuple size is uint32_t, so we must check for overflow - if (size >= (uint64_t)std::numeric_limitsm_tupleSize)>::max()) + if (size >= (uint64_t)std::numeric_limitsm_tupleSize)>::max()) { return false; + } uint64_t oldColSize = m_columns[id]->m_size; - uint64_t newTupleSize = ((uint64_t)m_tupleSize) - oldColSize + size; - if (newTupleSize >= (uint64_t)std::numeric_limitsm_tupleSize)>::max()) + if (newTupleSize >= (uint64_t)std::numeric_limitsm_tupleSize)>::max()) { return false; + } m_tupleSize = newTupleSize; m_columns[id]->m_size = size; // now we need to fix the offset of all subsequent fields - for (uint32_t i = id + 1; i < m_fieldCnt; ++i) + for (uint32_t i = id + 1; i < m_fieldCnt; ++i) { m_columns[id]->m_offset = m_columns[id]->m_offset - oldColSize + size; + } return true; } @@ -909,13 +933,17 @@ char* Table::DesrializeMeta(char* dataIn, CommonIndexMeta& meta) return dataIn; } -RC Table::CreateIndexFromMeta(CommonIndexMeta& meta, bool primary, uint32_t tid) +RC Table::CreateIndexFromMeta( + CommonIndexMeta& meta, bool primary, uint32_t tid, bool addToTable /* = true */, Index** outIndex /* = nullptr */) { IndexTreeFlavor flavor = DEFAULT_TREE_FLAVOR; Index* ix = nullptr; + MOT_LOG_DEBUG("%s: %s (%s)", __func__, meta.m_name.c_str(), primary ? "primary" : "secondary"); - if (meta.m_indexingMethod == IndexingMethod::INDEXING_METHOD_TREE) + if (meta.m_indexingMethod == IndexingMethod::INDEXING_METHOD_TREE) { flavor = GetGlobalConfiguration().m_indexTreeFlavor; + } + ix = IndexFactory::CreateIndex(meta.m_indexOrder, meta.m_indexingMethod, flavor); if (ix == nullptr) { MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, @@ -924,6 +952,7 @@ RC Table::CreateIndexFromMeta(CommonIndexMeta& meta, bool primary, uint32_t tid) m_longTableName.c_str()); return RC_ERROR; } + ix->SetUnique(meta.m_unique); if (!ix->SetNumTableFields(meta.m_numTableFields)) { MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, @@ -934,30 +963,40 @@ RC Table::CreateIndexFromMeta(CommonIndexMeta& meta, bool primary, uint32_t tid) delete ix; return RC_ERROR; } - for (int i = 0; i < meta.m_numKeyFields; i++) + + for (int i = 0; i < meta.m_numKeyFields; i++) { ix->SetLenghtKeyFields(i, meta.m_columnKeyFields[i], meta.m_lengthKeyFields[i]); + } ix->SetFakePrimary(meta.m_fake); ix->SetNumIndexFields(meta.m_numKeyFields); ix->SetTable(this); - ix->SetIsCommited(true); if (ix->IndexInit(meta.m_keyLength, meta.m_unique, meta.m_name, nullptr) != RC_OK) { MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to initialize index"); delete ix; return RC_ERROR; } - if (primary) { - if (UpdatePrimaryIndex(ix, nullptr, tid) != true) { - MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to add primary index"); - delete ix; - return RC_ERROR; - } - } else { - if (AddSecondaryIndex(ix->GetName(), (Index*)ix, nullptr, tid) != true) { - MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to add secondary index"); - delete ix; - return RC_ERROR; + + if (addToTable) { + // In transactional recovery we set index as committed only during commit. + ix->SetIsCommited(true); + if (primary) { + if (UpdatePrimaryIndex(ix, nullptr, tid) != true) { + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to add primary index"); + delete ix; + return RC_ERROR; + } + } else { + if (AddSecondaryIndex(ix->GetName(), (Index*)ix, nullptr, tid) != true) { + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to add secondary index"); + delete ix; + return RC_ERROR; + } } } + + if (outIndex != nullptr) { + *outIndex = ix; + } return RC_OK; } diff --git a/src/gausskernel/storage/mot/core/src/storage/table.h b/src/gausskernel/storage/mot/core/src/storage/table.h index 65647a553c..036b350884 100644 --- a/src/gausskernel/storage/mot/core/src/storage/table.h +++ b/src/gausskernel/storage/mot/core/src/storage/table.h @@ -944,7 +944,8 @@ public: * @param tid the thread identifier * @return RC error code. */ - RC CreateIndexFromMeta(CommonIndexMeta& meta, bool primary, uint32_t tid); + RC CreateIndexFromMeta( + CommonIndexMeta& meta, bool primary, uint32_t tid, bool addToTable = true, Index** outIndex = nullptr); /** * @brief returns the serialized size of a table diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp index a6db9b93f4..f3335943de 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp @@ -281,6 +281,11 @@ void CheckpointManager::TransactionCompleted(TxnManager* txn) } else { // previous phase m_counters[!m_cntBit].fetch_sub(1); } + if (txn->m_replayLsn != 0 && MOTEngine::GetInstance()->IsRecovering()) { + // Update the last replay LSN in recovery manager in case of redo replay. + // This is needed for getting the last replay LSN during checkpoint in standby. + GetRecoveryManager()->SetLastReplayLsn(txn->GetReplayLsn()); + } m_lock.RdUnlock(); if (m_counters[!m_cntBit] == 0 && IsAutoCompletePhase()) { @@ -339,6 +344,15 @@ void CheckpointManager::MoveToNextPhase() // relevant for asynchronous logging or group commit m_redoLogHandler->Flush(); } + if (MOTEngine::GetInstance()->IsRecovering()) { + // We are moving from RESOLVE to CAPTURE phase. No transaction is allowed to commit + // as this point. This is the point where we take snapshot and any rows committed + // after this point will not be included in this checkpoint. + // Get the current last replay LSN from recovery manager and use it as m_lastReplayLsn + // for the current checkpoint. If the system recovers from disk after this checkpoint, + // it is safe to ignore any redo replay before this LSN. + SetLastReplayLsn(GetRecoveryManager()->GetLastReplayLsn()); + } } // there are no open transactions from previous phase, we can move forward to next phase @@ -760,7 +774,8 @@ bool CheckpointManager::CreateCheckpointId(uint64_t& checkpointId) bool CheckpointManager::GetCheckpointDirName(std::string& dirName) { - if (!CheckpointUtils::SetDirName(dirName, m_id)) { + uint64_t checkpointId = GetRecoveryManager()->GetCheckpointId(); + if (!CheckpointUtils::SetDirName(dirName, checkpointId)) { MOT_LOG_ERROR("SetDirName failed"); return false; } diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.h b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.h index 7f9fad1cf5..29ff09b918 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.h +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.h @@ -164,11 +164,6 @@ public: return m_id; } - void SetLastReplayLsn(uint64_t lsn) - { - m_lastReplayLsn = lsn; - } - uint64_t GetLastReplayLsn() { return m_lastReplayLsn; @@ -296,6 +291,11 @@ private: return m_lsn; } + void SetLastReplayLsn(uint64_t lsn) + { + m_lastReplayLsn = lsn; + } + static const char* PhaseToString(CheckpointPhase phase); inline void SwapAvailableAndNotAvailable() diff --git a/src/gausskernel/storage/mot/core/src/system/common/commit_sequence_number.cpp b/src/gausskernel/storage/mot/core/src/system/common/commit_sequence_number.cpp index 6ce7d292e2..943991952b 100644 --- a/src/gausskernel/storage/mot/core/src/system/common/commit_sequence_number.cpp +++ b/src/gausskernel/storage/mot/core/src/system/common/commit_sequence_number.cpp @@ -31,7 +31,7 @@ namespace MOT { DECLARE_LOGGER(CSNManager, System); -CSNManager::CSNManager() : m_csn(0) +CSNManager::CSNManager() : m_csn(MOT_INITIAL_CSN) {} CSNManager::~CSNManager() diff --git a/src/gausskernel/storage/mot/core/src/system/common/commit_sequence_number.h b/src/gausskernel/storage/mot/core/src/system/common/commit_sequence_number.h index ce302d984d..f6a9049692 100644 --- a/src/gausskernel/storage/mot/core/src/system/common/commit_sequence_number.h +++ b/src/gausskernel/storage/mot/core/src/system/common/commit_sequence_number.h @@ -28,6 +28,10 @@ #include #include "global.h" +#define MOT_INVALID_CSN ((uint64_t)-1) +#define MOT_RECOVERED_TABLE_CSN ((uint64_t)0) +#define MOT_INITIAL_CSN ((uint64_t)1) + namespace MOT { /** * @class CSNManager diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp index a047a08948..56fb03677e 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp @@ -327,6 +327,7 @@ bool RecoveryManager::RecoverTableRows( break; } + BeginTransaction(); InsertRow(tableId, fileHeader.m_exId, keyData, @@ -338,8 +339,13 @@ bool RecoveryManager::RecoverTableRows( m_sState, status, entry.m_rowId); - if (status != RC_OK) + status = CommitTransaction(entry.m_csn); + if (status != RC_OK) { + MOT_LOG_ERROR( + "Failed to commit row recovery from checkpoint: %s (error code: %d)", RcToString(status), (int)status); break; + } + MOT_LOG_DEBUG("Inserted into table %u row with CSN %" PRIu64, tableId, entry.m_csn); if (entry.m_csn > maxCsn) maxCsn = entry.m_csn; } @@ -412,7 +418,6 @@ void RecoveryManager::CpWorkerFunc() bool RecoveryManager::RecoverFromCheckpoint() { - uint64_t lastReplayLsn = 0; m_checkpointWorkerStop = false; if (!m_tasksList.empty()) { MOT_LOG_ERROR("RecoveryManager:: tasksQueue is not empty!"); @@ -426,7 +431,7 @@ bool RecoveryManager::RecoverFromCheckpoint() if (IsCheckpointValid(CheckpointControlFile::GetCtrlFile()->GetId())) { m_checkpointId = CheckpointControlFile::GetCtrlFile()->GetId(); m_lsn = CheckpointControlFile::GetCtrlFile()->GetLsn(); - lastReplayLsn = CheckpointControlFile::GetCtrlFile()->GetLastReplayLsn(); + m_lastReplayLsn = CheckpointControlFile::GetCtrlFile()->GetLastReplayLsn(); } else { MOT_LOG_ERROR("RecoveryManager:: no valid checkpoint exist"); OnError(RecoveryManager::ErrCodes::CP_SETUP, "RecoveryManager:: no valid checkpoint exist"); @@ -440,11 +445,15 @@ bool RecoveryManager::RecoverFromCheckpoint() return false; } - if (m_lsn >= lastReplayLsn) { - MOT_LOG_DEBUG("Recovery LSN Check: will use lsn: %lu (last replay lsn: %lu)", m_lsn, lastReplayLsn); - } else { - m_lsn = lastReplayLsn; - MOT_LOG_WARN("Recovery LSN Check: modifying lsn to %lu (last replay lsn)", m_lsn); + if (m_checkpointId != CheckpointControlFile::invalidId) { + if (m_lsn >= m_lastReplayLsn) { + MOT_LOG_INFO( + "Recovery LSN Check will use the LSN (%lu), ignoring the lastReplayLSN (%lu)", m_lsn, m_lastReplayLsn); + } else { + MOT_LOG_WARN( + "Recovery LSN Check will use the lastReplayLSN (%lu), ignoring the LSN (%lu)", m_lastReplayLsn, m_lsn); + m_lsn = m_lastReplayLsn; + } } int taskFillStat = FillTasksFromMapFile(); @@ -467,6 +476,7 @@ bool RecoveryManager::RecoverFromCheckpoint() m_tableIds.size(), m_checkpointId); + BeginTransaction(); for (auto it = m_tableIds.begin(); it != m_tableIds.end(); ++it) { if (IsRecoveryMemoryLimitReached(NUM_REDO_RECOVERY_THREADS)) { MOT_LOG_ERROR("Memory hard limit reached. Cannot recover datanode"); @@ -481,6 +491,12 @@ bool RecoveryManager::RecoverFromCheckpoint() return false; } } + RC status = CommitTransaction(MOT_RECOVERED_TABLE_CSN); + if (status != RC_OK) { + MOT_LOG_ERROR("Failed to commit table recovery: %s (error code: %d)", RcToString(status), (int)status); + OnError(RecoveryManager::ErrCodes::CP_TABLE_COMMIT, "Failed to commit table recovery from checkpoint"); + return false; + } std::vector recoveryThreadPool; for (uint32_t i = 0; i < m_numWorkers; ++i) { @@ -593,20 +609,18 @@ void RecoveryManager::FreeRedoSegment(LogSegment* segment) delete segment; } -bool RecoveryManager::ApplyLogSegmentFromData(uint64_t redoLsn, char* data, size_t len) +bool RecoveryManager::ApplyRedoLog(uint64_t redoLsn, char* data, size_t len) { if (redoLsn < m_lsn) { // ignore old redo records which are prior to our checkpoint LSN - MOT_LOG_DEBUG( - "ApplyLogSegmentFromData - ignoring old redo record. Checkpoint LSN: %lu, redo LSN: %lu", m_lsn, redoLsn); + MOT_LOG_DEBUG("ApplyRedoLog - ignoring old redo record. Checkpoint LSN: %lu, redo LSN: %lu", m_lsn, redoLsn); return true; } - MOTEngine::GetInstance()->GetCheckpointManager()->SetLastReplayLsn(redoLsn); - return ApplyLogSegmentFromData(data, len); + return ApplyLogSegmentFromData(data, len, redoLsn); } -bool RecoveryManager::ApplyLogSegmentFromData(char* data, size_t len) +bool RecoveryManager::ApplyLogSegmentFromData(char* data, size_t len, uint64_t replayLsn /* = 0 */) { bool result = false; char* curData = data; @@ -614,7 +628,7 @@ bool RecoveryManager::ApplyLogSegmentFromData(char* data, size_t len) while (data + len > curData) { // obtain LogSegment from buffer RedoLogTransactionIterator iterator(curData, len); - LogSegment* segment = iterator.AllocRedoSegment(); + LogSegment* segment = iterator.AllocRedoSegment(replayLsn); if (segment == nullptr) { MOT_LOG_ERROR("ApplyLogSegmentFromData - failed to allocate segment"); return false; @@ -741,18 +755,41 @@ RC RecoveryManager::RedoSegment(LogSegment* segment, uint64_t csn, uint64_t tran bool is2pcRecovery = !MOTEngine::GetInstance()->IsRecovering(); uint8_t* endPosition = (uint8_t*)(segment->m_data + segment->m_len); uint8_t* operationData = (uint8_t*)(segment->m_data); + bool txnStarted = false; + bool wasCommit = false; while (operationData < endPosition) { - // redolog recovery - single threaded + // redo log recovery - single threaded if (IsRecoveryMemoryLimitReached(NUM_REDO_RECOVERY_THREADS)) { status = RC_ERROR; MOT_LOG_ERROR("Memory hard limit reached. Cannot recover datanode"); break; } + // begin transaction on-demand + if (!txnStarted) { + if (!BeginTransaction(segment->m_replayLsn)) { + MOT_REPORT_ERROR(MOT_ERROR_RESOURCE_LIMIT, "Recover Redo Segment", "Cannot start a new transaction"); + return RC_ERROR; + } else { + txnStarted = true; + } + } + if (!is2pcRecovery) { - operationData += RecoverLogOperation(operationData, csn, transactionId, MOTCurrThreadId, m_sState, status); - GcManager* gc = MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager()->GetGcSession(); + operationData += + RecoverLogOperation(operationData, csn, transactionId, MOTCurrThreadId, m_sState, status, wasCommit); + // check operation result status + if (status != RC_OK) { + MOT_REPORT_ERROR(MOT_ERROR_RESOURCE_LIMIT, "Recover Redo Segment", "Failed to recover redo segment"); + break; + } + // update transactional state + if (wasCommit) { + txnStarted = false; + } + + GcManager* gc = MOTCurrTxn->GetGcSession(); if (m_numRedoOps == 0 && gc != nullptr) { gc->GcStartTxn(); } @@ -1221,6 +1258,7 @@ bool RecoveryManager::DeserializeInProcessTxns(int fd, uint64_t numEntries) break; } + segment->m_replayLsn = 0; erc = memcpy_s(&segment->m_len, sizeof(size_t), buf, sizeof(size_t)); securec_check(erc, "\0", "\0"); segment->m_data = new (std::nothrow) char[segment->m_len]; diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.h b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.h index bf55177334..992dc4c22e 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.h +++ b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.h @@ -53,7 +53,10 @@ public: CP_RECOVERY = 3, XLOG_SETUP = 4, XLOG_RECOVERY = 5, - SURROGATE = 6 + SURROGATE = 6, + CP_TABLE_COMMIT = 7, + CP_ROW_COMMIT = 8, + XLOG_TXN_COMMIT = 9 }; enum RecoveryOpState { COMMIT = 1, ABORT = 2, TPC_APPLY = 3, TPC_COMMIT = 4, TPC_ABORT = 5 }; @@ -174,6 +177,7 @@ public: m_recoverFromCkptDone(false), m_checkpointId(0), m_lsn(0), + m_lastReplayLsn(0), m_numWorkers(GetGlobalConfiguration().m_checkpointRecoveryWorkers), m_tid(0), m_maxRecoveredCsn(0), @@ -488,7 +492,7 @@ public: * transactions map and operate on it * @return Boolean value denoting success or failure. */ - bool ApplyLogSegmentFromData(char* data, size_t len); + bool ApplyLogSegmentFromData(char* data, size_t len, uint64_t replayLsn = 0); /** * @brief attempts to insert a data chunk into the in-process @@ -497,7 +501,7 @@ public: * ignored. * @return Boolean value denoting success or failure. */ - bool ApplyLogSegmentFromData(uint64_t redoLsn, char* data, size_t len); + bool ApplyRedoLog(uint64_t redoLsn, char* data, size_t len); /** * @brief performs a commit on an in-process transaction, * @return Boolean value denoting success or failure to commit. @@ -575,6 +579,11 @@ public: m_checkpointId = id; } + inline uint64_t GetCheckpointId() const + { + return m_checkpointId; + } + /** * @brief applies a failed 2pc transaction according to its type. * a detailed info is described in the function's implementation. @@ -608,6 +617,18 @@ public: m_tableDeletesStat[t]++; } + inline void SetLastReplayLsn(uint64_t lastReplayLsn) + { + if (m_lastReplayLsn < lastReplayLsn) { + m_lastReplayLsn = lastReplayLsn; + } + } + + inline uint64_t GetLastReplayLsn() const + { + return m_lastReplayLsn; + } + void ClearTableCache(); LogStats* m_logStats; @@ -670,11 +691,12 @@ private: * @param transactionId the transaction id * @param tid the thread id of the recovering thread * @param sState the returned surrogate state of this operation - * @param status the returned status of the operation + * @param[out] status the returned status of the operation + * @param[out] Was this a commit operation (required for managing transactional state). * @return Int value denoting the number of bytes recovered */ - static uint32_t RecoverLogOperation( - uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid, SurrogateState& sState, RC& status); + static uint32_t RecoverLogOperation(uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid, + SurrogateState& sState, RC& status, bool& wasCommit); /** * @brief performs an insert operation of a data buffer @@ -727,7 +749,16 @@ private: * @param tid the thread id of the recovering thread * @return Int value denoting the number of bytes recovered */ - static uint32_t RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid); + static uint32_t RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid, RC& status); + + /** + * @brief performs a rollback operation of a data buffer + * @param data the buffer to recover. + * @param csn The CSN of the operation. + * @param tid the thread id of the recovering thread + * @return Int value denoting the number of bytes recovered + */ + static uint32_t RecoverLogOperationRollback(uint8_t* data, uint64_t csn, uint32_t tid, RC& status); /** * @brief performs a create table operation from a data buffer @@ -983,6 +1014,18 @@ private: */ bool IsRecoveryMemoryLimitReached(uint32_t numThreads); + /** + * @brief Starts a new transaction for recovery operations. + * @param replayLsn the redo LSN for this transaction during replay. + */ + bool BeginTransaction(uint64_t replayLsn = 0); + + /** @brief Commits the current recovery transaction. */ + RC CommitTransaction(uint64_t csn); + + /** @brief Rolls back the current recovery transaction. */ + RC RollbackTransaction(); + /** * @brief a helper to extract a type from a buffer * @param data the data buffer to extract from. @@ -1017,6 +1060,8 @@ private: uint64_t m_lsn; + uint64_t m_lastReplayLsn; + std::string m_workingDir; std::set m_tableIds; diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp index f599f4839c..36a3d1f194 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp +++ b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp @@ -28,14 +28,16 @@ #include "checkpoint_utils.h" #include "bitmapset.h" #include "column.h" +#include "txn_insert_action.h" #include +#include namespace MOT { DECLARE_LOGGER(RecoveryOps, Recovery); -uint32_t RecoveryManager::RecoverLogOperation( - uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid, SurrogateState& sState, RC& status) +uint32_t RecoveryManager::RecoverLogOperation(uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid, + SurrogateState& sState, RC& status, bool& wasCommit) { OperationCode opCode = *static_cast((void*)data); switch (opCode) { @@ -59,11 +61,17 @@ uint32_t RecoveryManager::RecoverLogOperation( return RecoverLogOperationTruncateTable(data, status, COMMIT); case COMMIT_TX: case COMMIT_PREPARED_TX: - return RecoverLogOperationCommit(data, csn, tid); + wasCommit = true; + return RecoverLogOperationCommit(data, csn, tid, status); case PARTIAL_REDO_TX: case PREPARE_TX: return sizeof(EndSegmentBlock); + case ROLLBACK_TX: + case ROLLBACK_PREPARED_TX: + return RecoverLogOperationRollback(data, csn, tid, status); default: + MOT_LOG_ERROR("Unknown recovery redo record op-code: %u", (unsigned)opCode); + status = RC_ERROR; return 0; } } @@ -173,7 +181,7 @@ uint32_t RecoveryManager::RecoverLogOperationDropTable(uint8_t* data, RC& status status = RC_ERROR; break; } - return sizeof(OperationCode) + sizeof(uint32_t); + return sizeof(OperationCode) + sizeof(uint64_t); } uint32_t RecoveryManager::RecoverLogOperationCreateIndex(uint8_t* data, uint32_t tid, RC& status, RecoveryOpState state) @@ -197,7 +205,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateIndex(uint8_t* data, uint32_t status = RC_ERROR; break; } - return sizeof(OperationCode) + sizeof(bufSize) + sizeof(uint32_t) + bufSize; // sizeof(uint32_t) is for tableId + return sizeof(OperationCode) + sizeof(bufSize) + sizeof(uint64_t) + bufSize; // sizeof(uint64_t) is for tableId } uint32_t RecoveryManager::RecoverLogOperationDropIndex(uint8_t* data, RC& status, RecoveryOpState state) @@ -220,11 +228,11 @@ uint32_t RecoveryManager::RecoverLogOperationDropIndex(uint8_t* data, RC& status status = RC_ERROR; break; } - uint32_t tableId = 0; + uint64_t tableId = 0; size_t nameLen = 0; Extract(data, tableId); Extract(data, nameLen); - return sizeof(OperationCode) + sizeof(uint32_t) + sizeof(size_t) + nameLen; + return sizeof(OperationCode) + sizeof(uint64_t) + sizeof(size_t) + nameLen; } uint32_t RecoveryManager::RecoverLogOperationTruncateTable(uint8_t* data, RC& status, RecoveryOpState state) @@ -246,7 +254,7 @@ uint32_t RecoveryManager::RecoverLogOperationTruncateTable(uint8_t* data, RC& st status = RC_ERROR; break; } - return sizeof(OperationCode) + sizeof(uint32_t); + return sizeof(OperationCode) + sizeof(uint64_t); } uint32_t RecoveryManager::RecoverLogOperationInsert( @@ -319,7 +327,7 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn, return 0; } key->CpKey((const uint8_t*)keyData, keyLength); - Row* row = index->IndexRead(key, tid); + Row* row = MOTCurrTxn->RowLookupByKey(table, RD_FOR_UPDATE, key); if (row == nullptr) { /// row not found... @@ -433,140 +441,144 @@ uint32_t RecoveryManager::RecoverLogOperationDelete(uint8_t* data, uint64_t csn, return sizeof(OperationCode) + sizeof(tableId) + sizeof(exId) + sizeof(keyLength) + keyLength; } -uint32_t RecoveryManager::RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid) +uint32_t RecoveryManager::RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid, RC& status) { // OperationCode + CSN + transaction_type + commit_counter + transaction_id if (MOT::GetRecoveryManager()->m_logStats != nullptr) MOT::GetRecoveryManager()->m_logStats->m_tcls++; + status = MOT::GetRecoveryManager()->CommitTransaction(csn); + if (status != RC_OK) { + MOT_LOG_ERROR("Failed to commit row recovery from log: %s (error code: %d)", RcToString(status), (int)status); + } return sizeof(EndSegmentBlock); } -void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, char* rowData, - uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId, bool insertLocked) +uint32_t RecoveryManager::RecoverLogOperationRollback(uint8_t* data, uint64_t csn, uint32_t tid, RC& status) { - Table* table = nullptr; - if (!GetRecoveryManager()->FetchTable(tableId, table)) { - status = RC_ERROR; - MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "RecoveryManager::insertRow", "Table %llu does not exist", tableId); - return; + // OperationCode + CSN + transaction_type + commit_counter + transaction_id + status = MOT::GetRecoveryManager()->RollbackTransaction(); + if (status != RC_OK) { + MOT_LOG_ERROR("Failed to rollback row recovery from log: %s (error code: %d)", RcToString(status), (int)status); } + return sizeof(EndSegmentBlock); +} - uint64_t tableExId = table->GetTableExId(); - if (tableExId != exId) { +void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, char* rowData, + uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId, bool insertLocked) +{ + Table* table = MOTCurrTxn->GetTableByExternalId(exId); + if (table == nullptr) { status = RC_ERROR; - MOT_REPORT_ERROR( - MOT_ERROR_INTERNAL, "Recovery Manager Insert Row", "exId mismatch: my %llu - pkt %llu", tableExId, exId); + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recover Insert Row", "Table %" PRIu64 " does not exist", exId); return; } Row* row = table->CreateNewRow(); - if (row == nullptr) { // OA: check result before using pointer + if (row == nullptr) { status = RC_ERROR; - MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to create row"); + MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recover Insert Row", "Failed to create row"); return; } row->CopyData((const uint8_t*)rowData, rowLen); row->SetCommitSequenceNumber(csn); + row->SetRowId(rowId); if (insertLocked == true) { row->SetTwoPhaseMode(true); row->m_rowHeader.Lock(); } - uint64_t surrogateprimaryKey = 0; - uint8_t* keyBytes = nullptr; Key* key = nullptr; MOT::Index* ix = nullptr; - uint32_t numIndexes = table->GetNumIndexes(); + mot_vector cleanupKeys; ix = table->GetPrimaryIndex(); - key = ix->CreateNewKey(); + key = MOTCurrTxn->GetTxnKey(ix); if (key == nullptr) { table->DestroyRow(row); status = RC_ERROR; - MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to create key"); + MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recover Insert Row", "Failed to create primary key"); return; } + cleanupKeys.push_back(key); if (ix->IsFakePrimary()) { row->SetSurrogateKey(*(uint64_t*)keyData); sState.UpdateMaxKey(rowId); } key->CpKey((const uint8_t*)keyData, keyLen); - status = table->InsertRowNonTransactional(row, tid, key); + MOTCurrTxn->GetNextInsertItem()->SetItem(row, ix, key); + for (uint16_t i = 1; i < table->GetNumIndexes(); i++) { + ix = table->GetSecondaryIndex(i); + key = MOTCurrTxn->GetTxnKey(ix); + if (key == nullptr) { + status = RC_MEMORY_ALLOCATION_ERROR; + MOT_REPORT_ERROR(MOT_ERROR_OOM, + "Recover Insert Row", + "Failed to create key for secondary index %s", + ix->GetName().c_str()); + std::for_each(cleanupKeys.begin(), cleanupKeys.end(), [](Key*& key) { MOTCurrTxn->DestroyTxnKey(key); }); + MOTCurrTxn->Rollback(); + return; + } + cleanupKeys.push_back(key); + ix->BuildKey(table, row, key); + MOTCurrTxn->GetNextInsertItem()->SetItem(row, ix, key); + } + cleanupKeys.clear(); // subsequent call to insert row takes care of key cleanup + status = MOTCurrTxn->InsertRow(row); if (insertLocked == true) { row->GetPrimarySentinel()->Lock(0); } if (status == RC_UNIQUE_VIOLATION && DuplicateRow(table, keyData, keyLen, rowData, rowLen, tid)) { - /* Same row already exists. ok. */ - table->DestroyRow(row); + // Same row already exists. ok. + // no need to destroy row (already destroyed by TxnManager::InsertRow() in case of unique violation) status = RC_OK; } else if (status == RC_MEMORY_ALLOCATION_ERROR) { MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to insert row"); table->DestroyRow(row); - } else if (status != RC_OK) { - table->DestroyRow(row); } - - ix->DestroyKey(key); } void RecoveryManager::DeleteRow( uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, uint64_t csn, uint32_t tid, RC& status) { - Sentinel* pSentinel = nullptr; - RC rc; Row* row = nullptr; Key* key = nullptr; Index* index = nullptr; - uint64_t tableExId; - Table* table = nullptr; - if (!GetRecoveryManager()->FetchTable(tableId, table)) { - MOT_REPORT_ERROR(MOT_ERROR_INVALID_ARG, "Recovery Manager Delete Row", "table %u does not exist", tableId); + Table* table = MOTCurrTxn->GetTableByExternalId(exId); + if (table == nullptr) { + MOT_REPORT_ERROR( + MOT_ERROR_INVALID_ARG, "Recovery Manager Delete Row", "table %" PRIu64 " does not exist", exId); status = RC_ERROR; - MOT_LOG_ERROR("RecoveryManager::deleteRow - table %u does not exist", tableId); return; } index = table->GetPrimaryIndex(); - key = index->CreateNewKey(); - if (key == nullptr) { // OA: check result before using pointer + key = MOTCurrTxn->GetTxnKey(index); + if (key == nullptr) { MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Delete Row", "failed to create key"); status = RC_ERROR; return; } key->CpKey((const uint8_t*)keyData, keyLen); - - tableExId = table->GetTableExId(); - if (tableExId != exId) { - MOT_REPORT_ERROR( - MOT_ERROR_INTERNAL, "Recovery Manager Delete Row", "exId mismatch: my %lu - pkt %lu", tableExId, exId); - status = RC_ERROR; - index->DestroyKey(key); - return; - } - rc = table->FindRow(key, pSentinel, tid); - if ((rc != RC_OK) || (pSentinel == 0)) { - MOT_LOG_ERROR("RecoveryManager::deleteRow - findRow rc %u, sentinel %p", rc, pSentinel); - status = RC_OK; - index->DestroyKey(key); - return; - } - - row = pSentinel->GetData(); - if (row != 0) { - if (!table->RemoveRow(row, tid)) { + row = MOTCurrTxn->RowLookupByKey(table, WR, key); + if (row != nullptr) { + status = MOTCurrTxn->DeleteLastRow(); + if (status != RC_OK) { if (MOT_IS_OOM()) { - // OA: report error if remove row failed due to OOM (what about other errors?) MOT_REPORT_ERROR( MOT_ERROR_OOM, "Recovery Manager Delete Row", "failed to remove row due to lack of memory"); status = RC_MEMORY_ALLOCATION_ERROR; } else { - status = RC_ERROR; + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, + "Recovery Manager Delete Row", + "failed to remove row: %s (error code: %d)", + RcToString(status), + status); } - MOT_LOG_ERROR("RecoveryManager::deleteRow2 - findRow rc %u, sentinel %p", rc, pSentinel); } else { GetRecoveryManager()->IncreaseTableDeletesStat(table); } @@ -574,17 +586,18 @@ void RecoveryManager::DeleteRow( MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Delete Row", "getData failed"); status = RC_ERROR; } - index->DestroyKey(key); + MOTCurrTxn->DestroyTxnKey(key); status = RC_OK; } void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, char* rowData, uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status) { - Table* table = nullptr; - if (!GetRecoveryManager()->FetchTable(tableId, table)) { + Table* table = MOTCurrTxn->GetTableByExternalId(exId); + if (table == nullptr) { status = RC_ERROR; - MOT_REPORT_ERROR(MOT_ERROR_INVALID_ARG, "Recovery Manager Update Row", "table %u does not exist", tableId); + MOT_REPORT_ERROR( + MOT_ERROR_INVALID_ARG, "Recovery Manager Update Row", "table %" PRIu64 " does not exist", exId); return; } @@ -597,14 +610,14 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData, } Index* index = table->GetPrimaryIndex(); - Key* key = index->CreateNewKey(); - if (key == nullptr) { // OA: check result before using pointer + Key* key = MOTCurrTxn->GetTxnKey(index); + if (key == nullptr) { status = RC_ERROR; MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Update Row", "failed to create key"); return; } key->CpKey((const uint8_t*)keyData, keyLen); - Row* row = index->IndexRead(key, tid); + Row* row = MOTCurrTxn->RowLookupByKey(table, RD_FOR_UPDATE, key); if (row == nullptr) { /// row not found... need to check row version // if row version is less than the updated row version it means that we @@ -628,7 +641,7 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData, csn); } } - index->DestroyKey(key); + MOTCurrTxn->DestroyTxnKey(key); } void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, bool addToEngine) @@ -647,7 +660,7 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, bool ad table = new (std::nothrow) Table(); if (table == nullptr) { - MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Create Table", "failed to allocate table's memory"); + MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Create Table", "failed to allocate table object"); status = RC_ERROR; return; } @@ -655,16 +668,16 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, bool ad table->Deserialize((const char*)data); do { if (!table->IsDeserialized()) { - MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to deserialize table"); + MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to de-serialize table"); break; } - if (addToEngine && !GetTableManager()->AddTable(table)) { + if (addToEngine && ((status = MOTCurrTxn->CreateTable(table)) != RC_OK)) { MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to add table to engine"); break; } - MOT_LOG_DEBUG("RecoveryManager::CreateTable: table %s [%lu] created (%s to engine)", + MOT_LOG_DEBUG("RecoveryManager::CreateTable: table %s [internal id %u] created (%s to engine)", table->GetLongTableName().c_str(), table->GetTableId(), addToEngine ? "added" : "not added"); @@ -675,60 +688,79 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, bool ad MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to recover table"); delete table; - status = RC_ERROR; + if (status == RC_OK) { + status = RC_ERROR; + } return; } void RecoveryManager::DropTable(char* data, RC& status) { char* in = (char*)data; - uint32_t tableId; + uint64_t externalTableId; Table* table; string tableName; - in = SerializablePOD::Deserialize(in, tableId); - table = GetTableManager()->GetTable(tableId); + in = SerializablePOD::Deserialize(in, externalTableId); + table = MOTCurrTxn->GetTableByExternalId(externalTableId); if (table == nullptr) { - MOT_LOG_DEBUG("DropTable: could not find table %u", tableId); + MOT_LOG_DEBUG("DropTable: could not find table %" PRIu64, externalTableId); /* this might happen if we try to replay an outdated xlog entry - currently we do not error out */ return; } tableName.assign(table->GetLongTableName()); - if (GetTableManager()->DropTable(table, MOT_GET_CURRENT_SESSION_CONTEXT()) != RC_OK) { + status = MOTCurrTxn->DropTable(table); + if (status != RC_OK) { MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Drop Table", - "Failed to drop table %s [%lu])", + "Failed to drop table %s [%" PRIu64 "])", tableName.c_str(), - tableId); - status = RC_ERROR; + externalTableId); } else { MOT::GetRecoveryManager()->m_tableDeletesStat.erase(table); } - MOT_LOG_DEBUG("RecoveryManager::DropTable: table %s [%lu] dropped", tableName.c_str(), tableId); + MOT_LOG_DEBUG("RecoveryManager::DropTable: table %s [%" PRIu64 "] dropped", tableName.c_str(), externalTableId); } void RecoveryManager::CreateIndex(char* data, uint32_t tid, RC& status) { char* in = (char*)data; - uint32_t tableId; + uint64_t externalTableId; Table* table; Table::CommonIndexMeta idx; - in = SerializablePOD::Deserialize(in, tableId); - table = GetTableManager()->GetTable(tableId); + in = SerializablePOD::Deserialize(in, externalTableId); + table = MOTCurrTxn->GetTableByExternalId(externalTableId); if (table == nullptr) { - MOT_REPORT_ERROR(MOT_ERROR_INVALID_ARG, "Recovery Manager Create Index", "Could not find table %u", tableId); + MOT_REPORT_ERROR( + MOT_ERROR_INVALID_ARG, "Recover Create Index", "Could not find table %" PRIu64, externalTableId); status = RC_ERROR; return; } in = table->DesrializeMeta(in, idx); - if (idx.m_indexOrder == IndexOrder::INDEX_ORDER_PRIMARY) { - MOT_LOG_DEBUG("createIndex: creating Primary Index"); - table->CreateIndexFromMeta(idx, true, tid); - } else { - MOT_LOG_DEBUG("createIndex: creating Secondary Index"); - table->CreateIndexFromMeta(idx, false, tid); + bool primary = idx.m_indexOrder == IndexOrder::INDEX_ORDER_PRIMARY; + MOT_LOG_DEBUG("createIndex: creating %s Index", primary ? "Primary" : "Secondary"); + Index* index = nullptr; + status = table->CreateIndexFromMeta(idx, primary, tid, false, &index); + if (status != RC_OK) { + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, + "Recover Create Index", + "Failed to create index for table %" PRIu64 " from meta-data: %s (error code: %d)", + externalTableId, + RcToString(status), + status); + } + if (status == RC_OK) { + status = MOTCurrTxn->CreateIndex(table, index, primary); + if (status != RC_OK) { + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, + "Recover Create Index", + "Failed to create index for table %" PRIu64 ": %s (error code: %d)", + externalTableId, + RcToString(status), + status); + } } } @@ -736,21 +768,26 @@ void RecoveryManager::DropIndex(char* data, RC& status) { RC res; char* in = (char*)data; - uint32_t tableId; + uint64_t externalTableId; Table* table; uint32_t indexNameLength; string indexName; - in = SerializablePOD::Deserialize(in, tableId); - table = GetTableManager()->GetTable(tableId); + in = SerializablePOD::Deserialize(in, externalTableId); + table = MOTCurrTxn->GetTableByExternalId(externalTableId); if (table == nullptr) { /* this might happen if we try to replay an outdated xlog entry - currently we do not error out */ - MOT_LOG_DEBUG("dropIndex: could not find table %u", tableId); + MOT_LOG_DEBUG("dropIndex: could not find table %" PRIu64, externalTableId); return; } in = SerializableSTR::Deserialize(in, indexName); - res = table->RemoveSecondaryIndex((char*)(indexName.c_str()), MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager()); + Index* index = table->GetSecondaryIndex(indexName); + if (index == nullptr) { + res = RC_INDEX_NOT_FOUND; + } else { + res = MOTCurrTxn->DropIndex(index); + } if (res != RC_OK) { MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Drop Index", @@ -764,18 +801,20 @@ void RecoveryManager::TruncateTable(char* data, RC& status) { RC res = RC_OK; char* in = (char*)data; - uint32_t tableId; + uint64_t externalTableId; Table* table; - in = SerializablePOD::Deserialize(in, tableId); - table = GetTableManager()->GetTable(tableId); + in = SerializablePOD::Deserialize(in, externalTableId); + table = MOTCurrTxn->GetTableByExternalId(externalTableId); if (table == nullptr) { /* this might happen if we try to replay an outdated xlog entry - currently we do not error out */ - MOT_LOG_DEBUG("truncateTable: could not find table %u", tableId); + MOT_LOG_DEBUG("truncateTable: could not find table %" PRIu64, externalTableId); return; } - table->Truncate(MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager()); + table->WrLock(); + status = MOTCurrTxn->TruncateTable(table); + table->Unlock(); } // in-process (2pc) transactions recovery @@ -839,8 +878,8 @@ uint32_t RecoveryManager::TwoPhaseRecoverOp(RecoveryOpState state, uint8_t* data if (opCode == CREATE_ROW) ret += sizeof(uint64_t); // rowId - Table* table = nullptr; - if (!GetRecoveryManager()->FetchTable(tableId, table)) { + Table* table = MOTCurrTxn->GetTableByExternalId(exId); + if (table == nullptr) { status = RC_ERROR; MOT_LOG_ERROR("RecoveryManager::applyInProcessInsert: fetch table failed (id %lu)", tableId); return ret; @@ -1080,4 +1119,65 @@ bool RecoveryManager::DuplicateRow( index->DestroyKey(key); return res; } + +bool RecoveryManager::BeginTransaction(uint64_t replayLsn /* = 0 */) +{ + bool result = false; + SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT(); + if (sessionContext == nullptr) { + MOT_REPORT_ERROR( + MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot start recovery transaction: no session context"); + } else { + TxnManager* txn = sessionContext->GetTxnManager(); + txn->StartTransaction(INVALID_TRANSACTIOIN_ID, READ_COMMITED); + txn->SetReplayLsn(replayLsn); + result = true; + } + return result; +} + +RC RecoveryManager::CommitTransaction(uint64_t csn) +{ + RC result = RC_ERROR; + SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT(); + if (sessionContext == nullptr) { + MOT_REPORT_ERROR( + MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot commit recovery transaction: no session context"); + } else { + TxnManager* txn = sessionContext->GetTxnManager(); + result = txn->Commit(INVALID_TRANSACTIOIN_ID, csn); + if (result != RC_OK) { + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, + "Recover DB", + "Failed to commit recovery transaction: %s (error code: %d)", + RcToString(result), + (int)result); + txn->Rollback(); + } else { + txn->EndTransaction(); + } + } + return result; +} + +RC RecoveryManager::RollbackTransaction() +{ + RC result = RC_ERROR; + SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT(); + if (sessionContext == nullptr) { + MOT_REPORT_ERROR( + MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot rollback recovery transaction: no session context"); + } else { + TxnManager* txn = sessionContext->GetTxnManager(); + result = txn->Rollback(); + if (result != RC_OK) { + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, + "Recover DB", + "Failed to rollback recovery transaction: %s (error code: %d)", + RcToString(result), + (int)result); + } + } + return result; +} } // namespace MOT diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.cpp b/src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.cpp index 4c4754738b..ae7bda801a 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.cpp +++ b/src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.cpp @@ -51,7 +51,7 @@ void* RedoLogTransactionIterator::GetTransactionEntry() return reinterpret_cast(m_buffer + m_position); } -LogSegment* RedoLogTransactionIterator::AllocRedoSegment() +LogSegment* RedoLogTransactionIterator::AllocRedoSegment(uint64_t replayLsn) { LogSegment* segment = new (std::nothrow) LogSegment(); if (segment == nullptr) { @@ -69,6 +69,7 @@ LogSegment* RedoLogTransactionIterator::AllocRedoSegment() reinterpret_cast(m_buffer + m_position + sizeof(uint32_t)), segment->m_len); securec_check(erc, "\0", "\0"); + segment->m_replayLsn = replayLsn; return segment; } diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.h b/src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.h index a7ed3a6dfb..1cadb24998 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.h +++ b/src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.h @@ -41,6 +41,8 @@ struct LogSegment : public Serializable { EndSegmentBlock m_controlBlock; + uint64_t m_replayLsn; + /** * @brief fetches the size of the log segment * @return Size_t value denoting the size of the segment. @@ -106,7 +108,7 @@ public: void* GetTransactionEntry(); - LogSegment* AllocRedoSegment(); + LogSegment* AllocRedoSegment(uint64_t replayLsn); private: char* m_buffer; diff --git a/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp b/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp index 9ba9671804..4bef0df1ed 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp @@ -227,9 +227,13 @@ RC TxnManager::LitePrepare(TransactionId transactionId) return RC_OK; } -RC TxnManager::CommitInternal() +RC TxnManager::CommitInternal(uint64_t csn) { - SetCommitSequenceNumber(GetCSNManager().GetNextCSN()); + if (csn == MOT_INVALID_CSN) { + SetCommitSequenceNumber(GetCSNManager().GetNextCSN()); + } else { + SetCommitSequenceNumber(csn); // for recovery + } // Record the start write phase for this transaction if (GetGlobalConfiguration().m_enableCheckpoint) { GetCheckpointManager()->BeginTransaction(this); @@ -255,14 +259,14 @@ RC TxnManager::Commit() return Commit(INVALID_TRANSACTIOIN_ID); } -RC TxnManager::Commit(uint64_t transcationId) +RC TxnManager::Commit(uint64_t transcationId, uint64_t csn /* = MOT_INVALID_CSN */) { // Validate concurrency control if (transcationId != INVALID_TRANSACTIOIN_ID) m_transactionId = transcationId; RC rc = m_occManager.ValidateOcc(this); if (rc == RC_OK) { - rc = CommitInternal(); + rc = CommitInternal(csn); MOT::DbSessionStatisticsProvider::GetInstance().AddCommitTxn(); } return rc; @@ -515,6 +519,7 @@ void TxnManager::RollbackDDLs() indexes = (Index**)ddl_access->GetEntry(); table = indexes[0]->GetTable(); MOT_LOG_INFO("Rollback of truncate table %s", table->GetLongTableName().c_str()); + table->WrLock(); for (int idx = 0; idx < table->GetNumIndexes(); idx++) { index = table->m_indexes[idx]; table->m_indexes[idx] = indexes[idx]; @@ -526,6 +531,7 @@ void TxnManager::RollbackDDLs() index->Truncate(true); delete index; } + table->Unlock(); delete[] indexes; break; case DDL_ACCESS_CREATE_INDEX: @@ -534,6 +540,7 @@ void TxnManager::RollbackDDLs() MOT_LOG_INFO("Rollback of create index %s for table %s", index->GetName().c_str(), table->GetLongTableName().c_str()); + table->WrLock(); if (index->IsPrimaryKey()) { table->SetPrimaryIndex(nullptr); GcManager::ClearIndexElements(index->GetIndexId()); @@ -541,6 +548,7 @@ void TxnManager::RollbackDDLs() } else { table->RemoveSecondaryIndex((char*)index->GetName().c_str(), this); } + table->Unlock(); break; case DDL_ACCESS_DROP_INDEX: index = (Index*)ddl_access->GetEntry(); @@ -549,7 +557,9 @@ void TxnManager::RollbackDDLs() index->GetName().c_str(), table->GetLongTableName().c_str()); if (index->IsPrimaryKey()) { + table->WrLock(); table->SetPrimaryIndex(index); + table->Unlock(); } break; default: @@ -595,7 +605,9 @@ void TxnManager::WriteDDLChanges() table = index->GetTable(); index->SetIsCommited(true); if (index->IsPrimaryKey()) { + table->WrLock(); table->SetPrimaryIndex(index); + table->Unlock(); } break; case DDL_ACCESS_DROP_INDEX: @@ -679,6 +691,7 @@ TxnManager::TxnManager(SessionContext* session_context) m_checkpointNABit(false), m_csn(0), m_transactionId(INVALID_TRANSACTIOIN_ID), + m_replayLsn(0), m_surrogateGen(0), m_flushDone(false), m_internalTransactionId(((uint64_t)m_sessionContext->GetSessionId()) << SESSION_ID_BITS), diff --git a/src/gausskernel/storage/mot/core/src/system/transaction/txn.h b/src/gausskernel/storage/mot/core/src/system/transaction/txn.h index d7eb1b2727..eddec1d76a 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction/txn.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction/txn.h @@ -41,6 +41,7 @@ #include "bitmapset.h" #include "txn_ddl_access.h" #include "mm_session_api.h" +#include "commit_sequence_number.h" namespace MOT { class MOTContext; @@ -56,6 +57,8 @@ class LoggerTask; class Key; class Index; +#define MOTCurrTxn MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager() + /** * @class TxnManager * @brief Transaction manager is used to manage the life cycle of a single @@ -138,6 +141,16 @@ public: m_internalTransactionId = transactionId; } + inline void SetReplayLsn(uint64_t replayLsn) + { + m_replayLsn = replayLsn; + } + + inline uint64_t GetReplayLsn() const + { + return m_replayLsn; + } + void RemoveTableFromStat(Table* t); /** @@ -151,7 +164,7 @@ public: * @return Result code denoting success or failure. */ RC Commit(); - RC Commit(uint64_t transcationId); + RC Commit(uint64_t transcationId, uint64_t csn = MOT_INVALID_CSN); RC LiteCommit(uint64_t transcationId); /** @@ -472,7 +485,7 @@ private: /** * @brief Internal commit (used by commit and commitPrepared) */ - RC CommitInternal(); + RC CommitInternal(uint64_t csn); RC RollbackInternal(bool isPrepared); // Disable class level new operator @@ -543,8 +556,10 @@ private: /** @var transaction_id Provided by envelop on start transaction. */ uint64_t m_transactionId; - /** @var serogate_counter Promotes every insert - transaction. */ + /** @var Replay LSN for this transaction, used only during replay in standby. */ + uint64_t m_replayLsn; + + /** @var surrogate_counter Promotes every insert transaction. */ SurrogateKeyGenerator m_surrogateGen; bool m_flushDone; diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/redo_log_writer.cpp b/src/gausskernel/storage/mot/core/src/system/transaction_logger/redo_log_writer.cpp index 10af02bf1a..a1b6f73394 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/redo_log_writer.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/redo_log_writer.cpp @@ -238,7 +238,7 @@ bool RedoLogWriter::AppendPartial(RedoLogBuffer& redoLogBuffer, TxnManager* txn) bool RedoLogWriter::AppendIndex(RedoLogBuffer& buffer, Table* table, Index* index) { size_t bufSize = table->SerializeItemSize(index); - uint16_t entrySize = sizeof(OperationCode) + sizeof(size_t) + sizeof(uint32_t) + bufSize + sizeof(EndSegmentBlock); + uint16_t entrySize = sizeof(OperationCode) + sizeof(size_t) + sizeof(uint64_t) + bufSize + sizeof(EndSegmentBlock); if (buffer.FreeSize() < entrySize) return false; @@ -248,7 +248,7 @@ bool RedoLogWriter::AppendIndex(RedoLogBuffer& buffer, Table* table, Index* inde return false; } - uint32_t tableId = table->GetTableId(); + uint64_t tableId = table->GetTableExId(); table->SerializeItem(buf, index); buffer.Append(OperationCode::CREATE_INDEX); buffer.Append(bufSize); @@ -262,10 +262,10 @@ bool RedoLogWriter::AppendIndex(RedoLogBuffer& buffer, Table* table, Index* inde bool RedoLogWriter::AppendDropIndex(RedoLogBuffer& buffer, Table* table, Index* index) { uint16_t entrySize = - sizeof(OperationCode) + sizeof(uint32_t) + sizeof(size_t) + index->GetName().length() + sizeof(EndSegmentBlock); + sizeof(OperationCode) + sizeof(uint64_t) + sizeof(size_t) + index->GetName().length() + sizeof(EndSegmentBlock); if (buffer.FreeSize() < entrySize) return false; - uint32_t tableId = table->GetTableId(); + uint64_t tableId = table->GetTableExId(); buffer.Append(OperationCode::DROP_INDEX); buffer.Append(tableId); buffer.Append(index->GetName().length()); @@ -297,11 +297,11 @@ bool RedoLogWriter::AppendTable(RedoLogBuffer& buffer, Table* table) bool RedoLogWriter::AppendDropTable(RedoLogBuffer& buffer, Table* table) { - uint16_t entrySize = sizeof(OperationCode) + sizeof(uint32_t) + sizeof(EndSegmentBlock); + uint16_t entrySize = sizeof(OperationCode) + sizeof(uint64_t) + sizeof(EndSegmentBlock); if (buffer.FreeSize() < entrySize) return false; - uint32_t tableId = table->GetTableId(); + uint64_t tableId = table->GetTableExId(); buffer.Append(OperationCode::DROP_TABLE); buffer.Append(tableId); return true; @@ -309,11 +309,11 @@ bool RedoLogWriter::AppendDropTable(RedoLogBuffer& buffer, Table* table) bool RedoLogWriter::AppendTruncateTable(RedoLogBuffer& buffer, Table* table) { - uint16_t entrySize = sizeof(OperationCode) + sizeof(uint32_t) + sizeof(EndSegmentBlock); + uint16_t entrySize = sizeof(OperationCode) + sizeof(uint64_t) + sizeof(EndSegmentBlock); if (buffer.FreeSize() < entrySize) return false; - uint32_t tableId = table->GetTableId(); + uint64_t tableId = table->GetTableExId(); buffer.Append(OperationCode::TRUNCATE_TABLE); buffer.Append(tableId); return true; diff --git a/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw.cpp b/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw.cpp index bdbc208d5c..a1a8bc75a3 100644 --- a/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw.cpp +++ b/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw.cpp @@ -1329,13 +1329,16 @@ List* MOTPlanForeignModify(PlannerInfo* root, ModifyTable* plan, ::Index resultR RangeTblEntry* rte = planner_rt_fetch(resultRelation, root); Relation rel = heap_open(rte->relid, NoLock); TupleDesc desc = RelationGetDescr(rel); - bool isFromScan = false; uint8_t attrsModify[BITMAP_GETLEN(desc->natts)]; + uint8_t* ptrAttrsModify = attrsModify; MOT::TxnManager* currTxn = GetSafeTxn(/*GetCurrentTransactionId()*/); MOT::Table* table = currTxn->GetTableByExternalId(RelationGetRelid(rel)); if ((int)resultRelation < root->simple_rel_array_size && root->simple_rel_array[resultRelation] != nullptr) { - isFromScan = true; + if (root->simple_rel_array[resultRelation]->fdw_private != nullptr) { + fdwState = (MOTFdwStateSt*)root->simple_rel_array[resultRelation]->fdw_private; + ptrAttrsModify = fdwState->m_attrsUsed; + } } else { fdwState = (MOTFdwStateSt*)palloc0(sizeof(MOTFdwStateSt)); fdwState->m_cmdOper = plan->operation; @@ -1367,7 +1370,7 @@ List* MOTPlanForeignModify(PlannerInfo* root, ModifyTable* plan, ::Index resultR securec_check(erc, "\0", "\0"); for (int i = 0; i < desc->natts; i++) { if (bms_is_member(desc->attrs[i]->attnum - FirstLowInvalidHeapAttributeNumber, rte->updatedCols)) { - BITMAP_SET(attrsModify, (desc->attrs[i]->attnum - 1)); + BITMAP_SET(ptrAttrsModify, (desc->attrs[i]->attnum - 1)); } } break; @@ -1378,7 +1381,7 @@ List* MOTPlanForeignModify(PlannerInfo* root, ModifyTable* plan, ::Index resultR securec_check(erc, "\0", "\0"); for (int i = 0; i < desc->natts; i++) { if (!desc->attrs[i]->attisdropped) { - BITMAP_SET(attrsModify, (desc->attrs[i]->attnum - 1)); + BITMAP_SET(ptrAttrsModify, (desc->attrs[i]->attnum - 1)); } } } @@ -1390,8 +1393,8 @@ List* MOTPlanForeignModify(PlannerInfo* root, ModifyTable* plan, ::Index resultR heap_close(rel, NoLock); - return (isFromScan ? (List*)BitmapSerialize(nullptr, attrsModify, BITMAP_GETLEN(desc->natts)) - : (List*)SerializeFdwState(fdwState)); + return ((fdwState == nullptr) ? (List*)BitmapSerialize(nullptr, attrsModify, BITMAP_GETLEN(desc->natts)) + : (List*)SerializeFdwState(fdwState)); } static TupleTableSlot* MOTExecForeignInsert( @@ -2121,7 +2124,7 @@ uint64_t MOTCheckpointGetId() { MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance(); if (engine != nullptr) { - return engine->GetCheckpointManager()->GetId(); + return engine->GetRecoveryManager()->GetCheckpointId(); } return 0; } diff --git a/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.cpp b/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.cpp index 5fad11a84f..a257624aee 100644 --- a/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.cpp +++ b/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.cpp @@ -71,8 +71,7 @@ void MOTRedo(XLogReaderState* record) if (!IsValidEntry(recordType)) { elog(ERROR, "MOTRedo: invalid op code %u", recordType); } - if (MOT::GetRecoveryManager()->IsErrorSet() || - !MOT::GetRecoveryManager()->ApplyLogSegmentFromData(lsn, data, len)) { + if (MOT::GetRecoveryManager()->IsErrorSet() || !MOT::GetRecoveryManager()->ApplyRedoLog(lsn, data, len)) { // we treat errors fatally. ereport(FATAL, (MOTXlateRecoveryErr(MOT::GetRecoveryManager()->GetErrorCode()), diff --git a/src/test/ha/testcase/multi_standby_single/failover_mot.sh b/src/test/ha/testcase/multi_standby_single/failover_mot.sh index 8c32b43d27..8e6f86ff19 100644 --- a/src/test/ha/testcase/multi_standby_single/failover_mot.sh +++ b/src/test/ha/testcase/multi_standby_single/failover_mot.sh @@ -38,8 +38,9 @@ function test_1() fi start_primary_as_standby - sleep 5 + sleep 30 switchover_to_primary + sleep 30 #test the copy results on dn1_primary if [ $(gsql -d $db -p $dn1_primary_port -c "select pgxc_pool_reload();select count(1) from cstore_copy_t1;" | grep `expr 1 \* $cstore_rawdata_lines` |wc -l) -eq 1 ]; then diff --git a/src/test/ha/testcase/multi_standby_single/inc_build_failover_mot.sh b/src/test/ha/testcase/multi_standby_single/inc_build_failover_mot.sh index de5feb44e4..805d4c9fe4 100644 --- a/src/test/ha/testcase/multi_standby_single/inc_build_failover_mot.sh +++ b/src/test/ha/testcase/multi_standby_single/inc_build_failover_mot.sh @@ -13,18 +13,32 @@ function test_1() #create mot data gsql -d $db -p $dn1_primary_port -c "DROP FOREIGN TABLE if exists mot_switch1; CREATE FOREIGN TABLE mot_switch1(id INT,name VARCHAR(15) NOT NULL) SERVER mot_server;" gsql -d $db -p $dn1_primary_port -c "copy mot_switch1 from '$g_data_path/datanode1/pg_copydir/data5';" - + + print_time echo "start cluter success!" + inc_build_pattern="dn incremental build completed" + print_time + echo "killing primary" kill_primary + + print_time echo "primary killed" + + print_time + echo "failing over to standby" failover_to_standby - echo "failover_to_standby" + + print_time + echo "failover_to_standby DONE" + + print_time echo build #sleep 5 build_result=`gs_ctl build -D ${primary_data_dir}` #build_result=`gs_ctl build -D ${primary_data_dir} -b full` echo $build_result + print_time if [[ $build_result =~ $inc_build_pattern ]] then echo "inc build success" @@ -32,9 +46,11 @@ function test_1() echo "inc build $failed_keyword" fi - sleep 5 + # wait for recovery to complete on primary node + sleep 30 gsql -d $db -p $dn1_primary_port -m -c "select count(1) from mot_switch1;" + print_time if [ $(gsql -d $db -p $dn1_primary_port -m -c "select count(1) from mot_switch1;" | grep `expr 1 \* $rawdata_lines` |wc -l) -eq 1 ]; then echo "copy success on dn1_primary" else @@ -42,6 +58,7 @@ function test_1() fi gsql -d $db -p $dn1_standby_port -m -c "select count(1) from mot_switch1;" + print_time if [ $(gsql -d $db -p $dn1_standby_port -m -c "select count(1) from mot_switch1;" | grep `expr 1 \* $rawdata_lines` |wc -l) -eq 1 ]; then echo "copy success on dn1_standby" else @@ -51,6 +68,8 @@ function test_1() function tear_down() { sleep 1 + print_time + echo "Test done. Tearing down cluter. Failing over to primary after test" failover_to_primary gsql -d $db -p $dn1_standby_port -m -c " select * from pg_drop_replication_slot('dn_s2');" gsql -d $db -p $dn1_standby_port -m -c " select * from pg_drop_replication_slot('dn_s3');" diff --git a/src/test/ha/testcase/multi_standby_single/inc_build_reconnect_mot.sh b/src/test/ha/testcase/multi_standby_single/inc_build_reconnect_mot.sh index da48bbd644..058f45f2e2 100644 --- a/src/test/ha/testcase/multi_standby_single/inc_build_reconnect_mot.sh +++ b/src/test/ha/testcase/multi_standby_single/inc_build_reconnect_mot.sh @@ -14,9 +14,13 @@ function test_1() gsql -d $db -p $dn1_primary_port -c "copy mot_switch1 from '$g_data_path/datanode1/pg_copydir/data5';" inc_build_pattern="dn incremental build completed" + print_time + echo "Killing stadnby..." kill_standby - echo "standy killed" + print_time + echo "standy killed, building standby..." build_result=`gs_ctl build -D ${standby_data_dir}` + print_time if [[ $build_result =~ $inc_build_pattern ]] then echo "inc build success" @@ -24,16 +28,23 @@ function test_1() echo "inc build $failed_keyword" fi - sleep 5 + # wait for recovery on standby to complete + sleep 30 + print_time + echo "Querying primary..." gsql -d $db -p $dn1_primary_port -c "select count(1) from mot_switch1;" + print_time if [ $(gsql -d $db -p $dn1_primary_port -c "select count(1) from mot_switch1;" | grep `expr 1 \* $rawdata_lines` |wc -l) -eq 1 ]; then echo "copy success on dn1_primary" else echo "copy $failed_keyword on dn1_primary" fi + print_time + echo "Attempting access to standby" gsql -d $db -p $dn1_standby_port -m -c "select count(1) from mot_switch1;" + print_time if [ $(gsql -d $db -p $dn1_standby_port -m -c "select count(1) from mot_switch1;" | grep `expr 1 \* $rawdata_lines` |wc -l) -eq 1 ]; then echo "copy success on dn1_standby" else diff --git a/src/test/ha/util.sh b/src/test/ha/util.sh index fb2e8a33f7..1e2ccc0d3b 100644 --- a/src/test/ha/util.sh +++ b/src/test/ha/util.sh @@ -315,6 +315,35 @@ function failover_to_standby4() { fi } +function print_time() { + cur_time=`date +"%F %T.%N" | cut -c1-23` + echo -n "[${cur_time}] " +} + +function wait_recovery_done() { + # wait for recovery to complete on primary node + # this function is incomplete as it does not find the correct log line (still need to support minimum timstamp for search) + recovery_done=0 + node_data_dir=$1 + wait_time_seconds=$2 + last_log=`ls -ltr ${node_data_dir}/pg_log/postgresql-* | tail -1 | awk '{print $9}'` + for i in `seq 1 $wait_time_seconds`; + do + recovery_done=`grep "database system is ready to accept read only connections" $last_log | wc -l` + if [ $recovery_done -eq 1 ]; then + print_time + echo "Recovery done on node detected after $i seconds at: $node_data_dir" + break + fi + sleep 1 + done + + if [ $recovery_done -eq 0 ]; then + print_time + echo "Failed to find recovery done message after $wait_time_seconds seconds in node log at: $node_data_dir" + fi +} + #check_synchronous_commit "datanode1" 1 #check_detailed_instance #check_primary "datanode1" 2 -- Gitee