@@ -54,6 +54,10 @@ pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
5454pub ( crate ) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE : & str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" ;
5555/// Default maximum batch size.
5656pub ( crate ) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT : usize = 512 ;
57+ /// Maximum force flush timeout.
58+ pub ( crate ) const OTEL_BLRP_FORCEFLUSH_TIMEOUT : & str = "OTEL_BLRP_FORCEFLUSH_TIMEOUT" ;
59+ /// Default maximum force flush timeout.
60+ pub ( crate ) const OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT : Duration = Duration :: from_millis ( 5_000 ) ;
5761
5862/// Messages sent between application thread and batch log processor's work thread.
5963#[ allow( clippy:: large_enum_variant) ]
@@ -333,6 +337,7 @@ impl BatchLogProcessor {
333337 let ( message_sender, message_receiver) = mpsc:: sync_channel :: < BatchMessage > ( 64 ) ; // Is this a reasonable bound?
334338 let max_queue_size = config. max_queue_size ;
335339 let max_export_batch_size = config. max_export_batch_size ;
340+ let forceflush_timeout = config. forceflush_timeout ;
336341 let current_batch_size = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
337342 let current_batch_size_for_thread = current_batch_size. clone ( ) ;
338343
@@ -485,7 +490,7 @@ impl BatchLogProcessor {
485490 logs_sender,
486491 message_sender,
487492 handle : Mutex :: new ( Some ( handle) ) ,
488- forceflush_timeout : Duration :: from_secs ( 5 ) , // TODO: make this configurable
493+ forceflush_timeout,
489494 dropped_logs_count : AtomicUsize :: new ( 0 ) ,
490495 max_queue_size,
491496 export_log_message_sent : Arc :: new ( AtomicBool :: new ( false ) ) ,
@@ -582,6 +587,9 @@ pub struct BatchConfig {
582587 /// is 512.
583588 pub ( crate ) max_export_batch_size : usize ,
584589
590+ /// The maximum duration to wait for a force flush to complete. The default value is 5 seconds.
591+ pub ( crate ) forceflush_timeout : Duration ,
592+
585593 /// The maximum duration to export a batch of data.
586594 #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
587595 pub ( crate ) max_export_timeout : Duration ,
@@ -599,6 +607,7 @@ pub struct BatchConfigBuilder {
599607 max_queue_size : usize ,
600608 scheduled_delay : Duration ,
601609 max_export_batch_size : usize ,
610+ forceflush_timeout : Duration ,
602611 #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
603612 max_export_timeout : Duration ,
604613}
@@ -618,6 +627,7 @@ impl Default for BatchConfigBuilder {
618627 max_queue_size : OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT ,
619628 scheduled_delay : OTEL_BLRP_SCHEDULE_DELAY_DEFAULT ,
620629 max_export_batch_size : OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT ,
630+ forceflush_timeout : OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT ,
621631 #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
622632 max_export_timeout : OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT ,
623633 }
@@ -678,6 +688,18 @@ impl BatchConfigBuilder {
678688 self
679689 }
680690
691+ /// Set forceflush_timeout for [`BatchConfigBuilder`].
692+ /// It's the maximum duration to wait for a force flush to complete.
693+ /// The default value is 5 seconds.
694+ ///
695+ /// Corresponding environment variable: `OTEL_BLRP_FORCEFLUSH_TIMEOUT`.
696+ ///
697+ /// Note: Programmatically setting this will override any value set via the environment variable.
698+ pub fn with_forceflush_timeout ( mut self , forceflush_timeout : Duration ) -> Self {
699+ self . forceflush_timeout = forceflush_timeout;
700+ self
701+ }
702+
681703 /// Builds a `BatchConfig` enforcing the following invariants:
682704 /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
683705 pub fn build ( self ) -> BatchConfig {
@@ -691,6 +713,7 @@ impl BatchConfigBuilder {
691713 #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
692714 max_export_timeout : self . max_export_timeout ,
693715 max_export_batch_size,
716+ forceflush_timeout : self . forceflush_timeout ,
694717 }
695718 }
696719
@@ -716,6 +739,13 @@ impl BatchConfigBuilder {
716739 self . scheduled_delay = Duration :: from_millis ( scheduled_delay) ;
717740 }
718741
742+ if let Some ( forceflush_timeout) = env:: var ( OTEL_BLRP_FORCEFLUSH_TIMEOUT )
743+ . ok ( )
744+ . and_then ( |timeout| u64:: from_str ( & timeout) . ok ( ) )
745+ {
746+ self . forceflush_timeout = Duration :: from_millis ( forceflush_timeout) ;
747+ }
748+
719749 #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
720750 if let Some ( max_export_timeout) = env:: var ( OTEL_BLRP_EXPORT_TIMEOUT )
721751 . ok ( )
@@ -731,7 +761,8 @@ impl BatchConfigBuilder {
731761#[ cfg( all( test, feature = "testing" , feature = "logs" ) ) ]
732762mod tests {
733763 use super :: {
734- BatchConfig , BatchConfigBuilder , BatchLogProcessor , OTEL_BLRP_MAX_EXPORT_BATCH_SIZE ,
764+ BatchConfig , BatchConfigBuilder , BatchLogProcessor , OTEL_BLRP_FORCEFLUSH_TIMEOUT ,
765+ OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT , OTEL_BLRP_MAX_EXPORT_BATCH_SIZE ,
735766 OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT , OTEL_BLRP_MAX_QUEUE_SIZE ,
736767 OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT , OTEL_BLRP_SCHEDULE_DELAY ,
737768 OTEL_BLRP_SCHEDULE_DELAY_DEFAULT ,
@@ -764,6 +795,8 @@ mod tests {
764795 "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
765796 ) ;
766797 assert_eq ! ( OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT , 512 ) ;
798+ assert_eq ! ( OTEL_BLRP_FORCEFLUSH_TIMEOUT , "OTEL_BLRP_FORCEFLUSH_TIMEOUT" ) ;
799+ assert_eq ! ( OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT . as_millis( ) , 5_000 ) ;
767800 }
768801
769802 #[ test]
@@ -775,6 +808,7 @@ mod tests {
775808 OTEL_BLRP_EXPORT_TIMEOUT ,
776809 OTEL_BLRP_MAX_QUEUE_SIZE ,
777810 OTEL_BLRP_MAX_EXPORT_BATCH_SIZE ,
811+ OTEL_BLRP_FORCEFLUSH_TIMEOUT ,
778812 ] ;
779813
780814 let config = temp_env:: with_vars_unset ( env_vars, BatchConfig :: default) ;
@@ -795,18 +829,21 @@ mod tests {
795829 ( OTEL_BLRP_SCHEDULE_DELAY , Some ( "2000" ) ) ,
796830 ( OTEL_BLRP_MAX_QUEUE_SIZE , Some ( "4096" ) ) ,
797831 ( OTEL_BLRP_MAX_EXPORT_BATCH_SIZE , Some ( "1024" ) ) ,
832+ ( OTEL_BLRP_FORCEFLUSH_TIMEOUT , Some ( "10000" ) ) ,
798833 ] ;
799834
800835 temp_env:: with_vars ( env_vars, || {
801836 let config = BatchConfigBuilder :: default ( )
802837 . with_max_queue_size ( 2048 )
803838 . with_scheduled_delay ( Duration :: from_millis ( 1000 ) )
804839 . with_max_export_batch_size ( 512 )
840+ . with_forceflush_timeout ( Duration :: from_millis ( 20000 ) )
805841 . build ( ) ;
806842
807843 assert_eq ! ( config. scheduled_delay, Duration :: from_millis( 1000 ) ) ;
808844 assert_eq ! ( config. max_queue_size, 2048 ) ;
809845 assert_eq ! ( config. max_export_batch_size, 512 ) ;
846+ assert_eq ! ( config. forceflush_timeout, Duration :: from_millis( 20000 ) ) ;
810847 } ) ;
811848 }
812849
@@ -818,6 +855,7 @@ mod tests {
818855 ( OTEL_BLRP_EXPORT_TIMEOUT , Some ( "60000" ) ) ,
819856 ( OTEL_BLRP_MAX_QUEUE_SIZE , Some ( "4096" ) ) ,
820857 ( OTEL_BLRP_MAX_EXPORT_BATCH_SIZE , Some ( "1024" ) ) ,
858+ ( OTEL_BLRP_FORCEFLUSH_TIMEOUT , Some ( "10000" ) ) ,
821859 ] ;
822860
823861 let config = temp_env:: with_vars ( env_vars, BatchConfig :: default) ;
@@ -827,6 +865,7 @@ mod tests {
827865 assert_eq ! ( config. max_export_timeout, Duration :: from_millis( 60000 ) ) ;
828866 assert_eq ! ( config. max_queue_size, 4096 ) ;
829867 assert_eq ! ( config. max_export_batch_size, 1024 ) ;
868+ assert_eq ! ( config. forceflush_timeout, Duration :: from_millis( 10000 ) ) ;
830869 }
831870
832871 #[ test]
@@ -850,7 +889,8 @@ mod tests {
850889 let batch_builder = BatchConfigBuilder :: default ( )
851890 . with_max_export_batch_size ( 1 )
852891 . with_scheduled_delay ( Duration :: from_millis ( 2 ) )
853- . with_max_queue_size ( 4 ) ;
892+ . with_max_queue_size ( 4 )
893+ . with_forceflush_timeout ( Duration :: from_millis ( 15 ) ) ;
854894
855895 #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
856896 let batch_builder = batch_builder. with_max_export_timeout ( Duration :: from_millis ( 3 ) ) ;
@@ -861,6 +901,7 @@ mod tests {
861901 #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
862902 assert_eq ! ( batch. max_export_timeout, Duration :: from_millis( 3 ) ) ;
863903 assert_eq ! ( batch. max_queue_size, 4 ) ;
904+ assert_eq ! ( batch. forceflush_timeout, Duration :: from_millis( 15 ) ) ;
864905 }
865906
866907 #[ test]
@@ -870,6 +911,7 @@ mod tests {
870911 ( OTEL_BLRP_SCHEDULE_DELAY , Some ( "I am not number" ) ) ,
871912 #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
872913 ( OTEL_BLRP_EXPORT_TIMEOUT , Some ( "2046" ) ) ,
914+ ( OTEL_BLRP_FORCEFLUSH_TIMEOUT , Some ( "5000" ) ) ,
873915 ] ;
874916 temp_env:: with_vars ( env_vars. clone ( ) , || {
875917 let builder = BatchLogProcessor :: builder ( InMemoryLogExporter :: default ( ) ) ;
@@ -889,6 +931,10 @@ mod tests {
889931 builder. config. max_export_timeout,
890932 Duration :: from_millis( 2046 )
891933 ) ;
934+ assert_eq ! (
935+ builder. config. forceflush_timeout,
936+ OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT
937+ ) ;
892938 } ) ;
893939
894940 env_vars. push ( ( OTEL_BLRP_MAX_QUEUE_SIZE , Some ( "120" ) ) ) ;
@@ -897,6 +943,10 @@ mod tests {
897943 let builder = BatchLogProcessor :: builder ( InMemoryLogExporter :: default ( ) ) ;
898944 assert_eq ! ( builder. config. max_export_batch_size, 120 ) ;
899945 assert_eq ! ( builder. config. max_queue_size, 120 ) ;
946+ assert_eq ! (
947+ builder. config. forceflush_timeout,
948+ OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT
949+ ) ;
900950 } ) ;
901951 }
902952
@@ -906,6 +956,7 @@ mod tests {
906956 . with_max_export_batch_size ( 1 )
907957 . with_scheduled_delay ( Duration :: from_millis ( 2 ) )
908958 . with_max_queue_size ( 4 )
959+ . with_forceflush_timeout ( Duration :: from_millis ( 15 ) )
909960 . build ( ) ;
910961
911962 let builder =
@@ -915,6 +966,7 @@ mod tests {
915966 assert_eq ! ( actual. max_export_batch_size, 1 ) ;
916967 assert_eq ! ( actual. scheduled_delay, Duration :: from_millis( 2 ) ) ;
917968 assert_eq ! ( actual. max_queue_size, 4 ) ;
969+ assert_eq ! ( actual. forceflush_timeout, Duration :: from_millis( 15 ) ) ;
918970 }
919971
920972 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
0 commit comments