Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122610.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122610
summary: Canonicalize processor names and types in `IngestStats`
area: Ingest Node
type: bug
issues: []
35 changes: 25 additions & 10 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1164,20 +1164,35 @@ static String getProcessorName(Processor processor) {
if (processor instanceof ConditionalProcessor conditionalProcessor) {
processor = conditionalProcessor.getInnerProcessor();
}
StringBuilder sb = new StringBuilder(5);
sb.append(processor.getType());

String tag = processor.getTag();
if (tag != null && tag.isEmpty()) {
tag = null; // it simplifies the rest of the logic slightly to coalesce to null
}

String pipelineName = null;
if (processor instanceof PipelineProcessor pipelineProcessor) {
String pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
sb.append(":");
sb.append(pipelineName);
pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
}
String tag = processor.getTag();
if (tag != null && tag.isEmpty() == false) {
sb.append(":");
sb.append(tag);

// if there's a tag, OR if it's a pipeline processor, then the processor name is a compound thing,
// BUT if neither of those apply, then it's just the type -- so we can return the type itself without
// allocating a new String object
if (tag == null && pipelineName == null) {
return processor.getType();
} else {
StringBuilder sb = new StringBuilder(5);
sb.append(processor.getType());
if (pipelineName != null) {
sb.append(":");
sb.append(pipelineName);
}
if (tag != null) {
sb.append(":");
sb.append(tag);
}
return sb.toString();
}
return sb.toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats)
implements
Expand All @@ -57,6 +58,11 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
* Read from a stream.
*/
public static IngestStats read(StreamInput in) throws IOException {
// while reading the processors, we're going to encounter identical name and type strings *repeatedly*
// it's advantageous to discard the endless copies of the same strings and canonical-ize them to keep our
// heap usage under control. note: this map is key to key, because of the limitations of the set interface.
final Map<String, String> namesAndTypesCache = new HashMap<>();

var stats = readStats(in);
var size = in.readVInt();
if (stats == Stats.IDENTITY && size == 0) {
Expand All @@ -76,6 +82,9 @@ public static IngestStats read(StreamInput in) throws IOException {
var processorName = in.readString();
var processorType = in.readString();
var processorStat = readStats(in);
// pass the name and type through the local names-and-types cache to canonical-ize them
processorName = namesAndTypesCache.computeIfAbsent(processorName, Function.identity());
processorType = namesAndTypesCache.computeIfAbsent(processorType, Function.identity());
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
}
processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2096,7 +2096,7 @@ public void testStatName() {
Processor processor = mock(Processor.class);
String name = randomAlphaOfLength(10);
when(processor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(processor), equalTo(name));
assertThat(IngestService.getProcessorName(processor), sameInstance(name));
String tag = randomAlphaOfLength(10);
when(processor.getTag()).thenReturn(tag);
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;

public class IngestStatsTests extends ESTestCase {
Expand All @@ -37,6 +38,33 @@ public void testIdentitySerialization() throws IOException {
assertThat(serializedStats, sameInstance(IngestStats.IDENTITY));
}

/**
 * Builds stats for one pipeline with three processors that share a type, passes them through the
 * {@code serialize} helper, and checks that the resulting names and types are correct and that
 * equal strings come back as the very same {@code String} instance.
 */
public void testProcessorNameAndTypeIdentitySerialization() throws IOException {
    final String pipelineId = "pipeline_id";
    final String[] expectedNames = { "set", "set:foo", "set:bar" };

    IngestStats.Builder statsBuilder = new IngestStats.Builder();
    statsBuilder.addPipelineMetrics(pipelineId, new IngestPipelineMetric());
    for (String expectedName : expectedNames) {
        statsBuilder.addProcessorMetrics(pipelineId, expectedName, "set", new IngestMetric());
    }
    statsBuilder.addTotalMetrics(new IngestMetric());

    IngestStats roundTripped = serialize(statsBuilder.build());
    List<IngestStats.ProcessorStat> deserialized = roundTripped.processorStats().get(pipelineId);

    // baseline: every processor carries the expected name and the shared type
    for (int i = 0; i < expectedNames.length; i++) {
        assertThat(deserialized.get(i).name(), is(expectedNames[i]));
        assertThat(deserialized.get(i).type(), is("set"));
    }

    // the real point of the test: equal strings must be one shared instance, not merely equal copies
    final String canonical = deserialized.get(0).name();
    assertThat(deserialized.get(0).name(), sameInstance(canonical));
    assertThat(deserialized.get(0).type(), sameInstance(canonical));
    assertThat(deserialized.get(1).type(), sameInstance(canonical));
    assertThat(deserialized.get(2).type(), sameInstance(canonical));
}

public void testStatsMerge() {
var first = randomStats();
var second = randomStats();
Expand Down