Skip to content

Commit d359715

Browse files
poorbarcodesrinath-ctds
authored andcommitted
[fix] Failed read entries after multiple decommissioning (apache#4613)
* - * checkstyle * let all ledger handle enable watcher * let all ledger handle enable watcher * fix tests * fix tests * fix tests * add test logs for debug * add test logs for debug * add test logs for debug * - * add a new param keepUpdateMetadata when open a read-only ledger handle * address comments * address comment * address comment * test CI * test CI * test CI * test CI * test CI * test CI * test CI * remove logs for CI * test CI * remove logs for CI * address comment * fix test (cherry picked from commit bae9e49) (cherry picked from commit b6d8b0f)
1 parent 40c9d15 commit d359715

File tree

5 files changed

+352
-13
lines changed

5 files changed

+352
-13
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,14 +1212,52 @@ public void asyncCreateLedgerAdv(final long ledgerId,
12121212
*/
12131213
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
12141214
final OpenCallback cb, final Object ctx) {
1215+
asyncOpenLedger(lId, digestType, passwd, cb, ctx, false);
1216+
}
1217+
1218+
/**
1219+
* Open existing ledger asynchronously for reading.
1220+
*
1221+
* <p>Opening a ledger with this method invokes fencing and recovery on the ledger
1222+
* if the ledger has not been closed. Fencing will block all other clients from
1223+
* writing to the ledger. Recovery will make sure that the ledger is closed
1224+
* before reading from it.
1225+
*
1226+
* <p>Recovery also makes sure that any entries which reached one bookie, but not a
1227+
* quorum, will be replicated to a quorum of bookies. This occurs in cases were
1228+
* the writer of a ledger crashes after sending a write request to one bookie but
1229+
* before being able to send it to the rest of the bookies in the quorum.
1230+
*
1231+
* <p>If the ledger is already closed, neither fencing nor recovery will be applied.
1232+
*
1233+
* @see LedgerHandle#asyncClose
1234+
*
1235+
* @param lId
1236+
* ledger identifier
1237+
* @param digestType
1238+
* digest type, either MAC or CRC32
1239+
* @param passwd
1240+
* password
1241+
* @param ctx
1242+
* optional control object
1243+
* @param keepUpdateMetadata
1244+
* Whether update ledger metadata if the auto-recover component modified the ledger's ensemble.
1245+
*/
1246+
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
1247+
final OpenCallback cb, final Object ctx, boolean keepUpdateMetadata) {
12151248
closeLock.readLock().lock();
12161249
try {
12171250
if (closed) {
12181251
cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
12191252
return;
12201253
}
1221-
new LedgerOpenOp(BookKeeper.this, clientStats,
1222-
lId, digestType, passwd, cb, ctx).initiate();
1254+
LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this, clientStats,
1255+
lId, digestType, passwd, cb, ctx);
1256+
if (keepUpdateMetadata) {
1257+
ledgerOpenOp.initiateWithKeepUpdateMetadata();
1258+
} else {
1259+
ledgerOpenOp.initiate();
1260+
}
12231261
} finally {
12241262
closeLock.readLock().unlock();
12251263
}
@@ -1287,13 +1325,36 @@ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestTyp
12871325
*/
12881326
public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd)
12891327
throws BKException, InterruptedException {
1328+
return openLedger(lId, digestType, passwd, false);
1329+
}
1330+
1331+
1332+
/**
1333+
* Synchronous open ledger call.
1334+
*
1335+
* @see #asyncOpenLedger
1336+
* @param lId
1337+
* ledger identifier
1338+
* @param digestType
1339+
* digest type, either MAC or CRC32
1340+
* @param passwd
1341+
* password
1342+
*
1343+
* @param keepUpdateMetadata
1344+
* Whether update ledger metadata if the auto-recover component modified the ledger's ensemble.
1345+
* @return a handle to the open ledger
1346+
* @throws InterruptedException
1347+
* @throws BKException
1348+
*/
1349+
public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd, boolean keepUpdateMetadata)
1350+
throws BKException, InterruptedException {
12901351
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
12911352
SyncOpenCallback result = new SyncOpenCallback(future);
12921353

12931354
/*
12941355
* Calls async open ledger
12951356
*/
1296-
asyncOpenLedger(lId, digestType, passwd, result, null);
1357+
asyncOpenLedger(lId, digestType, passwd, result, null, keepUpdateMetadata);
12971358

12981359
return SyncCallbackUtils.waitForResult(future);
12991360
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ class LedgerOpenOp {
5757
ReadOnlyLedgerHandle lh;
5858
final byte[] passwd;
5959
boolean doRecovery = true;
60+
// The ledger metadata may be modified even if it has been closed, because the auto-recovery component may rewrite
61+
// the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an opened ledger
62+
// handle in memory still accesses to a BK instance who has been decommissioned. The issue that solved happens as
63+
// follows:
64+
// 1. Client service open a readonly ledger handle, which has been closed.
65+
// 2. All BKs that relates to the ledger have been decommissioned.
66+
// 3. Auto recovery component moved the data into other BK instances who is alive.
67+
// 4. The ledger handle in the client memory keeps connects to the BKs who in the original ensemble set, and the
68+
// connection will always fail.
69+
// For minimum modification, to add a new configuration named "keepUpdateMetadata", users can use the
70+
// new API to create a readonly ledger handle that will auto-updates metadata.
71+
boolean keepUpdateMetadata = false;
6072
boolean administrativeOpen = false;
6173
long startTime;
6274
final OpStatsLogger openOpLogger;
@@ -126,6 +138,15 @@ public void initiateWithoutRecovery() {
126138
initiate();
127139
}
128140

141+
/**
142+
* Different with {@link #initiate()}, the method keep update metadata once the auto-recover component modified
143+
* the ensemble.
144+
*/
145+
public void initiateWithKeepUpdateMetadata() {
146+
this.keepUpdateMetadata = true;
147+
initiate();
148+
}
149+
129150
private CompletableFuture<Void> closeLedgerHandleAsync() {
130151
if (lh != null) {
131152
return lh.closeAsync();
@@ -174,9 +195,25 @@ private void openWithMetadata(Versioned<LedgerMetadata> versionedMetadata) {
174195
}
175196

176197
// get the ledger metadata back
198+
// The cases that need to register listener immediately are:
199+
// 1. The ledger is not in recovery opening, which is the original case.
200+
// 2. The ledger is closed and need to keep update metadata. There is other cases that do not need to
201+
// register listener. e.g. The ledger is opening by Auto-Recovery component.
202+
final boolean watchImmediately = !doRecovery || (keepUpdateMetadata && metadata.isClosed());
177203
try {
204+
// The ledger metadata may be modified even if it has been closed, because the auto-recovery component may
205+
// rewrite the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an
206+
// opened ledger handle in memory still accesses to a BK instance who has been decommissioned. The issue
207+
// that solved happens as follows:
208+
// 1. Client service open a readonly ledger handle, which has been closed.
209+
// 2. All BKs that relates to the ledger have been decommissioned.
210+
// 3. Auto recovery component moved the data into other BK instances who is alive.
211+
// 4. The ledger handle in the client memory keeps connects to the BKs who in the original ensemble set,
212+
// and the connection will always fail.
213+
// Therefore, if a user needs to the feature that update metadata automatically, he will set
214+
// "keepUpdateMetadata" to "true",
178215
lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType,
179-
passwd, !doRecovery);
216+
passwd, watchImmediately);
180217
} catch (GeneralSecurityException e) {
181218
LOG.error("Security exception while opening ledger: " + ledgerId, e);
182219
openComplete(BKException.Code.DigestNotInitializedException, null);
@@ -199,6 +236,9 @@ private void openWithMetadata(Versioned<LedgerMetadata> versionedMetadata) {
199236
public void safeOperationComplete(int rc, Void result) {
200237
if (rc == BKException.Code.OK) {
201238
openComplete(BKException.Code.OK, lh);
239+
if (!watchImmediately && keepUpdateMetadata) {
240+
lh.registerLedgerMetadataListener();
241+
}
202242
} else {
203243
closeLedgerHandleAsync().whenComplete((ignore, ex) -> {
204244
if (ex != null) {

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,18 @@ public String toString() {
9595
ReadOnlyLedgerHandle(ClientContext clientCtx,
9696
long ledgerId, Versioned<LedgerMetadata> metadata,
9797
BookKeeper.DigestType digestType, byte[] password,
98-
boolean watch)
98+
boolean watchImmediately)
9999
throws GeneralSecurityException, NumberFormatException {
100100
super(clientCtx, ledgerId, metadata, digestType, password, WriteFlag.NONE);
101-
if (watch) {
102-
clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
101+
if (watchImmediately) {
102+
registerLedgerMetadataListener();
103103
}
104104
}
105105

106+
void registerLedgerMetadataListener() {
107+
clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
108+
}
109+
106110
@Override
107111
public void close()
108112
throws InterruptedException, BKException {

bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public BookieAutoRecoveryTest() throws IOException, KeeperException,
9696

9797
@Override
9898
public void setUp() throws Exception {
99+
LOG.info("Start setUp");
99100
super.setUp();
100101
baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
101102
baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
@@ -117,10 +118,12 @@ public void setUp() throws Exception {
117118
mFactory = metadataClientDriver.getLedgerManagerFactory();
118119
underReplicationManager = mFactory.newLedgerUnderreplicationManager();
119120
ledgerManager = mFactory.newLedgerManager();
121+
LOG.info("Finished setUp");
120122
}
121123

122124
@Override
123125
public void tearDown() throws Exception {
126+
LOG.info("Start tearDown");
124127
super.tearDown();
125128

126129
if (null != underReplicationManager) {
@@ -138,6 +141,7 @@ public void tearDown() throws Exception {
138141
if (null != scheduler) {
139142
scheduler.shutdown();
140143
}
144+
LOG.info("Finished tearDown");
141145
}
142146

143147
/**
@@ -146,6 +150,7 @@ public void tearDown() throws Exception {
146150
*/
147151
@Test
148152
public void testOpenLedgers() throws Exception {
153+
LOG.info("Start testOpenLedgers");
149154
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
150155
LedgerHandle lh = listOfLedgerHandle.get(0);
151156
int ledgerReplicaIndex = 0;
@@ -186,6 +191,7 @@ public void testOpenLedgers() throws Exception {
186191

187192
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
188193
listOfLedgerHandle.get(0), ledgerReplicaIndex);
194+
LOG.info("Finished testOpenLedgers");
189195
}
190196

191197
/**
@@ -194,6 +200,7 @@ public void testOpenLedgers() throws Exception {
194200
*/
195201
@Test
196202
public void testClosedLedgers() throws Exception {
203+
LOG.info("Start testClosedLedgers");
197204
List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
198205
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
199206
closeLedgers(listOfLedgerHandle);
@@ -247,6 +254,7 @@ public void testClosedLedgers() throws Exception {
247254
listOfLedgerHandle.get(index),
248255
listOfReplicaIndex.get(index));
249256
}
257+
LOG.info("Finished testClosedLedgers");
250258
}
251259

252260
/**
@@ -256,6 +264,7 @@ public void testClosedLedgers() throws Exception {
256264
*/
257265
@Test
258266
public void testStopWhileReplicationInProgress() throws Exception {
267+
LOG.info("Start testStopWhileReplicationInProgress");
259268
int numberOfLedgers = 2;
260269
List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
261270
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(
@@ -327,6 +336,7 @@ public void testStopWhileReplicationInProgress() throws Exception {
327336
listOfLedgerHandle.get(index),
328337
listOfReplicaIndex.get(index));
329338
}
339+
LOG.info("Finished testStopWhileReplicationInProgress");
330340
}
331341

332342
/**
@@ -336,6 +346,7 @@ public void testStopWhileReplicationInProgress() throws Exception {
336346
*/
337347
@Test
338348
public void testNoSuchLedgerExists() throws Exception {
349+
LOG.info("Start testNoSuchLedgerExists");
339350
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(2, 5);
340351
CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
341352
for (LedgerHandle lh : listOfLedgerHandle) {
@@ -372,6 +383,7 @@ public void testNoSuchLedgerExists() throws Exception {
372383
assertNull("UrLedger still exists after rereplication",
373384
watchUrLedgerNode(getUrLedgerZNode(lh), latch));
374385
}
386+
LOG.info("Finished testNoSuchLedgerExists");
375387
}
376388

377389
/**
@@ -380,6 +392,7 @@ public void testNoSuchLedgerExists() throws Exception {
380392
*/
381393
@Test
382394
public void testEmptyLedgerLosesQuorumEventually() throws Exception {
395+
LOG.info("Start testEmptyLedgerLosesQuorumEventually");
383396
LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD);
384397
CountDownLatch latch = new CountDownLatch(1);
385398
String urZNode = getUrLedgerZNode(lh);
@@ -420,6 +433,7 @@ public void testEmptyLedgerLosesQuorumEventually() throws Exception {
420433

421434
// should be able to open ledger without issue
422435
bkc.openLedger(lh.getId(), DigestType.CRC32, PASSWD);
436+
LOG.info("Finished testEmptyLedgerLosesQuorumEventually");
423437
}
424438

425439
/**
@@ -429,6 +443,7 @@ public void testEmptyLedgerLosesQuorumEventually() throws Exception {
429443
@Test
430444
public void testLedgerMetadataContainsIpAddressAsBookieID()
431445
throws Exception {
446+
LOG.info("Start testLedgerMetadataContainsIpAddressAsBookieID");
432447
stopBKCluster();
433448
bkc = new BookKeeperTestClient(baseClientConf);
434449
// start bookie with useHostNameAsBookieID=false, as old bookie
@@ -494,7 +509,7 @@ public void testLedgerMetadataContainsIpAddressAsBookieID()
494509

495510
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
496511
listOfLedgerHandle.get(0), ledgerReplicaIndex);
497-
512+
LOG.info("Finished testLedgerMetadataContainsIpAddressAsBookieID");
498513
}
499514

500515
/**
@@ -504,6 +519,7 @@ public void testLedgerMetadataContainsIpAddressAsBookieID()
504519
@Test
505520
public void testLedgerMetadataContainsHostNameAsBookieID()
506521
throws Exception {
522+
LOG.info("Start testLedgerMetadataContainsHostNameAsBookieID");
507523
stopBKCluster();
508524

509525
bkc = new BookKeeperTestClient(baseClientConf);
@@ -572,7 +588,7 @@ public void testLedgerMetadataContainsHostNameAsBookieID()
572588

573589
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
574590
listOfLedgerHandle.get(0), ledgerReplicaIndex);
575-
591+
LOG.info("Finished testLedgerMetadataContainsHostNameAsBookieID");
576592
}
577593

578594
private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) {
@@ -634,13 +650,13 @@ private Stat watchUrLedgerNode(final String znode,
634650
@Override
635651
public void process(WatchedEvent event) {
636652
if (event.getType() == EventType.NodeDeleted) {
637-
LOG.info("Received Ledger rereplication completion event :"
638-
+ event.getType());
653+
LOG.info("Received Ledger replication completion. event : {}, path: {}, latchCount: {}",
654+
event.getType(), event.getPath(), latch.getCount());
639655
latch.countDown();
640656
}
641657
if (event.getType() == EventType.NodeCreated) {
642-
LOG.info("Received urLedger publishing event :"
643-
+ event.getType());
658+
LOG.info("Received urLedger publishing event: {}, path: {}, latchCount: {}",
659+
event.getType(), event.getPath(), latch.getCount());
644660
latch.countDown();
645661
}
646662
}

0 commit comments

Comments
 (0)