Skip to content
10 changes: 5 additions & 5 deletions chaos-tproxy-controller/src/cmd/command_line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ pub struct Opt {
#[structopt(name = "FILE", parse(from_os_str))]
pub input: Option<PathBuf>,

/// Allows applying json config by stdin/stdout
#[structopt(short, long)]
pub interactive: bool,

// The number of occurrences of the `v/verbose` flag
/// Verbose mode (-v, -vv, -vvv, etc.)
#[structopt(short, long, parse(from_occurrences))]
Expand All @@ -33,6 +29,10 @@ pub struct Opt {
/// ipc path for sub proxy.
#[structopt(long)]
pub ipc_path: Option<PathBuf>,

/// ipc path to communicate with chaos-mesh.
#[structopt(long = "interactive-path")]
pub interactive_path: Option<PathBuf>,
}

impl Opt {
Expand All @@ -50,7 +50,7 @@ impl Opt {
}

fn checked(self) -> Result<Self> {
if !self.interactive && !self.proxy && self.input.is_none() {
if self.interactive_path.is_none() && !self.proxy && self.input.is_none() {
return Err(anyhow!("config file is required when interactive mode and daemon mode is all disabled, use `-h | --help` for more details"));
}
Ok(self)
Expand Down
30 changes: 19 additions & 11 deletions chaos-tproxy-controller/src/cmd/interactive/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ use std::task::{Context, Poll};
use anyhow::Error;
use futures::TryStreamExt;
use http::{Method, Request, Response, StatusCode};
use hyper::server::conn::{Connection, Http};
use hyper::server::conn::{Http};
use hyper::service::Service;
use hyper::Body;
use tokio::select;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::instrument;
use std::path::PathBuf;

use crate::cmd::interactive::stdio::StdStream;
use tokio::net::{UnixListener};
#[cfg(unix)]
use crate::proxy::config::Config;
use crate::proxy::exec::Proxy;
use crate::raw_config::RawConfig;
Expand All @@ -40,24 +42,30 @@ impl ConfigServer {
}
}

pub fn serve_interactive(&mut self) {
pub fn serve_interactive(&mut self, interactive_path: PathBuf) {
let mut rx = self.rx.take().unwrap();
let mut service = ConfigService(self.proxy.clone());

self.task = Some(tokio::spawn(async move {
let rx_mut = &mut rx;
let rx_mut = &mut rx;
tracing::info!("ConfigServer listener try binding {:?}", interactive_path);
let unix_listener = UnixListener::bind(interactive_path).unwrap();

loop {
let stream = StdStream::default();
let mut conn = Http::new().serve_connection(stream, &mut service);
let conn_mut = &mut conn;
select! {
_ = &mut *rx_mut => {
tracing::trace!("catch signal in config server.");
Connection::graceful_shutdown(Pin::new(conn_mut));
return Ok(());
},
ret = &mut *conn_mut => if let Err(e) = ret {
tracing::error!("{}",e);
}
stream = unix_listener.accept() => {
let (stream, _) = stream.unwrap();

let http = Http::new();
let conn = http.serve_connection(stream, &mut service);
if let Err(e) = conn.await {
tracing::error!("{}",e);
}
},
};
}
}));
Expand Down
9 changes: 6 additions & 3 deletions chaos-tproxy-controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ async fn main() -> anyhow::Result<()> {
return Ok(());
}

if opt.interactive {
if opt.interactive_path.is_some() {
let mut config_server = ConfigServer::new(Proxy::new(opt.verbose).await);
config_server.serve_interactive();
config_server.serve_interactive(opt.interactive_path.clone().unwrap());

let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?;
signals.wait().await?;
// Currently we cannot graceful shutdown the config server.
config_server.stop().await?;

// Currently we cannot graceful shutdown the config server.
// delete the unix socket file
std::fs::remove_file(opt.interactive_path.clone().unwrap())?;

exit(0);
}
Ok(())
Expand Down