Contributor

@pawankartik-elastic pawankartik-elastic commented Oct 21, 2025

This PR enables Cross Project Search (CPS) for both _search and _async_search when ?ccs_minimize_roundtrips=false.

TODO:

  • Reconcile the Clusters count after shard responses are pieced back.
  • Ensure CI passes with prelim changes.
  • Link with Serverless tests.
  • Review.

Notes to myself:

  • Perhaps, could swap out the transport version for a newer dedicated one?
    • Yes, I've swapped out the transport version to use a dedicated newer one.

@elasticsearchmachine elasticsearchmachine added the serverless-linked label Oct 22, 2025
searchResponseActionListener.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
    SearchResponse.Clusters participatingProjects = clusters;
    if (resolvesCrossProject && rewritten.getResolvedIndexExpressions() != null) {
        participatingProjects = reconcileProjects(
Contributor Author

Two other ways to potentially do the same thing:

  1. Make SearchResponse.Cluster's members mutable, i.e. non-final, and keep modifying them on the fly, or
  2. Do the reconciliation at the end. This would mean sprinkling CPS-specific code in the progress listeners or *Task.

Making members mutable just to accommodate this doesn't feel right to me. IIRC, we also had a brief discussion earlier this year where the point was made that the progress listener is already complex enough because more and more code keeps leaking into it (this was back when I was fixing the NPE bug in async search). The approach in this PR seems simpler, IMO.

@pawankartik-elastic pawankartik-elastic added the >refactoring, Team:Search Foundations, and :Search Foundations/CCS labels Oct 24, 2025
@pawankartik-elastic pawankartik-elastic marked this pull request as ready for review October 24, 2025 17:07
@elasticsearchmachine
Collaborator

Pinging @elastic/es-search-foundations (Team:Search Foundations)

Contributor

@quux00 quux00 left a comment

Partial first pass review. (Didn't have time to review everything yet)
Submitting my initial comments for now.
Overall this is looking good!


// Same as we do in stateful right now.
if (linkedProjectsWithResponses.isEmpty()) {
    return SearchResponse.Clusters.EMPTY;
Contributor

Do you need to check whether there are any origin project indices to search before returning EMPTY here? Or does shardResponses also include the origin project?

Contributor Author

@pawankartik-elastic pawankartik-elastic Oct 27, 2025

No, shardResponses only contains responses from linked projects. I'm not checking for the origin project here because we don't create a Clusters object if the search effectively becomes a local search (irrespective of whether we find local shards later on), since we don't have to display the metadata in the response. Here's the same behaviour for stateful currently on main:

Contributor

OK, makes sense. Let's add a code comment stating that, since in this separate method the context is not immediately clear.
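
For readers without the full diff, the guard discussed in this thread can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the PR: `ReconcileSketch`, `ProjectEntry`, and `reconcile` are hypothetical names, statuses are plain strings, and origin-project handling is left out entirely. It shows a reconciliation step that builds a new map from immutable entries (instead of mutating them) and returns an empty result when no linked project responded.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names exist in Elasticsearch.
final class ReconcileSketch {
    // Immutable stand-in for a per-project metadata entry.
    record ProjectEntry(String alias, String status) {}

    static Map<String, ProjectEntry> reconcile(
        Map<String, ProjectEntry> original,
        Set<String> linkedProjectsWithResponses
    ) {
        // No linked project responded: the search is effectively local,
        // so there is no cross-project metadata to report.
        if (linkedProjectsWithResponses.isEmpty()) {
            return Map.of();
        }
        // Build a fresh map rather than mutating the original entries.
        Map<String, ProjectEntry> reconciled = new LinkedHashMap<>();
        for (Map.Entry<String, ProjectEntry> e : original.entrySet()) {
            if (linkedProjectsWithResponses.contains(e.getKey())) {
                reconciled.put(e.getKey(), e.getValue());
            }
        }
        return reconciled;
    }
}
```

Because the input map is never modified, the caller can keep using the original entries while the reconciled view is attached to the response.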

expr -> expr.localExpressions().localIndexResolutionResult() == ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS
);

Map<String, SearchResponse.Cluster> reconciledMap = ConcurrentCollections.newConcurrentMap();
Contributor

Why do we need a concurrent map here? Can this be called by more than one thread within a given request?

Contributor Author

@pawankartik-elastic pawankartik-elastic Oct 27, 2025

CHM (ConcurrentHashMap) wasn't used up to #99101; we only started using it after #100129. I'm simply retaining that existing behaviour, which was introduced as an enhancement.

Contributor

@quux00 quux00 Oct 28, 2025

The map passed into return new SearchResponse.Clusters(reconciledMap, false); (at the bottom of this method) is not used directly - the entries are copied into a ConcurrentHashMap in the SearchResponse.Clusters constructor. So unless this reconcileProjects method is ever called by multiple threads for a given request (which I don't think it is), then there is no need for this map to be concurrent. You are paying additional overhead for no reason.

Contributor Author

This is SearchResponse.Clusters:

public Clusters(Map<String, Cluster> clusterInfoMap, boolean ccsMinimizeRoundtrips) {
    assert clusterInfoMap.size() > 0 : "this constructor should not be called with an empty Cluster info map";
    this.total = clusterInfoMap.size();
    this.clusterInfo = clusterInfoMap;
    this.successful = getClusterStateCount(Cluster.Status.SUCCESSFUL);
    this.skipped = getClusterStateCount(Cluster.Status.SKIPPED);
    // should only be called if "details" section of fromXContent is present (for ccsMinimizeRoundtrips)
    this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips;
}

In this ctor, the passed map is directly assigned to the field. I don't think the field is initialised beforehand, or that any copying is happening here. Or am I misreading this?

Contributor

@quux00 quux00 Oct 29, 2025

Oh, OK. This is worse than I thought :-) and points out a fragility in the SearchResponse.Clusters code, so thanks for pointing that out.

I was referring to this constructor (and the one that takes in a StreamInput).

The constructors that take in Map<String, Cluster> are fragile - you need to pass in ConcurrentHashMap and the arg signature of the function should require ConcurrentHashMap (not Map) OR those need to be refactored to copy from the incoming map into a ConcurrentHashMap. Otherwise we risk buggy code.

I think the proper fix is two-fold:

  1. The Map type in the SearchResponse.Clusters class should be declared as ConcurrentHashMap, not Map, otherwise the code is fragile. I think that could be a follow-on refactoring PR that we leave for later.

  2. The current constructor (in main) that takes Map<String, Cluster> (see below), should be changed to copy out of the incoming map into a new ConcurrentHashMap.

        public Clusters(Map<String, Cluster> clusterInfoMap) {
            assert clusterInfoMap.size() > 0 : "this constructor should not be called with an empty Cluster info map";
            this.total = clusterInfoMap.size();
            this.clusterInfo = clusterInfoMap;
            this.successful = getClusterStateCount(Cluster.Status.SUCCESSFUL);
            this.skipped = getClusterStateCount(Cluster.Status.SKIPPED);
            // should only be called if "details" section of fromXContent is present (for ccsMinimizeRoundtrips)
            this.ccsMinimizeRoundtrips = true;
        }

For your PR, that constructor was changed to call your new one, so let's change your new one to do the copying. In which case the code above on line 1023 should just be a Map not a ConcurrentHashMap.
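
A minimal sketch of the defensive-copy idea, assuming a simplified stand-in class: `ClustersSketch` and its methods are hypothetical, and statuses are plain strings rather than `SearchResponse.Cluster` objects. It is meant only to show why the caller can then build its map as a plain `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for SearchResponse.Clusters; not the real class.
final class ClustersSketch {
    // Declaring the field as ConcurrentHashMap makes the thread-safety
    // requirement visible in the type instead of leaving it to callers.
    private final ConcurrentHashMap<String, String> clusterInfo;
    private final int total;

    ClustersSketch(Map<String, String> clusterInfoMap) {
        if (clusterInfoMap.isEmpty()) {
            throw new IllegalArgumentException("cluster info map must not be empty");
        }
        // Defensive copy: the caller may pass any Map implementation.
        this.clusterInfo = new ConcurrentHashMap<>(clusterInfoMap);
        this.total = this.clusterInfo.size();
    }

    int total() {
        return total;
    }

    String status(String alias) {
        return clusterInfo.get(alias);
    }

    // Simulates a later in-place update, e.g. from another thread.
    void updateStatus(String alias, String status) {
        clusterInfo.put(alias, status);
    }
}
```

With a constructor of this shape, later changes to the caller's map do not leak into the object, and updates made through the object do not touch the caller's map.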

Contributor

@quux00 quux00 left a comment

Next round of feedback on the ConcurrentHashMap discussion.

Contributor

@quux00 quux00 left a comment

LGTM!

@pawankartik-elastic pawankartik-elastic merged commit 87807f0 into elastic:main Oct 30, 2025
34 checks passed
@pawankartik-elastic pawankartik-elastic deleted the pkar/cps-mrt-false-tsa branch October 30, 2025 16:31
chrisparrinello pushed a commit to chrisparrinello/elasticsearch that referenced this pull request Nov 3, 2025
…T=false (elastic#136871)

These changes implement and enable Cross Project Search for `_async_search`
and `_search` when `ccs_minimize_roundtrips=false`.
phananh1010 added a commit to phananh1010/elasticsearch that referenced this pull request Nov 6, 2025
BASE=9aed5f4584bafbbaa6129d3ee2345b56ddb744c9
HEAD=ffbe556e736c1294091045b07d78d5da2d70e53d
Branch=main
phananh1010 added a commit to phananh1010/elasticsearch that referenced this pull request Nov 7, 2025
BASE=9aed5f4584bafbbaa6129d3ee2345b56ddb744c9
HEAD=ffbe556e736c1294091045b07d78d5da2d70e53d
Branch=main
Labels

>refactoring, :Search Foundations/CCS, serverless-linked, Team:Search Foundations, v9.3.0
