11/*
2- * Copyright 2014-2024 Real Logic Limited.
2+ * Copyright 2014-2025 Real Logic Limited.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1515 */
1616package io .aeron .cluster ;
1717
18+ import io .aeron .cluster .client .AeronCluster ;
1819import io .aeron .cluster .codecs .NewLeadershipTermEventEncoder ;
1920import io .aeron .cluster .codecs .SessionOpenEventEncoder ;
21+ import io .aeron .driver .DataPacketDispatcher ;
22+ import io .aeron .driver .MediaDriver ;
23+ import io .aeron .driver .ReceiveChannelEndpointSupplier ;
24+ import io .aeron .driver .SendChannelEndpointSupplier ;
25+ import io .aeron .driver .ext .DebugReceiveChannelEndpoint ;
26+ import io .aeron .driver .ext .DebugSendChannelEndpoint ;
27+ import io .aeron .driver .ext .LossGenerator ;
28+ import io .aeron .driver .media .ReceiveChannelEndpoint ;
29+ import io .aeron .driver .media .SendChannelEndpoint ;
30+ import io .aeron .driver .media .UdpChannel ;
2031import io .aeron .test .EventLogExtension ;
2132import io .aeron .test .InterruptAfter ;
2233import io .aeron .test .InterruptingTestCallback ;
23- import io .aeron .test .IpTables ;
2434import io .aeron .test .SlowTest ;
2535import io .aeron .test .SystemTestWatcher ;
2636import io .aeron .test .Tests ;
2737import io .aeron .test .cluster .TestCluster ;
2838import io .aeron .test .cluster .TestNode ;
2939import org .agrona .CloseHelper ;
30- import org .junit .jupiter .api .AfterEach ;
40+ import org .agrona .concurrent .UnsafeBuffer ;
41+ import org .agrona .concurrent .status .AtomicCounter ;
3142import org .junit .jupiter .api .BeforeEach ;
3243import org .junit .jupiter .api .Test ;
33- import org .junit .jupiter .api .condition .EnabledOnOs ;
34- import org .junit .jupiter .api .condition .OS ;
3544import org .junit .jupiter .api .extension .ExtendWith ;
3645import org .junit .jupiter .api .extension .RegisterExtension ;
3746
38- import java .util .ArrayList ;
39- import java .util .List ;
47+ import java .net .InetSocketAddress ;
48+ import java .util .concurrent .atomic .AtomicBoolean ;
49+ import java .util .concurrent .atomic .AtomicInteger ;
4050
4151import static io .aeron .protocol .DataHeaderFlyweight .HEADER_LENGTH ;
4252import static io .aeron .test .cluster .TestCluster .aCluster ;
53+ import static io .aeron .test .driver .TestMediaDriver .shouldRunJavaMediaDriver ;
4354import static org .junit .jupiter .api .Assertions .assertEquals ;
55+ import static org .junit .jupiter .api .Assumptions .assumeTrue ;
4456
4557@ ExtendWith ({ EventLogExtension .class , InterruptingTestCallback .class })
46- @ EnabledOnOs (OS .LINUX )
4758public class ClusterUncommittedStateTest
4859{
49- private static final List <String > HOSTNAMES = List .of ("127.2.0.0" , "127.2.1.0" , "127.2.2.0" );
50- private static final String CHAIN_NAME = "CLUSTER-TEST" ;
60+ private static final int NODE_COUNT = 3 ;
5161
5262 @ RegisterExtension
5363 final SystemTestWatcher systemTestWatcher = new SystemTestWatcher ();
5464 private TestCluster cluster ;
65+ private final ToggledLossControl [] toggledLossControls = new ToggledLossControl [NODE_COUNT ];
5566
5667 @ BeforeEach
5768 void setUp ()
5869 {
59- IpTables .setupChain (CHAIN_NAME );
60- }
61-
62- @ AfterEach
63- void tearDown ()
64- {
65- IpTables .tearDownChain (CHAIN_NAME );
70+ for (int i = 0 ; i < NODE_COUNT ; ++i )
71+ {
72+ toggledLossControls [i ] = new ToggledLossControl ();
73+ }
6674 }
6775
6876 @ Test
6977 @ SlowTest
7078 @ InterruptAfter (20 )
7179 void shouldNextCommittedSessionIdReflectOnlyCommittedSessions ()
7280 {
81+ assumeTrue (shouldRunJavaMediaDriver ());
82+
7383 cluster = aCluster ()
74- .withStaticNodes (HOSTNAMES .size ())
75- .withCustomAddresses (HOSTNAMES )
84+ .withStaticNodes (NODE_COUNT )
85+ .withReceiveChannelEndpointSupplier ((index ) -> toggledLossControls [index ])
86+ .withSendChannelEndpointSupplier ((index ) -> toggledLossControls [index ])
7687 .start ();
7788 systemTestWatcher .cluster (cluster );
7889
7990 final TestNode firstLeader = cluster .awaitLeader ();
80- final List <String > leaderHostname = List .of (HOSTNAMES .get (firstLeader .memberId ()));
81- final List <String > followerHostnames = new ArrayList <>(HOSTNAMES );
82- followerHostnames .remove (firstLeader .memberId ());
91+ final ToggledLossControl leaderLossControl = toggledLossControls [firstLeader .memberId ()];
8392
84- IpTables .makeSymmetricNetworkPartition (CHAIN_NAME , leaderHostname , followerHostnames );
93+ leaderLossControl .toggleLoss (true );
94+ Tests .await (() -> 0 < leaderLossControl .droppedOutboundFrames .get () &&
95+ 0 < leaderLossControl .droppedInboundFrames .get ());
8596
86- CloseHelper .close (cluster .asyncConnectClient ());
87- CloseHelper .close (cluster .asyncConnectClient ());
88- CloseHelper .close (cluster .asyncConnectClient ());
97+ CloseHelper .closeAll (
98+ cluster .connectIpcClient (new AeronCluster .Context (), firstLeader .mediaDriver ().aeronDirectoryName ()),
99+ cluster .connectIpcClient (new AeronCluster .Context (), firstLeader .mediaDriver ().aeronDirectoryName ()),
100+ cluster .connectIpcClient (new AeronCluster .Context (), firstLeader .mediaDriver ().aeronDirectoryName ()));
89101 final long estimatedLogFixedLengthSize = (NewLeadershipTermEventEncoder .BLOCK_LENGTH + HEADER_LENGTH ) +
90102 (3 * (SessionOpenEventEncoder .BLOCK_LENGTH + HEADER_LENGTH ));
91103 Tests .await (() -> firstLeader .appendPosition () > estimatedLogFixedLengthSize );
@@ -111,12 +123,91 @@ void shouldNextCommittedSessionIdReflectOnlyCommittedSessions()
111123 return true ;
112124 });
113125
114- IpTables . flushChain ( CHAIN_NAME );
126+ leaderLossControl . toggleLoss ( false );
115127 Tests .await (() -> 1 == cluster .getSnapshotCount (firstLeader ));
116128
117129 for (int i = 0 ; i < cluster .memberCount (); ++i )
118130 {
119131 assertEquals (1 , ClusterTest .readSnapshot (cluster .node (i )));
120132 }
133+
134+ final TestNode finalLeader = cluster .awaitLeader ();
135+ CloseHelper .close (cluster .asyncConnectClient ());
136+ cluster .takeSnapshot (finalLeader );
137+ cluster .awaitSnapshotCount (2 );
138+
139+ for (int i = 0 ; i < cluster .memberCount (); ++i )
140+ {
141+ assertEquals (2 , ClusterTest .readSnapshot (cluster .node (i )));
142+ }
143+ }
144+
145+ private static final class ToggledLossControl implements ReceiveChannelEndpointSupplier , SendChannelEndpointSupplier
146+ {
147+ final AtomicBoolean shouldDropOutboundFrames = new AtomicBoolean (false );
148+ final AtomicInteger droppedOutboundFrames = new AtomicInteger (0 );
149+ final ToggledLossGenerator outboundLossGenerator = new ToggledLossGenerator (
150+ shouldDropOutboundFrames , droppedOutboundFrames );
151+
152+ final AtomicBoolean shouldDropInboundFrames = new AtomicBoolean (false );
153+ final AtomicInteger droppedInboundFrames = new AtomicInteger (0 );
154+ final ToggledLossGenerator inboundLossGenerator = new ToggledLossGenerator (
155+ shouldDropInboundFrames , droppedInboundFrames );
156+
157+ public ReceiveChannelEndpoint newInstance (
158+ final UdpChannel udpChannel ,
159+ final DataPacketDispatcher dispatcher ,
160+ final AtomicCounter statusIndicator ,
161+ final MediaDriver .Context context )
162+ {
163+ return new DebugReceiveChannelEndpoint (
164+ udpChannel , dispatcher , statusIndicator , context , inboundLossGenerator , inboundLossGenerator );
165+ }
166+
167+ public SendChannelEndpoint newInstance (
168+ final UdpChannel udpChannel ,
169+ final AtomicCounter statusIndicator ,
170+ final MediaDriver .Context context )
171+ {
172+ return new DebugSendChannelEndpoint (
173+ udpChannel , statusIndicator , context , outboundLossGenerator , outboundLossGenerator );
174+ }
175+
176+ void toggleLoss (final boolean loss )
177+ {
178+ shouldDropOutboundFrames .set (loss );
179+ shouldDropInboundFrames .set (loss );
180+ }
181+
182+ private static final class ToggledLossGenerator implements LossGenerator
183+ {
184+ private final AtomicBoolean shouldDropFrame ;
185+ private final AtomicInteger droppedFrames ;
186+
187+ private ToggledLossGenerator (final AtomicBoolean shouldDropFrame , final AtomicInteger droppedFrames )
188+ {
189+ this .shouldDropFrame = shouldDropFrame ;
190+ this .droppedFrames = droppedFrames ;
191+ }
192+
193+ public boolean shouldDropFrame (final InetSocketAddress address , final UnsafeBuffer buffer , final int length )
194+ {
195+ droppedFrames .incrementAndGet ();
196+ return shouldDropFrame .get ();
197+ }
198+
199+ public boolean shouldDropFrame (
200+ final InetSocketAddress address ,
201+ final UnsafeBuffer buffer ,
202+ final int streamId ,
203+ final int sessionId ,
204+ final int termId ,
205+ final int termOffset ,
206+ final int length )
207+ {
208+ droppedFrames .incrementAndGet ();
209+ return shouldDropFrame .get ();
210+ }
211+ }
121212 }
122213}
0 commit comments