@@ -15,16 +15,6 @@ use super::{
1515 precomputed_sum:: PrecomputedSum , sum:: Sum , Number ,
1616} ;
1717
18- pub ( crate ) const STREAM_CARDINALITY_LIMIT : usize = 2000 ;
19-
20- /// Checks whether aggregator has hit cardinality limit for metric streams
21- pub ( crate ) fn is_under_cardinality_limit ( _size : usize ) -> bool {
22- true
23-
24- // TODO: Implement this feature, after allowing the ability to customize the cardinality limit.
25- // size < STREAM_CARDINALITY_LIMIT
26- }
27-
2818/// Receives measurements to be aggregated.
2919pub ( crate ) trait Measure < T > : Send + Sync + ' static {
3020 fn call ( & self , measurement : T , attrs : & [ KeyValue ] ) ;
@@ -133,14 +123,22 @@ pub(crate) struct AggregateBuilder<T> {
133123 /// measurements.
134124 filter : AttributeSetFilter ,
135125
126+ /// Cardinality limit for the metric stream
127+ cardinality_limit : usize ,
128+
136129 _marker : marker:: PhantomData < T > ,
137130}
138131
139132impl < T : Number > AggregateBuilder < T > {
140- pub ( crate ) fn new ( temporality : Temporality , filter : Option < Filter > ) -> Self {
133+ pub ( crate ) fn new (
134+ temporality : Temporality ,
135+ filter : Option < Filter > ,
136+ cardinality_limit : usize ,
137+ ) -> Self {
141138 AggregateBuilder {
142139 temporality,
143140 filter : AttributeSetFilter :: new ( filter) ,
141+ cardinality_limit,
144142 _marker : marker:: PhantomData ,
145143 }
146144 }
@@ -150,18 +148,31 @@ impl<T: Number> AggregateBuilder<T> {
150148 LastValue :: new (
151149 overwrite_temporality. unwrap_or ( self . temporality ) ,
152150 self . filter . clone ( ) ,
151+ self . cardinality_limit ,
153152 )
154153 . into ( )
155154 }
156155
157156 /// Builds a precomputed sum aggregate function input and output.
158157 pub ( crate ) fn precomputed_sum ( & self , monotonic : bool ) -> AggregateFns < T > {
159- PrecomputedSum :: new ( self . temporality , self . filter . clone ( ) , monotonic) . into ( )
158+ PrecomputedSum :: new (
159+ self . temporality ,
160+ self . filter . clone ( ) ,
161+ monotonic,
162+ self . cardinality_limit ,
163+ )
164+ . into ( )
160165 }
161166
162167 /// Builds a sum aggregate function input and output.
163168 pub ( crate ) fn sum ( & self , monotonic : bool ) -> AggregateFns < T > {
164- Sum :: new ( self . temporality , self . filter . clone ( ) , monotonic) . into ( )
169+ Sum :: new (
170+ self . temporality ,
171+ self . filter . clone ( ) ,
172+ monotonic,
173+ self . cardinality_limit ,
174+ )
175+ . into ( )
165176 }
166177
167178 /// Builds a histogram aggregate function input and output.
@@ -177,6 +188,7 @@ impl<T: Number> AggregateBuilder<T> {
177188 boundaries,
178189 record_min_max,
179190 record_sum,
191+ self . cardinality_limit ,
180192 )
181193 . into ( )
182194 }
@@ -196,6 +208,7 @@ impl<T: Number> AggregateBuilder<T> {
196208 max_scale,
197209 record_min_max,
198210 record_sum,
211+ self . cardinality_limit ,
199212 )
200213 . into ( )
201214 }
@@ -211,10 +224,13 @@ mod tests {
211224
212225 use super :: * ;
213226
227+ const CARDINALITY_LIMIT_DEFAULT : usize = 2000 ;
228+
214229 #[ test]
215230 fn last_value_aggregation ( ) {
216231 let AggregateFns { measure, collect } =
217- AggregateBuilder :: < u64 > :: new ( Temporality :: Cumulative , None ) . last_value ( None ) ;
232+ AggregateBuilder :: < u64 > :: new ( Temporality :: Cumulative , None , CARDINALITY_LIMIT_DEFAULT )
233+ . last_value ( None ) ;
218234 let mut a = MetricData :: Gauge ( Gauge {
219235 data_points : vec ! [ GaugeDataPoint {
220236 attributes: vec![ KeyValue :: new( "a" , 1 ) ] ,
@@ -244,7 +260,8 @@ mod tests {
244260 fn precomputed_sum_aggregation ( ) {
245261 for temporality in [ Temporality :: Delta , Temporality :: Cumulative ] {
246262 let AggregateFns { measure, collect } =
247- AggregateBuilder :: < u64 > :: new ( temporality, None ) . precomputed_sum ( true ) ;
263+ AggregateBuilder :: < u64 > :: new ( temporality, None , CARDINALITY_LIMIT_DEFAULT )
264+ . precomputed_sum ( true ) ;
248265 let mut a = MetricData :: Sum ( Sum {
249266 data_points : vec ! [
250267 SumDataPoint {
@@ -290,7 +307,8 @@ mod tests {
290307 fn sum_aggregation ( ) {
291308 for temporality in [ Temporality :: Delta , Temporality :: Cumulative ] {
292309 let AggregateFns { measure, collect } =
293- AggregateBuilder :: < u64 > :: new ( temporality, None ) . sum ( true ) ;
310+ AggregateBuilder :: < u64 > :: new ( temporality, None , CARDINALITY_LIMIT_DEFAULT )
311+ . sum ( true ) ;
294312 let mut a = MetricData :: Sum ( Sum {
295313 data_points : vec ! [
296314 SumDataPoint {
@@ -335,8 +353,9 @@ mod tests {
335353 #[ test]
336354 fn explicit_bucket_histogram_aggregation ( ) {
337355 for temporality in [ Temporality :: Delta , Temporality :: Cumulative ] {
338- let AggregateFns { measure, collect } = AggregateBuilder :: < u64 > :: new ( temporality, None )
339- . explicit_bucket_histogram ( vec ! [ 1.0 ] , true , true ) ;
356+ let AggregateFns { measure, collect } =
357+ AggregateBuilder :: < u64 > :: new ( temporality, None , CARDINALITY_LIMIT_DEFAULT )
358+ . explicit_bucket_histogram ( vec ! [ 1.0 ] , true , true ) ;
340359 let mut a = MetricData :: Histogram ( Histogram {
341360 data_points : vec ! [ HistogramDataPoint {
342361 attributes: vec![ KeyValue :: new( "a1" , 1 ) ] ,
@@ -382,8 +401,9 @@ mod tests {
382401 #[ test]
383402 fn exponential_histogram_aggregation ( ) {
384403 for temporality in [ Temporality :: Delta , Temporality :: Cumulative ] {
385- let AggregateFns { measure, collect } = AggregateBuilder :: < u64 > :: new ( temporality, None )
386- . exponential_bucket_histogram ( 4 , 20 , true , true ) ;
404+ let AggregateFns { measure, collect } =
405+ AggregateBuilder :: < u64 > :: new ( temporality, None , CARDINALITY_LIMIT_DEFAULT )
406+ . exponential_bucket_histogram ( 4 , 20 , true , true ) ;
387407 let mut a = MetricData :: ExponentialHistogram ( ExponentialHistogram {
388408 data_points : vec ! [ ExponentialHistogramDataPoint {
389409 attributes: vec![ KeyValue :: new( "a1" , 1 ) ] ,
0 commit comments