@@ -387,27 +387,11 @@ func (c *Client) opendir(path string) (string, error) {
387387// Stat returns a FileInfo structure describing the file specified by path 'p'.
388388// If 'p' is a symbolic link, the returned FileInfo structure describes the referent file.
389389func (c * Client ) Stat (p string ) (os.FileInfo , error ) {
390- id := c .nextID ()
391- typ , data , err := c .sendPacket (nil , & sshFxpStatPacket {
392- ID : id ,
393- Path : p ,
394- })
390+ fs , err := c .stat (p )
395391 if err != nil {
396392 return nil , err
397393 }
398- switch typ {
399- case sshFxpAttrs :
400- sid , data := unmarshalUint32 (data )
401- if sid != id {
402- return nil , & unexpectedIDErr {id , sid }
403- }
404- attr , _ := unmarshalAttrs (data )
405- return fileInfoFromStat (attr , path .Base (p )), nil
406- case sshFxpStatus :
407- return nil , normaliseError (unmarshalStatus (id , data ))
408- default :
409- return nil , unimplementedPacketErr (typ )
410- }
394+ return fileInfoFromStat (fs , path .Base (p )), nil
411395}
412396
413397// Lstat returns a FileInfo structure describing the file specified by path 'p'.
@@ -638,6 +622,30 @@ func (c *Client) close(handle string) error {
638622 }
639623}
640624
625+ func (c * Client ) stat (path string ) (* FileStat , error ) {
626+ id := c .nextID ()
627+ typ , data , err := c .sendPacket (nil , & sshFxpStatPacket {
628+ ID : id ,
629+ Path : path ,
630+ })
631+ if err != nil {
632+ return nil , err
633+ }
634+ switch typ {
635+ case sshFxpAttrs :
636+ sid , data := unmarshalUint32 (data )
637+ if sid != id {
638+ return nil , & unexpectedIDErr {id , sid }
639+ }
640+ attr , _ := unmarshalAttrs (data )
641+ return attr , nil
642+ case sshFxpStatus :
643+ return nil , normaliseError (unmarshalStatus (id , data ))
644+ default :
645+ return nil , unimplementedPacketErr (typ )
646+ }
647+ }
648+
641649func (c * Client ) fstat (handle string ) (* FileStat , error ) {
642650 id := c .nextID ()
643651 typ , data , err := c .sendPacket (nil , & sshFxpFstatPacket {
@@ -1160,23 +1168,19 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
11601168 }
11611169
11621170 // For concurrency, we want to guess how many concurrent workers we should use.
1163- var fileSize uint64
1171+ var fileStat * FileStat
11641172 if f .c .useFstat {
1165- fileStat , err := f .c .fstat (f .handle )
1166- if err != nil {
1167- return 0 , err
1168- }
1169- fileSize = fileStat .Size
1173+ fileStat , err = f .c .fstat (f .handle )
11701174 } else {
1171- fi , err := f .c .Stat (f .path )
1172- if err != nil {
1173- return 0 , err
1174- }
1175- fileSize = uint64 (fi .Size ())
1175+ fileStat , err = f .c .stat (f .path )
1176+ }
1177+ if err != nil {
1178+ return 0 , err
11761179 }
11771180
1178- if fileSize <= uint64 (f .c .maxPacket ) {
1179- // We should be able to handle this in one Read.
1181+ fileSize := fileStat .Size
1182+ if fileSize <= uint64 (f .c .maxPacket ) || ! isRegular (fileStat .Mode ) {
1183+ // only regular files are guaranteed to return (full read) xor (partial read, next error)
11801184 return f .writeToSequential (w )
11811185 }
11821186
@@ -1187,6 +1191,10 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
11871191 // Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
11881192 concurrency := int (concurrency64 )
11891193
1194+ chunkSize := f .c .maxPacket
1195+ pool := newBufPool (concurrency , chunkSize )
1196+ resPool := newResChanPool (concurrency )
1197+
11901198 cancel := make (chan struct {})
11911199 var wg sync.WaitGroup
11921200 defer func () {
@@ -1200,7 +1208,6 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
12001208
12011209 type writeWork struct {
12021210 b []byte
1203- n int
12041211 off int64
12051212 err error
12061213
@@ -1209,7 +1216,10 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
12091216 writeCh := make (chan writeWork )
12101217
12111218 type readWork struct {
1212- off int64
1219+ id uint32
1220+ res chan result
1221+ off int64
1222+
12131223 cur , next chan writeWork
12141224 }
12151225 readCh := make (chan readWork )
@@ -1219,49 +1229,78 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
12191229 defer close (readCh )
12201230
12211231 off := f .offset
1222- chunkSize := int64 (f .c .maxPacket )
12231232
12241233 cur := writeCh
12251234 for {
1235+ id := f .c .nextID ()
1236+ res := resPool .Get ()
1237+
12261238 next := make (chan writeWork )
12271239 readWork := readWork {
1228- off : off ,
1240+ id : id ,
1241+ res : res ,
1242+ off : off ,
1243+
12291244 cur : cur ,
12301245 next : next ,
12311246 }
12321247
1248+ f .c .dispatchRequest (res , & sshFxpReadPacket {
1249+ ID : id ,
1250+ Handle : f .handle ,
1251+ Offset : uint64 (off ),
1252+ Len : uint32 (chunkSize ),
1253+ })
1254+
12331255 select {
12341256 case readCh <- readWork :
12351257 case <- cancel :
12361258 return
12371259 }
12381260
1239- off += chunkSize
1261+ off += int64 ( chunkSize )
12401262 cur = next
12411263 }
12421264 }()
12431265
1244- pool := newBufPool (concurrency , f .c .maxPacket )
1245-
12461266 wg .Add (concurrency )
12471267 for i := 0 ; i < concurrency ; i ++ {
12481268 // Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
12491269 go func () {
12501270 defer wg .Done ()
12511271
1252- ch := make (chan result , 1 ) // reusable channel
1253-
12541272 for readWork := range readCh {
1255- b := pool .Get ()
1256-
1257- n , err := f .readChunkAt (ch , b , readWork .off )
1258- if n < 0 {
1259- panic ("sftp.File: returned negative count from readChunkAt" )
1273+ var b []byte
1274+ var n int
1275+
1276+ s := <- readWork .res
1277+ resPool .Put (readWork .res )
1278+
1279+ err := s .err
1280+ if err == nil {
1281+ switch s .typ {
1282+ case sshFxpStatus :
1283+ err = normaliseError (unmarshalStatus (readWork .id , s .data ))
1284+
1285+ case sshFxpData :
1286+ sid , data := unmarshalUint32 (s .data )
1287+ if readWork .id != sid {
1288+ err = & unexpectedIDErr {readWork .id , sid }
1289+
1290+ } else {
1291+ l , data := unmarshalUint32 (data )
1292+ b = pool .Get ()[:l ]
1293+ n = copy (b , data [:l ])
1294+ b = b [:n ]
1295+ }
1296+
1297+ default :
1298+ err = unimplementedPacketErr (s .typ )
1299+ }
12601300 }
12611301
12621302 writeWork := writeWork {
12631303 b : b ,
1264- n : n ,
12651304 off : readWork .off ,
12661305 err : err ,
12671306
@@ -1290,10 +1329,10 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
12901329 }
12911330
12921331 // Because writes are serialized, this will always be the last successfully read byte.
1293- f .offset = packet .off + int64 (packet .n )
1332+ f .offset = packet .off + int64 (len ( packet .b ) )
12941333
1295- if packet .n > 0 {
1296- n , err := w .Write (packet .b [: packet . n ] )
1334+ if len ( packet .b ) > 0 {
1335+ n , err := w .Write (packet .b )
12971336 written += int64 (n )
12981337 if err != nil {
12991338 return written , err
0 commit comments