Skip to content

Commit d574162

Browse files
committed
No need to trigger ensembleChangeLoop if none of the failed bookies are in the current ensemble
Signed-off-by: M1eyu2018 <[email protected]>
1 parent 7317435 commit d574162

File tree

3 files changed

+157
-6
lines changed

3 files changed

+157
-6
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2176,13 +2176,27 @@ void handleBookieFailure(final Map<Integer, BookieId> failedBookies) {
21762176
delayedWriteFailedBookies.putAll(failedBookies);
21772177
} else {
21782178
changingEnsemble = true;
2179-
triggerLoop = true;
2180-
21812179
toReplace = new HashMap<>(delayedWriteFailedBookies);
21822180
delayedWriteFailedBookies.clear();
21832181
toReplace.putAll(failedBookies);
21842182

21852183
origEnsemble = getCurrentEnsemble();
2184+
2185+
for (Map.Entry<Integer, BookieId> entry : toReplace.entrySet()) {
2186+
Integer bookieIndex = entry.getKey();
2187+
BookieId addr = entry.getValue();
2188+
if (origEnsemble.get(bookieIndex).equals(addr)) {
2189+
triggerLoop = true;
2190+
break;
2191+
}
2192+
}
2193+
2194+
if (!triggerLoop) {
2195+
if (LOG.isDebugEnabled()) {
2196+
LOG.debug("No need to triggerLoop as all failed bookies are not in current ensemble, failedBookies:{}", toReplace);
2197+
}
2198+
changingEnsemble = false;
2199+
}
21862200
}
21872201
}
21882202
if (triggerLoop) {
@@ -2211,10 +2225,13 @@ void ensembleChangeLoop(List<BookieId> origEnsemble, Map<Integer, BookieId> fail
22112225
new MetadataUpdateLoop(
22122226
clientCtx.getLedgerManager(), getId(),
22132227
this::getVersionedLedgerMetadata,
2214-
(metadata) -> metadata.getState() == LedgerMetadata.State.OPEN
2215-
&& failedBookies.entrySet().stream().anyMatch(
2216-
e -> LedgerMetadataUtils.getLastEnsembleValue(metadata)
2217-
.get(e.getKey()).equals(e.getValue())),
2228+
(metadata) -> {
2229+
LedgerHandleFaultInjector.getInstance().sleepWhenTest();
2230+
return metadata.getState() == LedgerMetadata.State.OPEN
2231+
&& failedBookies.entrySet().stream().anyMatch(
2232+
e -> LedgerMetadataUtils.getLastEnsembleValue(metadata)
2233+
.get(e.getKey()).equals(e.getValue()));
2234+
},
22182235
(metadata) -> {
22192236
attempts.incrementAndGet();
22202237

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.bookkeeper.client;
19+
20+
import com.google.common.annotations.VisibleForTesting;
21+
22+
/**
23+
* Used to inject certain faults for testing.
24+
*/
25+
public class LedgerHandleFaultInjector {
26+
@VisibleForTesting
27+
public static LedgerHandleFaultInjector instance =
28+
new LedgerHandleFaultInjector();
29+
30+
@VisibleForTesting
31+
public static LedgerHandleFaultInjector getInstance() {
32+
return instance;
33+
}
34+
35+
@VisibleForTesting
36+
public void sleepWhenTest() {
37+
}
38+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
package org.apache.bookkeeper.client;
22+
23+
import org.apache.bookkeeper.conf.ServerConfiguration;
24+
import org.apache.bookkeeper.net.BookieId;
25+
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
26+
import org.junit.Test;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import static org.junit.Assert.assertEquals;
31+
32+
/**
33+
* This unit test tests ensemble change.
34+
*/
35+
public class TestEnsembleChange extends BookKeeperClusterTestCase {
36+
37+
private static final Logger LOG = LoggerFactory.getLogger(TestEnsembleChange.class);
38+
39+
private final BookKeeper.DigestType digestType;
40+
41+
public TestEnsembleChange() {
42+
super(3);
43+
this.digestType = BookKeeper.DigestType.CRC32;
44+
}
45+
46+
@Test(timeout = 60000)
47+
public void TestFailedBookieIsNotInCurrentEnsembleWhenEnsembleChange() throws Exception {
48+
int ensembleSize = 3;
49+
int writeQuorumSize = 3;
50+
int ackQuorumSize = 2;
51+
int numEntries = 10;
52+
53+
final LedgerHandleFaultInjector injector = new LedgerHandleFaultInjector() {
54+
@Override
55+
public void sleepWhenTest() {
56+
// make the run time of ensemble change long
57+
// to simulate add entry is faster than ensemble change relatively
58+
try {
59+
Thread.sleep(1000);
60+
} catch (InterruptedException e) {
61+
LOG.info("catch InterruptedException when sleep", e);
62+
}
63+
}
64+
};
65+
LedgerHandleFaultInjector.instance = injector;
66+
67+
LedgerHandle lh = bkc.createLedger(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, "".getBytes());
68+
69+
// ensure available bookie to ensemble change
70+
ServerConfiguration conf = newServerConfiguration();
71+
startAndAddBookie(conf);
72+
73+
// normal add entry
74+
String tmp = "BookKeeper is cool!";
75+
for (int i = 0; i < numEntries; i++) {
76+
lh.addEntry(tmp.getBytes());
77+
}
78+
79+
// simulate slow bookie
80+
BookieId bookie = getBookie(0);
81+
sleepBookie(bookie, 30);
82+
83+
lh.addEntry(tmp.getBytes());
84+
// make a 2-second interval between first timeout and following timeouts
85+
// so that first ensemble change can finish before following ensemble changes
86+
Thread.sleep(2000);
87+
88+
// create following ensemble changes and the failed bookie is not in current ensemble
89+
for (int i = numEntries + 1; i < numEntries * 2; i++) {
90+
lh.addEntry(tmp.getBytes());
91+
Thread.sleep(1000);
92+
}
93+
94+
assertEquals(2 * numEntries - 1, lh.getLastAddConfirmed());
95+
}
96+
}

0 commit comments

Comments
 (0)