@@ -150,25 +150,32 @@ public ProtocolAdapterWrapper(
150150 })
151151 .thenCompose (Function .identity ())
152152 .handle ((ignored , error ) -> {
153- if (error != null ) {
153+ if (error != null ) {
154154 log .error ("Error starting adapter" , error );
155155 stopAfterFailedStart ();
156156 protocolAdapterState .setRuntimeStatus (ProtocolAdapterState .RuntimeStatus .STOPPED );
157157 //we still return the initial error since that's the most significant information
158158 return CompletableFuture .failedFuture (error );
159159 } else {
160- return attemptStartingConsumers (writingEnabled , moduleServices .eventService ())
161- .map (startException -> {
160+ return attemptStartingConsumers (writingEnabled ,
161+ moduleServices .eventService ()).handle ((success , startException ) -> {
162+ if (startException == null ) {
163+ protocolAdapterState .setRuntimeStatus (ProtocolAdapterState .RuntimeStatus .STARTED );
164+ if (success ) {
165+ log .debug ("Successfully started adapter with id {}" , adapter .getId ());
166+ } else {
167+ log .debug ("Partially started adapter with id {}" , adapter .getId ());
168+ }
169+ } else {
162170 log .error ("Failed to start adapter with id {}" , adapter .getId (), startException );
163171 stopAfterFailedStart ();
164- protocolAdapterState .setRuntimeStatus (ProtocolAdapterState .RuntimeStatus .STOPPED );
165172 //we still return the initial error since that's the most significant information
166- return CompletableFuture . failedFuture ( startException );
167- })
168- . orElseGet (() -> {
169- protocolAdapterState . setRuntimeStatus ( ProtocolAdapterState . RuntimeStatus . STARTED );
170- return CompletableFuture . completedFuture ( null ) ;
171- });
173+ protocolAdapterState . setRuntimeStatus ( ProtocolAdapterState . RuntimeStatus . STOPPED );
174+ throw new RuntimeException ( "Failed to start adapter with id " + adapter . getId (),
175+ startException );
176+ }
177+ return success ;
178+ });
172179 }
173180 })
174181 .thenApply (ignored -> (Void )null )
@@ -201,7 +208,10 @@ private void stopAfterFailedStart() {
201208 }
202209 }
203210
204- private @ NotNull Optional <Throwable > attemptStartingConsumers (final boolean writingEnabled , final @ NotNull EventService eventService ) {
211+ private @ NotNull CompletableFuture <Boolean > attemptStartingConsumers (
212+ final boolean writingEnabled ,
213+ final @ NotNull EventService eventService ) {
214+ final CompletableFuture <Boolean > future = new CompletableFuture <>();
205215 try {
206216 //Adapter started successfully, now start the consumers
207217 createAndSubscribeTagConsumer ();
@@ -215,21 +225,24 @@ private void stopAfterFailedStart() {
215225 try {
216226 if (startWriting (protocolAdapterWritingService ).get ()) {
217227 log .info ("Successfully started adapter with id {}" , adapter .getId ());
228+ future .complete (true );
218229 } else {
219230 log .error ("Protocol adapter start failed as data hub is not available." );
231+ future .complete (false );
220232 }
221233 } catch (final Exception e ) {
222234 log .error ("Failed to start writing for adapter with id {}." , adapter .getId (), e );
235+ future .completeExceptionally (e );
223236 }
224237 }
225238 }
226239 });
227240 }
228241 } catch (final Throwable e ) {
229242 log .error ("Protocol adapter start failed" );
230- return Optional . of (e );
243+ future . completeExceptionally (e );
231244 }
232- return Optional . empty () ;
245+ return future ;
233246 }
234247
235248 public @ NotNull CompletableFuture <Void > stopAsync (final boolean destroy ) {
0 commit comments