Skip to content

Commit 270301c

Browse files
mayastor-borstiagolobocastro
andcommitted
Merge #1949
1949: SPDK DiskError and Hot-removal patches r=tiagolobocastro a=tiagolobocastro test: add lvs hot-removal The bdev aio and uring modules now hot-remove the bdev when they detect the backing device is hot-removed. This happens when they complete an io with -ENODEV, -EIO or positive error. Add a test which leverages ublk to achieve this by removing the ublk device. --- feat(ublk): add ublksrv nix package --- test: add disk errors test Makes uses of an LVM lvol allowing us to set it up to yield errors without detaching the device. We "import" the gRPC pool types, allowing us to test we're returning the correct states, alerts and errors. --- fix(lvm): cleanup stable dm entries when creating a vg --- test: don't panic twice on environment drop Also improve some testing asserts to output more information. --- build: pulls latest spdk-rs to include hotremoval fixes --- feat(lvm): add device-mapper utilities Adds methods for suspending, resuming, retrieving the table and updating the table. This can be useful for testing for example. --- feat(lvm): support commands with inputs --- fix(lvm): close fd above stderr On invocation, lvm requires that only the standard file descriptors stdin, stdout and stderr are available. If others are found, they get closed and messages are issued warning about the leak. Previously we were silencing the errors only, now we close the range from 3 to 1024. todo: pick higher range? --- refactor: implement default for cli args Makes it less awkward to use. Also we should consider modifying the MayastorEnvironment. --- refactor: expose lvm vg create lvol This will allow us to use concrete types for testing. As a bonus we also avoid looking up the VG twice. Though something seems slightly off, there's some indirection back and forth from vg to lv. --- fix(lvm): handle lvol already exists error If already exists, then validates the parameters. --- feat: allow running the lvm code outside of spdk context Adds a no_spdk variable to the PoolArgs, which is only usable for the LVM backend currently, though could extend to ZFS ex. As a simplistic impl for now we won't attach the mayastor tag in this case. Probably not ideal, but for now should be ok. Also the lvm commands were being trampolined to the spdk reactor. This is easily removed by simply checking if we're in and spdk thread, and if not then no need to trampoline anyway so this was an easy win. Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
2 parents fe94489 + f7c10d6 commit 270301c

File tree

26 files changed

+795
-176
lines changed

26 files changed

+795
-176
lines changed

io-engine-tests/src/compose/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ impl Drop for MayastorTest<'_> {
139139
self.reactor.send_future(async { mayastor_env_stop(0) });
140140
// wait for mayastor to stop
141141
let hdl = self.thdl.take().unwrap();
142-
hdl.join().unwrap()
142+
let result = hdl.join();
143+
if !std::thread::panicking() {
144+
result.unwrap();
145+
}
143146
}
144147
}

io-engine-tests/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ pub fn dd_random_file(path: &str, bs: u32, size: u64) {
176176
.output()
177177
.expect("failed exec dd");
178178

179-
assert!(output.status.success());
179+
assert!(output.status.success(), "{:#?}", output);
180180
}
181181

182182
pub fn truncate_file(path: &str, size: u64) {
@@ -185,23 +185,23 @@ pub fn truncate_file(path: &str, size: u64) {
185185
.output()
186186
.expect("failed exec truncate");
187187

188-
assert!(output.status.success());
188+
assert!(output.status.success(), "{:#?}", output);
189189
}
190190

191191
pub fn truncate_file_bytes(path: &str, size: u64) {
192192
let output = Command::new("truncate")
193193
.args(["-s", &format!("{size}"), path])
194194
.output()
195195
.expect("failed exec truncate");
196-
assert!(output.status.success());
196+
assert!(output.status.success(), "{:#?}", output);
197197
}
198198

199199
pub fn expand_tempfs_file_bytes(path: &str, size: u64) {
200200
let output = Command::new("truncate")
201201
.args(["-s", &format!("+{size}"), path])
202202
.output()
203203
.expect("failed exec truncate");
204-
assert!(output.status.success());
204+
assert!(output.status.success(), "{:#?}", output);
205205
}
206206

207207
/// Automatically assign a loopdev to path
@@ -212,7 +212,7 @@ pub fn setup_loopdev_file(path: &str, sector_size: Option<u64>) -> String {
212212
.args(["-f", "--show", "-b", &format!("{log_sec}"), path])
213213
.output()
214214
.expect("failed exec losetup");
215-
assert!(output.status.success());
215+
assert!(output.status.success(), "{:#?}", output);
216216
// return the assigned loop device
217217
String::from_utf8(output.stdout).unwrap().trim().to_string()
218218
}
@@ -223,7 +223,7 @@ pub fn detach_loopdev(dev: &str) {
223223
.args(["-d", dev])
224224
.output()
225225
.expect("failed exec losetup");
226-
assert!(output.status.success());
226+
assert!(output.status.success(), "{:#?}", output);
227227
}
228228

229229
pub fn fscheck(device: &str) {
@@ -235,7 +235,7 @@ pub fn fscheck(device: &str) {
235235
io::stdout().write_all(&output.stderr).unwrap();
236236
io::stdout().write_all(&output.stdout).unwrap();
237237

238-
assert!(output.status.success());
238+
assert!(output.status.success(), "{:#?}", output);
239239
}
240240

241241
pub fn mkfs(path: &str, fstype: &str) -> bool {
@@ -279,7 +279,7 @@ pub fn compare_files(a: &str, b: &str) {
279279

280280
io::stdout().write_all(&output.stderr).unwrap();
281281
io::stdout().write_all(&output.stdout).unwrap();
282-
assert!(output.status.success());
282+
assert!(output.status.success(), "{:#?}", output);
283283
}
284284

285285
pub fn mount_umount(device: &str) -> Result<String, String> {

io-engine-tests/src/pool.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ impl PoolBuilderLocal {
282282
backend: Default::default(),
283283
enc_key: None,
284284
crypto_vbdev_name: None,
285+
no_spdk: false,
285286
})
286287
.await?;
287288
Ok(lvs)

io-engine/examples/lvs-eval/main.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ async fn create_lvs(args: &CliArgs) -> Lvs {
135135
md_args: Some(PoolMetadataArgs {
136136
max_expansion: args.max_expansion.clone(),
137137
}),
138-
backend: Default::default(),
139-
enc_key: None,
140-
crypto_vbdev_name: None,
138+
..Default::default()
141139
};
142140

143141
Lvs::create_or_import(lvs_args.clone()).await.unwrap()

io-engine/src/bdev/lvs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ impl Lvs {
217217
enc_key: self.key.clone(),
218218
// XXX: Is this path ever exercised apart from test, or casperf perhaps?
219219
crypto_vbdev_name: self.key.as_ref().map(|_| format!("crypto_{}", self.name)),
220+
no_spdk: false,
220221
};
221222
match &self.mode {
222223
LvsMode::Create => match crate::lvs::Lvs::import_from_args(args.clone()).await {

io-engine/src/core/env.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -299,40 +299,50 @@ pub struct MayastorCliArgs {
299299
#[derive(Debug, clap::Parser, Clone)]
300300
pub struct SpdkTracingArgs {
301301
/// Number of trace entries per lcore.
302-
#[clap(long, default_value_t = 0)]
302+
#[clap(long, env = "SPDK_TRACING_ENTRIES", default_value_t = 0)]
303303
pub entries: u64,
304304
/// Number of user created threads.
305-
#[clap(long, default_value_t = 1)]
305+
#[clap(long, env = "SPDK_TRACING_THREADS", default_value_t = 1)]
306306
pub threads: u32,
307307
}
308+
impl Default for SpdkTracingArgs {
309+
fn default() -> Self {
310+
Self::parse_from(Vec::<String>::new())
311+
}
312+
}
308313

309314
/// DiskPool related arguments.
310315
#[derive(Debug, clap::Parser, Clone)]
311316
pub struct PoolCliArgs {
312317
/// I/O error count threshold.
313318
/// After this many errors a pool alert is raised as Warning.
314-
#[clap(long, default_value_t = 64)]
319+
#[clap(long, env, default_value_t = 8)]
315320
pub io_error_threshold: u64,
316321

317322
/// I/O stall deadline.
318323
/// If an I/O is stuck longer than this period, then the pool is considered stalled and a
319324
/// Critical alert is raised.
320325
/// The pool disk will also be reset and the stall will be cleared once complete and
321326
/// I/O flows again.
322-
#[clap(long, default_value = "30s")]
327+
#[clap(long, env, default_value = "30s")]
323328
pub io_stall_deadline: humantime::Duration,
324329

325330
/// I/O stall transitions threshold.
326331
/// After this many transitions within the window, a pool alert is raised as Warning.
327-
#[clap(long, default_value_t = 3)]
332+
#[clap(long, env, default_value_t = 3)]
328333
pub io_stall_transition_threshold: u64,
329334

330335
/// I/O stall transitions window.
331336
/// Time window during which stall ↔ resume state transitions are tracked
332337
/// for flakiness detection.
333-
#[clap(long, default_value = "3h")]
338+
#[clap(long, env, default_value = "3h")]
334339
pub io_stall_transition_window: humantime::Duration,
335340
}
341+
impl Default for PoolCliArgs {
342+
fn default() -> Self {
343+
Self::parse_from(Vec::<String>::new())
344+
}
345+
}
336346

337347
fn delay_compat(s: &str) -> Result<bool, String> {
338348
match s {
@@ -507,8 +517,8 @@ impl Default for MayastorEnvironment {
507517
developer_delay: false,
508518
rdma: false,
509519
bs_cluster_unmap: false,
510-
pool_args: PoolCliArgs::parse_from(Vec::<String>::new()),
511-
traces: SpdkTracingArgs::parse_from(Vec::<String>::new()),
520+
pool_args: PoolCliArgs::default(),
521+
traces: SpdkTracingArgs::default(),
512522
}
513523
}
514524
}

io-engine/src/grpc/v1/pool.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ impl TryFrom<CreatePoolRequest> for PoolArgs {
278278
backend: backend.into(),
279279
enc_key: None,
280280
crypto_vbdev_name: None,
281+
no_spdk: false,
281282
})
282283
}
283284
}
@@ -366,6 +367,7 @@ impl TryFrom<ImportPoolRequest> for PoolArgs {
366367
.encryption
367368
.as_ref()
368369
.map(|_| format!("crypto_{}", args.name)),
370+
no_spdk: false,
369371
})
370372
}
371373
}
@@ -578,6 +580,11 @@ impl PoolErrorsNt {
578580
}
579581
}
580582

583+
/// Convert something which implements [`PoolOps`] to the proto `Pool` type.
584+
pub async fn pool_to_proto(pool: &dyn PoolOps) -> Pool {
585+
pool.async_into().await
586+
}
587+
581588
impl AsyncFrom<Box<dyn PoolOps>> for Pool {
582589
async fn async_from(value: Box<dyn PoolOps>) -> Self {
583590
let value = value.deref();

io-engine/src/lvm/cli.rs

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use crate::lvm::{error, error::Error, property::Property};
22

3+
use nix::errno::Errno;
34
use serde::de::Deserialize;
45
use snafu::ResultExt;
56
use std::ffi::OsStr;
67
use strum_macros::{AsRefStr, Display, EnumString};
7-
use tokio::process::Command;
8+
use tokio::{io::AsyncWriteExt, process::Command};
89

910
/// Common set of query options for a volume group or logical volume.
1011
/// If the name is present then the name will be used to query.
@@ -30,6 +31,15 @@ impl CmnQueryArgs {
3031
..Default::default()
3132
}
3233
}
34+
/// Find only our entries (ie, with our tag).
35+
pub(crate) fn ours_if(spdk: bool) -> Self {
36+
if spdk {
37+
Self::ours()
38+
} else {
39+
Self::any()
40+
}
41+
}
42+
3343
/// Find entries with the given name.
3444
pub(crate) fn named_opt(self, name: &Option<String>) -> Self {
3545
let Some(name) = name else {
@@ -111,12 +121,19 @@ enum LvmSubCmd {
111121
/// Display information about logical volumes.
112122
#[strum(serialize = "lvs")]
113123
LVList,
124+
/// DeviceMapper commands.
125+
#[strum(serialize = "dmsetup")]
126+
DMSetup,
127+
/// BlockDevice commands.
128+
#[strum(serialize = "blockdev")]
129+
BlkDev,
114130
}
115131

116132
/// LVM wrapper over `Command` with added qol such as error mapping and
117133
/// decoding of json output reports.
118134
pub(super) struct LvmCmd {
119135
cmd: &'static str,
136+
input: Option<String>,
120137
cmder: Command,
121138
}
122139

@@ -142,6 +159,7 @@ impl LvmCmd {
142159
Self {
143160
cmd,
144161
cmder: Command::new(cmd),
162+
input: None,
145163
}
146164
}
147165
/// Prepare a `Command` for `LvmSubCmd::PVCreate`.
@@ -188,6 +206,14 @@ impl LvmCmd {
188206
pub(super) fn lv_list() -> Self {
189207
Self::new(LvmSubCmd::LVList.as_ref())
190208
}
209+
/// Prepare a `Command` for `LvmSubCmd::DMSuspend`.
210+
pub(super) fn dm_setup() -> Self {
211+
Self::new(LvmSubCmd::DMSetup.as_ref())
212+
}
213+
/// Prepare a `Command` for `LvmSubCmd::BlkDev`.
214+
pub(super) fn blk_dev() -> Self {
215+
Self::new(LvmSubCmd::BlkDev.as_ref())
216+
}
191217
/// Runs the LVM command with the provided `Command` arguments et all and
192218
/// returns an LVM specific report containing an output type `T`.
193219
/// >> Note: This requires the json output to be specified in args.
@@ -241,7 +267,7 @@ impl LvmCmd {
241267
/// Tag the given `Property`.
242268
pub(super) fn tag_if(self, tag: bool, property: Property) -> Self {
243269
if tag {
244-
self.arg(property.add())
270+
self.tag(property)
245271
} else {
246272
self
247273
}
@@ -264,6 +290,11 @@ impl LvmCmd {
264290
self.cmder.args(args);
265291
self
266292
}
293+
/// Run the command with the given input.
294+
pub fn input(mut self, input: String) -> Self {
295+
self.input = Some(input);
296+
self
297+
}
267298
/// Runs the LVM command with the provided `Command` arguments et al.
268299
///
269300
/// # Errors
@@ -285,14 +316,11 @@ impl LvmCmd {
285316
pub(super) async fn output(mut self) -> Result<std::process::Output, Error> {
286317
tracing::trace!("{:?}", self.cmder);
287318

288-
crate::tokio_run!(async move {
289-
let output = self
290-
.cmder
291-
.output()
292-
.await
293-
.context(error::LvmBinSpawnErrSnafu {
294-
command: self.cmd.to_string(),
295-
})?;
319+
let in_spdk = spdk_rs::Thread::is_spdk_thread();
320+
let fut = async move {
321+
let output = self.cmder().await.context(error::LvmBinSpawnErrSnafu {
322+
command: self.cmd.to_string(),
323+
})?;
296324
if !output.status.success() {
297325
let error = String::from_utf8_lossy(&output.stderr).to_string();
298326
return Err(Error::LvmBinErr {
@@ -301,7 +329,42 @@ impl LvmCmd {
301329
});
302330
}
303331
Ok(output)
304-
})
332+
};
333+
if in_spdk {
334+
crate::tokio_run!(fut)
335+
} else {
336+
fut.await
337+
}
338+
}
339+
340+
async fn cmder(&mut self) -> std::io::Result<std::process::Output> {
341+
unsafe {
342+
self.cmder.pre_exec(|| {
343+
Self::close_range().ok();
344+
Ok(())
345+
});
346+
}
347+
348+
let Some(input) = self.input.take() else {
349+
return self.cmder.output().await;
350+
};
351+
352+
let mut child = self.cmder.stdin(std::process::Stdio::piped()).spawn()?;
353+
354+
if let Some(mut stdin) = child.stdin.take() {
355+
stdin.write_all(input.as_bytes()).await?;
356+
stdin.shutdown().await?;
357+
}
358+
359+
child.wait_with_output().await
360+
}
361+
362+
/// The close_range system call closes all open file descriptors from first to last (included).
363+
/// Here close from 3 to 1024.
364+
/// todo: find a better way such as querying /proc/self/fd ?.
365+
fn close_range() -> nix::Result<()> {
366+
let res = unsafe { libc::close_range(3, 1024, libc::CLOSE_RANGE_CLOEXEC as i32) };
367+
Errno::result(res).map(drop)
305368
}
306369
}
307370

0 commit comments

Comments
 (0)