Skip to content

Commit f056d31

Browse files
authored
Canonicalize processor names and types in IngestStats (#122610) (#122630)
1 parent 3aca3c9 commit f056d31

File tree

5 files changed

+68
-11
lines changed

5 files changed

+68
-11
lines changed

docs/changelog/122610.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122610
2+
summary: Canonicalize processor names and types in `IngestStats`
3+
area: Ingest Node
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,20 +1164,35 @@ static String getProcessorName(Processor processor) {
11641164
if (processor instanceof ConditionalProcessor conditionalProcessor) {
11651165
processor = conditionalProcessor.getInnerProcessor();
11661166
}
1167-
StringBuilder sb = new StringBuilder(5);
1168-
sb.append(processor.getType());
11691167

1168+
String tag = processor.getTag();
1169+
if (tag != null && tag.isEmpty()) {
1170+
tag = null; // it simplifies the rest of the logic slightly to coalesce to null
1171+
}
1172+
1173+
String pipelineName = null;
11701174
if (processor instanceof PipelineProcessor pipelineProcessor) {
1171-
String pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
1172-
sb.append(":");
1173-
sb.append(pipelineName);
1175+
pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
11741176
}
1175-
String tag = processor.getTag();
1176-
if (tag != null && tag.isEmpty() == false) {
1177-
sb.append(":");
1178-
sb.append(tag);
1177+
1178+
// if there's a tag, OR if it's a pipeline processor, then the processor name is a compound thing,
1179+
// BUT if neither of those apply, then it's just the type -- so we can return the type itself without
1180+
// allocating a new String object
1181+
if (tag == null && pipelineName == null) {
1182+
return processor.getType();
1183+
} else {
1184+
StringBuilder sb = new StringBuilder(5);
1185+
sb.append(processor.getType());
1186+
if (pipelineName != null) {
1187+
sb.append(":");
1188+
sb.append(pipelineName);
1189+
}
1190+
if (tag != null) {
1191+
sb.append(":");
1192+
sb.append(tag);
1193+
}
1194+
return sb.toString();
11791195
}
1180-
return sb.toString();
11811196
}
11821197

11831198
/**

server/src/main/java/org/elasticsearch/ingest/IngestStats.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.concurrent.TimeUnit;
34+
import java.util.function.Function;
3435

3536
public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats)
3637
implements
@@ -57,6 +58,11 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
5758
* Read from a stream.
5859
*/
5960
public static IngestStats read(StreamInput in) throws IOException {
61+
// while reading the processors, we're going to encounter identical name and type strings *repeatedly*
62+
// it's advantageous to discard the endless copies of the same strings and canonical-ize them to keep our
63+
// heap usage under control. note: this map is key to key, because of the limitations of the set interface.
64+
final Map<String, String> namesAndTypesCache = new HashMap<>();
65+
6066
var stats = readStats(in);
6167
var size = in.readVInt();
6268
if (stats == Stats.IDENTITY && size == 0) {
@@ -76,6 +82,9 @@ public static IngestStats read(StreamInput in) throws IOException {
7682
var processorName = in.readString();
7783
var processorType = in.readString();
7884
var processorStat = readStats(in);
85+
// pass these name and type through the local names and types cache to canonical-ize them
86+
processorName = namesAndTypesCache.computeIfAbsent(processorName, Function.identity());
87+
processorType = namesAndTypesCache.computeIfAbsent(processorType, Function.identity());
7988
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
8089
}
8190
processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline));

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2096,7 +2096,7 @@ public void testStatName() {
20962096
Processor processor = mock(Processor.class);
20972097
String name = randomAlphaOfLength(10);
20982098
when(processor.getType()).thenReturn(name);
2099-
assertThat(IngestService.getProcessorName(processor), equalTo(name));
2099+
assertThat(IngestService.getProcessorName(processor), sameInstance(name));
21002100
String tag = randomAlphaOfLength(10);
21012101
when(processor.getTag()).thenReturn(tag);
21022102
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));

server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020

2121
import static org.hamcrest.Matchers.containsInAnyOrder;
22+
import static org.hamcrest.Matchers.is;
2223
import static org.hamcrest.Matchers.sameInstance;
2324

2425
public class IngestStatsTests extends ESTestCase {
@@ -37,6 +38,33 @@ public void testIdentitySerialization() throws IOException {
3738
assertThat(serializedStats, sameInstance(IngestStats.IDENTITY));
3839
}
3940

41+
public void testProcessorNameAndTypeIdentitySerialization() throws IOException {
42+
IngestStats.Builder builder = new IngestStats.Builder();
43+
builder.addPipelineMetrics("pipeline_id", new IngestPipelineMetric());
44+
builder.addProcessorMetrics("pipeline_id", "set", "set", new IngestMetric());
45+
builder.addProcessorMetrics("pipeline_id", "set:foo", "set", new IngestMetric());
46+
builder.addProcessorMetrics("pipeline_id", "set:bar", "set", new IngestMetric());
47+
builder.addTotalMetrics(new IngestMetric());
48+
49+
IngestStats serializedStats = serialize(builder.build());
50+
List<IngestStats.ProcessorStat> processorStats = serializedStats.processorStats().get("pipeline_id");
51+
52+
// these are just table stakes
53+
assertThat(processorStats.get(0).name(), is("set"));
54+
assertThat(processorStats.get(0).type(), is("set"));
55+
assertThat(processorStats.get(1).name(), is("set:foo"));
56+
assertThat(processorStats.get(1).type(), is("set"));
57+
assertThat(processorStats.get(2).name(), is("set:bar"));
58+
assertThat(processorStats.get(2).type(), is("set"));
59+
60+
// this is actually interesting, though -- we're canonical-izing these strings to keep our heap usage under control
61+
final String set = processorStats.get(0).name();
62+
assertThat(processorStats.get(0).name(), sameInstance(set));
63+
assertThat(processorStats.get(0).type(), sameInstance(set));
64+
assertThat(processorStats.get(1).type(), sameInstance(set));
65+
assertThat(processorStats.get(2).type(), sameInstance(set));
66+
}
67+
4068
public void testStatsMerge() {
4169
var first = randomStats();
4270
var second = randomStats();

0 commit comments

Comments
 (0)