diff --git a/pom.xml b/pom.xml
index 931c7f38..221defc3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -845,6 +845,12 @@
 <version>${akka.version}</version>
 <scope>test</scope>
 </dependency>
+<dependency>
+ <groupId>com.mercateo</groupId>
+ <artifactId>test-clock</artifactId>
+ <version>1.0.2</version>
+ <scope>test</scope>
+</dependency>
 <dependency>
 <groupId>com.arpnetworking.metrics</groupId>
@@ -895,6 +901,11 @@
 <artifactId>kafka-clients</artifactId>
 <version>2.2.1</version>
 </dependency>
+<dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>3.3.0</version>
+</dependency>
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/DistinctOriginLimitingSource.java b/src/main/java/com/arpnetworking/metrics/mad/sources/DistinctOriginLimitingSource.java
new file mode 100644
index 00000000..87533e56
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/DistinctOriginLimitingSource.java
@@ -0,0 +1,321 @@
+/*
+ * Copyright 2020 Dropbox.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.arpnetworking.metrics.mad.sources;
+
+import com.arpnetworking.commons.observer.Observable;
+import com.arpnetworking.commons.observer.Observer;
+import com.arpnetworking.logback.annotations.LogValue;
+import com.arpnetworking.metrics.common.sources.BaseSource;
+import com.arpnetworking.metrics.common.sources.Source;
+import com.arpnetworking.metrics.mad.model.DefaultRecord;
+import com.arpnetworking.metrics.mad.model.Metric;
+import com.arpnetworking.metrics.mad.model.Record;
+import com.arpnetworking.metrics.mad.sources.ratelimiter.AcceptListSink;
+import com.arpnetworking.metrics.mad.sources.ratelimiter.AcceptListSource;
+import com.arpnetworking.metrics.mad.sources.ratelimiter.AcceptListStore;
+import com.arpnetworking.metrics.mad.sources.ratelimiter.AcceptListStoreUpdater;
+import com.arpnetworking.steno.LogValueMapFactory;
+import com.arpnetworking.steno.Logger;
+import com.arpnetworking.steno.LoggerFactory;
+import com.arpnetworking.tsdcore.model.DefaultKey;
+import com.arpnetworking.tsdcore.model.Key;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import net.sf.oval.constraint.NotNull;
+
+import java.time.Clock;
+import java.time.temporal.ChronoUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
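+/**
+ * A {@link Source} wrapper that forwards a metric only once it has been reported
+ * by more than a configurable threshold of distinct origins. Hashes of accepted
+ * metric name plus dimension key combinations are shared through an
+ * {@link AcceptListStore} that is refreshed every five minutes.
+ *
+ * @author Gil Markham (gmarkham at dropbox dot com)
+ */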
+public class DistinctOriginLimitingSource extends BaseSource {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DistinctOriginLimitingSource.class);
+ private static final long FIVE_MINUTES_IN_MILLIS = TimeUnit.MINUTES.toMillis(5);
+ private final Source _source;
+ private final DistinctOriginLimitingObserver _observer;
+ private final ScheduledExecutorService _updateExecutor;
+ private final AcceptListStoreUpdater _updater;
+
+ public DistinctOriginLimitingSource(final Builder builder) {
+ super(builder);
+ _source = builder._source;
+ _observer = new DistinctOriginLimitingObserver(this, builder._hashFunction, builder._threshold);
+ _source.attach(_observer);
+ _updateExecutor = Executors.newSingleThreadScheduledExecutor(
+ runnable -> new Thread(runnable, "DistinctOriginLimitingSource"));
+ _updater = new AcceptListStoreUpdater(builder._store, _observer, _observer);
+ }
+
+ @Override
+ public void start() {
+ _updater.run();
+ _updateExecutor.scheduleWithFixedDelay(_updater, FIVE_MINUTES_IN_MILLIS, FIVE_MINUTES_IN_MILLIS, TimeUnit.MILLISECONDS);
+ _source.start();
+ }
+
+ @Override
+ public void stop() {
+ _source.stop();
+ _updateExecutor.shutdown();
+ }
+
+ // Overridden to make available for tests
+ @Override
+ protected void notify(Object event) {
+ super.notify(event);
+ }
+
+ @LogValue
+ public Object toLogValue() {
+ return LogValueMapFactory.builder(this)
+ .put("source", _source)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return toLogValue().toString();
+ }
+
+ /**
+ * Package private for testing
+ */
+ static final class DistinctOriginLimitingObserver implements Observer, AcceptListSink, AcceptListSource {
+ private final DistinctOriginLimitingSource _source;
+ private final HashFunction _hashFunction;
+ private final HashSet<String> _accepted = new HashSet<>();
+ private final Integer _threshold;
+ private final Clock _clock = Clock.systemUTC();
+ private String _lastTimeBucket;
+ private ConcurrentSkipListMap<String, Set<String>> _recentOccurrences = new ConcurrentSkipListMap<>();
+ private ImmutableSet<String> _globalAcceptList = ImmutableSet.of();
+
+ /* package private */ DistinctOriginLimitingObserver(
+ final DistinctOriginLimitingSource source,
+ final HashFunction hashFunction,
+ final Integer threshold) {
+ _source = source;
+ _hashFunction = hashFunction;
+ _threshold = threshold;
+ _lastTimeBucket = getCurrentBucketStart();
+ }
+
+ @Override
+ public void notify(@NonNull final Observable observable, @NonNull final Object event) {
+ if (!(event instanceof Record)) {
+ LOGGER.error()
+ .setMessage("Observed unsupported event")
+ .addData("event", event)
+ .log();
+ return;
+ }
+
+ final Record record = (Record) event;
+ final Key key = new DefaultKey(record.getDimensions());
+ final String origin = record.getAnnotations().getOrDefault("origin", "unknown");
+ LOGGER.trace()
+ .setMessage("Sending record to aggregation actor")
+ .addData("record", record)
+ .addData("key", key)
+ .log();
+
+ final String dimensionKeys = record.getDimensions().keySet().stream()
+ .sorted()
+ .collect(Collectors.joining(","));
+
+ final Map<String, String> metricToHash = record.getMetrics()
+ .keySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ value -> value,
+ value -> _hashFunction.hashBytes((value + dimensionKeys).getBytes()).toString()
+ )
+ );
+
+ final String timeBucket = getCurrentBucketStart();
+
+ // Age out old data and advance the bucket marker
+ if (!_lastTimeBucket.equals(timeBucket)) {
+ _recentOccurrences = new ConcurrentSkipListMap<>(_recentOccurrences.tailMap(timeBucket, true));
+ _lastTimeBucket = timeBucket;
+ }
+
+ // Update _recentOccurrences with the latest hashes
+ final List<String> metricNamesToRetain = metricToHash.entrySet()
+ .stream().filter(entry -> updateAndFilterHash(entry.getValue(), origin, timeBucket))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+
+ // If we are retaining zero metrics then we can just drop the record, otherwise we need to determine
+ // if we're sending all or only part of the record.
+ if (metricNamesToRetain.size() > 0) {
+ // If all of the metrics passed the filter then just forward the event
+ // otherwise we need to build a new record that only retains the metrics remaining
+ // after filtering
+ if (metricNamesToRetain.size() == record.getMetrics().size()) {
+ _source.notify(event);
+ } else {
+ // Pare down the metrics to only those we're letting through and then send them along as a new
+ // record
+ final ImmutableMap<String, Metric> metricsToRetain = record.getMetrics()
+ .entrySet()
+ .stream()
+ .filter(entry -> metricNamesToRetain.contains(entry.getKey()))
+ .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ _source.notify(new DefaultRecord.Builder()
+ .setId(record.getId())
+ .setTime(record.getTime())
+ .setAnnotations(record.getAnnotations())
+ .setDimensions(record.getDimensions())
+ .setMetrics(metricsToRetain)
+ .build());
+ }
+ }
+ }
+
+ @Override
+ public void updateAcceptList(final ImmutableSet<String> acceptList) {
+ _globalAcceptList = acceptList;
+ _accepted.removeAll(_globalAcceptList);
+ }
+
+ @Override
+ public ImmutableSet<String> getAcceptList() {
+ return ImmutableSet.copyOf(_accepted);
+ }
+
+ private boolean updateAndFilterHash(final String hash, final String origin, final String timeBucket) {
+ // Return quickly for known good
+ if (_globalAcceptList.contains(hash)) {
+ _accepted.add(hash);
+ return true;
+ } else if (_accepted.contains(hash)) {
+ return true;
+ }
+
+ final int recent = _recentOccurrences.compute(
+ timeBucket + "_" + hash,
+ (bucket, value) -> {
+ if (value == null) {
+ value = new HashSet<>();
+ }
+
+ value.add(origin);
+ return value;
+ }).size();
+
+ if (recent > _threshold) {
+ _accepted.add(hash);
+ // We don't bother removing from the _recentOccurrences map as it will be aged out
+ // over time.
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private String getCurrentBucketStart() {
+ return "" + _clock.instant().truncatedTo(ChronoUnit.MINUTES).getEpochSecond();
+ }
+ }
+
+ /**
+ * Implementation of builder pattern for DistinctOriginLimitingSource.
+ *
+ * @author Gil Markham (gmarkham at dropbox dot com)
+ */
+ public static class Builder extends BaseSource.Builder<Builder, DistinctOriginLimitingSource> {
+
+ /**
+ * Public constructor.
+ */
+ public Builder() {
+ super(DistinctOriginLimitingSource::new);
+ }
+
+ /**
+ * Sets wrapped source. Cannot be null.
+ *
+ * @param value The wrapped source.
+ * @return This instance of Builder.
+ */
+ public final DistinctOriginLimitingSource.Builder setSource(final Source value) {
+ _source = value;
+ return this;
+ }
+
+ /**
+ * Sets store for persisting accepted metrics. Cannot be null.
+ *
+ * @param value The distinct origin store.
+ * @return This instance of Builder.
+ */
+ public final DistinctOriginLimitingSource.Builder setStore(final AcceptListStore value) {
+ _store = value;
+ return this;
+ }
+
+ /**
+ * Sets the hashing function to use for hashing metric+tag values.
+ *
+ * @param value Hash function to use, defaults to SHA256
+ * @return This instance of Builder.
+ */
+ public final DistinctOriginLimitingSource.Builder setHashFunction(final HashFunction value) {
+ _hashFunction = value;
+ return this;
+ }
+
+ /**
+ * Sets the threshold of distinct origins necessary to accept a metric+tag combination.
+ *
+ * @param value threshold of distinct origins
+ * @return This instance of Builder.
+ */
+ public final DistinctOriginLimitingSource.Builder setThreshold(final Integer value) {
+ _threshold = value;
+ return this;
+ }
+
+ @Override
+ protected DistinctOriginLimitingSource.Builder self() {
+ return this;
+ }
+
+ @NotNull
+ private Source _source;
+ @NotNull
+ private AcceptListStore _store;
+ @NotNull
+ private HashFunction _hashFunction = Hashing.sha256();
+ @NotNull
+ private Integer _threshold = 10;
+ }
+
+}
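Reviewer note: a minimal wiring sketch for the new source, assuming a hypothetical upstream `kafkaSource`, a hypothetical downstream `aggregatorObserver`, and a Redis instance on localhost; only builders introduced in this diff (plus `setName` from `BaseSource.Builder`) are used.

```java
// Sketch only: kafkaSource and aggregatorObserver are hypothetical placeholders.
final AcceptListStore store = new CachingAcceptListStore.Builder()
        .setStore(new RedisAcceptListStore.Builder()
                .setJedis(new Jedis("localhost"))
                .build())
        .setFilePath("/var/tmp/mad_accept_list.json") // hypothetical cache path
        .build();

final Source limited = new DistinctOriginLimitingSource.Builder()
        .setName("DistinctOriginLimitingSource")
        .setSource(kafkaSource)
        .setStore(store)
        .setThreshold(10) // forward once seen from more than 10 distinct origins
        .build();

limited.attach(aggregatorObserver); // retained records are forwarded here
limited.start(); // also kicks off the five minute accept list refresh
```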
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ReverseRateLimitingSource.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ReverseRateLimitingSource.java
new file mode 100644
index 00000000..54508ae1
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ReverseRateLimitingSource.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2020 Dropbox.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.arpnetworking.metrics.mad.sources;
+
+import com.arpnetworking.commons.observer.Observable;
+import com.arpnetworking.commons.observer.Observer;
+import com.arpnetworking.logback.annotations.LogValue;
+import com.arpnetworking.metrics.common.sources.BaseSource;
+import com.arpnetworking.metrics.common.sources.Source;
+import com.arpnetworking.metrics.mad.model.DefaultRecord;
+import com.arpnetworking.metrics.mad.model.Metric;
+import com.arpnetworking.metrics.mad.model.Record;
+import com.arpnetworking.steno.LogValueMapFactory;
+import com.arpnetworking.steno.Logger;
+import com.arpnetworking.steno.LoggerFactory;
+import com.arpnetworking.tsdcore.model.DefaultKey;
+import com.arpnetworking.tsdcore.model.Key;
+import com.google.common.collect.ImmutableMap;
+import net.sf.oval.constraint.NotNull;
+import org.apache.commons.codec.digest.DigestUtils;
+
+import java.time.Clock;
+import java.time.temporal.ChronoUnit;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
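+/**
+ * A {@link Source} wrapper that drops a metric until its name plus dimension key
+ * combination has been observed more than a fixed threshold number of times.
+ *
+ * @author Gil Markham (gmarkham at dropbox dot com)
+ */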
+public class ReverseRateLimitingSource extends BaseSource {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReverseRateLimitingSource.class);
+ private final Source _source;
+
+ public ReverseRateLimitingSource(final Builder builder) {
+ super(builder);
+ _source = builder._source;
+ _source.attach(new ReverseRateLimitingObserver(this));
+ }
+
+ @Override
+ public void start() {
+ _source.start();
+ }
+
+ @Override
+ public void stop() {
+ _source.stop();
+ }
+
+ @LogValue
+ public Object toLogValue() {
+ return LogValueMapFactory.builder(this)
+ .put("source", _source)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return toLogValue().toString();
+ }
+
+ /**
+ * Package private for testing
+ */
+ static final class ReverseRateLimitingObserver implements Observer {
+ private final ReverseRateLimitingSource _source;
+ private final ConcurrentSkipListMap<String, Integer> _recentOccurrences = new ConcurrentSkipListMap<>();
+ private final HashMap<String, Integer> _globalOccurrences = new HashMap<>();
+ private final Integer _threshold = 10;
+ private final Clock _clock = Clock.systemUTC();
+
+ /* package private */ ReverseRateLimitingObserver(final ReverseRateLimitingSource source) {
+ _source = source;
+ }
+
+ @Override
+ public void notify(final Observable observable, final Object event) {
+ if (!(event instanceof Record)) {
+ LOGGER.error()
+ .setMessage("Observed unsupported event")
+ .addData("event", event)
+ .log();
+ return;
+ }
+
+ final Record record = (Record) event;
+ final Key key = new DefaultKey(record.getDimensions());
+ LOGGER.trace()
+ .setMessage("Sending record to aggregation actor")
+ .addData("record", record)
+ .addData("key", key)
+ .log();
+
+ final String dimensionKeys = record.getDimensions().keySet().stream()
+ .sorted()
+ .collect(Collectors.joining(","));
+
+ final Map<String, String> metricToHash = record.getMetrics()
+ .keySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ value -> value,
+ // Use a deterministic digest; a salted crypt-style hash would never repeat for the same input
+ value -> DigestUtils.md5Hex((value + dimensionKeys).getBytes())
+ )
+ );
+
+ final String timeBucket = getCurrentBucketStart();
+
+ // Update _recentOccurrences with the latest hashes
+ final List<String> metricNamesToRetain = metricToHash.entrySet()
+ .stream().filter(entry -> updateAndFilterHash(entry.getValue(), timeBucket))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+
+ // If we are retaining zero metrics then we can just drop the record, otherwise we need to determine
+ // if we're sending all or only part of the record.
+ if (metricNamesToRetain.size() > 0) {
+ // If all of the metrics passed the filter then just forward the event
+ // otherwise we need to build a new record that only retains the metrics remaining
+ // after filtering
+ if (metricNamesToRetain.size() == record.getMetrics().size()) {
+ _source.notify(event);
+ } else {
+ // Pare down the metrics to only those we're letting through and then send them along as a new
+ // record
+ final ImmutableMap<String, Metric> metricsToRetain = record.getMetrics()
+ .entrySet()
+ .stream()
+ .filter(entry -> metricNamesToRetain.contains(entry.getKey()))
+ .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ _source.notify(new DefaultRecord.Builder()
+ .setId(record.getId())
+ .setTime(record.getTime())
+ .setAnnotations(record.getAnnotations())
+ .setDimensions(record.getDimensions())
+ .setMetrics(metricsToRetain)
+ .build());
+ }
+ }
+ }
+
+ private boolean updateAndFilterHash(final String hash, final String timeBucket) {
+ final Integer recent = _recentOccurrences.compute(
+ timeBucket + "_" + hash,
+ (bucket, value) -> value == null ? 1 : value + 1);
+ final Integer global = _globalOccurrences.getOrDefault(hash, 0);
+ return (global + recent) > _threshold;
+ }
+
+ private String getCurrentBucketStart() {
+ return "" + _clock.instant().truncatedTo(ChronoUnit.MINUTES).getEpochSecond();
+ }
+ }
+
+ /**
+ * Implementation of builder pattern for ReverseRateLimitingSource.
+ *
+ * @author Gil Markham (gmarkham at dropbox dot com)
+ */
+ public static class Builder extends BaseSource.Builder<Builder, ReverseRateLimitingSource> {
+
+ /**
+ * Public constructor.
+ */
+ public Builder() {
+ super(ReverseRateLimitingSource::new);
+ }
+
+ /**
+ * Sets wrapped source. Cannot be null.
+ *
+ * @param value The wrapped source.
+ * @return This instance of Builder.
+ */
+ public final ReverseRateLimitingSource.Builder setSource(final Source value) {
+ _source = value;
+ return this;
+ }
+
+ @Override
+ protected ReverseRateLimitingSource.Builder self() {
+ return this;
+ }
+
+ @NotNull
+ private Source _source;
+ }
+
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListSink.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListSink.java
new file mode 100644
index 00000000..a80f8751
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListSink.java
@@ -0,0 +1,7 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.google.common.collect.ImmutableSet;
+
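+/**
+ * Interface for consumers of updates to the shared accept list.
+ */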
+public interface AcceptListSink {
+ void updateAcceptList(ImmutableSet<String> acceptList);
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListSource.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListSource.java
new file mode 100644
index 00000000..b9828a37
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListSource.java
@@ -0,0 +1,7 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.google.common.collect.ImmutableSet;
+
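+/**
+ * Interface for providers of the locally accumulated accept list.
+ */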
+public interface AcceptListSource {
+ ImmutableSet<String> getAcceptList();
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListStore.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListStore.java
new file mode 100644
index 00000000..d7c2dffb
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListStore.java
@@ -0,0 +1,8 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.google.common.collect.ImmutableSet;
+
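+/**
+ * Interface for stores that persist the accept list and merge local updates into
+ * the shared state.
+ */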
+public interface AcceptListStore {
+ ImmutableSet<String> getInitialList();
+
+ ImmutableSet<String> updateAndReturnCurrent(final ImmutableSet<String> updated);
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListStoreUpdater.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListStoreUpdater.java
new file mode 100644
index 00000000..a2ed3ba6
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/AcceptListStoreUpdater.java
@@ -0,0 +1,18 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
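+/**
+ * {@link Updater} that pushes the locally accepted hashes from a source into the
+ * backing {@link AcceptListStore} and feeds the merged result back to a sink.
+ */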
+public class AcceptListStoreUpdater implements Updater {
+ private final AcceptListStore _store;
+ private final AcceptListSink _sink;
+ private final AcceptListSource _source;
+
+ public AcceptListStoreUpdater(final AcceptListStore store, final AcceptListSink sink, final AcceptListSource source) {
+ _store = store;
+ _sink = sink;
+ _source = source;
+ }
+
+ @Override
+ public void run() {
+ _sink.updateAcceptList(_store.updateAndReturnCurrent(_source.getAcceptList()));
+ }
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/CachingAcceptListStore.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/CachingAcceptListStore.java
new file mode 100644
index 00000000..43fd9dec
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/CachingAcceptListStore.java
@@ -0,0 +1,187 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.arpnetworking.commons.builder.OvalBuilder;
+import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
+import com.arpnetworking.steno.Logger;
+import com.arpnetworking.steno.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import net.sf.oval.constraint.NotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+
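+/**
+ * An {@link AcceptListStore} decorator that mirrors the accept list to a local
+ * cache file (with a backup copy) so a restart can proceed even when the wrapped
+ * store is unavailable.
+ */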
+public class CachingAcceptListStore implements AcceptListStore {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CachingAcceptListStore.class);
+ private final AcceptListStore _store;
+ private HashSet<String> _cache;
+ private final ObjectMapper _objectMapper = ObjectMapperFactory.getInstance();
+ private final File _cacheFile;
+ private final File _backupFile;
+ private boolean _cacheGood = false;
+
+ private CachingAcceptListStore(final Builder builder) {
+ _store = builder._store;
+
+ _cacheFile = new File(builder._filePath);
+ _backupFile = new File(builder._filePath + builder._backupExtension);
+ // Try to load the cache file first
+ final TypeReference<HashSet<String>> typeRef = new TypeReference<HashSet<String>>() { };
+ if (_cacheFile.exists() && _cacheFile.isFile()) {
+ try {
+ _cache = _objectMapper.readValue(_cacheFile, typeRef);
+ _cacheGood = true;
+ } catch (final IOException e) {
+ LOGGER.warn()
+ .setMessage("Failed to load cache file.")
+ .addData("file", _cacheFile)
+ .setThrowable(e)
+ .log();
+ }
+ }
+
+ // Failed to load the cache file, try the backup
+ if (_cache == null && _backupFile.exists() && _backupFile.isFile()) {
+ try {
+ _cache = _objectMapper.readValue(_backupFile, typeRef);
+ } catch (final IOException e) {
+ LOGGER.warn()
+ .setMessage("Failed to load backup cache file.")
+ .addData("file", _backupFile)
+ .setThrowable(e)
+ .log();
+ }
+ }
+
+ // Unable to load any file, start with an empty set
+ if (_cache == null) {
+ LOGGER.warn()
+ .setMessage("Unable to load cache file.")
+ .log();
+ _cache = new HashSet<>();
+ }
+ }
+
+ @Override
+ public ImmutableSet<String> getInitialList() {
+ return ImmutableSet.copyOf(_cache);
+ }
+
+ @Override
+ public ImmutableSet<String> updateAndReturnCurrent(final ImmutableSet<String> updated) {
+ // We update our local cache with whatever was provided and persist it, if we get a response
+ // from the underlying store we replace what we just persisted with the store's contents.
+ // In the case that the store throws an exception, we simply return the cached response.
+ // This doesn't handle a true cold start case where the underlying store is empty.
+
+ final ImmutableSet<String> result = _store.updateAndReturnCurrent(updated);
+
+ if (result != null) {
+ _cache = new HashSet<>(result);
+ } else {
+ _cache.addAll(updated);
+ }
+
+ File tmpFile;
+ try {
+ tmpFile = File.createTempFile("mad_origin_cache", ".tmp");
+ tmpFile.deleteOnExit();
+ _objectMapper.writeValue(tmpFile, _cache);
+ } catch (final IOException e) {
+ tmpFile = null;
+ LOGGER.warn()
+ .setMessage("Failed to create tmp cache file.")
+ .setThrowable(e)
+ .log();
+ }
+
+ if (tmpFile != null) {
+ // The primary cache file was 'good' so move it to the backup, otherwise we ignore it
+ if (_cacheGood) {
+ // Move the current file to become the backup, persist to the current file
+ if (!_cacheFile.renameTo(_backupFile)) {
+ LOGGER.warn()
+ .setMessage("Failed to move cache file to backup.")
+ .addData("file", _cacheFile)
+ .addData("backup", _backupFile)
+ .log();
+ }
+ }
+
+ // Try and move the tmp file to the main cache file
+ if (!tmpFile.renameTo(_cacheFile)) {
+ LOGGER.warn()
+ .setMessage("Failed to move temp cache file.")
+ .addData("file", _cacheFile)
+ .addData("tmpFile", tmpFile)
+ .log();
+ } else {
+ _cacheGood = true;
+ }
+
+ if (!tmpFile.delete()) {
+ LOGGER.warn()
+ .setMessage("Failed to delete temp cache file.")
+ .addData("tmpFile", tmpFile)
+ .log();
+ }
+ }
+
+ return result != null ? result : ImmutableSet.copyOf(_cache);
+ }
+
+ /**
+ * Implementation of builder pattern for CachingAcceptListStore.
+ *
+ * @author Gil Markham (gmarkham at dropbox dot com)
+ */
+ public static class Builder extends OvalBuilder<CachingAcceptListStore> {
+ public Builder() {
+ super(CachingAcceptListStore::new);
+ }
+
+ /**
+ * Sets wrapped store. Cannot be null.
+ *
+ * @param value The distinct origin store.
+ * @return This instance of Builder.
+ */
+ public final Builder setStore(final AcceptListStore value) {
+ _store = value;
+ return this;
+ }
+
+ /**
+ * Sets the filepath to the local filesystem cache.
+ *
+ * @param value filepath of cache file
+ * @return This instance of Builder.
+ */
+ public final Builder setFilePath(final String value) {
+ _filePath = value;
+ return this;
+ }
+
+ /**
+ * Sets the backup file extension for the local filesystem cache. Defaults to '.bak'.
+ *
+ * @param value backup file extension
+ * @return This instance of Builder.
+ */
+ public final Builder setBackupExtension(final String value) {
+ _backupExtension = value;
+ return this;
+ }
+
+ @NotNull
+ private AcceptListStore _store;
+ @NotNull
+ private String _filePath;
+ @NotNull
+ private String _backupExtension = ".bak";
+ }
+
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryAcceptListStore.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryAcceptListStore.java
new file mode 100644
index 00000000..001bb687
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryAcceptListStore.java
@@ -0,0 +1,42 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.arpnetworking.commons.builder.OvalBuilder;
+import com.google.common.collect.ImmutableSet;
+import net.sf.oval.constraint.NotNull;
+
+import java.util.HashSet;
+
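+/**
+ * An {@link AcceptListStore} that keeps the accept list purely in memory; useful
+ * for tests and single instance deployments.
+ */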
+public class InMemoryAcceptListStore implements AcceptListStore {
+ private final HashSet<String> _cache = new HashSet<>();
+
+ private InMemoryAcceptListStore(final Builder builder) {
+ _cache.addAll(builder._initialCache);
+ }
+
+ @Override
+ public ImmutableSet<String> getInitialList() {
+ return ImmutableSet.copyOf(_cache);
+ }
+
+ @Override
+ public ImmutableSet<String> updateAndReturnCurrent(final ImmutableSet<String> updated) {
+ _cache.addAll(updated);
+ return ImmutableSet.copyOf(_cache);
+ }
+
+ public static class Builder extends OvalBuilder<InMemoryAcceptListStore> {
+
+ @NotNull
+ private ImmutableSet<String> _initialCache = ImmutableSet.of();
+
+ public Builder() {
+ super(InMemoryAcceptListStore::new);
+ }
+
+ public Builder setInitialCache(final ImmutableSet<String> value) {
+ _initialCache = value;
+ return this;
+ }
+ }
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryRateLimitStore.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryRateLimitStore.java
new file mode 100644
index 00000000..29affde6
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryRateLimitStore.java
@@ -0,0 +1,16 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.HashMap;
+import java.util.Map;
+
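+/**
+ * A {@link RateLimitStore} that accumulates occurrence counts in memory.
+ */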
+public class InMemoryRateLimitStore implements RateLimitStore {
+ private final HashMap<String, Long> _store = new HashMap<>();
+
+ @Override
+ public ImmutableMap<String, Long> updateAndReturnCurrent(final Map<String, Long> recent) {
+ recent.forEach((key, value) -> _store.compute(key, (k, v) -> v == null ? value : v + value));
+ return ImmutableMap.copyOf(_store);
+ }
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RateLimitStore.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RateLimitStore.java
new file mode 100644
index 00000000..36843b99
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RateLimitStore.java
@@ -0,0 +1,9 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
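+/**
+ * Interface for stores that merge recent occurrence counts into shared state and
+ * return the current totals.
+ */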
+public interface RateLimitStore {
+ ImmutableMap<String, Long> updateAndReturnCurrent(Map<String, Long> recent);
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisAcceptListStore.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisAcceptListStore.java
new file mode 100644
index 00000000..cc25b586
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisAcceptListStore.java
@@ -0,0 +1,106 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.arpnetworking.commons.builder.OvalBuilder;
+import com.arpnetworking.steno.Logger;
+import com.arpnetworking.steno.LoggerFactory;
+import com.google.common.collect.ImmutableSet;
+import net.sf.oval.constraint.NotNull;
+import redis.clients.jedis.Jedis;
+
+import java.time.Clock;
+import java.time.temporal.ChronoUnit;
+import java.util.Set;
+
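+/**
+ * An {@link AcceptListStore} backed by Redis sets keyed by UTC day; reads blend
+ * the current and previous day's sets so entries age out after roughly two days.
+ */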
+public class RedisAcceptListStore implements AcceptListStore {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RedisAcceptListStore.class);
+ private final Jedis _jedis;
+ private final Clock _clock;
+ private String _lastKey;
+ private Set<String> _previousSet;
+
+ private RedisAcceptListStore(final Builder builder) {
+ _jedis = builder._jedis;
+ _clock = builder._clock;
+ }
+
+ @Override
+ public ImmutableSet<String> getInitialList() {
+ final String key = getCurrentKey();
+ final Set<String> currentSet = _jedis.smembers(key);
+ _previousSet = _jedis.smembers(getPreviousKey());
+ _lastKey = key;
+
+ // Blend the previous set with the current set so that a recently created current
+ // set doesn't artificially reject metrics
+ return ImmutableSet.<String>builder()
+ .addAll(currentSet)
+ .addAll(_previousSet)
+ .build();
+ }
+
+ @Override
+ public ImmutableSet<String> updateAndReturnCurrent(final ImmutableSet<String> seen) {
+ final String key = getCurrentKey();
+ boolean updateExpiry = false;
+
+ // If the 'current' key changed or we weren't initialized (_lastKey == null) then fetch the
+ // previous accept list and cache it until the key changes again. The previous set should no
+ // longer be written to at this point, so it's safe to read it once.
+ if (!key.equals(_lastKey)) {
+ _previousSet = _jedis.smembers(getPreviousKey());
+ _lastKey = key;
+ updateExpiry = true;
+ }
+
+ final long count = _jedis.sadd(key, seen.asList().toArray(new String[seen.size()]));
+ LOGGER.info().setMessage("Added metrics to accept list").addData("count", count).log();
+ if (updateExpiry) {
+ _jedis.expireAt(key, currentExpiry());
+ }
+
+ final Set<String> currentSet = _jedis.smembers(key);
+
+ // Blend the previous set with the current set so that a recently created current
+ // set doesn't artificially reject metrics
+ return ImmutableSet.<String>builder()
+ .addAll(currentSet)
+ .addAll(_previousSet)
+ .build();
+ }
+
+ private long currentExpiry() {
+ return _clock.instant()
+ .truncatedTo(ChronoUnit.DAYS)
+ .plus(2, ChronoUnit.DAYS)
+ .getEpochSecond();
+ }
+
+ private String getCurrentKey() {
+ return _clock.instant().truncatedTo(ChronoUnit.DAYS).toString();
+ }
+
+ private String getPreviousKey() {
+ return _clock.instant().truncatedTo(ChronoUnit.DAYS).minus(1, ChronoUnit.DAYS).toString();
+ }
+
+ public static class Builder extends OvalBuilder<RedisAcceptListStore> {
+ @NotNull
+ private Jedis _jedis;
+ @NotNull
+ private Clock _clock = Clock.systemUTC();
+
+ public Builder() {
+ super(RedisAcceptListStore::new);
+ }
+
+ public Builder setJedis(final Jedis value) {
+ _jedis = value;
+ return this;
+ }
+
+ public Builder setClock(final Clock value) {
+ _clock = value;
+ return this;
+ }
+ }
+}
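Reviewer note: a self-contained sketch sanity checking the day-bucket key scheme above, using a fixed example instant (it mirrors getCurrentKey, getPreviousKey, and currentExpiry; the instant and printed values are assumptions of this example, not store output).

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public final class KeySchemeSketch {
    public static void main(final String[] args) {
        // Fixed example instant; the store normally uses Clock.systemUTC().
        final Clock clock = Clock.fixed(Instant.parse("2020-06-02T10:15:30Z"), ZoneOffset.UTC);
        final Instant day = clock.instant().truncatedTo(ChronoUnit.DAYS);

        // Reads blend SMEMBERS of the current and previous day's sets.
        System.out.println(day);                           // 2020-06-02T00:00:00Z (current key)
        System.out.println(day.minus(1, ChronoUnit.DAYS)); // 2020-06-01T00:00:00Z (previous key)

        // A newly created bucket gets EXPIREAT two days after its start, so each
        // day's set lives roughly 48 hours before Redis drops it.
        System.out.println(day.plus(2, ChronoUnit.DAYS).getEpochSecond()); // 1591228800
    }
}
```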
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisRateLimitStore.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisRateLimitStore.java
new file mode 100644
index 00000000..e69bcb37
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisRateLimitStore.java
@@ -0,0 +1,38 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.google.common.collect.ImmutableMap;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Response;
+import redis.clients.jedis.Transaction;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
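+/**
+ * A {@link RateLimitStore} that tracks occurrence counts in Redis using INCRBY
+ * inside a MULTI/EXEC transaction.
+ */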
+public class RedisRateLimitStore implements RateLimitStore {
+ private final Jedis _jedis = new Jedis();
+ private final Map<String, Long> _cache = new HashMap<>();
+
+ @Override
+ public ImmutableMap<String, Long> updateAndReturnCurrent(final Map<String, Long> recent) {
+ final Long nextExpiry = 0L;
+ final Map<String, Response<Long>> responseMap = recent.entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ final Transaction transaction = _jedis.multi();
+ final String key = entry.getKey();
+ final Response<Long> response = transaction.incrBy(key, entry.getValue());
+ transaction.expireAt(key, nextExpiry);
+ transaction.exec();
+ return response;
+ }
+ ));
+
+ responseMap.forEach((key, value) -> _cache.compute(key, (k, v) -> v == null ? value.get() : v + value.get()));
+
+ return ImmutableMap.copyOf(_cache);
+ }
+}
diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/Updater.java b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/Updater.java
new file mode 100644
index 00000000..5cc504d7
--- /dev/null
+++ b/src/main/java/com/arpnetworking/metrics/mad/sources/ratelimiter/Updater.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2020 Dropbox.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+/**
+ * Interface for classes which periodically update a data store.
+ *
+ * @author Gil Markham (gmarkham at dropbox dot com)
+ */
+public interface Updater extends Runnable {
+}
diff --git a/src/test/java/com/arpnetworking/metrics/mad/sources/DistinctOriginLimitingSourceTest.java b/src/test/java/com/arpnetworking/metrics/mad/sources/DistinctOriginLimitingSourceTest.java
new file mode 100644
index 00000000..bac9300d
--- /dev/null
+++ b/src/test/java/com/arpnetworking/metrics/mad/sources/DistinctOriginLimitingSourceTest.java
@@ -0,0 +1,260 @@
+package com.arpnetworking.metrics.mad.sources;
+
+import com.arpnetworking.commons.observer.Observable;
+import com.arpnetworking.commons.observer.Observer;
+import com.arpnetworking.metrics.common.sources.Source;
+import com.arpnetworking.metrics.mad.model.DefaultMetric;
+import com.arpnetworking.metrics.mad.model.DefaultRecord;
+import com.arpnetworking.metrics.mad.model.MetricType;
+import com.arpnetworking.metrics.mad.model.Record;
+import com.arpnetworking.metrics.mad.sources.ratelimiter.AcceptListStore;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.Hashing;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.time.ZonedDateTime;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+public class DistinctOriginLimitingSourceTest {
+ private Source _mockSource;
+ private AcceptListStore _mockStore;
+ private Observer _mockObserver;
+ private DistinctOriginLimitingSource.Builder _sourceBuilder;
+
+ @Before
+ public void setUp() {
+ _mockSource = Mockito.mock(Source.class);
+ _mockStore = Mockito.mock(AcceptListStore.class);
+ Mockito.when(_mockStore.updateAndReturnCurrent(any())).thenReturn(ImmutableSet.of());
+ _mockObserver = Mockito.mock(Observer.class);
+ _sourceBuilder = new DistinctOriginLimitingSource.Builder()
+ .setName("DistinctOriginLimitingSource")
+ .setSource(_mockSource)
+ .setStore(_mockStore);
+ }
+
+ @Test
+ public void testAttach() {
+ _sourceBuilder.build();
+ Mockito.verify(_mockSource).attach(Mockito.any(Observer.class));
+ }
+
+ @Test
+ public void testStart() {
+ _sourceBuilder.build().start();
+ Mockito.verify(_mockSource).start();
+ Mockito.verify(_mockStore).updateAndReturnCurrent(Mockito.any());
+ }
+
+ @Test
+ public void testStop() {
+ _sourceBuilder.build().stop();
+ Mockito.verify(_mockSource).stop();
+ }
+
+ @Test
+ public void testToString() {
+ final String asString = _sourceBuilder.build().toString();
+ Assert.assertNotNull(asString);
+ Assert.assertFalse(asString.isEmpty());
+ }
+
+ @Test
+ public void testUsesStoreList() {
+ final String metricHash = Hashing.sha256().hashBytes("foobar".getBytes()).toString();
+ reset(_mockStore);
+ when(_mockStore.updateAndReturnCurrent(any()))
+ .thenReturn(ImmutableSet.<String>builder().add(metricHash).build());
+ Source source = _sourceBuilder.build();
+ source.attach(_mockObserver);
+ source.start();
+
+ notify(_mockSource, new DefaultRecord.Builder()
+ .setMetrics(ImmutableMap.of("foobar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build()))
+ .setId(UUID.randomUUID().toString())
+ .setTime(ZonedDateTime.now())
+ .build()
+ );
+
+ final ArgumentCaptor argument = ArgumentCaptor.forClass(Record.class);
+ Mockito.verify(_mockObserver).notify(Mockito.same(source), argument.capture());
+ final Record actualRecord = argument.getValue();
+ Assert.assertNotNull(actualRecord.getMetrics().get("foobar"));
+ }
+
+ @Test
+ public void testDoesNothingIfNotARecord() {
+ final String metricHash = Hashing.sha256().hashBytes("foobar".getBytes()).toString();
+ reset(_mockStore);
+ when(_mockStore.updateAndReturnCurrent(any()))
+ .thenReturn(ImmutableSet.<String>builder().add(metricHash).build());
+ Source source = _sourceBuilder.build();
+ source.attach(_mockObserver);
+ source.start();
+
+ notify(_mockSource, new Object());
+
+ Mockito.verifyNoMoreInteractions(_mockObserver);
+ }
+
+ @Test
+ public void testFullyBlocksRecordIfNoneAccepted() {
+ final String metricHash = Hashing.sha256().hashBytes("foobar".getBytes()).toString();
+ reset(_mockStore);
+ when(_mockStore.updateAndReturnCurrent(any()))
+ .thenReturn(ImmutableSet.<String>builder().add(metricHash).build());
+ Source source = _sourceBuilder.build();
+ source.attach(_mockObserver);
+ source.start();
+
+ notify(_mockSource, new DefaultRecord.Builder()
+ .setMetrics(ImmutableMap.of("bar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build()))
+ .setId(UUID.randomUUID().toString())
+ .setTime(ZonedDateTime.now())
+ .build()
+ );
+
+ Mockito.verifyNoMoreInteractions(_mockObserver);
+ }
+
+ @Test
+ public void testPartiallyBlocksRecordIfSomeAccepted() {
+ final String metricHash = Hashing.sha256().hashBytes("foobar".getBytes()).toString();
+ reset(_mockStore);
+ when(_mockStore.updateAndReturnCurrent(any()))
+ .thenReturn(ImmutableSet.<String>builder().add(metricHash).build());
+ Source source = _sourceBuilder.build();
+ source.attach(_mockObserver);
+ source.start();
+
+ notify(_mockSource, new DefaultRecord.Builder()
+ .setMetrics(ImmutableMap.of("bar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build(),
+ "foobar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build()))
+ .setId(UUID.randomUUID().toString())
+ .setTime(ZonedDateTime.now())
+ .build()
+ );
+
+ final ArgumentCaptor argument = ArgumentCaptor.forClass(Record.class);
+ Mockito.verify(_mockObserver).notify(Mockito.same(source), argument.capture());
+ final Record actualRecord = argument.getValue();
+ Assert.assertEquals(1, actualRecord.getMetrics().size());
+ Assert.assertNotNull(actualRecord.getMetrics().get("foobar"));
+ }
+
+ @Test
+ public void testAcceptedAfterThresholdReached() {
+ final String metricHash = Hashing.sha256().hashBytes("foobar".getBytes()).toString();
+ reset(_mockStore);
+ when(_mockStore.updateAndReturnCurrent(any()))
+ .thenReturn(ImmutableSet.<String>builder().add(metricHash).build());
+ Source source = _sourceBuilder.setThreshold(1).build();
+ source.attach(_mockObserver);
+ source.start();
+
+ notify(_mockSource, new DefaultRecord.Builder()
+ .setMetrics(ImmutableMap.of("bar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build(),
+ "foobar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build()))
+ .setId(UUID.randomUUID().toString())
+ .setTime(ZonedDateTime.now())
+ .build()
+ );
+
+ final ArgumentCaptor argument = ArgumentCaptor.forClass(Record.class);
+ Mockito.verify(_mockObserver).notify(Mockito.same(source), argument.capture());
+ Record actualRecord = argument.getValue();
+ Assert.assertEquals(1, actualRecord.getMetrics().size());
+ Assert.assertNotNull(actualRecord.getMetrics().get("foobar"));
+ reset(_mockObserver);
+
+ notify(_mockSource, new DefaultRecord.Builder()
+ .setAnnotations(ImmutableMap.of("origin", "1234"))
+ .setMetrics(ImmutableMap.of("bar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build(),
+ "foobar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build()))
+ .setId(UUID.randomUUID().toString())
+ .setTime(ZonedDateTime.now())
+ .build()
+ );
+
+ Mockito.verify(_mockObserver).notify(Mockito.same(source), argument.capture());
+ actualRecord = argument.getValue();
+ Assert.assertEquals(2, actualRecord.getMetrics().size());
+ Assert.assertNotNull(actualRecord.getMetrics().get("foobar"));
+ Assert.assertNotNull(actualRecord.getMetrics().get("bar"));
+ }
+
+ @Test
+ public void testObserverRecordsAllAcceptedMetrics() {
+ final DistinctOriginLimitingSource mockLimitingSource = mock(DistinctOriginLimitingSource.class);
+ final DistinctOriginLimitingSource.DistinctOriginLimitingObserver observer =
+ new DistinctOriginLimitingSource.DistinctOriginLimitingObserver(mockLimitingSource, Hashing.sha256(), 1);
+ final String metricHash = Hashing.sha256().hashBytes("foobar".getBytes()).toString();
+ observer.updateAcceptList(ImmutableSet.of(metricHash));
+
+ observer.notify(mockLimitingSource, new DefaultRecord.Builder()
+ .setMetrics(ImmutableMap.of("bar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build(),
+ "foobar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build()))
+ .setId(UUID.randomUUID().toString())
+ .setTime(ZonedDateTime.now())
+ .build()
+ );
+
+ final ArgumentCaptor argument = ArgumentCaptor.forClass(Record.class);
+ Mockito.verify(mockLimitingSource).notify(argument.capture());
+ Record actualRecord = argument.getValue();
+ Assert.assertEquals(1, actualRecord.getMetrics().size());
+ Assert.assertNotNull(actualRecord.getMetrics().get("foobar"));
+ reset(mockLimitingSource);
+
+ observer.notify(mockLimitingSource, new DefaultRecord.Builder()
+ .setAnnotations(ImmutableMap.of("origin", "1234"))
+ .setMetrics(ImmutableMap.of("bar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build(),
+ "foobar",
+ new DefaultMetric.Builder().setType(MetricType.COUNTER).build()))
+ .setId(UUID.randomUUID().toString())
+ .setTime(ZonedDateTime.now())
+ .build()
+ );
+
+ Mockito.verify(mockLimitingSource).notify(argument.capture());
+ actualRecord = argument.getValue();
+ Assert.assertEquals(2, actualRecord.getMetrics().size());
+ Assert.assertNotNull(actualRecord.getMetrics().get("foobar"));
+ Assert.assertNotNull(actualRecord.getMetrics().get("bar"));
+
+ final ImmutableSet<String> accepted = observer.getAcceptList();
+ Assert.assertEquals(2, accepted.size());
+ Assert.assertTrue(accepted.contains(metricHash));
+ Assert.assertTrue(accepted.contains(Hashing.sha256().hashBytes("bar".getBytes()).toString()));
+ }
+
+
+ private static void notify(final Observable observable, final Object event) {
+ final ArgumentCaptor argument = ArgumentCaptor.forClass(Observer.class);
+ Mockito.verify(observable).attach(argument.capture());
+ for (final Observer observer : argument.getAllValues()) {
+ observer.notify(observable, event);
+ }
+ }
+}
diff --git a/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/CachingAcceptListStoreTest.java b/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/CachingAcceptListStoreTest.java
new file mode 100644
index 00000000..a616def7
--- /dev/null
+++ b/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/CachingAcceptListStoreTest.java
@@ -0,0 +1,208 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CachingAcceptListStoreTest extends TestCase {
+
+ @Mock
+ private AcceptListStore _mockStore;
+ @Captor
+ private ArgumentCaptor<ImmutableSet<String>> _listCaptor;
+
+ private final ObjectMapper _objectMapper = ObjectMapperFactory.getInstance();
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testLoadsCacheFromDisk() throws IOException {
+ final File primaryFile = File.createTempFile("CachingAcceptListStoreTest", ".json");
+ primaryFile.deleteOnExit();
+ _objectMapper.writeValue(primaryFile, ImmutableSet.of("foo"));
+
+
+ final CachingAcceptListStore store = new CachingAcceptListStore.Builder()
+ .setStore(_mockStore)
+ .setFilePath(primaryFile.getAbsolutePath())
+ .setBackupExtension(".bak")
+ .build();
+
+ final ImmutableSet<String> storeList = store.getInitialList();
+ Assert.assertNotNull(storeList);
+ Assert.assertEquals(1, storeList.size());
+ Assert.assertTrue(storeList.contains("foo"));
+ }
+
+ @Test
+ public void testLoadsBackupCacheFromDiskIfPrimaryIsMissing() throws IOException {
+ final File primaryFile = File.createTempFile("CachingAcceptListStoreTest", ".json");
+ primaryFile.delete();
+ final File backupFile = new File(primaryFile.getAbsolutePath() + ".bak");
+ backupFile.deleteOnExit();
+
+ _objectMapper.writeValue(backupFile, ImmutableSet.of("foo"));
+
+
+ final CachingAcceptListStore store = new CachingAcceptListStore.Builder()
+ .setStore(_mockStore)
+ .setFilePath(primaryFile.getAbsolutePath())
+ .setBackupExtension(".bak")
+ .build();
+
+ final ImmutableSet<String> storeList = store.getInitialList();
+ Assert.assertNotNull(storeList);
+ Assert.assertEquals(1, storeList.size());
+ Assert.assertTrue(storeList.contains("foo"));
+ }
+
+ @Test
+ public void testLoadsBackupCacheFromDiskIfPrimaryIsBroken() throws IOException {
+ final File primaryFile = File.createTempFile("CachingAcceptListStoreTest", ".json");
+ primaryFile.deleteOnExit();
+ final FileWriter primaryFileWriter = new FileWriter(primaryFile);
+ primaryFileWriter.write("123123");
+ primaryFileWriter.close();
+ final File backupFile = new File(primaryFile.getAbsolutePath() + ".bak");
+ backupFile.deleteOnExit();
+
+ _objectMapper.writeValue(backupFile, ImmutableSet.of("foo"));
+
+
+ final CachingAcceptListStore store = new CachingAcceptListStore.Builder()
+ .setStore(_mockStore)
+ .setFilePath(primaryFile.getAbsolutePath())
+ .setBackupExtension(".bak")
+ .build();
+
+ final ImmutableSet<String> storeList = store.getInitialList();
+ Assert.assertNotNull(storeList);
+ Assert.assertEquals(1, storeList.size());
+ Assert.assertTrue(storeList.contains("foo"));
+ }
+
+ @Test
+ public void testFallsBackToEmptyIfCacheFilesAreMissing() throws IOException {
+ final File primaryFile = File.createTempFile("CachingAcceptListStoreTest", ".json");
+ primaryFile.delete();
+
+ final CachingAcceptListStore store = new CachingAcceptListStore.Builder()
+ .setStore(_mockStore)
+ .setFilePath(primaryFile.getAbsolutePath())
+ .setBackupExtension(".bak")
+ .build();
+
+ final ImmutableSet<String> storeList = store.getInitialList();
+ Assert.assertNotNull(storeList);
+ Assert.assertEquals(0, storeList.size());
+ }
+
+ @Test
+ public void testFallsBackToEmptyIfBothFilesAreBroken() throws IOException {
+ final File primaryFile = File.createTempFile("CachingAcceptListStoreTest", ".json");
+ primaryFile.deleteOnExit();
+ final FileWriter primaryFileWriter = new FileWriter(primaryFile);
+ primaryFileWriter.write("123123");
+ primaryFileWriter.close();
+
+ final File backupFile = new File(primaryFile.getAbsolutePath() + ".bak");
+ backupFile.deleteOnExit();
+ final FileWriter backupFileWriter = new FileWriter(backupFile);
+ backupFileWriter.write("123123");
+ backupFileWriter.close();
+
+ final CachingAcceptListStore store = new CachingAcceptListStore.Builder()
+ .setStore(_mockStore)
+ .setFilePath(primaryFile.getAbsolutePath())
+ .setBackupExtension(".bak")
+ .build();
+
+ final ImmutableSet<String> storeList = store.getInitialList();
+ Assert.assertNotNull(storeList);
+ Assert.assertEquals(0, storeList.size());
+ }
+
+ @Test
+ public void testUpdateCallsUnderlyingStoreAndUpdatesCacheFiles() throws IOException {
+ when(_mockStore.updateAndReturnCurrent(any())).thenReturn(ImmutableSet.of("foo", "bar"));
+ final File primaryFile = File.createTempFile("CachingAcceptListStoreTest", ".json");
+ final long createDate = primaryFile.lastModified();
+ primaryFile.deleteOnExit();
+ final File backupFile = new File(primaryFile.getAbsolutePath() + ".bak");
+ backupFile.createNewFile();
+ backupFile.deleteOnExit();
+
+ _objectMapper.writeValue(primaryFile, ImmutableSet.of("foo"));
+
+
+ final CachingAcceptListStore store = new CachingAcceptListStore.Builder()
+ .setStore(_mockStore)
+ .setFilePath(primaryFile.getAbsolutePath())
+ .setBackupExtension(".bak")
+ .build();
+
+ store.updateAndReturnCurrent(ImmutableSet.of("bar"));
+ verify(_mockStore).updateAndReturnCurrent(_listCaptor.capture());
+
+ final ImmutableSet<String> result = _listCaptor.getValue();
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ assertTrue(createDate < primaryFile.lastModified());
+ assertTrue(backupFile.exists());
+ }
+
+ @Test
+ public void testUpdateCallsUnderlyingStoreAndUpdatesPrimaryOnlyWhenCacheWasBad() throws IOException {
+ when(_mockStore.updateAndReturnCurrent(any())).thenReturn(ImmutableSet.of("foo", "bar"));
+ final File primaryFile = File.createTempFile("CachingAcceptListStoreTest", ".json");
+ final FileWriter primaryFileWriter = new FileWriter(primaryFile);
+ primaryFileWriter.write("123124312");
+ primaryFileWriter.close();
+ final long createDate = primaryFile.lastModified();
+ primaryFile.deleteOnExit();
+
+ final File backupFile = new File(primaryFile.getAbsolutePath() + ".bak");
+ backupFile.deleteOnExit();
+
+ _objectMapper.writeValue(backupFile, ImmutableSet.of("foo"));
+ assertTrue(backupFile.exists());
+
+ final long backupModifiedDate = backupFile.lastModified();
+
+
+ final CachingAcceptListStore store = new CachingAcceptListStore.Builder()
+ .setStore(_mockStore)
+ .setFilePath(primaryFile.getAbsolutePath())
+ .setBackupExtension(".bak")
+ .build();
+
+ store.updateAndReturnCurrent(ImmutableSet.of("bar"));
+ verify(_mockStore).updateAndReturnCurrent(_listCaptor.capture());
+
+ final ImmutableSet<String> result = _listCaptor.getValue();
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ assertTrue(createDate < primaryFile.lastModified());
+ assertEquals(backupModifiedDate, backupFile.lastModified());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryAcceptListStoreTest.java b/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryAcceptListStoreTest.java
new file mode 100644
index 00000000..3d9a8251
--- /dev/null
+++ b/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/InMemoryAcceptListStoreTest.java
@@ -0,0 +1,35 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.google.common.collect.ImmutableSet;
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InMemoryAcceptListStoreTest extends TestCase {
+
+ @Test
+ public void testUpdateAndReturnCurrent() {
+ final InMemoryAcceptListStore store = new InMemoryAcceptListStore.Builder()
+ .setInitialCache(ImmutableSet.of("foobar"))
+ .build();
+ ImmutableSet<String> returnVal = store.updateAndReturnCurrent(ImmutableSet.of("foo"));
+ Assert.assertEquals(2, returnVal.size());
+ Assert.assertTrue(returnVal.contains("foo"));
+ Assert.assertTrue(returnVal.contains("foobar"));
+ returnVal = store.updateAndReturnCurrent(ImmutableSet.of("foo", "bar"));
+ Assert.assertEquals(3, returnVal.size());
+ Assert.assertTrue(returnVal.contains("foo"));
+ Assert.assertTrue(returnVal.contains("bar"));
+ Assert.assertTrue(returnVal.contains("foobar"));
+ }
+
+ @Test
+ public void testGetInitialList() {
+ final InMemoryAcceptListStore store = new InMemoryAcceptListStore.Builder()
+ .setInitialCache(ImmutableSet.of("foobar"))
+ .build();
+ final ImmutableSet<String> returnVal = store.getInitialList();
+ Assert.assertEquals(1, returnVal.size());
+ Assert.assertTrue(returnVal.contains("foobar"));
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisAcceptListStoreTest.java b/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisAcceptListStoreTest.java
new file mode 100644
index 00000000..edbb2935
--- /dev/null
+++ b/src/test/java/com/arpnetworking/metrics/mad/sources/ratelimiter/RedisAcceptListStoreTest.java
@@ -0,0 +1,112 @@
+package com.arpnetworking.metrics.mad.sources.ratelimiter;
+
+import com.google.common.collect.ImmutableSet;
+import com.mercateo.test.clock.TestClock;
+import junit.framework.TestCase;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import redis.clients.jedis.Jedis;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class RedisAcceptListStoreTest extends TestCase {
+
+ @Mock
+ private Jedis _mockJedis;
+
+ private TestClock _clock;
+ private Instant _start;
+ private RedisAcceptListStore _store;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ _start = Clock.systemUTC().instant();
+ _clock = TestClock.fixed(_start, ZoneOffset.UTC);
+
+ _store = new RedisAcceptListStore.Builder()
+ .setClock(_clock)
+ .setJedis(_mockJedis)
+ .build();
+ }
+
+ @Test
+ public void testInitializeReadsCurrentAndPrevious() {
+ final String expectedKey = _clock.instant().truncatedTo(ChronoUnit.DAYS).toString();
+ final String expectedPreviousKey = _clock.instant().truncatedTo(ChronoUnit.DAYS).minus(1, ChronoUnit.DAYS).toString();
+ when(_mockJedis.smembers(eq(expectedKey))).thenReturn(ImmutableSet.of("test1"));
+ when(_mockJedis.smembers(eq(expectedPreviousKey))).thenReturn(ImmutableSet.of("test2"));
+ final ImmutableSet<String> result = _store.getInitialList();
+ verify(_mockJedis).smembers(eq(expectedKey));
+ verify(_mockJedis).smembers(eq(expectedPreviousKey));
+ assertEquals(2, result.size());
+ assertTrue(result.contains("test1"));
+ assertTrue(result.contains("test2"));
+ }
+
+ @Test
+ public void testUsesInitialPreviousSet() {
+ final String expectedKey = _clock.instant().truncatedTo(ChronoUnit.DAYS).toString();
+ final String expectedPreviousKey = _clock.instant().truncatedTo(ChronoUnit.DAYS).minus(1, ChronoUnit.DAYS).toString();
+ when(_mockJedis.smembers(eq(expectedKey))).thenReturn(ImmutableSet.of("test1"));
+ when(_mockJedis.smembers(eq(expectedPreviousKey))).thenReturn(ImmutableSet.of("test2"));
+ final ImmutableSet<String> result = _store.getInitialList();
+ assertEquals(2, result.size());
+ assertTrue(result.contains("test1"));
+ assertTrue(result.contains("test2"));
+
+ reset(_mockJedis);
+ when(_mockJedis.smembers(eq(expectedKey))).thenReturn(ImmutableSet.of("test3"));
+ when(_mockJedis.sadd(eq(expectedKey), any())).thenReturn(1L);
+ final ImmutableSet<String> updateResult = _store.updateAndReturnCurrent(ImmutableSet.of("test3"));
+ verify(_mockJedis).smembers(eq(expectedKey));
+ verify(_mockJedis).sadd(eq(expectedKey), eq("test3"));
+ verifyNoMoreInteractions(_mockJedis);
+ assertEquals(2, updateResult.size());
+ assertTrue(updateResult.contains("test3"));
+ assertTrue(updateResult.contains("test2"));
+ }
+
+ @Test
+ public void testUpdatesPreviousSetOnKeyChange() {
+ String expectedKey = _clock.instant().truncatedTo(ChronoUnit.DAYS).toString();
+ String expectedPreviousKey = _clock.instant().truncatedTo(ChronoUnit.DAYS).minus(1, ChronoUnit.DAYS).toString();
+ when(_mockJedis.smembers(eq(expectedKey))).thenReturn(ImmutableSet.of("test1"));
+ when(_mockJedis.smembers(eq(expectedPreviousKey))).thenReturn(ImmutableSet.of("test2"));
+ final ImmutableSet<String> result = _store.getInitialList();
+ assertEquals(2, result.size());
+ assertTrue(result.contains("test1"));
+ assertTrue(result.contains("test2"));
+
+ _clock.fastForward(Duration.ofDays(1));
+
+ reset(_mockJedis);
+ expectedKey = _clock.instant().truncatedTo(ChronoUnit.DAYS).toString();
+ expectedPreviousKey = _clock.instant().truncatedTo(ChronoUnit.DAYS).minus(1, ChronoUnit.DAYS).toString();
+ when(_mockJedis.smembers(eq(expectedKey))).thenReturn(ImmutableSet.of("test3"));
+ when(_mockJedis.smembers(eq(expectedPreviousKey))).thenReturn(ImmutableSet.of("test1"));
+ when(_mockJedis.sadd(eq(expectedKey), any())).thenReturn(1L);
+ final ImmutableSet<String> updateResult = _store.updateAndReturnCurrent(ImmutableSet.of("test3"));
+ verify(_mockJedis).smembers(eq(expectedKey));
+ verify(_mockJedis).expireAt(eq(expectedKey), eq(_clock.instant().truncatedTo(ChronoUnit.DAYS).plus(2, ChronoUnit.DAYS).getEpochSecond()));
+ verify(_mockJedis).smembers(eq(expectedPreviousKey));
+ verify(_mockJedis).sadd(eq(expectedKey), eq("test3"));
+ verifyNoMoreInteractions(_mockJedis);
+ assertEquals(2, updateResult.size());
+ assertTrue(updateResult.contains("test3"));
+ assertTrue(updateResult.contains("test1"));
+ }
+}
\ No newline at end of file