diff --git a/src/common/backend/utils/mmgr/mcxt.cpp b/src/common/backend/utils/mmgr/mcxt.cpp index 9de135b1200bfbae9522de7601dcbdffdd31f15a..e9a73f330dcadd149712aaa98ad0a122e1c5c5de 100644 --- a/src/common/backend/utils/mmgr/mcxt.cpp +++ b/src/common/backend/utils/mmgr/mcxt.cpp @@ -50,6 +50,7 @@ THR_LOCAL MemoryContext CurrentMemoryContext = NULL; MemoryContext AdioSharedContext = NULL; MemoryContext ProcSubXidCacheContext = NULL; MemoryContext PmTopMemoryContext = NULL; // save the topm of postmaster thread +MemoryContext DMSDrcContext = NULL; /* Shared memory context for stream thread connection info. */ MemoryContext StreamInfoContext = NULL; diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 7c172d5585781e53ceee4d8b3718bf87cd09de5e..2c7822ff44bac316cdac9256ac028bf361a1fe26 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -1198,6 +1198,70 @@ static void CBMemReset(void *context) PG_END_TRY(); } +static void *CBDMSMemAlloc(size_t size) +{ + void *ptr = NULL; + uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; + if (t_thrd.dms_cxt.memContext == NULL) { + MemoryContextInit(); + t_thrd.dms_cxt.memContext = AllocSetContextCreate(t_thrd.top_mem_cxt, + "DMSMemContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + } + MemoryContext old_cxt = MemoryContextSwitchTo(t_thrd.dms_cxt.memContext); + + PG_TRY(); + { + ptr = palloc(size); + if (ptr == NULL) { + ereport(FATAL, (errmsg("Failed to allocate memory for DMSMemContext."))); + } + } + PG_CATCH(); + { + t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount; + FlushErrorState(); + } + PG_END_TRY(); + (void)MemoryContextSwitchTo(old_cxt); + return ptr; +} + +static void CBDMSMemFree(void *pointer) +{ + uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; + + PG_TRY(); + { + pfree(pointer); + } + PG_CATCH(); + { + t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount; + FlushErrorState(); + } + PG_END_TRY(); +} + +static void *CBDrcMemAlloc(size_t size) +{ + void *ptr = NULL; + MemoryContext old_cxt = MemoryContextSwitchTo(DMSDrcContext); + ptr = palloc(size); + if (ptr == NULL) { + ereport(FATAL, (errmsg("Failed to allocate memory for DMSDrcContext."))); + } + (void)MemoryContextSwitchTo(old_cxt); + return ptr; +} + +static void CBDrcMemFree(void *pointer) +{ + pfree(pointer); +} + static int32 CBProcessLockAcquire(char *data, uint32 len) { if (unlikely(len != sizeof(SSBroadcastDDLLock))) { @@ -2397,6 +2461,12 @@ void DmsInitCallback(dms_callback_t *callback) callback->mem_free = CBMemFree; callback->mem_reset = CBMemReset; + callback->dms_malloc_prot = CBDMSMemAlloc; + callback->dms_free_prot = CBDMSMemFree; + + callback->drc_malloc_prot = CBDrcMemAlloc; + callback->drc_free_prot = CBDrcMemFree; + callback->get_page_lsn = CBGetPageLSN; callback->get_global_lsn = CBGetGlobalLSN; callback->log_flush = CBXLogFlush; diff --git a/src/gausskernel/ddes/adapter/ss_init.cpp b/src/gausskernel/ddes/adapter/ss_init.cpp index f0f4eb0627642d12af09d03fe124eaf083d6a38d..f6c1896aa6c0c0bb01e2d4a47b1a1a848e5a4451 100644 --- a/src/gausskernel/ddes/adapter/ss_init.cpp +++ b/src/gausskernel/ddes/adapter/ss_init.cpp @@ -442,6 +442,12 @@ static inline void DMSDfxStatReset(){ void DMSInit() { + MemoryContextInit(); + t_thrd.dms_cxt.memContext = AllocSetContextCreate(t_thrd.top_mem_cxt, + "DMSMemContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); if (ss_dms_func_init() != DMS_SUCCESS) { ereport(FATAL, (errmsg("failed to init dms library"))); } diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 766f84ce4b5a6d7a436af02a1b76b16434b09d7d..65b5dfe65bb9c2f8de922f4d06cd294082498bea 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -3102,6 +3102,12 @@ int PostmasterMain(int argc, char* argv[]) if (g_instance.attr.attr_storage.dms_attr.enable_dms) { /* need to initialize before STARTUP */ + DMSDrcContext = AllocSetContextCreate(g_instance.instance_context, + "DMSDrcContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE, + SHARED_CONTEXT); DMSInit(); if (!ENABLE_SS_BCAST_GETOLDESTXMIN) { g_instance.pid_cxt.DmsAuxiliaryPID = initialize_util_thread(DMS_AUXILIARY_THREAD); diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 2b157acf5f9867d25f2be6d653cecff7df5ddf9a..0062d70389f0afc2430fd4b9581e48e8df3b3e51 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1742,6 +1742,7 @@ static void knl_t_sql_patch_init(knl_t_sql_patch_context* sql_patch_cxt) static void knl_t_dms_context_init(knl_t_dms_context *dms_cxt) { dms_cxt->msgContext = NULL; + dms_cxt->memContext = NULL; dms_cxt->buf_in_aio = false; dms_cxt->is_reform_proc = false; dms_cxt->CloseAllSessionsFailed = false; diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index 33feb6447c8a1e523c373b75d9f8a1cda076fd18..a628175c283aaf5222de4a0d4b6a8aca9f69d9c5 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -878,6 +878,8 @@ typedef void(*dms_leave_page)(void *db_handle, unsigned char changed, unsigned i typedef char *(*dms_mem_alloc)(void *context, unsigned int size); typedef void(*dms_mem_free)(void *context, void *ptr); typedef void(*dms_mem_reset)(void *context); +typedef void(*dms_drc_alloc)(size_t size); +typedef void(*dms_drc_free)(void *ptr); // The maximum length of output_msg is 128 bytes. typedef int (*dms_process_broadcast)(void *db_handle, dms_broadcast_context_t *broad_ctx); typedef int (*dms_process_broadcast_ack)(void *db_handle, dms_broadcast_context_t *broad_ctx); @@ -1098,6 +1100,9 @@ typedef struct st_dms_callback { dms_mem_free mem_free; dms_mem_reset mem_reset; + dms_drc_alloc drc_malloc_prot; + dms_drc_free drc_free_prot; + dms_process_broadcast process_broadcast; dms_process_broadcast_ack process_broadcast_ack; dms_get_txn_info get_txn_info; diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 33664d6c325529bdb1593ed5ff476f570c134c3b..99359d8542f68742407147c33935d8b4d7897727 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -3430,7 +3430,7 @@ typedef struct knl_t_publication_context { } knl_t_publication_context; typedef struct knl_t_dms_context { - MemoryContext msgContext; + MemoryContext memContext; bool buf_in_aio; bool is_reform_proc; bool CloseAllSessionsFailed; diff --git a/src/include/utils/memutils.h b/src/include/utils/memutils.h index 37f04e9d579c09ea0786c0fee18971f40cd79425..b3442fefe50b3d458e6d8f35c0cf8d167f4c1591 100644 --- a/src/include/utils/memutils.h +++ b/src/include/utils/memutils.h @@ -84,6 +84,7 @@ typedef struct StandardChunkHeader { // extern MemoryContext AdioSharedContext; extern MemoryContext ProcSubXidCacheContext; +extern MemoryContext DMSDrcContext; extern MemoryContext PmTopMemoryContext; extern MemoryContext StreamInfoContext;