Skip to content

Commit 8a51c29

Browse files
committed
Revert "refactor: Let attemptStartingConsumers() return CompletableFuture<Boolean> instead of Optional<Throwable>"
This reverts commit ce118b9.
1 parent 7f475c4 commit 8a51c29

File tree

1 file changed

+13
-26
lines changed

1 file changed

+13
-26
lines changed

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -150,32 +150,25 @@ public ProtocolAdapterWrapper(
150150
})
151151
.thenCompose(Function.identity())
152152
.handle((ignored, error) -> {
153-
if (error != null) {
153+
if(error != null) {
154154
log.error("Error starting adapter", error);
155155
stopAfterFailedStart();
156156
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
157157
//we still return the initial error since that's the most significant information
158158
return CompletableFuture.failedFuture(error);
159159
} else {
160-
return attemptStartingConsumers(writingEnabled,
161-
moduleServices.eventService()).handle((success, startException) -> {
162-
if (startException == null) {
163-
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED);
164-
if (success) {
165-
log.debug("Successfully started adapter with id {}", adapter.getId());
166-
} else {
167-
log.debug("Partially started adapter with id {}", adapter.getId());
168-
}
169-
} else {
160+
return attemptStartingConsumers(writingEnabled, moduleServices.eventService())
161+
.map(startException -> {
170162
log.error("Failed to start adapter with id {}", adapter.getId(), startException);
171163
stopAfterFailedStart();
172-
//we still return the initial error since that's the most significant information
173164
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
174-
throw new RuntimeException("Failed to start adapter with id " + adapter.getId(),
175-
startException);
176-
}
177-
return success;
178-
});
165+
//we still return the initial error since that's the most significant information
166+
return CompletableFuture.failedFuture(startException);
167+
})
168+
.orElseGet(() -> {
169+
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED);
170+
return CompletableFuture.completedFuture(null);
171+
});
179172
}
180173
})
181174
.thenApply(ignored -> (Void)null)
@@ -208,10 +201,7 @@ private void stopAfterFailedStart() {
208201
}
209202
}
210203

211-
private @NotNull CompletableFuture<Boolean> attemptStartingConsumers(
212-
final boolean writingEnabled,
213-
final @NotNull EventService eventService) {
214-
final CompletableFuture<Boolean> future = new CompletableFuture<>();
204+
private @NotNull Optional<Throwable> attemptStartingConsumers(final boolean writingEnabled, final @NotNull EventService eventService) {
215205
try {
216206
//Adapter started successfully, now start the consumers
217207
createAndSubscribeTagConsumer();
@@ -225,24 +215,21 @@ private void stopAfterFailedStart() {
225215
try {
226216
if (startWriting(protocolAdapterWritingService).get()) {
227217
log.info("Successfully started adapter with id {}", adapter.getId());
228-
future.complete(true);
229218
} else {
230219
log.error("Protocol adapter start failed as data hub is not available.");
231-
future.complete(false);
232220
}
233221
} catch (final Exception e) {
234222
log.error("Failed to start writing for adapter with id {}.", adapter.getId(), e);
235-
future.completeExceptionally(e);
236223
}
237224
}
238225
}
239226
});
240227
}
241228
} catch (final Throwable e) {
242229
log.error("Protocol adapter start failed");
243-
future.completeExceptionally(e);
230+
return Optional.of(e);
244231
}
245-
return future;
232+
return Optional.empty();
246233
}
247234

248235
public @NotNull CompletableFuture<Void> stopAsync(final boolean destroy) {

0 commit comments

Comments
 (0)