@@ -365,7 +365,8 @@ protected void connectControlInput(IStreamingControlInterface controlInput, Comm
365365 {
366366 csHandler.connectCommandReceiver(new Subscriber<CommandEvent>() {
367367 Subscription sub;
368- static final String ERROR_MESSAGE = "Error sending command to {}. Canceling command receiver subscription" ;
368+ static final String ERROR_MESSAGE = "Error dispatching command to {}. ";
369+ static final String DISPATCH_STOP_MESSAGE = "No more commands will be processed.";
369370
370371 @Override
371372 public void onSubscribe(Subscription sub)
@@ -387,15 +388,17 @@ public void onNext(CommandEvent event)
387388 sub.request(1);
388389 })
389390 .exceptionally(e -> {
390- DefaultSystemRegistry .log .error (ERROR_MESSAGE , csHandler .csInfo .getFullName (), e );
391- sub .cancel ();
392- csHandler .sendStatus (event .getCorrelationID (), CommandStatus .failed (event .getCommand ().getID (), "Internal error processing command" ));
391+ DefaultSystemRegistry.log.error(ERROR_MESSAGE, csHandler.csInfo.getFullName());
392+ csHandler.sendStatus(event.getCorrelationID(),
393+ CommandStatus.failed(event.getCommand().getID(), "Internal error processing command"));
394+ sub.request(1);
393395 return null; // return type is Void
394396 });
395397 }
396398 catch (Exception e)
397399 {
398- DefaultSystemRegistry .log .error (ERROR_MESSAGE , csHandler .csInfo .getFullName (), e );
400+ DefaultSystemRegistry.log.error(ERROR_MESSAGE + DISPATCH_STOP_MESSAGE,
401+ csHandler.csInfo.getFullName(), e);
399402 sub.cancel();
400403 }
401404 });
@@ -404,8 +407,8 @@ public void onNext(CommandEvent event)
404407 @Override
405408 public void onError(Throwable e)
406409 {
407- DefaultSystemRegistry .log .error ("Error dispatching commands to {}. "
408- + "No more commands will be processed." , csHandler .csInfo .getFullName (), e );
410+ DefaultSystemRegistry.log.error(ERROR_MESSAGE + DISPATCH_STOP_MESSAGE,
411+ csHandler.csInfo.getFullName(), e);
409412 }
410413
411414 @Override
0 commit comments