diff --git a/src/common/backend/utils/init/miscinit.cpp b/src/common/backend/utils/init/miscinit.cpp index 72561d7cf3495f662907f98ee838be092c912306..52f97e346505fe9398411733e02cdd2c4aa2336d 100644 --- a/src/common/backend/utils/init/miscinit.cpp +++ b/src/common/backend/utils/init/miscinit.cpp @@ -2139,6 +2139,14 @@ void initDssPath(const char *dssdir, const char *xlogdir) "%s/pg_replication/pg_ss_ctl_info", xlogdir); securec_check_ss(rc, "", ""); + rc = snprintf_s(g_instance.datadir_cxt.configFilePath, MAXPGPATH, MAXPGPATH - 1, "%s/shared_postgresql.conf", + dssdir); + securec_check_ss(rc, "", ""); + + rc = snprintf_s(g_instance.datadir_cxt.hbaConfigFilePath, MAXPGPATH, MAXPGPATH - 1, "%s/shared_pg_hba.conf", + dssdir); + securec_check_ss(rc, "", ""); + ss_initdwsubdir(dssdir); } diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index de2bd560957a8e0d0a1d2c43ed7e1d2c6c0d75fe..45fc5c38d207128c5622b659e72c9ec44a4fcf4c 100755 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -137,6 +137,7 @@ #include "storage/procarray.h" #include "storage/standby.h" #include "storage/remote_adapter.h" +#include "storage/file/fio_device.h" #include "tcop/tcopprot.h" #include "threadpool/threadpool.h" #include "tsearch/ts_cache.h" @@ -232,6 +233,13 @@ #define NUM_KEYS 2 #define AUDITFILE_THRESHOLD_LOWER_BOUND 100 const uint32 AUDIT_THRESHOLD_VERSION_NUM = 92735; +#define DSS_BYTE_AGAINST 512 +#define TRY_COUNT 3 + +#define POSTGRESQL_CONF_LOCAL g_instance.attr.attr_common.ConfigFileName +#define POSTGRESQL_CONF_SHARED g_instance.datadir_cxt.configFilePath +#define HBA_CONF_SHARED g_instance.datadir_cxt.hbaConfigFilePath +#define HBA_CONF_LOCAL g_instance.attr.attr_common.HbaFileName extern volatile bool most_available_sync; extern void SetThreadLocalGUC(knl_session_context* session); @@ -12917,6 +12925,490 @@ ErrCode write_guc_file(const char* path, char** lines) return retcode; } +static int copy_file_dss(char *scpath, char *despath) +{ + int fd_source, fd_target; + int res = 0; + ssize_t step_size = DSS_BYTE_AGAINST; + ssize_t read_size = DSS_BYTE_AGAINST; + struct stat statbuf; + char buffer[step_size]; + off_t offset = 0; + + if (lstat(despath, &statbuf) == 0) { + if (remove(despath) != 0) { + ereport(LOG, (errmsg("could not remove file: %s", despath))); + return -1; + } + } + + fd_source = open(scpath, O_RDONLY, 0644); + if (fd_source < 0) { + ereport(LOG, (errmsg("could not open file: %s", scpath))); + return -1; + } + fd_target = open(despath, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fd_target < 0) { + ereport(LOG, (errmsg("could not open file: %s", despath))); + close(fd_source); + return -1; + } + int size = lseek(fd_source, 0, SEEK_END); + + lseek(fd_source, 0, SEEK_SET); + while (offset < size) { + if (offset + step_size > size) { + read_size = size - offset; + errno_t rc = memset_s(buffer, step_size, ' ', step_size); + securec_check(rc, "\0", "\0"); + buffer[read_size] = '\n'; + } + res = pread(fd_source, buffer, step_size, offset); + if (res != read_size && res != step_size) { + ereport(LOG, (errmsg("read %s, failed, errno: %s", scpath, strerror(errno)))); + close(fd_source); + close(fd_target); + return -1; + } + res = pwrite(fd_target, buffer, step_size, offset); + if (res != step_size) { + ereport(LOG, (errmsg("write %s, failed, errno: %s", despath, strerror(errno)))); + close(fd_source); + close(fd_target); + return -1; + } + offset += step_size; + } + + res = ftruncate(fd_target, size); + if (res != 0) { + close(fd_source); + close(fd_target); + ereport(LOG, (errmsg("truncate %s, failed, errno: %s", despath, strerror(errno)))); + return -1; + } + + close(fd_source); + close(fd_target); + return 0; +} + +static int update_hba_file(char *scpath, char *despath) +{ + struct stat statbuf; + if (lstat(scpath, &statbuf) < 0 || statbuf.st_size == 0) { + ereport(LOG, + (errcode_for_file_access(), errmsg("could not stat file or directory \"%s\": %m", scpath))); + return -1; + } + + bool isWriteLocalFile = (strcmp(scpath, HBA_CONF_SHARED) == 0) ? true : false; + char conf_bak[MAXPGPATH]; + int count = 0; + int ret = 0; + ret = snprintf_s(conf_bak, MAXPGPATH, MAXPGPATH - 1, "%s/%s", + g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name, HBA_BAK_FILENAME_PM); + securec_check_ss(ret, "\0", "\0"); + while (lstat(conf_bak, &statbuf) == 0) { + if (count >= TRY_COUNT) { + if (isWriteLocalFile) { + /*source file is shered hba config file*/ + ereport(LOG, (errmsg("the shared hba config file %s is being written", conf_bak))); + return -1; + } else { + break; + } + } + count++; + pg_usleep(200000L); /* sleep 200ms for stat next time */ + } + + if (isWriteLocalFile) { + ret = snprintf_s(conf_bak, MAXPGPATH, MAXPGPATH - 1, "%s/%s", t_thrd.proc_cxt.DataDir, HBA_BAK_FILENAME_PM); + securec_check_ss(ret, "\0", "\0"); + } + + /* copy the hba file to the temporary file and rename */ + ret = copy_file_dss(scpath, conf_bak); + if (ret != 0) { + ereport(LOG, (errcode_for_file_access(), errmsg("update temporary config file \"%s\" failed", conf_bak))); + return -1; + } else { + if (rename(conf_bak, despath) != 0) { + ereport(LOG, + (errcode_for_file_access(), errmsg("could not rename \"%s\" to \"%s\"", conf_bak, despath))); + return -1; + } + } + return 0; +} + +static char** read_local_guc_file(char *path) +{ + char **lines = nullptr; + bool read_guc_file_success = true; + + PG_TRY(); + { + lines = read_guc_file(path); + } + PG_CATCH(); + { + read_guc_file_success = false; + EmitErrorReport(); + FlushErrorState(); + } + PG_END_TRY(); + if (!read_guc_file_success) { + /* if failed to read guc file, will log the error info in PG_CATCH(), no need to log again. */ + return NULL; + } + if (lines == NULL) { + ereport(LOG, (errmsg("the config file has no data,please check it."))); + return NULL; + } + return lines; +} + +static char** read_shared_guc_file(char *path) +{ + int infileFd = 0; + int maxlength = 1, linelen = 0; + int nlines = 0; + char** result; + char* buffer = NULL; + int res = 0; + ssize_t step_size = DSS_BYTE_AGAINST; + char temp_buffer[step_size]; + off_t offset = 0; + const int limit_of_length = 100000; // limit of maxlength and nlines + infileFd = open(path, O_RDONLY, 0644); + if (infileFd < 0) { + ereport(LOG, (errmsg("could not open file: %s", path))); + return NULL; + } + int size = lseek(infileFd, 0, SEEK_END); + lseek(infileFd, 0, SEEK_SET); + + while (offset < size) { + errno_t rc = memset_s(temp_buffer, step_size, '\0', step_size); + securec_check(rc, "\0", "\0"); + res = pread(infileFd, temp_buffer, step_size, offset); + if (res != step_size) { + ereport(LOG, (errmsg("config file read failed: %s", strerror(errno)))); + close(infileFd); + return NULL; + } + + for (int i = 0; i < step_size; i++) { + if (temp_buffer[i] == '\0') { + break; + } + linelen++; + if (temp_buffer[i] == '\n') { + nlines++; + if (linelen > maxlength) { + maxlength = linelen; + } + linelen = 0; + } + } + offset += step_size; + } + + /* handle last line without a terminating newline (yuck) */ + if (linelen) { + nlines++; + } + if (linelen > maxlength) { + maxlength = linelen; + } + if (maxlength <= 1) { + close(infileFd); + infileFd = 0; + return NULL; + } + if (maxlength > limit_of_length || nlines > limit_of_length) { + close(infileFd); + infileFd = 0; + ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR), errmsg("Length of file or line is too long."))); + } + + /* set up the result and the line buffer */ + result = (char**)pg_malloc((nlines + 2) * sizeof(char*)); /* Reserve one extra for alter system set. */ + buffer = (char*)pg_malloc(maxlength + 1); + + /* now reprocess the file and store the lines */ + lseek(infileFd, 0, SEEK_SET); + int appendIndex = 0; + nlines = 0; + offset = 0; + errno_t rc = memset_s(buffer, maxlength + 1, '\0', maxlength + 1); + securec_check(rc, "\0", "\0"); + while (offset < size) { + errno_t rc = memset_s(temp_buffer, step_size, '\0', step_size); + securec_check(rc, "\0", "\0"); + res = pread(infileFd, temp_buffer, step_size, offset); + if (res != step_size) { + ereport(LOG, (errmsg("config file read failed: %s", strerror(errno)))); + close(infileFd); + return NULL; + } + + for (int i = 0; i < step_size; i++) { + if (temp_buffer[i] == '\0') { + break; + } + buffer[appendIndex++] = temp_buffer[i]; + if (temp_buffer[i] == '\n') { + result[nlines++] = xstrdup(buffer); + appendIndex = 0; + errno_t rc = memset_s(buffer, maxlength + 1, '\0', maxlength + 1); + securec_check(rc, "\0", "\0"); + } + } + + offset += step_size; + } + + (void)close(infileFd); + pfree(buffer); + result[nlines] = result[nlines + 1] = NULL; + + return result; +} + +static int write_shared_guc_file(char *path, char *buffer, int size) +{ + int fd_target; + off_t offset = 0; + struct stat statbuf; + int res = 0; + + if (lstat(path, &statbuf) == 0) { + if (remove(path) != 0) { + ereport(LOG, (errmsg("could not remove file: %s", path))); + return -1; + } + } + fd_target = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fd_target < 0) { + ereport(LOG, (errmsg("could not open file: %s", path))); + return -1; + } + + res = pwrite(fd_target, buffer, size, offset); + if (res != size) { + ereport(LOG, (errmsg("write failed: %s", strerror(errno)))); + close(fd_target); + return -1; + } + + close(fd_target); + + return 0; +} + +static int update_shared_guc_file(char *path) +{ + struct stat statbuf; + if (lstat(g_instance.attr.attr_common.ConfigFileName, &statbuf) < 0 || statbuf.st_size == 0) { + ereport(LOG, (errcode_for_file_access(), errmsg("could not stat file or directory \"%s\"", + g_instance.attr.attr_common.ConfigFileName))); + return -1; + } + + char **lines = nullptr; + char *temp_buf = nullptr; + int temp_buf_len = 0; + char conf_bak[MAXPGPATH]; + int ret = snprintf_s(conf_bak, MAXPGPATH, MAXPGPATH - 1, "%s/%s", + g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name, CONFIG_BAK_FILENAME_PM); + securec_check_ss(ret, "\0", "\0"); + + lines = read_local_guc_file(g_instance.attr.attr_common.ConfigFileName); + if (lines == nullptr) { + return -1; + } + comment_guc_lines(lines, g_reserve_param); + temp_buf_len = add_guc_optlines_to_buffer(lines, &temp_buf); + release_opt_lines(lines); + Assert(temp_buf_len != 0); + + ret = write_shared_guc_file(conf_bak,temp_buf,temp_buf_len); + pfree(temp_buf); + temp_buf = NULL; + if (ret != 0) { + ereport(LOG, (errcode_for_file_access(), errmsg("write \"%s\" failed", conf_bak))); + return -1; + }else { + if (rename(conf_bak, path) != 0) { + ereport(LOG, (errcode_for_file_access(), errmsg("could not rename \"%s\" to \"%s\"", conf_bak, path))); + return -1; + } + } + + return 0; +} + +static bool proc_config_content(char *buf, Size len) +{ + struct stat statbuf; + ErrCode retcode = CODE_OK; + char conf_bak[MAXPGPATH]; + char **reserve_item = NULL; + int ret = 0; + + ret = snprintf_s(conf_bak, MAXPGPATH, MAXPGPATH - 1, "%s/%s", t_thrd.proc_cxt.DataDir, CONFIG_BAK_FILENAME_PM); + securec_check_ss(ret, "\0", "\0"); + + if (lstat(g_instance.attr.attr_common.ConfigFileName, &statbuf) != 0) { + if (errno != ENOENT) + ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file or directory \"%s\": %m", + g_instance.attr.attr_common.ConfigFileName))); + return false; + } + + reserve_item = alloc_opt_lines(g_reserve_param_num); + if (reserve_item == NULL) { + ereport(LOG, (errmsg("Alloc mem for reserved parameters failed"))); + return false; + } + + /* 1. load reserved parameters to reserve_item(array in memeory) */ + retcode = copy_asyn_lines(g_instance.attr.attr_common.ConfigFileName, reserve_item, g_reserve_param); + if (retcode != CODE_OK) { + release_opt_lines(reserve_item); + ereport(LOG, (errmsg("copy asynchronization items failed: %s\n", gs_strerror(retcode)))); + return false; + } + + /* 2. genreate temp files and fill it with content from primary. */ + retcode = generate_temp_file(buf, conf_bak, len); + if (retcode != CODE_OK) { + release_opt_lines(reserve_item); + ereport(LOG, (errmsg("create %s failed: %s\n", conf_bak, gs_strerror(retcode)))); + return false; + } + + /* 3. adjust the info with reserved parameters, and sync to temp file. */ + retcode = update_temp_file(conf_bak, reserve_item, g_reserve_param); + if (retcode != CODE_OK) { + release_opt_lines(reserve_item); + ereport(LOG, (errmsg("update gaussdb config file failed: %s\n", gs_strerror(retcode)))); + return false; + } else { + ereport(LOG, (errmsg("update gaussdb config file success"))); + if (rename(conf_bak, g_instance.attr.attr_common.ConfigFileName) != 0) { + release_opt_lines(reserve_item); + ereport(LOG, (errcode_for_file_access(), errmsg("could not rename \"%s\" to \"%s\"", conf_bak, + g_instance.attr.attr_common.ConfigFileName))); + return false; + } + } + + release_opt_lines(reserve_item); + + return true; +} + +static int update_local_guc_file(char *path) +{ + struct stat statbuf; + if (lstat(g_instance.datadir_cxt.configFilePath, &statbuf) < 0 || statbuf.st_size == 0) { + ereport(LOG, (errcode_for_file_access(), errmsg("could not stat file or directory \"%s\": %m", \ + g_instance.datadir_cxt.configFilePath))); + return -1; + } + + char **lines = nullptr; + bool read_guc_file_success = true; + char *temp_buf = nullptr; + int temp_buf_len = 0; + int count = 0; + char conf_bak[MAXPGPATH]; + int ret = snprintf_s(conf_bak, MAXPGPATH, MAXPGPATH - 1, "%s/%s", + g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name, CONFIG_BAK_FILENAME_PM); + securec_check_ss(ret, "\0", "\0"); + + while (lstat(conf_bak, &statbuf) == 0) { + if (count >= TRY_COUNT) { + ereport(LOG, (errmsg("the shared hba config file \"%s\" is being written", conf_bak))); + return -1; + } + count++; + pg_usleep(200000L); /* sleep 200ms for lstat next time */ + } + + PG_TRY(); + { + lines = read_shared_guc_file(g_instance.datadir_cxt.configFilePath); + } + PG_CATCH(); + { + read_guc_file_success = false; + EmitErrorReport(); + FlushErrorState(); + } + PG_END_TRY(); + if (!read_guc_file_success) { + /* if failed to read guc file, will log the error info in PG_CATCH(), no need to log again. */ + return -1; + } + if (lines == nullptr) { + ereport(LOG, (errmsg("the shared config file \"%s\" has no data, please check it.", + g_instance.datadir_cxt.configFilePath))); + return -1; + } + + temp_buf_len = add_guc_optlines_to_buffer(lines, &temp_buf); + release_opt_lines(lines); + Assert(temp_buf_len != 0); + + if (true != proc_config_content(temp_buf, temp_buf_len)) { + ereport(LOG, (errmsg("PM update config file \"%s\" failed", path))); + } + pfree(temp_buf); + temp_buf = NULL; + return 0; +} + +void sync_config_file() +{ + if (g_instance.dms_cxt.SSReformInfo.needSyncConfig) { + /* new primary node reads the shared configuration file during reform */ + g_instance.dms_cxt.SSReformInfo.needSyncConfig = false; + if (update_local_guc_file(POSTGRESQL_CONF_LOCAL) == 0 && + update_hba_file(HBA_CONF_SHARED, HBA_CONF_LOCAL) == 0) { + ereport(LOG, (errmsg("new primary node synchronize configuration files successfully"))); + }else { + ereport(LOG, (errmsg("new primary node synchronize configuration files failed"))); + } + } else if (SS_PRIMARY_MODE && ENABLE_DSS && ENABLE_DMS) { + /* + * primary node updates the shared configuration file + * with the contents of the local configuration file + */ + if (update_shared_guc_file(POSTGRESQL_CONF_SHARED) == 0 && + update_hba_file(HBA_CONF_LOCAL, HBA_CONF_SHARED) == 0) { + ereport(LOG, (errmsg("primary node synchronize configuration files successfully"))); + }else { + ereport(LOG, (errmsg("primary node synchronize configuration files failed"))); + } + } else if (SS_STANDBY_MODE && ENABLE_DSS && ENABLE_DMS) { + /* + * standby node updates the local configuration file + * with the contents of the shared configuration file + */ + if (update_local_guc_file(POSTGRESQL_CONF_LOCAL) == 0 && + update_hba_file(HBA_CONF_SHARED, HBA_CONF_LOCAL) == 0) { + ereport(LOG, (errmsg("standby node synchronize configuration files successfully"))); + }else { + ereport(LOG, (errmsg("standby node synchronize configuration files failed"))); + } + } +} + /* * @@GaussDB@@ * Brief : copy_guc_lines diff --git a/src/gausskernel/ddes/adapter/Makefile b/src/gausskernel/ddes/adapter/Makefile index 35b19f0d1fb0592e48857bf44de84eba0b516940..9c5953c235220d48e886e19c9262c979992c4c9c 100644 --- a/src/gausskernel/ddes/adapter/Makefile +++ b/src/gausskernel/ddes/adapter/Makefile @@ -23,7 +23,7 @@ endif OBJS = ss_dms_bufmgr.o ss_dms_callback.o ss_dms_recovery.o ss_dms.o ss_init.o \ ss_reform_common.o ss_switchover.o ss_transaction.o ss_aio.o ss_txnstatus.o \ - ss_xmin.o ss_dms_auxiliary.o ss_dms_fi.o + ss_xmin.o ss_dms_auxiliary.o ss_dms_fi.o ss_sync_auxiliary.o include $(top_srcdir)/src/gausskernel/common.mk diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 7bf73c78e28acca758b07ce2cbb7a5af1e44e2ea..ca881408656cfabd96b3200214a3d536cfe33653 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -1411,6 +1411,9 @@ static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_ case BCAST_REPORT_REALTIME_BUILD_PTR: ret = SSGetStandbyRealtimeBuildPtr(data, len); break; + case BCAST_CONFIG_SYNC: + ret = SSUpdateLocalConfFile(data, len); + break; default: ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] invalid broadcast operate type"))); ret = DMS_ERROR; diff --git a/src/gausskernel/ddes/adapter/ss_sync_auxiliary.cpp b/src/gausskernel/ddes/adapter/ss_sync_auxiliary.cpp new file mode 100644 index 0000000000000000000000000000000000000000..268594710c58f4e362de72bed9c45b45ed3f4a25 --- /dev/null +++ b/src/gausskernel/ddes/adapter/ss_sync_auxiliary.cpp @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * --------------------------------------------------------------------------------------- + * + * ss_sync_auxiliary.cpp + * + * + * + * IDENTIFICATION + * src/gausskernel/ddes/adapter/ss_sync_auxiliary.cpp + * + * --------------------------------------------------------------------------------------- + */ + +#include "ddes/dms/ss_sync_auxiliary.h" +#include "ddes/dms/ss_transaction.h" +#include "postgres.h" +#include "storage/procarray.h" +#include "storage/ipc.h" + +#define SYNC_AUXILIARY_SLEEP_TIME (60*60*1000) // 1h 60*60*1000ms + +static void sync_sighup_handler(SIGNAL_ARGS) +{ + if (SS_PRIMARY_MODE) { + SSBroadcastSyncGUC(); + } +} + +static void sync_auxiliary_request_shutdown_handler(SIGNAL_ARGS) +{ + int save_errno = errno; + + t_thrd.sync_auxiliary_cxt.shutdown_requested = true; + if (t_thrd.proc) { + SetLatch(&t_thrd.proc->procLatch); + } + errno = save_errno; +} + +static void sync_auxiliary_siguser1_handler(SIGNAL_ARGS) +{ + int save_errno = errno; + latch_sigusr1_handler(); + errno = save_errno; +} + +static void SetupSyncAuxiliarySignalHook(void) +{ + /* + * Reset some signals that are accepted by postmaster but not here + */ + (void)gspqsignal(SIGHUP, sync_sighup_handler); + (void)gspqsignal(SIGINT, sync_auxiliary_request_shutdown_handler); + (void)gspqsignal(SIGTERM, sync_auxiliary_request_shutdown_handler); + (void)gspqsignal(SIGQUIT, sync_auxiliary_request_shutdown_handler); /* hard crash time */ + (void)gspqsignal(SIGALRM, SIG_IGN); + (void)gspqsignal(SIGPIPE, SIG_IGN); + (void)gspqsignal(SIGUSR1, sync_auxiliary_siguser1_handler); + (void)gspqsignal(SIGUSR2, SIG_IGN); + (void)gspqsignal(SIGURG, print_stack); + /* + * Reset some signals that are accepted by postmaster but not here + */ + (void)gspqsignal(SIGCHLD, SIG_DFL); + (void)gspqsignal(SIGTTIN, SIG_DFL); + (void)gspqsignal(SIGTTOU, SIG_DFL); + (void)gspqsignal(SIGCONT, SIG_DFL); + (void)gspqsignal(SIGWINCH, SIG_DFL); +} + +void sync_auxiliary_handle_exception() +{ + /* Since not using PG_TRY, must reset error stack by hand */ + t_thrd.log_cxt.error_context_stack = NULL; + t_thrd.log_cxt.call_stack = NULL; + + /* Prevent interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + if (hash_get_seq_num() > 0) { + release_all_seq_scan(); + } + + LWLockReleaseAll(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + + /* + * Sleep at least 1 second after any error. A write error is likely + * to be repeated, and we don't want to be filling the error logs as + * fast as we can. + */ + pg_usleep(1000000L); + return; +} + +void SyncAuxiliaryMain(void) +{ + sigjmp_buf localSigjmpBuf; + t_thrd.role = SYNCINFO_THREAD; + SetupSyncAuxiliarySignalHook(); + (void)sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT); + + if (sigsetjmp(localSigjmpBuf, 1) != 0) { + ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] sync auxiliary thread exception occured."))); + sync_auxiliary_handle_exception(); + } + + /* we can now handle ereport(ERROR) */ + t_thrd.log_cxt.PG_exception_stack = &localSigjmpBuf; + + gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL); + (void)gs_signal_unblock_sigusr2(); + + if (SS_STANDBY_MODE) { + /* the standby node restarts or expands capacity */ + if (gs_signal_send(PostmasterPid, SIGHUP) != 0) { + ereport(WARNING, (errmsg("[SS] failed to send SIGHUP during startup"))); + } + } + + for (;;) { + if (t_thrd.sync_auxiliary_cxt.shutdown_requested) { + u_sess->attr.attr_common.ExitOnAnyError = true; + proc_exit(0); + } + + if (g_instance.dms_cxt.dms_status < DMS_STATUS_IN || SS_IN_REFORM) { + pg_usleep(SS_REFORM_WAIT_TIME); + continue; + } + + int rc = WaitLatch(&t_thrd.proc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, SYNC_AUXILIARY_SLEEP_TIME); + if (rc & WL_POSTMASTER_DEATH) { + gs_thread_exit(1); + } + + /* the primary node notify PM to synchronize */ + if (SS_PRIMARY_MODE) { + if (gs_signal_send(PostmasterPid, SIGHUP) != 0) { + ereport(WARNING, (errmsg("[SS] send SIGHUP to PM failed when time is up"))); + } + } + } +} \ No newline at end of file diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index daee52d0a32b7e5fe2d705d59285aaf6029b7ac0..ac8776b88e3a2848530f3eecfe327b78d723a673 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -40,6 +40,7 @@ static inline void txnstatusNetworkStats(uint64 timeDiff); static inline void txnstatusHashStats(uint64 timeDiff); +#define SYNC_TRY_COUNT 3 #define TxnStatusCalcStats(startTime, endTime, timeDiff, isHash) \ do { \ (void)INSTR_TIME_SET_CURRENT(endTime); \ @@ -1287,4 +1288,52 @@ int SSGetStandbyRealtimeBuildPtr(char* data, uint32 len) } } return DMS_SUCCESS; +} + +/* broadcast to standby node synchronization GUC */ +void SSBroadcastSyncGUC() +{ + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); + int ret; + int count = 0; + SSBroadcastSyncGUCst syncGUC; + syncGUC.type = BCAST_CONFIG_SYNC; + dms_broadcast_info_t dms_broad_info = { + .data = (char *)&syncGUC, + .len = sizeof(SSBroadcastSyncGUCst), + .output = NULL, + .output_len = NULL, + .scope = DMS_BROADCAST_ONLINE_LIST, + .inst_map = 0, + .timeout = SS_BROADCAST_WAIT_FIVE_SECONDS, + .handle_recv_msg = (unsigned char)false, + .check_session_kill = (unsigned char)true + }; + + do { + count++; + ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info); + if (ret == DMS_SUCCESS) { + break; + } + pg_usleep(USECS_PER_SEC); + } while (ret != DMS_SUCCESS && count <= SYNC_TRY_COUNT); + if (ret == DMS_SUCCESS) { + ereport(LOG, (errmsg("notify standby node synchronization GUC success"))); + } +} + +int SSUpdateLocalConfFile(char* data, uint32 len) +{ + if (SS_PRIMARY_MODE) { + return DMS_SUCCESS; + } + + /* notify postmaster load the shared config file */ + if (gs_signal_send(PostmasterPid, SIGHUP) != 0) { + ereport(WARNING, (errmsg("send SIGHUP to PM failed"))); + return DMS_ERROR; + } + return DMS_SUCCESS; } \ No newline at end of file diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 3c850d18e011dbeca44ceb4b4554dfd179609a55..1b598c2e3bd7ad89e255667e9aacc314f5db1257 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -276,6 +276,7 @@ #include "ddes/dms/ss_reform_common.h" #include "ddes/dms/ss_dms_auxiliary.h" #include "storage/gs_uwal/gs_uwal.h" +#include "ddes/dms/ss_sync_auxiliary.h" #include "access/datavec/utils.h" #ifdef ENABLE_UT @@ -3117,6 +3118,10 @@ int PostmasterMain(int argc, char* argv[]) } } + if (ENABLE_DMS && ENABLE_DSS) { + g_instance.pid_cxt.SyncAuxiliaryPID = initialize_util_thread(SYNCINFO_THREAD); + } + /* init uwal */ if (g_instance.attr.attr_storage.enable_uwal) { int ret = GsUwalInit(t_thrd.postmaster_cxt.HaShmData->current_mode); @@ -3917,7 +3922,7 @@ static int ServerLoop(void) g_threadPoolControler->Init(enableNumaDistribute); } ereport(LOG, (errmsg("create thread end!"))); - + /* Only after postmaster_main thread starting completed, can reload listen_addresses */ t_thrd.postmaster_cxt.can_listen_addresses_reload = true; for (;;) { @@ -5889,6 +5894,7 @@ static void SIGHUP_handler(SIGNAL_ARGS) ereport(LOG, (errmsg("we try get lock times:%d", j))); } + sync_config_file(); ProcessConfigFile(PGC_SIGHUP); release_file_lock(&filelock); LWLockRelease(ConfigFileLock); @@ -5900,6 +5906,10 @@ static void SIGHUP_handler(SIGNAL_ARGS) g_threadPoolControler->GetScheduler()->SigHupHandler(); } + if (g_instance.pid_cxt.SyncAuxiliaryPID != 0) { + signal_child(g_instance.pid_cxt.SyncAuxiliaryPID, SIGHUP); + } + if (g_instance.pid_cxt.StartupPID != 0) signal_child(g_instance.pid_cxt.StartupPID, SIGHUP); @@ -13818,6 +13828,9 @@ static void SetAuxType() case DMS_AUXILIARY_THREAD: t_thrd.bootstrap_cxt.MyAuxProcType = DmsAuxiliaryProcess; break; + case SYNCINFO_THREAD: + t_thrd.bootstrap_cxt.MyAuxProcType = SyncAuxiliaryProcess; + break; default: ereport(ERROR, (errmsg("unrecorgnized proc type %d", thread_role))); } @@ -14141,6 +14154,10 @@ int GaussDbAuxiliaryThreadMain(knl_thread_arg* arg) DmsAuxiliaryMain(); proc_exit(1); break; + case SYNCINFO_THREAD: + SyncAuxiliaryMain(); + proc_exit(1); + break; default: ereport(PANIC, (errmsg("unrecognized process type: %d", (int)t_thrd.bootstrap_cxt.MyAuxProcType))); proc_exit(1); @@ -14397,6 +14414,7 @@ int GaussDbThreadMain(knl_thread_arg* arg) case THREADPOOL_LISTENER: case THREADPOOL_SCHEDULER: case DMS_AUXILIARY_THREAD: + case SYNCINFO_THREAD: case UNDO_RECYCLER: { SetAuxType(); /* Restore basic shared memory pointers */ @@ -14963,6 +14981,7 @@ static ThreadMetaData GaussdbThreadGate[] = { #ifdef USE_SPQ { GaussDbThreadMain, SPQ_COORDINATOR, "spqcoordinator", "QC node coordinating thread" }, #endif + { GaussDbThreadMain, SYNCINFO_THREAD, "sync_auxiliary", "synchronize configuration files" }, { GaussDbThreadMain, DMS_AUXILIARY_THREAD, "dms_auxiliary", "maintenance xmin in dms" }, { GaussDbThreadMain, EXRTO_RECYCLER, "exrtorecycler", "exrto recycler" }, { GaussDbThreadMain, BARRIER_PREPARSE, "barrierpreparse", "barrier preparse backend" }, diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 132c7b2063747c4a38bd9be14b6f5a49674c041e..d2f0f2c2c4dc33b8b7fc6f27fa530ca5f006d5f3 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -11052,6 +11052,16 @@ void StartupXLOG(void) LWLockRelease(ProcArrayLock); } + /* notify the PM to synchronize */ + if (SS_IN_REFORM && !SS_PRIMARY_DEMOTING && ENABLE_DMS && ENABLE_DSS && SS_PRIMARY_MODE) { + if (SS_STANDBY_PROMOTING || SS_STANDBY_FAILOVER) { + g_instance.dms_cxt.SSReformInfo.needSyncConfig = true; + } + if (gs_signal_send(PostmasterPid, SIGHUP) != 0) { + ereport(WARNING, (errmsg("[SS Reform]send SIGHUP to PM failed when reform is ending"))); + } + } + if (SS_PERFORMING_SWITCHOVER && g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_PROMOTING) { ereport(LOG, (errmsg("[SS switchover] Standby promote: StartupXLOG finished, promote success"))); Assert(g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_PROMOTING); diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index 60364cb35196276b446388b8db4a9d115a7d2af3..9b5a77356ce68f98d41ebf6ebe1db3025560661f 100755 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -179,7 +179,19 @@ const char *g_reserve_param[] = { "uwal_rpc_flowcontrol_switch", "uwal_rpc_flowcontrol_value", "uwal_truncate_interval", - "uwal_async_append_switch" + "uwal_async_append_switch", + "ss_enable_dss", + "ss_enable_dms", + "ss_enable_catalog_centralized", + "ss_instance_id", + "ss_dss_vg_name", + "ss_dss_conn_path", + "ss_rdma_work_config", + "ss_ock_log_path", + "ss_scrlock_server_port", + "ss_enable_ondemand_recovery", + "ss_enable_ondemand_realtime_build", + "ss_disaster_mode" }; const int g_reserve_param_num = lengthof(g_reserve_param); diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index afb91709244eab2acab3b55fe0d958a0d9fdc12f..58b56b9b630a73d940745c6846e8c0ed55828a7f 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -186,6 +186,7 @@ typedef enum SSBroadcastOp { BCAST_RELOAD_REFORM_CTRL_PAGE, BCAST_REALTIME_BUILD_LOG_CTRL_ENABLE, BCAST_REPORT_REALTIME_BUILD_PTR, + BCAST_CONFIG_SYNC, BCAST_END } SSBroadcastOp; diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index a42993677708e5f25e89935d5b91a369fbc54841..fb99884db23b92b4b4d9f4ec19cd8f8705a1df05 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -98,6 +98,7 @@ typedef struct st_reform_info { TimestampTz reform_ver; TimestampTz reform_ver_startup_wait; bool switchover_demote_failure_signal_handled; + bool needSyncConfig; } ss_reform_info_t; typedef enum st_reform_ckpt_status { diff --git a/src/include/ddes/dms/ss_sync_auxiliary.h b/src/include/ddes/dms/ss_sync_auxiliary.h new file mode 100644 index 0000000000000000000000000000000000000000..8050063ce6c48f2c84d551f67e26ea78a18b2f85 --- /dev/null +++ b/src/include/ddes/dms/ss_sync_auxiliary.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * --------------------------------------------------------------------------------------- + * + * ss_sync_auxiliary.h + * include header file for synchronization information + * + * + * IDENTIFICATION + * src/include/ddes/dms/ss_sync_auxiliary.h + * + * --------------------------------------------------------------------------------------- + */ + + +#ifndef __SS_SYNC_AUXILIARY_H__ +#define __SS_SYNC_AUXILIARY_H__ + +void SyncAuxiliaryMain(void); + +#endif \ No newline at end of file diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index 5e30274207d1b8ca1898ab2abd6cb7e3219c8696..f202d10e003bd8cdf7d5dfb889c2d43aa97b3eaf 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -105,6 +105,10 @@ typedef struct SSBroadcastRealtimeBuildPtr { int srcInstId; } SSBroadcastRealtimeBuildPtr; +typedef struct SSBroadcastSyncGUCst { + SSBroadcastOp type; // must be first +} SSBroadcastSyncGUCst; + Snapshot SSGetSnapshotData(Snapshot snapshot); CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCommit, bool isMvcc, bool isNest, Snapshot snapshot, bool* sync); @@ -114,6 +118,7 @@ TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, u bool SSGetOldestXminFromAllStandby(TransactionId xmin, TransactionId xmax, CommitSeqNo csn); void SSBroadcastRealtimeBuildLogCtrlEnable(bool canncelInReform); bool SSReportRealtimeBuildPtr(XLogRecPtr realtimeBuildPtr); +void SSBroadcastSyncGUC(); int SSGetOldestXmin(char *data, uint32 len, char *output_msg, uint32 *output_msg_len); int SSGetOldestXminAck(SSBroadcastXminAck *ack_data); void SSIsPageHitDms(RelFileNode& node, BlockNumber page, int pagesNum, uint64 *pageMap, int *bitCount); @@ -140,5 +145,5 @@ void SSRequestAllStandbyReloadReformCtrlPage(); bool SSCanFetchLocalSnapshotTxnRelatedInfo(); int SSUpdateRealtimeBuildLogCtrl(char* data, uint32 len); int SSGetStandbyRealtimeBuildPtr(char* data, uint32 len); - +int SSUpdateLocalConfFile(char* data, uint32 len); #endif diff --git a/src/include/gs_thread.h b/src/include/gs_thread.h index f38614c06591b9c8007a88bbea6c7e7c31b0f3cb..e046ec38fe7fb2f5bfe424079dcfe2b5c33bd395 100755 --- a/src/include/gs_thread.h +++ b/src/include/gs_thread.h @@ -131,6 +131,7 @@ typedef enum knl_thread_role { #ifdef USE_SPQ SPQ_COORDINATOR, #endif + SYNCINFO_THREAD, DMS_AUXILIARY_THREAD, EXRTO_RECYCLER, BARRIER_PREPARSE, diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index e8876127e52c59bc0393bd326935ecf3986a8dac..1ad9b803fbc8ed7a41bc52740b759f6775297ec9 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -230,6 +230,7 @@ typedef struct knl_g_pid_context { ThreadId StackPerfPID; ThreadId CfsShrinkerPID; ThreadId DmsAuxiliaryPID; + ThreadId SyncAuxiliaryPID; #ifdef ENABLE_HTAP ThreadId IMCStoreVacuumPID; #endif @@ -1345,6 +1346,8 @@ typedef struct knl_g_datadir_context { char controlPath[MAXPGPATH]; char controlBakPath[MAXPGPATH]; char controlInfoPath[MAXPGPATH]; + char configFilePath[MAXPGPATH]; + char hbaConfigFilePath[MAXPGPATH]; knl_g_dwsubdatadir_context dw_subdir_cxt; } knl_g_datadir_context; diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 8c73678d7b6cce006725fd079d00d6eaa56b2270..5f15d46c8f65645df38a257a75ae2fc1ab46ba79 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -3465,6 +3465,10 @@ typedef struct knl_t_dms_auxiliary_context { volatile sig_atomic_t shutdown_requested; } knl_t_dms_auxiliary_context; +typedef struct knl_t_sync_auxiliary_context { + volatile sig_atomic_t shutdown_requested; +} knl_t_sync_auxiliary_context; + /* * in_progress_list is a stack of ongoing RelationBuildDesc() calls. CREATE * INDEX CONCURRENTLY makes catalog changes under ShareUpdateExclusiveLock. @@ -3645,6 +3649,7 @@ typedef struct knl_thrd_context { knl_t_spq_context spq_ctx; #endif knl_t_dms_auxiliary_context dms_aux_cxt; + knl_t_sync_auxiliary_context sync_auxiliary_cxt; knl_t_invalidation_message_context inval_msg_cxt; } knl_thrd_context; diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index b65e0f4501d6b0dc7a5034012d37ad16b6a8bcf0..27d577258f78facd485981dab203ecb0eb994caf 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -632,6 +632,7 @@ typedef enum { DmsAuxiliaryProcess, ExrtoRecyclerProcess, NUM_SINGLE_AUX_PROC, /* Sentry for auxiliary type with single thread. */ + SyncAuxiliaryProcess, /* * If anyone want add a new auxiliary thread type, and will create several diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 14e823061dff65332e24154710109115981a243e..c884de5a0ba2982d8565653a10a32b3e22007ea6 100755 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -496,12 +496,15 @@ typedef struct { #define PG_LOCKFILE_SIZE 1024 #define CONFIG_BAK_FILENAME_WAL "postgresql.conf.wal" +#define CONFIG_BAK_FILENAME_PM "postgresql.conf.pm" +#define HBA_BAK_FILENAME_PM "pg_hba.conf.pm" extern void* pg_malloc(size_t size); extern char* xstrdup(const char* s); extern char** read_guc_file(const char* path); extern ErrCode write_guc_file(const char* path, char** lines); +extern void sync_config_file(); extern int find_guc_option(char** optlines, const char* opt_name, int* name_offset, int* name_len, int* value_offset, int* value_len, bool ignore_case); diff --git a/src/test/regress/expected/ss_r/ss_standby_support_write.out b/src/test/regress/expected/ss_r/ss_standby_support_write.out index 147e6c6eda2330e2c8a90f59fe07ffdd50fc60e9..ff071671d4aae51b6fb0cbf7f555d19bdefc60e0 100644 --- a/src/test/regress/expected/ss_r/ss_standby_support_write.out +++ b/src/test/regress/expected/ss_r/ss_standby_support_write.out @@ -202,7 +202,7 @@ call func_multi_node_write_0032('t_multi_node_write_0032'); ERROR: function "func_multi_node_write_0032" doesn't exist commit; select * from t_multi_node_write_0032; -ERROR: relation "t_multi_node_write_0032" does not exist on datanode2 +ERROR: relation "t_multi_node_write_0032" does not exist on datanode1 LINE 1: select * from t_multi_node_write_0032; ^ create or replace function func_multi_node_write_0032(tb_name varchar) return varchar is