1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ */
18+ package org .apache .ratis ;
19+
20+ import org .apache .ratis .client .RaftClient ;
21+ import org .apache .ratis .conf .RaftProperties ;
22+ import org .apache .ratis .protocol .RaftClientReply ;
23+ import org .apache .ratis .protocol .RaftPeerId ;
24+ import org .apache .ratis .protocol .exceptions .ReadIndexException ;
25+ import org .apache .ratis .retry .ExceptionDependentRetry ;
26+ import org .apache .ratis .retry .RetryPolicies ;
27+ import org .apache .ratis .retry .RetryPolicy ;
28+ import org .apache .ratis .server .RaftServer ;
29+ import org .apache .ratis .server .RaftServerConfigKeys ;
30+ import org .apache .ratis .server .impl .MiniRaftCluster ;
31+ import org .apache .ratis .util .Slf4jUtils ;
32+ import org .apache .ratis .util .TimeDuration ;
33+ import org .apache .ratis .util .function .CheckedConsumer ;
34+ import org .junit .jupiter .api .Assertions ;
35+ import org .junit .jupiter .api .BeforeEach ;
36+ import org .junit .jupiter .api .Test ;
37+ import org .slf4j .event .Level ;
38+
39+ import java .util .ArrayList ;
40+ import java .util .List ;
41+ import java .util .concurrent .CompletableFuture ;
42+ import java .util .concurrent .TimeUnit ;
43+
44+ import static org .apache .ratis .ReadOnlyRequestTests .CounterStateMachine ;
45+ import static org .apache .ratis .ReadOnlyRequestTests .INCREMENT ;
46+ import static org .apache .ratis .ReadOnlyRequestTests .QUERY ;
47+ import static org .apache .ratis .ReadOnlyRequestTests .WAIT_AND_INCREMENT ;
48+ import static org .apache .ratis .ReadOnlyRequestTests .assertReplyAtLeast ;
49+ import static org .apache .ratis .ReadOnlyRequestTests .assertReplyExact ;
50+ import static org .apache .ratis .ReadOnlyRequestTests .retrieve ;
51+ import static org .apache .ratis .server .RaftServerConfigKeys .Read .Option .LINEARIZABLE ;
52+
53+ /** Test for the {@link RaftServerConfigKeys.Read.Option#LINEARIZABLE} feature. */
54+ public abstract class LinearizableReadTests <CLUSTER extends MiniRaftCluster >
55+ extends BaseTest
56+ implements MiniRaftCluster .Factory .Get <CLUSTER > {
57+
58+ {
59+ Slf4jUtils .setLogLevel (RaftServer .Division .LOG , Level .DEBUG );
60+ }
61+
62+ public abstract boolean isLeaderLeaseEnabled ();
63+
64+ public abstract void assertRaftProperties (RaftProperties properties );
65+
66+ void runWithNewCluster (CheckedConsumer <CLUSTER , Exception > testCase ) throws Exception {
67+ runWithNewCluster (3 , 0 , true , cluster -> {
68+ assertRaftProperties (cluster .getProperties ());
69+ testCase .accept (cluster );
70+ });
71+ }
72+
73+ @ BeforeEach
74+ public void setup () {
75+ final RaftProperties p = getProperties ();
76+ CounterStateMachine .setProperties (p );
77+ RaftServerConfigKeys .Read .setOption (p , LINEARIZABLE );
78+ RaftServerConfigKeys .Read .setLeaderLeaseEnabled (p , isLeaderLeaseEnabled ());
79+ }
80+
81+ @ Test
82+ public void testLinearizableRead () throws Exception {
83+ runWithNewCluster (ReadOnlyRequestTests ::runTestReadOnly );
84+ }
85+
86+ @ Test
87+ public void testLinearizableReadTimeout () throws Exception {
88+ runWithNewCluster (cluster -> ReadOnlyRequestTests .runTestReadTimeout (ReadIndexException .class , cluster ));
89+ }
90+
91+ @ Test
92+ public void testFollowerLinearizableRead () throws Exception {
93+ runWithNewCluster (LinearizableReadTests ::runTestFollowerLinearizableRead );
94+ }
95+
96+ static class Reply {
97+ private final int count ;
98+ private final CompletableFuture <RaftClientReply > future ;
99+
100+ Reply (int count , CompletableFuture <RaftClientReply > future ) {
101+ this .count = count ;
102+ this .future = future ;
103+ }
104+
105+ void assertExact () {
106+ assertReplyExact (count , future .join ());
107+ }
108+
109+ void assertAtLeast () {
110+ assertReplyAtLeast (count , future .join ());
111+ }
112+ }
113+
114+ static <C extends MiniRaftCluster > void runTestFollowerLinearizableRead (C cluster ) throws Exception {
115+ final RaftPeerId leaderId = RaftTestUtil .waitForLeader (cluster ).getId ();
116+
117+ final List <RaftServer .Division > followers = cluster .getFollowers ();
118+ Assertions .assertEquals (2 , followers .size ());
119+
120+ final RaftPeerId f0 = followers .get (0 ).getId ();
121+ final RaftPeerId f1 = followers .get (1 ).getId ();
122+
123+ final int n = 100 ;
124+ final List <Reply > f0Replies = new ArrayList <>(n );
125+ final List <Reply > f1Replies = new ArrayList <>(n );
126+ try (RaftClient client = cluster .createClient (leaderId )) {
127+ for (int i = 0 ; i < n ; i ++) {
128+ final int count = i + 1 ;
129+ assertReplyExact (count , client .io ().send (INCREMENT ));
130+
131+ f0Replies .add (new Reply (count , client .async ().sendReadOnly (QUERY , f0 )));
132+ f1Replies .add (new Reply (count , client .async ().sendReadOnly (QUERY , f1 )));
133+ }
134+
135+ for (int i = 0 ; i < n ; i ++) {
136+ f0Replies .get (i ).assertAtLeast ();
137+ f1Replies .get (i ).assertAtLeast ();
138+ }
139+ }
140+ }
141+
142+ @ Test
143+ public void testFollowerLinearizableReadParallel () throws Exception {
144+ runWithNewCluster (LinearizableReadTests ::runTestFollowerReadOnlyParallel );
145+ }
146+
147+ static <C extends MiniRaftCluster > void runTestFollowerReadOnlyParallel (C cluster ) throws Exception {
148+ final RaftPeerId leaderId = RaftTestUtil .waitForLeader (cluster ).getId ();
149+
150+ final List <RaftServer .Division > followers = cluster .getFollowers ();
151+ Assertions .assertEquals (2 , followers .size ());
152+ final RaftPeerId f0 = followers .get (0 ).getId ();
153+ final RaftPeerId f1 = followers .get (1 ).getId ();
154+
155+ try (RaftClient leaderClient = cluster .createClient (leaderId );
156+ RaftClient f0Client = cluster .createClient (f0 );
157+ RaftClient f1Client = cluster .createClient (f1 )) {
158+
159+ final int n = 10 ;
160+ final List <Reply > writeReplies = new ArrayList <>(n );
161+ final List <Reply > f1Replies = new ArrayList <>(n );
162+ for (int i = 0 ; i < n ; i ++) {
163+ int count = 2 *i + 1 ;
164+ assertReplyExact (count , leaderClient .io ().send (INCREMENT ));
165+
166+ count ++;
167+ writeReplies .add (new Reply (count , leaderClient .async ().send (WAIT_AND_INCREMENT )));
168+ Thread .sleep (100 );
169+
170+ assertReplyExact (count , f0Client .io ().sendReadOnly (QUERY , f0 ));
171+ f1Replies .add (new Reply (count , f1Client .async ().sendReadOnly (QUERY , f1 )));
172+ }
173+
174+ for (int i = 0 ; i < n ; i ++) {
175+ writeReplies .get (i ).assertExact ();
176+ f1Replies .get (i ).assertAtLeast ();
177+ }
178+ }
179+ }
180+
181+ @ Test
182+ public void testLinearizableReadFailWhenLeaderDown () throws Exception {
183+ runWithNewCluster (LinearizableReadTests ::runTestLinearizableReadFailWhenLeaderDown );
184+ }
185+
186+ static <C extends MiniRaftCluster > void runTestLinearizableReadFailWhenLeaderDown (C cluster ) throws Exception {
187+ final RaftPeerId leaderId = RaftTestUtil .waitForLeader (cluster ).getId ();
188+
189+ final List <RaftServer .Division > followers = cluster .getFollowers ();
190+ Assertions .assertEquals (2 , followers .size ());
191+ final RaftPeerId f0 = followers .get (0 ).getId ();
192+
193+ try (RaftClient leaderClient = cluster .createClient (leaderId );
194+ RaftClient f0Client = cluster .createClient (f0 , RetryPolicies .noRetry ())) {
195+ assertReplyExact (1 , leaderClient .io ().send (INCREMENT ));
196+ assertReplyExact (1 , f0Client .io ().sendReadOnly (QUERY ));
197+
198+ // kill the leader
199+ // read timeout quicker than election timeout
200+ final RaftClientReply reply = leaderClient .admin ().transferLeadership (null , 200 );
201+ Assertions .assertTrue (reply .isSuccess ());
202+
203+ // client should fail and won't retry
204+ Assertions .assertThrows (ReadIndexException .class , () -> f0Client .io ().sendReadOnly (QUERY , f0 ));
205+ }
206+ }
207+
208+ @ Test
209+ public void testFollowerReadOnlyRetryWhenLeaderDown () throws Exception {
210+ // only retry on ReadIndexException
211+ final RetryPolicy retryPolicy = ExceptionDependentRetry
212+ .newBuilder ()
213+ .setDefaultPolicy (RetryPolicies .noRetry ())
214+ .setExceptionToPolicy (ReadIndexException .class ,
215+ RetryPolicies .retryForeverWithSleep (TimeDuration .valueOf (500 , TimeUnit .MILLISECONDS )))
216+ .build ();
217+
218+ runWithNewCluster (cluster -> ReadOnlyRequestTests .runTestReadOnlyRetryWhenLeaderDown (retryPolicy , cluster ));
219+ }
220+
221+
222+ @ Test
223+ public void testReadAfterWrite () throws Exception {
224+ runWithNewCluster (LinearizableReadTests ::runTestReadAfterWrite );
225+ }
226+
227+ static <C extends MiniRaftCluster > void runTestReadAfterWrite (C cluster ) throws Exception {
228+ final RaftPeerId leaderId = RaftTestUtil .waitForLeader (cluster ).getId ();
229+
230+ try (RaftClient client = cluster .createClient (leaderId )) {
231+ // test blocking read-after-write
232+ assertReplyExact (1 , client .io ().send (INCREMENT ));
233+ assertReplyExact (1 , client .io ().sendReadAfterWrite (QUERY ));
234+
235+ // test asynchronous read-after-write
236+ client .async ().send (INCREMENT );
237+ final CompletableFuture <RaftClientReply > asyncReply = client .async ().sendReadAfterWrite (QUERY );
238+
239+ for (int i = 0 ; i < 20 ; i ++) {
240+ client .async ().send (INCREMENT );
241+ }
242+
243+ // read-after-write is more consistent than linearizable read
244+ final CompletableFuture <RaftClientReply > linearizable = client .async ().sendReadOnly (QUERY );
245+ final CompletableFuture <RaftClientReply > readAfterWrite = client .async ().sendReadAfterWrite (QUERY );
246+ final int r = retrieve (readAfterWrite .get ());
247+ final int l = retrieve (linearizable .get ());
248+ Assertions .assertTrue (r >= l , () -> "readAfterWrite = " + r + " < linearizable = " + l );
249+
250+ assertReplyAtLeast (2 , asyncReply .join ());
251+ }
252+ }
253+ }
0 commit comments