Conversation

@ivancea ivancea commented Aug 12, 2025

Fixes #128030
Fixes #130296
Fixes #130642
Fixes #131148
Fixes #132554
Fixes #132555
Fixes #132604
Fixes #131563
Fixes #132778

Extracted from #132738

A misordering in the AsyncOperator listener caused the warnings collection and the status metrics updates to run after the onSeqNoCompleted() > notifyIfBlocked() > future.onResponse(null) call chain. In some cases that call chain ends the processing before the warnings and metrics code has run.

Bug explanation

An example profile from one of the CI failures:

{
  planning={took_millis=8, start_millis=1754605547195, stop_millis=1754605547203, took_nanos=8612200},
  plans=[
    {
      cluster_name=test-cluster,
      node_name=test-cluster-1,
      description=single,
      plan=
        OutputExec[org.elasticsearch.xpack.esql.plugin.ComputeService$$Lambda/0x0000000095705068@4387ca3]	
        \_ProjectExec[[language_code{r}#25708, language_name{f}#25719, country{f}#25716]]	
          \_TopNExec[[Order[language_code{r}#25708,ASC,LAST], Order[language_name{f}#25719,ASC,LAST], Order[country{f}#25716,ASC,LAST]],1000[INTEGER],104]	
            \_LookupJoinExec[[language_code{r}#25708],[language_code{f}#25718],[country{f}#25716, language_name{f}#25719]]	
              |_LocalSourceExec[[language_code{r}#25708],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@1ef1bc4]	
              \_EsQueryExec[languages_lookup_non_unique_key], indexMode[lookup], query[][_doc{f}#25720], limit[], sort[] estimatedRowSize[104]
    }
  ],
  query={took_millis=14, start_millis=1754605547195, stop_millis=1754605547209, took_nanos=14013972},
  drivers=[
    {
      cluster_name=test-cluster, sleeps={last=[], counts={}, first=[]}, cpu_nanos=3189539,
      operators=[
        {operator=LocalSourceOperator},
        {
          operator=LookupOperator[
            index=languages_lookup_non_unique_key
            input_type=INTEGER
            match_field=language_code
            load_fields=[country{f}#25716, language_name{f}#25719]
            inputChannel=0
          ],
          status={received_pages=0, total_terms=4, emitted_pages=1, process_nanos=0, completed_pages=0}
        },
        {
          operator=TopNOperator[
            count=0/1000,
            elementTypes=[INT, BYTES_REF, BYTES_REF],
            encoders=[DefaultSortable, UTF8TopNEncoder, UTF8TopNEncoder],
            sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false], SortOrder[channel=2, asc=true, nullsFirst=false], SortOrder[channel=1, asc=true, nullsFirst=false]]
          ],
          status={receive_nanos=36566, emit_nanos=33132, occupied_rows=0, rows_emitted=1, pages_emitted=1, ram_used=4.2kb, rows_received=1, pages_received=1, ram_bytes_used=4368}
        },
        {operator=ProjectOperator[projection = [0, 2, 1]], status={pages_processed=1, rows_emitted=1, process_nanos=2856, rows_received=1}},
        {operator=OutputOperator[columns = [language_code, language_name, country]]}
      ],
      node_name=test-cluster-1,
      description=single,
      documents_found=0,
      start_millis=1754605547205,
      stop_millis=1754605547208,
      values_loaded=0,
      iterations=3124,
      took_nanos=3432530
    }
  ]
}

The LookupOperator has this strange status:
status={received_pages=0, total_terms=4, emitted_pages=1, process_nanos=0, completed_pages=0}

0 received pages, 0 completed pages, and 0 process_nanos, but 4 total terms and 1 emitted page. All the 0s come from AsyncOperator, while the other values come from LookupFromIndexOperator.

After some investigation, the received_pages and completed_pages values turned out to be wrong for a separate reason (fixed in #132738), but process_nanos shows the issue: that part of the code had not executed yet when the status was read.
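
The ordering problem can be illustrated with a small single-threaded sketch. It is not the AsyncOperator code: the class, the `wakeUpConsumer` callback (standing in for the onSeqNoCompleted() > notifyIfBlocked() > future.onResponse(null) chain) and the `processedPages` counter (standing in for the status metrics) are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Single-threaded sketch of the listener ordering; every name here is hypothetical. */
final class AsyncListenerOrderSketch {

    /** What the consumer observes at the moment it is woken up. */
    static final class Observed {
        final List<String> warnings;
        final long processedPages;

        Observed(List<String> warnings, long processedPages) {
            this.warnings = warnings;
            this.processedPages = processedPages;
        }

        @Override
        public String toString() {
            return "warnings=" + warnings + ", processedPages=" + processedPages;
        }
    }

    private final List<String> warnings = new ArrayList<>();
    private long processedPages = 0;
    // Stand-in for the notification that unblocks whoever waits on the operator.
    private Runnable wakeUpConsumer = () -> {};

    Observed observe() {
        return new Observed(List.copyOf(warnings), processedPages);
    }

    /** Stand-in for collecting warnings and updating status metrics. */
    private void bookkeeping(String warning) {
        warnings.add(warning);
        processedPages++;
    }

    /** Problematic order: the consumer is woken up before the bookkeeping runs. */
    void onResponseNotifyFirst(String warning) {
        wakeUpConsumer.run();
        bookkeeping(warning);
    }

    /** Order after the fix: bookkeeping first, notification last. */
    void onResponseBookkeepingFirst(String warning) {
        bookkeeping(warning);
        wakeUpConsumer.run();
    }

    /** Runs one response through the chosen order and returns what the consumer saw. */
    static Observed run(boolean bookkeepingFirst) {
        AsyncListenerOrderSketch op = new AsyncListenerOrderSketch();
        Observed[] seen = new Observed[1];
        op.wakeUpConsumer = () -> seen[0] = op.observe();
        if (bookkeepingFirst) {
            op.onResponseBookkeepingFirst("example warning");
        } else {
            op.onResponseNotifyFirst("example warning");
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("notify first:      " + run(false));
        System.out.println("bookkeeping first: " + run(true));
    }
}
```

With the notification first, the consumer sees no warnings and a zero counter, which matches the shape of the status above; with the bookkeeping first, it sees both. In the real operator the consumer runs on another thread, so the effect would be intermittent rather than deterministic as it is here.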

@ivancea ivancea added >bug Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL v9.2.0 labels Aug 12, 2025
@ivancea ivancea requested a review from nik9000 August 12, 2025 15:30
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Collaborator

Hi @ivancea, I've created a changelog YAML for you.

@elasticsearchmachine
Collaborator

Hi @ivancea, I've updated the changelog YAML for you.

Member

@nik9000 nik9000 left a comment

Looks better to me. Let @dnhatn look too please.

  @Override
  public final Operator.Status status() {
-     return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum());
+     return status(checkpoint.getMaxSeqNo() + 1, checkpoint.getProcessedCheckpoint() + 1, processNanos.sum());
Member

@dnhatn , could you have a look at this bit?

Member

Actually, this whole PR to be honest. It's the kind of thing I don't trust myself with. Obviously, I broke it in the first place.

Contributor Author

@ivancea ivancea Aug 13, 2025

Hey! This change feels a bit mystic. It's properly tested in the other PR, where I fix and test multiple things around the status() here.

I'll remove it from here. It doesn't belong in this PR, where I only wanted the other bugfix!
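
For context on the status() change discussed in this thread, here is a minimal sketch of how the two return statements in the snippet differ when a page count is derived from a sequence number. It assumes that sequence numbers start at 0 and that -1 means "nothing yet"; that convention and all names below are assumptions made for illustration, not taken from this PR.

```java
/** Compares two ways of turning a highest sequence number into a count. */
final class SeqNoCountSketch {

    /** Assumed sentinel meaning no sequence number has been issued yet. */
    static final long NO_OPS_PERFORMED = -1L;

    /** Clamping variant: never negative, but reports 0 when exactly one item (seqNo 0) exists. */
    static long clampedCount(long maxSeqNo) {
        return Math.max(0L, maxSeqNo);
    }

    /** Plus-one variant: with 0-based sequence numbers, count = highest seqNo + 1. */
    static long plusOneCount(long maxSeqNo) {
        return maxSeqNo + 1;
    }

    public static void main(String[] args) {
        for (long maxSeqNo = NO_OPS_PERFORMED; maxSeqNo <= 2; maxSeqNo++) {
            System.out.printf(
                "maxSeqNo=%d clamped=%d plusOne=%d%n",
                maxSeqNo, clampedCount(maxSeqNo), plusOneCount(maxSeqNo)
            );
        }
    }
}
```

Under that assumption, a single page would get sequence number 0, so the clamping variant reports 0 pages. That would be consistent with the profile in the description, which shows received_pages=0 next to emitted_pages=1.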

@dnhatn
Member

dnhatn commented Aug 12, 2025

Thanks Ivan!

I think the problematic sequence is: onSeqNoCompleted(seqNo) -> close() -> responseHeadersCollector.finish() -> responseHeadersCollector.collect() -> driverContext.removeAsyncAction(), where responseHeadersCollector.finish() is called before responseHeadersCollector.collect(). This PR prevents that sequence. However, we should not increase the sequence number in onFailure() (call onSeqNoCompleted), as it may cause a NullPointerException later:

@dnhatn dnhatn self-requested a review August 12, 2025 17:06
@elasticsearchmachine
Collaborator

Hi @ivancea, I've updated the changelog YAML for you.

responseHeadersCollector.collect();
driverContext.removeAsyncAction();
processNanos.add(System.nanoTime() - startNanos);
onSeqNoCompleted(seqNo);
Contributor Author

(Answering you here, @dnhatn, so that we have a thread)

> This PR prevents that sequence. However, we should not increase the sequence number in onFailure() (call onSeqNoCompleted), as it may cause a NullPointerException later

From what I see, there's already an if (fetched != null) there on line 170, so no possible NPE, unless you mean something else or I missed something 👀

private void discardResults() {
    long nextCheckpoint;
    while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) {
        Fetched result = buffers.remove(nextCheckpoint);
        checkpoint.markSeqNoAsPersisted(nextCheckpoint);
        if (result != null) {
            releaseFetchedOnAnyThread(result);
        }
    }
}
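
The role of the null check in discardResults() can be shown with a minimal sketch. It assumes simplified stand-ins: a plain map for the buffers, two long fields instead of the real checkpoint tracker, strings instead of Fetched, and strictly in-order completion. All names are hypothetical, and the "completed without a result" case models what the thread describes for a failed request that still completes its sequence number.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of draining buffered results up to the processed checkpoint. */
final class DiscardResultsSketch {
    private final Map<Long, String> buffers = new HashMap<>();
    private long persistedCheckpoint = -1; // last sequence number already drained
    private long processedCheckpoint = -1; // last sequence number marked completed
    private int released = 0;

    /** Marks seqNo as completed; a null result models a completion with nothing buffered. */
    void complete(long seqNo, String resultOrNull) {
        if (resultOrNull != null) {
            buffers.put(seqNo, resultOrNull);
        }
        // Assumption: completions arrive in order, so the checkpoint is simply the latest seqNo.
        processedCheckpoint = seqNo;
    }

    /** Drains every completed sequence number, skipping the ones without a buffered result. */
    void discardResults() {
        long next;
        while ((next = persistedCheckpoint + 1) <= processedCheckpoint) {
            String result = buffers.remove(next);
            persistedCheckpoint = next;
            if (result != null) { // without this guard, the missing entry would be dereferenced
                released++;
            }
        }
    }

    int released() { return released; }
    long persistedCheckpoint() { return persistedCheckpoint; }
    int buffered() { return buffers.size(); }

    public static void main(String[] args) {
        DiscardResultsSketch sketch = new DiscardResultsSketch();
        sketch.complete(0, "page-0");
        sketch.complete(1, null); // completed, but nothing was buffered
        sketch.complete(2, "page-2");
        sketch.discardResults();
        System.out.println("released=" + sketch.released() + ", persisted=" + sketch.persistedCheckpoint());
    }
}
```

In this model the loop advances past sequence number 1 without touching a result, so a gap in the buffers does not stop the drain or throw.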

Member

Thanks Ivan. I took another closer look. I think we are all good.

Member

@dnhatn dnhatn left a comment

LGTM. Thanks Ivan!

@ivancea ivancea merged commit 4e3602d into elastic:main Aug 14, 2025
33 checks passed
@ivancea ivancea deleted the esql-fix-async-operator-warnings branch August 14, 2025 10:05
szybia added a commit to szybia/elasticsearch that referenced this pull request Aug 14, 2025
…-stats

* upstream/main: (36 commits)
  ESQL: Fix async operator warnings not always sent when blocking (elastic#132744)
  Method not needed anymore (elastic#132912)
  [Test] Excercise shutdown more reliably in snapshot stress IT (elastic#132909)
  Update Gradle shadow plugin to 9.0.1 (elastic#132637)
  Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search/410_named_queries/named_queries_with_score} elastic#132906
  Update docker.elastic.co/wolfi/chainguard-base-fips:latest Docker digest to fa6cb69 (elastic#132735)
  Remove unnecessary calls to fold()  (elastic#131870)
  Use consistent terminology for transport version resources/references (elastic#132882)
  Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search.vectors/40_knn_search_cosine/kNN search only regular query} elastic#132890
  Finalize release notes for v9.1.2 release (elastic#132745)
  Finalize release notes for v9.0.5 release (elastic#132718)
  Move inner records out of TransportVersionUtils (elastic#132872)
  Add support for Lookup Join on Multiple Fields (elastic#131559)
  Bootstrap PR-based benchmarks (elastic#132717)
  Refactor MetadataIndexTemplateService to use template maps instead of project metadata (elastic#132662)
  [Gradle] Update nebula ospackage plugin to 12.1.0 (elastic#132640)
  Mute org.elasticsearch.xpack.esql.CsvTests test {csv-spec:ip.CdirMatchEqualsInsOrs} elastic#132860
  Mute org.elasticsearch.xpack.esql.CsvTests test {csv-spec:floats.InMultivalue} elastic#132859
  Revert "Reuse prod code and reduce EsqlSession public surface" (elastic#132843)
  Mute org.elasticsearch.xpack.esql.CsvTests test {csv-spec:string.LengthOfText} elastic#132857
  ...
joshua-adams-1 pushed a commit to joshua-adams-1/elasticsearch that referenced this pull request Aug 14, 2025
…tic#132744)

szybia added a commit to szybia/elasticsearch that referenced this pull request Aug 15, 2025
* upstream/main: (278 commits)
  ESQL - dense vector support cosine normalization (elastic#132721)
  [ML] Add support for dimensions in google vertex ai request (elastic#132689)
  ESQL - Add  byte element support for dense_vector data type (elastic#131863)
  ESQL: Fix async operator warnings not always sent when blocking (elastic#132744)
  Method not needed anymore (elastic#132912)
  [Test] Excercise shutdown more reliably in snapshot stress IT (elastic#132909)
  Update Gradle shadow plugin to 9.0.1 (elastic#132637)
  Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search/410_named_queries/named_queries_with_score} elastic#132906
  Update docker.elastic.co/wolfi/chainguard-base-fips:latest Docker digest to fa6cb69 (elastic#132735)
  Remove unnecessary calls to fold()  (elastic#131870)
  Use consistent terminology for transport version resources/references (elastic#132882)
  Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search.vectors/40_knn_search_cosine/kNN search only regular query} elastic#132890
  Finalize release notes for v9.1.2 release (elastic#132745)
  Finalize release notes for v9.0.5 release (elastic#132718)
  Move inner records out of TransportVersionUtils (elastic#132872)
  Add support for Lookup Join on Multiple Fields (elastic#131559)
  Bootstrap PR-based benchmarks (elastic#132717)
  Refactor MetadataIndexTemplateService to use template maps instead of project metadata (elastic#132662)
  [Gradle] Update nebula ospackage plugin to 12.1.0 (elastic#132640)
  Mute org.elasticsearch.xpack.esql.CsvTests test {csv-spec:ip.CdirMatchEqualsInsOrs} elastic#132860
  ...