@@ -612,30 +612,28 @@ pub async fn put_stream_hot_tier(
612
612
status : StatusCode :: BAD_REQUEST ,
613
613
} ) ;
614
614
}
615
- if CONFIG . options . hot_tier_storage_path . is_none ( ) {
616
- return Err ( StreamError :: HotTierNotEnabled ( stream_name) ) ;
617
- }
618
615
619
616
validator:: hot_tier ( & hottier. size . to_string ( ) ) ?;
620
617
621
618
STREAM_INFO . set_hot_tier ( & stream_name, true ) ?;
622
- if let Some ( hot_tier_manager) = HotTierManager :: global ( ) {
623
- let existing_hot_tier_used_size = hot_tier_manager
624
- . validate_hot_tier_size ( & stream_name, hottier. size )
625
- . await ?;
626
- hottier. used_size = existing_hot_tier_used_size;
627
- hottier. available_size = hottier. size ;
628
- hottier. version = Some ( CURRENT_HOT_TIER_VERSION . to_string ( ) ) ;
629
- hot_tier_manager
630
- . put_hot_tier ( & stream_name, & mut hottier)
631
- . await ?;
632
- let storage = CONFIG . storage ( ) . get_object_store ( ) ;
633
- let mut stream_metadata = storage. get_object_store_format ( & stream_name) . await ?;
634
- stream_metadata. hot_tier_enabled = true ;
635
- storage
636
- . put_stream_manifest ( & stream_name, & stream_metadata)
637
- . await ?;
638
- }
619
+ let Some ( hot_tier_manager) = HotTierManager :: global ( ) else {
620
+ return Err ( StreamError :: HotTierNotEnabled ( stream_name) ) ;
621
+ } ;
622
+ let existing_hot_tier_used_size = hot_tier_manager
623
+ . validate_hot_tier_size ( & stream_name, hottier. size )
624
+ . await ?;
625
+ hottier. used_size = existing_hot_tier_used_size;
626
+ hottier. available_size = hottier. size ;
627
+ hottier. version = Some ( CURRENT_HOT_TIER_VERSION . to_string ( ) ) ;
628
+ hot_tier_manager
629
+ . put_hot_tier ( & stream_name, & mut hottier)
630
+ . await ?;
631
+ let storage = CONFIG . storage ( ) . get_object_store ( ) ;
632
+ let mut stream_metadata = storage. get_object_store_format ( & stream_name) . await ?;
633
+ stream_metadata. hot_tier_enabled = true ;
634
+ storage
635
+ . put_stream_manifest ( & stream_name, & stream_metadata)
636
+ . await ?;
639
637
640
638
Ok ( (
641
639
format ! ( "hot tier set for stream {stream_name}" ) ,
@@ -660,34 +658,27 @@ pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Respo
660
658
}
661
659
}
662
660
663
- if CONFIG . options . hot_tier_storage_path . is_none ( ) {
661
+ let Some ( hot_tier_manager ) = HotTierManager :: global ( ) else {
664
662
return Err ( StreamError :: HotTierNotEnabled ( stream_name) ) ;
665
- }
666
-
667
- if let Some ( hot_tier_manager) = HotTierManager :: global ( ) {
668
- let StreamHotTier {
669
- version,
670
- size,
671
- used_size,
672
- available_size,
673
- oldest_date_time_entry,
674
- } = hot_tier_manager. get_hot_tier ( & stream_name) . await ?;
675
- let mut json = json ! ( {
676
- "version" : version,
677
- "size" : format!( "{size} Bytes" ) ,
678
- "used_size" : format!( "{used_size} Bytes" ) ,
679
- "available_size" : format!( "{available_size} Bytes" ) ,
680
- } ) ;
681
- if let Some ( entry) = oldest_date_time_entry {
682
- json[ "oldest_date_time_entry" ] = serde_json:: Value :: String ( entry) ;
683
- }
684
- Ok ( ( web:: Json ( json) , StatusCode :: OK ) )
685
- } else {
686
- Err ( StreamError :: Custom {
687
- msg : format ! ( "hot tier not initialised for stream {}" , stream_name) ,
688
- status : ( StatusCode :: BAD_REQUEST ) ,
689
- } )
690
- }
663
+ } ;
664
+ let StreamHotTier {
665
+ version,
666
+ size,
667
+ used_size,
668
+ available_size,
669
+ oldest_date_time_entry,
670
+ } = hot_tier_manager. get_hot_tier ( & stream_name) . await ?;
671
+ let mut json = json ! ( {
672
+ "version" : version,
673
+ "size" : format!( "{size} Bytes" ) ,
674
+ "used_size" : format!( "{used_size} Bytes" ) ,
675
+ "available_size" : format!( "{available_size} Bytes" ) ,
676
+ } ) ;
677
+ if let Some ( entry) = oldest_date_time_entry {
678
+ json[ "oldest_date_time_entry" ] = serde_json:: Value :: String ( entry) ;
679
+ }
680
+
681
+ Ok ( ( web:: Json ( json) , StatusCode :: OK ) )
691
682
}
692
683
693
684
pub async fn delete_stream_hot_tier (
@@ -709,9 +700,9 @@ pub async fn delete_stream_hot_tier(
709
700
}
710
701
}
711
702
712
- if CONFIG . options . hot_tier_storage_path . is_none ( ) {
703
+ let Some ( hot_tier_manager ) = HotTierManager :: global ( ) else {
713
704
return Err ( StreamError :: HotTierNotEnabled ( stream_name) ) ;
714
- }
705
+ } ;
715
706
716
707
if STREAM_INFO . stream_type ( & stream_name) . unwrap ( ) == Some ( StreamType :: Internal . to_string ( ) ) {
717
708
return Err ( StreamError :: Custom {
@@ -720,9 +711,8 @@ pub async fn delete_stream_hot_tier(
720
711
} ) ;
721
712
}
722
713
723
- if let Some ( hot_tier_manager) = HotTierManager :: global ( ) {
724
- hot_tier_manager. delete_hot_tier ( & stream_name) . await ?;
725
- }
714
+ hot_tier_manager. delete_hot_tier ( & stream_name) . await ?;
715
+
726
716
Ok ( (
727
717
format ! ( "hot tier deleted for stream {stream_name}" ) ,
728
718
StatusCode :: OK ,
0 commit comments