diff --git a/src/gausskernel/runtime/executor/execQual.cpp b/src/gausskernel/runtime/executor/execQual.cpp index f7ace875784754f414f2760c850a565fdd0227b1..20e339757ad49b2572f31e37826f3de5042e97ea 100644 --- a/src/gausskernel/runtime/executor/execQual.cpp +++ b/src/gausskernel/runtime/executor/execQual.cpp @@ -6858,7 +6858,7 @@ ExprState* ExecPrepareExpr(Expr* node, EState* estate) * ExecQual * * Evaluates a conjunctive boolean expression (qual list) and -* returns true iff none of the subexpressions are false. +* returns true if none of the subexpressions are false. * (We also return true if the list is empty.) * * If some of the subexpressions yield NULL but none yield FALSE, diff --git a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp index e07865b9dde19e4da50f5bb71397f868acc06ea6..25e734805910c5b7f005f59828f32f4825ce08f9 100644 --- a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp @@ -39,7 +39,7 @@ static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirectio static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir); static void _bt_check_natts_correct(const Relation index, bool heapkeyspace, Page page, OffsetNumber offnum); -static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir); + static int btree_setup_posting_items(BTScanOpaque so, int itemIndex, OffsetNumber offnum, ItemPointer heapTid, IndexTuple itup); static void btree_save_posting_item(BTScanOpaque so, int itemIndex, OffsetNumber offnum, ItemPointer heapTid, @@ -2083,7 +2083,7 @@ static inline void btree_save_posting_item(BTScanOpaque so, int item_idx, Offset * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately * for scan direction */ -static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir) +void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir) { /* initialize moreLeft/moreRight appropriately for scan direction */ if (ScanDirectionIsForward(dir)) { diff --git a/src/gausskernel/storage/access/ubtree/ubtree.cpp b/src/gausskernel/storage/access/ubtree/ubtree.cpp index daae414351c1e7bdc712753500352bbc82127f05..c3bc6cd727328a104f76963f30a0dfe8ff0326a1 100644 --- a/src/gausskernel/storage/access/ubtree/ubtree.cpp +++ b/src/gausskernel/storage/access/ubtree/ubtree.cpp @@ -52,6 +52,50 @@ typedef struct { MemoryContext pagedelcontext; } BTVacState; +/* + * UBTPARALLEL_NOT_INITIALIZED indicates that the scan has not started. + * + * UBTPARALLEL_ADVANCING indicates that some process is advancing the scan to + * a new page; others must wait. + * + * UBTPARALLEL_IDLE indicates that no backend is currently advancing the scan + * to a new page; some process can start doing that. + * + * UBTPARALLEL_DONE indicates that the scan is complete (including error exit). + * We reach this state once for every distinct combination of array keys. + */ +typedef enum { + UBTPARALLEL_NOT_INITIALIZED, + UBTPARALLEL_ADVANCING, + UBTPARALLEL_IDLE, + UBTPARALLEL_DONE +} UBTPS_State; + +/* + * UBTParallelScanDescData contains btree specific shared information required + * for parallel scan. + */ +typedef struct UBTParallelScanDescData : public BaseObject { + void Reset() + { + ubtps_scanPage = InvalidBlockNumber; + ubtpsPageStatus = UBTPARALLEL_NOT_INITIALIZED; + ubtpsArrayKeyCount = 0; + } + + UBTParallelScanDescData() { Reset(); } + + BlockNumber ubtps_scanPage; /* latest or next page to be scanned */ + UBTPS_State ubtpsPageStatus;/* indicates whether next page is available + * for scan. see above for possible states of + * parallel scan. */ + int ubtpsArrayKeyCount; /* count indicating number of array + * scan keys processed by parallel + * scan */ +} UBTParallelScanDescData; + +typedef struct UBTParallelScanDescData *UBTParallelScanDesc; + static void UBTreeVacuumScan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, BTCycleId cycleid); static void UBTreeVacuumPage(BTVacState *vstate, OidRBTree* invisibleParts, BlockNumber blkno, BlockNumber origBlkno); @@ -339,7 +383,7 @@ Datum ubtgetbitmap(PG_FUNCTION_ARGS) } } /* Now see if we have more array keys to deal with */ - } while (so->numArrayKeys && _bt_advance_array_keys(scan, ForwardScanDirection)); + } while (so->numArrayKeys && UBTreeAdvanceArrayKeys(scan, ForwardScanDirection)); PG_RETURN_INT64(ntids); } @@ -379,6 +423,7 @@ Datum ubtbeginscan(PG_FUNCTION_ARGS) so->arrayContext = NULL; so->killedItems = NULL; /* until needed */ so->numKilled = 0; + so->arrayKeyCount = 0; /* * We don't know yet whether the scan will be index-only, so we do not @@ -608,6 +653,157 @@ Datum ubtrestrpos(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * UbtParallelRescan() -- reset parallel scan + */ +void UbtParallelRescan(void* pscan) +{ + UBTParallelScanDesc ubtscan = (UBTParallelScanDesc)pscan; + LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE); + ubtscan->Reset(); + LWLockRelease(ParallelIndexScanLock); +} + +/* + * ubt_parallel_seize() -- Begin the process of advancing the scan to a new + * page. Other scans must wait until we call ubt_parallel_release() or + * ubt_parallel_done(). + * + * The return value is true if we successfully seized the scan and false + * if we did not. The latter case occurs if no pages remain for the current + * set of scankeys. + * + * If the return value is true, *pageno returns the next or current page + * of the scan (depending on the scan direction). An invalid block number + * means the scan hasn't yet started, and P_NONE means we've reached the end. + * The first time a participating process reaches the last page, it will return + * true and set *pageno to P_NONE; after that, further attempts to seize the + * scan will return false. + * + * Callers should ignore the value of pageno if the return value is false. + */ +bool ubt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + UBTPS_State pageStatus; + bool exitLoop = false; + bool status = true; + ParallelIndexScanDesc parallel_scan = scan->parallelScan; + UBTParallelScanDesc ubtscan; + + *pageno = P_NONE; + + ubtscan = (UBTParallelScanDesc) (parallel_scan->psBtpscan); + + while (1) { + LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE); + pageStatus = ubtscan->ubtpsPageStatus; + + if (so->arrayKeyCount < ubtscan->ubtpsArrayKeyCount) { + /* Parallel scan has already advanced to a new set of scankeys. */ + status = false; + } else if (pageStatus == UBTPARALLEL_DONE) { + /* + * We're done with this set of scankeys. This may be the end, or + * there could be more sets to try. + */ + status = false; + } else if (pageStatus != UBTPARALLEL_ADVANCING) { + /* + * We have successfully seized control of the scan for the purpose + * of advancing it to a new page! + */ + ubtscan->ubtpsPageStatus = UBTPARALLEL_ADVANCING; + *pageno = ubtscan->ubtps_scanPage; + exitLoop = true; + } + LWLockRelease(ParallelIndexScanLock); + + if (exitLoop || !status) { + break; + } + } + + return status; +} + +/* + * ubt_parallel_release() -- Complete the process of advancing the scan to a + * new page. We now have the new value btps_scanPage; some other backend + * can now begin advancing the scan. + */ +void ubt_parallel_release(IndexScanDesc scan, BlockNumber scan_page) +{ + ParallelIndexScanDesc parallel_scan = scan->parallelScan; + UBTParallelScanDesc ubtscan; + + ubtscan = (UBTParallelScanDesc) (parallel_scan->psBtpscan); + + LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE); + ubtscan->ubtps_scanPage = scan_page; + ubtscan->ubtpsPageStatus = UBTPARALLEL_IDLE; + LWLockRelease(ParallelIndexScanLock); +} + +/* + * ubt_parallel_done() -- Mark the parallel scan as complete. + * + * When there are no pages left to scan, this function should be called to + * notify other workers. Otherwise, they might wait forever for the scan to + * advance to the next page. + */ +void ubt_parallel_done(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallelScan; + UBTParallelScanDesc ubtscan; + bool statusChanged = false; + + /* Do nothing, for non-parallel scans */ + if (parallel_scan == NULL) { + return; + } + + ubtscan = (UBTParallelScanDesc) (parallel_scan->psBtpscan); + + /* + * Mark the parallel scan as done for this combination of scan keys, + * unless some other process already did so. See also + * UBTreeAdvanceArrayKeys. + */ + LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE); + if (so->arrayKeyCount >= ubtscan->ubtpsArrayKeyCount + && ubtscan->ubtpsPageStatus != UBTPARALLEL_DONE) { + ubtscan->ubtpsPageStatus = UBTPARALLEL_DONE; + statusChanged = true; + } + LWLockRelease(ParallelIndexScanLock); +} + +/* + * ubt_parallel_advance_array_keys() -- Advances the parallel scan for array keys. + * + * Updates the count of array keys processed for both local and parallel + * scans. + */ +void ubt_parallel_advance_array_keys(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallelScan; + UBTParallelScanDesc ubtscan; + + ubtscan = (UBTParallelScanDesc) (parallel_scan->psBtpscan); + + ++so->arrayKeyCount; + LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE); + if (ubtscan->ubtpsPageStatus == UBTPARALLEL_DONE) { + ubtscan->ubtps_scanPage = InvalidBlockNumber; + ubtscan->ubtpsPageStatus = UBTPARALLEL_NOT_INITIALIZED; + ++ubtscan->ubtpsArrayKeyCount; + } + LWLockRelease(ParallelIndexScanLock); +} + /* * Bulk deletion of all index entries pointing to a set of heap tuples. * The set of target tuples is specified via a callback routine that tells diff --git a/src/gausskernel/storage/access/ubtree/ubtsearch.cpp b/src/gausskernel/storage/access/ubtree/ubtsearch.cpp index 8343ec9b036e3f04e74963b9ed92fed9b287521c..c83d2ba02d18af1100be22630313221a8fc51b26 100644 --- a/src/gausskernel/storage/access/ubtree/ubtsearch.cpp +++ b/src/gausskernel/storage/access/ubtree/ubtsearch.cpp @@ -36,9 +36,31 @@ static void UBTreeSaveItem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, Oid partOid, bool wantItup, bool needRecheck); static bool UBTreeStepPage(IndexScanDesc scan, ScanDirection dir); static bool UBTreeEndPoint(IndexScanDesc scan, ScanDirection dir); +static bool UBTreeReadNextPage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); const uint16 INVALID_TUPLE_OFFSET = (uint16)0xa5a5; +/* + * UBTreeParallelReadPage() -- Read current page containing valid data for scan + * + * On success, release lock and maybe pin on buffer. We return true to + * indicate success. + */ +static bool UBTreeParallelReadPage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + + _bt_initialize_more_data(so, dir); + + if (!UBTreeReadNextPage(scan, blkno, dir)) { + return false; + } + + /* Drop the lock, but not pin, on the new page */ + LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); + return true; +} + /* * UBTreeSearch() -- Search the tree for a particular scankey, * or more precisely for the first leaf page it could be on. @@ -619,6 +641,8 @@ bool UBTreeFirst(IndexScanDesc scan, ScanDirection dir) int i; StrategyNumber strat_total; BTScanPosItem *currItem = NULL; + BlockNumber blkno; + bool status = true; pgstat_count_index_scan(rel); @@ -632,8 +656,30 @@ bool UBTreeFirst(IndexScanDesc scan, ScanDirection dir) * Quit now if _bt_preprocess_keys() discovered that the scan keys can * never be satisfied (eg, x == 1 AND x > 2). */ - if (!so->qual_ok) + if (!so->qual_ok) { return false; + } + + /* + * For parallel scans, get the starting page from shared state. If the + * scan has not started, proceed to find out first leaf page in the usual + * way while keeping other participating processes waiting. If the scan + * has already begun, use the page number from the shared structure. + */ + if (scan->parallelScan != NULL) { + status = ubt_parallel_seize(scan, &blkno); + if (!status) { + return false; + } else if (blkno == P_NONE) { + ubt_parallel_done(scan); + return false; + } else if (blkno != InvalidBlockNumber) { + if (!UBTreeParallelReadPage(scan, blkno, dir)) { + return false; + } + goto readcomplete; + } + } /* ---------- * Examine the scan keys to discover where we need to start the scan. @@ -799,9 +845,14 @@ bool UBTreeFirst(IndexScanDesc scan, ScanDirection dir) * the tree. Walk down that edge to the first or last key, and scan from * there. */ - if (keysCount == 0) - return UBTreeEndPoint(scan, dir); - + if (keysCount == 0) { + bool match = UBTreeEndPoint(scan, dir); + if (!match) { + /* No match, so mark (parallel) scan finished */ + ubt_parallel_done(scan); + } + return match; + } /* * We want to start the scan somewhere within the index. Set up an * insertion scankey we can use to search for the boundary point we @@ -827,8 +878,10 @@ bool UBTreeFirst(IndexScanDesc scan, ScanDirection dir) */ ScanKey subkey = (ScanKey)DatumGetPointer(cur->sk_argument); Assert(subkey->sk_flags & SK_ROW_MEMBER); - if (subkey->sk_flags & SK_ISNULL) + if (subkey->sk_flags & SK_ISNULL) { + ubt_parallel_done(scan); return false; + } rc = memcpy_s(inskey.scankeys + i, sizeof(ScanKeyData), subkey, sizeof(ScanKeyData)); securec_check(rc, "\0", "\0"); @@ -1033,6 +1086,12 @@ bool UBTreeFirst(IndexScanDesc scan, ScanDirection dir) * because nothing finer to lock exists. */ PredicateLockRelation(rel, scan->xs_snapshot); + + /* + * mark parallel scan as done, so that all the workers can finish + * their scan + */ + ubt_parallel_done(scan); return false; } else PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot); @@ -1087,6 +1146,7 @@ bool UBTreeFirst(IndexScanDesc scan, ScanDirection dir) /* Drop the lock, but not pin, on the current page */ LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); +readcomplete: /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; @@ -1314,6 +1374,15 @@ static bool UBTreeReadPage(IndexScanDesc scan, ScanDirection dir, OffsetNumber o page = BufferGetPage(so->currPos.buf); opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page); + + /* allow next page be processed by parallel worker */ + if (scan->parallelScan) { + if (ScanDirectionIsForward(dir)) + ubt_parallel_release(scan, opaque->btpo_next); + else + ubt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf)); + } + minoff = P_FIRSTDATAKEY(opaque); maxoff = PageGetMaxOffsetNumber(page); xidBase = opaque->xid_base; @@ -1325,6 +1394,12 @@ static bool UBTreeReadPage(IndexScanDesc scan, ScanDirection dir, OffsetNumber o */ so->currPos.nextPage = opaque->btpo_next; + /* + * We note the buffer's block number so that we can release the pin later. + * This allows us to re-read the buffer if it is needed again for hinting. + */ + so->currPos.currPage = BufferGetBlockNumber(so->currPos.buf); + /* initialize tuple workspace to empty */ so->currPos.nextTupleOffset = 0; @@ -1486,8 +1561,8 @@ static bool UBTreeStepPage(IndexScanDesc scan, ScanDirection dir) { BTScanOpaque so = (BTScanOpaque)scan->opaque; Relation rel; - Page page; - UBTPageOpaqueInternal opaque; + BlockNumber blkno = InvalidBlockNumber; + bool status; /* we must have the buffer pinned and locked */ Assert(BufferIsValid(so->currPos.buf)); @@ -1515,24 +1590,80 @@ static bool UBTreeStepPage(IndexScanDesc scan, ScanDirection dir) so->markPos.itemIndex = so->markItemIndex; so->markItemIndex = -1; } - rel = scan->indexRelation; - + /* release the previous buffer */ + _bt_relbuf(rel, so->currPos.buf); if (ScanDirectionIsForward(dir)) { + so->currPos.buf = InvalidBuffer; /* Walk right to the next page with data */ - /* We must rely on the previously saved nextPage link! */ - BlockNumber blkno = so->currPos.nextPage; - + if (scan->parallelScan != NULL) { + /* + * Seize the scan to get the next block number; if the scan has + * ended already, bail out. + */ + status = ubt_parallel_seize(scan, &blkno); + if (!status) { + return false; + } + } else { + /* Walk right to the next page with data */ + /* We must rely on the previously saved nextPage link! */ + blkno = so->currPos.nextPage; + } /* Remember we left a page with data */ so->currPos.moreLeft = true; + } else { + /* Remember we left a page with data */ + so->currPos.moreRight = true; + if (scan->parallelScan != NULL) { + /* + * Seize the scan to get the current block number; if the scan has + * ended already, bail out. + */ + status = ubt_parallel_seize(scan, &blkno); + if (!status) { + so->currPos.buf = InvalidBuffer; + return false; + } + } else { + /* Not parallel, so just use our own notion of the current page */ + blkno = so->currPos.currPage; + } + } + if (!UBTreeReadNextPage(scan, blkno, dir)) { + return false; + } + return true; +} +/* + * UBTreeReadNextPage() -- Read next page containing valid data for scan + * + * On success exit, so->currPos is updated to contain data from the next + * interesting page. Caller is responsible to release lock and pin on + * buffer on success. We return true to indicate success. + * + * If there are no more matching records in the given direction, we drop all + * locks and pins, set so->currPos.buf to InvalidBuffer, and return false. + */ +static bool UBTreeReadNextPage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + Relation rel; + Page page; + UBTPageOpaqueInternal opaque; + bool status; + + rel = scan->indexRelation; + + if (ScanDirectionIsForward(dir)) { for (;;) { - /* release the previous buffer */ - _bt_relbuf(rel, so->currPos.buf); - so->currPos.buf = InvalidBuffer; /* if we're at end of scan, give up */ - if (blkno == P_NONE || !so->currPos.moreRight) + if (blkno == P_NONE || !so->currPos.moreRight) { + ubt_parallel_done(scan); return false; + } + /* check for interrupts while we're not holding any buffer lock */ CHECK_FOR_INTERRUPTS(); /* step right one page */ @@ -1544,15 +1675,35 @@ static bool UBTreeStepPage(IndexScanDesc scan, ScanDirection dir) PredicateLockPage(rel, blkno, scan->xs_snapshot); /* see if there are any matches on this page */ /* note that this will clear moreRight if we can stop */ - if (UBTreeReadPage(scan, dir, P_FIRSTDATAKEY(opaque))) + if (UBTreeReadPage(scan, dir, P_FIRSTDATAKEY(opaque))) { break; + } + } else if (scan->parallelScan != NULL) { + /* allow next page be processed by parallel worker */ + ubt_parallel_release(scan, opaque->btpo_next); } + /* release the previous buffer */ + _bt_relbuf(rel, so->currPos.buf); + so->currPos.buf = InvalidBuffer; + /* nope, keep going */ - blkno = opaque->btpo_next; + if (scan->parallelScan != NULL) { + status = ubt_parallel_seize(scan, &blkno); + if (!status) { + return false; + } + } else { + blkno = opaque->btpo_next; + } } } else { - /* Remember we left a page with data */ - so->currPos.moreRight = true; + /* + * Should only happen in parallel cases, when some other backend + * advanced the scan. + */ + + so->currPos.currPage = blkno; + so->currPos.buf = _bt_getbuf(rel, so->currPos.currPage, BT_READ); /* * Walk left to the next page with data. This is much more complex @@ -1565,19 +1716,20 @@ static bool UBTreeStepPage(IndexScanDesc scan, ScanDirection dir) /* Done if we know there are no matching keys to the left */ if (!so->currPos.moreLeft) { _bt_relbuf(rel, so->currPos.buf); + ubt_parallel_done(scan); so->currPos.buf = InvalidBuffer; return false; } - /* Step to next physical page */ Buffer temp = so->currPos.buf; so->currPos.buf = InvalidBuffer; so->currPos.buf = _bt_walk_left(rel, temp); /* if we're physically at end of index, return failure */ - if (so->currPos.buf == InvalidBuffer) + if (so->currPos.buf == InvalidBuffer) { + ubt_parallel_done(scan); return false; - + } /* * Okay, we managed to move left to a non-deleted page. Done if * it's not half-dead and contains matching tuples. Else loop back @@ -1589,12 +1741,31 @@ static bool UBTreeStepPage(IndexScanDesc scan, ScanDirection dir) PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot); /* see if there are any matches on this page */ /* note that this will clear moreLeft if we can stop */ - if (UBTreeReadPage(scan, dir, PageGetMaxOffsetNumber(page))) + if (UBTreeReadPage(scan, dir, PageGetMaxOffsetNumber(page))) { break; + } + } else if (scan->parallelScan != NULL) { + /* allow next page be processed by parallel worker */ + ubt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf)); + } + + /* + * For parallel scans, get the last page scanned as it is quite + * possible that by the time we try to seize the scan, some other + * worker has already advanced the scan to a different page. We + * must continue based on the latest page scanned by any worker. + */ + if (scan->parallelScan != NULL) { + _bt_relbuf(rel, so->currPos.buf); + status = ubt_parallel_seize(scan, &blkno); + if (!status) { + so->currPos.buf = InvalidBuffer; + return false; + } + so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ); } } } - return true; } @@ -1835,7 +2006,7 @@ bool UBTreeGetTupleInternal(IndexScanDesc scan, ScanDirection dir) if (res) break; /* ... otherwise see if we have more array keys to deal with */ - } while (so->numArrayKeys && _bt_advance_array_keys(scan, dir)); + } while (so->numArrayKeys && UBTreeAdvanceArrayKeys(scan, dir)); return res; } \ No newline at end of file diff --git a/src/gausskernel/storage/access/ubtree/ubtutils.cpp b/src/gausskernel/storage/access/ubtree/ubtutils.cpp index 83687a96d82c486ec8d71f045f8b326207daf6cc..7c1571d405fe6069d19f790a335954e935bbcaff 100644 --- a/src/gausskernel/storage/access/ubtree/ubtutils.cpp +++ b/src/gausskernel/storage/access/ubtree/ubtutils.cpp @@ -379,6 +379,60 @@ static bool UBTreeShowAnyTupleCheckWrap(IndexScanDesc scan, Page page, return true; } +/* + * UBTreeAdvanceArrayKeys() -- Advance to next set of array elements + * + * Returns TRUE if there is another set of values to consider, FALSE if not. + * On TRUE result, the scankeys are initialized with the next set of values. + */ +bool UBTreeAdvanceArrayKeys(IndexScanDesc scan, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque)scan->opaque; + bool found = false; + int i; + + /* + * We must advance the last array key most quickly, since it will + * correspond to the lowest-order index column among the available + * qualifications. This is necessary to ensure correct ordering of output + * when there are multiple array keys. + */ + for (i = so->numArrayKeys - 1; i >= 0; --i) { + BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; + ScanKey skey = &so->arrayKeyData[curArrayKey->scan_key]; + int curElem = curArrayKey->cur_elem; + int numElems = curArrayKey->num_elems; + + if (ScanDirectionIsBackward(dir)) { + if (--curElem < 0) { + curElem = numElems - 1; + found = false; /* need to advance next array key */ + } else { + found = true; + } + } else { + if (++curElem >= numElems) { + curElem = 0; + found = false; /* need to advance next array key */ + } else { + found = true; + } + } + + curArrayKey->cur_elem = curElem; + skey->sk_argument = curArrayKey->elem_values[curElem]; + if (found) { + break; + } + } + + /* advance parallel scan */ + if (scan->parallelScan != NULL) + ubt_parallel_advance_array_keys(scan); + + return found; +} + static bool UBTreeVisibilityCheckWrap(IndexScanDesc scan, Page page, OffsetNumber offnum, bool *needRecheck) { bool needVisibilityCheck = scan->xs_snapshot->satisfies != SNAPSHOT_ANY && diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index bb6d1e83cd430c03737a19d30a1a9db8e58276bc..76a6541a2d902bf5d1468f1b05c425a24d084441 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -1279,6 +1279,8 @@ typedef union unified_atomic_iterator { #define INVALID_ATOMIC_ITERATOR PG_UINT32_MAX +#define CV_WAIT_TIMEOUT 10 /* every 10s to check interrupt for condition variable waiting */ + /* * prototypes for functions in nbtree.c (external entry points for btree) */ @@ -1393,7 +1395,7 @@ extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost); extern bool _bt_gettuple_internal(IndexScanDesc scan, ScanDirection dir); extern bool _bt_check_natts(const Relation index, bool heapkeyspace, Page page, OffsetNumber offnum); extern int _bt_getrootheight(Relation rel); - +extern void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir); /* * prototypes for functions in nbtutils.c */ diff --git a/src/include/access/ubtree.h b/src/include/access/ubtree.h index 33c9e68499a7c442bb5321293973c3faa85a8f5e..658bf72d58ad5c22bd57c61470925bae2040597a 100644 --- a/src/include/access/ubtree.h +++ b/src/include/access/ubtree.h @@ -48,6 +48,12 @@ extern Datum ubtvacuumcleanup(PG_FUNCTION_ARGS); extern Datum ubtcanreturn(PG_FUNCTION_ARGS); extern Datum ubtoptions(PG_FUNCTION_ARGS); +extern void UbtParallelRescan(void* pscan); +extern bool ubt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno); +extern void ubt_parallel_release(IndexScanDesc scan, BlockNumber scan_page); +extern void ubt_parallel_done(IndexScanDesc scan); +extern void ubt_parallel_advance_array_keys(IndexScanDesc scan); + extern bool UBTreeDelete(Relation index_relation, Datum* values, const bool* isnull, ItemPointer heapTCtid, bool isRollbackIndex); extern bool IndexPagePrepareForXid(Relation rel, Page page, TransactionId xid, bool needWal, Buffer buf); @@ -565,6 +571,7 @@ extern void BtCheckThirdPage(Relation rel, Relation heap, bool needheaptidspace, extern bool UBTreeItupGetXminXmax(Page page, OffsetNumber offnum, TransactionId oldest_xmin, TransactionId *xmin, TransactionId *xmax, bool *xminCommitted, bool *xmaxCommitted, bool isToast); extern TransactionIdStatus UBTreeCheckXid(TransactionId xid); +extern bool UBTreeAdvanceArrayKeys(IndexScanDesc scan, ScanDirection dir); /* * prototypes for functions in ubtpage.cpp