7
7
8
8
package org .elasticsearch .xpack .remotecluster ;
9
9
10
+ import org .apache .http .util .EntityUtils ;
11
+ import org .elasticsearch .Build ;
10
12
import org .elasticsearch .action .search .SearchResponse ;
11
13
import org .elasticsearch .client .Request ;
12
14
import org .elasticsearch .client .RequestOptions ;
13
15
import org .elasticsearch .client .Response ;
14
16
import org .elasticsearch .client .ResponseException ;
15
17
import org .elasticsearch .common .UUIDs ;
16
18
import org .elasticsearch .common .settings .Settings ;
19
+ import org .elasticsearch .common .xcontent .XContentHelper ;
17
20
import org .elasticsearch .core .Strings ;
18
21
import org .elasticsearch .search .SearchHit ;
19
22
import org .elasticsearch .search .SearchResponseUtils ;
20
23
import org .elasticsearch .test .cluster .ElasticsearchCluster ;
24
+ import org .elasticsearch .test .cluster .local .distribution .DistributionType ;
21
25
import org .elasticsearch .test .cluster .util .resource .Resource ;
22
26
import org .elasticsearch .test .junit .RunnableTestRuleAdapter ;
23
27
import org .elasticsearch .xcontent .ObjectPath ;
28
+ import org .elasticsearch .xcontent .json .JsonXContent ;
24
29
import org .junit .ClassRule ;
25
30
import org .junit .rules .RuleChain ;
26
31
import org .junit .rules .TestRule ;
36
41
import java .util .concurrent .atomic .AtomicBoolean ;
37
42
import java .util .concurrent .atomic .AtomicInteger ;
38
43
import java .util .concurrent .atomic .AtomicReference ;
44
+ import java .util .function .Consumer ;
39
45
import java .util .stream .Collectors ;
40
46
41
47
import static org .hamcrest .Matchers .anEmptyMap ;
@@ -58,6 +64,7 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
58
64
59
65
static {
60
66
fulfillingCluster = ElasticsearchCluster .local ()
67
+ .distribution (DistributionType .DEFAULT )
61
68
.name ("fulfilling-cluster" )
62
69
.nodes (3 )
63
70
.apply (commonClusterConfig )
@@ -73,6 +80,7 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
73
80
.build ();
74
81
75
82
queryCluster = ElasticsearchCluster .local ()
83
+ .distribution (DistributionType .DEFAULT )
76
84
.name ("query-cluster" )
77
85
.apply (commonClusterConfig )
78
86
.setting ("xpack.security.remote_cluster_client.ssl.enabled" , () -> String .valueOf (SSL_ENABLED_REF .get ()))
@@ -137,6 +145,168 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
137
145
INVALID_SECRET_LENGTH .set (randomValueOtherThan (22 , () -> randomIntBetween (0 , 99 )));
138
146
})).around (fulfillingCluster ).around (queryCluster );
139
147
148
+ public void testTaskCancellation () throws Exception {
149
+ assumeTrue ("[error_query] is only available in snapshot builds" , Build .current ().isSnapshot ());
150
+ configureRemoteCluster ();
151
+
152
+ final String indexName = "index_fulfilling" ;
153
+ final String roleName = "taskCancellationRoleName" ;
154
+ final String userName = "taskCancellationUsername" ;
155
+ try {
156
+ // create some index on the fulfilling cluster, to be searched from the querying cluster
157
+ {
158
+ Request bulkRequest = new Request ("POST" , "/_bulk?refresh=true" );
159
+ bulkRequest .setJsonEntity (Strings .format ("""
160
+ { "index": { "_index": "%s" } }
161
+ { "foo": "bar" }
162
+ """ , indexName ));
163
+ assertOK (performRequestAgainstFulfillingCluster (bulkRequest ));
164
+ }
165
+
166
+ // Create user and role with privileges for remote indices
167
+ var putRoleRequest = new Request ("PUT" , "/_security/role/" + roleName );
168
+ putRoleRequest .setJsonEntity (Strings .format ("""
169
+ {
170
+ "remote_indices": [
171
+ {
172
+ "names": ["%s"],
173
+ "privileges": ["read", "read_cross_cluster"],
174
+ "clusters": ["my_remote_cluster"]
175
+ }
176
+ ]
177
+ }""" , indexName ));
178
+ assertOK (adminClient ().performRequest (putRoleRequest ));
179
+ var putUserRequest = new Request ("PUT" , "/_security/user/" + userName );
180
+ putUserRequest .setJsonEntity (Strings .format ("""
181
+ {
182
+ "password": "%s",
183
+ "roles" : ["%s"]
184
+ }""" , PASS , roleName ));
185
+ assertOK (adminClient ().performRequest (putUserRequest ));
186
+ var submitAsyncSearchRequest = new Request (
187
+ "POST" ,
188
+ Strings .format (
189
+ "/%s:%s/_async_search?ccs_minimize_roundtrips=%s" ,
190
+ randomFrom ("my_remote_cluster" , "*" , "my_remote_*" ),
191
+ indexName ,
192
+ randomBoolean ()
193
+ )
194
+ );
195
+
196
+ // submit a stalling remote async search
197
+ submitAsyncSearchRequest .setJsonEntity ("""
198
+ {
199
+ "query": {
200
+ "error_query": {
201
+ "indices": [
202
+ {
203
+ "name": "*:*",
204
+ "error_type": "exception",
205
+ "stall_time_seconds": 60
206
+ }
207
+ ]
208
+ }
209
+ }
210
+ }""" );
211
+ String asyncSearchOpaqueId = "async-search-opaque-id-" + randomUUID ();
212
+ submitAsyncSearchRequest .setOptions (
213
+ RequestOptions .DEFAULT .toBuilder ()
214
+ .addHeader ("Authorization" , headerFromRandomAuthMethod (userName , PASS ))
215
+ .addHeader ("X-Opaque-Id" , asyncSearchOpaqueId )
216
+ );
217
+ Response submitAsyncSearchResponse = client ().performRequest (submitAsyncSearchRequest );
218
+ assertOK (submitAsyncSearchResponse );
219
+ Map <String , Object > submitAsyncSearchResponseMap = XContentHelper .convertToMap (
220
+ JsonXContent .jsonXContent ,
221
+ EntityUtils .toString (submitAsyncSearchResponse .getEntity ()),
222
+ false
223
+ );
224
+ assertThat (submitAsyncSearchResponseMap .get ("is_running" ), equalTo (true ));
225
+ String asyncSearchId = (String ) submitAsyncSearchResponseMap .get ("id" );
226
+ assertThat (asyncSearchId , notNullValue ());
227
+ // wait for the tasks to show up on the querying cluster
228
+ assertBusy (() -> {
229
+ try {
230
+ Response queryingClusterTasks = adminClient ().performRequest (new Request ("GET" , "/_tasks" ));
231
+ assertOK (queryingClusterTasks );
232
+ Map <String , Object > responseMap = XContentHelper .convertToMap (
233
+ JsonXContent .jsonXContent ,
234
+ EntityUtils .toString (queryingClusterTasks .getEntity ()),
235
+ false
236
+ );
237
+ AtomicBoolean someTasks = new AtomicBoolean (false );
238
+ selectTasksWithOpaqueId (responseMap , asyncSearchOpaqueId , task -> {
239
+ // search tasks should not be cancelled at this point (but some transitory ones might be,
240
+ // e.g. for action "indices:admin/seq_no/global_checkpoint_sync")
241
+ if (task .get ("action" ) instanceof String action && action .contains ("indices:data/read/search" )) {
242
+ assertThat (task .get ("cancelled" ), equalTo (false ));
243
+ someTasks .set (true );
244
+ }
245
+ });
246
+ assertTrue (someTasks .get ());
247
+ } catch (IOException e ) {
248
+ throw new RuntimeException (e );
249
+ }
250
+ });
251
+ // wait for the tasks to show up on the fulfilling cluster
252
+ assertBusy (() -> {
253
+ try {
254
+ Response fulfillingClusterTasks = performRequestAgainstFulfillingCluster (new Request ("GET" , "/_tasks" ));
255
+ assertOK (fulfillingClusterTasks );
256
+ Map <String , Object > responseMap = XContentHelper .convertToMap (
257
+ JsonXContent .jsonXContent ,
258
+ EntityUtils .toString (fulfillingClusterTasks .getEntity ()),
259
+ false
260
+ );
261
+ AtomicBoolean someTasks = new AtomicBoolean (false );
262
+ selectTasksWithOpaqueId (responseMap , asyncSearchOpaqueId , task -> {
263
+ // search tasks should not be cancelled at this point (but some transitory ones might be,
264
+ // e.g. for action "indices:admin/seq_no/global_checkpoint_sync")
265
+ if (task .get ("action" ) instanceof String action && action .contains ("indices:data/read/search" )) {
266
+ assertThat (task .get ("cancelled" ), equalTo (false ));
267
+ someTasks .set (true );
268
+ }
269
+ });
270
+ assertTrue (someTasks .get ());
271
+ } catch (IOException e ) {
272
+ throw new RuntimeException (e );
273
+ }
274
+ });
275
+ // delete the stalling async search
276
+ var deleteAsyncSearchRequest = new Request ("DELETE" , Strings .format ("/_async_search/%s" , asyncSearchId ));
277
+ deleteAsyncSearchRequest .setOptions (
278
+ RequestOptions .DEFAULT .toBuilder ().addHeader ("Authorization" , headerFromRandomAuthMethod (userName , PASS ))
279
+ );
280
+ assertOK (client ().performRequest (deleteAsyncSearchRequest ));
281
+ // ensure any remaining tasks are all cancelled on the querying cluster
282
+ {
283
+ Response queryingClusterTasks = adminClient ().performRequest (new Request ("GET" , "/_tasks" ));
284
+ assertOK (queryingClusterTasks );
285
+ Map <String , Object > responseMap = XContentHelper .convertToMap (
286
+ JsonXContent .jsonXContent ,
287
+ EntityUtils .toString (queryingClusterTasks .getEntity ()),
288
+ false
289
+ );
290
+ selectTasksWithOpaqueId (responseMap , asyncSearchOpaqueId , task -> assertThat (task .get ("cancelled" ), equalTo (true )));
291
+ }
292
+ // ensure any remaining tasks are all cancelled on the fulfilling cluster
293
+ {
294
+ Response fulfillingClusterTasks = performRequestAgainstFulfillingCluster (new Request ("GET" , "/_tasks" ));
295
+ assertOK (fulfillingClusterTasks );
296
+ Map <String , Object > responseMap = XContentHelper .convertToMap (
297
+ JsonXContent .jsonXContent ,
298
+ EntityUtils .toString (fulfillingClusterTasks .getEntity ()),
299
+ false
300
+ );
301
+ selectTasksWithOpaqueId (responseMap , asyncSearchOpaqueId , task -> assertThat (task .get ("cancelled" ), equalTo (true )));
302
+ }
303
+ } finally {
304
+ assertOK (adminClient ().performRequest (new Request ("DELETE" , "/_security/user/" + userName )));
305
+ assertOK (adminClient ().performRequest (new Request ("DELETE" , "/_security/role/" + roleName )));
306
+ assertOK (performRequestAgainstFulfillingCluster (new Request ("DELETE" , indexName )));
307
+ }
308
+ }
309
+
140
310
public void testCrossClusterSearch () throws Exception {
141
311
configureRemoteCluster ();
142
312
final String crossClusterAccessApiKeyId = (String ) API_KEY_MAP_REF .get ().get ("id" );
@@ -446,4 +616,24 @@ private Response performRequestWithLocalSearchUser(final Request request) throws
446
616
);
447
617
return client ().performRequest (request );
448
618
}
619
+
620
+ @ SuppressWarnings ("unchecked" )
621
+ private static void selectTasksWithOpaqueId (
622
+ Map <String , Object > tasksResponse ,
623
+ String opaqueId ,
624
+ Consumer <Map <String , Object >> taskConsumer
625
+ ) {
626
+ Map <String , Map <String , Object >> nodes = (Map <String , Map <String , Object >>) tasksResponse .get ("nodes" );
627
+ for (Map <String , Object > node : nodes .values ()) {
628
+ Map <String , Map <String , Object >> tasks = (Map <String , Map <String , Object >>) node .get ("tasks" );
629
+ for (Map <String , Object > task : tasks .values ()) {
630
+ if (task .get ("headers" ) != null ) {
631
+ Map <String , Object > headers = (Map <String , Object >) task .get ("headers" );
632
+ if (opaqueId .equals (headers .get ("X-Opaque-Id" ))) {
633
+ taskConsumer .accept (task );
634
+ }
635
+ }
636
+ }
637
+ }
638
+ }
449
639
}
0 commit comments