@@ -365,7 +365,8 @@ protected void connectControlInput(IStreamingControlInterface controlInput, Comm
365365 {
366366 csHandler .connectCommandReceiver (new Subscriber <CommandEvent >() {
367367 Subscription sub ;
368- static final String ERROR_MESSAGE = "Error sending command to {}. Canceling command receiver subscription" ;
368+ static final String ERROR_MESSAGE = "Error dispatching command to {}. " ;
369+ static final String DISPATCH_STOP_MESSAGE = "No more commands will be processed." ;
369370
370371 @ Override
371372 public void onSubscribe (Subscription sub )
@@ -388,14 +389,16 @@ public void onNext(CommandEvent event)
388389 })
389390 .exceptionally (e -> {
390391 DefaultSystemRegistry .log .error (ERROR_MESSAGE , csHandler .csInfo .getFullName (), e );
391- sub .cancel ();
392- csHandler .sendStatus (event .getCorrelationID (), CommandStatus .failed (event .getCommand ().getID (), "Internal error processing command" ));
392+ csHandler .sendStatus (event .getCorrelationID (),
393+ CommandStatus .failed (event .getCommand ().getID (), "Internal error processing command" ));
394+ sub .request (1 );
393395 return null ; // return type is Void
394396 });
395397 }
396398 catch (Exception e )
397399 {
398- DefaultSystemRegistry .log .error (ERROR_MESSAGE , csHandler .csInfo .getFullName (), e );
400+ DefaultSystemRegistry .log .error (ERROR_MESSAGE + DISPATCH_STOP_MESSAGE ,
401+ csHandler .csInfo .getFullName (), e );
399402 sub .cancel ();
400403 }
401404 });
@@ -404,8 +407,8 @@ public void onNext(CommandEvent event)
404407 @ Override
405408 public void onError (Throwable e )
406409 {
407- DefaultSystemRegistry .log .error ("Error dispatching commands to {}. "
408- + "No more commands will be processed." , csHandler .csInfo .getFullName (), e );
410+ DefaultSystemRegistry .log .error (ERROR_MESSAGE + DISPATCH_STOP_MESSAGE ,
411+ csHandler .csInfo .getFullName (), e );
409412 }
410413
411414 @ Override
0 commit comments