1414
1515use std:: sync:: Arc ;
1616
17- use databend_common_exception:: ErrorCode ;
1817use databend_common_exception:: Result ;
1918use databend_common_meta_app:: principal:: SettingIdent ;
2019use databend_common_meta_app:: principal:: UserSetting ;
@@ -23,13 +22,9 @@ use databend_common_meta_kvapi::kvapi;
2322use databend_common_meta_kvapi:: kvapi:: Key ;
2423use databend_common_meta_types:: seq_value:: SeqV ;
2524use databend_common_meta_types:: seq_value:: SeqValue ;
26- use databend_common_meta_types:: MatchSeq ;
27- use databend_common_meta_types:: MatchSeqExt ;
2825use databend_common_meta_types:: MetaError ;
29- use databend_common_meta_types:: Operation ;
3026use databend_common_meta_types:: UpsertKV ;
31-
32- use crate :: setting:: SettingApi ;
27+ use futures:: TryStreamExt ;
3328
3429pub struct SettingMgr {
3530 kv_api : Arc < dyn kvapi:: KVApi < Error = MetaError > > ,
@@ -58,16 +53,14 @@ impl SettingMgr {
5853}
5954
6055// TODO: do not use json for setting value
61- #[ async_trait:: async_trait]
62- impl SettingApi for SettingMgr {
56+ impl SettingMgr {
6357 #[ async_backtrace:: framed]
6458 #[ fastrace:: trace]
65- async fn set_setting ( & self , setting : UserSetting ) -> Result < u64 > {
59+ pub async fn set_setting ( & self , setting : UserSetting ) -> Result < u64 > {
6660 // Upsert.
67- let seq = MatchSeq :: GE ( 0 ) ;
68- let val = Operation :: Update ( serde_json:: to_vec ( & setting) ?) ;
61+ let val = serde_json:: to_vec ( & setting) ?;
6962 let key = self . setting_key ( & setting. name ) ;
70- let upsert = self . kv_api . upsert_kv ( UpsertKV :: new ( & key, seq , val, None ) ) ;
63+ let upsert = self . kv_api . upsert_kv ( UpsertKV :: update ( & key, & val) ) ;
7164
7265 let ( _prev, curr) = upsert. await ?. unpack ( ) ;
7366 let res_seq = curr. seq ( ) ;
@@ -76,45 +69,38 @@ impl SettingApi for SettingMgr {
7669
7770 #[ async_backtrace:: framed]
7871 #[ fastrace:: trace]
79- async fn get_settings ( & self ) -> Result < Vec < UserSetting > > {
72+ pub async fn get_settings ( & self ) -> Result < Vec < UserSetting > > {
8073 let prefix = self . setting_prefix ( ) ;
81- let values = self . kv_api . prefix_list_kv ( & prefix) . await ?;
74+ let mut strm = self . kv_api . list_kv ( & prefix) . await ?;
8275
83- let mut settings = Vec :: with_capacity ( values . len ( ) ) ;
84- for ( _ , value ) in values {
85- let setting = serde_json:: from_slice :: < UserSetting > ( & value. data ) ?;
76+ let mut settings = Vec :: new ( ) ;
77+ while let Some ( item ) = strm . try_next ( ) . await ? {
78+ let setting: UserSetting = serde_json:: from_slice ( & item . value . unwrap ( ) . data ) ?;
8679 settings. push ( setting) ;
8780 }
81+
8882 Ok ( settings)
8983 }
9084
9185 #[ async_backtrace:: framed]
9286 #[ fastrace:: trace]
93- async fn get_setting ( & self , name : & str , seq : MatchSeq ) -> Result < SeqV < UserSetting > > {
87+ pub async fn get_setting ( & self , name : & str ) -> Result < Option < SeqV < UserSetting > > > {
9488 let key = self . setting_key ( name) ;
9589 let res = self . kv_api . get_kv ( & key) . await ?;
9690
97- let seq_value = res. ok_or_else ( || {
98- ErrorCode :: UnknownVariable ( format ! ( "Setting '{}' does not exist." , name ) )
99- } ) ? ;
91+ let Some ( seqv ) = res else {
92+ return Ok ( None ) ;
93+ } ;
10094
101- match seq. match_seq ( & seq_value) {
102- Ok ( _) => Ok ( seq_value. try_map ( |d| d. try_into ( ) ) ?) ,
103- Err ( _) => Err ( ErrorCode :: UnknownVariable ( format ! (
104- "Setting '{}' does not exist." ,
105- name
106- ) ) ) ,
107- }
95+ let seqv = seqv. try_map ( |d| d. try_into ( ) ) ?;
96+ Ok ( Some ( seqv) )
10897 }
10998
11099 #[ async_backtrace:: framed]
111100 #[ fastrace:: trace]
112- async fn try_drop_setting ( & self , name : & str , seq : MatchSeq ) -> Result < ( ) > {
101+ pub async fn try_drop_setting ( & self , name : & str ) -> Result < ( ) > {
113102 let key = self . setting_key ( name) ;
114- let _res = self
115- . kv_api
116- . upsert_kv ( UpsertKV :: new ( & key, seq, Operation :: Delete , None ) )
117- . await ?;
103+ let _res = self . kv_api . upsert_kv ( UpsertKV :: delete ( & key) ) . await ?;
118104
119105 Ok ( ( ) )
120106 }
0 commit comments