Skip to content

Commit 91f6f80

Browse files
authored
Better processor stat merge (#102821) (#102886)
1 parent bc81f68 commit 91f6f80

File tree

3 files changed

+136
-42
lines changed

3 files changed

+136
-42
lines changed

docs/changelog/102821.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 102821
2+
summary: Better processor stat merge
3+
area: Ingest Node
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/ingest/IngestStats.java

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.concurrent.TimeUnit;
31-
import java.util.stream.Collectors;
3231

3332
public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats)
3433
implements
@@ -263,20 +262,62 @@ static List<PipelineStat> merge(List<PipelineStat> first, List<PipelineStat> sec
263262
*/
264263
public record ProcessorStat(String name, String type, Stats stats) {
265264

266-
// The list of ProcessorStats has *always* stats for each processor (even if processor was executed or not), so it's safe to zip
267-
// both lists using a common index iterator.
268265
private static List<ProcessorStat> merge(List<ProcessorStat> first, List<ProcessorStat> second) {
269-
var merged = new ArrayList<ProcessorStat>();
270-
assert first.size() == second.size()
271-
: "stats size mismatch ["
272-
+ first.stream().map(ps -> ps.name + ":" + ps.type).collect(Collectors.joining(","))
273-
+ "] ["
274-
+ second.stream().map(ps -> ps.name + ":" + ps.type).collect(Collectors.joining(","))
275-
+ "]";
276-
for (var i = 0; i < first.size(); i++) {
277-
merged.add(new ProcessorStat(first.get(i).name, first.get(i).type, Stats.merge(first.get(i).stats, second.get(i).stats)));
266+
// in the simple case, this amounts to summing up the stats in the first and second and returning
267+
// a new list of stats that contains the sum. but there are a few not-quite-so-simple cases, too,
268+
// so this logic is a little bit intricate.
269+
270+
// total up the stats across both sides
271+
long firstIngestCountTotal = 0;
272+
for (ProcessorStat ps : first) {
273+
firstIngestCountTotal += ps.stats.ingestCount;
274+
}
275+
276+
long secondIngestCountTotal = 0;
277+
for (ProcessorStat ps : second) {
278+
secondIngestCountTotal += ps.stats.ingestCount;
279+
}
280+
281+
// early return in the case of a non-ingest node (the sum of the stats will be zero, so just return the other)
282+
if (firstIngestCountTotal == 0) {
283+
return second;
284+
} else if (secondIngestCountTotal == 0) {
285+
return first;
286+
}
287+
288+
// the list of stats can be different depending on the exact order of application of the cluster states
289+
// that apply a change to a pipeline -- figure out if they match or not (usually they match!!!)
290+
291+
// speculative execution of the expected, simple case (where we can merge the processor stats)
292+
// if we process both lists of stats and everything matches up, we can return the resulting merged list
293+
if (first.size() == second.size()) { // if the sizes of the lists don't match, then we can skip all this
294+
boolean match = true;
295+
var merged = new ArrayList<ProcessorStat>(first.size());
296+
for (var i = 0; i < first.size(); i++) {
297+
ProcessorStat ps1 = first.get(i);
298+
ProcessorStat ps2 = second.get(i);
299+
if (ps1.name.equals(ps2.name) == false || ps1.type.equals(ps2.type) == false) {
300+
match = false;
301+
break;
302+
} else {
303+
merged.add(new ProcessorStat(ps1.name, ps1.type, Stats.merge(ps1.stats, ps2.stats)));
304+
}
305+
}
306+
if (match) {
307+
return merged;
308+
}
309+
}
310+
311+
// speculative execution failed, so we're in the unfortunate case. the lists are different, and they
312+
// can't be meaningfully merged without more information. note that IngestService#innerUpdatePipelines
313+
// resets the counts if there's enough variation on an update, so we'll favor the side with the *lower*
314+
// count as being the 'newest' -- the assumption is that the higher side is just a cluster state
315+
// application away from itself being reset to zero anyway.
316+
if (firstIngestCountTotal < secondIngestCountTotal) {
317+
return first;
318+
} else {
319+
return second;
278320
}
279-
return merged;
280321
}
281322
}
282323
}

server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -71,42 +71,86 @@ public void testPipelineStatsMerge() {
7171
);
7272
}
7373

74-
public void testProcessorStatsMerge() {
74+
public void testProcessorStatsMergeZeroCounts() {
7575
{
76-
var first = Map.of("pipeline-1", randomPipelineProcessorStats());
76+
var expected = randomPipelineProcessorStats();
77+
var first = Map.of("pipeline-1", expected);
78+
79+
// merging with an empty map yields the non-empty map
7780
assertEquals(IngestStats.merge(Map.of(), first), first);
7881
assertEquals(IngestStats.merge(first, Map.of()), first);
82+
83+
// it's the same exact reference, in fact
84+
assertSame(expected, IngestStats.merge(Map.of(), first).get("pipeline-1"));
85+
assertSame(expected, IngestStats.merge(first, Map.of()).get("pipeline-1"));
7986
}
8087
{
81-
var first = Map.of(
82-
"pipeline-1",
83-
randomPipelineProcessorStats(),
84-
"pipeline-2",
85-
randomPipelineProcessorStats(),
86-
"pipeline-3",
87-
randomPipelineProcessorStats()
88+
var expected = randomPipelineProcessorStats();
89+
var first = Map.of("pipeline-1", expected);
90+
var zero = List.of(
91+
new IngestStats.ProcessorStat("proc-1", "type-1", zeroStats()),
92+
new IngestStats.ProcessorStat("proc-1", "type-2", zeroStats()),
93+
new IngestStats.ProcessorStat("proc-2", "type-1", zeroStats()),
94+
new IngestStats.ProcessorStat("proc-3", "type-3", zeroStats())
8895
);
89-
var second = Map.of(
96+
var second = Map.of("pipeline-1", zero);
97+
98+
// merging with a zero map yields the non-zero map
99+
assertEquals(IngestStats.merge(second, first), first);
100+
assertEquals(IngestStats.merge(first, second), first);
101+
102+
// it's the same exact reference, in fact
103+
assertSame(expected, IngestStats.merge(second, first).get("pipeline-1"));
104+
assertSame(expected, IngestStats.merge(first, second).get("pipeline-1"));
105+
}
106+
}
107+
108+
public void testProcessorStatsMerge() {
109+
var first = Map.of(
110+
"pipeline-1",
111+
randomPipelineProcessorStats(),
112+
"pipeline-2",
113+
randomPipelineProcessorStats(),
114+
"pipeline-3",
115+
randomPipelineProcessorStats()
116+
);
117+
var second = Map.of(
118+
"pipeline-2",
119+
randomPipelineProcessorStats(),
120+
"pipeline-3",
121+
randomPipelineProcessorStats(),
122+
"pipeline-1",
123+
randomPipelineProcessorStats()
124+
);
125+
126+
assertEquals(
127+
IngestStats.merge(first, second),
128+
Map.of(
129+
"pipeline-1",
130+
expectedPipelineProcessorStats(first.get("pipeline-1"), second.get("pipeline-1")),
90131
"pipeline-2",
91-
randomPipelineProcessorStats(),
132+
expectedPipelineProcessorStats(first.get("pipeline-2"), second.get("pipeline-2")),
92133
"pipeline-3",
93-
randomPipelineProcessorStats(),
94-
"pipeline-1",
95-
randomPipelineProcessorStats()
96-
);
134+
expectedPipelineProcessorStats(first.get("pipeline-3"), second.get("pipeline-3"))
135+
)
136+
);
137+
}
97138

98-
assertEquals(
99-
IngestStats.merge(first, second),
100-
Map.of(
101-
"pipeline-1",
102-
expectedPipelineProcessorStats(first.get("pipeline-1"), second.get("pipeline-1")),
103-
"pipeline-2",
104-
expectedPipelineProcessorStats(first.get("pipeline-2"), second.get("pipeline-2")),
105-
"pipeline-3",
106-
expectedPipelineProcessorStats(first.get("pipeline-3"), second.get("pipeline-3"))
107-
)
108-
);
109-
}
139+
public void testProcessorStatsMergeHeterogeneous() {
140+
// if a pipeline has heterogeneous *non-zero* stats, then we defer to the one with a smaller total ingest count
141+
142+
var first = Map.of(
143+
"pipeline-1",
144+
List.of(
145+
new IngestStats.ProcessorStat("name-1", "type-1", new IngestStats.Stats(randomLongBetween(1, 100), 0, 0, 0)),
146+
new IngestStats.ProcessorStat("name-2", "type-2", new IngestStats.Stats(randomLongBetween(1, 100), 0, 0, 0))
147+
)
148+
);
149+
var expected = List.of(new IngestStats.ProcessorStat("name-1", "type-1", new IngestStats.Stats(1, 0, 0, 0)));
150+
var second = Map.of("pipeline-1", expected);
151+
152+
assertEquals(second, IngestStats.merge(first, second));
153+
assertSame(expected, IngestStats.merge(second, first).get("pipeline-1"));
110154
}
111155

112156
private static List<IngestStats.ProcessorStat> expectedPipelineProcessorStats(
@@ -117,7 +161,7 @@ private static List<IngestStats.ProcessorStat> expectedPipelineProcessorStats(
117161
new IngestStats.ProcessorStat("proc-1", "type-1", merge(first.get(0).stats(), second.get(0).stats())),
118162
new IngestStats.ProcessorStat("proc-1", "type-2", merge(first.get(1).stats(), second.get(1).stats())),
119163
new IngestStats.ProcessorStat("proc-2", "type-1", merge(first.get(2).stats(), second.get(2).stats())),
120-
new IngestStats.ProcessorStat("proc-3", "type-4", merge(first.get(3).stats(), second.get(3).stats()))
164+
new IngestStats.ProcessorStat("proc-3", "type-3", merge(first.get(3).stats(), second.get(3).stats()))
121165
);
122166
}
123167

@@ -126,7 +170,7 @@ private static List<IngestStats.ProcessorStat> randomPipelineProcessorStats() {
126170
randomProcessorStat("proc-1", "type-1"),
127171
randomProcessorStat("proc-1", "type-2"),
128172
randomProcessorStat("proc-2", "type-1"),
129-
randomProcessorStat("proc-3", "type-4")
173+
randomProcessorStat("proc-3", "type-3")
130174
);
131175
}
132176

@@ -216,4 +260,8 @@ private static IngestStats.PipelineStat randomPipelineStat(String id) {
216260
private static IngestStats.Stats randomStats() {
217261
return new IngestStats.Stats(randomLong(), randomLong(), randomLong(), randomLong());
218262
}
263+
264+
private static IngestStats.Stats zeroStats() {
265+
return new IngestStats.Stats(0, 0, 0, 0);
266+
}
219267
}

0 commit comments

Comments
 (0)