diff --git a/README.md b/README.md index c6d8f01..7b7e27f 100644 --- a/README.md +++ b/README.md @@ -34,10 +34,10 @@ services: DEST_PLEX_TOKEN: "your-destination-plex-token" # SSH Configuration (choose password OR key-based auth) - OPT_SSH_USER: "your-ssh-user" - OPT_SSH_PASSWORD: "your-ssh-password" # For password auth - # OPT_SSH_KEY_PATH: "/keys/id_rsa" # For key-based auth - OPT_SSH_PORT: "22" + SSH_USER: "your-ssh-user" + SSH_PASSWORD: "your-ssh-password" # For password auth + # SSH_KEY_PATH: "/keys/id_rsa" # For key-based auth + SSH_PORT: "22" # Sync Configuration SYNC_LABEL: "Sync2Secondary" # Label to identify content to sync @@ -46,13 +46,15 @@ services: DRY_RUN: "false" # Set to "true" for testing # Path Mapping - SOURCE_REPLACE_FROM: "/data/Media" # Source path prefix to replace - SOURCE_REPLACE_TO: "/media/source" # Local container path + SOURCE_REPLACE_FROM: "/data/Media" # Source path prefix to strip for destination + SOURCE_REPLACE_TO: "/media/source" # Local container path (or leave empty for same-volume mounting) DEST_ROOT_DIR: "/mnt/data" # Destination server root path volumes: # Mount your media directories (adjust paths as needed) - "/path/to/your/media:/media/source:ro" # Read-only source media + # Alternative: Same-volume mounting (leave SOURCE_REPLACE_TO empty) + # - "/data/Media:/data/Media:ro" # For SSH key authentication (uncomment if using keys) # - "/path/to/ssh/keys:/keys:ro" @@ -129,12 +131,12 @@ services: | Variable | Description | Example | Required | |----------|-------------|---------|----------| -| `OPT_SSH_USER` | SSH username | `mediauser` | ✅ | -| `OPT_SSH_PASSWORD` | SSH password (for password auth) | `secretpass` | ❌* | -| `OPT_SSH_KEY_PATH` | SSH private key path (for key auth) | `/keys/id_rsa` | ❌* | -| `OPT_SSH_PORT` | SSH port | `22` | ❌ | +| `SSH_USER` | SSH username | `mediauser` | ✅ | +| `SSH_PASSWORD` | SSH password (for password auth) | `secretpass` | ❌* | +| `SSH_KEY_PATH` | SSH private key path (for key auth) | `/keys/id_rsa` | ❌* | +| `SSH_PORT` | SSH port | `22` | ❌ | -*Either password or key path is required +*Either password or key path is required. Password auth requires `sshpass` to be installed for both SCP and rsync transfers. ### Sync Configuration @@ -150,8 +152,8 @@ services: | Variable | Description | Example | Required | |----------|-------------|---------|----------| -| `SOURCE_REPLACE_FROM` | Source path prefix to replace | `/data/Media` | ❌ | -| `SOURCE_REPLACE_TO` | Container path for source media | `/media/source` | ❌ | +| `SOURCE_REPLACE_FROM` | Source path prefix to strip for destination mapping | `/data/Media` | ❌ | +| `SOURCE_REPLACE_TO` | Container path for source media (leave empty for same-volume mounting) | `/media/source` | ❌ | | `DEST_ROOT_DIR` | Destination server root directory | `/mnt/data` | ✅ | @@ -407,8 +409,8 @@ We welcome contributions! Here's how to get started: - Verify SSH credentials are correct - Ensure SSH user has access to destination paths - Test SSH connection manually: `ssh user@destination-server` -- For password auth: Ensure `OPT_SSH_PASSWORD` is set -- For key auth: Ensure private key is mounted and `OPT_SSH_KEY_PATH` is correct +- For password auth: Ensure `SSH_PASSWORD` is set and `sshpass` is installed +- For key auth: Ensure private key is mounted and `SSH_KEY_PATH` is correct ### Rsync Not Found diff --git a/internal/config/config.go b/internal/config/config.go index f899659..764c3e4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" "os" + "path/filepath" "strconv" "strings" "time" @@ -13,9 +14,10 @@ type Config struct { Source PlexServerConfig `json:"source"` Destination PlexServerConfig `json:"destination"` SyncLabel string `json:"syncLabel"` - SourceReplaceFrom string `json:"sourceReplaceFrom"` // Optional: Source path pattern to replace (e.g., "/data/Movies") - SourceReplaceTo string `json:"sourceReplaceTo"` // Optional: Local path replacement (e.g., "M:\\Movies") + SourceReplaceFrom string `json:"sourceReplaceFrom"` // Optional: Source path prefix to strip (e.g., "/data/Movies") + SourceReplaceTo string `json:"sourceReplaceTo"` // Optional: Local path replacement (e.g., "/media/source"). Leave empty for same-volume mounting DestRootDir string `json:"destRootDir"` // Required: Destination root path (e.g., "/mnt/data/Movies") + TransferMethod string `json:"transferMethod"` // Optional: Force transfer method ("rsync" or "scp"), auto-detected if empty Interval time.Duration `json:"interval"` SSH SSHConfig `json:"ssh"` Performance PerformanceConfig `json:"performance"` @@ -82,11 +84,12 @@ func LoadConfig() (*Config, error) { SourceReplaceFrom: getEnvWithDefault("SOURCE_REPLACE_FROM", ""), SourceReplaceTo: getEnvWithDefault("SOURCE_REPLACE_TO", ""), DestRootDir: getEnvWithDefault("DEST_ROOT_DIR", ""), + TransferMethod: strings.ToLower(getEnvWithDefault("TRANSFER_METHOD", "")), // rsync, scp, or empty for auto-detection SSH: SSHConfig{ - User: getEnvWithDefault("OPT_SSH_USER", ""), - Password: getEnvWithDefault("OPT_SSH_PASSWORD", ""), - Port: getEnvWithDefault("OPT_SSH_PORT", "22"), - KeyPath: getEnvWithDefault("OPT_SSH_KEY_PATH", ""), // Keep for future use + User: getEnvWithDefault("SSH_USER", ""), + Password: getEnvWithDefault("SSH_PASSWORD", ""), + Port: getEnvWithDefault("SSH_PORT", "22"), + KeyPath: getEnvWithDefault("SSH_KEY_PATH", ""), // Keep for future use }, DryRun: parseBoolEnv("DRY_RUN", false), LogLevel: getEnvWithDefault("LOG_LEVEL", "INFO"), @@ -242,3 +245,78 @@ func parseFloatEnv(key string, defaultValue float64) float64 { } return defaultValue } + +// MapSourcePathToLocal converts a source Plex server path to a local filesystem path +func (c *Config) MapSourcePathToLocal(sourcePath string) (string, error) { + if sourcePath == "" { + return "", fmt.Errorf("source path is empty") + } + + // If no source replacement configured, use the Plex path as-is + if c.SourceReplaceFrom == "" { + return filepath.FromSlash(sourcePath), nil + } + + // If SourceReplaceFrom is set but SourceReplaceTo is empty, + // use source path as-is (same volume mounting scenario) + if c.SourceReplaceTo == "" { + return filepath.FromSlash(sourcePath), nil + } + + // Apply source replacement pattern + sourcePathNorm := filepath.ToSlash(sourcePath) + sourceReplaceFromNorm := filepath.ToSlash(c.SourceReplaceFrom) + + if !strings.HasPrefix(sourcePathNorm, sourceReplaceFromNorm) { + return "", fmt.Errorf("source path %s does not start with replacement pattern %s", sourcePath, c.SourceReplaceFrom) + } + + relativePath := strings.TrimPrefix(sourcePathNorm, sourceReplaceFromNorm) + relativePath = strings.TrimPrefix(relativePath, "/") + + localPath := filepath.Join(c.SourceReplaceTo, relativePath) + return localPath, nil +} + +// MapLocalPathToDest converts a local filesystem path to a destination server path +func (c *Config) MapLocalPathToDest(localPath string) (string, error) { + if localPath == "" { + return "", fmt.Errorf("local path is empty") + } + + if c.DestRootDir == "" { + return "", fmt.Errorf("destination root directory not configured") + } + + var relativePath string + + if c.SourceReplaceTo != "" { + // Standard case: strip SourceReplaceTo prefix from local path + localPathNorm := filepath.ToSlash(localPath) + sourceReplaceToNorm := filepath.ToSlash(c.SourceReplaceTo) + + if !strings.HasPrefix(localPathNorm, sourceReplaceToNorm) { + return "", fmt.Errorf("local path %s does not start with source replacement root %s", localPath, c.SourceReplaceTo) + } + + relativePath = strings.TrimPrefix(localPathNorm, sourceReplaceToNorm) + relativePath = strings.TrimPrefix(relativePath, "/") + } else if c.SourceReplaceFrom != "" { + // Same volume mounting: strip SourceReplaceFrom prefix to get relative path + localPathNorm := filepath.ToSlash(localPath) + sourceReplaceFromNorm := filepath.ToSlash(c.SourceReplaceFrom) + + if !strings.HasPrefix(localPathNorm, sourceReplaceFromNorm) { + return "", fmt.Errorf("local path %s does not start with source replacement pattern %s", localPath, c.SourceReplaceFrom) + } + + relativePath = strings.TrimPrefix(localPathNorm, sourceReplaceFromNorm) + relativePath = strings.TrimPrefix(relativePath, "/") + } else { + // Fallback: use just the filename (preserves original behavior) + relativePath = filepath.Base(localPath) + } + + destPath := strings.TrimSuffix(c.DestRootDir, "/") + "/" + relativePath + return destPath, nil +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go index cbdd9b5..6505d87 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -19,8 +19,8 @@ func TestLoadConfig(t *testing.T) { "DEST_PLEX_PROTOCOL": "http", "SYNC_LABEL": "test-sync", "SYNC_INTERVAL": "30", - "OPT_SSH_USER": "testuser", - "OPT_SSH_KEY_PATH": "/test/keys/id_rsa", + "SSH_USER": "testuser", + "SSH_KEY_PATH": "/test/keys/id_rsa", "DEST_ROOT_DIR": "/test/dest", "LOG_LEVEL": "DEBUG", "DRY_RUN": "true", diff --git a/internal/orchestrator/sync.go b/internal/orchestrator/sync.go index 58ed09c..6d170b7 100644 --- a/internal/orchestrator/sync.go +++ b/internal/orchestrator/sync.go @@ -56,16 +56,33 @@ func NewSyncOrchestrator(cfg *config.Config, log *logger.Logger) (*SyncOrchestra // Initialize content discovery (Phase 1 & 2) orchestrator.contentDiscovery = discovery.NewContentDiscovery(sourceClient, cfg.SyncLabel, log) - // Phase 3: Transfer Files - Auto-detect optimal transfer method + // Phase 3: Transfer Files - Use configured or auto-detect optimal transfer method if isSSHConfigured(cfg.SSH, log) { - // Auto-detect optimal transfer method (rsync preferred for performance) - transferMethod := transfer.GetOptimalTransferMethod(log) + var transferMethod transfer.TransferMethod + + // Check if user specified a transfer method via environment variable + if cfg.TransferMethod != "" { + switch cfg.TransferMethod { + case "rsync": + transferMethod = transfer.TransferMethodRsync + log.WithField("method", "rsync").Info("Using user-configured transfer method") + case "scp": + transferMethod = transfer.TransferMethodSCP + log.WithField("method", "scp").Info("Using user-configured transfer method") + default: + log.WithField("invalid_method", cfg.TransferMethod).Warn("Invalid TRANSFER_METHOD specified, falling back to auto-detection") + transferMethod = transfer.GetOptimalTransferMethod(log) + } + } else { + // Auto-detect optimal method (rsync preferred for performance) + transferMethod = transfer.GetOptimalTransferMethod(log) + } + fileTransfer, err := transfer.NewTransferrer(transferMethod, cfg, log) if err != nil { return nil, fmt.Errorf("failed to create file transferrer: %w", err) } orchestrator.fileTransfer = fileTransfer - log.WithField("transfer_method", string(transferMethod)).Info("High-performance file transfer enabled") } else { log.Info("SSH not configured - running in metadata-only sync mode") } @@ -254,7 +271,7 @@ func (s *SyncOrchestrator) transferEnhancedItemFiles(enhancedItem *discovery.Enh } // Map source Plex path to local path - localPath, err := s.fileTransfer.MapSourcePathToLocal(sourcePath) + localPath, err := s.config.MapSourcePathToLocal(sourcePath) if err != nil { s.logger.WithError(err).WithField("source_path", sourcePath).Error("Failed to map source path to local path") continue @@ -267,7 +284,7 @@ func (s *SyncOrchestrator) transferEnhancedItemFiles(enhancedItem *discovery.Enh } // Map local path to destination path - destPath, err := s.fileTransfer.MapLocalPathToDest(localPath) + destPath, err := s.config.MapLocalPathToDest(localPath) if err != nil { s.logger.WithError(err).WithField("local_path", localPath).Error("Failed to map local path to destination path") continue @@ -312,7 +329,7 @@ func (s *SyncOrchestrator) findRelatedFiles(mainFilePath string) []string { prefix := filename[:dotIndex] // Map source path to local path for directory listing - localDir, err := s.fileTransfer.MapSourcePathToLocal(dir) + localDir, err := s.config.MapSourcePathToLocal(dir) if err != nil { s.logger.WithError(err).WithField("source_dir", dir).Debug("Failed to map source directory to local path") return allPaths diff --git a/internal/plex/types.go b/internal/plex/types.go index 4e13092..b94241f 100644 --- a/internal/plex/types.go +++ b/internal/plex/types.go @@ -4,6 +4,7 @@ import ( "encoding/json" "encoding/xml" "fmt" + "strconv" ) // Library represents a Plex library @@ -120,7 +121,7 @@ type TVShow struct { UseOriginalTitle int `json:"useOriginalTitle,omitempty"` AudioLanguage string `json:"audioLanguage,omitempty"` SubtitleLanguage string `json:"subtitleLanguage,omitempty"` - SubtitleMode int `json:"subtitleMode,omitempty"` + SubtitleMode FlexibleInt `json:"subtitleMode,omitempty"` AutoDeletionItemPolicyUnwatchedLibrary int `json:"autoDeletionItemPolicyUnwatchedLibrary,omitempty"` AutoDeletionItemPolicyWatchedLibrary int `json:"autoDeletionItemPolicyWatchedLibrary,omitempty"` Slug string `json:"slug,omitempty"` @@ -269,6 +270,39 @@ func (fr FlexibleRating) MarshalJSON() ([]byte, error) { return json.Marshal(fr.Value) } +// FlexibleInt can handle both string and integer values +type FlexibleInt struct { + Value int +} + +// UnmarshalJSON implements custom JSON unmarshaling for FlexibleInt +func (fi *FlexibleInt) UnmarshalJSON(data []byte) error { + // Try to unmarshal as an integer first + var intValue int + if err := json.Unmarshal(data, &intValue); err == nil { + fi.Value = intValue + return nil + } + + // Try to unmarshal as a string and parse it as an integer + var stringValue string + if err := json.Unmarshal(data, &stringValue); err == nil { + if parsedInt, parseErr := strconv.Atoi(stringValue); parseErr == nil { + fi.Value = parsedInt + return nil + } + } + + // If both fail, set to 0 + fi.Value = 0 + return nil +} + +// MarshalJSON implements custom JSON marshaling for FlexibleInt +func (fi FlexibleInt) MarshalJSON() ([]byte, error) { + return json.Marshal(fi.Value) +} + // MediaContainer holds metadata for movies or TV shows type MediaContainer struct { Size int `json:"size"` diff --git a/internal/transfer/rsync.go b/internal/transfer/rsync.go index ebe80f4..59d5e22 100644 --- a/internal/transfer/rsync.go +++ b/internal/transfer/rsync.go @@ -43,10 +43,7 @@ func newRsyncTransfer(cfg *config.Config, log *logger.Logger) (*RsyncTransfer, e // doTransferFile transfers a single file using rsync (internal implementation without common logic) func (r *RsyncTransfer) doTransferFile(sourcePath, destPath string) error { - // Ensure destination directory exists - if err := r.ensureDestinationDir(destPath); err != nil { - return fmt.Errorf("failed to create destination directory: %w", err) - } + // Directory creation is now handled by the common transferrer before calling this method // Build rsync command with optimizations args := r.buildRsyncArgs(sourcePath, destPath) @@ -65,12 +62,16 @@ func (r *RsyncTransfer) doTransferFile(sourcePath, destPath string) error { return fmt.Errorf("rsync failed: %w", err) } - return nil -} + // Check if rsync actually transferred data or skipped the file + if r.isFileSkipped(string(output), sourcePath) { + r.logger.WithFields(map[string]interface{}{ + "source_path": sourcePath, + "dest_path": destPath, + }).Debug("File was skipped by rsync (already up-to-date)") + return nil // Not an error, just skipped + } -// TransferFile transfers a single file using rsync (public interface for backward compatibility) -func (r *RsyncTransfer) TransferFile(sourcePath, destPath string) error { - return r.doTransferFile(sourcePath, destPath) + return nil } // doTransferFiles transfers multiple files using rsync (internal implementation) @@ -92,13 +93,16 @@ func (r *RsyncTransfer) doTransferFiles(files []types.FileTransfer) error { // buildRsyncArgs builds optimized rsync arguments func (r *RsyncTransfer) buildRsyncArgs(sourcePath, destPath string) []string { remoteHost := fmt.Sprintf("%s@%s", r.sshConfig.User, r.serverConfig.Host) + + // For exec.Command, we don't need shell quoting - Go handles argument separation remoteDest := fmt.Sprintf("%s:%s", remoteHost, destPath) args := []string{ - "-avz", // Archive mode, verbose, compression - "--progress", // Show progress - "--partial", // Keep partial transfers - "--inplace", // Update files in place (faster for large files) + "-avz", // Archive mode, verbose, compression + "--progress", // Show progress + "--partial", // Keep partial transfers + "--inplace", // Update files in place (faster for large files) + "--itemize-changes", // Show detailed changes (helps detect skips) } // Compression settings @@ -143,6 +147,55 @@ func (r *RsyncTransfer) buildRsyncArgs(sourcePath, destPath string) []string { return args } +// isFileSkipped analyzes rsync output to determine if the file was skipped (not transferred) +func (r *RsyncTransfer) isFileSkipped(output, sourcePath string) bool { + outputLines := strings.Split(output, "\n") + filename := filepath.Base(sourcePath) + + // Look for itemize-changes output: lines starting with itemize codes + // If file was transferred, we'd see something like ">f+++++++++" (new file) or ">f.st......" (updated file) + // If file was skipped, there will be no itemize line for this file, or minimal output + + hasItemizeOutput := false + hasTransferProgress := false + + for _, line := range outputLines { + line = strings.TrimSpace(line) + + // Check for itemize-changes output (indicates actual changes) + if strings.HasPrefix(line, ">f") && strings.Contains(line, filename) { + hasItemizeOutput = true + r.logger.WithFields(map[string]interface{}{ + "itemize_line": line, + "filename": filename, + }).Debug("Detected rsync itemize output indicating file transfer") + } + + // Check for progress output (indicates data transfer) + if strings.Contains(line, filename) && (strings.Contains(line, "bytes/sec") || strings.Contains(line, "%")) { + hasTransferProgress = true + r.logger.WithFields(map[string]interface{}{ + "progress_line": line, + "filename": filename, + }).Debug("Detected rsync progress output indicating file transfer") + } + } + + // If we have no itemize output AND no progress, the file was likely skipped + fileSkipped := !hasItemizeOutput && !hasTransferProgress + + if fileSkipped { + r.logger.WithFields(map[string]interface{}{ + "filename": filename, + "has_itemize_output": hasItemizeOutput, + "has_transfer_progress": hasTransferProgress, + "rsync_output": output, + }).Debug("File appears to have been skipped by rsync (up-to-date)") + } + + return fileSkipped +} + // transferFilesBatch transfers multiple files in batches for efficiency func (r *RsyncTransfer) transferFilesBatch(files []types.FileTransfer) error { // Group files by directory for more efficient transfers @@ -179,12 +232,7 @@ func (r *RsyncTransfer) transferDirectoryBatch(sourceDir string, files []types.F // Use first file's destination to determine target directory destDir := filepath.Dir(files[0].DestPath) - // Ensure destination directory exists for all files in batch - for _, file := range files { - if err := r.ensureDestinationDir(file.DestPath); err != nil { - return fmt.Errorf("failed to create destination directory for %s: %w", file.DestPath, err) - } - } + // Directory creation is now handled by the common transferrer before calling transfer methods remoteHost := fmt.Sprintf("%s@%s", r.sshConfig.User, r.serverConfig.Host) remoteDest := fmt.Sprintf("%s:%s/", remoteHost, destDir) @@ -267,248 +315,3 @@ func (r *RsyncTransfer) createIncludeFile(baseDir string, files []types.FileTran func (r *RsyncTransfer) TransferFiles(files []types.FileTransfer) error { return r.doTransferFiles(files) } - -// Internal interface methods (lowercase for internalTransferer interface) -func (r *RsyncTransfer) fileExists(path string) (bool, error) { - return r.FileExists(path) -} - -func (r *RsyncTransfer) getFileSize(path string) (int64, error) { - return r.GetFileSize(path) -} - -func (r *RsyncTransfer) deleteFile(path string) error { - return r.DeleteFile(path) -} - -func (r *RsyncTransfer) listDirectoryContents(rootPath string) ([]string, error) { - return r.ListDirectoryContents(rootPath) -} - -func (r *RsyncTransfer) mapSourcePathToLocal(sourcePath string) (string, error) { - return r.MapSourcePathToLocal(sourcePath) -} - -func (r *RsyncTransfer) mapLocalPathToDest(localPath string) (string, error) { - return r.MapLocalPathToDest(localPath) -} - -func (r *RsyncTransfer) close() error { - return r.Close() -} - -// Close is a no-op for rsync (no persistent connections) -func (r *RsyncTransfer) Close() error { - return nil -} - -// DeleteFile deletes a file on the remote server using SSH -func (r *RsyncTransfer) DeleteFile(path string) error { - remoteHost := fmt.Sprintf("%s@%s", r.sshConfig.User, r.serverConfig.Host) - - sshCmd := []string{ - "ssh", - "-o", "StrictHostKeyChecking=no", - "-o", "ConnectTimeout=10", - } - - if r.sshConfig.Port != "" && r.sshConfig.Port != "22" { - sshCmd = append(sshCmd, "-p", r.sshConfig.Port) - } - - sshCmd = append(sshCmd, remoteHost, fmt.Sprintf("rm -f '%s'", path)) - - cmd := exec.Command(sshCmd[0], sshCmd[1:]...) - return cmd.Run() -} - -// ListDirectoryContents recursively lists all files in a directory -func (r *RsyncTransfer) ListDirectoryContents(rootPath string) ([]string, error) { - remoteHost := fmt.Sprintf("%s@%s", r.sshConfig.User, r.serverConfig.Host) - - sshCmd := []string{ - "ssh", - "-o", "StrictHostKeyChecking=no", - "-o", "ConnectTimeout=10", - } - - if r.sshConfig.Port != "" && r.sshConfig.Port != "22" { - sshCmd = append(sshCmd, "-p", r.sshConfig.Port) - } - - sshCmd = append(sshCmd, remoteHost, fmt.Sprintf("find '%s' -type f", rootPath)) - - cmd := exec.Command(sshCmd[0], sshCmd[1:]...) - output, err := cmd.Output() - if err != nil { - return nil, err - } - - files := strings.Split(strings.TrimSpace(string(output)), "\n") - if len(files) == 1 && files[0] == "" { - return []string{}, nil - } - - r.logger.WithFields(map[string]interface{}{ - "root_path": rootPath, - "file_count": len(files), - }).Debug("Listed directory contents") - - return files, nil -} - -// FileExists checks if a file exists on the remote server using SSH -func (r *RsyncTransfer) FileExists(path string) (bool, error) { - remoteHost := fmt.Sprintf("%s@%s", r.sshConfig.User, r.serverConfig.Host) - - sshCmd := []string{ - "ssh", - "-o", "StrictHostKeyChecking=no", - "-o", "ConnectTimeout=10", - } - - if r.sshConfig.Port != "" && r.sshConfig.Port != "22" { - sshCmd = append(sshCmd, "-p", r.sshConfig.Port) - } - - sshCmd = append(sshCmd, remoteHost, fmt.Sprintf("test -f '%s'", path)) - - cmd := exec.Command(sshCmd[0], sshCmd[1:]...) - err := cmd.Run() - - return err == nil, nil -} - -// GetFileSize returns the size of a remote file -func (r *RsyncTransfer) GetFileSize(path string) (int64, error) { - remoteHost := fmt.Sprintf("%s@%s", r.sshConfig.User, r.serverConfig.Host) - - sshCmd := []string{ - "ssh", - "-o", "StrictHostKeyChecking=no", - "-o", "ConnectTimeout=10", - } - - if r.sshConfig.Port != "" && r.sshConfig.Port != "22" { - sshCmd = append(sshCmd, "-p", r.sshConfig.Port) - } - - sshCmd = append(sshCmd, remoteHost, fmt.Sprintf("stat -f%%z '%s' 2>/dev/null || stat -c%%s '%s'", path, path)) - - cmd := exec.Command(sshCmd[0], sshCmd[1:]...) - output, err := cmd.Output() - if err != nil { - return 0, err - } - - var size int64 - if _, err := fmt.Sscanf(strings.TrimSpace(string(output)), "%d", &size); err != nil { - return 0, err - } - - return size, nil -} - -// MapSourcePathToLocal converts a source Plex server path to a local filesystem path -func (r *RsyncTransfer) MapSourcePathToLocal(sourcePath string) (string, error) { - if sourcePath == "" { - return "", fmt.Errorf("source path is empty") - } - - // If no source replacement configured, use the Plex path as-is - if r.sourceReplaceFrom == "" || r.sourceReplaceTo == "" { - return filepath.FromSlash(sourcePath), nil - } - - // Apply source replacement pattern - sourcePathNorm := filepath.ToSlash(sourcePath) - sourceReplaceFromNorm := filepath.ToSlash(r.sourceReplaceFrom) - - if !strings.HasPrefix(sourcePathNorm, sourceReplaceFromNorm) { - return "", fmt.Errorf("source path %s does not start with replacement pattern %s", sourcePath, r.sourceReplaceFrom) - } - - relativePath := strings.TrimPrefix(sourcePathNorm, sourceReplaceFromNorm) - relativePath = strings.TrimPrefix(relativePath, "/") - - localPath := filepath.Join(r.sourceReplaceTo, relativePath) - return localPath, nil -} - -// MapLocalPathToDest converts a local filesystem path to a destination server path -func (r *RsyncTransfer) MapLocalPathToDest(localPath string) (string, error) { - if localPath == "" { - return "", fmt.Errorf("local path is empty") - } - - if r.destRootDir == "" { - return "", fmt.Errorf("destination root directory not configured") - } - - var relativePath string - - if r.sourceReplaceTo != "" { - localPathNorm := filepath.ToSlash(localPath) - sourceReplaceToNorm := filepath.ToSlash(r.sourceReplaceTo) - - if !strings.HasPrefix(localPathNorm, sourceReplaceToNorm) { - return "", fmt.Errorf("local path %s does not start with source replacement root %s", localPath, r.sourceReplaceTo) - } - - relativePath = strings.TrimPrefix(localPathNorm, sourceReplaceToNorm) - relativePath = strings.TrimPrefix(relativePath, "/") - } else { - relativePath = filepath.Base(localPath) - } - - destPath := strings.TrimSuffix(r.destRootDir, "/") + "/" + relativePath - return destPath, nil -} - -// ensureDestinationDir creates the destination directory on the remote server if it doesn't exist -func (r *RsyncTransfer) ensureDestinationDir(destPath string) error { - // Extract directory from destination path - destDir := filepath.Dir(destPath) - - // Build SSH command to create directory - remoteHost := fmt.Sprintf("%s@%s", r.sshConfig.User, r.serverConfig.Host) - mkdirCmd := fmt.Sprintf("mkdir -p '%s'", destDir) - - // SSH options - sshOpts := []string{ - "-o", "StrictHostKeyChecking=no", - "-o", "ConnectTimeout=10", - } - - if r.sshConfig.Port != "" && r.sshConfig.Port != "22" { - sshOpts = append(sshOpts, "-p", r.sshConfig.Port) - } - - // Build command with authentication - var cmd *exec.Cmd - if r.sshConfig.Password != "" { - // Use sshpass for password authentication - args := append([]string{"-p", r.sshConfig.Password, "ssh"}, sshOpts...) - args = append(args, remoteHost, mkdirCmd) - cmd = exec.Command("sshpass", args...) - r.logger.WithField("dest_dir", destDir).Debug("Creating remote directory with sshpass") - } else { - // Use SSH key-based authentication - args := append(sshOpts, remoteHost, mkdirCmd) - cmd = exec.Command("ssh", args...) - r.logger.WithField("dest_dir", destDir).Debug("Creating remote directory with SSH keys") - } - - output, err := cmd.CombinedOutput() - if err != nil { - r.logger.WithFields(map[string]interface{}{ - "dest_dir": destDir, - "output": string(output), - }).Warn("Failed to create remote directory (may already exist)") - // Don't return error - directory might already exist - } else { - r.logger.WithField("dest_dir", destDir).Debug("Remote directory created successfully") - } - - return nil -} diff --git a/internal/transfer/scp.go b/internal/transfer/scp.go index 9857c50..8032d60 100644 --- a/internal/transfer/scp.go +++ b/internal/transfer/scp.go @@ -2,600 +2,105 @@ package transfer import ( "fmt" - "io" - "os" - "path/filepath" + "os/exec" "strings" - "sync" - "time" "github.com/nullable-eth/syncarr/internal/config" "github.com/nullable-eth/syncarr/internal/logger" "github.com/nullable-eth/syncarr/pkg/types" - "github.com/pkg/sftp" - "golang.org/x/crypto/ssh" ) -// SCPTransfer handles file transfers using SCP over SSH +// SCPTransfer handles file transfers using actual SCP commands over SSH type SCPTransfer struct { sshConfig *config.SSHConfig serverConfig *config.PlexServerConfig - sourceReplaceFrom string // Optional: Source path pattern to replace - sourceReplaceTo string // Optional: Local path replacement - destRootDir string // Required: Destination root directory + sourceReplaceFrom string + sourceReplaceTo string + destRootDir string logger *logger.Logger - sshClient *ssh.Client - sftpClient *sftp.Client - bufferSize int // Buffer size for transfers - maxConcurrent int // Maximum concurrent transfers } // newSCPTransfer creates a new SCP transfer instance (package-private) func newSCPTransfer(cfg *config.Config, log *logger.Logger) (*SCPTransfer, error) { - transfer := &SCPTransfer{ + return &SCPTransfer{ sshConfig: &cfg.SSH, serverConfig: &cfg.Destination, sourceReplaceFrom: cfg.SourceReplaceFrom, sourceReplaceTo: cfg.SourceReplaceTo, destRootDir: cfg.DestRootDir, logger: log, - bufferSize: 1024 * 1024, // 1MB buffer for better performance - maxConcurrent: 3, // Allow up to 3 concurrent transfers - } - - // Establish SSH connection - if err := transfer.connect(); err != nil { - return nil, fmt.Errorf("failed to establish SSH connection: %w", err) - } - - return transfer, nil + }, nil } -// connect establishes SSH and SFTP connections -func (s *SCPTransfer) connect() error { - // Create SSH client configuration with password authentication and optimized settings - sshClientConfig := &ssh.ClientConfig{ - User: s.sshConfig.User, - Auth: []ssh.AuthMethod{ - ssh.Password(s.sshConfig.Password), - }, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), // For simplicity, ignore host key verification - Timeout: 30 * time.Second, - // Optimize for high throughput - Config: ssh.Config{ - Ciphers: []string{ - "aes128-ctr", "aes192-ctr", "aes256-ctr", // Faster AES-CTR ciphers - "aes128-gcm@openssh.com", "aes256-gcm@openssh.com", - }, - }, - } - - // Connect to SSH server using destination Plex server host - sshAddr := fmt.Sprintf("%s:%s", s.serverConfig.Host, s.sshConfig.Port) - sshClient, err := ssh.Dial("tcp", sshAddr, sshClientConfig) - if err != nil { - return fmt.Errorf("failed to connect to SSH server %s: %w", sshAddr, err) - } - s.sshClient = sshClient - - // Create SFTP client - sftpClient, err := sftp.NewClient(sshClient) - if err != nil { - if closeErr := sshClient.Close(); closeErr != nil { - s.logger.WithError(closeErr).Warn("Failed to close SSH client after SFTP creation error") - } - return fmt.Errorf("failed to create SFTP client: %w", err) - } - s.sftpClient = sftpClient - - s.logger.WithField("ssh_host", sshAddr).Info("Successfully connected to SSH/SFTP server") - return nil -} - -// Close closes the SSH and SFTP connections -func (s *SCPTransfer) Close() error { - var errs []error - - if s.sftpClient != nil { - if err := s.sftpClient.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to close SFTP client: %w", err)) - } - } - - if s.sshClient != nil { - if err := s.sshClient.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to close SSH client: %w", err)) - } - } - - if len(errs) > 0 { - return fmt.Errorf("errors closing connections: %v", errs) - } - - return nil -} - -// doTransferFile transfers a single file from source to destination (internal implementation without common logic) +// doTransferFile transfers a single file using actual SCP command func (s *SCPTransfer) doTransferFile(sourcePath, destPath string) error { - // Get source file info first - fileInfo, err := os.Stat(sourcePath) - if err != nil { - return fmt.Errorf("failed to stat source file: %w", err) - } - - // Create destination directory if it doesn't exist - // Use forward slashes for remote paths (SFTP always uses Unix-style paths) - lastSlash := strings.LastIndex(destPath, "/") - if lastSlash == -1 { - return fmt.Errorf("invalid destination path format: %s", destPath) - } - destDir := destPath[:lastSlash] - - s.logger.WithFields(map[string]interface{}{ - "dest_dir": destDir, - "dest_path": destPath, - }).Debug("Creating destination directory") - - if err := s.sftpClient.MkdirAll(destDir); err != nil { - return fmt.Errorf("failed to create destination directory %s: %w", destDir, err) - } - - // Verify directory was created successfully - if dirInfo, err := s.sftpClient.Stat(destDir); err != nil { - return fmt.Errorf("destination directory %s was not created successfully: %w", destDir, err) - } else if !dirInfo.IsDir() { - return fmt.Errorf("destination path %s exists but is not a directory", destDir) - } - - s.logger.WithField("dest_dir", destDir).Debug("Destination directory verified") - - // Test write permissions by trying to create a temporary test file - testFilePath := destDir + "/.sync_test_" + fmt.Sprintf("%d", time.Now().UnixNano()) - if testFile, testErr := s.sftpClient.Create(testFilePath); testErr != nil { - s.logger.WithFields(map[string]interface{}{ - "dest_dir": destDir, - "test_file_path": testFilePath, - "test_error": testErr.Error(), - }).Error("Cannot create test file in destination directory - permissions issue?") + // Directory creation is now handled by the common transferrer before calling this method + + // Build SCP command + args := s.buildSCPArgs(sourcePath, destPath) + + var cmd *exec.Cmd + if s.sshConfig.Password != "" { + // Use sshpass for password authentication + sshpassArgs := []string{"-p", s.sshConfig.Password, "scp"} + sshpassArgs = append(sshpassArgs, args...) + cmd = exec.Command("sshpass", sshpassArgs...) + s.logger.Debug("Using sshpass for SCP password authentication") } else { - testFile.Close() - if removeErr := s.sftpClient.Remove(testFilePath); removeErr != nil { - s.logger.WithFields(map[string]interface{}{ - "test_file_path": testFilePath, - "remove_error": removeErr.Error(), - }).Warn("Failed to clean up test file") - } - s.logger.WithField("dest_dir", destDir).Debug("Write permissions verified with test file") - } - - // Open source file - srcFile, err := os.Open(sourcePath) - if err != nil { - return fmt.Errorf("failed to open source file: %w", err) - } - defer srcFile.Close() - - // Create destination file with more detailed error context - s.logger.WithFields(map[string]interface{}{ - "dest_path": destPath, - "dest_dir": destDir, - }).Debug("Creating destination file") - - dstFile, err := s.sftpClient.Create(destPath) - if err != nil { - // Check if directory exists to provide better error context - if dirInfo, dirErr := s.sftpClient.Stat(destDir); dirErr != nil { - return fmt.Errorf("failed to create destination file %s: destination directory %s does not exist or is not accessible: %w (original error: %v)", destPath, destDir, dirErr, err) - } else if !dirInfo.IsDir() { - return fmt.Errorf("failed to create destination file %s: %s exists but is not a directory: %w", destPath, destDir, err) - } else { - // Directory exists, let's get more debugging info - s.logger.WithFields(map[string]interface{}{ - "dest_dir": destDir, - "dir_mode": dirInfo.Mode().String(), - "dir_size": dirInfo.Size(), - "create_error": err.Error(), - }).Error("Directory exists but file creation failed") - - // Try to list directory contents for debugging - if entries, listErr := s.sftpClient.ReadDir(destDir); listErr != nil { - s.logger.WithError(listErr).WithField("dest_dir", destDir).Debug("Could not list directory contents") - } else { - s.logger.WithFields(map[string]interface{}{ - "dest_dir": destDir, - "entry_count": len(entries), - }).Debug("Directory contents listed successfully") - } - - return fmt.Errorf("failed to create destination file %s in existing directory %s (mode: %s): %w", destPath, destDir, dirInfo.Mode().String(), err) - } + // Use regular SCP (key-based auth) + cmd = exec.Command("scp", args...) } - defer dstFile.Close() - // Copy file contents with optimized buffer - buffer := make([]byte, s.bufferSize) - bytesTransferred, err := io.CopyBuffer(dstFile, srcFile, buffer) + // Capture output for debugging + output, err := cmd.CombinedOutput() if err != nil { - return fmt.Errorf("failed to copy file contents: %w", err) - } - - // Verify file size - if bytesTransferred != fileInfo.Size() { - return fmt.Errorf("file size mismatch: expected %d, transferred %d", - fileInfo.Size(), bytesTransferred) + s.logger.WithFields(map[string]interface{}{ + "source_path": sourcePath, + "dest_path": destPath, + "scp_args": strings.Join(args, " "), + "output": string(output), + }).Error("SCP command failed") + return fmt.Errorf("scp failed: %w", err) } return nil } -// TransferFile implements Phase 2: Single File Transfer (public interface for backward compatibility) -func (s *SCPTransfer) TransferFile(sourcePath, destPath string) error { - return s.doTransferFile(sourcePath, destPath) -} - -// TransferItemFiles implements Phase 3: Directory-Based File Transfer -// Copy all files in the containing directories (including subtitles) to the destination server -func (s *SCPTransfer) TransferItemFiles(item *types.SyncableItem) error { - s.logger.WithField("item", s.getItemIdentifier(item)).Info("Phase 3: Starting directory-based file transfer") +// Note: escapeShellPath removed - not needed for exec.Command as Go handles argument separation - // TODO: Uncomment when plexgo library implements proper Media and Part structures for file paths - // filePaths, err := s.getItemFilePaths(item) - // if err != nil { - // return fmt.Errorf("failed to get file paths for item: %w", err) - // } +// buildSCPArgs builds the SCP command arguments +func (s *SCPTransfer) buildSCPArgs(sourcePath, destPath string) []string { + remoteHost := fmt.Sprintf("%s@%s", s.sshConfig.User, s.serverConfig.Host) - // PLACEHOLDER: Generate placeholder file paths until proper implementation - filePaths := s.generatePlaceholderFilePaths(item) + // For exec.Command, we don't need shell quoting - Go handles argument separation + // The remote destination still needs proper formatting for the remote shell + remoteDest := fmt.Sprintf("%s:%s", remoteHost, destPath) - if len(filePaths) == 0 { - s.logger.WithField("item", s.getItemIdentifier(item)).Warn("No file paths found for item") - return nil + args := []string{ + "-o", "StrictHostKeyChecking=no", + "-o", "UserKnownHostsFile=/dev/null", + "-o", "ConnectTimeout=30", + "-C", // Enable compression } - // Transfer entire directories containing the files - processedDirs := make(map[string]bool) - - for _, filePath := range filePaths { - sourceDir := filepath.Dir(filePath) - - // Skip if we've already processed this directory - if processedDirs[sourceDir] { - continue - } - processedDirs[sourceDir] = true - - destDir := s.calculateDestPath(sourceDir) - - s.logger.WithFields(map[string]interface{}{ - "source_dir": sourceDir, - "dest_dir": destDir, - }).Info("Transferring entire directory (includes subtitles, extras, etc.)") - - // Copy entire directory (includes subtitles, extras, etc.) - if err := s.CopyDirectory(sourceDir, destDir); err != nil { - return fmt.Errorf("failed to copy directory %s to %s: %w", sourceDir, destDir, err) - } + // Add port if specified + if s.sshConfig.Port != "" && s.sshConfig.Port != "22" { + args = append(args, "-P", s.sshConfig.Port) } - s.logger.WithField("item", s.getItemIdentifier(item)).Info("Phase 3: Directory-based file transfer complete") - return nil + // Add source path and remote destination - no quotes needed for exec.Command + args = append(args, sourcePath, remoteDest) + return args } -// doTransferFiles transfers multiple files (internal implementation) +// doTransferFiles transfers multiple files using SCP func (s *SCPTransfer) doTransferFiles(files []types.FileTransfer) error { + // SCP can handle multiple files in one command, but for simplicity and error handling, + // we'll transfer them individually for _, file := range files { if err := s.doTransferFile(file.SourcePath, file.DestPath); err != nil { - s.logger.LogError(err, map[string]interface{}{ - "source_path": file.SourcePath, - "dest_path": file.DestPath, - }) return err } } return nil } - -// TransferFiles transfers multiple files (public interface for backward compatibility) -func (s *SCPTransfer) TransferFiles(files []types.FileTransfer) error { - return s.doTransferFiles(files) -} - -// Internal interface methods (lowercase for internalTransferer interface) -func (s *SCPTransfer) fileExists(path string) (bool, error) { - return s.FileExists(path) -} - -func (s *SCPTransfer) getFileSize(path string) (int64, error) { - return s.GetFileSize(path) -} - -func (s *SCPTransfer) deleteFile(path string) error { - return s.DeleteFile(path) -} - -func (s *SCPTransfer) listDirectoryContents(rootPath string) ([]string, error) { - return s.ListDirectoryContents(rootPath) -} - -func (s *SCPTransfer) mapSourcePathToLocal(sourcePath string) (string, error) { - return s.MapSourcePathToLocal(sourcePath) -} - -func (s *SCPTransfer) mapLocalPathToDest(localPath string) (string, error) { - return s.MapLocalPathToDest(localPath) -} - -func (s *SCPTransfer) close() error { - return s.Close() -} - -// TransferFilesParallel transfers multiple files in parallel for better performance -func (s *SCPTransfer) TransferFilesParallel(files []types.FileTransfer) error { - if len(files) == 0 { - return nil - } - - // Use a semaphore to limit concurrent transfers - semaphore := make(chan struct{}, s.maxConcurrent) - errChan := make(chan error, len(files)) - - var wg sync.WaitGroup - - for _, file := range files { - wg.Add(1) - go func(f types.FileTransfer) { - defer wg.Done() - - // Acquire semaphore - semaphore <- struct{}{} - defer func() { <-semaphore }() - - if err := s.TransferFile(f.SourcePath, f.DestPath); err != nil { - s.logger.LogError(err, map[string]interface{}{ - "source_path": f.SourcePath, - "dest_path": f.DestPath, - }) - errChan <- err - return - } - }(file) - } - - // Wait for all transfers to complete - wg.Wait() - close(errChan) - - // Check for any errors - for err := range errChan { - if err != nil { - return err // Return first error encountered - } - } - - return nil -} - -// FileExists checks if a file exists on the remote server -func (s *SCPTransfer) FileExists(path string) (bool, error) { - _, err := s.sftpClient.Stat(path) - if err != nil { - if os.IsNotExist(err) { - return false, nil - } - return false, err - } - return true, nil -} - -// GetFileSize returns the size of a remote file -func (s *SCPTransfer) GetFileSize(path string) (int64, error) { - stat, err := s.sftpClient.Stat(path) - if err != nil { - return 0, err - } - return stat.Size(), nil -} - -// DeleteFile deletes a file on the remote server -func (s *SCPTransfer) DeleteFile(path string) error { - return s.sftpClient.Remove(path) -} - -// CreateDirectory creates a directory on the remote server -func (s *SCPTransfer) CreateDirectory(path string) error { - return s.sftpClient.MkdirAll(path) -} - -// ListDirectoryContents recursively lists all files in a directory -func (s *SCPTransfer) ListDirectoryContents(rootPath string) ([]string, error) { - var allFiles []string - - err := s.walkDirectory(rootPath, func(path string, info os.FileInfo) error { - if !info.IsDir() { - allFiles = append(allFiles, path) - } - return nil - }) - - if err != nil { - return nil, fmt.Errorf("failed to walk directory: %w", err) - } - - s.logger.WithFields(map[string]interface{}{ - "root_path": rootPath, - "file_count": len(allFiles), - }).Debug("Listed directory contents") - - return allFiles, nil -} - -// walkDirectory recursively walks a directory tree on the remote server -func (s *SCPTransfer) walkDirectory(path string, walkFunc func(path string, info os.FileInfo) error) error { - entries, err := s.sftpClient.ReadDir(path) - if err != nil { - return fmt.Errorf("failed to read directory %s: %w", path, err) - } - - for _, entry := range entries { - entryPath := strings.TrimRight(path, "/") + "/" + entry.Name() - - // Call the walk function for this entry - if err := walkFunc(entryPath, entry); err != nil { - return err - } - - // If it's a directory, recursively walk it - if entry.IsDir() { - if err := s.walkDirectory(entryPath, walkFunc); err != nil { - return err - } - } - } - - return nil -} - -// CopyDirectory copies an entire directory from source to destination -func (s *SCPTransfer) CopyDirectory(sourceDir, destDir string) error { - s.logger.WithFields(map[string]interface{}{ - "source_dir": sourceDir, - "dest_dir": destDir, - }).Info("Starting directory copy") - - // TODO: Implement recursive directory copying - // This would involve: - // 1. Walking the source directory tree - // 2. Creating destination directories - // 3. Copying all files including subtitles, extras, etc. - - // PLACEHOLDER: Just create the destination directory for now - if err := s.sftpClient.MkdirAll(destDir); err != nil { - return fmt.Errorf("failed to create destination directory: %w", err) - } - - s.logger.WithField("dest_dir", destDir).Warn("Directory copy not fully implemented - only created destination directory") - return nil -} - -// calculateDestPath generates the destination path for a source directory -func (s *SCPTransfer) calculateDestPath(sourceDir string) string { - // TODO: Implement proper path mapping based on configuration - // This should map source paths to destination paths based on: - // - DEST_MEDIA_PATH configuration - // - Library-specific path mappings - // - Volume mount configurations - - // PLACEHOLDER: Simple path mapping - baseName := filepath.Base(sourceDir) - destPath := filepath.Join("/media/sync", baseName) - - s.logger.WithFields(map[string]interface{}{ - "source_dir": sourceDir, - "dest_path": destPath, - }).Debug("Generated destination path (placeholder logic)") - - return destPath -} - -// generatePlaceholderFilePaths generates placeholder file paths for testing -func (s *SCPTransfer) generatePlaceholderFilePaths(item *types.SyncableItem) []string { - // PLACEHOLDER: Generate fake file paths until proper file path extraction is available - title := item.Title - if title == "" { - title = "unknown" - } - - // Generate placeholder paths - placeholderPaths := []string{ - fmt.Sprintf("/media/source/%s/%s.mkv", title, title), - fmt.Sprintf("/media/source/%s/%s.srt", title, title), // subtitle file - } - - s.logger.WithFields(map[string]interface{}{ - "item_title": title, - "placeholder_paths": placeholderPaths, - }).Debug("Generated placeholder file paths - not real file paths") - - return placeholderPaths -} - -// getItemIdentifier returns a string identifier for the item for logging purposes -func (s *SCPTransfer) getItemIdentifier(item *types.SyncableItem) string { - identifier := item.RatingKey - if identifier == "" { - identifier = item.Title - } - if identifier == "" { - identifier = "unknown" - } - - return identifier -} - -// MapSourcePathToLocal converts a source Plex server path to a local filesystem path -// If source replacement is configured, applies the pattern replacement -// Otherwise, uses the Plex path as-is (useful for mounted volumes) -func (s *SCPTransfer) MapSourcePathToLocal(sourcePath string) (string, error) { - if sourcePath == "" { - return "", fmt.Errorf("source path is empty") - } - - // If no source replacement configured, use the Plex path as-is - if s.sourceReplaceFrom == "" || s.sourceReplaceTo == "" { - // Convert to local path separators for the current OS - localPath := filepath.FromSlash(sourcePath) - return localPath, nil - } - - // Apply source replacement pattern - // Normalize paths for comparison (always use forward slashes) - sourcePathNorm := filepath.ToSlash(sourcePath) - sourceReplaceFromNorm := filepath.ToSlash(s.sourceReplaceFrom) - - // Check if the source path starts with the replacement pattern - if !strings.HasPrefix(sourcePathNorm, sourceReplaceFromNorm) { - return "", fmt.Errorf("source path %s does not start with replacement pattern %s", sourcePath, s.sourceReplaceFrom) - } - - // Remove the source pattern and replace with local pattern - relativePath := strings.TrimPrefix(sourcePathNorm, sourceReplaceFromNorm) - relativePath = strings.TrimPrefix(relativePath, "/") // Remove leading slash - - // Build local path with proper separators for the target OS - localPath := filepath.Join(s.sourceReplaceTo, relativePath) - return localPath, nil -} - -// MapLocalPathToDest converts a local filesystem path to a destination server path -func (s *SCPTransfer) MapLocalPathToDest(localPath string) (string, error) { - if localPath == "" { - return "", fmt.Errorf("local path is empty") - } - - if s.destRootDir == "" { - return "", fmt.Errorf("destination root directory not configured") - } - - // Extract the relative path from local path - var relativePath string - - if s.sourceReplaceTo != "" { - // If source replacement is configured, extract relative path from the replacement root - localPathNorm := filepath.ToSlash(localPath) - sourceReplaceToNorm := filepath.ToSlash(s.sourceReplaceTo) - - if !strings.HasPrefix(localPathNorm, sourceReplaceToNorm) { - return "", fmt.Errorf("local path %s does not start with source replacement root %s", localPath, s.sourceReplaceTo) - } - - relativePath = strings.TrimPrefix(localPathNorm, sourceReplaceToNorm) - relativePath = strings.TrimPrefix(relativePath, "/") // Remove leading slash - } else { - // If no source replacement, extract filename from the full path - relativePath = filepath.Base(localPath) - } - - // Build destination path (always use forward slashes for remote paths) - destPath := strings.TrimSuffix(s.destRootDir, "/") + "/" + relativePath - return destPath, nil -} diff --git a/internal/transfer/ssh.go b/internal/transfer/ssh.go new file mode 100644 index 0000000..46bdb28 --- /dev/null +++ b/internal/transfer/ssh.go @@ -0,0 +1,178 @@ +// Package transfer provides file transfer implementations for syncarr. +package transfer + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/nullable-eth/syncarr/internal/config" + "github.com/nullable-eth/syncarr/internal/logger" + "golang.org/x/crypto/ssh" +) + +// fileOperations defines the interface for SSH-based file operations +type fileOperations interface { + GetFileSize(path string) (int64, error) + DeleteFile(path string) error + ListDirectoryContents(rootPath string) ([]string, error) + CreateDirectory(path string) error + Close() error +} + +// sshClient handles all SSH-based file operations with persistent connection +type sshClient struct { + sshConfig *config.SSHConfig + serverConfig *config.PlexServerConfig + logger *logger.Logger + client *ssh.Client // Persistent SSH connection (reused for multiple sessions) +} + +// getSSHClient creates and returns an SSH client connection +func (s *sshClient) getSSHClient() (*ssh.Client, error) { + if s.client != nil { + return s.client, nil + } + + // Create SSH client config + config := &ssh.ClientConfig{ + User: s.sshConfig.User, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), // For simplicity, ignore host key verification + Timeout: 30 * time.Second, + } + + // Add authentication method + if s.sshConfig.Password != "" { + config.Auth = []ssh.AuthMethod{ + ssh.Password(s.sshConfig.Password), + } + } + + // Determine port + port := s.sshConfig.Port + if port == "" { + port = "22" + } + + // Connect to SSH server + addr := fmt.Sprintf("%s:%s", s.serverConfig.Host, port) + client, err := ssh.Dial("tcp", addr, config) + if err != nil { + return nil, fmt.Errorf("failed to connect to SSH server: %w", err) + } + + s.client = client + return client, nil +} + +// executeCommand executes a command using the persistent SSH connection (creates fresh session each time) +func (s *sshClient) executeCommand(cmd string) ([]byte, error) { + client, err := s.getSSHClient() + if err != nil { + return nil, err + } + + // Create a fresh session for this command (SSH protocol requirement) + session, err := client.NewSession() + if err != nil { + return nil, fmt.Errorf("failed to create SSH session: %w", err) + } + defer session.Close() + + output, err := session.Output(cmd) + if err != nil { + s.logger.WithFields(map[string]interface{}{ + "command": cmd, + "error": err.Error(), + }).Debug("SSH command failed") + return nil, err + } + + s.logger.WithField("command", cmd).Debug("SSH command executed successfully via reused connection") + return output, nil +} + +// GetFileSize returns the size of a remote file using persistent connection +func (s *sshClient) GetFileSize(path string) (int64, error) { + // Properly escape the path for shell execution + escapedPath := strings.ReplaceAll(path, "'", "'\"'\"'") + cmd := fmt.Sprintf("stat -f%%z '%s' 2>/dev/null || stat -c%%s '%s'", escapedPath, escapedPath) + + output, err := s.executeCommand(cmd) + if err != nil { + return 0, err + } + + size, err := strconv.ParseInt(strings.TrimSpace(string(output)), 10, 64) + if err != nil { + return 0, err + } + + return size, nil +} + +// DeleteFile deletes a file on the remote server using persistent connection +func (s *sshClient) DeleteFile(path string) error { + // Properly escape the path for shell execution + escapedPath := strings.ReplaceAll(path, "'", "'\"'\"'") + cmd := fmt.Sprintf("rm -f '%s'", escapedPath) + + _, err := s.executeCommand(cmd) + return err +} + +// ListDirectoryContents recursively lists all files in a directory using persistent connection +func (s *sshClient) ListDirectoryContents(rootPath string) ([]string, error) { + // Properly escape the path for shell execution + escapedRootPath := strings.ReplaceAll(rootPath, "'", "'\"'\"'") + cmd := fmt.Sprintf("find '%s' -type f", escapedRootPath) + + output, err := s.executeCommand(cmd) + if err != nil { + return nil, err + } + + files := strings.Split(strings.TrimSpace(string(output)), "\n") + if len(files) == 1 && files[0] == "" { + return []string{}, nil + } + + s.logger.WithFields(map[string]interface{}{ + "root_path": rootPath, + "file_count": len(files), + }).Debug("Listed directory contents via SSH") + + return files, nil +} + +// CreateDirectory creates a directory on the remote server using persistent connection +func (s *sshClient) CreateDirectory(path string) error { + // Properly escape the path for shell execution + escapedPath := strings.ReplaceAll(path, "'", "'\"'\"'") + cmd := fmt.Sprintf("mkdir -p '%s'", escapedPath) + + _, err := s.executeCommand(cmd) + if err != nil { + s.logger.WithFields(map[string]interface{}{ + "dest_dir": path, + "error": err.Error(), + }).Warn("Failed to create remote directory (may already exist)") + // Don't return error - directory might already exist + } else { + s.logger.WithField("dest_dir", path).Debug("Remote directory created successfully") + } + + return nil +} + +// Close closes the SSH connection +func (s *sshClient) Close() error { + if s.client != nil { + err := s.client.Close() + s.client = nil + s.logger.Debug("SSH client connection closed successfully") + return err + } + return nil +} diff --git a/internal/transfer/transfer.go b/internal/transfer/transfer.go index bef9981..1437697 100644 --- a/internal/transfer/transfer.go +++ b/internal/transfer/transfer.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "os/exec" + "path/filepath" "strings" "time" @@ -16,7 +17,7 @@ import ( type TransferMethod string const ( - TransferMethodSFTP TransferMethod = "sftp" + TransferMethodSCP TransferMethod = "scp" TransferMethodRsync TransferMethod = "rsync" ) @@ -25,94 +26,106 @@ type FileTransferrer interface { TransferFile(sourcePath, destPath string) error TransferFiles(files []types.FileTransfer) error Close() error - FileExists(path string) (bool, error) GetFileSize(path string) (int64, error) DeleteFile(path string) error ListDirectoryContents(rootPath string) ([]string, error) - MapSourcePathToLocal(sourcePath string) (string, error) - MapLocalPathToDest(localPath string) (string, error) } -// internalTransferer defines the interface for internal transfer implementations (rsync/scp) -// These only handle the actual transfer without common logic like file checks and logging -type internalTransferer interface { +// transferImplementation defines the interface for actual transfer implementations (rsync/scp only) +type transferImplementation interface { doTransferFile(sourcePath, destPath string) error doTransferFiles(files []types.FileTransfer) error - close() error - fileExists(path string) (bool, error) - getFileSize(path string) (int64, error) - deleteFile(path string) error - listDirectoryContents(rootPath string) ([]string, error) - mapSourcePathToLocal(sourcePath string) (string, error) - mapLocalPathToDest(localPath string) (string, error) } -// TransferClient is the unified client that handles common logic and delegates to internal implementations -type TransferClient struct { - method TransferMethod - internal internalTransferer - logger *logger.Logger - sshConfig *config.SSHConfig - serverConfig *config.PlexServerConfig +// transferClient is the unified client that handles common logic and delegates to internal implementations +type transferClient struct { + method TransferMethod + fileOps fileOperations + transfer transferImplementation + logger *logger.Logger +} + +// newSSHClient creates a new SSH client for file operations +func newSSHClient(cfg *config.Config, log *logger.Logger) (fileOperations, error) { + return &sshClient{ + sshConfig: &cfg.SSH, + serverConfig: &cfg.Destination, + logger: log, + }, nil } // NewTransferrer creates a new unified file transferrer that automatically chooses the best method func NewTransferrer(method TransferMethod, cfg *config.Config, log *logger.Logger) (FileTransferrer, error) { - var internal internalTransferer - var err error + // Create shared SSH client for all file operations + sshFileOps, err := newSSHClient(cfg, log) + if err != nil { + return nil, fmt.Errorf("failed to create SSH client: %w", err) + } + + // Create transfer implementation + var transferImpl transferImplementation switch method { - case TransferMethodSFTP: - internal, err = newSCPTransfer(cfg, log) + case TransferMethodSCP: + transferImpl, err = newSCPTransfer(cfg, log) + if err != nil { + return nil, fmt.Errorf("failed to create SCP transferrer: %w", err) + } case TransferMethodRsync: - internal, err = newRsyncTransfer(cfg, log) + transferImpl, err = newRsyncTransfer(cfg, log) + if err != nil { + return nil, fmt.Errorf("failed to create rsync transferrer: %w", err) + } default: return nil, fmt.Errorf("unsupported transfer method: %s", method) } - if err != nil { - return nil, fmt.Errorf("failed to create %s transferrer: %w", method, err) - } + log.WithField("transfer_method", string(method)).Info("High-performance file transfer enabled") - return &TransferClient{ - method: method, - internal: internal, - logger: log, - sshConfig: &cfg.SSH, - serverConfig: &cfg.Destination, + return &transferClient{ + method: method, + fileOps: sshFileOps, + transfer: transferImpl, + logger: log, }, nil } // TransferFile handles file transfer with unified logic - checks file existence, size, and delegates to internal implementation -func (t *TransferClient) TransferFile(sourcePath, destPath string) error { +func (t *transferClient) TransferFile(sourcePath, destPath string) error { // Get source file info fileInfo, err := os.Stat(sourcePath) if err != nil { return fmt.Errorf("failed to stat source file: %w", err) } - // Check if destination file already exists with same size (unified logic) - destExists, err := t.internal.fileExists(destPath) + // Check if destination file exists and get its size in one optimized call + destSize, err := t.fileOps.GetFileSize(destPath) if err != nil { - t.logger.WithError(err).WithField("dest_path", destPath).Debug("Failed to check if destination file exists, proceeding with transfer") - } else if destExists { - // Check if sizes match - if so, skip transfer entirely - destSize, err := t.internal.getFileSize(destPath) - if err != nil { - t.logger.WithError(err).WithField("dest_path", destPath).Debug("Failed to get destination file size, proceeding with transfer") - } else if destSize == fileInfo.Size() { - // Files are the same size, log skip and return early - t.logger.LogTransferSkipped(sourcePath, destPath, fileInfo.Size(), "identical_size") - return nil - } + // File doesn't exist or can't be accessed, proceed with transfer + t.logger.WithError(err).WithField("dest_path", destPath).Debug("Destination file doesn't exist or can't be accessed, proceeding with transfer") + } else if destSize == fileInfo.Size() { + // Files are the same size, log skip and return early + t.logger.LogTransferSkipped(sourcePath, destPath, fileInfo.Size(), "identical_size") + return nil + } + + // Ensure destination directory exists before transfer + if err := t.ensureDestinationDir(destPath); err != nil { + return fmt.Errorf("failed to create destination directory: %w", err) } // If we get here, we're actually going to transfer the file startTime := time.Now() t.logger.LogTransferStarted(sourcePath, destPath, fileInfo.Size()) - // Delegate to internal implementation for actual transfer - if err := t.internal.doTransferFile(sourcePath, destPath); err != nil { + // Delegate to transfer implementation for actual transfer (directory already created) + if err := t.transfer.doTransferFile(sourcePath, destPath); err != nil { + // Check if this is a special "file was skipped" error + if strings.Contains(err.Error(), "file_skipped") { + // File was skipped by rsync (already up-to-date), log as skipped + t.logger.LogTransferSkipped(sourcePath, destPath, fileInfo.Size(), "rsync_skipped") + return nil + } return fmt.Errorf("transfer failed using %s: %w", t.method, err) } @@ -123,44 +136,37 @@ func (t *TransferClient) TransferFile(sourcePath, destPath string) error { return nil } -// TransferFiles transfers multiple files (delegates to internal implementation) -func (t *TransferClient) TransferFiles(files []types.FileTransfer) error { - return t.internal.doTransferFiles(files) +// TransferFiles transfers multiple files (delegates to transfer implementation) +func (t *transferClient) TransferFiles(files []types.FileTransfer) error { + return t.transfer.doTransferFiles(files) } -// Close closes the transfer client -func (t *TransferClient) Close() error { - return t.internal.close() +// Close closes the SSH connection +func (t *transferClient) Close() error { + return t.fileOps.Close() } -// FileExists checks if a file exists on the destination -func (t *TransferClient) FileExists(path string) (bool, error) { - return t.internal.fileExists(path) +// GetFileSize gets the size of a file on the destination (via SSH) +func (t *transferClient) GetFileSize(path string) (int64, error) { + return t.fileOps.GetFileSize(path) } -// GetFileSize gets the size of a file on the destination -func (t *TransferClient) GetFileSize(path string) (int64, error) { - return t.internal.getFileSize(path) +// DeleteFile deletes a file on the destination (via SSH) +func (t *transferClient) DeleteFile(path string) error { + return t.fileOps.DeleteFile(path) } -// DeleteFile deletes a file on the destination -func (t *TransferClient) DeleteFile(path string) error { - return t.internal.deleteFile(path) +// ListDirectoryContents lists directory contents on the destination (via SSH) +func (t *transferClient) ListDirectoryContents(rootPath string) ([]string, error) { + return t.fileOps.ListDirectoryContents(rootPath) } -// ListDirectoryContents lists directory contents on the destination -func (t *TransferClient) ListDirectoryContents(rootPath string) ([]string, error) { - return t.internal.listDirectoryContents(rootPath) -} - -// MapSourcePathToLocal maps source path to local path -func (t *TransferClient) MapSourcePathToLocal(sourcePath string) (string, error) { - return t.internal.mapSourcePathToLocal(sourcePath) -} +// ensureDestinationDir creates the destination directory using SSH +func (t *transferClient) ensureDestinationDir(destPath string) error { + destDir := filepath.Dir(destPath) -// MapLocalPathToDest maps local path to destination path -func (t *TransferClient) MapLocalPathToDest(localPath string) (string, error) { - return t.internal.mapLocalPathToDest(localPath) + // Use SSH to create the directory + return t.fileOps.CreateDirectory(destDir) } // GetOptimalTransferMethod returns the recommended transfer method based on system capabilities @@ -171,14 +177,14 @@ func GetOptimalTransferMethod(log *logger.Logger) TransferMethod { return TransferMethodRsync } - log.Info("rsync not available - falling back to SFTP transfers") - return TransferMethodSFTP + log.Info("rsync not available - falling back to SCP transfers") + return TransferMethodSCP } -// ForceTransferMethod forces a specific transfer method (useful for testing) -func ForceTransferMethod(method TransferMethod, log *logger.Logger) TransferMethod { +// ForceTransferMethod forces a specific transfer method and creates a transfer client (useful for testing) +func ForceTransferMethod(method TransferMethod, cfg *config.Config, log *logger.Logger) (FileTransferrer, error) { log.WithField("forced_method", string(method)).Info("Using forced transfer method") - return method + return NewTransferrer(method, cfg, log) } // IsRsyncAvailable checks if rsync is installed and available locally