Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 128 additions & 10 deletions apps/desktop/src-tauri/src/frame_ws.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use std::sync::Arc;

use flume::Receiver;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

#[derive(Clone)]
pub struct WSFrame {
pub data: Vec<u8>,
pub width: u32,
pub height: u32,
pub stride: u32,
}

pub async fn create_frame_ws(frame_rx: Receiver<WSFrame>) -> (u16, CancellationToken) {
pub async fn create_watch_frame_ws(
frame_rx: watch::Receiver<Option<WSFrame>>,
) -> (u16, CancellationToken) {
use axum::{
extract::{
State,
Expand All @@ -19,9 +22,8 @@ pub async fn create_frame_ws(frame_rx: Receiver<WSFrame>) -> (u16, CancellationT
response::IntoResponse,
routing::get,
};
use tokio::sync::Mutex;

type RouterState = Arc<Mutex<Receiver<WSFrame>>>;
type RouterState = watch::Receiver<Option<WSFrame>>;

#[axum::debug_handler]
async fn ws_handler(
Expand All @@ -31,17 +33,133 @@ pub async fn create_frame_ws(frame_rx: Receiver<WSFrame>) -> (u16, CancellationT
ws.on_upgrade(move |socket| handle_socket(socket, state))
}

async fn handle_socket(mut socket: WebSocket, state: RouterState) {
let camera_rx = state.lock().await;
async fn handle_socket(mut socket: WebSocket, mut camera_rx: RouterState) {
println!("socket connection established");
tracing::info!("Socket connection established");
let now = std::time::Instant::now();

// Send the current frame immediately upon connection (if one exists)
// This ensures the client doesn't wait for the next config change to see the image
{
let frame_opt = camera_rx.borrow().clone();
if let Some(mut frame) = frame_opt {
frame.data.extend_from_slice(&frame.stride.to_le_bytes());
frame.data.extend_from_slice(&frame.height.to_le_bytes());
frame.data.extend_from_slice(&frame.width.to_le_bytes());

if let Err(e) = socket.send(Message::Binary(frame.data)).await {
tracing::error!("Failed to send initial frame to socket: {:?}", e);
return;
}
}
}

loop {
tokio::select! {
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => {
tracing::info!("WebSocket closed");
break;
}
Some(Ok(_)) => {
tracing::info!("Received message from socket (ignoring)");
}
Some(Err(e)) => {
tracing::error!("WebSocket error: {:?}", e);
break;
}
}
},
res = camera_rx.changed() => {
if res.is_err() {
tracing::error!("Camera channel closed");
break;
}
let frame_opt = camera_rx.borrow().clone();
if let Some(mut frame) = frame_opt {
frame.data.extend_from_slice(&frame.stride.to_le_bytes());
frame.data.extend_from_slice(&frame.height.to_le_bytes());
frame.data.extend_from_slice(&frame.width.to_le_bytes());

if let Err(e) = socket.send(Message::Binary(frame.data)).await {
tracing::error!("Failed to send frame to socket: {:?}", e);
break;
}
}
}
}
}

let elapsed = now.elapsed();
println!("Websocket closing after {elapsed:.2?}");
tracing::info!("Websocket closing after {elapsed:.2?}");
}

let router = axum::Router::new()
.route("/", get(ws_handler))
.with_state(frame_rx);

let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tracing::info!("WebSocket server listening on port {}", port);

let cancel_token = CancellationToken::new();
let cancel_token_child = cancel_token.child_token();
tokio::spawn(async move {
let server = axum::serve(listener, router.into_make_service());
tokio::select! {
_ = server => {},
_ = cancel_token.cancelled() => {
println!("WebSocket server shutting down");
}
}
});

(port, cancel_token_child)
}

pub async fn create_frame_ws(frame_rx: flume::Receiver<WSFrame>) -> (u16, CancellationToken) {
use axum::{
extract::{
State,
ws::{Message, WebSocket, WebSocketUpgrade},
},
response::IntoResponse,
routing::get,
};

type RouterState = Arc<flume::Receiver<WSFrame>>;

#[axum::debug_handler]
async fn ws_handler(
ws: WebSocketUpgrade,
State(state): State<RouterState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, state))
}

async fn handle_socket(mut socket: WebSocket, camera_rx: RouterState) {
println!("socket connection established");
tracing::info!("Socket connection established");
let now = std::time::Instant::now();

loop {
tokio::select! {
_ = socket.recv() => {
tracing::info!("Received message from socket");
break;
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => {
tracing::info!("WebSocket closed");
break;
}
Some(Ok(_)) => {
tracing::info!("Received message from socket (ignoring)");
}
Some(Err(e)) => {
tracing::error!("WebSocket error: {:?}", e);
break;
}
}
},
incoming_frame = camera_rx.recv_async() => {
match incoming_frame {
Expand Down Expand Up @@ -74,7 +192,7 @@ pub async fn create_frame_ws(frame_rx: Receiver<WSFrame>) -> (u16, CancellationT

let router = axum::Router::new()
.route("/", get(ws_handler))
.with_state(Arc::new(Mutex::new(frame_rx)));
.with_state(Arc::new(frame_rx));

let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
Expand Down
47 changes: 46 additions & 1 deletion apps/desktop/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod posthog;
mod presets;
mod recording;
mod recording_settings;
mod screenshot_editor;
mod target_select_overlay;
mod thumbnails;
mod tray;
Expand Down Expand Up @@ -58,6 +59,9 @@ use kameo::{Actor, actor::ActorRef};
use notifications::NotificationType;
use recording::{InProgressRecording, RecordingEvent, RecordingInputKind};
use scap_targets::{Display, DisplayId, WindowId, bounds::LogicalBounds};
use screenshot_editor::{
ScreenshotEditorInstances, create_screenshot_editor_instance, update_screenshot_config,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use specta::Type;
Expand All @@ -84,7 +88,9 @@ use tracing::*;
use upload::{create_or_get_video, upload_image, upload_video};
use web_api::AuthedApiError;
use web_api::ManagerExt as WebManagerExt;
use windows::{CapWindowId, EditorWindowIds, ShowCapWindow, set_window_transparent};
use windows::{
CapWindowId, EditorWindowIds, ScreenshotEditorWindowIds, ShowCapWindow, set_window_transparent,
};

use crate::{
camera::CameraPreviewManager,
Expand Down Expand Up @@ -1107,6 +1113,25 @@ async fn copy_screenshot_to_clipboard(
Ok(())
}

#[tauri::command]
#[specta::specta]
#[instrument(skip(clipboard, data))]
async fn copy_image_to_clipboard(
clipboard: MutableState<'_, ClipboardContext>,
data: Vec<u8>,
) -> Result<(), String> {
println!("Copying image to clipboard ({} bytes)", data.len());

let img_data = clipboard_rs::RustImageData::from_bytes(&data)
.map_err(|e| format!("Failed to create image data from bytes: {e}"))?;
clipboard
.write()
.await
.set_image(img_data)
.map_err(|err| format!("Failed to copy image to clipboard: {err}"))?;
Ok(())
}

#[tauri::command]
#[specta::specta]
#[instrument(skip(_app))]
Expand Down Expand Up @@ -2223,6 +2248,7 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) {
recording::resume_recording,
recording::restart_recording,
recording::delete_recording,
recording::take_screenshot,
recording::list_cameras,
recording::list_capture_windows,
recording::list_capture_displays,
Expand All @@ -2241,6 +2267,7 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) {
copy_file_to_path,
copy_video_to_clipboard,
copy_screenshot_to_clipboard,
copy_image_to_clipboard,
open_file_path,
get_video_metadata,
create_editor_instance,
Expand All @@ -2256,6 +2283,8 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) {
permissions::request_permission,
upload_exported_video,
upload_screenshot,
create_screenshot_editor_instance,
update_screenshot_config,
get_recording_meta,
save_file_dialog,
list_recordings,
Expand Down Expand Up @@ -2431,6 +2460,7 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) {
])
.map_label(|label| match label {
label if label.starts_with("editor-") => "editor",
label if label.starts_with("screenshot-editor-") => "screenshot-editor",
label if label.starts_with("window-capture-occluder-") => {
"window-capture-occluder"
}
Expand All @@ -2448,6 +2478,7 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) {
fake_window::init(&app);
app.manage(target_select_overlay::WindowFocusManager::default());
app.manage(EditorWindowIds::default());
app.manage(ScreenshotEditorWindowIds::default());
#[cfg(target_os = "macos")]
app.manage(crate::platform::ScreenCapturePrewarmer::default());
app.manage(http_client::HttpClient::default());
Expand Down Expand Up @@ -2682,6 +2713,18 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) {
reopen_main_window(&app);
}
}
CapWindowId::ScreenshotEditor { id } => {
let window_ids =
ScreenshotEditorWindowIds::get(window.app_handle());
window_ids.ids.lock().unwrap().retain(|(_, _id)| *_id != id);

tokio::spawn(ScreenshotEditorInstances::remove(window.clone()));

#[cfg(target_os = "windows")]
if CapWindowId::Settings.get(&app).is_none() {
reopen_main_window(&app);
}
}
CapWindowId::Settings => {
for (label, window) in app.webview_windows() {
if let Ok(id) = CapWindowId::from_str(&label)
Expand Down Expand Up @@ -2792,6 +2835,7 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) {
tauri::RunEvent::Reopen { .. } => {
let has_window = _handle.webview_windows().iter().any(|(label, _)| {
label.starts_with("editor-")
|| label.starts_with("screenshot-editor-")
|| label.as_str() == "settings"
|| label.as_str() == "signin"
});
Expand All @@ -2802,6 +2846,7 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) {
.iter()
.find(|(label, _)| {
label.starts_with("editor-")
|| label.starts_with("screenshot-editor-")
|| label.as_str() == "settings"
|| label.as_str() == "signin"
})
Expand Down
Loading
Loading