Skip to content

Commit ba83d74

Browse files
authored
Merge pull request #581 from IABTechLab/ian-UID2-6345-enable-upload-only-optout-producer
add upload only option to OptOutCloudSync
2 parents d2fe0f1 + 1efff01 commit ba83d74

File tree

3 files changed

+54
-7
lines changed

3 files changed

+54
-7
lines changed

src/main/java/com/uid2/shared/optout/OptOutCloudSync.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class OptOutCloudSync implements ICloudSync {
2727
private static final Logger LOGGER = LoggerFactory.getLogger(OptOutCloudSync.class);
2828

2929
private final boolean fullSync;
30+
private final boolean uploadOnly;
3031
private final String cloudFolder;
3132
private final String deltaConsumerDir;
3233
private final String partitionConsumerDir;
@@ -48,7 +49,12 @@ public class OptOutCloudSync implements ICloudSync {
4849
private AtomicReference<List<Consumer<Collection<String>>>> handlersNewCloudPaths = new AtomicReference<>(new ArrayList<>());
4950

5051
public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync) {
52+
this(jsonConfig, fullSync, false);
53+
}
54+
55+
public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync, boolean uploadOnly) {
5156
this.fullSync = fullSync;
57+
this.uploadOnly = uploadOnly;
5258
this.cloudFolder = CloudUtils.normalizDirPath(jsonConfig.getString(Const.Config.OptOutS3FolderProp));
5359
this.deltaConsumerDir = OptOutUtils.getDeltaConsumerDir(jsonConfig);
5460
this.partitionConsumerDir = OptOutUtils.getPartitionConsumerDir(jsonConfig);
@@ -84,6 +90,19 @@ public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync) {
8490
this.mkdirsBlocking();
8591
}
8692

93+
/**
94+
* Creates an upload-only OptOutCloudSync instance.
95+
* This skips all download/refresh operations.
96+
*/
97+
public static OptOutCloudSync createUploadOnly(JsonObject jsonConfig, boolean fullSync) {
98+
return new OptOutCloudSync(jsonConfig, fullSync, true);
99+
}
100+
101+
@Override
102+
public boolean isUploadOnly() {
103+
return this.uploadOnly;
104+
}
105+
87106
@Override
88107
public String toCloudPath(String path) {
89108
if (OptOutUtils.isDeltaFile(path)) {
@@ -121,6 +140,11 @@ public String toLocalPath(String path) {
121140

122141
@Override
123142
public boolean refresh(Instant now, ICloudStorage fsCloud, ICloudStorage fsLocal, Consumer<Set<String>> handleDownloads, Consumer<Set<String>> handleDeletes) throws CloudStorageException {
143+
// In upload-only mode, skip all download/sync operations
144+
if (uploadOnly) {
145+
return true;
146+
}
147+
124148
// list local cached paths
125149
List<String> cachedPathList = new ArrayList<>();
126150
localListFiles(fsLocal, this.deltaConsumerDir, OptOutUtils.prefixDeltaFile, cachedPathList);

src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,19 +167,29 @@ public void start(Promise<Void> promise) {
167167
this.uploadExecutor = vertx.createSharedWorkerExecutor("cloudsync-" + name + "-upload-pool",
168168
this.uploadThreads);
169169

170-
// handle refresh event
171-
vertx.eventBus().consumer(
172-
eventRefresh,
173-
o -> this.handleRefresh(o));
170+
// handle refresh event (skip if upload-only)
171+
if (!cloudSync.isUploadOnly()) {
172+
vertx.eventBus().consumer(
173+
eventRefresh,
174+
o -> this.handleRefresh(o));
175+
} else {
176+
LOGGER.info("CloudSyncVerticle." + name + " is upload-only, skipping refresh event handler registration");
177+
}
174178

175179
// upload to cloud
176180
vertx.eventBus().<String>consumer(
177181
this.eventUpload,
178182
msg -> this.handleUpload(msg));
179183

180-
cloudRefresh()
181-
.onFailure(t -> LOGGER.error("cloudRefresh failed: " + t.getMessage(), new Exception(t)))
182-
.onComplete(ar -> promise.handle(ar));
184+
// initial refresh (skip if upload-only)
185+
if (!cloudSync.isUploadOnly()) {
186+
cloudRefresh()
187+
.onFailure(t -> LOGGER.error("cloudRefresh failed: " + t.getMessage(), new Exception(t)))
188+
.onComplete(ar -> promise.handle(ar));
189+
} else {
190+
LOGGER.info("CloudSyncVerticle." + name + " is upload-only, skipping initial refresh");
191+
promise.complete();
192+
}
183193

184194
promise.future()
185195
.onSuccess(v -> {
@@ -214,6 +224,10 @@ public String eventDownloaded() {
214224
}
215225

216226
private void handleRefresh(Message m) {
227+
if (cloudSync.isUploadOnly()) {
228+
LOGGER.warn("handleRefresh called but this is upload-only mode");
229+
return;
230+
}
217231
cloudRefresh()
218232
.onSuccess(t -> this.storeRefreshIsFailing.set(0))
219233
.onFailure(t -> {

src/main/java/com/uid2/shared/vertx/ICloudSync.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,13 @@ public interface ICloudSync {
1515
boolean refresh(Instant now, ICloudStorage fsCloud, ICloudStorage fsLocal,
1616
Consumer<Set<String>> handleDownloads, Consumer<Set<String>> handleDeletes)
1717
throws CloudStorageException;
18+
19+
/**
20+
* Returns true if this CloudSync instance only supports upload operations.
21+
* When true, download/refresh operations will be skipped.
22+
* Default is false for backward compatibility.
23+
*/
24+
default boolean isUploadOnly() {
25+
return false;
26+
}
1827
}

0 commit comments

Comments
 (0)