@@ -29,6 +29,20 @@ public class S3Repo implements SourceRepo {
29
29
private static final double S3_TARGET_THROUGHPUT_GIBPS = 8.0 ; // Arbitrarily chosen
30
30
private static final long S3_MAX_MEMORY_BYTES = 1024L * 1024 * 1024 ; // Arbitrarily chosen
31
31
private static final long S3_MINIMUM_PART_SIZE_BYTES = 8L * 1024 * 1024 ; // Default, but be explicit
32
+
33
+ // Transfer Manager maintains buffers for CRT responses which without being careful can overload heap.
34
+ // A quick calculation for heap usage is:
35
+ // MaxHeapUsedByCrtBuffers = (initialReadBufferSizeInBytes * concurrentFileDownloads) +
36
+ // minimalPartSizeInBytes * min(concurrentFileDownloads, maxConcurrency)
37
+ // The default values that we use are:
38
+ // initialReadBufferSizeInBytes = 80MB
39
+ // minimalPartSizeInBytes = 8MB
40
+ // maxConcurrency = 100
41
+ //
42
+ // To reduce heap memory usage to under 1GB we will set concurrentFileDownloads to 10 (otherwise it defaults to 100)
43
+ private static int DIRECTORY_TRANSFER_MAX_CONCURRENT_FILE_DOWNLOADS = 10 ;
44
+
45
+
32
46
public static final String INDICES_PREFIX_STR = "indices/" ;
33
47
private final Path s3LocalDir ;
34
48
private final S3AsyncClient s3Client ;
@@ -150,7 +164,10 @@ public Path getBlobFilePath(String indexId, int shardId, String blobName) {
150
164
151
165
@ Override
152
166
public void prepBlobFiles (ShardMetadata shardMetadata ) {
153
- try (S3TransferManager transferManager = S3TransferManager .builder ().s3Client (s3Client ).build ()) {
167
+ try (S3TransferManager transferManager = S3TransferManager .builder ()
168
+ .s3Client (s3Client )
169
+ .directoryTransferMaxConcurrency (DIRECTORY_TRANSFER_MAX_CONCURRENT_FILE_DOWNLOADS )
170
+ .build ()) {
154
171
155
172
Path shardDirPath = getShardDirPath (shardMetadata .getIndexId (), shardMetadata .getShardId ());
156
173
ensureS3LocalDirectoryExists (shardDirPath );
0 commit comments