|
9 | 9 | import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; |
10 | 10 | import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; |
11 | 11 |
|
| 12 | +import org.apache.http.HttpEntity; |
12 | 13 | import org.apache.http.util.EntityUtils; |
13 | 14 | import org.apache.lucene.search.DocIdSetIterator; |
14 | 15 | import org.elasticsearch.Build; |
|
17 | 18 | import org.elasticsearch.client.ResponseException; |
18 | 19 | import org.elasticsearch.common.io.Streams; |
19 | 20 | import org.elasticsearch.common.settings.Settings; |
| 21 | +import org.elasticsearch.common.xcontent.XContentHelper; |
20 | 22 | import org.elasticsearch.test.ListMatcher; |
21 | 23 | import org.elasticsearch.test.MapMatcher; |
22 | 24 | import org.elasticsearch.test.TestClustersThreadFilter; |
23 | 25 | import org.elasticsearch.test.cluster.ElasticsearchCluster; |
24 | 26 | import org.elasticsearch.test.cluster.LogType; |
| 27 | +import org.elasticsearch.xcontent.XContentBuilder; |
25 | 28 | import org.elasticsearch.xcontent.XContentType; |
| 29 | +import org.elasticsearch.xcontent.json.JsonXContent; |
26 | 30 | import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; |
| 31 | +import org.elasticsearch.xpack.esql.tools.ProfileParser; |
27 | 32 | import org.hamcrest.Matchers; |
28 | 33 | import org.junit.Assert; |
29 | 34 | import org.junit.ClassRule; |
30 | 35 |
|
| 36 | +import java.io.ByteArrayInputStream; |
| 37 | +import java.io.ByteArrayOutputStream; |
31 | 38 | import java.io.IOException; |
32 | 39 | import java.io.InputStream; |
33 | 40 | import java.nio.charset.StandardCharsets; |
34 | 41 | import java.util.ArrayList; |
35 | 42 | import java.util.Arrays; |
| 43 | +import java.util.HashSet; |
36 | 44 | import java.util.List; |
37 | 45 | import java.util.Locale; |
38 | 46 | import java.util.Map; |
| 47 | +import java.util.Set; |
39 | 48 |
|
40 | 49 | import static org.elasticsearch.test.ListMatcher.matchesList; |
41 | 50 | import static org.elasticsearch.test.MapMatcher.assertMap; |
42 | 51 | import static org.elasticsearch.test.MapMatcher.matchesMap; |
| 52 | +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; |
| 53 | +import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile; |
| 54 | +import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse; |
43 | 55 | import static org.hamcrest.Matchers.any; |
44 | 56 | import static org.hamcrest.Matchers.containsInAnyOrder; |
45 | 57 | import static org.hamcrest.Matchers.containsString; |
46 | 58 | import static org.hamcrest.Matchers.either; |
| 59 | +import static org.hamcrest.Matchers.empty; |
47 | 60 | import static org.hamcrest.Matchers.equalTo; |
48 | 61 | import static org.hamcrest.Matchers.greaterThan; |
| 62 | +import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
49 | 63 | import static org.hamcrest.Matchers.hasItem; |
50 | 64 | import static org.hamcrest.Matchers.instanceOf; |
51 | 65 | import static org.hamcrest.Matchers.not; |
| 66 | +import static org.hamcrest.Matchers.oneOf; |
52 | 67 | import static org.hamcrest.Matchers.startsWith; |
53 | 68 | import static org.hamcrest.core.Is.is; |
54 | 69 |
|
@@ -330,6 +345,116 @@ public void testProfile() throws IOException { |
330 | 345 | } |
331 | 346 | } |
332 | 347 |
|
| 348 | + private final String PROCESS_NAME = "process_name"; |
| 349 | + private final String THREAD_NAME = "thread_name"; |
| 350 | + |
| 351 | + @SuppressWarnings("unchecked") |
| 352 | + public void testProfileParsing() throws IOException { |
| 353 | + indexTimestampData(1); |
| 354 | + |
| 355 | + RequestObjectBuilder builder = new RequestObjectBuilder(XContentType.JSON).query(fromIndex() + " | stats avg(value)").profile(true); |
| 356 | + Request request = prepareRequestWithOptions(builder, SYNC); |
| 357 | + HttpEntity response = performRequest(request).getEntity(); |
| 358 | + |
| 359 | + ProfileParser.Profile profile; |
| 360 | + try (InputStream responseContent = response.getContent()) { |
| 361 | + profile = readProfileFromResponse(responseContent); |
| 362 | + } |
| 363 | + |
| 364 | + ByteArrayOutputStream os = new ByteArrayOutputStream(); |
| 365 | + try (XContentBuilder jsonOutputBuilder = new XContentBuilder(JsonXContent.jsonXContent, os)) { |
| 366 | + parseProfile(profile, jsonOutputBuilder); |
| 367 | + } |
| 368 | + |
| 369 | + // Read the written JSON again into a map, so we can make assertions on it |
| 370 | + ByteArrayInputStream profileJson = new ByteArrayInputStream(os.toByteArray()); |
| 371 | + Map<String, Object> parsedProfile = XContentHelper.convertToMap(JsonXContent.jsonXContent, profileJson, true); |
| 372 | + |
| 373 | + assertEquals("ns", parsedProfile.get("displayTimeUnit")); |
| 374 | + List<Map<String, Object>> events = (List<Map<String, Object>>) parsedProfile.get("traceEvents"); |
| 375 | + // At least 1 metadata event to declare the node, and 2 events each for the data, node_reduce and final drivers, resp. |
| 376 | + assertThat(events.size(), greaterThanOrEqualTo(7)); |
| 377 | + |
| 378 | + String clusterName = "test-cluster"; |
| 379 | + Set<String> expectedProcessNames = new HashSet<>(); |
| 380 | + for (int i = 0; i < cluster.getNumNodes(); i++) { |
| 381 | + expectedProcessNames.add(clusterName + ":" + cluster.getName(i)); |
| 382 | + } |
| 383 | + |
| 384 | + int seenNodes = 0; |
| 385 | + int seenDrivers = 0; |
| 386 | + // Declaration of each node as a "process" via a metadata event (phase `ph` is `M`) |
| 387 | + // First event has to declare the first seen node. |
| 388 | + Map<String, Object> nodeMetadata = events.get(0); |
| 389 | + assertProcessMetadataForNextNode(nodeMetadata, expectedProcessNames, seenNodes++); |
| 390 | + |
| 391 | + // The rest should be pairs of 2 events: first, a metadata event, declaring 1 "thread" per driver in the profile, then |
| 392 | + // a "complete" event (phase `ph` is `X`) with a timestamp, duration `dur`, thread duration `tdur` (cpu time) and additional |
| 393 | + // arguments obtained from the driver. |
| 394 | + // Except when run as part of the Serverless tests, which can involve more than 1 node - in which case, there will be more node |
| 395 | + // metadata events. |
| 396 | + for (int i = 1; i < events.size() - 1;) { |
| 397 | + String eventName = (String) events.get(i).get("name"); |
| 398 | + assertTrue(Set.of(THREAD_NAME, PROCESS_NAME).contains(eventName)); |
| 399 | + if (eventName.equals(THREAD_NAME)) { |
| 400 | + Map<String, Object> metadataEventForDriver = events.get(i); |
| 401 | + Map<String, Object> eventForDriver = events.get(i + 1); |
| 402 | + assertDriverData(metadataEventForDriver, eventForDriver, seenNodes, seenDrivers); |
| 403 | + i = i + 2; |
| 404 | + seenDrivers++; |
| 405 | + } else if (eventName.equals(PROCESS_NAME)) { |
| 406 | + Map<String, Object> metadataEventForNode = events.get(i); |
| 407 | + assertProcessMetadataForNextNode(metadataEventForNode, expectedProcessNames, seenNodes); |
| 408 | + i++; |
| 409 | + seenNodes++; |
| 410 | + } |
| 411 | + } |
| 412 | + } |
| 413 | + |
| 414 | + @SuppressWarnings("unchecked") |
| 415 | + public void assertProcessMetadataForNextNode(Map<String, Object> nodeMetadata, Set<String> expectedNamesForNodes, int seenNodes) { |
| 416 | + assertEquals("M", nodeMetadata.get("ph")); |
| 417 | + assertEquals(PROCESS_NAME, nodeMetadata.get("name")); |
| 418 | + assertEquals(seenNodes, nodeMetadata.get("pid")); |
| 419 | + |
| 420 | + Map<String, Object> nodeMetadataArgs = (Map<String, Object>) nodeMetadata.get("args"); |
| 421 | + assertTrue(expectedNamesForNodes.contains((String) nodeMetadataArgs.get("name"))); |
| 422 | + } |
| 423 | + |
| 424 | + @SuppressWarnings("unchecked") |
| 425 | + public void assertDriverData(Map<String, Object> driverMetadata, Map<String, Object> driverEvent, int seenNodes, int seenDrivers) { |
| 426 | + assertEquals("M", driverMetadata.get("ph")); |
| 427 | + assertEquals(THREAD_NAME, driverMetadata.get("name")); |
| 428 | + assertTrue((int) driverMetadata.get("pid") < seenNodes); |
| 429 | + assertEquals(seenDrivers, driverMetadata.get("tid")); |
| 430 | + Map<String, Object> driverMetadataArgs = (Map<String, Object>) driverMetadata.get("args"); |
| 431 | + String driverType = (String) driverMetadataArgs.get("name"); |
| 432 | + assertThat(driverType, oneOf("data", "node_reduce", "final")); |
| 433 | + |
| 434 | + assertEquals("X", driverEvent.get("ph")); |
| 435 | + assertThat((String) driverEvent.get("name"), startsWith(driverType)); |
| 436 | + // Category used to implicitly colour-code and group drivers |
| 437 | + assertEquals(driverType, driverEvent.get("cat")); |
| 438 | + assertTrue((int) driverEvent.get("pid") < seenNodes); |
| 439 | + assertEquals(seenDrivers, driverEvent.get("tid")); |
| 440 | + long timestampMillis = (long) driverEvent.get("ts"); |
| 441 | + double durationMicros = (double) driverEvent.get("dur"); |
| 442 | + double cpuDurationMicros = (double) driverEvent.get("tdur"); |
| 443 | + assertTrue(timestampMillis >= 0); |
| 444 | + assertTrue(durationMicros >= 0); |
| 445 | + assertTrue(cpuDurationMicros >= 0); |
| 446 | + assertTrue(durationMicros >= cpuDurationMicros); |
| 447 | + |
| 448 | + // This should contain the essential information from a driver, like its operators, and will be just attached to the slice/ |
| 449 | + // visible when clicking on it. |
| 450 | + Map<String, Object> driverSliceArgs = (Map<String, Object>) driverEvent.get("args"); |
| 451 | + assertNotNull(driverSliceArgs.get("cpu_nanos")); |
| 452 | + assertNotNull(driverSliceArgs.get("took_nanos")); |
| 453 | + assertNotNull(driverSliceArgs.get("iterations")); |
| 454 | + assertNotNull(driverSliceArgs.get("sleeps")); |
| 455 | + assertThat(((List<String>) driverSliceArgs.get("operators")), not(empty())); |
| 456 | + } |
| 457 | + |
333 | 458 | public void testProfileOrdinalsGroupingOperator() throws IOException { |
334 | 459 | assumeTrue("requires pragmas", Build.current().isSnapshot()); |
335 | 460 | indexTimestampData(1); |
|
0 commit comments