Skip to content
10 changes: 5 additions & 5 deletions chaos-tproxy-controller/src/cmd/command_line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ pub struct Opt {
#[structopt(name = "FILE", parse(from_os_str))]
pub input: Option<PathBuf>,

/// Allows applying json config by stdin/stdout
#[structopt(short, long)]
pub interactive: bool,

// The number of occurrences of the `v/verbose` flag
/// Verbose mode (-v, -vv, -vvv, etc.)
#[structopt(short, long, parse(from_occurrences))]
Expand All @@ -33,6 +29,10 @@ pub struct Opt {
/// ipc path for sub proxy.
#[structopt(long)]
pub ipc_path: Option<PathBuf>,

/// ipc path to communicate with chaos-mesh.
#[structopt(long = "interactive-path")]
pub interactive_path: Option<PathBuf>,
}

impl Opt {
Expand All @@ -50,7 +50,7 @@ impl Opt {
}

fn checked(self) -> Result<Self> {
if !self.interactive && !self.proxy && self.input.is_none() {
if self.interactive_path.is_none() && !self.proxy && self.input.is_none() {
return Err(anyhow!("config file is required when interactive mode and daemon mode is all disabled, use `-h | --help` for more details"));
}
Ok(self)
Expand Down
30 changes: 19 additions & 11 deletions chaos-tproxy-controller/src/cmd/interactive/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ use std::task::{Context, Poll};
use anyhow::Error;
use futures::TryStreamExt;
use http::{Method, Request, Response, StatusCode};
use hyper::server::conn::{Connection, Http};
use hyper::server::conn::{Http};
use hyper::service::Service;
use hyper::Body;
use tokio::select;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::instrument;
use std::path::PathBuf;

use crate::cmd::interactive::stdio::StdStream;
use tokio::net::{UnixListener};
#[cfg(unix)]
use crate::proxy::config::Config;
use crate::proxy::exec::Proxy;
use crate::raw_config::RawConfig;
Expand All @@ -40,24 +42,30 @@ impl ConfigServer {
}
}

pub fn serve_interactive(&mut self) {
pub fn serve_interactive(&mut self, interactive_path: PathBuf) {
let mut rx = self.rx.take().unwrap();
let mut service = ConfigService(self.proxy.clone());

self.task = Some(tokio::spawn(async move {
let rx_mut = &mut rx;
let rx_mut = &mut rx;
tracing::info!("ConfigServer listener try binding {:?}", interactive_path);
let unix_listener = UnixListener::bind(interactive_path).unwrap();

loop {
let stream = StdStream::default();
let mut conn = Http::new().serve_connection(stream, &mut service);
let conn_mut = &mut conn;
select! {
_ = &mut *rx_mut => {
tracing::trace!("catch signal in config server.");
Connection::graceful_shutdown(Pin::new(conn_mut));
return Ok(());
},
ret = &mut *conn_mut => if let Err(e) = ret {
tracing::error!("{}",e);
}
stream = unix_listener.accept() => {
let (stream, _) = stream.unwrap();

let http = Http::new();
let conn = http.serve_connection(stream, &mut service);
if let Err(e) = conn.await {
tracing::error!("{}",e);
}
},
};
}
}));
Expand Down
9 changes: 5 additions & 4 deletions chaos-tproxy-controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::signal::unix::SignalKind;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter};
use std::path::PathBuf;

use crate::cmd::command_line::{get_config_from_opt, Opt};
use crate::cmd::interactive::handler::ConfigServer;
Expand Down Expand Up @@ -38,17 +39,17 @@ async fn main() -> anyhow::Result<()> {
let cfg = get_config_from_opt(&opt).await?;
let mut proxy = Proxy::new(opt.verbose).await;
proxy.reload(cfg.proxy_config).await?;
let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?;
let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], PathBuf::new())?;
signals.wait().await?;
proxy.stop().await?;
return Ok(());
}

if opt.interactive {
if opt.interactive_path.is_some() {
let mut config_server = ConfigServer::new(Proxy::new(opt.verbose).await);
config_server.serve_interactive();
config_server.serve_interactive(opt.interactive_path.clone().unwrap());

let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?;
let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], opt.interactive_path.clone().unwrap())?;
signals.wait().await?;
config_server.stop().await?;

Expand Down
2 changes: 1 addition & 1 deletion chaos-tproxy-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn proxy_main(path: PathBuf) -> anyhow::Result<()> {
server.serve(rx).await.unwrap();
});

let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?;
let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], PathBuf::new())?;
signals.wait().await?;

let _ = sender.send(());
Expand Down
15 changes: 12 additions & 3 deletions chaos-tproxy-proxy/src/signal.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
use futures::future::select_all;
use tokio::signal::unix::{signal, Signal, SignalKind};
use std::path::PathBuf;

pub struct Signals(Vec<Signal>);
pub struct Signals
{
pub signals :Vec<Signal>,
interactive_path: PathBuf,
}

impl Signals {
pub fn from_kinds<'a>(
kinds: impl 'a + IntoIterator<Item = &'a SignalKind>,
interactive_path: PathBuf,
) -> anyhow::Result<Self> {
let signals = kinds
.into_iter()
.map(|kind| signal(*kind))
.collect::<Result<Vec<_>, _>>()?;
Ok(Self(signals))
Ok(Signals{signals, interactive_path})
}

pub async fn wait(&mut self) -> anyhow::Result<()> {
select_all(self.0.iter_mut().map(|sig| Box::pin(sig.recv()))).await;
select_all(self.signals.iter_mut().map(|sig| Box::pin(sig.recv()))).await;
if self.interactive_path != PathBuf::new() {
std::fs::remove_file(self.interactive_path.clone())?;
}
Ok(())
}
}