@@ -8,6 +8,7 @@ use std::any::Any;
88use std:: collections:: VecDeque ;
99use std:: future:: { self , Future } ;
1010use std:: io:: { self , Read , Seek , SeekFrom , Write } ;
11+ use std:: mem;
1112#[ cfg( unix) ]
1213use std:: os:: unix:: {
1314 ffi:: OsStrExt ,
@@ -19,7 +20,6 @@ use std::path::{Path, PathBuf};
1920use std:: pin:: Pin ;
2021use std:: pin:: pin;
2122use std:: sync:: Arc ;
22- use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
2323use std:: task:: { Context , Poll } ;
2424use std:: time:: { Duration , SystemTime , UNIX_EPOCH } ;
2525
@@ -34,51 +34,21 @@ use crate::davpath::DavPath;
3434use crate :: fs:: * ;
3535use crate :: localfs_macos:: DUCacheBuilder ;
3636
37- const RUNTIME_TYPE_BASIC : u32 = 1 ;
38- const RUNTIME_TYPE_THREADPOOL : u32 = 2 ;
39- static RUNTIME_TYPE : AtomicU32 = AtomicU32 :: new ( 0 ) ;
40-
41- #[ derive( Clone , Copy ) ]
42- #[ repr( u32 ) ]
43- enum RuntimeType {
44- Basic = RUNTIME_TYPE_BASIC ,
45- ThreadPool = RUNTIME_TYPE_THREADPOOL ,
46- }
47-
48- impl RuntimeType {
49- #[ inline]
50- fn get ( ) -> RuntimeType {
51- match RUNTIME_TYPE . load ( Ordering :: Relaxed ) {
52- RUNTIME_TYPE_BASIC => RuntimeType :: Basic ,
53- RUNTIME_TYPE_THREADPOOL => RuntimeType :: ThreadPool ,
54- _ => {
55- let dbg = format ! ( "{:?}" , tokio:: runtime:: Handle :: current( ) ) ;
56- let rt = if dbg. contains ( "ThreadPool" ) {
57- RuntimeType :: ThreadPool
58- } else {
59- RuntimeType :: Basic
60- } ;
61- RUNTIME_TYPE . store ( rt as u32 , Ordering :: SeqCst ) ;
62- rt
63- }
64- }
65- }
66- }
67-
6837// Run some code via block_in_place() or spawn_blocking().
6938//
7039// There's also a method on LocalFs for this, use the freestanding
7140// function if you do not want the fs_access_guard() closure to be used.
41+ #[ cfg( feature = "localfs" ) ]
7242#[ inline]
7343async fn blocking < F , R > ( func : F ) -> R
7444where
7545 F : FnOnce ( ) -> R ,
7646 F : Send + ' static ,
7747 R : Send + ' static ,
7848{
79- match RuntimeType :: get ( ) {
80- RuntimeType :: Basic => task:: spawn_blocking ( func) . await . unwrap ( ) ,
81- RuntimeType :: ThreadPool => task:: block_in_place ( func) ,
49+ match tokio :: runtime :: Handle :: current ( ) . runtime_flavor ( ) {
50+ tokio :: runtime :: RuntimeFlavor :: MultiThread => task:: block_in_place ( func) ,
51+ _ => task:: spawn_blocking ( func) . await . unwrap ( ) ,
8252 }
8353}
8454
@@ -103,7 +73,10 @@ pub(crate) struct LocalFsInner {
10373}
10474
10575#[ derive( Debug ) ]
106- struct LocalFsFile ( Option < std:: fs:: File > ) ;
76+ struct LocalFsFile {
77+ file : Option < std:: fs:: File > ,
78+ buf : BytesMut ,
79+ }
10780
10881struct LocalFsReadDir {
10982 fs : LocalFs ,
@@ -340,7 +313,10 @@ impl DavFileSystem for LocalFs {
340313 . create_new ( options. create_new )
341314 . open ( path) ;
342315 match res {
343- Ok ( file) => Ok ( Box :: new ( LocalFsFile ( Some ( file) ) ) as Box < dyn DavFile > ) ,
316+ Ok ( file) => Ok ( Box :: new ( LocalFsFile {
317+ file : Some ( file) ,
318+ buf : BytesMut :: new ( ) ,
319+ } ) as Box < dyn DavFile > ) ,
344320 Err ( e) => Err ( e. into ( ) ) ,
345321 }
346322 } )
@@ -676,27 +652,27 @@ impl DavDirEntry for LocalFsDirEntry {
676652impl DavFile for LocalFsFile {
677653 fn metadata ( & ' _ mut self ) -> FsFuture < ' _ , Box < dyn DavMetaData > > {
678654 async move {
679- let file = self . 0 . take ( ) . unwrap ( ) ;
655+ let file = self . file . take ( ) . unwrap ( ) ;
680656 let ( meta, file) = blocking ( move || ( file. metadata ( ) , file) ) . await ;
681- self . 0 = Some ( file) ;
657+ self . file = Some ( file) ;
682658 Ok ( Box :: new ( LocalFsMetaData ( meta?) ) as Box < dyn DavMetaData > )
683659 }
684660 . boxed ( )
685661 }
686662
687663 fn write_bytes ( & ' _ mut self , buf : Bytes ) -> FsFuture < ' _ , ( ) > {
688664 async move {
689- let mut file = self . 0 . take ( ) . unwrap ( ) ;
665+ let mut file = self . file . take ( ) . unwrap ( ) ;
690666 let ( res, file) = blocking ( move || ( file. write_all ( & buf) , file) ) . await ;
691- self . 0 = Some ( file) ;
667+ self . file = Some ( file) ;
692668 res. map_err ( |e| e. into ( ) )
693669 }
694670 . boxed ( )
695671 }
696672
697673 fn write_buf ( & ' _ mut self , mut buf : Box < dyn Buf + Send > ) -> FsFuture < ' _ , ( ) > {
698674 async move {
699- let mut file = self . 0 . take ( ) . unwrap ( ) ;
675+ let mut file = self . file . take ( ) . unwrap ( ) ;
700676 let ( res, file) = blocking ( move || {
701677 while buf. remaining ( ) > 0 {
702678 let n = match file. write ( buf. chunk ( ) ) {
@@ -708,48 +684,50 @@ impl DavFile for LocalFsFile {
708684 ( Ok ( ( ) ) , file)
709685 } )
710686 . await ;
711- self . 0 = Some ( file) ;
687+ self . file = Some ( file) ;
712688 res. map_err ( |e| e. into ( ) )
713689 }
714690 . boxed ( )
715691 }
716692
717693 fn read_bytes ( & ' _ mut self , count : usize ) -> FsFuture < ' _ , Bytes > {
718694 async move {
719- let mut file = self . 0 . take ( ) . unwrap ( ) ;
720- let ( res, file) = blocking ( move || {
721- let mut buf = BytesMut :: with_capacity ( count) ;
695+ let mut file = self . file . take ( ) . unwrap ( ) ;
696+ let mut buf = mem:: take ( & mut self . buf ) ;
697+ let ( res, file, buf) = blocking ( move || {
698+ buf. reserve ( count) ;
722699 let res = unsafe {
723700 buf. set_len ( count) ;
724701 file. read ( & mut buf) . map ( |n| {
725702 buf. set_len ( n) ;
726- buf. freeze ( )
703+ buf. split ( ) . freeze ( )
727704 } )
728705 } ;
729- ( res, file)
706+ ( res, file, buf )
730707 } )
731708 . await ;
732- self . 0 = Some ( file) ;
709+ self . file = Some ( file) ;
710+ self . buf = buf;
733711 res. map_err ( |e| e. into ( ) )
734712 }
735713 . boxed ( )
736714 }
737715
738716 fn seek ( & ' _ mut self , pos : SeekFrom ) -> FsFuture < ' _ , u64 > {
739717 async move {
740- let mut file = self . 0 . take ( ) . unwrap ( ) ;
718+ let mut file = self . file . take ( ) . unwrap ( ) ;
741719 let ( res, file) = blocking ( move || ( file. seek ( pos) , file) ) . await ;
742- self . 0 = Some ( file) ;
720+ self . file = Some ( file) ;
743721 res. map_err ( |e| e. into ( ) )
744722 }
745723 . boxed ( )
746724 }
747725
748726 fn flush ( & ' _ mut self ) -> FsFuture < ' _ , ( ) > {
749727 async move {
750- let mut file = self . 0 . take ( ) . unwrap ( ) ;
728+ let mut file = self . file . take ( ) . unwrap ( ) ;
751729 let ( res, file) = blocking ( move || ( file. flush ( ) , file) ) . await ;
752- self . 0 = Some ( file) ;
730+ self . file = Some ( file) ;
753731 res. map_err ( |e| e. into ( ) )
754732 }
755733 . boxed ( )
0 commit comments