5959import com .fasterxml .jackson .databind .ObjectMapper ;
6060import io .netty .buffer .ByteBuf ;
6161import io .netty .buffer .Unpooled ;
62+ import kotlin .Unit ;
63+ import kotlin .jvm .functions .Function1 ;
64+ import org .apache .commons .lang3 .StringUtils ;
65+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
66+ import org .jetbrains .annotations .NotNull ;
67+ import org .jetbrains .annotations .Nullable ;
68+ import org .slf4j .Logger ;
69+ import org .slf4j .LoggerFactory ;
70+
6271import java .io .DataInputStream ;
6372import java .io .DataOutputStream ;
6473import java .io .IOException ;
96105import java .util .function .Consumer ;
97106import java .util .stream .Collectors ;
98107
99- import kotlin .Unit ;
100- import kotlin .jvm .functions .Function1 ;
101- import org .apache .commons .lang3 .StringUtils ;
102- import org .apache .commons .lang3 .exception .ExceptionUtils ;
103- import org .jetbrains .annotations .NotNull ;
104- import org .jetbrains .annotations .Nullable ;
105- import org .slf4j .Logger ;
106- import org .slf4j .LoggerFactory ;
107-
108108import static com .exactpro .th2 .common .event .EventUtils .createMessageBean ;
109109import static com .exactpro .th2 .conn .dirty .fix .FixByteBufUtilKt .findField ;
110110import static com .exactpro .th2 .conn .dirty .fix .FixByteBufUtilKt .findLastField ;
181181import static com .exactpro .th2 .netty .bytebuf .util .ByteBufUtil .asExpandable ;
182182import static com .exactpro .th2 .netty .bytebuf .util .ByteBufUtil .indexOf ;
183183import static com .exactpro .th2 .netty .bytebuf .util .ByteBufUtil .isEmpty ;
184- import static com .exactpro .th2 .netty .bytebuf .util .ByteBufUtil .startsWith ;
185184import static com .exactpro .th2 .util .MessageUtil .findByte ;
186185import static com .exactpro .th2 .util .MessageUtil .getBodyLength ;
187186import static com .exactpro .th2 .util .MessageUtil .getChecksum ;
@@ -392,11 +391,11 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
392391 try {
393392 sendingTimeoutHandler .getWithTimeout (channel .open ());
394393 } catch (TimeoutException e ) {
395- ExceptionUtils .rethrow (new TimeoutException (
394+ ExceptionUtils .asRuntimeException (new TimeoutException (
396395 String .format ("could not open connection before timeout %d mls elapsed" ,
397396 currentTimeout )));
398397 } catch (Exception e ) {
399- ExceptionUtils .rethrow (e );
398+ ExceptionUtils .asRuntimeException (e );
400399 }
401400 }
402401
@@ -411,7 +410,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
411410 }
412411 if (System .currentTimeMillis () > deadline ) {
413412 // The method should have checked exception in signature...
414- ExceptionUtils .rethrow (new TimeoutException (String .format ("session was not established within %d mls" ,
413+ ExceptionUtils .asRuntimeException (new TimeoutException (String .format ("session was not established within %d mls" ,
415414 settings .getConnectionTimeoutOnSend ())));
416415 }
417416 }
@@ -426,7 +425,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
426425 }
427426 if (System .currentTimeMillis () > deadline ) {
428427 // The method should have checked exception in signature...
429- ExceptionUtils .rethrow (new TimeoutException (String .format ("session was not established within %d mls" ,
428+ ExceptionUtils .asRuntimeException (new TimeoutException (String .format ("session was not established within %d mls" ,
430429 settings .getConnectionTimeoutOnSend ())));
431430 }
432431 }
@@ -496,7 +495,7 @@ public ByteBuf onReceive(@NotNull IChannel channel, @NotNull ByteBuf buffer) {
496495
497496 @ NotNull
498497 @ Override
499- public Map <String , String > onIncoming (@ NotNull IChannel channel , @ NotNull ByteBuf message , MessageID messageId ) {
498+ public Map <String , String > onIncoming (@ NotNull IChannel channel , @ NotNull ByteBuf message , @ NotNull MessageID messageId ) {
500499 Map <String , String > metadata = new HashMap <>();
501500
502501 StrategyState state = strategy .getState ();
@@ -537,7 +536,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
537536 if (msgTypeValue .equals (MSG_TYPE_LOGOUT )) {
538537 serverMsgSeqNum .incrementAndGet ();
539538 state .addMessageID (messageId );
540- strategy .getIncomingMessageStrategy (x -> x . getLogoutStrategy () ).process (message , metadata );
539+ strategy .getIncomingMessageStrategy (IncomingMessagesStrategy :: getLogoutStrategy ).process (message , metadata );
541540 return metadata ;
542541 }
543542
@@ -759,11 +758,7 @@ private void resetSequence(ByteBuf message) {
759758 } else {
760759 int newSeqNo = Integer .parseInt (requireNonNull (seqNumValue .getValue ()));
761760 serverMsgSeqNum .updateAndGet (sequence -> {
762- if (sequence < newSeqNo - 1 ) {
763- return newSeqNo - 1 ;
764- } else {
765- return sequence ;
766- }
761+ return Math .max (sequence , newSeqNo - 1 );
767762 });
768763 }
769764 }
@@ -850,7 +845,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
850845 }
851846
852847 AtomicBoolean skip = new AtomicBoolean (recoveryConfig .getOutOfOrder ());
853- AtomicReference <ByteBuf > skipped = new AtomicReference (null );
848+ AtomicReference <ByteBuf > skipped = new AtomicReference <> (null );
854849
855850 int endSeq = endSeqNo ;
856851 LOGGER .info ("Loading messages from {} to {}" , beginSeqNo , endSeqNo );
@@ -1596,7 +1591,7 @@ private Map<String, String> gapFillSequenceReset(ByteBuf message, Map<String, St
15961591 onOutgoingUpdateTag (message , metadata );
15971592 FixField msgType = findField (message , MSG_TYPE_TAG , US_ASCII );
15981593
1599- if (msgType == null || !msgType .getValue (). equals ( MSG_TYPE_SEQUENCE_RESET )) return null ;
1594+ if (msgType == null || !Objects . equals ( msgType .getValue (), MSG_TYPE_SEQUENCE_RESET )) return null ;
16001595
16011596 if (resendRequestConfig .getGapFill ()) return null ;
16021597
@@ -1613,11 +1608,13 @@ private Map<String, String> missOutgoingMessages(ByteBuf message, Map<String, St
16131608 int countToMiss = strategy .getMissOutgoingMessagesConfiguration ().getCount ();
16141609 var strategyState = strategy .getState ();
16151610 onOutgoingUpdateTag (message , metadata );
1616- if (!strategyState .addMissedMessageToCacheIfCondition (msgSeqNum .get (), message .copy (), x -> x <= countToMiss )) {
1617- return null ;
1611+ if (strategyState .addMissedMessageToCacheIfCondition (msgSeqNum .get (), message .copy (), x -> x <= countToMiss )) {
1612+ message .clear ();
1613+ }
1614+ if (strategy .getAllowMessagesBeforeRetransmissionFinishes ()
1615+ && Duration .between (strategy .getStartTime (), Instant .now ()).compareTo (strategy .getConfig ().getDuration ()) > 0 ) {
1616+ strategy .disableAllowMessagesBeforeRetransmissionFinishes ("after " + strategy .getConfig ().getDuration () + " strategy duration" );
16181617 }
1619-
1620- message .clear ();
16211618
16221619 return null ;
16231620 }
@@ -1772,9 +1769,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
17721769 ByteBuf logonBuf = Unpooled .wrappedBuffer (logon .toString ().getBytes (StandardCharsets .UTF_8 ));
17731770
17741771 channel .send (logonBuf , strategy .getState ().enrichProperties (props ), null , SendMode .DIRECT_MQ )
1775- .thenAcceptAsync (x -> {
1776- strategy .getState ().addMessageID (x );
1777- }, executorService );
1772+ .thenAcceptAsync (x -> strategy .getState ().addMessageID (x ), executorService );
17781773
17791774 boolean logonSent = false ;
17801775 boolean responseReceived = true ;
@@ -1783,7 +1778,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
17831778 try (
17841779 Socket socket = new Socket (address .getAddress (), address .getPort ());
17851780 DataOutputStream dOut = new DataOutputStream (socket .getOutputStream ());
1786- DataInputStream dIn = new DataInputStream (socket .getInputStream ());
1781+ DataInputStream dIn = new DataInputStream (socket .getInputStream ())
17871782 ){
17881783 socket .setSoTimeout (5000 );
17891784
@@ -1817,7 +1812,8 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
18171812 LOGGER .info ("Waiting for 5 seconds to check if main session will be disconnected." );
18181813 Thread .sleep (5000 );
18191814 } catch (InterruptedException e ) {
1820- e .printStackTrace ();
1815+ LOGGER .error ("Interrupted" , e );
1816+ Thread .currentThread ().interrupt ();
18211817 }
18221818
18231819 HashMap <String , Object > additionalDetails = new HashMap <>();
@@ -2228,7 +2224,7 @@ private void applyNextStrategy() {
22282224 ruleErrorEvent (nextStrategyConfig .getRuleType (), null , e );
22292225 }
22302226
2231- LOGGER .info ("Next strategy applied: {}" , nextStrategyConfig .getRuleType ());
2227+ LOGGER .info ("Next strategy applied: {}, duration: {} " , nextStrategyConfig .getRuleType (), nextStrategyConfig . getDuration ());
22322228 executorService .schedule (this ::applyNextStrategy , nextStrategyConfig .getDuration ().toMillis (), TimeUnit .MILLISECONDS );
22332229 }
22342230
0 commit comments