From 1cbd7fcf8b52c3c6701980220bcc2ce717a63ad6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A2=9C=E5=95=B8?= Date: Thu, 25 Jul 2024 17:59:34 +0800 Subject: [PATCH] fix task #IAFLSQ parallel hint for query and rel --- src/common/backend/nodes/copyfuncs.cpp | 15 ++ src/common/backend/nodes/outfuncs.cpp | 20 ++ src/common/backend/nodes/readfuncs.cpp | 21 ++ src/common/backend/parser/hint_gram.y | 38 +++- src/common/backend/parser/hint_scan.l | 1 + src/common/backend/parser/parse_hint.cpp | 121 +++++++++++- src/common/backend/utils/adt/ruleutils.cpp | 1 + src/common/backend/utils/init/globals.cpp | 1 + src/common/backend/utils/misc/guc/guc_sql.cpp | 4 +- .../optimizer/commands/explain.cpp | 3 + src/gausskernel/optimizer/path/allpaths.cpp | 8 +- src/gausskernel/optimizer/path/joinpath.cpp | 28 ++- .../optimizer/path/streampath_base.cpp | 10 +- src/gausskernel/optimizer/plan/createplan.cpp | 8 +- src/gausskernel/optimizer/plan/planner.cpp | 29 ++- src/gausskernel/optimizer/plan/streamplan.cpp | 22 +++ .../optimizer/plan/streamplan_single.cpp | 2 +- .../optimizer/prep/prepjointree.cpp | 1 + src/gausskernel/optimizer/util/pathnode.cpp | 56 +++++- src/gausskernel/optimizer/util/relnode.cpp | 29 +++ src/gausskernel/process/tcop/postgres.cpp | 3 + .../process/threadpool/knl_session.cpp | 1 + src/include/knl/knl_session.h | 2 + src/include/miscadmin.h | 1 + src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes_common.h | 1 + src/include/nodes/relation.h | 6 + src/include/optimizer/planner.h | 4 +- src/include/optimizer/streampath.h | 3 + src/include/optimizer/streamplan.h | 1 + src/include/parser/parse_hint.h | 11 ++ src/test/regress/input/parallel_hint.source | 57 ++++++ src/test/regress/output/parallel_hint.source | 181 ++++++++++++++++++ src/test/regress/parallel_schedule | 3 + 34 files changed, 671 insertions(+), 22 deletions(-) create mode 100644 src/test/regress/input/parallel_hint.source create mode 100644 src/test/regress/output/parallel_hint.source diff --git a/src/common/backend/nodes/copyfuncs.cpp b/src/common/backend/nodes/copyfuncs.cpp index fbbbde7e25..20e543d2c8 100644 --- a/src/common/backend/nodes/copyfuncs.cpp +++ b/src/common/backend/nodes/copyfuncs.cpp @@ -4584,6 +4584,17 @@ static SetHint* _copySetHint(const SetHint* from) return newnode; } +static ParallelHint* _copyParallelHint(const ParallelHint* from) +{ + ParallelHint* newnode = makeNode(ParallelHint); + + CopyBaseHintFilelds((const Hint*)from, (Hint*)newnode); + COPY_STRING_FIELD(name); + COPY_STRING_FIELD(value); + COPY_BITMAPSET_FIELD(relid); + return newnode; +} + static PlanCacheHint* _copyPlanCacheHint(const PlanCacheHint* from) { PlanCacheHint* newnode = makeNode(PlanCacheHint); @@ -4896,6 +4907,7 @@ static HintState* _copyHintState(const HintState* from) COPY_NODE_FIELD(gather_hint); COPY_NODE_FIELD(no_expand_hint); COPY_NODE_FIELD(set_hint); + COPY_NODE_FIELD(rel_dop_hint); COPY_NODE_FIELD(cache_plan_hint); COPY_NODE_FIELD(no_gpc_hint); COPY_NODE_FIELD(predpush_same_level_hint); @@ -9205,6 +9217,9 @@ void* copyObject(const void* from) case T_PlanCacheHint: retval = _copyPlanCacheHint((PlanCacheHint*)from); break; + case T_ParallelHint: + retval = _copyParallelHint((ParallelHint*)from); + break; case T_NoGPCHint: retval = _copyNoGPCHint((NoGPCHint*)from); break; diff --git a/src/common/backend/nodes/outfuncs.cpp b/src/common/backend/nodes/outfuncs.cpp index e70984e914..db670816eb 100755 --- a/src/common/backend/nodes/outfuncs.cpp +++ b/src/common/backend/nodes/outfuncs.cpp @@ -4577,6 +4577,20 @@ static void _outSetHint(StringInfo str, const SetHint* node) WRITE_STRING_FIELD(value); } +/* + * @Description: Parallel hint node to string. + * @out str: String buf. + * @in node: Parallel hint struct. + */ +static void _outParallelHint(StringInfo str, const ParallelHint* node) +{ + WRITE_NODE_TYPE("Parallel"); + _outBaseHint(str, (Hint*)node); + WRITE_STRING_FIELD(name); + WRITE_BITMAPSET_FIELD(relid); + WRITE_STRING_FIELD(value); +} + /* * @Description: Plancache hint node to string. * @out str: String buf. @@ -4863,6 +4877,9 @@ static void _outHintState(StringInfo str, HintState* node) if (t_thrd.proc->workingVersionNum >= SQL_PATCH_VERSION_NUM) { WRITE_BOOL_FIELD(from_sql_patch); } + if (t_thrd.proc->workingVersionNum >= PARALLEL_HINT_VERSION_NUM) { + WRITE_NODE_FIELD(rel_dop_hint); + } } static void _outRightRefState(StringInfo str, RightRefState* node) @@ -7342,6 +7359,9 @@ static void _outNode(StringInfo str, const void* obj) case T_SetHint: _outSetHint(str, (SetHint*) obj); break; + case T_ParallelHint: + _outParallelHint(str, (ParallelHint*) obj); + break; case T_PlanCacheHint: _outPlanCacheHint(str, (PlanCacheHint*) obj); break; diff --git a/src/common/backend/nodes/readfuncs.cpp b/src/common/backend/nodes/readfuncs.cpp index 7df5fdb85d..22c3f35d14 100755 --- a/src/common/backend/nodes/readfuncs.cpp +++ b/src/common/backend/nodes/readfuncs.cpp @@ -1219,6 +1219,22 @@ static SetHint* _readSetHint(void) READ_DONE(); } +/* + * @Description: Read string to parallel hint struct. + * @return: parallel hint struct. + */ +static ParallelHint* _readParallelHint(void) +{ + READ_LOCALS(ParallelHint); + + _readBaseHint(&(local_node->base)); + READ_STRING_FIELD(name); + READ_STRING_FIELD(value); + READ_BITMAPSET_FIELD(relid); + + READ_DONE(); +} + /* * @Description: Read string to rewrite plancache hint struct. * @return: plancache hint struct. @@ -1429,6 +1445,9 @@ static HintState* _readHintState() IF_EXIST(set_hint) { READ_NODE_FIELD(set_hint); } + IF_EXIST(rel_dop_hint) { + READ_NODE_FIELD(rel_dop_hint); + } IF_EXIST(cache_plan_hint) { READ_NODE_FIELD(cache_plan_hint); } @@ -7035,6 +7054,8 @@ Node* parseNodeString(void) return_value = _readNoExpandHint(); } else if (MATCH("SETHINT", 7)) { return_value = _readSetHint(); + } else if (MATCH("PARALLELHINT", 12)) { + return_value = _readParallelHint(); } else if (MATCH("PLANCACHEHINT", 13)) { return_value = _readPlanCacheHint(); } else if (MATCH("NOGPCHINT", 9)) { diff --git a/src/common/backend/parser/hint_gram.y b/src/common/backend/parser/hint_gram.y index 37079d9f20..fc8b30f02c 100755 --- a/src/common/backend/parser/hint_gram.y +++ b/src/common/backend/parser/hint_gram.y @@ -52,7 +52,7 @@ static double convert_to_numeric(Node *value); %type join_hint_item join_order_hint join_method_hint stream_hint row_hint scan_hint skew_hint expr_const pred_push_hint pred_push_same_level_hint rewrite_hint gather_hint set_hint plancache_hint guc_value no_expand_hint - no_gpc_hint + no_gpc_hint parallel_hint %type relation_list join_hint_list relation_item relation_list_with_p ident_list skew_relist column_list_p column_list value_list_p value_list value_list_item value_type value_list_with_bracket %token IDENT FCONST SCONST BCONST XCONST @@ -61,7 +61,7 @@ static double convert_to_numeric(Node *value); %token NestLoop_P MergeJoin_P HashJoin_P No_P Leading_P Rows_P Broadcast_P Redistribute_P BlockName_P TableScan_P IndexScan_P IndexOnlyScan_P Skew_P HINT_MULTI_NODE_P NULL_P TRUE_P FALSE_P Predpush_P PredpushSameLevel_P Rewrite_P Gather_P Set_P USE_CPLAN_P USE_GPLAN_P ON_P OFF_P No_expand_P SQL_IGNORE_P NO_GPC_P - CHOOSE_ADAPTIVE_GPLAN_P + CHOOSE_ADAPTIVE_GPLAN_P Parallel_P %nonassoc IDENT NULL_P @@ -159,6 +159,10 @@ join_hint_item: { $$ = $1; } + | parallel_hint + { + $$ = $1; + } | plancache_hint { $$ = $1; @@ -279,6 +283,36 @@ set_hint: $$ = (Node *) setHint; } +parallel_hint: + Parallel_P '(' ICONST ')' + { + Value* guc_val = integerToString(makeInteger($3)); + SetHint *setHint = makeNode(SetHint); + setHint->base.hint_keyword = HINT_KEYWORD_SET; + setHint->base.state = HINT_STATE_NOTUSED; + setHint->name = "query_dop"; + setHint->value = strVal(guc_val); + $$ = (Node *) setHint; + } + | + Parallel_P '(' IDENT ICONST ')' + { + int dop = $4; + Value* dop_val = integerToString(makeInteger($4)); + ParallelHint *parallelHint = makeNode(ParallelHint); + parallelHint->base.hint_keyword = HINT_KEYWORD_PARALLEL; + parallelHint->base.state = HINT_STATE_NOTUSED; + parallelHint->name = $3; + parallelHint->value = strVal(dop_val); + + if (dop > 1) { + parallelHint->base.relnames = list_make1(makeString($3)); + if (dop > u_sess->parser_cxt.hint_dop) + u_sess->parser_cxt.hint_dop = dop; + } + $$ = (Node *) parallelHint; + } + plancache_hint: USE_CPLAN_P { diff --git a/src/common/backend/parser/hint_scan.l b/src/common/backend/parser/hint_scan.l index 41b9bcb1bd..2f86f6c4b0 100755 --- a/src/common/backend/parser/hint_scan.l +++ b/src/common/backend/parser/hint_scan.l @@ -51,6 +51,7 @@ static const hintKeyword parsers[] = {HINT_CHOOSE_ADAPTIVE_GPLAN, CHOOSE_ADAPTIVE_GPLAN_P}, {HINT_NO_GPC, NO_GPC_P}, {HINT_SQL_IGNORE, SQL_IGNORE_P}, + {HINT_PARALLEL, Parallel_P}, }; static const hintKeyword* HintKeywordLookup(const char *str); diff --git a/src/common/backend/parser/parse_hint.cpp b/src/common/backend/parser/parse_hint.cpp index efacffd1fa..391361ea16 100755 --- a/src/common/backend/parser/parse_hint.cpp +++ b/src/common/backend/parser/parse_hint.cpp @@ -185,8 +185,8 @@ static void append_value(StringInfo buf, Value* value, Node* node) } } -#define HINT_NUM 17 -#define HINT_KEYWORD_NUM 23 +#define HINT_NUM 18 +#define HINT_KEYWORD_NUM 24 typedef struct { HintKeyword keyword; @@ -216,6 +216,7 @@ const char* G_HINT_KEYWORD[HINT_KEYWORD_NUM] = { (char*) HINT_GPLAN, (char*) HINT_SQL_IGNORE, (char*) HINT_CHOOSE_ADAPTIVE_GPLAN, + (char*) HINT_PARALLEL, (char*) HINT_NO_GPC, }; @@ -386,6 +387,28 @@ static void SetHintDesc(SetHint* hint, StringInfo buf) appendStringInfoString(buf, ")"); } +/* + * @Description: get the prompts for set-guc hint into subquery. + * @in hint: set-guc hint. + * @out buf: String buf. + */ +static void ParallelHintDesc(ParallelHint* hint, StringInfo buf) +{ + Assert(buf != NULL); + + appendStringInfo(buf, " %s(", KeywordDesc(hint->base.hint_keyword)); + + if (hint->name != NULL) { + appendStringInfo(buf, "%s", hint->name); + } + + if (hint->value != NULL) { + appendStringInfo(buf, " %s", hint->value); + } + + appendStringInfoString(buf, ")"); +} + /* * @Description: get the prompts for redicate pushdown into subquery. * @in hint: predicate pushdown hint. @@ -787,6 +810,9 @@ char* descHint(Hint* hint) case T_SetHint: SetHintDesc((SetHint*)hint, &str); break; + case T_ParallelHint: + ParallelHintDesc((ParallelHint*)hint, &str); + break; case T_PlanCacheHint: PlanCacheHintDesc((PlanCacheHint*)hint, &str); break; @@ -842,6 +868,7 @@ void desc_hint_in_state(PlannerInfo* root, HintState* hstate) find_unused_hint_to_buf(hstate->set_hint, &str_buf); find_unused_hint_to_buf(hstate->no_gpc_hint, &str_buf); find_unused_hint_to_buf(hstate->predpush_same_level_hint, &str_buf); + find_unused_hint_to_buf(hstate->rel_dop_hint, &str_buf); /* for skew hint */ ListCell* lc = NULL; @@ -1072,6 +1099,23 @@ static void SkewHintTransfDelete(SkewHintTransf* hint) pfree_ext(hint); } +/* + * @Description: Delete parallel hint. + * @in hint: parallel hint. + */ +static void ParallelHintDelete(ParallelHint* hint) +{ + if (hint == NULL) { + return; + } + + HINT_FREE_RELNAMES(hint); + + bms_free(hint->relid); + + pfree_ext(hint); +} + /* * @Description: Delete hint, call different delete function according to type. * @in hint: Deleted hint. @@ -1114,6 +1158,9 @@ void hintDelete(Hint* hint) case T_GatherHint: GatherHintDelete((GatherHint*)hint); break; + case T_ParallelHint: + ParallelHintDelete((ParallelHint*)hint); + break; default: elog(WARNING, "unrecognized hint method: %d", (int)nodeTag(hint)); break; @@ -1185,6 +1232,11 @@ void HintStateDelete(HintState* hintState) PredpushSameLevelHint* hint = (PredpushSameLevelHint*)lfirst(lc); PredpushSameLevelHintDelete(hint); } + + foreach (lc,hintState->rel_dop_hint) { + ParallelHint* hint = (ParallelHint*)lfirst(lc); + ParallelHintDelete(hint); + } } /* @@ -1215,6 +1267,7 @@ HintState* HintStateCreate() hstate->no_expand_hint = NIL; hstate->sql_ignore_hint = false; hstate->from_sql_patch = false; + hstate->rel_dop_hint = NIL; return hstate; } @@ -1368,6 +1421,11 @@ static void AddSetHint(HintState* hstate, Hint* hint) hstate->set_hint = lappend(hstate->set_hint, hint); } +static void AddParallelHint(HintState* hstate, Hint* hint) +{ + hstate->rel_dop_hint = lappend(hstate->rel_dop_hint, hint); +} + static void AddPlanCacheHint(HintState* hstate, Hint* hint) { hstate->cache_plan_hint = lappend(hstate->cache_plan_hint, hint); @@ -1413,6 +1471,7 @@ const AddHintFunc G_HINT_CREATOR[HINT_NUM] = { AddPlanCacheHint, AddNoExpandHint, AddSqlIgnoreHint, + AddParallelHint, AddNoGPCHint, }; @@ -1570,7 +1629,7 @@ static Relids create_bms_of_relids(PlannerInfo* root, Query* parse, Hint* hint, /* For skew hint we will found relation from parse`s rtable. */ bool find_in_rtable = false; - if (IsA(hint, SkewHint) || IsA(hint, PredpushHint) || IsA(hint, PredpushSameLevelHint)) { + if (IsA(hint, SkewHint) || IsA(hint, PredpushHint) || IsA(hint, PredpushSameLevelHint) || IsA(hint, ParallelHint)) { find_in_rtable = true; } @@ -1668,6 +1727,9 @@ static List* set_hint_relids(PlannerInfo* root, Query* parse, List* l) case T_PredpushSameLevelHint: ((PredpushSameLevelHint*)hint)->candidates = relids; break; + case T_ParallelHint: + ((ParallelHint*)hint)->relid = relids; + break; default: break; } @@ -1710,6 +1772,32 @@ ScanMethodHint* find_scan_hint(HintState* hstate, Relids relid, HintKeyword keyW return NULL; } +/* + * @Description: Find parallel hint according to relid and hint_key_word. + * @in hstate: HintState. + * @in relid: Relids. + * @in keyWord: Hint key word. + * @return: Parallel hint or Null. + */ +ParallelHint* find_parallel_hint(HintState* hstate, Relids relid) +{ + if (hstate == NULL) { + return NULL; + } + + ListCell* l = NULL; + + foreach (l, hstate->rel_dop_hint) { + ParallelHint* parallelHint = (ParallelHint*)lfirst(l); + + if (bms_equal(parallelHint->relid, relid)) { + return parallelHint; + } + } + + return NULL; +} + /* * @Descriptoion: Find join hint according to relid and join hint key word. * @in hstate: HintState. @@ -3633,6 +3721,7 @@ void transform_hints(PlannerInfo* root, Query* parse, HintState* hstate) hstate->skew_hint = set_hint_relids(root, parse, hstate->skew_hint); hstate->predpush_hint = set_hint_relids(root, parse, hstate->predpush_hint); hstate->predpush_same_level_hint = set_hint_relids(root, parse, hstate->predpush_same_level_hint); + hstate->rel_dop_hint = set_hint_relids(root, parse, hstate->rel_dop_hint); transform_leading_hint(root, parse, hstate); @@ -3958,3 +4047,29 @@ bool CheckNodeNameHint(HintState* hintstate) } return false; } + +bool is_parallel_rel(int x, HintState* hstate) +{ + /* if no hint exists, default parallel */ + if (hstate == NULL) { + return false; + } + /* if no rel dop hint exists, default parallel */ + if (hstate->rel_dop_hint == NULL) { + return false; + } + + /* if join rel, default parallel */ + if (x==0) { + return false; + } + + ListCell* lc = NULL; + foreach (lc,hstate->rel_dop_hint) { + ParallelHint* hint = (ParallelHint*)lfirst(lc); + if (bms_is_member(x, hint->relid)) { + return true; + } + } + return false; +} \ No newline at end of file diff --git a/src/common/backend/utils/adt/ruleutils.cpp b/src/common/backend/utils/adt/ruleutils.cpp index 0f8b670b9a..e14fc87fa8 100644 --- a/src/common/backend/utils/adt/ruleutils.cpp +++ b/src/common/backend/utils/adt/ruleutils.cpp @@ -6531,6 +6531,7 @@ void get_hint_string(HintState* hstate, StringInfo buf) get_hint_string_internal(hstate->gather_hint, buf); get_hint_string_internal(hstate->cache_plan_hint, buf); get_hint_string_internal(hstate->set_hint, buf); + get_hint_string_internal(hstate->rel_dop_hint, buf); get_hint_string_internal(hstate->no_expand_hint, buf); get_hint_string_internal(hstate->no_gpc_hint, buf); foreach (lc, hstate->skew_hint) { diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index c0c6072463..e8f4489ab6 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -222,6 +222,7 @@ const uint32 PARTIALPUSH_VERSION_NUM = 92087; const uint32 CURSOR_EXPRESSION_VERSION_NUMBER = 92935; const uint32 FLOAT_VERSION_NUMBER = 92938; +const uint32 PARALLEL_HINT_VERSION_NUM = 92943; /* This variable indicates wheather the instance is in progress of upgrade as a whole */ diff --git a/src/common/backend/utils/misc/guc/guc_sql.cpp b/src/common/backend/utils/misc/guc/guc_sql.cpp index c6e45d8e57..04698fdea1 100755 --- a/src/common/backend/utils/misc/guc/guc_sql.cpp +++ b/src/common/backend/utils/misc/guc/guc_sql.cpp @@ -3302,7 +3302,7 @@ static bool parse_query_dop(int* newval, void** extra, GucSource source) /* should not reach here */ ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("Current degree of parallelism can only be set within [-64,64]"))); + errmsg("Current degree of parallelism can only be set within [0,64]"))); } #else @@ -3317,7 +3317,7 @@ static bool parse_query_dop(int* newval, void** extra, GucSource source) } else { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("Current degree of parallelism can only be set within [-64,64]"))); + errmsg("Current degree of parallelism can only be set within [0,64]"))); } #endif diff --git a/src/gausskernel/optimizer/commands/explain.cpp b/src/gausskernel/optimizer/commands/explain.cpp index 8a801e9176..83ac4224cf 100755 --- a/src/gausskernel/optimizer/commands/explain.cpp +++ b/src/gausskernel/optimizer/commands/explain.cpp @@ -719,6 +719,9 @@ static void ExplainOneQuery( * Temporarily apply SET hint using PG_TRY for later recovery */ int nest_level = apply_set_hint(query); + if (query->hintState && query->hintState->rel_dop_hint != NULL && u_sess->parser_cxt.hint_dop > 1) { + apply_hint_query_dop(&(u_sess->parser_cxt.hint_dop)); + } AFTER_EXPLAIN_APPLY_SET_HINT(); PG_TRY(); { diff --git a/src/gausskernel/optimizer/path/allpaths.cpp b/src/gausskernel/optimizer/path/allpaths.cpp index 41f8e196a8..b2103eb484 100755 --- a/src/gausskernel/optimizer/path/allpaths.cpp +++ b/src/gausskernel/optimizer/path/allpaths.cpp @@ -1181,11 +1181,15 @@ static void set_plain_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblE add_path(root, rel, create_tsstorescan_path(root, rel)); if (can_parallel) add_path(root, rel, create_tsstorescan_path(root, rel, u_sess->opt_cxt.query_dop)); + if (rel->rel_dop > 1) + add_path(root, rel, create_tsstorescan_path(root, rel, rel->rel_dop)); #endif /* ENABLE_MULTIPLE_NODES */ } else { add_path(root, rel, create_cstorescan_path(root, rel)); if (can_parallel) add_path(root, rel, create_cstorescan_path(root, rel, u_sess->opt_cxt.query_dop)); + if (rel->rel_dop > 1) + add_path(root, rel, create_cstorescan_path(root, rel, rel->rel_dop)); } break; } @@ -1193,6 +1197,8 @@ static void set_plain_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblE add_path(root, rel, create_seqscan_path(root, rel, required_outer)); if (can_parallel) add_path(root, rel, create_seqscan_path(root, rel, required_outer, u_sess->opt_cxt.query_dop)); + if (rel->rel_dop > 1) + add_path(root, rel, create_seqscan_path(root, rel, required_outer, rel->rel_dop)); break; } default: { @@ -2878,7 +2884,7 @@ set_subquery_path(PlannerInfo *root, RelOptInfo *rel, /* Generate the plan for the subquery */ rel->subplan = subquery_planner( - root->glob, subquery, root, false, tuple_fraction, &subroot, options, &rel->rel_dis_keys, rel->baserestrictinfo); + root->glob, subquery, root, false, tuple_fraction, &subroot, options, &rel->rel_dis_keys, rel); rel->subroot = subroot; /* Isolate the params needed by this specific subplan */ rel->subplan_params = root->plan_params; diff --git a/src/gausskernel/optimizer/path/joinpath.cpp b/src/gausskernel/optimizer/path/joinpath.cpp index b951ae1b45..f3e3efd337 100755 --- a/src/gausskernel/optimizer/path/joinpath.cpp +++ b/src/gausskernel/optimizer/path/joinpath.cpp @@ -597,6 +597,8 @@ static void try_nestloop_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j return; } + int path_dop = CreateStreamGatherPaths(root, &outer_path, &inner_path); + /* * Do a precheck to quickly eliminate obviously-inferior paths. We * calculate a cheap lower bound on the path's cost and then use @@ -607,7 +609,7 @@ static void try_nestloop_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j * methodology worthwhile. * Note: smp does not support parameterized paths. */ - int max_dop = (required_outer != NULL) ? 1 : u_sess->opt_cxt.query_dop; + int max_dop = (required_outer != NULL) ? 1 : Max(u_sess->opt_cxt.query_dop, path_dop); initial_cost_nestloop(root, &workspace, jointype, outer_path, inner_path, extra, max_dop); if (add_path_precheck(joinrel, workspace.startup_cost, workspace.total_cost, pathkeys, required_outer) || @@ -644,6 +646,8 @@ static void try_nestloop_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j if (u_sess->opt_cxt.query_dop > 1) nlpgen->addNestLoopPath(&cur_workspace, distribution, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + nlpgen->addNestLoopPath(&cur_workspace, distribution, path_dop); } else { #ifdef ENABLE_MULTIPLE_NODES /* @@ -670,6 +674,8 @@ static void try_nestloop_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j if (u_sess->opt_cxt.query_dop > 1) nlpgen->addNestLoopPath(&cur_workspace, distribution, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + nlpgen->addNestLoopPath(&cur_workspace, distribution, path_dop); } #else JoinCostWorkspace cur_workspace; @@ -679,6 +685,8 @@ static void try_nestloop_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j if (u_sess->opt_cxt.query_dop > 1) nlpgen->addNestLoopPath(&cur_workspace, NULL, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + nlpgen->addNestLoopPath(&cur_workspace, NULL, path_dop); #endif } @@ -799,6 +807,8 @@ static void try_mergejoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType return; } + int path_dop = CreateStreamGatherPaths(root, &outer_path, &inner_path); + /* * If the given paths are already well enough ordered, we can skip doing * an explicit sort. @@ -849,6 +859,8 @@ static void try_mergejoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType mjpgen->addMergeJoinPath(&cur_workspace, distribution, 1); if (u_sess->opt_cxt.query_dop > 1) mjpgen->addMergeJoinPath(&cur_workspace, distribution, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + mjpgen->addMergeJoinPath(&cur_workspace, distribution, path_dop); } else { #ifdef ENABLE_MULTIPLE_NODES /* @@ -874,6 +886,8 @@ static void try_mergejoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType mjpgen->addMergeJoinPath(&cur_workspace, distribution, 1); if (u_sess->opt_cxt.query_dop > 1) mjpgen->addMergeJoinPath(&cur_workspace, distribution, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + mjpgen->addMergeJoinPath(&cur_workspace, distribution, path_dop); } #else JoinCostWorkspace cur_workspace; @@ -882,6 +896,8 @@ static void try_mergejoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType mjpgen->addMergeJoinPath(&cur_workspace, NULL, 1); if (u_sess->opt_cxt.query_dop > 1) mjpgen->addMergeJoinPath(&cur_workspace, NULL, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + mjpgen->addMergeJoinPath(&cur_workspace, NULL, path_dop); #endif } @@ -978,6 +994,8 @@ static void try_hashjoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j return; } + int path_dop = CreateStreamGatherPaths(root, &outer_path, &inner_path); + /* * See comments in try_nestloop_path(). Also note that hashjoin paths * never have any output pathkeys, per comments in create_hashjoin_path. @@ -985,7 +1003,7 @@ static void try_hashjoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j */ int max_dop = ((required_outer != NULL) || (u_sess->opt_cxt.query_dop <= 4 && u_sess->opt_cxt.max_query_dop >= 0)) ? 1 - : u_sess->opt_cxt.query_dop; + : Max(u_sess->opt_cxt.query_dop, path_dop); initial_cost_hashjoin( root, &workspace, jointype, hashclauses, outer_path, inner_path, extra, max_dop); @@ -1021,6 +1039,8 @@ static void try_hashjoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j hjpgen->addHashJoinPath(&cur_workspace, distribution, 1); if (u_sess->opt_cxt.query_dop > 1) hjpgen->addHashJoinPath(&cur_workspace, distribution, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + hjpgen->addHashJoinPath(&cur_workspace, distribution, path_dop); } else { #ifdef ENABLE_MULTIPLE_NODES /* @@ -1046,6 +1066,8 @@ static void try_hashjoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j hjpgen->addHashJoinPath(&cur_workspace, distribution, 1); if (u_sess->opt_cxt.query_dop > 1) hjpgen->addHashJoinPath(&cur_workspace, distribution, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + hjpgen->addHashJoinPath(&cur_workspace, distribution, path_dop); } #else JoinCostWorkspace cur_workspace; @@ -1053,6 +1075,8 @@ static void try_hashjoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j hjpgen->addHashJoinPath(&cur_workspace, NULL, 1); if (u_sess->opt_cxt.query_dop > 1) hjpgen->addHashJoinPath(&cur_workspace, NULL, u_sess->opt_cxt.query_dop); + else if (path_dop > 1) + hjpgen->addHashJoinPath(&cur_workspace, NULL, path_dop); #endif } diff --git a/src/gausskernel/optimizer/path/streampath_base.cpp b/src/gausskernel/optimizer/path/streampath_base.cpp index 5bf2d10f05..485c21ccc3 100755 --- a/src/gausskernel/optimizer/path/streampath_base.cpp +++ b/src/gausskernel/optimizer/path/streampath_base.cpp @@ -159,7 +159,8 @@ JoinPathGenBase::JoinPathGenBase(PlannerInfo* root, RelOptInfo* joinrel, JoinTyp m_redistributeInner(false), m_redistributeOuter(false), m_canBroadcastInner(false), - m_canBroadcastOuter(false) + m_canBroadcastOuter(false), + m_canParallel(false) { init(); } @@ -231,6 +232,7 @@ void JoinPathGenBase::init() can_broadcast_inner(m_jointype, m_saveJointype, m_replicateOuter, m_distributeKeysOuter, m_outerPath); m_canBroadcastOuter = can_broadcast_outer(m_jointype, m_saveJointype, m_replicateInner, m_distributeKeysInner, m_innerPath); + m_canParallel = (m_innerRel->rel_dop >1 && m_outerRel->rel_dop == m_innerRel->rel_dop); m_streamInfoList = NIL; m_streamInfoPair = NULL; @@ -334,7 +336,7 @@ const bool JoinPathGenBase::isParallelEnable() if (m_innerPath->param_info != NULL || m_outerPath->param_info != NULL) return false; - if (u_sess->opt_cxt.query_dop > 1 && IS_STREAM_PLAN) + if ((u_sess->opt_cxt.query_dop > 1 || m_canParallel) && IS_STREAM_PLAN) return true; else return false; @@ -735,8 +737,8 @@ void JoinPathGenBase::addStreamParallelInfo() List* tmp_list = m_streamInfoList; ListCell* lc = NULL; - /* only try smp path when u_sess->opt_cxt.query_dop > 1 */ - if (u_sess->opt_cxt.query_dop <= 1) + /* only try smp path when u_sess->opt_cxt.query_dop > 1 or hint specified */ + if (u_sess->opt_cxt.query_dop <= 1 && !m_canParallel) return; /* Try to add parallel info to spare stream info, only keep suitable stream info. */ diff --git a/src/gausskernel/optimizer/plan/createplan.cpp b/src/gausskernel/optimizer/plan/createplan.cpp index 2e25e066fc..45f8739874 100755 --- a/src/gausskernel/optimizer/plan/createplan.cpp +++ b/src/gausskernel/optimizer/plan/createplan.cpp @@ -464,6 +464,10 @@ Plan* create_stream_plan(PlannerInfo* root, StreamPath* best_path) subplan = create_plan_recurse(root, best_path->subpath); + if (subplan->dop > 1 && IS_STREAM_TYPE(best_path, STREAM_GATHER)) { + return create_local_gather(subplan); + } + if (is_execute_on_coordinator(subplan)) { return subplan; } @@ -10118,8 +10122,8 @@ Plan* make_redistribute_for_agg(PlannerInfo* root, Plan* lefttree, List* redistr plan->multiple = multiple; if (lefttree->dop > 1) { - stream->smpDesc.producerDop = lefttree->dop > 1 ? lefttree->dop : 1; - stream->smpDesc.consumerDop = u_sess->opt_cxt.query_dop; + stream->smpDesc.producerDop = SET_DOP(lefttree->dop); + stream->smpDesc.consumerDop = u_sess->opt_cxt.query_dop > 1 ? u_sess->opt_cxt.query_dop : SET_DOP(lefttree->dop); #ifdef ENABLE_MULTIPLE_NODES stream->smpDesc.distriType = is_local_redistribute ? LOCAL_DISTRIBUTE : REMOTE_SPLIT_DISTRIBUTE; #else diff --git a/src/gausskernel/optimizer/plan/planner.cpp b/src/gausskernel/optimizer/plan/planner.cpp index eac0ad1ed0..d059878ed8 100755 --- a/src/gausskernel/optimizer/plan/planner.cpp +++ b/src/gausskernel/optimizer/plan/planner.cpp @@ -688,6 +688,9 @@ PlannedStmt* standard_planner(Query* parse, int cursorOptions, ParamListInfo bou init_optimizer_context(glob); old_context = MemoryContextSwitchTo(glob->plannerContext->plannerMemContext); + glob->max_dop = u_sess->parser_cxt.hint_dop; + u_sess->parser_cxt.hint_dop = -1; + /* primary planning entry point (may recurse for subqueries) */ top_plan = subquery_planner(glob, parse, NULL, false, tuple_fraction, &root); @@ -1242,6 +1245,22 @@ int apply_set_hint(const Query* parse) return gucNestLevel; } +void apply_hint_query_dop(int* max_dop) +{ + if(*max_dop > MAX_QUERY_DOP) { + *max_dop = MAX_QUERY_DOP; + } + + set_config_option("query_dop", + "1", + PGC_USERSET, /* for now set hint only support */ + PGC_S_SESSION, /* session-level userset guc */ + GUC_ACTION_SAVE, /* need to rollback later */ + true, + WARNING, + false); +} + void recover_set_hint(int savedNestLevel) { if (savedNestLevel < 0) { @@ -1332,7 +1351,7 @@ static inline bool contain_placeholdervar(Node *var_list) * -------------------- */ Plan* subquery_planner(PlannerGlobal* glob, Query* parse, PlannerInfo* parent_root, bool hasRecursion, - double tuple_fraction, PlannerInfo** subroot, int options, ItstDisKey* diskeys, List* subqueryRestrictInfo) + double tuple_fraction, PlannerInfo** subroot, int options, ItstDisKey* diskeys, RelOptInfo* parent_rel) { int num_old_subplans = list_length(glob->subplans); PlannerInfo* root = NULL; @@ -1365,6 +1384,7 @@ Plan* subquery_planner(PlannerGlobal* glob, Query* parse, PlannerInfo* parent_ro root->glob = glob; root->query_level = parent_root ? parent_root->query_level + 1 : 1; root->parent_root = parent_root; + root->parent_rel = parent_rel; root->plan_params = NIL; root->planner_cxt = CurrentMemoryContext; root->init_plans = NIL; @@ -1421,7 +1441,10 @@ Plan* subquery_planner(PlannerGlobal* glob, Query* parse, PlannerInfo* parent_ro root->wt_param_id = -1; root->qualSecurityLevel = 0; root->non_recursive_plan = NULL; - root->subqueryRestrictInfo = subqueryRestrictInfo; + + if (parent_rel) { + root->subqueryRestrictInfo = parent_rel->baserestrictinfo; + } /* Mark current planner root is correlated as well */ if (parent_root != NULL && parent_root->is_under_recursive_cte && parent_root->is_correlated) { @@ -4800,7 +4823,7 @@ static Plan* internal_grouping_planner(PlannerInfo* root, double tuple_fraction) * Don't add gather for non-select statement. */ if (IS_STREAM_PLAN && (!IsConnFromCoord()) && root->query_level == 1 && (parse->commandType == CMD_SELECT) && - is_execute_on_datanodes(result_plan)) { + is_execute_on_datanodes(result_plan) && result_plan->dop > 1) { bool single_node = (result_plan->exec_nodes != NULL && list_length(result_plan->exec_nodes->nodeList) == 1); diff --git a/src/gausskernel/optimizer/plan/streamplan.cpp b/src/gausskernel/optimizer/plan/streamplan.cpp index 354819530e..1d1d0be1a8 100644 --- a/src/gausskernel/optimizer/plan/streamplan.cpp +++ b/src/gausskernel/optimizer/plan/streamplan.cpp @@ -1111,6 +1111,28 @@ void CreateGatherPaths(PlannerInfo* root, RelOptInfo* rel, bool isJoin) set_cheapest(rel); } +/* + * Create Stream Gather Paths based on parallel subpath in join path + * if both outer_path and inner_path both are parallel, no need to + * create local gather, so return the dop. + * + */ +int CreateStreamGatherPaths(PlannerInfo* root,Path** outer_path, Path** inner_path) +{ + if((*outer_path)->dop >1 && (*inner_path)->dop <=1) { + *outer_path = create_stream_path(root, (*outer_path)->parent, STREAM_GATHER, NIL, NIL, (*outer_path), 1.0); + return 1; + } + + if((*inner_path)->dop >1 && (*outer_path)->dop <=1) { + *inner_path = create_stream_path(root, (*inner_path)->parent, STREAM_GATHER, NIL, NIL, (*inner_path), 1.0); + return 1; + } + + return Max((*outer_path)->dop, (*inner_path)->dop); + +} + /* * Check if node is modifyTable for DFS table. diff --git a/src/gausskernel/optimizer/plan/streamplan_single.cpp b/src/gausskernel/optimizer/plan/streamplan_single.cpp index 3c87e53e5b..e31bcbc63d 100644 --- a/src/gausskernel/optimizer/plan/streamplan_single.cpp +++ b/src/gausskernel/optimizer/plan/streamplan_single.cpp @@ -84,7 +84,7 @@ void set_default_stream() * And u_sess->stream_cxt.global_obj != NULL means outer query already init * stream object, do not use smp in inner query. */ - u_sess->opt_cxt.is_stream = (u_sess->opt_cxt.query_dop > 1 && + u_sess->opt_cxt.is_stream = ((u_sess->opt_cxt.query_dop > 1 || u_sess->parser_cxt.hint_dop > 1) && u_sess->opt_cxt.smp_enabled && u_sess->stream_cxt.global_obj == NULL); u_sess->opt_cxt.is_stream_support = u_sess->opt_cxt.is_stream; diff --git a/src/gausskernel/optimizer/prep/prepjointree.cpp b/src/gausskernel/optimizer/prep/prepjointree.cpp index dbe1bb830a..ca11e3317e 100755 --- a/src/gausskernel/optimizer/prep/prepjointree.cpp +++ b/src/gausskernel/optimizer/prep/prepjointree.cpp @@ -1040,6 +1040,7 @@ void pull_up_subquery_hint(PlannerInfo* root, Query* parse, HintState* hint_stat parse->hintState->scan_hint = list_concat(parse->hintState->scan_hint, hint_state->scan_hint); parse->hintState->hint_warning = list_concat(parse->hintState->hint_warning, hint_state->hint_warning); parse->hintState->skew_hint = list_concat(parse->hintState->skew_hint, hint_state->skew_hint); + parse->hintState->rel_dop_hint = list_concat(parse->hintState->rel_dop_hint, hint_state->rel_dop_hint); parse->hintState->nall_hints = parse->hintState->nall_hints + hint_state->nall_hints; parse->hintState->multi_node_hint = hint_state->multi_node_hint; parse->hintState->sql_ignore_hint = hint_state->sql_ignore_hint; diff --git a/src/gausskernel/optimizer/util/pathnode.cpp b/src/gausskernel/optimizer/util/pathnode.cpp index 8a5a43f98b..458d173cb7 100755 --- a/src/gausskernel/optimizer/util/pathnode.cpp +++ b/src/gausskernel/optimizer/util/pathnode.cpp @@ -1211,6 +1211,53 @@ static void set_scan_hint(Path* new_path, HintState* hstate) } } +/* + * @Description: Find parallel hint and set hint_value. + * @in new_path: New path. + * @in root: Planner info root. + */ +static void set_parallel_hint_value(Path* new_path, PlannerInfo* root) +{ + + if (new_path->parent!= NULL && new_path->dop == new_path->parent->rel_dop) { + new_path->hint_value++; + } + + HintState* hstate = root->parse->hintState; + + if (hstate == NULL || hstate->rel_dop_hint == NIL) { + return; + } + + ParallelHint* parallelHint = NULL; + bool has_stream = false; + + if (IsA(new_path,SubqueryScanPath)) { + /* only check if there's stream node */ + ContainStreamContext context; + context.outer_relids = NULL; + context.upper_params = NULL; + context.only_check_stream = true; + context.under_materialize_all = false; + context.has_stream = false; + context.has_parameterized_path = false; + context.has_cstore_index_delta = false; + + stream_path_walker(new_path, &context); + if (context.has_stream) { + has_stream = true; + } + } + + if (new_path->dop == new_path->parent->rel_dop || has_stream) { + parallelHint = find_parallel_hint(hstate, new_path->parent->relids); + } + + if (parallelHint != NULL) { + parallelHint->base.state = HINT_STATE_USED; + } +} + /* * @Description: Set path's hint kewword. * @in join_rel: Join relation information. @@ -1321,9 +1368,10 @@ static void set_predpush_same_level_hint(HintState* hstate, RelOptInfo* rel, Pat /* * @Description: Set hint values to this new path. - * @in join_rel: Join relition. + * @in join_rel: Join relation. * @in new_path: New path. * @in hstate: Hint state. + * @in parent_rel: Parent relation. */ void set_hint_value(RelOptInfo* join_rel, Path* new_path, HintState* hstate) { @@ -1335,6 +1383,7 @@ void set_hint_value(RelOptInfo* join_rel, Path* new_path, HintState* hstate) set_scan_hint(new_path, hstate); + /* Deal with join path. */ if (IsA(new_path, NestPath) || IsA(new_path, MergePath) || IsA(new_path, HashPath)) { JoinPath* join_path = (JoinPath*)new_path; @@ -1634,6 +1683,11 @@ void add_path(PlannerInfo* root, RelOptInfo* parent_rel, Path* new_path) set_index_hint_value(new_path, root->parse->indexhintList); } + /* Set path's parallel_hint */ + if (root != NULL) { + set_parallel_hint_value(new_path, root); + } + /* we will add cn gather path when cn gather hint switch on */ if (root != NULL && EXEC_CONTAIN_COORDINATOR(new_path->exec_type) && permit_gather(root)) { RangeTblEntry* rte = root->simple_rte_array[parent_rel->relid]; diff --git a/src/gausskernel/optimizer/util/relnode.cpp b/src/gausskernel/optimizer/util/relnode.cpp index 3dd1e2bd2f..90f86cdac4 100755 --- a/src/gausskernel/optimizer/util/relnode.cpp +++ b/src/gausskernel/optimizer/util/relnode.cpp @@ -55,6 +55,7 @@ static void build_joinrel_itst_diskeys( PlannerInfo* root, RelOptInfo* joinrel, RelOptInfo* outerrel, RelOptInfo* innerrel, JoinType jointype); static void add_eqjoin_diskey_for_ec( const RelOptInfo* rel, const RelOptInfo* siderel, const EquivalenceClass* ec, List** join_diskey_list, List** relid_list); +static int compute_rel_dop(PlannerInfo* root, int relid); /* * * Description: For the function of build/drop/delete UDF, we need not to cache in hash @@ -245,6 +246,7 @@ RelOptInfo* build_simple_rel(PlannerInfo* root, int relid, RelOptKind reloptkind rel->joininfo = NIL; rel->has_eclass_joins = false; rel->varratio = NIL; + rel->rel_dop = compute_rel_dop(root, rel->relid); #ifdef STREAMPLAN if (rel->rtekind == RTE_RELATION) { #ifndef ENABLE_MULTIPLE_NODES @@ -738,6 +740,7 @@ RelOptInfo* build_join_rel(PlannerInfo* root, Relids joinrelids, RelOptInfo* out joinrel->joininfo = NIL; joinrel->has_eclass_joins = false; joinrel->varratio = NIL; + joinrel->rel_dop = compute_rel_dop(root,joinrel->relid); if (IsLocatorReplicated(inner_rel->locator_type) && IsLocatorReplicated(outer_rel->locator_type)) { joinrel->locator_type = LOCATOR_TYPE_REPLICATED; } else { @@ -1734,3 +1737,29 @@ List* remove_duplicate_superset_keys(List* superset_key_list) list_free_ext(superset_key_list); return final_dis_key_list; } + +/* + * compute_rel_dop + * when parent_rel was a target of parallel hint, the + * whole subquery should all be parallel. If not, the + * parallel hint of subquery itself should work. + * Parameters: + * @in root: PlannerInfo of current query + * @in relid: index id of rel (0 for join rel) + * Return: + * final dop of current rel + */ +static int compute_rel_dop(PlannerInfo* root, int relid) +{ + int parent_dop = -1; + int rel_dop = -1; + + if (root->parent_rel) + parent_dop = root->parent_rel->rel_dop; + + if (is_parallel_rel(relid, root->parse->hintState)) { + rel_dop = root->glob->max_dop; + } + + return Max(rel_dop, parent_dop); +} diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index b364d40f8b..0bcfac6e56 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -1528,6 +1528,9 @@ List* pg_plan_queries(List* querytrees, int cursorOptions, ParamListInfo boundPa #endif /* Temporarily apply SET hint using PG_TRY for later recovery */ int nest_level = apply_set_hint(query); + if (query->hintState && query->hintState->rel_dop_hint != NULL && u_sess->parser_cxt.hint_dop > 1) { + apply_hint_query_dop(&(u_sess->parser_cxt.hint_dop)); + } PG_TRY(); { stmt = (Node*)pg_plan_query(query, cursorOptions, boundParams); diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index 3cc387b5ad..a3b2a12433 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -281,6 +281,7 @@ static void knl_u_parser_init(knl_u_parser_context* parser_cxt) parser_cxt->isForbidTruncate = false; parser_cxt->isPerform = false; parser_cxt->stmt = NULL; + parser_cxt->hint_dop = -1; } static void knl_u_advisor_init(knl_u_advisor_context* adv_cxt) diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 0f078c1447..a323664cc7 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -464,6 +464,8 @@ typedef struct knl_u_parser_context { bool has_equal_uservar; bool is_straight_join; int cursor_expr_level; + + int hint_dop; } knl_u_parser_context; typedef struct knl_u_trigger_context { diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index fa1aae6793..031ed73dd2 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -156,6 +156,7 @@ extern const uint32 AUDIT_SHA_VERSION_NUM; extern const uint32 PARTITION_NAME_VERSION_NUM; extern const uint32 MINMAXEXPR_CMPTYPE_VERSION_NUM; extern const uint32 CHARBYTE_SEMANTIC_VERSION_NUMBER; +extern const uint32 PARALLEL_HINT_VERSION_NUM; extern void register_backend_version(uint32 backend_version); extern bool contain_backend_version(uint32 version_number); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 447e758ba0..0f2255c451 100755 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -827,6 +827,7 @@ typedef enum NodeTag { T_PlanCacheHint, T_NoExpandHint, T_SqlIgnoreHint, + T_ParallelHint, T_NoGPCHint, /* * pgfdw diff --git a/src/include/nodes/parsenodes_common.h b/src/include/nodes/parsenodes_common.h index b818ba8fa6..8f6128bac4 100644 --- a/src/include/nodes/parsenodes_common.h +++ b/src/include/nodes/parsenodes_common.h @@ -463,6 +463,7 @@ typedef struct HintState { List* no_gpc_hint; /* supress saving to global plan cache */ bool sql_ignore_hint; /* hint of keyword ignore in SQL*/ bool from_sql_patch; /* generated by sql patch */ + List* rel_dop_hint; /* hint for parallel relations */ } HintState; /* ---------------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 855c207da8..5c3f25fd9d 100755 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -260,6 +260,8 @@ typedef struct PlannerGlobal { /* There is a counter attempt to get name for sublinks */ int sublink_counter; + + int max_dop; #ifdef USE_SPQ ApplyShareInputContext share; /* workspace for GPDB plan sharing */ #endif @@ -309,6 +311,8 @@ typedef struct PlannerInfo { struct PlannerInfo* parent_root; /* NULL at outermost Query */ + struct RelOptInfo* parent_rel; /* NULL at outermost Query or sublink */ + /* * simple_rel_array holds pointers to "base rels" and "other rels" (see * comments for RelOptInfo for more info). It is indexed by rangetable @@ -868,6 +872,8 @@ typedef struct RelOptInfo { List* partial_pathlist; /* partial Paths */ int cursorDop; + + int rel_dop = -1; } RelOptInfo; /* diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index b613082a46..c21cafd9f4 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -73,7 +73,7 @@ typedef Plan* (*grouping_plannerFunc)(PlannerInfo* root, double tuple_fraction); extern Plan* subquery_planner(PlannerGlobal* glob, Query* parse, PlannerInfo* parent_root, bool hasRecursion, double tuple_fraction, PlannerInfo** subroot, int options = SUBQUERY_NORMAL, ItstDisKey* diskeys = NULL, - List* subqueryRestrictInfo = NIL); + RelOptInfo* parent_rel = NULL); extern void add_tlist_costs_to_plan(PlannerInfo* root, Plan* plan, List* tlist); @@ -93,6 +93,8 @@ extern void preprocess_qual_conditions(PlannerInfo* root, Node* jtnode); extern int apply_set_hint(const Query* parse); +extern void apply_hint_query_dop(int* max_dop); + extern void recover_set_hint(int savedNestLevel); typedef enum { diff --git a/src/include/optimizer/streampath.h b/src/include/optimizer/streampath.h index 6e21d24a39..a655300400 100644 --- a/src/include/optimizer/streampath.h +++ b/src/include/optimizer/streampath.h @@ -228,6 +228,9 @@ protected: /* If we can broadcast the outer side. */ bool m_canBroadcastOuter; + + /* If we can parallel by hint */ + bool m_canParallel; }; class JoinPathGen : public JoinPathGenBase { diff --git a/src/include/optimizer/streamplan.h b/src/include/optimizer/streamplan.h index ce01de91a0..be575d1e82 100644 --- a/src/include/optimizer/streamplan.h +++ b/src/include/optimizer/streamplan.h @@ -140,6 +140,7 @@ extern bool is_gather_stream(Stream* stream); extern bool is_hybid_stream(Stream* stream); extern void mark_distribute_setop(PlannerInfo* root, Node* node, bool isunionall, bool canDiskeyChange); extern void CreateGatherPaths(PlannerInfo* root, RelOptInfo* rel, bool is_join); +extern int CreateStreamGatherPaths(PlannerInfo* root,Path** outer_path, Path** inner_path); extern void foreign_qual_context_init(foreign_qual_context* context); extern void foreign_qual_context_free(foreign_qual_context* context); diff --git a/src/include/parser/parse_hint.h b/src/include/parser/parse_hint.h index 9135928b57..4cbf9d06cf 100644 --- a/src/include/parser/parse_hint.h +++ b/src/include/parser/parse_hint.h @@ -59,6 +59,7 @@ #define HINT_CPLAN "Use_cplan" #define HINT_GPLAN "Use_gplan" #define HINT_CHOOSE_ADAPTIVE_GPLAN "Choose_adaptive_gplan" +#define HINT_PARALLEL "Parallel" #define HINT_NO_GPC "No_gpc" #define HINT_SQL_IGNORE "Ignore_error" @@ -110,6 +111,7 @@ typedef enum HintKeyword { HINT_KEYWORD_GPLAN, HINT_KEYWORD_IGNORE, HINT_KEYWORD_CHOOSE_ADAPTIVE_GPLAN, + HINT_KEYWORD_PARALLEL, HINT_KEYWORD_NO_GPC, } HintKeyword; @@ -279,6 +281,13 @@ typedef struct SetHint { char* value; } SetHint; +typedef struct ParallelHint { + Hint base; /* base hint */ + char* name; + char* value; + Relids relid; /* parallel relation relids */ +} ParallelHint; + typedef struct PlanCacheHint { Hint base; /* base hint */ bool chooseCustomPlan; @@ -333,6 +342,8 @@ extern bool has_no_gpc_hint(HintState* hintState); extern void RemoveQueryHintByType(Query* query, HintKeyword hint); extern bool CheckNodeNameHint(HintState* hintstate); +extern bool is_parallel_rel(int x, HintState* hstate); +extern ParallelHint* find_parallel_hint(HintState* hstate, Relids relid); #define skip_space(str) \ while (isspace(*str)) \ diff --git a/src/test/regress/input/parallel_hint.source b/src/test/regress/input/parallel_hint.source new file mode 100644 index 0000000000..b5caa65ede --- /dev/null +++ b/src/test/regress/input/parallel_hint.source @@ -0,0 +1,57 @@ +drop schema if exists parallel_hint cascade; +create schema parallel_hint; +set current_schema='parallel_hint'; + +create table a(a int); +create table b(b int); +create table c(c int); + +explain (costs off) select /*+parallel(2)*/* from a; + + +explain (costs off) select /*+parallel(a 2)*/* from a; + + +explain (costs off) select /*+parallel(a 2)*/* from a order by a; + + +explain (costs off) select /*+parallel(a 2)*/count(*) from a; + + +explain (costs off) select /*+parallel(a 2)*/count(*) from a group by a; + + +explain (costs off) select /*+parallel(a 2)*/* from a,b; + + +explain (costs off) select /*+parallel(a 2)*/* from a,b where a=b; + + +explain (costs off) select /*+parallel(a 2) parallel(b 2)*/* from a,b; + + +explain (costs off) select /*+parallel(v 2)*/* from (select count(*) from a) v; + + +explain (costs off) select /*+parallel(v 2)*/* from (select count(*) from a group by a) v,b; + + +explain (costs off) select /*+parallel(a 2)*/* from (select count(*) from a group by a) v,b; + + +explain (costs off) select * from (select /*+parallel(a 2)*/count(*) from a group by a) v,b; + + +explain (costs off) select /*+parallel(v 2)*/* from (select /*+parallel(a 2)*/count(*) from a group by a) v,b; + + +explain (costs off) select /*+parallel(128)*/* from a; + + +explain (costs off) select /*+parallel(a 128)*/* from a; + + +explain (costs off) select /*+parallel(a 1)*/* from a; + + +drop schema parallel_hint cascade; diff --git a/src/test/regress/output/parallel_hint.source b/src/test/regress/output/parallel_hint.source new file mode 100644 index 0000000000..68a2980e24 --- /dev/null +++ b/src/test/regress/output/parallel_hint.source @@ -0,0 +1,181 @@ +drop schema if exists parallel_hint cascade; +NOTICE: schema "parallel_hint" does not exist, skipping +create schema parallel_hint; +set current_schema='parallel_hint'; +create table a(a int); +create table b(b int); +create table c(c int); +explain (costs off) select /*+parallel(2)*/* from a; + QUERY PLAN +--------------- + Seq Scan on a +(1 row) + +explain (costs off) select /*+parallel(a 2)*/* from a; + QUERY PLAN +---------------------------------------- + Streaming(type: LOCAL GATHER dop: 1/2) + -> Seq Scan on a +(2 rows) + +explain (costs off) select /*+parallel(a 2)*/* from a order by a; + QUERY PLAN +---------------------------------------------- + Sort + Sort Key: a + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Seq Scan on a +(4 rows) + +explain (costs off) select /*+parallel(a 2)*/count(*) from a; + QUERY PLAN +---------------------------------------------- + Aggregate + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Aggregate + -> Seq Scan on a +(4 rows) + +explain (costs off) select /*+parallel(a 2)*/count(*) from a group by a; + QUERY PLAN +---------------------------------------------------------- + Streaming(type: LOCAL GATHER dop: 1/2) + -> HashAggregate + Group By Key: a + -> Streaming(type: LOCAL REDISTRIBUTE dop: 2/2) + -> HashAggregate + Group By Key: a + -> Seq Scan on a +(7 rows) + +explain (costs off) select /*+parallel(a 2)*/* from a,b; + QUERY PLAN +---------------------------------------------- + Nested Loop + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Seq Scan on a + -> Materialize + -> Seq Scan on b +(5 rows) + +explain (costs off) select /*+parallel(a 2)*/* from a,b where a=b; + QUERY PLAN +---------------------------------------------- + Hash Join + Hash Cond: (a.a = b.b) + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Seq Scan on a + -> Hash + -> Seq Scan on b +(6 rows) + +explain (costs off) select /*+parallel(a 2) parallel(b 2)*/* from a,b; + QUERY PLAN +------------------------------------------------- + Streaming(type: LOCAL GATHER dop: 1/2) + -> Nested Loop + -> Streaming(type: BROADCAST dop: 2/2) + -> Seq Scan on a + -> Materialize + -> Seq Scan on b +(6 rows) + +explain (costs off) select /*+parallel(v 2)*/* from (select count(*) from a) v; + QUERY PLAN +------------------------------------------- + Aggregate + -> Streaming(type: BROADCAST dop: 1/2) + -> Aggregate + -> Seq Scan on a +(4 rows) + +explain (costs off) select /*+parallel(v 2)*/* from (select count(*) from a group by a) v,b; + QUERY PLAN +---------------------------------------------------------------------------- + Nested Loop + -> Seq Scan on b + -> Materialize + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> GroupAggregate + Group By Key: a.a + -> Sort + Sort Key: a.a + -> Streaming(type: LOCAL REDISTRIBUTE dop: 2/2) + -> GroupAggregate + Group By Key: a.a + -> Sort + Sort Key: a.a + -> Seq Scan on a +(14 rows) + +explain (costs off) select /*+parallel(a 2)*/* from (select count(*) from a group by a) v,b; +WARNING: Error hint: Parallel(a 2), relation name "a" is not found. + QUERY PLAN +--------------------------------------- + Nested Loop + -> Seq Scan on b + -> Materialize + -> Subquery Scan on v + -> HashAggregate + Group By Key: a.a + -> Seq Scan on a +(7 rows) + +explain (costs off) select * from (select /*+parallel(a 2)*/count(*) from a group by a) v,b; + QUERY PLAN +---------------------------------------------------------------------------- + Nested Loop + -> Seq Scan on b + -> Materialize + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> GroupAggregate + Group By Key: a.a + -> Sort + Sort Key: a.a + -> Streaming(type: LOCAL REDISTRIBUTE dop: 2/2) + -> GroupAggregate + Group By Key: a.a + -> Sort + Sort Key: a.a + -> Seq Scan on a +(14 rows) + +explain (costs off) select /*+parallel(v 2)*/* from (select /*+parallel(a 2)*/count(*) from a group by a) v,b; + QUERY PLAN +---------------------------------------------------------------------------- + Nested Loop + -> Seq Scan on b + -> Materialize + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> GroupAggregate + Group By Key: a.a + -> Sort + Sort Key: a.a + -> Streaming(type: LOCAL REDISTRIBUTE dop: 2/2) + -> GroupAggregate + Group By Key: a.a + -> Sort + Sort Key: a.a + -> Seq Scan on a +(14 rows) + +explain (costs off) select /*+parallel(128)*/* from a; +ERROR: Current degree of parallelism can only be set within [0,64] +explain (costs off) select /*+parallel(a 128)*/* from a; + QUERY PLAN +----------------------------------------- + Streaming(type: LOCAL GATHER dop: 1/64) + -> Seq Scan on a +(2 rows) + +explain (costs off) select /*+parallel(a 1)*/* from a; + QUERY PLAN +--------------- + Seq Scan on a +(1 row) + +drop schema parallel_hint cascade; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table a +drop cascades to table b +drop cascades to table c diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 4f3e584de4..e2a473bd35 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -823,3 +823,6 @@ test: ledger_table_case # wlm_memory_trace test: wlm_memory_trace + +# parallel_hint +test: parallel_hint -- Gitee