Skip to content

Commit d732824

Browse files
Add extensive logging to worker job processing
- Log every step of job processing for debugging - Check and report if background image exists - Log FFmpeg stdout/stderr output - Log file sizes and upload status - Log status update results Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 24cc8b6 commit d732824

File tree

1 file changed

+75
-14
lines changed

1 file changed

+75
-14
lines changed

src/jobs.rs

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -207,45 +207,78 @@ pub async fn run_worker(queue_url: String) -> Result<()> {
207207
}
208208

209209
async fn poll_queue(queue_url: &str) -> Result<Option<Job>> {
210+
let url = format!("{}/jobs/pending", queue_url);
211+
info!("Polling: {}", url);
212+
210213
let client = reqwest::Client::new();
211214
let response = client
212-
.get(format!("{}/jobs/pending", queue_url))
215+
.get(&url)
213216
.send()
214217
.await
215218
.map_err(|e| anyhow::anyhow!("Failed to poll queue: {}", e))?;
216219

217220
let status = response.status();
221+
info!("Poll response status: {}", status);
222+
218223
if status.as_u16() == 204 {
219224
return Ok(None);
220225
}
221226

222-
let job: Job = response
223-
.json()
224-
.await
225-
.map_err(|e| anyhow::anyhow!("Failed to parse job: {}", e))?;
227+
let body = response.text().await.map_err(|e| anyhow::anyhow!("Failed to read response: {}", e))?;
228+
info!("Poll response body: {}", body);
229+
230+
let job: Job = serde_json::from_str(&body)
231+
.map_err(|e| anyhow::anyhow!("Failed to parse job: {} - body was: {}", e, body))?;
232+
info!("Parsed job: {} with status {:?}", job.id, job.status);
226233
Ok(Some(job))
227234
}
228235

229236
async fn process_job(job: Job) -> Result<()> {
230-
info!("Processing job: {}", job.id);
237+
info!("=== PROCESSING JOB START ===");
238+
info!("Job ID: {}", job.id);
239+
info!("Video path: {}", job.video_path);
240+
info!("Output path: {}", job.output_path);
241+
info!("Selection: {:?}", job.selection);
242+
info!("WebDAV URL: {}", job.webdav_config.url);
243+
info!("Queue URL: {:?}", job.webdav_config.queue_url);
231244

232245
let worker_id = format!("worker-{}", uuid::Uuid::new_v4().simple());
233-
let temp_dir = format!("./worker-temp-{}", worker_id);
246+
let temp_dir = format!("/tmp/worker-{}", worker_id);
247+
info!("Creating temp dir: {}", temp_dir);
234248
fs::create_dir_all(&temp_dir)?;
235249

236250
// Build input URL with auth for direct FFmpeg streaming
237251
let video_url = build_webdav_download_url(&job.webdav_config, &job.video_path);
238-
let _input_path = format!("{}/input.mp4", temp_dir);
252+
info!("Video URL for FFmpeg: {}", video_url);
239253

240254
// Background image path (downloaded by cloud-init to /root)
241255
let bg_image_path = "/root/gpc-bg.png";
256+
info!("Background image path: {}", bg_image_path);
257+
258+
// Check if background image exists
259+
if std::path::Path::new(bg_image_path).exists() {
260+
info!("Background image exists at {}", bg_image_path);
261+
} else {
262+
error!("Background image NOT FOUND at {}", bg_image_path);
263+
// Try to list /root to see what's there
264+
if let Ok(entries) = std::fs::read_dir("/root") {
265+
info!("Contents of /root:");
266+
for entry in entries {
267+
if let Ok(e) = entry {
268+
info!(" - {:?}", e.path());
269+
}
270+
}
271+
}
272+
}
242273

243274
let output_path = format!("{}/output.mp4", temp_dir);
275+
info!("Output path: {}", output_path);
244276

245277
// Build FFmpeg filter complex based on quadrant selection
246278
let filter_complex = build_filter_complex(&job.selection)?;
279+
info!("FFmpeg filter: {}", filter_complex);
247280

248-
info!("Running FFmpeg with filter: {}", filter_complex);
281+
info!("Starting FFmpeg...");
249282

250283
// Run FFmpeg command
251284
let ffmpeg_result = tokio::process::Command::new("ffmpeg")
@@ -266,25 +299,51 @@ async fn process_job(job: Job) -> Result<()> {
266299

267300
match ffmpeg_result {
268301
Ok(output) => {
302+
let stdout = String::from_utf8_lossy(&output.stdout);
303+
let stderr = String::from_utf8_lossy(&output.stderr);
304+
info!("FFmpeg exit status: {}", output.status);
305+
if !stdout.is_empty() {
306+
info!("FFmpeg stdout: {}", stdout);
307+
}
308+
if !stderr.is_empty() {
309+
info!("FFmpeg stderr: {}", stderr);
310+
}
311+
269312
if output.status.success() {
270-
info!("FFmpeg processing successful");
313+
info!("FFmpeg processing successful!");
314+
315+
// Check output file
316+
match fs::metadata(&output_path) {
317+
Ok(meta) => info!("Output file size: {} bytes", meta.len()),
318+
Err(e) => error!("Failed to stat output file: {}", e),
319+
}
271320

272321
// Upload result back to WebDAV
322+
info!("Reading output file...");
273323
let output_data = fs::read(&output_path)?;
324+
info!("Output file read, size: {} bytes", output_data.len());
325+
326+
info!("Creating WebDAV client...");
274327
let dav_client = WebDavClient::new(&job.webdav_config)?;
275328

276329
info!("Uploading processed video to: {}", job.output_path);
277-
dav_client.upload_file(&job.output_path, output_data).await?;
330+
match dav_client.upload_file(&job.output_path, output_data).await {
331+
Ok(_) => info!("Upload successful!"),
332+
Err(e) => error!("Upload FAILED: {}", e),
333+
}
278334

279335
info!("Job {} completed successfully", job.id);
280336

281337
// Update job to completed via queue URL
282338
if let Some(queue_url) = &job.webdav_config.queue_url {
283-
let _ = update_job_status_remote(queue_url, &job.id, JobStatus::Completed, None).await;
339+
info!("Updating job status to completed at: {}", queue_url);
340+
match update_job_status_remote(queue_url, &job.id, JobStatus::Completed, None).await {
341+
Ok(_) => info!("Status update successful"),
342+
Err(e) => error!("Status update failed: {}", e),
343+
}
284344
}
285345
} else {
286-
let stderr = String::from_utf8_lossy(&output.stderr);
287-
error!("FFmpeg failed: {}", stderr);
346+
error!("FFmpeg FAILED with exit code: {}", output.status);
288347

289348
if let Some(queue_url) = &job.webdav_config.queue_url {
290349
let _ = update_job_status_remote(queue_url, &job.id, JobStatus::Failed, None).await;
@@ -301,8 +360,10 @@ async fn process_job(job: Job) -> Result<()> {
301360
}
302361

303362
// Cleanup temp directory
363+
info!("Cleaning up temp dir: {}", temp_dir);
304364
let _ = fs::remove_dir_all(&temp_dir);
305365

366+
info!("=== PROCESSING JOB END ===");
306367
Ok(())
307368
}
308369

0 commit comments

Comments
 (0)