diff --git a/src/jrd/RecordSourceNodes.cpp b/src/jrd/RecordSourceNodes.cpp index f355b6bf1c1..130c2e60c8c 100644 --- a/src/jrd/RecordSourceNodes.cpp +++ b/src/jrd/RecordSourceNodes.cpp @@ -54,6 +54,165 @@ static ValueExprNode* resolveUsingField(DsqlCompilerScratch* dsqlScratch, const namespace { + // Search through the list of ANDed booleans to find comparisons + // referring streams of parent select expressions. + // Extract those booleans and return them to the caller. + + bool findDependentBooleans(CompilerScratch* csb, + const StreamList& rseStreams, + BoolExprNode** parentBoolean, + BoolExprNodeStack& booleanStack) + { + const auto boolean = *parentBoolean; + + const auto binaryNode = nodeAs(boolean); + if (binaryNode && binaryNode->blrOp == blr_and) + { + const bool found1 = findDependentBooleans(csb, rseStreams, + binaryNode->arg1.getAddress(), booleanStack); + const bool found2 = findDependentBooleans(csb, rseStreams, + binaryNode->arg2.getAddress(), booleanStack); + + if (!binaryNode->arg1 && !binaryNode->arg2) + *parentBoolean = nullptr; + else if (!binaryNode->arg1) + *parentBoolean = binaryNode->arg2; + else if (!binaryNode->arg2) + *parentBoolean = binaryNode->arg1; + + return (found1 || found2); + } + + if (const auto cmpNode = nodeAs(boolean)) + { + if (cmpNode->blrOp == blr_eql || cmpNode->blrOp == blr_equiv) + { + SortedStreamList streams; + cmpNode->collectStreams(streams); + + for (const auto stream : streams) + { + if (rseStreams.exist(stream)) + { + booleanStack.push(boolean); + *parentBoolean = nullptr; + return true; + } + } + } + } + + return false; + } + + // Search through the list of ANDed booleans to find correlated EXISTS/IN sub-queries. + // They are candidates to be converted into semi- or anti-joins. + + bool findPossibleJoins(CompilerScratch* csb, + const StreamList& rseStreams, + BoolExprNode** parentBoolean, + RecordSourceNodeStack& rseStack, + BoolExprNodeStack& booleanStack) + { + auto boolNode = *parentBoolean; + + const auto binaryNode = nodeAs(boolNode); + if (binaryNode && binaryNode->blrOp == blr_and) + { + const bool found1 = findPossibleJoins(csb, rseStreams, + binaryNode->arg1.getAddress(), rseStack, booleanStack); + const bool found2 = findPossibleJoins(csb, rseStreams, + binaryNode->arg2.getAddress(), rseStack, booleanStack); + + if (!binaryNode->arg1 && !binaryNode->arg2) + *parentBoolean = nullptr; + else if (!binaryNode->arg1) + *parentBoolean = binaryNode->arg2; + else if (!binaryNode->arg2) + *parentBoolean = binaryNode->arg1; + + return (found1 || found2); + } + + const auto rseNode = nodeAs(boolNode); + // Both EXISTS (blr_any) and IN (blr_ansi_any) sub-queries are handled + if (rseNode && (rseNode->blrOp == blr_any || rseNode->blrOp == blr_ansi_any)) + { + auto rse = rseNode->rse; + fb_assert(rse && (rse->flags & RseNode::FLAG_SUB_QUERY)); + + if (rse->rse_boolean && rse->rse_jointype == blr_inner && + !rse->rse_first && !rse->rse_skip && !rse->rse_plan) + { + // Find booleans convertable into semi-joins + + StreamList streams; + rse->computeRseStreams(streams); + + BoolExprNodeStack booleans; + if (findDependentBooleans(csb, rseStreams, + rse->rse_boolean.getAddress(), + booleans)) + { + // Compose the conjunct boolean + + fb_assert(booleans.hasData()); + auto boolean = booleans.pop(); + while (booleans.hasData()) + { + const auto andNode = FB_NEW_POOL(csb->csb_pool) + BinaryBoolNode(csb->csb_pool, blr_and); + andNode->arg1 = boolean; + andNode->arg2 = booleans.pop(); + boolean = andNode; + } + + // Ensure that no external references are left inside the subquery. + // If so, mark the RSE as joined and add it to the stack. + + SortedStreamList streams; + rse->collectStreams(streams); + + bool dependent = false; + for (const auto stream : streams) + { + if (rseStreams.exist(stream)) + { + dependent = true; + break; + } + } + + if (!dependent) + { + rse->flags &= ~RseNode::FLAG_SUB_QUERY; + rse->flags |= RseNode::FLAG_SEMI_JOINED; + rseStack.push(rse); + booleanStack.push(boolean); + *parentBoolean = nullptr; + return true; + } + + // Otherwise, restore the original sub-query by adding + // the collected booleans back to the RSE. + + if (rse->rse_boolean) + { + const auto andNode = FB_NEW_POOL(csb->csb_pool) + BinaryBoolNode(csb->csb_pool, blr_and); + andNode->arg1 = boolean; + andNode->arg2 = rse->rse_boolean; + boolean = andNode; + } + + rse->rse_boolean = boolean; + } + } + } + + return false; + } + class AutoActivateResetStreams : public AutoStorage { public: @@ -3025,6 +3184,9 @@ RseNode* RseNode::pass1(thread_db* tdbb, CompilerScratch* csb) { SET_TDBB(tdbb); + if (const auto newRse = processPossibleJoins(tdbb, csb)) + return newRse->pass1(tdbb, csb); + // for scoping purposes, maintain a stack of RseNode's which are // currently being parsed; if there are none on the stack as // yet, mark the RseNode as variant to make sure that statement- @@ -3130,6 +3292,12 @@ RseNode* RseNode::pass1(thread_db* tdbb, CompilerScratch* csb) void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse, BoolExprNode** boolean, RecordSourceNodeStack& stack) { + if (const auto newRse = processPossibleJoins(tdbb, csb)) + { + newRse->pass1Source(tdbb, csb, rse, boolean, stack); + return; + } + if (rse_jointype != blr_inner) { // Check whether any of the upper level booleans (those belonging to the WHERE clause) @@ -3183,7 +3351,7 @@ void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse, } } - // in the case of an RseNode, it is possible that a new RseNode will be generated, + // In the case of an RseNode, it is possible that a new RseNode will be generated, // so wait to process the source before we push it on the stack (bug 8039) // The addition of the JOIN syntax for specifying inner joins causes an @@ -3191,7 +3359,7 @@ void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse, // where we are just trying to inner join more than 2 streams. If possible, // try to flatten the tree out before we go any further. - if (!isLateral() && + if (!isLateral() && !isSemiJoined() && rse->rse_jointype == blr_inner && rse_jointype == blr_inner && !rse_sorted && !rse_projection && @@ -3296,11 +3464,11 @@ RecordSource* RseNode::compile(thread_db* tdbb, Optimizer* opt, bool innerSubStr StreamStateHolder stateHolder(csb, opt->getOuterStreams()); - if (opt->isLeftJoin() || isLateral()) + if (opt->isLeftJoin() || isLateral() || isSemiJoined()) { stateHolder.activate(); - if (opt->isLeftJoin()) + if (opt->isLeftJoin() || isSemiJoined()) { // Push all conjuncts except "missing" ones (e.g. IS NULL) for (auto iter = opt->getConjuncts(false, true); iter.hasData(); ++iter) @@ -3323,6 +3491,87 @@ RecordSource* RseNode::compile(thread_db* tdbb, Optimizer* opt, bool innerSubStr return opt->compile(this, &conjunctStack); } +RseNode* RseNode::processPossibleJoins(thread_db* tdbb, CompilerScratch* csb) +{ + if (rse_jointype != blr_inner || !rse_boolean || rse_plan) + return nullptr; + + // If the sub-query is nested inside the other sub-query which wasn't converted into semi-join, + // it makes no sense to apply a semi-join at the deeper levels, as a sub-query is expected + // to be executed repeatedly. + // This is a temporary fix until nested loop semi-joins are allowed by the optimizer. + + if (flags & FLAG_SUB_QUERY) + return nullptr; + + for (const auto node : csb->csb_current_nodes) + { + if (const auto rse = nodeAs(node)) + { + if (rse->flags & FLAG_SUB_QUERY) + return nullptr; + } + } + + RecordSourceNodeStack rseStack; + BoolExprNodeStack booleanStack; + + // Find possibly joinable sub-queries + + StreamList rseStreams; + computeRseStreams(rseStreams); + + if (!findPossibleJoins(csb, rseStreams, rse_boolean.getAddress(), rseStack, booleanStack)) + return nullptr; + + fb_assert(rseStack.hasData() && booleanStack.hasData()); + fb_assert(rseStack.getCount() == booleanStack.getCount()); + + // Create joins between the original node and detected joinable nodes. + // Preserve FIRST/SKIP nodes at their original position, i.e. outside semi-joins. + + const auto first = rse_first; + rse_first = nullptr; + + const auto skip = rse_skip; + rse_skip = nullptr; + + const auto orgFlags = flags; + flags = 0; + + auto rse = this; + while (rseStack.hasData()) + { + const auto newRse = FB_NEW_POOL(*tdbb->getDefaultPool()) + RseNode(*tdbb->getDefaultPool()); + + newRse->rse_relations.add(rse); + newRse->rse_relations.add(rseStack.pop()); + + newRse->rse_jointype = blr_inner; + newRse->rse_boolean = booleanStack.pop(); + + rse = newRse; + } + + if (first || skip) + { + const auto newRse = FB_NEW_POOL(*tdbb->getDefaultPool()) + RseNode(*tdbb->getDefaultPool()); + + newRse->rse_relations.add(rse); + newRse->rse_jointype = blr_inner; + newRse->rse_first = first; + newRse->rse_skip = skip; + + rse = newRse; + } + + rse->flags = orgFlags; + + return rse; +} + // Check that all streams in the RseNode have a plan specified for them. // If they are not, there are streams in the RseNode which were not mentioned in the plan. void RseNode::planCheck(const CompilerScratch* csb) const diff --git a/src/jrd/RecordSourceNodes.h b/src/jrd/RecordSourceNodes.h index ce4f7c0d51d..1bf53028570 100644 --- a/src/jrd/RecordSourceNodes.h +++ b/src/jrd/RecordSourceNodes.h @@ -718,14 +718,15 @@ class RseNode final : public TypedNode dsqlFirst; diff --git a/src/jrd/optimizer/InnerJoin.cpp b/src/jrd/optimizer/InnerJoin.cpp index afd26c60cb1..a6b1a364f2e 100644 --- a/src/jrd/optimizer/InnerJoin.cpp +++ b/src/jrd/optimizer/InnerJoin.cpp @@ -108,6 +108,7 @@ void InnerJoin::calculateStreamInfo() innerStream->baseIndexes = candidate->indexes; innerStream->baseUnique = candidate->unique; innerStream->baseNavigated = candidate->navigated; + innerStream->baseConjuncts = candidate->conjuncts; csb->csb_rpt[innerStream->number].deactivate(); } @@ -579,13 +580,15 @@ River* InnerJoin::formRiver() // Create a hash join rsb = FB_NEW_POOL(getPool()) - HashJoin(tdbb, csb, 2, hashJoinRsbs, keys.begin(), stream.selectivity); + HashJoin(tdbb, csb, INNER_JOIN, 2, hashJoinRsbs, keys.begin(), stream.selectivity); // Clear priorly processed rsb's, as they're already incorporated into a hash join rsbs.clear(); } else + { rsb = optimizer->generateRetrieval(stream.number, sortPtr, false, false); + } rsbs.add(rsb); streams.add(stream.number); diff --git a/src/jrd/optimizer/Optimizer.cpp b/src/jrd/optimizer/Optimizer.cpp index f40713f4638..13daa79c92d 100644 --- a/src/jrd/optimizer/Optimizer.cpp +++ b/src/jrd/optimizer/Optimizer.cpp @@ -168,9 +168,14 @@ namespace class CrossJoin : public River { public: - CrossJoin(CompilerScratch* csb, RiverList& rivers) - : River(csb, nullptr, rivers) + CrossJoin(Optimizer* opt, RiverList& rivers, JoinType joinType) + : River(opt->getCompilerScratch(), nullptr, rivers) { + fb_assert(joinType != OUTER_JOIN); + + const auto csb = opt->getCompilerScratch(); + Optimizer::ConjunctIterator iter(opt->getBaseConjuncts()); + // Save states of the underlying streams and restore them afterwards StreamStateHolder stateHolder(csb, m_streams); @@ -182,57 +187,80 @@ namespace if (riverCount == 1) { - River* const sub_river = rivers.pop(); - m_rsb = sub_river->getRecordSource(); + const auto subRiver = rivers.pop(); + const auto subRsb = subRiver->getRecordSource(); + subRiver->activate(csb); + m_rsb = opt->applyBoolean(subRsb, iter); } else { HalfStaticArray rsbs(riverCount); - // Reorder input rivers according to their possible inter-dependencies - - while (rivers.hasData()) + if (joinType == INNER_JOIN) { - const auto orgCount = rsbs.getCount(); + // Reorder input rivers according to their possible inter-dependencies - for (auto& subRiver : rivers) + while (rivers.hasData()) { - const auto subRsb = subRiver->getRecordSource(); - fb_assert(!rsbs.exist(subRsb)); + const auto orgCount = rsbs.getCount(); - subRiver->activate(csb); - - if (subRiver->isComputable(csb)) + for (auto& subRiver : rivers) { - rsbs.add(subRsb); - rivers.remove(&subRiver); - break; + auto subRsb = subRiver->getRecordSource(); + + subRiver->activate(csb); + subRsb = opt->applyBoolean(subRsb, iter); + + if (subRiver->isComputable(csb)) + { + rsbs.add(subRsb); + rivers.remove(&subRiver); + break; + } + + subRiver->deactivate(csb); } - subRiver->deactivate(csb); + if (rsbs.getCount() == orgCount) + break; } - if (rsbs.getCount() == orgCount) - break; - } + if (rivers.hasData()) + { + // Ideally, we should never get here. But just in case it happened, handle it. - if (rivers.hasData()) - { - // Ideally, we should never get here. But just in case it happened, handle it. + for (auto& subRiver : rivers) + { + auto subRsb = subRiver->getRecordSource(); + + subRiver->activate(csb); + subRsb = opt->applyBoolean(subRsb, iter); + + const auto pos = &subRiver - rivers.begin(); + rsbs.insert(pos, subRsb); + } - for (auto& subRiver : rivers) + rivers.clear(); + } + } + else + { + for (const auto subRiver : rivers) { - const auto subRsb = subRiver->getRecordSource(); - fb_assert(!rsbs.exist(subRsb)); + auto subRsb = subRiver->getRecordSource(); + + subRiver->activate(csb); + if (subRiver != rivers.front()) + subRsb = opt->applyBoolean(subRsb, iter); - const auto pos = &subRiver - rivers.begin(); - rsbs.insert(pos, subRsb); + rsbs.add(subRsb); } rivers.clear(); } - m_rsb = FB_NEW_POOL(csb->csb_pool) NestedLoopJoin(csb, rsbs.getCount(), rsbs.begin()); + m_rsb = FB_NEW_POOL(csb->csb_pool) + NestedLoopJoin(csb, rsbs.getCount(), rsbs.begin(), joinType); } } }; @@ -267,7 +295,6 @@ namespace } } - unsigned getRiverCount(unsigned count, const ValueExprNode* const* eq_class) { // Given an sort/merge join equivalence class (vector of node pointers @@ -563,10 +590,12 @@ namespace // Constructor // -Optimizer::Optimizer(thread_db* aTdbb, CompilerScratch* aCsb, RseNode* aRse, bool parentFirstRows) +Optimizer::Optimizer(thread_db* aTdbb, CompilerScratch* aCsb, RseNode* aRse, + bool parentFirstRows, double parentCardinality) : PermanentStorage(*aTdbb->getDefaultPool()), tdbb(aTdbb), csb(aCsb), rse(aRse), firstRows(rse->firstRows.valueOr(parentFirstRows)), + cardinality(parentCardinality), compileStreams(getPool()), bedStreams(getPool()), keyStreams(getPool()), @@ -618,7 +647,7 @@ Optimizer::~Optimizer() RecordSource* Optimizer::compile(RseNode* subRse, BoolExprNodeStack* parentStack) { - Optimizer subOpt(tdbb, csb, subRse, firstRows); + Optimizer subOpt(tdbb, csb, subRse, firstRows, cardinality); const auto rsb = subOpt.compile(parentStack); if (parentStack && subOpt.isInnerJoin()) @@ -669,11 +698,33 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) conjunctCount += distributeEqualities(conjunctStack, conjunctCount); - // AB: If we have limit our retrieval with FIRST / SKIP syntax then - // we may not deliver above conditions (from higher rse's) to this - // rse, because the results should be consistent. - if (rse->rse_skip || rse->rse_first) - parentStack = nullptr; + if (parentStack) + { + // AB: If we have limit our retrieval with FIRST / SKIP syntax then + // we may not deliver above conditions (from higher rse's) to this + // rse, because the results should be consistent. + if (rse->rse_skip || rse->rse_first) + parentStack = nullptr; + + if (isSemiJoined()) + { + fb_assert(parentStack->hasData()); + + // We have a semi-join, look at the parent (priorly joined streams) cardinality. + // If it's known to be not very small, nullify the parent conjuncts + // to give up a possible nested loop join in favor of a hash join. + // Here we assume every equi-join condition having a default selectivity (0.1). + // TODO: replace with a proper cost-based decision in the future. + + double subSelectivity = MAXIMUM_SELECTIVITY; + for (auto count = parentStack->getCount(); count; count--) + subSelectivity *= DEFAULT_SELECTIVITY; + const auto thresholdCardinality = MINIMUM_CARDINALITY / subSelectivity; + + if (!cardinality || cardinality > thresholdCardinality) + parentStack = nullptr; + } + } // Set base-point before the parent/distributed nodes begin. const unsigned baseCount = conjunctCount; @@ -799,7 +850,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) if (isInnerJoin()) { - for (auto iter = getConjuncts(); iter.hasData(); ++iter) + for (auto iter = getBaseConjuncts(); iter.hasData(); ++iter) { if (!(iter & CONJUNCT_USED) && iter->deterministic() && @@ -815,6 +866,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) // record source blocks for all streams RiverList rivers, dependentRivers; + HalfStaticArray specialSubQueries; bool innerSubStream = false; for (auto node : rse->rse_relations) @@ -822,6 +874,15 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) fb_assert(sort == rse->rse_sorted); fb_assert(aggregate == rse->rse_aggregate); + const auto subRse = nodeAs(node); + + if (subRse && subRse->isSemiJoined()) + { + fb_assert(rse->rse_jointype == blr_inner); + specialSubQueries.add(subRse); + continue; + } + // Find the stream number and place it at the end of the bedStreams array // (if this is really a stream and not another RseNode) @@ -883,11 +944,6 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) if (compileStreams.getCount() > 5) CCH_expand(tdbb, (ULONG) (compileStreams.getCount() * CACHE_PAGES_PER_STREAM)); - // At this point we are ready to start optimizing. - // We will use the opt block to hold information of - // a global nature, meaning that it needs to stick - // around for the rest of the optimization process. - // Attempt to optimize aggregates via an index, if possible if (aggregate && !sort) sort = aggregate; @@ -921,6 +977,8 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) } else { + JoinType joinType = INNER_JOIN; + // AB: If previous rsb's are already on the stack we can't use // a navigational-retrieval for an ORDER BY because the next // streams are JOINed to the previous ones @@ -931,7 +989,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) // AB: We could already have multiple rivers at this // point so try to do some hashing or sort/merging now. - while (generateEquiJoin(rivers)) + while (generateEquiJoin(rivers, joinType)) ; } @@ -968,7 +1026,7 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) // Generate one river which holds a cross join rsb between // all currently available rivers - rivers.add(FB_NEW_POOL(getPool()) CrossJoin(csb, rivers)); + rivers.add(FB_NEW_POOL(getPool()) CrossJoin(this, rivers, joinType)); rivers.back()->activate(csb); } else @@ -988,16 +1046,60 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack) // Attempt to form joins in decreasing order of desirability generateInnerJoin(joinStreams, rivers, &sort, rse->rse_plan); - // Re-activate remaining rivers to be hashable/mergeable - for (const auto river : rivers) - river->activate(csb); + if (rivers.isEmpty() && dependentRivers.isEmpty()) + { + // This case may look weird, but it's possible for recursive unions + rsb = FB_NEW_POOL(csb->csb_pool) NestedLoopJoin(csb, 0, nullptr, joinType); + } + else + { + while (rivers.hasData() || dependentRivers.hasData()) + { + // Re-activate remaining rivers to be hashable/mergeable + for (const auto river : rivers) + river->activate(csb); - // If there are multiple rivers, try some hashing or sort/merging - while (generateEquiJoin(rivers)) - ; + // If there are multiple rivers, try some hashing or sort/merging + while (generateEquiJoin(rivers, joinType)) + ; - rivers.join(dependentRivers); - rsb = CrossJoin(csb, rivers).getRecordSource(); + if (dependentRivers.hasData()) + { + rivers.join(dependentRivers); + dependentRivers.clear(); + } + + const auto finalRiver = FB_NEW_POOL(getPool()) CrossJoin(this, rivers, joinType); + fb_assert(rivers.isEmpty()); + rsb = finalRiver->getRecordSource(); + cardinality = rsb->getCardinality(); + + if (specialSubQueries.hasData()) + { + fb_assert(joinType == INNER_JOIN); + joinType = SEMI_JOIN; + + rivers.add(finalRiver); + + for (const auto rse : specialSubQueries) + { + const auto sub = rse->compile(tdbb, this, true); + fb_assert(sub); + + StreamList localStreams; + sub->findUsedStreams(localStreams); + + const auto subRiver = FB_NEW_POOL(getPool()) River(csb, sub, rse, localStreams); + auto& list = subRiver->isDependent(*finalRiver) ? dependentRivers : rivers; + list.add(subRiver); + } + + specialSubQueries.clear(); + } + } + } + + fb_assert(rsb); // Pick up any residual boolean that may have fallen thru the cracks rsb = applyResidualBoolean(rsb); @@ -2281,16 +2383,38 @@ void Optimizer::formRivers(const StreamList& streams, // If the whole things is a moby no-op, return false. // -bool Optimizer::generateEquiJoin(RiverList& orgRivers) +bool Optimizer::generateEquiJoin(RiverList& rivers, JoinType joinType) { + fb_assert(joinType != OUTER_JOIN); + ULONG selected_rivers[OPT_STREAM_BITS], selected_rivers2[OPT_STREAM_BITS]; ValueExprNode** eq_class; + RiverList orgRivers(rivers); + + // Find dependent rivers and exclude them from processing + + for (River** iter = orgRivers.begin(); iter < orgRivers.end();) + { + const auto river = *iter; + + StreamStateHolder stateHolder2(csb, river->getStreams()); + stateHolder2.activate(); + + if (river->isComputable(csb)) + { + iter++; + continue; + } + + orgRivers.remove(iter); + } + // Count the number of "rivers" involved in the operation, then allocate // a scratch block large enough to hold values to compute equality // classes. - const unsigned orgCount = (unsigned) orgRivers.getCount(); + const auto orgCount = (unsigned) orgRivers.getCount(); if (orgCount < 2) return false; @@ -2397,7 +2521,7 @@ bool Optimizer::generateEquiJoin(RiverList& orgRivers) // Prepare rivers for joining StreamList streams; - RiverList rivers; + RiverList joinedRivers; HalfStaticArray keys; unsigned position = 0, maxCardinalityPosition = 0, lowestPosition = MAX_ULONG; double maxCardinality1 = 0, maxCardinality2 = 0; @@ -2426,13 +2550,13 @@ bool Optimizer::generateEquiJoin(RiverList& orgRivers) { maxCardinality2 = maxCardinality1; maxCardinality1 = cardinality; - maxCardinalityPosition = rivers.getCount(); + maxCardinalityPosition = joinedRivers.getCount(); } else if (cardinality > maxCardinality2) maxCardinality2 = cardinality; streams.join(river->getStreams()); - rivers.add(river); + joinedRivers.add(river); orgRivers.remove(iter); // Collect keys to join on @@ -2455,10 +2579,11 @@ bool Optimizer::generateEquiJoin(RiverList& orgRivers) HalfStaticArray rsbs; RecordSource* finalRsb = nullptr; - if (useMergeJoin) + // MERGE JOIN does not support other join types yet + if (useMergeJoin && joinType == INNER_JOIN) { position = 0; - for (const auto river : rivers) + for (const auto river : joinedRivers) { const auto sort = FB_NEW_POOL(getPool()) SortNode(getPool()); @@ -2482,29 +2607,36 @@ bool Optimizer::generateEquiJoin(RiverList& orgRivers) } else { - // Ensure that the largest river is placed at the first position. - // It's important for a hash join to be efficient. + if (joinType == INNER_JOIN) + { + // Ensure that the largest river is placed at the first position. + // It's important for a hash join to be efficient. - const auto maxCardinalityRiver = rivers[maxCardinalityPosition]; - rivers[maxCardinalityPosition] = rivers[0]; - rivers[0] = maxCardinalityRiver; + const auto maxCardinalityRiver = joinedRivers[maxCardinalityPosition]; + joinedRivers[maxCardinalityPosition] = joinedRivers[0]; + joinedRivers[0] = maxCardinalityRiver; - const auto maxCardinalityKey = keys[maxCardinalityPosition]; - keys[maxCardinalityPosition] = keys[0]; - keys[0] = maxCardinalityKey; + const auto maxCardinalityKey = keys[maxCardinalityPosition]; + keys[maxCardinalityPosition] = keys[0]; + keys[0] = maxCardinalityKey; + } - for (const auto river : rivers) + for (const auto river : joinedRivers) rsbs.add(river->getRecordSource()); finalRsb = FB_NEW_POOL(getPool()) - HashJoin(tdbb, csb, rsbs.getCount(), rsbs.begin(), keys.begin()); + HashJoin(tdbb, csb, joinType, rsbs.getCount(), rsbs.begin(), keys.begin()); } // Pick up any boolean that may apply finalRsb = applyLocalBoolean(finalRsb, streams, iter); - const auto finalRiver = FB_NEW_POOL(getPool()) River(csb, finalRsb, rivers); - orgRivers.insert(lowestPosition, finalRiver); + const auto finalRiver = FB_NEW_POOL(getPool()) River(csb, finalRsb, joinedRivers); + + for (const auto river : joinedRivers) + rivers.findAndRemove(river); + + rivers.insert(lowestPosition, finalRiver); return true; } @@ -2736,6 +2868,20 @@ RecordSource* Optimizer::generateRetrieval(StreamType stream, } +// +// Compose a filter including all computable booleans +// + +RecordSource* Optimizer::applyBoolean(RecordSource* rsb, ConjunctIterator& iter) +{ + double selectivity = MAXIMUM_SELECTIVITY; + if (const auto boolean = composeBoolean(iter, &selectivity)) + rsb = FB_NEW_POOL(getPool()) FilteredStream(csb, rsb, boolean, selectivity); + + return rsb; +} + + // // Find conjuncts local to the given river and compose an appropriate filter // @@ -2750,11 +2896,7 @@ RecordSource* Optimizer::applyLocalBoolean(RecordSource* rsb, StreamStateHolder localHolder(csb, streams); localHolder.activate(); - double selectivity = MAXIMUM_SELECTIVITY; - if (const auto boolean = composeBoolean(iter, &selectivity)) - rsb = FB_NEW_POOL(getPool()) FilteredStream(csb, rsb, boolean, selectivity); - - return rsb; + return applyBoolean(rsb, iter); } diff --git a/src/jrd/optimizer/Optimizer.h b/src/jrd/optimizer/Optimizer.h index a8c78b1ab9e..9566b4b3882 100644 --- a/src/jrd/optimizer/Optimizer.h +++ b/src/jrd/optimizer/Optimizer.h @@ -228,6 +228,11 @@ class River return true; } + bool isDependent(const River& river) const + { + return m_rsb->isDependent(river.getStreams()); + } + protected: RecordSource* m_rsb; Firebird::HalfStaticArray m_nodes; @@ -450,7 +455,7 @@ class Optimizer : public Firebird::PermanentStorage firstRows = attachment->att_opt_first_rows.valueOr(defaultFirstRows); } - return Optimizer(tdbb, csb, rse, firstRows).compile(nullptr); + return Optimizer(tdbb, csb, rse, firstRows, 0).compile(nullptr); } ~Optimizer(); @@ -499,6 +504,7 @@ class Optimizer : public Firebird::PermanentStorage return firstRows; } + RecordSource* applyBoolean(RecordSource* rsb, ConjunctIterator& iter); RecordSource* applyLocalBoolean(RecordSource* rsb, const StreamList& streams, ConjunctIterator& iter); @@ -513,6 +519,11 @@ class Optimizer : public Firebird::PermanentStorage return composeBoolean(iter, selectivity); } + bool isSemiJoined() const + { + return (rse->flags & RseNode::FLAG_SEMI_JOINED) != 0; + } + bool checkEquiJoin(BoolExprNode* boolean); bool getEquiJoinKeys(BoolExprNode* boolean, NestConst* node1, @@ -523,7 +534,8 @@ class Optimizer : public Firebird::PermanentStorage void printf(const char* format, ...); private: - Optimizer(thread_db* aTdbb, CompilerScratch* aCsb, RseNode* aRse, bool parentFirstRows); + Optimizer(thread_db* aTdbb, CompilerScratch* aCsb, RseNode* aRse, + bool parentFirstRows, double parentCardinality); RecordSource* compile(BoolExprNodeStack* parentStack); @@ -537,7 +549,7 @@ class Optimizer : public Firebird::PermanentStorage RiverList& rivers, SortNode** sortClause, const PlanNode* planClause); - bool generateEquiJoin(RiverList& org_rivers); + bool generateEquiJoin(RiverList& rivers, JoinType joinType = INNER_JOIN); void generateInnerJoin(StreamList& streams, RiverList& rivers, SortNode** sortClause, @@ -555,6 +567,7 @@ class Optimizer : public Firebird::PermanentStorage RseNode* const rse; bool firstRows = false; // optimize for first rows + double cardinality = 0; // self or parent cardinality FILE* debugFile = nullptr; unsigned baseConjuncts = 0; // number of conjuncts in our rse, next conjuncts are distributed parent @@ -582,7 +595,7 @@ enum segmentScanType { segmentScanList }; -typedef Firebird::HalfStaticArray MatchedBooleanList; +typedef Firebird::HalfStaticArray BooleanList; struct IndexScratchSegment { @@ -610,7 +623,7 @@ struct IndexScratchSegment segmentScanType scanType = segmentScanNone; // scan type SSHORT scale = 0; // scale for SINT64/Int128-based segment of index - MatchedBooleanList matches; // matched booleans + BooleanList matches; // matched booleans }; struct IndexScratch @@ -631,7 +644,7 @@ struct IndexScratch bool useRootListScan = false; Firebird::ObjectsArray segments; - MatchedBooleanList matches; // matched booleans (partial indices only) + BooleanList matches; // matched booleans (partial indices only) }; typedef Firebird::ObjectsArray IndexScratchList; @@ -643,7 +656,7 @@ typedef Firebird::ObjectsArray IndexScratchList; struct InversionCandidate { explicit InversionCandidate(MemoryPool& p) - : matches(p), dbkeyRanges(p), dependentFromStreams(p) + : conjuncts(p), matches(p), dbkeyRanges(p), dependentFromStreams(p) {} double selectivity = MAXIMUM_SELECTIVITY; @@ -660,7 +673,8 @@ struct InversionCandidate bool unique = false; bool navigated = false; - MatchedBooleanList matches; + BooleanList conjuncts; // booleans referring our stream + BooleanList matches; // booleans matched to any index Firebird::Array dbkeyRanges; SortedStreamList dependentFromStreams; }; @@ -691,7 +705,7 @@ class Retrieval : private Firebird::PermanentStorage void analyzeNavigation(const InversionCandidateList& inversions); bool betterInversion(const InversionCandidate* inv1, const InversionCandidate* inv2, bool navigation) const; - bool checkIndexCondition(index_desc& idx, MatchedBooleanList& matches) const; + bool checkIndexCondition(index_desc& idx, BooleanList& matches) const; bool checkIndexExpression(const index_desc* idx, ValueExprNode* node) const; InversionNode* composeInversion(InversionNode* node1, InversionNode* node2, InversionNode::Type node_type) const; @@ -791,7 +805,7 @@ class InnerJoin : private Firebird::PermanentStorage { public: StreamInfo(MemoryPool& p, StreamType num) - : number(num), indexedRelationships(p) + : number(num), baseConjuncts(p), indexedRelationships(p) {} bool isIndependent() const @@ -838,6 +852,7 @@ class InnerJoin : private Firebird::PermanentStorage bool used = false; unsigned previousExpectedStreams = 0; + BooleanList baseConjuncts; IndexedRelationships indexedRelationships; }; diff --git a/src/jrd/optimizer/OuterJoin.cpp b/src/jrd/optimizer/OuterJoin.cpp index 876b7209eb8..14e6d7a2370 100644 --- a/src/jrd/optimizer/OuterJoin.cpp +++ b/src/jrd/optimizer/OuterJoin.cpp @@ -117,6 +117,11 @@ RecordSource* OuterJoin::generate() auto& outerStream = joinStreams[0]; auto& innerStream = joinStreams[1]; + // Collect the outer streams to be used in the full outer join algorithm + + StreamList checkStreams; + outerStream.rsb->findUsedStreams(checkStreams); + std::swap(outerStream, innerStream); // Reset both streams to their original states diff --git a/src/jrd/optimizer/Retrieval.cpp b/src/jrd/optimizer/Retrieval.cpp index ef6b11c6da5..4a0b0aaa04a 100644 --- a/src/jrd/optimizer/Retrieval.cpp +++ b/src/jrd/optimizer/Retrieval.cpp @@ -162,7 +162,7 @@ Retrieval::Retrieval(thread_db* aTdbb, Optimizer* opt, StreamType streamNumber, if (!tail->csb_idx) return; - MatchedBooleanList matches; + BooleanList matches; for (auto& index : *tail->csb_idx) { @@ -346,6 +346,12 @@ InversionCandidate* Retrieval::getInversion() { selectivity *= Optimizer::getSelectivity(*iter); } + + if (iter->computable(csb, INVALID_STREAM, false) && + iter->containsStream(stream)) + { + invCandidate->conjuncts.add(*iter); + } } } @@ -800,7 +806,7 @@ bool Retrieval::betterInversion(const InversionCandidate* inv1, return false; } -bool Retrieval::checkIndexCondition(index_desc& idx, MatchedBooleanList& matches) const +bool Retrieval::checkIndexCondition(index_desc& idx, BooleanList& matches) const { fb_assert(idx.idx_condition); @@ -911,7 +917,7 @@ void Retrieval::getInversionCandidates(InversionCandidateList& inversions, const double minSelectivity = MIN(MAXIMUM_SELECTIVITY / cardinality, DEFAULT_SELECTIVITY); // Walk through indexes to calculate selectivity / candidate - MatchedBooleanList matches; + BooleanList matches; for (auto& scratch : fromIndexScratches) { @@ -1448,7 +1454,7 @@ InversionCandidate* Retrieval::makeInversion(InversionCandidateList& inversions) } } - MatchedBooleanList matches; + BooleanList matches; if (navigationCandidate) { diff --git a/src/jrd/recsrc/AggregatedStream.cpp b/src/jrd/recsrc/AggregatedStream.cpp index 911b6c39c80..be3a9f507d4 100644 --- a/src/jrd/recsrc/AggregatedStream.cpp +++ b/src/jrd/recsrc/AggregatedStream.cpp @@ -136,6 +136,12 @@ void BaseAggWinStream::findUsedStreams(StreamList& streams, m_next->findUsedStreams(streams, true); } +template +bool BaseAggWinStream::isDependent(const StreamList& streams) const +{ + return m_next->isDependent(streams); +} + // Compute the next aggregated record of a value group. template bool BaseAggWinStream::evaluateGroup(thread_db* tdbb) const diff --git a/src/jrd/recsrc/BufferedStream.cpp b/src/jrd/recsrc/BufferedStream.cpp index 06b95c57f4c..0aef2874a8f 100644 --- a/src/jrd/recsrc/BufferedStream.cpp +++ b/src/jrd/recsrc/BufferedStream.cpp @@ -345,6 +345,11 @@ void BufferedStream::findUsedStreams(StreamList& streams, bool expandAll) const m_next->findUsedStreams(streams, expandAll); } +bool BufferedStream::isDependent(const StreamList& streams) const +{ + return m_next->isDependent(streams); +} + void BufferedStream::invalidateRecords(Request* request) const { m_next->invalidateRecords(request); diff --git a/src/jrd/recsrc/ConditionalStream.cpp b/src/jrd/recsrc/ConditionalStream.cpp index 1c7d2236fd7..8a1cc48c7c9 100644 --- a/src/jrd/recsrc/ConditionalStream.cpp +++ b/src/jrd/recsrc/ConditionalStream.cpp @@ -157,6 +157,13 @@ void ConditionalStream::findUsedStreams(StreamList& streams, bool expandAll) con m_second->findUsedStreams(streams, expandAll); } +bool ConditionalStream::isDependent(const StreamList& streams) const +{ + return m_boolean->containsAnyStream(streams) || + m_first->isDependent(streams) || + m_second->isDependent(streams); +} + void ConditionalStream::invalidateRecords(Request* request) const { m_first->invalidateRecords(request); diff --git a/src/jrd/recsrc/FilteredStream.cpp b/src/jrd/recsrc/FilteredStream.cpp index d3fa6209fc4..744ebf484dd 100644 --- a/src/jrd/recsrc/FilteredStream.cpp +++ b/src/jrd/recsrc/FilteredStream.cpp @@ -150,6 +150,14 @@ void FilteredStream::findUsedStreams(StreamList& streams, bool expandAll) const m_next->findUsedStreams(streams, expandAll); } +bool FilteredStream::isDependent(const StreamList& streams) const +{ + if (m_anyBoolean && m_anyBoolean->containsAnyStream(streams)) + return true; + + return m_boolean->containsAnyStream(streams) || m_next->isDependent(streams); +} + void FilteredStream::invalidateRecords(Request* request) const { m_next->invalidateRecords(request); diff --git a/src/jrd/recsrc/FirstRowsStream.cpp b/src/jrd/recsrc/FirstRowsStream.cpp index 2e3d81002ba..e811166450c 100644 --- a/src/jrd/recsrc/FirstRowsStream.cpp +++ b/src/jrd/recsrc/FirstRowsStream.cpp @@ -149,6 +149,11 @@ void FirstRowsStream::findUsedStreams(StreamList& streams, bool expandAll) const m_next->findUsedStreams(streams, expandAll); } +bool FirstRowsStream::isDependent(const StreamList& streams) const +{ + return m_value->containsAnyStream(streams) || m_next->isDependent(streams); +} + void FirstRowsStream::invalidateRecords(Request* request) const { m_next->invalidateRecords(request); diff --git a/src/jrd/recsrc/FullOuterJoin.cpp b/src/jrd/recsrc/FullOuterJoin.cpp index d570fba14cc..ca62d6a5a52 100644 --- a/src/jrd/recsrc/FullOuterJoin.cpp +++ b/src/jrd/recsrc/FullOuterJoin.cpp @@ -172,6 +172,11 @@ void FullOuterJoin::findUsedStreams(StreamList& streams, bool expandAll) const m_arg2->findUsedStreams(streams, expandAll); } +bool FullOuterJoin::isDependent(const StreamList& streams) const +{ + return m_arg1->isDependent(streams) || m_arg2->isDependent(streams); +} + void FullOuterJoin::invalidateRecords(Request* request) const { m_arg1->invalidateRecords(request); diff --git a/src/jrd/recsrc/HashJoin.cpp b/src/jrd/recsrc/HashJoin.cpp index ebca83781b5..3ba9941c8ca 100644 --- a/src/jrd/recsrc/HashJoin.cpp +++ b/src/jrd/recsrc/HashJoin.cpp @@ -37,13 +37,15 @@ using namespace Firebird; using namespace Jrd; +//#define PRINT_HASH_TABLE + // ---------------------- // Data access: hash join // ---------------------- // NS: FIXME - Why use static hash table here??? Hash table shall support dynamic resizing static const ULONG HASH_SIZE = 1009; -static const ULONG BUCKET_PREALLOCATE_SIZE = 32; // 256 bytes per slot +static const ULONG BUCKET_PREALLOCATE_SIZE = 32; // 256 bytes per bucket unsigned HashJoin::maxCapacity() { @@ -92,6 +94,11 @@ class HashJoin::HashTable : public PermanentStorage m_collisions.sort(); } + ULONG getCount() const + { + return (ULONG) m_collisions.getCount(); + } + void add(ULONG hash, ULONG position) { m_collisions.add(Entry(hash, position)); @@ -200,13 +207,38 @@ class HashJoin::HashTable : public PermanentStorage void sort() { + for (ULONG i = 0; i < m_streamCount * m_tableSize; i++) + { + if (const auto collisions = m_collisions[i]) + collisions->sort(); + } + +#ifdef PRINT_HASH_TABLE + FB_UINT64 total = 0; + ULONG min = MAX_ULONG, max = 0, count = 0; + for (ULONG i = 0; i < m_streamCount * m_tableSize; i++) { CollisionList* const collisions = m_collisions[i]; + if (!collisions) + continue; - if (collisions) - collisions->sort(); + const auto cnt = collisions->getCount(); + + if (cnt < min) + min = cnt; + if (cnt > max) + max = cnt; + total += cnt; + count++; } + + if (count) + { + printf("Hash table size %u, count %u, buckets %u, min %u, max %u, avg %u\n", + m_tableSize, (ULONG) total, count, min, max, (ULONG) (total / count)); + } +#endif } private: @@ -217,14 +249,35 @@ class HashJoin::HashTable : public PermanentStorage }; -HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count, - RecordSource* const* args, NestValueArray* const* keys, +HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, JoinType joinType, + FB_SIZE_T count, RecordSource* const* args, NestValueArray* const* keys, double selectivity) : RecordSource(csb), + m_joinType(joinType), + m_boolean(nullptr), m_args(csb->csb_pool, count - 1) { fb_assert(count >= 2); + init(tdbb, csb, count, args, keys, selectivity); +} + +HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, + BoolExprNode* boolean, + RecordSource* const* args, NestValueArray* const* keys, + double selectivity) + : RecordSource(csb), + m_joinType(OUTER_JOIN), + m_boolean(boolean), + m_args(csb->csb_pool, 1) +{ + init(tdbb, csb, 2, args, keys, selectivity); +} + +void HashJoin::init(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count, + RecordSource* const* args, NestValueArray* const* keys, + double selectivity) +{ m_impure = csb->allocImpure(); m_leader.source = args[0]; @@ -360,6 +413,8 @@ bool HashJoin::internalGetRecord(thread_db* tdbb) const if (!(impure->irsb_flags & irsb_open)) return false; + const auto inner = m_args.front().source; + while (true) { if (impure->irsb_flags & irsb_mustread) @@ -369,6 +424,14 @@ bool HashJoin::internalGetRecord(thread_db* tdbb) const if (!m_leader.source->getRecord(tdbb)) return false; + if (m_boolean && !m_boolean->execute(tdbb, request)) + { + // The boolean pertaining to the left sub-stream is false + // so just join sub-stream to a null valued right sub-stream + inner->nullRecords(tdbb); + return true; + } + // We have something to join with, so ensure the hash table is initialized if (!impure->irsb_hash_table && !impure->irsb_leader_buffer) @@ -410,7 +473,15 @@ bool HashJoin::internalGetRecord(thread_db* tdbb) const // Setup the hash table for the iteration through collisions. if (!impure->irsb_hash_table->setup(impure->irsb_leader_hash)) - continue; + { + if (m_joinType == INNER_JOIN || m_joinType == SEMI_JOIN) + continue; + + if (m_joinType == OUTER_JOIN) + inner->nullRecords(tdbb); + + return true; + } impure->irsb_flags &= ~irsb_mustread; impure->irsb_flags |= irsb_first; @@ -434,13 +505,29 @@ bool HashJoin::internalGetRecord(thread_db* tdbb) const if (!found) { impure->irsb_flags |= irsb_mustread; - continue; + + if (m_joinType == INNER_JOIN || m_joinType == SEMI_JOIN) + continue; + + if (m_joinType == OUTER_JOIN) + inner->nullRecords(tdbb); + + break; + } + + if (m_joinType == SEMI_JOIN || m_joinType == ANTI_JOIN) + { + impure->irsb_flags |= irsb_mustread; + + if (m_joinType == ANTI_JOIN) + continue; } impure->irsb_flags &= ~irsb_first; } else if (!fetchRecord(tdbb, impure, m_args.getCount() - 1)) { + fb_assert(m_joinType == INNER_JOIN); impure->irsb_flags |= irsb_mustread; continue; } @@ -481,11 +568,36 @@ void HashJoin::internalGetPlan(thread_db* tdbb, PlanEntry& planEntry, unsigned l { planEntry.className = "HashJoin"; + planEntry.lines.add().text = "Hash Join "; + + switch (m_joinType) + { + case INNER_JOIN: + planEntry.lines.back().text += "(inner)"; + break; + + case OUTER_JOIN: + planEntry.lines.back().text += "(outer)"; + break; + + case SEMI_JOIN: + planEntry.lines.back().text += "(semi)"; + break; + + case ANTI_JOIN: + planEntry.lines.back().text += "(anti)"; + break; + + default: + fb_assert(false); + } + string extras; extras.printf(" (keys: %" ULONGFORMAT", total key length: %" ULONGFORMAT")", m_leader.keys->getCount(), m_leader.totalKeyLength); - planEntry.lines.add().text = "Hash Join (inner)" + extras; + planEntry.lines.back().text += extras; + printOptInfo(planEntry.lines); if (recurse) @@ -503,32 +615,46 @@ void HashJoin::markRecursive() { m_leader.source->markRecursive(); - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i].source->markRecursive(); + for (const auto& arg : m_args) + arg.source->markRecursive(); } void HashJoin::findUsedStreams(StreamList& streams, bool expandAll) const { m_leader.source->findUsedStreams(streams, expandAll); - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i].source->findUsedStreams(streams, expandAll); + for (const auto& arg : m_args) + arg.source->findUsedStreams(streams, expandAll); +} + +bool HashJoin::isDependent(const StreamList& streams) const +{ + if (m_leader.source->isDependent(streams)) + return true; + + for (const auto& arg : m_args) + { + if (arg.source->isDependent(streams)) + return true; + } + + return (m_boolean && m_boolean->containsAnyStream(streams)); } void HashJoin::invalidateRecords(Request* request) const { m_leader.source->invalidateRecords(request); - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i].source->invalidateRecords(request); + for (const auto& arg : m_args) + arg.source->invalidateRecords(request); } void HashJoin::nullRecords(thread_db* tdbb) const { m_leader.source->nullRecords(tdbb); - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i].source->nullRecords(tdbb); + for (const auto& arg : m_args) + arg.source->nullRecords(tdbb); } ULONG HashJoin::computeHash(thread_db* tdbb, @@ -625,6 +751,9 @@ bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) co return true; } + if (m_joinType == SEMI_JOIN || m_joinType == ANTI_JOIN) + return false; + while (true) { if (stream == 0 || !fetchRecord(tdbb, impure, stream - 1)) diff --git a/src/jrd/recsrc/LockedStream.cpp b/src/jrd/recsrc/LockedStream.cpp index e62da5c2aa6..cdcfe8f3556 100644 --- a/src/jrd/recsrc/LockedStream.cpp +++ b/src/jrd/recsrc/LockedStream.cpp @@ -139,6 +139,11 @@ void LockedStream::findUsedStreams(StreamList& streams, bool expandAll) const m_next->findUsedStreams(streams, expandAll); } +bool LockedStream::isDependent(const StreamList& streams) const +{ + return m_next->isDependent(streams); +} + void LockedStream::invalidateRecords(Request* request) const { m_next->invalidateRecords(request); diff --git a/src/jrd/recsrc/MergeJoin.cpp b/src/jrd/recsrc/MergeJoin.cpp index 2c0ee1c7129..de3bf491a6d 100644 --- a/src/jrd/recsrc/MergeJoin.cpp +++ b/src/jrd/recsrc/MergeJoin.cpp @@ -381,26 +381,37 @@ void MergeJoin::internalGetPlan(thread_db* tdbb, PlanEntry& planEntry, unsigned void MergeJoin::markRecursive() { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->markRecursive(); + for (auto arg : m_args) + arg->markRecursive(); } void MergeJoin::findUsedStreams(StreamList& streams, bool expandAll) const { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->findUsedStreams(streams, expandAll); + for (const auto arg : m_args) + arg->findUsedStreams(streams, expandAll); +} + +bool MergeJoin::isDependent(const StreamList& streams) const +{ + for (const auto arg : m_args) + { + if (arg->isDependent(streams)) + return true; + } + + return false; } void MergeJoin::invalidateRecords(Request* request) const { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->invalidateRecords(request); + for (const auto arg : m_args) + arg->invalidateRecords(request); } void MergeJoin::nullRecords(thread_db* tdbb) const { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->nullRecords(tdbb); + for (const auto arg : m_args) + arg->nullRecords(tdbb); } int MergeJoin::compare(thread_db* tdbb, const NestValueArray* node1, diff --git a/src/jrd/recsrc/NestedLoopJoin.cpp b/src/jrd/recsrc/NestedLoopJoin.cpp index 9380d6a1d0c..484cf54d0eb 100644 --- a/src/jrd/recsrc/NestedLoopJoin.cpp +++ b/src/jrd/recsrc/NestedLoopJoin.cpp @@ -35,20 +35,21 @@ using namespace Jrd; // Data access: nested loops join // ------------------------------ -NestedLoopJoin::NestedLoopJoin(CompilerScratch* csb, FB_SIZE_T count, RecordSource* const* args) +NestedLoopJoin::NestedLoopJoin(CompilerScratch* csb, + FB_SIZE_T count, + RecordSource* const* args, + JoinType joinType) : RecordSource(csb), - m_joinType(INNER_JOIN), - m_args(csb->csb_pool), - m_boolean(NULL) + m_joinType(joinType), + m_boolean(nullptr), + m_args(csb->csb_pool, count) { m_impure = csb->allocImpure(); m_cardinality = MINIMUM_CARDINALITY; - m_args.resize(count); - for (FB_SIZE_T i = 0; i < count; i++) { - m_args[i] = args[i]; + m_args.add(args[i]); m_cardinality *= args[i]->getCardinality(); } } @@ -58,8 +59,8 @@ NestedLoopJoin::NestedLoopJoin(CompilerScratch* csb, BoolExprNode* boolean) : RecordSource(csb), m_joinType(OUTER_JOIN), - m_args(csb->csb_pool), - m_boolean(boolean) + m_boolean(boolean), + m_args(csb->csb_pool, 2) { fb_assert(outer && inner); @@ -91,8 +92,8 @@ void NestedLoopJoin::close(thread_db* tdbb) const { impure->irsb_flags &= ~irsb_open; - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->close(tdbb); + for (const auto arg : m_args) + arg->close(tdbb); } } @@ -128,12 +129,70 @@ bool NestedLoopJoin::internalGetRecord(thread_db* tdbb) const else if (!fetchRecord(tdbb, m_args.getCount() - 1)) return false; } + else if (m_joinType == SEMI_JOIN || m_joinType == ANTI_JOIN) + { + const auto outer = m_args[0]; + + if (impure->irsb_flags & irsb_first) + { + outer->open(tdbb); + + impure->irsb_flags &= ~irsb_first; + } + + while (true) + { + if (impure->irsb_flags & irsb_joined) + { + for (FB_SIZE_T i = 1; i < m_args.getCount(); i++) + m_args[i]->close(tdbb); + + impure->irsb_flags &= ~irsb_joined; + } + + if (!outer->getRecord(tdbb)) + return false; + + FB_SIZE_T stopArg = 0; + + for (FB_SIZE_T i = 1; i < m_args.getCount(); i++) + { + m_args[i]->open(tdbb); + + if (m_args[i]->getRecord(tdbb)) + { + if (m_joinType == ANTI_JOIN) + { + stopArg = i; + break; + } + } + else + { + if (m_joinType == SEMI_JOIN) + { + stopArg = i; + break; + } + } + } + + if (!stopArg) + break; + + for (FB_SIZE_T i = 1; i <= stopArg; i++) + m_args[i]->close(tdbb); + } + + impure->irsb_flags |= irsb_joined; + } else { + fb_assert(m_joinType == OUTER_JOIN); fb_assert(m_args.getCount() == 2); - const RecordSource* const outer = m_args[0]; - const RecordSource* const inner = m_args[1]; + const auto outer = m_args[0]; + const auto inner = m_args[1]; if (impure->irsb_flags & irsb_first) { @@ -160,27 +219,10 @@ bool NestedLoopJoin::internalGetRecord(thread_db* tdbb) const inner->open(tdbb); } - if (m_joinType == SEMI_JOIN) - { - if (inner->getRecord(tdbb)) - impure->irsb_flags &= ~irsb_joined; - else - impure->irsb_flags |= irsb_joined; - } - else if (m_joinType == ANTI_JOIN) - { - if (inner->getRecord(tdbb)) - impure->irsb_flags |= irsb_joined; - else - impure->irsb_flags &= ~irsb_joined; - } - else + if (inner->getRecord(tdbb)) { - if (inner->getRecord(tdbb)) - { - impure->irsb_flags |= irsb_joined; - return true; - } + impure->irsb_flags |= irsb_joined; + return true; } inner->close(tdbb); @@ -267,26 +309,37 @@ void NestedLoopJoin::internalGetPlan(thread_db* tdbb, PlanEntry& planEntry, unsi void NestedLoopJoin::markRecursive() { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->markRecursive(); + for (auto arg : m_args) + arg->markRecursive(); } void NestedLoopJoin::findUsedStreams(StreamList& streams, bool expandAll) const { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->findUsedStreams(streams, expandAll); + for (const auto arg : m_args) + arg->findUsedStreams(streams, expandAll); +} + +bool NestedLoopJoin::isDependent(const StreamList& streams) const +{ + for (const auto arg : m_args) + { + if (arg->isDependent(streams)) + return true; + } + + return (m_boolean && m_boolean->containsAnyStream(streams)); } void NestedLoopJoin::invalidateRecords(Request* request) const { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->invalidateRecords(request); + for (const auto arg : m_args) + arg->invalidateRecords(request); } void NestedLoopJoin::nullRecords(thread_db* tdbb) const { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->nullRecords(tdbb); + for (const auto arg : m_args) + arg->nullRecords(tdbb); } bool NestedLoopJoin::fetchRecord(thread_db* tdbb, FB_SIZE_T n) const diff --git a/src/jrd/recsrc/ProcedureScan.cpp b/src/jrd/recsrc/ProcedureScan.cpp index a351d4a99d9..e098512c348 100644 --- a/src/jrd/recsrc/ProcedureScan.cpp +++ b/src/jrd/recsrc/ProcedureScan.cpp @@ -249,6 +249,12 @@ WriteLockResult ProcedureScan::lockRecord(thread_db* /*tdbb*/) const status_exception::raise(Arg::Gds(isc_record_lock_not_supp)); } +bool ProcedureScan::isDependent(const StreamList& streams) const +{ + return (m_sourceList && m_sourceList->containsAnyStream(streams)) || + (m_targetList && m_targetList->containsAnyStream(streams)); +} + void ProcedureScan::getLegacyPlan(thread_db* tdbb, string& plan, unsigned level) const { if (!level) diff --git a/src/jrd/recsrc/RecordSource.h b/src/jrd/recsrc/RecordSource.h index c6a1b7bf023..c734f3c88ff 100644 --- a/src/jrd/recsrc/RecordSource.h +++ b/src/jrd/recsrc/RecordSource.h @@ -147,6 +147,7 @@ namespace Jrd virtual void invalidateRecords(Request* request) const = 0; virtual void findUsedStreams(StreamList& streams, bool expandAll = false) const = 0; + virtual bool isDependent(const StreamList& streams) const = 0; virtual void nullRecords(thread_db* tdbb) const = 0; virtual void setAnyBoolean(BoolExprNode* /*anyBoolean*/, bool /*ansiAny*/, bool /*ansiNot*/) @@ -217,6 +218,11 @@ namespace Jrd void findUsedStreams(StreamList& streams, bool expandAll = false) const override; void nullRecords(thread_db* tdbb) const override; + bool isDependent(const StreamList& /*streams*/) const override + { + return false; + } + protected: const StreamType m_stream; const Format* const m_format; @@ -416,6 +422,7 @@ namespace Jrd bool refetchRecord(thread_db* tdbb) const override; WriteLockResult lockRecord(thread_db* tdbb) const override; + bool isDependent(const StreamList& streams) const override; void getLegacyPlan(thread_db* tdbb, Firebird::string& plan, unsigned level) const override; @@ -454,6 +461,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; protected: @@ -484,6 +492,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; protected: @@ -516,6 +525,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; void setAnyBoolean(BoolExprNode* anyBoolean, bool ansiAny, bool ansiNot) override @@ -554,6 +564,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; void setAnyBoolean(BoolExprNode* anyBoolean, bool ansiAny, bool ansiNot) override @@ -588,6 +599,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; void setAnyBoolean(BoolExprNode* anyBoolean, bool ansiAny, bool ansiNot) override @@ -707,6 +719,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; void setAnyBoolean(BoolExprNode* anyBoolean, bool ansiAny, bool ansiNot) override @@ -864,6 +877,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; protected: void internalOpen(thread_db* tdbb) const override; @@ -1002,6 +1016,7 @@ namespace Jrd void getLegacyPlan(thread_db* tdbb, Firebird::string& plan, unsigned level) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; protected: @@ -1047,6 +1062,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; protected: @@ -1113,6 +1129,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; void locate(thread_db* tdbb, FB_UINT64 position) const override; @@ -1140,7 +1157,8 @@ namespace Jrd class NestedLoopJoin : public RecordSource { public: - NestedLoopJoin(CompilerScratch* csb, FB_SIZE_T count, RecordSource* const* args); + NestedLoopJoin(CompilerScratch* csb, FB_SIZE_T count, RecordSource* const* args, + JoinType joinType = INNER_JOIN); NestedLoopJoin(CompilerScratch* csb, RecordSource* outer, RecordSource* inner, BoolExprNode* boolean); @@ -1155,6 +1173,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; protected: @@ -1166,8 +1185,9 @@ namespace Jrd bool fetchRecord(thread_db*, FB_SIZE_T) const; const JoinType m_joinType; + const NestConst m_boolean; + Firebird::Array > m_args; - NestConst const m_boolean; }; class FullOuterJoin : public RecordSource @@ -1187,6 +1207,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; protected: @@ -1225,7 +1246,11 @@ namespace Jrd }; public: - HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count, + HashJoin(thread_db* tdbb, CompilerScratch* csb, JoinType joinType, + FB_SIZE_T count, RecordSource* const* args, NestValueArray* const* keys, + double selectivity = 0); + HashJoin(thread_db* tdbb, CompilerScratch* csb, + BoolExprNode* boolean, RecordSource* const* args, NestValueArray* const* keys, double selectivity = 0); @@ -1240,6 +1265,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; static unsigned maxCapacity(); @@ -1250,10 +1276,16 @@ namespace Jrd bool internalGetRecord(thread_db* tdbb) const override; private: + void init(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count, + RecordSource* const* args, NestValueArray* const* keys, + double selectivity); ULONG computeHash(thread_db* tdbb, Request* request, const SubStream& sub, UCHAR* buffer) const; bool fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) const; + const JoinType m_joinType; + const NestConst m_boolean; + SubStream m_leader; Firebird::Array m_args; }; @@ -1304,6 +1336,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; protected: @@ -1365,6 +1398,7 @@ namespace Jrd void markRecursive() override; void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; protected: void internalGetPlan(thread_db* tdbb, PlanEntry& planEntry, unsigned level, bool recurse) const override; @@ -1408,6 +1442,7 @@ namespace Jrd void markRecursive() override; void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; protected: void internalGetPlan(thread_db* tdbb, PlanEntry& planEntry, unsigned level, bool recurse) const override; @@ -1449,6 +1484,7 @@ namespace Jrd void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll = false) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; protected: diff --git a/src/jrd/recsrc/RecursiveStream.cpp b/src/jrd/recsrc/RecursiveStream.cpp index 265c8bd3ae9..a70a0d64016 100644 --- a/src/jrd/recsrc/RecursiveStream.cpp +++ b/src/jrd/recsrc/RecursiveStream.cpp @@ -81,11 +81,8 @@ void RecursiveStream::internalOpen(thread_db* tdbb) const // Initialize the record number of each stream in the union - for (FB_SIZE_T i = 0; i < m_innerStreams.getCount(); i++) - { - const StreamType stream = m_innerStreams[i]; + for (const auto stream : m_innerStreams) request->req_rpb[stream].rpb_number.setValue(BOF_NUMBER); - } m_root->open(tdbb); } @@ -294,6 +291,11 @@ void RecursiveStream::findUsedStreams(StreamList& streams, bool expandAll) const } } +bool RecursiveStream::isDependent(const StreamList& streams) const +{ + return m_root->isDependent(streams) || m_inner->isDependent(streams); +} + void RecursiveStream::cleanupLevel(Request* request, Impure* impure) const { Impure* const saveImpure = request->getImpure(m_saveOffset); @@ -305,9 +307,8 @@ void RecursiveStream::cleanupLevel(Request* request, Impure* impure) const const UCHAR* p = tmp + m_saveSize; - for (FB_SIZE_T i = 0; i < m_innerStreams.getCount(); i++) + for (const auto stream : m_innerStreams) { - const StreamType stream = m_innerStreams[i]; record_param* const rpb = &request->req_rpb[stream]; Record* const tempRecord = rpb->rpb_record; memmove(rpb, p, sizeof(record_param)); diff --git a/src/jrd/recsrc/SingularStream.cpp b/src/jrd/recsrc/SingularStream.cpp index 2e205071946..c5b961afff1 100644 --- a/src/jrd/recsrc/SingularStream.cpp +++ b/src/jrd/recsrc/SingularStream.cpp @@ -175,6 +175,11 @@ void SingularStream::findUsedStreams(StreamList& streams, bool expandAll) const m_next->findUsedStreams(streams, expandAll); } +bool SingularStream::isDependent(const StreamList& streams) const +{ + return m_next->isDependent(streams); +} + void SingularStream::invalidateRecords(Request* request) const { m_next->invalidateRecords(request); diff --git a/src/jrd/recsrc/SkipRowsStream.cpp b/src/jrd/recsrc/SkipRowsStream.cpp index 7d472303591..2f20cf76729 100644 --- a/src/jrd/recsrc/SkipRowsStream.cpp +++ b/src/jrd/recsrc/SkipRowsStream.cpp @@ -145,6 +145,11 @@ void SkipRowsStream::findUsedStreams(StreamList& streams, bool expandAll) const m_next->findUsedStreams(streams, expandAll); } +bool SkipRowsStream::isDependent(const StreamList& streams) const +{ + return m_value->containsAnyStream(streams) || m_next->isDependent(streams); +} + void SkipRowsStream::invalidateRecords(Request* request) const { m_next->invalidateRecords(request); diff --git a/src/jrd/recsrc/SortedStream.cpp b/src/jrd/recsrc/SortedStream.cpp index 4576f9cdbe1..911602b4f37 100644 --- a/src/jrd/recsrc/SortedStream.cpp +++ b/src/jrd/recsrc/SortedStream.cpp @@ -170,6 +170,11 @@ void SortedStream::findUsedStreams(StreamList& streams, bool expandAll) const m_next->findUsedStreams(streams, expandAll); } +bool SortedStream::isDependent(const StreamList& streams) const +{ + return m_next->isDependent(streams); +} + void SortedStream::invalidateRecords(Request* request) const { m_next->invalidateRecords(request); diff --git a/src/jrd/recsrc/Union.cpp b/src/jrd/recsrc/Union.cpp index 225fcabfc25..c7fbb29ac6c 100644 --- a/src/jrd/recsrc/Union.cpp +++ b/src/jrd/recsrc/Union.cpp @@ -36,7 +36,7 @@ using namespace Jrd; Union::Union(CompilerScratch* csb, StreamType stream, FB_SIZE_T argCount, RecordSource* const* args, NestConst* maps, const StreamList& streams) - : RecordStream(csb, stream), m_args(csb->csb_pool), m_maps(csb->csb_pool), + : RecordStream(csb, stream), m_args(csb->csb_pool, argCount), m_maps(csb->csb_pool, argCount), m_streams(csb->csb_pool, streams) { fb_assert(argCount); @@ -44,17 +44,14 @@ Union::Union(CompilerScratch* csb, StreamType stream, m_impure = csb->allocImpure(); m_args.resize(argCount); + m_maps.resize(argCount); for (FB_SIZE_T i = 0; i < argCount; i++) { m_args[i] = args[i]; m_cardinality += args[i]->getCardinality(); - } - - m_maps.resize(argCount); - - for (FB_SIZE_T i = 0; i < argCount; i++) m_maps[i] = maps[i]; + } } void Union::internalOpen(thread_db* tdbb) const @@ -69,11 +66,8 @@ void Union::internalOpen(thread_db* tdbb) const // Initialize the record number of each stream in the union - for (FB_SIZE_T i = 0; i < m_streams.getCount(); i++) - { - const StreamType stream = m_streams[i]; + for (const auto stream : m_streams) request->req_rpb[stream].rpb_number.setValue(BOF_NUMBER); - } m_args[impure->irsb_count]->open(tdbb); } @@ -197,14 +191,14 @@ void Union::internalGetPlan(thread_db* tdbb, PlanEntry& planEntry, unsigned leve void Union::markRecursive() { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->markRecursive(); + for (auto arg : m_args) + arg->markRecursive(); } void Union::invalidateRecords(Request* request) const { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->invalidateRecords(request); + for (const auto arg : m_args) + arg->invalidateRecords(request); } void Union::findUsedStreams(StreamList& streams, bool expandAll) const @@ -213,7 +207,18 @@ void Union::findUsedStreams(StreamList& streams, bool expandAll) const if (expandAll) { - for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) - m_args[i]->findUsedStreams(streams, true); + for (const auto arg : m_args) + arg->findUsedStreams(streams, true); } } + +bool Union::isDependent(const StreamList& streams) const +{ + for (const auto arg : m_args) + { + if (arg->isDependent(streams)) + return true; + } + + return false; +} diff --git a/src/jrd/recsrc/WindowedStream.cpp b/src/jrd/recsrc/WindowedStream.cpp index 0279223dc79..9f8bf9b1a11 100644 --- a/src/jrd/recsrc/WindowedStream.cpp +++ b/src/jrd/recsrc/WindowedStream.cpp @@ -63,6 +63,7 @@ namespace void invalidateRecords(Request* request) const override; void findUsedStreams(StreamList& streams, bool expandAll) const override; + bool isDependent(const StreamList& streams) const override; void nullRecords(thread_db* tdbb) const override; void locate(thread_db* tdbb, FB_UINT64 position) const override @@ -177,6 +178,11 @@ namespace m_next->findUsedStreams(streams, expandAll); } + bool BufferedStreamWindow::isDependent(const StreamList& streams) const + { + return m_next->isDependent(streams); + } + void BufferedStreamWindow::invalidateRecords(Request* request) const { m_next->invalidateRecords(request); @@ -441,6 +447,11 @@ void WindowedStream::findUsedStreams(StreamList& streams, bool expandAll) const m_joinedStream->findUsedStreams(streams, expandAll); } +bool WindowedStream::isDependent(const StreamList& streams) const +{ + return m_joinedStream->isDependent(streams); +} + void WindowedStream::nullRecords(thread_db* tdbb) const { m_joinedStream->nullRecords(tdbb); @@ -931,6 +942,11 @@ void WindowedStream::WindowStream::findUsedStreams(StreamList& streams, bool exp m_next->findUsedStreams(streams, expandAll); } +bool WindowedStream::WindowStream::isDependent(const StreamList& streams) const +{ + return m_next->isDependent(streams); +} + void WindowedStream::WindowStream::nullRecords(thread_db* tdbb) const { BaseAggWinStream::nullRecords(tdbb);