From 26a06c95b865ab738d705e1e650784c646e2e3b5 Mon Sep 17 00:00:00 2001 From: jiang_jianyu Date: Mon, 31 Aug 2020 21:01:43 +0800 Subject: [PATCH 1/3] move global variable to instance or thread --- src/common/backend/libpq/pqcomm.cpp | 7 ++- src/common/backend/libpq/pqmq.cpp | 63 ++++++++----------- src/common/backend/utils/init/globals.cpp | 1 - src/common/backend/utils/init/miscinit.cpp | 2 +- src/common/backend/utils/init/postinit.cpp | 2 +- .../process/postmaster/bgworker.cpp | 19 +++--- .../process/postmaster/postmaster.cpp | 6 +- .../process/threadpool/knl_thread.cpp | 19 ++++++ src/gausskernel/storage/lmgr/proc.cpp | 6 +- src/include/gtm/pqcomm.h | 4 ++ src/include/knl/knl_thread.h | 21 +++++++ src/include/libpq/libpq.h | 22 +++---- src/include/libpq/pqcomm.h | 2 + src/include/miscadmin.h | 1 - src/include/postmaster/bgworker_internals.h | 2 - 15 files changed, 103 insertions(+), 74 deletions(-) diff --git a/src/common/backend/libpq/pqcomm.cpp b/src/common/backend/libpq/pqcomm.cpp index b6190904df..428cf47818 100644 --- a/src/common/backend/libpq/pqcomm.cpp +++ b/src/common/backend/libpq/pqcomm.cpp @@ -154,7 +154,7 @@ static int socket_putmessage_noblock(char msgtype, const char *s, size_t len); static void socket_startcopyout(void); static void socket_endcopyout(bool errorAbort); -static PQcommMethods PqCommSocketMethods = { +static const PQcommMethods PqCommSocketMethods = { socket_comm_reset, socket_flush, socket_flush_if_writable, @@ -165,7 +165,10 @@ static PQcommMethods PqCommSocketMethods = { socket_endcopyout }; -THR_LOCAL PQcommMethods *PqCommMethods = &PqCommSocketMethods; +void PqCommMethods_init() +{ + t_thrd.msqueue_cxt.PqCommMethods = &PqCommSocketMethods; +} extern bool FencedUDFMasterMode; diff --git a/src/common/backend/libpq/pqmq.cpp b/src/common/backend/libpq/pqmq.cpp index fd3197de2a..d9c6a2723c 100644 --- a/src/common/backend/libpq/pqmq.cpp +++ b/src/common/backend/libpq/pqmq.cpp @@ -21,12 +21,6 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" -static THR_LOCAL shm_mq *pq_mq; -static THR_LOCAL shm_mq_handle *pq_mq_handle; -static THR_LOCAL bool pq_mq_busy = false; -static THR_LOCAL ThreadId pq_mq_parallel_master_pid = 0; -static THR_LOCAL BackendId pq_mq_parallel_master_backend_id = InvalidBackendId; - static void mq_comm_reset(void); static int mq_flush(void); static int mq_flush_if_writable(void); @@ -36,7 +30,7 @@ static int mq_putmessage_noblock(char msgtype, const char *s, size_t len); static void mq_startcopyout(void); static void mq_endcopyout(bool errorAbort); -static THR_LOCAL PQcommMethods PqCommMqMethods = { +static const PQcommMethods PqCommMqMethods = { mq_comm_reset, mq_flush, mq_flush_if_writable, @@ -47,32 +41,29 @@ static THR_LOCAL PQcommMethods PqCommMqMethods = { mq_endcopyout }; -static THR_LOCAL PQcommMethods *save_PqCommMethods; -static THR_LOCAL CommandDest save_whereToSendOutput; -static THR_LOCAL ProtocolVersion save_FrontendProtocol; - /* * Arrange to redirect frontend/backend protocol messages to a message queue. */ void pq_redirect_to_shm_mq(shm_mq_handle *mqh) { - save_PqCommMethods = PqCommMethods; - save_whereToSendOutput = CommandDest(t_thrd.postgres_cxt.whereToSendOutput); - save_FrontendProtocol = FrontendProtocol; + t_thrd.msqueue_cxt.save_PqCommMethods = t_thrd.msqueue_cxt.PqCommMethods; + t_thrd.msqueue_cxt.save_whereToSendOutput = CommandDest(t_thrd.postgres_cxt.whereToSendOutput); + t_thrd.msqueue_cxt.save_FrontendProtocol = FrontendProtocol; - PqCommMethods = &PqCommMqMethods; - pq_mq_handle = mqh; + t_thrd.msqueue_cxt.PqCommMethods = &PqCommMqMethods; + t_thrd.msqueue_cxt.pq_mq = shm_mq_get_queue(mqh); + t_thrd.msqueue_cxt.pq_mq_handle = mqh; t_thrd.postgres_cxt.whereToSendOutput = static_cast(DestRemote); FrontendProtocol = PG_PROTOCOL_LATEST; } void pq_stop_redirect_to_shm_mq(void) { - PqCommMethods = save_PqCommMethods; - t_thrd.postgres_cxt.whereToSendOutput = static_cast(save_whereToSendOutput); - FrontendProtocol = save_FrontendProtocol; - pq_mq = NULL; - pq_mq_handle = NULL; + t_thrd.msqueue_cxt.PqCommMethods = t_thrd.msqueue_cxt.save_PqCommMethods; + t_thrd.postgres_cxt.whereToSendOutput = static_cast(t_thrd.msqueue_cxt.save_whereToSendOutput); + FrontendProtocol = t_thrd.msqueue_cxt.save_FrontendProtocol; + t_thrd.msqueue_cxt.pq_mq = NULL; + t_thrd.msqueue_cxt.pq_mq_handle = NULL; } /* @@ -81,9 +72,9 @@ void pq_stop_redirect_to_shm_mq(void) */ void pq_set_parallel_master(ThreadId pid, BackendId backend_id) { - Assert(PqCommMethods == &PqCommMqMethods); - pq_mq_parallel_master_pid = pid; - pq_mq_parallel_master_backend_id = backend_id; + Assert(t_thrd.msqueue_cxt.PqCommMethods == &PqCommMqMethods); + t_thrd.msqueue_cxt.pq_mq_parallel_master_pid = pid; + t_thrd.msqueue_cxt.pq_mq_parallel_master_backend_id = backend_id; } static void mq_comm_reset(void) @@ -127,10 +118,10 @@ static int mq_putmessage(char msgtype, const char *s, size_t len) * queueing the message would amount to indefinitely postponing the * response to the interrupt. So we do this instead. */ - if (pq_mq_busy) { - if (pq_mq_handle != NULL) - shm_mq_detach(pq_mq_handle); - pq_mq_handle = NULL; + if (t_thrd.msqueue_cxt.pq_mq_busy) { + if (t_thrd.msqueue_cxt.pq_mq_handle != NULL) + shm_mq_detach(t_thrd.msqueue_cxt.pq_mq_handle); + t_thrd.msqueue_cxt.pq_mq_handle = NULL; return EOF; } @@ -140,24 +131,24 @@ static int mq_putmessage(char msgtype, const char *s, size_t len) * be generated late in the shutdown sequence, after all DSMs have already * been detached. */ - if (pq_mq_handle == NULL) + if (t_thrd.msqueue_cxt.pq_mq_handle == NULL) return 0; - pq_mq_busy = true; + t_thrd.msqueue_cxt.pq_mq_busy = true; iov[0].data = &msgtype; iov[0].len = 1; iov[1].data = s; iov[1].len = len; - Assert(pq_mq_handle != NULL); + Assert(t_thrd.msqueue_cxt.pq_mq_handle != NULL); for (;;) { - result = shm_mq_sendv(pq_mq_handle, iov, 2, true); + result = shm_mq_sendv(t_thrd.msqueue_cxt.pq_mq_handle, iov, 2, true); - if (pq_mq_parallel_master_pid != 0) - (void)SendProcSignal(pq_mq_parallel_master_pid,PROCSIG_PARALLEL_MESSAGE, - pq_mq_parallel_master_backend_id); + if (t_thrd.msqueue_cxt.pq_mq_parallel_master_pid != 0) + (void)SendProcSignal(t_thrd.msqueue_cxt.pq_mq_parallel_master_pid,PROCSIG_PARALLEL_MESSAGE, + t_thrd.msqueue_cxt.pq_mq_parallel_master_backend_id); if (result != SHM_MQ_WOULD_BLOCK) break; @@ -167,7 +158,7 @@ static int mq_putmessage(char msgtype, const char *s, size_t len) CHECK_FOR_INTERRUPTS(); } - pq_mq_busy = false; + t_thrd.msqueue_cxt.pq_mq_busy = false; Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED); if (result != SHM_MQ_SUCCESS) diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index e0601f5665..e0818b9a76 100755 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -51,7 +51,6 @@ THR_LOCAL object_access_hook_type object_access_hook = NULL; * These are initialized for the bootstrap/standalone case. */ THR_LOCAL bool IsUnderPostmaster = false; -THR_LOCAL bool IsBackgroundWorker = false; volatile ThreadId PostmasterPid = 0; bool IsPostmasterEnvironment = false; diff --git a/src/common/backend/utils/init/miscinit.cpp b/src/common/backend/utils/init/miscinit.cpp index a1b469cc3c..3e58554e39 100755 --- a/src/common/backend/utils/init/miscinit.cpp +++ b/src/common/backend/utils/init/miscinit.cpp @@ -846,7 +846,7 @@ void InitializeSessionUserIdStandalone(void) */ AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsJobSchedulerProcess() || IsJobWorkerProcess() || AM_WAL_SENDER || - IsBackgroundWorker); + t_thrd.bgworker_cxt.is_background_worker); /* In pooler stateless reuse mode, to reset session userid */ if (!g_instance.attr.attr_network.PoolerStatelessReuse) { diff --git a/src/common/backend/utils/init/postinit.cpp b/src/common/backend/utils/init/postinit.cpp index 2775810b90..de5ef40bc8 100644 --- a/src/common/backend/utils/init/postinit.cpp +++ b/src/common/backend/utils/init/postinit.cpp @@ -1495,7 +1495,7 @@ void PostgresInitializer::InitSession() if (!IsUnderPostmaster) { CheckAtLeastOneRoles(); SetSuperUserStandalone(); - } else if (IsBackgroundWorker) { + } else if (t_thrd.bgworker_cxt.is_background_worker) { if (m_username == NULL && !OidIsValid(m_useroid)) { InitializeSessionUserIdStandalone(); m_isSuperUser = true; diff --git a/src/gausskernel/process/postmaster/bgworker.cpp b/src/gausskernel/process/postmaster/bgworker.cpp index d38484b433..ec19ac5ebf 100644 --- a/src/gausskernel/process/postmaster/bgworker.cpp +++ b/src/gausskernel/process/postmaster/bgworker.cpp @@ -33,11 +33,6 @@ #include "utils/ps_status.h" #include "utils/postinit.h" -/* - * The postmaster's list of registered background workers, in private memory. - */ -THR_LOCAL slist_head BackgroundWorkerList = SLIST_STATIC_INIT(BackgroundWorkerList); - /* * BackgroundWorkerSlots exist in shared memory and can be accessed (via * the BackgroundWorkerArray) by both the postmaster and by regular backends. @@ -159,7 +154,7 @@ void BackgroundWorkerShmemInit(void) * correspondence between the postmaster's private list and the array * in shared memory. */ - slist_foreach(siter, &BackgroundWorkerList) { + slist_foreach(siter, &t_thrd.bgworker_cxt.background_worker_list) { BackgroundWorkerSlot *slot = &t_thrd.bgworker_cxt.background_worker_data->slot[slotno]; RegisteredBgWorker *rw; @@ -198,7 +193,7 @@ static RegisteredBgWorker * FindRegisteredWorkerBySlotNumber(int slotno) { slist_iter siter; - slist_foreach(siter, &BackgroundWorkerList) { + slist_foreach(siter, &t_thrd.bgworker_cxt.background_worker_list) { RegisteredBgWorker *rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur); if (rw->rw_shmem_slot == slotno) { return rw; @@ -371,7 +366,7 @@ void BackgroundWorkerStateChange(void) (errmsg("registering background worker \"%s\"", rw->rw_worker.bgw_name))); - slist_push_head(&BackgroundWorkerList, &rw->rw_lnode); + slist_push_head(&t_thrd.bgworker_cxt.background_worker_list, &rw->rw_lnode); } } @@ -475,7 +470,7 @@ void BackgroundWorkerStopNotifications(ThreadId pid) { slist_iter siter; - slist_foreach(siter, &BackgroundWorkerList) + slist_foreach(siter, &t_thrd.bgworker_cxt.background_worker_list) { RegisteredBgWorker *rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur); if (rw->rw_worker.bgw_notify_pid == pid) { @@ -495,7 +490,7 @@ void ResetBackgroundWorkerCrashTimes(void) { slist_mutable_iter iter; - slist_foreach_modify(iter, &BackgroundWorkerList) + slist_foreach_modify(iter, &t_thrd.bgworker_cxt.background_worker_list) { RegisteredBgWorker *rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); @@ -705,7 +700,7 @@ void StartBackgroundWorker(void* bgWorkerSlotShmAddr) (errmsg("unable to find bgworker entry"))); } - IsBackgroundWorker = true; + t_thrd.bgworker_cxt.is_background_worker = true; /* Identify myself via ps */ init_ps_display(worker->bgw_name, "", "", ""); @@ -889,7 +884,7 @@ void RegisterBackgroundWorker(BackgroundWorker *worker) rw->rw_crashed_at = 0; rw->rw_terminate = false; - slist_push_head(&BackgroundWorkerList, &rw->rw_lnode); + slist_push_head(&t_thrd.bgworker_cxt.background_worker_list, &rw->rw_lnode); } /* diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 8a0da6b553..c891b3a02e 100755 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -5595,7 +5595,7 @@ static bool CleanupBackgroundWorker(ThreadId pid, char namebuf[MAXPGPATH]; slist_mutable_iter iter; - slist_foreach_modify(iter, &BackgroundWorkerList) { + slist_foreach_modify(iter, &t_thrd.bgworker_cxt.background_worker_list) { RegisteredBgWorker *rw; rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); @@ -8019,7 +8019,7 @@ static void maybe_start_bgworkers(void) g_instance.bgworker_cxt.start_worker_needed = false; g_instance.bgworker_cxt.have_crashed_worker = false; - slist_foreach_modify(iter, &BackgroundWorkerList) { + slist_foreach_modify(iter, &t_thrd.bgworker_cxt.background_worker_list) { RegisteredBgWorker *rw; rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); @@ -10388,7 +10388,7 @@ int GaussDbThreadMain(knl_thread_arg* arg) #endif case BACKGROUND_WORKER: { - IsBackgroundWorker = true; + t_thrd.bgworker_cxt.is_background_worker = true; InitProcessAndShareMemory(); StartBackgroundWorker(arg->payload); proc_exit(0); diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index efc36eafa3..7e2185ec87 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -67,6 +67,7 @@ #include "utils/postinit.h" #include "utils/relmapper.h" #include "workload/workload.h" +#include "libpq/pqcomm.h" THR_LOCAL knl_thrd_context t_thrd; @@ -1417,6 +1418,22 @@ void knl_t_bgworker_init(knl_t_bgworker_context* bgworker_cxt) { bgworker_cxt->background_worker_data = NULL; bgworker_cxt->my_bgworker_entry = NULL; + bgworker_cxt->is_background_worker = false; + bgworker_cxt->background_worker_list = SLIST_STATIC_INIT(background_worker_list); +} + +void knl_t_msqueue_init(knl_t_msqueue_context* msqueue_cxt) +{ + msqueue_cxt->pq_mq = NULL; + msqueue_cxt->pq_mq_handle = NULL; + msqueue_cxt->pq_mq_busy = false; + msqueue_cxt->pq_mq_parallel_master_pid = 0; + msqueue_cxt->pq_mq_parallel_master_backend_id = InvalidBackendId; + msqueue_cxt->save_PqCommMethods = NULL; + msqueue_cxt->save_whereToSendOutput = DestDebug; + msqueue_cxt->save_FrontendProtocol = PG_PROTOCOL_LATEST; + //msqueue_cxt->PqCommMethods = NULL; + PqCommMethods_init(); } void knl_thread_init(knl_thread_role role) @@ -1507,6 +1524,8 @@ void knl_thread_init(knl_thread_role role) knl_t_poolcleaner_init(&t_thrd.poolcleaner_cxt); knl_t_mot_init(&t_thrd.mot_cxt); knl_t_autonomous_init(&t_thrd.autonomous_cxt); + knl_t_bgworker_init(&t_thrd.bgworker_cxt); + knl_t_msqueue_init(&t_thrd.msqueue_cxt); } void knl_thread_set_name(const char* name) diff --git a/src/gausskernel/storage/lmgr/proc.cpp b/src/gausskernel/storage/lmgr/proc.cpp index aba442a0cc..01dcc0891d 100755 --- a/src/gausskernel/storage/lmgr/proc.cpp +++ b/src/gausskernel/storage/lmgr/proc.cpp @@ -476,7 +476,7 @@ void InitProcess(void) t_thrd.proc = g_instance.proc_base->autovacFreeProcs; else if (IsJobSchedulerProcess() || IsJobWorkerProcess()) t_thrd.proc = g_instance.proc_base->pgjobfreeProcs; - else if (IsBackgroundWorker) + else if (t_thrd.bgworker_cxt.is_background_worker) t_thrd.proc = g_instance.proc_base->bgworkerFreeProcs; else { #ifndef __USE_NUMA @@ -493,7 +493,7 @@ void InitProcess(void) g_instance.proc_base->autovacFreeProcs = (PGPROC *)t_thrd.proc->links.next; else if (IsJobSchedulerProcess() || IsJobWorkerProcess()) g_instance.proc_base->pgjobfreeProcs = (PGPROC *)t_thrd.proc->links.next; - else if (IsBackgroundWorker) + else if (t_thrd.bgworker_cxt.is_background_worker) g_instance.proc_base->bgworkerFreeProcs = (PGPROC *)t_thrd.proc->links.next; else { #ifndef __USE_NUMA @@ -1054,7 +1054,7 @@ static void ProcKill(int code, Datum arg) t_thrd.proc->links.next = (SHM_QUEUE *)g_instance.proc_base->pgjobfreeProcs; g_instance.proc_base->pgjobfreeProcs = t_thrd.proc; } - else if (IsBackgroundWorker) + else if (t_thrd.bgworker_cxt.is_background_worker) { t_thrd.proc->links.next = (SHM_QUEUE *)g_instance.proc_base->bgworkerFreeProcs; g_instance.proc_base->bgworkerFreeProcs = t_thrd.proc; diff --git a/src/include/gtm/pqcomm.h b/src/include/gtm/pqcomm.h index 720126200c..82cfcc2db9 100644 --- a/src/include/gtm/pqcomm.h +++ b/src/include/gtm/pqcomm.h @@ -29,6 +29,10 @@ typedef struct { size_t salen; } SockAddr; +void PqCommMethods_init(); +typedef unsigned int uint32; /* == 32 bits */ +typedef uint32 ProtocolVersion; /* FE/BE protocol version number */ + /* * In protocol 3.0 and later, the startup packet length is not fixed, but * we set an arbitrary limit on it anyway. This is just to prevent simple diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 472d3e2637..3591d9f619 100644 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2716,8 +2716,28 @@ typedef struct knl_t_mot_context { typedef struct knl_t_bgworker_context { BackgroundWorkerArray *background_worker_data; BackgroundWorker *my_bgworker_entry; + bool is_background_worker; + /* + * The postmaster's list of registered background workers, in private memory. + */ + slist_head background_worker_list; } knl_t_bgworker_context; +struct shm_mq; +struct shm_mq_handle; +struct PQcommMethods; +typedef struct knl_t_msqueue_context { + shm_mq *pq_mq; + shm_mq_handle *pq_mq_handle; + bool pq_mq_busy; + ThreadId pq_mq_parallel_master_pid; + BackendId pq_mq_parallel_master_backend_id; + const PQcommMethods *save_PqCommMethods; + CommandDest save_whereToSendOutput; + ProtocolVersion save_FrontendProtocol; + const PQcommMethods *PqCommMethods; +} knl_t_msqueue_context; + /* thread context. */ typedef struct knl_thrd_context { knl_thread_role role; @@ -2817,6 +2837,7 @@ typedef struct knl_thrd_context { knl_t_poolcleaner_context poolcleaner_cxt; knl_t_mot_context mot_cxt; knl_t_bgworker_context bgworker_cxt; + knl_t_msqueue_context msqueue_cxt; } knl_thrd_context; extern void knl_thread_mot_init(); diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 3988224f46..4b44f11b5c 100755 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -35,7 +35,7 @@ typedef struct { } u; } PQArgBlock; -typedef struct { +struct PQcommMethods{ void (*comm_reset) (void); int (*flush) (void); int (*flush_if_writable) (void); @@ -44,20 +44,18 @@ typedef struct { int (*putmessage_noblock) (char msgtype, const char* s, size_t len); void (*startcopyout) (void); void (*endcopyout) (bool errorAbort); -} PQcommMethods; - -extern PGDLLIMPORT THR_LOCAL PQcommMethods *PqCommMethods; +}; -#define pq_comm_reset() (PqCommMethods->comm_reset()) -#define pq_flush() (PqCommMethods->flush()) -#define pq_flush_if_writable() (PqCommMethods->flush_if_writable()) -#define pq_is_send_pending() (PqCommMethods->is_send_pending()) +#define pq_comm_reset() (t_thrd.msqueue_cxt.PqCommMethods->comm_reset()) +#define pq_flush() (t_thrd.msqueue_cxt.PqCommMethods->flush()) +#define pq_flush_if_writable() (t_thrd.msqueue_cxt.PqCommMethods->flush_if_writable()) +#define pq_is_send_pending() (t_thrd.msqueue_cxt.PqCommMethods->is_send_pending()) #define pq_putmessage(msgtype, s, len) \ - (PqCommMethods->putmessage(msgtype, s, len)) + (t_thrd.msqueue_cxt.PqCommMethods->putmessage(msgtype, s, len)) #define pq_putmessage_noblock(msgtype, s, len) \ - (PqCommMethods->putmessage_noblock(msgtype, s, len)) -#define pq_startcopyout() (PqCommMethods->startcopyout()) -#define pq_endcopyout(errorAbort) (PqCommMethods->endcopyout(errorAbort)) + (t_thrd.msqueue_cxt.PqCommMethods->putmessage_noblock(msgtype, s, len)) +#define pq_startcopyout() (t_thrd.msqueue_cxt.PqCommMethods->startcopyout()) +#define pq_endcopyout(errorAbort) (t_thrd.msqueue_cxt.PqCommMethods->endcopyout(errorAbort)) /* * External functions. diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index 00a9bbe8e4..acd9aedf10 100755 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -239,6 +239,8 @@ typedef struct ConnPack { ConnectStreamPacket cp; } ConnPack; +void PqCommMethods_init(); + /* * A client can also start by sending stop query request */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 6d73c6960d..4b0a3c9be2 100755 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -131,7 +131,6 @@ extern bool InplaceUpgradePrecommit; extern THR_LOCAL PGDLLIMPORT bool IsUnderPostmaster; extern THR_LOCAL PGDLLIMPORT char my_exec_path[]; -extern THR_LOCAL PGDLLIMPORT bool IsBackgroundWorker; #define MAX_QUERY_DOP (64) #define MIN_QUERY_DOP -(MAX_QUERY_DOP) diff --git a/src/include/postmaster/bgworker_internals.h b/src/include/postmaster/bgworker_internals.h index fa57bdc4da..c6017f4b70 100644 --- a/src/include/postmaster/bgworker_internals.h +++ b/src/include/postmaster/bgworker_internals.h @@ -41,8 +41,6 @@ typedef struct RegisteredBgWorker { slist_node rw_lnode; /* list link */ } RegisteredBgWorker; -extern THR_LOCAL slist_head BackgroundWorkerList; - extern Size BackgroundWorkerShmemSize(void); extern void BackgroundWorkerShmemInit(void); extern void BackgroundWorkerStateChange(void); -- Gitee From 21fb02c39717ea30eda771ad113c160cc6f493af Mon Sep 17 00:00:00 2001 From: jiang_jianyu Date: Mon, 31 Aug 2020 21:09:49 +0800 Subject: [PATCH 2/3] fix backendworker not exit bugs --- src/common/backend/libpq/pqmq.cpp | 2 ++ src/gausskernel/process/postmaster/bgworker.cpp | 15 ++++++++++++--- src/gausskernel/process/tcop/autonomous.cpp | 5 +++-- src/gausskernel/process/tcop/postgres.cpp | 11 ++++++++++- src/gausskernel/process/threadpool/knl_thread.cpp | 3 ++- src/include/knl/knl_thread.h | 2 ++ 6 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/common/backend/libpq/pqmq.cpp b/src/common/backend/libpq/pqmq.cpp index d9c6a2723c..94a304533d 100644 --- a/src/common/backend/libpq/pqmq.cpp +++ b/src/common/backend/libpq/pqmq.cpp @@ -55,6 +55,7 @@ void pq_redirect_to_shm_mq(shm_mq_handle *mqh) t_thrd.msqueue_cxt.pq_mq_handle = mqh; t_thrd.postgres_cxt.whereToSendOutput = static_cast(DestRemote); FrontendProtocol = PG_PROTOCOL_LATEST; + t_thrd.msqueue_cxt.is_changed = true; } void pq_stop_redirect_to_shm_mq(void) @@ -64,6 +65,7 @@ void pq_stop_redirect_to_shm_mq(void) FrontendProtocol = t_thrd.msqueue_cxt.save_FrontendProtocol; t_thrd.msqueue_cxt.pq_mq = NULL; t_thrd.msqueue_cxt.pq_mq_handle = NULL; + t_thrd.msqueue_cxt.is_changed = false; } /* diff --git a/src/gausskernel/process/postmaster/bgworker.cpp b/src/gausskernel/process/postmaster/bgworker.cpp index ec19ac5ebf..a254d2062f 100644 --- a/src/gausskernel/process/postmaster/bgworker.cpp +++ b/src/gausskernel/process/postmaster/bgworker.cpp @@ -638,8 +638,9 @@ static void bgworker_quickdie(SIGNAL_ARGS) */ static void bgworker_die(SIGNAL_ARGS) { - (void)PG_SETMASK(&t_thrd.libpq_cxt.BlockSig); + (void)gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL); + t_thrd.postgres_cxt.whereToSendOutput = DestNone; ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating background worker \"%s\" due to administrator command", @@ -734,6 +735,14 @@ void StartBackgroundWorker(void* bgWorkerSlotShmAddr) (void)gspqsignal(SIGUSR2, SIG_IGN); (void)gspqsignal(SIGCHLD, SIG_DFL); + (void)gs_signal_unblock_sigusr2(); + if (IsUnderPostmaster) { + /* We allow SIGQUIT (quickdie) at all times */ + (void)sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT); + } + + gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL); /* block everything except SIGQUIT */ + /* * If an exception is encountered, processing resumes here. * @@ -1298,12 +1307,12 @@ void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 fl */ void BackgroundWorkerBlockSignals(void) { - (void)PG_SETMASK(&t_thrd.libpq_cxt.BlockSig); + (void)gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL); } void BackgroundWorkerUnblockSignals(void) { - (void)PG_SETMASK(&t_thrd.libpq_cxt.UnBlockSig); + (void)gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL); } diff --git a/src/gausskernel/process/tcop/autonomous.cpp b/src/gausskernel/process/tcop/autonomous.cpp index e3f02bf6f1..9702af14df 100644 --- a/src/gausskernel/process/tcop/autonomous.cpp +++ b/src/gausskernel/process/tcop/autonomous.cpp @@ -191,6 +191,8 @@ AutonomousSession * AutonomousSessionStart(void) shm_mq_set_handle(session->command_qh, session->worker_handle); shm_mq_set_handle(session->response_qh, session->worker_handle); + t_thrd.autonomous_cxt.handle = session->worker_handle; + bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid); if (bgwstatus != BGWH_STARTED) ereport(ERROR, @@ -244,6 +246,7 @@ void AutonomousSessionEnd(AutonomousSession *session) pfree(session->worker_handle); pfree(session->seg); pfree(session); + t_thrd.autonomous_cxt.handle = NULL; } AutonomousResult *AutonomousSessionExecute(AutonomousSession *session, const char *sql) @@ -347,7 +350,6 @@ AutonomousPreparedStatement *AutonomousSessionPrepare(AutonomousSession *session invalid_protocol_message(msgtype); break; } - pq_redirect_to_shm_mq(session->command_qh); pq_beginmessage(&msg, 'D'); pq_sendbyte(&msg, 'S'); @@ -522,7 +524,6 @@ void autonomous_worker_main(Datum main_arg) char msgtype; - (void)gspqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); t_thrd.autonomous_cxt.isnested = true; diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index d252265147..5c4711948a 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -67,6 +67,7 @@ #endif /* PGXC */ #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "postmaster/bgworker.h" #include "replication/dataqueue.h" #include "replication/datasender.h" #include "replication/walsender.h" @@ -146,6 +147,7 @@ extern int optreset; /* might not be declared by system headers */ #include "utils/syscache.h" #include "utils/tqual.h" #include "storage/mot/jit_exec.h" +#include "libpq/pqmq.h" #define GSCGROUP_ATTACH_TASK() \ { \ @@ -6624,6 +6626,13 @@ int StreamMain(void* arg) int curTryCounter; int* oldTryCounter = NULL; if (sigsetjmp(local_sigjmp_buf, 1) != 0) { + if (t_thrd.msqueue_cxt.is_changed == true) { + pq_stop_redirect_to_shm_mq(); + } + if (t_thrd.autonomous_cxt.handle) { + TerminateBackgroundWorker(t_thrd.autonomous_cxt.handle); + t_thrd.autonomous_cxt.handle = NULL; + } gstrace_tryblock_exit(true, oldTryCounter); (void)pgstat_report_waitstatus(STATE_WAIT_UNDEFINED); @@ -7521,7 +7530,7 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam t_thrd.postgres_cxt.gpc_fisrt_send_clean = false; GPC->SendPrepareDestoryMsg(); } - + /* * Abort the current transaction in order to recover. */ diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 7e2185ec87..4282a39a71 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1387,6 +1387,7 @@ static void knl_t_heartbeat_init(knl_t_heartbeat_context* heartbeat_cxt) static void knl_t_autonomous_init(knl_t_autonomous_context* autonomous_cxt) { autonomous_cxt->isnested = false; + autonomous_cxt->handle = NULL; autonomous_cxt->sqlstmt = NULL; } @@ -1432,7 +1433,7 @@ void knl_t_msqueue_init(knl_t_msqueue_context* msqueue_cxt) msqueue_cxt->save_PqCommMethods = NULL; msqueue_cxt->save_whereToSendOutput = DestDebug; msqueue_cxt->save_FrontendProtocol = PG_PROTOCOL_LATEST; - //msqueue_cxt->PqCommMethods = NULL; + msqueue_cxt->is_changed = false; PqCommMethods_init(); } diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 3591d9f619..39bb16df08 100644 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2665,6 +2665,7 @@ struct PLpgSQL_expr; typedef struct knl_t_autonomous_context { PLpgSQL_expr* sqlstmt; bool isnested; + BackgroundWorkerHandle* handle; } knl_t_autonomous_context; /* MOT thread attributes */ @@ -2736,6 +2737,7 @@ typedef struct knl_t_msqueue_context { CommandDest save_whereToSendOutput; ProtocolVersion save_FrontendProtocol; const PQcommMethods *PqCommMethods; + bool is_changed; } knl_t_msqueue_context; /* thread context. */ -- Gitee From 5be100f482453891cc9f2a40cec394ba7f38c0f4 Mon Sep 17 00:00:00 2001 From: jiang_jianyu Date: Tue, 1 Sep 2020 09:04:50 +0800 Subject: [PATCH 3/3] use rname to replace invalid role_name when InitializeSessionUserId is invoked with id --- src/bin/gs_guc/cluster_guc.conf | 1 + src/common/backend/utils/init/miscinit.cpp | 10 +++++----- src/gausskernel/process/postmaster/bgworker.cpp | 10 ++++++++++ 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index 9726396cad..9e7af92936 100644 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -525,6 +525,7 @@ defer_csn_cleanup_time|int|0,2147483647|ms|NULL| tcp_recv_timeout|int|0,86400|s|Specify the receiving timeouts until reporting an error.| max_inner_tool_connections|int|1,8388607|NULL|NULL| max_keep_log_seg|int|0,2147483647|NULL|NULL| +max_background_workers|int|0,262143|NULL|NULL| [gtm] nodename|string|0,0|NULL|Name of this GTM/GTM-Standby.| port|int|1,65535|NULL|Listen Port of GTM or GTM standby server.| diff --git a/src/common/backend/utils/init/miscinit.cpp b/src/common/backend/utils/init/miscinit.cpp index 3e58554e39..7a6d083d9d 100755 --- a/src/common/backend/utils/init/miscinit.cpp +++ b/src/common/backend/utils/init/miscinit.cpp @@ -804,7 +804,7 @@ void InitializeSessionUserId(const char* role_name, Oid role_id) if (!rform->rolcanlogin) { ereport(FATAL, (errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION), - errmsg("role \"%s\" is not permitted to login", role_name))); + errmsg("role \"%s\" is not permitted to login", rname))); } /* * Check connection limit for this role. @@ -818,17 +818,17 @@ void InitializeSessionUserId(const char* role_name, Oid role_id) */ if (rform->rolconnlimit >= 0 && !u_sess->misc_cxt.AuthenticatedUserIsSuperuser && CountUserBackends(role_id) > rform->rolconnlimit) { - ReportAlarmTooManyDbUserConn(role_name); + ReportAlarmTooManyDbUserConn(rname); ereport(FATAL, - (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("too many connections for role \"%s\"", role_name))); + (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("too many connections for role \"%s\"", rname))); } else if (!u_sess->misc_cxt.AuthenticatedUserIsSuperuser) { - ReportResumeTooManyDbUserConn(role_name); + ReportResumeTooManyDbUserConn(rname); } } /* Record username and superuser status as GUC settings too */ - SetConfigOption("session_authorization", role_name, PGC_BACKEND, PGC_S_OVERRIDE); + SetConfigOption("session_authorization", rname, PGC_BACKEND, PGC_S_OVERRIDE); SetConfigOption( "is_sysadmin", u_sess->misc_cxt.AuthenticatedUserIsSuperuser ? "on" : "off", PGC_INTERNAL, PGC_S_OVERRIDE); diff --git a/src/gausskernel/process/postmaster/bgworker.cpp b/src/gausskernel/process/postmaster/bgworker.cpp index a254d2062f..3951f57262 100644 --- a/src/gausskernel/process/postmaster/bgworker.cpp +++ b/src/gausskernel/process/postmaster/bgworker.cpp @@ -32,6 +32,8 @@ #include "utils/ascii.h" #include "utils/ps_status.h" #include "utils/postinit.h" +#include "access/xact.h" +#include "utils/memtrack.h" /* * BackgroundWorkerSlots exist in shared memory and can be accessed (via @@ -675,6 +677,7 @@ void StartBackgroundWorker(void* bgWorkerSlotShmAddr) BackgroundWorker *worker = t_thrd.bgworker_cxt.my_bgworker_entry; bgworker_main_type entrypt; + t_thrd.proc_cxt.MyProgName = "BackgroundWorker"; /* * Create memory context and buffer used for RowDescription messages. As * SendRowDescriptionMessage(), via exec_describe_statement_message(), is @@ -755,9 +758,13 @@ void StartBackgroundWorker(void* bgWorkerSlotShmAddr) /* Prevent interrupts while cleaning up */ HOLD_INTERRUPTS(); + /* output the memory tracking information when error happened */ + MemoryTrackingOutputFile(); + /* Report the error to the server log */ EmitErrorReport(); + AbortCurrentTransaction(); /* * Do we need more cleanup here? For shmem-connected bgworkers, we * will call InitProcess below, which will install ProcKill as exit @@ -795,6 +802,9 @@ void StartBackgroundWorker(void* bgWorkerSlotShmAddr) #endif } + /* Initialize the memory tracking information */ + MemoryTrackingInit(); + /* * Look up the entry point function, loading its library if necessary. */ -- Gitee