@@ -303,10 +303,6 @@ pub(crate) struct IoBufs {
303303 pub segment_accountant : Mutex < SegmentAccountant > ,
304304 pub segment_cleaner : SegmentCleaner ,
305305 deferred_segment_ops : stack:: Stack < SegmentOp > ,
306- #[ cfg( feature = "io_uring" ) ]
307- pub submission_mutex : Mutex < ( ) > ,
308- #[ cfg( feature = "io_uring" ) ]
309- pub io_uring : rio:: Rio ,
310306}
311307
312308impl Drop for IoBufs {
@@ -410,10 +406,6 @@ impl IoBufs {
410406 segment_accountant : Mutex :: new ( segment_accountant) ,
411407 segment_cleaner,
412408 deferred_segment_ops : stack:: Stack :: default ( ) ,
413- #[ cfg( feature = "io_uring" ) ]
414- submission_mutex : Mutex :: new ( ( ) ) ,
415- #[ cfg( feature = "io_uring" ) ]
416- io_uring : rio:: new ( ) ?,
417409 } )
418410 }
419411
@@ -745,90 +737,38 @@ impl IoBufs {
745737 let stored_max_stable_lsn = iobuf. stored_max_stable_lsn ;
746738
747739 io_fail ! ( self , "buffer write" ) ;
748- #[ cfg( feature = "io_uring" ) ]
749- {
750- let mut wrote = 0 ;
751- while wrote < total_len {
752- let to_write = & data[ wrote..] ;
753- let offset = log_offset + wrote as u64 ;
754-
755- // we take out this mutex to guarantee
756- // that our `Link` write operation below
757- // is serialized with the following sync.
758- // we don't put the `Rio` instance into
759- // the `Mutex` because we want to drop the
760- // `Mutex` right after beginning the async
761- // submission.
762- let link_mu = self . submission_mutex . lock ( ) ;
763-
764- // using the `Link` ordering, we specify
765- // that `io_uring` should not begin
766- // the following `sync_file_range`
767- // until the previous write is
768- // complete.
769- let wrote_completion = self . io_uring . write_at_ordered (
770- & * self . config . file ,
771- & to_write,
772- offset,
773- rio:: Ordering :: Link ,
774- ) ;
775-
776- let sync_completion = if iobuf. from_tip {
777- self . io_uring . fsync ( & * self . config . file )
778- } else {
779- self . io_uring . sync_file_range (
780- & * self . config . file ,
781- offset,
782- to_write. len ( ) ,
783- )
784- } ;
785-
786- sync_completion. wait ( ) ?;
787-
788- // TODO we want to move this above the previous `wait`
789- // but there seems to be an issue in `rio` that is
790- // triggered when multiple threads are submitting
791- // events while events from other threads are in play.
792- drop ( link_mu) ;
793-
794- wrote += wrote_completion. wait ( ) ?;
795- }
796- }
797- #[ cfg( not( feature = "io_uring" ) ) ]
798- {
799- let f = & self . config . file ;
800- pwrite_all ( f, data, log_offset) ?;
801- if !self . config . temporary {
802- if iobuf. from_tip {
803- f. sync_all ( ) ?;
804- } else if cfg ! ( not( target_os = "linux" ) ) {
805- f. sync_data ( ) ?;
806- } else {
807- #[ allow( clippy:: assertions_on_constants) ]
808- {
809- assert ! ( cfg!( target_os = "linux" ) ) ;
810- }
740+ let f = & self . config . file ;
741+ pwrite_all ( f, data, log_offset) ?;
742+ if !self . config . temporary {
743+ if iobuf. from_tip {
744+ f. sync_all ( ) ?;
745+ } else if cfg ! ( not( target_os = "linux" ) ) {
746+ f. sync_data ( ) ?;
747+ } else {
748+ #[ allow( clippy:: assertions_on_constants) ]
749+ {
750+ assert ! ( cfg!( target_os = "linux" ) ) ;
751+ }
811752
812- #[ cfg( target_os = "linux" ) ]
813- {
814- use std:: os:: unix:: io:: AsRawFd ;
815- let ret = unsafe {
816- libc:: sync_file_range (
817- f. as_raw_fd ( ) ,
818- i64:: try_from ( log_offset) . unwrap ( ) ,
819- i64:: try_from ( total_len) . unwrap ( ) ,
820- libc:: SYNC_FILE_RANGE_WAIT_BEFORE
821- | libc:: SYNC_FILE_RANGE_WRITE
822- | libc:: SYNC_FILE_RANGE_WAIT_AFTER ,
823- )
824- } ;
825- if ret < 0 {
826- let err = std:: io:: Error :: last_os_error ( ) ;
827- if let Some ( libc:: ENOSYS ) = err. raw_os_error ( ) {
828- f. sync_all ( ) ?;
829- } else {
830- return Err ( err. into ( ) ) ;
831- }
753+ #[ cfg( target_os = "linux" ) ]
754+ {
755+ use std:: os:: unix:: io:: AsRawFd ;
756+ let ret = unsafe {
757+ libc:: sync_file_range (
758+ f. as_raw_fd ( ) ,
759+ i64:: try_from ( log_offset) . unwrap ( ) ,
760+ i64:: try_from ( total_len) . unwrap ( ) ,
761+ libc:: SYNC_FILE_RANGE_WAIT_BEFORE
762+ | libc:: SYNC_FILE_RANGE_WRITE
763+ | libc:: SYNC_FILE_RANGE_WAIT_AFTER ,
764+ )
765+ } ;
766+ if ret < 0 {
767+ let err = std:: io:: Error :: last_os_error ( ) ;
768+ if let Some ( libc:: ENOSYS ) = err. raw_os_error ( ) {
769+ f. sync_all ( ) ?;
770+ } else {
771+ return Err ( err. into ( ) ) ;
832772 }
833773 }
834774 }
0 commit comments