Skip to content

Commit ab5e31d

Browse files
authored
refactor(meta-service): simplify StateMachineApi, remove SMEventSender trait (#18368)
1 parent b7f4cd8 commit ab5e31d

File tree

7 files changed

+53
-64
lines changed

7 files changed

+53
-64
lines changed

src/meta/raft-store/src/applier/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,9 @@ where SM: StateMachineApi + 'static
130130
};
131131

132132
// Send queued change events to subscriber
133-
if let Some(sender) = self.sm.event_sender() {
134-
for event in self.changes.drain(..) {
135-
debug!("send to EventSender: {:?}", event);
136-
sender.send(event);
137-
}
133+
for event in self.changes.drain(..) {
134+
debug!("send to EventSender: {:?}", event);
135+
self.sm.on_change_applied(event);
138136
}
139137

140138
Ok(applied_state)

src/meta/raft-store/src/sm_v003/sm_v003.rs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::fmt::Debug;
15+
use std::fmt;
16+
use std::fmt::Formatter;
1617
use std::io;
1718

1819
use databend_common_meta_types::raft_types::Entry;
1920
use databend_common_meta_types::raft_types::StorageError;
2021
use databend_common_meta_types::snapshot_db::DB;
2122
use databend_common_meta_types::sys_data::SysData;
2223
use databend_common_meta_types::AppliedState;
24+
use databend_common_meta_types::SeqV;
2325
use log::info;
2426
use openraft::entry::RaftEntry;
2527

@@ -29,18 +31,32 @@ use crate::leveled_store::leveled_map::LeveledMap;
2931
use crate::leveled_store::sys_data_api::SysDataApiRO;
3032
use crate::sm_v003::sm_v003_kv_api::SMV003KVApi;
3133
use crate::state_machine::ExpireKey;
32-
use crate::state_machine_api::SMEventSender;
3334
use crate::state_machine_api::StateMachineApi;
3435

35-
#[derive(Debug, Default)]
36+
type OnChange = Box<dyn Fn((String, Option<SeqV>, Option<SeqV>)) + Send + Sync>;
37+
38+
#[derive(Default)]
3639
pub struct SMV003 {
3740
levels: LeveledMap,
3841

3942
/// The expiration key since which for next clean.
4043
expire_cursor: ExpireKey,
4144

42-
/// subscriber of state machine data
43-
pub(crate) subscriber: Option<Box<dyn SMEventSender>>,
45+
/// Callback when a change is applied to state machine
46+
pub(crate) on_change_applied: Option<OnChange>,
47+
}
48+
49+
impl fmt::Debug for SMV003 {
50+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
51+
f.debug_struct("SMV003")
52+
.field("levels", &self.levels)
53+
.field("expire_cursor", &self.expire_cursor)
54+
.field(
55+
"on_change_applied",
56+
&self.on_change_applied.as_ref().map(|_x| "is_set"),
57+
)
58+
.finish()
59+
}
4460
}
4561

4662
impl StateMachineApi for SMV003 {
@@ -68,8 +84,12 @@ impl StateMachineApi for SMV003 {
6884
self.levels.sys_data_mut()
6985
}
7086

71-
fn event_sender(&self) -> Option<&dyn SMEventSender> {
72-
self.subscriber.as_ref().map(|x| x.as_ref())
87+
fn on_change_applied(&mut self, change: (String, Option<SeqV>, Option<SeqV>)) {
88+
let Some(on_change_applied) = &self.on_change_applied else {
89+
// No subscribers, do nothing.
90+
return;
91+
};
92+
(*on_change_applied)(change);
7393
}
7494
}
7595

@@ -170,8 +190,8 @@ impl SMV003 {
170190
self.map_mut()
171191
}
172192

173-
pub fn set_event_sender(&mut self, subscriber: Box<dyn SMEventSender>) {
174-
self.subscriber = Some(subscriber);
193+
pub fn set_on_change_applied(&mut self, on_change_applied: OnChange) {
194+
self.on_change_applied = Some(on_change_applied);
175195
}
176196

177197
pub fn freeze_writable(&mut self) {

src/meta/raft-store/src/state_machine_api.rs

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,37 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::fmt::Debug;
16-
use std::sync::Arc;
17-
1815
use databend_common_meta_types::sys_data::SysData;
1916
use databend_common_meta_types::SeqV;
20-
use futures::future::BoxFuture;
2117
use map_api::map_api::MapApi;
2218

2319
use crate::state_machine::ExpireKey;
2420
use crate::state_machine::UserKey;
2521

26-
/// Send a key-value change event to subscribers.
27-
pub trait SMEventSender: Debug + Sync + Send {
28-
fn send(&self, change: (String, Option<SeqV>, Option<SeqV>));
29-
30-
/// Send a future to the worker to let it run it in serialized order.
31-
fn send_future(&self, fut: BoxFuture<'static, ()>);
32-
}
33-
34-
impl<T> SMEventSender for Arc<T>
35-
where T: SMEventSender
36-
{
37-
fn send(&self, change: (String, Option<SeqV>, Option<SeqV>)) {
38-
self.as_ref().send(change);
39-
}
40-
41-
fn send_future(&self, fut: BoxFuture<'static, ()>) {
42-
self.as_ref().send_future(fut);
43-
}
44-
}
45-
4622
/// The API a state machine implements.
4723
///
4824
/// The state machine is responsible for managing the application's persistent state,
@@ -80,10 +56,17 @@ pub trait StateMachineApi: Send + Sync {
8056
/// metadata about the state machine and its configuration.
8157
fn sys_data_mut(&mut self) -> &mut SysData;
8258

83-
/// Returns an optional reference to the event sender.
59+
/// Notify subscribers of a key-value change applied to the state machine.
60+
///
61+
/// Called after a change is committed, but before it is guaranteed persisted.
62+
/// The change may be replayed on server restart.
8463
///
85-
/// This method returns an event sender that can be used to send state change events to subscribers.
64+
/// - `change`: (`String`, `Option<SeqV>`, `Option<SeqV>`)
65+
/// - key: user application key
66+
/// - old: previous value (`None` if new key)
67+
/// - new: new value (`None` if deleted)
8668
///
87-
/// The implementation could just return `None` if the state machine does not support subscribing.
88-
fn event_sender(&self) -> Option<&dyn SMEventSender>;
69+
/// Called for every successful create, update, or delete.
70+
/// Implementations without subscribers may leave this empty.
71+
fn on_change_applied(&mut self, change: (String, Option<SeqV>, Option<SeqV>));
8972
}

src/meta/raft-store/src/state_machine_api_ext.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,10 @@ pub trait StateMachineApiExt: StateMachineApi {
122122

123123
let strm = if let Some(right) = prefix_right_bound(&p) {
124124
self.user_map()
125-
.as_user_map()
126125
.range(UserKey::new(&p)..UserKey::new(right))
127126
.await?
128127
} else {
129-
self.user_map()
130-
.as_user_map()
131-
.range(UserKey::new(&p)..)
132-
.await?
128+
self.user_map().range(UserKey::new(&p)..).await?
133129
};
134130

135131
let strm = strm
@@ -218,7 +214,7 @@ pub trait StateMachineApiExt: StateMachineApi {
218214
///
219215
/// It does not check expiration of the returned entry.
220216
async fn get_maybe_expired_kv(&self, key: &String) -> Result<Option<SeqV>, io::Error> {
221-
let got = self.user_map().as_user_map().get(key.as_ref()).await?;
217+
let got = self.user_map().get(key.as_ref()).await?;
222218
let seqv = Into::<Option<SeqV>>::into(got);
223219
Ok(seqv)
224220
}

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use databend_common_meta_client::MetaGrpcReadReq;
2929
use databend_common_meta_client::MetaGrpcReq;
3030
use databend_common_meta_kvapi::kvapi::KVApi;
3131
use databend_common_meta_raft_store::state_machine::UserKey;
32-
use databend_common_meta_raft_store::state_machine_api::SMEventSender;
3332
use databend_common_meta_raft_store::state_machine_api_ext::StateMachineApiExt;
3433
use databend_common_meta_types::protobuf as pb;
3534
use databend_common_meta_types::protobuf::meta_service_server::MetaService;
@@ -74,6 +73,7 @@ use tonic::Request;
7473
use tonic::Response;
7574
use tonic::Status;
7675
use tonic::Streaming;
76+
use watcher::dispatch::Command;
7777
use watcher::key_range::build_key_range;
7878
use watcher::util::new_initialization_sink;
7979
use watcher::util::try_forward;
@@ -518,7 +518,7 @@ impl MetaService for MetaServiceImpl {
518518
sender_str
519519
);
520520

521-
mn.dispatcher_handle.send_future(fu);
521+
mn.dispatcher_handle.send_command(Command::Future(fu));
522522
}
523523

524524
stream

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,14 @@ impl MetaNodeBuilder {
174174
let handle = DispatcherHandle::new(handle, node_id);
175175
let handle = Arc::new(handle);
176176

177+
let on_change_applied = {
178+
let h = handle.clone();
179+
move |change| h.send_change(change)
180+
};
181+
177182
sto.get_state_machine()
178183
.await
179-
.set_event_sender(Box::new(handle.clone()));
184+
.set_on_change_applied(Box::new(on_change_applied));
180185

181186
let meta_node = Arc::new(MetaNode {
182187
raft_store: sto.clone(),

src/meta/service/src/meta_service/watcher.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,10 @@ use std::future::Future;
1919
use std::io::Error;
2020
use std::ops::Deref;
2121

22-
use databend_common_meta_raft_store::state_machine_api::SMEventSender;
2322
use databend_common_meta_types::protobuf::WatchResponse;
2423
use databend_common_meta_types::SeqV;
25-
use futures::future::BoxFuture;
2624
use log::debug;
2725
use tonic::Status;
28-
use watcher::dispatch::Command;
2926
use watcher::dispatch::DispatcherHandle as GenericDispatcherHandle;
3027
use watcher::type_config::KVChange;
3128
use watcher::type_config::KeyOf;
@@ -109,13 +106,3 @@ impl DispatcherHandle {
109106
h
110107
}
111108
}
112-
113-
impl SMEventSender for DispatcherHandle {
114-
fn send(&self, change: KVChange<WatchTypes>) {
115-
self.send_change(change);
116-
}
117-
118-
fn send_future(&self, fut: BoxFuture<'static, ()>) {
119-
self.send_command(Command::Future(fut));
120-
}
121-
}

0 commit comments

Comments
 (0)