Skip to content

Commit c202547

Browse files
authored
Track and propagate MinRequestTime on the /reaggregate path (#121)
Co-authored-by: William Ehlhardt <[email protected]>
1 parent 7a099fa commit c202547

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregationBucket.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.joda.time.DateTime;
2727

2828
import java.util.Map;
29+
import java.util.Optional;
2930

3031
/**
3132
* Container class that holds aggregation pending records.
@@ -84,6 +85,10 @@ public boolean isSpecified(final Statistic statistic) {
8485
return _specified.getOrDefault(statistic, false);
8586
}
8687

88+
public Optional<DateTime> getMinRequestTime() {
89+
return _minRequestTime;
90+
}
91+
8792
/**
8893
* Add <code>AggregatedData</code> instance.
8994
*
@@ -122,9 +127,23 @@ public void update(final CombinedMetricData datum) {
122127
.log();
123128
}
124129
}
130+
131+
updateMinRequestTime(datum);
132+
}
133+
134+
private void updateMinRequestTime(final CombinedMetricData datum) {
135+
if (datum.getMinRequestTime().isPresent()) {
136+
if (!_minRequestTime.isPresent()) {
137+
_minRequestTime = datum.getMinRequestTime();
138+
}
139+
if (datum.getMinRequestTime().get().isBefore(_minRequestTime.get())) {
140+
_minRequestTime = datum.getMinRequestTime();
141+
}
142+
}
125143
}
126144

127145
private final DateTime _periodStart;
146+
private Optional<DateTime> _minRequestTime = Optional.empty();
128147
private final Map<Statistic, Calculator<?>> _data = Maps.newHashMap();
129148
private final Map<Statistic, Boolean> _specified = Maps.newHashMap();
130149

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ public Receive createReceive() {
172172
.setConditions(ImmutableList.of())
173173
.setPeriod(_period)
174174
.setStart(bucket.getPeriodStart())
175+
.setMinRequestTime(bucket.getMinRequestTime().orElse(null))
175176
.build();
176177
_emitter.tell(periodicData, getSelf());
177178
} else {

0 commit comments

Comments
 (0)