Skip to content

Commit 9e3ca60

Browse files
committed
address comments
1 parent 3de9f35 commit 9e3ca60

File tree

3 files changed

+50
-4
lines changed

3 files changed

+50
-4
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ private void openWithMetadata(Versioned<LedgerMetadata> versionedMetadata) {
195195
}
196196

197197
// get the ledger metadata back
198+
final boolean needRecovery = !doRecovery || !metadata.isClosed();
198199
try {
199200
// The ledger metadata may be modified even if it has been closed, because the auto-recovery component may
200201
// rewrite the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an
@@ -208,7 +209,7 @@ private void openWithMetadata(Versioned<LedgerMetadata> versionedMetadata) {
208209
// Therefore, if a user needs to the feature that update metadata automatically, he will set
209210
// "keepUpdateMetadata" to "true",
210211
lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType,
211-
passwd, !doRecovery || keepUpdateMetadata);
212+
passwd, !needRecovery);
212213
} catch (GeneralSecurityException e) {
213214
LOG.error("Security exception while opening ledger: " + ledgerId, e);
214215
openComplete(BKException.Code.DigestNotInitializedException, null);
@@ -231,6 +232,9 @@ private void openWithMetadata(Versioned<LedgerMetadata> versionedMetadata) {
231232
public void safeOperationComplete(int rc, Void result) {
232233
if (rc == BKException.Code.OK) {
233234
openComplete(BKException.Code.OK, lh);
235+
if (needRecovery && keepUpdateMetadata) {
236+
lh.registerLedgerMetadataListener();
237+
}
234238
} else {
235239
closeLedgerHandleAsync().whenComplete((ignore, ex) -> {
236240
if (ex != null) {

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,18 @@ public String toString() {
9595
ReadOnlyLedgerHandle(ClientContext clientCtx,
9696
long ledgerId, Versioned<LedgerMetadata> metadata,
9797
BookKeeper.DigestType digestType, byte[] password,
98-
boolean watch)
98+
boolean watchImmediately)
9999
throws GeneralSecurityException, NumberFormatException {
100100
super(clientCtx, ledgerId, metadata, digestType, password, WriteFlag.NONE);
101-
if (watch) {
102-
clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
101+
if (watchImmediately) {
102+
registerLedgerMetadataListener();
103103
}
104104
}
105105

106+
void registerLedgerMetadataListener() {
107+
clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
108+
}
109+
106110
@Override
107111
public void close()
108112
throws InterruptedException, BKException {

bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,44 @@ public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Excepti
138138
lh.addEntry(data);
139139
lh.close();
140140
List<BookieId> originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L);
141+
LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, false);
142+
assertTrue(originalEnsemble.size() == 2);
143+
144+
startNewBookie();
145+
BookieServer newBookieServer3 = serverByIndex(lastBookieIndex());
146+
killBookie(originalEnsemble.get(0));
147+
waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), newBookieServer3.getBookieId());
148+
149+
startNewBookie();
150+
int newBookieIndex4 = lastBookieIndex();
151+
BookieServer newBookieServer4 = serverByIndex(newBookieIndex4);
152+
killBookie(originalEnsemble.get(1));
153+
waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), newBookieServer4.getBookieId());
154+
155+
Awaitility.await().untilAsserted(() -> {
156+
LedgerEntries ledgerEntries = readonlyLh.read(0, 0);
157+
assertNotNull(ledgerEntries);
158+
byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes();
159+
assertEquals(new String(data), new String(entryBytes));
160+
ledgerEntries.close();
161+
});
162+
readonlyLh.close();
163+
}
164+
165+
/**
166+
* The purpose of this test:
167+
* 1. Client service open a readonly ledger handle with recovery, which has not been closed yet.
168+
* 2. All BKs that relates to the ledger have been decommissioned.
169+
* 3. Auto recovery component moved the data into other BK instances who is alive.
170+
* 4. Verify: lhe ledger handle in the client memory keeps updating the ledger ensemble set, and the new read
171+
* request works.
172+
*/
173+
@Test
174+
public void testRecoverOpenLedgerHandleStillWorkAfterDecommissioning() throws Exception {
175+
LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD);
176+
assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() == 2);
177+
lh.addEntry(data);
178+
List<BookieId> originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L);
141179
LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, true);
142180
assertTrue(originalEnsemble.size() == 2);
143181

0 commit comments

Comments
 (0)