Skip to content

Commit d59a0d9

Browse files
authored
Canonicalize processor names and types in IngestStats (#122610)
1 parent 9141335 commit d59a0d9

File tree

5 files changed

+68
-11
lines changed

5 files changed

+68
-11
lines changed

docs/changelog/122610.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122610
2+
summary: Canonicalize processor names and types in `IngestStats`
3+
area: Ingest Node
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,20 +1195,35 @@ static String getProcessorName(Processor processor) {
11951195
if (processor instanceof ConditionalProcessor conditionalProcessor) {
11961196
processor = conditionalProcessor.getInnerProcessor();
11971197
}
1198-
StringBuilder sb = new StringBuilder(5);
1199-
sb.append(processor.getType());
12001198

1199+
String tag = processor.getTag();
1200+
if (tag != null && tag.isEmpty()) {
1201+
tag = null; // it simplifies the rest of the logic slightly to coalesce to null
1202+
}
1203+
1204+
String pipelineName = null;
12011205
if (processor instanceof PipelineProcessor pipelineProcessor) {
1202-
String pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
1203-
sb.append(":");
1204-
sb.append(pipelineName);
1206+
pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
12051207
}
1206-
String tag = processor.getTag();
1207-
if (tag != null && tag.isEmpty() == false) {
1208-
sb.append(":");
1209-
sb.append(tag);
1208+
1209+
// if there's a tag, OR if it's a pipeline processor, then the processor name is a compound thing,
1210+
// BUT if neither of those apply, then it's just the type -- so we can return the type itself without
1211+
// allocating a new String object
1212+
if (tag == null && pipelineName == null) {
1213+
return processor.getType();
1214+
} else {
1215+
StringBuilder sb = new StringBuilder(5);
1216+
sb.append(processor.getType());
1217+
if (pipelineName != null) {
1218+
sb.append(":");
1219+
sb.append(pipelineName);
1220+
}
1221+
if (tag != null) {
1222+
sb.append(":");
1223+
sb.append(tag);
1224+
}
1225+
return sb.toString();
12101226
}
1211-
return sb.toString();
12121227
}
12131228

12141229
/**

server/src/main/java/org/elasticsearch/ingest/IngestStats.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.concurrent.TimeUnit;
34+
import java.util.function.Function;
3435

3536
public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats)
3637
implements
@@ -57,6 +58,11 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
5758
* Read from a stream.
5859
*/
5960
public static IngestStats read(StreamInput in) throws IOException {
61+
// while reading the processors, we're going to encounter identical name and type strings *repeatedly*
62+
// it's advantageous to discard the endless copies of the same strings and canonical-ize them to keep our
63+
// heap usage under control. note: this map is key to key, because the Set interface has no way to return the instance it already holds.
64+
final Map<String, String> namesAndTypesCache = new HashMap<>();
65+
6066
var stats = readStats(in);
6167
var size = in.readVInt();
6268
if (stats == Stats.IDENTITY && size == 0) {
@@ -76,6 +82,9 @@ public static IngestStats read(StreamInput in) throws IOException {
7682
var processorName = in.readString();
7783
var processorType = in.readString();
7884
var processorStat = readStats(in);
85+
// pass the name and type through the local names and types cache to canonical-ize them
86+
processorName = namesAndTypesCache.computeIfAbsent(processorName, Function.identity());
87+
processorType = namesAndTypesCache.computeIfAbsent(processorType, Function.identity());
7988
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
8089
}
8190
processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline));

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2181,7 +2181,7 @@ public void testStatName() {
21812181
Processor processor = mock(Processor.class);
21822182
String name = randomAlphaOfLength(10);
21832183
when(processor.getType()).thenReturn(name);
2184-
assertThat(IngestService.getProcessorName(processor), equalTo(name));
2184+
assertThat(IngestService.getProcessorName(processor), sameInstance(name));
21852185
String tag = randomAlphaOfLength(10);
21862186
when(processor.getTag()).thenReturn(tag);
21872187
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));

server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020

2121
import static org.hamcrest.Matchers.containsInAnyOrder;
22+
import static org.hamcrest.Matchers.is;
2223
import static org.hamcrest.Matchers.sameInstance;
2324

2425
public class IngestStatsTests extends ESTestCase {
@@ -37,6 +38,33 @@ public void testIdentitySerialization() throws IOException {
3738
assertThat(serializedStats, sameInstance(IngestStats.IDENTITY));
3839
}
3940

41+
public void testProcessorNameAndTypeIdentitySerialization() throws IOException {
42+
IngestStats.Builder builder = new IngestStats.Builder();
43+
builder.addPipelineMetrics("pipeline_id", new IngestPipelineMetric());
44+
builder.addProcessorMetrics("pipeline_id", "set", "set", new IngestMetric());
45+
builder.addProcessorMetrics("pipeline_id", "set:foo", "set", new IngestMetric());
46+
builder.addProcessorMetrics("pipeline_id", "set:bar", "set", new IngestMetric());
47+
builder.addTotalMetrics(new IngestMetric());
48+
49+
IngestStats serializedStats = serialize(builder.build());
50+
List<IngestStats.ProcessorStat> processorStats = serializedStats.processorStats().get("pipeline_id");
51+
52+
// these are just table stakes
53+
assertThat(processorStats.get(0).name(), is("set"));
54+
assertThat(processorStats.get(0).type(), is("set"));
55+
assertThat(processorStats.get(1).name(), is("set:foo"));
56+
assertThat(processorStats.get(1).type(), is("set"));
57+
assertThat(processorStats.get(2).name(), is("set:bar"));
58+
assertThat(processorStats.get(2).type(), is("set"));
59+
60+
// this is actually interesting, though -- we're canonical-izing these strings to keep our heap usage under control
61+
final String set = processorStats.get(0).name();
62+
assertThat(processorStats.get(0).name(), sameInstance(set));
63+
assertThat(processorStats.get(0).type(), sameInstance(set));
64+
assertThat(processorStats.get(1).type(), sameInstance(set));
65+
assertThat(processorStats.get(2).type(), sameInstance(set));
66+
}
67+
4068
public void testStatsMerge() {
4169
var first = randomStats();
4270
var second = randomStats();

0 commit comments

Comments
 (0)