diff --git a/Cargo.lock b/Cargo.lock index 0c4cd0f50..58bd476d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -512,6 +512,28 @@ dependencies = [ "windows-sys 0.61.0", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1020,6 +1042,13 @@ dependencies = [ "uuid", ] +[[package]] +name = "cap-api" +version = "0.0.0" +dependencies = [ + "reqwest", +] + [[package]] name = "cap-audio" version = "0.1.0" @@ -1144,9 +1173,11 @@ name = "cap-desktop" version = "0.3.72-beta.1" dependencies = [ "anyhow", + "async-stream", "axum", "base64 0.22.1", "bytemuck", + "bytes", "cap-audio", "cap-camera", "cap-editor", @@ -1220,6 +1251,7 @@ dependencies = [ "tauri-plugin-window-state", "tauri-specta", "tempfile", + "thiserror 1.0.69", "tokio", "tokio-stream", "tokio-util", diff --git a/apps/desktop/src-tauri/Cargo.toml b/apps/desktop/src-tauri/Cargo.toml index 2eaccbe05..e1c936427 100644 --- a/apps/desktop/src-tauri/Cargo.toml +++ b/apps/desktop/src-tauri/Cargo.toml @@ -104,6 +104,9 @@ wgpu.workspace = true bytemuck = "1.23.1" kameo = "0.17.2" tauri-plugin-sentry = "0.5.0" +thiserror.workspace = true +bytes = "1.10.1" +async-stream = "0.3.6" [target.'cfg(target_os = "macos")'.dependencies] core-graphics = "0.24.0" diff --git a/apps/desktop/src-tauri/src/api.rs b/apps/desktop/src-tauri/src/api.rs new file mode 100644 index 000000000..92a2ba203 --- /dev/null +++ b/apps/desktop/src-tauri/src/api.rs @@ -0,0 +1,244 @@ +//! TODO: We should investigate generating this with OpenAPI. +//! This will come as part of the EffectTS rewrite work.
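+//! +//! These wrappers follow the usual S3 multipart flow: initiate, presign each part, then complete. A rough call-order sketch (error handling elided; `md5`, `parts`, and `meta` are assumed caller-side values, not part of this module): +//! +//! ```ignore +//! let upload_id = upload_multipart_initiate(&app, video_id).await?; +//! let url = upload_multipart_presign_part(&app, video_id, &upload_id, 1, &md5).await?; +//! // PUT each chunk to its presigned URL, collecting the returned ETags as `UploadedPart`s... +//! let location = upload_multipart_complete(&app, video_id, &upload_id, &parts, meta).await?; +//! ```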
+ +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tauri::AppHandle; + +use crate::web_api::ManagerExt; + +pub async fn upload_multipart_initiate(app: &AppHandle, video_id: &str) -> Result<String, String> { + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct Response { + upload_id: String, + } + + let resp = app + .authed_api_request("/api/upload/multipart/initiate", |c, url| { + c.post(url) + .header("Content-Type", "application/json") + .json(&serde_json::json!({ + "videoId": video_id, + "contentType": "video/mp4" + })) + }) + .await + .map_err(|err| format!("api/upload_multipart_initiate/request: {err}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let error_body = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!( + "api/upload_multipart_initiate/{status}: {error_body}" + )); + } + + resp.json::<Response>() + .await + .map_err(|err| format!("api/upload_multipart_initiate/response: {err}")) + .map(|data| data.upload_id) +} + +pub async fn upload_multipart_presign_part( + app: &AppHandle, + video_id: &str, + upload_id: &str, + part_number: u32, + md5_sum: &str, +) -> Result<String, String> { + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct Response { + presigned_url: String, + } + + let resp = app + .authed_api_request("/api/upload/multipart/presign-part", |c, url| { + c.post(url) + .header("Content-Type", "application/json") + .json(&serde_json::json!({ + "videoId": video_id, + "uploadId": upload_id, + "partNumber": part_number, + "md5Sum": md5_sum + })) + }) + .await + .map_err(|err| format!("api/upload_multipart_presign_part/request: {err}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let error_body = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!( + "api/upload_multipart_presign_part/{status}: {error_body}" + )); + } + + resp.json::<Response>() + .await + .map_err(|err| format!("api/upload_multipart_presign_part/response: {err}")) + .map(|data| data.presigned_url) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UploadedPart { + pub part_number: u32, + pub etag: String, + pub size: usize, + #[serde(skip)] + pub total_size: u64, +} + +#[derive(Serialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct S3VideoMeta { + #[serde(rename = "durationInSecs")] + pub duration_in_secs: f64, + pub width: u32, + pub height: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub fps: Option<u32>, +} + +pub async fn upload_multipart_complete( + app: &AppHandle, + video_id: &str, + upload_id: &str, + parts: &[UploadedPart], + meta: Option<S3VideoMeta>, +) -> Result<Option<String>, String> { + #[derive(Serialize)] + #[serde(rename_all = "camelCase")] + pub struct MultipartCompleteRequest<'a> { + video_id: &'a str, + upload_id: &'a str, + parts: &'a [UploadedPart], + #[serde(flatten)] + meta: Option<S3VideoMeta>, + } + + #[derive(Deserialize)] + pub struct Response { + location: Option<String>, + } + + let resp = app + .authed_api_request("/api/upload/multipart/complete", |c, url| { + c.post(url) + .header("Content-Type", "application/json") + .json(&MultipartCompleteRequest { + video_id, + upload_id, + parts, + meta, + }) + }) + .await + .map_err(|err| format!("api/upload_multipart_complete/request: {err}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let error_body = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!( + "api/upload_multipart_complete/{status}: {error_body}" + )); + } + + resp.json::<Response>() + 
.await + .map_err(|err| format!("api/upload_multipart_complete/response: {err}")) + .map(|data| data.location) +} + +#[derive(Serialize)] +#[serde(rename_all = "lowercase")] +pub enum PresignedS3PutRequestMethod { + #[allow(unused)] + Post, + Put, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PresignedS3PutRequest { + pub video_id: String, + pub subpath: String, + pub method: PresignedS3PutRequestMethod, + #[serde(flatten)] + pub meta: Option<S3VideoMeta>, +} + +pub async fn upload_signed(app: &AppHandle, body: PresignedS3PutRequest) -> Result<String, String> { + #[derive(Deserialize)] + struct Data { + url: String, + } + + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct Response { + presigned_put_data: Data, + } + + let resp = app + .authed_api_request("/api/upload/signed", |client, url| { + client.post(url).json(&body) + }) + .await + .map_err(|err| format!("api/upload_signed/request: {err}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let error_body = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!("api/upload_signed/{status}: {error_body}")); + } + + resp.json::<Response>() + .await + .map_err(|err| format!("api/upload_signed/response: {err}")) + .map(|data| data.presigned_put_data.url) +} + +pub async fn desktop_video_progress( + app: &AppHandle, + video_id: &str, + uploaded: u64, + total: u64, +) -> Result<(), String> { + let resp = app + .authed_api_request("/api/desktop/video/progress", |client, url| { + client.post(url).json(&json!({ + "videoId": video_id, + "uploaded": uploaded, + "total": total, + "updatedAt": chrono::Utc::now().to_rfc3339() + })) + }) + .await + .map_err(|err| format!("api/desktop_video_progress/request: {err}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let error_body = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!("api/desktop_video_progress/{status}: {error_body}")); + } + + Ok(()) +} diff --git a/apps/desktop/src-tauri/src/general_settings.rs b/apps/desktop/src-tauri/src/general_settings.rs index f8e6b4725..771ae56a8 100644 --- a/apps/desktop/src-tauri/src/general_settings.rs +++ b/apps/desktop/src-tauri/src/general_settings.rs @@ -97,6 +97,8 @@ pub struct GeneralSettingsStore { pub enable_new_recording_flow: bool, #[serde(default)] pub post_deletion_behaviour: PostDeletionBehaviour, + #[serde(default = "default_enable_new_uploader", skip_serializing_if = "no")] + pub enable_new_uploader: bool, } fn default_enable_native_camera_preview() -> bool { @@ -108,6 +110,10 @@ fn default_enable_new_recording_flow() -> bool { cfg!(debug_assertions) } +fn default_enable_new_uploader() -> bool { + cfg!(debug_assertions) +} + fn no(_: &bool) -> bool { false } @@ -155,6 +161,7 @@ impl Default for GeneralSettingsStore { auto_zoom_on_clicks: false, enable_new_recording_flow: default_enable_new_recording_flow(), post_deletion_behaviour: PostDeletionBehaviour::DoNothing, + enable_new_uploader: default_enable_new_uploader(), } } } diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index 74ca14e70..d31d6eac1 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -1,3 +1,4 @@ +mod api; mod audio; mod audio_meter; mod auth; @@ -22,6 +23,7 @@ mod target_select_overlay; mod thumbnails; mod tray; mod upload; +mod upload_legacy; mod web_api; mod windows; @@ -30,8 +32,8 @@ use auth::{AuthStore, AuthenticationInvalid, Plan}; use camera::CameraPreviewState; use 
cap_editor::{EditorInstance, EditorState}; use cap_project::{ - ProjectConfiguration, RecordingMeta, RecordingMetaInner, SharingMeta, StudioRecordingMeta, XY, - ZoomSegment, + InstantRecordingMeta, ProjectConfiguration, RecordingMeta, RecordingMetaInner, SharingMeta, + StudioRecordingMeta, StudioRecordingStatus, UploadMeta, VideoUploadInfo, XY, ZoomSegment, }; use cap_recording::{ RecordingMode, @@ -83,14 +85,15 @@ use tauri_specta::Event; #[cfg(target_os = "macos")] use tokio::sync::Mutex; use tokio::sync::{RwLock, oneshot}; -use tracing::{error, trace}; -use upload::{S3UploadMeta, create_or_get_video, upload_image, upload_video}; +use tracing::{error, trace, warn}; +use upload::{create_or_get_video, upload_image, upload_video}; use web_api::ManagerExt as WebManagerExt; use windows::{CapWindowId, EditorWindowIds, ShowCapWindow, set_window_transparent}; use crate::{ camera::CameraPreviewManager, recording_settings::{RecordingSettingsStore, RecordingTargetMode}, + upload::InstantMultipartUpload, }; use crate::{recording::start_recording, upload::build_video_meta}; @@ -146,13 +149,6 @@ pub struct VideoRecordingMetadata { pub size: f64, } -#[derive(Clone, Serialize, Deserialize, specta::Type, Debug)] -pub struct VideoUploadInfo { - id: String, - link: String, - config: S3UploadMeta, -} - impl App { pub fn set_pending_recording(&mut self, mode: RecordingMode, target: ScreenCaptureTarget) { self.recording_state = RecordingState::Pending { mode, target }; @@ -899,16 +895,25 @@ async fn get_video_metadata(path: PathBuf) -> Result<VideoRecordingMetadata, String> { vec![path.join("content/output.mp4")] } - RecordingMetaInner::Studio(meta) => match meta { - StudioRecordingMeta::SingleSegment { segment } => { - vec![recording_meta.path(&segment.display.path)] + RecordingMetaInner::Studio(meta) => { + let status = meta.status(); + if let StudioRecordingStatus::Failed { .. } = status { + return Err("Unable to get metadata on failed recording".to_string()); + } else if let StudioRecordingStatus::InProgress = status { + return Err("Unable to get metadata on in-progress recording".to_string()); } - StudioRecordingMeta::MultipleSegments { inner, .. 
} => inner - .segments - .iter() - .map(|s| recording_meta.path(&s.display.path)) - .collect(), - }, + + match meta { + StudioRecordingMeta::SingleSegment { segment } => { + vec![recording_meta.path(&segment.display.path)] + } + StudioRecordingMeta::MultipleSegments { inner } => inner + .segments + .iter() + .map(|s| recording_meta.path(&s.display.path)) + .collect(), + } + } }; let duration = display_paths @@ -1064,13 +1069,13 @@ async fn upload_exported_video( let mut meta = RecordingMeta::load_for_project(&path).map_err(|v| v.to_string())?; - let output_path = meta.output_path(); - if !output_path.exists() { + let file_path = meta.output_path(); + if !file_path.exists() { notifications::send_notification(&app, notifications::NotificationType::UploadFailed); return Err("Failed to upload video: Rendered video not found".to_string()); } - let metadata = build_video_meta(&output_path) + let metadata = build_video_meta(&file_path) .map_err(|err| format!("Error getting output video meta: {err}"))?; if !auth.is_upgraded() && metadata.duration_in_secs > 300.0 { @@ -1107,15 +1112,23 @@ async fn upload_exported_video( } .await?; - let upload_id = s3_config.id().to_string(); + let screenshot_path = meta.project_path.join("screenshots/display.jpg"); + meta.upload = Some(UploadMeta::SinglePartUpload { + video_id: s3_config.id.clone(), + file_path: file_path.clone(), + screenshot_path: screenshot_path.clone(), + recording_dir: path.clone(), + }); + meta.save_for_project() + .map_err(|e| error!("Failed to save recording meta: {e}")) + .ok(); match upload_video( &app, - upload_id.clone(), - output_path, - Some(s3_config), - Some(meta.project_path.join("screenshots/display.jpg")), - Some(metadata), + s3_config.id.clone(), + file_path, + screenshot_path, + metadata, Some(channel.clone()), ) .await @@ -1123,11 +1136,14 @@ async fn upload_exported_video( Ok(uploaded_video) => { channel.send(UploadProgress { progress: 1.0 }).ok(); + meta.upload = Some(UploadMeta::Complete); meta.sharing = Some(SharingMeta { link: uploaded_video.link.clone(), id: uploaded_video.id.clone(), }); - meta.save_for_project().ok(); + meta.save_for_project() + .map_err(|e| error!("Failed to save recording meta: {e}")) + .ok(); let _ = app .state::>() @@ -1142,6 +1158,12 @@ async fn upload_exported_video( error!("Failed to upload video: {e}"); NotificationType::UploadFailed.send(&app); + + meta.upload = Some(UploadMeta::Failed { error: e.clone() }); + meta.save_for_project() + .map_err(|e| error!("Failed to save recording meta: {e}")) + .ok(); + Err(e) } } @@ -1305,6 +1327,7 @@ async fn take_screenshot(app: AppHandle, _state: MutableState<'_, App>) -> Resul cursor: None, }, }), + upload: None, } .save_for_project() .unwrap(); @@ -1392,19 +1415,44 @@ async fn save_file_dialog( } #[derive(Serialize, specta::Type)] -pub struct RecordingMetaWithMode { +pub struct RecordingMetaWithMetadata { #[serde(flatten)] pub inner: RecordingMeta, + // Easier accessors for within webview + // THESE MUST COME AFTER `inner` to override flattened fields with the same name pub mode: RecordingMode, + pub status: StudioRecordingStatus, } -impl RecordingMetaWithMode { +impl RecordingMetaWithMetadata { fn new(inner: RecordingMeta) -> Self { Self { mode: match &inner.inner { RecordingMetaInner::Studio(_) => RecordingMode::Studio, RecordingMetaInner::Instant(_) => RecordingMode::Instant, }, + status: match &inner.inner { + RecordingMetaInner::Studio(StudioRecordingMeta::MultipleSegments { inner }) => { + inner + .status + .clone() + 
.unwrap_or(StudioRecordingStatus::Complete) + } + RecordingMetaInner::Studio(StudioRecordingMeta::SingleSegment { .. }) => { + StudioRecordingStatus::Complete + } + RecordingMetaInner::Instant(InstantRecordingMeta::InProgress { .. }) => { + StudioRecordingStatus::InProgress + } + RecordingMetaInner::Instant(InstantRecordingMeta::Failed { error }) => { + StudioRecordingStatus::Failed { + error: error.clone(), + } + } + RecordingMetaInner::Instant(InstantRecordingMeta::Complete { .. }) => { + StudioRecordingStatus::Complete + } + }, inner, } } @@ -1422,15 +1470,15 @@ pub enum FileType { fn get_recording_meta( path: PathBuf, _file_type: FileType, -) -> Result<RecordingMetaWithMode, String> { +) -> Result<RecordingMetaWithMetadata, String> { RecordingMeta::load_for_project(&path) - .map(RecordingMetaWithMode::new) + .map(RecordingMetaWithMetadata::new) .map_err(|e| format!("Failed to load recording meta: {e}")) } #[tauri::command] #[specta::specta] -fn list_recordings(app: AppHandle) -> Result, String> { +fn list_recordings(app: AppHandle) -> Result, String> { let recordings_dir = recordings_path(&app); if !recordings_dir.exists() { @@ -1708,7 +1756,7 @@ async fn editor_delete_project( let _ = tokio::fs::remove_dir_all(&path).await; - RecordingDeleted { path }.emit(&app); + RecordingDeleted { path }.emit(&app).ok(); Ok(()) } @@ -1957,6 +2005,7 @@ pub async fn run(recording_logging_handle: LoggingHandle) { RecordingDeleted, target_select_overlay::TargetUnderCursor, hotkeys::OnEscapePress, + upload::UploadProgressEvent, ]) .error_handling(tauri_specta::ErrorHandlingMode::Throw) .typ::() @@ -2118,6 +2167,27 @@ pub async fn run(recording_logging_handle: LoggingHandle) { }); } + tokio::spawn({ + let app = app.clone(); + async move { + let is_new_uploader_enabled = GeneralSettingsStore::get(&app) + .map_err(|err| { + error!( + "Error checking status of new uploader flow from settings: {err}" + ) + }) + .ok() + .and_then(|v| v.map(|v| v.enable_new_uploader)) + .unwrap_or(false); + if is_new_uploader_enabled { + resume_uploads(app) + .await + .map_err(|err| warn!("Error resuming uploads: {err}")) + .ok(); + } + } + }); + { app.manage(Arc::new(RwLock::new(App { camera_ws_port, @@ -2391,6 +2461,128 @@ pub async fn run(recording_logging_handle: LoggingHandle) { }); } +async fn resume_uploads(app: AppHandle) -> Result<(), String> { + let recordings_dir = recordings_path(&app); + if !recordings_dir.exists() { + return Err("Recording directory missing".to_string()); + } + + let entries = std::fs::read_dir(&recordings_dir) + .map_err(|e| format!("Failed to read recordings directory: {}", e))?; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() && path.extension().and_then(|s| s.to_str()) == Some("cap") { + // Load recording meta to check for in-progress recordings + if let Ok(mut meta) = RecordingMeta::load_for_project(&path) { + let mut needs_save = false; + + // Check if recording is still marked as in-progress and if so mark as failed + // This should only happen if the application crashes while recording + match &mut meta.inner { + RecordingMetaInner::Studio(StudioRecordingMeta::MultipleSegments { inner }) => { + if let Some(StudioRecordingStatus::InProgress) = &inner.status { + inner.status = Some(StudioRecordingStatus::Failed { + error: "Recording crashed".to_string(), + }); + needs_save = true; + } + } + RecordingMetaInner::Instant(InstantRecordingMeta::InProgress { .. 
}) => { + meta.inner = RecordingMetaInner::Instant(InstantRecordingMeta::Failed { + error: "Recording crashed".to_string(), + }); + needs_save = true; + } + _ => {} + } + + // Save the updated meta if we made changes + if needs_save && let Err(err) = meta.save_for_project() { + error!("Failed to save recording meta for {path:?}: {err}"); + } + + // Handle upload resumption + if let Some(upload_meta) = meta.upload { + match upload_meta { + UploadMeta::MultipartUpload { + video_id: _, + file_path, + pre_created_video, + recording_dir, + } => { + InstantMultipartUpload::spawn( + app.clone(), + file_path, + pre_created_video, + None, + recording_dir, + ); + } + UploadMeta::SinglePartUpload { + video_id, + file_path, + screenshot_path, + recording_dir, + } => { + let app = app.clone(); + tokio::spawn(async move { + if let Ok(meta) = build_video_meta(&file_path) + .map_err(|error| { + error!("Failed to resume video upload. error getting video metadata: {error}"); + + if let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir).map_err(|err| error!("Error loading project metadata: {err}")) { + meta.upload = Some(UploadMeta::Failed { error }); + meta.save_for_project().map_err(|err| error!("Error saving project metadata: {err}")).ok(); + } + }) + && let Ok(uploaded_video) = upload_video( + &app, + video_id, + file_path, + screenshot_path, + meta, + None, + ) + .await + .map_err(|error| { + error!("Error completing resumed upload for video: {error}"); + + if let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir).map_err(|err| error!("Error loading project metadata: {err}")) { + meta.upload = Some(UploadMeta::Failed { error }); + meta.save_for_project().map_err(|err| error!("Error saving project metadata: {err}")).ok(); + } + }) + { + if let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir).map_err(|err| error!("Error loading project metadata: {err}")) { + meta.upload = Some(UploadMeta::Complete); + meta.sharing = Some(SharingMeta { + link: uploaded_video.link.clone(), + id: uploaded_video.id.clone(), + }); + meta.save_for_project() + .map_err(|e| error!("Failed to save recording meta: {e}")) + .ok(); + } + + let _ = app + .state::>() + .write() + .await + .set_text(uploaded_video.link.clone()); + NotificationType::ShareableLinkCopied.send(&app); + } + }); + } + UploadMeta::Failed { .. } | UploadMeta::Complete => {} + } + } + } + } + } + + Ok(()) +} + async fn create_editor_instance_impl( app: &AppHandle, path: PathBuf, @@ -2492,9 +2684,15 @@ fn open_project_from_path(path: &Path, app: AppHandle) -> Result<(), String> { let meta = RecordingMeta::load_for_project(path).map_err(|v| v.to_string())?; match &meta.inner { - RecordingMetaInner::Studio(_) => { - let project_path = path.to_path_buf(); + RecordingMetaInner::Studio(meta) => { + let status = meta.status(); + if let StudioRecordingStatus::Failed { .. 
} = status { + return Err("Unable to open failed recording".to_string()); + } else if let StudioRecordingStatus::InProgress = status { + return Err("Recording in progress".to_string()); + } + let project_path = path.to_path_buf(); tokio::spawn(async move { ShowCapWindow::Editor { project_path }.show(&app).await }); } RecordingMetaInner::Instant(_) => { diff --git a/apps/desktop/src-tauri/src/main.rs b/apps/desktop/src-tauri/src/main.rs index c99138f74..9dcaab06d 100644 --- a/apps/desktop/src-tauri/src/main.rs +++ b/apps/desktop/src-tauri/src/main.rs @@ -4,8 +4,6 @@ use std::sync::Arc; use cap_desktop_lib::DynLoggingLayer; -use dirs; -use tracing_appender; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; fn main() { diff --git a/apps/desktop/src-tauri/src/recording.rs b/apps/desktop/src-tauri/src/recording.rs index fe53716dd..aedb3f164 100644 --- a/apps/desktop/src-tauri/src/recording.rs +++ b/apps/desktop/src-tauri/src/recording.rs @@ -1,10 +1,13 @@ use cap_fail::fail; +use cap_project::CursorMoveEvent; use cap_project::cursor::SHORT_CURSOR_SHAPE_DEBOUNCE_MS; use cap_project::{ - CursorClickEvent, CursorMoveEvent, Platform, ProjectConfiguration, RecordingMeta, - RecordingMetaInner, SharingMeta, StudioRecordingMeta, TimelineConfiguration, TimelineSegment, - ZoomMode, ZoomSegment, cursor::CursorEvents, + CursorClickEvent, InstantRecordingMeta, MultipleSegments, Platform, ProjectConfiguration, + RecordingMeta, RecordingMetaInner, SharingMeta, StudioRecordingMeta, StudioRecordingStatus, + TimelineConfiguration, TimelineSegment, UploadMeta, ZoomMode, ZoomSegment, + cursor::CursorEvents, }; +use cap_recording::PipelineDoneError; use cap_recording::{ RecordingError, RecordingMode, feeds::{camera, microphone}, @@ -17,7 +20,8 @@ use cap_recording::{ }; use cap_rendering::ProjectRecordingsMeta; use cap_utils::{ensure_dir, spawn_actor}; -use serde::Deserialize; +use futures::stream; +use serde::{Deserialize, Serialize}; use specta::Type; use std::{ collections::{HashMap, VecDeque}, @@ -34,6 +38,7 @@ use tracing::*; use crate::{ App, CurrentRecordingChanged, MutableState, NewStudioRecordingAdded, RecordingState, RecordingStopped, VideoUploadInfo, + api::PresignedS3PutRequestMethod, audio::AppSounds, auth::AuthStore, create_screenshot, @@ -42,8 +47,7 @@ use crate::{ presets::PresetsStore, thumbnails::*, upload::{ - InstantMultipartUpload, build_video_meta, create_or_get_video, prepare_screenshot_upload, - upload_video, + InstantMultipartUpload, build_video_meta, compress_image, create_or_get_video, upload_video, }, web_api::ManagerExt, windows::{CapWindowId, ShowCapWindow}, @@ -53,7 +57,7 @@ pub enum InProgressRecording { Instant { target_name: String, handle: instant_recording::ActorHandle, - progressive_upload: Option<InstantMultipartUpload>, + progressive_upload: InstantMultipartUpload, video_upload_info: VideoUploadInfo, inputs: StartRecordingInputs, recording_dir: PathBuf, @@ -156,7 +160,7 @@ pub enum CompletedRecording { Instant { recording: instant_recording::CompletedRecording, target_name: String, - progressive_upload: Option<InstantMultipartUpload>, + progressive_upload: InstantMultipartUpload, video_upload_info: VideoUploadInfo, }, Studio { @@ -291,7 +295,7 @@ pub async fn start_recording( match AuthStore::get(&app).ok().flatten() { Some(_) => { // Pre-create the video and get the shareable link - if let Ok(s3_config) = create_or_get_video( + let s3_config = create_or_get_video( &app, false, None, @@ -302,18 +306,19 @@ pub async fn start_recording( None, ) .await - { - let link = app.make_app_url(format!("/s/{}", 
s3_config.id())).await; - info!("Pre-created shareable link: {}", link); - - Some(VideoUploadInfo { - id: s3_config.id().to_string(), - link: link.clone(), - config: s3_config, - }) - } else { - None - } + .map_err(|err| { + error!("Error creating instant mode video: {err}"); + err + })?; + + let link = app.make_app_url(format!("/s/{}", s3_config.id)).await; + info!("Pre-created shareable link: {}", link); + + Some(VideoUploadInfo { + id: s3_config.id.to_string(), + link: link.clone(), + config: s3_config, + }) } // Allow the recording to proceed without error for any signed-in user _ => { @@ -325,6 +330,38 @@ pub async fn start_recording( RecordingMode::Studio => None, }; + let date_time = if cfg!(windows) { + // Windows doesn't support colon in file paths + chrono::Local::now().format("%Y-%m-%d %H.%M.%S") + } else { + chrono::Local::now().format("%Y-%m-%d %H:%M:%S") + }; + + let meta = RecordingMeta { + platform: Some(Platform::default()), + project_path: recording_dir.clone(), + pretty_name: format!("{target_name} {date_time}"), + inner: match inputs.mode { + RecordingMode::Studio => { + RecordingMetaInner::Studio(StudioRecordingMeta::MultipleSegments { + inner: MultipleSegments { + segments: Default::default(), + cursors: Default::default(), + status: Some(StudioRecordingStatus::InProgress), + }, + }) + } + RecordingMode::Instant => { + RecordingMetaInner::Instant(InstantRecordingMeta::InProgress { recording: true }) + } + }, + sharing: None, + upload: None, + }; + + meta.save_for_project() + .map_err(|e| format!("Failed to save recording meta: {e}"))?; + match &inputs.capture_target { ScreenCaptureTarget::Window { id: _id } => { if let Some(show) = inputs @@ -383,26 +420,16 @@ pub async fn start_recording( } let (finish_upload_tx, finish_upload_rx) = flume::bounded(1); - let progressive_upload = video_upload_info - .as_ref() - .filter(|_| matches!(inputs.mode, RecordingMode::Instant)) - .map(|video_upload_info| { - InstantMultipartUpload::spawn( - app.clone(), - id.clone(), - recording_dir.join("content/output.mp4"), - video_upload_info.clone(), - Some(finish_upload_rx), - ) - }); debug!("spawning start_recording actor"); // done in spawn to catch panics just in case + let app_handle = app.clone(); let spawn_actor_res = async { spawn_actor({ let state_mtx = Arc::clone(&state_mtx); let general_settings = general_settings.cloned(); + let recording_dir = recording_dir.clone(); async move { fail!("recording::spawn_actor"); let mut state = state_mtx.write().await; @@ -469,6 +496,14 @@ pub async fn start_recording( return Err("Video upload info not found".to_string()); }; + let progressive_upload = InstantMultipartUpload::spawn( + app_handle, + recording_dir.join("content/output.mp4"), + video_upload_info.clone(), + Some(finish_upload_rx), + recording_dir.clone(), + ); + let mut builder = instant_recording::Actor::builder( recording_dir.clone(), inputs.capture_target.clone(), @@ -514,14 +549,14 @@ pub async fn start_recording( .await; let actor_done_fut = match spawn_actor_res { - Ok(fut) => fut, - Err(e) => { - let _ = RecordingEvent::Failed { error: e.clone() }.emit(&app); + Ok(rx) => rx, + Err(err) => { + let _ = RecordingEvent::Failed { error: err.clone() }.emit(&app); let mut dialog = MessageDialogBuilder::new( app.dialog().clone(), "An error occurred".to_string(), - e.clone(), + err.clone(), ) .kind(tauri_plugin_dialog::MessageDialogKind::Error); @@ -532,9 +567,9 @@ pub async fn start_recording( dialog.blocking_show(); let mut state = state_mtx.write().await; - let _ = 
handle_recording_end(app, None, &mut state).await; + let _ = handle_recording_end(app, Err(err.clone()), &mut state, recording_dir).await; - return Err(e); + return Err(err); } }; @@ -574,7 +609,9 @@ pub async fn start_recording( dialog.blocking_show(); // this clears the current recording for us - handle_recording_end(app, None, &mut state).await.ok(); + handle_recording_end(app, Err(e.to_string()), &mut state, recording_dir) + .await + .ok(); } } } @@ -618,8 +655,9 @@ pub async fn stop_recording(app: AppHandle, state: MutableState<'_, App>) -> Res }; let completed_recording = current_recording.stop().await.map_err(|e| e.to_string())?; + let recording_dir = completed_recording.project_path().clone(); - handle_recording_end(app, Some(completed_recording), &mut state).await?; + handle_recording_end(app, Ok(completed_recording), &mut state, recording_dir).await?; Ok(()) } @@ -706,17 +744,42 @@ pub async fn delete_recording(app: AppHandle, state: MutableState<'_, App>) -> R // runs when a recording ends, whether from success or failure async fn handle_recording_end( handle: AppHandle, - recording: Option<CompletedRecording>, + recording: Result<CompletedRecording, String>, app: &mut App, + recording_dir: PathBuf, ) -> Result<(), String> { // Clear current recording, just in case :) app.clear_current_recording(); - let res = if let Some(recording) = recording { + let res = match recording { // we delay reporting errors here so that everything else happens first - Some(handle_recording_finish(&handle, recording).await) - } else { - None + Ok(recording) => Some(handle_recording_finish(&handle, recording).await), + Err(error) => { + if let Ok(mut project_meta) = + RecordingMeta::load_for_project(&recording_dir).map_err(|err| { + error!("Error loading recording meta while finishing recording: {err}") + }) + { + match &mut project_meta.inner { + RecordingMetaInner::Studio(meta) => { + if let StudioRecordingMeta::MultipleSegments { inner } = meta { + inner.status = Some(StudioRecordingStatus::Failed { error }); + } + } + RecordingMetaInner::Instant(meta) => { + *meta = InstantRecordingMeta::Failed { error }; + } + } + project_meta + .save_for_project() + .map_err(|err| { + error!("Error saving recording meta while finishing recording: {err}") + }) + .ok(); + } + + None + } }; let _ = RecordingStopped.emit(&handle); @@ -780,8 +843,6 @@ async fn handle_recording_finish( None, )); - let target_name = completed_recording.target_name().clone(); - let (meta_inner, sharing) = match completed_recording { CompletedRecording::Studio { recording, .. } => { let recordings = ProjectRecordingsMeta::new(&recording_dir, &recording.meta)?; @@ -803,7 +864,6 @@ async fn handle_recording_finish( video_upload_info, .. 
} => { - // shareable_link = Some(video_upload_info.link.clone()); let app = app.clone(); let output_path = recording_dir.join("content/output.mp4"); @@ -811,74 +871,78 @@ async fn handle_recording_finish( spawn_actor({ let video_upload_info = video_upload_info.clone(); + let recording_dir = recording_dir.clone(); async move { - if let Some(progressive_upload) = progressive_upload { - let video_upload_succeeded = match progressive_upload - .handle - .await - .map_err(|e| e.to_string()) - .and_then(|r| r) - { - Ok(()) => { - info!( - "Not attempting instant recording upload as progressive upload succeeded" - ); - true - } - Err(e) => { - error!("Progressive upload failed: {}", e); - false - } - }; - - let _ = screenshot_task.await; - - if video_upload_succeeded { - let resp = prepare_screenshot_upload( - &app, - &video_upload_info.config.clone(), - display_screenshot, - ) - .await; - - match resp { - Ok(r) - if r.status().as_u16() >= 200 && r.status().as_u16() < 300 => - { - info!("Screenshot uploaded successfully"); - } - Ok(r) => { - error!("Failed to upload screenshot: {}", r.status()); - } - Err(e) => { - error!("Failed to upload screenshot: {e}"); - } + let video_upload_succeeded = match progressive_upload + .handle + .await + .map_err(|e| e.to_string()) + .and_then(|r| r) + { + Ok(()) => { + info!( + "Not attempting instant recording upload as progressive upload succeeded" + ); + true + } + Err(e) => { + error!("Progressive upload failed: {}", e); + false + } + }; + + let _ = screenshot_task.await; + + if video_upload_succeeded { + if let Ok(bytes) = + compress_image(display_screenshot).await + .map_err(|err| + error!("Error compressing thumbnail for instant mode progressive upload: {err}") + ) { + crate::upload::singlepart_uploader( + app.clone(), + crate::api::PresignedS3PutRequest { + video_id: video_upload_info.id.clone(), + subpath: "screenshot/screen-capture.jpg".to_string(), + method: PresignedS3PutRequestMethod::Put, + meta: None, + }, + bytes.len() as u64, + stream::once(async move { Ok::<_, std::io::Error>(bytes::Bytes::from(bytes)) }), + + ) + .await + .map_err(|err| { + error!("Error updating thumbnail for instant mode progressive upload: {err}") + }) + .ok(); } - } else { - let meta = build_video_meta(&output_path).ok(); - // The upload_video function handles screenshot upload, so we can pass it along - match upload_video( - &app, - video_upload_info.id.clone(), - output_path, - Some(video_upload_info.config.clone()), - Some(display_screenshot.clone()), - meta, - None, - ) - .await - { - Ok(_) => { - info!( - "Final video upload with screenshot completed successfully" - ) - } - Err(e) => { - error!("Error in final upload with screenshot: {}", e) - } + } else if let Ok(meta) = build_video_meta(&output_path) + .map_err(|err| error!("Error getting video metadata: {}", err)) + { + // The upload_video function handles screenshot upload, so we can pass it along + upload_video( + &app, + video_upload_info.id.clone(), + output_path, + display_screenshot.clone(), + meta, + None, + ) + .await + .map(|_| info!("Final video upload with screenshot completed successfully")) + .map_err(|error| { + error!("Error in upload_video: {error}"); + + if let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir) { + meta.upload = Some(UploadMeta::Failed { error }); + meta.save_for_project() + .map_err(|e| format!("Failed to save recording meta: {e}")) + .ok(); } - } + }) + .ok(); } } }); @@ -893,25 +957,16 @@ async fn handle_recording_finish( } }; - let date_time = if cfg!(windows) { - 
// Windows doesn't support colon in file paths - chrono::Local::now().format("%Y-%m-%d %H.%M.%S") - } else { - chrono::Local::now().format("%Y-%m-%d %H:%M:%S") - }; - - let meta = RecordingMeta { - platform: Some(Platform::default()), - project_path: recording_dir.clone(), - sharing, - pretty_name: format!("{target_name} {date_time}"), - inner: meta_inner, - }; - - meta.save_for_project() - .map_err(|e| format!("Failed to save recording meta: {e}"))?; + if let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir).map_err(|err| { + error!("Failed to load recording meta while saving finished recording: {err}") + }) { + meta.inner = meta_inner.clone(); + meta.sharing = sharing; + meta.save_for_project() + .map_err(|e| format!("Failed to save recording meta: {e}"))?; + } - if let RecordingMetaInner::Studio(_) = meta.inner { + if let RecordingMetaInner::Studio(_) = meta_inner { match GeneralSettingsStore::get(app) .ok() .flatten() @@ -1082,11 +1137,11 @@ fn generate_zoom_segments_from_clicks_impl( let mut merged: Vec<(f64, f64)> = Vec::new(); for interval in intervals { - if let Some(last) = merged.last_mut() { - if interval.0 <= last.1 + MERGE_GAP_THRESHOLD { - last.1 = last.1.max(interval.1); - continue; - } + if let Some(last) = merged.last_mut() + && interval.0 <= last.1 + MERGE_GAP_THRESHOLD + { + last.1 = last.1.max(interval.1); + continue; } merged.push(interval); } @@ -1122,6 +1177,7 @@ pub fn generate_zoom_segments_from_clicks( pretty_name: String::new(), sharing: None, inner: RecordingMetaInner::Studio(recording.meta.clone()), + upload: None, }; generate_zoom_segments_for_project(&recording_meta, recordings) diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index f6e923697..d0bb1a535 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -1,242 +1,147 @@ // credit @filleduchaos -use crate::web_api::ManagerExt; -use crate::{UploadProgress, VideoUploadInfo}; +use crate::{ + UploadProgress, VideoUploadInfo, + api::{self, PresignedS3PutRequest, PresignedS3PutRequestMethod, S3VideoMeta, UploadedPart}, + general_settings::GeneralSettingsStore, + upload_legacy, + web_api::ManagerExt, +}; +use async_stream::{stream, try_stream}; +use axum::http::Uri; +use bytes::Bytes; +use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta}; use cap_utils::spawn_actor; use ffmpeg::ffi::AV_TIME_BASE; use flume::Receiver; -use futures::StreamExt; -use image::ImageReader; -use image::codecs::jpeg::JpegEncoder; +use futures::{Stream, StreamExt, TryStreamExt, stream}; +use image::{ImageReader, codecs::jpeg::JpegEncoder}; use reqwest::StatusCode; -use reqwest::header::CONTENT_LENGTH; use serde::{Deserialize, Serialize}; -use serde_json::json; use specta::Type; use std::{ - path::PathBuf, - time::{Duration, Instant}, + collections::{HashMap, HashSet}, + io, + path::{Path, PathBuf}, + pin::pin, + str::FromStr, + time::Duration, }; use tauri::{AppHandle, ipc::Channel}; use tauri_plugin_clipboard_manager::ClipboardExt; -use tokio::io::{AsyncReadExt, AsyncSeekExt}; -use tokio::task::{self, JoinHandle}; -use tokio::time::sleep; -use tracing::{debug, error, info, trace, warn}; - -#[derive(Deserialize, Serialize, Clone, Type, Debug)] -pub struct S3UploadMeta { - id: String, -} - -#[derive(Deserialize, Clone, Debug)] -pub struct CreateErrorResponse { - error: String, -} - -// fn deserialize_empty_object_as_string<'de, D>(deserializer: D) -> Result<String, D::Error> -// where -// D: Deserializer<'de>, -// { -// struct StringOrObject; - -// impl<'de> 
de::Visitor<'de> for StringOrObject { -// type Value = String; - -// fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { -// formatter.write_str("string or empty object") -// } - -// fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> -// where -// E: de::Error, -// { -// Ok(value.to_string()) -// } - -// fn visit_string<E>(self, value: String) -> Result<Self::Value, E> -// where -// E: de::Error, -// { -// Ok(value) -// } - -// fn visit_map<M>(self, _map: M) -> Result<Self::Value, M::Error> -// where -// M: de::MapAccess<'de>, -// { -// // Return empty string for empty objects -// Ok(String::new()) -// } -// } - -// deserializer.deserialize_any(StringOrObject) -// } - -impl S3UploadMeta { - pub fn id(&self) -> &str { - &self.id - } - - // pub fn new(id: String) -> Self { - // Self { id } - // } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct S3VideoMeta { - #[serde(rename = "durationInSecs")] - pub duration_in_secs: f64, - pub width: u32, - pub height: u32, - #[serde(skip_serializing_if = "Option::is_none")] - pub fps: Option<u32>, -} - -pub struct UploadedVideo { - pub link: String, - pub id: String, - #[allow(unused)] - pub config: S3UploadMeta, -} +use tauri_specta::Event; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncSeekExt, BufReader}, + task::{self, JoinHandle}, + time, +}; +use tokio_util::io::ReaderStream; +use tracing::{debug, error, info}; -pub struct UploadedImage { +pub struct UploadedItem { pub link: String, pub id: String, + // #[allow(unused)] + // pub config: S3UploadMeta, } -// pub struct UploadedAudio { -// pub link: String, -// pub id: String, -// pub config: S3UploadMeta, -// } - -pub struct UploadProgressUpdater { - video_state: Option<VideoProgressState>, - app: AppHandle, +#[derive(Clone, Serialize, Type, tauri_specta::Event)] +pub struct UploadProgressEvent { video_id: String, + uploaded: String, + total: String, } -struct VideoProgressState { - uploaded: u64, - total: u64, - pending_task: Option<JoinHandle<()>>, - last_update_time: Instant, -} - -impl UploadProgressUpdater { - pub fn new(app: AppHandle, video_id: String) -> Self { - Self { - video_state: None, - app, - video_id, - } - } - - pub fn update(&mut self, uploaded: u64, total: u64) { - let should_send_immediately = { - let state = self.video_state.get_or_insert_with(|| VideoProgressState { - uploaded, - total, - pending_task: None, - last_update_time: Instant::now(), - }); - - // Cancel any pending task - if let Some(handle) = state.pending_task.take() { - handle.abort(); - } - - state.uploaded = uploaded; - state.total = total; - state.last_update_time = Instant::now(); - - // Send immediately if upload is complete - uploaded >= total - }; - - let app = self.app.clone(); - if should_send_immediately { - tokio::spawn({ - let video_id = self.video_id.clone(); - async move { - Self::send_api_update(&app, video_id, uploaded, total).await; - } - }); - - // Clear state since upload is complete - self.video_state = None; - } else { - // Schedule delayed update - let handle = { - let video_id = self.video_id.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(2)).await; - Self::send_api_update(&app, video_id, uploaded, total).await; - }) - }; - - if let Some(state) = &mut self.video_state { - state.pending_task = Some(handle); - } - } - } - - async fn send_api_update(app: &AppHandle, video_id: String, uploaded: u64, total: u64) { - let response = app - .authed_api_request("/api/desktop/video/progress", |client, url| { - client - .post(url) - .header("X-Cap-Desktop-Version", env!("CARGO_PKG_VERSION")) - 
.json(&json!({ - "videoId": video_id, - "uploaded": uploaded, - "total": total, - "updatedAt": chrono::Utc::now().to_rfc3339() - })) - }) - .await; - - match response { - Ok(resp) if resp.status().is_success() => { - trace!("Progress update sent successfully"); - } - Ok(resp) => error!("Failed to send progress update: {}", resp.status()), - Err(err) => error!("Failed to send progress update: {err}"), - } - } -} +// a typical recommended chunk size is 5MB (AWS min part size). +const CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5MB pub async fn upload_video( app: &AppHandle, video_id: String, file_path: PathBuf, - existing_config: Option<S3UploadMeta>, - screenshot_path: Option<PathBuf>, - meta: Option<S3VideoMeta>, + screenshot_path: PathBuf, + meta: S3VideoMeta, channel: Option<Channel<UploadProgress>>, -) -> Result<UploadedVideo, String> { - println!("Uploading video {video_id}..."); +) -> Result<UploadedItem, String> { + let is_new_uploader_enabled = GeneralSettingsStore::get(&app) + .map_err(|err| error!("Error checking status of new uploader flow from settings: {err}")) + .ok() + .and_then(|v| v.map(|v| v.enable_new_uploader)) + .unwrap_or(false); + info!("uploader_video: is new uploader enabled? {is_new_uploader_enabled}"); + if !is_new_uploader_enabled { + return upload_legacy::upload_video( + app, + video_id, + file_path, + None, + Some(screenshot_path), + Some(meta), + channel, + ) + .await + .map(|v| UploadedItem { + link: v.link, + id: v.id, + }); + } + + info!("Uploading video {video_id}..."); + + let (stream, total_size) = file_reader_stream(file_path).await?; + let stream = progress( + app.clone(), + video_id.clone(), + stream.map(move |v| v.map(move |v| (total_size, v))), + ); - let client = reqwest::Client::new(); - let s3_config = match existing_config { - Some(config) => config, - None => create_or_get_video(app, false, Some(video_id.clone()), None, meta).await?, + let stream = if let Some(channel) = channel { + tauri_progress(channel, stream).boxed() + } else { + stream.boxed() }; - let presigned_put = presigned_s3_put( - app, + let video_fut = singlepart_uploader( + app.clone(), PresignedS3PutRequest { video_id: video_id.clone(), subpath: "result.mp4".to_string(), method: PresignedS3PutRequestMethod::Put, - meta: Some(build_video_meta(&file_path)?), + meta: Some(meta), }, - ) - .await?; + total_size, + stream.and_then(|(_, c)| async move { Ok(c) }), + ); - let file = tokio::fs::File::open(&file_path) + // TODO: We don't report progress on image upload + let bytes = compress_image(screenshot_path).await?; + let thumbnail_fut = singlepart_uploader( + app.clone(), + PresignedS3PutRequest { + video_id: video_id.clone(), + subpath: "screenshot/screen-capture.jpg".to_string(), + method: PresignedS3PutRequestMethod::Put, + meta: None, + }, + bytes.len() as u64, + stream::once(async move { Ok::<_, std::io::Error>(bytes::Bytes::from(bytes)) }), + ); + + let (video_result, thumbnail_result): (Result<_, String>, Result<_, String>) = + tokio::join!(video_fut, thumbnail_fut); + + let _ = (video_result?, thumbnail_result?); + + Ok(UploadedItem { + link: app.make_app_url(format!("/s/{video_id}")).await, + id: video_id, + }) +} + +/// Open a file and construct a stream to it. 
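+/// +/// A minimal usage sketch (the path literal is illustrative, not from this PR): +/// ```ignore +/// let (stream, total_size) = file_reader_stream("/tmp/output.mp4").await?; +/// // `total_size` is the length in bytes at open time; `stream` yields `Bytes` chunks. +/// ```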
+async fn file_reader_stream(path: impl AsRef<Path>) -> Result<(ReaderStream<File>, u64), String> { + let file = File::open(path) .await .map_err(|e| format!("Failed to open file: {e}"))?; @@ -245,146 +150,51 @@ pub async fn upload_video( .await .map_err(|e| format!("Failed to get file metadata: {e}"))?; - let total_size = metadata.len(); - - let reader_stream = tokio_util::io::ReaderStream::new(file); - - let mut bytes_uploaded = 0u64; - let mut progress = UploadProgressUpdater::new(app.clone(), video_id); - - let progress_stream = reader_stream.inspect(move |chunk| { - if let Ok(chunk) = chunk { - bytes_uploaded += chunk.len() as u64; - } - - if bytes_uploaded > 0 { - if let Some(channel) = &channel { - channel - .send(UploadProgress { - progress: bytes_uploaded as f64 / total_size as f64, - }) - .ok(); - } - - progress.update(bytes_uploaded, total_size); - } - }); - - let screenshot_upload = match screenshot_path { - Some(screenshot_path) if screenshot_path.exists() => { - Some(prepare_screenshot_upload(app, &s3_config, screenshot_path)) - } - _ => None, - }; - - let video_upload = client - .put(presigned_put) - .body(reqwest::Body::wrap_stream(progress_stream)) - .header(CONTENT_LENGTH, metadata.len()); - - let (video_upload, screenshot_result): ( - Result<reqwest::Response, reqwest::Error>, - Option<Result<reqwest::Response, String>>, - ) = tokio::join!(video_upload.send(), async { - if let Some(screenshot_req) = screenshot_upload { - Some(screenshot_req.await) - } else { - None - } - }); - - let response = video_upload.map_err(|e| format!("Failed to send upload file request: {e}"))?; - - if response.status().is_success() { - println!("Video uploaded successfully"); - - if let Some(Ok(screenshot_response)) = screenshot_result { - if screenshot_response.status().is_success() { - println!("Screenshot uploaded successfully"); - } else { - println!( - "Failed to upload screenshot: {}", - screenshot_response.status() - ); - } - } + Ok((ReaderStream::new(file), metadata.len())) +} - return Ok(UploadedVideo { - link: app.make_app_url(format!("/s/{}", &s3_config.id)).await, - id: s3_config.id.clone(), - config: s3_config, - }); +pub async fn upload_image(app: &AppHandle, file_path: PathBuf) -> Result<UploadedItem, String> { + let is_new_uploader_enabled = GeneralSettingsStore::get(app) + .map_err(|err| error!("Error checking status of new uploader flow from settings: {err}")) + .ok() + .and_then(|v| v.map(|v| v.enable_new_uploader)) + .unwrap_or(false); + info!("upload_image: is new uploader enabled? {is_new_uploader_enabled}"); + if !is_new_uploader_enabled { + return upload_legacy::upload_image(app, file_path) + .await + .map(|v| UploadedItem { + link: v.link, + id: v.id, + }); } - let status = response.status(); - let error_body = response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - tracing::error!( - "Failed to upload file. Status: {}. Body: {}", - status, - error_body - ); - Err(format!( - "Failed to upload file. Status: {status}. Body: {error_body}" - )) -} - -pub async fn upload_image(app: &AppHandle, file_path: PathBuf) -> Result<UploadedImage, String> { let file_name = file_path .file_name() .and_then(|name| name.to_str()) .ok_or("Invalid file path")? 
.to_string(); - let client = reqwest::Client::new(); let s3_config = create_or_get_video(app, true, None, None, None).await?; - let presigned_put = presigned_s3_put( - app, + let (stream, total_size) = file_reader_stream(file_path).await?; + singlepart_uploader( + app.clone(), PresignedS3PutRequest { video_id: s3_config.id.clone(), subpath: file_name, method: PresignedS3PutRequestMethod::Put, meta: None, }, + total_size, + stream, ) .await?; - let file_content = tokio::fs::read(&file_path) - .await - .map_err(|e| format!("Failed to read file: {e}"))?; - - let response = client - .put(presigned_put) - .header(CONTENT_LENGTH, file_content.len()) - .body(file_content) - .send() - .await - .map_err(|e| format!("Failed to send upload file request: {e}"))?; - - if response.status().is_success() { - println!("File uploaded successfully"); - return Ok(UploadedImage { - link: app.make_app_url(format!("/s/{}", &s3_config.id)).await, - id: s3_config.id, - }); - } - - let status = response.status(); - let error_body = response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - tracing::error!( - "Failed to upload file. Status: {}. Body: {}", - status, - error_body - ); - Err(format!( - "Failed to upload file. Status: {status}. Body: {error_body}" - )) + Ok(UploadedItem { + link: app.make_app_url(format!("/s/{}", &s3_config.id)).await, + id: s3_config.id, + }) } pub async fn create_or_get_video( @@ -425,6 +235,11 @@ pub async fn create_or_get_video( } if response.status() != StatusCode::OK { + #[derive(Deserialize, Clone, Debug)] + pub struct CreateErrorResponse { + error: String, + } + if let Ok(error) = response.json::<CreateErrorResponse>().await { if error.error == "upgrade_required" { return Err( @@ -451,55 +266,6 @@ pub async fn create_or_get_video( Ok(config) } -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct PresignedS3PutRequest { - video_id: String, - subpath: String, - method: PresignedS3PutRequestMethod, - #[serde(flatten)] - meta: Option<S3VideoMeta>, -} - -#[derive(Serialize)] -#[serde(rename_all = "lowercase")] -pub enum PresignedS3PutRequestMethod { - #[allow(unused)] - Post, - Put, -} - -async fn presigned_s3_put(app: &AppHandle, body: PresignedS3PutRequest) -> Result<String, String> { - #[derive(Deserialize, Debug)] - struct Data { - url: String, - } - - #[derive(Deserialize, Debug)] - #[serde(rename_all = "camelCase")] - struct Wrapper { - presigned_put_data: Data, - } - - let response = app - .authed_api_request("/api/upload/signed", |client, url| { - client.post(url).json(&body) - }) - .await - .map_err(|e| format!("Failed to send request to Next.js handler: {e}"))?; - - if response.status() == StatusCode::UNAUTHORIZED { - return Err("Failed to authenticate request; please log in again".into()); - } - - let Wrapper { presigned_put_data } = response - .json::<Wrapper>() - .await - .map_err(|e| format!("Failed to deserialize server response: {e}"))?; - - Ok(presigned_put_data.url) -} - pub fn build_video_meta(path: &PathBuf) -> Result<S3VideoMeta, String> { let input = ffmpeg::format::input(path).map_err(|e| format!("Failed to read input file: {e}"))?; @@ -525,79 +291,26 @@ pub fn build_video_meta(path: &PathBuf) -> Result<S3VideoMeta, String> { }) } -// fn build_audio_upload_body( -// path: &PathBuf, -// base: S3UploadBody, -// ) -> Result<S3AudioUploadBody, String> { -// let input = -// ffmpeg::format::input(path).map_err(|e| format!("Failed to read input file: {e}"))?; -// let stream = input -// .streams() -// .best(ffmpeg::media::Type::Audio) -// .ok_or_else(|| "Failed to find appropriate audio stream in file".to_string())?; - -// let duration_millis = input.duration() as f64 
/ 1000.; - -// let codec = ffmpeg::codec::context::Context::from_parameters(stream.parameters()) -// .map_err(|e| format!("Unable to read audio codec information: {e}"))?; -// let codec_name = codec.id(); - -// let is_mp3 = path.extension().is_some_and(|ext| ext == "mp3"); - -// Ok(S3AudioUploadBody { -// base, -// duration: duration_millis.to_string(), -// audio_codec: format!("{codec_name:?}").replace("Id::", "").to_lowercase(), -// is_mp3, -// }) -// } - -pub async fn prepare_screenshot_upload( - app: &AppHandle, - s3_config: &S3UploadMeta, - screenshot_path: PathBuf, -) -> Result<reqwest::Response, String> { - let presigned_put = presigned_s3_put( - app, - PresignedS3PutRequest { - video_id: s3_config.id.clone(), - subpath: "screenshot/screen-capture.jpg".to_string(), - method: PresignedS3PutRequestMethod::Put, - meta: None, - }, - ) - .await?; - - let compressed_image = compress_image(screenshot_path).await?; - - reqwest::Client::new() - .put(presigned_put) - .header(CONTENT_LENGTH, compressed_image.len()) - .body(compressed_image) - .send() - .await - .map_err(|e| format!("Error uploading screenshot: {e}")) -} - -async fn compress_image(path: PathBuf) -> Result<Vec<u8>, String> { +pub async fn compress_image(path: PathBuf) -> Result<Vec<u8>, String> { task::spawn_blocking(move || { let img = ImageReader::open(&path) .map_err(|e| format!("Failed to open image: {e}"))? .decode() .map_err(|e| format!("Failed to decode image: {e}"))?; - let new_width = img.width() / 2; - let new_height = img.height() / 2; - - let resized_img = img.resize(new_width, new_height, image::imageops::FilterType::Nearest); + let resized_img = img.resize( + img.width() / 2, + img.height() / 2, + image::imageops::FilterType::Nearest, + ); let mut buffer = Vec::new(); let mut encoder = JpegEncoder::new_with_quality(&mut buffer, 30); encoder .encode( resized_img.as_bytes(), - new_width, - new_height, + resized_img.width(), + resized_img.height(), resized_img.color().into(), ) .map_err(|e| format!("Failed to compress image: {e}"))?; @@ -608,20 +321,6 @@ async fn compress_image(path: PathBuf) -> Result<Vec<u8>, String> { .map_err(|e| format!("Failed to compress image: {e}"))? } -// a typical recommended chunk size is 5MB (AWS min part size). -const CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5MB -// const MIN_PART_SIZE: u64 = 5 * 1024 * 1024; // For non-final parts - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct MultipartCompleteResponse<'a> { - video_id: &'a str, - upload_id: &'a str, - parts: &'a [UploadedPart], - #[serde(flatten)] - meta: Option<S3VideoMeta>, -} - pub struct InstantMultipartUpload { pub handle: tokio::task::JoinHandle<Result<(), String>>, } @@ -631,550 +330,519 @@ impl InstantMultipartUpload { /// ... and the file has stabilized (no additional data is being written). 
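/// /// A rough usage sketch (caller-side names `app`, `dir`, and `info` are assumed, mirroring the `resume_uploads` call-site in lib.rs): `InstantMultipartUpload::spawn(app, dir.join("content/output.mp4"), info, None, dir)` detaches the upload; the returned `handle` resolves once the upload completes or fails.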
pub fn spawn( app: AppHandle, - video_id: String, file_path: PathBuf, pre_created_video: VideoUploadInfo, realtime_upload_done: Option<Receiver<()>>, + recording_dir: PathBuf, ) -> Self { Self { handle: spawn_actor(Self::run( app, - video_id, file_path, pre_created_video, realtime_upload_done, + recording_dir, )), } } pub async fn run( app: AppHandle, - video_id: String, file_path: PathBuf, pre_created_video: VideoUploadInfo, realtime_video_done: Option<Receiver<()>>, + recording_dir: PathBuf, ) -> Result<(), String> { - use std::time::Duration; + let is_new_uploader_enabled = GeneralSettingsStore::get(&app) + .map_err(|err| { + error!("Error checking status of new uploader flow from settings: {err}") + }) + .ok() + .and_then(|v| v.map(|v| v.enable_new_uploader)) + .unwrap_or(false); + info!("InstantMultipartUpload::run: is new uploader enabled? {is_new_uploader_enabled}"); + if !is_new_uploader_enabled { + return upload_legacy::InstantMultipartUpload::run( + app, + pre_created_video.id.clone(), + file_path, + pre_created_video, + realtime_video_done, + ) + .await; + } - use tokio::time::sleep; + let video_id = pre_created_video.id.clone(); + debug!("Initiating multipart upload for {video_id}..."); - // -------------------------------------------- - // basic constants and info for chunk approach - // -------------------------------------------- - let client = reqwest::Client::new(); - let s3_config = pre_created_video.config; + let mut project_meta = RecordingMeta::load_for_project(&recording_dir).map_err(|err| { + format!("Error reading project meta from {recording_dir:?} for upload init: {err}") + })?; + project_meta.upload = Some(UploadMeta::MultipartUpload { + video_id: video_id.clone(), + file_path: file_path.clone(), + pre_created_video: pre_created_video.clone(), + recording_dir: recording_dir.clone(), + }); + project_meta + .save_for_project() + .map_err(|e| error!("Failed to save recording meta: {e}")) + .ok(); - let mut uploaded_parts = Vec::new(); - let mut part_number = 1; - let mut last_uploaded_position: u64 = 0; - let mut progress = UploadProgressUpdater::new(app.clone(), pre_created_video.id.clone()); + let upload_id = api::upload_multipart_initiate(&app, &video_id).await?; + + let mut parts = progress( + app.clone(), + video_id.clone(), + multipart_uploader( + app.clone(), + video_id.clone(), + upload_id.clone(), + from_pending_file_to_chunks(file_path.clone(), realtime_video_done), + ), + ) + .try_collect::<Vec<_>>() + .await?; + + // Deduplicate parts - keep the last occurrence of each part number + let mut deduplicated_parts = HashMap::new(); + for part in parts { + deduplicated_parts.insert(part.part_number, part); + } + parts = deduplicated_parts.into_values().collect::<Vec<_>>(); + parts.sort_by_key(|part| part.part_number); - // -------------------------------------------- - // initiate the multipart upload - // -------------------------------------------- debug!("Initiating multipart upload for {video_id}..."); - let initiate_response = match app - .authed_api_request("/api/upload/multipart/initiate", |c, url| { - c.post(url) - .header("Content-Type", "application/json") - .json(&serde_json::json!({ - "videoId": s3_config.id(), - "contentType": "video/mp4" - })) - }) - .await - { - Ok(r) => r, - Err(e) => { - return Err(format!("Failed to initiate multipart upload: {e}")); - } - }; - if !initiate_response.status().is_success() { - let status = initiate_response.status(); - let error_body = 
initiate_response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - return Err(format!( - "Failed to initiate multipart upload. Status: {status}. Body: {error_body}" - )); - } + api::upload_multipart_complete(&app, &video_id, &upload_id, &parts, metadata).await?; + info!("Multipart upload complete for {video_id}."); - let initiate_data = match initiate_response.json::<serde_json::Value>().await { - Ok(d) => d, - Err(e) => { - return Err(format!("Failed to parse initiate response: {e}")); - } - }; + let mut project_meta = RecordingMeta::load_for_project(&recording_dir).map_err(|err| { + format!("Error reading project meta from {recording_dir:?} for upload complete: {err}") + })?; + project_meta.upload = Some(UploadMeta::Complete); + project_meta + .save_for_project() + .map_err(|err| format!("Error reading project meta from {recording_dir:?}: {err}"))?; - let upload_id = match initiate_data.get("uploadId") { - Some(val) => val.as_str().unwrap_or("").to_string(), - None => { - return Err("No uploadId returned from initiate endpoint".to_string()); - } - }; + let _ = app.clipboard().write_text(pre_created_video.link.clone()); - if upload_id.is_empty() { - return Err("Empty uploadId returned from initiate endpoint".to_string()); - } + Ok(()) + } +} - println!("Multipart upload initiated with ID: {upload_id}"); +pub struct Chunk { + /// The total size of the file to be uploaded. + /// This can change as the recording grows. + total_size: u64, + /// The part number (1-based). `FILE_OFFSET = (PART_NUMBER - 1) * CHUNK_SIZE`. + part_number: u32, + /// Actual data bytes of this chunk + chunk: Bytes, +} - let mut realtime_is_done = realtime_video_done.as_ref().map(|_| false); +/// Creates a stream that reads chunks from a file, yielding [Chunk]s. +#[allow(unused)] +pub fn from_file_to_chunks(path: PathBuf) -> impl Stream<Item = std::io::Result<Chunk>> { + try_stream! { + let file = File::open(path).await?; + let total_size = file.metadata().await?.len(); + let mut file = BufReader::new(file); - // -------------------------------------------- - // Main loop while upload not complete: - // - If we have >= CHUNK_SIZE new data, upload. - // - If recording hasn't stopped, keep waiting. - // - If recording stopped, do leftover final(s). - // -------------------------------------------- + let mut buf = vec![0u8; CHUNK_SIZE as usize]; + let mut part_number = 0; loop { - if !realtime_is_done.unwrap_or(true) - && let Some(realtime_video_done) = &realtime_video_done - { - match realtime_video_done.try_recv() { - Ok(_) => { - realtime_is_done = Some(true); - } - Err(flume::TryRecvError::Empty) => {} - _ => { - warn!("cancelling upload as realtime generation failed"); - return Err("cancelling upload as realtime generation failed".to_string()); + part_number += 1; + let n = file.read(&mut buf).await?; + if n == 0 { break; } + yield Chunk { + total_size, + part_number, + chunk: Bytes::copy_from_slice(&buf[..n]), + }; + } + } +} + +/// Creates a stream that reads chunks from a potentially growing file, yielding [Chunk]s. +/// The first chunk of the file is yielded last to allow for header rewriting after recording completion. +/// This uploader will continually poll the filesystem and wait for the file to stop growing before flushing the rest. +pub fn from_pending_file_to_chunks( + path: PathBuf, + realtime_upload_done: Option<Receiver<()>>, +) -> impl Stream<Item = std::io::Result<Chunk>> { + try_stream!
{ + let mut file = tokio::fs::File::open(&path).await?; + let mut part_number = 1; + let mut last_read_position: u64 = 0; + let mut realtime_is_done = realtime_upload_done.as_ref().map(|_| false); + let mut first_chunk_size: Option = None; + let mut chunk_buffer = vec![0u8; CHUNK_SIZE as usize]; + + loop { + // Check if realtime recording is done + if !realtime_is_done.unwrap_or(true) { + if let Some(ref realtime_receiver) = realtime_upload_done { + match realtime_receiver.try_recv() { + Ok(_) => realtime_is_done = Some(true), + Err(flume::TryRecvError::Empty) => {}, + Err(_) => yield Err(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "Realtime generation failed" + ))?, } } } - // Check the file's current size - if !file_path.exists() { - println!("File no longer exists, aborting upload"); - return Err("File no longer exists".to_string()); - } - - let file_size = match tokio::fs::metadata(&file_path).await { - Ok(md) => md.len(), - Err(e) => { - println!("Failed to get file metadata: {e}"); - sleep(Duration::from_millis(500)).await; + let file_size = match file.metadata().await { + Ok(metadata) => metadata.len(), + Err(_) => { + // File might be temporarily locked, retry with shorter delay + tokio::time::sleep(Duration::from_millis(100)).await; continue; } }; - let new_data_size = file_size - last_uploaded_position; - - if ((new_data_size >= CHUNK_SIZE) - || new_data_size > 0 && realtime_is_done.unwrap_or(false)) - || (realtime_is_done.is_none() && new_data_size > 0) - { - // We have a full chunk to send - match Self::upload_chunk( - &app, - &client, - &file_path, - s3_config.id(), - &upload_id, - &mut part_number, - &mut last_uploaded_position, - new_data_size.min(CHUNK_SIZE), - &mut progress, - ) - .await - { - Ok(part) => { - uploaded_parts.push(part); + let new_data_size = file_size.saturating_sub(last_read_position); + + // Determine if we should read a chunk + let should_read_chunk = if let Some(is_done) = realtime_is_done { + (new_data_size >= CHUNK_SIZE) || (is_done && new_data_size > 0) + } else { + new_data_size > 0 + }; + + if should_read_chunk { + let chunk_size = std::cmp::min(new_data_size, CHUNK_SIZE) as usize; + + file.seek(std::io::SeekFrom::Start(last_read_position)).await?; + + let mut total_read = 0; + while total_read < chunk_size { + match file.read(&mut chunk_buffer[total_read..chunk_size]).await { + Ok(0) => break, // EOF + Ok(n) => total_read += n, + Err(e) => yield Err(e)?, } - Err(e) => { - println!( - "Error uploading chunk (part {part_number}): {e}. Retrying in 1s..." - ); - sleep(Duration::from_secs(1)).await; + } + + if total_read > 0 { + // Remember first chunk size for later re-emission with updated header + if last_read_position == 0 { + first_chunk_size = Some(total_read as u64); } + + yield Chunk { + total_size: file_size, + part_number, + chunk: Bytes::copy_from_slice(&chunk_buffer[..total_read]), + }; + part_number += 1; + last_read_position += total_read as u64; } } else if new_data_size == 0 && realtime_is_done.unwrap_or(true) { - if realtime_is_done.unwrap_or(false) { - info!("realtime video done, uploading header chunk"); - - let part = Self::upload_chunk( - &app, - &client, - &file_path, - s3_config.id(), - &upload_id, - &mut 1, - &mut 0, - uploaded_parts[0].size as u64, - &mut progress, - ) - .await - .map_err(|err| format!("Failed to re-upload first chunk: {err}"))?; - - uploaded_parts[0] = part; - println!("Successfully re-uploaded first chunk",); - } - - // All leftover chunks are now uploaded. We finalize. 
- println!( - "Completing multipart upload with {} parts", - uploaded_parts.len() - ); - Self::finalize_upload( - &app, - &file_path, - s3_config.id(), - &upload_id, - &uploaded_parts, - ) - .await?; + // Recording is done and no new data - re-emit first chunk with corrected MP4 header + if let Some(first_size) = first_chunk_size { + file.seek(std::io::SeekFrom::Start(0)).await?; + + let chunk_size = first_size as usize; + let mut total_read = 0; + + while total_read < chunk_size { + match file.read(&mut chunk_buffer[total_read..chunk_size]).await { + Ok(0) => break, + Ok(n) => total_read += n, + Err(e) => yield Err(e)?, + } + } + if total_read > 0 { + yield Chunk { + total_size: file_size, + part_number: 1, + chunk: Bytes::copy_from_slice(&chunk_buffer[..total_read]), + }; + } + } break; } else { - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } } - - // Copy link to clipboard early - let _ = app.clipboard().write_text(pre_created_video.link.clone()); - - Ok(()) } +} - /// Upload a single chunk from the file at `last_uploaded_position` for `chunk_size` bytes. - /// Advances `last_uploaded_position` accordingly. Returns JSON { PartNumber, ETag, Size }. - #[allow(clippy::too_many_arguments)] - async fn upload_chunk( - app: &AppHandle, - client: &reqwest::Client, - file_path: &PathBuf, - video_id: &str, - upload_id: &str, - part_number: &mut i32, - last_uploaded_position: &mut u64, - chunk_size: u64, - progress: &mut UploadProgressUpdater, - ) -> Result { - let file_size = match tokio::fs::metadata(file_path).await { - Ok(metadata) => metadata.len(), - Err(e) => return Err(format!("Failed to get file metadata: {e}")), - }; - - // Check if we're at the end of the file - if *last_uploaded_position >= file_size { - return Err("No more data to read - already at end of file".to_string()); - } - - // Calculate how much we can actually read - let remaining = file_size - *last_uploaded_position; - let bytes_to_read = std::cmp::min(chunk_size, remaining); - - let mut file = tokio::fs::File::open(file_path) - .await - .map_err(|e| format!("Failed to open file: {e}"))?; +fn retryable_client(host: String) -> reqwest::ClientBuilder { + reqwest::Client::builder().retry( + reqwest::retry::for_host(host) + .classify_fn(|req_rep| { + match req_rep.status() { + // Server errors + Some(s) if s.is_server_error() || s == StatusCode::TOO_MANY_REQUESTS => { + req_rep.retryable() + } + // Network errors + None => req_rep.retryable(), + _ => req_rep.success(), + } + }) + .max_retries_per_request(5) + .max_extra_load(5.0), + ) +} - // Log before seeking - println!( - "Seeking to offset {} for part {} (file size: {}, remaining: {})", - *last_uploaded_position, *part_number, file_size, remaining - ); +/// Takes an incoming stream of bytes and individually uploads them to S3. +/// +/// Note: It's on the caller to ensure the chunks are sized correctly within S3 limits. +fn multipart_uploader( + app: AppHandle, + video_id: String, + upload_id: String, + stream: impl Stream>, +) -> impl Stream> { + debug!("Initializing multipart uploader for video {video_id:?}"); + + try_stream! 
{ + let mut stream = pin!(stream); + let mut prev_part_number = None; + while let Some(item) = stream.next().await { + let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; + debug!("Uploading chunk {part_number} for video {video_id:?}"); + prev_part_number = Some(part_number); + let md5_sum = base64::encode(md5::compute(&chunk).0); + let size = chunk.len(); + + let presigned_url = + api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, &md5_sum) + .await?; + + let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?; + let resp = retryable_client(url.host().unwrap_or("").to_string()) + .build() + .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))? + .put(&presigned_url) + .header("Content-MD5", &md5_sum) + .header("Content-Length", chunk.len()) + .timeout(Duration::from_secs(120)) + .body(chunk) + .send() + .await + .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?; - // Seek to the position we left off - if let Err(e) = file - .seek(std::io::SeekFrom::Start(*last_uploaded_position)) - .await - { - return Err(format!("Failed to seek in file: {e}")); - } + let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string()); - // Read exactly bytes_to_read - let mut chunk = vec![0u8; bytes_to_read as usize]; - let mut total_read = 0; + match !resp.status().is_success() { + true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())), + false => Ok(()), + }?; - while total_read < bytes_to_read as usize { - match file.read(&mut chunk[total_read..]).await { - Ok(0) => break, // EOF - Ok(n) => { - total_read += n; - println!("Read {n} bytes, total so far: {total_read}/{bytes_to_read}"); - } - Err(e) => return Err(format!("Failed to read chunk from file: {e}")), - } + yield UploadedPart { + etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?, + part_number, + size, + total_size + }; } + } +} - if total_read == 0 { - return Err("No data to upload for this part.".to_string()); - } +/// Takes an incoming stream of bytes and streams them to an S3 object. +pub async fn singlepart_uploader( + app: AppHandle, + request: PresignedS3PutRequest, + total_size: u64, + stream: impl Stream> + Send + 'static, +) -> Result<(), String> { + let presigned_url = api::upload_signed(&app, request).await?; + + let url = Uri::from_str(&presigned_url) + .map_err(|err| format!("singlepart_uploader/invalid_url: {err:?}"))?; + let resp = retryable_client(url.host().unwrap_or("").to_string()) + .build() + .map_err(|err| format!("singlepart_uploader/client: {err:?}"))? 
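+ // Same retry-classifying client as the multipart path: up to 5 retries for
+ // transient 5xx/429 responses and network errors (see `retryable_client` above).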
+ .put(&presigned_url) + .header("Content-Length", total_size) + .timeout(Duration::from_secs(120)) + .body(reqwest::Body::wrap_stream(stream)) + .send() + .await + .map_err(|err| format!("singlepart_uploader/error: {err:?}"))?; - // Truncate the buffer to the actual bytes read - chunk.truncate(total_read); + match !resp.status().is_success() { + true => Err(format!( + "singlepart_uploader/error: {}", + resp.text().await.unwrap_or_default() + )), + false => Ok(()), + }?; - // Basic content‑MD5 for data integrity - let md5_sum = { - let digest = md5::compute(&chunk); - base64::encode(digest.0) - }; + Ok(()) +} - // Verify file position to ensure we're not experiencing file handle issues - let pos_after_read = file - .seek(std::io::SeekFrom::Current(0)) - .await - .map_err(|e| format!("Failed to get current file position: {e}"))?; +pub trait UploadedChunk { + /// total size of the file + fn total(&self) -> u64; - let expected_pos = *last_uploaded_position + total_read as u64; - if pos_after_read != expected_pos { - println!( - "WARNING: File position after read ({pos_after_read}) doesn't match expected position ({expected_pos})" - ); - } + /// size of the current chunk + fn size(&self) -> u64; +} - let file_size = tokio::fs::metadata(file_path) - .await - .map(|m| m.len()) - .unwrap_or(0); - let remaining = file_size - *last_uploaded_position; +impl UploadedChunk for UploadedPart { + fn total(&self) -> u64 { + self.total_size + } - println!( - "File size: {}, Last uploaded: {}, Remaining: {}, chunk_size: {}, part: {}", - file_size, *last_uploaded_position, remaining, chunk_size, *part_number - ); - println!( - "Uploading part {} ({} bytes), MD5: {}", - *part_number, total_read, md5_sum - ); + fn size(&self) -> u64 { + self.size as u64 + } +} - // Request presigned URL for this part - let presign_response = match app - .authed_api_request("/api/upload/multipart/presign-part", |c, url| { - c.post(url) - .header("Content-Type", "application/json") - .json(&serde_json::json!({ - "videoId": video_id, - "uploadId": upload_id, - "partNumber": *part_number, - "md5Sum": &md5_sum - })) - }) - .await - { - Ok(r) => r, - Err(e) => { - return Err(format!( - "Failed to request presigned URL for part {}: {}", - *part_number, e - )); - } - }; +impl UploadedChunk for Chunk { + fn total(&self) -> u64 { + self.total_size + } - progress.update(expected_pos, file_size); + fn size(&self) -> u64 { + self.chunk.len() as u64 + } +} - if !presign_response.status().is_success() { - let status = presign_response.status(); - let error_body = presign_response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - return Err(format!( - "Presign-part failed for part {}: status={}, body={}", - *part_number, status, error_body - )); - } +impl UploadedChunk for (u64, Bytes) { + fn total(&self) -> u64 { + self.0 + } - let presign_data = match presign_response.json::().await { - Ok(d) => d, - Err(e) => return Err(format!("Failed to parse presigned URL response: {e}")), - }; + fn size(&self) -> u64 { + self.1.len() as u64 + } +} - let presigned_url = presign_data - .get("presignedUrl") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); +/// Monitor the stream to report the upload progress +fn progress( + app: AppHandle, + video_id: String, + stream: impl Stream>, +) -> impl Stream> { + let mut uploaded = 0u64; + let mut pending_task: Option> = None; + let mut reemit_task: Option> = None; + let (video_id2, app_handle) = (video_id.clone(), app.clone()); + + stream! 
{ + let mut stream = pin!(stream); + + while let Some(chunk) = stream.next().await { + if let Ok(chunk) = &chunk { + uploaded += chunk.size(); + let total = chunk.total(); + + // Cancel any pending task + if let Some(handle) = pending_task.take() { + handle.abort(); + } - if presigned_url.is_empty() { - return Err(format!("Empty presignedUrl for part {}", *part_number)); - } + // Cancel any existing reemit task + if let Some(handle) = reemit_task.take() { + handle.abort(); + } - // Upload the chunk with retry - let mut retry_count = 0; - let max_retries = 3; - let mut etag: Option = None; - - while retry_count < max_retries && etag.is_none() { - println!( - "Sending part {} (attempt {}/{}): {} bytes", - *part_number, - retry_count + 1, - max_retries, - total_read - ); - - match client - .put(&presigned_url) - .header("Content-MD5", &md5_sum) - .timeout(Duration::from_secs(120)) - .body(chunk.clone()) - .send() - .await - { - Ok(upload_response) => { - if upload_response.status().is_success() { - if let Some(etag_val) = upload_response.headers().get("ETag") { - let e = etag_val - .to_str() - .unwrap_or("") - .trim_matches('"') - .to_string(); - println!("Received ETag {} for part {}", e, *part_number); - etag = Some(e); - } else { - println!("No ETag in response for part {}", *part_number); - retry_count += 1; - sleep(Duration::from_secs(2)).await; + let should_send_immediately = uploaded >= total; + + if should_send_immediately { + // Send immediately if upload is complete + let app_clone = app.clone(); + let video_id_clone = video_id.clone(); + tokio::spawn(async move { + api::desktop_video_progress(&app_clone, &video_id_clone, uploaded, total).await.ok(); + }); + } else { + // Schedule delayed update + let app_clone = app.clone(); + let video_id_clone = video_id.clone(); + pending_task = Some(tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(2)).await; + api::desktop_video_progress(&app_clone, &video_id_clone, uploaded, total).await.ok(); + })); + + // Start reemit task for continuous progress updates every 700ms + let app_reemit = app.clone(); + let video_id_reemit = video_id.clone(); + let uploaded_reemit = uploaded; + let total_reemit = total; + reemit_task = Some(tokio::spawn(async move { + let mut interval = time::interval(Duration::from_millis(700)); + interval.tick().await; // Skip first immediate tick + + loop { + interval.tick().await; + UploadProgressEvent { + video_id: video_id_reemit.clone(), + uploaded: uploaded_reemit.to_string(), + total: total_reemit.to_string(), + } + .emit(&app_reemit) + .ok(); } - } else { - println!( - "Failed part {} (status {}). 
Will retry if possible.", - *part_number, - upload_response.status() - ); - if let Ok(body) = upload_response.text().await { - println!("Error response: {body}"); - } - retry_count += 1; - sleep(Duration::from_secs(2)).await; - } - } - Err(e) => { - println!( - "Part {} upload error (attempt {}/{}): {}", - *part_number, - retry_count + 1, - max_retries, - e - ); - retry_count += 1; - sleep(Duration::from_secs(2)).await; + })); } - } - } - let etag = match etag { - Some(e) => e, - None => { - return Err(format!( - "Failed to upload part {} after {} attempts", - *part_number, max_retries - )); + // Emit progress event for the app frontend + UploadProgressEvent { + video_id: video_id.clone(), + uploaded: uploaded.to_string(), + total: total.to_string(), + } + .emit(&app) + .ok(); } - }; - - // Advance the global progress - *last_uploaded_position += total_read as u64; - println!( - "After upload: new last_uploaded_position is {} ({}% of file)", - *last_uploaded_position, - (*last_uploaded_position as f64 / file_size as f64 * 100.0) as u32 - ); - let part = UploadedPart { - part_number: *part_number, - etag, - size: total_read, - }; - *part_number += 1; - Ok(part) - } - - /// Completes the multipart upload with the stored parts. - /// Logs a final location if the complete call is successful. - async fn finalize_upload( - app: &AppHandle, - file_path: &PathBuf, - video_id: &str, - upload_id: &str, - uploaded_parts: &[UploadedPart], - ) -> Result<(), String> { - println!( - "Completing multipart upload with {} parts", - uploaded_parts.len() - ); - - if uploaded_parts.is_empty() { - return Err("No parts uploaded before finalizing.".to_string()); + yield chunk; } - let mut total_bytes_in_parts = 0; - for part in uploaded_parts { - let pn = part.part_number; - let size = part.size; - let etag = &part.etag; - total_bytes_in_parts += part.size; - println!("Part {pn}: {size} bytes (ETag: {etag})"); + // Clean up reemit task when stream ends + if let Some(handle) = reemit_task.take() { + handle.abort(); } + } + .map(Some) + .chain(stream::once(async move { + // This will trigger the frontend to remove the event from the SolidJS store. + UploadProgressEvent { + video_id: video_id2, + uploaded: "0".into(), + total: "0".into(), + } + .emit(&app_handle) + .ok(); - let file_final_size = tokio::fs::metadata(file_path) - .await - .map(|md| md.len()) - .unwrap_or(0); + None + })) + .filter_map(|item| async move { item }) +} - println!("Sum of all parts: {total_bytes_in_parts} bytes"); - println!("File size on disk: {file_final_size} bytes"); - println!("Proceeding with multipart upload completion..."); +/// Track the upload progress into a Tauri channel +fn tauri_progress( + channel: Channel, + stream: impl Stream>, +) -> impl Stream> { + let mut uploaded = 0u64; - let metadata = build_video_meta(file_path) - .map_err(|e| error!("Failed to get video metadata: {e}")) - .ok(); + stream! 
{ + let mut stream = pin!(stream); - let complete_response = match app - .authed_api_request("/api/upload/multipart/complete", |c, url| { - c.post(url).header("Content-Type", "application/json").json( - &MultipartCompleteResponse { - video_id, - upload_id, - parts: uploaded_parts, - meta: metadata, - }, - ) - }) - .await - { - Ok(response) => response, - Err(e) => { - return Err(format!("Failed to complete multipart upload: {e}")); - } - }; + while let Some(chunk) = stream.next().await { + if let Ok(chunk) = &chunk { + uploaded += chunk.size(); - if !complete_response.status().is_success() { - let status = complete_response.status(); - let error_body = complete_response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - return Err(format!( - "Failed to complete multipart upload. Status: {status}. Body: {error_body}" - )); - } - - let complete_data = match complete_response.json::().await { - Ok(d) => d, - Err(e) => { - return Err(format!("Failed to parse completion response: {e}")); + channel.send(UploadProgress { + progress: uploaded as f64 / chunk.total() as f64 + }) + .ok(); } - }; - if let Some(location) = complete_data.get("location") { - println!("Multipart upload complete. Final S3 location: {location}"); - } else { - println!("Multipart upload complete. No 'location' in response."); + yield chunk; } - - println!("Multipart upload complete for {video_id}."); - Ok(()) } } - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct UploadedPart { - part_number: i32, - etag: String, - size: usize, -} diff --git a/apps/desktop/src-tauri/src/upload_legacy.rs b/apps/desktop/src-tauri/src/upload_legacy.rs new file mode 100644 index 000000000..ee069f329 --- /dev/null +++ b/apps/desktop/src-tauri/src/upload_legacy.rs @@ -0,0 +1,1146 @@ +//! This is the legacy uploading module. +//! We are keeping it for now as an easy fallback. +//! +//! You should avoid making changes to it, make changes to the new upload module instead. 
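+//!
+//! This path still runs whenever the `enable_new_uploader` experimental setting is
+//! off; the new module falls back to it from `InstantMultipartUpload::run` with a
+//! call of roughly this shape:
+//!
+//! ```ignore
+//! upload_legacy::InstantMultipartUpload::run(
+//!     app,
+//!     pre_created_video.id.clone(),
+//!     file_path,
+//!     pre_created_video,
+//!     realtime_video_done,
+//! )
+//! .await
+//! ```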
+ +// credit @filleduchaos + +use crate::api::S3VideoMeta; +use crate::web_api::ManagerExt; +use crate::{UploadProgress, VideoUploadInfo}; +use ffmpeg::ffi::AV_TIME_BASE; +use flume::Receiver; +use futures::StreamExt; +use image::ImageReader; +use image::codecs::jpeg::JpegEncoder; +use reqwest::StatusCode; +use reqwest::header::CONTENT_LENGTH; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use specta::Type; +use std::{ + path::PathBuf, + time::{Duration, Instant}, +}; +use tauri::{AppHandle, ipc::Channel}; +use tauri_plugin_clipboard_manager::ClipboardExt; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::task::{self, JoinHandle}; +use tokio::time::sleep; +use tracing::{debug, error, info, trace, warn}; + +#[derive(Deserialize, Serialize, Clone, Type, Debug)] +pub struct S3UploadMeta { + id: String, +} + +#[derive(Deserialize, Clone, Debug)] +pub struct CreateErrorResponse { + error: String, +} + +// fn deserialize_empty_object_as_string<'de, D>(deserializer: D) -> Result +// where +// D: Deserializer<'de>, +// { +// struct StringOrObject; + +// impl<'de> de::Visitor<'de> for StringOrObject { +// type Value = String; + +// fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { +// formatter.write_str("string or empty object") +// } + +// fn visit_str(self, value: &str) -> Result +// where +// E: de::Error, +// { +// Ok(value.to_string()) +// } + +// fn visit_string(self, value: String) -> Result +// where +// E: de::Error, +// { +// Ok(value) +// } + +// fn visit_map(self, _map: M) -> Result +// where +// M: de::MapAccess<'de>, +// { +// // Return empty string for empty objects +// Ok(String::new()) +// } +// } + +// deserializer.deserialize_any(StringOrObject) +// } + +// impl S3UploadMeta { +// pub fn id(&self) -> &str { +// &self.id +// } + +// // pub fn new(id: String) -> Self { +// // Self { id } +// // } +// } + +pub struct UploadedVideo { + pub link: String, + pub id: String, + #[allow(unused)] + pub config: S3UploadMeta, +} + +pub struct UploadedImage { + pub link: String, + pub id: String, +} + +// pub struct UploadedAudio { +// pub link: String, +// pub id: String, +// pub config: S3UploadMeta, +// } + +pub struct UploadProgressUpdater { + video_state: Option, + app: AppHandle, + video_id: String, +} + +struct VideoProgressState { + uploaded: u64, + total: u64, + pending_task: Option>, + last_update_time: Instant, +} + +impl UploadProgressUpdater { + pub fn new(app: AppHandle, video_id: String) -> Self { + Self { + video_state: None, + app, + video_id, + } + } + + pub fn update(&mut self, uploaded: u64, total: u64) { + let should_send_immediately = { + let state = self.video_state.get_or_insert_with(|| VideoProgressState { + uploaded, + total, + pending_task: None, + last_update_time: Instant::now(), + }); + + // Cancel any pending task + if let Some(handle) = state.pending_task.take() { + handle.abort(); + } + + state.uploaded = uploaded; + state.total = total; + state.last_update_time = Instant::now(); + + // Send immediately if upload is complete + uploaded >= total + }; + + let app = self.app.clone(); + if should_send_immediately { + tokio::spawn({ + let video_id = self.video_id.clone(); + async move { + Self::send_api_update(&app, video_id, uploaded, total).await; + } + }); + + // Clear state since upload is complete + self.video_state = None; + } else { + // Schedule delayed update + let handle = { + let video_id = self.video_id.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(2)).await; + 
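// Debounce: a newer call to `update` aborts this pending task via
+ // `pending_task.take()` above, so only the latest (uploaded, total)
+ // snapshot is sent to the API once the 2s delay elapses.
+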
Self::send_api_update(&app, video_id, uploaded, total).await; + }) + }; + + if let Some(state) = &mut self.video_state { + state.pending_task = Some(handle); + } + } + } + + async fn send_api_update(app: &AppHandle, video_id: String, uploaded: u64, total: u64) { + let response = app + .authed_api_request("/api/desktop/video/progress", |client, url| { + client + .post(url) + .header("X-Cap-Desktop-Version", env!("CARGO_PKG_VERSION")) + .json(&json!({ + "videoId": video_id, + "uploaded": uploaded, + "total": total, + "updatedAt": chrono::Utc::now().to_rfc3339() + })) + }) + .await; + + match response { + Ok(resp) if resp.status().is_success() => { + trace!("Progress update sent successfully"); + } + Ok(resp) => error!("Failed to send progress update: {}", resp.status()), + Err(err) => error!("Failed to send progress update: {err}"), + } + } +} + +pub async fn upload_video( + app: &AppHandle, + video_id: String, + file_path: PathBuf, + existing_config: Option, + screenshot_path: Option, + meta: Option, + channel: Option>, +) -> Result { + println!("Uploading video {video_id}..."); + + let client = reqwest::Client::new(); + let s3_config = match existing_config { + Some(config) => config, + None => create_or_get_video(app, false, Some(video_id.clone()), None, meta).await?, + }; + + let presigned_put = presigned_s3_put( + app, + PresignedS3PutRequest { + video_id: video_id.clone(), + subpath: "result.mp4".to_string(), + method: PresignedS3PutRequestMethod::Put, + meta: Some(build_video_meta(&file_path)?), + }, + ) + .await?; + + let file = tokio::fs::File::open(&file_path) + .await + .map_err(|e| format!("Failed to open file: {e}"))?; + + let metadata = file + .metadata() + .await + .map_err(|e| format!("Failed to get file metadata: {e}"))?; + + let total_size = metadata.len(); + + let reader_stream = tokio_util::io::ReaderStream::new(file); + + let mut bytes_uploaded = 0u64; + let mut progress = UploadProgressUpdater::new(app.clone(), video_id); + + let progress_stream = reader_stream.inspect(move |chunk| { + if let Ok(chunk) = chunk { + bytes_uploaded += chunk.len() as u64; + } + + if bytes_uploaded > 0 { + if let Some(channel) = &channel { + channel + .send(UploadProgress { + progress: bytes_uploaded as f64 / total_size as f64, + }) + .ok(); + } + + progress.update(bytes_uploaded, total_size); + } + }); + + let screenshot_upload = match screenshot_path { + Some(screenshot_path) if screenshot_path.exists() => { + Some(prepare_screenshot_upload(app, &s3_config, screenshot_path)) + } + _ => None, + }; + + let video_upload = client + .put(presigned_put) + .body(reqwest::Body::wrap_stream(progress_stream)) + .header(CONTENT_LENGTH, metadata.len()); + + let (video_upload, screenshot_result): ( + Result, + Option>, + ) = tokio::join!(video_upload.send(), async { + if let Some(screenshot_req) = screenshot_upload { + Some(screenshot_req.await) + } else { + None + } + }); + + let response = video_upload.map_err(|e| format!("Failed to send upload file request: {e}"))?; + + if response.status().is_success() { + println!("Video uploaded successfully"); + + if let Some(Ok(screenshot_response)) = screenshot_result { + if screenshot_response.status().is_success() { + println!("Screenshot uploaded successfully"); + } else { + println!( + "Failed to upload screenshot: {}", + screenshot_response.status() + ); + } + } + + return Ok(UploadedVideo { + link: app.make_app_url(format!("/s/{}", &s3_config.id)).await, + id: s3_config.id.clone(), + config: s3_config, + }); + } + + let status = response.status(); + let 
error_body = response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + tracing::error!( + "Failed to upload file. Status: {}. Body: {}", + status, + error_body + ); + Err(format!( + "Failed to upload file. Status: {status}. Body: {error_body}" + )) +} + +pub async fn upload_image(app: &AppHandle, file_path: PathBuf) -> Result { + let file_name = file_path + .file_name() + .and_then(|name| name.to_str()) + .ok_or("Invalid file path")? + .to_string(); + + let client = reqwest::Client::new(); + let s3_config = create_or_get_video(app, true, None, None, None).await?; + + let presigned_put = presigned_s3_put( + app, + PresignedS3PutRequest { + video_id: s3_config.id.clone(), + subpath: file_name, + method: PresignedS3PutRequestMethod::Put, + meta: None, + }, + ) + .await?; + + let file_content = tokio::fs::read(&file_path) + .await + .map_err(|e| format!("Failed to read file: {e}"))?; + + let response = client + .put(presigned_put) + .header(CONTENT_LENGTH, file_content.len()) + .body(file_content) + .send() + .await + .map_err(|e| format!("Failed to send upload file request: {e}"))?; + + if response.status().is_success() { + println!("File uploaded successfully"); + return Ok(UploadedImage { + link: app.make_app_url(format!("/s/{}", &s3_config.id)).await, + id: s3_config.id, + }); + } + + let status = response.status(); + let error_body = response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + tracing::error!( + "Failed to upload file. Status: {}. Body: {}", + status, + error_body + ); + Err(format!( + "Failed to upload file. Status: {status}. Body: {error_body}" + )) +} + +pub async fn create_or_get_video( + app: &AppHandle, + is_screenshot: bool, + video_id: Option, + name: Option, + meta: Option, +) -> Result { + let mut s3_config_url = if let Some(id) = video_id { + format!("/api/desktop/video/create?recordingMode=desktopMP4&videoId={id}") + } else if is_screenshot { + "/api/desktop/video/create?recordingMode=desktopMP4&isScreenshot=true".to_string() + } else { + "/api/desktop/video/create?recordingMode=desktopMP4".to_string() + }; + + if let Some(name) = name { + s3_config_url.push_str(&format!("&name={name}")); + } + + if let Some(meta) = meta { + s3_config_url.push_str(&format!("&durationInSecs={}", meta.duration_in_secs)); + s3_config_url.push_str(&format!("&width={}", meta.width)); + s3_config_url.push_str(&format!("&height={}", meta.height)); + if let Some(fps) = meta.fps { + s3_config_url.push_str(&format!("&fps={}", fps)); + } + } + + let response = app + .authed_api_request(s3_config_url, |client, url| client.get(url)) + .await + .map_err(|e| format!("Failed to send request to Next.js handler: {e}"))?; + + if response.status() == StatusCode::UNAUTHORIZED { + return Err("Failed to authenticate request; please log in again".into()); + } + + if response.status() != StatusCode::OK { + if let Ok(error) = response.json::().await { + if error.error == "upgrade_required" { + return Err( + "You must upgrade to Cap Pro to upload recordings over 5 minutes in length" + .into(), + ); + } + + return Err(format!("server error: {}", error.error)); + } + + return Err("Unknown error uploading video".into()); + } + + let response_text = response + .text() + .await + .map_err(|e| format!("Failed to read response body: {e}"))?; + + let config = serde_json::from_str::(&response_text).map_err(|e| { + format!("Failed to deserialize response: {e}. 
Response body: {response_text}") + })?; + + Ok(config) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PresignedS3PutRequest { + video_id: String, + subpath: String, + method: PresignedS3PutRequestMethod, + #[serde(flatten)] + meta: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "lowercase")] +pub enum PresignedS3PutRequestMethod { + #[allow(unused)] + Post, + Put, +} + +async fn presigned_s3_put(app: &AppHandle, body: PresignedS3PutRequest) -> Result { + #[derive(Deserialize, Debug)] + struct Data { + url: String, + } + + #[derive(Deserialize, Debug)] + #[serde(rename_all = "camelCase")] + struct Wrapper { + presigned_put_data: Data, + } + + let response = app + .authed_api_request("/api/upload/signed", |client, url| { + client.post(url).json(&body) + }) + .await + .map_err(|e| format!("Failed to send request to Next.js handler: {e}"))?; + + if response.status() == StatusCode::UNAUTHORIZED { + return Err("Failed to authenticate request; please log in again".into()); + } + + let Wrapper { presigned_put_data } = response + .json::() + .await + .map_err(|e| format!("Failed to deserialize server response: {e}"))?; + + Ok(presigned_put_data.url) +} + +pub fn build_video_meta(path: &PathBuf) -> Result { + let input = + ffmpeg::format::input(path).map_err(|e| format!("Failed to read input file: {e}"))?; + let video_stream = input + .streams() + .best(ffmpeg::media::Type::Video) + .ok_or_else(|| "Failed to find appropriate video stream in file".to_string())?; + + let video_codec = ffmpeg::codec::context::Context::from_parameters(video_stream.parameters()) + .map_err(|e| format!("Unable to read video codec information: {e}"))?; + let video = video_codec + .decoder() + .video() + .map_err(|e| format!("Unable to get video decoder: {e}"))?; + + Ok(S3VideoMeta { + duration_in_secs: input.duration() as f64 / AV_TIME_BASE as f64, + width: video.width(), + height: video.height(), + fps: video + .frame_rate() + .map(|v| (v.numerator() as f32 / v.denominator() as f32)), + }) +} + +// fn build_audio_upload_body( +// path: &PathBuf, +// base: S3UploadBody, +// ) -> Result { +// let input = +// ffmpeg::format::input(path).map_err(|e| format!("Failed to read input file: {e}"))?; +// let stream = input +// .streams() +// .best(ffmpeg::media::Type::Audio) +// .ok_or_else(|| "Failed to find appropriate audio stream in file".to_string())?; + +// let duration_millis = input.duration() as f64 / 1000.; + +// let codec = ffmpeg::codec::context::Context::from_parameters(stream.parameters()) +// .map_err(|e| format!("Unable to read audio codec information: {e}"))?; +// let codec_name = codec.id(); + +// let is_mp3 = path.extension().is_some_and(|ext| ext == "mp3"); + +// Ok(S3AudioUploadBody { +// base, +// duration: duration_millis.to_string(), +// audio_codec: format!("{codec_name:?}").replace("Id::", "").to_lowercase(), +// is_mp3, +// }) +// } + +pub async fn prepare_screenshot_upload( + app: &AppHandle, + s3_config: &S3UploadMeta, + screenshot_path: PathBuf, +) -> Result { + let presigned_put = presigned_s3_put( + app, + PresignedS3PutRequest { + video_id: s3_config.id.clone(), + subpath: "screenshot/screen-capture.jpg".to_string(), + method: PresignedS3PutRequestMethod::Put, + meta: None, + }, + ) + .await?; + + let compressed_image = compress_image(screenshot_path).await?; + + reqwest::Client::new() + .put(presigned_put) + .header(CONTENT_LENGTH, compressed_image.len()) + .body(compressed_image) + .send() + .await + .map_err(|e| format!("Error uploading screenshot: {e}")) +} + 
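+// Illustrative sketch only (not part of this patch's call graph): a hypothetical
+// caller that pairs `prepare_screenshot_upload` with an existing recording's
+// `S3UploadMeta`. The path literal is made up for the example.
+//
+// async fn upload_recording_screenshot(
+//     app: &AppHandle,
+//     s3_config: &S3UploadMeta,
+// ) -> Result<(), String> {
+//     let shot = PathBuf::from("/tmp/screen-capture.jpg"); // hypothetical path
+//     let resp = prepare_screenshot_upload(app, s3_config, shot).await?;
+//     match resp.status().is_success() {
+//         true => Ok(()),
+//         false => Err(format!("screenshot upload failed: {}", resp.status())),
+//     }
+// }
+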
+async fn compress_image(path: PathBuf) -> Result, String> { + task::spawn_blocking(move || { + let img = ImageReader::open(&path) + .map_err(|e| format!("Failed to open image: {e}"))? + .decode() + .map_err(|e| format!("Failed to decode image: {e}"))?; + + let new_width = img.width() / 2; + let new_height = img.height() / 2; + + let resized_img = img.resize(new_width, new_height, image::imageops::FilterType::Nearest); + + let mut buffer = Vec::new(); + let mut encoder = JpegEncoder::new_with_quality(&mut buffer, 30); + encoder + .encode( + resized_img.as_bytes(), + new_width, + new_height, + resized_img.color().into(), + ) + .map_err(|e| format!("Failed to compress image: {e}"))?; + + Ok(buffer) + }) + .await + .map_err(|e| format!("Failed to compress image: {e}"))? +} + +// a typical recommended chunk size is 5MB (AWS min part size). +const CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5MB +// const MIN_PART_SIZE: u64 = 5 * 1024 * 1024; // For non-final parts + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct MultipartCompleteResponse<'a> { + video_id: &'a str, + upload_id: &'a str, + parts: &'a [UploadedPart], + #[serde(flatten)] + meta: Option, +} + +pub struct InstantMultipartUpload {} + +impl InstantMultipartUpload { + pub async fn run( + app: AppHandle, + video_id: String, + file_path: PathBuf, + pre_created_video: VideoUploadInfo, + realtime_video_done: Option>, + ) -> Result<(), String> { + use std::time::Duration; + + use tokio::time::sleep; + + // -------------------------------------------- + // basic constants and info for chunk approach + // -------------------------------------------- + let client = reqwest::Client::new(); + let s3_config = pre_created_video.config; + + let mut uploaded_parts = Vec::new(); + let mut part_number = 1; + let mut last_uploaded_position: u64 = 0; + let mut progress = UploadProgressUpdater::new(app.clone(), pre_created_video.id.clone()); + + // -------------------------------------------- + // initiate the multipart upload + // -------------------------------------------- + debug!("Initiating multipart upload for {video_id}..."); + let initiate_response = match app + .authed_api_request("/api/upload/multipart/initiate", |c, url| { + c.post(url) + .header("Content-Type", "application/json") + .json(&serde_json::json!({ + "videoId": s3_config.id, + "contentType": "video/mp4" + })) + }) + .await + { + Ok(r) => r, + Err(e) => { + return Err(format!("Failed to initiate multipart upload: {e}")); + } + }; + + if !initiate_response.status().is_success() { + let status = initiate_response.status(); + let error_body = initiate_response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!( + "Failed to initiate multipart upload. Status: {status}. 
Body: {error_body}" + )); + } + + let initiate_data = match initiate_response.json::().await { + Ok(d) => d, + Err(e) => { + return Err(format!("Failed to parse initiate response: {e}")); + } + }; + + let upload_id = match initiate_data.get("uploadId") { + Some(val) => val.as_str().unwrap_or("").to_string(), + None => { + return Err("No uploadId returned from initiate endpoint".to_string()); + } + }; + + if upload_id.is_empty() { + return Err("Empty uploadId returned from initiate endpoint".to_string()); + } + + println!("Multipart upload initiated with ID: {upload_id}"); + + let mut realtime_is_done = realtime_video_done.as_ref().map(|_| false); + + // -------------------------------------------- + // Main loop while upload not complete: + // - If we have >= CHUNK_SIZE new data, upload. + // - If recording hasn't stopped, keep waiting. + // - If recording stopped, do leftover final(s). + // -------------------------------------------- + loop { + if !realtime_is_done.unwrap_or(true) + && let Some(realtime_video_done) = &realtime_video_done + { + match realtime_video_done.try_recv() { + Ok(_) => { + realtime_is_done = Some(true); + } + Err(flume::TryRecvError::Empty) => {} + _ => { + warn!("cancelling upload as realtime generation failed"); + return Err("cancelling upload as realtime generation failed".to_string()); + } + } + } + + // Check the file's current size + if !file_path.exists() { + println!("File no longer exists, aborting upload"); + return Err("File no longer exists".to_string()); + } + + let file_size = match tokio::fs::metadata(&file_path).await { + Ok(md) => md.len(), + Err(e) => { + println!("Failed to get file metadata: {e}"); + sleep(Duration::from_millis(500)).await; + continue; + } + }; + + let new_data_size = file_size - last_uploaded_position; + + if ((new_data_size >= CHUNK_SIZE) + || new_data_size > 0 && realtime_is_done.unwrap_or(false)) + || (realtime_is_done.is_none() && new_data_size > 0) + { + // We have a full chunk to send + match Self::upload_chunk( + &app, + &client, + &file_path, + &s3_config.id, + &upload_id, + &mut part_number, + &mut last_uploaded_position, + new_data_size.min(CHUNK_SIZE), + &mut progress, + ) + .await + { + Ok(part) => { + uploaded_parts.push(part); + } + Err(e) => { + println!( + "Error uploading chunk (part {part_number}): {e}. Retrying in 1s..." + ); + sleep(Duration::from_secs(1)).await; + } + } + } else if new_data_size == 0 && realtime_is_done.unwrap_or(true) { + if realtime_is_done.unwrap_or(false) { + info!("realtime video done, uploading header chunk"); + + let part = Self::upload_chunk( + &app, + &client, + &file_path, + &s3_config.id, + &upload_id, + &mut 1, + &mut 0, + uploaded_parts[0].size as u64, + &mut progress, + ) + .await + .map_err(|err| format!("Failed to re-upload first chunk: {err}"))?; + + uploaded_parts[0] = part; + println!("Successfully re-uploaded first chunk",); + } + + // All leftover chunks are now uploaded. We finalize. + println!( + "Completing multipart upload with {} parts", + uploaded_parts.len() + ); + Self::finalize_upload(&app, &file_path, &s3_config.id, &upload_id, &uploaded_parts) + .await?; + + break; + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + + // Copy link to clipboard early + let _ = app.clipboard().write_text(pre_created_video.link.clone()); + + Ok(()) + } + + /// Upload a single chunk from the file at `last_uploaded_position` for `chunk_size` bytes. + /// Advances `last_uploaded_position` accordingly. Returns JSON { PartNumber, ETag, Size }. 
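+ /// (More precisely, it returns an `UploadedPart { part_number, etag, size }` struct;
+ /// the camelCase JSON shape only appears once the part list is serialized for the
+ /// complete call.)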
+ #[allow(clippy::too_many_arguments)] + async fn upload_chunk( + app: &AppHandle, + client: &reqwest::Client, + file_path: &PathBuf, + video_id: &str, + upload_id: &str, + part_number: &mut i32, + last_uploaded_position: &mut u64, + chunk_size: u64, + progress: &mut UploadProgressUpdater, + ) -> Result { + let file_size = match tokio::fs::metadata(file_path).await { + Ok(metadata) => metadata.len(), + Err(e) => return Err(format!("Failed to get file metadata: {e}")), + }; + + // Check if we're at the end of the file + if *last_uploaded_position >= file_size { + return Err("No more data to read - already at end of file".to_string()); + } + + // Calculate how much we can actually read + let remaining = file_size - *last_uploaded_position; + let bytes_to_read = std::cmp::min(chunk_size, remaining); + + let mut file = tokio::fs::File::open(file_path) + .await + .map_err(|e| format!("Failed to open file: {e}"))?; + + // Log before seeking + println!( + "Seeking to offset {} for part {} (file size: {}, remaining: {})", + *last_uploaded_position, *part_number, file_size, remaining + ); + + // Seek to the position we left off + if let Err(e) = file + .seek(std::io::SeekFrom::Start(*last_uploaded_position)) + .await + { + return Err(format!("Failed to seek in file: {e}")); + } + + // Read exactly bytes_to_read + let mut chunk = vec![0u8; bytes_to_read as usize]; + let mut total_read = 0; + + while total_read < bytes_to_read as usize { + match file.read(&mut chunk[total_read..]).await { + Ok(0) => break, // EOF + Ok(n) => { + total_read += n; + println!("Read {n} bytes, total so far: {total_read}/{bytes_to_read}"); + } + Err(e) => return Err(format!("Failed to read chunk from file: {e}")), + } + } + + if total_read == 0 { + return Err("No data to upload for this part.".to_string()); + } + + // Truncate the buffer to the actual bytes read + chunk.truncate(total_read); + + // Basic content‑MD5 for data integrity + let md5_sum = { + let digest = md5::compute(&chunk); + base64::encode(digest.0) + }; + + // Verify file position to ensure we're not experiencing file handle issues + let pos_after_read = file + .seek(std::io::SeekFrom::Current(0)) + .await + .map_err(|e| format!("Failed to get current file position: {e}"))?; + + let expected_pos = *last_uploaded_position + total_read as u64; + if pos_after_read != expected_pos { + println!( + "WARNING: File position after read ({pos_after_read}) doesn't match expected position ({expected_pos})" + ); + } + + let file_size = tokio::fs::metadata(file_path) + .await + .map(|m| m.len()) + .unwrap_or(0); + let remaining = file_size - *last_uploaded_position; + + println!( + "File size: {}, Last uploaded: {}, Remaining: {}, chunk_size: {}, part: {}", + file_size, *last_uploaded_position, remaining, chunk_size, *part_number + ); + println!( + "Uploading part {} ({} bytes), MD5: {}", + *part_number, total_read, md5_sum + ); + + // Request presigned URL for this part + let presign_response = match app + .authed_api_request("/api/upload/multipart/presign-part", |c, url| { + c.post(url) + .header("Content-Type", "application/json") + .json(&serde_json::json!({ + "videoId": video_id, + "uploadId": upload_id, + "partNumber": *part_number, + "md5Sum": &md5_sum + })) + }) + .await + { + Ok(r) => r, + Err(e) => { + return Err(format!( + "Failed to request presigned URL for part {}: {}", + *part_number, e + )); + } + }; + + progress.update(expected_pos, file_size); + + if !presign_response.status().is_success() { + let status = presign_response.status(); + let error_body = 
presign_response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!( + "Presign-part failed for part {}: status={}, body={}", + *part_number, status, error_body + )); + } + + let presign_data = match presign_response.json::().await { + Ok(d) => d, + Err(e) => return Err(format!("Failed to parse presigned URL response: {e}")), + }; + + let presigned_url = presign_data + .get("presignedUrl") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + if presigned_url.is_empty() { + return Err(format!("Empty presignedUrl for part {}", *part_number)); + } + + // Upload the chunk with retry + let mut retry_count = 0; + let max_retries = 3; + let mut etag: Option = None; + + while retry_count < max_retries && etag.is_none() { + println!( + "Sending part {} (attempt {}/{}): {} bytes", + *part_number, + retry_count + 1, + max_retries, + total_read + ); + + match client + .put(&presigned_url) + .header("Content-MD5", &md5_sum) + .timeout(Duration::from_secs(120)) + .body(chunk.clone()) + .send() + .await + { + Ok(upload_response) => { + if upload_response.status().is_success() { + if let Some(etag_val) = upload_response.headers().get("ETag") { + let e = etag_val + .to_str() + .unwrap_or("") + .trim_matches('"') + .to_string(); + println!("Received ETag {} for part {}", e, *part_number); + etag = Some(e); + } else { + println!("No ETag in response for part {}", *part_number); + retry_count += 1; + sleep(Duration::from_secs(2)).await; + } + } else { + println!( + "Failed part {} (status {}). Will retry if possible.", + *part_number, + upload_response.status() + ); + if let Ok(body) = upload_response.text().await { + println!("Error response: {body}"); + } + retry_count += 1; + sleep(Duration::from_secs(2)).await; + } + } + Err(e) => { + println!( + "Part {} upload error (attempt {}/{}): {}", + *part_number, + retry_count + 1, + max_retries, + e + ); + retry_count += 1; + sleep(Duration::from_secs(2)).await; + } + } + } + + let etag = match etag { + Some(e) => e, + None => { + return Err(format!( + "Failed to upload part {} after {} attempts", + *part_number, max_retries + )); + } + }; + + // Advance the global progress + *last_uploaded_position += total_read as u64; + println!( + "After upload: new last_uploaded_position is {} ({}% of file)", + *last_uploaded_position, + (*last_uploaded_position as f64 / file_size as f64 * 100.0) as u32 + ); + + let part = UploadedPart { + part_number: *part_number, + etag, + size: total_read, + }; + *part_number += 1; + Ok(part) + } + + /// Completes the multipart upload with the stored parts. + /// Logs a final location if the complete call is successful. 
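+ /// S3 expects the completed part list in ascending part-number order with the
+ /// ETags returned by each part upload; the sequential loop in `run` preserves that.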
+ async fn finalize_upload( + app: &AppHandle, + file_path: &PathBuf, + video_id: &str, + upload_id: &str, + uploaded_parts: &[UploadedPart], + ) -> Result<(), String> { + println!( + "Completing multipart upload with {} parts", + uploaded_parts.len() + ); + + if uploaded_parts.is_empty() { + return Err("No parts uploaded before finalizing.".to_string()); + } + + let mut total_bytes_in_parts = 0; + for part in uploaded_parts { + let pn = part.part_number; + let size = part.size; + let etag = &part.etag; + total_bytes_in_parts += part.size; + println!("Part {pn}: {size} bytes (ETag: {etag})"); + } + + let file_final_size = tokio::fs::metadata(file_path) + .await + .map(|md| md.len()) + .unwrap_or(0); + + println!("Sum of all parts: {total_bytes_in_parts} bytes"); + println!("File size on disk: {file_final_size} bytes"); + println!("Proceeding with multipart upload completion..."); + + let metadata = build_video_meta(file_path) + .map_err(|e| error!("Failed to get video metadata: {e}")) + .ok(); + + let complete_response = match app + .authed_api_request("/api/upload/multipart/complete", |c, url| { + c.post(url).header("Content-Type", "application/json").json( + &MultipartCompleteResponse { + video_id, + upload_id, + parts: uploaded_parts, + meta: metadata, + }, + ) + }) + .await + { + Ok(response) => response, + Err(e) => { + return Err(format!("Failed to complete multipart upload: {e}")); + } + }; + + if !complete_response.status().is_success() { + let status = complete_response.status(); + let error_body = complete_response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!( + "Failed to complete multipart upload. Status: {status}. Body: {error_body}" + )); + } + + let complete_data = match complete_response.json::().await { + Ok(d) => d, + Err(e) => { + return Err(format!("Failed to parse completion response: {e}")); + } + }; + + if let Some(location) = complete_data.get("location") { + println!("Multipart upload complete. Final S3 location: {location}"); + } else { + println!("Multipart upload complete. 
No 'location' in response."); + } + + println!("Multipart upload complete for {video_id}."); + Ok(()) + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct UploadedPart { + part_number: i32, + etag: String, + size: usize, +} diff --git a/apps/desktop/src-tauri/src/web_api.rs b/apps/desktop/src-tauri/src/web_api.rs index 2970d903f..b87b4721f 100644 --- a/apps/desktop/src-tauri/src/web_api.rs +++ b/apps/desktop/src-tauri/src/web_api.rs @@ -26,7 +26,7 @@ async fn do_authed_request( } ), ) - .header("X-Desktop-Version", env!("CARGO_PKG_VERSION")); + .header("X-Cap-Desktop-Version", env!("CARGO_PKG_VERSION")); if let Some(s) = std::option_env!("VITE_VERCEL_AUTOMATION_BYPASS_SECRET") { req = req.header("x-vercel-protection-bypass", s); diff --git a/apps/desktop/src/routes/(window-chrome)/settings/experimental.tsx b/apps/desktop/src/routes/(window-chrome)/settings/experimental.tsx index 1dc37e8f8..114453a98 100644 --- a/apps/desktop/src/routes/(window-chrome)/settings/experimental.tsx +++ b/apps/desktop/src/routes/(window-chrome)/settings/experimental.tsx @@ -26,6 +26,7 @@ function Inner(props: { initialStore: GeneralSettingsStore | null }) { enableNewRecordingFlow: false, autoZoomOnClicks: false, custom_cursor_capture2: true, + enableNewUploader: false, }, ); @@ -96,6 +97,19 @@ function Inner(props: { initialStore: GeneralSettingsStore | null }) { ); }} /> + { + handleChange("enableNewUploader", value); + // This is bad code, but I just want the UI to not jank and can't seem to find the issue. + setTimeout( + () => window.scrollTo({ top: 0, behavior: "instant" }), + 5, + ); + }} + /> diff --git a/apps/desktop/src/routes/(window-chrome)/settings/general.tsx b/apps/desktop/src/routes/(window-chrome)/settings/general.tsx index 8b6b1f1e4..853973d0e 100644 --- a/apps/desktop/src/routes/(window-chrome)/settings/general.tsx +++ b/apps/desktop/src/routes/(window-chrome)/settings/general.tsx @@ -118,6 +118,7 @@ function Inner(props: { initialStore: GeneralSettingsStore | null }) { enableNewRecordingFlow: false, autoZoomOnClicks: false, custom_cursor_capture2: true, + enableNewUploader: false, }, ); diff --git a/apps/desktop/src/routes/(window-chrome)/settings/recordings.tsx b/apps/desktop/src/routes/(window-chrome)/settings/recordings.tsx index e26e056e0..16f03892a 100644 --- a/apps/desktop/src/routes/(window-chrome)/settings/recordings.tsx +++ b/apps/desktop/src/routes/(window-chrome)/settings/recordings.tsx @@ -13,6 +13,7 @@ import { revealItemInDir } from "@tauri-apps/plugin-opener"; import * as shell from "@tauri-apps/plugin-shell"; import { cx } from "cva"; import { + createEffect, createMemo, createSignal, For, @@ -20,17 +21,19 @@ import { type ParentProps, Show, } from "solid-js"; +import { createStore, produce } from "solid-js/store"; +import CapTooltip from "~/components/Tooltip"; import { trackEvent } from "~/utils/analytics"; import { createTauriEventListener } from "~/utils/createEventListener"; import { commands, events, - type RecordingMetaWithMode, + type RecordingMetaWithMetadata, type UploadProgress, } from "~/utils/tauri"; type Recording = { - meta: RecordingMetaWithMode; + meta: RecordingMetaWithMetadata; path: string; prettyName: string; thumbnailPath: string; @@ -73,14 +76,29 @@ const recordingsQuery = queryOptions({ ); return recordings; }, + // This will ensure any changes to the upload status in the project meta are reflected. 
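+ // Polling is relatively cheap here: listRecordings is a local Tauri command, not a network call.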
+ refetchInterval: 2000, }); export default function Recordings() { const [activeTab, setActiveTab] = createSignal<(typeof Tabs)[number]["id"]>( Tabs[0].id, ); + const [uploadProgress, setUploadProgress] = createStore< + Record + >({}); const recordings = createQuery(() => recordingsQuery); + createTauriEventListener(events.uploadProgressEvent, (e) => { + setUploadProgress(e.video_id, (Number(e.uploaded) / Number(e.total)) * 100); + if (e.uploaded === e.total) + setUploadProgress( + produce((s) => { + delete s[e.video_id]; + }), + ); + }); + createTauriEventListener(events.recordingDeleted, () => recordings.refetch()); const filteredRecordings = createMemo(() => { @@ -169,6 +187,13 @@ export default function Recordings() { onCopyVideoToClipboard={() => handleCopyVideoToClipboard(recording.path) } + uploadProgress={ + recording.meta.upload && + (recording.meta.upload.state === "MultipartUpload" || + recording.meta.upload.state === "SinglePartUpload") + ? uploadProgress[recording.meta.upload.video_id] + : undefined + } /> )} @@ -185,6 +210,7 @@ function RecordingItem(props: { onOpenFolder: () => void; onOpenEditor: () => void; onCopyVideoToClipboard: () => void; + uploadProgress: number | undefined; }) { const [imageExists, setImageExists] = createSignal(true); const mode = () => props.recording.meta.mode; @@ -211,23 +237,66 @@ function RecordingItem(props: {
{props.recording.prettyName} -
- {mode() === "instant" ? ( - - ) : ( - - )} -

{firstLetterUpperCase()}

+
+
+ {mode() === "instant" ? ( + + ) : ( + + )} +

{firstLetterUpperCase()}

+
+ + +
+ +

Recording in progress

+
+
+ + + + {props.recording.meta.status.status === "Failed" + ? props.recording.meta.status.error + : ""} + + } + > +
+ +

Recording failed

+
+
+
+ + + + + {(sharing) => ( props.onOpenEditor()} + disabled={props.recording.meta.status.status !== "Complete"} > {(_) => { - const [progress, setProgress] = createSignal(0); const reupload = createMutation(() => ({ - mutationFn: async () => { - setProgress(0); - return await commands.uploadExportedVideo( + mutationFn: () => + commands.uploadExportedVideo( props.recording.path, "Reupload", - new Channel((progress) => - setProgress(Math.round(progress.progress * 100)), - ), - ); - }, - onSettled: () => setProgress(0), + new Channel((progress) => {}), + ), })); return ( - - {(sharing) => ( - <> + <> + reupload.mutate()} > - {reupload.isPending ? ( - - ) : ( - - )} + + } + > + + + + + {(sharing) => ( shell.open(sharing().link)} > - - )} - + )} + + ); }} @@ -330,7 +398,7 @@ function TooltipIconButton( props.onClick(); }} disabled={props.disabled} - class="p-2.5 opacity-70 will-change-transform hover:opacity-100 rounded-full transition-all duration-200 hover:bg-gray-3 dark:hover:bg-gray-5" + class="p-2.5 opacity-70 will-change-transform hover:opacity-100 rounded-full transition-all duration-200 hover:bg-gray-3 dark:hover:bg-gray-5 disabled:pointer-events-none disabled:opacity-45 disabled:hover:opacity-45" > {props.children} diff --git a/apps/desktop/src/utils/createEventListener.ts b/apps/desktop/src/utils/createEventListener.ts index a59c336f2..e3ecd2bc7 100644 --- a/apps/desktop/src/utils/createEventListener.ts +++ b/apps/desktop/src/utils/createEventListener.ts @@ -31,11 +31,14 @@ export function createTauriEventListener( eventListener: EventListener, callback: (payload: T) => void, ): void { + let aborted = false; const unlisten = eventListener.listen((event) => { + if (aborted) return; callback(event.payload); }); onCleanup(() => { + aborted = true; unlisten.then((cleanup) => cleanup()); }); } diff --git a/apps/desktop/src/utils/tauri.ts b/apps/desktop/src/utils/tauri.ts index 14ad46d54..cfece4f3a 100644 --- a/apps/desktop/src/utils/tauri.ts +++ b/apps/desktop/src/utils/tauri.ts @@ -125,13 +125,13 @@ async uploadExportedVideo(path: string, mode: UploadMode, channel: TAURI_CHANNEL async uploadScreenshot(screenshotPath: string) : Promise { return await TAURI_INVOKE("upload_screenshot", { screenshotPath }); }, -async getRecordingMeta(path: string, fileType: FileType) : Promise { +async getRecordingMeta(path: string, fileType: FileType) : Promise { return await TAURI_INVOKE("get_recording_meta", { path, fileType }); }, async saveFileDialog(fileName: string, fileType: string) : Promise { return await TAURI_INVOKE("save_file_dialog", { fileName, fileType }); }, -async listRecordings() : Promise<([string, RecordingMetaWithMode])[]> { +async listRecordings() : Promise<([string, RecordingMetaWithMetadata])[]> { return await TAURI_INVOKE("list_recordings"); }, async listScreenshots() : Promise<([string, RecordingMeta])[]> { @@ -301,7 +301,8 @@ requestOpenRecordingPicker: RequestOpenRecordingPicker, requestOpenSettings: RequestOpenSettings, requestScreenCapturePrewarm: RequestScreenCapturePrewarm, requestStartRecording: RequestStartRecording, -targetUnderCursor: TargetUnderCursor +targetUnderCursor: TargetUnderCursor, +uploadProgressEvent: UploadProgressEvent }>({ audioInputLevelChange: "audio-input-level-change", authenticationInvalid: "authentication-invalid", @@ -323,7 +324,8 @@ requestOpenRecordingPicker: "request-open-recording-picker", requestOpenSettings: "request-open-settings", requestScreenCapturePrewarm: "request-screen-capture-prewarm", requestStartRecording: "request-start-recording", 
-targetUnderCursor: "target-under-cursor" +targetUnderCursor: "target-under-cursor", +uploadProgressEvent: "upload-progress-event" }) /** user-defined constants **/ @@ -388,7 +390,7 @@ export type ExportSettings = ({ format: "Mp4" } & Mp4ExportSettings) | ({ format export type FileType = "recording" | "screenshot" export type Flags = { captions: boolean } export type FramesRendered = { renderedCount: number; totalFrames: number; type: "FramesRendered" } -export type GeneralSettingsStore = { instanceId?: string; uploadIndividualFiles?: boolean; hideDockIcon?: boolean; hapticsEnabled?: boolean; autoCreateShareableLink?: boolean; enableNotifications?: boolean; disableAutoOpenLinks?: boolean; hasCompletedStartup?: boolean; theme?: AppTheme; commercialLicense?: CommercialLicense | null; lastVersion?: string | null; windowTransparency?: boolean; postStudioRecordingBehaviour?: PostStudioRecordingBehaviour; mainWindowRecordingStartBehaviour?: MainWindowRecordingStartBehaviour; custom_cursor_capture2?: boolean; serverUrl?: string; recordingCountdown?: number | null; enableNativeCameraPreview: boolean; autoZoomOnClicks?: boolean; enableNewRecordingFlow: boolean; postDeletionBehaviour?: PostDeletionBehaviour } +export type GeneralSettingsStore = { instanceId?: string; uploadIndividualFiles?: boolean; hideDockIcon?: boolean; hapticsEnabled?: boolean; autoCreateShareableLink?: boolean; enableNotifications?: boolean; disableAutoOpenLinks?: boolean; hasCompletedStartup?: boolean; theme?: AppTheme; commercialLicense?: CommercialLicense | null; lastVersion?: string | null; windowTransparency?: boolean; postStudioRecordingBehaviour?: PostStudioRecordingBehaviour; mainWindowRecordingStartBehaviour?: MainWindowRecordingStartBehaviour; custom_cursor_capture2?: boolean; serverUrl?: string; recordingCountdown?: number | null; enableNativeCameraPreview: boolean; autoZoomOnClicks?: boolean; enableNewRecordingFlow: boolean; postDeletionBehaviour?: PostDeletionBehaviour; enableNewUploader: boolean } export type GifExportSettings = { fps: number; resolution_base: XY; quality: GifQuality | null } export type GifQuality = { /** @@ -405,7 +407,7 @@ export type Hotkey = { code: string; meta: boolean; ctrl: boolean; alt: boolean; export type HotkeyAction = "startStudioRecording" | "startInstantRecording" | "stopRecording" | "restartRecording" | "openRecordingPicker" | "openRecordingPickerDisplay" | "openRecordingPickerWindow" | "openRecordingPickerArea" | "other" export type HotkeysConfiguration = { show: boolean } export type HotkeysStore = { hotkeys: { [key in HotkeyAction]: Hotkey } } -export type InstantRecordingMeta = { fps: number; sample_rate: number | null } +export type InstantRecordingMeta = { recording: boolean } | { error: string } | { fps: number; sample_rate: number | null } export type JsonValue = [T] export type LogicalBounds = { position: LogicalPosition; size: LogicalSize } export type LogicalPosition = { x: number; y: number } @@ -414,7 +416,7 @@ export type MainWindowRecordingStartBehaviour = "close" | "minimise" export type ModelIDType = string export type Mp4ExportSettings = { fps: number; resolution_base: XY; compression: ExportCompression } export type MultipleSegment = { display: VideoMeta; camera?: VideoMeta | null; mic?: AudioMeta | null; system_audio?: AudioMeta | null; cursor?: string | null } -export type MultipleSegments = { segments: MultipleSegment[]; cursors: Cursors } +export type MultipleSegments = { segments: MultipleSegment[]; cursors: Cursors; status?: StudioRecordingStatus | null } 
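A note on the regenerated bindings above: `InstantRecordingMeta` is an untagged union, so unlike `UploadMeta` (tagged by `state`) or `StudioRecordingStatus` (tagged by `status`, both further down) there is no discriminant field to switch on, and consumers have to narrow by property presence. A minimal sketch, assuming only the generated type (the helper itself is hypothetical):

```ts
import type { InstantRecordingMeta } from "~/utils/tauri";

// Hypothetical helper: the union carries no tag, so narrow with `in` checks.
function instantRecordingLabel(meta: InstantRecordingMeta): string {
  if ("error" in meta) return `Failed: ${meta.error}`;
  if ("fps" in meta) return `Complete (${meta.fps} fps)`;
  return "In progress"; // the `{ recording: boolean }` placeholder variant
}
```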
export type NewNotification = { title: string; body: string; is_error: boolean } export type NewScreenshotAdded = { path: string } export type NewStudioRecordingAdded = { path: string } @@ -433,8 +435,8 @@ export type ProjectConfiguration = { aspectRatio: AspectRatio | null; background export type ProjectRecordingsMeta = { segments: SegmentRecordings[] } export type RecordingDeleted = { path: string } export type RecordingEvent = { variant: "Countdown"; value: number } | { variant: "Started" } | { variant: "Stopped" } | { variant: "Failed"; error: string } -export type RecordingMeta = (StudioRecordingMeta | InstantRecordingMeta) & { platform?: Platform | null; pretty_name: string; sharing?: SharingMeta | null } -export type RecordingMetaWithMode = ((StudioRecordingMeta | InstantRecordingMeta) & { platform?: Platform | null; pretty_name: string; sharing?: SharingMeta | null }) & { mode: RecordingMode } +export type RecordingMeta = (StudioRecordingMeta | InstantRecordingMeta) & { platform?: Platform | null; pretty_name: string; sharing?: SharingMeta | null; upload?: UploadMeta | null } +export type RecordingMetaWithMetadata = ((StudioRecordingMeta | InstantRecordingMeta) & { platform?: Platform | null; pretty_name: string; sharing?: SharingMeta | null; upload?: UploadMeta | null }) & { mode: RecordingMode; status: StudioRecordingStatus } export type RecordingMode = "studio" | "instant" export type RecordingOptionsChanged = null export type RecordingSettingsStore = { target: ScreenCaptureTarget | null; micName: string | null; cameraId: DeviceOrModelID | null; mode: RecordingMode | null; systemAudio: boolean } @@ -460,11 +462,14 @@ export type SingleSegment = { display: VideoMeta; camera?: VideoMeta | null; aud export type StartRecordingInputs = { capture_target: ScreenCaptureTarget; capture_system_audio?: boolean; mode: RecordingMode } export type StereoMode = "stereo" | "monoL" | "monoR" export type StudioRecordingMeta = { segment: SingleSegment } | { inner: MultipleSegments } +export type StudioRecordingStatus = { status: "InProgress" } | { status: "Failed"; error: string } | { status: "Complete" } export type TargetUnderCursor = { display_id: DisplayId | null; window: WindowUnderCursor | null } export type TimelineConfiguration = { segments: TimelineSegment[]; zoomSegments: ZoomSegment[]; sceneSegments?: SceneSegment[] } export type TimelineSegment = { recordingSegment?: number; timescale: number; start: number; end: number } +export type UploadMeta = { state: "MultipartUpload"; video_id: string; file_path: string; pre_created_video: VideoUploadInfo; recording_dir: string } | { state: "SinglePartUpload"; video_id: string; recording_dir: string; file_path: string; screenshot_path: string } | { state: "Failed"; error: string } | { state: "Complete" } export type UploadMode = { Initial: { pre_created_video: VideoUploadInfo | null } } | "Reupload" export type UploadProgress = { progress: number } +export type UploadProgressEvent = { video_id: string; uploaded: string; total: string } export type UploadResult = { Success: string } | "NotAuthenticated" | "PlanCheckFailed" | "UpgradeRequired" export type Video = { duration: number; width: number; height: number; fps: number; start_time: number } export type VideoMeta = { path: string; fps?: number; diff --git a/apps/web/actions/video/upload.ts b/apps/web/actions/video/upload.ts index 9bcd05a7c..87a3481b6 100644 --- a/apps/web/actions/video/upload.ts +++ b/apps/web/actions/video/upload.ts @@ -165,6 +165,7 @@ export async function 
createVideoAndGetUploadUrl({
   isUpload = false,
   folderId,
   orgId,
+  supportsUploadProgress = false,
 }: {
   videoId?: Video.VideoId;
   duration?: number;
@@ -175,17 +176,16 @@ export async function createVideoAndGetUploadUrl({
   isUpload?: boolean;
   folderId?: Folder.FolderId;
   orgId: string;
+  // TODO: Remove this once we are happy with its stability
+  supportsUploadProgress?: boolean;
 }) {
   const user = await getCurrentUser();
-  if (!user) {
-    throw new Error("Unauthorized");
-  }
+  if (!user) throw new Error("Unauthorized");
 
   try {
-    if (!userIsPro(user) && duration && duration > 300) {
+    if (!userIsPro(user) && duration && duration > 300)
       throw new Error("upgrade_required");
-    }
 
     const [customBucket] = await db()
       .select()
@@ -240,9 +240,10 @@ export async function createVideoAndGetUploadUrl({
 
     await db().insert(videos).values(videoData);
 
-    await db().insert(videoUploads).values({
-      videoId: idToUse,
-    });
+    if (supportsUploadProgress)
+      await db().insert(videoUploads).values({
+        videoId: idToUse,
+      });
 
     const fileKey = `${user.id}/${idToUse}/${
       isScreenshot ? "screenshot/screen-capture.jpg" : "result.mp4"
diff --git a/apps/web/app/(org)/dashboard/caps/components/CapCard/CapCard.tsx b/apps/web/app/(org)/dashboard/caps/components/CapCard/CapCard.tsx
index 2e244a9bc..01a92ca8e 100644
--- a/apps/web/app/(org)/dashboard/caps/components/CapCard/CapCard.tsx
+++ b/apps/web/app/(org)/dashboard/caps/components/CapCard/CapCard.tsx
@@ -29,10 +29,14 @@ import { type PropsWithChildren, useState } from "react";
 import { toast } from "sonner";
 import { ConfirmationDialog } from "@/app/(org)/dashboard/_components/ConfirmationDialog";
 import { useDashboardContext } from "@/app/(org)/dashboard/Contexts";
+import { useFeatureFlag } from "@/app/Layout/features";
 import ProgressCircle, {
   useUploadProgress,
 } from "@/app/s/[videoId]/_components/ProgressCircle";
-import { VideoThumbnail } from "@/components/VideoThumbnail";
+import {
+  type ImageLoadingStatus,
+  VideoThumbnail,
+} from "@/components/VideoThumbnail";
 import { useEffectMutation } from "@/lib/EffectRuntime";
 import { withRpc } from "@/lib/Rpcs";
 import { PasswordDialog } from "../PasswordDialog";
@@ -172,6 +176,8 @@ export const CapCard = ({
     cap.id,
     cap.hasActiveUpload || false,
   );
+  const enableBetaUploadProgress = useFeatureFlag("enableUploadProgress");
+  const [imageStatus, setImageStatus] = useState<ImageLoadingStatus>("loading");
 
   // Helper function to create a drag preview element
   const createDragPreview = (text: string): HTMLElement => {
@@ -292,7 +298,7 @@ export const CapCard = ({
       onDragStart={handleDragStart}
       onDragEnd={handleDragEnd}
       className={clsx(
-        "flex relative overflow-hidden transition-colors duration-200 flex-col gap-4 w-full h-full rounded-xl cursor-default bg-gray-1 border border-gray-3 group",
+        "flex relative overflow-hidden transition-colors duration-200 flex-col gap-4 w-full h-full rounded-xl cursor-default bg-gray-1 border border-gray-3 group z-10",
         isSelected
           ? "!border-blue-10"
           : anyCapSelected
@@ -314,7 +320,7 @@ export const CapCard = ({
           : isDropdownOpen
             ? "opacity-100"
             : "opacity-0 group-hover:opacity-100",
-        "top-2 right-2 flex-col gap-2 z-[20]",
+        "top-2 right-2 flex-col gap-2 z-[51]",
       )}
     >
       {
         return downloadMutation.isPending ?
( @@ -421,7 +430,10 @@ export const CapCard = ({ error: "Failed to duplicate cap", }); }} - disabled={duplicateMutation.isPending} + disabled={ + duplicateMutation.isPending || + (enableBetaUploadProgress && cap.hasActiveUpload) + } className="flex gap-2 items-center rounded-lg" > @@ -496,19 +508,50 @@ export const CapCard = ({
)} +
{ - if (isDeleting) { - e.preventDefault(); - } + if (isDeleting) e.preventDefault(); }} href={`/s/${cap.id}`} > + {imageStatus !== "success" && uploadProgress ? ( +
+
+
+ {uploadProgress.status === "failed" ? ( +
+
+ +
+

+ Upload failed +

+
+ ) : ( +
+ +
+ )} +
+
+
+ ) : null} + - {uploadProgress && ( -
- {uploadProgress.status === "failed" ? ( -
-
- -
-

- Upload failed -

-
- ) : ( -
- -
- )} -
- )}
= memo( ? new Date(video.metadata.customCreatedAt) : video.createdAt; + const [imageStatus, setImageStatus] = + useState("loading"); + return (
= memo( alt={`${video.name} Thumbnail`} objectFit="cover" containerClass="!h-full !rounded-lg !border-b-0" + imageStatus={imageStatus} + setImageStatus={setImageStatus} />
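The thumbnail changes above and below follow one pattern: `VideoThumbnail` no longer owns its load state; the parent supplies `imageStatus`/`setImageStatus`, which lets it keep an upload-progress overlay mounted until the image actually renders. A rough sketch of that wiring, assuming only the prop names from this diff (the wrapper component and its markup are illustrative):

```tsx
import { useState } from "react";
import {
  type ImageLoadingStatus,
  VideoThumbnail,
} from "@/components/VideoThumbnail";

// Illustrative wrapper: the parent owns the status, so a sibling overlay can
// react to it; VideoThumbnail only reports load transitions back up.
function ThumbnailWithUploadOverlay(props: { videoId: string; alt: string }) {
  const [imageStatus, setImageStatus] =
    useState<ImageLoadingStatus>("loading");

  return (
    <div className="relative">
      {imageStatus !== "success" && (
        <div className="absolute inset-0">{/* progress UI here */}</div>
      )}
      <VideoThumbnail
        videoId={props.videoId}
        alt={props.alt}
        imageStatus={imageStatus}
        setImageStatus={setImageStatus}
      />
    </div>
  );
}
```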
diff --git a/apps/web/app/s/[videoId]/_components/CapVideoPlayer.tsx b/apps/web/app/s/[videoId]/_components/CapVideoPlayer.tsx
index 7422d5912..957413a85 100644
--- a/apps/web/app/s/[videoId]/_components/CapVideoPlayer.tsx
+++ b/apps/web/app/s/[videoId]/_components/CapVideoPlayer.tsx
@@ -464,7 +464,12 @@ export function CapVideoPlayer({
     return `https://placeholder.pics/svg/224x128/dc2626/ffffff/Error`;
   }, []);
 
-  const uploadProgress = useUploadProgress(videoId, hasActiveUpload || false);
+  const uploadProgressRaw = useUploadProgress(
+    videoId,
+    hasActiveUpload || false,
+  );
+  // If the video already plays back from S3, ignore any stale upload progress.
+  const uploadProgress = videoLoaded ? null : uploadProgressRaw;
 
   const isUploading = uploadProgress?.status === "uploading";
   const isUploadFailed = uploadProgress?.status === "failed";
diff --git a/apps/web/app/s/[videoId]/_components/HLSVideoPlayer.tsx b/apps/web/app/s/[videoId]/_components/HLSVideoPlayer.tsx
index a8dc133be..f0f8ecec8 100644
--- a/apps/web/app/s/[videoId]/_components/HLSVideoPlayer.tsx
+++ b/apps/web/app/s/[videoId]/_components/HLSVideoPlayer.tsx
@@ -280,7 +280,12 @@ export function HLSVideoPlayer({
     };
   }, [captionsSrc]);
 
-  const uploadProgress = useUploadProgress(videoId, hasActiveUpload || false);
+  const uploadProgressRaw = useUploadProgress(
+    videoId,
+    hasActiveUpload || false,
+  );
+  // If the video already plays back from S3, ignore any stale upload progress.
+  const uploadProgress = videoLoaded ? null : uploadProgressRaw;
 
   const isUploading = uploadProgress?.status === "uploading";
   const isUploadFailed = uploadProgress?.status === "failed";
diff --git a/apps/web/components/VideoThumbnail.tsx b/apps/web/components/VideoThumbnail.tsx
index 965e6ae52..605da0e46 100644
--- a/apps/web/components/VideoThumbnail.tsx
+++ b/apps/web/components/VideoThumbnail.tsx
@@ -3,7 +3,9 @@ import { queryOptions, useQuery } from "@tanstack/react-query";
 import clsx from "clsx";
 import moment from "moment";
 import Image from "next/image";
-import { memo, useEffect, useRef, useState } from "react";
+import { memo, useEffect, useRef } from "react";
+
+export type ImageLoadingStatus = "loading" | "success" | "error";
 
 interface VideoThumbnailProps {
   videoId: string;
@@ -12,6 +14,8 @@
   objectFit?: string;
   containerClass?: string;
   videoDuration?: number;
+  imageStatus: ImageLoadingStatus;
+  setImageStatus: (status: ImageLoadingStatus) => void;
 }
 
 const formatDuration = (durationSecs: number) => {
@@ -61,16 +65,14 @@ export const VideoThumbnail: React.FC<VideoThumbnailProps> = memo(
     objectFit = "cover",
     containerClass,
     videoDuration,
+    imageStatus,
+    setImageStatus,
   }) => {
     const imageUrl = useQuery(imageUrlQuery(videoId));
     const imageRef = useRef<HTMLImageElement>(null);
 
     const randomGradient = `linear-gradient(to right, ${generateRandomGrayScaleColor()}, ${generateRandomGrayScaleColor()})`;
 
-    const [imageStatus, setImageStatus] = useState<
-      "loading" | "error" | "success"
-    >("loading");
-
     useEffect(() => {
       if (imageRef.current?.complete && imageRef.current.naturalWidth !== 0) {
         setImageStatus("success");
diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml
new file mode 100644
index 000000000..531e155be
--- /dev/null
+++ b/crates/api/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "cap-api"
+version = "0.0.0"
+edition = "2024"
+publish = false
+
+[dependencies]
+reqwest = "0.12.23"
+
+[lints]
+workspace = true
diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs
new file mode 100644
index 000000000..b8aa47d34
--- /dev/null
+++ b/crates/api/src/lib.rs
@@ -0,0 +1,3 @@
+//! Types and implementations for the Cap web API endpoints.
+
+// TODO: Migrate `apps/desktop/src-tauri/upload.rs` here once we figure out how auth will work with that.
diff --git a/crates/project/src/meta.rs b/crates/project/src/meta.rs
index d2d0457af..496ac6d5d 100644
--- a/crates/project/src/meta.rs
+++ b/crates/project/src/meta.rs
@@ -8,7 +8,6 @@ use std::{
     path::{Path, PathBuf},
 };
 use tracing::{debug, info, warn};
-// use tracing::{debug, warn};
 
 use crate::{
     CaptionsData, CursorEvents, CursorImage, ProjectConfiguration, XY,
@@ -73,9 +72,47 @@ pub struct RecordingMeta {
     pub sharing: Option<SharingMeta>,
     #[serde(flatten)]
     pub inner: RecordingMetaInner,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub upload: Option<UploadMeta>,
 }
 
-impl specta::Flatten for RecordingMetaInner {}
+#[derive(Deserialize, Serialize, Clone, Type, Debug)]
+pub struct S3UploadMeta {
+    pub id: String,
+}
+
+#[derive(Clone, Serialize, Deserialize, specta::Type, Debug)]
+pub struct VideoUploadInfo {
+    pub id: String,
+    pub link: String,
+    pub config: S3UploadMeta,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Type)]
+#[serde(tag = "state")]
+pub enum UploadMeta {
+    MultipartUpload {
+        // Cap web identifier
+        video_id: String,
+        // Data for resuming
+        file_path: PathBuf,
+        pre_created_video: VideoUploadInfo,
+        recording_dir: PathBuf,
+    },
+    SinglePartUpload {
+        // Cap web identifier
+        video_id: String,
+        // Path of the Cap recording directory
+        recording_dir: PathBuf,
+        // Paths to the video and screenshot files, kept for resuming
+        file_path: PathBuf,
+        screenshot_path: PathBuf,
+    },
+    Failed {
+        error: String,
+    },
+    Complete,
+}
 
 #[derive(Debug, Clone, Serialize, Deserialize, Type)]
 #[serde(untagged, rename_all = "camelCase")]
@@ -84,16 +121,29 @@ pub enum RecordingMetaInner {
     Instant(InstantRecordingMeta),
 }
 
+impl specta::Flatten for RecordingMetaInner {}
+
 #[derive(Debug, Clone, Serialize, Deserialize, Type)]
-pub struct InstantRecordingMeta {
-    pub fps: u32,
-    pub sample_rate: Option<u32>,
+#[serde(untagged, rename_all = "camelCase")]
+pub enum InstantRecordingMeta {
+    InProgress {
+        // Placeholder: the enum is untagged, so this variant needs a field to keep a distinct shape.
+        recording: bool,
+    },
+    Failed {
+        error: String,
+    },
+    Complete {
+        fps: u32,
+        sample_rate: Option<u32>,
+    },
 }
 
 impl RecordingMeta {
     pub fn path(&self, relative: &RelativePathBuf) -> PathBuf {
         relative.to_path(&self.project_path)
     }
+
     pub fn load_for_project(project_path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
         let meta_path = project_path.join("recording-meta.json");
         let mut meta: Self = serde_json::from_str(&std::fs::read_to_string(&meta_path)?)?;
@@ -165,12 +215,20 @@ pub enum StudioRecordingMeta {
 }
 
 impl StudioRecordingMeta {
+    pub fn status(&self) -> StudioRecordingStatus {
+        match self {
+            StudioRecordingMeta::SingleSegment { .. } => StudioRecordingStatus::Complete,
+            StudioRecordingMeta::MultipleSegments { inner } => inner
+                .status
+                .clone()
+                .unwrap_or(StudioRecordingStatus::Complete),
+        }
+    }
+
     pub fn camera_path(&self) -> Option<RelativePathBuf> {
         match self {
-            StudioRecordingMeta::SingleSegment { segment } => {
-                segment.camera.as_ref().map(|c| c.path.clone())
-            }
-            StudioRecordingMeta::MultipleSegments { inner, .. } => inner
+            Self::SingleSegment { segment } => segment.camera.as_ref().map(|c| c.path.clone()),
+            Self::MultipleSegments { inner, .. } => inner
                 .segments
                 .first()
                 .and_then(|s| s.camera.as_ref().map(|c| c.path.clone())),
@@ -186,8 +244,8 @@ impl StudioRecordingMeta {
 
     pub fn min_fps(&self) -> u32 {
         match self {
-            StudioRecordingMeta::SingleSegment { segment } => segment.display.fps,
-            StudioRecordingMeta::MultipleSegments { inner, .. } => {
+            Self::SingleSegment { segment } => segment.display.fps,
+            Self::MultipleSegments { inner, .. } => {
                 inner.segments.iter().map(|s| s.display.fps).min().unwrap()
             }
         }
@@ -195,8 +253,8 @@ impl StudioRecordingMeta {
 
     pub fn max_fps(&self) -> u32 {
         match self {
-            StudioRecordingMeta::SingleSegment { segment } => segment.display.fps,
-            StudioRecordingMeta::MultipleSegments { inner, .. } => {
+            Self::SingleSegment { segment } => segment.display.fps,
+            Self::MultipleSegments { inner, .. } => {
                 inner.segments.iter().map(|s| s.display.fps).max().unwrap()
             }
         }
@@ -222,6 +280,16 @@ pub struct MultipleSegments {
     pub segments: Vec<MultipleSegment>,
     #[serde(default, skip_serializing_if = "Cursors::is_empty")]
     pub cursors: Cursors,
+    #[serde(default)]
+    pub status: Option<StudioRecordingStatus>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Type)]
+#[serde(tag = "status")]
+pub enum StudioRecordingStatus {
+    InProgress,
+    Failed { error: String },
+    Complete,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, Type)]
diff --git a/crates/recording/src/instant_recording.rs b/crates/recording/src/instant_recording.rs
index ecf9d1f22..0a1ddc57c 100644
--- a/crates/recording/src/instant_recording.rs
+++ b/crates/recording/src/instant_recording.rs
@@ -108,7 +108,7 @@ impl Message for Actor {
 
         Ok(CompletedRecording {
             project_path: self.recording_dir.clone(),
-            meta: InstantRecordingMeta {
+            meta: InstantRecordingMeta::Complete {
                 fps: self.video_info.fps(),
                 sample_rate: None,
             },
diff --git a/crates/recording/src/studio_recording.rs b/crates/recording/src/studio_recording.rs
index eb4784e4b..dfe1d4c4f 100644
--- a/crates/recording/src/studio_recording.rs
+++ b/crates/recording/src/studio_recording.rs
@@ -549,6 +549,7 @@ async fn stop_recording(
                 })
                 .collect(),
             ),
+            status: Some(StudioRecordingStatus::Complete),
         },
     };
diff --git a/packages/ui-solid/src/auto-imports.d.ts b/packages/ui-solid/src/auto-imports.d.ts
index 22674b3e1..6c370f546 100644
--- a/packages/ui-solid/src/auto-imports.d.ts
+++ b/packages/ui-solid/src/auto-imports.d.ts
@@ -91,4 +91,6 @@ declare global {
   const IconMdiLoading: typeof import("~icons/mdi/loading.jsx")["default"]
   const IconMdiMonitor: typeof import('~icons/mdi/monitor.jsx')['default']
   const IconPhMonitorBold: typeof import('~icons/ph/monitor-bold.jsx')['default']
+  const IconPhRecordFill: typeof import('~icons/ph/record-fill.jsx')['default']
+  const IconPhWarningBold: typeof import('~icons/ph/warning-bold.jsx')['default']
 }
diff --git a/packages/web-backend/src/Videos/VideosRepo.ts b/packages/web-backend/src/Videos/VideosRepo.ts
index daa597ff6..d4d2e3d3f 100644
--- a/packages/web-backend/src/Videos/VideosRepo.ts
+++ b/packages/web-backend/src/Videos/VideosRepo.ts
@@ -45,7 +45,16 @@ export class VideosRepo extends Effect.Service<VideosRepo>()("VideosRepo", {
   });
 
   const delete_ = (id: Video.VideoId) =>
-    db.execute((db) => db.delete(Db.videos).where(Dz.eq(Db.videos.id, id)));
+    db.execute(async (db) =>
+      db.transaction((db) =>
+        Promise.all([
+          db.delete(Db.videos).where(Dz.eq(Db.videos.id, id)),
+          db
+            .delete(Db.videoUploads)
+            .where(Dz.eq(Db.videoUploads.videoId, id)),
+        ]),
+      ),
+    );
 
   const create = (data: CreateVideoInput) =>
     Effect.gen(function* () {
diff --git a/packages/web-backend/src/Videos/index.ts b/packages/web-backend/src/Videos/index.ts
index d06a7422b..e2986c0cb 100644
--- a/packages/web-backend/src/Videos/index.ts
+++ b/packages/web-backend/src/Videos/index.ts
@@ -5,7 +5,6 @@ import { Array, Effect, Option, pipe } from "effect";
 
 import { Database } from "../Database.ts";
 import { S3Buckets } from "../S3Buckets/index.ts";
-import { S3BucketAccess } from "../S3Buckets/S3BucketAccess.ts";
 import { VideosPolicy } from "./VideosPolicy.ts";
 import { VideosRepo } from "./VideosRepo.ts";
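Taken together, the persisted `UploadMeta` gives the desktop app enough state to resume or clean up an interrupted upload after a restart. A hedged sketch of how a consumer of the regenerated bindings might dispatch on it; the two helpers are hypothetical stand-ins, and only the `UploadMeta` shape comes from this diff:

```ts
import type { UploadMeta } from "~/utils/tauri";

// Hypothetical helpers: stand-ins for the real upload plumbing.
async function resumeMultipart(videoId: string, filePath: string) {
  // Re-presign the remaining parts and continue from the last completed one.
}
async function retrySinglePart(videoId: string, filePath: string) {
  // Single-part uploads are small; the simplest recovery is a full retry.
}

export async function resumePendingUpload(meta: UploadMeta): Promise<void> {
  switch (meta.state) {
    case "MultipartUpload":
      return resumeMultipart(meta.video_id, meta.file_path);
    case "SinglePartUpload":
      return retrySinglePart(meta.video_id, meta.file_path);
    case "Failed":
      // The error text was persisted at failure time; surface it and move on.
      console.warn("Previous upload failed:", meta.error);
      return;
    case "Complete":
      return; // nothing to do
  }
}
```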