@@ -5,11 +5,13 @@ import (
55 "context"
66 "encoding/json"
77 "errors"
8+ "fmt"
89 "io"
910 "io/fs"
1011 "mime"
1112 "net/http"
1213 "net/textproto"
14+ "os"
1315 "path"
1416 "strconv"
1517 "time"
@@ -143,20 +145,36 @@ func SkipUnchanged(localInfo fs.FileInfo, remote *schema.Object) bool {
143145// By default, files that already exist remotely with the same size (and modtime
144146// when available) are skipped. Use WithCheck(nil) to disable this behaviour.
145147func (c * Client ) CreateObjects (ctx context.Context , name string , fsys fs.FS , opts ... UploadOpt ) ([]schema.Object , error ) {
148+ debug := os .Getenv ("FILER_UPLOAD_DEBUG" ) != ""
149+ useSSE := os .Getenv ("FILER_UPLOAD_SSE" ) != "0"
150+ t0 := time .Now ()
151+ dbg := func (msg string , args ... any ) {
152+ if debug {
153+ // Print to stderr with elapsed time for correlation
154+ args = append ([]any {time .Since (t0 )}, args ... )
155+ _ , _ = fmt .Fprintf (os .Stderr , "[upload-debug %+v] " + msg + "\n " , args ... )
156+ }
157+ }
158+ dbg ("begin CreateObjects name=%q opts=%d" , name , len (opts ))
159+
146160 o := & uploadOpts {check : SkipUnchanged }
147161 for _ , opt := range opts {
148162 if err := opt (o ); err != nil {
149163 return nil , err
150164 }
151165 }
166+ dbg ("config prefix=%q filter=%v check=%v progress=%v sse=%v" , o .prefix , o .filter != nil , o .check != nil , o .progress != nil , useSSE )
152167
153168 entries , err := walkFS (fsys , o .filter )
154169 if err != nil {
170+ dbg ("walkFS error: %v" , err )
155171 return nil , err
156172 }
157173 if len (entries ) == 0 {
174+ dbg ("no local entries to upload" )
158175 return nil , nil
159176 }
177+ dbg ("walked %d entries" , len (entries ))
160178
161179 // Pre-filter: HEAD all entries in parallel and ask the check function
162180 // whether each upload should be skipped.
@@ -165,8 +183,10 @@ func (c *Client) CreateObjects(ctx context.Context, name string, fsys fs.FS, opt
165183 for i , e := range entries {
166184 reqs [i ] = schema.GetObjectRequest {Path : path .Join ("/" , o .prefix , e .path )}
167185 }
186+ dbg ("preflight HEAD for %d entries" , len (reqs ))
168187 remotes , err := c .GetObjects (ctx , name , reqs )
169188 if err != nil && ctx .Err () != nil {
189+ dbg ("preflight canceled: %v" , ctx .Err ())
170190 return nil , ctx .Err ()
171191 }
172192 filtered := entries [:0 ]
@@ -178,20 +198,24 @@ func (c *Client) CreateObjects(ctx context.Context, name string, fsys fs.FS, opt
178198 entries = filtered
179199 }
180200 if len (entries ) == 0 {
201+ dbg ("all entries skipped by preflight check" )
181202 return nil , nil
182203 }
204+ dbg ("after skip check %d entries" , len (entries ))
183205
184206 // Open every file. Keep track of all opened handles so we can close them
185207 // after the HTTP round-trip completes (the streaming encoder reads bodies
186208 // lazily as the HTTP client sends data, so files must stay open until
187209 // DoWithContext returns).
188210 parts := make ([]types.File , 0 , len (entries ))
189- for _ , e := range entries {
211+ var totalBytes int64
212+ for i , e := range entries {
190213 f , err := fsys .Open (e .path )
191214 if err != nil {
192215 for _ , p := range parts {
193216 p .Body .Close ()
194217 }
218+ dbg ("open error path=%s err=%v" , e .path , err )
195219 return nil , err
196220 }
197221 remotePath := path .Join ("/" , o .prefix , e .path )
@@ -223,29 +247,44 @@ func (c *Client) CreateObjects(ctx context.Context, name string, fsys fs.FS, opt
223247 if sz := e .info .Size (); sz > 0 {
224248 h .Set (types .ContentLengthHeader , strconv .FormatInt (sz , 10 ))
225249 }
250+ if ! useSSE && o .progress != nil {
251+ total := e .info .Size ()
252+ body = newUploadProgressReadCloser (body , total , func (written , bytes int64 ) {
253+ o .progress (i , len (entries ), remotePath , written , bytes )
254+ })
255+ }
256+ totalBytes += e .info .Size ()
226257 parts = append (parts , types.File {
227258 Path : remotePath ,
228259 Body : body ,
229260 ContentType : ct ,
230261 Header : h ,
231262 })
263+ dbg ("prepared part path=%s size=%d ct=%s" , remotePath , e .info .Size (), ct )
232264 }
233265 defer func () {
234266 for _ , p := range parts {
235267 p .Body .Close ()
236268 }
237269 }()
270+ dbg ("built multipart parts=%d totalBytes=%d" , len (parts ), totalBytes )
238271
239272 // Build a streaming multipart payload. The encoder reflect-walks the
240273 // struct and writes each types.File as a separate multipart "file" part.
241- // Request text/event-stream so the server branches to objectUploadSSEStream,
242- // which streams each multipart part directly to the backend and emits
243- // one SSE event per committed file and one on error.
274+ // When SSE is enabled (default), request text/event-stream so the server
275+ // branches to objectUploadSSEStream and emits progressive upload events.
276+ // Set FILER_UPLOAD_SSE=0 to use the non-SSE upload path (single JSON
277+ // response on completion), which avoids full-duplex request/response.
244278 upload := struct {
245279 Files []types.File `json:"file"`
246280 }{Files : parts }
247- payload , err := client .NewStreamingMultipartRequest (& upload , client .ContentTypeTextStream )
281+ accept := client .ContentTypeTextStream
282+ if ! useSSE {
283+ accept = types .ContentTypeJSON
284+ }
285+ payload , err := client .NewStreamingMultipartRequest (& upload , accept )
248286 if err != nil {
287+ dbg ("multipart build error: %v" , err )
249288 return nil , err
250289 }
251290
@@ -255,37 +294,66 @@ func (c *Client) CreateObjects(ctx context.Context, name string, fsys fs.FS, opt
255294 uploadErr error
256295 totalFiles int // populated from UploadStartEvent
257296 )
297+
298+ if ! useSSE {
299+ dbg ("starting non-SSE POST parts=%d totalBytes=%d" , len (parts ), totalBytes )
300+ if err := c .DoWithContext (ctx , payload , & results ,
301+ client .OptPath (name ),
302+ client .OptReqHeader ("X-Upload-Count" , strconv .Itoa (len (parts ))),
303+ client .OptNoTimeout (),
304+ ); err != nil {
305+ dbg ("DoWithContext (non-SSE) error: %v" , err )
306+ return nil , err
307+ }
308+ if o .progress != nil {
309+ for index , obj := range results {
310+ o .progress (index , len (results ), obj .Path , obj .Size , obj .Size )
311+ }
312+ }
313+ dbg ("completed upload (non-SSE) results=%d" , len (results ))
314+ return results , nil
315+ }
258316 sseCallback := func (ev client.TextStreamEvent ) error {
259317 switch ev .Event {
260318 case schema .UploadStartEvent :
261319 var s schema.UploadStart
262320 if err := json .Unmarshal ([]byte (ev .Data ), & s ); err != nil {
321+ dbg ("sse start decode error: %v data=%q" , err , ev .Data )
263322 return err
264323 }
265324 totalFiles = s .Files
325+ dbg ("sse start files=%d" , totalFiles )
266326 case schema .UploadFileEvent :
327+ var f schema.UploadFile
328+ if err := json .Unmarshal ([]byte (ev .Data ), & f ); err != nil {
329+ dbg ("sse progress decode error: %v data=%q" , err , ev .Data )
330+ return err
331+ }
332+ dbg ("sse progress idx=%d/%d path=%s written=%d bytes=%d" , f .Index , totalFiles , f .Path , f .Written , f .Bytes )
267333 if o .progress != nil {
268- var f schema.UploadFile
269- if err := json .Unmarshal ([]byte (ev .Data ), & f ); err != nil {
270- return err
271- }
272334 o .progress (f .Index , totalFiles , f .Path , f .Written , f .Bytes )
273335 }
274336 case schema .UploadCompleteEvent :
275337 var obj schema.Object
276338 if err := json .Unmarshal ([]byte (ev .Data ), & obj ); err != nil {
339+ dbg ("sse complete decode error: %v data=%q" , err , ev .Data )
277340 return err
278341 }
279342 results = append (results , obj )
343+ dbg ("sse complete path=%s size=%d count=%d/%d" , obj .Path , obj .Size , len (results ), totalFiles )
280344 if o .progress != nil {
281345 o .progress (len (results )- 1 , totalFiles , obj .Path , obj .Size , obj .Size )
282346 }
283347 case schema .UploadErrorEvent :
284348 var e schema.UploadError
285349 if err := json .Unmarshal ([]byte (ev .Data ), & e ); err != nil {
350+ dbg ("sse error decode error: %v data=%q" , err , ev .Data )
286351 return err
287352 }
288353 uploadErr = errors .Join (uploadErr , errors .New (e .Message ))
354+ dbg ("sse error %s" , e .Message )
355+ default :
356+ dbg ("sse unknown event=%q data=%q" , ev .Event , ev .Data )
289357 }
290358 return nil
291359 }
@@ -303,8 +371,10 @@ func (c *Client) CreateObjects(ctx context.Context, name string, fsys fs.FS, opt
303371 client .OptTextStreamCallback (sseCallback ),
304372 client .OptNoTimeout (),
305373 ); err != nil {
374+ dbg ("DoWithContext error: %v" , err )
306375 return nil , err
307376 }
377+ dbg ("completed upload results=%d err=%v" , len (results ), uploadErr )
308378 return results , uploadErr
309379}
310380
@@ -342,3 +412,35 @@ func walkFS(fsys fs.FS, filter func(fs.DirEntry) bool) ([]walkEntry, error) {
342412 }
343413 return entries , nil
344414}
415+
416+ type uploadProgressReadCloser struct {
417+ r io.ReadCloser
418+ total int64
419+ written int64
420+ lastEmit int64
421+ cb func (written , total int64 )
422+ }
423+
424+ func newUploadProgressReadCloser (r io.ReadCloser , total int64 , cb func (written , total int64 )) io.ReadCloser {
425+ return & uploadProgressReadCloser {
426+ r : r ,
427+ total : total ,
428+ cb : cb ,
429+ }
430+ }
431+
432+ func (r * uploadProgressReadCloser ) Read (p []byte ) (int , error ) {
433+ n , err := r .r .Read (p )
434+ if n > 0 {
435+ r .written += int64 (n )
436+ if r .written - r .lastEmit >= 64 * 1024 || (r .total > 0 && r .written >= r .total ) {
437+ r .lastEmit = r .written
438+ r .cb (r .written , r .total )
439+ }
440+ }
441+ return n , err
442+ }
443+
444+ func (r * uploadProgressReadCloser ) Close () error {
445+ return r .r .Close ()
446+ }
0 commit comments