11use super :: { Command , Output } ;
22use crate :: {
3- CacheParts ,
3+ CacheParts , SessionId ,
44 batch:: { BatchStrategy , DefaultStrategy , Round , SessionStub } ,
55} ;
6- use std:: sync:: mpsc:: { Receiver , Sender , TryRecvError } ;
6+ use std:: {
7+ collections:: BTreeSet ,
8+ sync:: mpsc:: { Receiver , Sender , TryRecvError } ,
9+ } ;
710
811pub ( super ) struct EngineManager ( DefaultStrategy < CacheParts > ) ;
912
@@ -26,12 +29,13 @@ impl EngineManager {
2629 & mut self ,
2730 commands : & Receiver < Command > ,
2831 outputs : & Sender < Output > ,
29- ) -> Result < ( ) , E > {
32+ ) -> Result < BTreeSet < SessionId > , E > {
33+ let mut removed = BTreeSet :: new ( ) ;
3034 loop {
3135 // 总是尝试进行非阻塞接收
3236 loop {
3337 match commands. try_recv ( ) {
34- Ok ( cmd) => self . apply ( cmd, outputs) ?,
38+ Ok ( cmd) => self . apply ( cmd, outputs, & mut removed ) ?,
3539 Err ( TryRecvError :: Disconnected ) => return Err ( E :: ReceiveError ) ,
3640 Err ( TryRecvError :: Empty ) => break ,
3741 }
@@ -40,14 +44,15 @@ impl EngineManager {
4044 if self . 0 . is_empty ( ) {
4145 // 也没有待处理的任务,阻塞等待
4246 match commands. recv ( ) {
43- Ok ( cmd) => self . apply ( cmd, outputs) ?,
44- Err ( _) => break Err ( E :: ReceiveError ) ,
47+ Ok ( cmd) => self . apply ( cmd, outputs, & mut removed ) ?,
48+ Err ( _) => return Err ( E :: ReceiveError ) ,
4549 }
4650 } else {
4751 // 有待处理的任务,退出循环
48- break Ok ( ( ) ) ;
52+ break ;
4953 }
5054 }
55+ Ok ( removed)
5156 }
5257
5358 /// 准备推理
@@ -59,22 +64,26 @@ impl EngineManager {
5964 self . 0 . take_stubs ( )
6065 }
6166
62- fn apply ( & mut self , cmd : Command , outputs : & Sender < Output > ) -> Result < ( ) , CommandError > {
67+ fn apply (
68+ & mut self ,
69+ cmd : Command ,
70+ outputs : & Sender < Output > ,
71+ removed : & mut BTreeSet < SessionId > ,
72+ ) -> Result < ( ) , CommandError > {
6373 match cmd {
6474 Command :: ShutDown => Err ( CommandError :: ShutDown ) ,
6575 Command :: Insert ( req) => {
6676 self . 0 . insert ( req. into_stub ( ) ) ;
6777 Ok ( ( ) )
6878 }
6979 Command :: Remove ( id) => {
70- if self
71- . 0
72- . remove ( & id)
73- . is_none_or ( |stub| outputs. send ( Output :: Removed ( stub. session ) ) . is_ok ( ) )
74- {
75- Ok ( ( ) )
80+ if let Some ( stub) = self . 0 . remove ( & id) {
81+ removed. insert ( stub. session . id ) ;
82+ outputs
83+ . send ( Output :: Removed ( stub. session ) )
84+ . map_err ( |_| CommandError :: SendError )
7685 } else {
77- Err ( CommandError :: SendError )
86+ Ok ( ( ) )
7887 }
7988 }
8089 }
0 commit comments