Skip to content

Commit eea2b66

Browse files
authored
Merge pull request #554 from IABTechLab/wzh-UID2-6108-increase-thread-number-optout
deprecated methods and make optoout files truly parallel download
2 parents 2a38942 + 36df8ba commit eea2b66

File tree

2 files changed

+28
-42
lines changed

2 files changed

+28
-42
lines changed

src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,10 @@ private Future<Void> cloudRefresh() {
232232

233233
Promise<Void> refreshPromise = Promise.promise();
234234
this.isRefreshing = true;
235-
vertx.<Void>executeBlocking(blockBromise -> {
235+
vertx.executeBlocking(() -> {
236236
this.cloudRefreshEnsureInSync(refreshPromise, 0);
237-
blockBromise.complete();
238-
}, ar -> {});
237+
return null;
238+
});
239239

240240
return refreshPromise.future()
241241
.onComplete(v -> {
@@ -320,12 +320,13 @@ private void handleUpload(Message<String> msg) {
320320
this.pendingUpload.add(fileToUpload);
321321
}
322322

323-
this.uploadExecutor.<Void>executeBlocking(
324-
promise -> this.cloudUploadBlocking(promise, msg.body()),
325-
ar -> {
326-
this.pendingUpload.remove(fileToUpload);
327-
this.handleAsyncResult(ar);
328-
msg.reply(ar.succeeded());
323+
this.uploadExecutor.executeBlocking(() -> {
324+
this.cloudUploadBlocking(msg.body());
325+
return null;
326+
}).onComplete(ar -> {
327+
this.pendingUpload.remove(fileToUpload);
328+
this.handleAsyncResult(ar);
329+
msg.reply(ar.succeeded());
329330

330331
// increase counter
331332
if (ar.succeeded()) {
@@ -338,16 +339,10 @@ private void handleUpload(Message<String> msg) {
338339
});
339340
}
340341

341-
private void cloudUploadBlocking(Promise<Void> promise, String fileToUpload) {
342-
try {
343-
String cloudPath = this.cloudSync.toCloudPath(fileToUpload);
344-
try (InputStream localInput = this.localStorage.download(fileToUpload)) {
345-
this.cloudStorage.upload(localInput, cloudPath);
346-
}
347-
promise.complete();
348-
} catch (Exception ex) {
349-
LOGGER.error(ex.getMessage(), ex);
350-
promise.fail(new Throwable(ex));
342+
private void cloudUploadBlocking(String fileToUpload) throws Exception {
343+
String cloudPath = this.cloudSync.toCloudPath(fileToUpload);
344+
try (InputStream localInput = this.localStorage.download(fileToUpload)) {
345+
this.cloudStorage.upload(localInput, cloudPath);
351346
}
352347
}
353348

@@ -364,9 +359,10 @@ private Future<Void> cloudDownloadFile(String s3Path) {
364359
}
365360

366361
Promise<Void> promise = Promise.promise();
367-
this.downloadExecutor.<Void>executeBlocking(
368-
blockingPromise -> this.cloudDownloadBlocking(blockingPromise, s3Path),
369-
ar -> {
362+
this.downloadExecutor.executeBlocking(() -> {
363+
this.cloudDownloadBlocking(s3Path);
364+
return null;
365+
}, false).onComplete(ar -> {
370366
this.pendingDownload.remove(s3Path);
371367
this.handleAsyncResult(ar);
372368
promise.complete();
@@ -385,7 +381,7 @@ private Future<Void> cloudDownloadFile(String s3Path) {
385381
return promise.future();
386382
}
387383

388-
private void cloudDownloadBlocking(Promise<Void> promise, String s3Path) {
384+
private void cloudDownloadBlocking(String s3Path) throws Exception {
389385
final long cloudDownloadStart = System.nanoTime();
390386
try {
391387
String localPath = this.cloudSync.toLocalPath(s3Path);
@@ -398,15 +394,14 @@ private void cloudDownloadBlocking(Promise<Void> promise, String s3Path) {
398394
downloadSuccessTimer.record(java.time.Duration.ofMillis(cloudDownloadTimeMs));
399395
LOGGER.info("S3 download completed: {} in {} ms", cloudStorage.mask(s3Path), cloudDownloadTimeMs);
400396
}
401-
promise.complete();
402397
} catch (Exception ex) {
403398
final long cloudDownloadEnd = System.nanoTime();
404399
final long cloudDownloadTimeMs = (cloudDownloadEnd - cloudDownloadStart) / 1_000_000;
405400

406401
downloadFailureTimer.record(java.time.Duration.ofMillis(cloudDownloadTimeMs));
407402
// Be careful as the s3Path may contain the pre-signed S3 token, so do not log the whole path
408403
LOGGER.error("download error: " + ex.getClass().getSimpleName());
409-
promise.fail(new Throwable(ex));
404+
throw new CloudStorageException("Download failed");
410405
}
411406
}
412407

src/main/java/com/uid2/shared/vertx/RotatingStoreVerticle.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,10 @@ public void start(Promise<Void> startPromise) throws Exception {
8383
private void startRefresh(Promise<Void> promise) {
8484
LOGGER.info("Starting " + this.storeName + " loading");
8585
final long startupRefreshStart = System.nanoTime();
86-
vertx.executeBlocking(p -> {
87-
try {
88-
this.refresh();
89-
p.complete();
90-
} catch (Exception e) {
91-
p.fail(e);
92-
}
93-
}, ar -> {
86+
vertx.executeBlocking(() -> {
87+
this.refresh();
88+
return null;
89+
}).onComplete(ar -> {
9490
final long startupRefreshEnd = System.nanoTime();
9591
final long startupRefreshTimeMs = (startupRefreshEnd - startupRefreshStart) / 1000000;
9692

@@ -112,15 +108,10 @@ private void startBackgroundRefresh() {
112108
vertx.setPeriodic(this.refreshIntervalMs, (id) -> {
113109
final long start = System.nanoTime();
114110

115-
vertx.executeBlocking(promise -> {
116-
try {
117-
this.refresh();
118-
promise.complete();
119-
} catch (Exception e) {
120-
promise.fail(e);
121-
}
122-
},
123-
asyncResult -> {
111+
vertx.executeBlocking(() -> {
112+
this.refresh();
113+
return null;
114+
}).onComplete(asyncResult -> {
124115
final long end = System.nanoTime();
125116
final long elapsed = ((end - start) / 1000000);
126117
this.counterStoreRefreshTimeMs.increment(elapsed);

0 commit comments

Comments
 (0)