Skip to content

Commit 72f8530

Browse files
authored
Refactor pausable field plugin to have common codebase (#118909) (#119008)
(cherry picked from commit 65faabd)
1 parent 0b4937f commit 72f8530

File tree

5 files changed

+136
-200
lines changed

5 files changed

+136
-200
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java

Lines changed: 3 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,15 @@
1010
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1111
import org.elasticsearch.action.support.WriteRequest;
1212
import org.elasticsearch.common.Strings;
13-
import org.elasticsearch.common.settings.Settings;
1413
import org.elasticsearch.common.util.CollectionUtils;
1514
import org.elasticsearch.index.engine.SegmentsStats;
16-
import org.elasticsearch.index.mapper.OnScriptError;
17-
import org.elasticsearch.logging.LogManager;
18-
import org.elasticsearch.logging.Logger;
1915
import org.elasticsearch.plugins.Plugin;
20-
import org.elasticsearch.plugins.ScriptPlugin;
21-
import org.elasticsearch.script.LongFieldScript;
22-
import org.elasticsearch.script.ScriptContext;
23-
import org.elasticsearch.script.ScriptEngine;
24-
import org.elasticsearch.search.lookup.SearchLookup;
2516
import org.elasticsearch.xcontent.XContentBuilder;
2617
import org.elasticsearch.xcontent.json.JsonXContent;
2718
import org.junit.Before;
2819

2920
import java.io.IOException;
3021
import java.util.Collection;
31-
import java.util.Map;
32-
import java.util.Set;
3322
import java.util.concurrent.Semaphore;
3423
import java.util.concurrent.TimeUnit;
3524

@@ -40,8 +29,6 @@
4029
*/
4130
public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase {
4231

43-
private static final Logger LOGGER = LogManager.getLogger(AbstractPausableIntegTestCase.class);
44-
4532
protected static final Semaphore scriptPermits = new Semaphore(0);
4633

4734
protected int pageSize = -1;
@@ -108,53 +95,10 @@ public void setupIndex() throws IOException {
10895
}
10996
}
11097

111-
public static class PausableFieldPlugin extends Plugin implements ScriptPlugin {
112-
98+
public static class PausableFieldPlugin extends AbstractPauseFieldPlugin {
11399
@Override
114-
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
115-
return new ScriptEngine() {
116-
@Override
117-
public String getType() {
118-
return "pause";
119-
}
120-
121-
@Override
122-
@SuppressWarnings("unchecked")
123-
public <FactoryType> FactoryType compile(
124-
String name,
125-
String code,
126-
ScriptContext<FactoryType> context,
127-
Map<String, String> params
128-
) {
129-
return (FactoryType) new LongFieldScript.Factory() {
130-
@Override
131-
public LongFieldScript.LeafFactory newFactory(
132-
String fieldName,
133-
Map<String, Object> params,
134-
SearchLookup searchLookup,
135-
OnScriptError onScriptError
136-
) {
137-
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
138-
@Override
139-
public void execute() {
140-
try {
141-
assertTrue(scriptPermits.tryAcquire(1, TimeUnit.MINUTES));
142-
} catch (Exception e) {
143-
throw new AssertionError(e);
144-
}
145-
LOGGER.debug("--> emitting value");
146-
emit(1);
147-
}
148-
};
149-
}
150-
};
151-
}
152-
153-
@Override
154-
public Set<ScriptContext<?>> getSupportedContexts() {
155-
return Set.of(LongFieldScript.CONTEXT);
156-
}
157-
};
100+
protected boolean onWait() throws InterruptedException {
101+
return scriptPermits.tryAcquire(1, TimeUnit.MINUTES);
158102
}
159103
}
160104
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.index.mapper.OnScriptError;
12+
import org.elasticsearch.plugins.Plugin;
13+
import org.elasticsearch.plugins.ScriptPlugin;
14+
import org.elasticsearch.script.LongFieldScript;
15+
import org.elasticsearch.script.ScriptContext;
16+
import org.elasticsearch.script.ScriptEngine;
17+
import org.elasticsearch.search.lookup.SearchLookup;
18+
19+
import java.util.Collection;
20+
import java.util.Map;
21+
import java.util.Set;
22+
23+
import static org.junit.Assert.assertTrue;
24+
25+
/**
26+
* A plugin that provides a script language "pause" that can be used to simulate slow running queries.
27+
* See also {@link AbstractPausableIntegTestCase}.
28+
*/
29+
public abstract class AbstractPauseFieldPlugin extends Plugin implements ScriptPlugin {
30+
31+
// Called when the engine enters the execute() method.
32+
protected void onStartExecute() {}
33+
34+
// Called when the engine needs to wait for further execution to be allowed.
35+
protected abstract boolean onWait() throws InterruptedException;
36+
37+
@Override
38+
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
39+
return new ScriptEngine() {
40+
@Override
41+
public String getType() {
42+
return "pause";
43+
}
44+
45+
@Override
46+
@SuppressWarnings("unchecked")
47+
public <FactoryType> FactoryType compile(
48+
String name,
49+
String code,
50+
ScriptContext<FactoryType> context,
51+
Map<String, String> params
52+
) {
53+
if (context == LongFieldScript.CONTEXT) {
54+
return (FactoryType) new LongFieldScript.Factory() {
55+
@Override
56+
public LongFieldScript.LeafFactory newFactory(
57+
String fieldName,
58+
Map<String, Object> params,
59+
SearchLookup searchLookup,
60+
OnScriptError onScriptError
61+
) {
62+
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
63+
@Override
64+
public void execute() {
65+
onStartExecute();
66+
try {
67+
assertTrue(onWait());
68+
} catch (InterruptedException e) {
69+
throw new AssertionError(e);
70+
}
71+
emit(1);
72+
}
73+
};
74+
}
75+
};
76+
}
77+
throw new IllegalStateException("unsupported type " + context);
78+
}
79+
80+
@Override
81+
public Set<ScriptContext<?>> getSupportedContexts() {
82+
return Set.of(LongFieldScript.CONTEXT);
83+
}
84+
};
85+
}
86+
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java

Lines changed: 4 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,8 @@
1919
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2020
import org.elasticsearch.core.TimeValue;
2121
import org.elasticsearch.core.Tuple;
22-
import org.elasticsearch.index.mapper.OnScriptError;
2322
import org.elasticsearch.index.query.QueryBuilder;
2423
import org.elasticsearch.plugins.Plugin;
25-
import org.elasticsearch.plugins.ScriptPlugin;
26-
import org.elasticsearch.script.LongFieldScript;
27-
import org.elasticsearch.script.ScriptContext;
28-
import org.elasticsearch.script.ScriptEngine;
29-
import org.elasticsearch.search.lookup.SearchLookup;
3024
import org.elasticsearch.test.AbstractMultiClustersTestCase;
3125
import org.elasticsearch.test.XContentTestUtils;
3226
import org.elasticsearch.transport.RemoteClusterAware;
@@ -44,7 +38,6 @@
4438
import java.util.List;
4539
import java.util.Map;
4640
import java.util.Set;
47-
import java.util.concurrent.CountDownLatch;
4841
import java.util.concurrent.TimeUnit;
4942
import java.util.concurrent.atomic.AtomicReference;
5043

@@ -80,7 +73,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
8073
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
8174
plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
8275
plugins.add(InternalExchangePlugin.class);
83-
plugins.add(PauseFieldPlugin.class);
76+
plugins.add(SimplePauseFieldPlugin.class);
8477
return plugins;
8578
}
8679

@@ -99,64 +92,7 @@ public List<Setting<?>> getSettings() {
9992

10093
@Before
10194
public void resetPlugin() {
102-
PauseFieldPlugin.allowEmitting = new CountDownLatch(1);
103-
PauseFieldPlugin.startEmitting = new CountDownLatch(1);
104-
}
105-
106-
public static class PauseFieldPlugin extends Plugin implements ScriptPlugin {
107-
public static CountDownLatch startEmitting = new CountDownLatch(1);
108-
public static CountDownLatch allowEmitting = new CountDownLatch(1);
109-
110-
@Override
111-
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
112-
return new ScriptEngine() {
113-
@Override
114-
115-
public String getType() {
116-
return "pause";
117-
}
118-
119-
@Override
120-
@SuppressWarnings("unchecked")
121-
public <FactoryType> FactoryType compile(
122-
String name,
123-
String code,
124-
ScriptContext<FactoryType> context,
125-
Map<String, String> params
126-
) {
127-
if (context == LongFieldScript.CONTEXT) {
128-
return (FactoryType) new LongFieldScript.Factory() {
129-
@Override
130-
public LongFieldScript.LeafFactory newFactory(
131-
String fieldName,
132-
Map<String, Object> params,
133-
SearchLookup searchLookup,
134-
OnScriptError onScriptError
135-
) {
136-
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
137-
@Override
138-
public void execute() {
139-
startEmitting.countDown();
140-
try {
141-
assertTrue(allowEmitting.await(30, TimeUnit.SECONDS));
142-
} catch (InterruptedException e) {
143-
throw new AssertionError(e);
144-
}
145-
emit(1);
146-
}
147-
};
148-
}
149-
};
150-
}
151-
throw new IllegalStateException("unsupported type " + context);
152-
}
153-
154-
@Override
155-
public Set<ScriptContext<?>> getSupportedContexts() {
156-
return Set.of(LongFieldScript.CONTEXT);
157-
}
158-
};
159-
}
95+
SimplePauseFieldPlugin.resetPlugin();
16096
}
16197

16298
/**
@@ -184,7 +120,7 @@ public void testSuccessfulPathways() throws Exception {
184120
}
185121

186122
// wait until we know that the query against 'remote-b:blocking' has started
187-
PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
123+
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
188124

189125
// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
190126
assertBusy(() -> {
@@ -234,7 +170,7 @@ public void testSuccessfulPathways() throws Exception {
234170
}
235171

236172
// allow remoteB query to proceed
237-
PauseFieldPlugin.allowEmitting.countDown();
173+
SimplePauseFieldPlugin.allowEmitting.countDown();
238174

239175
// wait until both remoteB and local queries have finished
240176
assertBusy(() -> {

0 commit comments

Comments
 (0)