Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/changelog/132744.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pr: 132744
summary: Fix async operator warnings not always sent when blocking
area: ES|QL
type: bug
issues:
- 130642
- 132554
- 132778
- 130296
- 132555
- 131563
- 131148
- 132604
- 128030
21 changes: 0 additions & 21 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,6 @@ tests:
- class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT
method: testOneRemoteClusterPartial
issue: https://github.com/elastic/elasticsearch/issues/124055
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {csv-spec:lookup-join.MvJoinKeyOnTheLookupIndex}
issue: https://github.com/elastic/elasticsearch/issues/128030
- class: org.elasticsearch.packaging.test.EnrollmentProcessTests
method: test20DockerAutoFormCluster
issue: https://github.com/elastic/elasticsearch/issues/128113
Expand Down Expand Up @@ -348,18 +345,12 @@ tests:
- class: org.elasticsearch.index.IndexingPressureIT
method: testWriteCanRejectOnPrimaryBasedOnMaxOperationSize
issue: https://github.com/elastic/elasticsearch/issues/130281
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {csv-spec:lookup-join.MvJoinKeyOnFrom}
issue: https://github.com/elastic/elasticsearch/issues/130296
- class: org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceExecutorTests
method: testSuccessfulExecution
issue: https://github.com/elastic/elasticsearch/issues/130306
- class: org.elasticsearch.gradle.LoggedExecFuncTest
method: failed tasks output logged to console when spooling true
issue: https://github.com/elastic/elasticsearch/issues/119509
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
method: test {csv-spec:lookup-join.MvJoinKeyFromRow}
issue: https://github.com/elastic/elasticsearch/issues/130642
- class: org.elasticsearch.indices.stats.IndexStatsIT
method: testFilterCacheStats
issue: https://github.com/elastic/elasticsearch/issues/124447
Expand All @@ -369,9 +360,6 @@ tests:
- class: org.elasticsearch.search.SearchWithRejectionsIT
method: testOpenContextsAfterRejections
issue: https://github.com/elastic/elasticsearch/issues/130821
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {csv-spec:lookup-join.MvJoinKeyOnFromAfterStats}
issue: https://github.com/elastic/elasticsearch/issues/131148
- class: org.elasticsearch.xpack.esql.ccq.MultiClustersIT
method: testLookupJoinAliases
issue: https://github.com/elastic/elasticsearch/issues/131166
Expand Down Expand Up @@ -462,12 +450,6 @@ tests:
- class: org.elasticsearch.xpack.esql.inference.completion.CompletionOperatorTests
method: testSimpleCircuitBreaking
issue: https://github.com/elastic/elasticsearch/issues/132382
- class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
method: test {csv-spec:lookup-join.MvJoinKeyOnFrom}
issue: https://github.com/elastic/elasticsearch/issues/132554
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {csv-spec:lookup-join.MvJoinKeyFromRow}
issue: https://github.com/elastic/elasticsearch/issues/132555
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
method: test {csv-spec:spatial.ConvertFromStringParseError}
issue: https://github.com/elastic/elasticsearch/issues/132558
Expand All @@ -480,9 +462,6 @@ tests:
- class: org.elasticsearch.xpack.logsdb.qa.StoredSourceLogsDbVersusReindexedLogsDbChallengeRestIT
method: testEsqlSource
issue: https://github.com/elastic/elasticsearch/issues/132602
- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
method: test {csv-spec:lookup-join.MvJoinKeyFromRowExpanded}
issue: https://github.com/elastic/elasticsearch/issues/132604
- class: org.elasticsearch.xpack.ml.integration.RevertModelSnapshotIT
method: testRevertModelSnapshot_DeleteInterveningResults
issue: https://github.com/elastic/elasticsearch/issues/132349
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,16 @@ public void addInput(Page input) {
driverContext.addAsyncAction();
boolean success = false;
try {
final ActionListener<Fetched> listener = ActionListener.wrap(output -> {
buffers.put(seqNo, output);
onSeqNoCompleted(seqNo);
}, e -> {
final ActionListener<Fetched> listener = ActionListener.wrap(output -> buffers.put(seqNo, output), e -> {
releasePageOnAnyThread(input);
failureCollector.unwrapAndCollect(e);
onSeqNoCompleted(seqNo);
});
final long startNanos = System.nanoTime();
performAsync(input, ActionListener.runAfter(listener, () -> {
responseHeadersCollector.collect();
driverContext.removeAsyncAction();
processNanos.add(System.nanoTime() - startNanos);
onSeqNoCompleted(seqNo);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Answering you here @dnhatn, as to have a thread)

This PR prevents that sequence. However, we should not increase the sequence number in onFailure() (call onSeqNoCompleted), as it may cause a NullPointerException later

From what I see, there's already an if (fetched != null) there on line 170, so no possible NPE, unless you mean something else or I missed something 👀

private void discardResults() {
long nextCheckpoint;
while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) {
Fetched result = buffers.remove(nextCheckpoint);
checkpoint.markSeqNoAsPersisted(nextCheckpoint);
if (result != null) {
releaseFetchedOnAnyThread(result);
}
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Ivan. I took another closer look. I think we are all good.

driverContext.removeAsyncAction();
}));
success = true;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ emp_no:integer | language_code:integer | language_name:keyword

mvJoinKeyOnTheLookupIndex
required_capability: join_lookup_v12
required_capability: join_lookup_skip_mv_warnings
required_capability: async_operator_warnings_fix

FROM employees
| WHERE 10003 < emp_no AND emp_no < 10008
Expand All @@ -451,7 +451,7 @@ emp_no:integer | language_code:integer | language_name:keyword

mvJoinKeyOnFrom
required_capability: join_lookup_v12
required_capability: join_lookup_skip_mv_warnings
required_capability: async_operator_warnings_fix

FROM employees
| WHERE emp_no < 10006
Expand All @@ -474,7 +474,7 @@ emp_no:integer | language_code:integer | language_name:keyword

mvJoinKeyOnTheLookupIndexAfterStats
required_capability: join_lookup_v12
required_capability: join_lookup_skip_mv_warnings
required_capability: async_operator_warnings_fix

FROM employees
| WHERE 10003 < emp_no AND emp_no < 10008
Expand All @@ -497,7 +497,7 @@ emp_no:integer | language_code:integer | language_name:keyword

mvJoinKeyOnFromAfterStats
required_capability: join_lookup_v12
required_capability: join_lookup_skip_mv_warnings
required_capability: async_operator_warnings_fix

FROM employees
| WHERE emp_no < 10006
Expand All @@ -521,7 +521,7 @@ emp_no:integer | language_code:integer | language_name:keyword

mvJoinKeyFromRow
required_capability: join_lookup_v12
required_capability: join_lookup_skip_mv_warnings
required_capability: async_operator_warnings_fix

ROW language_code = [4, 5, 6, 7]
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
Expand All @@ -538,7 +538,7 @@ language_code:integer | language_name:keyword | country:text

mvJoinKeyFromRowExpanded
required_capability: join_lookup_v12
required_capability: join_lookup_skip_mv_warnings
required_capability: async_operator_warnings_fix

ROW language_code = [4, 5, 6, 7, 8]
| MV_EXPAND language_code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,11 @@ public enum Cap {
*/
JOIN_LOOKUP_SKIP_MV_WARNINGS(JOIN_LOOKUP_V12.isEnabled()),

/**
* Fix for async operator sometimes completing the driver without emitting the stored warnings
*/
ASYNC_OPERATOR_WARNINGS_FIX,

/**
* Fix pushing down LIMIT past LOOKUP JOIN in case of multiple matching join keys.
*/
Expand Down