Skip to content

Commit 0224108

Browse files
authored
refactor: implement cooperative state machine for range/list operations (#18204)
Fix blocking issue during initialization data transmission. Problem: When establishing a watch stream, the meta-service sends large amounts of initialization data to the client. During this transmission, other events are blocked until completion, including add-watcher commands. This creates a deadlock: if initialization data is large, it blocks all subsequent Dispatcher operations. When a second watch request arrives, it must wait for the first one to complete sending all initialization data. Since adding a new watcher requires holding the state machine lock, multiple concurrent watch requests will block the state machine entirely, causing timeouts for other requests. Solution: Make the process cooperative by not waiting for watch stream transmission to complete. Instead, queue the add-watcher command and return immediately. This allows subsequent watch requests to proceed without waiting for previous initialization data transmissions to finish.
1 parent 1c11888 commit 0224108

File tree

7 files changed

+105
-34
lines changed

7 files changed

+105
-34
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ url = "2.5.4"
535535
uuid = { version = "1.10.0", features = ["std", "serde", "v4", "v7"] }
536536
volo-thrift = "0.10"
537537
walkdir = "2.3.2"
538-
watcher = { version = "0.4.1" }
538+
watcher = { version = "0.4.2" }
539539
wiremock = "0.6"
540540
wkt = "0.11.1"
541541
xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] }
@@ -661,5 +661,5 @@ sub-cache = { git = "https://github.com/databendlabs/sub-cache", tag = "v0.2.1"
661661
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
662662
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" }
663663
tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" }
664-
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.1" }
664+
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.2" }
665665
xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" }

src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::leveled_store::rotbl_codec::RotblCodec;
3333
use crate::leveled_store::util;
3434
use crate::marked::Marked;
3535
use crate::state_machine::ExpireKey;
36+
use crate::utils::add_cooperative_yielding;
3637

3738
/// The data to compact.
3839
///
@@ -141,6 +142,8 @@ impl<'a> CompactingData<'a> {
141142
// Filter out tombstone
142143
let normal_strm = coalesce.try_filter(|(_k, v)| future::ready(v.is_normal()));
143144

145+
let normal_strm = add_cooperative_yielding(normal_strm, "compact");
146+
144147
Ok((sys_data, normal_strm.boxed()))
145148
}
146149
}

src/meta/raft-store/src/state_machine_api_ext.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::leveled_store::map_api::MarkedOf;
3939
use crate::marked::Marked;
4040
use crate::state_machine::ExpireKey;
4141
use crate::state_machine_api::StateMachineApi;
42+
use crate::utils::add_cooperative_yielding;
4243
use crate::utils::prefix_right_bound;
4344

4445
#[async_trait::async_trait]
@@ -111,7 +112,9 @@ pub trait StateMachineApiExt: StateMachineApi {
111112

112113
let strm = strm
113114
// Return only keys with the expected prefix
114-
.try_take_while(move |(k, _)| future::ready(Ok(k.starts_with(&p))))
115+
.try_take_while(move |(k, _)| future::ready(Ok(k.starts_with(&p))));
116+
117+
let strm = add_cooperative_yielding(strm, format!("list_kv: {prefix}"))
115118
// Skip tombstone
116119
.try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked))));
117120

@@ -121,10 +124,15 @@ pub trait StateMachineApiExt: StateMachineApi {
121124
/// Return a range of kv entries.
122125
async fn range_kv<R>(&self, rng: R) -> Result<IOResultStream<(String, SeqV)>, io::Error>
123126
where R: RangeBounds<String> + Send + Sync + Clone + 'static {
124-
let strm = self.map_ref().str_map().range(rng).await?;
127+
let left = rng.start_bound().cloned();
128+
let right = rng.end_bound().cloned();
125129

126-
// Skip tombstone
127-
let strm = strm.try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked))));
130+
let leveled_map = self.map_ref();
131+
let strm = leveled_map.str_map().range(rng).await?;
132+
133+
let strm = add_cooperative_yielding(strm, format!("range_kv: {left:?} to {right:?}"))
134+
// Skip tombstone
135+
.try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked))));
128136

129137
Ok(strm.boxed())
130138
}
@@ -181,12 +189,13 @@ pub trait StateMachineApiExt: StateMachineApi {
181189

182190
let strm = self.map_ref().expire_map().range(start..end).await?;
183191

184-
let strm = strm
185-
// Return only non-deleted records
186-
.try_filter_map(|(k, marked)| {
187-
let expire_entry = marked.unpack().map(|(v, _v_meta)| (k, v));
188-
future::ready(Ok(expire_entry))
189-
});
192+
let strm =
193+
add_cooperative_yielding(strm, format!("list_expire_index since {start} to {end}"))
194+
// Return only non-deleted records
195+
.try_filter_map(|(k, marked)| {
196+
let expire_entry = marked.unpack().map(|(v, _v_meta)| (k, v));
197+
future::ready(Ok(expire_entry))
198+
});
190199

191200
Ok(strm.boxed())
192201
}

src/meta/raft-store/src/utils.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,44 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt;
16+
17+
use futures::Stream;
18+
use futures_util::StreamExt;
19+
use log::info;
20+
21+
/// Add cooperative yielding to a stream to prevent task starvation.
22+
///
23+
/// This yields control back to the async runtime every 100 items to prevent
24+
/// blocking other concurrent tasks when processing large streams.
25+
pub(crate) fn add_cooperative_yielding<S, T>(
26+
stream: S,
27+
stream_name: impl fmt::Display + Send,
28+
) -> impl Stream<Item = T>
29+
where
30+
S: Stream<Item = T>,
31+
T: Send + 'static,
32+
{
33+
stream.enumerate().then(move |(index, item)| {
34+
// Yield control every 100 items to prevent blocking other tasks
35+
let to_yield = if index % 100 == 0 {
36+
if index % 5000 == 0 {
37+
info!("{stream_name} yield control to allow other tasks to run: index={index}");
38+
}
39+
true
40+
} else {
41+
false
42+
};
43+
44+
async move {
45+
if to_yield {
46+
tokio::task::yield_now().await;
47+
}
48+
item
49+
}
50+
})
51+
}
52+
1553
/// Return the right bound of the prefix, so that `p..right` will cover all strings with prefix `p`.
1654
///
1755
/// If the right bound can not be built, return None.

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ use databend_common_meta_types::LogEntry;
5353
use databend_common_meta_types::TxnReply;
5454
use databend_common_meta_types::TxnRequest;
5555
use databend_common_metrics::count::Count;
56-
use display_more::DisplayOptionExt;
5756
use fastrace::func_name;
5857
use fastrace::func_path;
5958
use fastrace::prelude::*;
@@ -449,8 +448,11 @@ impl MetaService for MetaServiceImpl {
449448
let sm = &mn.raft_store.state_machine;
450449
let sm = sm.write().await;
451450

452-
let weak_sender = mn.add_watcher(watch, tx.clone()).await?;
453-
let sender_str = weak_sender.upgrade().map(|s| s.to_string());
451+
info!("enter sm write lock for watch {}", watch);
452+
453+
let sender = mn.new_watch_sender(watch, tx.clone())?;
454+
let sender_str = sender.to_string();
455+
let weak_sender = mn.insert_watch_sender(sender);
454456

455457
// Build a closure to remove the stream tx from Dispatcher when the stream is dropped.
456458
let on_drop = {
@@ -467,9 +469,15 @@ impl MetaService for MetaServiceImpl {
467469
let snk = new_initialization_sink::<WatchTypes>(tx.clone(), ctx);
468470
let strm = sm.range_kv(key_range).await?;
469471

472+
info!("created initialization stream for {}", sender_str);
473+
474+
let sndr = sender_str.clone();
475+
470476
let fu = async move {
471477
try_forward(strm, snk, ctx).await;
472478

479+
info!("initialization flush complete for watcher {}", sndr);
480+
473481
// Send an empty message with `is_initialization=false` to indicate
474482
// the end of the initialization flush.
475483
tx.send(Ok(WatchResponse::new_initialization_complete()))
@@ -478,12 +486,17 @@ impl MetaService for MetaServiceImpl {
478486
error!("failed to send flush complete message: {}", e);
479487
})
480488
.ok();
489+
490+
info!(
491+
"finished sending initialization complete flag for watcher {}",
492+
sndr
493+
);
481494
};
482495
let fu = Box::pin(fu);
483496

484497
info!(
485498
"sending initial flush Future to watcher {} via Dispatcher",
486-
sender_str.display()
499+
sender_str
487500
);
488501

489502
mn.dispatcher_handle.send_future(fu);

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,26 +1174,34 @@ impl MetaNode {
11741174
}
11751175
}
11761176

1177-
pub(crate) async fn add_watcher(
1177+
pub(crate) fn insert_watch_sender(
1178+
&self,
1179+
sender: Arc<WatchStreamSender<WatchTypes>>,
1180+
) -> Weak<WatchStreamSender<WatchTypes>> {
1181+
let weak = Arc::downgrade(&sender);
1182+
1183+
self.dispatcher_handle
1184+
.request(move |dispatcher: &mut Dispatcher<WatchTypes>| {
1185+
dispatcher.insert_watch_stream_sender(sender);
1186+
});
1187+
1188+
weak
1189+
}
1190+
1191+
pub(crate) fn new_watch_sender(
11781192
&self,
11791193
request: WatchRequest,
11801194
tx: mpsc::Sender<Result<WatchResponse, Status>>,
1181-
) -> Result<Weak<WatchStreamSender<WatchTypes>>, Status> {
1182-
let stream_sender = self
1183-
.dispatcher_handle
1184-
.request_blocking(move |dispatcher: &mut Dispatcher<WatchTypes>| {
1185-
let key_range = match build_key_range(&request.key, &request.key_end) {
1186-
Ok(kr) => kr,
1187-
Err(e) => return Err(Status::invalid_argument(e.to_string())),
1188-
};
1189-
1190-
let interested = event_filter_from_filter_type(request.filter_type());
1191-
Ok(dispatcher.add_watcher(key_range, interested, tx))
1192-
})
1193-
.await
1194-
.map_err(|_e| Status::internal("watch-event-Dispatcher closed"))??;
1195+
) -> Result<Arc<WatchStreamSender<WatchTypes>>, Status> {
1196+
let key_range = match build_key_range(&request.key, &request.key_end) {
1197+
Ok(kr) => kr,
1198+
Err(e) => return Err(Status::invalid_argument(e.to_string())),
1199+
};
1200+
1201+
let interested = event_filter_from_filter_type(request.filter_type());
11951202

1196-
Ok(stream_sender)
1203+
let sender = Dispatcher::new_watch_stream_sender(key_range.clone(), interested, tx);
1204+
Ok(sender)
11971205
}
11981206

11991207
/// Get a kvapi::KVApi implementation.

0 commit comments

Comments
 (0)