Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func (l *Logger) LogTransferCompleted(sourcePath, destPath string, sizeBytes int
}).Info("File transfer completed")
}

// LogTransferSkipped logs when a file transfer is skipped (file already exists)
func (l *Logger) LogTransferSkipped(sourcePath, destPath string, sizeBytes int64, reason string) {
l.WithFields(logrus.Fields{
"event": "transfer_skipped",
"source_path": sourcePath,
"dest_path": destPath,
"size_bytes": sizeBytes,
"reason": reason,
}).Debug("File transfer skipped")
}

// LogError logs an error with context
func (l *Logger) LogError(err error, context map[string]interface{}) {
fields := logrus.Fields{
Expand Down
69 changes: 47 additions & 22 deletions internal/transfer/rsync.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package transfer provides file transfer implementations for syncarr.
package transfer

import (
Expand All @@ -6,7 +7,6 @@ import (
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/nullable-eth/syncarr/internal/config"
"github.com/nullable-eth/syncarr/internal/logger"
Expand All @@ -26,8 +26,8 @@ type RsyncTransfer struct {
checksumSkip bool // Skip checksum verification for speed
}

// NewRsyncTransfer creates a new rsync transfer instance
func NewRsyncTransfer(cfg *config.Config, log *logger.Logger) (*RsyncTransfer, error) {
// newRsyncTransfer creates a new rsync transfer instance (package-private)
func newRsyncTransfer(cfg *config.Config, log *logger.Logger) (*RsyncTransfer, error) {
return &RsyncTransfer{
sshConfig: &cfg.SSH,
serverConfig: &cfg.Destination,
Expand All @@ -41,19 +41,8 @@ func NewRsyncTransfer(cfg *config.Config, log *logger.Logger) (*RsyncTransfer, e
}, nil
}

// TransferFile transfers a single file using rsync
func (r *RsyncTransfer) TransferFile(sourcePath, destPath string) error {
startTime := time.Now()

// Get source file info
fileInfo, err := os.Stat(sourcePath)
if err != nil {
return fmt.Errorf("failed to stat source file: %w", err)
}

// Log transfer start
r.logger.LogTransferStarted(sourcePath, destPath, fileInfo.Size())

// doTransferFile transfers a single file using rsync (internal implementation without common logic)
func (r *RsyncTransfer) doTransferFile(sourcePath, destPath string) error {
// Ensure destination directory exists
if err := r.ensureDestinationDir(destPath); err != nil {
return fmt.Errorf("failed to create destination directory: %w", err)
Expand All @@ -76,18 +65,20 @@ func (r *RsyncTransfer) TransferFile(sourcePath, destPath string) error {
return fmt.Errorf("rsync failed: %w", err)
}

duration := time.Since(startTime)
r.logger.LogTransferCompleted(sourcePath, destPath, fileInfo.Size(), duration)

return nil
}

// TransferFiles transfers multiple files using rsync (can batch for efficiency)
func (r *RsyncTransfer) TransferFiles(files []types.FileTransfer) error {
// TransferFile transfers a single file using rsync (public interface for backward compatibility)
func (r *RsyncTransfer) TransferFile(sourcePath, destPath string) error {
return r.doTransferFile(sourcePath, destPath)
}

// doTransferFiles transfers multiple files using rsync (internal implementation)
func (r *RsyncTransfer) doTransferFiles(files []types.FileTransfer) error {
// For small numbers of files, transfer individually
if len(files) <= 3 {
for _, file := range files {
if err := r.TransferFile(file.SourcePath, file.DestPath); err != nil {
if err := r.doTransferFile(file.SourcePath, file.DestPath); err != nil {
return err
}
}
Expand Down Expand Up @@ -272,6 +263,40 @@ func (r *RsyncTransfer) createIncludeFile(baseDir string, files []types.FileTran
return tmpFile.Name(), nil
}

// TransferFiles transfers multiple files using rsync (public interface for backward compatibility)
func (r *RsyncTransfer) TransferFiles(files []types.FileTransfer) error {
return r.doTransferFiles(files)
}

// Internal interface methods (lowercase for internalTransferer interface)
func (r *RsyncTransfer) fileExists(path string) (bool, error) {
return r.FileExists(path)
}

func (r *RsyncTransfer) getFileSize(path string) (int64, error) {
return r.GetFileSize(path)
}

func (r *RsyncTransfer) deleteFile(path string) error {
return r.DeleteFile(path)
}

func (r *RsyncTransfer) listDirectoryContents(rootPath string) ([]string, error) {
return r.ListDirectoryContents(rootPath)
}

func (r *RsyncTransfer) mapSourcePathToLocal(sourcePath string) (string, error) {
return r.MapSourcePathToLocal(sourcePath)
}

func (r *RsyncTransfer) mapLocalPathToDest(localPath string) (string, error) {
return r.MapLocalPathToDest(localPath)
}

func (r *RsyncTransfer) close() error {
return r.Close()
}

// Close is a no-op for rsync (no persistent connections)
func (r *RsyncTransfer) Close() error {
return nil
Expand Down
76 changes: 46 additions & 30 deletions internal/transfer/scp.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type SCPTransfer struct {
maxConcurrent int // Maximum concurrent transfers
}

// NewSCPTransfer creates a new SCP transfer instance
func NewSCPTransfer(cfg *config.Config, log *logger.Logger) (*SCPTransfer, error) {
// newSCPTransfer creates a new SCP transfer instance (package-private)
func newSCPTransfer(cfg *config.Config, log *logger.Logger) (*SCPTransfer, error) {
transfer := &SCPTransfer{
sshConfig: &cfg.SSH,
serverConfig: &cfg.Destination,
Expand Down Expand Up @@ -115,34 +115,14 @@ func (s *SCPTransfer) Close() error {
return nil
}

// TransferFile transfers a single file from source to destination
func (s *SCPTransfer) TransferFile(sourcePath, destPath string) error {
startTime := time.Now()

// doTransferFile transfers a single file from source to destination (internal implementation without common logic)
func (s *SCPTransfer) doTransferFile(sourcePath, destPath string) error {
// Get source file info first
fileInfo, err := os.Stat(sourcePath)
if err != nil {
return fmt.Errorf("failed to stat source file: %w", err)
}

// Check if destination file already exists
destExists, err := s.FileExists(destPath)
if err != nil {
s.logger.WithError(err).WithField("dest_path", destPath).Warn("Failed to check if destination file exists, proceeding with transfer")
} else if destExists {
// Check if sizes match - if so, skip transfer entirely
destSize, err := s.GetFileSize(destPath)
if err != nil {
s.logger.WithError(err).WithField("dest_path", destPath).Warn("Failed to get destination file size, proceeding with transfer")
} else if destSize == fileInfo.Size() {
// Files are the same size, return early without any transfer logging
return nil
}
}

// If we get here, we're actually going to transfer the file
s.logger.LogTransferStarted(sourcePath, destPath, fileInfo.Size())

// Create destination directory if it doesn't exist
// Use forward slashes for remote paths (SFTP always uses Unix-style paths)
lastSlash := strings.LastIndex(destPath, "/")
Expand Down Expand Up @@ -245,12 +225,14 @@ func (s *SCPTransfer) TransferFile(sourcePath, destPath string) error {
fileInfo.Size(), bytesTransferred)
}

duration := time.Since(startTime)
s.logger.LogTransferCompleted(sourcePath, destPath, bytesTransferred, duration)

return nil
}

// TransferFile implements Phase 2: Single File Transfer (public interface for backward compatibility)
func (s *SCPTransfer) TransferFile(sourcePath, destPath string) error {
return s.doTransferFile(sourcePath, destPath)
}

// TransferItemFiles implements Phase 3: Directory-Based File Transfer
// Copy all files in the containing directories (including subtitles) to the destination server
func (s *SCPTransfer) TransferItemFiles(item *types.SyncableItem) error {
Expand Down Expand Up @@ -299,10 +281,10 @@ func (s *SCPTransfer) TransferItemFiles(item *types.SyncableItem) error {
return nil
}

// TransferFiles transfers multiple files (legacy method, kept for compatibility)
func (s *SCPTransfer) TransferFiles(files []types.FileTransfer) error {
// doTransferFiles transfers multiple files (internal implementation)
func (s *SCPTransfer) doTransferFiles(files []types.FileTransfer) error {
for _, file := range files {
if err := s.TransferFile(file.SourcePath, file.DestPath); err != nil {
if err := s.doTransferFile(file.SourcePath, file.DestPath); err != nil {
s.logger.LogError(err, map[string]interface{}{
"source_path": file.SourcePath,
"dest_path": file.DestPath,
Expand All @@ -313,6 +295,40 @@ func (s *SCPTransfer) TransferFiles(files []types.FileTransfer) error {
return nil
}

// TransferFiles transfers multiple files (public interface for backward compatibility)
func (s *SCPTransfer) TransferFiles(files []types.FileTransfer) error {
return s.doTransferFiles(files)
}

// Internal interface methods (lowercase for internalTransferer interface)
func (s *SCPTransfer) fileExists(path string) (bool, error) {
return s.FileExists(path)
}

func (s *SCPTransfer) getFileSize(path string) (int64, error) {
return s.GetFileSize(path)
}

func (s *SCPTransfer) deleteFile(path string) error {
return s.DeleteFile(path)
}

func (s *SCPTransfer) listDirectoryContents(rootPath string) ([]string, error) {
return s.ListDirectoryContents(rootPath)
}

func (s *SCPTransfer) mapSourcePathToLocal(sourcePath string) (string, error) {
return s.MapSourcePathToLocal(sourcePath)
}

func (s *SCPTransfer) mapLocalPathToDest(localPath string) (string, error) {
return s.MapLocalPathToDest(localPath)
}

func (s *SCPTransfer) close() error {
return s.Close()
}

// TransferFilesParallel transfers multiple files in parallel for better performance
func (s *SCPTransfer) TransferFilesParallel(files []types.FileTransfer) error {
if len(files) == 0 {
Expand Down
125 changes: 122 additions & 3 deletions internal/transfer/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/exec"
"strings"
"time"

"github.com/nullable-eth/syncarr/internal/config"
"github.com/nullable-eth/syncarr/internal/logger"
Expand Down Expand Up @@ -32,16 +33,134 @@ type FileTransferrer interface {
MapLocalPathToDest(localPath string) (string, error)
}

// NewTransferrer creates a new file transferrer based on the specified method
// internalTransferer defines the interface for internal transfer implementations (rsync/scp)
// These only handle the actual transfer without common logic like file checks and logging
type internalTransferer interface {
doTransferFile(sourcePath, destPath string) error
doTransferFiles(files []types.FileTransfer) error
close() error
fileExists(path string) (bool, error)
getFileSize(path string) (int64, error)
deleteFile(path string) error
listDirectoryContents(rootPath string) ([]string, error)
mapSourcePathToLocal(sourcePath string) (string, error)
mapLocalPathToDest(localPath string) (string, error)
}

// TransferClient is the unified client that handles common logic and delegates to internal implementations
type TransferClient struct {
method TransferMethod
internal internalTransferer
logger *logger.Logger
sshConfig *config.SSHConfig
serverConfig *config.PlexServerConfig
}

// NewTransferrer creates a new unified file transferrer that automatically chooses the best method
func NewTransferrer(method TransferMethod, cfg *config.Config, log *logger.Logger) (FileTransferrer, error) {
var internal internalTransferer
var err error

switch method {
case TransferMethodSFTP:
return NewSCPTransfer(cfg, log)
internal, err = newSCPTransfer(cfg, log)
case TransferMethodRsync:
return NewRsyncTransfer(cfg, log)
internal, err = newRsyncTransfer(cfg, log)
default:
return nil, fmt.Errorf("unsupported transfer method: %s", method)
}

if err != nil {
return nil, fmt.Errorf("failed to create %s transferrer: %w", method, err)
}

return &TransferClient{
method: method,
internal: internal,
logger: log,
sshConfig: &cfg.SSH,
serverConfig: &cfg.Destination,
}, nil
}

// TransferFile handles file transfer with unified logic - checks file existence, size, and delegates to internal implementation
func (t *TransferClient) TransferFile(sourcePath, destPath string) error {
// Get source file info
fileInfo, err := os.Stat(sourcePath)
if err != nil {
return fmt.Errorf("failed to stat source file: %w", err)
}

// Check if destination file already exists with same size (unified logic)
destExists, err := t.internal.fileExists(destPath)
if err != nil {
t.logger.WithError(err).WithField("dest_path", destPath).Debug("Failed to check if destination file exists, proceeding with transfer")
} else if destExists {
// Check if sizes match - if so, skip transfer entirely
destSize, err := t.internal.getFileSize(destPath)
if err != nil {
t.logger.WithError(err).WithField("dest_path", destPath).Debug("Failed to get destination file size, proceeding with transfer")
} else if destSize == fileInfo.Size() {
// Files are the same size, log skip and return early
t.logger.LogTransferSkipped(sourcePath, destPath, fileInfo.Size(), "identical_size")
return nil
}
}

// If we get here, we're actually going to transfer the file
startTime := time.Now()
t.logger.LogTransferStarted(sourcePath, destPath, fileInfo.Size())

// Delegate to internal implementation for actual transfer
if err := t.internal.doTransferFile(sourcePath, destPath); err != nil {
return fmt.Errorf("transfer failed using %s: %w", t.method, err)
}

// Log successful completion
duration := time.Since(startTime)
t.logger.LogTransferCompleted(sourcePath, destPath, fileInfo.Size(), duration)

return nil
}

// TransferFiles transfers multiple files (delegates to internal implementation)
func (t *TransferClient) TransferFiles(files []types.FileTransfer) error {
return t.internal.doTransferFiles(files)
}

// Close closes the transfer client
func (t *TransferClient) Close() error {
return t.internal.close()
}

// FileExists checks if a file exists on the destination
func (t *TransferClient) FileExists(path string) (bool, error) {
return t.internal.fileExists(path)
}

// GetFileSize gets the size of a file on the destination
func (t *TransferClient) GetFileSize(path string) (int64, error) {
return t.internal.getFileSize(path)
}

// DeleteFile deletes a file on the destination
func (t *TransferClient) DeleteFile(path string) error {
return t.internal.deleteFile(path)
}

// ListDirectoryContents lists directory contents on the destination
func (t *TransferClient) ListDirectoryContents(rootPath string) ([]string, error) {
return t.internal.listDirectoryContents(rootPath)
}

// MapSourcePathToLocal maps source path to local path
func (t *TransferClient) MapSourcePathToLocal(sourcePath string) (string, error) {
return t.internal.mapSourcePathToLocal(sourcePath)
}

// MapLocalPathToDest maps local path to destination path
func (t *TransferClient) MapLocalPathToDest(localPath string) (string, error) {
return t.internal.mapLocalPathToDest(localPath)
}

// GetOptimalTransferMethod returns the recommended transfer method based on system capabilities
Expand Down
Loading