Skip to content

Commit 8919691

Browse files
committed
[XEN-3146-migrate] only log progress events every N bytes
1 parent aa3be5f commit 8919691

File tree

4 files changed

+58
-54
lines changed

4 files changed

+58
-54
lines changed

integration-tests/src/test/resources/solr.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
<bool name="s3.path.style.access.enabled">${S3_PATH_STYLE_ACCESS_ENABLED:false}</bool>
1515
<bool name="s3.client.checksumValidationEnabled">${S3_CLIENT_CHECKSUM_VALIDATION_ENABLED:true}</bool>
16+
<int name="s3.client.progressLogByteInterval">${S3_CLIENT_PROGRESS_LOG_BYTE_INTERVAL:4194304}</int>
1617
</repository>
1718
</backup>
1819
</solr>

solr-backup/src/main/java/eu/xenit/solr/backup/s3/S3BackupRepositoryConfig.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class S3BackupRepositoryConfig {
3737
public static final String S3_CLIENT_CHECKSUM_VALIDATION_ENABLED = "s3.client.checksum.validation.enabled";
3838
public static final String S3_PROXY_HOST = "s3.proxy.host";
3939
public static final String S3_PROXY_PORT = "s3.proxy.port";
40+
public static final String S3_CLIENT_PROGRESS_LOG_BYTE_INTERVAL = "s3.client.progressLogByteInterval";
4041

4142
private final String bucketName;
4243

@@ -56,6 +57,8 @@ public class S3BackupRepositoryConfig {
5657

5758
private final Boolean checksumValidationEnabled;
5859

60+
private final Integer progressLogByteInterval;
61+
5962

6063
public S3BackupRepositoryConfig(NamedList<?> config) {
6164
region = getStringConfig(config, S3_REGION);
@@ -67,13 +70,14 @@ public S3BackupRepositoryConfig(NamedList<?> config) {
6770
secretKey = getStringConfig(config, S3_SECRET_KEY);
6871
pathStyleAccessEnabled = getBooleanConfig(config, S3_PATH_STYLE_ACCESS_ENABLED);
6972
checksumValidationEnabled = getBooleanConfig(config, S3_CLIENT_CHECKSUM_VALIDATION_ENABLED);
73+
progressLogByteInterval = getIntConfig(config, S3_CLIENT_PROGRESS_LOG_BYTE_INTERVAL);
7074
}
7175

7276
/**
7377
* @return a {@link S3StorageClient} from the provided config.
7478
*/
7579
public S3StorageClient buildClient() throws URISyntaxException {
76-
return new S3StorageClient(bucketName, region, proxyHost, proxyPort, endpoint, accessKey, secretKey, pathStyleAccessEnabled, checksumValidationEnabled);
80+
return new S3StorageClient(this);
7781
}
7882

7983
private static String getStringConfig(NamedList<?> config, String property) {

solr-backup/src/main/java/eu/xenit/solr/backup/s3/S3OutputStream.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,22 @@ public class S3OutputStream extends OutputStream {
5555
static final int MIN_PART_SIZE = 5242880;
5656

5757
private final S3Client s3Client;
58-
private final String bucketName;
5958
private final String key;
60-
// TODO: upgrade from 1.x sdk
61-
// private final SyncProgressListener progressListener;
6259
private volatile boolean closed;
6360
private final ByteBuffer buffer;
6461
private MultipartUpload multiPartUpload;
62+
private final S3BackupRepositoryConfig configuration;
6563

66-
public S3OutputStream(S3Client s3Client, String key, String bucketName) {
64+
public S3OutputStream(S3Client s3Client, String key, S3BackupRepositoryConfig configuration) {
6765
this.s3Client = s3Client;
68-
this.bucketName = bucketName;
66+
this.configuration = configuration;
6967
this.key = key;
7068
this.closed = false;
7169
this.buffer = ByteBuffer.allocate(PART_SIZE);
72-
// TODO: upgrade from 1.x sdk
73-
//this.progressListener = new ConnectProgressListener();
7470
this.multiPartUpload = null;
7571

7672
if (log.isDebugEnabled()) {
77-
log.debug("Created S3OutputStream for bucketName '{}' key '{}'", bucketName, key);
73+
log.debug("Created S3OutputStream for bucketName '{}' key '{}'", this.configuration.getBucketName(), key);
7874
}
7975
}
8076

@@ -134,7 +130,7 @@ private void uploadPart(boolean isLastPart) throws IOException {
134130

135131
if (multiPartUpload == null) {
136132
if (log.isDebugEnabled()) {
137-
log.debug("New multi-part upload for bucketName '{}' key '{}'", bucketName, key);
133+
log.debug("New multi-part upload for bucketName '{}' key '{}'", this.configuration.getBucketName(), key);
138134
}
139135
multiPartUpload = newMultipartUpload();
140136
}
@@ -144,7 +140,7 @@ private void uploadPart(boolean isLastPart) throws IOException {
144140
if (multiPartUpload != null) {
145141
multiPartUpload.abort();
146142
if (log.isDebugEnabled()) {
147-
log.debug("Multipart upload aborted for bucketName '{}' key '{}'.", bucketName, key);
143+
log.debug("Multipart upload aborted for bucketName '{}' key '{}'.", this.configuration.getBucketName(), key);
148144
}
149145
}
150146
throw new S3Exception("Part upload failed: ", e);
@@ -187,7 +183,7 @@ public void close() throws IOException {
187183
private MultipartUpload newMultipartUpload() throws IOException {
188184
CreateMultipartUploadRequest initRequest =
189185
CreateMultipartUploadRequest.builder()
190-
.bucket(bucketName)
186+
.bucket(this.configuration.getBucketName())
191187
.key(key)
192188
.build();
193189
try {
@@ -207,7 +203,7 @@ public MultipartUpload(String uploadId) {
207203
if (log.isDebugEnabled()) {
208204
log.debug(
209205
"Initiated multi-part upload for bucketName '{}' key '{}' with id '{}'",
210-
bucketName,
206+
configuration.getBucketName(),
211207
key,
212208
uploadId);
213209
}
@@ -224,15 +220,24 @@ void uploadPart(ByteArrayInputStream inputStream, int partSize) {
224220
* - Pass `contentLength` to request
225221
* - Wrap the input stream into a progress listening input stream.
226222
*/
227-
Consumer<Long> progressListener = (bytesTransferred) -> {
228-
log.debug("Progress: {} bytes", bytesTransferred);
223+
Consumer<Long> progressListener = new Consumer<>() {
224+
private Long bytesSinceLastCheckpoint = 0L;
225+
226+
public void accept(Long bytesTransferred) {
227+
// Only log every interval of bytes, instead of each event
228+
bytesSinceLastCheckpoint += bytesTransferred;
229+
if (bytesSinceLastCheckpoint > configuration.getProgressLogByteInterval()) {
230+
log.debug("Progress: {} bytes", bytesTransferred);
231+
bytesSinceLastCheckpoint = 0L;
232+
}
233+
}
229234
};
230235
InputStream trackedStream = new ProgressTrackingInputStream(inputStream, progressListener);
231236
RequestBody body = RequestBody.fromInputStream(trackedStream, partSize);
232237

233238
UploadPartRequest request =
234239
UploadPartRequest.builder()
235-
.bucket(bucketName)
240+
.bucket(configuration.getBucketName())
236241
.key(key)
237242
.uploadId(uploadId)
238243
.partNumber(currentPartNumber)
@@ -259,7 +264,7 @@ void complete() {
259264
}
260265
CompleteMultipartUploadRequest completeRequest =
261266
CompleteMultipartUploadRequest.builder()
262-
.bucket(bucketName)
267+
.bucket(configuration.getBucketName())
263268
.key(key)
264269
.uploadId(uploadId)
265270
.multipartUpload(
@@ -277,7 +282,7 @@ public void abort() {
277282
try {
278283
s3Client.abortMultipartUpload(AbortMultipartUploadRequest
279284
.builder()
280-
.bucket(bucketName)
285+
.bucket(configuration.getBucketName())
281286
.key(key)
282287
.uploadId(uploadId)
283288
.build());

solr-backup/src/main/java/eu/xenit/solr/backup/s3/S3StorageClient.java

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -93,36 +93,24 @@ class S3StorageClient {
9393
private static final Set<String> NOT_FOUND_CODES = Set.of("NoSuchKey", "404 Not Found");
9494

9595
private final S3Client s3Client;
96+
private final S3BackupRepositoryConfig configuration;
9697

97-
/**
98-
* The S3 bucket where we read/write all data.
99-
*/
100-
private final String bucketName;
101-
102-
S3StorageClient(
103-
String bucketName, String region, String proxyHost, int proxyPort, String endpoint, String accessKey, String secretKey, Boolean pathStyleAccessEnabled, Boolean checksumValidationEnabled) throws URISyntaxException {
104-
this(createInternalClient(region, proxyHost, proxyPort, endpoint, accessKey, secretKey, pathStyleAccessEnabled, checksumValidationEnabled), bucketName);
98+
S3StorageClient(S3BackupRepositoryConfig config) throws URISyntaxException {
99+
this(createInternalClient(config), config);
105100
}
106101

107102
@VisibleForTesting
108-
S3StorageClient(S3Client s3Client, String bucketName) {
103+
S3StorageClient(S3Client s3Client, S3BackupRepositoryConfig configuration) {
109104
this.s3Client = s3Client;
110-
this.bucketName = bucketName;
105+
this.configuration = configuration;
111106
}
112107

113-
private static S3Client createInternalClient(
114-
String region,
115-
String proxyHost,
116-
int proxyPort,
117-
String endpoint,
118-
String accessKey,
119-
String secretKey, Boolean pathStyleAccessEnabled,
120-
Boolean checksumValidationEnabled) throws URISyntaxException {
108+
private static S3Client createInternalClient(S3BackupRepositoryConfig config) throws URISyntaxException {
121109

122110
S3ClientBuilder clientBuilder = S3Client.builder();
123111

124112
S3Configuration configuration = S3Configuration.builder()
125-
.checksumValidationEnabled(checksumValidationEnabled)
113+
.checksumValidationEnabled(config.getChecksumValidationEnabled())
126114
.build();
127115
clientBuilder.serviceConfiguration(configuration);
128116

@@ -131,9 +119,9 @@ private static S3Client createInternalClient(
131119
* not on a general client configuration object.
132120
*/
133121
ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder();
134-
if (!StringUtils.isEmpty(proxyHost)) {
122+
if (!StringUtils.isEmpty(config.getProxyHost())) {
135123
ProxyConfiguration.Builder proxyConfigBuilder = ProxyConfiguration.builder()
136-
.endpoint(URI.create(proxyHost + ":" + proxyPort));
124+
.endpoint(URI.create(config.getProxyHost() + ":" + config.getProxyPort()));
137125
httpClientBuilder.proxyConfiguration(proxyConfigBuilder.build());
138126
}
139127
clientBuilder.httpClientBuilder(httpClientBuilder);
@@ -145,9 +133,9 @@ private static S3Client createInternalClient(
145133
*/
146134
clientBuilder.overrideConfiguration(ClientOverrideConfiguration.builder().build());
147135

148-
if (!(StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey))) {
136+
if (!(StringUtils.isEmpty(config.getAccessKey()) || StringUtils.isEmpty(config.getSecretKey()))) {
149137
clientBuilder.credentialsProvider(
150-
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
138+
StaticCredentialsProvider.create(AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey())));
151139
} else {
152140
log.info("No accessKey or secretKey configured, using default credentials provider chain");
153141
}
@@ -156,17 +144,17 @@ private static S3Client createInternalClient(
156144
* SDK v2 Migration: `setEndpointConfiguration` from v1 is replaced by
157145
* `endpointOverride`. The region must still be set separately.
158146
*/
159-
if (!StringUtils.isEmpty(endpoint)) {
160-
clientBuilder.endpointOverride(new URI(endpoint));
147+
if (!StringUtils.isEmpty(config.getEndpoint())) {
148+
clientBuilder.endpointOverride(new URI(config.getEndpoint()));
161149
}
162-
clientBuilder.region(Region.of(region));
150+
clientBuilder.region(Region.of(config.getRegion()));
163151

164152
/*
165153
* SDK v2 Migration: The method `withPathStyleAccessEnabled(boolean)` from v1 is
166154
* replaced by `forcePathStyle(boolean)` in v2.
167155
*/
168-
if (pathStyleAccessEnabled != null) {
169-
clientBuilder.forcePathStyle(pathStyleAccessEnabled);
156+
if (config.getPathStyleAccessEnabled() != null) {
157+
clientBuilder.forcePathStyle(config.getPathStyleAccessEnabled());
170158
}
171159

172160
return clientBuilder.build();
@@ -192,7 +180,7 @@ void createDirectory(String path) throws S3Exception {
192180
* In v2, request parameters like bucket and key are set using builder methods.
193181
*/
194182
PutObjectRequest putRequest = PutObjectRequest.builder()
195-
.bucket(bucketName)
183+
.bucket(this.configuration.getBucketName())
196184
.key(path)
197185
.contentType(S3_DIR_CONTENT_TYPE)
198186
.metadata(Collections.singletonMap("Content-Type", S3_DIR_CONTENT_TYPE))
@@ -257,7 +245,7 @@ String[] listDir(String path) throws S3Exception {
257245

258246
// The request MUST include the delimiter.
259247
ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
260-
.bucket(bucketName)
248+
.bucket(this.configuration.getBucketName())
261249
.prefix(prefix)
262250
.delimiter(S3_FILE_PATH_DELIMITER)
263251
.build();
@@ -312,7 +300,7 @@ HeadObjectResponse getObjectMetadata(String path) throws SdkException {
312300
* This is the standard v2 way to retrieve object metadata without fetching the object's content.
313301
*/
314302
HeadObjectRequest request = HeadObjectRequest.builder()
315-
.bucket(bucketName)
303+
.bucket(this.configuration.getBucketName())
316304
.key(path)
317305
.build();
318306
return s3Client.headObject(request);
@@ -418,7 +406,10 @@ InputStream pullStream(String path) throws S3Exception {
418406
path = sanitizedFilePath(path);
419407

420408
try {
421-
ResponseInputStream<GetObjectResponse> requestedObject = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(path)
409+
ResponseInputStream<GetObjectResponse> requestedObject = s3Client
410+
.getObject(GetObjectRequest.builder()
411+
.bucket(this.configuration.getBucketName())
412+
.key(path)
422413
.build());
423414
// This InputStream instance needs to be closed by the caller
424415
return requestedObject;
@@ -441,7 +432,7 @@ OutputStream pushStream(String path) throws S3Exception {
441432
}
442433

443434
try {
444-
return new S3OutputStream(s3Client, path, bucketName);
435+
return new S3OutputStream(s3Client, path, this.configuration);
445436
} catch (SdkException ase) {
446437
throw handleAmazonException(ase);
447438
}
@@ -519,7 +510,10 @@ Collection<String> deleteObjects(Collection<String> entries, int batchSize) thro
519510
if (deleteIndividually) {
520511
for (ObjectIdentifier k : keysToDelete) {
521512
try {
522-
s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(k.key())
513+
s3Client.deleteObject(DeleteObjectRequest
514+
.builder()
515+
.bucket(this.configuration.getBucketName())
516+
.key(k.key())
523517
.build());
524518
deletedPaths.add(k.key());
525519
} catch (SdkException e) {
@@ -541,7 +535,7 @@ private DeleteObjectsRequest createBatchDeleteRequest(List<ObjectIdentifier> key
541535
.build();
542536

543537
return DeleteObjectsRequest.builder()
544-
.bucket(bucketName)
538+
.bucket(this.configuration.getBucketName())
545539
.delete(deleteAction)
546540
.build();
547541
}
@@ -554,7 +548,7 @@ private List<String> listAll(String path) throws S3Exception {
554548
* `ListObjectsV2Request`.
555549
*/
556550
ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
557-
.bucket(bucketName)
551+
.bucket(this.configuration.getBucketName())
558552
.prefix(prefix)
559553
.build();
560554

0 commit comments

Comments
 (0)