Skip to content

Commit ce118b9

Browse files
committed
refactor: Let attemptStartingConsumers() return CompletableFuture<Boolean> instead of Optional<Throwable>
1 parent 0e53804 commit ce118b9

File tree

1 file changed

+26
-13
lines changed

1 file changed

+26
-13
lines changed

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,25 +150,32 @@ public ProtocolAdapterWrapper(
150150
})
151151
.thenCompose(Function.identity())
152152
.handle((ignored, error) -> {
153-
if(error != null) {
153+
if (error != null) {
154154
log.error("Error starting adapter", error);
155155
stopAfterFailedStart();
156156
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
157157
//we still return the initial error since that's the most significant information
158158
return CompletableFuture.failedFuture(error);
159159
} else {
160-
return attemptStartingConsumers(writingEnabled, moduleServices.eventService())
161-
.map(startException -> {
160+
return attemptStartingConsumers(writingEnabled,
161+
moduleServices.eventService()).handle((success, startException) -> {
162+
if (startException == null) {
163+
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED);
164+
if (success) {
165+
log.debug("Successfully started adapter with id {}", adapter.getId());
166+
} else {
167+
log.debug("Partially started adapter with id {}", adapter.getId());
168+
}
169+
} else {
162170
log.error("Failed to start adapter with id {}", adapter.getId(), startException);
163171
stopAfterFailedStart();
164-
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
165172
//we still return the initial error since that's the most significant information
166-
return CompletableFuture.failedFuture(startException);
167-
})
168-
.orElseGet(() -> {
169-
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED);
170-
return CompletableFuture.completedFuture(null);
171-
});
173+
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
174+
throw new RuntimeException("Failed to start adapter with id " + adapter.getId(),
175+
startException);
176+
}
177+
return success;
178+
});
172179
}
173180
})
174181
.thenApply(ignored -> (Void)null)
@@ -201,7 +208,10 @@ private void stopAfterFailedStart() {
201208
}
202209
}
203210

204-
private @NotNull Optional<Throwable> attemptStartingConsumers(final boolean writingEnabled, final @NotNull EventService eventService) {
211+
private @NotNull CompletableFuture<Boolean> attemptStartingConsumers(
212+
final boolean writingEnabled,
213+
final @NotNull EventService eventService) {
214+
final CompletableFuture<Boolean> future = new CompletableFuture<>();
205215
try {
206216
//Adapter started successfully, now start the consumers
207217
createAndSubscribeTagConsumer();
@@ -215,21 +225,24 @@ private void stopAfterFailedStart() {
215225
try {
216226
if (startWriting(protocolAdapterWritingService).get()) {
217227
log.info("Successfully started adapter with id {}", adapter.getId());
228+
future.complete(true);
218229
} else {
219230
log.error("Protocol adapter start failed as data hub is not available.");
231+
future.complete(false);
220232
}
221233
} catch (final Exception e) {
222234
log.error("Failed to start writing for adapter with id {}.", adapter.getId(), e);
235+
future.completeExceptionally(e);
223236
}
224237
}
225238
}
226239
});
227240
}
228241
} catch (final Throwable e) {
229242
log.error("Protocol adapter start failed");
230-
return Optional.of(e);
243+
future.completeExceptionally(e);
231244
}
232-
return Optional.empty();
245+
return future;
233246
}
234247

235248
public @NotNull CompletableFuture<Void> stopAsync(final boolean destroy) {

0 commit comments

Comments
 (0)