Skip to content

Commit c5c8e84

Browse files
abstract more API + retry on S3 upload request
1 parent ecddfb7 commit c5c8e84

File tree

3 files changed

+64
-62
lines changed

3 files changed

+64
-62
lines changed

apps/desktop/src-tauri/src/api.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22
//! This will come part of the EffectTS rewrite work.
33
44
use serde::{Deserialize, Serialize};
5+
use serde_json::json;
56
use tauri::AppHandle;
67

78
use crate::web_api::ManagerExt;
89

9-
// TODO: Adding retry and backoff logic to everything!
10-
1110
pub async fn upload_multipart_initiate(app: &AppHandle, video_id: &str) -> Result<String, String> {
1211
#[derive(Deserialize)]
1312
pub struct Response {
@@ -212,3 +211,33 @@ pub async fn upload_signed(app: &AppHandle, body: PresignedS3PutRequest) -> Resu
212211
.map_err(|err| format!("api/upload_signed/response: {err}"))
213212
.map(|data| data.presigned_put_data.url)
214213
}
214+
215+
pub async fn desktop_video_progress(
216+
app: &AppHandle,
217+
video_id: &str,
218+
uploaded: u64,
219+
total: u64,
220+
) -> Result<(), String> {
221+
let resp = app
222+
.authed_api_request("/api/desktop/video/progress", |client, url| {
223+
client.post(url).json(&json!({
224+
"videoId": video_id,
225+
"uploaded": uploaded,
226+
"total": total,
227+
"updatedAt": chrono::Utc::now().to_rfc3339()
228+
}))
229+
})
230+
.await
231+
.map_err(|err| format!("api/desktop_video_progress/request: {err}"))?;
232+
233+
if !resp.status().is_success() {
234+
let status = resp.status();
235+
let error_body = resp
236+
.text()
237+
.await
238+
.unwrap_or_else(|_| "<no response body>".to_string());
239+
return Err(format!("api/desktop_video_progress/{status}: {error_body}"));
240+
}
241+
242+
Ok(())
243+
}

apps/desktop/src-tauri/src/upload.rs

Lines changed: 32 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,40 @@
11
// credit @filleduchaos
22

3-
use crate::api::{PresignedS3PutRequest, PresignedS3PutRequestMethod, S3VideoMeta, UploadedPart};
4-
use crate::web_api::ManagerExt;
5-
use crate::{UploadProgress, VideoUploadInfo, api};
3+
use crate::{
4+
UploadProgress, VideoUploadInfo, api,
5+
api::{PresignedS3PutRequest, PresignedS3PutRequestMethod, S3VideoMeta, UploadedPart},
6+
web_api::ManagerExt,
7+
};
68
use async_stream::{stream, try_stream};
7-
use axum::body::Body;
9+
use axum::http::{self, Uri};
810
use bytes::Bytes;
9-
use cap_project::{RecordingMeta, RecordingMetaInner, UploadState};
11+
use cap_project::{RecordingMeta, UploadState};
1012
use cap_utils::spawn_actor;
1113
use ffmpeg::ffi::AV_TIME_BASE;
1214
use flume::Receiver;
1315
use futures::{Stream, StreamExt, TryStreamExt, stream};
14-
use image::ImageReader;
15-
use image::codecs::jpeg::JpegEncoder;
16+
use image::{ImageReader, codecs::jpeg::JpegEncoder};
1617
use reqwest::StatusCode;
17-
use reqwest::header::CONTENT_LENGTH;
1818
use serde::{Deserialize, Serialize};
19-
use serde_json::json;
2019
use specta::Type;
21-
use std::error::Error;
22-
use std::io;
23-
use std::path::Path;
24-
use std::pin::pin;
2520
use std::{
26-
path::PathBuf,
21+
io,
22+
path::{Path, PathBuf},
23+
pin::pin,
24+
str::FromStr,
2725
time::{Duration, Instant},
2826
};
2927
use tauri::{AppHandle, ipc::Channel};
3028
use tauri_plugin_clipboard_manager::ClipboardExt;
3129
use tauri_specta::Event;
32-
use tokio::fs::File;
33-
use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
34-
use tokio::sync::watch;
35-
use tokio::task::{self, JoinHandle};
36-
use tokio::time::sleep;
30+
use tokio::{
31+
fs::File,
32+
io::{AsyncReadExt, AsyncSeekExt, BufReader},
33+
sync::watch,
34+
task::{self, JoinHandle},
35+
};
3736
use tokio_util::io::ReaderStream;
38-
use tracing::{debug, error, info, trace, warn};
37+
use tracing::{debug, error, info, trace};
3938

4039
#[derive(Deserialize, Serialize, Clone, Type, Debug)]
4140
pub struct S3UploadMeta {
@@ -65,15 +64,6 @@ pub struct UploadedImage {
6564
pub id: String,
6665
}
6766

68-
pub fn upload_v2(app: AppHandle) {
69-
// TODO: Progress reporting
70-
// TODO: Multipart or regular upload automatically sorted out
71-
// TODO: Allow either FS derived or Rust progress derived multipart upload source
72-
// TODO: Support screenshots, or videos
73-
74-
todo!();
75-
}
76-
7767
pub struct UploadProgressUpdater {
7868
video_state: Option<VideoProgressState>,
7969
app: AppHandle,
@@ -123,7 +113,7 @@ impl UploadProgressUpdater {
123113
tokio::spawn({
124114
let video_id = self.video_id.clone();
125115
async move {
126-
Self::send_api_update(&app, video_id, uploaded, total).await;
116+
api::desktop_video_progress(&app, &video_id, uploaded, total).await;
127117
}
128118
});
129119

@@ -135,7 +125,7 @@ impl UploadProgressUpdater {
135125
let video_id = self.video_id.clone();
136126
tokio::spawn(async move {
137127
tokio::time::sleep(Duration::from_secs(2)).await;
138-
Self::send_api_update(&app, video_id, uploaded, total).await;
128+
api::desktop_video_progress(&app, &video_id, uploaded, total).await;
139129
})
140130
};
141131

@@ -144,30 +134,6 @@ impl UploadProgressUpdater {
144134
}
145135
}
146136
}
147-
148-
async fn send_api_update(app: &AppHandle, video_id: String, uploaded: u64, total: u64) {
149-
let response = app
150-
.authed_api_request("/api/desktop/video/progress", |client, url| {
151-
client
152-
.post(url)
153-
.header("X-Cap-Desktop-Version", env!("CARGO_PKG_VERSION"))
154-
.json(&json!({
155-
"videoId": video_id,
156-
"uploaded": uploaded,
157-
"total": total,
158-
"updatedAt": chrono::Utc::now().to_rfc3339()
159-
}))
160-
})
161-
.await;
162-
163-
match response {
164-
Ok(resp) if resp.status().is_success() => {
165-
trace!("Progress update sent successfully");
166-
}
167-
Ok(resp) => error!("Failed to send progress update: {}", resp.status()),
168-
Err(err) => error!("Failed to send progress update: {err}"),
169-
}
170-
}
171137
}
172138

173139
#[derive(Default, Debug)]
@@ -740,8 +706,6 @@ fn uploader(
740706
upload_id: String,
741707
stream: impl Stream<Item = io::Result<Chunk>>,
742708
) -> impl Stream<Item = Result<UploadedPart, String>> {
743-
let client = reqwest::Client::default();
744-
745709
try_stream! {
746710
let mut stream = pin!(stream);
747711
let mut prev_part_number = None;
@@ -755,8 +719,17 @@ fn uploader(
755719
api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, &md5_sum)
756720
.await?;
757721

758-
// TODO: Retries
759-
let resp = client
722+
let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
723+
let resp = reqwest::Client::builder()
724+
.retry(reqwest::retry::for_host(url.host().unwrap_or("<unknown>").to_string()).classify_fn(|req_rep| {
725+
if req_rep.status().map_or(false, |s| s.is_server_error()) {
726+
req_rep.retryable()
727+
} else {
728+
req_rep.success()
729+
}
730+
}))
731+
.build()
732+
.map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
760733
.put(&presigned_url)
761734
.header("Content-MD5", &md5_sum)
762735
.header("Content-Length", chunk.len())

apps/desktop/src-tauri/src/web_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async fn do_authed_request(
2626
}
2727
),
2828
)
29-
.header("X-Desktop-Version", env!("CARGO_PKG_VERSION"));
29+
.header("X-Cap-Desktop-Version", env!("CARGO_PKG_VERSION"));
3030

3131
if let Some(s) = std::option_env!("VITE_VERCEL_AUTOMATION_BYPASS_SECRET") {
3232
req = req.header("x-vercel-protection-bypass", s);

0 commit comments

Comments
 (0)