diff --git a/app/com/arpnetworking/rollups/RollupDefinition.java b/app/com/arpnetworking/rollups/RollupDefinition.java index 4bbeb4bc5..1a709cc4a 100644 --- a/app/com/arpnetworking/rollups/RollupDefinition.java +++ b/app/com/arpnetworking/rollups/RollupDefinition.java @@ -40,6 +40,7 @@ public final class RollupDefinition implements Serializable, ConsistentHashingRo private final Instant _startTime; private final ImmutableMap _filterTags; private final ImmutableMultimap _allMetricTags; + private final Instant _giveUpAfter; private RollupDefinition(final Builder builder) { _sourceMetricName = builder._sourceMetricName; @@ -48,6 +49,7 @@ private RollupDefinition(final Builder builder) { _startTime = builder._startTime; _filterTags = builder._filterTags; _allMetricTags = builder._allMetricTags; + _giveUpAfter = builder._giveUpAfter; } public String getSourceMetricName() { @@ -74,6 +76,10 @@ public ImmutableMultimap getAllMetricTags() { return _allMetricTags; } + public Instant getGiveUpAfter() { + return _giveUpAfter; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -88,12 +94,13 @@ public boolean equals(final Object o) { && _period == that._period && _startTime.equals(that._startTime) && _filterTags.equals(that._filterTags) - && _allMetricTags.equals(that._allMetricTags); + && _allMetricTags.equals(that._allMetricTags) + && _giveUpAfter.equals(that._giveUpAfter); } @Override public int hashCode() { - return Objects.hash(_sourceMetricName, _destinationMetricName, _period, _startTime, _filterTags, _allMetricTags); + return Objects.hash(_sourceMetricName, _destinationMetricName, _period, _startTime, _filterTags, _allMetricTags, _giveUpAfter); } @Override @@ -105,6 +112,7 @@ public String toString() { .add("_startTime", _startTime) .add("_filterTags", _filterTags) .add("_allMetricTags", _allMetricTags) + .add("_giveUpAfter", _giveUpAfter) .toString(); } @@ -135,6 +143,8 @@ public static final class Builder extends OvalBuilder { private ImmutableMap _filterTags = ImmutableMap.of(); @NotNull private ImmutableMultimap _allMetricTags; + @NotNull + private Instant _giveUpAfter; /** * Creates a builder for a RollupDefinition. @@ -208,5 +218,16 @@ public Builder setAllMetricTags(final ImmutableMultimap value) { _allMetricTags = value; return this; } + + /** + * Sets the {@code _giveUpAfter} and returns a reference to this Builder so that the methods can be chained together. + * + * @param value the {@code _giveUpAfter} to set + * @return a reference to this Builder + */ + public Builder setGiveUpAfter(final Instant value) { + _giveUpAfter = value; + return this; + } } } diff --git a/app/com/arpnetworking/rollups/RollupGenerator.java b/app/com/arpnetworking/rollups/RollupGenerator.java index b7a90fef0..3416f94e1 100644 --- a/app/com/arpnetworking/rollups/RollupGenerator.java +++ b/app/com/arpnetworking/rollups/RollupGenerator.java @@ -313,7 +313,8 @@ private void handleLastDataPointMessage(final LastDataPointsMessage message) { .setSourceMetricName(message.getSourceMetricName()) .setDestinationMetricName(rollupMetricName) .setPeriod(period) - .setAllMetricTags(message.getTags()); + .setAllMetricTags(message.getTags()) + .setGiveUpAfter(Instant.now().plus(period.periodCountToDuration(1))); for (final Instant startTime : startTimes) { rollupDefBuilder.setStartTime(startTime); diff --git a/app/com/arpnetworking/rollups/RollupManager.java b/app/com/arpnetworking/rollups/RollupManager.java index c13ed4bbb..74efe2ff2 100644 --- a/app/com/arpnetworking/rollups/RollupManager.java +++ b/app/com/arpnetworking/rollups/RollupManager.java @@ -26,6 +26,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; +import java.time.Instant; import java.util.Comparator; import java.util.Optional; import java.util.TreeSet; @@ -140,7 +141,29 @@ private void executorFinished(final RollupExecutor.FinishRollupMessage message) } private Optional getNextRollup() { - return Optional.ofNullable(_rollupDefinitions.pollFirst()); + while (!_rollupDefinitions.isEmpty()) { + final RollupDefinition earliest = _rollupDefinitions.pollFirst(); + if (earliest == null) { + LOGGER.error() + .setMessage("got null job out of set despite verifying non-emptiness; should be impossible") + .log(); + return Optional.empty(); + } + final Instant now = Instant.now(); + if (earliest.getGiveUpAfter().isAfter(now)) { + return Optional.of(earliest); + } + LOGGER.warn() + .setMessage("rollup definition aged out") + .addData("rollupDefinition", earliest) + .addData("timeAgedOut", now) + .log(); + final Metrics metrics = _metricsFactory.create(); + metrics.addAnnotation("rollup_metric", earliest.getDestinationMetricName()); + metrics.incrementCounter("rollup/manager/aged_out"); + metrics.close(); + } + return Optional.empty(); } private static class RollupComparator implements Comparator, Serializable { diff --git a/test/java/com/arpnetworking/metrics/portal/TestBeanFactory.java b/test/java/com/arpnetworking/metrics/portal/TestBeanFactory.java index 864cb147a..19fbf62de 100644 --- a/test/java/com/arpnetworking/metrics/portal/TestBeanFactory.java +++ b/test/java/com/arpnetworking/metrics/portal/TestBeanFactory.java @@ -238,7 +238,8 @@ public static RollupDefinition.Builder createRollupDefinitionBuilder() { .setDestinationMetricName("my_metric_1h") .setPeriod(RollupPeriod.HOURLY) .setAllMetricTags(ImmutableMultimap.of("tag", "val")) - .setFilterTags(ImmutableMap.of()); + .setFilterTags(ImmutableMap.of()) + .setGiveUpAfter(Instant.now().plus(Duration.ofDays(1))); } /** diff --git a/test/java/com/arpnetworking/rollups/RollupExecutorTest.java b/test/java/com/arpnetworking/rollups/RollupExecutorTest.java index 01f1f8264..8cf1c3b3c 100644 --- a/test/java/com/arpnetworking/rollups/RollupExecutorTest.java +++ b/test/java/com/arpnetworking/rollups/RollupExecutorTest.java @@ -29,6 +29,7 @@ import com.arpnetworking.kairos.client.models.Sampling; import com.arpnetworking.kairos.client.models.SamplingUnit; import com.arpnetworking.metrics.incubator.PeriodicMetrics; +import com.arpnetworking.metrics.portal.TestBeanFactory; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; @@ -126,14 +127,7 @@ public void testFetchesNextRollup() { _probe.expectMsg(RollupExecutor.FETCH_ROLLUP); final RollupExecutor.FinishRollupMessage finished = ThreadLocalBuilder.build( RollupExecutor.FinishRollupMessage.Builder.class, - b -> b.setRollupDefinition(new RollupDefinition.Builder() - .setSourceMetricName("metric") - .setDestinationMetricName("metric_1h") - .setPeriod(RollupPeriod.HOURLY) - .setStartTime(Instant.EPOCH) - .setAllMetricTags(ImmutableMultimap.of()) - .build() - ) + b -> b.setRollupDefinition(TestBeanFactory.createRollupDefinitionBuilder().build()) ); actor.tell(finished, ActorRef.noSender()); _probe.expectMsg(finished); @@ -172,7 +166,7 @@ public void testPerformsRollup() { _probe.expectMsg(RollupExecutor.FETCH_ROLLUP); actor.tell( - new RollupDefinition.Builder() + TestBeanFactory.createRollupDefinitionBuilder() .setSourceMetricName("metric") .setDestinationMetricName("metric_1h") .setPeriod(RollupPeriod.HOURLY) @@ -219,7 +213,7 @@ public void testPerformsRollup() { @Test public void testBuildRollupQuery() { - RollupDefinition definition = new RollupDefinition.Builder() + RollupDefinition definition = TestBeanFactory.createRollupDefinitionBuilder() .setSourceMetricName("my_metric") .setDestinationMetricName("my_metric_1h") .setAllMetricTags(ImmutableMultimap.of("tag1", "val1", "tag2", "val2")) @@ -256,7 +250,7 @@ public void testBuildRollupQuery() { RollupExecutor.buildQueryRollup(definition) ); - definition = new RollupDefinition.Builder() + definition = TestBeanFactory.createRollupDefinitionBuilder() .setSourceMetricName("my_metric_1h") .setDestinationMetricName("my_metric_1d") .setAllMetricTags(ImmutableMultimap.of("tag1", "val1", "tag2", "val2")) diff --git a/test/java/com/arpnetworking/rollups/RollupManagerTest.java b/test/java/com/arpnetworking/rollups/RollupManagerTest.java index 3f60fe4be..80470160c 100644 --- a/test/java/com/arpnetworking/rollups/RollupManagerTest.java +++ b/test/java/com/arpnetworking/rollups/RollupManagerTest.java @@ -26,6 +26,7 @@ import com.arpnetworking.metrics.impl.NoOpMetricsFactory; import com.arpnetworking.metrics.incubator.PeriodicMetrics; import com.arpnetworking.metrics.portal.AkkaClusteringConfigFactory; +import com.arpnetworking.metrics.portal.TestBeanFactory; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; @@ -102,7 +103,7 @@ public void testStoreRollupDefinition() { final TestKit testKit = new TestKit(_system); final ActorRef actor = createActor(); final ActorRef testActor = testKit.getTestActor(); - final RollupDefinition.Builder rollupDefBuilder = new RollupDefinition.Builder() + final RollupDefinition.Builder rollupDefBuilder = TestBeanFactory.createRollupDefinitionBuilder() .setSourceMetricName("foo") .setDestinationMetricName("foo_1h") .setPeriod(RollupPeriod.HOURLY) @@ -128,12 +129,7 @@ public void testDeDupsRollups() { final TestKit testKit = new TestKit(_system); final ActorRef actor = createActor(); final ActorRef testActor = testKit.getTestActor(); - final RollupDefinition.Builder rollupDefBuilder = new RollupDefinition.Builder() - .setSourceMetricName("foo") - .setDestinationMetricName("foo_1h") - .setPeriod(RollupPeriod.HOURLY) - .setAllMetricTags(ImmutableMultimap.of("bar", "val")) - .setStartTime(Instant.EPOCH); + final RollupDefinition.Builder rollupDefBuilder = TestBeanFactory.createRollupDefinitionBuilder(); final RollupDefinition rollupDef = rollupDefBuilder.build(); final RollupDefinition rollupDef2 = rollupDefBuilder.build(); actor.tell(rollupDef, testActor); @@ -149,14 +145,9 @@ public void testReturnsRollupsInChronologicalOrder() { final TestKit testKit = new TestKit(_system); final ActorRef actor = createActor(); final ActorRef testActor = testKit.getTestActor(); - final RollupDefinition.Builder rollupDefBuilder = new RollupDefinition.Builder() - .setSourceMetricName("foo") - .setDestinationMetricName("foo_1h") - .setPeriod(RollupPeriod.HOURLY) - .setAllMetricTags(ImmutableMultimap.of("bar", "val")) - .setStartTime(Instant.EPOCH); + final RollupDefinition.Builder rollupDefBuilder = TestBeanFactory.createRollupDefinitionBuilder(); final RollupDefinition rollupDef = rollupDefBuilder.build(); - final RollupDefinition rollupDef2 = rollupDefBuilder.setStartTime(Instant.EPOCH.plus(1, ChronoUnit.HOURS)).build(); + final RollupDefinition rollupDef2 = rollupDefBuilder.setStartTime(rollupDef.getStartTime().plus(1, ChronoUnit.HOURS)).build(); actor.tell(rollupDef2, testActor); actor.tell(rollupDef, testActor); actor.tell(RollupFetch.getInstance(), testActor); @@ -174,12 +165,8 @@ public void testSplitsFailedRollups() throws Exception { final TestKit testKit = new TestKit(_system); final ActorRef actor = createActor(); final ActorRef testActor = testKit.getTestActor(); - final RollupDefinition rollupDef = new RollupDefinition.Builder() - .setSourceMetricName("foo") - .setDestinationMetricName("foo_1h") - .setPeriod(RollupPeriod.HOURLY) + final RollupDefinition rollupDef = TestBeanFactory.createRollupDefinitionBuilder() .setAllMetricTags(ImmutableMultimap.of("tag", "val1", "tag", "val2")) - .setStartTime(Instant.EPOCH) .build(); final ImmutableSet children = ImmutableSet.of("val1", "val2").stream() @@ -204,4 +191,19 @@ public void testSplitsFailedRollups() throws Exception { actor.tell(RollupFetch.getInstance(), testActor); testKit.expectMsg(NoMoreRollups.getInstance()); } + + @Test + public void testAgesOutExpiredRollups() { + final TestKit testKit = new TestKit(_system); + final ActorRef actor = createActor(); + final ActorRef testActor = testKit.getTestActor(); + final RollupDefinition rollupDef = TestBeanFactory.createRollupDefinitionBuilder() + .setGiveUpAfter(Instant.now().minusMillis(1)) + .build(); + + actor.tell(rollupDef, testActor); + + actor.tell(RollupFetch.getInstance(), testActor); + testKit.expectMsg(NoMoreRollups.getInstance()); + } }