diff --git a/src/common/backend/utils/fmgr/fmgr.cpp b/src/common/backend/utils/fmgr/fmgr.cpp index 0b6db1672be1296763bf80ed2cb6c1da9ca868ee..ffa29d4a59fcab8d0e3be304673ffa1fb2bed8f2 100644 --- a/src/common/backend/utils/fmgr/fmgr.cpp +++ b/src/common/backend/utils/fmgr/fmgr.cpp @@ -1105,6 +1105,10 @@ static Datum fmgr_security_definer(PG_FUNCTION_ARGS) volatile int save_nestlevel; PgStat_FunctionCallUsage fcusage; + /* Does not allow commit in pre setting scenary */ + bool savedisTopLevelForSTP = u_sess->SPI_cxt.is_toplevel_stp; + u_sess->SPI_cxt.is_toplevel_stp = false; + if (!fcinfo->flinfo->fn_extra) { HeapTuple tuple; Form_pg_proc procedureStruct; @@ -1216,6 +1220,9 @@ static Datum fmgr_security_definer(PG_FUNCTION_ARGS) (*fmgr_hook)(FHET_END, &(fcache->flinfo), &(fcache->arg)); } + /* restore is_toplevel_stp */ + u_sess->SPI_cxt.is_toplevel_stp = savedisTopLevelForSTP; + return result; } diff --git a/src/common/backend/utils/mmgr/portalmem.cpp b/src/common/backend/utils/mmgr/portalmem.cpp index d138056af27563d27fe86151bcc9d6b7f91ca770..ed2719dc1398a864d3bd63d5a2c20c4713a9b4b8 100755 --- a/src/common/backend/utils/mmgr/portalmem.cpp +++ b/src/common/backend/utils/mmgr/portalmem.cpp @@ -27,6 +27,8 @@ #include "utils/memutils.h" #include "utils/plpgsql.h" #include "utils/timestamp.h" +#include "utils/resowner.h" +#include "nodes/execnodes.h" #ifdef PGXC #include "pgxc/pgxc.h" @@ -594,6 +596,36 @@ void PortalHashTableDeleteAll(void) } } +/* + * "Hold" a portal. Prepare it for access by later transactions. + */ +static void HoldPortal(Portal portal) +{ + /* + * Note that PersistHoldablePortal() must release all resources + * used by the portal that are local to the creating transaction. + */ + PortalCreateHoldStore(portal); + PersistHoldablePortal(portal); + + /* drop cached plan reference, if any */ + PortalReleaseCachedPlan(portal); + + /* + * Any resources belonging to the portal will be released in the + * upcoming transaction-wide cleanup; the portal will no longer + * have its own resources. + */ + portal->resowner = NULL; + + /* + * Having successfully exported the holdable cursor, mark it as + * not belonging to this transaction. + */ + portal->createSubid = InvalidSubTransactionId; + portal->activeSubid = InvalidSubTransactionId; +} + /* * Pre-commit processing for portals. * @@ -606,7 +638,7 @@ void PortalHashTableDeleteAll(void) * Returns TRUE if any portals changed state (possibly causing user-defined * code to be run), FALSE if not. */ -bool PreCommit_Portals(bool isPrepare) +bool PreCommit_Portals(bool isPrepare, bool stpCommit) { bool result = false; HASH_SEQ_STATUS status; @@ -619,12 +651,14 @@ bool PreCommit_Portals(bool isPrepare) while ((hentry = (PortalHashEnt*)hash_seq_search(&status)) != NULL) { Portal portal = hentry->portal; + ResourceOwner owner = portal->resowner; /* * There should be no pinned portals anymore. Complain if someone - * leaked one. + * leaked one. Auto-held portals are allowed; we assume that whoever + * pinned them is managing them. */ - if (portal->portalPinned) + if (portal->portalPinned && !portal->autoHeld) ereport(ERROR, (errcode(ERRCODE_OPERATE_NOT_SUPPORTED), errmsg("cannot commit while a portal is pinned"))); /* @@ -635,7 +669,22 @@ bool PreCommit_Portals(bool isPrepare) * still going to go away, so don't leave a dangling pointer. */ if (portal->status == PORTAL_ACTIVE) { - portal->resowner = NULL; + /* + * If we are in multi commit and we also have owner, then need to cleanup all the snapshots + * during commit time. Otherwise it will cause leak snapshots reference warning. + */ + if (owner && stpCommit) { + ResourceOwnerDecrementNsnapshots(owner, portal->queryDesc); + } + + /* + * If we are in commit within stored procedure need to keep resowner and it will be used to + * connect new local resources. Otherwise it will cause leak snapshots reference warning, because + * the new snapshot does not have owner. + */ + if (!stpCommit) { + portal->resowner = NULL; + } continue; } @@ -655,28 +704,7 @@ bool PreCommit_Portals(bool isPrepare) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot PREPARE a transaction that has created a cursor WITH HOLD"))); - /* - * Note that PersistHoldablePortal() must release all resources - * used by the portal that are local to the creating transaction. - */ - PortalCreateHoldStore(portal); - PersistHoldablePortal(portal); - - /* drop cached plan reference, if any */ - PortalReleaseCachedPlan(portal); - - /* - * Any resources belonging to the portal will be released in the - * upcoming transaction-wide cleanup; the portal will no longer - * have its own resources. - */ - portal->resowner = NULL; - - /* - * Having successfully exported the holdable cursor, mark it as - * not belonging to this transaction. - */ - portal->createSubid = InvalidSubTransactionId; + HoldPortal(portal); /* Report we changed state */ result = true; @@ -709,13 +737,13 @@ bool PreCommit_Portals(bool isPrepare) /* * Abort processing for portals. * - * At this point we reset "active" status and run the cleanup hook if - * present, but we can't release the portal's memory until the cleanup call. + * At this point we run the cleanup hook if present, but we can't release the + * portal's memory until the cleanup call. * * The reason we need to reset active is so that we can replace the unnamed * portal, else we'll fail to execute ROLLBACK when it arrives. */ -void AtAbort_Portals(void) +void AtAbort_Portals(bool stpRollback) { HASH_SEQ_STATUS status; PortalHashEnt* hentry = NULL; @@ -728,16 +756,28 @@ void AtAbort_Portals(void) while ((hentry = (PortalHashEnt*)hash_seq_search(&status)) != NULL) { Portal portal = hentry->portal; - /* Any portal that was actually running has to be considered broken */ - if (portal->status == PORTAL_ACTIVE) - MarkPortalFailed(portal); - /* * Do nothing else to cursors held over from a previous transaction. */ if (portal->createSubid == InvalidSubTransactionId) continue; + /* + * Do nothing to auto-held cursors. This is similar to the case of a + * cursor from a previous transaction, but it could also be that the + * cursor was auto-held in this trasnaction, so it wants to live on. + */ + if (portal->autoHeld) + continue; + + /* + * Within multi rollback need to clean up snapshots before release + * its resource owner. + */ + if (portal->resowner && stpRollback) { + ResourceOwnerDecrementNsnapshots(portal->resowner, portal->queryDesc); + } + /* * If it was created in the current transaction, we can't do normal * shutdown on a READY portal either; it might refer to objects @@ -749,9 +789,10 @@ void AtAbort_Portals(void) /* * Allow portalcmds.c to clean up the state it knows about, if we - * haven't already. + * haven't already. Should haven't already. Should not clean up portal + * during multi commit and rollback. */ - if (PointerIsValid(portal->cleanup)) { + if (PointerIsValid(portal->cleanup) && !stpRollback) { (*portal->cleanup)(portal); portal->cleanup = NULL; } @@ -762,17 +803,24 @@ void AtAbort_Portals(void) /* * Any resources belonging to the portal will be released in the * upcoming transaction-wide cleanup; they will be gone before we run - * PortalDrop. + * PortalDrop. Can not reset resowner NULL if + * we are in stpRollback, because the Portal is still alive. If we set + * resowner to NULL it will cause leak snapshots reference error, because + * the new snaphosts does not have owner. */ - portal->resowner = NULL; + if (!stpRollback) { + portal->resowner = NULL; + } /* * Although we can't delete the portal data structure proper, we can * release any memory in subsidiary contexts, such as executor state. * The cleanup hook was the last thing that might have needed data - * there. + * there. But leave active portals alone. */ - MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); + if (portal->status != PORTAL_ACTIVE) { + MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); + } } } @@ -793,8 +841,18 @@ void AtCleanup_Portals(void) while ((hentry = (PortalHashEnt*)hash_seq_search(&status)) != NULL) { Portal portal = hentry->portal; - /* Do nothing to cursors held over from a previous transaction */ - if (portal->createSubid == InvalidSubTransactionId) { + /* + * Do not touch active portals --- this can only happen in the case of + * a multi-transaction command. + */ + if (portal->status == PORTAL_ACTIVE) + continue; + + /* + * Do nothing to cursors held over from a previous transaction or + * auto-held ones. + */ + if (portal->createSubid == InvalidSubTransactionId || portal->autoHeld) { Assert(portal->status != PORTAL_ACTIVE); Assert(portal->resowner == NULL); continue; @@ -820,6 +878,29 @@ void AtCleanup_Portals(void) } } +/* + * Potal-related cleanup when we return to the main loop on error. + * + * This is different from the cleanup at transaction abort. Auto-held portals + * are cleaned up on error but not on transaction abort. + */ +void PortalErrorCleanup(void) +{ + HASH_SEQ_STATUS status; + PortalHashEnt* hentry = NULL; + + hash_seq_init(&status, u_sess->exec_cxt.PortalHashTable); + + while ((hentry = (PortalHashEnt *)hash_seq_search(&status)) != NULL) { + Portal portal = hentry->portal; + + if (portal->autoHeld) { + portal->portalPinned = false; + PortalDrop(portal, false); + } + } +} + /* * Pre-subcommit processing for portals. * @@ -1132,3 +1213,55 @@ void ResetPortalCursor(SubTransactionId mySubid, Oid funOid, int funUseCount) ResetCursorOption(portal, true); } } + +/* + * Hold all pinned portals. + * + * When initialing a COMMIT or ROLLBACK insise a procedure, this must be + * called to protect internally-generated cursors from being dropped during + * the transaction shutdown. Currently, SPI calls this automatically; PLs + * that initiate COMMIT or ROLLBACK some other way are on the hook to do it + * themselves. (Note that we couldn't do this in, say, AtAbort_Portals + * because we need to run user-defined code while persisting a portal. + * It's too late to do that once transaction abort has started.) + * + * We protect such portals by converting them to held cursors. We mark them + * as "auto-held" so that exception exit knowns to clean them up. (In normal, + * non-exception code paths, the PL needs to clean such portals itself, since + * transaction end won't do it anymore; but that should be normal practice + * anyway.) + */ +void HoldPinnedPortals(void) +{ + HASH_SEQ_STATUS status; + PortalHashEnt* hentry = NULL; + + hash_seq_init(&status, u_sess->exec_cxt.PortalHashTable); + + while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { + Portal portal = hentry->portal; + + if (portal->portalPinned && !portal->autoHeld) { + /* + * Doing transaction control, especially abort, inside a cursor + * loop that is not read-only, for example using UPDATE + * ... RETURNING, has weird semantics issues. Also, this + * implementation wouldn't work, because such portals cannot be + * held. (The core grammer enforces that only SELECT statements + * can drive a cursor, but for example PL/pgSQL does not restrict + * it.) + */ + if (portal->strategy != PORTAL_ONE_SELECT) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot perform transaction commands inside a cursor loop that is not read-only"))); + + /* Verify it's in a suitable state to be held */ + if (portal->status != PORTAL_READY) + elog(ERROR, "pinned portal is not ready to be auto-held"); + + HoldPortal(portal); + portal->autoHeld = true; + } + } +} \ No newline at end of file diff --git a/src/common/backend/utils/resowner/resowner.cpp b/src/common/backend/utils/resowner/resowner.cpp index b73a80339d88c3edcb9a51000c741855dd90e84a..d4f52b899dab6899819d5db6e8e17104290efc68 100644 --- a/src/common/backend/utils/resowner/resowner.cpp +++ b/src/common/backend/utils/resowner/resowner.cpp @@ -431,6 +431,10 @@ void ResourceOwnerDelete(ResourceOwner owner) */ ResourceOwnerNewParent(owner, NULL); + if (owner == t_thrd.utils_cxt.StpSavedResourceOwner) { + return; + } + /* And free the object. */ if (owner->buffers) pfree(owner->buffers); @@ -469,6 +473,30 @@ ResourceOwner ResourceOwnerGetParent(ResourceOwner owner) return owner->parent; } +/* + * Fetch nextchild of a ResourceOwner (returns) + */ +ResourceOwner ResourceOwnerGetNextChild(ResourceOwner owner) +{ + return owner->nextchild; +} + +/* + * Fetch name of a ResourceOwner (should always has value) + */ +const char* ResourceOwnerGetName(ResourceOwner owner) +{ + return owner->name; +} + +/* + * Fetch firstchild of a ResourceOwner. + */ +ResourceOwner ResourceOwnerGetFirstChild(ResourceOwner owner) +{ + return owner->firstchild; +} + /* * Reassign a ResourceOwner to have a new parent */ @@ -1320,6 +1348,55 @@ void ResourceOwnerForgetSnapshot(ResourceOwner owner, const Snapshot snapshot) errmsg("snapshot is not owned by resource owner %s", owner->name))); } +/* + * This function is used to clean up the snapshots. + * It will be called by PreCommit_Portals and Abort_Portals. + */ +void ResourceOwnerDecrementNsnapshots(ResourceOwner owner, void* queryDesc) +{ + QueryDesc* queryDescTemp = (QueryDesc*)queryDesc; + + while (owner->nsnapshots > 0) { + if (queryDescTemp) { + /* + * check if owner's snapshot is same as queryDesc's snapshot, need to set queryDesc + * snapshot to null, because this function will clean up those snapshots. + */ + if (owner->snapshots[owner->nsnapshots - 1] == queryDescTemp->estate->es_snapshot) { + queryDescTemp->estate->es_snapshot = NULL; + } + + if (owner->snapshots[owner->nsnapshots - 1] == queryDescTemp->estate->es_crosscheck_snapshot) { + queryDescTemp->estate->es_crosscheck_snapshot = NULL; + } + + if (owner->snapshots[owner->nsnapshots - 1] == queryDescTemp->snapshot) { + queryDescTemp->snapshot = NULL; + } + + if (owner->snapshots[owner->nsnapshots - 1] == queryDescTemp->crosscheck_snapshot) { + queryDescTemp->crosscheck_snapshot = NULL; + } + } + + UnregisterSnapshotFromOwner(owner->snapshots[owner->nsnapshots - 1], owner); + } +} + +/* + * This function is used to clean up the cached plan. + * It will ba called by CommitTransaction. + */ +void ResourceOwnerDecrementNPlanRefs(ResourceOwner owner, bool useResOwner) +{ + if (!owner) { + return; + } + + while (owner->nplanrefs > 0) { + ReleaseCachedPlan(owner->planrefs[owner->nplanrefs - 1], useResOwner); + } +} /* * Debugging subroutine */ diff --git a/src/common/backend/utils/time/snapmgr.cpp b/src/common/backend/utils/time/snapmgr.cpp index 7ce814bd43944f060e9e76411ac1a24afc6549a8..b7d53dad2e6f3b08d42e28c812624390de12e3a4 100755 --- a/src/common/backend/utils/time/snapmgr.cpp +++ b/src/common/backend/utils/time/snapmgr.cpp @@ -534,6 +534,14 @@ void UpdateActiveSnapshotCommandId(void) */ void PopActiveSnapshot(void) { + /* + * In multi commit/rollback within stored procedure, the ActiveSnapshot already poped. + * Therefore, no need to pop the active snapshot. Otherwise it will cause seg fault. + */ + if (!u_sess->utils_cxt.ActiveSnapshot) { + return; + } + ActiveSnapshotElt* newstack = NULL; newstack = u_sess->utils_cxt.ActiveSnapshot->as_next; diff --git a/src/common/pl/plpgsql/src/pl_exec.cpp b/src/common/pl/plpgsql/src/pl_exec.cpp index 8d03a997c90ce76f58d1de15ca060091449299bb..a18b898282d01598128f70a282d30ea513e83d86 100755 --- a/src/common/pl/plpgsql/src/pl_exec.cpp +++ b/src/common/pl/plpgsql/src/pl_exec.cpp @@ -130,6 +130,8 @@ static bool is_anonymous_block(const char* query); static int exec_stmt_dynfors(PLpgSQL_execstate* estate, PLpgSQL_stmt_dynfors* stmt); static void plpgsql_estate_setup(PLpgSQL_execstate* estate, PLpgSQL_function* func, ReturnSetInfo* rsi); +static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt); +static int exec_stmt_rollback(PLpgSQL_execstate* estate, PLpgSQL_stmt_rollback* stmt); static void exec_eval_cleanup(PLpgSQL_execstate* estate); static void exec_prepare_plan(PLpgSQL_execstate* estate, PLpgSQL_expr* expr, int cursorOptions); @@ -291,6 +293,7 @@ Datum plpgsql_exec_function(PLpgSQL_function* func, FunctionCallInfo fcinfo, boo { PLpgSQL_execstate estate; ErrorContextCallback plerrcontext; + bool savedIsStp; int i; int rc; @@ -412,7 +415,9 @@ Datum plpgsql_exec_function(PLpgSQL_function* func, FunctionCallInfo fcinfo, boo */ estate.err_text = NULL; estate.err_stmt = (PLpgSQL_stmt*)(func->action); + savedIsStp = u_sess->SPI_cxt.is_stp; rc = exec_stmt_block(&estate, func->action); + u_sess->SPI_cxt.is_stp = savedIsStp; if (rc != PLPGSQL_RC_RETURN) { estate.err_stmt = NULL; estate.err_text = NULL; @@ -633,6 +638,7 @@ HeapTuple plpgsql_exec_trigger(PLpgSQL_function* func, TriggerData* trigdata) PLpgSQL_rec *rec_new = NULL; PLpgSQL_rec *rec_old = NULL; HeapTuple rettup; + bool saveIsStp; /* * Setup the execution state @@ -842,7 +848,10 @@ HeapTuple plpgsql_exec_trigger(PLpgSQL_function* func, TriggerData* trigdata) */ estate.err_text = NULL; estate.err_stmt = (PLpgSQL_stmt*)(func->action); + saveIsStp = u_sess->SPI_cxt.is_stp; + u_sess->SPI_cxt.is_stp = false; rc = exec_stmt_block(&estate, func->action); + u_sess->SPI_cxt.is_stp = saveIsStp; if (rc != PLPGSQL_RC_RETURN) { estate.err_stmt = NULL; estate.err_text = NULL; @@ -1399,6 +1408,10 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block) int i; int n; SubTransactionId subXid = InvalidSubTransactionId; + bool savedIsTopLevelForStp = u_sess->SPI_cxt.is_toplevel_stp; + bool savedIsStp = u_sess->SPI_cxt.is_stp; + TransactionId oldTransactionId = InvalidTransactionId; + /* * First initialize all variables declared in this block */ @@ -1477,13 +1490,17 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block) } if (block->exceptions != NULL) { + u_sess->SPI_cxt.portal_stp_exception_counter++; + /* * Execute the statements in the block's body inside a sub-transaction */ MemoryContext oldcontext = CurrentMemoryContext; ResourceOwner oldowner = t_thrd.utils_cxt.CurrentResourceOwner; - ExprContext* old_eval_econtext = estate->eval_econtext; ErrorData* save_cur_error = estate->cur_error; + if (!RecoveryInProgress()) { + oldTransactionId = GetTopTransactionId(); + } estate->err_text = gettext_noop("during statement block entry"); @@ -1539,13 +1556,25 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block) } MemoryContextSwitchTo(oldcontext); + if ((!RecoveryInProgress()) && (oldTransactionId != GetTopTransactionId())) { + if (ResourceOwnerGetNextChild(t_thrd.utils_cxt.CurrentResourceOwner)) { + oldowner = ResourceOwnerGetNextChild(t_thrd.utils_cxt.CurrentResourceOwner); + } else { + if (ResourceOwnerGetParent(t_thrd.utils_cxt.CurrentResourceOwner)) { + oldowner = ResourceOwnerGetParent(t_thrd.utils_cxt.CurrentResourceOwner); + } else { + oldowner = ResourceOwnerGetFirstChild(t_thrd.utils_cxt.CurrentResourceOwner); + } + } + } t_thrd.utils_cxt.CurrentResourceOwner = oldowner; + u_sess->SPI_cxt.portal_stp_exception_counter--; /* * Revert to outer eval_econtext. (The inner one was * automatically cleaned up during subxact exit.) */ - estate->eval_econtext = old_eval_econtext; + estate->eval_econtext = u_sess->plsql_cxt.simple_econtext_stack->stack_econtext; /* * AtEOSubXact_SPI() should not have popped any SPI context, but @@ -1557,6 +1586,10 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block) { ErrorData* edata = NULL; ListCell* e = NULL; + + u_sess->SPI_cxt.is_toplevel_stp = savedIsTopLevelForStp; + u_sess->SPI_cxt.is_stp = savedIsStp; + estate->cursor_return_data = saved_cursor_data; /* gs_signal_handle maybe block sigusr2 when accept SIGINT */ @@ -1601,10 +1634,22 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block) } MemoryContextSwitchTo(oldcontext); + if ((!RecoveryInProgress()) && (oldTransactionId != GetTopTransactionId())) { + if (ResourceOwnerGetNextChild(t_thrd.utils_cxt.CurrentResourceOwner)) { + oldowner = ResourceOwnerGetNextChild(t_thrd.utils_cxt.CurrentResourceOwner); + } else { + if (ResourceOwnerGetParent(t_thrd.utils_cxt.CurrentResourceOwner)) { + oldowner = ResourceOwnerGetParent(t_thrd.utils_cxt.CurrentResourceOwner); + } else { + oldowner = ResourceOwnerGetFirstChild(t_thrd.utils_cxt.CurrentResourceOwner); + } + } + } t_thrd.utils_cxt.CurrentResourceOwner = oldowner; /* Revert to outer eval_econtext */ - estate->eval_econtext = old_eval_econtext; + estate->eval_econtext = u_sess->plsql_cxt.simple_econtext_stack->stack_econtext; + u_sess->SPI_cxt.portal_stp_exception_counter--; /* * If AtEOSubXact_SPI() popped any SPI context of the subxact, it @@ -2061,14 +2106,28 @@ static int exec_stmt_assign(PLpgSQL_execstate* estate, PLpgSQL_stmt_assign* stmt static int exec_stmt_perform(PLpgSQL_execstate* estate, PLpgSQL_stmt_perform* stmt) { PLpgSQL_expr* expr = stmt->expr; + TransactionId oldTransactionId = InvalidTransactionId; int rc; + if (!RecoveryInProgress()) { + oldTransactionId = GetTopTransactionId(); + } + rc = exec_run_select(estate, expr, 0, NULL); if (rc != SPI_OK_SELECT) { ereport(DEBUG1, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmodule(MOD_PLSQL), errmsg("exec_run_select returns %d", rc))); } + /* + * This is used for nested STP. If the transaction Id changed, + * then need to create new econtext for the TopTransaction. + */ + if ((!RecoveryInProgress()) && (oldTransactionId != GetTopTransactionId())) { + u_sess->plsql_cxt.simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + } + exec_set_found(estate, (estate->eval_processed != 0)); exec_eval_cleanup(estate); @@ -3695,7 +3754,7 @@ static void exec_eval_cleanup(PLpgSQL_execstate* estate) estate->eval_tuptable = NULL; /* Clear result of exec_eval_simple_expr (but keep the econtext) */ - if (estate->eval_econtext != NULL) { + if (estate->eval_econtext != NULL && estate->eval_econtext->ecxt_per_tuple_memory != NULL) { ResetExprContext(estate->eval_econtext); } } @@ -3762,6 +3821,11 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st int rc; PLpgSQL_expr* expr = stmt->sqlstmt; Cursor_Data* saved_cursor_data = NULL; + TransactionId oldTransactionId = InvalidTransactionId; + + if (!RecoveryInProgress()) { + oldTransactionId = GetTopTransactionId(); + } /* * On the first call for this statement generate the plan, and detect @@ -3836,6 +3900,16 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st * Execute the plan */ rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI, estate->readonly_func, tcount); + + /* + * This is used for nested STP. If the transaction Id changed, + * then need to create new econtext for the TopTransaction. + */ + if ((!RecoveryInProgress()) && (oldTransactionId != GetTopTransactionId())) { + u_sess->plsql_cxt.simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + } + plpgsql_estate = NULL; /* @@ -4179,6 +4253,11 @@ static int exec_stmt_dynexecute(PLpgSQL_execstate* estate, PLpgSQL_stmt_dynexecu FmgrInfo flinfo; int ppdindex = 0; int datumindex = 0; + TransactionId oldTransactionId = InvalidTransactionId; + + if (!RecoveryInProgress()) { + oldTransactionId = GetTopTransactionId(); + } /* Compile the anonymous code block */ /* support pass external parameter in anonymous block */ @@ -4207,6 +4286,16 @@ static int exec_stmt_dynexecute(PLpgSQL_execstate* estate, PLpgSQL_stmt_dynexecu flinfo.fn_mcxt = CurrentMemoryContext; (void)plpgsql_exec_function(func, &fake_fcinfo, true); + + /* + * This is used for nested STP. If the transaction Id changed, + * then need to create new econtext for the TopTransaction. + */ + if ((!RecoveryInProgress()) && (oldTransactionId != GetTopTransactionId())) { + u_sess->plsql_cxt.simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + } + exec_set_sql_isopen(estate, false); exec_set_sql_cursor_found(estate, PLPGSQL_TRUE); exec_set_sql_notfound(estate, PLPGSQL_FALSE); @@ -4895,6 +4984,58 @@ static int exec_stmt_null(PLpgSQL_execstate* estate, PLpgSQL_stmt* stmt) */ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt) { + const char* PORTAL = "Portal"; + int subTransactionCount = u_sess->SPI_cxt.portal_stp_exception_counter; + + if (u_sess->SPI_cxt.portal_stp_exception_counter == 0) { + t_thrd.utils_cxt.StpSavedResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner; + } + + if (strcmp(PORTAL, ResourceOwnerGetName(t_thrd.utils_cxt.CurrentResourceOwner)) == 0) { + if (ResourceOwnerGetNextChild(t_thrd.utils_cxt.CurrentResourceOwner) + && (strcmp(PORTAL, ResourceOwnerGetName(ResourceOwnerGetNextChild(t_thrd.utils_cxt.CurrentResourceOwner))) == 0)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("commit with PE is not supported"))); + } + + SPI_commit(); + SPI_start_transaction(); + + u_sess->plsql_cxt.simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + + /* link portal to new TopTransaction */ + ResourceOwnerNewParent(t_thrd.utils_cxt.StpSavedResourceOwner, t_thrd.utils_cxt.CurrentResourceOwner); + t_thrd.utils_cxt.CurrentResourceOwner = t_thrd.utils_cxt.StpSavedResourceOwner; + + while (subTransactionCount > 0) { + if (u_sess->SPI_cxt.portal_stp_exception_counter > 0) { + MemoryContext oldcontext = CurrentMemoryContext; + + estate->err_text = gettext_noop("during statement block entry"); + + /* CN should send savesopint command to remote nodes to begin sub transaction remotely */ + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { + pgxc_node_remote_savepoint("Savepoint s1", EXEC_ON_ALL_NODES, true, true); + } + + BeginInternalSubTransaction(NULL); + + /* Want to run statements inside function's memory context */ + MemoryContextSwitchTo(oldcontext); + + plpgsql_create_econtext(estate); + estate->err_text = NULL; + } + + subTransactionCount--; + } + + if (u_sess->SPI_cxt.portal_stp_exception_counter == 0) { + t_thrd.utils_cxt.CurrentResourceOwner = t_thrd.utils_cxt.StpSavedResourceOwner; + } + return PLPGSQL_RC_OK; } @@ -4905,6 +5046,58 @@ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt */ static int exec_stmt_rollback(PLpgSQL_execstate* estate, PLpgSQL_stmt_rollback* stmt) { + const char* PORTAL = "Portal"; + int subTransactionCount = u_sess->SPI_cxt.portal_stp_exception_counter; + + if (u_sess->SPI_cxt.portal_stp_exception_counter == 0) { + t_thrd.utils_cxt.StpSavedResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner; + } + + if (strcmp(PORTAL, ResourceOwnerGetName(t_thrd.utils_cxt.CurrentResourceOwner)) == 0) { + if (ResourceOwnerGetNextChild(t_thrd.utils_cxt.CurrentResourceOwner) + && (strcmp(PORTAL, ResourceOwnerGetName(ResourceOwnerGetNextChild(t_thrd.utils_cxt.CurrentResourceOwner))) == 0)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("commit with PE is not supported"))); + } + + SPI_rollback(); + SPI_start_transaction(); + + u_sess->plsql_cxt.simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + + /* link portal to new TopTransaction */ + ResourceOwnerNewParent(t_thrd.utils_cxt.StpSavedResourceOwner, t_thrd.utils_cxt.CurrentResourceOwner); + t_thrd.utils_cxt.CurrentResourceOwner = t_thrd.utils_cxt.StpSavedResourceOwner; + + while (subTransactionCount > 0) { + if (u_sess->SPI_cxt.portal_stp_exception_counter > 0) { + MemoryContext oldcontext = CurrentMemoryContext; + + estate->err_text = gettext_noop("during statement block entry"); + + /* CN should send savesopint command to remote nodes to begin sub transaction remotely */ + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { + pgxc_node_remote_savepoint("Savepoint s1", EXEC_ON_ALL_NODES, true, true); + } + + BeginInternalSubTransaction(NULL); + + /* Want to run statements inside function's memory context */ + MemoryContextSwitchTo(oldcontext); + + plpgsql_create_econtext(estate); + estate->err_text = NULL; + } + + subTransactionCount--; + } + + if (u_sess->SPI_cxt.portal_stp_exception_counter == 0) { + t_thrd.utils_cxt.CurrentResourceOwner = t_thrd.utils_cxt.StpSavedResourceOwner; + } + return PLPGSQL_RC_OK; } @@ -7555,26 +7748,21 @@ static void plpgsql_destroy_econtext(PLpgSQL_execstate* estate) */ void plpgsql_xact_cb(XactEvent event, void* arg) { + u_sess->plsql_cxt.simple_eval_estate = NULL; + /* * If we are doing a clean transaction shutdown, free the EState (so that * any remaining resources will be released correctly). In an abort, we * expect the regular abort recovery procedures to release everything of * interest. */ - if (event == XACT_EVENT_PREROLLBACK_CLEANUP) { - return; - } else if (event != XACT_EVENT_ABORT) { - /* Shouldn't be any econtext stack entries left at commit */ - AssertEreport(u_sess->plsql_cxt.simple_econtext_stack == NULL, - MOD_PLSQL, - "Shouldn't be any econtext stack entries left at commit"); - + u_sess->plsql_cxt.simple_econtext_stack = NULL; + if (event != XACT_EVENT_ABORT) { if (u_sess->plsql_cxt.simple_eval_estate) { FreeExecutorState(u_sess->plsql_cxt.simple_eval_estate); } u_sess->plsql_cxt.simple_eval_estate = NULL; } else { - u_sess->plsql_cxt.simple_econtext_stack = NULL; u_sess->plsql_cxt.simple_eval_estate = NULL; } } diff --git a/src/common/pl/plpgsql/src/pl_handler.cpp b/src/common/pl/plpgsql/src/pl_handler.cpp index 70f19e192ead8cdef5d44bd0bb4818528ccca856..5fbaec2d6e3c6546ec73c9e5cc49489f688317d4 100644 --- a/src/common/pl/plpgsql/src/pl_handler.cpp +++ b/src/common/pl/plpgsql/src/pl_handler.cpp @@ -179,6 +179,7 @@ Datum plpgsql_call_handler(PG_FUNCTION_ARGS) // PGSTAT_INIT_PLSQL_TIME_RECORD int64 startTime = 0; bool needRecord = false; + bool nonatomic = false; #ifdef STREAMPLAN bool outer_is_stream = false; bool outer_is_stream_support = false; @@ -193,11 +194,20 @@ Datum plpgsql_call_handler(PG_FUNCTION_ARGS) } #endif + /* + * If the atomic stored in fcinfo is false means allow + * commit/rollback within stord procedure. + * set the noatomic and will be reused within function. + */ + nonatomic = fcinfo->context && + IsA(fcinfo->context, FunctionScanState) && + !castNode(FunctionScanState, fcinfo->context)->atomic; + _PG_init(); /* * Connect to SPI manager */ - if ((rc = SPI_connect()) != SPI_OK_CONNECT) { + if ((rc = SPI_connect_ext(DestSPI, NULL, NULL, nonatomic ? SPI_OPT_NOATOMIC : 0)) != SPI_OK_CONNECT) { ereport(ERROR, (errmodule(MOD_PLSQL), errcode(ERRCODE_UNDEFINED_OBJECT), @@ -338,7 +348,7 @@ Datum plpgsql_inline_handler(PG_FUNCTION_ARGS) /* * Connect to SPI manager */ - if ((rc = SPI_connect()) != SPI_OK_CONNECT) { + if ((rc = SPI_connect_ext(DestSPI, NULL, NULL, codeblock->atomic ? 0 : SPI_OPT_NOATOMIC)) != SPI_OK_CONNECT) { ereport(ERROR, (errmodule(MOD_PLSQL), errcode(ERRCODE_SPI_CONNECTION_FAILURE), diff --git a/src/gausskernel/optimizer/commands/functioncmds.cpp b/src/gausskernel/optimizer/commands/functioncmds.cpp index 5b2b3cb982af27bc225382e3f9b5c281538cfe0d..5f37e8ab431e0b16738e286954c597e9305bd736 100755 --- a/src/gausskernel/optimizer/commands/functioncmds.cpp +++ b/src/gausskernel/optimizer/commands/functioncmds.cpp @@ -2133,7 +2133,7 @@ Oid AlterFunctionNamespace_oid(Oid procOid, Oid nspOid) * ExecuteDoStmt * Execute inline procedural-language code */ -void ExecuteDoStmt(const DoStmt* stmt) +void ExecuteDoStmt(const DoStmt* stmt, bool atomic) { InlineCodeBlock* codeblock = makeNode(InlineCodeBlock); ListCell* arg = NULL; @@ -2200,6 +2200,7 @@ void ExecuteDoStmt(const DoStmt* stmt) /* get the handler function's OID */ laninline = languageStruct->laninline; + codeblock->atomic = atomic; if (!OidIsValid(laninline)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index 55a04c5c591f3a187cd1df2343238edf1d8f08c1..c6892eb451fc2918aa981c5ceb7ff74d69567613 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -44,6 +44,7 @@ #include "catalog/pg_authid.h" #include "commands/async.h" #include "commands/prepare.h" +#include "executor/spi.h" #include "commands/user.h" #include "commands/vacuum.h" #ifdef PGXC @@ -93,6 +94,7 @@ #include "mb/pg_wchar.h" #include "pgaudit.h" #include "auditfuncs.h" +#include "funcapi.h" #ifdef PGXC #include "storage/procarray.h" #include "pgxc/pgxc.h" @@ -2039,6 +2041,7 @@ static void exec_simple_query(const char* query_string, MessageType messageType, * significant to PreventTransactionChain.) */ isTopLevel = (list_length(parsetree_list) == 1); + u_sess->SPI_cxt.is_toplevel_stp = isTopLevel; if (isTopLevel != 1) t_thrd.explain_cxt.explain_perf_mode = EXPLAIN_NORMAL; @@ -2478,6 +2481,12 @@ static void exec_simple_query(const char* query_string, MessageType messageType, MemoryContextDelete(OptimizerContext); + /* Reset store procedure's session variables. */ + u_sess->SPI_cxt.is_toplevel_stp = false; + u_sess->SPI_cxt.is_stp = true; + u_sess->SPI_cxt.is_proconfig_set = false; + u_sess->SPI_cxt.portal_stp_exception_counter = 0; + /* * Close down transaction statement, if one is open. */ @@ -4331,6 +4340,7 @@ static void exec_execute_message(const char* portal_name, long max_rows) bool execute_is_fetch = false; bool was_logged = false; char msec_str[32]; + bool savedIsTopLevelForSTP = false; gstrace_entry(GS_TRC_ID_exec_execute_message); /* Adjust destination to tell printtup.c what to do */ @@ -4469,6 +4479,9 @@ static void exec_execute_message(const char* portal_name, long max_rows) /* Check for cancel signal before we start execution */ CHECK_FOR_INTERRUPTS(); + savedIsTopLevelForSTP = u_sess->SPI_cxt.is_toplevel_stp; + u_sess->SPI_cxt.is_toplevel_stp = true; + /* * Okay to run the portal. */ @@ -4482,6 +4495,7 @@ static void exec_execute_message(const char* portal_name, long max_rows) receiver, completionTag); + u_sess->SPI_cxt.is_toplevel_stp = savedIsTopLevelForSTP; (*receiver->rDestroy)(receiver); if (completed) { @@ -7371,6 +7385,10 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam if (sigsetjmp(local_sigjmp_buf, 1) != 0) { gstrace_tryblock_exit(true, oldTryCounter); + u_sess->SPI_cxt.is_stp = true; + u_sess->SPI_cxt.is_proconfig_set = false; + u_sess->SPI_cxt.portal_stp_exception_counter = 0; + (void)pgstat_report_waitstatus(STATE_WAIT_UNDEFINED); t_thrd.pgxc_cxt.GlobalNetInstr = NULL; /* output the memory tracking information when error happened */ @@ -7494,6 +7512,9 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam */ AbortCurrentTransaction(); + PortalErrorCleanup(); + SPICleanup(); + /* Notice: at the most time it isn't necessary to call because * all the LWLocks are released in AbortCurrentTransaction(). * but in some rare exception not in one transaction (for diff --git a/src/gausskernel/process/tcop/utility.cpp b/src/gausskernel/process/tcop/utility.cpp index 7cde3454510f9b1ae26ac3bbca1d02784b24095a..fd4a26241c6ac94bf5769b8f8d28fe6755e1326a 100644 --- a/src/gausskernel/process/tcop/utility.cpp +++ b/src/gausskernel/process/tcop/utility.cpp @@ -4425,7 +4425,7 @@ void standard_ProcessUtility(Node* parse_tree, const char* query_string, ParamLi break; case T_DoStmt: - ExecuteDoStmt((DoStmt*)parse_tree); + ExecuteDoStmt((DoStmt*)parse_tree, (!u_sess->SPI_cxt.is_toplevel_stp || IsTransactionBlock())); break; case T_CreatedbStmt: diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index 84dbdaf163319146781e352ff686946e19e056ed..b2cb2cb4ce38de3766cdebee0472518ff550b211 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -239,6 +239,10 @@ static void knl_u_SPI_init(knl_u_SPI_context* spi) spi->_curid = -1; spi->_stack = NULL; spi->_current = NULL; + spi->is_toplevel_stp = false; + spi->is_stp = true; + spi->is_proconfig_set = false; + spi->portal_stp_exception_counter = 0; } static void knl_u_trigger_init(knl_u_trigger_context* tri_cxt) diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index a93216504cfbd151ce81cb6b6e93c36bdc6a7917..125cab28d05ecf55987af45d1bd904a5552ef459 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -845,6 +845,7 @@ static void knl_t_utils_init(knl_t_utils_context* utils_cxt) utils_cxt->CurrentResourceOwner = NULL; utils_cxt->CurTransactionResourceOwner = NULL; utils_cxt->TopTransactionResourceOwner = NULL; + utils_cxt->StpSavedResourceOwner = NULL; utils_cxt->ResourceRelease_callbacks = NULL; utils_cxt->SortColumnOptimize = false; utils_cxt->pRelatedRel = NULL; diff --git a/src/gausskernel/runtime/executor/execQual.cpp b/src/gausskernel/runtime/executor/execQual.cpp index 40df8c80fc487399732e029254e70143306f9d88..919cbc7f08e78dc4bf6753bb63f77e21b4f23c34 100755 --- a/src/gausskernel/runtime/executor/execQual.cpp +++ b/src/gausskernel/runtime/executor/execQual.cpp @@ -2042,6 +2042,52 @@ static Datum ExecMakeFunctionResultNoSets( PgStat_FunctionCallUsage fcusage; int i; int* var_dno = NULL; + FunctionScanState* node = NULL; + HeapTuple tup; + FuncExpr* fexpr = NULL; + bool savedIsStp = u_sess->SPI_cxt.is_stp; + bool savedProConfigIsSet = u_sess->SPI_cxt.is_proconfig_set; + bool proIsProcedure = false; + bool supportTransaction = false; + +#ifdef ENABLE_MULTIPLE_NODES + if (IS_PGXC_COORDINATOR) { + supportTransaction = true; + } +#else + supportTransaction = true; +#endif + + if (supportTransaction && IsA(fcache->xprstate.expr, FuncExpr)) { + fexpr = (FuncExpr*)(fcache->xprstate.expr); + node = makeNode(FunctionScanState); + node->atomic = (!u_sess->SPI_cxt.is_toplevel_stp || IsTransactionBlock()); + + /* + * If proconfig is set we can't allow transaction commands because of the + * way the GUC stacking works. The transaction boundary would have to pop + * the proconfig setting off the stack. That restriction could be lefted + * by redesigning the GUC nesting mechanism a bit. + */ + Relation relation = heap_open(ProcedureRelationId, AccessShareLock); + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(fexpr->funcid)); + if (!HeapTupleIsValid(tup)) { + elog(ERROR, "cache lookup failed for function %u", fexpr->funcid); + } + + if (!heap_attisnull(tup, Anum_pg_proc_proconfig, NULL) || u_sess->SPI_cxt.is_proconfig_set) { + u_sess->SPI_cxt.is_proconfig_set = true; + node->atomic = true; + } + + proIsProcedure = PROC_IS_PRO(((Form_pg_proc)GETSTRUCT(tup))->prokind); + + heap_close(relation, AccessShareLock); + + /* If proisprocedure is true means it was a stored procedure. */ + u_sess->SPI_cxt.is_stp = savedIsStp && proIsProcedure; + ReleaseSysCache(tup); + } /* Guard against stack overflow due to overly complex expressions */ check_stack_depth(); @@ -2058,6 +2104,10 @@ static Datum ExecMakeFunctionResultNoSets( /* init the number of arguments to a function*/ InitFunctionCallInfoArgs(*fcinfo, list_length(fcache->args), 1); + if (supportTransaction) { + fcinfo->context = (Node*)node; + } + if (has_cursor_return) { /* init returnCursor to store out-args cursor info on ExprContext*/ fcinfo->refcursor_data.returnCursor = @@ -2152,6 +2202,9 @@ static Datum ExecMakeFunctionResultNoSets( pfree_ext(var_dno); } + u_sess->SPI_cxt.is_stp = savedIsStp; + u_sess->SPI_cxt.is_proconfig_set = savedProConfigIsSet; + return result; } @@ -2234,9 +2287,53 @@ Tuplestorestate* ExecMakeTableFunctionResult( bool first_time = true; int* var_dno = NULL; bool has_refcursor = false; + HeapTuple tup; + FuncExpr* fexpr = NULL; + bool savedIsStp = u_sess->SPI_cxt.is_stp; + bool savedProConfigIsSet = u_sess->SPI_cxt.is_proconfig_set; + bool proIsProcedure = false; + bool supportTransaction = false; callerContext = CurrentMemoryContext; +#ifdef ENABLE_MULTIPLE_NODES + if (IS_PGXC_COORDINATOR) { + supportTransaction = true; + } +#else + supportTransaction = true; +#endif + + if (supportTransaction && IsA(funcexpr->expr, FuncExpr)) { + fexpr = (FuncExpr*)(funcexpr->expr); + node->atomic = (!u_sess->SPI_cxt.is_toplevel_stp || IsTransactionBlock()); + + /* + * If proconfig is set we can't allow transaction commands because of the + * way the GUC stacking works. The transaction boundary would have to pop + * the proconfig setting off the stack. That restriction could be lefted + * by redesigning the GUC nesting mechanism a bit. + */ + Relation relation = heap_open(ProcedureRelationId, AccessShareLock); + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(fexpr->funcid)); + if (!HeapTupleIsValid(tup)) { + elog(ERROR, "cache lookup failed for function %u", fexpr->funcid); + } + + if (!heap_attisnull(tup, Anum_pg_proc_proconfig, NULL) || u_sess->SPI_cxt.is_proconfig_set) { + u_sess->SPI_cxt.is_proconfig_set = true; + node->atomic = true; + } + + proIsProcedure = PROC_IS_PRO(((Form_pg_proc)GETSTRUCT(tup))->prokind); + + heap_close(relation, AccessShareLock); + + /* If proisprocedure is true means it was a stored procedure. */ + u_sess->SPI_cxt.is_stp = savedIsStp && proIsProcedure; + ReleaseSysCache(tup); + } + if (unlikely(funcexpr == NULL)) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("The input function expression is NULL."))); } @@ -2351,7 +2448,11 @@ Tuplestorestate* ExecMakeTableFunctionResult( } else { /* Treat funcexpr as a generic expression */ direct_function_call = false; - InitFunctionCallInfoData(fcinfo, NULL, 0, InvalidOid, NULL, NULL); + if (supportTransaction) { + InitFunctionCallInfoData(fcinfo, NULL, 0, InvalidOid, (Node*)node, NULL); + } else { + InitFunctionCallInfoData(fcinfo, NULL, 0, InvalidOid, NULL, NULL); + } } /* @@ -2592,6 +2693,9 @@ no_function_result: pfree_ext(var_dno); } + u_sess->SPI_cxt.is_stp = savedIsStp; + u_sess->SPI_cxt.is_proconfig_set = savedProConfigIsSet; + /* All done, pass back the tuplestore */ return rsinfo.setResult; } diff --git a/src/gausskernel/runtime/executor/spi.cpp b/src/gausskernel/runtime/executor/spi.cpp index 821887964b0d529b1c5a8bea73bae2f75e27c427..66c7b89ac5480f7eed50610e01e97a1277af219f 100755 --- a/src/gausskernel/runtime/executor/spi.cpp +++ b/src/gausskernel/runtime/executor/spi.cpp @@ -77,6 +77,11 @@ static void CopySPI_Plan(SPIPlanPtr newplan, SPIPlanPtr plan, MemoryContext plan /* =================== interface functions =================== */ int SPI_connect(CommandDest dest, void (*spiCallbackfn)(void *), void *clientData) +{ + return SPI_connect_ext(dest, spiCallbackfn, clientData, 0); +} + +int SPI_connect_ext(CommandDest dest, void (*spiCallbackfn)(void *), void *clientData, int options) { int new_depth; /* @@ -93,8 +98,12 @@ int SPI_connect(CommandDest dest, void (*spiCallbackfn)(void *), void *clientDat u_sess->SPI_cxt._connected != -1 ? "init level is not -1." : "stack depth is not zero."))); } new_depth = 16; + /* + * Need TopMemoryContext because commit is allowed in stored procedure and it will clear all memory + * context from TopTransaction. Therefor,need to use TopMemoryContext. + */ u_sess->SPI_cxt._stack = - (_SPI_connection *)MemoryContextAlloc(u_sess->top_transaction_mem_cxt, new_depth * sizeof(_SPI_connection)); + (_SPI_connection *)MemoryContextAlloc(u_sess->top_mem_cxt, new_depth * sizeof(_SPI_connection)); u_sess->SPI_cxt._stack_depth = new_depth; } else { if (u_sess->SPI_cxt._stack_depth <= 0 || u_sess->SPI_cxt._stack_depth <= u_sess->SPI_cxt._connected) { @@ -125,18 +134,28 @@ int SPI_connect(CommandDest dest, void (*spiCallbackfn)(void *), void *clientDat u_sess->SPI_cxt._current->dest = dest; u_sess->SPI_cxt._current->spiCallback = (void (*)(void *))spiCallbackfn; u_sess->SPI_cxt._current->clientData = clientData; + u_sess->SPI_cxt._current->atomic = (options & SPI_OPT_NOATOMIC) ? false : true; + u_sess->SPI_cxt._current->internal_xact = false; /* * Create memory contexts for this procedure + * + * In atomic contexts (the normal case), we use TopTransactionContext, + * otherwise PortalContext, so that it lives across transaction + * boundaries. * - * XXX it would be better to use t_thrd.mem_cxt.portal_mem_cxt as the parent context, but - * we may not be inside a portal (consider deferred-trigger execution). - * Perhaps t_thrd.mem_cxt.cur_transaction_mem_cxt would do? For now it doesn't matter - * because we clean up explicitly in AtEOSubXact_SPI(). + * XXX it would be better to use PortalContext as the parent context in + * all cases, but we may not be inside a portal (consider deferred-trigger + * execution). Perhaps CurTransactionContext could be an option? For now + * it doesn't matter because we clean up explicitly in ATEOSubXact_SPI(). */ - u_sess->SPI_cxt._current->procCxt = AllocSetContextCreate(u_sess->top_transaction_mem_cxt, "SPI Proc", + u_sess->SPI_cxt._current->procCxt = + AllocSetContextCreate(u_sess->SPI_cxt._current->atomic ? u_sess->top_transaction_mem_cxt : + t_thrd.mem_cxt.portal_mem_cxt, "SPI Proc", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - u_sess->SPI_cxt._current->execCxt = AllocSetContextCreate(u_sess->top_transaction_mem_cxt, "SPI Exec", + u_sess->SPI_cxt._current->execCxt = + AllocSetContextCreate(u_sess->SPI_cxt._current->atomic ? u_sess->top_transaction_mem_cxt : + u_sess->SPI_cxt._current->procCxt, "SPI Exec", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); /* ... and switch to procedure's context */ u_sess->SPI_cxt._current->savedcxt = MemoryContextSwitchTo(u_sess->SPI_cxt._current->procCxt); @@ -185,11 +204,138 @@ int SPI_finish(void) return SPI_OK_FINISH; } +void SPI_start_transaction(void) +{ + MemoryContext oldContext = CurrentMemoryContext; + + StartTransactionCommand(true); + MemoryContextSwitchTo(oldContext); +} + +void SPI_commit(void) +{ + MemoryContext oldContext = CurrentMemoryContext; + +#ifdef ENABLE_MULTIPLE_NODES + /* Can not commit at non-CN nodes */ + if (!IS_PGXC_COORDINATOR || IsConnFromCoord()) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot commit at non-CN node"))); + } +#endif + + /* If commit is not within stored procedure report error */ + if (!u_sess->SPI_cxt.is_stp) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot commit within function"))); + } + + /* Cannot commit if it's atomic is true */ + if (u_sess->SPI_cxt._current->atomic) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("invalid transaction termination"))); + } + + /* + * Hold any pinned portals that any PLs might be using. We have to do + * this before changing trasnaction state, since this will run + * user-defined code that might throw an error. + */ + HoldPinnedPortals(); + + /* + * This restriction is required by PLs implemented on top of SPI. They + * use subtransactions to establish exception blocks that are supposed to + * be rolled back together if there is an error. Terminating the + * top-level transaction in such a block violates that idea. A future PL + * implementation might have different ideas about this, in which case + * this restriction would have to be refined or the check possibly be + * moved out of SPI into the PLs. + */ + u_sess->SPI_cxt._current->internal_xact = true; + + while (ActiveSnapshotSet()) { + PopActiveSnapshot(); + } + + CommitTransactionCommand(true); + MemoryContextSwitchTo(oldContext); + + u_sess->SPI_cxt._current->internal_xact = false; +} + +void SPI_rollback(void) +{ + MemoryContext oldContext = CurrentMemoryContext; + +#ifdef ENABLE_MULTIPLE_NODES + /* Can not commit at non-CN nodes */ + if (!IS_PGXC_COORDINATOR || IsConnFromCoord()) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot rollback at non-CN node"))); + } +#endif + + /* If commit is not within stored procedure report error */ + if (!u_sess->SPI_cxt.is_stp) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot rollback within function"))); + } + + /* Cannot commit if it's atomic is true */ + if (u_sess->SPI_cxt._current->atomic) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("invalid transaction termination"))); + } + + /* + * Hold any pinned portals that any PLs might be using. We have to do + * this before changing trasnaction state, since this will run + * user-defined code that might throw an error. + */ + HoldPinnedPortals(); + + /* see under SPI_commit() */ + u_sess->SPI_cxt._current->internal_xact = true; + + AbortCurrentTransaction(true); + MemoryContextSwitchTo(oldContext); + + u_sess->SPI_cxt._current->internal_xact = false; +} + +/* + * Clean up SPI state. Called on trasnaction end (of non-SPI-internal + * trasnactions) and when retruning to the main loop on error. + */ +void SPICleanup(void) +{ + u_sess->SPI_cxt._current = u_sess->SPI_cxt._stack = NULL; + u_sess->SPI_cxt._stack_depth = 0; + u_sess->SPI_cxt._connected = u_sess->SPI_cxt._curid = -1; + SPI_processed = 0; + u_sess->SPI_cxt.lastoid = InvalidOid; + SPI_tuptable = NULL; +} + /* * Clean up SPI state at transaction commit or abort. */ -void AtEOXact_SPI(bool isCommit) +void AtEOXact_SPI(bool isCommit, bool stpRollback, bool stpCommit) { + /* + * Do nothing if the trasnaction end was initiated by SPI. + */ + if (stpRollback || stpCommit) { + return; + } + /* * Note that memory contexts belonging to SPI stack entries will be freed * automatically, so we can ignore them here. We just need to restore our @@ -200,12 +346,7 @@ void AtEOXact_SPI(bool isCommit) errhint("Check for missing \"SPI_finish\" calls."))); } - u_sess->SPI_cxt._current = u_sess->SPI_cxt._stack = NULL; - u_sess->SPI_cxt._stack_depth = 0; - u_sess->SPI_cxt._connected = u_sess->SPI_cxt._curid = -1; - SPI_processed = 0; - u_sess->SPI_cxt.lastoid = InvalidOid; - SPI_tuptable = NULL; + SPICleanup(); } /* @@ -214,8 +355,12 @@ void AtEOXact_SPI(bool isCommit) * During commit, there shouldn't be any unclosed entries remaining from * the current subtransaction; we emit a warning if any are found. */ -void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid) +void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid, bool stpRollback, bool stpCommit) { + if (stpRollback || stpCommit) { + return; + } + bool found = false; while (u_sess->SPI_cxt._connected >= 0) { @@ -225,6 +370,10 @@ void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid) break; /* couldn't be any underneath it either */ } + if (connection->internal_xact) { + break; + } + found = true; /* * Release procedure memory explicitly (see note in SPI_connect) @@ -1873,6 +2022,11 @@ static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot sn CachedPlan *cplan = NULL; ListCell *lc1 = NULL; bool tmp_enable_light_proxy = u_sess->attr.attr_sql.enable_light_proxy; + TransactionId oldTransactionId = InvalidTransactionId; + + if (!RecoveryInProgress()) { + oldTransactionId = GetTopTransactionId(); + } /* not allow Light CN */ u_sess->attr.attr_sql.enable_light_proxy = false; @@ -2118,7 +2272,9 @@ static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot sn } /* Done with this plan, so release refcount */ - ReleaseCachedPlan(cplan, plan->saved); + if ((!RecoveryInProgress()) && (oldTransactionId == GetTopTransactionId())) { + ReleaseCachedPlan(cplan, plan->saved); + } cplan = NULL; /* @@ -2228,6 +2384,7 @@ static int _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount, bo int operation = queryDesc->operation; int eflags; int res; + TransactionId oldTransactionId = InvalidTransactionId; switch (operation) { case CMD_SELECT: @@ -2275,6 +2432,9 @@ static int _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount, bo eflags = EXEC_FLAG_SKIP_TRIGGERS; } + if (!RecoveryInProgress()) { + oldTransactionId = GetTopTransactionId(); + } ExecutorStart(queryDesc, eflags); bool forced_control = !from_lock && IS_PGXC_COORDINATOR && @@ -2351,6 +2511,17 @@ static int _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount, bo } } + /* + * If there are commit/rollback within stored proedure. Snapshot has already free during commit/rollback process. + * Therefor, need to set queryDesc snapshots to NULL. Otherwise the reference will be stale pointers. + */ + if ((!RecoveryInProgress()) && (oldTransactionId != GetTopTransactionId())) { + queryDesc->snapshot = NULL; + queryDesc->crosscheck_snapshot = NULL; + queryDesc->estate->es_snapshot = NULL; + queryDesc->estate->es_crosscheck_snapshot = NULL; + } + ExecutorFinish(queryDesc); ExecutorEnd(queryDesc); /* FreeQueryDesc is done by the caller */ diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index 90fc45193cd6185d967fdad92ae01d98eefb3a9f..60900c8e4085e942cb4e7bd7e6eacf19e6b16345 100644 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -277,7 +277,7 @@ typedef struct GTMCallbackItem { /* local function prototypes */ static void AssignTransactionId(TransactionState s); -static void AbortTransaction(bool PerfectRollback); +static void AbortTransaction(bool PerfectRollback = false, bool stpRollback = false); static void AtAbort_Memory(void); static void AtCleanup_Memory(void); static void AtAbort_ResourceOwner(void); @@ -294,13 +294,13 @@ static void CleanSequenceCallbacks(void); static void CallSequenceCallbacks(GTMEvent event); #endif static void CleanupTransaction(void); -static void CommitTransaction(void); +static void CommitTransaction(bool stpCommit = false); static TransactionId RecordTransactionAbort(bool isSubXact); static void StartTransaction(bool begin_on_gtm); static void StartSubTransaction(void); -static void CommitSubTransaction(void); -static void AbortSubTransaction(void); +static void CommitSubTransaction(bool stpCommit = false); +static void AbortSubTransaction(bool stpRollback = false); static void CleanupSubTransaction(void); static void PushTransaction(void); static void PopTransaction(void); @@ -316,7 +316,7 @@ static void ShowTransactionState(const char* str); static void ShowTransactionStateRec(TransactionState state); static const char* BlockStateAsString(TBlockState blockState); static const char* TransStateAsString(TransState state); -static void PrepareTransaction(void); +static void PrepareTransaction(bool stpCommit = false); extern void print_leak_warning_at_commit(); #ifndef ENABLE_LLT @@ -2365,7 +2365,7 @@ void ThreadLocalFlagCleanUp() * * NB: if you change this routine, better look at PrepareTransaction too! */ -static void CommitTransaction(void) +static void CommitTransaction(bool stpCommit) { u_sess->need_report_top_xid = false; TransactionState s = CurrentTransactionState; @@ -2410,6 +2410,14 @@ static void CommitTransaction(void) t_thrd.xact_cxt.handlesDestroyedInCancelQuery = false; ThreadLocalFlagCleanUp(); + /* + * When commit within nested store procedure, it will create a plan cache. + * During commit time, need to clean up those plan cache. + */ + if (stpCommit) { + ResourceOwnerDecrementNPlanRefs(t_thrd.utils_cxt.CurrentResourceOwner, true); + } + #ifdef PGXC /* * If we are a Coordinator and currently serving the client, @@ -2486,7 +2494,7 @@ static void CommitTransaction(void) */ Assert(GlobalTransactionIdIsValid(s->transactionId)); - PrepareTransaction(); + PrepareTransaction(stpCommit); s->blockState = TBLOCK_DEFAULT; /* @@ -2519,7 +2527,7 @@ static void CommitTransaction(void) * If there weren't any, we are done ... otherwise loop back to check * if they queued deferred triggers. Lather, rinse, repeat. */ - if (!PreCommit_Portals(false)) + if (!PreCommit_Portals(false, stpCommit)) break; } @@ -2848,9 +2856,11 @@ static void CommitTransaction(void) AtCommit_Notify(); AtEOXact_GUC(true, 1); - AtEOXact_SPI(true); + AtEOXact_SPI(true, false, stpCommit); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + if (!stpCommit){ + AtEOXact_Namespace(true); + } AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -3093,7 +3103,7 @@ bool AtEOXact_GlobalTxn(bool commit, bool is_write) * If PrepareTransaction is called during an implicit 2PC, do not release ressources, * this is made by CommitTransaction when transaction has been committed on Nodes. */ -static void PrepareTransaction(void) +static void PrepareTransaction(bool stpCommit) { u_sess->need_report_top_xid = false; TransactionState s = CurrentTransactionState; @@ -3198,7 +3208,7 @@ static void PrepareTransaction(void) * If there weren't any, we are done ... otherwise loop back to check * if they queued deferred triggers. Lather, rinse, repeat. */ - if (!PreCommit_Portals(true)) + if (!PreCommit_Portals(true, stpCommit)) break; } @@ -3392,9 +3402,16 @@ static void PrepareTransaction(void) /* PREPARE acts the same as COMMIT as far as GUC is concerned */ AtEOXact_GUC(true, 1); - AtEOXact_SPI(true); + AtEOXact_SPI(true, false, stpCommit); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + /* + * For commit within stored procedure don't clean up namespace. + * Otherwise it will throw warning leaked override search path, + * since we push the search path hasn't pop yet. + */ + if (!stpCommit) { + AtEOXact_Namespace(true); + } AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -3468,7 +3485,7 @@ static void PrepareTransaction(void) #endif } -static void AbortTransaction(bool PerfectRollback = false) +static void AbortTransaction(bool PerfectRollback, bool stpRollback) { u_sess->need_report_top_xid = false; TransactionState s = CurrentTransactionState; @@ -3723,7 +3740,7 @@ static void AbortTransaction(bool PerfectRollback = false) */ AfterTriggerEndXact(false); /* 'false' means it's abort */ CallXactCallbacks(XACT_EVENT_PREROLLBACK_CLEANUP); - AtAbort_Portals(); + AtAbort_Portals(stpRollback); AtEOXact_LargeObject(false); AtAbort_Notify(); AtEOXact_RelationMap(false); @@ -3787,9 +3804,11 @@ static void AbortTransaction(bool PerfectRollback = false) if (change_user_name) u_sess->misc_cxt.CurrentUserName = NULL; - AtEOXact_SPI(false); + AtEOXact_SPI(false, stpRollback, false); AtEOXact_on_commit_actions(false); - AtEOXact_Namespace(false); + if (!stpRollback) { + AtEOXact_Namespace(false); + } AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -3876,7 +3895,7 @@ static void CleanupTransaction(void) #endif } -void StartTransactionCommand(void) +void StartTransactionCommand(bool stpRollback) { TransactionState s = CurrentTransactionState; @@ -3918,6 +3937,9 @@ void StartTransactionCommand(void) */ case TBLOCK_ABORT: case TBLOCK_SUBABORT: + if (stpRollback) { + s->blockState = TBLOCK_DEFAULT; + } break; /* These cases are invalid. */ @@ -3949,10 +3971,11 @@ void StartTransactionCommand(void) (void)MemoryContextSwitchTo(t_thrd.mem_cxt.cur_transaction_mem_cxt); } -void CommitTransactionCommand(void) +void CommitTransactionCommand(bool stpCommit) { TransactionState s = CurrentTransactionState; TBlockState oldstate = s->blockState; + const int stpownerlevel = 2; switch (s->blockState) { /* @@ -3971,7 +3994,7 @@ void CommitTransactionCommand(void) * transaction commit, and return to the idle state. */ case TBLOCK_STARTED: - CommitTransaction(); + CommitTransaction(stpCommit); s->blockState = TBLOCK_DEFAULT; break; @@ -3993,6 +4016,29 @@ void CommitTransactionCommand(void) case TBLOCK_INPROGRESS: case TBLOCK_SUBINPROGRESS: CommandCounterIncrement(); + + if (stpCommit && u_sess->SPI_cxt.portal_stp_exception_counter > 0) { + int subTransactionCounter = 0; + Assert(!StreamThreadAmI()); + + do { + MemoryContextSwitchTo(t_thrd.mem_cxt.cur_transaction_mem_cxt); + CommitSubTransaction(stpCommit); + s = CurrentTransactionState; + subTransactionCounter++; + + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { + /* CN should send release savepoint command to remote nodes for savepoint name reuse */ + pgxc_node_remote_savepoint("release s1", EXEC_ON_ALL_NODES, true, false); + } + } while (s->blockState == TBLOCK_SUBINPROGRESS); + + /* If we had a commit command, finish off the main xact too */ + Assert(subTransactionCounter == u_sess->SPI_cxt.portal_stp_exception_counter); + t_thrd.utils_cxt.CurrentResourceOwner = t_thrd.utils_cxt.StpSavedResourceOwner; + CommitTransaction(stpCommit); + s->blockState = TBLOCK_DEFAULT; + } break; /* @@ -4000,7 +4046,7 @@ void CommitTransactionCommand(void) * idle state. */ case TBLOCK_END: - CommitTransaction(); + CommitTransaction(stpCommit); s->blockState = TBLOCK_DEFAULT; break; @@ -4029,7 +4075,7 @@ void CommitTransactionCommand(void) * and then clean up. */ case TBLOCK_ABORT_PENDING: - AbortTransaction(true); + AbortTransaction(true, false); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -4050,6 +4096,9 @@ void CommitTransactionCommand(void) * state.) */ case TBLOCK_SUBBEGIN: + if (CurrentTransactionState->nestingLevel == stpownerlevel) { + t_thrd.utils_cxt.StpSavedResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner; + } StartSubTransaction(); s->blockState = TBLOCK_SUBINPROGRESS; break; @@ -4112,7 +4161,7 @@ void CommitTransactionCommand(void) /* As above, but it's not dead yet, so abort first. */ case TBLOCK_SUBABORT_PENDING: - AbortSubTransaction(); + AbortSubTransaction(stpCommit); CleanupSubTransaction(); if (t_thrd.xact_cxt.handlesDestroyedInCancelQuery) { ereport(WARNING, @@ -4137,7 +4186,7 @@ void CommitTransactionCommand(void) s->name = NULL; savepointLevel = s->savepointLevel; - AbortSubTransaction(); + AbortSubTransaction(stpCommit); CleanupSubTransaction(); if (t_thrd.xact_cxt.handlesDestroyedInCancelQuery) { ereport(WARNING, @@ -4198,7 +4247,7 @@ void CommitTransactionCommand(void) } } -void AbortCurrentTransaction(void) +void AbortCurrentTransaction(bool stpRollback) { TransactionState s = CurrentTransactionState; @@ -4216,7 +4265,7 @@ void AbortCurrentTransaction(void) */ if (s->state == TRANS_START) s->state = TRANS_INPROGRESS; - AbortTransaction(); + AbortTransaction(false, stpRollback); CleanupTransaction(); } break; @@ -4226,7 +4275,7 @@ void AbortCurrentTransaction(void) * & cleanup transaction. */ case TBLOCK_STARTED: - AbortTransaction(); + AbortTransaction(false, stpRollback); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -4239,7 +4288,7 @@ void AbortCurrentTransaction(void) * state. */ case TBLOCK_BEGIN: - AbortTransaction(); + AbortTransaction(false, stpRollback); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -4250,7 +4299,7 @@ void AbortCurrentTransaction(void) * ABORT state. We will stay in ABORT until we get a ROLLBACK. */ case TBLOCK_INPROGRESS: - AbortTransaction(); + AbortTransaction(false, stpRollback); s->blockState = TBLOCK_ABORT; /* CleanupTransaction happens when we exit TBLOCK_ABORT_END */ break; @@ -4261,7 +4310,7 @@ void AbortCurrentTransaction(void) * the transaction). */ case TBLOCK_END: - AbortTransaction(); + AbortTransaction(false, stpRollback); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -4290,7 +4339,7 @@ void AbortCurrentTransaction(void) * Abort, cleanup, go to idle state. */ case TBLOCK_ABORT_PENDING: - AbortTransaction(); + AbortTransaction(false, stpRollback); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -4301,7 +4350,7 @@ void AbortCurrentTransaction(void) * the transaction). */ case TBLOCK_PREPARE: - AbortTransaction(); + AbortTransaction(false, stpRollback); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -4312,8 +4361,29 @@ void AbortCurrentTransaction(void) * we get ROLLBACK. */ case TBLOCK_SUBINPROGRESS: - AbortSubTransaction(); - s->blockState = TBLOCK_SUBABORT; + if (stpRollback && u_sess->SPI_cxt.portal_stp_exception_counter > 0) { + int subTransactionCounter = 0; + do { + AbortSubTransaction(stpRollback); + s->blockState = TBLOCK_SUBABORT; + CleanupSubTransaction(); + s = CurrentTransactionState; + subTransactionCounter++; + } while (s->blockState == TBLOCK_SUBINPROGRESS); + + Assert(subTransactionCounter == u_sess->SPI_cxt.portal_stp_exception_counter); + if (s->state == TRANS_START) { + s->state = TRANS_INPROGRESS; + } + + AbortTransaction(false, stpRollback); + CleanupTransaction(); + s->blockState = TBLOCK_DEFAULT; + } else { + AbortSubTransaction(); + s->blockState = TBLOCK_SUBABORT; + } + if (t_thrd.xact_cxt.handlesDestroyedInCancelQuery) { ereport(WARNING, (errmsg( @@ -4332,7 +4402,7 @@ void AbortCurrentTransaction(void) case TBLOCK_SUBCOMMIT: case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: - AbortSubTransaction(); + AbortSubTransaction(stpRollback); CleanupSubTransaction(); if (t_thrd.xact_cxt.handlesDestroyedInCancelQuery) { ereport(WARNING, @@ -4347,7 +4417,7 @@ void AbortCurrentTransaction(void) case TBLOCK_SUBABORT_END: case TBLOCK_SUBABORT_RESTART: CleanupSubTransaction(); - AbortCurrentTransaction(); + AbortCurrentTransaction(stpRollback); break; default: ereport(FATAL, @@ -5473,7 +5543,7 @@ void RollbackAndReleaseCurrentSubTransaction(void) /* Abort the current subtransaction, if needed. */ if (s->blockState == TBLOCK_SUBINPROGRESS) { - AbortSubTransaction(); + AbortSubTransaction(false); } /* And clean it up, too */ @@ -5586,7 +5656,7 @@ void AbortOutOfAnyTransaction(bool reserve_topxact_abort) case TBLOCK_SUBCOMMIT: case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: - AbortSubTransaction(); + AbortSubTransaction(false); CleanupSubTransaction(); s = CurrentTransactionState; /* changed by pop */ break; @@ -5627,6 +5697,10 @@ bool IsTransactionBlock(void) { TransactionState s = CurrentTransactionState; + if (u_sess->SPI_cxt.portal_stp_exception_counter > 0 && s->blockState == TBLOCK_SUBINPROGRESS) { + return false; + } + if (s->blockState == TBLOCK_DEFAULT || s->blockState == TBLOCK_STARTED) { return false; } @@ -5770,7 +5844,7 @@ static void StartSubTransaction(void) * The caller has to make sure to always reassign CurrentTransactionState * if it has a local pointer to it after calling this function. */ -static void CommitSubTransaction(void) +static void CommitSubTransaction(bool stpCommit) { TransactionState s = CurrentTransactionState; @@ -5862,14 +5936,27 @@ static void CommitSubTransaction(void) XactLockTableDelete(s->transactionId); } + /* + * When commit within nedted store procedure, it will create a plan cache. + * During commit time, need to clean up those plan cache. + */ + if (stpCommit) { + ResourceOwnerDecrementNPlanRefs(t_thrd.utils_cxt.CurrentResourceOwner, true); + ResourceOwnerDecrementNsnapshots(t_thrd.utils_cxt.CurrentResourceOwner, NULL); + } + /* Other locks should get transferred to their parent resource owner. */ ResourceOwnerRelease(s->curTransactionOwner, RESOURCE_RELEASE_LOCKS, true, false); ResourceOwnerRelease(s->curTransactionOwner, RESOURCE_RELEASE_AFTER_LOCKS, true, false); AtEOXact_GUC(true, s->gucNestLevel); - AtEOSubXact_SPI(true, s->subTransactionId); + if (!stpCommit) { + AtEOSubXact_SPI(true, s->subTransactionId, false, stpCommit); + } AtEOSubXact_on_commit_actions(true, s->subTransactionId, s->parent->subTransactionId); - AtEOSubXact_Namespace(true, s->subTransactionId, s->parent->subTransactionId); + if (!stpCommit) { + AtEOSubXact_Namespace(true, s->subTransactionId, s->parent->subTransactionId); + } AtEOSubXact_Files(true, s->subTransactionId, s->parent->subTransactionId); AtEOSubXact_HashTables(true, s->nestingLevel); AtEOSubXact_PgStat(true, s->nestingLevel); @@ -5895,7 +5982,7 @@ static void CommitSubTransaction(void) PopTransaction(); } -static void AbortSubTransaction(void) +static void AbortSubTransaction(bool stpRollback) { TransactionState s = CurrentTransactionState; t_thrd.xact_cxt.bInAbortTransaction = true; @@ -6045,9 +6132,11 @@ static void AbortSubTransaction(void) ResourceOwnerRelease(s->curTransactionOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false); AtEOXact_GUC(false, s->gucNestLevel); - AtEOSubXact_SPI(false, s->subTransactionId); + AtEOSubXact_SPI(false, s->subTransactionId, stpRollback, false); AtEOSubXact_on_commit_actions(false, s->subTransactionId, s->parent->subTransactionId); - AtEOSubXact_Namespace(false, s->subTransactionId, s->parent->subTransactionId); + if (!stpRollback) { + AtEOSubXact_Namespace(false, s->subTransactionId, s->parent->subTransactionId); + } AtEOSubXact_Files(false, s->subTransactionId, s->parent->subTransactionId); AtEOSubXact_HashTables(false, s->nestingLevel); AtEOSubXact_PgStat(false, s->nestingLevel); @@ -6217,6 +6306,7 @@ static void PopTransaction(void) /* Ditto for ResourceOwner links */ t_thrd.utils_cxt.CurTransactionResourceOwner = s->parent->curTransactionOwner; t_thrd.utils_cxt.CurrentResourceOwner = s->parent->curTransactionOwner; + t_thrd.xact_cxt.currentSubTransactionId = s->parent->subTransactionId; /* Free the old child structure */ if (s->name) { diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 4b78e926e4cb7faa239dca6b899d384af486396c..c5444ff71d4cdf90c470797afa29bfdca9d1bc90 100755 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -311,12 +311,12 @@ extern void CopyTransactionIdLoggedIfAny(TransactionState state); extern bool TransactionIdIsCurrentTransactionId(TransactionId xid); extern void CommandCounterIncrement(void); extern void ForceSyncCommit(void); -extern void StartTransactionCommand(void); -extern void CommitTransactionCommand(void); +extern void StartTransactionCommand(bool stpRollback = false); +extern void CommitTransactionCommand(bool stpCommit = false); #ifdef PGXC extern void AbortCurrentTransactionOnce(void); #endif -extern void AbortCurrentTransaction(void); +extern void AbortCurrentTransaction(bool stpRollback = false); extern void BeginTransactionBlock(void); extern bool EndTransactionBlock(void); extern bool PrepareTransactionBlock(const char* gid); diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index 861a5e119d8c1d8048e3bab9bad3b52f231d9779..1048f5497f848df10851a3f7c4c37fea257d1d27 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -52,7 +52,7 @@ extern void CreateCast(CreateCastStmt* stmt); extern void DropCastById(Oid castOid); extern void AlterFunctionNamespace(List* name, List* argtypes, bool isagg, const char* newschema); extern Oid AlterFunctionNamespace_oid(Oid procOid, Oid nspOid); -extern void ExecuteDoStmt(const DoStmt* stmt); +extern void ExecuteDoStmt(const DoStmt* stmt, bool atomic); extern Oid get_cast_oid(Oid sourcetypeid, Oid targettypeid, bool missing_ok); /* commands/operatorcmds.c */ diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index da85ceb66f941a128fb5ae101803fa4e74e83a63..962cc56b84d51cf2f68b75e35aae519d253bfd0a 100755 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -56,12 +56,15 @@ typedef struct _SPI_plan* SPIPlanPtr; #define SPI_OK_UPDATE_RETURNING 13 #define SPI_OK_REWRITTEN 14 #define SPI_OK_MERGE 15 +#define SPI_OPT_NOATOMIC (1 << 0) extern THR_LOCAL PGDLLIMPORT uint32 SPI_processed; extern THR_LOCAL PGDLLIMPORT SPITupleTable* SPI_tuptable; extern THR_LOCAL PGDLLIMPORT int SPI_result; extern int SPI_connect(CommandDest dest = DestSPI, void (*spiCallbackfn)(void*) = NULL, void* clientData = NULL); +extern int SPI_connect_ext(CommandDest dest = DestSPI, void (*spiCallbackfn)(void*) = NULL, + void* clientData = NULL, int options =0); extern int SPI_finish(void); extern void SPI_push(void); extern void SPI_pop(void); @@ -123,8 +126,13 @@ extern void SPI_scroll_cursor_fetch(Portal, FetchDirection direction, long count extern void SPI_scroll_cursor_move(Portal, FetchDirection direction, long count); extern void SPI_cursor_close(Portal portal); -extern void AtEOXact_SPI(bool isCommit); -extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid); +extern void SPI_start_transaction(void); +extern void SPI_commit(void); +extern void SPI_rollback(void); +extern void SPICleanup(void); + +extern void AtEOXact_SPI(bool isCommit, bool stpRollback, bool stpCommit); +extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid, bool stpRollback, bool stpCommit); extern DestReceiver* createAnalyzeSPIDestReceiver(CommandDest dest); /* SPI execution helpers */ extern void spi_exec_with_callback(CommandDest dest, const char* src, bool read_only, long tcount, bool direct_call, diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h index 01bacd991a5594725ac1271c93f8d1c7446ce436..7f6942121fa266dd7a8ec901287cdd09037b1d9e 100755 --- a/src/include/executor/spi_priv.h +++ b/src/include/executor/spi_priv.h @@ -28,6 +28,11 @@ typedef struct _SPI_connection { MemoryContext savedcxt; /* context of SPI_connect's caller */ SubTransactionId connectSubid; /* ID of connecting subtransaction */ CommandDest dest; /* identify which is the orientated caller of spi interface, analyze or normal */ + + /* transaction management suppoort */ + bool atomic; /* atomic execution context, does not allow transactions */ + bool internal_xact; /* SPI-managed transaction boundary, skip cleanup */ + void* clientData; /* argument to call back function */ void (*spiCallback)(void*); /* callback for process received data. */ } _SPI_connection; diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index a5fe56fffc9e4288eae5fe3ad883e34c6f0aea01..130161cc3126cfeef2e2c484f345a12301051b53 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -183,6 +183,14 @@ typedef struct knl_u_SPI_context { struct _SPI_connection* _stack; struct _SPI_connection* _current; + + bool is_toplevel_stp; + + bool is_stp; + + bool is_proconfig_set; + + int portal_stp_exception_counter; } knl_u_SPI_context; typedef struct knl_u_index_context { diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index c09c6ea9194a24c780e5b93b6429d4cc8e6c8343..ed3799e953cf657cfaa7289d1c3a2d0f7d20e12b 100644 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -1578,6 +1578,7 @@ typedef struct knl_t_utils_context { struct ResourceOwnerData* CurrentResourceOwner; struct ResourceOwnerData* CurTransactionResourceOwner; struct ResourceOwnerData* TopTransactionResourceOwner; + struct ResourceOwnerData* StpSavedResourceOwner; struct ResourceReleaseCallbackItem* ResourceRelease_callbacks; bool SortColumnOptimize; struct RelationData* pRelatedRel; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4ccfc4d2809b4daa7e5e23cf1a1476e8802f3084..11c8ab5d68a99f1692b4c83485c2e4248d848964 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1734,6 +1734,7 @@ typedef struct FunctionScanState { TupleDesc tupdesc; Tuplestorestate* tuplestorestate; ExprState* funcexpr; + bool atomic; } FunctionScanState; /* ---------------- diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3c1faae88412789c3cf721ee1f71ded09229da9e..78d9f6db443a7861e1ae7cde54fcc1bee0cbedec 100755 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2683,8 +2683,14 @@ typedef struct InlineCodeBlock { char* source_text; /* source text of anonymous code block */ Oid langOid; /* OID of selected language */ bool langIsTrusted; /* trusted property of the language */ + bool atomic; /* atomic execution context */ } InlineCodeBlock; +typedef struct CallContext { + NodeTag type; + bool atomic; +} CallContext; + /* ---------------------- * Alter Object Rename Statement * ---------------------- diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h index 57890b4060e43a1dfcacaa5f934e0e6464ec8014..662eede93689bbb114ff9be3c411bf5c7a073656 100755 --- a/src/include/utils/portal.h +++ b/src/include/utils/portal.h @@ -145,6 +145,7 @@ typedef struct PortalData { /* Status data */ PortalStatus status; /* see above */ bool portalPinned; /* a pinned portal can't be dropped */ + bool autoHeld; /* was automatically converted from pinned to held */ /* If not NULL, Executor is active; call ExecutorEnd eventually: */ QueryDesc* queryDesc; /* info needed for executor invocation */ @@ -206,9 +207,10 @@ typedef struct PortalData { /* Prototypes for functions in utils/mmgr/portalmem.c */ extern void EnablePortalManager(void); -extern bool PreCommit_Portals(bool isPrepare); -extern void AtAbort_Portals(void); +extern bool PreCommit_Portals(bool isPrepare, bool stpCommit); +extern void AtAbort_Portals(bool stpRollback); extern void AtCleanup_Portals(void); +extern void PortalErrorCleanup(void); extern void AtSubCommit_Portals(SubTransactionId mySubid, SubTransactionId parentSubid, ResourceOwner parentXactOwner); extern void AtSubAbort_Portals( SubTransactionId mySubid, SubTransactionId parentSubid, ResourceOwner myXactOwner, ResourceOwner parentXactOwner); @@ -229,5 +231,6 @@ extern void PortalCreateHoldStore(Portal portal); extern void PortalHashTableDeleteAll(void); extern bool ThereAreNoReadyPortals(void); extern void ResetPortalCursor(SubTransactionId mySubid, Oid funOid, int funUseCount); +extern void HoldPinnedPortals(void); #endif /* PORTAL_H */ diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h index c6a8c9743f2974c7209e3d0c407116d9b3cc3b03..e12d01bae5ce1d0d4bbf46879b8759110ae91726 100644 --- a/src/include/utils/resowner.h +++ b/src/include/utils/resowner.h @@ -63,6 +63,9 @@ extern ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char* name) extern void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel); extern void ResourceOwnerDelete(ResourceOwner owner); extern ResourceOwner ResourceOwnerGetParent(ResourceOwner owner); +extern ResourceOwner ResourceOwnerGetNextChild(ResourceOwner owner); +extern ResourceOwner ResourceOwnerGetFirstChild(ResourceOwner owner); +extern const char* ResourceOwnerGetName(ResourceOwner owner); extern void ResourceOwnerNewParent(ResourceOwner owner, ResourceOwner newparent); extern void RegisterResourceReleaseCallback(ResourceReleaseCallback callback, void* arg); extern void UnregisterResourceReleaseCallback(ResourceReleaseCallback callback, void* arg); @@ -116,6 +119,8 @@ extern void ResourceOwnerForgetTupleDesc(ResourceOwner owner, const TupleDesc tu extern void ResourceOwnerEnlargeSnapshots(ResourceOwner owner); extern void ResourceOwnerRememberSnapshot(ResourceOwner owner, Snapshot snapshot); extern void ResourceOwnerForgetSnapshot(ResourceOwner owner, const Snapshot snapshot); +extern void ResourceOwnerDecrementNsnapshots(ResourceOwner owner, void* queryDesc); +extern void ResourceOwnerDecrementNPlanRefs(ResourceOwner owner, bool useResOwner); /* support for temporary file management */ extern void ResourceOwnerEnlargeFiles(ResourceOwner owner); diff --git a/src/test/regress/expected/transactions_control.out b/src/test/regress/expected/transactions_control.out new file mode 100644 index 0000000000000000000000000000000000000000..2065374d0d4c887c6baf8ea6e62405c6e9be68f9 --- /dev/null +++ b/src/test/regress/expected/transactions_control.out @@ -0,0 +1,379 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +AS +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END; +/ +CALL transaction_test1(); + transaction_test1 +------------------- + +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plpgsql +$$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +-- transaction commands not allowed when called in transaction block +START TRANSACTION; +CALL transaction_test1(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test1() line 6 at COMMIT +COMMIT; +START TRANSACTION; +DO LANGUAGE plpgsql $$ BEGIN COMMIT; END $$; +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function inline_code_block line 1 at COMMIT +COMMIT; +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + RETURN 1; +END +$$; +SELECT transaction_test2(); +ERROR: cannot commit within function +CONTEXT: PL/pgSQL function transaction_test2() line 6 at COMMIT +referenced column: transaction_test2 +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + CALL transaction_test1(); + RETURN 1; +END; +$$; +SELECT transaction_test3(); +ERROR: cannot commit within function +CONTEXT: PL/pgSQL function transaction_test1() line 6 at COMMIT +SQL statement "CALL transaction_test1()" +PL/pgSQL function transaction_test3() line 3 at SQL statement +referenced column: transaction_test3 +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE 'DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$'; + RETURN 1; +END; +$$; +SELECT transaction_test4(); +ERROR: cannot commit within function +CONTEXT: PL/pgSQL function inline_code_block line 1 at COMMIT +SQL statement "DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$" +PL/pgSQL function transaction_test4() line 3 at EXECUTE statement +referenced column: transaction_test4 +-- proconfig settings currently disallow transaction statements +CREATE PROCEDURE transaction_test5() +SET work_mem = 555 +AS +BEGIN + COMMIT; +END; +/ +CALL transaction_test5(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test5() line 3 at COMMIT +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + COMMIT; + END LOOP; +END; +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 1 | + 2 | + 3 | + 4 | +(5 rows) + +-- check that this doesn't leak a holdable portal +SELECT * FROM pg_cursors; + name | statement | is_holdable | is_binary | is_scrollable | creation_time +------+-----------+-------------+-----------+---------------+--------------- +(0 rows) + +-- error in cursor loop with commit +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (12/(r.x-2)); + COMMIT; + END LOOP; +END; +$$; +ERROR: division by zero +CONTEXT: referenced column: a +SQL statement "INSERT INTO test1 (a) VALUES (12/(r.x-2))" +PL/pgSQL function inline_code_block line 6 at SQL statement +SELECT * FROM test1; + a | b +-----+--- + -6 | + -12 | +(2 rows) + +SELECT * FROM pg_cursors; + name | statement | is_holdable | is_binary | is_scrollable | creation_time +------+-----------+-------------+-----------+---------------+--------------- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + ROLLBACK; + END LOOP; +END; +$$; +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +SELECT * FROM pg_cursors; + name | statement | is_holdable | is_binary | is_scrollable | creation_time +------+-----------+-------------+-----------+---------------+--------------- +(0 rows) + +-- first commit then rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + IF r.x % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END; +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | +(3 rows) + +SELECT * FROM pg_cursors; + name | statement | is_holdable | is_binary | is_scrollable | creation_time +------+-----------+-------------+-----------+---------------+--------------- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN UPDATE test2 SET x = x * 2 RETURNING x LOOP + INSERT INTO test1 (a) VALUES (r.x); + ROLLBACK; + END LOOP; +END; +$$; +ERROR: cannot perform transaction commands inside a cursor loop that is not read-only +CONTEXT: PL/pgSQL function inline_code_block line 7 at ROLLBACK +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +SELECT * FROM test2; + x +--- + 0 + 1 + 2 + 3 + 4 +(5 rows) + +SELECT * FROM pg_cursors; + name | statement | is_holdable | is_binary | is_scrollable | creation_time +------+-----------+-------------+-----------+---------------+--------------- +(0 rows) + +-- commit inside block with exception handler +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + COMMIT; + INSERT INTO test1 (a) VALUES (1/0); + COMMIT; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; +NOTICE: caught division_by_zero +SELECT * FROM test1; + a | b +---+--- + 1 | +(1 row) + +-- rollback inside block with exception handler +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + ROLLBACK; + INSERT INTO test1 (a) VALUES (1/0); + ROLLBACK; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; +NOTICE: caught division_by_zero +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- COMMIT failures +DO LANGUAGE plpgsql $$ +BEGIN + CREATE TABLE test3 (y int UNIQUE DEFERRABLE INITIALLY DEFERRED); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + INSERT INTO test3 (y) VALUES (2); + COMMIT; + INSERT INTO test3 (y) VALUES (3); -- won't get here +END; +$$; +NOTICE: CREATE TABLE / UNIQUE will create implicit index "test3_y_key" for table "test3" +CONTEXT: SQL statement "CREATE TABLE test3 (y int UNIQUE DEFERRABLE INITIALLY DEFERRED)" +PL/pgSQL function inline_code_block line 3 at SQL statement +ERROR: duplicate key value violates unique constraint "test3_y_key" +DETAIL: Key (y)=(1) already exists. +CONTEXT: PL/pgSQL function inline_code_block line 9 at COMMIT +SELECT * FROM test3; + y +--- + 1 +(1 row) + +DROP TABLE test1; +DROP TABLE test2; +DROP TABLE test3; +-- +CREATE TABLE test1(id int, name varchar(20)); +INSERT INTO test1 values(1, 'bbb'); +CREATE OR REPLACE PROCEDURE PROC_OUT_PARAM_001(P1 OUT INT) +AS +BEGIN +select id into P1 from test1 where name = 'bbb'; +insert into test1 values(P1, 'dddd'); +COMMIT; +insert into test1 values(P1, 'eee'); +ROLLBACK; +END; +/ +DECLARE +V_P1 INT; +BEGIN +PROC_OUT_PARAM_001(V_P1); +END; +/ +SELECT * from test1; + id | name +----+------ + 1 | bbb + 1 | dddd +(2 rows) + +DROP TABLE TEST1; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 2c49c0347c0a6d32c694fe2fa2ac8f5f283c57f9..0b39cb6019cfb5537830548fba81f62e2942bbce 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -248,7 +248,7 @@ test: subplan_new test: select test: col_subplan_base_1 col_subplan_new test: join -test: select_into select_distinct subselect_part1 subselect_part2 transactions random btree_index select_distinct_on union gs_aggregate arrays hash_index +test: select_into select_distinct subselect_part1 subselect_part2 transactions transactions_control random btree_index select_distinct_on union gs_aggregate arrays hash_index test: aggregates test: portals_p2 window tsearch temp__6 holdable_cursor col_subplan_base_2 diff --git a/src/test/regress/sql/transactions_control.sql b/src/test/regress/sql/transactions_control.sql new file mode 100644 index 0000000000000000000000000000000000000000..a8514b4e2cf66f30c4b6e9e72285d8c2753b36aa --- /dev/null +++ b/src/test/regress/sql/transactions_control.sql @@ -0,0 +1,305 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +AS +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END; +/ + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plpgsql +$$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; + +SELECT * FROM test1; + + +-- transaction commands not allowed when called in transaction block +START TRANSACTION; +CALL transaction_test1(); +COMMIT; + +START TRANSACTION; +DO LANGUAGE plpgsql $$ BEGIN COMMIT; END $$; +COMMIT; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + RETURN 1; +END +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + CALL transaction_test1(); + RETURN 1; +END; +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE 'DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$'; + RETURN 1; +END; +$$; + +SELECT transaction_test4(); + + +-- proconfig settings currently disallow transaction statements +CREATE PROCEDURE transaction_test5() +SET work_mem = 555 +AS +BEGIN + COMMIT; +END; +/ + +CALL transaction_test5(); + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + COMMIT; + END LOOP; +END; +$$; + +SELECT * FROM test1; + +-- check that this doesn't leak a holdable portal +SELECT * FROM pg_cursors; + + +-- error in cursor loop with commit +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (12/(r.x-2)); + COMMIT; + END LOOP; +END; +$$; + +SELECT * FROM test1; + +SELECT * FROM pg_cursors; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + ROLLBACK; + END LOOP; +END; +$$; + +SELECT * FROM test1; + +SELECT * FROM pg_cursors; + + +-- first commit then rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + IF r.x % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END; +$$; + +SELECT * FROM test1; + +SELECT * FROM pg_cursors; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN UPDATE test2 SET x = x * 2 RETURNING x LOOP + INSERT INTO test1 (a) VALUES (r.x); + ROLLBACK; + END LOOP; +END; +$$; + +SELECT * FROM test1; +SELECT * FROM test2; + +SELECT * FROM pg_cursors; + +-- commit inside block with exception handler +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + COMMIT; + INSERT INTO test1 (a) VALUES (1/0); + COMMIT; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; + +SELECT * FROM test1; + + +-- rollback inside block with exception handler +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + ROLLBACK; + INSERT INTO test1 (a) VALUES (1/0); + ROLLBACK; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; + +SELECT * FROM test1; + + +-- COMMIT failures +DO LANGUAGE plpgsql $$ +BEGIN + CREATE TABLE test3 (y int UNIQUE DEFERRABLE INITIALLY DEFERRED); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + INSERT INTO test3 (y) VALUES (2); + COMMIT; + INSERT INTO test3 (y) VALUES (3); -- won't get here +END; +$$; + +SELECT * FROM test3; + + +DROP TABLE test1; +DROP TABLE test2; +DROP TABLE test3; + +-- +CREATE TABLE test1(id int, name varchar(20)); +INSERT INTO test1 values(1, 'bbb'); + +CREATE OR REPLACE PROCEDURE PROC_OUT_PARAM_001(P1 OUT INT) +AS +BEGIN +select id into P1 from test1 where name = 'bbb'; +insert into test1 values(P1, 'dddd'); +COMMIT; +insert into test1 values(P1, 'eee'); +ROLLBACK; +END; +/ + +DECLARE +V_P1 INT; +BEGIN +PROC_OUT_PARAM_001(V_P1); +END; +/ + +SELECT * from test1; + +DROP TABLE TEST1; \ No newline at end of file