
Conversation

@GalLalouche (Contributor) commented Aug 12, 2025

This PR adds a late(r) materialization for TopN queries, such that the materialization happens in the "node_reduce" phase instead of during the "data" phase.

For example, if the limit is 20 and each data node spawns 10 workers, we would only read the additional columns (i.e., ones not needed to compute the TopN itself) for 20 rows, instead of for 200 (10 workers × 20 rows each). To support this, the reducer node maintains a global list of all shard contexts used by its individual data workers (although some of those might be closed if they are no longer needed, thanks to #129454).

There is some additional book-keeping involved, since previously, every data node held a local list of shard contexts, and used its local indices to access it. To avoid changing too much (this local-index logic is spread throughout much of the code!), a new global index is introduced, which replaces the local index after all the rows are merged together in the reduce phase's TopN.
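To make the book-keeping a bit more concrete, here is a rough, hypothetical sketch of the remapping idea described above (the class and method names are made up for illustration; this is not the PR's actual code):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical illustration: each data worker's local shard contexts are appended
    // to one global list owned by the reducer, and local indices become global ones
    // by adding the worker's offset.
    class GlobalShardContextRegistry {
        private final List<Object> globalContexts = new ArrayList<>();

        // Registers one worker's local shard contexts; the returned offset turns that
        // worker's local index into a global index (globalIndex = localIndex + offset).
        synchronized int register(List<?> localContexts) {
            int offset = globalContexts.size();
            globalContexts.addAll(localContexts);
            return offset;
        }

        // After the reduce-phase TopN has merged all rows, only global indices are used
        // to look up the shard context needed to fetch the remaining columns.
        synchronized Object resolve(int globalIndex) {
            return globalContexts.get(globalIndex);
        }
    }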

@GalLalouche GalLalouche requested a review from nik9000 August 19, 2025 13:24
@alex-spies (Contributor) left a comment:

Heya, I'm working my way through the changes, focusing mostly on the planning aspect.

Do we have at least some smoke tests that confirm that, with the pragma disabled, our reduction and data node plans remain the same as before this PR or, at least, remain correct? That'd be fantastic, because the changes to the reduction planning are substantial enough that it's hard to confirm that they're safe by review alone. (Of course, our ITs do cover a lot of ground, so there's no reason to be overly paranoid, but still.)


public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", true);

public static final Setting<Boolean> REDUCTION_LATE_MATERIALIZATION = Setting.boolSetting("reduction_late_materialization", false);
Contributor: ++

Member:
Should this be a pragma or in PlannerSettings? With PlannerSettings it's a cluster-level setting, so we can disable it in serverless, and anyone with their own cluster could disable it rather than requiring it on every request.
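(For illustration, a cluster-level flag along these lines could be declared roughly as follows; the fully-qualified key and the property flags here are assumptions for the sketch, not the PR's actual code.)

    import org.elasticsearch.common.settings.Setting;

    // Hypothetical sketch: a dynamic, node-scoped cluster setting instead of a per-request pragma.
    public static final Setting<Boolean> REDUCTION_LATE_MATERIALIZATION = Setting.boolSetting(
        "esql.reduction_late_materialization",  // assumed key; the real key may differ
        false,                                  // off by default until the feature is ready
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    );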

Contributor Author: Moved to PlannerSettings.

Contributor:

@GalLalouche and I came up with the pragma to have a way to hide the feature until we're ready to switch it on in a separate PR.

Do we need a setting for this? For the purpose stated above, it'd even suffice to enable the behavior only in SNAPSHOT builds, in principle.

Of course, if we find it useful to keep this setting around, it's fine this way.

@nik9000 (Member) left a comment:

Left a few final comments. I think we're good except for some extra logging questions. We have to be careful not to log a zillion times on each failure, or we won't be able to hear ourselves think when production has problems. OTOH, I haven't traced through to be sure that the logs are bad or wrong - just that their level should be lowered, and we should double-check whether we want to keep them.

I'll give this another read soon to be sure, but it looks right to me.



@nik9000 (Member) left a comment:

Last tranche of reviews. It's all "javadoc please" and "change the logging level" stuff.

For the log things, I think what would make me most comfortable is to make them all debug, and then make follow-up PRs to change their level, where we can talk about that.

@GalLalouche (Contributor Author) commented Oct 5, 2025

> Do we have at least some smoke tests that confirm that, with the pragma disabled, our reduction and data node plans remain the same as before this PR or, at least, remain correct?

@alex-spies There is a test in EsqlActionTaskIT that checks the plans with the feature turned on and off.

@alex-spies (Contributor) left a comment:

Made another pass, close to finishing the whole review.



boolean splitTopN
) {
PhysicalPlan source = new ExchangeSourceExec(originalPlan.source(), originalPlan.output(), originalPlan.isIntermediateAgg());
if (splitTopN == false && runNodeLevelReduction == false) {
Contributor:

Shouldn't that just be

Suggested change:
- if (splitTopN == false && runNodeLevelReduction == false) {
+ if (runNodeLevelReduction == false) {

?

I guess the case where runNodeLevelReduction == false and splitTopN == true is invalid to begin with, but if we were to end up in this case, the current code will happily apply a node-level reduction for TopN even when it's disabled per runNodeLevelReduction, no?

Contributor Author:

> I guess the case where runNodeLevelReduction == false and splitTopN == true is invalid to begin with

Not really, these are two separate features, at least the way it's designed. We can make one dependent on the other, of course, but do we want to? (This was actually how I implemented it before adding the query pragma/planner setting, and it also made the actual handling of these flags more annoying, due to the way runNodeLevelReduction is modified 😕.)

> but if we were to end up in this case, the current code will happily apply a node-level reduction for TopN even when it's disabled per runNodeLevelReduction, no?

Which is fine, since it's two different features.

Contributor Author:

As discussed offline, I ended up replacing this with a snapshot check, and this will depend on runNodeLevelReduction.
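(A minimal sketch of what that resolution could look like, assuming the flag names from the snippet above and Elasticsearch's Build.current().isSnapshot() check; illustrative only, the merged code may differ.)

    import org.elasticsearch.Build;

    final class LateMaterializationGate {
        // Late-materializing TopN reduction is only considered when node-level
        // reduction is enabled, and (for now) only in SNAPSHOT builds.
        static boolean splitTopN(boolean runNodeLevelReduction) {
            return runNodeLevelReduction && Build.current().isSnapshot();
        }
    }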

@alex-spies (Contributor) left a comment:

Nearly there!

@alex-spies (Contributor) left a comment:

Ok, all done now.

From a query planning POV, this is A-OK. Thanks a lot for the iterations! I'm much more confident that this will hold up as the planner evolves, and it's much more defensive than it originally was. Really nice.

I left some comments, most of which are rather minor. Please consider this unblocked from my end, but it'd be nice if we could do the following (in follow-ups, of course, where doing it here would increase the scope too much):

  • Double check if we really need a new setting (#132757 (comment)); let's discuss this offline.
  • Add more tests: it'd be nice to have some optimizer tests for the reduction plan + data plan together, as currently our optimizer tests don't account for node-level reduction at all.
  • Clarify the concurrency and ownership of the ComputeSearchContexts (#132757 (comment) and #132757 (comment)). I understand these are shared between data and reduce drivers, but if we can't decRef contexts when running the reduce driver, I think we didn't properly take ownership of the reference to begin with, and this might become quite complicated going forward (a rough ownership sketch follows after this list).
  • Double check + test if we correctly perform the row estimation here
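(On the ownership point above, a minimal sketch of the usual ref-counting convention, using Elasticsearch's RefCounted interface; the helper and its names are hypothetical, not the PR's code.)

    import org.elasticsearch.core.RefCounted;

    // Hypothetical helper: whoever hands a shard context to the reduce driver takes a
    // reference first, and the reduce driver releases it when done, so contexts can be
    // decRef'd safely no matter which driver finishes last.
    final class SharedContextHandoff {
        static <T extends RefCounted> T acquireForReduceDriver(T shardContext) {
            shardContext.incRef();  // the reduce driver now owns one reference
            return shardContext;
        }

        static void releaseFromReduceDriver(RefCounted shardContext) {
            shardContext.decRef();  // releases (and possibly closes) only this reference
        }
    }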

@nik9000 (Member) left a comment:

I'm happy with it. Let's get Alex's last few comments solved and bring this thing in for a landing. We should get some rally benchmarks out of this. It's been a lot of work. We might get this for free from the nightly, but once you are good and ready to click that merge button, I think you should try and find the rally tracks that we run that'll benefit from this, so we can watch them.

@GalLalouche (Contributor Author) left a comment:

Thanks for the great in-depth review, @alex-spies! I've addressed everything, although there is still the issue of estimateRowSize, which is waiting for @nik9000's feedback, and the question of the dependence between the feature flags (runOnNodeReduce and reduceLateMaterialization, or whatever we call it).

);
for (String q : queries) {
- QueryPragmas pragmas = randomPragmas();
+ var pragmas = randomPragmas();
Contributor Author:

@nik9000 FYI. I remember we discussed this, though I don't remember the exact solution we agreed on (if we did).


FROM dense_vector
- | EVAL k = v_l2_norm(bit_vector, [1]) // workaround to enable fetching dense_vector
+ | EVAL k = v_l2_norm(bit_vector, [1,2]) // workaround to enable fetching dense_vector
Contributor Author:

See #136365.

@GalLalouche GalLalouche enabled auto-merge (squash) October 12, 2025 11:34
@alex-spies alex-spies self-assigned this Oct 13, 2025
@GalLalouche GalLalouche merged commit 0a7d113 into elastic:main Oct 13, 2025
34 checks passed
georgewallace pushed a commit to georgewallace/elasticsearch that referenced this pull request Oct 13, 2025

Labels: :Analytics/ES|QL (AKA ESQL), >feature, Team:Analytics (meta label for the analytical engine team, ESQL/Aggs/Geo), v9.3.0
