Skip to content

Commit a714876

Browse files
committed
vmm: fix stdout/stderr preemption
We introduced a preemptable receiver feature so that the new start containerd can preempt the old stdio streaming process routine. but it has a bug that when io copy thread finished, it will remove the io channel, which has a preemption signal sender in it, the removal of the sender may preempt the stream receiver to stop early, without all message processed. After this PR change, after copy of stdout and stderr finished, we don't call remove of the iochannel, but just set a flag of sender_closed to true. Before stream handle return, it just determin if the io channel should be removed by this flag. Signed-off-by: Abel Feng <fshb1988@gmail.com>
1 parent 4fb4e83 commit a714876

File tree

2 files changed

+40
-13
lines changed

2 files changed

+40
-13
lines changed

vmm/task/src/io.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use tokio_vsock::{VsockListener, VsockStream};
5353

5454
use crate::{
5555
device::SYSTEM_DEV_PATH,
56-
streaming::{get_output, get_stdin, remove_channel},
56+
streaming::{close_stdout, get_output, get_stdin, remove_channel},
5757
vsock,
5858
};
5959

@@ -281,7 +281,7 @@ where
281281
};
282282
copy(src, dst, exit_signal, on_close).await;
283283
if to.contains(STREAMING) {
284-
remove_channel(&to).await.unwrap_or_default();
284+
close_stdout(&to).await.unwrap_or_default();
285285
}
286286
debug!("finished copy io from container to {}", to);
287287
});

vmm/task/src/streaming.rs

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ pub struct IOChannel {
8080
remaining_data: Option<Any>,
8181
preemption_sender: Option<Sender<()>>,
8282
notifier: Arc<Notify>,
83+
sender_closed: bool,
8384
}
8485

8586
pub struct PreemptableReceiver {
@@ -116,6 +117,7 @@ impl IOChannel {
116117
remaining_data: None,
117118
preemption_sender: None,
118119
notifier: Arc::new(Notify::new()),
120+
sender_closed: false,
119121
}
120122
}
121123

@@ -135,10 +137,21 @@ impl IOChannel {
135137
None
136138
}
137139

138-
fn return_preempted_receiver(&mut self, r: PreemptableReceiver, remaining_data: Option<Any>) {
139-
self.receiver = Some(r.receiver);
140-
self.remaining_data = remaining_data;
141-
self.notifier.notify_one();
140+
fn return_preempted_receiver(
141+
&mut self,
142+
r: PreemptableReceiver,
143+
remaining_data: Option<Any>,
144+
) -> bool {
145+
// only return the receiver when sender is already moved to the io thread, and sender is not closed
146+
if let None = self.sender {
147+
if !self.sender_closed {
148+
self.receiver = Some(r.receiver);
149+
self.remaining_data = remaining_data;
150+
self.notifier.notify_one();
151+
return true;
152+
}
153+
}
154+
return false;
142155
}
143156
}
144157

@@ -214,11 +227,17 @@ impl Service {
214227
remaining_data: Option<Any>,
215228
) {
216229
let mut ios = self.ios.lock().await;
230+
let mut channel_used = false;
217231
if let Some(ch) = ios.get_mut(id) {
218-
ch.return_preempted_receiver(r, remaining_data);
232+
if ch.return_preempted_receiver(r, remaining_data) {
233+
channel_used = true;
234+
}
219235
} else {
220236
warn!("io channel removed when return the receiver");
221237
}
238+
if !channel_used {
239+
self.ios.lock().await.remove(id);
240+
}
222241
}
223242

224243
async fn get_remaining_data(&self, id: &str) -> Option<Any> {
@@ -283,18 +302,14 @@ impl Service {
283302
Ok(d) => d,
284303
Err(e) => {
285304
debug!("failed to marshal update of stream {}, {}", stream_id, e);
286-
if let Some(c) = self.ios.lock().await.get_mut(stream_id) {
287-
c.sender = Some(sender);
288-
}
305+
self.ios.lock().await.remove(stream_id);
289306
return Err(ttrpc::Error::Others(format!("failed to write data {}", e)));
290307
}
291308
};
292309
let a = new_any!(WindowUpdate, update_bytes);
293310
if let Err(e) = stream.send(&a).await {
294311
debug!("failed to send update of stream {}, {}", stream_id, e);
295-
if let Some(c) = self.ios.lock().await.get_mut(stream_id) {
296-
c.sender = Some(sender);
297-
}
312+
self.ios.lock().await.remove(stream_id);
298313
return Err(e);
299314
}
300315
window += WINDOW_SIZE;
@@ -310,6 +325,7 @@ impl Service {
310325
};
311326
let len: i32 = data_bytes.len().try_into().unwrap_or_default();
312327
if let Err(e) = sender.send(data_bytes).await {
328+
self.ios.lock().await.remove(stream_id);
313329
return Err(ttrpc::Error::Others(format!("failed to send data {}", e)));
314330
}
315331
window -= len;
@@ -327,6 +343,7 @@ impl Service {
327343
stream_id: &String,
328344
stream: ServerStream<Any, Any>,
329345
) -> ttrpc::Result<()> {
346+
// TODO the stream needs to return if client send close.
330347
let mut receiver = self.preempt_receiver(stream_id).await?;
331348
if let Some(a) = self.get_remaining_data(stream_id).await {
332349
if let Err(e) = stream.send(&a).await {
@@ -348,6 +365,7 @@ impl Service {
348365
match r {
349366
Some(d) => {
350367
if d.is_empty() {
368+
self.ios.lock().await.remove(stream_id);
351369
return Ok(());
352370
}
353371
let mut data = Data::new();
@@ -376,6 +394,7 @@ impl Service {
376394
};
377395
}
378396
None => {
397+
self.ios.lock().await.remove(stream_id);
379398
return Ok(());
380399
}
381400
}
@@ -394,6 +413,14 @@ pub async fn remove_channel(url: &str) -> containerd_shim::Result<()> {
394413
Ok(())
395414
}
396415

416+
pub async fn close_stdout(url: &str) -> containerd_shim::Result<()> {
417+
let id = get_id(url)?;
418+
if let Some(ch) = STREAMING_SERVICE.ios.lock().await.get_mut(id) {
419+
ch.sender_closed = true;
420+
}
421+
Ok(())
422+
}
423+
397424
pub async fn get_output(url: &str) -> containerd_shim::Result<StreamingOutput> {
398425
let id = get_id(url)?;
399426
STREAMING_SERVICE.get_output(id).await

0 commit comments

Comments
 (0)