
REUSABLE_DATA memory mode creates duplicate aggregator handles in DELTA temporality #7729

@lenin-jaganathan

Description


```java
public MetricData collect(
    Resource resource,
    InstrumentationScopeInfo instrumentationScopeInfo,
    long startEpochNanos,
    long epochNanos) {
  boolean reset = aggregationTemporality == DELTA;
  long start =
      aggregationTemporality == DELTA
          ? registeredReader.getLastCollectEpochNanos()
          : startEpochNanos;

  ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
  if (reset) {
    AggregatorHolder<T, U> holder = this.aggregatorHolder;
    this.aggregatorHolder =
        (memoryMode == REUSABLE_DATA)
            ? new AggregatorHolder<>(previousCollectionAggregatorHandles)
            : new AggregatorHolder<>();
    // Increment recordsInProgress by 1, which produces an odd number acting as a signal that
    // record operations should re-read the volatile this.aggregatorHolder.
    // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record
    // operations are complete.
    int recordsInProgress = holder.activeRecordingThreads.addAndGet(1);
    while (recordsInProgress > 1) {
      recordsInProgress = holder.activeRecordingThreads.get();
    }
    aggregatorHandles = holder.aggregatorHandles;
  } else {
    aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
  }

  List<T> points;
  if (memoryMode == REUSABLE_DATA) {
    reusableResultList.clear();
    points = reusableResultList;
  } else {
    points = new ArrayList<>(aggregatorHandles.size());
  }

  // In DELTA aggregation temporality each Attributes is reset to 0
  // every time we perform a collection (by definition of DELTA).
  // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles
  // (into which the values are recorded), effectively starting from 0
  // for each recorded Attributes.
  // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing
  // a key-value from a map and putting it again on next recording will cost an allocation,
  // we are keeping the aggregator handles in their map, and only reset their value once
  // we finish collecting the aggregated value from each one.
  // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory,
  // hence during collect(), when the map is at full capacity, we try to clear away unused
  // aggregator handles, so on the next recording cycle using this map, there will be room
  // for newly recorded Attributes. This comes at the expense of memory allocations. This can
  // be avoided if the user chooses to increase the maxCardinality.
  if (memoryMode == REUSABLE_DATA && reset) {
    if (aggregatorHandles.size() >= maxCardinality) {
      aggregatorHandles.forEach(
          (attribute, handle) -> {
            if (!handle.hasRecordedValues()) {
              aggregatorHandles.remove(attribute);
            }
          });
    }
  }

  // Grab aggregated points.
  aggregatorHandles.forEach(
      (attributes, handle) -> {
        if (!handle.hasRecordedValues()) {
          return;
        }
        T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset);
        if (reset && memoryMode == IMMUTABLE_DATA) {
          // Return the aggregator to the pool.
          // The pool is only used in DELTA temporality (since in CUMULATIVE the handle is
          // always in use, as it is the place accumulating the values and never resets)
          // AND only in IMMUTABLE_DATA memory mode, since in REUSABLE_DATA we avoid
          // using the pool because it allocates memory internally on each put() or remove().
          aggregatorHandlePool.offer(handle);
        }
        if (point != null) {
          points.add(point);
        }
      });

  // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are
  // created during collection.
  int toDelete = aggregatorHandlePool.size() - (maxCardinality + 1);
  for (int i = 0; i < toDelete; i++) {
    aggregatorHandlePool.poll();
  }

  if (reset && memoryMode == REUSABLE_DATA) {
    previousCollectionAggregatorHandles = aggregatorHandles;
  }

  if (points.isEmpty() || !enabled) {
    return EmptyMetricData.getInstance();
  }
  return aggregator.toMetricData(
      resource, instrumentationScopeInfo, metricDescriptor, points, aggregationTemporality);
}
```

In DefaultSynchronousMetricStorage, when using MemoryMode.REUSABLE_DATA with AggregationTemporality.DELTA, the implementation maintains two aggregator handles per unique attribute set due to a ping-pong pattern between collection intervals.

During collection, a new aggregatorHolder is created, seeded with the handles from previousCollectionAggregatorHandles. However, when an attribute set is recorded for the first time, its handle is not yet present in previousCollectionAggregatorHandles, so the next collection cycle creates a second handle for that same attribute set. Both of these objects then coexist for the lifetime of the application, increasing the storage's memory footprint. For instruments with many attribute combinations, this becomes a significant memory overhead.
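The ping-pong can be reproduced with a minimal, hypothetical model (class and field names here are invented for illustration, not the SDK's actual types): two handle maps alternate between "active" and "previous" on every DELTA collection, so a first-time attribute set ends up with one handle in each map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the REUSABLE_DATA + DELTA map swap (not SDK code).
public class PingPongDemo {
  static class Handle {}

  Map<String, Handle> active = new HashMap<>();
  Map<String, Handle> previous = new HashMap<>();

  // Mirrors computeIfAbsent on the active map: a handle is created only if
  // the attribute set is absent from THIS map, not from both maps.
  Handle record(String attributes) {
    return active.computeIfAbsent(attributes, k -> new Handle());
  }

  // On collection the two maps swap roles: the previous-cycle map (with its
  // old handles) is reused as the new active map.
  void collect() {
    Map<String, Handle> collected = active;
    active = previous;
    previous = collected;
  }

  public static void main(String[] args) {
    PingPongDemo storage = new PingPongDemo();
    Handle first = storage.record("k=v");  // handle created in map A
    storage.collect();                     // map B becomes active
    Handle second = storage.record("k=v"); // "k=v" absent in B: duplicate
    System.out.println(first == second);   // false: two handles now coexist
    storage.collect();                     // map A active again
    Handle third = storage.record("k=v");
    System.out.println(first == third);    // true: A's old handle is reused
  }
}
```

After the second collection the duplicate is never discarded; each map keeps its own handle for `k=v` indefinitely, which matches the doubled footprint described above.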

One possible fix is to hold the write lock for a few more instructions in the DELTA + REUSABLE_DATA case and reuse the existing handles by only resetting their values. We could also improve the attribute-reset behaviour (cardinality control) in this mode to remove only those handles that were not recorded to in the last interval.
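A rough sketch of that direction, assuming a heavily simplified storage (all names, the locking granularity, and the `String`-keyed map are invented here and are not the SDK's actual API): keep a single handle map, reset each handle in place under the write lock, and evict only handles that were idle in the last interval.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the proposed fix: one handle map, reset in place.
public class InPlaceResetStorage {
  static class Handle {
    long sum;                    // accumulated delta value
    boolean recordedThisInterval;

    void record(long value) {
      sum += value;
      recordedThisInterval = true;
    }

    long aggregateThenReset() {
      long result = sum;
      sum = 0;                   // reset in place instead of discarding
      recordedThisInterval = false;
      return result;
    }
  }

  private final Map<String, Handle> handles = new ConcurrentHashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void record(String attributes, long value) {
    lock.readLock().lock();      // many recorders may proceed concurrently
    try {
      handles.computeIfAbsent(attributes, k -> new Handle()).record(value);
    } finally {
      lock.readLock().unlock();
    }
  }

  Map<String, Long> collect() {
    Map<String, Long> points = new HashMap<>();
    lock.writeLock().lock();     // briefly exclude recorders while resetting
    try {
      // Cardinality control: evict only handles idle in the last interval.
      handles.entrySet().removeIf(e -> !e.getValue().recordedThisInterval);
      handles.forEach((attrs, h) -> points.put(attrs, h.aggregateThenReset()));
    } finally {
      lock.writeLock().unlock();
    }
    return points;
  }
}
```

Because the same `Handle` instance survives across collections, each attribute set costs exactly one handle, at the price of holding the write lock for the duration of the reset pass.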
