Skip to content

Commit 4b338f8

Browse files
authored
Canonicalize processor names and types in IngestStats (elastic#122610) (elastic#122633)
1 parent f6cc4d7 commit 4b338f8

File tree

5 files changed

+68
-11
lines changed

5 files changed

+68
-11
lines changed

docs/changelog/122610.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122610
2+
summary: Canonicalize processor names and types in `IngestStats`
3+
area: Ingest Node
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,20 +1160,35 @@ static String getProcessorName(Processor processor) {
11601160
if (processor instanceof ConditionalProcessor conditionalProcessor) {
11611161
processor = conditionalProcessor.getInnerProcessor();
11621162
}
1163-
StringBuilder sb = new StringBuilder(5);
1164-
sb.append(processor.getType());
11651163

1164+
String tag = processor.getTag();
1165+
if (tag != null && tag.isEmpty()) {
1166+
tag = null; // it simplifies the rest of the logic slightly to coalesce to null
1167+
}
1168+
1169+
String pipelineName = null;
11661170
if (processor instanceof PipelineProcessor pipelineProcessor) {
1167-
String pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
1168-
sb.append(":");
1169-
sb.append(pipelineName);
1171+
pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
11701172
}
1171-
String tag = processor.getTag();
1172-
if (tag != null && tag.isEmpty() == false) {
1173-
sb.append(":");
1174-
sb.append(tag);
1173+
1174+
// if there's a tag, OR if it's a pipeline processor, then the processor name is a compound thing,
1175+
// BUT if neither of those apply, then it's just the type -- so we can return the type itself without
1176+
// allocating a new String object
1177+
if (tag == null && pipelineName == null) {
1178+
return processor.getType();
1179+
} else {
1180+
StringBuilder sb = new StringBuilder(5);
1181+
sb.append(processor.getType());
1182+
if (pipelineName != null) {
1183+
sb.append(":");
1184+
sb.append(pipelineName);
1185+
}
1186+
if (tag != null) {
1187+
sb.append(":");
1188+
sb.append(tag);
1189+
}
1190+
return sb.toString();
11751191
}
1176-
return sb.toString();
11771192
}
11781193

11791194
/**

server/src/main/java/org/elasticsearch/ingest/IngestStats.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.concurrent.TimeUnit;
34+
import java.util.function.Function;
3435

3536
public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats)
3637
implements
@@ -57,6 +58,11 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
5758
* Read from a stream.
5859
*/
5960
public static IngestStats read(StreamInput in) throws IOException {
61+
// while reading the processors, we're going to encounter identical name and type strings *repeatedly*
62+
// it's advantageous to discard the endless copies of the same strings and canonical-ize them to keep our
63+
// heap usage under control. note: this map maps each string to itself, because the Set interface cannot return the stored instance.
64+
final Map<String, String> namesAndTypesCache = new HashMap<>();
65+
6066
var stats = readStats(in);
6167
var size = in.readVInt();
6268
if (stats == Stats.IDENTITY && size == 0) {
@@ -78,6 +84,9 @@ public static IngestStats read(StreamInput in) throws IOException {
7884
var processorName = in.readString();
7985
var processorType = in.readString();
8086
var processorStat = readStats(in);
87+
// pass the name and type through the local names-and-types cache to canonicalize them
88+
processorName = namesAndTypesCache.computeIfAbsent(processorName, Function.identity());
89+
processorType = namesAndTypesCache.computeIfAbsent(processorType, Function.identity());
8190
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
8291
}
8392
processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline));

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2213,7 +2213,7 @@ public void testStatName() {
22132213
Processor processor = mock(Processor.class);
22142214
String name = randomAlphaOfLength(10);
22152215
when(processor.getType()).thenReturn(name);
2216-
assertThat(IngestService.getProcessorName(processor), equalTo(name));
2216+
assertThat(IngestService.getProcessorName(processor), sameInstance(name));
22172217
String tag = randomAlphaOfLength(10);
22182218
when(processor.getTag()).thenReturn(tag);
22192219
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));

server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020

2121
import static org.hamcrest.Matchers.containsInAnyOrder;
22+
import static org.hamcrest.Matchers.is;
2223
import static org.hamcrest.Matchers.sameInstance;
2324

2425
public class IngestStatsTests extends ESTestCase {
@@ -37,6 +38,33 @@ public void testIdentitySerialization() throws IOException {
3738
assertThat(serializedStats, sameInstance(IngestStats.IDENTITY));
3839
}
3940

41+
public void testProcessorNameAndTypeIdentitySerialization() throws IOException {
42+
IngestStats.Builder builder = new IngestStats.Builder();
43+
builder.addPipelineMetrics("pipeline_id", new IngestPipelineMetric());
44+
builder.addProcessorMetrics("pipeline_id", "set", "set", new IngestMetric());
45+
builder.addProcessorMetrics("pipeline_id", "set:foo", "set", new IngestMetric());
46+
builder.addProcessorMetrics("pipeline_id", "set:bar", "set", new IngestMetric());
47+
builder.addTotalMetrics(new IngestMetric());
48+
49+
IngestStats serializedStats = serialize(builder.build());
50+
List<IngestStats.ProcessorStat> processorStats = serializedStats.processorStats().get("pipeline_id");
51+
52+
// these are just table stakes
53+
assertThat(processorStats.get(0).name(), is("set"));
54+
assertThat(processorStats.get(0).type(), is("set"));
55+
assertThat(processorStats.get(1).name(), is("set:foo"));
56+
assertThat(processorStats.get(1).type(), is("set"));
57+
assertThat(processorStats.get(2).name(), is("set:bar"));
58+
assertThat(processorStats.get(2).type(), is("set"));
59+
60+
// this is actually interesting, though -- we're canonical-izing these strings to keep our heap usage under control
61+
final String set = processorStats.get(0).name();
62+
assertThat(processorStats.get(0).name(), sameInstance(set));
63+
assertThat(processorStats.get(0).type(), sameInstance(set));
64+
assertThat(processorStats.get(1).type(), sameInstance(set));
65+
assertThat(processorStats.get(2).type(), sameInstance(set));
66+
}
67+
4068
public void testStatsMerge() {
4169
var first = randomStats();
4270
var second = randomStats();

0 commit comments

Comments
 (0)