Skip to content

Commit ee667c4

Browse files
authored
[Transform] Extract common test code to TransformCommonRestTestCase class (#107103)
1 parent 8716188 commit ee667c4

File tree

11 files changed

+157
-208
lines changed

11 files changed

+157
-208
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
apply plugin: 'elasticsearch.internal-java-rest-test'
2+
3+
dependencies {
4+
api project(':libs:elasticsearch-x-content')
5+
api project(':test:framework')
6+
api project(xpackModule('core'))
7+
}
8+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.transform.integration.common;
9+
10+
import org.apache.logging.log4j.Level;
11+
import org.elasticsearch.client.Request;
12+
import org.elasticsearch.client.Response;
13+
import org.elasticsearch.client.ResponseException;
14+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
15+
import org.elasticsearch.test.rest.ESRestTestCase;
16+
import org.elasticsearch.xpack.core.transform.TransformField;
17+
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
18+
19+
import java.io.IOException;
20+
import java.time.Instant;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Locale;
24+
import java.util.Map;
25+
import java.util.concurrent.TimeUnit;
26+
27+
public abstract class TransformCommonRestTestCase extends ESRestTestCase {
28+
29+
protected static final String TRANSFORM_ENDPOINT = TransformField.REST_BASE_PATH_TRANSFORMS;
30+
protected static final String AUTH_KEY = "Authorization";
31+
protected static final String SECONDARY_AUTH_KEY = "es-secondary-authorization";
32+
33+
protected static String getTransformEndpoint() {
34+
return TRANSFORM_ENDPOINT;
35+
}
36+
37+
/**
38+
* Returns the list of transform tasks as reported by the _tasks API.
39+
*/
40+
@SuppressWarnings("unchecked")
41+
protected List<String> getTransformTasks() throws IOException {
42+
Request tasksRequest = new Request("GET", "/_tasks");
43+
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
44+
Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));
45+
46+
Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
47+
if (nodes == null) {
48+
return List.of();
49+
}
50+
51+
List<String> foundTasks = new ArrayList<>();
52+
for (Map.Entry<String, Object> node : nodes.entrySet()) {
53+
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
54+
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
55+
if (tasks != null) {
56+
foundTasks.addAll(tasks.keySet());
57+
}
58+
}
59+
return foundTasks;
60+
}
61+
62+
/**
63+
* Returns the list of transform tasks for the given transform as reported by the _cluster/state API.
64+
*/
65+
@SuppressWarnings("unchecked")
66+
protected List<String> getTransformTasksFromClusterState(String transformId) throws IOException {
67+
Request request = new Request("GET", "_cluster/state");
68+
Map<String, Object> response = entityAsMap(adminClient().performRequest(request));
69+
70+
List<Map<String, Object>> tasks = (List<Map<String, Object>>) XContentMapValues.extractValue(
71+
response,
72+
"metadata",
73+
"persistent_tasks",
74+
"tasks"
75+
);
76+
77+
return tasks.stream().map(t -> (String) t.get("id")).filter(transformId::equals).toList();
78+
}
79+
80+
@SuppressWarnings("unchecked")
81+
protected void logAudits() throws Exception {
82+
logger.info("writing audit messages to the log");
83+
Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true");
84+
searchRequest.setJsonEntity("""
85+
{
86+
"size": 100,
87+
"sort": [ { "timestamp": { "order": "asc" } } ]
88+
}""");
89+
90+
assertBusy(() -> {
91+
try {
92+
refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN);
93+
Response searchResponse = client().performRequest(searchRequest);
94+
95+
Map<String, Object> searchResult = entityAsMap(searchResponse);
96+
List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue(
97+
"hits.hits",
98+
searchResult
99+
);
100+
101+
for (Map<String, Object> hit : searchHits) {
102+
Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
103+
String level = (String) source.getOrDefault("level", "info");
104+
logger.log(
105+
Level.getLevel(level.toUpperCase(Locale.ROOT)),
106+
"Transform audit: [{}] [{}] [{}] [{}]",
107+
Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
108+
source.getOrDefault("transform_id", "n/a"),
109+
source.getOrDefault("message", "n/a"),
110+
source.getOrDefault("node_name", "n/a")
111+
);
112+
}
113+
} catch (ResponseException e) {
114+
// see gh#54810, wrap temporary 503's as assertion error for retry
115+
if (e.getResponse().getStatusLine().getStatusCode() != 503) {
116+
throw e;
117+
}
118+
throw new AssertionError("Failed to retrieve audit logs", e);
119+
}
120+
}, 5, TimeUnit.SECONDS);
121+
}
122+
123+
protected void refreshIndex(String index) throws IOException {
124+
Request refreshRequest = new Request("POST", index + "/_refresh");
125+
assertOK(adminClient().performRequest(refreshRequest));
126+
}
127+
}

x-pack/plugin/transform/qa/multi-node-tests/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ apply plugin: 'elasticsearch.legacy-java-rest-test'
33
dependencies {
44
javaRestTestImplementation(testArtifact(project(xpackModule('core'))))
55
javaRestTestImplementation project(path: xpackModule('transform'))
6+
javaRestTestImplementation project(path: xpackModule('transform:qa:common'))
67
}
78

89
// location for keys and certificates

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/LatestIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void testLatest() throws Exception {
126126
waitUntilCheckpoint(transformConfig.getId(), 1L);
127127
stopTransform(transformConfig.getId());
128128

129-
refreshIndex(destIndexName, RequestOptions.DEFAULT);
129+
refreshIndex(destIndexName);
130130
var mappings = getIndexMapping(destIndexName, RequestOptions.DEFAULT);
131131
assertThat(
132132
(Map<String, Object>) XContentMapValues.extractValue(destIndexName + ".mappings", mappings),

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

Lines changed: 5 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
2323
import org.elasticsearch.xcontent.XContentBuilder;
2424
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
25-
import org.elasticsearch.xpack.core.transform.TransformField;
2625
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
2726
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
2827
import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
@@ -37,9 +36,7 @@
3736

3837
import java.io.IOException;
3938
import java.time.Instant;
40-
import java.util.ArrayList;
4139
import java.util.HashMap;
42-
import java.util.List;
4340
import java.util.Map;
4441
import java.util.concurrent.TimeUnit;
4542
import java.util.concurrent.atomic.AtomicInteger;
@@ -247,23 +244,23 @@ public void testTransformLifecycleInALoop() throws Exception {
247244
// Create the continuous transform
248245
putTransform(transformId, config, RequestOptions.DEFAULT);
249246
assertThat(getTransformTasks(), is(empty()));
250-
assertThatTransformTaskDoesNotExist(transformId);
247+
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
251248

252249
startTransform(transformId, RequestOptions.DEFAULT);
253250
// There is 1 transform task after start
254251
assertThat(getTransformTasks(), hasSize(1));
255-
assertThatTransformTaskExists(transformId);
252+
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
256253

257254
Thread.sleep(sleepAfterStartMillis);
258255
// There should still be 1 transform task as the transform is continuous
259256
assertThat(getTransformTasks(), hasSize(1));
260-
assertThatTransformTaskExists(transformId);
257+
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
261258

262259
// Stop the transform with force set randomly
263260
stopTransform(transformId, true, null, false, force);
264261
// After the transform is stopped, there should be no transform task left
265262
assertThat(getTransformTasks(), is(empty()));
266-
assertThatTransformTaskDoesNotExist(transformId);
263+
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
267264

268265
// Delete the transform
269266
deleteTransform(transformId);
@@ -303,63 +300,6 @@ private String createConfig(String transformId, String sourceIndex, String destI
303300
return Strings.toString(config);
304301
}
305302

306-
/**
307-
* Returns the list of transform tasks as reported by _tasks API.
308-
*/
309-
@SuppressWarnings("unchecked")
310-
protected List<String> getTransformTasks() throws IOException {
311-
final Request tasksRequest = new Request("GET", "/_tasks");
312-
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
313-
final Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));
314-
315-
Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
316-
if (nodes == null) {
317-
return List.of();
318-
}
319-
320-
List<String> foundTasks = new ArrayList<>();
321-
for (Map.Entry<String, Object> node : nodes.entrySet()) {
322-
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
323-
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
324-
if (tasks != null) {
325-
foundTasks.addAll(tasks.keySet());
326-
}
327-
}
328-
return foundTasks;
329-
}
330-
331-
/**
332-
* Verifies that the given transform task exists in cluster state.
333-
*/
334-
private void assertThatTransformTaskExists(String transformId) throws IOException {
335-
assertThatTransformTaskCountIsEqualTo(transformId, 1);
336-
}
337-
338-
/**
339-
* Verifies that the given transform task does not exist in cluster state.
340-
*/
341-
private void assertThatTransformTaskDoesNotExist(String transformId) throws IOException {
342-
assertThatTransformTaskCountIsEqualTo(transformId, 0);
343-
}
344-
345-
/**
346-
* Verifies that the number of transform tasks in cluster state for the given transform is as expected.
347-
*/
348-
@SuppressWarnings("unchecked")
349-
private void assertThatTransformTaskCountIsEqualTo(String transformId, int expectedCount) throws IOException {
350-
Request request = new Request("GET", "_cluster/state");
351-
Map<String, Object> response = entityAsMap(adminClient().performRequest(request));
352-
353-
List<Map<String, Object>> tasks = (List<Map<String, Object>>) XContentMapValues.extractValue(
354-
response,
355-
"metadata",
356-
"persistent_tasks",
357-
"tasks"
358-
);
359-
360-
assertThat("Tasks were: " + tasks, tasks.stream().filter(t -> transformId.equals(t.get("id"))).toList(), hasSize(expectedCount));
361-
}
362-
363303
public void testContinuousTransformUpdate() throws Exception {
364304
String indexName = "continuous-reviews-update";
365305
createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
@@ -447,7 +387,7 @@ public void testContinuousTransformUpdate() throws Exception {
447387
assertOK(searchResponse);
448388
var responseMap = entityAsMap(searchResponse);
449389
assertThat((Integer) XContentMapValues.extractValue("hits.total.value", responseMap), greaterThan(0));
450-
refreshIndex(dest, RequestOptions.DEFAULT);
390+
refreshIndex(dest);
451391
}, 30, TimeUnit.SECONDS);
452392

453393
stopTransform(config.getId());

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

Lines changed: 2 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.apache.http.client.methods.HttpGet;
1111
import org.apache.http.entity.ContentType;
1212
import org.apache.http.entity.StringEntity;
13-
import org.apache.logging.log4j.Level;
1413
import org.elasticsearch.client.Request;
1514
import org.elasticsearch.client.RequestOptions;
1615
import org.elasticsearch.client.Response;
@@ -27,7 +26,6 @@
2726
import org.elasticsearch.search.SearchModule;
2827
import org.elasticsearch.search.aggregations.AggregatorFactories;
2928
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
30-
import org.elasticsearch.test.rest.ESRestTestCase;
3129
import org.elasticsearch.xcontent.NamedXContentRegistry;
3230
import org.elasticsearch.xcontent.ToXContent;
3331
import org.elasticsearch.xcontent.XContentBuilder;
@@ -40,22 +38,20 @@
4038
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
4139
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
4240
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
43-
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
4441
import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
4542
import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource;
4643
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
4744
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
4845
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
46+
import org.elasticsearch.xpack.transform.integration.common.TransformCommonRestTestCase;
4947

5048
import java.io.IOException;
5149
import java.nio.charset.StandardCharsets;
52-
import java.time.Instant;
5350
import java.time.ZoneId;
5451
import java.util.Base64;
5552
import java.util.Collections;
5653
import java.util.HashSet;
5754
import java.util.List;
58-
import java.util.Locale;
5955
import java.util.Map;
6056
import java.util.Set;
6157
import java.util.concurrent.TimeUnit;
@@ -67,9 +63,8 @@
6763
import static org.hamcrest.Matchers.hasSize;
6864
import static org.hamcrest.core.Is.is;
6965

70-
public abstract class TransformRestTestCase extends ESRestTestCase {
66+
public abstract class TransformRestTestCase extends TransformCommonRestTestCase {
7167

72-
protected static String TRANSFORM_ENDPOINT = "/_transform/";
7368
protected static final String AUTH_KEY = "Authorization";
7469
protected static final String SECONDARY_AUTH_KEY = "es-secondary-authorization";
7570

@@ -81,49 +76,6 @@ protected void cleanUp() throws Exception {
8176
waitForPendingTasks();
8277
}
8378

84-
@SuppressWarnings("unchecked")
85-
private void logAudits() throws Exception {
86-
logger.info("writing audit messages to the log");
87-
Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true");
88-
searchRequest.setJsonEntity("""
89-
{
90-
"size": 100,
91-
"sort": [ { "timestamp": { "order": "asc" } } ]
92-
}""");
93-
94-
assertBusy(() -> {
95-
try {
96-
refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN, RequestOptions.DEFAULT);
97-
Response searchResponse = client().performRequest(searchRequest);
98-
99-
Map<String, Object> searchResult = entityAsMap(searchResponse);
100-
List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue(
101-
"hits.hits",
102-
searchResult
103-
);
104-
105-
for (Map<String, Object> hit : searchHits) {
106-
Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
107-
String level = (String) source.getOrDefault("level", "info");
108-
logger.log(
109-
Level.getLevel(level.toUpperCase(Locale.ROOT)),
110-
"Transform audit: [{}] [{}] [{}] [{}]",
111-
Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
112-
source.getOrDefault("transform_id", "n/a"),
113-
source.getOrDefault("message", "n/a"),
114-
source.getOrDefault("node_name", "n/a")
115-
);
116-
}
117-
} catch (ResponseException e) {
118-
// see gh#54810, wrap temporary 503's as assertion error for retry
119-
if (e.getResponse().getStatusLine().getStatusCode() != 503) {
120-
throw e;
121-
}
122-
throw new AssertionError("Failed to retrieve audit logs", e);
123-
}
124-
}, 5, TimeUnit.SECONDS);
125-
}
126-
12779
protected void cleanUpTransforms() throws IOException {
12880
for (String id : createdTransformIds) {
12981
try {
@@ -140,12 +92,6 @@ protected void cleanUpTransforms() throws IOException {
14092
createdTransformIds.clear();
14193
}
14294

143-
protected void refreshIndex(String index, RequestOptions options) throws IOException {
144-
var r = new Request("POST", index + "/_refresh");
145-
r.setOptions(options);
146-
assertOK(adminClient().performRequest(r));
147-
}
148-
14995
protected Map<String, Object> getIndexMapping(String index, RequestOptions options) throws IOException {
15096
var r = new Request("GET", "/" + index + "/_mapping");
15197
r.setOptions(options);

0 commit comments

Comments
 (0)