44
55using System . Collections . Concurrent ;
66using System . Diagnostics . CodeAnalysis ;
7+ using System . IO . Abstractions ;
78using System . Security . Cryptography ;
89using Amazon . S3 ;
910using Amazon . S3 . Model ;
1011using Microsoft . Extensions . Logging ;
1112
1213namespace Documentation . Assembler . Deploying ;
1314
14- public class AwsS3SyncPlanStrategy ( ILoggerFactory logFactory , IAmazonS3 s3Client , string bucketName , AssembleContext context ) : IDocsSyncPlanStrategy
/// <summary>
/// Abstraction over the calculation of the S3 ETag of a local file, so the
/// calculation can be substituted (for example in tests).
/// </summary>
public interface IS3EtagCalculator
{
    /// <summary>Calculates the S3 ETag for the file at <paramref name="filePath"/>.</summary>
    /// <param name="filePath">Path of the file whose ETag should be calculated.</param>
    /// <param name="ctx">Token used to cancel the calculation.</param>
    /// <returns>A task that resolves to the ETag string.</returns>
    Task<string> CalculateS3ETag(string filePath, Cancel ctx = default);
}
19+
/// <summary>
/// Calculates the ETag of a local file: a plain MD5 hex digest for files up to
/// <see cref="PartSize"/> bytes, and the multipart form
/// (<c>md5(concatenated part digests)-partCount</c>) for larger files.
/// </summary>
/// <param name="logFactory">Factory used to create this calculator's logger.</param>
/// <param name="readFileSystem">File system the files are read from.</param>
public class S3EtagCalculator(ILoggerFactory logFactory, IFileSystem readFileSystem) : IS3EtagCalculator
{
    private readonly ILogger<S3EtagCalculator> _logger = logFactory.CreateLogger<S3EtagCalculator>();

    // Static, so the cache is shared by every instance in the process. Entries are
    // keyed by file path and are never invalidated by this class.
    private static readonly ConcurrentDictionary<string, string> EtagCache = new();

    internal const long PartSize = 5 * 1024 * 1024; // 5MB

    /// <summary>
    /// Returns the ETag for <paramref name="filePath"/>, using the cached value when
    /// this path has been calculated before.
    /// </summary>
    /// <param name="filePath">Path of the file to hash; also the cache key.</param>
    /// <param name="ctx">Token used to cancel the file reads.</param>
    /// <returns>
    /// Lower-case hex MD5 of the content for files of at most <see cref="PartSize"/> bytes,
    /// otherwise <c>{hex MD5 of the concatenated part digests}-{part count}</c>.
    /// </returns>
    [SuppressMessage("Security", "CA5351:Do Not Use Broken Cryptographic Algorithms")]
    public async Task<string> CalculateS3ETag(string filePath, Cancel ctx = default)
    {
        if (EtagCache.TryGetValue(filePath, out var cachedEtag))
        {
            _logger.LogDebug("Using cached ETag for {Path}", filePath);
            return cachedEtag;
        }

        var fileSize = readFileSystem.FileInfo.New(filePath).Length;

        await using var stream = readFileSystem.FileStream.New(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);

        // Files up to PartSize use a simple MD5; larger files use the multipart
        // format with PartSize parts (matching TransferUtility behavior).
        var etag = fileSize <= PartSize
            ? await HashSinglePart(stream, (int)fileSize, ctx)
            : await HashMultipart(stream, fileSize, ctx);

        EtagCache[filePath] = etag;
        return etag;
    }

    /// <summary>Hashes up to <paramref name="length"/> bytes of the stream as a single MD5 digest.</summary>
    [SuppressMessage("Security", "CA5351:Do Not Use Broken Cryptographic Algorithms")]
    private static async Task<string> HashSinglePart(Stream stream, int length, Cancel ctx)
    {
        var buffer = new byte[length];

        // A single ReadAsync call may return fewer bytes than requested, so keep
        // reading until the buffer is full or the stream ends.
        var bytesRead = await stream.ReadAtLeastAsync(buffer, buffer.Length, throwOnEndOfStream: false, cancellationToken: ctx);

        return Convert.ToHexStringLower(MD5.HashData(buffer.AsSpan(0, bytesRead)));
    }

    /// <summary>Hashes the stream in <see cref="PartSize"/> chunks and combines the part digests.</summary>
    [SuppressMessage("Security", "CA5351:Do Not Use Broken Cryptographic Algorithms")]
    private static async Task<string> HashMultipart(Stream stream, long fileSize, Cancel ctx)
    {
        // Ceiling division: the last part may be shorter than PartSize.
        var parts = (int)((fileSize + PartSize - 1) / PartSize);

        var partBuffer = new byte[PartSize];
        var partDigests = new byte[parts * MD5.HashSizeInBytes];

        for (var i = 0; i < parts; i++)
        {
            // Fill the whole part before hashing; a short read here would shift
            // every following part boundary and produce a different ETag.
            var bytesRead = await stream.ReadAtLeastAsync(partBuffer, partBuffer.Length, throwOnEndOfStream: false, cancellationToken: ctx);

            // Write each part digest straight into its slot of the concatenated buffer.
            _ = MD5.HashData(
                partBuffer.AsSpan(0, bytesRead),
                partDigests.AsSpan(i * MD5.HashSizeInBytes, MD5.HashSizeInBytes));
        }

        var finalHash = MD5.HashData(partDigests);
        return $"{Convert.ToHexStringLower(finalHash)}-{parts}";
    }
}
75+
76+ public class AwsS3SyncPlanStrategy (
77+ ILoggerFactory logFactory ,
78+ IAmazonS3 s3Client ,
79+ string bucketName ,
80+ AssembleContext context ,
81+ IS3EtagCalculator ? calculator = null
82+ )
83+ : IDocsSyncPlanStrategy
84+ {
85+ private readonly ILogger < AwsS3SyncPlanStrategy > _logger = logFactory . CreateLogger < AwsS3SyncPlanStrategy > ( ) ;
86+
87+ private readonly IS3EtagCalculator _s3EtagCalculator = calculator ?? new S3EtagCalculator ( logFactory , context . ReadFileSystem ) ;
88+
2089 private bool IsSymlink ( string path )
2190 {
2291 var fileInfo = context . ReadFileSystem . FileInfo . New ( path ) ;
@@ -42,7 +111,7 @@ await Parallel.ForEachAsync(localObjects, ctx, async (localFile, token) =>
42111 if ( remoteObjects . TryGetValue ( destinationPath , out var remoteObject ) )
43112 {
44113 // Check if the ETag differs for updates
45- var localETag = await CalculateS3ETag ( localFile . FullName , token ) ;
114+ var localETag = await _s3EtagCalculator . CalculateS3ETag ( localFile . FullName , token ) ;
46115 var remoteETag = remoteObject . ETag . Trim ( '"' ) ; // Remove quotes from remote ETag
47116 if ( localETag == remoteETag )
48117 {
@@ -89,14 +158,34 @@ await Parallel.ForEachAsync(localObjects, ctx, async (localFile, token) =>
89158
90159 return new SyncPlan
91160 {
161+ TotalSourceFiles = localObjects . Length ,
92162 DeleteRequests = deleteRequests . ToList ( ) ,
93163 AddRequests = addRequests . ToList ( ) ,
94164 UpdateRequests = updateRequests . ToList ( ) ,
95165 SkipRequests = skipRequests . ToList ( ) ,
96- Count = deleteRequests . Count + addRequests . Count + updateRequests . Count + skipRequests . Count
166+ TotalFilesToSync = deleteRequests . Count + addRequests . Count + updateRequests . Count + skipRequests . Count
97167 } ;
98168 }
99169
/// <inheritdoc />
/// <remarks>
/// Returns <c>(false, 1.0)</c> when the plan has no source files. Otherwise returns
/// whether the share of delete requests among all planned requests is within
/// <paramref name="deleteThreshold"/>, together with that share.
/// </remarks>
public (bool, float) Validate(SyncPlan plan, float deleteThreshold)
{
    if (plan.TotalSourceFiles == 0)
    {
        _logger.LogError("No files to sync");
        return (false, 1.0f);
    }

    // With no delete requests the ratio is 0 by definition. Short-circuiting here
    // avoids 0/0 = NaN when TotalFilesToSync is also 0; NaN compares false against
    // the threshold and would be returned to the caller as the ratio.
    var deleteCount = plan.DeleteRequests.Count;
    var deleteRatio = deleteCount == 0
        ? 0f
        : (float)deleteCount / plan.TotalFilesToSync;

    if (deleteRatio > deleteThreshold)
    {
        _logger.LogError("Delete ratio is {Ratio} which is greater than the threshold of {Threshold}", deleteRatio, deleteThreshold);
        return (false, deleteRatio);
    }

    return (true, deleteRatio);
}
188+
100189 private async Task < Dictionary < string , S3Object > > ListObjects ( Cancel ctx = default )
101190 {
102191 var listBucketRequest = new ListObjectsV2Request
@@ -115,51 +204,4 @@ private async Task<Dictionary<string, S3Object>> ListObjects(Cancel ctx = defaul
115204
116205 return objects . ToDictionary ( o => o . Key ) ;
117206 }
118-
119- [ SuppressMessage ( "Security" , "CA5351:Do Not Use Broken Cryptographic Algorithms" ) ]
120- private async Task < string > CalculateS3ETag ( string filePath , Cancel ctx = default )
121- {
122- if ( EtagCache . TryGetValue ( filePath , out var cachedEtag ) )
123- {
124- _logger . LogDebug ( "Using cached ETag for {Path}" , filePath ) ;
125- return cachedEtag ;
126- }
127-
128- var fileInfo = context . ReadFileSystem . FileInfo . New ( filePath ) ;
129- var fileSize = fileInfo . Length ;
130-
131- // For files under 5MB, use simple MD5 (matching TransferUtility behavior)
132- if ( fileSize <= PartSize )
133- {
134- await using var stream = context . ReadFileSystem . FileStream . New ( filePath , FileMode . Open , FileAccess . Read , FileShare . Read ) ;
135- var smallBuffer = new byte [ fileSize ] ;
136- var bytesRead = await stream . ReadAsync ( smallBuffer . AsMemory ( 0 , ( int ) fileSize ) , ctx ) ;
137- var hash = MD5 . HashData ( smallBuffer . AsSpan ( 0 , bytesRead ) ) ;
138- var etag = Convert . ToHexStringLower ( hash ) ;
139- EtagCache [ filePath ] = etag ;
140- return etag ;
141- }
142-
143- // For files over 5MB, use multipart format with 5MB parts (matching TransferUtility)
144- var parts = ( int ) Math . Ceiling ( ( double ) fileSize / PartSize ) ;
145-
146- await using var fileStream = context . ReadFileSystem . FileStream . New ( filePath , FileMode . Open , FileAccess . Read , FileShare . Read ) ;
147- var partBuffer = new byte [ PartSize ] ;
148- var partHashes = new List < byte [ ] > ( ) ;
149-
150- for ( var i = 0 ; i < parts ; i ++ )
151- {
152- var bytesRead = await fileStream . ReadAsync ( partBuffer . AsMemory ( 0 , partBuffer . Length ) , ctx ) ;
153- var partHash = MD5 . HashData ( partBuffer . AsSpan ( 0 , bytesRead ) ) ;
154- partHashes . Add ( partHash ) ;
155- }
156-
157- // Concatenate all part hashes
158- var concatenatedHashes = partHashes . SelectMany ( h => h ) . ToArray ( ) ;
159- var finalHash = MD5 . HashData ( concatenatedHashes ) ;
160-
161- var multipartEtag = $ "{ Convert . ToHexStringLower ( finalHash ) } -{ parts } ";
162- EtagCache [ filePath ] = multipartEtag ;
163- return multipartEtag ;
164- }
165207}
0 commit comments