diff --git a/src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.cpp b/src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.cpp index fc2bc9371a54ea781770642ba2b5dc78842d226a..ee6d6928030d52ea87fb6ff6c8dee6d998c4b82a 100644 --- a/src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.cpp +++ b/src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.cpp @@ -14,7 +14,7 @@ * ------------------------------------------------------------------------- * * rw_lock.cpp - * Implements a reader/writer lock used in the checkpoint manager. + * Implements a reader/writer lock using spinlock. * * IDENTIFICATION * src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.cpp diff --git a/src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.h b/src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.h index 088e182712bc8e7c6f807e10342eb2d933cbd1e5..f317a62b6a2bdb5c6dd9b31c72869b640586d170 100644 --- a/src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.h +++ b/src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.h @@ -14,7 +14,7 @@ * ------------------------------------------------------------------------- * * rw_lock.h - * Implements a reader/writer lock used in the checkpoint manager. + * Implements a reader/writer lock using spinlock. * * IDENTIFICATION * src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.h @@ -30,7 +30,7 @@ typedef unsigned Spinlock; /** * @class RwLock - * @brief this class implements a reader/writer lock used in the checkpint manager + * @brief this class implements a reader/writer lock using spinlock */ class RwLock { public: diff --git a/src/gausskernel/storage/mot/core/src/storage/table.cpp b/src/gausskernel/storage/mot/core/src/storage/table.cpp index 371f19df874888fb251657d3c54fbb26ab8bd8c7..7e3038e701ca85e1773bffea345f6d4244f6c746 100644 --- a/src/gausskernel/storage/mot/core/src/storage/table.cpp +++ b/src/gausskernel/storage/mot/core/src/storage/table.cpp @@ -67,10 +67,21 @@ Table::~Table() if (m_rowPool) { ObjAllocInterface::FreeObjPool(&m_rowPool); } + + int destroyRc = pthread_rwlock_destroy(&m_rwLock); + if (destroyRc != 0) { + MOT_LOG_ERROR("~Table: rwlock destroy failed (%d)", destroyRc); + } } bool Table::Init(const char* tableName, const char* longName, unsigned int fieldCnt, uint64_t tableExId) { + int initRc = pthread_rwlock_init(&m_rwLock, NULL); + if (initRc != 0) { + MOT_LOG_ERROR("failed to initialize Table %s, could not init rwlock (%d)", tableName, initRc); + return false; + } + m_tableName.assign(tableName); m_longTableName.assign(longName); @@ -676,7 +687,7 @@ void Table::PrintSchema() void Table::Truncate(TxnManager* txn) { uint32_t pid = txn->GetThdId(); - m_mutex.lock(); + (void)pthread_rwlock_wrlock(&m_rwLock); // first destroy secondary index data for (int i = 1; i < m_numIndexes; i++) { @@ -696,7 +707,7 @@ void Table::Truncate(TxnManager* txn) m_longTableName.c_str()); } - m_mutex.unlock(); + (void)pthread_rwlock_unlock(&m_rwLock); } void Table::Compact(TxnManager* txn) @@ -1039,7 +1050,7 @@ RC Table::DropImpl() if (m_numIndexes == 0) return res; - m_mutex.lock(); + (void)pthread_rwlock_wrlock(&m_rwLock); do { m_secondaryIndexes.clear(); MOT_LOG_DEBUG("DropImpl numIndexes = %d \n", m_numIndexes); @@ -1056,7 +1067,7 @@ RC Table::DropImpl() } m_numIndexes = 0; } while (0); - m_mutex.unlock(); + (void)pthread_rwlock_unlock(&m_rwLock); return res; } diff --git a/src/gausskernel/storage/mot/core/src/storage/table.h b/src/gausskernel/storage/mot/core/src/storage/table.h index 467c6e8af47b3e7c4c0083425fc7d849e2039fa8..ed0d83f259fe2e4aaaf47e1c3e65db842dd8d5e0 100644 --- a/src/gausskernel/storage/mot/core/src/storage/table.h +++ b/src/gausskernel/storage/mot/core/src/storage/table.h @@ -30,6 +30,7 @@ #include #include #include +#include #include "global.h" #include "sentinel.h" #include "surrogate_key_generator.h" @@ -704,19 +705,39 @@ public: } /** - * @brief locks that table for modifications + * @brief takes a read lock on the table. */ - void Lock() + void RdLock() { - m_mutex.lock(); + (void)pthread_rwlock_rdlock(&m_rwLock); } /** - * @brief unlocks the table + * @brief tries to takes a write lock on the table. + * @return True on success, False if the lock could not be acquired. + */ + bool WrTryLock() + { + if (pthread_rwlock_trywrlock(&m_rwLock) != 0) { + return false; + } + return true; + } + + /** + * @brief takes a write lock on the table. + */ + void WrLock() + { + (void)pthread_rwlock_wrlock(&m_rwLock); + } + + /** + * @brief releases the table lock. */ void Unlock() { - m_mutex.unlock(); + (void)pthread_rwlock_unlock(&m_rwLock); } bool IsDeserialized() const @@ -785,8 +806,8 @@ private: /** @var Secondary index map accessed by name. */ SecondaryIndexMap m_secondaryIndexes; - /** @var Lock that guards against deletion while checkpointing. */ - std::mutex m_mutex; + /** @var RW Lock that guards against deletion during checkpoint/vacuum. */ + pthread_rwlock_t m_rwLock; string m_tableName; diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp index 683dd94064b775976086c9f4ecc37a927884411d..47b1d2a9c77d97995bfde1005d4706671d93d840 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp @@ -47,7 +47,6 @@ CheckpointManager::CheckpointManager() m_availableBit(true), m_numCpTasks(0), m_numThreads(GetGlobalConfiguration().m_checkpointWorkers), - m_checkpointValidation(GetGlobalConfiguration().m_validateCheckpoint), m_cpSegThreshold(GetGlobalConfiguration().m_checkpointSegThreshold), m_stopFlag(false), m_checkpointEnded(false), @@ -56,10 +55,22 @@ CheckpointManager::CheckpointManager() m_counters{{0}}, m_lsn(0), m_id(0), + m_inProgressId(0), m_lastReplayLsn(0), m_emptyCheckpoint(false) {} +bool CheckpointManager::Initialize() +{ + int initRc = pthread_rwlock_init(&m_fetchLock, NULL); + if (initRc != 0) { + MOT_LOG_ERROR("Failed to initialize CheckpointManager, could not init rwlock (%d)", initRc); + return false; + } + + return true; +} + void CheckpointManager::ResetFlags() { m_checkpointEnded = false; @@ -74,26 +85,24 @@ CheckpointManager::~CheckpointManager() delete m_checkpointers; m_checkpointers = nullptr; } + (void)pthread_rwlock_destroy(&m_fetchLock); } bool CheckpointManager::CreateSnapShot() { - if (!CheckpointManager::CreateCheckpointId(m_id)) { + if (!CheckpointManager::CreateCheckpointId(m_inProgressId)) { MOT_LOG_ERROR("Could not begin checkpoint, checkpoint id creation failed"); OnError(CheckpointWorkerPool::ErrCodes::CALC, "Could not begin checkpoint, checkpoint id creation failed"); return false; } - MOT_LOG_INFO("Creating MOT checkpoint snapshot: id: %lu", GetId()); + MOT_LOG_INFO("Creating MOT checkpoint snapshot: id: %lu", m_inProgressId); if (m_phase != CheckpointPhase::REST) { MOT_LOG_WARN("Could not begin checkpoint, checkpoint is already running"); OnError(CheckpointWorkerPool::ErrCodes::CALC, "Could not begin checkpoint, checkpoint is already running"); return false; } - MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance(); - - engine->LockDDLForCheckpoint(); ResetFlags(); // Ensure that there are no transactions that started in Checkpoint COMPLETE @@ -114,29 +123,36 @@ bool CheckpointManager::CreateSnapShot() bool CheckpointManager::SnapshotReady(uint64_t lsn) { - MOT_LOG_INFO("MOT snapshot ready. id: %lu, lsn: %lu", GetId(), m_lsn); + MOT_LOG_INFO("MOT snapshot ready. id: %lu, lsn: %lu", m_inProgressId, m_lsn); if (m_phase != CheckpointPhase::CAPTURE) { MOT_LOG_ERROR("BAD Checkpoint state. Checkpoint ID: %lu, expected: 'CAPTURE', actual: %s", - GetId(), + m_inProgressId, PhaseToString(m_phase)); m_errorSet = true; } else { SetLsn(lsn); if (m_redoLogHandler != nullptr) m_redoLogHandler->WrUnlock(); - MOT_LOG_DEBUG("Checkpoint snapshot ready. Checkpoint ID: %lu, LSN: %lu", GetId(), GetLsn()); + MOT_LOG_DEBUG("Checkpoint snapshot ready. Checkpoint ID: %lu, LSN: %lu", m_inProgressId, GetLsn()); } return !m_errorSet; } bool CheckpointManager::BeginCheckpoint() { - MOT_LOG_INFO("MOT begin checkpoint capture. id: %lu, lsn: %lu", GetId(), m_lsn); + MOT_LOG_INFO("MOT begin checkpoint capture. id: %lu, lsn: %lu", m_inProgressId, m_lsn); Capture(); while (!m_checkpointEnded) { usleep(100000L); + if (m_finishedTasks.empty() == false) { + std::lock_guard guard(m_tasksMutex); + UnlockAndClearTables(m_finishedTasks); + } } + // No locking required here, as the checkpoint workers have already exited. + UnlockAndClearTables(m_finishedTasks); + // Move to complete. // No need to wait for transactions started in previous checkpoint phase // to complete since there are no transactions that start in RESOLVE @@ -145,9 +161,13 @@ bool CheckpointManager::BeginCheckpoint() m_lock.WrUnlock(); if (!m_errorSet) { - CompleteCheckpoint(GetId()); + CompleteCheckpoint(); } + // No locking required here, as the checkpoint workers have already exited. + UnlockAndClearTables(m_tasksList); + m_numCpTasks = 0; + // Ensure that there are no transactions that started in Checkpoint CAPTURE // phase that are not yet completed before moving to REST phase WaitPrevPhaseCommittedTxnComplete(); @@ -157,8 +177,6 @@ bool CheckpointManager::BeginCheckpoint() MoveToNextPhase(); m_lock.WrUnlock(); - MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance(); - engine->UnlockDDLForCheckpoint(); return !m_errorSet; } @@ -183,13 +201,15 @@ bool CheckpointManager::Abort() // phase that are not yet completed before moving to REST phase WaitPrevPhaseCommittedTxnComplete(); + // No locking required here, as there no checkpoint workers when the control reaches here. + UnlockAndClearTables(m_tasksList); + UnlockAndClearTables(m_finishedTasks); + m_numCpTasks = 0; + // Move to rest m_lock.WrLock(); MoveToNextPhase(); m_lock.WrUnlock(); - - MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance(); - engine->UnlockDDLForCheckpoint(); } return true; } @@ -250,8 +270,10 @@ void CheckpointManager::CommitTransaction(TxnManager* txn, int writeSetSize) void CheckpointManager::TransactionCompleted(TxnManager* txn) { - if (txn->m_checkpointPhase == CheckpointPhase::NONE) + if (txn->m_checkpointPhase == CheckpointPhase::NONE) { return; + } + m_lock.RdLock(); CheckpointPhase current_phase = m_phase; if (txn->m_checkpointPhase == m_phase) { // current phase @@ -302,14 +324,18 @@ void CheckpointManager::MoveToNextPhase() m_phase = (CheckpointPhase)nextPhase; m_cntBit = !m_cntBit; - if (m_phase == CheckpointPhase::CAPTURE && m_redoLogHandler != nullptr) { - // hold the redo log lock to avoid inserting additional entries to the - // log. Once snapshot is taken, this lock will be released in SnapshotReady(). - m_redoLogHandler->WrLock(); + if (m_phase == CheckpointPhase::PREPARE) { + // Obtain a list of all tables. The tables are read locked + // in order to avoid delete/truncate during checkpoint. + FillTasksQueue(); } - if (m_phase == PREPARE && m_checkpointValidation == true) { - Checkbits(); + if (m_phase == CheckpointPhase::CAPTURE) { + if (m_redoLogHandler != nullptr) { + // hold the redo log lock to avoid inserting additional entries to the + // log. Once snapshot is taken, this lock will be released in SnapshotReady(). + m_redoLogHandler->WrLock(); + } } // there are no open transactions from previous phase, we can move forward to next phase @@ -318,41 +344,6 @@ void CheckpointManager::MoveToNextPhase() } } -void CheckpointManager::Checkbits() -{ - GetTableManager()->AddTableIdsToList(m_tasksList); - m_numCpTasks = m_tasksList.size(); - while (!m_tasksList.empty()) { - uint32_t curId = m_tasksList.front(); - MOT_LOG_DEBUG("checkbits - %u", curId); - Table* table = GetTableManager()->GetTable(curId); - if (table == nullptr) { - MOT_LOG_ERROR("could not find tableId %u", curId); - continue; - } - - m_tasksList.pop_front(); - m_numCpTasks--; - Index* index = table->GetPrimaryIndex(); - if (index == nullptr) { - MOT_LOG_ERROR("could not get primary index for tableId %u", curId); - continue; - } - - IndexIterator* it = index->Begin(0); - while (it != nullptr && it->IsValid()) { - MOT::Sentinel* Sentinel = it->GetPrimarySentinel(); - MOT::Row* r = Sentinel->GetData(); - if (!r->IsAbsentRow() && Sentinel->GetStableStatus() == m_availableBit) - MOT_LOG_ERROR("CHECKPOINT, AVAILABLE BIT IS SET"); - if (Sentinel->GetStable() != nullptr) - MOT_LOG_ERROR("CHECKPOINT, HAS STABLE DATA!!!"); - it->Next(); - } - } - MOT_LOG_DEBUG("checkbits - done"); -} - const char* CheckpointManager::PhaseToString(CheckpointPhase phase) { switch (phase) { @@ -375,6 +366,7 @@ const char* CheckpointManager::PhaseToString(CheckpointPhase phase) bool CheckpointManager::ApplyWrite(TxnManager* txnMan, Row* origRow, AccessType type) { CheckpointPhase startPhase = txnMan->m_checkpointPhase; + MOT_ASSERT(startPhase != RESOLVE); Sentinel* s = origRow->GetPrimarySentinel(); MOT_ASSERT(s); if (s == nullptr) { @@ -385,32 +377,36 @@ bool CheckpointManager::ApplyWrite(TxnManager* txnMan, Row* origRow, AccessType bool statusBit = s->GetStableStatus(); switch (startPhase) { case REST: - if (type == INS) + if (type == INS) { s->SetStableStatus(!m_availableBit); + } break; case PREPARE: - if (type == INS) + if (type == INS) { s->SetStableStatus(!m_availableBit); - else if (statusBit == !m_availableBit) { - if (!CheckpointUtils::SetStableRow(origRow)) + } else if (statusBit == !m_availableBit) { + if (!CheckpointUtils::SetStableRow(origRow)) { return false; + } } break; case RESOLVE: case CAPTURE: - if (type == INS) + if (type == INS) { s->SetStableStatus(m_availableBit); - else { + } else { if (statusBit == !m_availableBit) { - if (!CheckpointUtils::SetStableRow(origRow)) + if (!CheckpointUtils::SetStableRow(origRow)) { return false; + } s->SetStableStatus(m_availableBit); } } break; case COMPLETE: - if (type == INS) + if (type == INS) { s->SetStableStatus(!txnMan->m_checkpointNABit); + } break; default: MOT_LOG_ERROR("Unknown transaction start phase: %s", CheckpointManager::PhaseToString(startPhase)); @@ -426,22 +422,40 @@ void CheckpointManager::FillTasksQueue() OnError(CheckpointWorkerPool::ErrCodes::CALC, "CheckpointManager::fillTasksQueue: queue is not empty!"); return; } - GetTableManager()->AddTableIdsToList(m_tasksList); + GetTableManager()->AddTablesToList(m_tasksList); m_numCpTasks = m_tasksList.size(); m_mapfileInfo.clear(); MOT_LOG_DEBUG("CheckpointManager::fillTasksQueue:: got %d tasks", m_tasksList.size()); } -void CheckpointManager::TaskDone(uint32_t tableId, uint32_t numSegs, bool success) +void CheckpointManager::UnlockAndClearTables(std::list& tables) +{ + std::list
::iterator it; + for (it = tables.begin(); it != tables.end(); ++it) { + Table *table = *it; + if (table != nullptr) { + table->Unlock(); + } + } + tables.clear(); +} + +void CheckpointManager::TaskDone(Table* table, uint32_t numSegs, bool success) { + MOT_ASSERT(table); if (success) { /* only successful tasks are added to the map file */ + if (table == nullptr) { + OnError(CheckpointWorkerPool::ErrCodes::MEMORY, "Got a null table on task done"); + return; + } MapFileEntry* entry = new (std::nothrow) MapFileEntry(); if (entry != nullptr) { - entry->m_id = tableId; + entry->m_id = table->GetTableId(); entry->m_numSegs = numSegs; - MOT_LOG_DEBUG("TaskDone %lu: %u %u segs", GetId(), tableId, numSegs); - std::lock_guard guard(m_mapfileMutex); + MOT_LOG_DEBUG("TaskDone %lu: %u %u segs", m_inProgressId, entry->m_id, numSegs); + std::lock_guard guard(m_tasksMutex); m_mapfileInfo.push_back(entry); + m_finishedTasks.push_back(table); } else { OnError(CheckpointWorkerPool::ErrCodes::MEMORY, "Failed to allocate map file entry"); return; @@ -453,7 +467,7 @@ void CheckpointManager::TaskDone(uint32_t tableId, uint32_t numSegs, bool succes } } -void CheckpointManager::CompleteCheckpoint(uint64_t checkpointId) +void CheckpointManager::CompleteCheckpoint() { if (m_emptyCheckpoint == true && CreateEmptyCheckpoint() == false) { OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create empty checkpoint"); @@ -466,12 +480,12 @@ void CheckpointManager::CompleteCheckpoint(uint64_t checkpointId) return; } - if (!CreateCheckpointMap(checkpointId)) { + if (!CreateCheckpointMap()) { OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create map file"); return; } - if (!CreateTpcRecoveryFile(checkpointId)) { + if (!CreateTpcRecoveryFile()) { OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create 2pc recovery file"); return; } @@ -481,22 +495,32 @@ void CheckpointManager::CompleteCheckpoint(uint64_t checkpointId) return; } - m_fetchLock.WrLock(); - if (!ctrlFile->Update(checkpointId, GetLsn(), GetLastReplayLsn())) { - OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to update control file"); - return; - } + bool finishedUpdatingFiles = false; + (void)pthread_rwlock_wrlock(&m_fetchLock); + do { + if (!CreateEndFile()) { + OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create completion file"); + break; + } + + if (!ctrlFile->Update(m_inProgressId, GetLsn(), GetLastReplayLsn())) { + OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to update control file"); + break; + } - GetRecoveryManager()->SetCheckpointId(checkpointId); + // Update checkpoint Id + SetId(m_inProgressId); + GetRecoveryManager()->SetCheckpointId(m_id); + finishedUpdatingFiles = true; + } while (0); + (void)pthread_rwlock_unlock(&m_fetchLock); - if (!CreateEndFile(checkpointId)) { - OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create completion file"); + if (!finishedUpdatingFiles) { return; } - m_fetchLock.WrUnlock(); - RemoveOldCheckpoints(checkpointId); - MOT_LOG_INFO("Checkpoint [%lu] completed", checkpointId); + RemoveOldCheckpoints(m_inProgressId); + MOT_LOG_INFO("Checkpoint [%lu] completed", m_inProgressId); } void CheckpointManager::DestroyCheckpointers() @@ -510,18 +534,12 @@ void CheckpointManager::DestroyCheckpointers() void CheckpointManager::CreateCheckpointers() { m_checkpointers = new (std::nothrow) - CheckpointWorkerPool(m_numThreads, !m_availableBit, m_tasksList, m_cpSegThreshold, m_id, *this); + CheckpointWorkerPool(m_numThreads, !m_availableBit, m_tasksList, m_cpSegThreshold, m_inProgressId, *this); } void CheckpointManager::Capture() { MOT_LOG_DEBUG("CheckpointManager::capture"); - if (m_numCpTasks) { - MOT_LOG_ERROR("The number of tasks is %d, cannot start capture!", m_numCpTasks.load()); - return; - } - - FillTasksQueue(); if (m_numCpTasks == 0) { MOT_LOG_INFO("No tasks in queue - empty checkpoint"); @@ -628,7 +646,7 @@ void CheckpointManager::RemoveCheckpointDir(uint64_t checkpointId) free(buf); } -bool CheckpointManager::CreateCheckpointMap(uint64_t checkpointId) +bool CheckpointManager::CreateCheckpointMap() { int fd = -1; std::string fileName; @@ -636,10 +654,11 @@ bool CheckpointManager::CreateCheckpointMap(uint64_t checkpointId) bool ret = false; do { - if (!CheckpointUtils::SetWorkingDir(workingDir, checkpointId)) + if (!CheckpointUtils::SetWorkingDir(workingDir, m_inProgressId)) { break; + } - CheckpointUtils::MakeMapFilename(fileName, workingDir, checkpointId); + CheckpointUtils::MakeMapFilename(fileName, workingDir, m_inProgressId); if (!CheckpointUtils::OpenFileWrite(fileName, fd)) { MOT_LOG_ERROR("createCheckpointMap: failed to create file '%s' - %d - %s", fileName.c_str(), @@ -704,7 +723,7 @@ bool CheckpointManager::CreateEmptyCheckpoint() { std::string workingDir; - if (!CheckpointUtils::SetWorkingDir(workingDir, m_id)) { + if (!CheckpointUtils::SetWorkingDir(workingDir, m_inProgressId)) { OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "failed to setup working dir"); return false; } @@ -763,7 +782,7 @@ bool CheckpointManager::CreateCheckpointDir(std::string& dir) return true; } -bool CheckpointManager::CreateTpcRecoveryFile(uint64_t checkpointId) +bool CheckpointManager::CreateTpcRecoveryFile() { int fd = -1; std::string fileName; @@ -771,10 +790,11 @@ bool CheckpointManager::CreateTpcRecoveryFile(uint64_t checkpointId) bool ret = false; do { - if (!CheckpointUtils::SetWorkingDir(workingDir, checkpointId)) + if (!CheckpointUtils::SetWorkingDir(workingDir, m_inProgressId)) { break; + } - CheckpointUtils::MakeTpcFilename(fileName, workingDir, checkpointId); + CheckpointUtils::MakeTpcFilename(fileName, workingDir, m_inProgressId); if (!CheckpointUtils::OpenFileWrite(fileName, fd)) { MOT_LOG_ERROR("create2PCRecoveryFile: failed to create file '%s' - %d - %s", fileName.c_str(), @@ -819,7 +839,7 @@ bool CheckpointManager::CreateTpcRecoveryFile(uint64_t checkpointId) return ret; } -bool CheckpointManager::CreateEndFile(uint64_t checkpointId) +bool CheckpointManager::CreateEndFile() { int fd = -1; std::string fileName; @@ -827,11 +847,11 @@ bool CheckpointManager::CreateEndFile(uint64_t checkpointId) bool ret = false; do { - if (!CheckpointUtils::SetWorkingDir(workingDir, checkpointId)) { + if (!CheckpointUtils::SetWorkingDir(workingDir, m_inProgressId)) { break; } - CheckpointUtils::MakeEndFilename(fileName, workingDir, checkpointId); + CheckpointUtils::MakeEndFilename(fileName, workingDir, m_inProgressId); if (!CheckpointUtils::OpenFileWrite(fileName, fd)) { MOT_LOG_ERROR( "CreateEndFile: failed to create file '%s' - %d - %s", fileName.c_str(), errno, gs_strerror(errno)); diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.h b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.h index 2c33ce5b447a8a07b3d7aa8e61e7acfae14992aa..d608d8b8c1a2683e59107815cf2907015e15c9df 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.h +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.h @@ -27,6 +27,7 @@ #include #include +#include #include "rw_lock.h" #include "global.h" #include "txn.h" @@ -48,10 +49,7 @@ public: virtual ~CheckpointManager(); - void SetValidation(bool val) - { - m_checkpointValidation = val; - } + bool Initialize(); /** * @brief Starts an MOT checkpoint snapshot operation. @@ -112,11 +110,11 @@ public: /** * @brief Checkpoint task completion callback * @param checkpointId The checkpoint's id. - * @param tableId The table's id. + * @param table The table's pointer. * @param numSegs number of segments written. * @param success Indicates a success or a failure. */ - virtual void TaskDone(uint32_t tableId, uint32_t numSegs, bool success); + virtual void TaskDone(Table* table, uint32_t numSegs, bool success); virtual bool ShouldStop() const { @@ -178,12 +176,12 @@ public: void FetchRdLock() { - m_fetchLock.RdLock(); + (void)pthread_rwlock_rdlock(&m_fetchLock); } void FetchRdUnlock() { - m_fetchLock.RdUnlock(); + (void)pthread_rwlock_unlock(&m_fetchLock); } bool GetCheckpointDirName(std::string& dirName); @@ -209,14 +207,17 @@ private: volatile CheckpointPhase m_phase; // NA 'bit' handling - volatile std::atomic_bool m_availableBit; + std::atomic_bool m_availableBit; // Counts the number of table ids that we are checkpointing. // When this reaches 0, the checkpoint is complete; std::atomic m_numCpTasks; - // Holds table IDs to checkpoint - std::list m_tasksList; + // Holds tables to checkpoint to be passed to the checkpoint threads + std::list m_tasksList; + + // Holds finished (checkpointed) tables that can be released by the main thread + std::list m_finishedTasks; // the checkpoint workers pool CheckpointWorkerPool* m_checkpointers = nullptr; @@ -224,11 +225,8 @@ private: // Number of threads to run int m_numThreads; - // Enable checkpoint validation - checkbits - bool m_checkpointValidation; - - // Checkpoint map file information - std::mutex m_mapfileMutex; + // mutex for safeguarding mapfile and tasks queues access + std::mutex m_tasksMutex; std::list m_mapfileInfo; @@ -259,16 +257,19 @@ private: // Envelope's checkpoint lsn uint64_t m_lsn; - // Current Checkpoint's ID + // Last Valid (completed) Checkpoint ID uint64_t m_id; + // Current (in-progress) Checkpoint ID + uint64_t m_inProgressId; + // last seen recovery lsn uint64_t m_lastReplayLsn; bool m_emptyCheckpoint; // this lock guards gs_ctl checkpoint fetching - RwLock m_fetchLock; + pthread_rwlock_t m_fetchLock; void SetId(uint64_t id) { @@ -297,7 +298,7 @@ private: static const char* PhaseToString(CheckpointPhase phase); - void SwapAvailableAndNotAvailable() + inline void SwapAvailableAndNotAvailable() { m_availableBit = !m_availableBit; } @@ -309,12 +310,11 @@ private: bool CreateEmptyCheckpoint(); /** - * @brief Performs a checkpoint completion tasks: + * @brief Performs checkpoint completion tasks: * updates control file, creates the map file * and 2pc recovery file - * @param checkpointId The checkpoint's id. */ - void CompleteCheckpoint(uint64_t checkpointId); + void CompleteCheckpoint(); /** * @brief Performs the checkpoint's Capture phase @@ -327,6 +327,12 @@ private: */ void FillTasksQueue(); + /** + * @brief Unlocks tables and clear the tables' list + * @param tables Tables list to clear + */ + void UnlockAndClearTables(std::list
& tables); + /** * @brief Destroys all the checkpoint threads */ @@ -357,35 +363,25 @@ private: return false; } - /** - * @brief A utility function to validate the checkpoint. - * should not be enabled by default since it can - * have an impact on checkpoint's completion time - */ - void Checkbits(); - /** * @brief Creates the checkpoint's map file - where all * the metadata is stored in - * @param id The new checkpoint id * @return Boolean value denoting success or failure. */ - bool CreateCheckpointMap(uint64_t checkpointId); + bool CreateCheckpointMap(); /** * @brief Saves the in-process transaction data for 2pc recovery * purposes during the checkpoint. - * @param id The new checkpoint id * @return Boolean value denoting success or failure. */ - bool CreateTpcRecoveryFile(uint64_t checkpointId); + bool CreateTpcRecoveryFile(); /** * @brief Creates a file that indicates checkpoint completion. - * @param id The checkoint id * @return Boolean value denoting success or failure. */ - bool CreateEndFile(uint64_t checkpointId); + bool CreateEndFile(); void ResetFlags(); diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp index 1eba8bd63609be85d1fa84ba977a43ac40ebe62e..af1189d7b4e6ede60b2fc54a16544832f21fb3c7 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp @@ -146,11 +146,9 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, do { if (statusBit == !m_na) { /* has stable version */ - if (!deleted && stableRow == nullptr) - break; - if (deleted && stableRow == nullptr) + if (stableRow == nullptr) { break; - if (stableRow != nullptr) { + } else { if (!Write(buffer, stableRow, fd)) { wrote = -1; } else { @@ -172,13 +170,13 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, break; } sentinel->SetStableStatus(!m_na); - if (!Write(buffer, mainRow, fd)) + if (!Write(buffer, mainRow, fd)) { wrote = -1; // we failed to write, set error - else + } else { wrote = 1; + } break; - } - if (stableRow != nullptr) { /* should not happen! */ + } else { /* should not happen! */ wrote = -1; m_cpManager.OnError(ErrCodes::CALC, "Calc logic error - stable row"); } @@ -190,19 +188,18 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, return wrote; } -bool CheckpointWorkerPool::GetTask(uint32_t& task) +Table* CheckpointWorkerPool::GetTask() { - bool ret = false; + Table* table = nullptr; + m_tasksLock.lock(); do { - m_tasksLock.lock(); if (m_tasksList.empty()) break; - task = m_tasksList.front(); + table = m_tasksList.front(); m_tasksList.pop_front(); - ret = true; } while (0); m_tasksLock.unlock(); - return ret; + return table; } void CheckpointWorkerPool::WorkerFunc() @@ -232,22 +229,17 @@ void CheckpointWorkerPool::WorkerFunc() bool taskSucceeded = false; Table* table = nullptr; - if (m_cpManager.ShouldStop()) + if (m_cpManager.ShouldStop()) { break; + } - bool haveWork = GetTask(tableId); - if (haveWork) { + table = GetTask(); + if (table != nullptr) { int fd = -1; do { uint32_t overallOps = 0; - table = GetTableManager()->GetTableSafe(tableId); - if (table == nullptr) { - MOT_LOG_INFO( - "CheckpointWorkerPool::workerFunc:Table %u does not exist - probably deleted already", tableId); - break; - } - + tableId = table->GetTableId(); exId = table->GetTableExId(); size_t tableSize = table->SerializeSize(); char* tableBuf = new (std::nothrow) char[tableSize]; @@ -438,16 +430,7 @@ void CheckpointWorkerPool::WorkerFunc() } } - if (table != nullptr) { - table->Unlock(); - m_cpManager.TaskDone(tableId, seg, taskSucceeded); - } else { - /* taskSucceeded is false, so this table won't be added to the map file. */ - m_cpManager.TaskDone(tableId, seg, taskSucceeded); - - /* Table is dropped, but we need to continue processing other tables. */ - taskSucceeded = true; - } + m_cpManager.TaskDone(table, seg, taskSucceeded); if (!taskSucceeded) { break; diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.h b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.h index e490d502cbbf7d4547df58ff108a1b871b985e7a..381a0818b687190f1c9a6baee650b74e35212a3a 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.h +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.h @@ -47,11 +47,11 @@ public: /** * @brief Checkpoint task completion callback * @param checkpointId The checkpoint's id. - * @param tableId The table's id. + * @param table The table's pointer. * @param numSegs number of segments written. * @param success Indicates a success or a failure. */ - virtual void TaskDone(uint32_t tableId, uint32_t numSegs, bool success) = 0; + virtual void TaskDone(Table* table, uint32_t numSegs, bool success) = 0; /** * @brief Checks if the thread should terminate it work @@ -77,7 +77,7 @@ public: */ class CheckpointWorkerPool { public: - CheckpointWorkerPool(int n, bool b, std::list& l, uint32_t s, uint64_t id, CheckpointManagerCallbacks& m) + CheckpointWorkerPool(int n, bool b, std::list& l, uint32_t s, uint64_t id, CheckpointManagerCallbacks& m) : m_numWorkers(n), m_tasksList(l), m_checkpointId(id), m_na(b), m_cpManager(m), m_checkpointSegsize(s) { Start(); @@ -117,10 +117,10 @@ private: int Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid); /** - * @brief Pops a task (table id) from the tasks queue. - * @return true if a task was fetched, false if the queue was empty. + * @brief Pops a task (table pointer) from the tasks queue. + * @return the address of the pop'd table, or nullptr if the queue was empty. */ - bool GetTask(uint32_t& task); + Table* GetTask(); /** * @brief Creates a checkpoint id for the current checkpoint @@ -153,8 +153,8 @@ private: volatile std::atomic m_numWorkers; - // Holds table IDs to checkpoint - std::list& m_tasksList; + // Holds tables to checkpoint + std::list& m_tasksList; // Guards tasksList pops std::mutex m_tasksLock; diff --git a/src/gausskernel/storage/mot/core/src/system/common/session_manager.cpp b/src/gausskernel/storage/mot/core/src/system/common/session_manager.cpp index 338e3e03272c8f73a182ec146a8a867ec083ea9c..09643202ed77e340f9d43adc84a91a0037f02d03 100644 --- a/src/gausskernel/storage/mot/core/src/system/common/session_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/system/common/session_manager.cpp @@ -38,7 +38,7 @@ DECLARE_LOGGER(SessionManager, System) static void PrintActiveSession(SessionId sessionId, SessionContext* sessionContext) { - MOT_LOG_ERROR("Still active session: %u", sessionId); + MOT_LOG_WARN("Still active session: %u", sessionId); } bool SessionManager::Initialize(uint32_t nodeCount, uint32_t threadCount) @@ -359,7 +359,7 @@ void SessionManager::DestroySessionContext(SessionContext* sessionContext) void SessionManager::ReportActiveSessions() { if (!m_sessionContextMap.empty()) { - MOT_LOG_PANIC("Attempting to Destroy MOT Engine while there are still %u active sessions", + MOT_LOG_WARN("Attempting to Destroy MOT Engine while there are still %u active sessions", (unsigned)m_sessionContextMap.size()); m_sessionContextMap.for_each(PrintActiveSession); } diff --git a/src/gausskernel/storage/mot/core/src/system/common/table_manager.cpp b/src/gausskernel/storage/mot/core/src/system/common/table_manager.cpp index aaf3890884ee77593fff291cd7c346ead2663e6a..08d4c6c4b60fd1c6a2d130a92e24247519f25505 100644 --- a/src/gausskernel/storage/mot/core/src/system/common/table_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/system/common/table_manager.cpp @@ -25,7 +25,7 @@ #include "table_manager.h" namespace MOT { -IMPLEMENT_CLASS_LOGGER(TableManager, System) +IMPLEMENT_CLASS_LOGGER(TableManager, System); bool TableManager::AddTable(Table* table) { @@ -33,7 +33,6 @@ bool TableManager::AddTable(Table* table) "Adding table %s with external id: %" PRIu64, table->GetLongTableName().c_str(), table->GetTableExId()); m_rwLock.WrLock(); InternalTableMap::iterator it = m_tablesById.find(table->GetTableId()); - if (it != m_tablesById.end()) { m_rwLock.WrUnlock(); MOT_LOG_ERROR( @@ -54,7 +53,7 @@ void TableManager::ClearTablesThreadMemoryCache() InternalTableMap::iterator it = m_tablesById.begin(); while (it != m_tablesById.end()) { // lock table to prevent concurrent ddl and truncate operations - it->second->Lock(); + it->second->RdLock(); it->second->ClearThreadMemoryCache(); it->second->Unlock(); it++; diff --git a/src/gausskernel/storage/mot/core/src/system/common/table_manager.h b/src/gausskernel/storage/mot/core/src/system/common/table_manager.h index ef77c4f69aa06f03cb8db1a349b66241de59a830..f2932054a6d3c95d7408767e9ae0a7640a836f8e 100644 --- a/src/gausskernel/storage/mot/core/src/system/common/table_manager.h +++ b/src/gausskernel/storage/mot/core/src/system/common/table_manager.h @@ -63,11 +63,13 @@ public: */ inline RC DropTable(Table* table, SessionContext* sessionContext) { + if (table == nullptr) { + return RC_ERROR; + } + MOT_LOG_INFO("Dropping table %s", table->GetLongTableName().c_str()); RC status = DropTableInternal(table, sessionContext); - if (table != nullptr) { - delete table; - } + delete table; return status; } @@ -82,7 +84,6 @@ public: Table* table = nullptr; m_rwLock.RdLock(); InternalTableMap::iterator it = m_tablesById.find(tableId); - if (it != m_tablesById.end()) table = it->second; @@ -93,20 +94,19 @@ public: /** * @brief Retrieves a locked table from the engine. Caller is responsible for unlocking the table when done using * it, by calling @ref Table::Unlock. - * @param tableId The internal (engine-given) identifier of the table to retrieve. + * @param tableId The external (envelope) identifier of the table to retrieve. * @return The table object or null pointer if not found. * @note It is assumed that the envelope guards against concurrent removal of the table. */ - inline Table* GetTableSafe(InternalTableId tableId) + inline Table* GetTableSafeByExId(ExternalTableId tableId) { Table* table = nullptr; m_rwLock.RdLock(); - InternalTableMap::iterator it = m_tablesById.find(tableId); - - if (it != m_tablesById.end()) { + ExternalTableMap::iterator it = m_tablesByExId.find(tableId); + if (it != m_tablesByExId.end()) { table = it->second; if (table != nullptr) { - table->Lock(); + table->RdLock(); } } m_rwLock.RdUnlock(); @@ -124,10 +124,10 @@ public: InternalTableId internalId, ExternalTableId externalId, const std::string& name, const std::string& longName) { bool ret = false; - Table* table = GetTableSafe(internalId); + Table* table = GetTableSafeByExId(externalId); if (table != nullptr) { ret = ((table->GetTableName() == name) && (table->GetLongTableName() == longName) && - (table->GetTableExId() == externalId)); + (table->GetTableId() == internalId)); table->Unlock(); } return ret; @@ -179,18 +179,20 @@ public: } /** - * @brief Copies the internal identifiers of all tables into a list. - * @param[out] idQueue Receives all the table identifiers. - * @return The number of table identifiers copied. + * @brief Adds the pointers of all tables into a list. + * @param[out] tablesQueue Receives all the tables. + * @return The number of tables added. */ - inline uint32_t AddTableIdsToList(std::list& idQueue) + inline uint32_t AddTablesToList(std::list& tablesQueue) { m_rwLock.RdLock(); for (InternalTableMap::iterator it = m_tablesById.begin(); it != m_tablesById.end(); ++it) { - idQueue.push_back(it->first); + tablesQueue.push_back(it->second); + // lock the table, so it won't get deleted/truncated + it->second->RdLock(); } m_rwLock.RdUnlock(); - return (uint32_t)idQueue.size(); + return (uint32_t)tablesQueue.size(); } /** @brief Clears all object-pool table caches for the current thread. */ diff --git a/src/gausskernel/storage/mot/core/src/system/mot_configuration.cpp b/src/gausskernel/storage/mot/core/src/system/mot_configuration.cpp index 04fe9c1292e95ca44b5e06fb5071f3f79500ceee..fba1bb10a49cdc34a096196e7c623e0bcf25a54a 100644 --- a/src/gausskernel/storage/mot/core/src/system/mot_configuration.cpp +++ b/src/gausskernel/storage/mot/core/src/system/mot_configuration.cpp @@ -50,7 +50,6 @@ constexpr const char* MOTConfiguration::DEFAULT_CHECKPOINT_DIR; constexpr const char* MOTConfiguration::DEFAULT_CHECKPOINT_SEGSIZE; constexpr uint32_t MOTConfiguration::DEFAULT_CHECKPOINT_SEGSIZE_BYTES; constexpr uint32_t MOTConfiguration::DEFAULT_CHECKPOINT_WORKERS; -constexpr bool MOTConfiguration::DEFAULT_VALIDATE_CHECKPOINT; // recovery configuration members constexpr uint32_t MOTConfiguration::DEFAULT_CHECKPOINT_RECOVERY_WORKERS; constexpr bool MOTConfiguration::DEFAULT_ENABLE_LOG_RECOVERY_STATS; @@ -390,7 +389,6 @@ MOTConfiguration::MOTConfiguration() m_checkpointDir(DEFAULT_CHECKPOINT_DIR), m_checkpointSegThreshold(DEFAULT_CHECKPOINT_SEGSIZE_BYTES), m_checkpointWorkers(DEFAULT_CHECKPOINT_WORKERS), - m_validateCheckpoint(DEFAULT_VALIDATE_CHECKPOINT), m_checkpointRecoveryWorkers(DEFAULT_CHECKPOINT_RECOVERY_WORKERS), m_abortBufferEnable(true), m_preAbort(true), @@ -485,7 +483,6 @@ bool MOTConfiguration::SetFlag(const std::string& name, const std::string& value } else if (ParseString(name, "checkpoint_dir", value, &m_checkpointDir)) { } else if (ParseUint32(name, "checkpoint_segsize", value, &m_checkpointSegThreshold)) { } else if (ParseUint32(name, "checkpoint_workers", value, &m_checkpointWorkers)) { - } else if (ParseBool(name, "validate_checkpoint", value, &m_validateCheckpoint)) { } else if (ParseUint32(name, "checkpoint_recovery_workers", value, &m_checkpointRecoveryWorkers)) { } else if (ParseBool(name, "abort_buffer_enable", value, &m_abortBufferEnable)) { } else if (ParseBool(name, "pre_abort", value, &m_preAbort)) { @@ -647,7 +644,6 @@ void MOTConfiguration::LoadConfig() UPDATE_STRING_CFG(m_checkpointDir, "checkpoint_dir", DEFAULT_CHECKPOINT_DIR); UPDATE_MEM_CFG(m_checkpointSegThreshold, "checkpoint_segsize", DEFAULT_CHECKPOINT_SEGSIZE, 1); UPDATE_INT_CFG(m_checkpointWorkers, "checkpoint_workers", DEFAULT_CHECKPOINT_WORKERS); - UPDATE_CFG(m_validateCheckpoint, "validate_checkpoint", DEFAULT_VALIDATE_CHECKPOINT); // Recovery configuration UPDATE_INT_CFG(m_checkpointRecoveryWorkers, "checkpoint_recovery_workers", DEFAULT_CHECKPOINT_RECOVERY_WORKERS); diff --git a/src/gausskernel/storage/mot/core/src/system/mot_configuration.h b/src/gausskernel/storage/mot/core/src/system/mot_configuration.h index 0765df1aec5b60499bf896c8d62170aa0e845384..ac6ea789dc4e0dd7ccc9d5fc394ab62662d7ddb1 100644 --- a/src/gausskernel/storage/mot/core/src/system/mot_configuration.h +++ b/src/gausskernel/storage/mot/core/src/system/mot_configuration.h @@ -153,9 +153,6 @@ public: /** @var number of worker threads to spawn to perform checkpoint. */ uint32_t m_checkpointWorkers; - /** @var Do checkpoints bit validations - use it for debugging only */ - bool m_validateCheckpoint; - /**********************************************************************/ // Recovery configuration /**********************************************************************/ @@ -430,9 +427,6 @@ private: /** @var Default number of worker threads to spawn */ static constexpr uint32_t DEFAULT_CHECKPOINT_WORKERS = 3; - /** @var Default enable checkpoint validation. */ - static constexpr bool DEFAULT_VALIDATE_CHECKPOINT = false; - // default recovery configuration /** @var Default number of workers used in recovery from checkpoint. */ static constexpr uint32_t DEFAULT_CHECKPOINT_RECOVERY_WORKERS = 3; diff --git a/src/gausskernel/storage/mot/core/src/system/mot_engine.cpp b/src/gausskernel/storage/mot/core/src/system/mot_engine.cpp index 374911071981a0bf93f27e32d9ce14f58960129d..4dd2285cf644481a2de2e3d7bb9ecfcb36c15cb2 100644 --- a/src/gausskernel/storage/mot/core/src/system/mot_engine.cpp +++ b/src/gausskernel/storage/mot/core/src/system/mot_engine.cpp @@ -367,11 +367,6 @@ bool MOTEngine::InitializeCoreServices() CHECK_INIT_STATUS(result, "Failed to Initialize garbage collection sub-system"); m_initCoreStack.push(INIT_GC_PHASE); - int rc = pthread_mutex_init(&m_DDLCheckpointGuard, nullptr); - result = (rc == 0); - CHECK_SYS_INIT_STATUS(rc, pthread_mutex_init, "Failed to Initialize global DDL lock"); - m_initCoreStack.push(INIT_DDL_LOCK_PHASE); - result = InitializeDebugUtils(); CHECK_INIT_STATUS(result, "Failed to Initialize debug utilities"); m_initCoreStack.push(INIT_DEBUG_UTILS); @@ -454,10 +449,6 @@ void MOTEngine::DestroyCoreServices() DestroyDebugUtils(); break; - case INIT_DDL_LOCK_PHASE: - pthread_mutex_destroy(&m_DDLCheckpointGuard); - break; - case INIT_GC_PHASE: break; @@ -771,6 +762,13 @@ bool MOTEngine::InitializeCheckpointManager() return false; } + if (!m_checkpointManager->Initialize()) { + MOT_REPORT_ERROR(MOT_ERROR_OOM, "MOT Engine Startup", "Failed to initialize checkpoint manager"); + delete m_checkpointManager; + m_checkpointManager = nullptr; + return false; + } + MOT_LOG_INFO("Startup: Checkpoint manager initialized successfully"); return true; } diff --git a/src/gausskernel/storage/mot/core/src/system/mot_engine.h b/src/gausskernel/storage/mot/core/src/system/mot_engine.h index 290d549763d34c8e7e66fd46530137772a4031b1..e5492df8f341dc332d8b27dae3cb7cce402f5534 100644 --- a/src/gausskernel/storage/mot/core/src/system/mot_engine.h +++ b/src/gausskernel/storage/mot/core/src/system/mot_engine.h @@ -272,21 +272,6 @@ public: return nullptr; } - int LockDDLForCheckpoint() - { - return pthread_mutex_lock(&m_DDLCheckpointGuard); - } - - int TryLockDDLForCheckpoint() - { - return pthread_mutex_trylock(&m_DDLCheckpointGuard); - } - - int UnlockDDLForCheckpoint() - { - return pthread_mutex_unlock(&m_DDLCheckpointGuard); - } - /** * @brief Order the engine to write all of its redo log records to the log file. */ @@ -524,9 +509,6 @@ private: /** @var The commit sequence number handler (CSN). */ CSNManager m_csnManager; - /** ddl <> checkpoint sync */ - pthread_mutex_t m_DDLCheckpointGuard; - /** @var Global flag for soft memory limit. */ uint32_t m_softMemoryLimitReached; @@ -575,7 +557,6 @@ private: INIT_TABLE_MANAGER_PHASE, INIT_SURROGATE_KEY_MANAGER_PHASE, INIT_GC_PHASE, - INIT_DDL_LOCK_PHASE, INIT_DEBUG_UTILS, INIT_CORE_DONE }; diff --git a/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp b/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp index 1b839637358b4c71bbf4892b4e9c007fb9ddcffc..62ec7849da9ee0c811a7512e9e7a6898299b779e 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp @@ -570,7 +570,7 @@ void TxnManager::WriteDDLChanges() case DDL_ACCESS_TRUNCATE_TABLE: indexes = (Index**)ddl_access->GetEntry(); table = indexes[0]->GetTable(); - table->Lock(); + table->WrLock(); table->m_rowCount = 0; for (int i = 0; i < table->GetNumIndexes(); i++) { index = indexes[i]; @@ -589,7 +589,7 @@ void TxnManager::WriteDDLChanges() if (index->IsPrimaryKey()) break; table = index->GetTable(); - table->Lock(); + table->WrLock(); table->RemoveSecondaryIndex((char*)index->GetName().c_str(), this); table->Unlock(); break; @@ -1232,7 +1232,7 @@ RC TxnManager::CreateIndex(Table* table, Index* index, bool is_primary) // is should only be added on successful commit. Assuming that if // a client did a create index, all other clients are waiting on a lock // until the changes are either commited or aborted - table->Lock(); // for concurrent access + table->WrLock(); // for concurrent access if (table->GetNumIndexes() == MAX_NUM_INDEXES) { table->Unlock(); MOT_REPORT_ERROR(MOT_ERROR_RESOURCE_LIMIT, diff --git a/src/gausskernel/storage/mot/core/src/system/transaction/txn.h b/src/gausskernel/storage/mot/core/src/system/transaction/txn.h index ff0f33111232d6f770b1cd80f9f0701032b23f8d..d7eb1b27273c09fabf1dd60e8fff44540300d874 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction/txn.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction/txn.h @@ -532,10 +532,10 @@ private: GcManager* m_gcSession; /** @var Checkpoint phase captured during transaction start. */ - CheckpointPhase m_checkpointPhase; + volatile CheckpointPhase m_checkpointPhase; /** @var Checkpoint not available capture during being transaction. */ - bool m_checkpointNABit; + volatile bool m_checkpointNABit; /** @var CSN taken at the commit stage. */ uint64_t m_csn; diff --git a/src/gausskernel/storage/mot/core/src/utils/utilities.h b/src/gausskernel/storage/mot/core/src/utils/utilities.h index e4fc40edc964a561133f018c942dd6b93fb8bee4..09385cc8e0ffe0bd26d7acc099d9f3eb7c5b7e22 100644 --- a/src/gausskernel/storage/mot/core/src/utils/utilities.h +++ b/src/gausskernel/storage/mot/core/src/utils/utilities.h @@ -76,7 +76,7 @@ std::string HexStr(const uint8_t* data, uint16_t len); #define HIGH_NIBBLE(byte) (((byte) >> 4) & 0x0F) /** @define Low nibble of a byte. */ -#define LOW_NIBBLE(byte) ((byte)&0x0F) +#define LOW_NIBBLE(byte) ((byte) & 0x0F) /** @define Compile-time conversion of identifier to string literal. */ #define stringify(name) #name diff --git a/src/gausskernel/storage/mot/fdw_adapter/src/mot_internal.cpp b/src/gausskernel/storage/mot/fdw_adapter/src/mot_internal.cpp index 7301b7406e9bd9ffbcfdb739c0bf3acbbee3aa8b..9b5a3dd9fc480ba28eb04802ee039e9980509e17 100644 --- a/src/gausskernel/storage/mot/fdw_adapter/src/mot_internal.cpp +++ b/src/gausskernel/storage/mot/fdw_adapter/src/mot_internal.cpp @@ -1497,7 +1497,7 @@ static MOT::RC TableFieldType(const ColumnDef* colDef, MOT::MOT_CATALOG_FIELD_TY MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) { - MOT::RC rc = MOT::RC_OK; + MOT::RC res; EnsureSafeThreadAccessInline(); MOT::TxnManager* txn = GetSafeTxn(); txn->SetTransactionId(tid); @@ -1508,7 +1508,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) (errmodule(MOD_MM), errcode(ERRCODE_UNDEFINED_TABLE), errmsg("Table not found for oid %u", index->relation->foreignOid))); - return MOT::RC_OK; + return MOT::RC_ERROR; } if (table->GetNumIndexes() == MAX_NUM_INDEXES) { @@ -1516,7 +1516,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) (errmodule(MOD_MM), errcode(ERRCODE_FDW_TOO_MANY_INDEXES), errmsg("Can not create index, max number of indexes %u reached", MAX_NUM_INDEXES))); - return MOT::RC_OK; + return MOT::RC_ERROR; } elog(LOG, @@ -1537,7 +1537,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) flavor = MOT::GetGlobalConfiguration().m_indexTreeFlavor; } else { ereport(ERROR, (errmodule(MOD_MM), errmsg("MOT supports indexes of type BTREE only (btree or btree_art)"))); - return MOT::RC_OK; + return MOT::RC_ERROR; } if (list_length(index->indexParams) > (int)MAX_KEY_COLUMNS) { @@ -1547,7 +1547,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) errmsg("Can't create index"), errdetail( "Number of columns exceeds %d max allowed %u", list_length(index->indexParams), MAX_KEY_COLUMNS))); - return MOT::RC_OK; + return MOT::RC_ERROR; } // check if we have primary and delete previous definition @@ -1576,7 +1576,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) errcode(ERRCODE_INVALID_COLUMN_DEFINITION), errmsg("Can't create index on field"), errdetail("Specified column not found in table definition"))); - return MOT::RC_OK; + return MOT::RC_ERROR; } MOT::Column* col = table->GetField(colid); @@ -1589,7 +1589,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) errcode(ERRCODE_FDW_INDEX_ON_NULLABLE_COLUMN_NOT_ALLOWED), errmsg("Can't create index on nullable columns"), errdetail("Column %s is nullable", col->m_name))); - return MOT::RC_OK; + return MOT::RC_ERROR; } // Temp solution, we have to support DECIMAL and NUMERIC indexes as well @@ -1600,7 +1600,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Can't create index on field"), errdetail("INDEX on NUMERIC or DECIMAL fields not supported yet"))); - return MOT::RC_OK; + return MOT::RC_ERROR; } if (col->m_keySize > MAX_KEY_SIZE) { delete ix; @@ -1609,7 +1609,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) errcode(ERRCODE_INVALID_COLUMN_DEFINITION), errmsg("Can't create index on field"), errdetail("Column size is greater than maximum index size"))); - return MOT::RC_OK; + return MOT::RC_ERROR; } keyLength += col->m_keySize; @@ -1619,13 +1619,13 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid) ix->SetNumIndexFields(count); - if ((rc = ix->IndexInit(keyLength, index->unique, index->idxname, nullptr)) != MOT::RC_OK) { + if ((res = ix->IndexInit(keyLength, index->unique, index->idxname, nullptr)) != MOT::RC_OK) { delete ix; - report_pg_error(rc, txn); - return rc; + report_pg_error(res, txn); + return res; } - MOT::RC res = txn->CreateIndex(table, ix, index->primary); + res = txn->CreateIndex(table, ix, index->primary); if (res != MOT::RC_OK) { delete ix; if (res == MOT::RC_TABLE_EXCEEDS_MAX_INDEXES) { @@ -1670,10 +1670,13 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t // prepare table name dbname = get_database_name(u_sess->proc_cxt.MyDatabaseId); if (dbname == nullptr) { + delete currentTable; + currentTable = nullptr; ereport(ERROR, (errmodule(MOD_MM), errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("database with OID %u does not exist", u_sess->proc_cxt.MyDatabaseId))); + break; } tname.append(dbname); tname.append("_"); @@ -1686,11 +1689,23 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t tname.append("_"); tname.append(table->base.relation->relname); - currentTable->Init(table->base.relation->relname, tname.c_str(), columnCount, table->base.relation->foreignOid); + if (!currentTable->Init( + table->base.relation->relname, tname.c_str(), columnCount, table->base.relation->foreignOid)) { + delete currentTable; + currentTable = nullptr; + report_pg_error(MOT::RC_MEMORY_ALLOCATION_ERROR, txn); + break; + } // the null fields are copied verbatim because we have to give them back at some point - currentTable->AddColumn( + res = currentTable->AddColumn( "null_bytes", BITMAPLEN(columnCount - 1), MOT::MOT_CATALOG_FIELD_TYPES::MOT_TYPE_NULLBYTES); + if (res != MOT::RC_OK) { + delete currentTable; + currentTable = nullptr; + report_pg_error(MOT::RC_MEMORY_ALLOCATION_ERROR, txn); + break; + } ListCell* cell; @@ -1701,6 +1716,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t ColumnDef* colDef = (ColumnDef*)lfirst(cell); if (colDef == nullptr || colDef->typname == nullptr) { + delete currentTable; + currentTable = nullptr; ereport(ERROR, (errmodule(MOD_MM), errcode(ERRCODE_INVALID_COLUMN_DEFINITION), @@ -1711,6 +1728,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t res = TableFieldType(colDef, colType, &typeLen, isBlob); if (res != MOT::RC_OK) { + delete currentTable; + currentTable = nullptr; report_pg_error(res, txn, colDef, (void*)(int64)typeLen); break; } @@ -1763,6 +1782,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t } res = currentTable->AddColumn(colDef->colname, typeLen, colType, colDef->is_not_null); if (res != MOT::RC_OK) { + delete currentTable; + currentTable = nullptr; report_pg_error(res, txn, colDef, (void*)(int64)typeLen); break; } @@ -1776,6 +1797,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t uint32_t tupleSize = currentTable->GetTupleSize(); if (tupleSize > (unsigned int)MAX_TUPLE_SIZE) { + delete currentTable; + currentTable = nullptr; ereport(ERROR, (errmodule(MOD_MM), errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -1787,6 +1810,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t } if (!currentTable->InitRowPool()) { + delete currentTable; + currentTable = nullptr; report_pg_error(MOT::RC_MEMORY_ALLOCATION_ERROR, txn); break; } @@ -1806,6 +1831,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t rc, nullptr); if (rc != MOT::RC_OK) { + delete currentTable; + currentTable = nullptr; report_pg_error(rc, txn); break; } @@ -1891,11 +1918,6 @@ MOT::RC MOTAdaptor::TruncateTable(Relation rel, ::TransactionId tid) EnsureSafeThreadAccessInline(); - if (DdlCpTryLock() != 0) { - elog(LOG, "Cannot perform: truncateTable, a checkpoint is in progress"); - return MOT::RC_NA; - } - MOT::TxnManager* txn = GetSafeTxn(); txn->SetTransactionId(tid); @@ -1907,10 +1929,11 @@ MOT::RC MOTAdaptor::TruncateTable(Relation rel, ::TransactionId tid) break; } + tab->WrLock(); res = txn->TruncateTable(tab); + tab->Unlock(); } while (0); - DdlCpUnlock(); return res; } @@ -1919,23 +1942,20 @@ MOT::RC MOTAdaptor::VacuumTable(Relation rel, ::TransactionId tid) MOT::RC res = MOT::RC_OK; MOT::Table* tab = nullptr; EnsureSafeThreadAccessInline(); - MOT::MOTEngine::GetInstance()->LockDDLForCheckpoint(); MOT::TxnManager* txn = GetSafeTxn(); txn->SetTransactionId(tid); elog(LOG, "vacuuming table %s, oid: %u", NameStr(rel->rd_rel->relname), rel->rd_id); do { - tab = txn->GetTableByExternalId(rel->rd_id); + tab = MOT::GetTableManager()->GetTableSafeByExId(rel->rd_id); if (tab == nullptr) { elog(LOG, "Vacuum table %s error, table oid %u not found.", NameStr(rel->rd_rel->relname), rel->rd_id); break; } - tab->Lock(); tab->Compact(txn); tab->Unlock(); } while (0); - MOT::MOTEngine::GetInstance()->UnlockDDLForCheckpoint(); return res; } @@ -2436,25 +2456,6 @@ void MOTAdaptor::DatumToMOTKey( } } -// ddl <> checkpoint sync -int MOTAdaptor::DdlCpTryLock() -{ - EnsureSafeThreadAccessInline(); - if (m_engine != nullptr) { - return m_engine->TryLockDDLForCheckpoint(); - } - return 0; -} - -int MOTAdaptor::DdlCpUnlock() -{ - EnsureSafeThreadAccessInline(); - if (m_engine != nullptr) { - return m_engine->UnlockDDLForCheckpoint(); - } - return 0; -} - bool MatchIndex::IsSameOper(KEY_OPER op1, KEY_OPER op2) const { bool res = true; diff --git a/src/gausskernel/storage/mot/fdw_adapter/src/mot_internal.h b/src/gausskernel/storage/mot/fdw_adapter/src/mot_internal.h index dc34fbe585dac68ccf42369a0cf33d2b113174b3..fe6fe62440f9e2613b1e681154cd5e110bb567aa 100644 --- a/src/gausskernel/storage/mot/fdw_adapter/src/mot_internal.h +++ b/src/gausskernel/storage/mot/fdw_adapter/src/mot_internal.h @@ -402,10 +402,6 @@ public: MOTFdwStateSt* festate, MatchIndexArr* marr, int numClauses, bool setLocal = true); inline static int32_t AddParam(List** params, Expr* expr); - // ddl <> checkpoint sync - static int DdlCpTryLock(); - static int DdlCpUnlock(); - static MOT::MOTEngine* m_engine; static bool m_initialized; static bool m_callbacks_initialized;