From 6fe6270171ded6549de13e08ce443e739dc58eb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=92=9F=E6=89=BF=E5=BF=97?= <842536125@qq.com> Date: Wed, 20 Nov 2024 09:37:08 +0800 Subject: [PATCH] modified: src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp modified: src/gausskernel/ddes/adapter/ss_dms_callback.cpp modified: src/gausskernel/process/threadpool/knl_instance.cpp modified: src/gausskernel/storage/buffer/bufmgr.cpp modified: src/gausskernel/storage/smgr/segment/segbuffer.cpp modified: src/include/ddes/dms/ss_dms_recovery.h modified: src/include/knl/knl_thread.h --- .../ddes/adapter/ss_dms_bufmgr.cpp | 9 +++++++++ .../ddes/adapter/ss_dms_callback.cpp | 1 + .../process/threadpool/knl_instance.cpp | 1 + src/gausskernel/storage/buffer/bufmgr.cpp | 19 ++++++++++++------- .../storage/smgr/segment/segbuffer.cpp | 12 +++++++++++- src/include/ddes/dms/ss_dms_recovery.h | 3 +++ src/include/knl/knl_thread.h | 3 +++ 7 files changed, 40 insertions(+), 8 deletions(-) diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 41ae638dac..72fb2cc036 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -30,6 +30,7 @@ #include "utils/resowner.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "ddes/dms/ss_reform_common.h" +#include "ddes/dms/ss_dms_recovery.h" #include "securec_check.h" #include "miscadmin.h" #include "access/double_write.h" @@ -483,6 +484,14 @@ Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, return 0; } + if (SS_FAILOVER_TO_JOBSCHEDULE) { + *with_io = false; + ereport(LOG, (errmsg("The new master starts reading pages: spc/db/rel/bucket fork-block:[%u/%u/%u/%d %d-%u]", + buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, + buf_desc->tag.rnode.relNode, buf_desc->tag.rnode.bucketNode, + buf_desc->tag.forkNum, buf_desc->tag.blockNum))); + } + if (!DmsStartBufferIO(buf_desc, mode)) { if (!DmsCheckBufAccessible()) { *with_io = false; diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 63e59b873c..81d2e9c46c 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -1871,6 +1871,7 @@ static void FailoverCleanBackends() if (g_instance.dms_cxt.SSRecoveryInfo.no_backend_left && !CheckpointInProgress()) { ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] backends exit successfully, " "wait_time = %ds", wait_time / FAILOVER_TIME_CONVERT))); + g_instance.dms_cxt.SSRecoveryInfo.failover_to_jobschedule = true; break; } diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index 273f5963f4..81e128fb8f 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -216,6 +216,7 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt) dms_cxt->SSRecoveryInfo.dorado_sharestorage_inited = false; dms_cxt->SSRecoveryInfo.ondemand_recovery_pause_status = NOT_PAUSE; dms_cxt->SSRecoveryInfo.enableRealtimeBuildLogCtrl = false; + dms_cxt->SSRecoveryInfo.failover_to_jobschedule = false; dms_cxt->SSRecoveryInfo.globalSleepTime = 0; dms_cxt->SSRecoveryInfo.sleepTimeSyncLock = (slock_t)0; errno_t rc = memset_s(dms_cxt->SSRecoveryInfo.rtBuildCtrl, diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index f8967c8f79..414eedcede 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -88,6 +88,7 @@ #include "ddes/dms/ss_common_attr.h" #include "ddes/dms/ss_reform_common.h" #include "ddes/dms/ss_transaction.h" +#include "knl/knl_thread.h" const int ONE_MILLISECOND = 1; const int TEN_MICROSECOND = 10; @@ -2589,6 +2590,10 @@ Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber fork */ } else if (RecoveryInProgress()) { BlockNumber totalBlkNum = smgrnblocks_cached(smgr, forkNum); + /* when in failover, should return to worker thread exit */ + if ((SS_IN_FAILOVER) && (SS_AM_BACKENDS_WORKERS)) { + return InvalidBuffer; + } /* Update cached blocks */ if (totalBlkNum == InvalidBlockNumber || blockNum >= totalBlkNum) { @@ -2785,7 +2790,7 @@ found_branch: return InvalidBuffer; } /* when in failover, should return to worker thread exit */ - if ((SS_IN_FAILOVER) && ((t_thrd.role == WORKER) || (t_thrd.role == THREADPOOL_WORKER))) { + if (SS_IN_FAILOVER && SS_AM_BACKENDS_WORKERS) { SSUnPinBuffer(bufHdr); return InvalidBuffer; } @@ -3333,6 +3338,11 @@ retry_new_buffer: return buf; } + /* when in failover, should return to worker thread exit */ + if ((SS_IN_FAILOVER) && (SS_AM_BACKENDS_WORKERS)) { + ClearReadHint(buf->buf_id, true); + break; + } /* * Somebody could have pinned or re-dirtied the buffer while we were * doing the I/O and making the new hashtable entry. If so, we can't @@ -3353,11 +3363,6 @@ retry_new_buffer: ClearReadHint(buf->buf_id, true); break; } - /* when in failover, woker thread should return InvalidBuffer and exit */ - if (SS_IN_FAILOVER && ((t_thrd.role == WORKER) || (t_thrd.role == THREADPOOL_WORKER))) { - ClearReadHint(buf->buf_id, true); - break; - } } else { break; } @@ -6355,7 +6360,7 @@ retry: } bool with_io_in_progress = true; /* when in failover, should return to worker thread exit */ - if ((SS_IN_FAILOVER) && ((t_thrd.role == WORKER) || (t_thrd.role == THREADPOOL_WORKER))) { + if (SS_IN_FAILOVER && SS_AM_BACKENDS_WORKERS) { return; } if (IsSegmentBufferID(buf->buf_id)) { diff --git a/src/gausskernel/storage/smgr/segment/segbuffer.cpp b/src/gausskernel/storage/smgr/segment/segbuffer.cpp index 5d864da823..fd1bb3fe08 100644 --- a/src/gausskernel/storage/smgr/segment/segbuffer.cpp +++ b/src/gausskernel/storage/smgr/segment/segbuffer.cpp @@ -34,6 +34,8 @@ #include "pgstat.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "replication/ss_disaster_cluster.h" +#include "knl/knl_thread.h" + /* * Segment buffer, used for segment meta data, e.g., segment head, space map head. We separate segment * meta data buffer and normal data buffer (in bufmgr.cpp) to avoid potential dead locks. @@ -503,6 +505,14 @@ Buffer ReadSegBufferForDMS(BufferDesc* bufHdr, ReadBufferMode mode, SegSpace *sp SegSpcTag tag = {.spcNode = bufHdr->tag.rnode.spcNode, .dbNode = bufHdr->tag.rnode.dbNode}; SegmentCheck(t_thrd.storage_cxt.SegSpcCache != NULL); spc = (SegSpace *)hash_search(t_thrd.storage_cxt.SegSpcCache, (void *)&tag, HASH_FIND, &found); + if (SS_FAILOVER_TO_JOBSCHEDULE) { + ereport(LOG, (errmsg("Job schedule starts reading pages by ReadSegBufferForDMS: + spc/db/rel/bucket fork-block:[%u/%u/%u/%d %d-%u]", + bufHdr->tag.rnode.spcNode, bufHdr->tag.rnode.dbNode, + bufHdr->tag.rnode.relNode, bufHdr->tag.rnode.bucketNode, + bufHdr->tag.forkNum, bufHdr->tag.blockNum))); + g_instance.dms_cxt.SSRecoveryInfo.failover_to_jobschedule = false; + } SegmentCheck(found); ereport(DEBUG1, (errmsg("Fetch cached SegSpace success, spcNode:%u dbNode:%u.", bufHdr->tag.rnode.spcNode, bufHdr->tag.rnode.dbNode))); @@ -619,7 +629,7 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc return InvalidBuffer; } /* when in failover, should return to worker thread exit */ - if ((SS_IN_FAILOVER) && ((t_thrd.role == WORKER) || (t_thrd.role == THREADPOOL_WORKER))) { + if (SS_IN_FAILOVER && SS_AM_BACKENDS_WORKERS) { SSUnPinBuffer(bufHdr); return InvalidBuffer; } diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index 8cd55a0abb..531993008a 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -66,6 +66,8 @@ SS_NORMAL_PRIMARY && g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl > 0) #define SS_STANDBY_ENABLE_TARGET_RTO (SS_NORMAL_STANDBY && \ SS_ONDEMAND_REALTIME_BUILD_NORMAL && g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl > 0) +#define SS_FAILOVER_TO_JOBSCHEDULE (t_thrd.role == JOB_SCHEDULER && \ + g_instance.dms_cxt.SSRecoveryInfo.failover_to_jobschedule == true) #define REFORM_CTRL_VERSION 1 typedef struct st_reformer_ctrl { @@ -173,6 +175,7 @@ typedef struct ss_recovery_info { bool disaster_cluster_promoting; // standby cluster is promoting volatile ondemand_recovery_pause_status_t ondemand_recovery_pause_status; bool realtime_build_in_reform; // used to avoid starting realtime build during reform + bool failover_to_jobschedule; // used to failover to jobschedule during reform volatile bool enableRealtimeBuildLogCtrl; slock_t sleepTimeSyncLock; volatile int globalSleepTime; diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 2a3dba1f43..559832eeb2 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -82,6 +82,9 @@ #define MAX_PATH_LEN 1024 extern const int g_reserve_param_num; #define PARTKEY_VALUE_MAXNUM 64 +#define SS_AM_BACKENDS_WORKERS (t_thrd.role == WORKER || \ + t_thrd.role == THREADPOOL_WORKER || \ + t_thrd.role == TRACK_STMT_CLEANER) typedef struct ResourceOwnerData* ResourceOwner; typedef struct logicalLog logicalLog; -- Gitee