@@ -87,7 +87,8 @@ public void recordAggregateData(final PeriodicData periodicData) {
8787 periodicData .getDimensions (),
8888 _periodDimensionName ,
8989 periodicData .getPeriod (),
90- _mappedDimensions );
90+ _mappedDimensions ,
91+ _defaultDimensionValues );
9192 _buckets .computeIfAbsent (bucketKey , k -> new Bucket (bucketKey ))
9293 .record (periodicData , now );
9394 }
@@ -116,7 +117,8 @@ static Key computeKey(
116117 final Key key ,
117118 final String periodDimensionName ,
118119 final Duration period ,
119- final ImmutableMultimap <String , String > mappedDimensions ) {
120+ final ImmutableMultimap <String , String > mappedDimensions ,
121+ final ImmutableMap <String , String > defaultDimensionValues ) {
120122 // TODO(ville): Apply interning to creating Key instances
121123
122124 // Filter the input key's parameters by the ones we wish to dimension map
@@ -134,10 +136,10 @@ static Key computeKey(
134136
135137 mappedDimensions .entries ()
136138 .stream ()
137- .filter (e -> parameters .containsKey (e .getKey ()))
139+ .filter (e -> parameters .containsKey (e .getKey ()) || defaultDimensionValues . containsKey ( e . getKey ()) )
138140 .forEach (e -> dimensionsBuilder .put (
139141 e .getValue (),
140- parameters .get (e .getKey ())));
142+ parameters .getOrDefault ( e . getKey (), defaultDimensionValues . get (e .getKey () ))));
141143
142144 return new DefaultKey (dimensionsBuilder .build ());
143145 }
@@ -147,7 +149,8 @@ private void flushMetrics() {
147149 // condition between the flush+remove and any new data added.
148150 //
149151 // 1) If the bucket flush does nothing then remove it from the buckets map and
150- // add it to a "to be removed" list. This prevents further
152+ // add it to a "to be removed" list. This prevents buckets from accumulating
153+ // in memory if the tag space changes over time.
151154 //
152155 // 2) On the next flush interval re-flush all the buckets to be removed and then
153156 // actually drop them.
@@ -206,13 +209,14 @@ private static String getPeriodAsString(final Duration period) {
206209 //
207210 // The validation in the Builder ensures that all target key names
208211 // occur only once across dimensions and mappedDimensions regardless
209- // of what the source key name is (or whether it was unammped or mapped).
212+ // of what the source key name is (or whether it was unmapped or mapped).
210213 final ImmutableMultimap .Builder <String , String > dimensionsBuilder = ImmutableMultimap .builder ();
211214 builder ._dimensions .forEach (e -> dimensionsBuilder .put (e , e ));
212215 dimensionsBuilder .putAll (builder ._mappedDimensions .entrySet ());
213216
214217 // Initialize the metrics factory and metrics instance
215218 _mappedDimensions = dimensionsBuilder .build ();
219+ _defaultDimensionValues = builder ._defaultDimensionValues ;
216220 _metricsFactory = builder ._metricsFactory ;
217221 _buckets = Maps .newConcurrentMap ();
218222 _periodDimensionName = builder ._periodDimensionName ;
@@ -238,6 +242,7 @@ private PeriodicStatisticsSink(final Builder builder) {
238242 }
239243
240244 private final ImmutableMultimap <String , String > _mappedDimensions ;
245+ private final ImmutableMap <String , String > _defaultDimensionValues ;
241246 private final Map <Key , Bucket > _buckets ;
242247 private final MetricsFactory _metricsFactory ;
243248 private final Deque <Bucket > _bucketsToBeRemoved = new ConcurrentLinkedDeque <>();
@@ -329,33 +334,39 @@ public void record(final PeriodicData periodicData, final long now) {
329334
330335 // This assumes that all AggregatedData instances have the
331336 // population field set correctly (really they should).
332- _metricSamples .accumulate (firstDatum .getPopulationSize ());
337+ _metricSamples .addAndGet (firstDatum .getPopulationSize ());
333338 }
334339
335340 _age .accumulate (now - periodicData .getStart ().plus (periodicData .getPeriod ()).toInstant ().toEpochMilli ());
336341 }
337342
338343 public boolean flushMetrics () {
339- // Rotate the metrics instance
340- final Metrics metrics = _metrics .getAndSet (createMetrics ());
341-
342344 // Gather and reset state
343345 final Set <String > oldUniqueMetrics = _uniqueMetrics .getAndSet (
344346 createConcurrentSet (_uniqueMetrics .get ()));
345347 final Set <MetricKey > oldUniqueStatistics = _uniqueStatistics .getAndSet (
346348 createConcurrentSet (_uniqueStatistics .get ()));
347349
348- // Record statistics and close
349- metrics .incrementCounter (_aggregatedDataName , _aggregatedData .getAndSet (0 ));
350- metrics .incrementCounter (_uniqueMetricsName , oldUniqueMetrics .size ());
351- metrics .incrementCounter (_uniqueStatisticsName , oldUniqueStatistics .size ());
352- metrics .incrementCounter (_metricSamplesName , _metricSamples .getThenReset ());
353- metrics .setTimer (_ageName , _age .getThenReset (), TimeUnit .MILLISECONDS );
354- metrics .close ();
355-
356350 // Use unique metrics as a proxy for whether data was added. See note in the sink's
357351 // flushMetrics method about how the race condition is resolved.
358- return !oldUniqueMetrics .isEmpty ();
352+ if (!oldUniqueMetrics .isEmpty ()) {
353+ // Rotate the metrics instance
354+ final Metrics metrics = _metrics .getAndSet (createMetrics ());
355+
356+ // Record statistics and close
357+ metrics .incrementCounter (_aggregatedDataName , _aggregatedData .getAndSet (0 ));
358+ metrics .incrementCounter (_uniqueMetricsName , oldUniqueMetrics .size ());
359+ metrics .incrementCounter (_uniqueStatisticsName , oldUniqueStatistics .size ());
360+ metrics .incrementCounter (_metricSamplesName , _metricSamples .getAndSet (0 ));
361+ metrics .setTimer (_ageName , _age .getThenReset (), TimeUnit .MILLISECONDS );
362+ metrics .close ();
363+
364+ // Periodic data was flushed
365+ return true ;
366+ }
367+
368+ // No periodic data was flushed; this bucket can be cleaned up
369+ return false ;
359370 }
360371
361372 @ LogValue
@@ -371,10 +382,6 @@ public Object toLogValue() {
371382 private Metrics createMetrics () {
372383 final Metrics metrics = _metricsFactory .create ();
373384 metrics .addAnnotations (_key .getParameters ());
374- metrics .resetCounter (_aggregatedDataName );
375- metrics .resetCounter (_uniqueMetricsName );
376- metrics .resetCounter (_uniqueStatisticsName );
377- metrics .resetCounter (_metricSamplesName );
378385 return metrics ;
379386 }
380387
@@ -387,7 +394,7 @@ private Metrics createMetrics() {
387394 private final AtomicReference <Metrics > _metrics = new AtomicReference <>();
388395
389396 private final LongAccumulator _age = new LongAccumulator (Math ::max , 0 );
390- private final LongAccumulator _metricSamples = new LongAccumulator ( Long :: sum , 0 );
397+ private final AtomicLong _metricSamples = new AtomicLong ( 0 );
391398 private final AtomicLong _aggregatedData = new AtomicLong (0 );
392399 private final AtomicReference <Set <String >> _uniqueMetrics = new AtomicReference <>(
393400 ConcurrentHashMap .newKeySet ());
@@ -468,6 +475,20 @@ public Builder setMappedDimensions(final ImmutableMap<String, String> value) {
468475 return this ;
469476 }
470477
478+ /**
479+ * Supply default dimension values by original dimension key. The keys
480+ * in this map must also be specified either in {@link Builder#setDimensions(ImmutableSet)}
481+ * or as keys in {@link Builder#setMappedDimensions(ImmutableMap)}. The
482+ * key refers to the dimension name on the input {@link PeriodicData}.
483+ *
484+ * @param value The default dimension key-value pairs.
485+ * @return This instance of <code>Builder</code>.
486+ */
487+ public Builder setDefaultDimensionsValues (final ImmutableMap <String , String > value ) {
488+ _defaultDimensionValues = value ;
489+ return this ;
490+ }
491+
471492 /**
472493 * The name of the outbound dimension for the periodicity of the data.
473494 * Cannot be null. Default is "_period".
@@ -506,6 +527,9 @@ protected Builder self() {
506527 @ NotNull
507528 private ImmutableMap <String , String > _mappedDimensions = ImmutableMap .of ();
508529 @ NotNull
530+ @ CheckWith (value = CheckDefaultDimensionTargets .class )
531+ private ImmutableMap <String , String > _defaultDimensionValues = ImmutableMap .of ();
532+ @ NotNull
509533 private String _periodDimensionName = "_period" ;
510534 @ JacksonInject
511535 @ NotNull
@@ -552,7 +576,7 @@ public boolean isSatisfied(final Object validatedObject, final Object value) {
552576 LOGGER .warn ()
553577 .setMessage ("Invalid PeriodicStatisticsSink" )
554578 .addData ("reason" , "Mapped dimensions overlap with periodDimensionName" )
555- .addData ("dimensions " , builder ._periodDimensionName )
579+ .addData ("periodDimensionName " , builder ._periodDimensionName )
556580 .addData ("mappedDimensions" , builder ._mappedDimensions )
557581 .log ();
558582 return false ;
@@ -561,7 +585,7 @@ public boolean isSatisfied(final Object validatedObject, final Object value) {
561585 LOGGER .warn ()
562586 .setMessage ("Invalid PeriodicStatisticsSink" )
563587 .addData ("reason" , "(Unmapped) dimensions overlap with periodDimensionName" )
564- .addData ("dimensions " , builder ._periodDimensionName )
588+ .addData ("periodDimensionName " , builder ._periodDimensionName )
565589 .addData ("dimensions" , builder ._dimensions )
566590 .log ();
567591 return false ;
@@ -577,8 +601,40 @@ public boolean isSatisfied(final Object validatedObject, final Object value) {
577601 // mapped dimension rule for "b" are equivalent! However, the current
578602 // check disallows this.
579603 //
604+ // See test:
605+ // testValidationDimensionCollisionAlthoughLogicallyEquivalent
606+ //
580607 // TODO(ville): Once we have a use case address this.
581608
609+ return true ;
610+ }
611+ }
612+
613+ private static final class CheckDefaultDimensionTargets implements CheckWithCheck .SimpleCheck {
614+
615+ private static final long serialVersionUID = 5011108547193627318L ;
616+
617+ @ Override
618+ public boolean isSatisfied (final Object validatedObject , final Object value ) {
619+ // TODO(ville): Find a way to throw validation exceptions instead of logging.
620+
621+ if (!(validatedObject instanceof PeriodicStatisticsSink .Builder )) {
622+ return false ;
623+ }
624+ final PeriodicStatisticsSink .Builder builder = (PeriodicStatisticsSink .Builder ) validatedObject ;
625+
626+ for (final String keyToDefault : builder ._defaultDimensionValues .keySet ()) {
627+ if (!builder ._dimensions .contains (keyToDefault ) && !builder ._mappedDimensions .containsKey (keyToDefault )) {
628+ LOGGER .warn ()
629+ .setMessage ("Invalid PeriodicStatisticsSink" )
630+ .addData ("reason" , "Default dimensions key not specified in (unmapped) dimensions or mapped dimensions" )
631+ .addData ("dimensions" , builder ._dimensions )
632+ .addData ("mappedDimensions.keySet" , builder ._mappedDimensions .keySet ())
633+ .addData ("defaultDimensionKey" , keyToDefault )
634+ .log ();
635+ return false ;
636+ }
637+ }
582638
583639 return true ;
584640 }
0 commit comments