Skip to content

Commit b08d11c

Browse files
committed
Process stderr from test-proxy differently
Resolves #2715. On Windows, listening on stderr was keeping the process alive and not shutting down immediately after all test processes were done and the proxy dropped.
1 parent 2e2c53c commit b08d11c

File tree

1 file changed

+63
-20
lines changed
  • sdk/core/azure_core_test/src/proxy

1 file changed

+63
-20
lines changed

sdk/core/azure_core_test/src/proxy/mod.rs

Lines changed: 63 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ use serde::Serializer;
2828
#[cfg(not(target_arch = "wasm32"))]
2929
use std::process::ExitStatus;
3030
use std::{fmt, str::FromStr, sync::Arc};
31+
use tokio::sync::Notify;
3132
#[cfg(not(target_arch = "wasm32"))]
32-
use tokio::process::Child;
33+
use tokio::{io::Lines, process::Child};
3334

3435
const ABSTRACTION_IDENTIFIER: HeaderName = HeaderName::from_static("x-abstraction-identifier");
3536
const RECORDING_ID: HeaderName = HeaderName::from_static("x-recording-id");
@@ -45,6 +46,8 @@ pub use bootstrap::start;
4546
pub struct Proxy {
4647
#[cfg(not(target_arch = "wasm32"))]
4748
command: Option<Child>,
49+
#[cfg(not(target_arch = "wasm32"))]
50+
done: Option<Arc<Notify>>,
4851
endpoint: Option<Url>,
4952
client: Option<Client>,
5053
}
@@ -76,34 +79,57 @@ impl Proxy {
7679
)
7780
})?;
7881

79-
let mut stdout = command
80-
.stdout
81-
.take()
82-
.ok_or_else(|| azure_core::Error::message(ErrorKind::Io, "no stdout pipe"))?;
82+
let mut stdout = BufReader::new(
83+
command
84+
.stdout
85+
.take()
86+
.ok_or_else(|| azure_core::Error::message(ErrorKind::Io, "no stdout pipe"))?,
87+
)
88+
.lines();
8389
// Take stderr now but we won't listen until after start up, such that messages should buffer.
84-
let mut stderr = command
85-
.stderr
86-
.take()
87-
.ok_or_else(|| azure_core::Error::message(ErrorKind::Io, "no stderr pipe"))?;
90+
// let mut stderr = BufReader::new(
91+
// command
92+
// .stderr
93+
// .take()
94+
// .ok_or_else(|| azure_core::Error::message(ErrorKind::Io, "no stderr pipe"))?,
95+
// )
96+
// .lines();
97+
98+
let done = Arc::new(Notify::new());
8899
self.command = Some(command);
100+
self.done = Some(done.clone());
89101

90102
// Wait until the service is listening on a port.
91103
self.wait_till_listening(&mut stdout).await?;
92104

93105
// Then spawn a thread to keep pumping messages to stdout and stderr.
94106
// The pipe will be closed when the process is shut down, which will terminate the task.
95-
tokio::spawn(async move {
96-
let mut reader = BufReader::new(&mut stdout).lines();
97-
while let Some(line) = reader.next_line().await.unwrap_or(None) {
107+
let mut stdout_task = tokio::spawn(async move {
108+
while let Some(line) = stdout.next_line().await.unwrap_or(None) {
98109
// Trace useful lines that test-proxy writes to stdout.
99110
trace_line(Level::TRACE, &line);
100111
}
101112
});
113+
// let mut stderr_task = tokio::spawn(async move {
114+
// while let Some(line) = stderr.next_line().await.unwrap_or(None) {
115+
// // Trace useful lines that test-proxy writes to stderr.
116+
// trace_line(Level::TRACE, &line);
117+
// }
118+
// });
102119
tokio::spawn(async move {
103-
let mut reader = BufReader::new(&mut stderr).lines();
104-
while let Some(line) = reader.next_line().await.unwrap_or(None) {
105-
// Trace useful lines that test-proxy writes to stdout.
106-
trace_line(Level::ERROR, &line);
120+
loop {
121+
tokio::select! {
122+
_ = &mut stdout_task => {
123+
eprintln!("stdout");
124+
}
125+
// _ = &mut stderr_task => {
126+
// eprintln!("stderr");
127+
// },
128+
_ = done.notified() => {
129+
eprintln!("done");
130+
break;
131+
},
132+
};
107133
}
108134
});
109135

@@ -138,6 +164,8 @@ impl Proxy {
138164
///
139165
/// Waits until the process is killed.
140166
pub async fn stop(&mut self) -> Result<()> {
167+
tracing::warn!("stopping");
168+
self.done();
141169
if let Some(command) = &mut self.command {
142170
tracing::debug!(pid = ?command.id(), "stopping");
143171
return Ok(command.kill().await?);
@@ -147,16 +175,20 @@ impl Proxy {
147175

148176
/// Waits for the Test Proxy service to exit, return the process exit code when completed.
149177
pub async fn wait(&mut self) -> Result<ExitStatus> {
178+
tracing::warn!("waiting");
179+
self.done();
150180
if let Some(command) = &mut self.command {
151181
return Ok(command.wait().await?);
152182
}
153183
Ok(ExitStatus::default())
154184
}
155185

156-
async fn wait_till_listening(&mut self, stdout: &mut ChildStdout) -> Result<()> {
186+
async fn wait_till_listening(
187+
&mut self,
188+
stdout: &mut Lines<BufReader<ChildStdout>>,
189+
) -> Result<()> {
157190
let pid = self.command.as_ref().and_then(Child::id);
158-
let mut reader = BufReader::new(stdout).lines();
159-
while let Some(line) = reader.next_line().await? {
191+
while let Some(line) = stdout.next_line().await? {
160192
const RUNNING_PATTERN: &str = "Running proxy version is Azure.Sdk.Tools.TestProxy ";
161193
const LISTENING_PATTERN: &str = "Now listening on: ";
162194

@@ -192,6 +224,13 @@ impl Proxy {
192224

193225
Ok(())
194226
}
227+
228+
fn done(&self) {
229+
if let Some(done) = &self.done {
230+
eprintln!("signalling done");
231+
done.notify_one();
232+
}
233+
}
195234
}
196235

197236
impl Proxy {
@@ -201,6 +240,8 @@ impl Proxy {
201240
Ok(Self {
202241
#[cfg(not(target_arch = "wasm32"))]
203242
command: None,
243+
#[cfg(not(target_arch = "wasm32"))]
244+
done: None,
204245
endpoint: Some(endpoint.clone()),
205246
client: Some(Client::new(endpoint)?),
206247
})
@@ -221,6 +262,8 @@ impl Drop for Proxy {
221262
///
222263
/// Does not wait until the process is killed.
223264
fn drop(&mut self) {
265+
eprintln!("dropping Proxy");
266+
self.done();
224267
if let Some(command) = &mut self.command {
225268
let _ = command.start_kill();
226269
}
@@ -273,7 +316,7 @@ impl Default for ProxyOptions {
273316
Self {
274317
auto: true,
275318
insecure: false,
276-
auto_shutdown_in_seconds: 300,
319+
auto_shutdown_in_seconds: 10,
277320
}
278321
}
279322
}

0 commit comments

Comments
 (0)