From 112a9755f22d9a5f28abbfdc748e0c5a0311aac7 Mon Sep 17 00:00:00 2001 From: wangzhijun2018 Date: Thu, 30 Jul 2020 17:46:33 +0800 Subject: [PATCH] support shared thread pool for Global Temporary Table --- src/common/backend/catalog/catalog.cpp | 3 +- src/common/backend/catalog/storage.cpp | 3 +- src/common/backend/catalog/storage_gtt.cpp | 213 ++++++++++-------- src/common/backend/parser/parse_utilcmd.cpp | 3 +- src/common/backend/utils/adt/dbsize.cpp | 3 +- src/common/backend/utils/cache/relcache.cpp | 5 +- src/common/backend/utils/misc/guc.cpp | 4 +- src/gausskernel/optimizer/commands/vacuum.cpp | 4 +- .../process/threadpool/knl_session.cpp | 13 ++ .../process/threadpool/knl_thread.cpp | 11 - src/gausskernel/storage/buffer/bufmgr.cpp | 30 +-- src/gausskernel/storage/buffer/localbuf.cpp | 141 ++++++------ src/gausskernel/storage/ipc/ipc.cpp | 5 + src/gausskernel/storage/ipc/procarray.cpp | 74 +++++- src/gausskernel/storage/lmgr/proc.cpp | 4 +- src/include/knl/knl_session.h | 31 +++ src/include/knl/knl_thread.h | 12 - src/include/storage/bufmgr.h | 8 +- src/include/storage/proc.h | 2 +- src/include/storage/procarray.h | 3 +- src/include/threadpool/threadpool.h | 1 + src/test/regress/expected/gtt_function.out | 2 - src/test/regress/expected/gtt_stats.out | 24 +- 23 files changed, 351 insertions(+), 248 deletions(-) diff --git a/src/common/backend/catalog/catalog.cpp b/src/common/backend/catalog/catalog.cpp index 635d7cc408..be323dcc11 100755 --- a/src/common/backend/catalog/catalog.cpp +++ b/src/common/backend/catalog/catalog.cpp @@ -53,6 +53,7 @@ #include "commands/directory.h" #include "cstore.h" #include "storage/custorage.h" +#include "threadpool/threadpool.h" #include "catalog/pg_resource_pool.h" #include "catalog/pg_workload_group.h" #include "catalog/pg_app_workloadgroup_mapping.h" @@ -1053,7 +1054,7 @@ Oid GetNewRelFileNode(Oid reltablespace, Relation pg_class, char relpersistence) switch (relpersistence) { case RELPERSISTENCE_GLOBAL_TEMP: - backend = t_thrd.proc_cxt.MyBackendId; + backend = BackendIdForTempRelations; break; case RELPERSISTENCE_TEMP: case RELPERSISTENCE_UNLOGGED: diff --git a/src/common/backend/catalog/storage.cpp b/src/common/backend/catalog/storage.cpp index a68b693d32..4ebaba1c66 100644 --- a/src/common/backend/catalog/storage.cpp +++ b/src/common/backend/catalog/storage.cpp @@ -39,6 +39,7 @@ #include "storage/freespace.h" #include "storage/lmgr.h" #include "storage/smgr.h" +#include "threadpool/threadpool.h" #include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -120,7 +121,7 @@ static void StorageSetBackendAndLogged(_in_ char relpersistence, _out_ BackendId } break; case RELPERSISTENCE_GLOBAL_TEMP: - *backend = t_thrd.proc_cxt.MyBackendId; + *backend = BackendIdForTempRelations; *needs_wal = false; break; case RELPERSISTENCE_UNLOGGED: diff --git a/src/common/backend/catalog/storage_gtt.cpp b/src/common/backend/catalog/storage_gtt.cpp index dec09a707e..4297ca3972 100644 --- a/src/common/backend/catalog/storage_gtt.cpp +++ b/src/common/backend/catalog/storage_gtt.cpp @@ -46,6 +46,7 @@ #include "storage/shmem.h" #include "storage/sinvaladt.h" #include "storage/smgr.h" +#include "threadpool/threadpool.h" #include "utils/catcache.h" #include "gs_threadlocal.h" #include "utils/guc.h" @@ -61,14 +62,6 @@ #define BITMAPSET_SIZE(nwords) (offsetof(Bitmapset, words) + (nwords) * sizeof(bitmapword)) -THR_LOCAL bool gtt_cleaner_exit_registered = false; -THR_LOCAL HTAB* gtt_storage_local_hash = NULL; -THR_LOCAL MemoryContext gtt_relstats_context = NULL; - -/* relfrozenxid of all gtts in the current session */ -THR_LOCAL List* gtt_session_relfrozenxid_list = NIL; -THR_LOCAL TransactionId gtt_session_frozenxid = InvalidTransactionId; - struct gtt_ctl_data { LWLock lock; int max_entry; @@ -139,7 +132,7 @@ static Size action_gtt_shared_hash_entry_size(void) if (u_sess->attr.attr_storage.max_active_gtt <= 0) return 0; - wordnum = WORDNUM(g_instance.shmem_cxt.MaxBackends + 1); + wordnum = WORDNUM(MAX_BACKEND_SLOT + 1); hashEntrySize += MAXALIGN(sizeof(gtt_shared_hash_entry)); hashEntrySize += (size_t)MAXALIGN(BITMAPSET_SIZE(wordnum + 1)); @@ -167,7 +160,7 @@ void active_gtt_shared_hash_init(void) return; t_thrd.shemem_ptr_cxt.gtt_shared_ctl = - reinterpret_cast(ShmemInitStruct("gtt_shared_ctl", sizeof(gtt_ctl_data), &found)); + (gtt_ctl_data*)ShmemInitStruct("gtt_shared_ctl", sizeof(gtt_ctl_data), &found); if (!found) { LWLockRegisterTranche((int)LWTRANCHE_GTT_CTL, "gtt_shared_ctl"); @@ -199,8 +192,8 @@ static void gtt_storage_checkin(Oid relid) fnode.dbNode = u_sess->proc_cxt.MyDatabaseId; fnode.relNode = relid; (void)LWLockAcquire(&t_thrd.shemem_ptr_cxt.gtt_shared_ctl->lock, LW_EXCLUSIVE); - entry = reinterpret_cast(hash_search( - t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, reinterpret_cast(&(fnode)), HASH_ENTER_NULL, &found)); + entry = (gtt_shared_hash_entry*)hash_search( + t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, &fnode, HASH_ENTER_NULL, &found); if (entry == NULL) { LWLockRelease(&t_thrd.shemem_ptr_cxt.gtt_shared_ctl->lock); @@ -213,14 +206,14 @@ static void gtt_storage_checkin(Oid relid) if (found == false) { int wordnum; - entry->map = reinterpret_cast((char*)entry + MAXALIGN(sizeof(gtt_shared_hash_entry))); - wordnum = WORDNUM(g_instance.shmem_cxt.MaxBackends + 1); + entry->map = (Bitmapset*)((char*)entry + MAXALIGN(sizeof(gtt_shared_hash_entry))); + wordnum = WORDNUM(MAX_BACKEND_SLOT + 1); errno_t rc = memset_s(entry->map, (size_t)BITMAPSET_SIZE(wordnum + 1), 0, (size_t)BITMAPSET_SIZE(wordnum + 1)); securec_check(rc, "", ""); entry->map->nwords = wordnum + 1; } - (void)bms_add_member(entry->map, t_thrd.proc_cxt.MyBackendId); + (void)bms_add_member(entry->map, BackendIdForTempRelations); LWLockRelease(&t_thrd.shemem_ptr_cxt.gtt_shared_ctl->lock); } @@ -238,8 +231,7 @@ static void gtt_storage_checkout(Oid relid, bool skiplock, bool isCommit) (void)LWLockAcquire(&t_thrd.shemem_ptr_cxt.gtt_shared_ctl->lock, LW_EXCLUSIVE); } - entry = reinterpret_cast(hash_search( - t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, reinterpret_cast(&(fnode)), HASH_FIND, NULL)); + entry = (gtt_shared_hash_entry*)hash_search(t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, &fnode, HASH_FIND, NULL); if (entry == NULL) { if (!skiplock) { @@ -251,8 +243,8 @@ static void gtt_storage_checkout(Oid relid, bool skiplock, bool isCommit) return; } - Assert(t_thrd.proc_cxt.MyBackendId >= 1 && t_thrd.proc_cxt.MyBackendId <= g_instance.shmem_cxt.MaxBackends); - (void)bms_del_member(entry->map, t_thrd.proc_cxt.MyBackendId); + Assert(BackendIdForTempRelations >= 1 && BackendIdForTempRelations <= MAX_BACKEND_SLOT); + (void)bms_del_member(entry->map, BackendIdForTempRelations); if (bms_is_empty(entry->map)) { if (!hash_search(t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, &fnode, HASH_REMOVE, NULL)) { @@ -278,8 +270,7 @@ Bitmapset* copy_active_gtt_bitmap(Oid relid) fnode.dbNode = u_sess->proc_cxt.MyDatabaseId; fnode.relNode = relid; (void)LWLockAcquire(&t_thrd.shemem_ptr_cxt.gtt_shared_ctl->lock, LW_SHARED); - entry = reinterpret_cast(hash_search( - t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, reinterpret_cast(&(fnode)), HASH_FIND, NULL)); + entry = (gtt_shared_hash_entry*)hash_search(t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, &fnode, HASH_FIND, NULL); if (entry == NULL) { LWLockRelease(&t_thrd.shemem_ptr_cxt.gtt_shared_ctl->lock); @@ -307,8 +298,7 @@ bool is_other_backend_use_gtt(Oid relid) fnode.dbNode = u_sess->proc_cxt.MyDatabaseId; fnode.relNode = relid; (void)LWLockAcquire(&t_thrd.shemem_ptr_cxt.gtt_shared_ctl->lock, LW_SHARED); - entry = reinterpret_cast(hash_search( - t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, reinterpret_cast(&(fnode)), HASH_FIND, NULL)); + entry = (gtt_shared_hash_entry*)hash_search(t_thrd.shemem_ptr_cxt.active_gtt_shared_hash, &fnode, HASH_FIND, NULL); if (entry == NULL) { LWLockRelease(&t_thrd.shemem_ptr_cxt.gtt_shared_ctl->lock); @@ -316,13 +306,13 @@ bool is_other_backend_use_gtt(Oid relid) } Assert(entry->map); - Assert(t_thrd.proc_cxt.MyBackendId >= 1 && t_thrd.proc_cxt.MyBackendId <= g_instance.shmem_cxt.MaxBackends); + Assert(BackendIdForTempRelations >= 1 && BackendIdForTempRelations <= MAX_BACKEND_SLOT); int numUse = bms_num_members(entry->map); if (numUse == 0) { inUse = false; } else if (numUse == 1) { - if (bms_is_member(t_thrd.proc_cxt.MyBackendId, entry->map)) { + if (bms_is_member(BackendIdForTempRelations, entry->map)) { inUse = false; } else { inUse = true; @@ -358,7 +348,8 @@ void remember_gtt_storage_info(const RelFileNode rnode, Relation rel) (!rel->rd_index->indisvalid || !rel->rd_index->indisready)) { elog(ERROR, "invalid gtt index %s not allow to create storage", RelationGetRelationName(rel)); } - if (gtt_storage_local_hash == NULL) { + + if (u_sess->gtt_ctx.gtt_storage_local_hash == NULL) { #define GTT_LOCAL_HASH_SIZE 1024 /* First time through: initialize the hash table */ HASHCTL ctl; @@ -366,17 +357,19 @@ void remember_gtt_storage_info(const RelFileNode rnode, Relation rel) securec_check(rc, "", ""); ctl.keysize = sizeof(Oid); ctl.entrysize = sizeof(gtt_local_hash_entry); - gtt_storage_local_hash = + u_sess->gtt_ctx.gtt_storage_local_hash = hash_create("global temporary table info", GTT_LOCAL_HASH_SIZE, &ctl, HASH_ELEM | HASH_BLOBS); - if (!CacheMemoryContext) - CreateCacheMemoryContext(); + if (u_sess->cache_mem_cxt == nullptr) { + u_sess->cache_mem_cxt = + AllocSetContextCreate(u_sess->top_mem_cxt, "SessionCacheMemoryContext", ALLOCSET_DEFAULT_SIZES); + } - gtt_relstats_context = - AllocSetContextCreate(CacheMemoryContext, "gtt relstats context", ALLOCSET_DEFAULT_SIZES); + u_sess->gtt_ctx.gtt_relstats_context = + AllocSetContextCreate(u_sess->cache_mem_cxt, "gtt relstats context", ALLOCSET_DEFAULT_SIZES); } - oldcontext = MemoryContextSwitchTo(gtt_relstats_context); + oldcontext = MemoryContextSwitchTo(u_sess->gtt_ctx.gtt_relstats_context); entry = gtt_search_by_relid(relid, true); if (!entry) { @@ -384,12 +377,12 @@ void remember_gtt_storage_info(const RelFileNode rnode, Relation rel) int natts = 0; /* Look up or create an entry */ - entry = reinterpret_cast(hash_search(gtt_storage_local_hash, - reinterpret_cast(&relid), HASH_ENTER, &found)); + entry = (gtt_local_hash_entry*)hash_search( + u_sess->gtt_ctx.gtt_storage_local_hash, (void*)&relid, HASH_ENTER, &found); if (found) { (void)MemoryContextSwitchTo(oldcontext); - elog(ERROR, "backend %d relid %u already exists in gtt local hash", t_thrd.proc_cxt.MyBackendId, relid); + elog(ERROR, "backend %d relid %u already exists in gtt local hash", BackendIdForTempRelations, relid); } entry->relfilenode_list = NIL; @@ -401,8 +394,8 @@ void remember_gtt_storage_info(const RelFileNode rnode, Relation rel) entry->oldrelid = InvalidOid; natts = RelationGetNumberOfAttributes(rel); - entry->attnum = reinterpret_cast(palloc0(sizeof(int) * (unsigned long)natts)); - entry->att_stat_tups = reinterpret_cast(palloc0(sizeof(HeapTuple) * (unsigned long)natts)); + entry->attnum = (int*)palloc0(sizeof(int) * (unsigned long)natts); + entry->att_stat_tups = (HeapTuple*)palloc0(sizeof(HeapTuple) * (unsigned long)natts); entry->natts = natts; if (entry->relkind == RELKIND_RELATION) { @@ -417,7 +410,7 @@ void remember_gtt_storage_info(const RelFileNode rnode, Relation rel) } } - newNode = reinterpret_cast(palloc0(sizeof(gtt_relfilenode))); + newNode = (gtt_relfilenode*)palloc0(sizeof(gtt_relfilenode)); newNode->relfilenode = rnode.relNode; newNode->spcnode = rnode.spcNode; newNode->relpages = 0; @@ -437,9 +430,13 @@ void remember_gtt_storage_info(const RelFileNode rnode, Relation rel) (void)MemoryContextSwitchTo(oldcontext); - if (!gtt_cleaner_exit_registered) { - on_shmem_exit(gtt_storage_removeall, 0); - gtt_cleaner_exit_registered = true; + if (!u_sess->gtt_ctx.gtt_cleaner_exit_registered) { + if (!ENABLE_THREAD_POOL) { + on_shmem_exit(gtt_storage_removeall, 0); + } else { + u_sess->gtt_ctx.gtt_sess_exit = gtt_storage_removeall; + } + u_sess->gtt_ctx.gtt_cleaner_exit_registered = true; } return; @@ -486,7 +483,7 @@ void forget_gtt_storage_info(Oid relid, const RelFileNode rnode, bool isCommit) gtt_storage_checkout(relid, false, isCommit); gtt_free_statistics(entry); - (void*)hash_search(gtt_storage_local_hash, reinterpret_cast(&(relid)), HASH_REMOVE, NULL); + (void*)hash_search(u_sess->gtt_ctx.gtt_storage_local_hash, (void*)&(relid), HASH_REMOVE, NULL); } return; } @@ -515,7 +512,7 @@ void forget_gtt_storage_info(Oid relid, const RelFileNode rnode, bool isCommit) } gtt_free_statistics(entry); - (void*)hash_search(gtt_storage_local_hash, reinterpret_cast(&(relid)), HASH_REMOVE, NULL); + (void*)hash_search(u_sess->gtt_ctx.gtt_storage_local_hash, (void*)&(relid), HASH_REMOVE, NULL); } else gtt_reset_statistics(entry); @@ -553,30 +550,30 @@ static void gtt_storage_removeall(int code, Datum arg) unsigned long maxfiles = 0; unsigned long i = 0; - if (gtt_storage_local_hash == NULL) + if (u_sess->gtt_ctx.gtt_storage_local_hash == NULL) return; - hash_seq_init(&status, gtt_storage_local_hash); - while ((entry = reinterpret_cast(hash_seq_search(&status))) != NULL) { + hash_seq_init(&status, u_sess->gtt_ctx.gtt_storage_local_hash); + while ((entry = (gtt_local_hash_entry*)hash_seq_search(&status)) != NULL) { ListCell* lc; foreach (lc, entry->relfilenode_list) { SMgrRelation srel; RelFileNode rnode; - gtt_relfilenode* gtt_rnode = reinterpret_cast(lfirst(lc)); + gtt_relfilenode* gtt_rnode = (gtt_relfilenode*)lfirst(lc); rnode.spcNode = gtt_rnode->spcnode; rnode.dbNode = u_sess->proc_cxt.MyDatabaseId; rnode.relNode = gtt_rnode->relfilenode; rnode.bucketNode = InvalidBktId; - srel = smgropen(rnode, t_thrd.proc_cxt.MyBackendId); + srel = smgropen(rnode, BackendIdForTempRelations); if (maxfiles == 0) { maxfiles = INIT_ALLOC_NUMS; - srels = reinterpret_cast(palloc(sizeof(SMgrRelation) * maxfiles)); + srels = (SMgrRelation*)palloc(sizeof(SMgrRelation) * maxfiles); } else if (maxfiles <= nfiles) { maxfiles *= 2; - srels = reinterpret_cast(repalloc(srels, sizeof(SMgrRelation) * maxfiles)); + srels = (SMgrRelation*)repalloc(srels, sizeof(SMgrRelation) * maxfiles); } srels[nfiles++] = srel; @@ -584,12 +581,12 @@ static void gtt_storage_removeall(int code, Datum arg) if (maxrels == 0) { maxrels = INIT_ALLOC_NUMS; - relids = reinterpret_cast(palloc(sizeof(Oid) * maxrels)); - relkinds = reinterpret_cast(palloc(sizeof(char) * maxrels)); + relids = (Oid*)palloc(sizeof(Oid) * maxrels); + relkinds = (char*)palloc(sizeof(char) * maxrels); } else if (maxrels <= nrels) { maxrels *= 2; - relids = reinterpret_cast(repalloc(relids, sizeof(Oid) * maxrels)); - relkinds = reinterpret_cast(repalloc(relkinds, sizeof(char) * maxrels)); + relids = (Oid*)repalloc(relids, sizeof(Oid) * maxrels); + relkinds = (char*)repalloc(relkinds, sizeof(char) * maxrels); } relkinds[nrels] = entry->relkind; @@ -617,7 +614,11 @@ static void gtt_storage_removeall(int code, Datum arg) pfree(relkinds); } - t_thrd.proc->session_gtt_frozenxid = InvalidTransactionId; + if (ENABLE_THREAD_POOL) { + u_sess->gtt_ctx.gtt_session_frozenxid = InvalidTransactionId; + } else { + t_thrd.proc->gtt_session_frozenxid = InvalidTransactionId; + } return; } @@ -640,12 +641,12 @@ void up_gtt_relstats(const Relation relation, BlockNumber numPages, double numTu if (entry == NULL) return; - gttRnode = reinterpret_cast(lfirst(list_tail(entry->relfilenode_list))); + gttRnode = (gtt_relfilenode*)lfirst(list_tail(entry->relfilenode_list)); if (gttRnode == NULL) return; - if (gttRnode->relpages != static_cast(numPages)) { - gttRnode->relpages = static_cast(numPages); + if (gttRnode->relpages != int32(numPages)) { + gttRnode->relpages = int32(numPages); } if (numTuples >= 0 && gttRnode->reltuples != (float4)numTuples) @@ -653,8 +654,8 @@ void up_gtt_relstats(const Relation relation, BlockNumber numPages, double numTu /* only heap contain transaction information and relallvisible */ if (entry->relkind == RELKIND_RELATION) { - if (gttRnode->relallvisible >= 0 && gttRnode->relallvisible != static_cast(numAllVisiblePages)) { - gttRnode->relallvisible = static_cast(numAllVisiblePages); + if (gttRnode->relallvisible >= 0 && gttRnode->relallvisible != int32(numAllVisiblePages)) { + gttRnode->relallvisible = int32(numAllVisiblePages); } if (TransactionIdIsNormal(relfrozenxid) && gttRnode->relfrozenxid != relfrozenxid && @@ -689,7 +690,7 @@ bool get_gtt_relstats(Oid relid, BlockNumber* relpages, double* reltuples, Block Assert(entry->relid == relid); - gttRnode = reinterpret_cast(lfirst(list_tail(entry->relfilenode_list))); + gttRnode = (gtt_relfilenode*)lfirst(list_tail(entry->relfilenode_list)); if (gttRnode == NULL) return false; @@ -712,8 +713,7 @@ bool get_gtt_relstats(Oid relid, BlockNumber* relpages, double* reltuples, Block * Update global temp table statistic info(definition is same as pg_statistic) * to local hashtable where ananyze global temp table */ -void up_gtt_att_statistic( - Oid reloid, int attnum, int natts, TupleDesc tupleDescriptor, Datum* values, bool* isnull) +void up_gtt_att_statistic(Oid reloid, int attnum, int natts, TupleDesc tupleDescriptor, Datum* values, bool* isnull) { gtt_local_hash_entry* entry; MemoryContext oldcontext; @@ -731,7 +731,7 @@ void up_gtt_att_statistic( return; } - oldcontext = MemoryContextSwitchTo(gtt_relstats_context); + oldcontext = MemoryContextSwitchTo(u_sess->gtt_ctx.gtt_relstats_context); Assert(entry->relid == reloid); for (i = 0; i < entry->natts; i++) { if (entry->attnum[i] == 0) { @@ -793,46 +793,50 @@ static void insert_gtt_relfrozenxid_to_ordered_list(Oid relfrozenxid) Assert(TransactionIdIsNormal(relfrozenxid)); - oldcontext = MemoryContextSwitchTo(gtt_relstats_context); + oldcontext = MemoryContextSwitchTo(u_sess->gtt_ctx.gtt_relstats_context); /* Does the datum belong at the front? */ - if (gtt_session_relfrozenxid_list == NIL || - TransactionIdFollowsOrEquals(relfrozenxid, linitial_oid(gtt_session_relfrozenxid_list))) { - gtt_session_relfrozenxid_list = lcons_oid(relfrozenxid, gtt_session_relfrozenxid_list); - (void)MemoryContextSwitchTo(oldcontext); + if (u_sess->gtt_ctx.gtt_session_relfrozenxid_list == NIL || + TransactionIdFollowsOrEquals(relfrozenxid, linitial_oid(u_sess->gtt_ctx.gtt_session_relfrozenxid_list))) { + u_sess->gtt_ctx.gtt_session_relfrozenxid_list = + lcons_oid(relfrozenxid, u_sess->gtt_ctx.gtt_session_relfrozenxid_list); + MemoryContextSwitchTo(oldcontext); return; } /* No, so find the entry it belongs after */ i = 0; - foreach (cell, gtt_session_relfrozenxid_list) { + foreach (cell, u_sess->gtt_ctx.gtt_session_relfrozenxid_list) { if (TransactionIdFollowsOrEquals(relfrozenxid, lfirst_oid(cell))) break; i++; } - gtt_session_relfrozenxid_list = - list_insert_nth_oid(gtt_session_relfrozenxid_list, i, relfrozenxid); - (void)MemoryContextSwitchTo(oldcontext); + u_sess->gtt_ctx.gtt_session_relfrozenxid_list = + list_insert_nth_oid(u_sess->gtt_ctx.gtt_session_relfrozenxid_list, i, relfrozenxid); + MemoryContextSwitchTo(oldcontext); return; } static void remove_gtt_relfrozenxid_from_ordered_list(Oid relfrozenxid) { - gtt_session_relfrozenxid_list = list_delete_oid(gtt_session_relfrozenxid_list, relfrozenxid); + u_sess->gtt_ctx.gtt_session_relfrozenxid_list = + list_delete_oid(u_sess->gtt_ctx.gtt_session_relfrozenxid_list, relfrozenxid); } static void set_gtt_session_relfrozenxid(void) { TransactionId gtt_frozenxid = InvalidTransactionId; - if (gtt_session_relfrozenxid_list) - gtt_frozenxid = llast_oid(gtt_session_relfrozenxid_list); + if (u_sess->gtt_ctx.gtt_session_relfrozenxid_list) + gtt_frozenxid = llast_oid(u_sess->gtt_ctx.gtt_session_relfrozenxid_list); - gtt_session_frozenxid = gtt_frozenxid; - if (t_thrd.proc->session_gtt_frozenxid != gtt_frozenxid) - t_thrd.proc->session_gtt_frozenxid = gtt_frozenxid; + if (ENABLE_THREAD_POOL) { + u_sess->gtt_ctx.gtt_session_frozenxid = gtt_frozenxid; + } else { + t_thrd.proc->gtt_session_frozenxid = gtt_frozenxid; + } } Datum pg_get_gtt_statistics(PG_FUNCTION_ARGS) @@ -841,7 +845,7 @@ Datum pg_get_gtt_statistics(PG_FUNCTION_ARGS) int attnum = PG_GETARG_INT32(1); Oid reloid = PG_GETARG_OID(0); char relPersistence; - ReturnSetInfo* rsinfo = reinterpret_cast(fcinfo->resultinfo); + ReturnSetInfo* rsinfo = (ReturnSetInfo*)fcinfo->resultinfo; TupleDesc tupdesc; MemoryContext oldcontext; Tuplestorestate* tupstore; @@ -908,7 +912,7 @@ Datum pg_get_gtt_statistics(PG_FUNCTION_ARGS) Datum pg_get_gtt_relstats(PG_FUNCTION_ARGS) { - ReturnSetInfo* rsinfo = reinterpret_cast(fcinfo->resultinfo); + ReturnSetInfo* rsinfo = (ReturnSetInfo*)fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate* tupstore; MemoryContext oldcontext; @@ -956,7 +960,7 @@ Datum pg_get_gtt_relstats(PG_FUNCTION_ARGS) (void)get_gtt_relstats(reloid, &relpages, &reltuples, &relallvisible, &relfrozenxid); Oid relnode = gtt_fetch_current_relfilenode(reloid); if (relnode != InvalidOid) { - // output attribute: relfilenode | relpages | reltuples | relallvisible | relfrozenxid | relminmxid + // output attribute: relfilenode | relpages | reltuples | relallvisible | relfrozenxid | relminmxid Datum values[6]; bool isnull[6]; @@ -981,7 +985,7 @@ Datum pg_get_gtt_relstats(PG_FUNCTION_ARGS) Datum pg_gtt_attached_pid(PG_FUNCTION_ARGS) { - ReturnSetInfo* rsinfo = reinterpret_cast(fcinfo->resultinfo); + ReturnSetInfo* rsinfo = (ReturnSetInfo*)fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate* tupstore; MemoryContext oldcontext; @@ -1027,8 +1031,15 @@ Datum pg_gtt_attached_pid(PG_FUNCTION_ARGS) backendid = bms_first_member(map); do { - proc = BackendIdGetProc(backendid); - pid = proc->pid; + if (ENABLE_THREAD_POOL) { + ThreadPoolSessControl* sess_ctrl = g_threadPoolControler->GetSessionCtrl(); + knl_session_context* sess = sess_ctrl->GetSessionByIdx(backendid); + pid = sess->attachPid; + } else { + proc = BackendIdGetProc(backendid); + pid = proc->pid; + } + // output attribute: relid | pid Datum values[2]; bool isnull[2]; @@ -1054,7 +1065,7 @@ Datum pg_gtt_attached_pid(PG_FUNCTION_ARGS) Datum pg_list_gtt_relfrozenxids(PG_FUNCTION_ARGS) { - ReturnSetInfo* rsinfo = reinterpret_cast(fcinfo->resultinfo); + ReturnSetInfo* rsinfo = (ReturnSetInfo*)fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate* tupstore; MemoryContext oldcontext; @@ -1098,15 +1109,16 @@ Datum pg_list_gtt_relfrozenxids(PG_FUNCTION_ARGS) } pids = (ThreadId*)palloc0(sizeof(ThreadId) * numXid); - xids = reinterpret_cast(palloc0(sizeof(TransactionId) * numXid)); - TransactionId oldest = list_all_session_gtt_frozenxids(numXid, pids, xids, &i); + xids = (TransactionId*)palloc0(sizeof(TransactionId) * numXid); + TransactionId oldest = ENABLE_THREAD_POOL ? ListAllSessionGttFrozenxids(numXid, pids, xids, &i) + : ListAllThreadGttFrozenxids(numXid, pids, xids, &i); if (i > 0) { pids[i] = 0; xids[i] = oldest; i++; for (j = 0; j < i; j++) { - // output attribute: pid | relfrozenxid + // output attribute: pid | relfrozenxid Datum values[2]; bool isnull[2]; @@ -1263,18 +1275,21 @@ Oid gtt_fetch_current_relfilenode(Oid relid) gtt_local_hash_entry* entry; gtt_relfilenode* gttRnode = NULL; - if (u_sess->attr.attr_storage.max_active_gtt <= 0) + if (u_sess->attr.attr_storage.max_active_gtt <= 0) { return InvalidOid; + } entry = gtt_search_by_relid(relid, true); - if (entry == NULL) + if (entry == NULL) { return InvalidOid; + } Assert(entry->relid == relid); - gttRnode = reinterpret_cast(lfirst(list_tail(entry->relfilenode_list))); - if (gttRnode == NULL) + gttRnode = (gtt_relfilenode*)lfirst(list_tail(entry->relfilenode_list)); + if (gttRnode == NULL) { return InvalidOid; + } return gttRnode->relfilenode; } @@ -1290,7 +1305,7 @@ void gtt_switch_rel_relfilenode(Oid rel1, Oid relfilenode1, Oid rel2, Oid relfil if (u_sess->attr.attr_storage.max_active_gtt <= 0) return; - if (gtt_storage_local_hash == NULL) + if (u_sess->gtt_ctx.gtt_storage_local_hash == NULL) return; entry1 = gtt_search_by_relid(rel1, false); @@ -1299,7 +1314,7 @@ void gtt_switch_rel_relfilenode(Oid rel1, Oid relfilenode1, Oid rel2, Oid relfil entry2 = gtt_search_by_relid(rel2, false); gttRnode2 = gtt_search_relfilenode(entry2, relfilenode2, false); - oldcontext = MemoryContextSwitchTo(gtt_relstats_context); + oldcontext = MemoryContextSwitchTo(u_sess->gtt_ctx.gtt_relstats_context); entry1->relfilenode_list = list_delete_ptr(entry1->relfilenode_list, gttRnode1); entry2->relfilenode_list = lappend(entry2->relfilenode_list, gttRnode1); @@ -1323,7 +1338,7 @@ static gtt_relfilenode* gtt_search_relfilenode(const gtt_local_hash_entry* entry Assert(entry); foreach (lc, entry->relfilenode_list) { - gtt_relfilenode* gtt_rnode = reinterpret_cast(lfirst(lc)); + gtt_relfilenode* gtt_rnode = (gtt_relfilenode*)lfirst(lc); if (gtt_rnode->relfilenode == relfilenode) { rnode = gtt_rnode; break; @@ -1341,12 +1356,12 @@ static gtt_local_hash_entry* gtt_search_by_relid(Oid relid, bool missingOk) { gtt_local_hash_entry* entry = NULL; - if (gtt_storage_local_hash == NULL) { + if (u_sess->gtt_ctx.gtt_storage_local_hash == NULL) { return NULL; } - entry = reinterpret_cast(hash_search(gtt_storage_local_hash, - reinterpret_cast(&(relid)), HASH_FIND, NULL)); + entry = + (gtt_local_hash_entry*)hash_search(u_sess->gtt_ctx.gtt_storage_local_hash, (void*)&(relid), HASH_FIND, NULL); if (entry == NULL && !missingOk) { elog(ERROR, "relid %u not found in local hash", relid); diff --git a/src/common/backend/parser/parse_utilcmd.cpp b/src/common/backend/parser/parse_utilcmd.cpp index b9cae1919f..14ee9a447f 100644 --- a/src/common/backend/parser/parse_utilcmd.cpp +++ b/src/common/backend/parser/parse_utilcmd.cpp @@ -4199,8 +4199,7 @@ void checkPartitionSynax(CreateStmt* stmt) errmsg("Range partitioned table with INTERVAL clause has more than one column"), errhint("Only support one partition key for interval partition"))); } - if (!IsA(stmt->partTableState->intervalPartDef->partInterval, A_Const) || - ((A_Const*)stmt->partTableState->intervalPartDef->partInterval)->val.type != T_String) { + if (!IsA(stmt->partTableState->intervalPartDef->partInterval, A_Const)) { ereport(ERROR, (errcode(ERRCODE_INVALID_DATETIME_FORMAT), // errmsg("invalid input syntax for type %s: \"%s\"", datatype, str))); diff --git a/src/common/backend/utils/adt/dbsize.cpp b/src/common/backend/utils/adt/dbsize.cpp index 0b1bf5f39e..f68b807ea1 100755 --- a/src/common/backend/utils/adt/dbsize.cpp +++ b/src/common/backend/utils/adt/dbsize.cpp @@ -39,6 +39,7 @@ #include "pgxc/execRemote.h" #endif #include "storage/fd.h" +#include "threadpool/threadpool.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -1827,7 +1828,7 @@ Datum pg_relation_filepath(PG_FUNCTION_ARGS) break; case RELPERSISTENCE_GLOBAL_TEMP: - backend = t_thrd.proc_cxt.MyBackendId; + backend = BackendIdForTempRelations; break; default: ereport(ERROR, diff --git a/src/common/backend/utils/cache/relcache.cpp b/src/common/backend/utils/cache/relcache.cpp index 0d16b6627b..e1d5628c58 100644 --- a/src/common/backend/utils/cache/relcache.cpp +++ b/src/common/backend/utils/cache/relcache.cpp @@ -133,6 +133,7 @@ #include "rewrite/rewriteRlsPolicy.h" #include "storage/lmgr.h" #include "storage/smgr.h" +#include "threadpool/threadpool.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -1709,7 +1710,7 @@ static Relation relation_build_desc(Oid targetRelId, bool insertIt, bool buildke double reltuples = 0; BlockNumber relallvisible = 0; - relation->rd_backend = t_thrd.proc_cxt.MyBackendId; + relation->rd_backend = BackendIdForTempRelations; relation->rd_islocaltemp = false; get_gtt_relstats(RelationGetRelid(relation), &relpages, &reltuples, &relallvisible, NULL); relation->rd_rel->relpages = static_cast(relpages); @@ -3711,7 +3712,7 @@ Relation RelationBuildLocalRelation(const char* relname, Oid relnamespace, Tuple rel->rd_islocaltemp = true; break; case RELPERSISTENCE_GLOBAL_TEMP: // global temp table - rel->rd_backend = t_thrd.proc_cxt.MyBackendId; + rel->rd_backend = BackendIdForTempRelations; rel->rd_islocaltemp = false; break; default: diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index e881e256ec..95af67d25f 100644 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -5267,7 +5267,7 @@ static void init_configure_names_int() GUC_UNIT_BLOCKS }, &u_sess->attr.attr_storage.num_temp_buffers, - 1024, + 128, 100, INT_MAX / 2, check_temp_buffers, @@ -17908,7 +17908,7 @@ static bool check_temp_buffers(int* newval, void** extra, GucSource source) /* * Once local buffers have been initialized, it's too late to change this. */ - if (t_thrd.storage_cxt.NLocBuffer && t_thrd.storage_cxt.NLocBuffer != *newval) { + if (u_sess->storage_cxt.NLocBuffer && u_sess->storage_cxt.NLocBuffer != *newval) { GUC_check_errdetail( "\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session."); return false; diff --git a/src/gausskernel/optimizer/commands/vacuum.cpp b/src/gausskernel/optimizer/commands/vacuum.cpp index d0a1949f0b..dd58d0fcc3 100755 --- a/src/gausskernel/optimizer/commands/vacuum.cpp +++ b/src/gausskernel/optimizer/commands/vacuum.cpp @@ -1226,7 +1226,9 @@ void vac_update_datfrozenxid(void) */ if (u_sess->attr.attr_storage.max_active_gtt > 0) { TransactionId safeAge; - TransactionId oldestGttFrozenxid = list_all_session_gtt_frozenxids(0, NULL, NULL, NULL); + TransactionId oldestGttFrozenxid = ENABLE_THREAD_POOL ? + ListAllSessionGttFrozenxids(0, NULL, NULL, NULL) : + ListAllThreadGttFrozenxids(0, NULL, NULL, NULL); if (TransactionIdIsNormal(oldestGttFrozenxid)) { safeAge = diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index fb7b013b88..6f7ac037bb 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -743,6 +743,19 @@ static void knl_u_storage_init(knl_u_storage_context* storage_cxt) storage_cxt->twoPhaseCommitInProgress = false; storage_cxt->dumpHashbucketIdNum = 0; storage_cxt->dumpHashbucketIds = NULL; + + /* session local buffer */ + storage_cxt->NLocBuffer = 0; /* until buffers are initialized */ + storage_cxt->LocalBufferDescriptors = NULL; + storage_cxt->LocalBufferBlockPointers = NULL; + storage_cxt->LocalRefCount = NULL; + storage_cxt->nextFreeLocalBuf = 0; + storage_cxt->LocalBufHash = NULL; + storage_cxt->cur_block = NULL; + storage_cxt->next_buf_in_block = 0; + storage_cxt->num_bufs_in_block = 0; + storage_cxt->total_bufs_allocated = 0; + storage_cxt->LocalBufferContext = NULL; } static void knl_u_libpq_init(knl_u_libpq_context* libpq_cxt) diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 878ba9e9d6..a93216504c 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1148,17 +1148,6 @@ static void knl_t_storage_init(knl_t_storage_context* storage_cxt) storage_cxt->smoothed_alloc = 0; storage_cxt->smoothed_density = 10.0; storage_cxt->StrategyControl = NULL; - storage_cxt->NLocBuffer = 0; /* until buffers are initialized */ - storage_cxt->LocalBufferDescriptors = NULL; - storage_cxt->LocalBufferBlockPointers = NULL; - storage_cxt->LocalRefCount = NULL; - storage_cxt->nextFreeLocalBuf = 0; - storage_cxt->LocalBufHash = NULL; - storage_cxt->cur_block = NULL; - storage_cxt->next_buf_in_block = 0; - storage_cxt->num_bufs_in_block = 0; - storage_cxt->total_bufs_allocated = 0; - storage_cxt->LocalBufferContext = NULL; storage_cxt->CacheBlockInProgressIO = CACHE_BLOCK_INVALID_IDX; storage_cxt->CacheBlockInProgressUncompress = CACHE_BLOCK_INVALID_IDX; storage_cxt->MetaBlockInProgressIO = CACHE_BLOCK_INVALID_IDX; diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 91221cba0f..e0602eef32 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -2821,12 +2821,12 @@ Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber block_ if (BufferIsValid(buffer)) { if (BufferIsLocal(buffer)) { - buf_desc = &t_thrd.storage_cxt.LocalBufferDescriptors[-buffer - 1]; + buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[-buffer - 1]; if (buf_desc->tag.blockNum == block_num && RelFileNodeEquals(buf_desc->tag.rnode, relation->rd_node) && buf_desc->tag.forkNum == fork_num) return buffer; ResourceOwnerForgetBuffer(t_thrd.utils_cxt.CurrentResourceOwner, buffer); - t_thrd.storage_cxt.LocalRefCount[-buffer - 1]--; + u_sess->storage_cxt.LocalRefCount[-buffer - 1]--; } else { buf_desc = GetBufferDescriptor(buffer - 1); /* we have pin, so it's ok to examine tag without spinlock */ @@ -3887,9 +3887,9 @@ void PrintBufferLeakWarning(Buffer buffer) Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { - buf = &t_thrd.storage_cxt.LocalBufferDescriptors[-buffer - 1]; - loccount = t_thrd.storage_cxt.LocalRefCount[-buffer - 1]; - backend = t_thrd.proc_cxt.MyBackendId; + buf = &u_sess->storage_cxt.LocalBufferDescriptors[-buffer - 1]; + loccount = u_sess->storage_cxt.LocalRefCount[-buffer - 1]; + backend = BackendIdForTempRelations; } else { buf = GetBufferDescriptor(buffer - 1); loccount = GetPrivateRefCount(buffer); @@ -3998,7 +3998,7 @@ BlockNumber BufferGetBlockNumber(Buffer buffer) Assert(BufferIsPinned(buffer)); if (BufferIsLocal(buffer)) { - buf_desc = &(t_thrd.storage_cxt.LocalBufferDescriptors[-buffer - 1]); + buf_desc = &(u_sess->storage_cxt.LocalBufferDescriptors[-buffer - 1]); } else { buf_desc = GetBufferDescriptor(buffer - 1); } @@ -4015,7 +4015,7 @@ void BufferGetTag(Buffer buffer, RelFileNode* rnode, ForkNumber* forknum, BlockN /* Do the same checks as BufferGetBlockNumber. */ Assert(BufferIsPinned(buffer)); if (BufferIsLocal(buffer)) { - buf_desc = &(t_thrd.storage_cxt.LocalBufferDescriptors[-buffer - 1]); + buf_desc = &(u_sess->storage_cxt.LocalBufferDescriptors[-buffer - 1]); } else { buf_desc = GetBufferDescriptor(buffer - 1); } @@ -4386,7 +4386,7 @@ void DropRelFileNodeBuffers(const RelFileNodeBackend& rnode, ForkNumber forkNum, /* If it's a local relation, it's localbuf.c's problem. */ if (RelFileNodeBackendIsTemp(rnode)) { - if (rnode.backend == t_thrd.proc_cxt.MyBackendId) { + if (rnode.backend == BackendIdForTempRelations) { DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock); } @@ -4411,7 +4411,7 @@ void DropRelFileNodeAllBuffers(const RelFileNodeBackend& rnode) /* If it's a local relation, it's localbuf.c's problem. */ if (RelFileNodeBackendIsTemp(rnode)) { - if (rnode.backend == t_thrd.proc_cxt.MyBackendId) + if (rnode.backend == BackendIdForTempRelations) DropRelFileNodeAllLocalBuffers(rnode.node); gstrace_exit(GS_TRC_ID_DropRelFileNodeAllBuffers); @@ -4679,8 +4679,8 @@ void ReleaseBuffer(Buffer buffer) ResourceOwnerForgetBuffer(t_thrd.utils_cxt.CurrentResourceOwner, buffer); if (BufferIsLocal(buffer)) { - Assert(t_thrd.storage_cxt.LocalRefCount[-buffer - 1] > 0); - t_thrd.storage_cxt.LocalRefCount[-buffer - 1]--; + Assert(u_sess->storage_cxt.LocalRefCount[-buffer - 1] > 0); + u_sess->storage_cxt.LocalRefCount[-buffer - 1]--; return; } @@ -4726,7 +4726,7 @@ void IncrBufferRefCount(Buffer buffer) Assert(BufferIsPinned(buffer)); ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner); if (BufferIsLocal(buffer)) { - t_thrd.storage_cxt.LocalRefCount[-buffer - 1]++; + u_sess->storage_cxt.LocalRefCount[-buffer - 1]++; } else { PrivateRefCountEntry* ref = NULL; PrivateRefCountEntry *free_entry = NULL; @@ -5020,10 +5020,10 @@ void LockBufferForCleanup(Buffer buffer) if (BufferIsLocal(buffer)) { /* There should be exactly one pin */ - if (t_thrd.storage_cxt.LocalRefCount[-buffer - 1] != 1) { + if (u_sess->storage_cxt.LocalRefCount[-buffer - 1] != 1) { ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER), - (errmsg("incorrect local pin count: %d", t_thrd.storage_cxt.LocalRefCount[-buffer - 1])))); + (errmsg("incorrect local pin count: %d", u_sess->storage_cxt.LocalRefCount[-buffer - 1])))); } /* Nobody else to wait for */ return; @@ -5123,7 +5123,7 @@ bool ConditionalLockBufferForCleanup(Buffer buffer) Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { - refcount = t_thrd.storage_cxt.LocalRefCount[-buffer - 1]; + refcount = u_sess->storage_cxt.LocalRefCount[-buffer - 1]; /* There should be exactly one pin */ Assert(refcount > 0); if (refcount != 1) diff --git a/src/gausskernel/storage/buffer/localbuf.cpp b/src/gausskernel/storage/buffer/localbuf.cpp index e126f1c9c3..4169f98b8e 100644 --- a/src/gausskernel/storage/buffer/localbuf.cpp +++ b/src/gausskernel/storage/buffer/localbuf.cpp @@ -34,7 +34,7 @@ typedef struct { } LocalBufferLookupEnt; /* Note: this macro only works on local buffers, not shared ones! */ -#define LocalBufHdrGetBlock(bufHdr) t_thrd.storage_cxt.LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)] +#define LocalBufHdrGetBlock(bufHdr) u_sess->storage_cxt.LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)] static void InitLocalBuffers(void); static Block GetLocalBufferStorage(void); @@ -55,11 +55,11 @@ void LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum, BlockNumber bloc INIT_BUFFERTAG(new_tag, smgr->smgr_rnode.node, forkNum, blockNum); /* Initialize local buffers if first request in this session */ - if (t_thrd.storage_cxt.LocalBufHash == NULL) + if (u_sess->storage_cxt.LocalBufHash == NULL) InitLocalBuffers(); /* See if the desired buffer already exists */ - hresult = (LocalBufferLookupEnt*)hash_search(t_thrd.storage_cxt.LocalBufHash, (void*)&new_tag, HASH_FIND, NULL); + hresult = (LocalBufferLookupEnt*)hash_search(u_sess->storage_cxt.LocalBufHash, (void*)&new_tag, HASH_FIND, NULL); if (hresult != NULL) { /* Yes, so nothing to do */ return; @@ -76,7 +76,7 @@ void LocalBufferWrite(BufferDesc *bufHdr) Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); /* Find smgr relation for buffer */ - oreln = smgropen(bufHdr->tag.rnode, t_thrd.proc_cxt.MyBackendId); + oreln = smgropen(bufHdr->tag.rnode, BackendIdForTempRelations); PageSetChecksumInplace(localpage, bufHdr->tag.blockNum); @@ -102,12 +102,12 @@ void LocalBufferFlushAllBuffer() { int i; - for (i = 0; i < t_thrd.storage_cxt.NLocBuffer; i++) { - BufferDesc *bufHdr = &t_thrd.storage_cxt.LocalBufferDescriptors[i]; + for (i = 0; i < u_sess->storage_cxt.NLocBuffer; i++) { + BufferDesc *bufHdr = &u_sess->storage_cxt.LocalBufferDescriptors[i]; uint32 buf_state; buf_state = pg_atomic_read_u32(&bufHdr->state); - Assert(t_thrd.storage_cxt.LocalRefCount[i] == 0); + Assert(u_sess->storage_cxt.LocalRefCount[i] == 0); if ((buf_state & BM_VALID) && (buf_state & BM_DIRTY)) { LocalBufferFlushForExtremRTO(bufHdr); @@ -141,14 +141,14 @@ BufferDesc* LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber INIT_BUFFERTAG(new_tag, smgr->smgr_rnode.node, forkNum, blockNum); /* Initialize local buffers if first request in this session */ - if (t_thrd.storage_cxt.LocalBufHash == NULL) + if (u_sess->storage_cxt.LocalBufHash == NULL) InitLocalBuffers(); /* See if the desired buffer already exists */ - hresult = (LocalBufferLookupEnt*)hash_search(t_thrd.storage_cxt.LocalBufHash, (void*)&new_tag, HASH_FIND, NULL); + hresult = (LocalBufferLookupEnt*)hash_search(u_sess->storage_cxt.LocalBufHash, (void*)&new_tag, HASH_FIND, NULL); if (hresult != NULL) { b = hresult->id; - buf_desc = &t_thrd.storage_cxt.LocalBufferDescriptors[b]; + buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[b]; if (!BUFFERTAGS_EQUAL(buf_desc->tag, new_tag)) { ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), (errmsg("local buffer hash tag mismatch.")))); @@ -159,13 +159,13 @@ BufferDesc* LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber buf_state = pg_atomic_read_u32(&buf_desc->state); /* this part is equivalent to PinBuffer for a shared buffer */ - if (t_thrd.storage_cxt.LocalRefCount[b] == 0) { + if (u_sess->storage_cxt.LocalRefCount[b] == 0) { if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT) { buf_state += BUF_USAGECOUNT_ONE; pg_atomic_write_u32(&buf_desc->state, buf_state); } } - t_thrd.storage_cxt.LocalRefCount[b]++; + u_sess->storage_cxt.LocalRefCount[b]++; ResourceOwnerRememberBuffer(t_thrd.utils_cxt.CurrentResourceOwner, BufferDescriptorGetBuffer(buf_desc)); if (buf_state & BM_VALID) *foundPtr = TRUE; @@ -188,31 +188,31 @@ BufferDesc* LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber smgr->smgr_rnode.node.relNode, forkNum, blockNum, - -t_thrd.storage_cxt.nextFreeLocalBuf - 1); + -u_sess->storage_cxt.nextFreeLocalBuf - 1); #endif /* * Need to get a new buffer. We use a clock sweep algorithm (essentially * the same as what freelist.c does now...) */ - try_counter = t_thrd.storage_cxt.NLocBuffer; + try_counter = u_sess->storage_cxt.NLocBuffer; for (;;) { - b = t_thrd.storage_cxt.nextFreeLocalBuf; + b = u_sess->storage_cxt.nextFreeLocalBuf; - if (++t_thrd.storage_cxt.nextFreeLocalBuf >= t_thrd.storage_cxt.NLocBuffer) - t_thrd.storage_cxt.nextFreeLocalBuf = 0; + if (++u_sess->storage_cxt.nextFreeLocalBuf >= u_sess->storage_cxt.NLocBuffer) + u_sess->storage_cxt.nextFreeLocalBuf = 0; - buf_desc = &t_thrd.storage_cxt.LocalBufferDescriptors[b]; + buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[b]; - if (t_thrd.storage_cxt.LocalRefCount[b] == 0) { + if (u_sess->storage_cxt.LocalRefCount[b] == 0) { buf_state = pg_atomic_read_u32(&buf_desc->state); if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0) { buf_state -= BUF_USAGECOUNT_ONE; pg_atomic_write_u32(&buf_desc->state, buf_state); - try_counter = t_thrd.storage_cxt.NLocBuffer; + try_counter = u_sess->storage_cxt.NLocBuffer; } else { /* Found a usable buffer */ - t_thrd.storage_cxt.LocalRefCount[b]++; + u_sess->storage_cxt.LocalRefCount[b]++; ResourceOwnerRememberBuffer(t_thrd.utils_cxt.CurrentResourceOwner, BufferDescriptorGetBuffer(buf_desc)); #ifdef EXTREME_RTO_DEBUG @@ -267,7 +267,7 @@ BufferDesc* LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber */ if (buf_state & BM_TAG_VALID) { hresult = - (LocalBufferLookupEnt*)hash_search(t_thrd.storage_cxt.LocalBufHash, (void*)&buf_desc->tag, HASH_REMOVE, NULL); + (LocalBufferLookupEnt*)hash_search(u_sess->storage_cxt.LocalBufHash, (void*)&buf_desc->tag, HASH_REMOVE, NULL); if (hresult == NULL) /* shouldn't happen */ ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), (errmsg("local buffer hash table corrupted.")))); /* mark buffer invalid just in case hash insert fails */ @@ -276,7 +276,7 @@ BufferDesc* LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber pg_atomic_write_u32(&buf_desc->state, buf_state); } - hresult = (LocalBufferLookupEnt*)hash_search(t_thrd.storage_cxt.LocalBufHash, (void*)&new_tag, HASH_ENTER, &found); + hresult = (LocalBufferLookupEnt*)hash_search(u_sess->storage_cxt.LocalBufHash, (void*)&new_tag, HASH_ENTER, &found); if (found) /* shouldn't happen */ ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), (errmsg("local buffer hash table corrupted.")))); hresult->id = b; @@ -320,9 +320,9 @@ void MarkLocalBufferDirty(Buffer buffer) buf_id = -(buffer + 1); - Assert(t_thrd.storage_cxt.LocalRefCount[buf_id] > 0); + Assert(u_sess->storage_cxt.LocalRefCount[buf_id] > 0); - buf_desc = &t_thrd.storage_cxt.LocalBufferDescriptors[buf_id]; + buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[buf_id]; buf_state = pg_atomic_fetch_or_u32(&buf_desc->state, BM_DIRTY); if (!(buf_state & BM_DIRTY)) { @@ -353,25 +353,25 @@ void DropRelFileNodeLocalBuffers(const RelFileNode& rnode, ForkNumber forkNum, B { int i; - for (i = 0; i < t_thrd.storage_cxt.NLocBuffer; i++) { - BufferDesc* buf_desc = &t_thrd.storage_cxt.LocalBufferDescriptors[i]; + for (i = 0; i < u_sess->storage_cxt.NLocBuffer; i++) { + BufferDesc* buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[i]; LocalBufferLookupEnt* hresult = NULL; uint32 buf_state; buf_state = pg_atomic_read_u32(&buf_desc->state); if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(buf_desc->tag.rnode, rnode) && buf_desc->tag.forkNum == forkNum && buf_desc->tag.blockNum >= firstDelBlock) { - if (t_thrd.storage_cxt.LocalRefCount[i] != 0) { + if (u_sess->storage_cxt.LocalRefCount[i] != 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER_REFERENCE), (errmsg("block %u of %s is still referenced (local %d)", buf_desc->tag.blockNum, - relpathbackend(buf_desc->tag.rnode, t_thrd.proc_cxt.MyBackendId, buf_desc->tag.forkNum), - t_thrd.storage_cxt.LocalRefCount[i])))); + relpathbackend(buf_desc->tag.rnode, BackendIdForTempRelations, buf_desc->tag.forkNum), + u_sess->storage_cxt.LocalRefCount[i])))); } /* Remove entry from hashtable */ hresult = (LocalBufferLookupEnt*)hash_search( - t_thrd.storage_cxt.LocalBufHash, (void*)&buf_desc->tag, HASH_REMOVE, NULL); + u_sess->storage_cxt.LocalBufHash, (void*)&buf_desc->tag, HASH_REMOVE, NULL); if (hresult == NULL) /* shouldn't happen */ ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), (errmsg("local buffer hash table corrupted.")))); /* Mark buffer invalid */ @@ -394,24 +394,24 @@ void DropRelFileNodeAllLocalBuffers(const RelFileNode& rnode) { int i; - for (i = 0; i < t_thrd.storage_cxt.NLocBuffer; i++) { - BufferDesc* buf_desc = &t_thrd.storage_cxt.LocalBufferDescriptors[i]; + for (i = 0; i < u_sess->storage_cxt.NLocBuffer; i++) { + BufferDesc* buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[i]; LocalBufferLookupEnt* hresult = NULL; uint32 buf_state; buf_state = pg_atomic_read_u32(&buf_desc->state); if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(buf_desc->tag.rnode, rnode)) { - if (t_thrd.storage_cxt.LocalRefCount[i] != 0) { + if (u_sess->storage_cxt.LocalRefCount[i] != 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER_REFERENCE), (errmsg("block %u of %s is still referenced (local %d)", buf_desc->tag.blockNum, - relpathbackend(buf_desc->tag.rnode, t_thrd.proc_cxt.MyBackendId, buf_desc->tag.forkNum), - t_thrd.storage_cxt.LocalRefCount[i])))); + relpathbackend(buf_desc->tag.rnode, BackendIdForTempRelations, buf_desc->tag.forkNum), + u_sess->storage_cxt.LocalRefCount[i])))); } /* Remove entry from hashtable */ hresult = (LocalBufferLookupEnt*)hash_search( - t_thrd.storage_cxt.LocalBufHash, (void*)&buf_desc->tag, HASH_REMOVE, NULL); + u_sess->storage_cxt.LocalBufHash, (void*)&buf_desc->tag, HASH_REMOVE, NULL); if (hresult == NULL) /* shouldn't happen */ ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), (errmsg("local buffer hash table corrupted.")))); /* Mark buffer invalid */ @@ -436,21 +436,21 @@ static void InitLocalBuffers(void) int i; /* Allocate and zero buffer headers and auxiliary arrays */ - t_thrd.storage_cxt.LocalBufferDescriptors = - (BufferDesc*)MemoryContextAllocZero(t_thrd.top_mem_cxt, (unsigned int)nbufs * sizeof(BufferDesc)); - t_thrd.storage_cxt.LocalBufferBlockPointers = - (Block*)MemoryContextAllocZero(t_thrd.top_mem_cxt, (unsigned int)nbufs * sizeof(Block)); - t_thrd.storage_cxt.LocalRefCount = - (int32*)MemoryContextAllocZero(t_thrd.top_mem_cxt, (unsigned int)nbufs * sizeof(int32)); - if (!t_thrd.storage_cxt.LocalBufferDescriptors || !t_thrd.storage_cxt.LocalBufferBlockPointers || - !t_thrd.storage_cxt.LocalRefCount) + u_sess->storage_cxt.LocalBufferDescriptors = + (BufferDesc*)MemoryContextAllocZero(u_sess->top_mem_cxt, (unsigned int)nbufs * sizeof(BufferDesc)); + u_sess->storage_cxt.LocalBufferBlockPointers = + (Block*)MemoryContextAllocZero(u_sess->top_mem_cxt, (unsigned int)nbufs * sizeof(Block)); + u_sess->storage_cxt.LocalRefCount = + (int32*)MemoryContextAllocZero(u_sess->top_mem_cxt, (unsigned int)nbufs * sizeof(int32)); + if (!u_sess->storage_cxt.LocalBufferDescriptors || !u_sess->storage_cxt.LocalBufferBlockPointers || + !u_sess->storage_cxt.LocalRefCount) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); - t_thrd.storage_cxt.nextFreeLocalBuf = 0; + u_sess->storage_cxt.nextFreeLocalBuf = 0; /* initialize fields that need to start off nonzero */ for (i = 0; i < nbufs; i++) { - BufferDesc* buf = &t_thrd.storage_cxt.LocalBufferDescriptors[i]; + BufferDesc* buf = &u_sess->storage_cxt.LocalBufferDescriptors[i]; /* * negative to indicate local buffer. This is tricky: shared buffers @@ -468,13 +468,16 @@ static void InitLocalBuffers(void) info.entrysize = sizeof(LocalBufferLookupEnt); info.hash = tag_hash; - t_thrd.storage_cxt.LocalBufHash = hash_create("Local Buffer Lookup Table", nbufs, &info, HASH_ELEM | HASH_FUNCTION); + u_sess->storage_cxt.LocalBufHash = hash_create( + "Local Buffer Lookup Table", nbufs, &info, HASH_ELEM | HASH_FUNCTION); - if (!t_thrd.storage_cxt.LocalBufHash) - ereport(ERROR, (errcode(ERRCODE_INITIALIZE_FAILED), (errmsg("could not initialize local buffer hash table.")))); + if (!u_sess->storage_cxt.LocalBufHash) { + ereport(ERROR, (errcode(ERRCODE_INITIALIZE_FAILED), + (errmsg("could not initialize local buffer hash table.")))); + } /* Initialization done, mark buffers allocated */ - t_thrd.storage_cxt.NLocBuffer = nbufs; + u_sess->storage_cxt.NLocBuffer = nbufs; } /* @@ -490,9 +493,9 @@ static Block GetLocalBufferStorage(void) { char* this_buf = NULL; - Assert(t_thrd.storage_cxt.total_bufs_allocated < t_thrd.storage_cxt.NLocBuffer); + Assert(u_sess->storage_cxt.total_bufs_allocated < u_sess->storage_cxt.NLocBuffer); - if (t_thrd.storage_cxt.next_buf_in_block >= t_thrd.storage_cxt.num_bufs_in_block) { + if (u_sess->storage_cxt.next_buf_in_block >= u_sess->storage_cxt.num_bufs_in_block) { /* Need to make a new request to memmgr */ int num_bufs; @@ -501,30 +504,30 @@ static Block GetLocalBufferStorage(void) * space eaten for them is easily recognizable in MemoryContextStats * output. Create the context on first use. */ - if (t_thrd.storage_cxt.LocalBufferContext == NULL) - t_thrd.storage_cxt.LocalBufferContext = AllocSetContextCreate(t_thrd.top_mem_cxt, + if (u_sess->storage_cxt.LocalBufferContext == NULL) + u_sess->storage_cxt.LocalBufferContext = AllocSetContextCreate(u_sess->top_mem_cxt, "LocalBufferContext", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); /* Start with a 16-buffer request; subsequent ones double each time */ - num_bufs = Max(t_thrd.storage_cxt.num_bufs_in_block * 2, 16); + num_bufs = Max(u_sess->storage_cxt.num_bufs_in_block * 2, 16); /* But not more than what we need for all remaining local bufs */ - num_bufs = Min(num_bufs, t_thrd.storage_cxt.NLocBuffer - t_thrd.storage_cxt.total_bufs_allocated); + num_bufs = Min(num_bufs, u_sess->storage_cxt.NLocBuffer - u_sess->storage_cxt.total_bufs_allocated); /* And don't overflow MaxAllocSize, either */ num_bufs = Min((unsigned int)(num_bufs), MaxAllocSize / BLCKSZ); - t_thrd.storage_cxt.cur_block = - (char*)MemoryContextAlloc(t_thrd.storage_cxt.LocalBufferContext, num_bufs * BLCKSZ); - t_thrd.storage_cxt.next_buf_in_block = 0; - t_thrd.storage_cxt.num_bufs_in_block = num_bufs; + u_sess->storage_cxt.cur_block = + (char*)MemoryContextAlloc(u_sess->storage_cxt.LocalBufferContext, num_bufs * BLCKSZ); + u_sess->storage_cxt.next_buf_in_block = 0; + u_sess->storage_cxt.num_bufs_in_block = num_bufs; } /* Allocate next buffer in current memory block */ - this_buf = t_thrd.storage_cxt.cur_block + t_thrd.storage_cxt.next_buf_in_block * BLCKSZ; - t_thrd.storage_cxt.next_buf_in_block++; - t_thrd.storage_cxt.total_bufs_allocated++; + this_buf = u_sess->storage_cxt.cur_block + u_sess->storage_cxt.next_buf_in_block * BLCKSZ; + u_sess->storage_cxt.next_buf_in_block++; + u_sess->storage_cxt.total_bufs_allocated++; return (Block)this_buf; } @@ -540,8 +543,8 @@ void AtEOXact_LocalBuffers(bool isCommit) if (assert_enabled) { int i; - for (i = 0; i < t_thrd.storage_cxt.NLocBuffer; i++) { - Assert(t_thrd.storage_cxt.LocalRefCount[i] == 0); + for (i = 0; i < u_sess->storage_cxt.NLocBuffer; i++) { + Assert(u_sess->storage_cxt.LocalRefCount[i] == 0); } } #endif @@ -558,11 +561,11 @@ void AtEOXact_LocalBuffers(bool isCommit) void AtProcExit_LocalBuffers(void) { #ifdef USE_ASSERT_CHECKING - if (assert_enabled && t_thrd.storage_cxt.LocalRefCount) { + if (assert_enabled && u_sess->storage_cxt.LocalRefCount) { int i; - for (i = 0; i < t_thrd.storage_cxt.NLocBuffer; i++) { - Assert(t_thrd.storage_cxt.LocalRefCount[i] == 0); + for (i = 0; i < u_sess->storage_cxt.NLocBuffer; i++) { + Assert(u_sess->storage_cxt.LocalRefCount[i] == 0); } } #endif diff --git a/src/gausskernel/storage/ipc/ipc.cpp b/src/gausskernel/storage/ipc/ipc.cpp index 4fd99a3df7..f8a5d8afad 100644 --- a/src/gausskernel/storage/ipc/ipc.cpp +++ b/src/gausskernel/storage/ipc/ipc.cpp @@ -436,6 +436,11 @@ void sess_exit_prepare(int code) t_thrd.proc_cxt.sess_exit_inprogress = true; old_sigset = gs_signal_block_sigusr2(); + if (u_sess->gtt_ctx.gtt_cleaner_exit_registered) { + pg_on_exit_callback func = u_sess->gtt_ctx.gtt_sess_exit; + (*func)(code, UInt32GetDatum(NULL)); + } + for (; u_sess->on_sess_exit_index < on_sess_exit_size; u_sess->on_sess_exit_index++) (*on_sess_exit_list[u_sess->on_sess_exit_index])(code, UInt32GetDatum(NULL)); diff --git a/src/gausskernel/storage/ipc/procarray.cpp b/src/gausskernel/storage/ipc/procarray.cpp index 8308789aa4..5e7d069cda 100644 --- a/src/gausskernel/storage/ipc/procarray.cpp +++ b/src/gausskernel/storage/ipc/procarray.cpp @@ -113,6 +113,7 @@ #include "catalog/pg_control.h" #include "pgstat.h" #include "storage/lwlock.h" +#include "threadpool/threadpool_sessctl.h" #include "access/multi_redo_api.h" #include "gstrace/gstrace_infra.h" #include "gstrace/storage_gstrace.h" @@ -4528,7 +4529,7 @@ char* transfer_snapshot_type(SnapshotType snap_type) * search all active backend to get oldest frozenxid * for global temp table. */ -TransactionId list_all_session_gtt_frozenxids(int max_size, ThreadId *pids, TransactionId *xids, int *n) +TransactionId ListAllThreadGttFrozenxids(int maxSize, ThreadId *pids, TransactionId *xids, int *n) { ProcArrayStruct *arrayP = g_instance.proc_array_idx; TransactionId result = InvalidTransactionId; @@ -4539,7 +4540,7 @@ TransactionId list_all_session_gtt_frozenxids(int max_size, ThreadId *pids, Tran if (u_sess->attr.attr_storage.max_active_gtt <= 0) return 0; - if (max_size > 0) { + if (maxSize > 0) { Assert(pids); Assert(xids); Assert(n); @@ -4556,7 +4557,7 @@ TransactionId list_all_session_gtt_frozenxids(int max_size, ThreadId *pids, Tran flags |= PROC_IN_LOGICAL_DECODING; LWLockAcquire(ProcArrayLock, LW_SHARED); - if (max_size > 0 && max_size < arrayP->numProcs) { + if (maxSize > 0 && maxSize < arrayP->numProcs) { LWLockRelease(ProcArrayLock); elog(ERROR, "list_all_gtt_frozenxids require more array"); } @@ -4570,23 +4571,76 @@ TransactionId list_all_session_gtt_frozenxids(int max_size, ThreadId *pids, Tran continue; if (proc->databaseId == u_sess->proc_cxt.MyDatabaseId && - TransactionIdIsNormal(proc->session_gtt_frozenxid)) { + TransactionIdIsNormal(proc->gtt_session_frozenxid)) { if (result == InvalidTransactionId) - result = proc->session_gtt_frozenxid; - else if (TransactionIdPrecedes(proc->session_gtt_frozenxid, result)) - result = proc->session_gtt_frozenxid; + result = proc->gtt_session_frozenxid; + else if (TransactionIdPrecedes(proc->gtt_session_frozenxid, result)) + result = proc->gtt_session_frozenxid; - if (max_size > 0) { + if (maxSize > 0) { pids[i] = proc->pid; - xids[i] = proc->session_gtt_frozenxid; + xids[i] = proc->gtt_session_frozenxid; i++; } } } LWLockRelease(ProcArrayLock); - if (max_size > 0) { + if (maxSize > 0) { *n = i; } return result; } +TransactionId ListAllSessionGttFrozenxids(int maxSize, ThreadId *pids, TransactionId *xids, int *n) +{ + TransactionId result = InvalidTransactionId; + int i = 0; + + if (u_sess->attr.attr_storage.max_active_gtt <= 0) { + return 0; + } + + if (maxSize > 0) { + Assert(pids); + Assert(xids); + Assert(n); + *n = 0; + } + + if (u_sess->attr.attr_storage.max_active_gtt <= 0) { + return InvalidTransactionId; + } + + if (RecoveryInProgress()) { + return InvalidTransactionId; + } + + ThreadPoolSessControl *sessCtrl = g_threadPoolControler->GetSessionCtrl(); + const knl_sess_control *sessList = sessCtrl->GetSessionList(); + const knl_sess_control *currSess = sessList; + while (currSess != nullptr) { + knl_session_context *session = currSess->sess; + if (session->proc_cxt.MyDatabaseId == u_sess->proc_cxt.MyDatabaseId && + TransactionIdIsNormal(session->gtt_ctx.gtt_session_frozenxid)) { + if (result == InvalidTransactionId) { + result = session->gtt_ctx.gtt_session_frozenxid; + } else if (TransactionIdPrecedes(session->gtt_ctx.gtt_session_frozenxid, result)) { + result = session->gtt_ctx.gtt_session_frozenxid; + } + + if (maxSize > 0) { + pids[i] = session->attachPid; + xids[i] = session->gtt_ctx.gtt_session_frozenxid; + i++; + } + } + currSess = currSess->next; + } + + if (maxSize > 0) { + *n = i; + } + return result; +} + + diff --git a/src/gausskernel/storage/lmgr/proc.cpp b/src/gausskernel/storage/lmgr/proc.cpp index d5d11727fa..ce2eb19b01 100755 --- a/src/gausskernel/storage/lmgr/proc.cpp +++ b/src/gausskernel/storage/lmgr/proc.cpp @@ -573,7 +573,7 @@ void InitProcess(void) t_thrd.proc->backendId = InvalidBackendId; t_thrd.proc->databaseId = InvalidOid; t_thrd.proc->roleId = InvalidOid; - t_thrd.proc->session_gtt_frozenxid = InvalidTransactionId; /* init session level gtt frozenxid */ + t_thrd.proc->gtt_session_frozenxid = InvalidTransactionId; /* init session level gtt frozenxid */ /* For backends, upgrade status is either passed down from remote backends or inherit from PM */ t_thrd.proc->workingVersionNum = (u_sess->proc_cxt.MyProcPort ? u_sess->proc_cxt.MyProcPort->SessionVersionNum : pg_atomic_read_u32(&WorkingGrandVersionNum)); @@ -787,7 +787,7 @@ void InitAuxiliaryProcess(void) t_thrd.proc->backendId = InvalidBackendId; t_thrd.proc->databaseId = InvalidOid; t_thrd.proc->roleId = InvalidOid; - t_thrd.proc->session_gtt_frozenxid = InvalidTransactionId; /* init session level gtt frozenxid */ + t_thrd.proc->gtt_session_frozenxid = InvalidTransactionId; /* init session level gtt frozenxid */ t_thrd.pgxact->delayChkpt = false; t_thrd.pgxact->vacuumFlags = 0; t_thrd.proc->lwWaiting = false; diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 61041d80e5..4941823990 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -61,6 +61,8 @@ #include "storage/shmem.h" #include "utils/palloc.h" +typedef void (*pg_on_exit_callback)(int code, Datum arg); + /* all session level attribute which expose to user. */ typedef struct knl_session_attr { knl_session_attr_sql attr_sql; @@ -1300,6 +1302,7 @@ typedef struct knl_u_stat_context { #define MAX_LOCKMETHOD 2 typedef uint16 CycleCtr; +typedef void* Block; typedef struct knl_u_storage_context { /* * How many buffers PrefetchBuffer callers should try to stay ahead of their @@ -1387,6 +1390,20 @@ typedef struct knl_u_storage_context { bool twoPhaseCommitInProgress; int32 dumpHashbucketIdNum; int2 *dumpHashbucketIds; + + /* Pointers to shared state */ + // struct BufferStrategyControl* StrategyControl; + int NLocBuffer; /* until buffers are initialized */ + struct BufferDesc* LocalBufferDescriptors; + Block* LocalBufferBlockPointers; + int32* LocalRefCount; + int nextFreeLocalBuf; + struct HTAB* LocalBufHash; + char* cur_block; + int next_buf_in_block; + int num_bufs_in_block; + int total_bufs_allocated; + MemoryContext LocalBufferContext; } knl_u_storage_context; @@ -1986,6 +2003,17 @@ typedef struct knl_u_mot_context { MOT::TxnManager* jit_txn; } knl_u_mot_context; +typedef struct knl_u_gtt_context { + bool gtt_cleaner_exit_registered; + HTAB* gtt_storage_local_hash; + MemoryContext gtt_relstats_context; + + /* relfrozenxid of all gtts in the current session */ + List* gtt_session_relfrozenxid_list; + TransactionId gtt_session_frozenxid; + pg_on_exit_callback gtt_sess_exit; +} knl_u_gtt_context; + enum knl_session_status { KNL_SESS_FAKE, KNL_SESS_UNINIT, @@ -2075,6 +2103,9 @@ typedef struct knl_session_context { knl_u_unique_sql_context unique_sql_cxt; knl_u_user_login_context user_login_cxt; knl_u_percentile_context percentile_cxt; + + /* GTT */ + knl_u_gtt_context gtt_ctx; } knl_session_context; extern knl_session_context* create_session_context(MemoryContext parent, uint64 id); diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 9ae6fc788f..c09c6ea919 100644 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2211,7 +2211,6 @@ typedef struct knl_t_walrcvwriter_context { } knl_t_walrcvwriter_context; typedef int CacheSlotId_t; -typedef void* Block; typedef void (*pg_on_exit_callback)(int code, Datum arg); typedef void (*shmem_startup_hook_type)(void); typedef struct ONEXIT { @@ -2293,17 +2292,6 @@ typedef struct knl_t_storage_context { /* Pointers to shared state */ struct BufferStrategyControl* StrategyControl; - int NLocBuffer; /* until buffers are initialized */ - struct BufferDesc* LocalBufferDescriptors; - Block* LocalBufferBlockPointers; - int32* LocalRefCount; - int nextFreeLocalBuf; - struct HTAB* LocalBufHash; - char* cur_block; - int next_buf_in_block; - int num_bufs_in_block; - int total_bufs_allocated; - MemoryContext LocalBufferContext; /* remember global block slot in progress */ CacheSlotId_t CacheBlockInProgressIO; CacheSlotId_t CacheBlockInProgressUncompress; diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index e40d33f414..a09865eb10 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -120,7 +120,7 @@ struct WritebackContext; */ #define BufferIsPinned(bufnum) \ (!BufferIsValid(bufnum) ? false \ - : BufferIsLocal(bufnum) ? (t_thrd.storage_cxt.LocalRefCount[-(bufnum)-1] > 0) \ + : BufferIsLocal(bufnum) ? (u_sess->storage_cxt.LocalRefCount[-(bufnum)-1] > 0) \ : (GetPrivateRefCount(bufnum) > 0)) /* @@ -147,7 +147,7 @@ struct WritebackContext; * now demoted the range checks to assertions within the macro itself. */ #define BufferIsValid(bufnum) \ - (AssertMacro((bufnum) <= g_instance.attr.attr_storage.NBuffers && (bufnum) >= -t_thrd.storage_cxt.NLocBuffer), \ + (AssertMacro((bufnum) <= g_instance.attr.attr_storage.NBuffers && (bufnum) >= -u_sess->storage_cxt.NLocBuffer), \ (bufnum) != InvalidBuffer) /* @@ -159,7 +159,7 @@ struct WritebackContext; */ #define BufferGetBlock(buffer) \ (AssertMacro(BufferIsValid(buffer)), \ - BufferIsLocal(buffer) ? t_thrd.storage_cxt.LocalBufferBlockPointers[-(buffer)-1] \ + BufferIsLocal(buffer) ? u_sess->storage_cxt.LocalBufferBlockPointers[-(buffer)-1] \ : (Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)((uint)(buffer)-1)) * BLCKSZ)) /* @@ -186,7 +186,7 @@ struct WritebackContext; #define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) /* Note: this macro only works on local buffers, not shared ones! */ -#define LocalBufHdrGetBlock(bufHdr) t_thrd.storage_cxt.LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)] +#define LocalBufHdrGetBlock(bufHdr) u_sess->storage_cxt.LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)] #define LocalBufGetLSN(bufHdr) (PageGetLSN(LocalBufHdrGetBlock(bufHdr))) /* diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 768540a057..b0a14ae3e5 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -128,7 +128,7 @@ struct PGPROC { ThreadId sessMemorySessionid; uint64 sessionid; /* if not zero, session id in thread pool*/ int logictid; /*logic thread id*/ - TransactionId session_gtt_frozenxid; /* session level global temp table relfrozenxid */ + TransactionId gtt_session_frozenxid; /* session level global temp table relfrozenxid */ int pgprocno; int nodeno; diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 287ab509f4..94602e0a7b 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -128,5 +128,6 @@ extern void ResetProcXidCache(PGPROC* proc, bool needlock); #endif /* USE_UT */ // For GTT -extern TransactionId list_all_session_gtt_frozenxids(int max_size, ThreadId *pids, TransactionId *xids, int *n); +extern TransactionId ListAllThreadGttFrozenxids(int maxSize, ThreadId *pids, TransactionId *xids, int *n); +extern TransactionId ListAllSessionGttFrozenxids(int maxSize, ThreadId *pids, TransactionId *xids, int *n); diff --git a/src/include/threadpool/threadpool.h b/src/include/threadpool/threadpool.h index ac1d057925..ecf3c253b1 100755 --- a/src/include/threadpool/threadpool.h +++ b/src/include/threadpool/threadpool.h @@ -36,6 +36,7 @@ #define ENABLE_THREAD_POOL (g_threadPoolControler != NULL) #define IS_THREAD_POOL_WORKER (t_thrd.role == THREADPOOL_WORKER) #define IS_THREAD_POOL_SESSION (u_sess->session_id > 0) +#define BackendIdForTempRelations (ENABLE_THREAD_POOL ? (BackendId)u_sess->session_ctr_index : t_thrd.proc_cxt.MyBackendId) #define THREAD_CORE_RATIO 1 #define DEFAULT_THREAD_POOL_SIZE 16 #define DEFAULT_THREAD_POOL_GROUPS 2 diff --git a/src/test/regress/expected/gtt_function.out b/src/test/regress/expected/gtt_function.out index 42ae372966..255882cf55 100644 --- a/src/test/regress/expected/gtt_function.out +++ b/src/test/regress/expected/gtt_function.out @@ -46,8 +46,6 @@ select * from gtt6; -- ERROR create index CONCURRENTLY idx_gtt1 on gtt1 (b); -ERROR: PGXC does not support concurrent INDEX yet -DETAIL: The feature is not currently supported -- ERROR cluster gtt1 using gtt1_pkey; -- ERROR diff --git a/src/test/regress/expected/gtt_stats.out b/src/test/regress/expected/gtt_stats.out index 503a201afc..2f58c82754 100644 --- a/src/test/regress/expected/gtt_stats.out +++ b/src/test/regress/expected/gtt_stats.out @@ -60,12 +60,12 @@ select * from pg_gtt_stats order by tablename, attname; gtt_stats | gtt | a | | | | | | | | | | | gtt_stats | gtt | b | | | | | | | | | | | gtt_stats | gtt_pkey | a | | | | | | | | | | | - pg_toast | pg_toast_16386 | chunk_data | | | | | | | | | | | - pg_toast | pg_toast_16386 | chunk_id | | | | | | | | | | | - pg_toast | pg_toast_16386 | chunk_seq | | | | | | | | | | | - pg_toast | pg_toast_16386_index | chunk_id | | | | | | | | | | | - pg_toast | pg_toast_16386_index | chunk_seq | | | | | | | | | | | -(22 rows) +--? pg_toast | pg_toast_..... | chunk_data | | | | | | | | | | | +--? pg_toast | pg_toast_..... | chunk_id | | | | | | | | | | | +--? pg_toast | pg_toast_..... | chunk_seq | | | | | | | | | | | +--? pg_toast | pg_toast_....._index | chunk_id | | | | | | | | | | | +--? pg_toast | pg_toast_....._index | chunk_seq | | | | | | | | | | | +(8 rows) reindex table gtt; reindex index gtt_pkey; @@ -83,12 +83,12 @@ select * from pg_gtt_stats order by tablename, attname; gtt_stats | gtt | a | f | 0 | 4 | -1 | | | {1,100,200,300,400,500,600,700,800,900,1000,1100,1200,1300,1400,1500,1600,1700,1800,1900,2000,2100,2200,2300,2400,2500,2600,2700,2800,2900,3000,3100,3200,3300,3400,3500,3600,3700,3800,3900,4000,4100,4200,4300,4400,4500,4600,4700,4800,4900,5000,5100,5200,5300,5400,5500,5600,5700,5800,5900,6000,6100,6200,6300,6400,6500,6600,6700,6800,6900,7000,7100,7200,7300,7400,7500,7600,7700,7800,7900,8000,8100,8200,8300,8400,8500,8600,8700,8800,8900,9000,9100,9200,9300,9400,9500,9600,9700,9800,9900,10000} | 1 | | | gtt_stats | gtt | b | f | 0 | 5 | 1 | {test} | {1} | | 1 | | | gtt_stats | gtt_pkey | a | | | | | | | | | | | - pg_toast | pg_toast_16386 | chunk_data | | | | | | | | | | | - pg_toast | pg_toast_16386 | chunk_id | | | | | | | | | | | - pg_toast | pg_toast_16386 | chunk_seq | | | | | | | | | | | - pg_toast | pg_toast_16386_index | chunk_id | | | | | | | | | | | - pg_toast | pg_toast_16386_index | chunk_seq | | | | | | | | | | | -(22 rows) +--? pg_toast | pg_toast_..... | chunk_data | | | | | | | | | | | +--? pg_toast | pg_toast_..... | chunk_id | | | | | | | | | | | +--? pg_toast | pg_toast_..... | chunk_seq | | | | | | | | | | | +--? pg_toast | pg_toast_....._index | chunk_id | | | | | | | | | | | +--? pg_toast | pg_toast_....._index | chunk_seq | | | | | | | | | | | +(8 rows) reset search_path; drop schema gtt_stats cascade; -- Gitee