File tree Expand file tree Collapse file tree 4 files changed +24
-2
lines changed
main/java/com/rabbitmq/stream/impl
java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 4 files changed +24
-2
lines changed Original file line number Diff line number Diff line change @@ -2824,7 +2824,17 @@ public void channelInactive(ChannelHandlerContext ctx) {
28242824 // because it will be handled later anyway.
28252825 if (shutdownReason == null ) {
28262826 if (closing .compareAndSet (false , true )) {
2827- executorService .submit (() -> closingSequence (ShutdownReason .UNKNOWN ));
2827+ if (executorService == null ) {
2828+ // the TCP connection is closed before the state is initialized
2829+ // we do our best the execute the closing sequence
2830+ new Thread (
2831+ () -> {
2832+ closingSequence (ShutdownReason .UNKNOWN );
2833+ })
2834+ .start ();
2835+ } else {
2836+ executorService .submit (() -> closingSequence (ShutdownReason .UNKNOWN ));
2837+ }
28282838 }
28292839 }
28302840 }
Original file line number Diff line number Diff line change @@ -742,7 +742,7 @@ static <T> T locatorOperation(
742742 Function <Client , T > operation ,
743743 Supplier <Client > clientSupplier ,
744744 BackOffDelayPolicy backOffDelayPolicy ) {
745- int maxAttempt = 3 ;
745+ int maxAttempt = 5 ;
746746 int attempt = 0 ;
747747 boolean executed = false ;
748748 Exception lastException = null ;
Original file line number Diff line number Diff line change 2323import static java .util .stream .IntStream .range ;
2424import static org .assertj .core .api .Assertions .assertThat ;
2525
26+ import ch .qos .logback .classic .Level ;
2627import com .google .common .util .concurrent .RateLimiter ;
2728import com .rabbitmq .stream .*;
2829import com .rabbitmq .stream .impl .TestUtils .Sync ;
@@ -57,10 +58,13 @@ public class RecoveryClusterTest {
5758 TestInfo testInfo ;
5859 EventLoopGroup eventLoopGroup ;
5960 EnvironmentBuilder environmentBuilder ;
61+ static Level producersCoordinatorLogLevel ;
6062
6163 @ BeforeAll
6264 static void initAll () {
6365 nodes = Cli .nodes ();
66+ producersCoordinatorLogLevel =
67+ TestUtils .newLoggerLevel (ProducersCoordinator .class , Level .DEBUG );
6468 }
6569
6670 @ BeforeEach
@@ -82,6 +86,13 @@ void tearDown() {
8286 }
8387 }
8488
89+ @ AfterAll
90+ static void tearDownAll () {
91+ if (producersCoordinatorLogLevel != null ) {
92+ TestUtils .newLoggerLevel (ProducersCoordinator .class , producersCoordinatorLogLevel );
93+ }
94+ }
95+
8596 @ ParameterizedTest
8697 @ CsvSource ({
8798 "false,false" ,
Original file line number Diff line number Diff line change 77
88 <logger name =" com.rabbitmq.stream" level =" warn" />
99 <logger name =" com.rabbitmq.stream.impl.Utils" level =" warn" />
10+ <logger name =" com.rabbitmq.stream.impl.StreamEnvironment" level =" warn" />
1011 <logger name =" com.rabbitmq.stream.impl.ConsumersCoordinator" level =" warn" />
1112 <logger name =" com.rabbitmq.stream.impl.ProducersCoordinator" level =" warn" />
1213 <logger name =" com.rabbitmq.stream.impl.RecoveryClusterTest" level =" info" />
You can’t perform that action at this time.
0 commit comments