@@ -595,12 +595,16 @@ impl InstantMultipartUpload {
595
595
let upload_id = api:: upload_multipart_initiate ( & app, & video_id) . await ?;
596
596
597
597
// TODO: Will it be a problem that `ReaderStream` doesn't have a fixed chunk size??? We should fix that!!!!
598
- let parts = progress ( uploader (
598
+ let parts = progress (
599
599
app. clone ( ) ,
600
600
video_id. clone ( ) ,
601
- upload_id. clone ( ) ,
602
- from_pending_file ( file_path. clone ( ) , realtime_video_done) ,
603
- ) )
601
+ uploader (
602
+ app. clone ( ) ,
603
+ video_id. clone ( ) ,
604
+ upload_id. clone ( ) ,
605
+ from_pending_file ( file_path. clone ( ) , realtime_video_done) ,
606
+ ) ,
607
+ )
604
608
. try_collect :: < Vec < _ > > ( )
605
609
. await ?;
606
610
@@ -622,13 +626,23 @@ impl InstantMultipartUpload {
622
626
}
623
627
}
624
628
629
+ struct Chunk {
630
+ /// The total size of the file to be uploaded.
631
+ /// This can change as the recording grows.
632
+ total_size : u64 ,
633
+ /// The part number. `FILE_OFFSET = PART_NUMBER * CHUNK_SIZE`.
634
+ part_number : u32 ,
635
+ /// Actual data bytes of this chunk
636
+ chunk : Bytes ,
637
+ }
638
+
625
639
/// Creates a stream that reads chunks from a potentially growing file,
626
640
/// yielding (part_number, chunk_data) pairs. The first chunk is yielded last
627
641
/// to allow for header rewriting after recording completion.
628
642
pub fn from_pending_file (
629
643
path : PathBuf ,
630
644
realtime_upload_done : Option < Receiver < ( ) > > ,
631
- ) -> impl Stream < Item = io:: Result < ( u32 , Bytes ) > > {
645
+ ) -> impl Stream < Item = io:: Result < Chunk > > {
632
646
try_stream ! {
633
647
let mut part_number = 2 ; // Start at 2 since part 1 will be yielded last
634
648
let mut last_read_position: u64 = 0 ;
@@ -703,7 +717,11 @@ pub fn from_pending_file(
703
717
first_chunk_size = Some ( total_read as u64 ) ;
704
718
} else {
705
719
// Yield non-first chunks immediately
706
- yield ( part_number, Bytes :: from( chunk) ) ;
720
+ yield Chunk {
721
+ total_size: file_size,
722
+ part_number,
723
+ chunk: Bytes :: from( chunk) ,
724
+ } ;
707
725
part_number += 1 ;
708
726
}
709
727
@@ -728,7 +746,11 @@ pub fn from_pending_file(
728
746
729
747
if total_read > 0 {
730
748
first_chunk. truncate( total_read) ;
731
- yield ( 1 , Bytes :: from( first_chunk) ) ;
749
+ yield Chunk {
750
+ total_size: file_size,
751
+ part_number: 1 ,
752
+ chunk: Bytes :: from( first_chunk) ,
753
+ } ;
732
754
}
733
755
}
734
756
break ;
@@ -747,15 +769,15 @@ fn uploader(
747
769
app : AppHandle ,
748
770
video_id : String ,
749
771
upload_id : String ,
750
- stream : impl Stream < Item = io:: Result < ( u32 , Bytes ) > > ,
772
+ stream : impl Stream < Item = io:: Result < Chunk > > ,
751
773
) -> impl Stream < Item = Result < UploadedPart , String > > {
752
774
let client = reqwest:: Client :: default ( ) ;
753
775
754
776
try_stream ! {
755
777
let mut stream = pin!( stream) ;
756
778
let mut prev_part_number = None ;
757
779
while let Some ( item) = stream. next( ) . await {
758
- let ( part_number, chunk) = item. map_err( |err| format!( "uploader/part/{:?}/fs: {err:?}" , prev_part_number. map( |p| p + 1 ) ) ) ?;
780
+ let Chunk { total_size , part_number, chunk } = item. map_err( |err| format!( "uploader/part/{:?}/fs: {err:?}" , prev_part_number. map( |p| p + 1 ) ) ) ?;
759
781
prev_part_number = Some ( part_number) ;
760
782
let md5_sum = base64:: encode( md5:: compute( & chunk) . 0 ) ;
761
783
let size = chunk. len( ) ;
@@ -786,31 +808,37 @@ fn uploader(
786
808
etag: etag. ok_or_else( || format!( "uploader/part/{part_number}/error: ETag header not found" ) ) ?,
787
809
part_number,
788
810
size,
811
+ total_size
789
812
} ;
790
813
}
791
814
}
792
815
}
793
816
794
817
/// Monitor the stream to report the upload progress
795
818
fn progress (
819
+ app : AppHandle ,
820
+ video_id : String ,
796
821
stream : impl Stream < Item = Result < UploadedPart , String > > ,
797
822
) -> impl Stream < Item = Result < UploadedPart , String > > {
798
- // TODO: Reenable progress reporting to the backend but build it on streams directly here.
799
- // let mut progress = UploadProgressUpdater::new(app.clone(), pre_created_video.id.clone());
823
+ // TODO: Flatten this implementation into here
824
+ let mut progress = UploadProgressUpdater :: new ( app. clone ( ) , video_id. clone ( ) ) ;
825
+ let mut uploaded = 0 ;
800
826
801
827
stream ! {
802
828
let mut stream = pin!( stream) ;
803
829
804
830
while let Some ( part) = stream. next( ) . await {
805
831
if let Ok ( part) = & part {
806
- // progress.update(expected_pos, file_size);
807
- // UploadProgressEvent {
808
- // video_id: video_id.to_string(),
809
- // uploaded: last_uploaded_position.to_string(),
810
- // total: file_size.to_string(),
811
- // }
812
- // .emit(app)
813
- // .ok();
832
+ uploaded += part. size as u64 ;
833
+
834
+ progress. update( uploaded, part. total_size) ;
835
+ UploadProgressEvent {
836
+ video_id: video_id. to_string( ) ,
837
+ uploaded: uploaded. to_string( ) ,
838
+ total: part. total_size. to_string( ) ,
839
+ }
840
+ . emit( & app)
841
+ . ok( ) ;
814
842
}
815
843
816
844
yield part;
0 commit comments