@@ -11,22 +11,29 @@ use image::codecs::jpeg::JpegEncoder;
11
11
use reqwest:: StatusCode ;
12
12
use reqwest:: header:: CONTENT_LENGTH ;
13
13
use serde:: { Deserialize , Serialize } ;
14
+ use serde_json:: json;
14
15
use specta:: Type ;
15
- use std:: path:: PathBuf ;
16
- use std:: time:: Duration ;
17
- use tauri:: AppHandle ;
16
+ use std:: {
17
+ path:: PathBuf ,
18
+ time:: { Duration , Instant } ,
19
+ } ;
20
+ use tauri:: { AppHandle , ipc:: Channel } ;
18
21
use tauri_plugin_clipboard_manager:: ClipboardExt ;
19
- use tauri_specta:: Event ;
20
22
use tokio:: io:: { AsyncReadExt , AsyncSeekExt } ;
21
- use tokio:: task;
23
+ use tokio:: task:: { self , JoinHandle } ;
22
24
use tokio:: time:: sleep;
23
- use tracing:: { error, info, warn} ;
25
+ use tracing:: { debug , error, info, trace , warn} ;
24
26
25
27
#[ derive( Deserialize , Serialize , Clone , Type , Debug ) ]
26
28
pub struct S3UploadMeta {
27
29
id : String ,
28
30
}
29
31
32
+ #[ derive( Deserialize , Clone , Debug ) ]
33
+ pub struct CreateErrorResponse {
34
+ error : String ,
35
+ }
36
+
30
37
// fn deserialize_empty_object_as_string<'de, D>(deserializer: D) -> Result<String, D::Error>
31
38
// where
32
39
// D: Deserializer<'de>,
@@ -105,13 +112,107 @@ pub struct UploadedImage {
105
112
// pub config: S3UploadMeta,
106
113
// }
107
114
115
+ pub struct UploadProgressUpdater {
116
+ video_state : Option < VideoProgressState > ,
117
+ app : AppHandle ,
118
+ video_id : String ,
119
+ }
120
+
121
+ struct VideoProgressState {
122
+ uploaded : u64 ,
123
+ total : u64 ,
124
+ pending_task : Option < JoinHandle < ( ) > > ,
125
+ last_update_time : Instant ,
126
+ }
127
+
128
+ impl UploadProgressUpdater {
129
+ pub fn new ( app : AppHandle , video_id : String ) -> Self {
130
+ Self {
131
+ video_state : None ,
132
+ app,
133
+ video_id,
134
+ }
135
+ }
136
+
137
+ pub fn update ( & mut self , uploaded : u64 , total : u64 ) {
138
+ let should_send_immediately = {
139
+ let state = self . video_state . get_or_insert_with ( || VideoProgressState {
140
+ uploaded,
141
+ total,
142
+ pending_task : None ,
143
+ last_update_time : Instant :: now ( ) ,
144
+ } ) ;
145
+
146
+ // Cancel any pending task
147
+ if let Some ( handle) = state. pending_task . take ( ) {
148
+ handle. abort ( ) ;
149
+ }
150
+
151
+ state. uploaded = uploaded;
152
+ state. total = total;
153
+ state. last_update_time = Instant :: now ( ) ;
154
+
155
+ // Send immediately if upload is complete
156
+ uploaded >= total
157
+ } ;
158
+
159
+ let app = self . app . clone ( ) ;
160
+ if should_send_immediately {
161
+ tokio:: spawn ( {
162
+ let video_id = self . video_id . clone ( ) ;
163
+ async move {
164
+ Self :: send_api_update ( & app, video_id, uploaded, total) . await ;
165
+ }
166
+ } ) ;
167
+
168
+ // Clear state since upload is complete
169
+ self . video_state = None ;
170
+ } else {
171
+ // Schedule delayed update
172
+ let handle = {
173
+ let video_id = self . video_id . clone ( ) ;
174
+ tokio:: spawn ( async move {
175
+ tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
176
+ Self :: send_api_update ( & app, video_id, uploaded, total) . await ;
177
+ } )
178
+ } ;
179
+
180
+ if let Some ( state) = & mut self . video_state {
181
+ state. pending_task = Some ( handle) ;
182
+ }
183
+ }
184
+ }
185
+
186
+ async fn send_api_update ( app : & AppHandle , video_id : String , uploaded : u64 , total : u64 ) {
187
+ let response = app
188
+ . authed_api_request ( "/api/desktop/video/progress" , |client, url| {
189
+ client. post ( url) . json ( & json ! ( {
190
+ "videoId" : video_id,
191
+ "uploaded" : uploaded,
192
+ "total" : total,
193
+ "updatedAt" : chrono:: Utc :: now( ) . to_rfc3339( )
194
+ } ) )
195
+ } )
196
+ . await ;
197
+
198
+ match response {
199
+ Ok ( resp) if resp. status ( ) . is_success ( ) => {
200
+ trace ! ( "Progress update sent successfully" ) ;
201
+ }
202
+ Ok ( resp) => error ! ( "Failed to send progress update: {}" , resp. status( ) ) ,
203
+ Err ( err) => error ! ( "Failed to send progress update: {err}" ) ,
204
+ }
205
+ }
206
+ }
207
+
108
208
pub async fn upload_video (
109
209
app : & AppHandle ,
110
210
video_id : String ,
111
211
file_path : PathBuf ,
112
212
existing_config : Option < S3UploadMeta > ,
113
213
screenshot_path : Option < PathBuf > ,
114
214
meta : Option < S3VideoMeta > ,
215
+ channel : Option < Channel < UploadProgress > > ,
115
216
) -> Result < UploadedVideo , String > {
116
217
println ! ( "Uploading video {video_id}..." ) ;
117
218
@@ -145,20 +246,24 @@ pub async fn upload_video(
145
246
146
247
let reader_stream = tokio_util:: io:: ReaderStream :: new ( file) ;
147
248
148
- let mut bytes_uploaded = 0 ;
149
- let progress_stream = reader_stream. inspect ( {
150
- let app = app. clone ( ) ;
151
- move |chunk| {
152
- if bytes_uploaded > 0 {
153
- let _ = UploadProgress {
154
- progress : bytes_uploaded as f64 / total_size as f64 ,
155
- }
156
- . emit ( & app) ;
157
- }
249
+ let mut bytes_uploaded = 0u64 ;
250
+ let mut progress = UploadProgressUpdater :: new ( app. clone ( ) , video_id) ;
158
251
159
- if let Ok ( chunk) = chunk {
160
- bytes_uploaded += chunk. len ( ) ;
252
+ let progress_stream = reader_stream. inspect ( move |chunk| {
253
+ if let Ok ( chunk) = chunk {
254
+ bytes_uploaded += chunk. len ( ) as u64 ;
255
+ }
256
+
257
+ if bytes_uploaded > 0 {
258
+ if let Some ( channel) = & channel {
259
+ channel
260
+ . send ( UploadProgress {
261
+ progress : bytes_uploaded as f64 / total_size as f64 ,
262
+ } )
263
+ . ok ( ) ;
161
264
}
265
+
266
+ progress. update ( bytes_uploaded, total_size) ;
162
267
}
163
268
} ) ;
164
269
@@ -316,6 +421,21 @@ pub async fn create_or_get_video(
316
421
return Err ( "Failed to authenticate request; please log in again" . into ( ) ) ;
317
422
}
318
423
424
+ if response. status ( ) != StatusCode :: OK {
425
+ if let Ok ( error) = response. json :: < CreateErrorResponse > ( ) . await {
426
+ if error. error == "upgrade_required" {
427
+ return Err (
428
+ "You must upgrade to Cap Pro to upload recordings over 5 minutes in length"
429
+ . into ( ) ,
430
+ ) ;
431
+ }
432
+
433
+ return Err ( format ! ( "server error: {}" , error. error) ) ;
434
+ }
435
+
436
+ return Err ( "Unknown error uploading video" . into ( ) ) ;
437
+ }
438
+
319
439
let response_text = response
320
440
. text ( )
321
441
. await
@@ -544,13 +664,12 @@ impl InstantMultipartUpload {
544
664
let mut uploaded_parts = Vec :: new ( ) ;
545
665
let mut part_number = 1 ;
546
666
let mut last_uploaded_position: u64 = 0 ;
547
-
548
- println ! ( "Starting multipart upload for {video_id}..." ) ;
667
+ let mut progress = UploadProgressUpdater :: new ( app. clone ( ) , pre_created_video. id . clone ( ) ) ;
549
668
550
669
// --------------------------------------------
551
670
// initiate the multipart upload
552
671
// --------------------------------------------
553
- println ! ( "Initiating multipart upload for {video_id}..." ) ;
672
+ debug ! ( "Initiating multipart upload for {video_id}..." ) ;
554
673
let initiate_response = match app
555
674
. authed_api_request ( "/api/upload/multipart/initiate" , |c, url| {
556
675
c. post ( url)
@@ -654,6 +773,7 @@ impl InstantMultipartUpload {
654
773
& mut part_number,
655
774
& mut last_uploaded_position,
656
775
new_data_size. min ( CHUNK_SIZE ) ,
776
+ & mut progress,
657
777
)
658
778
. await
659
779
{
@@ -680,6 +800,7 @@ impl InstantMultipartUpload {
680
800
& mut 1 ,
681
801
& mut 0 ,
682
802
uploaded_parts[ 0 ] . size as u64 ,
803
+ & mut progress,
683
804
)
684
805
. await
685
806
. map_err ( |err| format ! ( "Failed to re-upload first chunk: {err}" ) ) ?;
@@ -726,6 +847,7 @@ impl InstantMultipartUpload {
726
847
part_number : & mut i32 ,
727
848
last_uploaded_position : & mut u64 ,
728
849
chunk_size : u64 ,
850
+ progress : & mut UploadProgressUpdater ,
729
851
) -> Result < UploadedPart , String > {
730
852
let file_size = match tokio:: fs:: metadata ( file_path) . await {
731
853
Ok ( metadata) => metadata. len ( ) ,
@@ -838,6 +960,8 @@ impl InstantMultipartUpload {
838
960
}
839
961
} ;
840
962
963
+ progress. update ( expected_pos, file_size) ;
964
+
841
965
if !presign_response. status ( ) . is_success ( ) {
842
966
let status = presign_response. status ( ) ;
843
967
let error_body = presign_response
0 commit comments