diff --git a/src/gausskernel/storage/mot/core/src/mot.conf b/src/gausskernel/storage/mot/core/src/mot.conf index f83c48834cd96fabc6f59ff710fb56d8275c723a..3f1ebbc58ef15e72bac7cf560677f23805ae7b94 100644 --- a/src/gausskernel/storage/mot/core/src/mot.conf +++ b/src/gausskernel/storage/mot/core/src/mot.conf @@ -41,7 +41,7 @@ #enable_redo_log = true # Specifies whether to use group commit. -# This option is relevant only when GaussDB is configured to use synchronous commit (i.e. when +# This option is relevant only when openGauss is configured to use synchronous commit (i.e. when # postgresql.conf has synchronous_commit configured to any value other than 'off'). #enable_group_commit = false @@ -54,6 +54,13 @@ #group_commit_size = 16 #group_commit_timeout = 10 ms +# Specifies the number of redo log buffers to use for asynchronous commit mode. +# Allowed range of values for this configuration is [8, 128]. The size of one buffer is 128 MB. +# This option is relevant only when openGauss is configured to use asynchronous commit (i.e. when +# postgresql.conf has synchronous_commit configured to 'off'). +# +#async_log_buffer_count = 24 + #------------------------------------------------------------------------------ # CHECKPOINT #------------------------------------------------------------------------------ diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp index 47b1d2a9c77d97995bfde1005d4706671d93d840..ad47c05bf8e68d3210483b96c574fe067b48d27c 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_manager.cpp @@ -335,6 +335,9 @@ void CheckpointManager::MoveToNextPhase() // hold the redo log lock to avoid inserting additional entries to the // log. Once snapshot is taken, this lock will be released in SnapshotReady(). m_redoLogHandler->WrLock(); + // write all buffer entries before taking the LSN position + // relevant for asynchronous logging or group commit + m_redoLogHandler->Flush(); } } diff --git a/src/gausskernel/storage/mot/core/src/system/global.h b/src/gausskernel/storage/mot/core/src/system/global.h index e69400b2d44f60aeb4e8bea30c8d060538ea5b6a..437a52c7b3d060a4c39c8551a7f3493e39f45436 100644 --- a/src/gausskernel/storage/mot/core/src/system/global.h +++ b/src/gausskernel/storage/mot/core/src/system/global.h @@ -300,6 +300,10 @@ inline void Prefetch(const void* ptr) /** @define Constant denoting indentation used for MOT printouts. */ #define PRINT_REPORT_INDENT 2 + +/** @define Min and Max values for asynchronous redo log buffer array count. */ +#define MIN_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT 8 +#define MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT 128 } // namespace MOT #endif // MOT_GLOBAL_H diff --git a/src/gausskernel/storage/mot/core/src/system/mot_configuration.cpp b/src/gausskernel/storage/mot/core/src/system/mot_configuration.cpp index fba1bb10a49cdc34a096196e7c623e0bcf25a54a..ebb6c47e30e1bc354043667d6acfc13f17e1ae8c 100644 --- a/src/gausskernel/storage/mot/core/src/system/mot_configuration.cpp +++ b/src/gausskernel/storage/mot/core/src/system/mot_configuration.cpp @@ -39,6 +39,7 @@ MOTConfiguration MOTConfiguration::motGlobalConfiguration; constexpr bool MOTConfiguration::DEFAULT_ENABLE_REDO_LOG; constexpr LoggerType MOTConfiguration::DEFAULT_LOGGER_TYPE; constexpr RedoLogHandlerType MOTConfiguration::DEFAULT_REDO_LOG_HANDLER_TYPE; +constexpr uint32_t MOTConfiguration::DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT; constexpr bool MOTConfiguration::DEFAULT_ENABLE_GROUP_COMMIT; constexpr uint64_t MOTConfiguration::DEFAULT_GROUP_COMMIT_SIZE; constexpr const char* MOTConfiguration::DEFAULT_GROUP_COMMIT_TIMEOUT; @@ -381,6 +382,7 @@ MOTConfiguration::MOTConfiguration() : m_enableRedoLog(DEFAULT_ENABLE_REDO_LOG), m_loggerType(DEFAULT_LOGGER_TYPE), m_redoLogHandlerType(DEFAULT_REDO_LOG_HANDLER_TYPE), + m_asyncRedoLogBufferArrayCount(DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT), m_enableGroupCommit(DEFAULT_ENABLE_GROUP_COMMIT), m_groupCommitSize(DEFAULT_GROUP_COMMIT_SIZE), m_groupCommitTimeoutUSec(DEFAULT_GROUP_COMMIT_TIMEOUT_USEC), @@ -475,6 +477,7 @@ bool MOTConfiguration::SetFlag(const std::string& name, const std::string& value if (ParseBool(name, "enable_redo_log", value, &m_enableRedoLog)) { } else if (ParseLoggerType(name, "logger_type", value, &m_loggerType)) { } else if (ParseRedoLogHandlerType(name, "redo_log_handler_type", value, &m_redoLogHandlerType)) { + } else if (ParseUint32(name, "async_log_buffer_count", value, &m_asyncRedoLogBufferArrayCount)) { } else if (ParseBool(name, "enable_group_commit", value, &m_enableGroupCommit)) { } else if (ParseUint64(name, "group_commit_size", value, &m_groupCommitSize)) { } else if (ParseUint64(name, "group_commit_timeout_usec", value, &m_groupCommitTimeoutUSec)) { @@ -603,6 +606,9 @@ int MOTConfiguration::GetMappedCore(int logicId) const #define UPDATE_INT_CFG(var, cfgPath, defaultValue) \ UpdateConfigItem(var, cfg->GetIntegerConfigValue(cfgPath, defaultValue), cfgPath) +#define UPDATE_INT_CFG_BOUNDS(var, cfgPath, defaultValue, lowerBound, upperBound) \ + UpdateConfigItem(var, cfg->GetIntegerConfigValue(cfgPath, defaultValue), cfgPath, lowerBound, upperBound) + #define UPDATE_MEM_CFG(var, cfgPath, defaultValue, scale) \ do { \ uint64_t memoryValueBytes = \ @@ -633,6 +639,11 @@ void MOTConfiguration::LoadConfig() UPDATE_CFG(m_enableRedoLog, "enable_redo_log", DEFAULT_ENABLE_REDO_LOG); UPDATE_USER_CFG(m_loggerType, "logger_type", DEFAULT_LOGGER_TYPE); UPDATE_USER_CFG(m_redoLogHandlerType, "redo_log_handler_type", DEFAULT_REDO_LOG_HANDLER_TYPE); + UPDATE_INT_CFG_BOUNDS(m_asyncRedoLogBufferArrayCount, + "async_log_buffer_count", + DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT, + MIN_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT, + MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT); // commit configuration UPDATE_CFG(m_enableGroupCommit, "enable_group_commit", DEFAULT_ENABLE_GROUP_COMMIT); diff --git a/src/gausskernel/storage/mot/core/src/system/mot_configuration.h b/src/gausskernel/storage/mot/core/src/system/mot_configuration.h index ac6ea789dc4e0dd7ccc9d5fc394ab62662d7ddb1..b557ccc9ad6e80aca4bf85b7b014712eaabc9d74 100644 --- a/src/gausskernel/storage/mot/core/src/system/mot_configuration.h +++ b/src/gausskernel/storage/mot/core/src/system/mot_configuration.h @@ -123,6 +123,9 @@ public: /** Determines the redo log handler type (not configurable, but derived). */ RedoLogHandlerType m_redoLogHandlerType; + /** Determines the number of asynchronous redo log buffer arrays. */ + uint32_t m_asyncRedoLogBufferArrayCount; + /**********************************************************************/ // Commit configuration /**********************************************************************/ @@ -397,6 +400,9 @@ private: /** @var Default redo log handler type. */ static constexpr RedoLogHandlerType DEFAULT_REDO_LOG_HANDLER_TYPE = RedoLogHandlerType::SYNC_REDO_LOG_HANDLER; + /** @var Default asynchronous redo log buffer array count. */ + static constexpr uint32_t DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT = 24; + // default commit configuration /** @var Default enable group commit. */ static constexpr bool DEFAULT_ENABLE_GROUP_COMMIT = false; @@ -630,9 +636,12 @@ private: } template - static void UpdateConfigItem(uint32_t& oldValue, T newValue, const char* name) + static void UpdateConfigItem(uint32_t& oldValue, T newValue, const char* name, + uint32_t lowerBound = 0, uint32_t upperBound = UINT_MAX) { - if (newValue > UINT_MAX) { + if (newValue > upperBound) { + MOT_LOG_WARN("Configuration of %s overflowed: keeping default value %u", name, oldValue); + } else if (lowerBound > 0 && newValue < lowerBound) { MOT_LOG_WARN("Configuration of %s overflowed: keeping default value %u", name, oldValue); } else if (oldValue != newValue) { MOT_LOG_TRACE("Configuration of %s changed: %u --> %u", name, oldValue, (uint32_t)newValue); diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp index b1d393b8e5ecde85c6d1deaffd2e62ef3449ccc4..76edb76210dc1347d093d6bfc60bb23c5e605278 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp @@ -1343,7 +1343,7 @@ void RecoveryManager::ClearTableCache() while (it != m_tableDeletesStat.end()) { auto table = *it; if (table.second > NUM_DELETE_MAX_INC) { - MOT_LOG_INFO("RecoveryManager::ClearTableCache: Table = %s items = %lu\n", + MOT_LOG_TRACE("RecoveryManager::ClearTableCache: Table = %s items = %lu\n", table.first->GetTableName().c_str(), table.second); table.first->ClearRowCache(); diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp index d29f283d12cfe2bdd7e9229f5370536a94995bae..f599f4839ce7c67eb3292b6a6ec00b316d6edd1b 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp +++ b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp @@ -93,7 +93,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateTable( Table* table = nullptr; switch (state) { case COMMIT: - MOT_LOG_INFO("RecoverLogOperationCreateTable: COMMIT"); + MOT_LOG_DEBUG("RecoverLogOperationCreateTable: COMMIT"); CreateTable((char*)data, status, table, true); break; diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/asynchronous_redo_log_handler.cpp b/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/asynchronous_redo_log_handler.cpp index 985ff5cbcffa8e7a01ffce08d479d998694435fc..25d24c612d8118f3aeda4534b8e28b84edcdaad9 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/asynchronous_redo_log_handler.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/asynchronous_redo_log_handler.cpp @@ -29,19 +29,42 @@ #include "global.h" #include "utilities.h" #include "mot_atomic_ops.h" +#include "mot_configuration.h" namespace MOT { DECLARE_LOGGER(AsyncRedoLogHandler, redolog) -AsyncRedoLogHandler::AsyncRedoLogHandler() : m_bufferPool(), m_activeBuffer(0) +AsyncRedoLogHandler::AsyncRedoLogHandler() + : m_bufferPool(), + m_redoLogBufferArrayCount(GetGlobalConfiguration().m_asyncRedoLogBufferArrayCount), + m_activeBuffer(0), + m_initialized(false) {} AsyncRedoLogHandler::~AsyncRedoLogHandler() -{} +{ + // wait for all redo log buffers to be written + while (!m_writeQueue.empty()) { + usleep(WRITE_LOG_WAIT_INTERVAL); + } + if (m_initialized) { + pthread_mutex_destroy(&m_writeLock); + } + m_initialized = false; +} bool AsyncRedoLogHandler::Init() { - return m_bufferPool.Init(); + bool result = false; + int rc = pthread_mutex_init(&m_writeLock, nullptr); + result = (rc == 0); + if (result != true) { + MOT_LOG_ERROR("Error initializing async redolog handler lock"); + return result; + } + result = m_bufferPool.Init(); + m_initialized = true; + return result; } RedoLogBuffer* AsyncRedoLogHandler::CreateBuffer() @@ -56,16 +79,18 @@ void AsyncRedoLogHandler::DestroyBuffer(RedoLogBuffer* buffer) RedoLogBuffer* AsyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer) { - int position = 0; + int position = -1; do { - uint64_t activeBuffer = MOT_ATOMIC_LOAD(m_activeBuffer) % WRITE_LOG_BUFFER_COUNT; - position = m_tripleBuffer[activeBuffer].PushBack(buffer); - if (position == MAX_BUFFERS / 2) { - WakeupWalWriter(); - } else if (position == -1) { - // async redo log ternary buffer is full, waiting for write thread - // to flush the buffer - usleep(WRITE_LOG_WAIT_INTERVAL); + m_switchLock.RdLock(); + int activeBuffer = m_activeBuffer; + position = m_redoLogBufferArrayArray[m_activeBuffer].PushBack(buffer); + m_switchLock.RdUnlock(); + if (position == -1) { + usleep(1000); + } else if (position == (int)(MAX_BUFFERS - 1)) { + if (TrySwitchBuffers(activeBuffer)) { + WriteSingleBuffer(); + } } } while (position == -1); return CreateBuffer(); @@ -73,34 +98,89 @@ RedoLogBuffer* AsyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer) void AsyncRedoLogHandler::Write() { - // buffer list switch logic: - // while writers write to list 0, we increment index to 1 and then write list 2 to log - // while writers write to list 1, we increment index to 2 and then write list 0 to log - // while writers write to list 2, we increment index to 0 and then write list 1 to log - uint64_t currentIndex = MOT_ATOMIC_LOAD(m_activeBuffer); - uint64_t nextIndex = (currentIndex + 1) % WRITE_LOG_BUFFER_COUNT; - uint64_t prevIndex = (currentIndex + 2) % WRITE_LOG_BUFFER_COUNT; - - // allow writers to start writing to next buffer list - MOT_ATOMIC_STORE(m_activeBuffer, nextIndex); - - // in the meantime we write and flush previous list to log - // writers might still be writing to current list, but we do not touch it in this cycle - RedoLogBufferArray& prevBufferArray = m_tripleBuffer[prevIndex]; - - // invoke logger only if something happened, otherwise we just juggle with empty buffer arrays - if (!prevBufferArray.Empty()) { - m_logger->AddToLog(prevBufferArray); - m_logger->FlushLog(); - FreeBuffers(prevBufferArray); - prevBufferArray.Reset(); + m_switchLock.RdLock(); + int activeBuffer = m_activeBuffer; + m_switchLock.RdUnlock(); + if (TrySwitchBuffers(activeBuffer)) { + WriteSingleBuffer(); } } +bool AsyncRedoLogHandler::TrySwitchBuffers(int index) +{ + bool result = false; + int nextIndex = (index + 1) % m_redoLogBufferArrayCount; + m_switchLock.WrLock(); + while (index == m_activeBuffer && !m_redoLogBufferArrayArray[index].Empty()) { + if (!m_redoLogBufferArrayArray[nextIndex].Empty()) { + // the next buffer was not yet written to the log + // wait until write complete + m_switchLock.WrUnlock(); + usleep(WRITE_LOG_WAIT_INTERVAL); + m_switchLock.WrLock(); + continue; + } + m_writeQueue.push(m_activeBuffer); + m_activeBuffer = nextIndex; + result = true; + } + m_switchLock.WrUnlock(); + return result; +} + void AsyncRedoLogHandler::FreeBuffers(RedoLogBufferArray& bufferArray) { for (uint32_t i = 0; i < bufferArray.Size(); i++) { m_bufferPool.Free(bufferArray[i]); } } + +void AsyncRedoLogHandler::WriteSingleBuffer() +{ + uint32_t writeBufferIndex; + pthread_mutex_lock(&m_writeLock); + if (!m_writeQueue.empty()) { + writeBufferIndex = m_writeQueue.front(); + m_writeQueue.pop(); + RedoLogBufferArray& bufferArray = m_redoLogBufferArrayArray[writeBufferIndex]; + + // invoke logger only if something happened, otherwise we just juggle with empty buffer arrays + if (!bufferArray.Empty()) { + m_logger->AddToLog(bufferArray); + m_logger->FlushLog(); + FreeBuffers(bufferArray); + bufferArray.Reset(); + } + } + pthread_mutex_unlock(&m_writeLock); +} + +void AsyncRedoLogHandler::WriteAllBuffers() +{ + uint32_t writeBufferIndex; + pthread_mutex_lock(&m_writeLock); + while (!m_writeQueue.empty()) { + writeBufferIndex = m_writeQueue.front(); + m_writeQueue.pop(); + RedoLogBufferArray& bufferArray = m_redoLogBufferArrayArray[writeBufferIndex]; + + // invoke logger only if something happened, otherwise we just juggle with empty buffer arrays + if (!bufferArray.Empty()) { + m_logger->AddToLog(bufferArray); + m_logger->FlushLog(); + FreeBuffers(bufferArray); + bufferArray.Reset(); + } + } + pthread_mutex_unlock(&m_writeLock); +} + +void AsyncRedoLogHandler::Flush() +{ + m_switchLock.RdLock(); + int activeBuffer = m_activeBuffer; + m_switchLock.RdUnlock(); + TrySwitchBuffers(activeBuffer); + WriteAllBuffers(); +} } // namespace MOT diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/asynchronous_redo_log_handler.h b/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/asynchronous_redo_log_handler.h index 370ff3c97128cbc66c65059fe7762229a1208057..fa04ddf86f9dd3c2653343bf337f39f2c3815fe4 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/asynchronous_redo_log_handler.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/asynchronous_redo_log_handler.h @@ -26,8 +26,10 @@ #ifndef ASYNCHRONOUS_REDO_LOG_HANDLER_H #define ASYNCHRONOUS_REDO_LOG_HANDLER_H +#include #include "redo_log_handler.h" #include "redo_log_buffer_pool.h" +#include "rw_lock.h" namespace MOT { class TxnManager; @@ -62,6 +64,8 @@ public: */ RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer); + void Flush(); + /** * @brief switches the buffers and flushes the log */ @@ -72,17 +76,26 @@ public: ~AsyncRedoLogHandler(); private: - static constexpr unsigned int WRITE_LOG_WAIT_INTERVAL = 10000; // micro seconds - static constexpr unsigned int WRITE_LOG_BUFFER_COUNT = 3; + static constexpr unsigned int WRITE_LOG_WAIT_INTERVAL = 1000; // micro second + /** * @brief free all the RedoLogBuffers in the array and return them to the pool */ void FreeBuffers(RedoLogBufferArray& bufferArray); + bool TrySwitchBuffers(int index); + void WriteSingleBuffer(); + void WriteAllBuffers(); RedoLogBufferPool m_bufferPool; - RedoLogBufferArray m_tripleBuffer[WRITE_LOG_BUFFER_COUNT]; // 3 buffer arrays for switching in cyclic manner. - volatile uint64_t m_activeBuffer; + // array of RedoLogBufferArray for switching in cyclic manner. + RedoLogBufferArray m_redoLogBufferArrayArray[MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT]; + uint32_t m_redoLogBufferArrayCount; + volatile int m_activeBuffer; + bool m_initialized; + RwLock m_switchLock; + pthread_mutex_t m_writeLock; + std::queue m_writeQueue; }; } // namespace MOT -#endif /* ASYNCHRONOUS_REDO_LOG_HANDLER_H */ +#endif /* ASYNCHRONOUS_REDO_LOG_HANDLER_H */ \ No newline at end of file diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/redo_log_buffer_array.h b/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/redo_log_buffer_array.h index b055de242990312538dbb9f0bd59176eeadbf557..bcb7d35f7d7372aab71097d2102de21675789b8c 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/redo_log_buffer_array.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/redo_log_buffer_array.h @@ -30,7 +30,7 @@ #include "global.h" #include "redo_log_buffer.h" -#define MAX_BUFFERS 1000 +#define MAX_BUFFERS 128 namespace MOT { class RedoLogBufferArray { @@ -91,6 +91,10 @@ public: return m_array; } + static uint32_t MaxSize() + { + return MAX_BUFFERS; + } private: std::atomic m_nextFree; RedoLogBuffer* m_array[MAX_BUFFERS]; diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/commit_group.h b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/commit_group.h index 7e847866ddb93e0a78713e0e8ec6e16fa06bf3cd..573c9e69510d8006af82b1270c2b25bf9d6b5ac3 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/commit_group.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/commit_group.h @@ -72,6 +72,11 @@ public: */ void Commit(bool isLeader, std::shared_ptr groupRef); + inline bool IsCommitted() + { + return m_commited; + } + private: const uint8_t m_handlerId; GroupSyncRedoLogHandler* m_handler; diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/group_synchronous_redo_log_handler.cpp b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/group_synchronous_redo_log_handler.cpp index 80a36c9e69c58acf2a49245cbe65482b11999585..875d28dd1b9ae7a416be13510b37fb94bc8d3b88 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/group_synchronous_redo_log_handler.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/group_synchronous_redo_log_handler.cpp @@ -155,4 +155,17 @@ RedoLogBuffer* GroupSyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer) joinedGroup->Commit(leader, joinedGroup); return buffer; } + +void GroupSyncRedoLogHandler::Flush() +{ + std::shared_ptr flushGroup = m_currentGroup; + if (flushGroup == nullptr) { + return; + } + + while (!flushGroup->IsCommitted()) { + // spin until group is flushed + cpu_relax(); + } +} } // namespace MOT diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/group_synchronous_redo_log_handler.h b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/group_synchronous_redo_log_handler.h index 223f3eba4601089320c9887409db64f5231d4ac7..583433ddc65e67622f30d5535d29b22436deb90c 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/group_synchronous_redo_log_handler.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/group_synchronous_redo_log_handler.h @@ -63,6 +63,8 @@ public: */ virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer); + void Flush(); + /** * @brief initializes group params * @param group the group to work on diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/segmented_group_synchronous_redo_log_handler.cpp b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/segmented_group_synchronous_redo_log_handler.cpp index abd7240a6da16b47b9938eec539d1bf7cf4f5d33..4d28d5575f2021b9fa6568974ccdeccc2ca92475 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/segmented_group_synchronous_redo_log_handler.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/segmented_group_synchronous_redo_log_handler.cpp @@ -74,4 +74,11 @@ void SegmentedGroupSyncRedoLogHandler::SetLogger(ILogger* logger) for (unsigned int i = 0; i < m_numaNodes; i++) m_redoLogHandlerArray[i].SetLogger(logger); } + +void SegmentedGroupSyncRedoLogHandler::Flush() +{ + for (unsigned int i = 0; i < m_numaNodes; i++) { + m_redoLogHandlerArray[i].Flush(); + } +} } // namespace MOT diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/segmented_group_synchronous_redo_log_handler.h b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/segmented_group_synchronous_redo_log_handler.h index 516bc624613d025eb1c8ff14310625c496bf206d..6dcfad9b95c87720ac0870c0d0215c97013d5ea8 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/segmented_group_synchronous_redo_log_handler.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/group_synchronous_redo_log/segmented_group_synchronous_redo_log_handler.h @@ -61,6 +61,7 @@ public: * @return The next buffer to write to, or null in case of failure. */ virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer); + virtual void Flush(); virtual void SetLogger(ILogger* logger); private: diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/redo_log_handler.h b/src/gausskernel/storage/mot/core/src/system/transaction_logger/redo_log_handler.h index 13bd21b62bd9ef53ca0754840b3876ba93d5e59d..df04eecbc16a2541394fa69c8e747230960c29c3 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/redo_log_handler.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/redo_log_handler.h @@ -68,6 +68,11 @@ public: */ virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer) = 0; + /** + * @brief flush all buffers (if exist) to log + */ + virtual void Flush() = 0; + /** * @brief flushes the the log */ diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/synchronous_redo_log/synchronous_redo_log_handler.cpp b/src/gausskernel/storage/mot/core/src/system/transaction_logger/synchronous_redo_log/synchronous_redo_log_handler.cpp index 01178db27657dd4f8e4417bd393d5aa4f21105bb..f224bd99defcf2ea49ca30928df51dfb8da4cb7a 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/synchronous_redo_log/synchronous_redo_log_handler.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/synchronous_redo_log/synchronous_redo_log_handler.cpp @@ -60,4 +60,7 @@ RedoLogBuffer* SynchronousRedoLogHandler::WriteToLog(RedoLogBuffer* buffer) m_logger->FlushLog(); return buffer; } + +void SynchronousRedoLogHandler::Flush() +{} } // namespace MOT diff --git a/src/gausskernel/storage/mot/core/src/system/transaction_logger/synchronous_redo_log/synchronous_redo_log_handler.h b/src/gausskernel/storage/mot/core/src/system/transaction_logger/synchronous_redo_log/synchronous_redo_log_handler.h index de87844db5ab039f97d404dc0f2424a9e2769a7b..24a4428131677b160d5f4e72890e9f29cf706bd6 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction_logger/synchronous_redo_log/synchronous_redo_log_handler.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction_logger/synchronous_redo_log/synchronous_redo_log_handler.h @@ -57,6 +57,7 @@ public: * @return The next buffer to write to, or null in case of failure. */ RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer); + void Flush(); SynchronousRedoLogHandler(const SynchronousRedoLogHandler& orig) = delete; SynchronousRedoLogHandler& operator=(const SynchronousRedoLogHandler& orig) = delete; ~SynchronousRedoLogHandler(); diff --git a/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.cpp b/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.cpp index 2977959cd1730d984e829dda9496004599c125cd..5fad11a84fbd5206b864587ced3bc5ec8dc21d8d 100644 --- a/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.cpp +++ b/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.cpp @@ -80,35 +80,19 @@ void MOTRedo(XLogReaderState* record) } } -uint64_t XLOGLogger::AddToLog(uint8_t* data, uint32_t size) -{ - XLogBeginInsert(); - XLogRegisterData((char*)data, size); - XLogInsert(RM_MOT_ID, MOT_REDO_DATA); - return size; -} - -uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer* redoBuffer) +uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer** redoLogBufferArray, uint32_t size) { - uint32_t length; - uint8_t* data = redoBuffer->Serialize(&length); - return AddToLog(data, length); + uint64_t written = MOT::ILogger::AddToLog(redoLogBufferArray, size); + XLogSetAsyncXactLSN(t_thrd.xlog_cxt.XactLastRecEnd); + return written; } -uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer** redoTransactionArray, uint32_t size) +uint64_t XLOGLogger::AddToLog(uint8_t* data, uint32_t size) { - uint32_t written = 0; - // ensure that we have enough space to add all transaction buffers - XLogEnsureRecordSpace(0, size); XLogBeginInsert(); - for (uint32_t i = 0; i < size; i++) { - uint32_t length; - uint8_t* data = redoTransactionArray[i]->Serialize(&length); - XLogRegisterData((char*)data, length); - written += length; - } + XLogRegisterData((char*)data, size); XLogInsert(RM_MOT_ID, MOT_REDO_DATA); - return written; + return size; } void XLOGLogger::FlushLog() diff --git a/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.h b/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.h index e42ff4a3e22016841458e15f2b191475d21c01b3..108099187cd0796f4cc4dc964bc0af05d28c7377 100644 --- a/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.h +++ b/src/gausskernel/storage/mot/fdw_adapter/src/mot_fdw_xlog.h @@ -45,9 +45,8 @@ public: inline ~XLOGLogger() {} + uint64_t AddToLog(MOT::RedoLogBuffer** redoLogBufferArray, uint32_t size); uint64_t AddToLog(uint8_t* data, uint32_t size); - uint64_t AddToLog(MOT::RedoLogBuffer* redoBuffer); - uint64_t AddToLog(MOT::RedoLogBuffer** redoBufferArray, uint32_t size); void FlushLog(); void CloseLog(); void ClearLog();