Skip to content

Commit 1607149

Browse files
authored
Clean up exchanges in EsqlNodeFailureIT (elastic#121633) (elastic#121926)
If the query hits the failing index first, we will cancel the request, preventing exchange-sink requests and data-node requests from reaching another data node. As a result, exchange sinks could stay for 30 seconds.
1 parent ab58158 commit 1607149

File tree

1 file changed

+17
-3
lines changed

1 file changed

+17
-3
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
import org.elasticsearch.ElasticsearchException;
1111
import org.elasticsearch.action.index.IndexRequestBuilder;
1212
import org.elasticsearch.common.settings.Settings;
13-
import org.elasticsearch.common.util.CollectionUtils;
13+
import org.elasticsearch.compute.operator.exchange.ExchangeService;
14+
import org.elasticsearch.core.TimeValue;
1415
import org.elasticsearch.index.mapper.OnScriptError;
1516
import org.elasticsearch.plugins.Plugin;
1617
import org.elasticsearch.plugins.ScriptPlugin;
@@ -38,7 +39,20 @@
3839
public class EsqlNodeFailureIT extends AbstractEsqlIntegTestCase {
3940
@Override
4041
protected Collection<Class<? extends Plugin>> nodePlugins() {
41-
return CollectionUtils.appendToCopy(super.nodePlugins(), FailingFieldPlugin.class);
42+
var plugins = new ArrayList<>(super.nodePlugins());
43+
plugins.add(FailingFieldPlugin.class);
44+
plugins.add(InternalExchangePlugin.class);
45+
return plugins;
46+
}
47+
48+
@Override
49+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
50+
Settings settings = Settings.builder()
51+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
52+
.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(3000, 4000)))
53+
.build();
54+
logger.info("settings {}", settings);
55+
return settings;
4256
}
4357

4458
/**
@@ -58,7 +72,7 @@ public void testFailureLoadingFields() throws IOException {
5872
mapping.endObject();
5973
client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();
6074

61-
int docCount = 100;
75+
int docCount = 50;
6276
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
6377
for (int d = 0; d < docCount; d++) {
6478
docs.add(client().prepareIndex("ok").setSource("foo", d));

0 commit comments

Comments
 (0)