@@ -32,7 +32,8 @@ import (
3232)
3333
3434var (
35- hasherMap map [string ]workerProcess
35+ hasherMap map [string ]workerProcess
36+ hasherMapMu sync.Mutex
3637)
3738
3839type workerProcess struct {
@@ -382,6 +383,16 @@ func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *Chu
382383 su .updateChunkProgress (chunkEndIndex , pos )
383384 finalRequestObject , _ := data .Get ("isFinal" )
384385 finalRequest , _ := finalRequestObject .Bool ()
386+ totalSizeObject , _ := data .Get ("totalSize" )
387+ totalSize , _ := totalSizeObject .Int ()
388+ timingObject , _ := data .Get ("timing" )
389+ timing , _ := timingObject .Int ()
390+ logEvent := logEntry {
391+ OpType : "upload" ,
392+ DataSize : totalSize ,
393+ TimeTaken : int64 (timing ),
394+ }
395+ writeLogEntry (blobber .blobber .Baseurl , logEvent )
385396 if finalRequest {
386397 finalResult , err := data .Get ("finalResult" )
387398 if err != nil {
@@ -442,9 +453,10 @@ func ProcessEventData(data safejs.Value) {
442453 remotePath = fileMeta .RemotePath
443454 }
444455 if err != nil {
445- selfPostMessage (false , false , err .Error (), remotePath , 0 , nil )
456+ selfPostMessage (false , false , err .Error (), remotePath , 0 , 0 , 0 , nil )
446457 return
447458 }
459+ hasherMapMu .Lock ()
448460 wp , ok := hasherMap [fileMeta .RemotePath ]
449461 if ! ok {
450462 wp = workerProcess {
@@ -456,15 +468,20 @@ func ProcessEventData(data safejs.Value) {
456468 wp .hasher = CreateHasher (formInfo .ShardSize )
457469 hasherMap [fileMeta .RemotePath ] = wp
458470 }
471+ hasherMapMu .Unlock ()
459472 if formInfo .IsFinal {
460- defer delete (hasherMap , fileMeta .RemotePath )
473+ defer func () {
474+ hasherMapMu .Lock ()
475+ delete (hasherMap , fileMeta .RemotePath )
476+ hasherMapMu .Unlock ()
477+ }()
461478 }
462479 blobberID := os .Getenv ("BLOBBER_ID" )
463480 formBuilder := CreateChunkedUploadFormBuilder (formInfo .StorageVersion , formInfo .EncryptionVersion , formInfo .PrivateSigningKey )
464481 uploadData , err := formBuilder .Build (fileMeta , wp .hasher , formInfo .ConnectionID , blobberID , formInfo .ChunkSize , formInfo .ChunkStartIndex , formInfo .ChunkEndIndex , formInfo .IsFinal , formInfo .EncryptedKey , formInfo .EncryptedKeyPoint ,
465482 fileShards , thumbnailChunkData , formInfo .ShardSize )
466483 if err != nil {
467- selfPostMessage (false , false , err .Error (), remotePath , formInfo .ChunkEndIndex , nil )
484+ selfPostMessage (false , false , err .Error (), remotePath , formInfo .ChunkEndIndex , 0 , 0 , nil )
468485 return
469486 }
470487 if formInfo .OnlyHash {
@@ -474,27 +491,35 @@ func ProcessEventData(data safejs.Value) {
474491 ValidationRoot : uploadData .formData .ValidationRoot ,
475492 ThumbnailContentHash : uploadData .formData .ThumbnailContentHash ,
476493 }
477- selfPostMessage (true , true , "" , remotePath , formInfo .ChunkEndIndex , finalResult )
494+ selfPostMessage (true , true , "" , remotePath , formInfo .ChunkEndIndex , 0 , 0 , finalResult )
478495 } else {
479- selfPostMessage (true , false , "" , remotePath , formInfo .ChunkEndIndex , nil )
496+ selfPostMessage (true , false , "" , remotePath , formInfo .ChunkEndIndex , 0 , 0 , nil )
480497 }
481498 return
482499 }
483500 blobberURL := os .Getenv ("BLOBBER_URL" )
484501 if ! formInfo .IsFinal {
485502 wp .wg .Add (1 )
486503 }
504+
487505 go func (blobberData blobberData , remotePath string , wg * sync.WaitGroup ) {
506+ var (
507+ totalSize int
508+ timing int64
509+ )
510+ for _ , dataBuffer := range blobberData .dataBuffers {
511+ totalSize += dataBuffer .Len ()
512+ }
488513 if formInfo .IsFinal && len (blobberData .dataBuffers ) > 1 {
489- err = sendUploadRequest (blobberData .dataBuffers [:len (blobberData .dataBuffers )- 1 ], blobberData .contentSlice [:len (blobberData .contentSlice )- 1 ], blobberURL , formInfo .AllocationID , formInfo .AllocationTx , formInfo .HttpMethod , formInfo .ClientId )
514+ timing , err = sendUploadRequest (blobberData .dataBuffers [:len (blobberData .dataBuffers )- 1 ], blobberData .contentSlice [:len (blobberData .contentSlice )- 1 ], blobberURL , formInfo .AllocationID , formInfo .AllocationTx , formInfo .HttpMethod , formInfo .ClientId )
490515 if err != nil {
491- selfPostMessage (false , true , err .Error (), remotePath , formInfo .ChunkEndIndex , nil )
516+ selfPostMessage (false , true , err .Error (), remotePath , formInfo .ChunkEndIndex , totalSize , timing , nil )
492517 return
493518 }
494519 wg .Wait ()
495- err = sendUploadRequest (blobberData .dataBuffers [len (blobberData .dataBuffers )- 1 :], blobberData .contentSlice [len (blobberData .contentSlice )- 1 :], blobberURL , formInfo .AllocationID , formInfo .AllocationTx , formInfo .HttpMethod , formInfo .ClientId )
520+ timing , err = sendUploadRequest (blobberData .dataBuffers [len (blobberData .dataBuffers )- 1 :], blobberData .contentSlice [len (blobberData .contentSlice )- 1 :], blobberURL , formInfo .AllocationID , formInfo .AllocationTx , formInfo .HttpMethod , formInfo .ClientId )
496521 if err != nil {
497- selfPostMessage (false , true , err .Error (), remotePath , formInfo .ChunkEndIndex , nil )
522+ selfPostMessage (false , true , err .Error (), remotePath , formInfo .ChunkEndIndex , totalSize , timing , nil )
498523 return
499524 }
500525 } else {
@@ -503,9 +528,9 @@ func ProcessEventData(data safejs.Value) {
503528 } else {
504529 defer wg .Done ()
505530 }
506- err = sendUploadRequest (blobberData .dataBuffers , blobberData .contentSlice , blobberURL , formInfo .AllocationID , formInfo .AllocationTx , formInfo .HttpMethod , formInfo .ClientId )
531+ timing , err = sendUploadRequest (blobberData .dataBuffers , blobberData .contentSlice , blobberURL , formInfo .AllocationID , formInfo .AllocationTx , formInfo .HttpMethod , formInfo .ClientId )
507532 if err != nil {
508- selfPostMessage (false , formInfo .IsFinal , err .Error (), remotePath , formInfo .ChunkEndIndex , nil )
533+ selfPostMessage (false , formInfo .IsFinal , err .Error (), remotePath , formInfo .ChunkEndIndex , totalSize , timing , nil )
509534 return
510535 }
511536 }
@@ -515,9 +540,9 @@ func ProcessEventData(data safejs.Value) {
515540 ValidationRoot : blobberData .formData .ValidationRoot ,
516541 ThumbnailContentHash : blobberData .formData .ThumbnailContentHash ,
517542 }
518- selfPostMessage (true , true , "" , remotePath , formInfo .ChunkEndIndex , finalResult )
543+ selfPostMessage (true , true , "" , remotePath , formInfo .ChunkEndIndex , totalSize , timing , finalResult )
519544 } else {
520- selfPostMessage (true , false , "" , remotePath , formInfo .ChunkEndIndex , nil )
545+ selfPostMessage (true , false , "" , remotePath , formInfo .ChunkEndIndex , totalSize , timing , nil )
521546 }
522547 }(uploadData , remotePath , wp .wg )
523548
@@ -527,13 +552,15 @@ func InitHasherMap() {
527552 hasherMap = make (map [string ]workerProcess )
528553}
529554
530- func selfPostMessage (success , isFinal bool , errMsg , remotePath string , chunkEndIndex int , finalResult * FinalWorkerResult ) {
555+ func selfPostMessage (success , isFinal bool , errMsg , remotePath string , chunkEndIndex , totalSize int , timing int64 , finalResult * FinalWorkerResult ) {
531556 obj := js .Global ().Get ("Object" ).New ()
532557 obj .Set ("success" , success )
533558 obj .Set ("error" , errMsg )
534559 obj .Set ("isFinal" , isFinal )
535560 obj .Set ("chunkEndIndex" , chunkEndIndex )
536561 obj .Set ("remotePath" , remotePath )
562+ obj .Set ("totalSize" , totalSize )
563+ obj .Set ("timing" , timing )
537564 if finalResult != nil {
538565 finalResultJSON , err := json .Marshal (finalResult )
539566 if err != nil {
@@ -619,7 +646,7 @@ func parseEventData(data safejs.Value) (*FileMeta, *ChunkedUploadFormInfo, [][]b
619646 return fileMeta , formInfo , fileShards , thumbnailChunkData , nil
620647}
621648
622- func sendUploadRequest (dataBuffers []* bytes.Buffer , contentSlice []string , blobberURL , allocationID , allocationTx , httpMethod string , clientId ... string ) (err error ) {
649+ func sendUploadRequest (dataBuffers []* bytes.Buffer , contentSlice []string , blobberURL , allocationID , allocationTx , httpMethod string , clientId ... string ) (timing int64 , err error ) {
623650 eg , _ := errgroup .WithContext (context .TODO ())
624651 for dataInd := 0 ; dataInd < len (dataBuffers ); dataInd ++ {
625652 ind := dataInd
@@ -639,8 +666,10 @@ func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobb
639666 err , shouldContinue = func () (err error , shouldContinue bool ) {
640667 resp := fasthttp .AcquireResponse ()
641668 defer fasthttp .ReleaseResponse (resp )
669+ now := time .Now ()
642670 err = zboxutil .FastHttpClient .DoTimeout (req , resp , DefaultUploadTimeOut )
643671 fasthttp .ReleaseRequest (req )
672+ timing = time .Since (now ).Milliseconds ()
644673 if err != nil {
645674 logger .Logger .Error ("Upload : " , err , " baseURL " , blobberURL )
646675 if errors .Is (err , fasthttp .ErrConnectionClosed ) || errors .Is (err , syscall .EPIPE ) || errors .Is (err , fasthttp .ErrDialTimeout ) {
@@ -710,7 +739,7 @@ func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobb
710739 return err
711740 })
712741 }
713- return eg .Wait ()
742+ return timing , eg .Wait ()
714743}
715744
716745type eventChanWorker struct {
0 commit comments