diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index 4a47e1942525bb0260055fc6c9b361976de3f220..6af0f71fd295a52da2965ae9fbae7f9adea3abda 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -56,7 +56,7 @@ ifneq "$(MAKECMDGOALS)" "clean"
 endif
 OBJS= pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
 	pg_backup_null.o pg_backup_tar.o \
-	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o common_cipher.o $(WIN32RES)
+	parallel.o pg_backup_directory.o dumpmem.o dumputils.o compress_io.o common_cipher.o $(WIN32RES)
 
 LIBS += -lcjson -lcurl -lz
 ifeq ($(enable_lite_mode), no)
diff --git a/src/bin/pg_dump/compress_io.cpp b/src/bin/pg_dump/compress_io.cpp
index 560d74fd61ba6735ee9d4ae3d1d99e40e8a9b449..16b2c83dd6d24bbf71053e0409a4ca710f1a441e 100644
--- a/src/bin/pg_dump/compress_io.cpp
+++ b/src/bin/pg_dump/compress_io.cpp
@@ -54,6 +54,7 @@
 #include "compress_io.h"
 #include "dumpmem.h"
+#include <sys/file.h> /* flock() */
 
 #ifdef GAUSS_SFT_TEST
 #include "gauss_sft.h"
@@ -413,6 +414,7 @@ struct cfp {
 #ifdef HAVE_LIBZ
     gzFile compressedfp;
 #endif
+    FILE* lock; /* handle whose fd is used for flock() serialization */
 };
 
 #ifdef HAVE_LIBZ
@@ -503,6 +505,30 @@ cfp* cfopen_write(const char* path, const char* mode, int compression)
     return fp;
 }
 
+cfp* cfopen_write4Lock(const char* path, const char* mode, int compression)
+{
+    cfp* fp = NULL;
+
+    if (compression == 0)
+        fp = cfopen4Lock(path, mode, 0);
+    else {
+#ifdef HAVE_LIBZ
+        int fnamelen = strlen(path) + 4;
+        char* fname = (char*)pg_malloc(fnamelen);
+        int nRet = 0;
+
+        nRet = snprintf_s(fname, fnamelen, fnamelen - 1, "%s%s", path, ".gz");
+        securec_check_ss_c(nRet, fname, "\0");
+        fp = cfopen4Lock(fname, mode, compression);
+        free_keep_errno(fname);
+        fname = NULL;
+#else
+        fp = NULL; /* keep compiler quiet */
+        exit_horribly(modulename, "not built with zlib support\n");
+#endif
+    }
+    return fp;
+}
+
 /*
  * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
  * is opened with libz gzopen(), otherwise with plain fopen().
@@ -553,6 +579,54 @@ cfp* cfopen(const char* path, const char* mode, int compression)
     return fp;
 }
 
+cfp* cfopen4Lock(const char* path, const char* mode, int compression)
+{
+    cfp* fp = (cfp*)pg_malloc(sizeof(cfp));
+
+    if (compression != 0) {
+#ifdef HAVE_LIBZ
+        char mode_compression[32] = {0};
+        int nRet = 0;
+
+        nRet = snprintf_s(mode_compression,
+            sizeof(mode_compression) / sizeof(char),
+            sizeof(mode_compression) / sizeof(char) - 1,
+            "%s%d",
+            mode,
+            compression);
+        securec_check_ss_c(nRet, "\0", "\0");
+        fp->lock = fopen(path, mode);
+        if (fp->lock == NULL) {
+            free_keep_errno(fp);
+            return NULL;
+        }
+        fp->compressedfp = gzdopen(fileno(fp->lock), mode_compression);
+        fp->uncompressedfp = NULL;
+        if (fp->compressedfp == NULL) {
+            fclose(fp->lock);
+            free_keep_errno(fp);
+            return NULL;
+        }
+#else
+        exit_horribly(modulename, "not built with zlib support\n");
+#endif
+    } else {
+#ifdef HAVE_LIBZ
+        fp->compressedfp = NULL;
+#endif
+        fp->uncompressedfp = fopen(path, mode);
+        if (fp->uncompressedfp == NULL) {
+            free_keep_errno(fp);
+            return NULL;
+        }
+        fp->lock = fp->uncompressedfp;
+    }
+
+    /* update file permission */
+    if (chmod(path, FILE_PERMISSION) == -1) {
+        exit_horribly(modulename, "changing permissions for file \"%s\" failed with: %s\n", path, strerror(errno));
+    }
+
+    return fp;
+}
+
 int cfread(void* ptr, int size, cfp* fp)
 {
     size_t read_len;
@@ -575,6 +649,24 @@ int cfwrite(const void* ptr, int size, cfp* fp)
     return fwrite(ptr, 1, size, fp->uncompressedfp);
 }
 
+/* Like cfwrite(), but take an exclusive flock() around the write so that
+ * several processes can safely append to the same file. */
+int cfwriteWithLock(const void* ptr, int size, cfp* fp)
+{
+    int rc = 0;
+
+#ifdef HAVE_LIBZ
+    if (fp->compressedfp != NULL) {
+        flock(fileno(fp->lock), LOCK_EX);
+        rc = gzwrite(fp->compressedfp, ptr, size);
+        flock(fileno(fp->lock), LOCK_UN);
+        return rc;
+    }
+#endif
+    flock(fileno(fp->lock), LOCK_EX);
+    rc = fwrite(ptr, 1, size, fp->uncompressedfp);
+    flock(fileno(fp->lock), LOCK_UN);
+    return rc;
+}
+
 int cfgetc(cfp* fp)
 {
 #ifdef HAVE_LIBZ
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index 105abb10ff00763db427b7c09e5e55ff934ad5a2..2ccfe525a13edc0d348c78eda2b4bb3240110091 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -51,10 +51,13 @@ extern void EndCompressor(ArchiveHandle* AH, CompressorState* cs);
 typedef struct cfp cfp;
 
 extern cfp* cfopen(const char* path, const char* mode, int compression);
+extern cfp* cfopen4Lock(const char* path, const char* mode, int compression);
 extern cfp* cfopen_read(const char* path, const char* mode);
 extern cfp* cfopen_write(const char* path, const char* mode, int compression);
+extern cfp* cfopen_write4Lock(const char* path, const char* mode, int compression);
 extern int cfread(void* ptr, int size, cfp* fp);
 extern int cfwrite(const void* ptr, int size, cfp* fp);
+extern int cfwriteWithLock(const void* ptr, int size, cfp* fp);
 extern int cfgetc(cfp* fp);
 extern char* cfgets(cfp* fp, char* buf, int len);
 extern int cfclose(cfp* fp);
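The *4Lock variants above exist so that several worker processes can append to one shared data file without interleaving their output. The following is a minimal standalone sketch of that pattern, assuming POSIX flock(2) semantics (append_record() is illustrative and not part of the patch):

    #include <stdio.h>
    #include <sys/file.h>   /* flock() */

    /* Append one record to a file shared by several writer processes. */
    static size_t append_record(FILE *fp, const void *buf, size_t len)
    {
        size_t written;

        flock(fileno(fp), LOCK_EX);        /* block until we own the file */
        written = fwrite(buf, 1, len, fp); /* record goes out as one unit */
        fflush(fp);                        /* drain stdio before unlocking */
        flock(fileno(fp), LOCK_UN);
        return written;
    }

cfwriteWithLock() above follows the same shape, with a gzwrite() branch for compressed output.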
diff --git a/src/bin/pg_dump/dumpmem.cpp b/src/bin/pg_dump/dumpmem.cpp
index 042017497e07ff9ded3772df3863523a58738be0..7d617cb761e859c3b8a12f6a173c00dc3871aa71 100644
--- a/src/bin/pg_dump/dumpmem.cpp
+++ b/src/bin/pg_dump/dumpmem.cpp
@@ -16,6 +16,9 @@
 #include "postgres_fe.h"
 #include "dumputils.h"
 #include "dumpmem.h"
+#include "securec.h"
+#include "securec_check.h"
+#include
 
 #ifdef GAUSS_SFT_TEST
 #include "gauss_sft.h"
@@ -51,6 +54,19 @@ void* pg_malloc(size_t size)
     return tmp;
 }
 
+void* pg_malloc_zero(size_t size)
+{
+    void* tmp = NULL;
+    errno_t rc = 0;
+
+    tmp = pg_malloc(size);
+    if (NULL == tmp)
+        exit_horribly(NULL, "out of memory\n");
+    rc = memset_s(tmp, size, 0, size);
+    securec_check_c((rc), "", "");
+    return tmp;
+}
+
 void* pg_calloc(size_t nmemb, size_t size)
 {
     void* tmp = NULL;
@@ -75,3 +91,11 @@ void* pg_realloc(void* ptr, size_t size)
         exit_horribly(NULL, "out of memory\n");
     return tmp;
 }
+
+void pg_free(void* ptr)
+{
+    if (ptr != NULL) {
+        free(ptr);
+        ptr = NULL;
+    }
+}
\ No newline at end of file
diff --git a/src/bin/pg_dump/dumpmem.h b/src/bin/pg_dump/dumpmem.h
index 1340c01aec4890e1d807b1c709bcfe6f721834f3..af8b8bb807c85928b388ee89af1f9788126821c5 100644
--- a/src/bin/pg_dump/dumpmem.h
+++ b/src/bin/pg_dump/dumpmem.h
@@ -16,8 +16,10 @@
 extern char* gs_strdup(const char* string);
 extern void* pg_malloc(size_t size);
+extern void* pg_malloc_zero(size_t size);
 extern void* pg_calloc(size_t nmemb, size_t size);
 extern void* pg_realloc(void* ptr, size_t size);
+extern void pg_free(void* pointer);
 
 #endif /* DUMPMEM_H */
diff --git a/src/bin/pg_dump/dumputils.h b/src/bin/pg_dump/dumputils.h
index 135b955a75704c4d4c6932780b407f0c5c94f306..48665fed5d7113da6bcb0cf3792b06e0710450dc 100644
--- a/src/bin/pg_dump/dumputils.h
+++ b/src/bin/pg_dump/dumputils.h
@@ -85,7 +85,7 @@ typedef enum /* bits returned by set_dump_section */
         ptr = NULL; \
     } \
     } while (0)
-
+#define fatal(...) do { write_msg(NULL, __VA_ARGS__); exit_nicely(1); } while (0)
 typedef void (*on_exit_nicely_callback)(int code, void* arg);
 
 extern int quote_all_identifiers;
diff --git a/src/bin/pg_dump/nls.mk b/src/bin/pg_dump/nls.mk
index 0b679b77a69f18f4c537b76b282b0603904b1c10..7d80bc0dba3e878844a46bdf4529a4d4190717cc 100644
--- a/src/bin/pg_dump/nls.mk
+++ b/src/bin/pg_dump/nls.mk
@@ -2,7 +2,7 @@ CATALOG_NAME = pg_dump
 AVAIL_LANGUAGES = cs de es fr it ja pl pt_BR ru zh_CN
 GETTEXT_FILES = pg_backup_archiver.cpp pg_backup_db.cpp pg_backup_custom.cpp \
-	pg_backup_null.cpp pg_backup_tar.cpp \
+	pg_backup_null.cpp pg_backup_tar.cpp parallel.c \
 	pg_backup_directory.cpp dumpmem.cpp dumputils.cpp compress_io.cpp \
 	pg_dump.cpp common.cpp pg_dump_sort.cpp \
 	pg_restore.cpp pg_dumpall.cpp \
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
new file mode 100644
index 0000000000000000000000000000000000000000..ed037c03a7681034bf07aca65a1e3c45334c12d5
--- /dev/null
+++ b/src/bin/pg_dump/parallel.c
@@ -0,0 +1,1225 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.c
+ *
+ *	Parallel support for pg_dump and pg_restore
+ *
+ * IDENTIFICATION
+ *		src/bin/pg_dump/parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * Parallel operation works like this:
+ *
+ * The original, leader process calls ParallelBackupStart(), which forks off
+ * the desired number of worker processes; each worker then enters
+ * WaitForCommands().
+ *
+ * The leader process dispatches an individual work item to one of the worker
+ * processes in DispatchJobForTocEntry(). We send a command string such as
+ * "DUMP 1234", where 1234 is the TocEntry ID. The worker receives and
+ * decodes the command, performs the required action (in this port, only
+ * dumping a TOC entry's data; see WaitForCommands()), and reports an
+ * integer status code back to the leader, which records it for the leader
+ * control logic in pg_backup_archiver.c.
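+ *
+ * For example, one complete dump cycle on the pipes looks like this
+ * (an illustrative trace, not literal program output):
+ *
+ *		leader -> worker:	"DUMP 1234"
+ *		worker -> leader:	"OK 1234 0"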
+ *
+ * In principle additional archive-format-specific information might be needed
+ * in commands or worker status responses, but so far that hasn't proved
+ * necessary, since workers have full copies of the ArchiveHandle/TocEntry
+ * data structures. Remember that we have forked off the workers only after
+ * we have read in the catalog. That's why our worker processes can also
+ * access the catalog information. (Each worker nevertheless works with a
+ * cloned copy of the Archive data structure; see RunWorker().)
+ *
+ * In the leader process, the workerStatus field for each worker has one of
+ * the following values:
+ *		WRKR_NOT_STARTED: we've not yet forked this worker
+ *		WRKR_IDLE: it's waiting for a command
+ *		WRKR_WORKING: it's working on a command
+ *		WRKR_TERMINATED: process ended
+ * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
+ * state, and must be NULL in other states.
+ */
+#include "postgres_fe.h"
+#include "dumpmem.h"
+#include "parallel.h"
+#include <stdio.h>
+#include <unistd.h>			/* fork(), pipe(), read(), write() */
+#include <sys/select.h>		/* select() */
+#include <sys/wait.h>		/* wait() */
+#include "libpq/pqsignal.h"
+#include "bin/elog.h"
+
+/* Mnemonic macros for indexing the fd array returned by pipe(2) */
+#define PIPE_READ 0
+#define PIPE_WRITE 1
+
+#define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
+
+/* Worker process statuses */
+typedef enum
+{
+	WRKR_NOT_STARTED = 0,
+	WRKR_IDLE,
+	WRKR_WORKING,
+	WRKR_TERMINATED
+} T_WorkerStatus;
+
+#define WORKER_IS_RUNNING(workerStatus) \
+	((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
+
+#define messageStartsWith(msg, prefix) \
+	(strncmp(msg, prefix, strlen(prefix)) == 0)
+
+/*
+ * Private per-parallel-worker state (typedef for this is in parallel.h).
+ *
+ * Much of this is valid only in the leader process. But the AH field should
+ * be touched only by workers. The pipe descriptors are valid everywhere.
+ */
+struct ParallelSlot {
+	T_WorkerStatus workerStatus;	/* see enum above */
+	int pipeRead;					/* leader's end of the pipes */
+	int pipeWrite;
+	int pipeRevRead;				/* child's end of the pipes */
+	int pipeRevWrite;
+
+	ArchiveHandle *AH;				/* Archive data worker is using */
+
+	/* Child process identity info: */
+	pid_t pid;
+};
+
+/*
+ * State info for archive_close_connection() shutdown callback.
+ */
+typedef struct ShutdownInformation
+{
+	ParallelState *pstate;
+	Archive *AHX;
+} ShutdownInformation;
+
+static ShutdownInformation shutdown_info;
+
+/*
+ * State info for signal handling.
+ * We assume signal_info initializes to zeroes.
+ *
+ * myAH is the leader DB connection in the leader process, and the worker's
+ * own connection in worker processes.
+ */
+typedef struct DumpSignalInformation
+{
+	ArchiveHandle *myAH;	/* database connection to issue cancel for */
+	ParallelState *pstate;	/* parallel state, if any */
+	bool handler_set;		/* signal handler set up in this process? */
+#ifndef WIN32
+	bool am_worker;			/* am I a worker process? */
+#endif
+} DumpSignalInformation;
+
+static volatile DumpSignalInformation signal_info;
+
+static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+static void sendMessageToPipe(int fd, const char *str);
+static char *readMessageFromPipe(int fd);
+static char *getMessageFromLeader(int pipefd[2]);
+static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
+static void buildWorkerCommand(TocEntry *te, T_Action act, char *buf, int buflen);
+static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg);
+static int GetIdleWorker(ParallelState *pstate);
+static bool ListenToWorkers(ParallelState *pstate, bool do_wait);
+static char *getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker);
+static int select_loop(int maxFd, fd_set *workerset);
+static bool HasEveryWorkerTerminated(ParallelState *pstate);
+static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
+static ParallelSlot *GetMyPSlot(ParallelState *pstate);
+static void ShutdownWorkersHard(ParallelState *pstate);
+static void WaitForTerminatingWorkers(ParallelState *pstate);
+static void sigTermHandler(SIGNAL_ARGS);
+static void setup_cancel_handler(void);
+static const char *fmtQualifiedId(const char *schema, const char *id);
+static void archive_close_connection(int code, void *arg);
+
+/*
+ * fmtQualifiedId - construct a schema-qualified name, with quoting as needed.
+ *
+ * As with fmtId, use the result before calling again.
+ *
+ * Since we call fmtId and it also uses getLocalPQExpBuffer() we cannot
+ * use that buffer until we're finished with calling fmtId().
+ */
+static const char *
+fmtQualifiedId(const char *schema, const char *id)
+{
+	static PQExpBuffer id_return = NULL;
+
+	if (id_return != NULL)		/* first time through? */
+		resetPQExpBuffer(id_return);
+	else
+		id_return = createPQExpBuffer();
+
+	/* Suppress schema name if fetching from pre-7.3 DB */
+	if (schema != NULL && *schema) {
+		appendPQExpBuffer(id_return, "%s.", fmtId(schema));
+	}
+	appendPQExpBuffer(id_return, "%s", fmtId(id));
+
+	return id_return->data;
+}
+
+/*
+ * Find the ParallelSlot for the current worker process.
+ *
+ * Returns NULL if no matching slot is found (this implies we're the leader).
+ */
+static ParallelSlot *
+GetMyPSlot(ParallelState *pstate)
+{
+	int i;
+
+	for (i = 0; i < pstate->workerNum; i++)
+	{
+		if (pstate->parallelSlot[i].pid == getpid())
+			return &(pstate->parallelSlot[i]);
+	}
+
+	return NULL;
+}
+
+/*
+ * pg_dump and pg_restore call this to register the cleanup handler
+ * as soon as they've created the ArchiveHandle.
+ */
+// void
+// on_exit_close_archive(Archive *AHX)
+// {
+//	shutdown_info.AHX = AHX;
+//	on_exit_nicely(archive_close_connection, &shutdown_info);
+// }
+
+/*
+ * on_exit_nicely handler for shutting down database connections and
+ * worker processes cleanly.
+ */
+static void
+archive_close_connection(int code, void *arg)
+{
+	ShutdownInformation *si = (ShutdownInformation *) arg;
+
+	if (si->pstate)
+	{
+		/* In parallel mode, must figure out who we are */
+		ParallelSlot *slot = GetMyPSlot(si->pstate);
+
+		if (!slot)
+		{
+			/*
+			 * We're the leader. Forcibly shut down workers, then close our
+			 * own database connection, if any.
+			 */
+			ShutdownWorkersHard(si->pstate);
+
+			if (si->AHX)
+				DisconnectDatabase(si->AHX);
+		}
+		else
+		{
+			/*
+			 * We're a worker. Shut down our own DB connection, if any.
+			 * (On a premature exit, the leader detects it by the EOF
+			 * condition on its end of the pipe.)
+			 */
+			if (slot->AH)
+				DisconnectDatabase(&(slot->AH->publicArc));
+		}
+	}
+	else
+	{
+		/* Non-parallel operation: just kill the leader DB connection */
+		if (si->AHX)
+			DisconnectDatabase(si->AHX);
+	}
+}
+
+/*
+ * Forcibly shut down any remaining workers, waiting for them to finish.
+ *
+ * Note that we don't expect to come here during normal exit (the workers
+ * should be long gone, and the ParallelState too). We're only here in a
+ * fatal() situation, so intervening to cancel active commands is
+ * appropriate.
+ */
+static void
+ShutdownWorkersHard(ParallelState *pstate)
+{
+	int i;
+
+	/*
+	 * Close our write end of the pipes so that any workers waiting for
+	 * commands know they can exit. (Note: some of the pipeWrite fields might
+	 * still be zero, if we failed to initialize all the workers. Hence, just
+	 * ignore errors here.)
+	 */
+	for (i = 0; i < pstate->workerNum; i++)
+		closesocket(pstate->parallelSlot[i].pipeWrite);
+
+	/*
+	 * Force early termination of any commands currently in progress:
+	 * send SIGTERM to each worker process.
+	 */
+	for (i = 0; i < pstate->workerNum; i++)
+	{
+		pid_t pid = pstate->parallelSlot[i].pid;
+
+		if (pid != 0)
+			kill(pid, SIGTERM);
+	}
+
+	/* Now wait for them to terminate. */
+	WaitForTerminatingWorkers(pstate);
+}
+
+/*
+ * Wait for all workers to terminate.
+ */
+static void
+WaitForTerminatingWorkers(ParallelState *pstate)
+{
+	while (!HasEveryWorkerTerminated(pstate))
+	{
+		ParallelSlot *slot = NULL;
+		int j;
+
+		/* Use wait() to wait for the next worker to end */
+		int status;
+		pid_t pid = wait(&status);
+
+		/* Find dead worker's slot, and clear the PID field */
+		for (j = 0; j < pstate->workerNum; j++)
+		{
+			slot = &(pstate->parallelSlot[j]);
+			if (slot->pid == pid)
+			{
+				slot->pid = 0;
+				break;
+			}
+		}
+
+		/* Update workerStatus and te[] as well */
+		Assert(j < pstate->workerNum);
+		slot->workerStatus = WRKR_TERMINATED;
+		pstate->te[j] = NULL;
+	}
+}
+
+/*
+ * Code for responding to cancel interrupts (SIGINT, control-C, etc)
+ *
+ * This doesn't quite belong in this module, but it needs access to the
+ * ParallelState data, so there's not really a better place either.
+ *
+ * When we get a cancel interrupt, we could just die, but in pg_restore that
+ * could leave a SQL command (e.g., CREATE INDEX on a large table) running
+ * for a long time. Instead, we try to send a cancel request and then die.
+ * pg_dump probably doesn't really need this, but we might as well use it
+ * there too. Note that sending the cancel directly from the signal handler
+ * is safe because PQcancel() is written to make it so.
+ *
+ * In parallel operation, each process is responsible for canceling its own
+ * connection (this must be so because nobody else has access to it).
+ * Furthermore, the leader process should attempt to forward its signal to
+ * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
+ * needed because typing control-C at the console would deliver SIGINT to
+ * every member of the terminal process group --- but in other scenarios it
+ * might be that only the leader gets signaled.
+ */
+
+/*
+ * Signal handler (Unix only)
+ */
+static void
+sigTermHandler(SIGNAL_ARGS)
+{
+	int i;
+	char errbuf[1];
+
+	/*
+	 * Some platforms allow delivery of new signals to interrupt an active
+	 * signal handler. That could muck up our attempt to send PQcancel, so
+	 * disable the signals that setup_cancel_handler enabled.
+	 */
+	pqsignal(SIGINT, SIG_IGN);
+	pqsignal(SIGTERM, SIG_IGN);
+	pqsignal(SIGQUIT, SIG_IGN);
+
+	/*
+	 * If we're in the leader, forward signal to all workers. (It seems best
+	 * to do this before PQcancel; killing the leader transaction will result
+	 * in invalid-snapshot errors from active workers, which maybe we can
+	 * quiet by killing workers first.) Ignore any errors.
+	 */
+	if (signal_info.pstate != NULL)
+	{
+		for (i = 0; i < signal_info.pstate->workerNum; i++)
+		{
+			pid_t pid = signal_info.pstate->parallelSlot[i].pid;
+
+			if (pid != 0)
+				kill(pid, SIGTERM);
+		}
+	}
+
+	/*
+	 * Send QueryCancel if we have a connection to send to. Ignore errors,
+	 * there's not much we can do about them anyway.
+	 */
+	if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
+		(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
+
+	/*
+	 * Report we're quitting, using nothing more complicated than write(2).
+	 * When in parallel operation, only the leader process should do this.
+	 */
+	if (!signal_info.am_worker)
+	{
+		if (progname)
+		{
+			write_stderr(progname);
+			write_stderr(": ");
+		}
+		write_stderr("terminated by user\n");
+	}
+
+	/*
+	 * And die, using _exit() not exit() because the latter will invoke atexit
+	 * handlers that can fail if we interrupted related code.
+	 */
+	_exit(1);
+}
+
+/*
+ * Enable cancel interrupt handler, if not already done.
+ */
+static void
+setup_cancel_handler(void)
+{
+	/*
+	 * When forking, signal_info.handler_set will propagate into the new
+	 * process, but that's fine because the signal handler state does too.
+	 */
+	if (!signal_info.handler_set)
+	{
+		signal_info.handler_set = true;
+
+		pqsignal(SIGINT, sigTermHandler);
+		pqsignal(SIGTERM, sigTermHandler);
+		pqsignal(SIGQUIT, sigTermHandler);
+	}
+}
+
+/*
+ * set_archive_cancel_info
+ *
+ * Fill AH->connCancel with cancellation info for the specified database
+ * connection; or clear it if conn is NULL.
+ */
+void
+set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
+{
+	PGcancel *oldConnCancel;
+
+	/*
+	 * Activate the interrupt handler if we didn't yet in this process. It's
+	 * important that this happen at least once before we fork off any
+	 * workers.
+	 */
+	setup_cancel_handler();
+
+	/*
+	 * We assume that storing a pointer value is atomic with respect to any
+	 * possible signal interrupt.
+	 */
+
+	/* Free the old one if we have one */
+	oldConnCancel = AH->connCancel;
+	/* be sure interrupt handler doesn't use pointer while freeing */
+	AH->connCancel = NULL;
+
+	if (oldConnCancel != NULL)
+		PQfreeCancel(oldConnCancel);
+
+	/* Set the new one if specified */
+	if (conn)
+		AH->connCancel = PQgetCancel(conn);
+
+	/*
+	 * There's only ever one active ArchiveHandle per process, so we can
+	 * just set signal_info.myAH unconditionally.
+	 */
+	signal_info.myAH = AH;
+}
+
+ParallelState *
+ParallelBackupStart(ArchiveHandle *AH)
+{
+	ParallelState *pstate;
+
+	Assert(AH->publicArc.workerNum > 1);
+
+	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
+
+	pstate->workerNum = AH->publicArc.workerNum;
+
+	/* Create status arrays, being sure to initialize all fields to 0 */
+	pstate->te = (TocEntry **) pg_malloc_zero(pstate->workerNum * sizeof(TocEntry *));
+	pstate->parallelSlot = (ParallelSlot *) pg_malloc_zero(pstate->workerNum * sizeof(ParallelSlot));
+
+	/*
+	 * Set the pstate in shutdown_info, to tell the exit handler that it must
+	 * clean up workers as well as the main database connection. But we don't
+	 * set this in signal_info yet, because we don't want child processes to
+	 * inherit non-NULL signal_info.pstate.
+	 */
+	shutdown_info.pstate = pstate;
+
+	/*
+	 * Temporarily disable query cancellation on the leader connection. This
+	 * ensures that child processes won't inherit valid AH->connCancel
+	 * settings and thus won't try to issue cancels against the leader's
+	 * connection. No harm is done if we fail while it's disabled, because
+	 * the leader connection is idle at this point anyway.
+	 */
+	set_archive_cancel_info(AH, NULL);
+
+	/* Ensure stdio state is quiesced before forking */
+	fflush(NULL);
+
+	for (int i = 0; i < AH->publicArc.workerNum; i++) {
+		pid_t pid;
+		ParallelSlot *slot = &(pstate->parallelSlot[i]);
+		int pipeMW[2],	/* leader -> worker commands */
+			pipeWM[2];	/* worker -> leader status */
+
+		/*
+		 * Create the pair of pipes over which the leader and this child
+		 * will communicate.
+		 */
+		if (pipe(pipeMW) < 0 || pipe(pipeWM) < 0)
+			exit_horribly(NULL, "could not create communication channels: %s\n", strerror(errno));
+
+		/* leader's ends of the pipes */
+		slot->pipeRead = pipeWM[PIPE_READ];
+		slot->pipeWrite = pipeMW[PIPE_WRITE];
+		/* child's ends of the pipes */
+		slot->pipeRevRead = pipeMW[PIPE_READ];
+		slot->pipeRevWrite = pipeWM[PIPE_WRITE];
+
+		pid = fork();
+		if (pid == 0)
+		{
+			/* we are the worker */
+
+			/* instruct signal handler that we're in a worker now */
+			signal_info.am_worker = true;
+
+			/* close read end of Worker -> Leader */
+			closesocket(pipeWM[PIPE_READ]);
+			/* close write end of Leader -> Worker */
+			closesocket(pipeMW[PIPE_WRITE]);
+
+			/*
+			 * Close all inherited fds for communication of the leader with
+			 * previously-forked workers.
+			 */
+			for (int j = 0; j < i; j++)
+			{
+				closesocket(pstate->parallelSlot[j].pipeRead);
+				closesocket(pstate->parallelSlot[j].pipeWrite);
+			}
+
+			/* Run the worker ... */
+			RunWorker(AH, slot);
+
+			/* We can just exit(0) when done */
+			exit(0);
+		}
+		else if (pid < 0)
+		{
+			/* fork failed */
+			exit_horribly(NULL, "could not create worker process: %s\n", strerror(errno));
+		}
+
+		slot->pid = pid;
+		slot->workerStatus = WRKR_IDLE;
+
+		/* close read end of Leader -> Worker */
+		closesocket(pipeMW[PIPE_READ]);
+		/* close write end of Worker -> Leader */
+		closesocket(pipeWM[PIPE_WRITE]);
+	}
+
+	pqsignal(SIGPIPE, SIG_IGN);
+
+	/*
+	 * Re-establish query cancellation on the leader connection.
+	 */
+	set_archive_cancel_info(AH, AH->connection);
+
+	/*
+	 * Tell the cancel signal handler to forward signals to worker processes,
+	 * too. (As with query cancel, we did not need this earlier because the
+	 * workers have not yet been given anything to do; if we die before this
+	 * point, any already-started workers will see EOF and quit promptly.)
+	 */
+	signal_info.pstate = pstate;
+
+	return pstate;
+}
+
+/*
+ * Close down a parallel dump or restore.
+ */
+void
+ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
+{
+	int i;
+
+	/* No work if non-parallel */
+	if (pstate->workerNum == 1)
+		return;
+
+	/* There should not be any unfinished jobs */
+	Assert(IsEveryWorkerIdle(pstate));
+
+	/* Close the sockets so that the workers know they can exit */
+	for (i = 0; i < pstate->workerNum; i++)
+	{
+		closesocket(pstate->parallelSlot[i].pipeRead);
+		closesocket(pstate->parallelSlot[i].pipeWrite);
+	}
+
+	/* Wait for them to exit */
+	WaitForTerminatingWorkers(pstate);
+
+	/*
+	 * Unlink pstate from shutdown_info, so the exit handler will not try to
+	 * use it; and likewise unlink from signal_info.
+	 */
+	shutdown_info.pstate = NULL;
+	signal_info.pstate = NULL;
+
+	/* Release state (mere neatnik-ism, since we're about to terminate) */
+	free(pstate->te);
+	free(pstate->parallelSlot);
+	free(pstate);
+}
+
+/*
+ * Set up and run a worker process. The caller should exit the process
+ * upon return.
+ */
+static void
+RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
+{
+	int pipefd[2];
+
+	/* fetch child ends of pipes */
+	pipefd[PIPE_READ] = slot->pipeRevRead;
+	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
+
+	/*
+	 * Clone the archive so that we have our own state to work with, and in
+	 * particular our own database connection.
+	 *
+	 * Technically we don't need to clone, because fork() already gives us a
+	 * copy in our own address space. But CloneArchive resets the state
+	 * information and also clones the database connection, both of which
+	 * are helpful.
+	 */
+	AH = CloneArchive(AH);
+
+	/* Remember cloned archive where signal handler can find it */
+	slot->AH = AH;
+
+	/*
+	 * Call the setup worker function that's defined in the ArchiveHandle.
+	 */
+	(AH->SetupWorkerptr) ((Archive *) AH);
+
+	/*
+	 * Execute commands until done.
+	 */
+	WaitForCommands(AH, pipefd);
+
+	/*
+	 * Disconnect from database and clean up.
+	 */
+	slot->AH = NULL;
+	DisconnectDatabase(&AH->publicArc);
+	DeCloneArchive(AH);
+}
+
+/*
+ * These next four functions handle construction and parsing of the command
+ * strings and response strings for parallel workers.
+ *
+ * Currently, these can be the same regardless of which archive format we are
+ * processing. In future, we might want to let format modules override these
+ * functions to add format-specific data to a command or response.
+ */
+
+/*
+ * buildWorkerCommand: format a command string to send to a worker.
+ *
+ * The string is built in the caller-supplied buffer of size buflen.
+ */
+static void
+buildWorkerCommand(TocEntry *te, T_Action act,
+	char *buf, int buflen)
+{
+	if (act == ACT_DUMP)
+		snprintf(buf, buflen, "DUMP %d", te->dumpId);
+	else
+		Assert(false);
+}
+
+/*
+ * parseWorkerCommand: interpret a command string in a worker.
+ */ +static void +parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, + const char *msg) +{ + DumpId dumpId; + int nBytes; + + if (messageStartsWith(msg, "DUMP ")) + { + *act = ACT_DUMP; + sscanf(msg, "DUMP %d%n", &dumpId, &nBytes); + Assert(nBytes == strlen(msg)); + *te = getTocEntryByDumpId(AH, dumpId); + Assert(*te != NULL); + } else { + exit_horribly(NULL,"unrecognized command received from leader: \"%s\"", msg); + } + +} + +static void +buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, char *buf, int buflen) +{ + snprintf(buf, buflen, "OK %d %d", + te->dumpId, + AH->publicArc.n_errors); +} + +static int +parseWorkerResponse(TocEntry *te, + const char *msg) +{ + DumpId dumpId; + int nBytes, + n_errors; + + if (messageStartsWith(msg, "OK ")) + { + sscanf(msg, "OK %d %d%n", &dumpId, &n_errors, &nBytes); + + Assert(dumpId == te->dumpId); + Assert(nBytes == strlen(msg)); + } + else + exit_horribly(NULL, "invalid message received from worker: \"%s\"", msg); + + return n_errors; +} +/* + * Acquire lock on a table to be dumped by a worker process. + * + * The leader process is already holding an ACCESS SHARE lock. Ordinarily + * it's no problem for a worker to get one too, but if anything else besides + * pg_dump is running, there's a possible deadlock: + * + * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode. + * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted + * because the leader holds a conflicting ACCESS SHARE lock). + * 3) A worker process also requests an ACCESS SHARE lock to read the table. + * The worker is enqueued behind the ACCESS EXCLUSIVE lock request. + * 4) Now we have a deadlock, since the leader is effectively waiting for + * the worker. The server cannot detect that, however. + * + * To prevent an infinite wait, prior to touching a table in a worker, request + * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock, + * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and + * so we have a deadlock. We must fail the backup in that case. + */ +static void +lockTableForWorker(ArchiveHandle *AH, TocEntry *te) +{ + const char *qualId; + PQExpBuffer query; + PGresult *res; + + /* Nothing to do for BLOBS */ + if (strcmp(te->desc, "BLOBS") == 0) + return; + + query = createPQExpBuffer(); + + qualId = fmtQualifiedId(te->nmspace, te->tag); + + appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT", + qualId); + + res = PQexec(AH->connection, query->data); + + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) + fatal("could not obtain lock on relation \"%s\"\n" + "This usually means that someone requested an ACCESS EXCLUSIVE lock " + "on the table after the pg_dump parent process had gotten the " + "initial ACCESS SHARE lock on the table.", qualId); + + PQclear(res); + destroyPQExpBuffer(query); +} + +/* + * WaitForCommands: main routine for a worker process. + * + * Read and execute commands from the leader until we see EOF on the pipe. 
+ */ +static void +WaitForCommands(ArchiveHandle *AH, int pipefd[2]) +{ + char *command; + TocEntry *te; + T_Action act; + char buf[256]; + int count = 0; + + for (;;) + { + if (!(command = getMessageFromLeader(pipefd))) + { + /* EOF, so done */ + return; + } + + /* Decode the command */ + parseWorkerCommand(AH, &te, &act, command); + + if (act == ACT_DUMP) { + /* Acquire lock on this table within the worker's session */ + lockTableForWorker(AH, te); + + /* Perform the dump command */ + WriteDataChunksForTocEntry(AH, te); + } + else + Assert(false); + + /* Return status to leader */ + buildWorkerResponse(AH, te, buf, sizeof(buf)); + sendMessageToPipe(pipefd[PIPE_WRITE], buf); + + /* command was pg_malloc'd and we are responsible for free()ing it. */ + pg_free(command); + } +} + +/* + * Dispatch a job to some free worker. + * + * te is the TocEntry to be processed, act is the action to be taken on it. + * callback is the function to call on completion of the job. + * + * If no worker is currently available, this will block, and previously + * registered callback functions may be called. + */ +void +DispatchJobForTocEntry(ParallelState *pstate, + TocEntry *te, + T_Action act) +{ + int worker; + char buf[256]; + + /* Get a worker, waiting if none are idle */ + while ((worker = GetIdleWorker(pstate)) == NO_SLOT) + WaitForWorkers(pstate, WFW_ONE_IDLE); + + /* Construct and send command string */ + buildWorkerCommand(te, act, buf, sizeof(buf)); + sendMessageToPipe(pstate->parallelSlot[worker].pipeWrite, buf); + + pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; + pstate->te[worker] = te; +} + +/* + * Find an idle worker and return its slot number. + * Return NO_SLOT if none are idle. + */ +static int +GetIdleWorker(ParallelState *pstate) +{ + for (int i = 0; i < pstate->workerNum; i++) + { + if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE) + return i; + } + return NO_SLOT; +} + +/* + * Send a command message to the specified fd. + */ +static void +sendMessageToPipe(int fd, const char *str) +{ + int len = strlen(str) + 1; + + if (write(fd, str, len) != len) + { + exit_horribly(NULL, "could not write to the communication channel: %m\n"); + } +} + +/* + * Read one command message from the leader, blocking if necessary + * until one is available, and return it as a malloc'd string. + * On EOF, return NULL. + * + * This function is executed in worker processes. + */ +static char * +getMessageFromLeader(int pipefd[2]) +{ + return readMessageFromPipe(pipefd[PIPE_READ]); +} + +/* + * Read one message from the specified pipe (fd), blocking if necessary + * until one is available, and return it as a malloc'd string. + * On EOF, return NULL. + * + * A "message" on the channel is just a null-terminated string. + */ +static char * +readMessageFromPipe(int fd) +{ + char *msg; + int msgsize, + bufsize; + int ret; + + /* + * In theory, if we let piperead() read multiple bytes, it might give us + * back fragments of multiple messages. (That can't actually occur, since + * neither leader nor workers send more than one message without waiting + * for a reply, but we don't wish to assume that here.) For simplicity, + * read a byte at a time until we get the terminating '\0'. This method + * is a bit inefficient, but since this is only used for relatively short + * command and status strings, it shouldn't matter. 
+ */ + bufsize = 64; /* could be any number */ + msg = (char *) pg_malloc(bufsize); + msgsize = 0; + for (;;) + { + Assert(msgsize < bufsize); + ret = read(fd, msg + msgsize, 1); + if (ret <= 0) + break; /* error or connection closure */ + + Assert(ret == 1); + + if (msg[msgsize] == '\0') + return msg; /* collected whole message */ + + msgsize++; + if (msgsize == bufsize) /* enlarge buffer if needed */ + { + bufsize += 16; /* could be any number */ + msg = (char *) pg_realloc(msg, bufsize); + } + } + + /* Other end has closed the connection */ + pg_free(msg); + return NULL; +} + +/* + * Wait until some descriptor in "workerset" becomes readable. + * Returns -1 on error, else the number of readable descriptors. + */ +static int +select_loop(int maxFd, fd_set *workerset) +{ + int i; + fd_set saveSet = *workerset; + + for (;;) + { + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, NULL); + + if (i < 0 && errno == EINTR) + continue; + break; + } + + return i; +} + +/* + * Check for messages from worker processes. + * + * If a message is available, return it as a malloc'd string, and put the + * index of the sending worker in *worker. + * + * If nothing is available, wait if "do_wait" is true, else return NULL. + * + * If we detect EOF on any socket, we'll return NULL. It's not great that + * that's hard to distinguish from the no-data-available case, but for now + * our one caller is okay with that. + * + * This function is executed in the leader process. + */ +static char * +getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) +{ + int i; + fd_set workerset; + int maxFd = -1; + struct timeval nowait = {0, 0}; + + /* construct bitmap of socket descriptors for select() */ + FD_ZERO(&workerset); + for (i = 0; i < pstate->workerNum; i++) + { + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) + continue; + FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); + if (pstate->parallelSlot[i].pipeRead > maxFd) + maxFd = pstate->parallelSlot[i].pipeRead; + } + + if (do_wait) + { + i = select_loop(maxFd, &workerset); + Assert(i != 0); + } + else + { + if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0) + return NULL; + } + + if (i < 0) + exit_horribly(NULL, "%s() failed: %m", "select"); + + for (i = 0; i < pstate->workerNum; i++) + { + char *msg; + + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) + continue; + if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) + continue; + + /* + * Read the message if any. If the socket is ready because of EOF, + * we'll return NULL instead (and the socket will stay ready, so the + * condition will persist). + * + * Note: because this is a blocking read, we'll wait if only part of + * the message is available. Waiting a long time would be bad, but + * since worker status messages are short and are always sent in one + * operation, it shouldn't be a problem in practice. + */ + msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead); + *worker = i; + return msg; + } + Assert(false); + return NULL; +} + +/* + * Check for status messages from workers. + * + * If do_wait is true, wait to get a status message; otherwise, just return + * immediately if there is none available. + * + * When we get a status message, we pass the status code to the callback + * function that was specified to DispatchJobForTocEntry, then reset the + * worker status to IDLE. + * + * Returns true if we collected a status message, else false. 
+ *
+ * XXX is it worth checking for more than one status message per call?
+ * It seems somewhat unlikely that multiple workers would finish at exactly
+ * the same time.
+ */
+static bool
+ListenToWorkers(ParallelState *pstate, bool do_wait)
+{
+	int worker;
+	char *msg;
+
+	/* Try to collect a status message */
+	msg = getMessageFromWorker(pstate, do_wait, &worker);
+
+	if (!msg)
+	{
+		/* If do_wait is true, we must have detected EOF on some socket */
+		if (do_wait)
+			exit_horribly(NULL, "a worker process died unexpectedly\n");
+		return false;
+	}
+
+	/* Process it and update our idea of the worker's status */
+	if (messageStartsWith(msg, "OK "))
+	{
+		ParallelSlot *slot = &pstate->parallelSlot[worker];
+		TocEntry *te = pstate->te[worker];
+
+		/* parseWorkerResponse() validates the message against te */
+		(void) parseWorkerResponse(te, msg);
+		slot->workerStatus = WRKR_IDLE;
+		pstate->te[worker] = NULL;
+	}
+	else
+		exit_horribly(NULL, "invalid message received from worker: \"%s\"\n",
+			msg);
+
+	/* Free the string returned from getMessageFromWorker */
+	free(msg);
+
+	return true;
+}
+
+/*
+ * Return true iff no worker is running.
+ */
+static bool
+HasEveryWorkerTerminated(ParallelState *pstate)
+{
+	int i;
+
+	for (i = 0; i < pstate->workerNum; i++)
+	{
+		if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
+			return false;
+	}
+	return true;
+}
+
+/*
+ * Return true iff every worker is in the WRKR_IDLE state.
+ */
+bool
+IsEveryWorkerIdle(ParallelState *pstate)
+{
+	int i;
+
+	for (i = 0; i < pstate->workerNum; i++)
+	{
+		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
+			return false;
+	}
+	return true;
+}
+
+/*
+ * Check for status results from workers, waiting if necessary.
+ *
+ * Available wait modes are:
+ * WFW_NO_WAIT: reap any available status, but don't block
+ * WFW_GOT_STATUS: wait for at least one more worker to finish
+ * WFW_ONE_IDLE: wait for at least one worker to be idle
+ * WFW_ALL_IDLE: wait for all workers to be idle
+ *
+ * Any status messages received are processed by ListenToWorkers().
+ *
+ * This function is executed in the leader process.
+ */
+void
+WaitForWorkers(ParallelState *pstate, WFW_WaitOption mode)
+{
+	bool do_wait = false;
+
+	/*
+	 * In GOT_STATUS mode, always block waiting for a message, since we can't
+	 * return till we get something. In other modes, we don't block the first
+	 * time through the loop.
+	 */
+	if (mode == WFW_GOT_STATUS)
+	{
+		/* Assert that caller knows what it's doing */
+		Assert(!IsEveryWorkerIdle(pstate));
+		do_wait = true;
+	}
+
+	for (;;)
+	{
+		/*
+		 * Check for status messages, even if we don't need to block. We do
+		 * not try very hard to reap all available messages, though, since
+		 * there's unlikely to be more than one.
+		 */
+		if (ListenToWorkers(pstate, do_wait))
+		{
+			/*
+			 * If we got a message, we are done by definition for GOT_STATUS
+			 * mode, and we can also be certain that there's at least one idle
+			 * worker. So we're done in all but ALL_IDLE mode.
+ */ + if (mode != WFW_ALL_IDLE) + return; + } + + /* Check whether we must wait for new status messages */ + switch (mode) + { + case WFW_NO_WAIT: + return; /* never wait */ + case WFW_GOT_STATUS: + Assert(false); /* can't get here, because we waited */ + break; + case WFW_ONE_IDLE: + if (GetIdleWorker(pstate) != NO_SLOT) + return; + break; + case WFW_ALL_IDLE: + if (IsEveryWorkerIdle(pstate)) + return; + break; + } + + /* Loop back, and this time wait for something to happen */ + do_wait = true; + } +} \ No newline at end of file diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h new file mode 100644 index 0000000000000000000000000000000000000000..88b1c8bee30152adf2deb967e7c90db359d3c739 --- /dev/null +++ b/src/bin/pg_dump/parallel.h @@ -0,0 +1,29 @@ +#include "pg_backup_archiver.h" + +/* ParallelSlot is an opaque struct known only within parallel.c */ +typedef struct ParallelSlot ParallelSlot; + +typedef enum T_Action +{ + ACT_DUMP +} T_Action; + +typedef enum +{ + WFW_NO_WAIT, + WFW_GOT_STATUS, + WFW_ONE_IDLE, + WFW_ALL_IDLE +} WFW_WaitOption; + +typedef struct { + int workerNum; + TocEntry **te; + ParallelSlot* parallelSlot; +} ParallelState; + +extern ParallelState *ParallelBackupStart(ArchiveHandle *AH); +extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate); +extern void DispatchJobForTocEntry(ParallelState *pstate, TocEntry *te, T_Action act); +extern void WaitForWorkers(ParallelState *pstate, WFW_WaitOption mode); +extern bool IsEveryWorkerIdle(ParallelState *pstate); \ No newline at end of file diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 2d3bf5d4b6898c1923a14141cc0ecfd243f72d6f..4fe6ee39e2b6db64b45270897ce81206269cf838 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -103,10 +103,14 @@ struct Archive { /* get hash bucket info. 
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 2d3bf5d4b6898c1923a14141cc0ecfd243f72d6f..4fe6ee39e2b6db64b45270897ce81206269cf838 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -103,10 +103,14 @@ struct Archive {
     /* get hash bucket info. */
     bool getHashbucketInfo;
+
+    int workerNum;           /* number of parallel processes */
+    char *sync_snapshot_id;  /* sync snapshot id for parallel operation */
+
     /* The rest is private */
 };
 
 typedef int (*DataDumperPtr)(Archive* AH, void* userArg);
+typedef void (*SetupWorkerPtr) (Archive *AH);
 
 typedef struct _restoreOptions {
     int createDB; /* Issue commands to create the database */
@@ -197,6 +201,7 @@ extern void ArchiveEntry(Archive* AHX, CatalogId catalogId, DumpId dumpId, const
 
 /* Called to write *data* to the archive */
 extern size_t WriteData(Archive* AH, const void* data, size_t dLen);
+extern size_t WriteDataParallel(Archive* AH, const void* data, size_t dLen);
 
 extern int StartBlob(Archive* AH, Oid oid);
 extern int EndBlob(Archive* AH, Oid oid);
@@ -211,7 +216,7 @@ extern void RestoreArchive(Archive* AH);
 extern Archive* OpenArchive(const char* FileSpec, const ArchiveFormat fmt, CryptoModuleCheckParam* cryptoModuleCheckParam = NULL);
 
 /* Create a new archive */
-extern Archive* CreateArchive(const char* FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode);
+extern Archive* CreateArchive(const char* FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode, int workerNum, SetupWorkerPtr setupWorkerPtr);
 
 /* The --list option */
 extern void PrintTOCSummary(Archive* AH, RestoreOptions* ropt);
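With the widened signature, a parallel directory-format dump is created roughly like this (an illustrative sketch: the path variable and worker count are invented, and setupDumpWorker is the worker-connection callback declared in pg_dump.cpp below):

    /* directory format, write mode, 4 worker processes */
    Archive *fout = CreateArchive(dumpdirpath, archDirectory, compression,
        archModeWrite, 4, setupDumpWorker);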
diff --git a/src/bin/pg_dump/pg_backup_archiver.cpp b/src/bin/pg_dump/pg_backup_archiver.cpp
index 0ba2972d9cf145b53605219cbd7713ecb549c5e6..8fed41c43c4b188e96997ef8137b8ca2bf2e2f7d 100644
--- a/src/bin/pg_dump/pg_backup_archiver.cpp
+++ b/src/bin/pg_dump/pg_backup_archiver.cpp
@@ -144,7 +144,7 @@ static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
 static bool disable_progress;
 #endif
 
-static ArchiveHandle* _allocAH(const char* FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode, CryptoModuleCheckParam* cryptoModluleCheckParam = NULL);
+static ArchiveHandle* _allocAH(const char* FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode, CryptoModuleCheckParam* cryptoModluleCheckParam = NULL, int workerNum = 1, SetupWorkerPtr setupWorkerPtr = NULL);
 static void _getObjectDescription(PQExpBuffer buf, TocEntry* te, ArchiveHandle* AH);
 static void _printTocEntry(ArchiveHandle* AH, TocEntry* te, RestoreOptions* ropt, bool isData, bool acl_pass);
 static char* replace_line_endings(const char* str);
@@ -164,7 +164,6 @@ static bool _tocEntryIsACL(TocEntry* te);
 static void _disableTriggersIfNecessary(ArchiveHandle* AH, TocEntry* te, RestoreOptions* ropt);
 static void _enableTriggersIfNecessary(ArchiveHandle* AH, TocEntry* te, RestoreOptions* ropt);
 static void buildTocEntryArrays(ArchiveHandle* AH);
-static TocEntry* getTocEntryByDumpId(ArchiveHandle* AH, DumpId id);
 static void _moveBefore(ArchiveHandle* AH, TocEntry* pos, TocEntry* te);
 static int _discoverArchiveFormat(ArchiveHandle* AH);
 
@@ -195,8 +194,6 @@ static void identify_locking_dependencies(ArchiveHandle* AH, TocEntry* te);
 static void reduce_dependencies(ArchiveHandle* AH, TocEntry* te, TocEntry* ready_list);
 static void mark_create_done(ArchiveHandle* AH, TocEntry* te);
 static void inhibit_data_for_failed_table(ArchiveHandle* AH, TocEntry* te);
-static ArchiveHandle* CloneArchive(ArchiveHandle* AH);
-static void DeCloneArchive(ArchiveHandle* AH);
 
 static void setProcessIdentifier(ParallelStateEntry* pse, ArchiveHandle* AH);
 static void unsetProcessIdentifier(ParallelStateEntry* pse);
@@ -214,10 +211,10 @@ static void get_role_password(RestoreOptions* opts);
  */
 
 /* Create a new archive */
 /* Public */
-Archive* CreateArchive(const char* FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode)
+Archive* CreateArchive(const char* FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode, int workerNum, SetupWorkerPtr setupWorkerPtr)
 {
-    ArchiveHandle* AH = _allocAH(FileSpec, fmt, compression, mode);
+    ArchiveHandle* AH = _allocAH(FileSpec, fmt, compression, mode, NULL, workerNum, setupWorkerPtr);
 
     return (Archive*)AH;
 }
@@ -924,10 +921,12 @@ static int restore_toc_entry(ArchiveHandle* AH, TocEntry* te, RestoreOptions* ro
                      * TRUNCATE with ONLY so that child tables are not
                      * wiped.
                      */
-                    if (PQserverVersion(AH->connection) >= 80400) {
-                        (void)ahprintf(AH, "TRUNCATE TABLE ONLY (%s);\n\n", fmtId(te->tag));
-                    } else {
-                        (void)ahprintf(AH, "TRUNCATE TABLE %s;\n\n", fmtId(te->tag));
+                    if (AH->format != archDirectory) {
+                        if (PQserverVersion(AH->connection) >= 80400) {
+                            (void)ahprintf(AH, "TRUNCATE TABLE ONLY (%s);\n\n", fmtId(te->tag));
+                        } else {
+                            (void)ahprintf(AH, "TRUNCATE TABLE %s;\n\n", fmtId(te->tag));
+                        }
                     }
                 }
 
@@ -1050,6 +1049,17 @@ size_t WriteData(Archive* AHX, const void* data, size_t dLen)
     return (*AH->WriteDataptr)(AH, data, dLen);
 }
 
+size_t WriteDataParallel(Archive* AHX, const void* data, size_t dLen)
+{
+    ArchiveHandle* AH = (ArchiveHandle*)AHX;
+
+    if (NULL == (AH->currToc))
+        exit_horribly(
+            modulename, "internal error -- WriteDataParallel cannot be called outside the context of a DataDumper routine\n");
+
+    return (*AH->WriteDataptrP)(AH, data, dLen);
+}
+
 /*
  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
  * repository for all metadata. But the name has stuck.
@@ -1877,7 +1887,7 @@ static void buildTocEntryArrays(ArchiveHandle* AH)
     }
 }
 
-static TocEntry* getTocEntryByDumpId(ArchiveHandle* AH, DumpId id)
+TocEntry* getTocEntryByDumpId(ArchiveHandle* AH, DumpId id)
 {
     /* build index arrays if we didn't already */
     if (AH->tocsByDumpId == NULL) {
@@ -2236,11 +2246,11 @@ static int _discoverArchiveFormat(ArchiveHandle* AH)
 /*
  * Allocate an archive handle
  */
-static ArchiveHandle* _allocAH(const char* FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode, CryptoModuleCheckParam* cryptoModuleCheckParam)
+static ArchiveHandle* _allocAH(const char* FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode, CryptoModuleCheckParam* cryptoModuleCheckParam, int workerNum, SetupWorkerPtr setupWorkerPtr)
 {
     ArchiveHandle* AH = NULL;
 
-    AH = (ArchiveHandle*)pg_calloc(1, sizeof(ArchiveHandle));
+    AH = (ArchiveHandle*)pg_malloc_zero(sizeof(ArchiveHandle));
 
     /* AH debugLevel was 100; */
     AH->vmaj = K_VERS_MAJOR;
@@ -2279,6 +2289,8 @@ static ArchiveHandle* _allocAH(const char* FileSpec, const ArchiveFormat fmt, co
         AH->fSpec = NULL;
     }
 
+    AH->SetupWorkerptr = setupWorkerPtr;
+
     AH->connection = NULL;
     AH->currUser = NULL;   /* unknown */
     AH->currSchema = NULL; /* ditto */
@@ -2335,13 +2347,17 @@ static ArchiveHandle* _allocAH(const char* FileSpec, const ArchiveFormat fmt, co
             break;
 
         case archDirectory:
-            InitArchiveFmt_Directory(AH);
+            /* workerNum <= 0 has already been rejected in main() */
+            if (workerNum > 1) {
+                InitArchiveFmt_Parallel(AH);
+            } else {
+                InitArchiveFmt_Directory(AH);
+            }
             break;
 
         case archTar:
             InitArchiveFmt_Tar(AH);
             break;
-
         default:
             exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
     }
@@ -2385,6 +2401,38 @@ void WriteDataChunks(ArchiveHandle* AH)
     }
 }
 
+void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
+{
+    StartDataPtr startPtr;
+    EndDataPtr endPtr;
+
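+    /*
+     * Dump the data for just this one TOC entry. Parallel worker processes
+     * run this once per "DUMP n" command received from the leader; it is
+     * the per-entry counterpart of WriteDataChunks() above.
+     */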
+    AH->currToc = te;
+
+    if (strcmp(te->desc, "BLOBS") == 0)
+    {
+        startPtr = AH->StartBlobsptr;
+        endPtr = AH->EndBlobsptr;
+    }
+    else
+    {
+        startPtr = AH->StartDataptr;
+        endPtr = AH->EndDataptr;
+    }
+
+    if (startPtr != NULL)
+        (*startPtr) (AH, te);
+
+    /*
+     * The user-provided DataDumper routine needs to call AH->WriteData
+     */
+    te->dataDumper((Archive *) AH, te->dataDumperArg);
+
+    if (endPtr != NULL)
+        (*endPtr) (AH, te);
+
+    AH->currToc = NULL;
+}
+
 void WriteToc(ArchiveHandle* AH)
 {
     TocEntry* te = NULL;
@@ -4566,7 +4614,7 @@ static void inhibit_data_for_failed_table(ArchiveHandle* AH, TocEntry* te)
  *
  * These could be public, but no need at present.
  */
-static ArchiveHandle* CloneArchive(ArchiveHandle* AH)
+ArchiveHandle* CloneArchive(ArchiveHandle* AH)
 {
     ArchiveHandle* pstClone = NULL;
 
@@ -4651,7 +4699,7 @@ static ArchiveHandle* CloneArchive(ArchiveHandle* AH)
  *
  * Note: we assume any clone-local connection was already closed.
  */
-static void DeCloneArchive(ArchiveHandle* AH)
+void DeCloneArchive(ArchiveHandle* AH)
 {
     errno_t rc;
     /* Clear format-specific state */
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index f89bee4d2f29dc3add1293019ec67cd56c9c3ee2..0aae269d24458afd2bf0a1970c461bd2b98d75b3 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -209,6 +209,8 @@ typedef struct _archiveHandle {
                                * dumped */
     WriteDataPtr WriteDataptr; /* Called to send some table data to the
                                 * archive */
+    WriteDataPtr WriteDataptrP; /* Parallel variant: called by worker
+                                 * processes to send table data */
     EndDataPtr EndDataptr;     /* Called when table data dump is finished */
     WriteBytePtr WriteByteptr; /* Write a byte to output */
     ReadBytePtr ReadByteptr;   /* Read a byte from an archive */
@@ -229,6 +231,8 @@ typedef struct _archiveHandle {
     StartBlobPtr StartBlobptr;
     EndBlobPtr EndBlobptr;
 
+    SetupWorkerPtr SetupWorkerptr;
+
     ClonePtr Cloneptr;     /* Clone format-specific fields */
     DeClonePtr DeCloneptr; /* Clean up cloned fields */
 
@@ -287,6 +291,9 @@ typedef struct _archiveHandle {
     ArchiverStage lastErrorStage;
     struct _tocEntry* currentTE;
     struct _tocEntry* lastErrorTE;
+
+    /* If connCancel isn't NULL, SIGINT handler will send a cancel */
+    PGcancel *volatile connCancel;
 } ArchiveHandle;
 
 typedef struct _tocEntry {
@@ -340,6 +347,7 @@ extern void ReadHead(ArchiveHandle* AH);
 extern void WriteToc(ArchiveHandle* AH);
 extern void ReadToc(ArchiveHandle* AH);
 extern void WriteDataChunks(ArchiveHandle* AH);
+extern void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te);
 extern teReqs TocIDRequired(ArchiveHandle* AH, DumpId id);
 extern bool checkSeek(FILE* fp);
 
@@ -370,6 +378,7 @@ extern void InitArchiveFmt_Custom(ArchiveHandle* AH);
 extern void InitArchiveFmt_Null(ArchiveHandle* AH);
 extern void InitArchiveFmt_Directory(ArchiveHandle* AH);
 extern void InitArchiveFmt_Tar(ArchiveHandle* AH);
+extern void InitArchiveFmt_Parallel(ArchiveHandle* AH);
 
 extern bool isValidTarHeader(const char* header);
 
@@ -397,6 +406,9 @@ extern size_t fread_file(void *buf, size_t size, size_t nmemb, FILE *fh);
 extern bool findDBCompatibility(Archive* fout, const char* databasename);
 extern bool hasSpecificExtension(Archive* fout, const char* databasename);
 
+extern TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
+extern ArchiveHandle* CloneArchive(ArchiveHandle* AH);
+extern void DeCloneArchive(ArchiveHandle* AH);
 #ifdef HAVE_LIBZ
 extern size_t gzread_file(void *buf, unsigned len, gzFile fp);
 #endif
diff --git a/src/bin/pg_dump/pg_backup_db.cpp b/src/bin/pg_dump/pg_backup_db.cpp
index d7ff1053ca109f0424340e6a22f238ecf8b4d179..ed9c0e4cbcc1ccb029a781f09f000b158235df9f 100644
--- a/src/bin/pg_dump/pg_backup_db.cpp
+++ b/src/bin/pg_dump/pg_backup_db.cpp
@@ -403,6 +403,22 @@ PGresult* ExecuteSqlQuery(Archive* AHX, const char* query, ExecStatusType status
     return res;
 }
 
+PGresult* ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
+{
+    PGresult *res;
+    int ntups;
+
+    res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
+
+    /* Expecting a single result only */
+    ntups = PQntuples(res);
+    if (ntups != 1) {
+        exit_horribly(NULL, "query returned %d rows instead of one: %s\n", ntups, query);
+    }
+
+    return res;
+}
+
 /*
  * Convenience function to send a query.
  * Monitors result to detect COPY statements
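This helper removes per-call-site tuple-count checks for queries that must return exactly one row. A hypothetical call site (the snapshot-export query is the classic use of such a helper in upstream pg_dump and is not part of this hunk):

    PGresult *res = ExecuteSqlQueryForSingleRow(fout,
        "SELECT pg_catalog.pg_export_snapshot()");
    char *snapshot_id = gs_strdup(PQgetvalue(res, 0, 0));
    PQclear(res);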
@@ -904,4 +915,170 @@ static int readFromCryptoCache(ArchiveHandle* AH, DFormatCryptoCach
     }
     return 0;
+}
+
+
+
+
+typedef struct {
+    /*
+     * Our archive location. This is basically what the user specified as his
+     * backup file but of course here it is a directory.
+     */
+    char* directory;
+
+    cfp* dataFH; /* currently open data file */
+    DFormatCryptoCache* dataCryptoCache;
+
+    cfp* blobsTocFH; /* file handle for blobs.toc */
+
+    ParallelState* parallelState;
+} lclContextP;
+
+void InitArchiveFmt_Parallel(ArchiveHandle* AH)
+{
+    InitArchiveFmt_Directory(AH);
+    AH->Closeptr = _CloseArchiveP;
+    AH->WriteDataptr = _WriteData;
+    AH->WriteDataptrP = _WriteDataP;
+    AH->StartDataptr = _StartDataP;
+}
+
+static void
+_Clone(ArchiveHandle *AH)
+{
+    lclContext *ctx = (lclContext *) AH->formatData;
+
+    AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
+    memcpy(AH->formatData, ctx, sizeof(lclContext));
+    ctx = (lclContext *) AH->formatData;
+
+    /*
+     * Note: we do not make a local lo_buf because we expect at most one BLOBS
+     * entry per archive, so no parallelism is possible. Likewise,
+     * TOC-entry-local state isn't an issue because any one TOC entry is
+     * touched by just one worker child.
+     */
+
+    /*
+     * We also don't copy the ParallelState pointer (parallelState), since
+     * only the leader process ever writes to it.
+     */
+}
+
+static void
+_ReopenArchive(ArchiveHandle *AH)
+{
+    /*
+     * Our TOC is in memory, and each child opens its own data files, so
+     * reopening the archive is a no-op.
+     */
+}
+
+static void
+_DeClone(ArchiveHandle *AH)
+{
+    lclContext *ctx = (lclContext *) AH->formatData;
+
+    free(ctx);
+}
+
+static void _CloseArchiveP(ArchiveHandle* AH)
+{
+    lclContextP* ctx = (lclContextP*)AH->formatData;
+
+    if (AH->mode == archModeWrite) {
+        cfp* tocFH = NULL;
+        char* fname = prependDirectory(AH, "toc.dat");
+
+        ctx->parallelState = ParallelBackupStart(AH);
+        /* The TOC is always created uncompressed */
+        tocFH = cfopen_write(fname, PG_BINARY_W, 0);
+        if (tocFH == NULL)
+            exit_horribly(modulename, "could not open output file \"%s\": %s\n", fname, strerror(errno));
+        ctx->dataFH = tocFH;
+
+        if (AH->publicArc.encryptfile) {
+            initCryptoCache(archModeWrite, &(ctx->dataCryptoCache));
+        }
+
+        /*
+         * Write 'tar' in the format field of the toc.dat file. The directory
+         * is compatible with 'tar', so there's no point having a different
+         * format code for it.
+         */
+        AH->format = archTar;
+        WriteHead(AH);
+        AH->format = archDirectory;
+        WriteToc(AH);
+
+        if (AH->publicArc.encryptfile) {
+            encryptAndFlushCache(AH, ctx->dataCryptoCache, tocFH);
+            releaseCryptoCache(ctx->dataCryptoCache);
+        }
+
+        if (cfclose(tocFH) != 0)
+            exit_horribly(modulename, "could not close TOC file: %s\n", strerror(errno));
+
+        TocEntry **tes;
+        int ntes;
+
+        tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
+        ntes = 0;
+        for (TocEntry *te = AH->toc->next; te != AH->toc; te = te->next)
+        {
+            /* Consider only TEs with dataDumper functions ... */
+            if (!te->dataDumper)
+                continue;
+            /* ... and ignore ones not enabled for dump */
+            if ((te->reqs & REQ_DATA) == 0)
+                continue;
+
+            tes[ntes++] = te;
+        }
+
+        for (int i = 0; i < ntes; i++)
+            DispatchJobForTocEntry(ctx->parallelState, tes[i], ACT_DUMP);
+
+        pg_free(tes);
+        WaitForWorkers(ctx->parallelState, WFW_ALL_IDLE);
+
+        ParallelBackupEnd(AH, ctx->parallelState);
+    }
+    AH->FH = NULL;
+}
+
+static size_t _WriteDataP(ArchiveHandle* AH, const void* data, size_t dLen)
+{
+    lclContext* ctx = (lclContext*)AH->formatData;
+
+    if (dLen == 0)
+        return 0;
+
+    if (ctx->dataCryptoCache) {
+        fillWriteCryptoCache(AH, ctx->dataCryptoCache, ctx->dataFH, data, dLen);
+    } else {
+        return (size_t)cfwriteWithLock(data, (int)dLen, ctx->dataFH);
+    }
+
+    return dLen;
+}
+
+static void _StartDataP(ArchiveHandle* AH, TocEntry* te)
+{
+    lclTocEntry* tctx = (lclTocEntry*)te->formatData;
+    lclContext* ctx = (lclContext*)AH->formatData;
+    char* fname = NULL;
+
+    fname = prependDirectory(AH, tctx->filename);
+
+    ctx->dataFH = cfopen_write4Lock(fname, PG_BINARY_A, AH->compression);
+    if (ctx->dataFH == NULL)
+        exit_horribly(modulename, "could not open output file \"%s\": %s\n", fname, strerror(errno));
+
+    if (AH->publicArc.encryptfile) {
+        initCryptoCache(AH->mode, &(ctx->dataCryptoCache));
+    }
+}
\ No newline at end of file
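_StartDataP() opens each data file with cfopen_write4Lock() in append mode, and _WriteDataP() funnels every write through cfwriteWithLock(), so concurrent writers serialize on an advisory file lock instead of shared in-process state. The pattern in isolation (a sketch, not the project's code; note the lock also covers multi-call sequences such as zlib's buffered gzwrite(), which O_APPEND alone would not make atomic):

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/file.h>
    #include <sys/types.h>
    #include <sys/wait.h>

    static void lockedAppend(int fd, const char *buf, size_t len)
    {
        if (flock(fd, LOCK_EX) != 0) { perror("flock"); exit(1); }
        (void) write(fd, buf, len); /* whole record written under the lock */
        flock(fd, LOCK_UN);
    }

    int main(void)
    {
        int fd = open("demo.dat", O_WRONLY | O_CREAT | O_APPEND, 0600);
        if (fd < 0) { perror("open"); return 1; }

        /* Two processes appending to one file: records never interleave. */
        pid_t pid = fork();
        const char *tag = (pid == 0) ? "child " : "parent";
        for (int i = 0; i < 100; i++) {
            char rec[64];
            int n = snprintf(rec, sizeof(rec), "%s record %d\n", tag, i);
            lockedAppend(fd, rec, (size_t)n);
        }
        if (pid != 0)
            waitpid(pid, NULL, 0);
        close(fd);
        return 0;
    }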
{"format", required_argument, NULL, 'F'}, {"host", required_argument, NULL, 'h'}, + {"jobs", required_argument, NULL, 'j'}, {"oids", no_argument, NULL, 'o'}, {"no-owner", no_argument, NULL, 'O'}, {"port", required_argument, NULL, 'p'}, @@ -698,6 +710,7 @@ int main(int argc, char** argv) #endif {"with-module-params", required_argument, NULL, 19}, {"gen-key", no_argument, NULL, 20}, + {"split-huge-table", no_argument, NULL, 21}, {NULL, 0, NULL, 0}}; set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("gs_dump")); @@ -810,10 +823,19 @@ int main(int argc, char** argv) exit_nicely(0); } + if (workerNum <= 0) { + exit_horribly(NULL, "invalid number of parallel jobs"); + } else if (workerNum > 1 && archiveFormat != archDirectory ) { + exit_horribly(NULL, "parallel backup only supported by the directory format"); + } + if (workerNum <= 1 && enableSplitTable) { + exit_horribly(NULL, "can not split table without parallel"); + } + /* Open the output file */ - fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode); - + fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode, workerNum, setupDumpWorker); + fout->workerNum = workerNum; /* Register the cleanup hook */ on_exit_close_archive(fout); @@ -990,22 +1012,6 @@ int main(int argc, char** argv) PQclear(res); } - /* - * Start transaction-snapshot mode transaction to dump consistent data. - */ - ExecuteSqlStatement(fout, "START TRANSACTION"); - if (fout->remoteVersion >= 90100) { - if (serializable_deferrable) - ExecuteSqlStatement(fout, - "SET TRANSACTION ISOLATION LEVEL " - "SERIALIZABLE, READ ONLY, DEFERRABLE"); - else - ExecuteSqlStatement(fout, - "SET TRANSACTION ISOLATION LEVEL " - "REPEATABLE READ"); - } else - ExecuteSqlStatement(fout, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); - /* Select the appropriate subquery to convert user IDs to names */ if (fout->remoteVersion >= 80100) username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid ="; @@ -1150,6 +1156,17 @@ int main(int argc, char** argv) /* Dump nls_length_semantics parameter */ DummpLengthSemantics(fout); + if (enableSplitTable) { + for (i = 0; i < numObjs; i++) { + if (dobjs[i]->objType == DO_TABLE_DATA) { + TableInfo* tbinfo = ((TableDataInfo*)dobjs[i])->tdtable; + g_totalPageNum += tbinfo->relpages; + } + } + if (g_totalPageNum > 0) { + g_splitPageNum = g_totalPageNum / workerNum; + } + } /* Now the rearrangeable objects. 
@@ -1508,7 +1525,7 @@ void getopt_dump(int argc, char** argv, struct option options[], int* result)
         }
     }

-    while ((c = getopt_long(argc, argv, "abcCE:f:F:g:h:n:N:oOp:q:RsS:t:T:U:vwW:xZ:", options, result)) != -1) {
+    while ((c = getopt_long(argc, argv, "abcCE:f:F:g:h:j:n:N:oOp:q:RsS:t:T:U:vwW:xZ:", options, result)) != -1) {
         switch (c) {
             case 'a': /* Dump data only */
                 dataOnly = true;
                 break;
@@ -1550,6 +1567,10 @@ void getopt_dump(int argc, char** argv, struct option options[], int* result)
                 pghost = gs_strdup(optarg);
                 break;

+            case 'j':
+                workerNum = atoi(optarg);
+                break;
+
             case 'g': /* exclude guc parameter */
                 simple_string_list_append(&exclude_guc, optarg);
                 break;
@@ -1761,6 +1782,9 @@ void getopt_dump(int argc, char** argv, struct option options[], int* result)
                 gen_key = true;
                 is_encrypt = true;
                 break;
+            case 21:
+                enableSplitTable = true;
+                break;
             default:
                 write_stderr(_("Try \"%s --help\" for more information.\n"), progname);
                 exit_nicely(1);
@@ -1914,6 +1938,7 @@ void help(const char* pchProgname)
     printf(_("  -f, --file=FILENAME          output file or directory name\n"));
     printf(_("  -F, --format=c|d|t|p         output file format (custom, directory, tar,\n"
              "                               plain text (default))\n"));
+    printf(_("  -j, --jobs=NUM               use this many parallel jobs to dump\n"));
     printf(_("  -v, --verbose                verbose mode\n"));
     printf(_("  -V, --version                output version information, then exit\n"));
     printf(_("  -Z, --compress=0-9           compression level for compressed formats\n"));
@@ -1978,6 +2003,7 @@ void help(const char* pchProgname)
              "MODULE_CONFIG_FILE_PATH:GDACCARD need not,JNTAKMS exclude lib file name absolute path,SWXA need include lib file absolute path"
              "used by gs_dump, load device\n"));
     printf(_("  --gen-key                    if you have not key for using,you can set this option to generate key and encrypt dump data,store it to using again\n"));
+    printf(_("  --split-huge-table           split huge tables so that multiple parallel jobs can dump a single table\n"));
 #ifdef ENABLE_MULTIPLE_NODES
     printf(_("  --include-nodes              include TO NODE/GROUP clause in the dumped CREATE TABLE "
              "and CREATE FOREIGN TABLE commands.\n"));
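Taken together with the option table above, an illustrative parallel invocation is gs_dump -F d -j 4 --split-huge-table -f ./dumpdir mydb: the directory format is mandatory for -j greater than 1, and --split-huge-table refuses to run without parallel jobs. The setup_connection() hunk that follows is what keeps those worker connections consistent: the first connection (the leader's) exports its snapshot via pg_export_snapshot(), and every later connection finds AH->sync_snapshot_id already set and issues SET TRANSACTION SNAPSHOT instead, so all workers dump the same database state.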
@@ -2148,6 +2174,38 @@ static void setup_connection(Archive* AH)
     if (quote_all_identifiers && AH->remoteVersion >= 90100)
         ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");
+
+    /*
+     * Start transaction-snapshot mode transaction to dump consistent data.
+     */
+    ExecuteSqlStatement(AH, "START TRANSACTION");
+    if (AH->remoteVersion >= 90100) {
+        if (serializable_deferrable)
+            ExecuteSqlStatement(AH,
+                "SET TRANSACTION ISOLATION LEVEL "
+                "SERIALIZABLE, READ ONLY, DEFERRABLE");
+        else
+            ExecuteSqlStatement(AH,
+                "SET TRANSACTION ISOLATION LEVEL "
+                "REPEATABLE READ");
+    } else
+        ExecuteSqlStatement(AH, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+
+    if (AH->sync_snapshot_id)
+    {
+        PQExpBuffer query = createPQExpBuffer();
+
+        appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
+        appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
+        ExecuteSqlStatement(AH, query->data);
+        destroyPQExpBuffer(query);
+    }
+    else if (AH->workerNum > 1)
+    {
+        res = ExecuteSqlQueryForSingleRow(AH, "SELECT pg_catalog.pg_export_snapshot()");
+        AH->sync_snapshot_id = gs_strdup(PQgetvalue(res, 0, 0));
+        PQclear(res);
+    }
 }

 static ArchiveFormat parseArchiveFormat(ArchiveMode* mode)
@@ -2648,6 +2706,185 @@ static void selectDumpableFuncs(FuncInfo *fcinfo, Archive *fout = NULL)
     }
 }

+static int dumpTableDataSplit_copy(Archive* fout, void* dcontext) {
+    TableDataSplitInfo* tdsinfo = (TableDataSplitInfo*)dcontext;
+    TableDataInfo* tdinfo = tdsinfo->tdinfo;
+    char* splitcond = tdsinfo->splitcond;
+
+    TableInfo* tbinfo = tdinfo->tdtable;
+    const char* classname = tbinfo->dobj.name;
+    const bool hasoids = tbinfo->hasoids;
+    const bool boids = tdinfo->oids;
+    PQExpBuffer q = createPQExpBuffer();
+    PGconn* conn = GetConnection(fout);
+    PGresult* res = NULL;
+    int ret = 0;
+    char* copybuf = NULL;
+    const char* column_list = NULL;
+
+    if (g_verbose)
+        write_msg(NULL,
+            "dumping contents of table \"%s\"\n",
+            fmtQualifiedId(fout, tbinfo->dobj.nmspace->dobj.name, tbinfo->dobj.name));
+
+    if (isDB4AIschema(tbinfo->dobj.nmspace) && !isExecUserSuperRole(fout)) {
+        write_msg(NULL, "WARNING: schema db4ai not dumped because current user is not a superuser\n");
+        destroyPQExpBuffer(q);
+        return 1;
+    }
+
+    /*
+     * Make sure we are in proper schema. We will qualify the table name
+     * below anyway (in case its name conflicts with a pg_catalog table); but
+     * this ensures reproducible results in case the table contains regproc,
+     * regclass, etc columns.
+     */
+    selectSourceSchema(fout, tbinfo->dobj.nmspace->dobj.name);
+
+    /*
+     * If possible, specify the column list explicitly so that we have no
+     * possibility of retrieving data in the wrong column order. (The default
+     * column ordering of COPY will not be what we want in certain corner
+     * cases involving ADD COLUMN and inheritance.)
+     */
+    if (fout->remoteVersion >= 70300)
+        column_list = fmtCopyColumnList(tbinfo);
+    else
+        column_list = ""; /* can't select columns in COPY */
+
+    if (boids && hasoids) {
+        appendPQExpBuffer(q,
+            "COPY %s %s WITH OIDS TO stdout;",
+            fmtQualifiedId(fout, tbinfo->dobj.nmspace->dobj.name, classname),
+            column_list);
+    } else if (NULL != tdinfo->filtercond || NULL != splitcond || tdinfo->tdtable->isMOT) {
+        /* Note: this syntax is only supported in 8.2 and up */
+        appendPQExpBufferStr(q, "COPY (SELECT ");
+        /* klugery to get rid of parens in column list */
+        if (strlen(column_list) > 2) {
+            appendPQExpBufferStr(q, column_list + 1);
+            q->data[q->len - 1] = ' ';
+        } else {
+            appendPQExpBufferStr(q, "* ");
+        }
+        if (tdinfo->filtercond) {
+            if (!splitcond) {
+                appendPQExpBuffer(q,
+                    "FROM %s %s) TO stdout;",
+                    fmtQualifiedId(fout, tbinfo->dobj.nmspace->dobj.name, classname),
+                    tdinfo->filtercond);
+            } else {
+                appendPQExpBuffer(q,
+                    "FROM %s %s AND %s) TO stdout;",
+                    fmtQualifiedId(fout, tbinfo->dobj.nmspace->dobj.name, classname),
+                    tdinfo->filtercond,
+                    splitcond);
+            }
+
+        } else if (splitcond) {
+            appendPQExpBuffer(q,
+                "FROM %s WHERE %s) TO stdout;",
+                fmtQualifiedId(fout, tbinfo->dobj.nmspace->dobj.name, classname),
+                splitcond);
+        } else {
+            appendPQExpBuffer(q,
+                "FROM %s) TO stdout;",
+                fmtQualifiedId(fout, tbinfo->dobj.nmspace->dobj.name, classname));
+        }
+    } else {
+        appendPQExpBuffer(
+            q, "COPY %s %s TO stdout;", fmtQualifiedId(fout, tbinfo->dobj.nmspace->dobj.name, classname), column_list);
+    }
+    res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
+    PQclear(res);
+
+    for (;;) {
+        ret = PQgetCopyData(conn, &copybuf, 0);
+
+        if (ret < 0)
+            break; /* done or error */
+
+        if (NULL != copybuf) {
+            size_t writeBytes = 0;
+            writeBytes = WriteDataParallel(fout, copybuf, ret);
+            if (writeBytes != (size_t)ret) {
+                write_msg(NULL, "could not write to output file: %s\n", strerror(errno));
+                exit_nicely(1);
+            }
+
+            PQfreemem(copybuf);
+        }
+
+        /* ----------
+         * THROTTLE:
+         *
+         * There was considerable discussion in late July, 2000 regarding
+         * slowing down pg_dump when backing up large tables. Users with both
+         * slow & fast (multi-processor) machines experienced performance
+         * degradation when doing a backup.
+         *
+         * Initial attempts based on sleeping for a number of ms for each ms
+         * of work were deemed too complex, then a simple 'sleep in each loop'
+         * implementation was suggested. The latter failed because the loop
+         * was too tight. Finally, the following was implemented:
+         *
+         * If throttle is non-zero, then
+         *      See how long since the last sleep.
+         *      Work out how long to sleep (based on ratio).
+         *      If sleep is more than 100ms, then
+         *          sleep
+         *          reset timer
+         *      EndIf
+         * EndIf
+         *
+         * where the throttle value was the number of ms to sleep per ms of
+         * work. The calculation was done in each loop.
+         *
+         * Most of the hard work is done in the backend, and this solution
+         * still did not work particularly well: on slow machines, the ratio
+         * was 50:1, and on medium paced machines, 1:1, and on fast
+         * multi-processor machines, it had little or no effect, for reasons
+         * that were unclear.
+         *
+         * Further discussion ensued, and the proposal was dropped.
+         *
+         * For those people who want this feature, it can be implemented using
+         * gettimeofday in each loop, calculating the time since last sleep,
+         * multiplying that by the sleep ratio, then if the result is more
+         * than a preset 'minimum sleep time' (say 100ms), call the 'select'
+         * function to sleep for a subsecond period ie.
+         *
+         * select(0, NULL, NULL, NULL, &tvi);
+         *
+         * This will return after the interval specified in the structure tvi.
+         * Finally, call gettimeofday again to save the 'last sleep time'.
+         * ----------
+         */
+    }
+
+    archprintf(fout, "\\.\n;\n\n");
+
+    if (ret == -2) {
+        /* copy data transfer failed */
+        write_msg(NULL, "Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.\n", classname);
+        write_msg(NULL, "Error message from server: %s", PQerrorMessage(conn));
+        write_msg(NULL, "The command was: %s\n", q->data);
+        exit_nicely(1);
+    }
+
+    /* Check command status and return to normal libpq state */
+    res = PQgetResult(conn);
+    if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+        write_msg(NULL, "Dumping the contents of table \"%s\" failed: PQgetResult() failed.\n", classname);
+        write_msg(NULL, "Error message from server: %s", PQerrorMessage(conn));
+        write_msg(NULL, "The command was: %s\n", q->data);
+        exit_nicely(1);
+    }
+    PQclear(res);
+
+    destroyPQExpBuffer(q);
+    return 1;
+}

 /*
  * Dump a table's contents for loading using the COPY command
  * - this routine is called by the Archiver when it wants the table
  *   to be dumped.
  */
@@ -2980,6 +3217,15 @@ static int dumpTableData_insert(Archive* fout, void* dcontext)
     return 1;
 }

+static bool isHeapTable(const char* option) {
+    if (option == NULL)
+        return true;
+    if (strstr(option, "storage_type=astore") != NULL)
+        return true;
+    if (strstr(option, "storage_type") != NULL)
+        return false;
+    return true;
+}
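isHeapTable() gates the ctid-based splitting on the table's reloptions: only row-store (astore) heaps get split, since the chunk predicates below address rows by block number. A quick standalone check of its contract (the sample reloptions strings are made up for illustration):

    #include <stdio.h>
    #include <string.h>
    #include <stdbool.h>

    /* Body copied from the hunk above. */
    static bool isHeapTable(const char* option) {
        if (option == NULL)
            return true;
        if (strstr(option, "storage_type=astore") != NULL)
            return true;
        if (strstr(option, "storage_type") != NULL)
            return false;
        return true;
    }

    int main(void)
    {
        printf("%d\n", isHeapTable(NULL));                                    /* 1: default is astore */
        printf("%d\n", isHeapTable("{orientation=row,storage_type=astore}")); /* 1 */
        printf("%d\n", isHeapTable("{storage_type=ustore}"));                 /* 0: not splittable */
        printf("%d\n", isHeapTable("{compression=no}"));                      /* 1: says nothing about storage */
        return 0;
    }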
"WITH OIDS " : ""); copyStmt = copyBuf->data; + if (g_splitPageNum > 0 && tbinfo->relpages > g_splitPageNum && tbinfo->parttype == 'n' && isHeapTable(tbinfo->reloptions)) { + DataDumperPtr dumpFnSplit = dumpTableDataSplit_copy; + int partition = tbinfo->relpages / g_splitPageNum; + if (tbinfo->relpages % g_splitPageNum != 0) { + partition += 1; + } + for (int part = 1; part <= partition; part++) { + TableDataSplitInfo* tdsinfo = (TableDataSplitInfo*) pg_malloc(sizeof(TableDataSplitInfo)); + tdsinfo->tdinfo = tdinfo; + char* splitcond = (char *)pg_malloc(256 * sizeof(char)); + if (part == 1) { + sprintf(splitcond, + "ctid <= '(%d,0)'::tid", + part * g_splitPageNum); + } else if (part == partition) { + sprintf(splitcond, + "ctid > '(%d,0)'::tid", + (part - 1) * g_splitPageNum); + } else { + sprintf(splitcond, + "ctid > '(%d,0)'::tid and ctid <= '(%d,0)'::tid", + (part - 1) * g_splitPageNum, part * g_splitPageNum); + } + tdsinfo->splitcond = splitcond; + if (part > 1) { + AssignDumpId(&tdinfo->dobj); + } + ArchiveEntry(fout, + tdinfo->dobj.catId, + tdinfo->dobj.dumpId, + tbinfo->dobj.name, + tbinfo->dobj.nmspace->dobj.name, + NULL, + tbinfo->rolname, + false, + "TABLE DATA", + SECTION_DATA, + "", + "", + copyStmt, + &(tbinfo->dobj.dumpId), + 1, + dumpFnSplit, + tdsinfo); + } + + destroyPQExpBuffer(copyBuf); + return; + } else { + /* Dump/restore using COPY */ + dumpFn = dumpTableData_copy; + } } else { /* Restore using INSERT */ dumpFn = dumpTableData_insert; @@ -7072,6 +7368,7 @@ TableInfo* getTables(Archive* fout, int* numTables) int i_checkoption = 0; int i_toastreloptions = 0; int i_reloftype = 0; + int i_relpages = 0; int i_parttype = 0; int i_relrowmovement = 0; int i_relhsblockchain = 0; @@ -7166,7 +7463,7 @@ TableInfo* getTables(Archive* fout, int* numTables) } appendPQExpBuffer(query, - "c.parttype, c.relrowmovement, c.relcmprs, " + "c.relpages, c.parttype, c.relrowmovement, c.relcmprs, " "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL::Oid END AS reloftype, " "d.refobjid AS owning_tab, " "d.refobjsubid AS owning_col, " @@ -7223,7 +7520,7 @@ TableInfo* getTables(Archive* fout, int* numTables) appendPQExpBuffer(query, "'d' AS relreplident, "); } appendPQExpBuffer(query, - "c.parttype, c.relrowmovement, c.relcmprs, " + "c.relpages, c.parttype, c.relrowmovement, c.relcmprs, " "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL::Oid END AS reloftype, " "d.refobjid AS owning_tab, " "d.refobjsubid AS owning_col, " @@ -7581,6 +7878,7 @@ TableInfo* getTables(Archive* fout, int* numTables) i_toastfrozenxid64 = PQfnumber(res, "tfrozenxid64"); i_relpersistence = PQfnumber(res, "relpersistence"); i_relreplident = PQfnumber(res, "relreplident"); + i_relpages = PQfnumber(res, "relpages"); i_relbucket = PQfnumber(res, "relbucket"); i_parttype = PQfnumber(res, "parttype"); i_relrowmovement = PQfnumber(res, "relrowmovement"); @@ -7637,6 +7935,7 @@ TableInfo* getTables(Archive* fout, int* numTables) tblinfo[i].isblockchain = (strcmp(PQgetvalue(res, i, i_relhsblockchain), "t") == 0); tblinfo[i].isMOT = false; tblinfo[i].relreplident = *(PQgetvalue(res, i, i_relreplident)); + tblinfo[i].relpages = atoi(PQgetvalue(res, i, i_relpages)); tblinfo[i].frozenxid = atooid(PQgetvalue(res, i, i_relfrozenxid)); tblinfo[i].frozenxid64 = atoxid(PQgetvalue(res, i, i_relfrozenxid64)); tblinfo[i].toast_oid = atooid(PQgetvalue(res, i, i_toastoid)); @@ -24179,3 +24478,15 @@ void RemoveQuotes(char *str) { } str[writePtr] = '\0'; // Add null terminator at the end } + 
@@ -24179,3 +24478,15 @@ void RemoveQuotes(char *str) {
     }
     str[writePtr] = '\0'; // Add null terminator at the end
 }
+
+static void
+setupDumpWorker(Archive *AH)
+{
+    /*
+     * We want the worker connection to re-select the same session values the
+     * leader connection is using. setup_connection() re-establishes them, and
+     * the worker adopts the leader's exported snapshot through the inherited
+     * AH->sync_snapshot_id.
+     */
+    setup_connection(AH);
+}
\ No newline at end of file
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ea3562b5ab333f509284c589bbddf2606125dc79..e0d932ceff5640dab0df594b43238d3df4d160c5 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -258,6 +258,8 @@ typedef struct _tableInfo {
     bool relrowmovement;
     bool isIncremental; /* for matview, true if is an incremental type */

+    int relpages; /* table's size in pages (from pg_class) */
+
     bool interesting; /* true if need to collect more data */
     int autoinc_attnum;
     DumpId autoincconstraint;
@@ -323,6 +325,10 @@ typedef struct _tableDataInfo {
     char* filtercond; /* WHERE condition to limit rows dumped */
 } TableDataInfo;

+typedef struct _tableDataSplitInfo {
+    TableDataInfo *tdinfo;
+    char* splitcond;
+} TableDataSplitInfo;
 typedef struct _indxInfo {
     DumpableObject dobj;
     TableInfo* indextable; /* link to table the index is for */