@@ -259,6 +259,146 @@ fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> {
259259 Ok ( ( ) )
260260}
261261
262+ /// A builder for creating Stream objects.
263+ ///
264+ /// # Example
265+ ///
266+ /// ```
267+ /// use opentelemetry_sdk::metrics::{Aggregation, Stream};
268+ /// use opentelemetry::Key;
269+ ///
270+ /// let stream = Stream::builder()
271+ /// .with_name("my_stream")
272+ /// .with_aggregation(Aggregation::Sum)
273+ /// .with_cardinality_limit(100)
274+ /// .build()
275+ /// .unwrap();
276+ /// ```
277+ #[ derive( Default , Debug ) ]
278+ #[ non_exhaustive]
279+ pub struct StreamBuilder {
280+ name : Option < Cow < ' static , str > > ,
281+ description : Option < Cow < ' static , str > > ,
282+ unit : Option < Cow < ' static , str > > ,
283+ aggregation : Option < Aggregation > ,
284+ allowed_attribute_keys : Option < Arc < HashSet < Key > > > ,
285+ cardinality_limit : Option < usize > ,
286+ }
287+
288+ impl StreamBuilder {
289+ /// Create a new stream builder with default values.
290+ pub ( crate ) fn new ( ) -> Self {
291+ StreamBuilder :: default ( )
292+ }
293+
294+ /// Set the stream name. If this is not set, name provide while creating the instrument will be used.
295+ pub fn with_name ( mut self , name : impl Into < Cow < ' static , str > > ) -> Self {
296+ self . name = Some ( name. into ( ) ) ;
297+ self
298+ }
299+
300+ /// Set the stream description. If this is not set, description provided while creating the instrument will be used.
301+ pub fn with_description ( mut self , description : impl Into < Cow < ' static , str > > ) -> Self {
302+ self . description = Some ( description. into ( ) ) ;
303+ self
304+ }
305+
306+ /// Set the stream unit. If this is not set, unit provided while creating the instrument will be used.
307+ pub fn with_unit ( mut self , unit : impl Into < Cow < ' static , str > > ) -> Self {
308+ self . unit = Some ( unit. into ( ) ) ;
309+ self
310+ }
311+
312+ #[ cfg( feature = "spec_unstable_metrics_views" ) ]
313+ /// Set the stream aggregation. This is used to customize the aggregation.
314+ /// If not set, the default aggregation based on the instrument kind will be used.
315+ pub fn with_aggregation ( mut self , aggregation : Aggregation ) -> Self {
316+ self . aggregation = Some ( aggregation) ;
317+ self
318+ }
319+
320+ #[ cfg( feature = "spec_unstable_metrics_views" ) ]
321+ /// Set the stream allowed attribute keys.
322+ ///
323+ /// Any attribute recorded for the stream with a key not in this set will be
324+ /// dropped. If the set is empty, all attributes will be dropped, if `None` all
325+ /// attributes will be kept.
326+ pub fn with_allowed_attribute_keys (
327+ mut self ,
328+ attribute_keys : impl IntoIterator < Item = Key > ,
329+ ) -> Self {
330+ self . allowed_attribute_keys = Some ( Arc :: new ( attribute_keys. into_iter ( ) . collect ( ) ) ) ;
331+ self
332+ }
333+
334+ /// Set the stream cardinality limit. If this is not set, the default limit of 2000 will be used.
335+ pub fn with_cardinality_limit ( mut self , limit : usize ) -> Self {
336+ self . cardinality_limit = Some ( limit) ;
337+ self
338+ }
339+
340+ /// Build a new Stream instance using the configuration in this builder.
341+ ///
342+ /// # Returns
343+ ///
344+ /// A Result containing the new Stream instance or an error if the build failed.
345+ pub fn build ( self ) -> Result < Stream , Box < dyn Error > > {
346+ // TODO: Add same validation as already done while
347+ // creating instruments. It is better to move validation logic
348+ // to a common helper and call it from both places.
349+ // The current implementations does a basic validation
350+ // only to close the overall API design.
351+
352+ // if name is provided, it must not be empty
353+ if let Some ( name) = & self . name {
354+ if name. is_empty ( ) {
355+ return Err ( "Stream name must not be empty" . into ( ) ) ;
356+ }
357+ }
358+
359+ // if cardinality limit is provided, it must be greater than 0
360+ if let Some ( limit) = self . cardinality_limit {
361+ if limit == 0 {
362+ return Err ( "Cardinality limit must be greater than 0" . into ( ) ) ;
363+ }
364+ }
365+
366+ // If the aggregation is set to ExplicitBucketHistogram, validate the bucket boundaries.
367+ if let Some ( Aggregation :: ExplicitBucketHistogram { boundaries, .. } ) = & self . aggregation {
368+ validate_bucket_boundaries ( boundaries) ?;
369+ }
370+
371+ Ok ( Stream {
372+ name : self . name ,
373+ description : self . description ,
374+ unit : self . unit ,
375+ aggregation : self . aggregation ,
376+ allowed_attribute_keys : self . allowed_attribute_keys ,
377+ cardinality_limit : self . cardinality_limit ,
378+ } )
379+ }
380+ }
381+
382+ fn validate_bucket_boundaries ( boundaries : & [ f64 ] ) -> Result < ( ) , String > {
383+ // Validate boundaries do not contain f64::NAN, f64::INFINITY, or f64::NEG_INFINITY
384+ for boundary in boundaries {
385+ if boundary. is_nan ( ) || boundary. is_infinite ( ) {
386+ return Err (
387+ "Bucket boundaries must not contain NaN, Infinity, or -Infinity" . to_string ( ) ,
388+ ) ;
389+ }
390+ }
391+
392+ // validate that buckets are sorted and non-duplicate
393+ for i in 1 ..boundaries. len ( ) {
394+ if boundaries[ i] <= boundaries[ i - 1 ] {
395+ return Err ( "Bucket boundaries must be sorted and non-duplicate" . to_string ( ) ) ;
396+ }
397+ }
398+
399+ Ok ( ( ) )
400+ }
401+
262402/// Describes the stream of data an instrument produces.
263403///
264404/// # Example
0 commit comments