Skip to content

Commit 32c2329

Browse files
authored
feat(main): 引入多线程读取 ctl 命名管道,用信号机制改写 service 和 task 的状态检查 (#47)
* feat: 多线程读取ctl命名管道,信号机制改写service和task的状态检查
1 parent e945c21 commit 32c2329

File tree

6 files changed

+131
-88
lines changed

6 files changed

+131
-88
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@ path = "systemctl/src/main.rs"
1414
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1515

1616
[dependencies]
17+
nix = "0.23"
1718
hashbrown = "0.11"
1819
cfg-if = { version = "1.0" }
1920
lazy_static = { version = "1.4.0" }
2021
libc = "0.2"
2122
humantime = "2.1"
22-
23+
tokio = { version = "1.25", features = ["full"] }
2324
[profile.release]
2425
panic = 'abort'
2526

src/main.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ mod systemctl;
77
mod task;
88
mod time;
99
mod unit;
10-
10+
use crate::executor::Executor;
1111
use error::ErrorFormat;
1212
use manager::{timer_manager::TimerManager, Manager};
1313
use parse::UnitParser;
14+
use std::thread;
1415
use systemctl::listener::Systemctl;
15-
16-
use crate::executor::Executor;
16+
use unit::signal::init_signal_handler;
1717

1818
pub struct FileDescriptor(usize);
1919

@@ -54,6 +54,13 @@ fn main() {
5454
println!("Parse {} success!", path);
5555
}
5656

57+
// 初始化信号处理程序
58+
init_signal_handler();
59+
// 监听systemctl
60+
thread::spawn(move || {
61+
Systemctl::ctl_listen();
62+
});
63+
5764
// 启动完服务后进入主循环
5865
loop {
5966
// 检查各服务运行状态
@@ -62,7 +69,5 @@ fn main() {
6269
Manager::check_cmd_proc();
6370
// 检查计时器任务
6471
TimerManager::check_timer();
65-
// 监听systemctl
66-
Systemctl::ctl_listen();
6772
}
6873
}

src/manager/mod.rs

Lines changed: 77 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -7,92 +7,106 @@ pub use unit_manager::*;
77
use crate::executor::ExitStatus;
88

99
use self::timer_manager::TimerManager;
10-
10+
use crate::unit::signal::SIGCHILD_SIGNAL_RECEIVED;
11+
use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
12+
use nix::unistd::Pid;
13+
use std::sync::atomic::Ordering;
1114
pub struct Manager;
1215

1316
impl Manager {
14-
/// ## 检查当前DragonReach运行的项目状态,并对其分发处理
17+
/// ## 检查当前 DragonReach 运行的项目状态,并对其分发处理
1518
pub fn check_running_status() {
16-
// 检查正在运行的Unit
17-
let mut running_manager = RUNNING_TABLE.write().unwrap();
18-
let mut exited_unit: Vec<(usize, ExitStatus)> = Vec::new();
19-
for unit in running_manager.mut_running_table() {
20-
let proc = unit.1;
21-
match proc.try_wait() {
22-
//进程正常退出
23-
Ok(Some(status)) => {
24-
exited_unit.push((
25-
*unit.0,
26-
ExitStatus::from_exit_code(status.code().unwrap_or(0)),
27-
));
28-
}
29-
//进程错误退出(或启动失败)
30-
Err(e) => {
31-
eprintln!("unit error: {}", e);
32-
33-
//test
34-
exited_unit.push((*unit.0, ExitStatus::from_exit_code(!0)));
19+
if SIGCHILD_SIGNAL_RECEIVED
20+
.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
21+
.is_ok()
22+
{
23+
let mut exited_unit: Vec<(usize, ExitStatus)> = Vec::new();
24+
let mut running_manager = RUNNING_TABLE.write().unwrap();
25+
// 检查所有运行中的 Unit
26+
for unit in running_manager.mut_running_table() {
27+
let pid = Pid::from_raw(unit.1.id() as i32);
28+
// 检查 Unit 的运行状态
29+
match waitpid(Some(pid), Some(WaitPidFlag::WNOHANG)) {
30+
// 若 Unit 为正常退出,则将其加入退出列表
31+
Ok(WaitStatus::Exited(_, status)) => {
32+
exited_unit.push((*unit.0, ExitStatus::from_exit_code(status)));
33+
}
34+
// 若 Unit 为被信号终止,则将其加入退出列表,并输出日志
35+
Ok(WaitStatus::Signaled(_, signal, _)) => {
36+
eprintln!("unit terminated by signal: {}", signal);
37+
exited_unit.push((*unit.0, ExitStatus::from_exit_code(!0)));
38+
}
39+
// 其他错误情况
40+
Err(_) => {
41+
eprintln!("unit waitpid error");
42+
}
43+
// 若 Unit 正常运行,则不做处理
44+
Ok(_) => {}
3545
}
36-
//进程处于正常运行状态
37-
_ => {}
3846
}
39-
}
40-
//释放锁,以便后续删除操作能拿到锁
41-
drop(running_manager);
4247

43-
// 处理退出的Unit
44-
for tmp in exited_unit {
45-
// 从运行表中擦除该unit
46-
UnitManager::remove_running(tmp.0);
48+
drop(running_manager);
4749

48-
// 取消该任务的定时器任务
49-
TimerManager::cancel_timer(tmp.0);
50+
// 处理退出的 Unit
51+
for tmp in exited_unit {
52+
// 将该任务从运行表中移除
53+
UnitManager::remove_running(tmp.0);
5054

51-
let _ = UnitManager::get_unit_with_id(&tmp.0)
52-
.unwrap()
53-
.lock()
54-
.unwrap()
55-
.exit(); //交付给相应类型的Unit类型去执行退出后的逻辑
55+
// 取消该任务的定时器任务
56+
TimerManager::cancel_timer(tmp.0);
5657

57-
TimerManager::update_next_trigger(tmp.0, false); //更新所有归属于此unit的计时器
58+
// 交付处理子进程退出逻辑
59+
let _ = UnitManager::get_unit_with_id(&tmp.0)
60+
.unwrap()
61+
.lock()
62+
.unwrap()
63+
.exit();
5864

59-
// 交付处理子进程退出逻辑
60-
let unit = UnitManager::get_unit_with_id(&tmp.0).unwrap();
61-
unit.lock().unwrap().after_exit(tmp.1);
62-
}
65+
// 更新属于该 Unit 的定时器任务
66+
TimerManager::update_next_trigger(tmp.0, false);
6367

64-
// 若无运行中任务,则取出IDLE任务运行
65-
if UnitManager::running_count() == 0 {
66-
let unit = UnitManager::pop_a_idle_service();
67-
match unit {
68-
Some(unit) => {
68+
// 交付处理子进程退出后逻辑
69+
let unit = UnitManager::get_unit_with_id(&tmp.0).unwrap();
70+
unit.lock().unwrap().after_exit(tmp.1);
71+
}
72+
// 若无运行中任务,则取出 IDLE 任务运行
73+
if UnitManager::running_count() == 0 {
74+
if let Some(unit) = UnitManager::pop_a_idle_service() {
6975
let _ = unit.lock().unwrap().run();
7076
}
71-
None => {}
7277
}
7378
}
7479
}
7580

7681
/// ## 检查当前所有cmd进程的运行状态
7782
pub fn check_cmd_proc() {
78-
let mut exited = Vec::new();
79-
let mut table = CMD_PROCESS_TABLE.write().unwrap();
80-
for tuple in table.iter_mut() {
81-
let mut proc = tuple.1.lock().unwrap();
82-
match proc.try_wait() {
83-
// 正常运行
84-
Ok(None) => {}
85-
// 停止运行,从表中删除数据
86-
_ => {
87-
// TODO: 应该添加错误处理,有一些命令执行失败会影响服务正常运行
88-
// 后续应该添加机制来执行服务相关命令启动失败的回调
89-
exited.push(*tuple.0);
83+
if SIGCHILD_SIGNAL_RECEIVED
84+
.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
85+
.is_ok()
86+
{
87+
let mut exited = Vec::new();
88+
let mut table = CMD_PROCESS_TABLE.write().unwrap();
89+
90+
for tuple in table.iter_mut() {
91+
let pid = Pid::from_raw(tuple.1.lock().unwrap().id() as i32);
92+
match waitpid(Some(pid), Some(WaitPidFlag::WNOHANG)) {
93+
// 若 cmd 停止运行,则将其加入退出列表
94+
Ok(WaitStatus::Exited(_, _)) | Ok(WaitStatus::Signaled(_, _, _)) => {
95+
eprintln!("cmd exited");
96+
exited.push(*tuple.0);
97+
}
98+
Ok(_) => {}
99+
Err(_) => {
100+
// TODO: 应该添加错误处理,有一些命令执行失败会影响服务正常运行
101+
// 后续应该添加机制来执行服务相关命令启动失败的回调
102+
eprintln!("cmd waitpid error");
103+
}
90104
}
91105
}
92-
}
93106

94-
for id in exited {
95-
table.remove(&id);
107+
for id in exited {
108+
table.remove(&id);
109+
}
96110
}
97111
}
98112
}

src/systemctl/listener/mod.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
1+
use super::ctl_parser::{CommandOperation, CtlParser, Pattern};
2+
use super::{ctl_path, DRAGON_REACH_CTL_PIPE};
3+
use crate::error::ErrorFormat;
4+
use crate::manager::ctl_manager::CtlManager;
5+
use lazy_static::lazy_static;
16
use std::fs::{self, File};
27
use std::io::Read;
38
use std::os::fd::FromRawFd;
49
use std::sync::{Arc, Mutex};
5-
6-
use lazy_static::lazy_static;
7-
8-
use crate::error::ErrorFormat;
9-
use crate::manager::ctl_manager::CtlManager;
10-
11-
use super::ctl_parser::{CommandOperation, CtlParser, Pattern};
12-
use super::{ctl_path, DRAGON_REACH_CTL_PIPE};
10+
use std::thread;
11+
use std::time::Duration;
1312

1413
lazy_static! {
1514
static ref CTL_READER: Mutex<Arc<File>> = {
1615
let file = Systemctl::init_listener();
1716
Mutex::new(Arc::new(file))
1817
};
1918
}
20-
19+
#[derive(Debug)]
2120
pub struct Command {
2221
pub(crate) operation: CommandOperation,
2322
pub(crate) args: Option<Vec<String>>,
@@ -68,18 +67,27 @@ impl Systemctl {
6867
/// 持续从系统服务控制管道中读取命令。
6968
///
7069
pub fn ctl_listen() {
70+
println!("ctl listen");
7171
let mut guard = CTL_READER.lock().unwrap();
7272
let mut s = String::new();
73-
if let Ok(size) = guard.read_to_string(&mut s) {
74-
if size == 0 {
75-
return;
76-
}
77-
match CtlParser::parse_ctl(&s) {
78-
Ok(cmd) => {
79-
let _ = CtlManager::exec_ctl(cmd);
73+
loop {
74+
s.clear();
75+
match guard.read_to_string(&mut s) {
76+
Ok(size) if size > 0 => match CtlParser::parse_ctl(&s) {
77+
Ok(cmd) => {
78+
let _ = CtlManager::exec_ctl(cmd);
79+
}
80+
Err(e) => {
81+
eprintln!("Failed to parse command: {}", e.error_format());
82+
}
83+
},
84+
Ok(_) => {
85+
// 如果读取到的大小为0,说明没有数据可读,适当休眠
86+
thread::sleep(Duration::from_millis(100));
8087
}
81-
Err(err) => {
82-
eprintln!("parse tcl command error: {}", err.error_format());
88+
Err(e) => {
89+
eprintln!("Failed to read from pipe: {}", e);
90+
break;
8391
}
8492
}
8593
}

src/unit/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ use crate::parse::parse_util::UnitParseUtil;
1212
use crate::parse::Segment;
1313

1414
pub mod service;
15+
pub mod signal;
1516
pub mod target;
1617
pub mod timer;
17-
1818
use self::target::TargetUnit;
1919

2020
pub fn generate_unit_id() -> usize {

src/unit/signal.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use nix::sys::signal::{self, SigHandler, Signal};
2+
use std::sync::atomic::{AtomicBool, Ordering};
3+
4+
pub static SIGCHILD_SIGNAL_RECEIVED: AtomicBool = AtomicBool::new(false);
5+
6+
extern "C" fn handle_sigchld(_: libc::c_int) {
7+
SIGCHILD_SIGNAL_RECEIVED.store(true, Ordering::SeqCst);
8+
}
9+
10+
pub fn init_signal_handler() {
11+
unsafe {
12+
signal::signal(Signal::SIGCHLD, SigHandler::Handler(handle_sigchld))
13+
.expect("Error setting SIGUSR1 handler");
14+
}
15+
}

0 commit comments

Comments
 (0)