Skip to content
This repository was archived by the owner on Apr 4, 2024. It is now read-only.

Commit 2cf1175

Browse files
Add callback for fail-rs (#50)
I want to add a new kind of action for fail-rs, which can help TiKV be able to stop at some statement and call one callback function that set by tests. This feature will help use more convenient to deal with some tests which need to wait for a while, because we can pass a callback to fail_point to notify test-thread to avoid waiting. Signed-off-by: Little-Wallace <[email protected]>
1 parent e75d98a commit 2cf1175

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

src/lib.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,39 @@
226226

227227
use std::collections::HashMap;
228228
use std::env::VarError;
229+
use std::fmt::Debug;
229230
use std::str::FromStr;
230231
use std::sync::atomic::{AtomicUsize, Ordering};
231232
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, TryLockError};
232233
use std::time::{Duration, Instant};
233234
use std::{env, thread};
234235

236+
#[derive(Clone)]
237+
struct SyncCallback(Arc<dyn Fn() + Send + Sync>);
238+
239+
impl Debug for SyncCallback {
240+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241+
f.write_str("SyncCallback()")
242+
}
243+
}
244+
245+
impl PartialEq for SyncCallback {
246+
fn eq(&self, other: &Self) -> bool {
247+
Arc::ptr_eq(&self.0, &other.0)
248+
}
249+
}
250+
251+
impl SyncCallback {
252+
fn new(f: impl Fn() + Send + Sync + 'static) -> SyncCallback {
253+
SyncCallback(Arc::new(f))
254+
}
255+
256+
fn run(&self) {
257+
let callback = &self.0;
258+
callback();
259+
}
260+
}
261+
235262
/// Supported tasks.
236263
#[derive(Clone, Debug, PartialEq)]
237264
enum Task {
@@ -251,6 +278,8 @@ enum Task {
251278
Yield,
252279
/// Busy waiting for some milliseconds.
253280
Delay(u64),
281+
/// Call callback function.
282+
Callback(SyncCallback),
254283
}
255284

256285
#[derive(Debug)]
@@ -285,6 +314,15 @@ impl Action {
285314
}
286315
}
287316

317+
fn from_callback(f: impl Fn() + Send + Sync + 'static) -> Action {
318+
let task = Task::Callback(SyncCallback::new(f));
319+
Action {
320+
task,
321+
freq: 1.0,
322+
count: None,
323+
}
324+
}
325+
288326
fn get_task(&self) -> Option<Task> {
289327
use rand::Rng;
290328

@@ -459,6 +497,9 @@ impl FailPoint {
459497
let timeout = Duration::from_millis(t);
460498
while timer.elapsed() < timeout {}
461499
}
500+
Task::Callback(f) => {
501+
f.run();
502+
}
462503
}
463504
None
464505
}
@@ -628,6 +669,25 @@ pub fn cfg<S: Into<String>>(name: S, actions: &str) -> Result<(), String> {
628669
set(&mut registry, name.into(), actions)
629670
}
630671

672+
/// Configure the actions for a fail point at runtime.
673+
///
674+
/// Each fail point can be configured by a callback. Process will call this callback function
675+
/// when it meet this fail-point.
676+
pub fn cfg_callback<S, F>(name: S, f: F) -> Result<(), String>
677+
where
678+
S: Into<String>,
679+
F: Fn() + Send + Sync + 'static,
680+
{
681+
let mut registry = REGISTRY.registry.write().unwrap();
682+
let p = registry
683+
.entry(name.into())
684+
.or_insert_with(|| Arc::new(FailPoint::new()));
685+
let action = Action::from_callback(f);
686+
let actions = vec![action];
687+
p.set_actions("callback", actions);
688+
Ok(())
689+
}
690+
631691
/// Remove a fail point.
632692
///
633693
/// If the fail point doesn't exist, nothing will happen.

tests/tests.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3+
use std::sync::atomic::{AtomicUsize, Ordering};
34
use std::sync::*;
45
use std::time::*;
56
use std::*;
@@ -135,6 +136,27 @@ fn test_yield() {
135136
f();
136137
}
137138

139+
#[test]
140+
#[cfg_attr(not(feature = "failpoints"), ignore)]
141+
fn test_callback() {
142+
let f1 = || {
143+
fail_point!("cb");
144+
};
145+
let f2 = || {
146+
fail_point!("cb");
147+
};
148+
149+
let counter = Arc::new(AtomicUsize::new(0));
150+
let counter2 = counter.clone();
151+
fail::cfg_callback("cb", move || {
152+
counter2.fetch_add(1, Ordering::SeqCst);
153+
})
154+
.unwrap();
155+
f1();
156+
f2();
157+
assert_eq!(2, counter.load(Ordering::SeqCst));
158+
}
159+
138160
#[test]
139161
#[cfg_attr(not(feature = "failpoints"), ignore)]
140162
fn test_delay() {

0 commit comments

Comments
 (0)