1- use std:: { borrow:: Cow , collections:: HashSet , sync:: Arc } ;
1+ use std:: { borrow:: Cow , collections:: HashSet , error :: Error , sync:: Arc } ;
22
33use opentelemetry:: {
44 metrics:: { AsyncInstrument , SyncInstrument } ,
@@ -73,10 +73,13 @@ impl InstrumentKind {
7373/// Instruments can be used as criteria for views.
7474///
7575/// ```
76- /// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
76+ /// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream, StreamBuilder };
7777///
7878/// let criteria = Instrument::new().name("counter_*");
79- /// let mask = Stream::new().aggregation(Aggregation::Sum);
79+ /// let mask = Stream::builder()
80+ /// .with_aggregation(Aggregation::Sum)
81+ /// .build()
82+ /// .unwrap();
8083///
8184/// let view = new_view(criteria, mask);
8285/// # drop(view);
@@ -169,71 +172,60 @@ impl Instrument {
169172 }
170173}
171174
172- /// Describes the stream of data an instrument produces .
175+ /// A builder for creating Stream objects .
173176///
174177/// # Example
175178///
176- /// Streams can be used as masks in views.
177- ///
178179/// ```
179- /// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
180- ///
181- /// let criteria = Instrument::new().name("counter_*");
182- /// let mask = Stream::new().aggregation(Aggregation::Sum);
180+ /// use opentelemetry_sdk::metrics::{Aggregation, Stream};
181+ /// use opentelemetry::Key;
183182///
184- /// let view = new_view(criteria, mask);
185- /// # drop(view);
183+ /// let stream = Stream::builder()
184+ /// .with_name("my_stream")
185+ /// .with_aggregation(Aggregation::Sum)
186+ /// .with_cardinality_limit(100)
187+ /// .build()
188+ /// .unwrap();
186189/// ```
187190#[ derive( Default , Debug ) ]
188191#[ non_exhaustive]
189- #[ allow( unreachable_pub) ]
190- pub struct Stream {
191- /// The human-readable identifier of the stream.
192- pub ( crate ) name : Option < Cow < ' static , str > > ,
193- /// Describes the purpose of the data.
194- pub ( crate ) description : Option < Cow < ' static , str > > ,
195- /// the unit of measurement recorded.
196- pub ( crate ) unit : Option < Cow < ' static , str > > ,
197- /// Aggregation the stream uses for an instrument.
198- pub ( crate ) aggregation : Option < Aggregation > ,
199- /// An allow-list of attribute keys that will be preserved for the stream.
200- ///
201- /// Any attribute recorded for the stream with a key not in this set will be
202- /// dropped. If the set is empty, all attributes will be dropped, if `None` all
203- /// attributes will be kept.
204- pub ( crate ) allowed_attribute_keys : Option < Arc < HashSet < Key > > > ,
205-
206- /// Cardinality limit for the stream.
207- pub ( crate ) cardinality_limit : Option < usize > ,
192+ pub struct StreamBuilder {
193+ name : Option < Cow < ' static , str > > ,
194+ description : Option < Cow < ' static , str > > ,
195+ unit : Option < Cow < ' static , str > > ,
196+ aggregation : Option < Aggregation > ,
197+ allowed_attribute_keys : Option < Arc < HashSet < Key > > > ,
198+ cardinality_limit : Option < usize > ,
208199}
209200
210- impl Stream {
211- /// Create a new stream with empty values.
212- pub fn new ( ) -> Self {
213- Stream :: default ( )
201+ impl StreamBuilder {
202+ /// Create a new stream builder with default values.
203+ pub ( crate ) fn new ( ) -> Self {
204+ StreamBuilder :: default ( )
214205 }
215206
216- /// Set the stream name.
217- pub fn name ( mut self , name : impl Into < Cow < ' static , str > > ) -> Self {
207+ /// Set the stream name. If this is not set, name provide while creating the instrument will be used.
208+ pub fn with_name ( mut self , name : impl Into < Cow < ' static , str > > ) -> Self {
218209 self . name = Some ( name. into ( ) ) ;
219210 self
220211 }
221212
222- /// Set the stream description.
223- pub fn description ( mut self , description : impl Into < Cow < ' static , str > > ) -> Self {
213+ /// Set the stream description. If this is not set, description provided while creating the instrument will be used.
214+ pub fn with_description ( mut self , description : impl Into < Cow < ' static , str > > ) -> Self {
224215 self . description = Some ( description. into ( ) ) ;
225216 self
226217 }
227218
228- /// Set the stream unit.
229- pub fn unit ( mut self , unit : impl Into < Cow < ' static , str > > ) -> Self {
219+ /// Set the stream unit. If this is not set, unit provided while creating the instrument will be used.
220+ pub fn with_unit ( mut self , unit : impl Into < Cow < ' static , str > > ) -> Self {
230221 self . unit = Some ( unit. into ( ) ) ;
231222 self
232223 }
233224
234225 #[ cfg( feature = "spec_unstable_metrics_views" ) ]
235- /// Set the stream aggregation.
236- pub fn aggregation ( mut self , aggregation : Aggregation ) -> Self {
226+ /// Set the stream aggregation. This is used to customize the aggregation.
227+ /// If not set, the default aggregation based on the instrument kind will be used.
228+ pub fn with_aggregation ( mut self , aggregation : Aggregation ) -> Self {
237229 self . aggregation = Some ( aggregation) ;
238230 self
239231 }
@@ -242,18 +234,130 @@ impl Stream {
242234 /// Set the stream allowed attribute keys.
243235 ///
244236 /// Any attribute recorded for the stream with a key not in this set will be
245- /// dropped. If this set is empty all attributes will be dropped.
246- pub fn allowed_attribute_keys ( mut self , attribute_keys : impl IntoIterator < Item = Key > ) -> Self {
237+ /// dropped. If the set is empty, all attributes will be dropped, if `None` all
238+ /// attributes will be kept.
239+ pub fn with_allowed_attribute_keys (
240+ mut self ,
241+ attribute_keys : impl IntoIterator < Item = Key > ,
242+ ) -> Self {
247243 self . allowed_attribute_keys = Some ( Arc :: new ( attribute_keys. into_iter ( ) . collect ( ) ) ) ;
248-
249244 self
250245 }
251246
252- /// Set the stream cardinality limit.
253- pub fn cardinality_limit ( mut self , limit : usize ) -> Self {
247+ /// Set the stream cardinality limit. If this is not set, the default limit of 2000 will be used.
248+ pub fn with_cardinality_limit ( mut self , limit : usize ) -> Self {
254249 self . cardinality_limit = Some ( limit) ;
255250 self
256251 }
252+
253+ /// Build a new Stream instance using the configuration in this builder.
254+ ///
255+ /// # Returns
256+ ///
257+ /// A Result containing the new Stream instance or an error if the build failed.
258+ pub fn build ( self ) -> Result < Stream , Box < dyn Error > > {
259+ // TODO: Add same validation as already done while
260+ // creating instruments. It is better to move validation logic
261+ // to a common helper and call it from both places.
262+ // The current implementations does a basic validation
263+ // only to close the overall API design.
264+
265+ // if name is provided, it must not be empty
266+ if let Some ( name) = & self . name {
267+ if name. is_empty ( ) {
268+ return Err ( "Stream name must not be empty" . into ( ) ) ;
269+ }
270+ }
271+
272+ // if cardinality limit is provided, it must be greater than 0
273+ if let Some ( limit) = self . cardinality_limit {
274+ if limit == 0 {
275+ return Err ( "Cardinality limit must be greater than 0" . into ( ) ) ;
276+ }
277+ }
278+
279+ // If the aggregation is set to ExplicitBucketHistogram, validate the bucket boundaries.
280+ if let Some ( Aggregation :: ExplicitBucketHistogram { boundaries, .. } ) = & self . aggregation {
281+ validate_bucket_boundaries ( boundaries) ?;
282+ }
283+
284+ Ok ( Stream {
285+ name : self . name ,
286+ description : self . description ,
287+ unit : self . unit ,
288+ aggregation : self . aggregation ,
289+ allowed_attribute_keys : self . allowed_attribute_keys ,
290+ cardinality_limit : self . cardinality_limit ,
291+ } )
292+ }
293+ }
294+
295+ fn validate_bucket_boundaries ( boundaries : & [ f64 ] ) -> Result < ( ) , String > {
296+ // Validate boundaries do not contain f64::NAN, f64::INFINITY, or f64::NEG_INFINITY
297+ for boundary in boundaries {
298+ if boundary. is_nan ( ) || boundary. is_infinite ( ) {
299+ return Err (
300+ "Bucket boundaries must not contain NaN, Infinity, or -Infinity" . to_string ( ) ,
301+ ) ;
302+ }
303+ }
304+
305+ // validate that buckets are sorted and non-duplicate
306+ for i in 1 ..boundaries. len ( ) {
307+ if boundaries[ i] <= boundaries[ i - 1 ] {
308+ return Err ( "Bucket boundaries must be sorted and non-duplicate" . to_string ( ) ) ;
309+ }
310+ }
311+
312+ Ok ( ( ) )
313+ }
314+
315+ /// Describes the stream of data an instrument produces.
316+ ///
317+ /// # Example
318+ ///
319+ /// Streams can be used as masks in views.
320+ ///
321+ /// ```
322+ /// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
323+ ///
324+ /// let criteria = Instrument::new().name("counter_*");
325+ /// let mask = Stream::builder()
326+ /// .with_aggregation(Aggregation::Sum)
327+ /// .build()
328+ /// .unwrap();
329+ ///
330+ /// let view = new_view(criteria, mask);
331+ /// # drop(view);
332+ /// ```
333+ #[ derive( Default , Debug ) ]
334+ #[ non_exhaustive]
335+ #[ allow( unreachable_pub) ]
336+ pub struct Stream {
337+ /// The human-readable identifier of the stream.
338+ pub ( crate ) name : Option < Cow < ' static , str > > ,
339+ /// Describes the purpose of the data.
340+ pub ( crate ) description : Option < Cow < ' static , str > > ,
341+ /// the unit of measurement recorded.
342+ pub ( crate ) unit : Option < Cow < ' static , str > > ,
343+ /// Aggregation the stream uses for an instrument.
344+ pub ( crate ) aggregation : Option < Aggregation > ,
345+ /// An allow-list of attribute keys that will be preserved for the stream.
346+ ///
347+ /// Any attribute recorded for the stream with a key not in this set will be
348+ /// dropped. If the set is empty, all attributes will be dropped, if `None` all
349+ /// attributes will be kept.
350+ pub ( crate ) allowed_attribute_keys : Option < Arc < HashSet < Key > > > ,
351+
352+ /// Cardinality limit for the stream.
353+ pub ( crate ) cardinality_limit : Option < usize > ,
354+ }
355+
356+ impl Stream {
357+ /// Create a new stream builder with default values.
358+ pub fn builder ( ) -> StreamBuilder {
359+ StreamBuilder :: new ( )
360+ }
257361}
258362
259363/// The identifying properties of an instrument.
0 commit comments