Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/features/analyze/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (h *Handler) StartAcoustIDAnalysis(c *fiber.Ctx) error {

slog.Info("AcoustID analysis job started successfully", "jobID", jobID)

// // Trigger HTMX to refresh the job list
c.Set("HX-Trigger", "refreshJobList")

if c.Get("HX-Request") == "true" {
Expand Down Expand Up @@ -66,7 +65,6 @@ func (h *Handler) StartLyricsAnalysis(c *fiber.Ctx) error {

slog.Info("Lyrics analysis job started successfully", "jobID", jobID, "provider", provider)

// Trigger HTMX to refresh the job list
c.Set("HX-Trigger", "refreshJobList")

if c.Get("HX-Request") == "true" {
Expand Down
21 changes: 17 additions & 4 deletions src/features/jobs/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ func (s *Service) RegisterHandler(jobType string, handler TaskHandler) {
}

func (s *Service) StartJob(jobType string, name string, metadata map[string]any) (string, error) {
s.mu.Lock()
defer s.mu.Unlock()

// Check if this is a DAP job and if dap_sync is running or pending
if strings.HasPrefix(jobType, "dap_") && s.hasDapSyncRunningOrPending() {
return "", fmt.Errorf("cannot start DAP job: dap_sync job is running or pending")
}

// Create a copy of jobType to prevent potential memory sharing issues
jobTypeCopy := strings.Clone(jobType)
job := &Job{
Expand Down Expand Up @@ -207,16 +215,12 @@ func (s *Service) StartJob(jobType string, name string, metadata map[string]any)
job.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
}

s.mu.Lock()
s.jobs[job.ID] = job

// Check if we can start this job immediately
if !s.isAnyJobRunning() {
job.Status = JobStatusRunning
s.mu.Unlock()
go s.executeJob(job)
} else {
s.mu.Unlock()
}

return job.ID, nil
Expand Down Expand Up @@ -365,6 +369,15 @@ func (s *Service) isAnyJobRunning() bool {
return false
}

func (s *Service) hasDapSyncRunningOrPending() bool {
for _, job := range s.jobs {
if job.Type == "dap_sync" && (job.Status == JobStatusRunning || job.Status == JobStatusPending) {
return true
}
}
return false
}

func (s *Service) startNextPendingJob() {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions src/features/syncdap/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (h *Handler) TriggerSync(c *fiber.Ctx) error {
}
slog.Info("TriggerSync: sync job started", "jobID", jobID)
c.Response().Header.Set("HX-Trigger", "jobStarted")
c.Set("HX-Trigger", "refreshJobList")
return c.Render("toast/toastInfo", fiber.Map{
"Msg": "Sync job started!",
})
Expand Down
72 changes: 31 additions & 41 deletions src/features/syncdap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ type DeviceStatus struct {
MountPath string
LastSeen time.Time
Error string
// Sync progress fields
Syncing bool
JobID string
}

// Service handles device synchronization monitoring
Expand Down Expand Up @@ -85,36 +82,25 @@ func (s *Service) checkDevices() {
// Check if we have existing status for this device
var status DeviceStatus
if existing, exists := s.statuses[device.UUID]; exists {
// Preserve sync progress data if device is currently syncing
status = existing
status.LastSeen = time.Now()
// Only update mount status if not currently syncing
if !status.Syncing {
mounted, mountPath, err := s.isDeviceMounted(device)
if err != nil {
slog.Error("Failed to check if device is mounted", "error", err)
status.Error = err.Error()
} else {
status.Mounted = mounted
status.MountPath = mountPath
}
}
} else {
// New device, create fresh status
status = DeviceStatus{
UUID: device.UUID,
Name: device.Name,
LastSeen: time.Now(),
}
// Check mount status for new devices
mounted, mountPath, err := s.isDeviceMounted(device)
if err != nil {
slog.Error("Failed to check if device is mounted", "error", err)
status.Error = err.Error()
} else {
status.Mounted = mounted
status.MountPath = mountPath
}
}

// Check mount status
mounted, mountPath, err := s.isDeviceMounted(device)
if err != nil {
slog.Error("Failed to check if device is mounted", "error", err)
status.Error = err.Error()
} else {
status.Mounted = mounted
status.MountPath = mountPath
}

newStatuses[device.UUID] = status
Expand Down Expand Up @@ -190,6 +176,21 @@ func (s *Service) GetDeviceStatus(uuid string) (DeviceStatus, bool) {
return status, exists
}

// findRunningSyncJob finds the job ID of a running sync job for the given UUID
func (s *Service) findRunningSyncJob(uuid string) (string, bool) {
jobs := s.jobService.GetJobs()
for _, job := range jobs {
if job.Type == "dap_sync" && job.Status == "running" {
if job.Metadata != nil {
if jobUUID, ok := job.Metadata["uuid"].(string); ok && jobUUID == uuid {
return job.ID, true
}
}
}
}
return "", false
}

// StartSync starts a sync operation for a device
func (s *Service) StartSync(uuid string) (string, error) {
s.mu.Lock()
Expand All @@ -206,8 +207,9 @@ func (s *Service) StartSync(uuid string) (string, error) {
return "", fmt.Errorf("device not mounted")
}

if status.Syncing {
slog.Error("Sync already in progress", "uuid", uuid)
// Check if sync already in progress
if jobID, running := s.findRunningSyncJob(uuid); running {
slog.Error("Sync already in progress", "uuid", uuid, "jobID", jobID)
return "", fmt.Errorf("sync already in progress")
}

Expand All @@ -220,10 +222,6 @@ func (s *Service) StartSync(uuid string) (string, error) {
return "", fmt.Errorf("failed to start sync job: %w", err)
}

status.Syncing = true
status.JobID = jobID
s.statuses[uuid] = status

return jobID, nil
}

Expand All @@ -232,23 +230,15 @@ func (s *Service) CancelSync(uuid string) error {
s.mu.Lock()
defer s.mu.Unlock()

status, exists := s.statuses[uuid]
if !exists {
return fmt.Errorf("device not found")
}

if !status.Syncing {
jobID, running := s.findRunningSyncJob(uuid)
if !running {
return fmt.Errorf("no sync in progress")
}

err := s.jobService.CancelJob(status.JobID)
err := s.jobService.CancelJob(jobID)
if err != nil {
return fmt.Errorf("failed to cancel sync job: %w", err)
}

status.Syncing = false
status.JobID = ""
s.statuses[uuid] = status

return nil
}
73 changes: 55 additions & 18 deletions src/features/syncdap/sync_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package syncdap

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -191,53 +190,84 @@ func (e *SyncDapTask) Execute(ctx context.Context, job *jobs.Job, progressUpdate

cmd := exec.CommandContext(ctx, "rsync", rsyncArgs...)

// Capture stdout to parse progress
stdout, pipeErr := cmd.StdoutPipe()
// Capture stderr to parse progress and stats
stderrPipe, pipeErr := cmd.StderrPipe()
if pipeErr != nil {
return nil, fmt.Errorf("failed to create stdout pipe: %w", pipeErr)
return nil, fmt.Errorf("failed to create stderr pipe: %w", pipeErr)
}

// Capture stderr for error details
var stderr bytes.Buffer
cmd.Stderr = &stderr

// Start command
if startErr := cmd.Start(); startErr != nil {
return nil, fmt.Errorf("failed to start rsync: %w", startErr)
}

// Parse rsync output for progress
go e.parseRsyncOutput(job, stdout, progressUpdater)
// Parse rsync output for progress and stats
var stats SyncStats
go e.parseRsyncOutput(job, stderrPipe, progressUpdater, &stats)

// Wait for command to complete
waitErr := cmd.Wait()

if waitErr != nil {
if ctx.Err() == context.Canceled {
return nil, ctx.Err()
}
stderrOutput := stderr.String()
if stderrOutput != "" {
return nil, fmt.Errorf("rsync failed: %v - %s", waitErr, strings.TrimSpace(stderrOutput))
}
return nil, fmt.Errorf("rsync failed: %w", waitErr)
}

return nil, nil
finalMessage := fmt.Sprintf("Sync completed successfully. %s", stats.Summary)
job.Logger.Info(finalMessage)

return map[string]any{"stats": stats, "msg": finalMessage}, nil
}

// Cleanup does nothing for syncdap.
func (e *SyncDapTask) Cleanup(job *jobs.Job) error {
return nil
}

// parseRsyncOutput parses rsync progress output and updates status
func (e *SyncDapTask) parseRsyncOutput(job *jobs.Job, stdout io.Reader, progressUpdater func(int, string)) {
scanner := bufio.NewScanner(stdout)
// SyncStats represents the statistics from a sync operation
type SyncStats struct {
FilesTransferred int
FilesTotal int
DataTransferred string
DataTotal string
Summary string
}

// parseRsyncOutput parses rsync stderr output for progress and stats
func (e *SyncDapTask) parseRsyncOutput(job *jobs.Job, reader io.Reader, progressUpdater func(int, string), stats *SyncStats) {
scanner := bufio.NewScanner(reader)
fileRegex := regexp.MustCompile(`^(\S+)$`)
progressRegex := regexp.MustCompile(`^\s*(\d+)\s+(\d+)%\s+([\d.]+\w+/s).*$`)

for scanner.Scan() {
line := scanner.Text()
lineTrim := strings.TrimSpace(line)

// Check if it's a stats line (contains ":")
if strings.Contains(lineTrim, ":") {
parts := strings.SplitN(lineTrim, ":", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
switch key {
case "Number of files":
if val, err := strconv.Atoi(value); err == nil {
stats.FilesTotal = val
}
case "Number of files transferred", "Number of regular files transferred":
if val, err := strconv.Atoi(value); err == nil {
stats.FilesTransferred = val
}
case "Total file size":
stats.DataTotal = value
case "Total transferred file size":
stats.DataTransferred = value
}
}
continue
}

// Check if it's a file transfer line (filename on its own line)
if matches := fileRegex.FindStringSubmatch(line); len(matches) > 1 && !strings.Contains(line, "%") {
Expand All @@ -258,4 +288,11 @@ func (e *SyncDapTask) parseRsyncOutput(job *jobs.Job, stdout io.Reader, progress
}
}
}

// Build summary after parsing
if stats.FilesTransferred > 0 {
stats.Summary = fmt.Sprintf("Transferred %d files (%s)", stats.FilesTransferred, stats.DataTransferred)
} else {
stats.Summary = "No files needed to be transferred"
}
}
2 changes: 1 addition & 1 deletion src/features/syncdap/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *TelegramHandler) handleDeviceList(bot *tgbotapi.BotAPI, chatID int64) e
}

syncStatus := ""
if status.Syncing {
if _, running := h.service.findRunningSyncJob(status.UUID); running {
syncStatus = " (🔄 Syncing...)"
}

Expand Down
30 changes: 18 additions & 12 deletions views/jobs/job_card_footer.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@

{{ if and (ne $job.Metadata nil) (index $job.Metadata "msg") }}
{{ $stats := index $job.Metadata "stats" }}
{{ $colorClass := "bg-red-50/80 dark:bg-red-900/30 border border-red-200/60 dark:border-red-800/60 text-red-700 dark:text-red-300" }}
{{ if eq $job.Type "analyze_lyrics" }}
{{ $colorClass = "bg-pink-50/80 dark:bg-pink-900/30 border border-pink-200/60 dark:border-pink-800/60 text-pink-700 dark:text-pink-300" }}
{{ else if eq $job.Status "cancelled" }}
{{ $colorClass = "bg-gray-50/80 dark:bg-gray-900/30 border border-gray-200/60 dark:border-gray-800/60 text-gray-700 dark:text-gray-300" }}
{{ else if and (eq $job.Status "completed") $stats (gt $stats.TracksImported 0) }}
{{ $colorClass = "bg-green-50/80 dark:bg-green-900/30 border border-green-200/60 dark:border-green-800/60 text-green-700 dark:text-green-300" }}
{{ else if and $stats (gt $stats.TracksImported 0) }}
{{ $colorClass = "bg-yellow-50/80 dark:bg-yellow-900/30 border border-yellow-200/60 dark:border-yellow-800/60 text-yellow-700 dark:text-yellow-300" }}
{{ else if and $stats (gt $stats.Queued 0) (eq $stats.Errors 0) }}
{{ $colorClass = "bg-purple-50/80 dark:bg-purple-900/30 border border-purple-200/60 dark:border-purple-800/60 text-purple-700 dark:text-purple-300" }}
{{ end }}
{{ $colorClass := "bg-red-50/80 dark:bg-red-900/30 border border-red-200/60 dark:border-red-800/60 text-red-700 dark:text-red-300" }}
{{ if eq $job.Type "analyze_lyrics" }}
{{ $colorClass = "bg-pink-50/80 dark:bg-pink-900/30 border border-pink-200/60 dark:border-pink-800/60 text-pink-700 dark:text-pink-300" }}
{{ else if eq $job.Status "cancelled" }}
{{ $colorClass = "bg-gray-50/80 dark:bg-gray-900/30 border border-gray-200/60 dark:border-gray-800/60 text-gray-700 dark:text-gray-300" }}
{{ else if eq $job.Type "dap_sync" }}
{{ if eq $job.Status "completed" }}
{{ $colorClass = "bg-green-50/80 dark:bg-green-900/30 border border-green-200/60 dark:border-green-800/60 text-green-700 dark:text-green-300" }}
{{ else }}
{{ $colorClass = "bg-blue-50/80 dark:bg-blue-900/30 border border-blue-200/60 dark:border-blue-800/60 text-blue-700 dark:text-blue-300" }}
{{ end }}
{{ else if and (eq $job.Status "completed") $stats (gt $stats.TracksImported 0) }}
{{ $colorClass = "bg-green-50/80 dark:bg-green-900/30 border border-green-200/60 dark:border-green-800/60 text-green-700 dark:text-green-300" }}
{{ else if and $stats (gt $stats.TracksImported 0) }}
{{ $colorClass = "bg-yellow-50/80 dark:bg-yellow-900/30 border border-yellow-200/60 dark:border-yellow-800/60 text-yellow-700 dark:text-yellow-300" }}
{{ else if and $stats (gt $stats.Queued 0) (eq $stats.Errors 0) }}
{{ $colorClass = "bg-purple-50/80 dark:bg-purple-900/30 border border-purple-200/60 dark:border-purple-800/60 text-purple-700 dark:text-purple-300" }}
{{ end }}
<div class="mt-1.5 p-1.5 {{ $colorClass }} rounded-md text-xs backdrop-blur-sm">
{{ index $job.Metadata "msg" }}
</div>
Expand Down
Loading