Skip to content

Commit 841c389

Browse files
authored
Merge pull request #10 from nullable-eth/streamlined-transfer
feature: streamlined tansfer
2 parents 08d2548 + 74abf6d commit 841c389

File tree

4 files changed

+226
-55
lines changed

4 files changed

+226
-55
lines changed

internal/logger/logger.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,17 @@ func (l *Logger) LogTransferCompleted(sourcePath, destPath string, sizeBytes int
8585
}).Info("File transfer completed")
8686
}
8787

88+
// LogTransferSkipped logs when a file transfer is skipped (file already exists)
89+
func (l *Logger) LogTransferSkipped(sourcePath, destPath string, sizeBytes int64, reason string) {
90+
l.WithFields(logrus.Fields{
91+
"event": "transfer_skipped",
92+
"source_path": sourcePath,
93+
"dest_path": destPath,
94+
"size_bytes": sizeBytes,
95+
"reason": reason,
96+
}).Debug("File transfer skipped")
97+
}
98+
8899
// LogError logs an error with context
89100
func (l *Logger) LogError(err error, context map[string]interface{}) {
90101
fields := logrus.Fields{

internal/transfer/rsync.go

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// Package transfer provides file transfer implementations for syncarr.
12
package transfer
23

34
import (
@@ -6,7 +7,6 @@ import (
67
"os/exec"
78
"path/filepath"
89
"strings"
9-
"time"
1010

1111
"github.com/nullable-eth/syncarr/internal/config"
1212
"github.com/nullable-eth/syncarr/internal/logger"
@@ -26,8 +26,8 @@ type RsyncTransfer struct {
2626
checksumSkip bool // Skip checksum verification for speed
2727
}
2828

29-
// NewRsyncTransfer creates a new rsync transfer instance
30-
func NewRsyncTransfer(cfg *config.Config, log *logger.Logger) (*RsyncTransfer, error) {
29+
// newRsyncTransfer creates a new rsync transfer instance (package-private)
30+
func newRsyncTransfer(cfg *config.Config, log *logger.Logger) (*RsyncTransfer, error) {
3131
return &RsyncTransfer{
3232
sshConfig: &cfg.SSH,
3333
serverConfig: &cfg.Destination,
@@ -41,19 +41,8 @@ func NewRsyncTransfer(cfg *config.Config, log *logger.Logger) (*RsyncTransfer, e
4141
}, nil
4242
}
4343

44-
// TransferFile transfers a single file using rsync
45-
func (r *RsyncTransfer) TransferFile(sourcePath, destPath string) error {
46-
startTime := time.Now()
47-
48-
// Get source file info
49-
fileInfo, err := os.Stat(sourcePath)
50-
if err != nil {
51-
return fmt.Errorf("failed to stat source file: %w", err)
52-
}
53-
54-
// Log transfer start
55-
r.logger.LogTransferStarted(sourcePath, destPath, fileInfo.Size())
56-
44+
// doTransferFile transfers a single file using rsync (internal implementation without common logic)
45+
func (r *RsyncTransfer) doTransferFile(sourcePath, destPath string) error {
5746
// Ensure destination directory exists
5847
if err := r.ensureDestinationDir(destPath); err != nil {
5948
return fmt.Errorf("failed to create destination directory: %w", err)
@@ -76,18 +65,20 @@ func (r *RsyncTransfer) TransferFile(sourcePath, destPath string) error {
7665
return fmt.Errorf("rsync failed: %w", err)
7766
}
7867

79-
duration := time.Since(startTime)
80-
r.logger.LogTransferCompleted(sourcePath, destPath, fileInfo.Size(), duration)
81-
8268
return nil
8369
}
8470

85-
// TransferFiles transfers multiple files using rsync (can batch for efficiency)
86-
func (r *RsyncTransfer) TransferFiles(files []types.FileTransfer) error {
71+
// TransferFile transfers a single file using rsync (public interface for backward compatibility)
72+
func (r *RsyncTransfer) TransferFile(sourcePath, destPath string) error {
73+
return r.doTransferFile(sourcePath, destPath)
74+
}
75+
76+
// doTransferFiles transfers multiple files using rsync (internal implementation)
77+
func (r *RsyncTransfer) doTransferFiles(files []types.FileTransfer) error {
8778
// For small numbers of files, transfer individually
8879
if len(files) <= 3 {
8980
for _, file := range files {
90-
if err := r.TransferFile(file.SourcePath, file.DestPath); err != nil {
81+
if err := r.doTransferFile(file.SourcePath, file.DestPath); err != nil {
9182
return err
9283
}
9384
}
@@ -272,6 +263,40 @@ func (r *RsyncTransfer) createIncludeFile(baseDir string, files []types.FileTran
272263
return tmpFile.Name(), nil
273264
}
274265

266+
// TransferFiles transfers multiple files using rsync (public interface for backward compatibility)
267+
func (r *RsyncTransfer) TransferFiles(files []types.FileTransfer) error {
268+
return r.doTransferFiles(files)
269+
}
270+
271+
// Internal interface methods (lowercase for internalTransferer interface)
272+
func (r *RsyncTransfer) fileExists(path string) (bool, error) {
273+
return r.FileExists(path)
274+
}
275+
276+
func (r *RsyncTransfer) getFileSize(path string) (int64, error) {
277+
return r.GetFileSize(path)
278+
}
279+
280+
func (r *RsyncTransfer) deleteFile(path string) error {
281+
return r.DeleteFile(path)
282+
}
283+
284+
func (r *RsyncTransfer) listDirectoryContents(rootPath string) ([]string, error) {
285+
return r.ListDirectoryContents(rootPath)
286+
}
287+
288+
func (r *RsyncTransfer) mapSourcePathToLocal(sourcePath string) (string, error) {
289+
return r.MapSourcePathToLocal(sourcePath)
290+
}
291+
292+
func (r *RsyncTransfer) mapLocalPathToDest(localPath string) (string, error) {
293+
return r.MapLocalPathToDest(localPath)
294+
}
295+
296+
func (r *RsyncTransfer) close() error {
297+
return r.Close()
298+
}
299+
275300
// Close is a no-op for rsync (no persistent connections)
276301
func (r *RsyncTransfer) Close() error {
277302
return nil

internal/transfer/scp.go

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ type SCPTransfer struct {
3030
maxConcurrent int // Maximum concurrent transfers
3131
}
3232

33-
// NewSCPTransfer creates a new SCP transfer instance
34-
func NewSCPTransfer(cfg *config.Config, log *logger.Logger) (*SCPTransfer, error) {
33+
// newSCPTransfer creates a new SCP transfer instance (package-private)
34+
func newSCPTransfer(cfg *config.Config, log *logger.Logger) (*SCPTransfer, error) {
3535
transfer := &SCPTransfer{
3636
sshConfig: &cfg.SSH,
3737
serverConfig: &cfg.Destination,
@@ -115,34 +115,14 @@ func (s *SCPTransfer) Close() error {
115115
return nil
116116
}
117117

118-
// TransferFile transfers a single file from source to destination
119-
func (s *SCPTransfer) TransferFile(sourcePath, destPath string) error {
120-
startTime := time.Now()
121-
118+
// doTransferFile transfers a single file from source to destination (internal implementation without common logic)
119+
func (s *SCPTransfer) doTransferFile(sourcePath, destPath string) error {
122120
// Get source file info first
123121
fileInfo, err := os.Stat(sourcePath)
124122
if err != nil {
125123
return fmt.Errorf("failed to stat source file: %w", err)
126124
}
127125

128-
// Check if destination file already exists
129-
destExists, err := s.FileExists(destPath)
130-
if err != nil {
131-
s.logger.WithError(err).WithField("dest_path", destPath).Warn("Failed to check if destination file exists, proceeding with transfer")
132-
} else if destExists {
133-
// Check if sizes match - if so, skip transfer entirely
134-
destSize, err := s.GetFileSize(destPath)
135-
if err != nil {
136-
s.logger.WithError(err).WithField("dest_path", destPath).Warn("Failed to get destination file size, proceeding with transfer")
137-
} else if destSize == fileInfo.Size() {
138-
// Files are the same size, return early without any transfer logging
139-
return nil
140-
}
141-
}
142-
143-
// If we get here, we're actually going to transfer the file
144-
s.logger.LogTransferStarted(sourcePath, destPath, fileInfo.Size())
145-
146126
// Create destination directory if it doesn't exist
147127
// Use forward slashes for remote paths (SFTP always uses Unix-style paths)
148128
lastSlash := strings.LastIndex(destPath, "/")
@@ -245,12 +225,14 @@ func (s *SCPTransfer) TransferFile(sourcePath, destPath string) error {
245225
fileInfo.Size(), bytesTransferred)
246226
}
247227

248-
duration := time.Since(startTime)
249-
s.logger.LogTransferCompleted(sourcePath, destPath, bytesTransferred, duration)
250-
251228
return nil
252229
}
253230

231+
// TransferFile implements Phase 2: Single File Transfer (public interface for backward compatibility)
232+
func (s *SCPTransfer) TransferFile(sourcePath, destPath string) error {
233+
return s.doTransferFile(sourcePath, destPath)
234+
}
235+
254236
// TransferItemFiles implements Phase 3: Directory-Based File Transfer
255237
// Copy all files in the containing directories (including subtitles) to the destination server
256238
func (s *SCPTransfer) TransferItemFiles(item *types.SyncableItem) error {
@@ -299,10 +281,10 @@ func (s *SCPTransfer) TransferItemFiles(item *types.SyncableItem) error {
299281
return nil
300282
}
301283

302-
// TransferFiles transfers multiple files (legacy method, kept for compatibility)
303-
func (s *SCPTransfer) TransferFiles(files []types.FileTransfer) error {
284+
// doTransferFiles transfers multiple files (internal implementation)
285+
func (s *SCPTransfer) doTransferFiles(files []types.FileTransfer) error {
304286
for _, file := range files {
305-
if err := s.TransferFile(file.SourcePath, file.DestPath); err != nil {
287+
if err := s.doTransferFile(file.SourcePath, file.DestPath); err != nil {
306288
s.logger.LogError(err, map[string]interface{}{
307289
"source_path": file.SourcePath,
308290
"dest_path": file.DestPath,
@@ -313,6 +295,40 @@ func (s *SCPTransfer) TransferFiles(files []types.FileTransfer) error {
313295
return nil
314296
}
315297

298+
// TransferFiles transfers multiple files (public interface for backward compatibility)
299+
func (s *SCPTransfer) TransferFiles(files []types.FileTransfer) error {
300+
return s.doTransferFiles(files)
301+
}
302+
303+
// Internal interface methods (lowercase for internalTransferer interface)
304+
func (s *SCPTransfer) fileExists(path string) (bool, error) {
305+
return s.FileExists(path)
306+
}
307+
308+
func (s *SCPTransfer) getFileSize(path string) (int64, error) {
309+
return s.GetFileSize(path)
310+
}
311+
312+
func (s *SCPTransfer) deleteFile(path string) error {
313+
return s.DeleteFile(path)
314+
}
315+
316+
func (s *SCPTransfer) listDirectoryContents(rootPath string) ([]string, error) {
317+
return s.ListDirectoryContents(rootPath)
318+
}
319+
320+
func (s *SCPTransfer) mapSourcePathToLocal(sourcePath string) (string, error) {
321+
return s.MapSourcePathToLocal(sourcePath)
322+
}
323+
324+
func (s *SCPTransfer) mapLocalPathToDest(localPath string) (string, error) {
325+
return s.MapLocalPathToDest(localPath)
326+
}
327+
328+
func (s *SCPTransfer) close() error {
329+
return s.Close()
330+
}
331+
316332
// TransferFilesParallel transfers multiple files in parallel for better performance
317333
func (s *SCPTransfer) TransferFilesParallel(files []types.FileTransfer) error {
318334
if len(files) == 0 {

internal/transfer/transfer.go

Lines changed: 122 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"os/exec"
77
"strings"
8+
"time"
89

910
"github.com/nullable-eth/syncarr/internal/config"
1011
"github.com/nullable-eth/syncarr/internal/logger"
@@ -32,16 +33,134 @@ type FileTransferrer interface {
3233
MapLocalPathToDest(localPath string) (string, error)
3334
}
3435

35-
// NewTransferrer creates a new file transferrer based on the specified method
36+
// internalTransferer defines the interface for internal transfer implementations (rsync/scp)
37+
// These only handle the actual transfer without common logic like file checks and logging
38+
type internalTransferer interface {
39+
doTransferFile(sourcePath, destPath string) error
40+
doTransferFiles(files []types.FileTransfer) error
41+
close() error
42+
fileExists(path string) (bool, error)
43+
getFileSize(path string) (int64, error)
44+
deleteFile(path string) error
45+
listDirectoryContents(rootPath string) ([]string, error)
46+
mapSourcePathToLocal(sourcePath string) (string, error)
47+
mapLocalPathToDest(localPath string) (string, error)
48+
}
49+
50+
// TransferClient is the unified client that handles common logic and delegates to internal implementations
51+
type TransferClient struct {
52+
method TransferMethod
53+
internal internalTransferer
54+
logger *logger.Logger
55+
sshConfig *config.SSHConfig
56+
serverConfig *config.PlexServerConfig
57+
}
58+
59+
// NewTransferrer creates a new unified file transferrer that automatically chooses the best method
3660
func NewTransferrer(method TransferMethod, cfg *config.Config, log *logger.Logger) (FileTransferrer, error) {
61+
var internal internalTransferer
62+
var err error
63+
3764
switch method {
3865
case TransferMethodSFTP:
39-
return NewSCPTransfer(cfg, log)
66+
internal, err = newSCPTransfer(cfg, log)
4067
case TransferMethodRsync:
41-
return NewRsyncTransfer(cfg, log)
68+
internal, err = newRsyncTransfer(cfg, log)
4269
default:
4370
return nil, fmt.Errorf("unsupported transfer method: %s", method)
4471
}
72+
73+
if err != nil {
74+
return nil, fmt.Errorf("failed to create %s transferrer: %w", method, err)
75+
}
76+
77+
return &TransferClient{
78+
method: method,
79+
internal: internal,
80+
logger: log,
81+
sshConfig: &cfg.SSH,
82+
serverConfig: &cfg.Destination,
83+
}, nil
84+
}
85+
86+
// TransferFile handles file transfer with unified logic - checks file existence, size, and delegates to internal implementation
87+
func (t *TransferClient) TransferFile(sourcePath, destPath string) error {
88+
// Get source file info
89+
fileInfo, err := os.Stat(sourcePath)
90+
if err != nil {
91+
return fmt.Errorf("failed to stat source file: %w", err)
92+
}
93+
94+
// Check if destination file already exists with same size (unified logic)
95+
destExists, err := t.internal.fileExists(destPath)
96+
if err != nil {
97+
t.logger.WithError(err).WithField("dest_path", destPath).Debug("Failed to check if destination file exists, proceeding with transfer")
98+
} else if destExists {
99+
// Check if sizes match - if so, skip transfer entirely
100+
destSize, err := t.internal.getFileSize(destPath)
101+
if err != nil {
102+
t.logger.WithError(err).WithField("dest_path", destPath).Debug("Failed to get destination file size, proceeding with transfer")
103+
} else if destSize == fileInfo.Size() {
104+
// Files are the same size, log skip and return early
105+
t.logger.LogTransferSkipped(sourcePath, destPath, fileInfo.Size(), "identical_size")
106+
return nil
107+
}
108+
}
109+
110+
// If we get here, we're actually going to transfer the file
111+
startTime := time.Now()
112+
t.logger.LogTransferStarted(sourcePath, destPath, fileInfo.Size())
113+
114+
// Delegate to internal implementation for actual transfer
115+
if err := t.internal.doTransferFile(sourcePath, destPath); err != nil {
116+
return fmt.Errorf("transfer failed using %s: %w", t.method, err)
117+
}
118+
119+
// Log successful completion
120+
duration := time.Since(startTime)
121+
t.logger.LogTransferCompleted(sourcePath, destPath, fileInfo.Size(), duration)
122+
123+
return nil
124+
}
125+
126+
// TransferFiles transfers multiple files (delegates to internal implementation)
127+
func (t *TransferClient) TransferFiles(files []types.FileTransfer) error {
128+
return t.internal.doTransferFiles(files)
129+
}
130+
131+
// Close closes the transfer client
132+
func (t *TransferClient) Close() error {
133+
return t.internal.close()
134+
}
135+
136+
// FileExists checks if a file exists on the destination
137+
func (t *TransferClient) FileExists(path string) (bool, error) {
138+
return t.internal.fileExists(path)
139+
}
140+
141+
// GetFileSize gets the size of a file on the destination
142+
func (t *TransferClient) GetFileSize(path string) (int64, error) {
143+
return t.internal.getFileSize(path)
144+
}
145+
146+
// DeleteFile deletes a file on the destination
147+
func (t *TransferClient) DeleteFile(path string) error {
148+
return t.internal.deleteFile(path)
149+
}
150+
151+
// ListDirectoryContents lists directory contents on the destination
152+
func (t *TransferClient) ListDirectoryContents(rootPath string) ([]string, error) {
153+
return t.internal.listDirectoryContents(rootPath)
154+
}
155+
156+
// MapSourcePathToLocal maps source path to local path
157+
func (t *TransferClient) MapSourcePathToLocal(sourcePath string) (string, error) {
158+
return t.internal.mapSourcePathToLocal(sourcePath)
159+
}
160+
161+
// MapLocalPathToDest maps local path to destination path
162+
func (t *TransferClient) MapLocalPathToDest(localPath string) (string, error) {
163+
return t.internal.mapLocalPathToDest(localPath)
45164
}
46165

47166
// GetOptimalTransferMethod returns the recommended transfer method based on system capabilities

0 commit comments

Comments
 (0)