Skip to content
10 changes: 5 additions & 5 deletions chaos-tproxy-controller/src/cmd/command_line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ pub struct Opt {
#[structopt(name = "FILE", parse(from_os_str))]
pub input: Option<PathBuf>,

/// Allows applying json config by stdin/stdout
#[structopt(short, long)]
pub interactive: bool,

// The number of occurrences of the `v/verbose` flag
/// Verbose mode (-v, -vv, -vvv, etc.)
#[structopt(short, long, parse(from_occurrences))]
Expand All @@ -33,6 +29,10 @@ pub struct Opt {
/// ipc path for sub proxy.
#[structopt(long)]
pub ipc_path: Option<PathBuf>,

/// ipc path to communicate with chaos-mesh.
#[structopt(long = "interactive-path")]
pub interactive_path: Option<PathBuf>,
}

impl Opt {
Expand All @@ -50,7 +50,7 @@ impl Opt {
}

fn checked(self) -> Result<Self> {
if !self.interactive && !self.proxy && self.input.is_none() {
if self.interactive_path.is_none() && !self.proxy && self.input.is_none() {
return Err(anyhow!("config file is required when interactive mode and daemon mode is all disabled, use `-h | --help` for more details"));
}
Ok(self)
Expand Down
34 changes: 22 additions & 12 deletions chaos-tproxy-controller/src/cmd/interactive/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryInto;
use std::future::Future;
use std::path::PathBuf;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -8,16 +9,17 @@ use std::task::{Context, Poll};
use anyhow::Error;
use futures::TryStreamExt;
use http::{Method, Request, Response, StatusCode};
use hyper::server::conn::{Connection, Http};
use hyper::server::conn::Http;
use hyper::service::Service;
use hyper::Body;
use tokio::net::UnixListener;
use tokio::select;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::instrument;

use crate::cmd::interactive::stdio::StdStream;
#[cfg(unix)]
use crate::proxy::config::Config;
use crate::proxy::exec::Proxy;
use crate::raw_config::RawConfig;
Expand All @@ -41,24 +43,32 @@ impl ConfigServer {
}
}

pub fn serve_interactive(&mut self) {
pub fn serve_interactive(&mut self, interactive_path: PathBuf) {
let mut rx = self.rx.take().unwrap();
let mut service = ConfigService(self.proxy.clone());
self.task = Some(tokio::spawn(async move {
let proxy = self.proxy.clone();
self.task = Some(tokio::task::spawn(async move {
let rx_mut = &mut rx;
tracing::info!("ConfigServer listener try binding {:?}", interactive_path);
let unix_listener = UnixListener::bind(interactive_path).unwrap();

loop {
let stream = StdStream::default();
let mut conn = Http::new().serve_connection(stream, &mut service);
let conn_mut = &mut conn;
let mut service = ConfigService(proxy.clone());
select! {
_ = &mut *rx_mut => {
tracing::trace!("catch signal in config server.");
Connection::graceful_shutdown(Pin::new(conn_mut));
return Ok(());
},
ret = &mut *conn_mut => if let Err(e) = ret {
tracing::error!("{}",e);
}
stream = unix_listener.accept() => {
tokio::task::spawn(async move {
let (stream, _) = stream.unwrap();

let http = Http::new();
let conn = http.serve_connection(stream, &mut service);
if let Err(e) = conn.await {
tracing::error!("{}",e);
}
});
},
};
}
}));
Expand Down
9 changes: 6 additions & 3 deletions chaos-tproxy-controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ async fn main() -> anyhow::Result<()> {
return Ok(());
}

if opt.interactive {
if let Some(path) = opt.interactive_path {
let mut config_server = ConfigServer::new(Proxy::new(opt.verbose).await);
config_server.serve_interactive();
config_server.serve_interactive(path.clone());

let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?;
signals.wait().await?;
// Currently we cannot graceful shutdown the config server.
config_server.stop().await?;

// Currently we cannot graceful shutdown the config server.
// delete the unix socket file
std::fs::remove_file(path.clone())?;

exit(0);
}
Ok(())
Expand Down