@@ -13,7 +13,7 @@ use log::{debug, trace};
1313use metrics:: {
1414 register_meter_with_group, Gauge , GaugeUsize , Histogram , Meter , Sample ,
1515} ;
16- use mio:: { tcp :: TcpStream , Poll , PollOpt , Ready , Token } ;
16+ use mio:: { net :: TcpStream , Interest , Registry , Token } ;
1717use priority_send_queue:: { PrioritySendQueue , SendQueuePriority } ;
1818use serde:: Deserialize ;
1919use serde_derive:: Serialize ;
@@ -183,7 +183,7 @@ pub struct GenericConnection<Socket: GenericSocket> {
183183 /// Sending packet.
184184 sending_packet : Option < Packet > ,
185185 /// Event flags this connection interested
186- interest : Ready ,
186+ interest : Interest ,
187187 /// Registered flag
188188 registered : AtomicBool ,
189189 /// Assemble packet with extra information before sending out.
@@ -324,7 +324,10 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
324324 let status = self . write_next_from_queue ( ) ?;
325325
326326 if self . sending_packet . is_none ( ) && self . send_queue . is_empty ( ) {
327- self . interest . remove ( Ready :: writable ( ) ) ;
327+ self . interest = self
328+ . interest
329+ . remove ( Interest :: WRITABLE )
330+ . expect ( "Interest::WRITABLE should not be empty" ) ;
328331 }
329332 NETWORK_SEND_QUEUE_SIZE . update ( self . send_queue . len ( ) ) ;
330333 io. update_registration ( self . token ) ?;
@@ -361,7 +364,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
361364 }
362365
363366 if !self . interest . is_writable ( ) {
364- self . interest . insert ( Ready :: writable ( ) ) ;
367+ self . interest = self . interest . add ( Interest :: WRITABLE ) ;
365368 }
366369 io. update_registration ( self . token ) . ok ( ) ;
367370 }
@@ -384,15 +387,18 @@ impl Connection {
384387 recv_buf : BytesMut :: new ( ) ,
385388 send_queue : PrioritySendQueue :: default ( ) ,
386389 sending_packet : None ,
387- interest : Ready :: hup ( ) | Ready :: readable ( ) ,
390+ interest : Interest :: READABLE , /* previously(mio0.6) use
391+ * Ready::hup() |
392+ * Ready::readable(), Ready::hub()
393+ * is removed from 0.7 */
388394 registered : AtomicBool :: new ( false ) ,
389395 assembler : Box :: new ( PacketWithLenAssembler :: default ( ) ) ,
390396 }
391397 }
392398
393399 /// Register this connection with the IO event loop.
394400 pub fn register_socket (
395- & self , reg : Token , event_loop : & Poll ,
401+ & mut self , reg : Token , poll_registry : & Registry ,
396402 ) -> io:: Result < ( ) > {
397403 if self . registered . load ( AtomicOrdering :: SeqCst ) {
398404 return Ok ( ( ) ) ;
@@ -402,12 +408,9 @@ impl Connection {
402408 self . token,
403409 reg
404410 ) ;
405- if let Err ( e) = event_loop. register (
406- & self . socket ,
407- reg,
408- self . interest ,
409- PollOpt :: edge ( ) ,
410- ) {
411+ if let Err ( e) =
412+ poll_registry. register ( & mut self . socket , reg, self . interest )
413+ {
411414 trace ! (
412415 "Failed to register socket, token = {}, reg = {:?}, err = {:?}" ,
413416 self . token,
@@ -422,18 +425,18 @@ impl Connection {
422425 /// Update connection registration. Should be called at the end of the IO
423426 /// handler.
424427 pub fn update_socket (
425- & self , reg : Token , event_loop : & Poll ,
428+ & mut self , reg : Token , poll_registry : & Registry ,
426429 ) -> io:: Result < ( ) > {
427430 trace ! (
428431 "Connection reregister, token = {}, reg = {:?}" ,
429432 self . token,
430433 reg
431434 ) ;
432435 if !self . registered . load ( AtomicOrdering :: SeqCst ) {
433- self . register_socket ( reg, event_loop )
436+ self . register_socket ( reg, poll_registry )
434437 } else {
435- event_loop
436- . reregister ( & self . socket , reg, self . interest , PollOpt :: edge ( ) )
438+ poll_registry
439+ . reregister ( & mut self . socket , reg, self . interest )
437440 . unwrap_or_else ( |e| {
438441 trace ! ( "Failed to reregister socket, token = {}, reg = {:?}, err = {:?}" , self . token, reg, e) ;
439442 } ) ;
@@ -443,9 +446,11 @@ impl Connection {
443446
444447 /// Delete connection registration. Should be called at the end of the IO
445448 /// handler.
446- pub fn deregister_socket ( & self , event_loop : & Poll ) -> io:: Result < ( ) > {
449+ pub fn deregister_socket (
450+ & mut self , poll_registry : & Registry ,
451+ ) -> io:: Result < ( ) > {
447452 trace ! ( "Connection deregister, token = {}" , self . token) ;
448- event_loop . deregister ( & self . socket ) . ok ( ) ;
453+ poll_registry . deregister ( & mut self . socket ) . ok ( ) ;
449454 Ok ( ( ) )
450455 }
451456
@@ -591,7 +596,7 @@ impl PacketAssembler for PacketWithLenAssembler {
591596mod tests {
592597 use super :: * ;
593598 use crate :: iolib:: IoChannel ;
594- use mio:: Ready ;
599+ use mio:: Interest ;
595600 use std:: {
596601 cmp,
597602 io:: { Read , Result , Write } ,
@@ -672,7 +677,7 @@ mod tests {
672677 send_queue : PrioritySendQueue :: default ( ) ,
673678 sending_packet : None ,
674679 recv_buf : BytesMut :: new ( ) ,
675- interest : Ready :: hup ( ) | Ready :: readable ( ) ,
680+ interest : Interest :: READABLE ,
676681 registered : AtomicBool :: new ( false ) ,
677682 assembler : Box :: new ( PacketWithLenAssembler :: new ( 1 , None ) ) ,
678683 }
0 commit comments