diff --git a/src/bin/pg_ctl/pg_ctl.cpp b/src/bin/pg_ctl/pg_ctl.cpp index 0fcefc0759a6e7edcec94ae903646c33f4733651..388a47024eaec18026a244c3b76e93e4ad001c74 100644 --- a/src/bin/pg_ctl/pg_ctl.cpp +++ b/src/bin/pg_ctl/pg_ctl.cpp @@ -106,6 +106,7 @@ typedef enum { SYNCHRONOUS_COMMIT_REMOTE_RECEIVE, /* wait for local flush and remote receive */ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote write */ SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_REPLAY, /* wait for local and remote replay */ SYNCHRONOUS_BAD } SyncCommitLevel; diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index 95af67d25fecad7053c49dc4688b5721358a16be..9af1df0b74e3cf9b7a8dc08de3d18d69bbdc68dd 100644 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -888,6 +888,8 @@ static const struct config_enum_entry synchronous_commit_options[] = {{"local", {"no", SYNCHRONOUS_COMMIT_OFF, true}, {"1", SYNCHRONOUS_COMMIT_ON, true}, {"0", SYNCHRONOUS_COMMIT_OFF, true}, + {"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_REPLAY, false}, /* listed before its numeric alias so it is the first entry for this value */ + {"2", SYNCHRONOUS_COMMIT_REMOTE_REPLAY, true}, /* numeric alias, hidden like "1" and "0" */ {NULL, 0, false}}; static const struct config_enum_entry plan_cache_mode_options[] = { diff --git a/src/gausskernel/storage/replication/syncrep.cpp b/src/gausskernel/storage/replication/syncrep.cpp index 3f0b5f832db9fdd85ecf4918b2c0f8d24ed2a252..bc6ffaba0eb4e6f2e6a0ed0c19d7a88e14787ad4 100755 --- a/src/gausskernel/storage/replication/syncrep.cpp +++ b/src/gausskernel/storage/replication/syncrep.cpp @@ -80,11 +80,11 @@ static void SyncRepWaitCompletionQueue(); static void SyncRepNotifyComplete(); static int SyncRepGetStandbyPriority(void); -static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, bool* am_sync); +static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, bool* am_sync); static void 
SyncRepGetOldestSyncRecPtr( - XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, List* sync_standbys); + XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys); static void SyncRepGetNthLatestSyncRecPtr( - XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, List* sync_standbys, uint8 nth); + XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys, uint8 nth); #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode); @@ -434,6 +434,7 @@ void SyncRepReleaseWaiters(void) XLogRecPtr receivePtr; XLogRecPtr writePtr; XLogRecPtr flushPtr; + XLogRecPtr replayPtr; int numreceive = 0; int numwrite = 0; int numflush = 0; @@ -465,7 +466,7 @@ void SyncRepReleaseWaiters(void) * Check whether we are a sync standby or not, and calculate the synced * positions among all sync standbys. */ - got_recptr = SyncRepGetSyncRecPtr(&receivePtr, &writePtr, &flushPtr, &am_sync); + got_recptr = SyncRepGetSyncRecPtr(&receivePtr, &writePtr, &flushPtr, &replayPtr, &am_sync); /* * If we are managing a sync standby, though we weren't prior to this, @@ -512,6 +513,11 @@ void SyncRepReleaseWaiters(void) numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } + if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_REPALY], replayPtr)) { + walsndctl->lsn[SYNC_REP_WAIT_REPALY] = replayPtr; /* advance to the position computed across the sync standbys, the same value the condition tests */ + (void)SyncRepWakeQueue(false, SYNC_REP_WAIT_REPALY); /* wake count is not stored in numflush, which holds the flush queue's count */ + } + LWLockRelease(SyncRepLock); ereport(DEBUG3, @@ -537,13 +543,14 @@ void SyncRepReleaseWaiters(void) * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. 
*/ -static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, bool* am_sync) +static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, bool* am_sync) { List* sync_standbys = NIL; *receivePtr = InvalidXLogRecPtr; *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; + *replayPtr = InvalidXLogRecPtr; *am_sync = false; /* Get standbys that are considered as synchronous at this moment */ @@ -572,10 +579,10 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, X * positions even in a quorum-based sync replication. */ if (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { - SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, sync_standbys); + SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, sync_standbys); } else { SyncRepGetNthLatestSyncRecPtr( - receivePtr, writePtr, flushPtr, sync_standbys, t_thrd.syncrep_cxt.SyncRepConfig->num_sync); + receivePtr, writePtr, flushPtr, replayPtr, sync_standbys, t_thrd.syncrep_cxt.SyncRepConfig->num_sync); } list_free(sync_standbys); @@ -586,7 +593,7 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, X * Calculate the oldest Write, Flush and Apply positions among sync standbys. 
*/ static void SyncRepGetOldestSyncRecPtr( - XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, List* sync_standbys) + XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys) { ListCell* cell = NULL; @@ -599,11 +606,13 @@ static void SyncRepGetOldestSyncRecPtr( XLogRecPtr receive; XLogRecPtr write; XLogRecPtr flush; + XLogRecPtr apply; SpinLockAcquire(&walsnd->mutex); receive = walsnd->receive; write = walsnd->write; flush = walsnd->flush; + apply = walsnd->apply; SpinLockRelease(&walsnd->mutex); if (XLogRecPtrIsInvalid(*writePtr) || !XLByteLE(*writePtr, write)) @@ -612,6 +621,8 @@ static void SyncRepGetOldestSyncRecPtr( *flushPtr = flush; if (XLogRecPtrIsInvalid(*receivePtr) || !XLByteLE(*receivePtr, receive)) *receivePtr = receive; + if (XLogRecPtrIsInvalid(*replayPtr) || !XLByteLE(*replayPtr, apply)) + *replayPtr = apply; } } @@ -620,12 +631,13 @@ static void SyncRepGetOldestSyncRecPtr( * standbys. */ static void SyncRepGetNthLatestSyncRecPtr( - XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, List* sync_standbys, uint8 nth) + XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys, uint8 nth) { ListCell* cell = NULL; XLogRecPtr* receive_array = NULL; XLogRecPtr* write_array = NULL; XLogRecPtr* flush_array = NULL; + XLogRecPtr* apply_array = NULL; int len; int i = 0; @@ -633,6 +645,7 @@ static void SyncRepGetNthLatestSyncRecPtr( receive_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len); write_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len); flush_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len); + apply_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len); foreach (cell, sync_standbys) { WalSnd* walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[lfirst_int(cell)]; @@ -641,6 +654,7 @@ static void SyncRepGetNthLatestSyncRecPtr( receive_array[i] = walsnd->receive; write_array[i] = walsnd->write; 
flush_array[i] = walsnd->flush; + apply_array[i] = walsnd->apply; SpinLockRelease(&walsnd->mutex); i++; @@ -649,11 +663,13 @@ static void SyncRepGetNthLatestSyncRecPtr( qsort(receive_array, len, sizeof(XLogRecPtr), cmp_lsn); qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *receivePtr = receive_array[nth - 1]; *writePtr = write_array[nth - 1]; *flushPtr = flush_array[nth - 1]; + *replayPtr = apply_array[nth - 1]; pfree(receive_array); receive_array = NULL; @@ -661,6 +677,8 @@ static void SyncRepGetNthLatestSyncRecPtr( write_array = NULL; pfree(flush_array); flush_array = NULL; + pfree(apply_array); + apply_array = NULL; } /* @@ -1332,6 +1350,9 @@ void assign_synchronous_commit(int newval, void* extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_REPLAY: + SyncRepWaitMode = SYNC_REP_WAIT_REPALY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 09f05a846c51abd5638c563c6c6ee8479e5ad3f4..4b78e926e4cb7faa239dca6b899d384af486396c 100755 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -92,6 +92,7 @@ typedef enum { SYNCHRONOUS_COMMIT_REMOTE_RECEIVE, /* wait for local flush and remote receive */ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote write */ SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_REPLAY, /* wait for local and remote replay */ SYNCHRONOUS_BAD } SyncCommitLevel; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 6a411e030a706a30ff8ae83cc8acd5b7f9ed0aca..967707f6fb17631ba3e66732276d96f3e5061e66 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -25,8 +25,9 @@ #define 
SYNC_REP_WAIT_RECEIVE 0 #define SYNC_REP_WAIT_WRITE 1 #define SYNC_REP_WAIT_FLUSH 2 +#define SYNC_REP_WAIT_REPALY 3 -#define NUM_SYNC_REP_WAIT_MODE 3 +#define NUM_SYNC_REP_WAIT_MODE 4 /* syncRepState */ #define SYNC_REP_NOT_WAITING 0