Skip to content

Commit b268be9

Browse files
committed
Added tests to ensure cancellation
1 parent 9dd64a6 commit b268be9

File tree

1 file changed

+218
-0
lines changed

1 file changed

+218
-0
lines changed
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.aggregations.bucket;
11+
12+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
13+
import org.elasticsearch.action.search.SearchRequestBuilder;
14+
import org.elasticsearch.action.search.TransportSearchAction;
15+
import org.elasticsearch.action.support.WriteRequest;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.CollectionUtils;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.index.mapper.OnScriptError;
20+
import org.elasticsearch.plugins.Plugin;
21+
import org.elasticsearch.plugins.ScriptPlugin;
22+
import org.elasticsearch.script.LongFieldScript;
23+
import org.elasticsearch.script.ScriptContext;
24+
import org.elasticsearch.script.ScriptEngine;
25+
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter;
26+
import org.elasticsearch.search.lookup.SearchLookup;
27+
import org.elasticsearch.tasks.TaskInfo;
28+
import org.elasticsearch.test.ESIntegTestCase;
29+
import org.elasticsearch.xcontent.XContentBuilder;
30+
import org.elasticsearch.xcontent.json.JsonXContent;
31+
32+
import java.util.Collection;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Set;
36+
import java.util.concurrent.TimeUnit;
37+
38+
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
39+
import static org.elasticsearch.search.aggregations.AggregationBuilders.filters;
40+
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
41+
import static org.hamcrest.Matchers.empty;
42+
import static org.hamcrest.Matchers.not;
43+
44+
@ESIntegTestCase.SuiteScopeTestCase
45+
public class FiltersCancellationIT extends ESIntegTestCase {
46+
47+
private static final String INDEX = "idx";
48+
private static final String SLEEP_FIELD = "sleep";
49+
private static final String NUMERIC_FIELD = "value";
50+
51+
private static final int NUM_DOCS = 100_000;
52+
/**
53+
* The number of milliseconds to sleep in the script that simulates a long-running operation.
54+
* <p>
55+
* As CancellableBulkScorer does a minimum of 4096 docs per batch, this number must be low to avoid long test times.
56+
* </p>
57+
*/
58+
private static final long SLEEP_SCRIPT_MS = 1;
59+
60+
@Override
61+
protected Collection<Class<? extends Plugin>> nodePlugins() {
62+
return CollectionUtils.appendToCopy(super.nodePlugins(), pausableFieldPluginClass());
63+
}
64+
65+
protected Class<? extends Plugin> pausableFieldPluginClass() {
66+
return SleepScriptPlugin.class;
67+
}
68+
69+
@Override
70+
public void setupSuiteScopeCluster() throws Exception {
71+
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
72+
mapping.startObject("runtime");
73+
{
74+
mapping.startObject(SLEEP_FIELD);
75+
{
76+
mapping.field("type", "long");
77+
mapping.startObject("script").field("source", "").field("lang", SleepScriptPlugin.PAUSE_SCRIPT_LANG).endObject();
78+
}
79+
mapping.endObject();
80+
mapping.startObject(NUMERIC_FIELD);
81+
{
82+
mapping.field("type", "long");
83+
}
84+
mapping.endObject();
85+
}
86+
mapping.endObject();
87+
client().admin().indices().prepareCreate(INDEX).setMapping(mapping.endObject()).get();
88+
89+
BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
90+
for (int i = 0; i < NUM_DOCS; i++) {
91+
bulk.add(prepareIndex(INDEX).setId(Integer.toString(i)).setSource(NUMERIC_FIELD, i));
92+
}
93+
bulk.get();
94+
client().admin().indices().prepareForceMerge(INDEX).setMaxNumSegments(1).get();
95+
}
96+
97+
public void testFiltersCountCancellation() throws Exception {
98+
ensureProperCancellation(
99+
client().prepareSearch(INDEX)
100+
.addAggregation(
101+
filters(
102+
"filters",
103+
new KeyedFilter[] {
104+
new KeyedFilter("filter1", termQuery(SLEEP_FIELD, 1)),
105+
new KeyedFilter("filter2", termQuery(SLEEP_FIELD, 2)) }
106+
)
107+
)
108+
);
109+
}
110+
111+
public void testFiltersSubAggsCancellation() throws Exception {
112+
ensureProperCancellation(
113+
client().prepareSearch(INDEX)
114+
.addAggregation(
115+
filters(
116+
"filters",
117+
new KeyedFilter[] {
118+
new KeyedFilter("filter1", termQuery(SLEEP_FIELD, 1)),
119+
new KeyedFilter("filter2", termQuery(SLEEP_FIELD, 2)) }
120+
).subAggregation(terms("sub").field(SLEEP_FIELD))
121+
)
122+
);
123+
}
124+
125+
private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) throws Exception {
126+
var searchRequestFuture = searchRequestBuilder.setTimeout(TimeValue.timeValueSeconds(1)).execute();
127+
assertFalse(searchRequestFuture.isCancelled());
128+
assertFalse(searchRequestFuture.isDone());
129+
130+
// Check that there are search tasks running
131+
assertThat(getSearchTasks(), not(empty()));
132+
133+
// Wait to ensure scripts started executing and that we don't cancel too early
134+
safeSleep(2000);
135+
136+
// CancellableBulkScorer does a starting batch of 4096 items, x2 after each iteration.
137+
// That times SLEEP_SCRIPT_MS gives us the maximum time to wait (x2 to avoid flakiness)
138+
long maxWaitMs = 3 * (4096 * SLEEP_SCRIPT_MS);
139+
140+
// Cancel the tasks
141+
client().admin()
142+
.cluster()
143+
.prepareCancelTasks()
144+
.setActions(TransportSearchAction.NAME + "*")
145+
.waitForCompletion(true)
146+
.setTimeout(TimeValue.timeValueMillis(maxWaitMs))
147+
.get();
148+
149+
// Ensure the search request finished and that there are no more search tasks
150+
assertBusy(() -> {
151+
assertThat(getSearchTasks(), empty());
152+
assertTrue(searchRequestFuture.isDone());
153+
}, maxWaitMs, TimeUnit.MILLISECONDS);
154+
}
155+
156+
private List<TaskInfo> getSearchTasks() {
157+
return client().admin()
158+
.cluster()
159+
.prepareListTasks()
160+
.setActions(TransportSearchAction.NAME + "*")
161+
.setDetailed(true)
162+
.get()
163+
.getTasks();
164+
}
165+
166+
public static class SleepScriptPlugin extends Plugin implements ScriptPlugin {
167+
public static final String PAUSE_SCRIPT_LANG = "pause";
168+
169+
@Override
170+
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
171+
return new ScriptEngine() {
172+
@Override
173+
public String getType() {
174+
return PAUSE_SCRIPT_LANG;
175+
}
176+
177+
@Override
178+
@SuppressWarnings("unchecked")
179+
public <FactoryType> FactoryType compile(
180+
String name,
181+
String code,
182+
ScriptContext<FactoryType> context,
183+
Map<String, String> params
184+
) {
185+
if (context == LongFieldScript.CONTEXT) {
186+
return (FactoryType) new LongFieldScript.Factory() {
187+
@Override
188+
public LongFieldScript.LeafFactory newFactory(
189+
String fieldName,
190+
Map<String, Object> params,
191+
SearchLookup searchLookup,
192+
OnScriptError onScriptError
193+
) {
194+
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
195+
@Override
196+
public void execute() {
197+
try {
198+
Thread.sleep(SLEEP_SCRIPT_MS);
199+
} catch (InterruptedException e) {
200+
throw new AssertionError(e);
201+
}
202+
emit(1);
203+
}
204+
};
205+
}
206+
};
207+
}
208+
throw new IllegalStateException("unsupported type " + context);
209+
}
210+
211+
@Override
212+
public Set<ScriptContext<?>> getSupportedContexts() {
213+
return Set.of(LongFieldScript.CONTEXT);
214+
}
215+
};
216+
}
217+
}
218+
}

0 commit comments

Comments
 (0)