Skip to content

Commit 3921742

Browse files
poorbarcodenikhil-ctds
authored andcommitted
[fix] [ml] Fix cursor metadata compatability issue when switching the config unackedRangesOpenCacheSetEnabled (apache#23759)
(cherry picked from commit 4ee4633) (cherry picked from commit 2609acd)
1 parent 774cfd3 commit 3921742

File tree

7 files changed

+235
-35
lines changed

7 files changed

+235
-35
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public class ManagedLedgerConfig {
9090
@Getter
9191
@Setter
9292
private String shadowSourceName;
93+
@Getter
94+
private boolean persistIndividualAckAsLongArray;
9395

9496
public boolean isCreateIfMissing() {
9597
return createIfMissing;
@@ -100,6 +102,11 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) {
100102
return this;
101103
}
102104

105+
public ManagedLedgerConfig setPersistIndividualAckAsLongArray(boolean persistIndividualAckAsLongArray) {
106+
this.persistIndividualAckAsLongArray = persistIndividualAckAsLongArray;
107+
return this;
108+
}
109+
103110
/**
104111
* @return the lazyCursorRecovery
105112
*/

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ public class ManagedCursorImpl implements ManagedCursor {
178178
protected volatile long messagesConsumedCounter;
179179

180180
// Current ledger used to append the mark-delete position
181-
private volatile LedgerHandle cursorLedger;
181+
@VisibleForTesting
182+
volatile LedgerHandle cursorLedger;
182183

183184
// Wether the current cursorLedger is read-only or writable
184185
private boolean isCursorLedgerReadOnly = true;
@@ -632,7 +633,22 @@ public void recoverIndividualDeletedMessages(PositionInfo positionInfo) {
632633
try {
633634
Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
634635
list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
635-
individualDeletedMessages.build(rangeMap);
636+
// Guarantee compatability for the config "unackedRangesOpenCacheSetEnabled".
637+
if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
638+
individualDeletedMessages.build(rangeMap);
639+
} else {
640+
RangeSetWrapper<PositionImpl> rangeSetWrapperV2 = new RangeSetWrapper<>(positionRangeConverter,
641+
positionRangeReverseConverter, true,
642+
getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled());
643+
rangeSetWrapperV2.build(rangeMap);
644+
rangeSetWrapperV2.forEach(range -> {
645+
individualDeletedMessages.addOpenClosed(range.lowerEndpoint().getLedgerId(),
646+
range.lowerEndpoint().getEntryId(), range.upperEndpoint().getLedgerId(),
647+
range.upperEndpoint().getEntryId());
648+
return true;
649+
});
650+
rangeSetWrapperV2.clear();
651+
}
636652
} catch (Exception e) {
637653
log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(),
638654
name, e);
@@ -2335,7 +2351,14 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
23352351
}
23362352
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
23372353
// make the RangeSet recognize the "continuity" between adjacent Positions.
2338-
PositionImpl previousPosition = ledger.getPreviousPosition(position);
2354+
// Before https://github.com/apache/pulsar/pull/21105 is merged, the range does not support crossing
2355+
// multi ledgers, so the first position's entryId maybe "-1".
2356+
PositionImpl previousPosition;
2357+
if (position.getEntryId() == 0) {
2358+
previousPosition = new PositionImpl(position.getLedgerId(), -1);
2359+
} else {
2360+
previousPosition = ledger.getPreviousPosition(position);
2361+
}
23392362
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
23402363
previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
23412364
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
@@ -3138,10 +3161,21 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
31383161
.addAllProperties(buildPropertiesMap(mdEntry.properties));
31393162

31403163
Map<Long, long[]> internalRanges = null;
3141-
try {
3142-
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
3143-
} catch (Exception e) {
3144-
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
3164+
/**
3165+
* Cursor will create the {@link #individualDeletedMessages} typed {@link LongPairRangeSet.DefaultRangeSet} if
3166+
* disabled the config {@link ManagedLedgerConfig#unackedRangesOpenCacheSetEnabled}.
3167+
* {@link LongPairRangeSet.DefaultRangeSet} never implemented the methods below:
3168+
* - {@link LongPairRangeSet#toRanges(int)}, which is used to serialize cursor metadata.
3169+
* - {@link LongPairRangeSet#build(Map)}, which is used to deserialize cursor metadata.
3170+
* Do not enable the feature that https://github.com/apache/pulsar/pull/9292 introduced, to avoid serialization
3171+
* and deserialization error.
3172+
*/
3173+
if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) {
3174+
try {
3175+
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
3176+
} catch (Exception e) {
3177+
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
3178+
}
31453179
}
31463180
if (internalRanges != null && !internalRanges.isEmpty()) {
31473181
piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818
*/
1919
package org.apache.bookkeeper.mledger.impl;
2020

21-
import static java.util.Objects.requireNonNull;
2221
import com.google.common.annotations.VisibleForTesting;
2322
import com.google.common.collect.Range;
2423
import java.util.ArrayList;
2524
import java.util.Collection;
2625
import java.util.List;
2726
import java.util.Map;
28-
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
2927
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
3028
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;
3129
import org.roaringbitmap.RoaringBitSet;
@@ -40,7 +38,6 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
4038

4139
private final LongPairRangeSet<T> rangeSet;
4240
private final LongPairConsumer<T> rangeConverter;
43-
private final ManagedLedgerConfig config;
4441
private final boolean enableMultiEntry;
4542

4643
/**
@@ -53,13 +50,19 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
5350
public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
5451
RangeBoundConsumer<T> rangeBoundConsumer,
5552
ManagedCursorImpl managedCursor) {
56-
requireNonNull(managedCursor);
57-
this.config = managedCursor.getManagedLedger().getConfig();
53+
this(rangeConverter, rangeBoundConsumer, managedCursor.getConfig().isUnackedRangesOpenCacheSetEnabled(),
54+
managedCursor.getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled());
55+
}
56+
57+
public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
58+
RangeBoundConsumer<T> rangeBoundConsumer,
59+
boolean unackedRangesOpenCacheSetEnabled,
60+
boolean persistentUnackedRangesWithMultipleEntriesEnabled) {
5861
this.rangeConverter = rangeConverter;
59-
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
62+
this.rangeSet = unackedRangesOpenCacheSetEnabled
6063
? new OpenLongPairRangeSet<>(rangeConverter, RoaringBitSet::new)
6164
: new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
62-
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
65+
this.enableMultiEntry = persistentUnackedRangesWithMultipleEntriesEnabled;
6366
}
6467

6568
@Override

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java

Lines changed: 101 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,27 @@
2020

2121
import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
2222
import static org.testng.Assert.assertEquals;
23+
import static org.testng.Assert.assertNotEquals;
2324
import static org.testng.Assert.assertNotNull;
2425
import static org.testng.Assert.assertTrue;
2526
import static org.testng.Assert.fail;
2627

28+
import com.fasterxml.jackson.databind.ObjectMapper;
2729
import java.nio.charset.StandardCharsets;
2830
import java.util.ArrayList;
2931
import java.util.List;
32+
import java.util.UUID;
3033
import java.util.concurrent.CountDownLatch;
3134
import java.util.concurrent.CyclicBarrier;
3235
import java.util.concurrent.Future;
3336
import java.util.concurrent.TimeUnit;
3437
import java.util.concurrent.atomic.AtomicBoolean;
3538
import java.util.concurrent.atomic.AtomicReference;
3639

40+
import lombok.extern.slf4j.Slf4j;
3741
import org.apache.bookkeeper.client.BookKeeper;
3842
import org.apache.bookkeeper.client.BookKeeperTestClient;
43+
import org.apache.bookkeeper.client.LedgerEntry;
3944
import org.apache.bookkeeper.client.api.DigestType;
4045
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
4146
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -49,18 +54,23 @@
4954
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
5055
import org.apache.bookkeeper.mledger.Position;
5156
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
57+
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
5258
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
5359
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
5460
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
55-
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
61+
import org.awaitility.Awaitility;
62+
import org.testng.annotations.DataProvider;
5663
import org.awaitility.Awaitility;
5764
import org.testng.annotations.Test;
5865

5966
import io.netty.buffer.ByteBuf;
6067
import lombok.Cleanup;
6168

69+
@Slf4j
6270
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
6371

72+
private final ObjectMapper jackson = new ObjectMapper();
73+
6474
public ManagedLedgerBkTest() {
6575
super(2);
6676
}
@@ -590,44 +600,114 @@ public void testPeriodicRollover() throws Exception {
590600
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
591601
}
592602

603+
@DataProvider(name = "unackedRangesOpenCacheSetEnabledPair")
604+
public Object[][] unackedRangesOpenCacheSetEnabledPair() {
605+
return new Object[][]{
606+
{false, true},
607+
{true, false},
608+
{true, true},
609+
{false, false}
610+
};
611+
}
612+
593613
/**
594614
* This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger.
595-
*
596615
* @throws Exception
597616
*/
598-
@Test
599-
public void testUnackmessagesAndRecovery() throws Exception {
617+
@Test(dataProvider = "unackedRangesOpenCacheSetEnabledPair")
618+
public void testUnackmessagesAndRecoveryCompatibility(boolean enabled1, boolean enabled2) throws Exception {
619+
final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", "");
620+
final String cursorName = "c1";
600621
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
601-
factoryConf.setMaxCacheSize(0);
602-
603622
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
604-
605-
ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
623+
final ManagedLedgerConfig config1 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
624+
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
625+
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1)
626+
.setUnackedRangesOpenCacheSetEnabled(enabled1);
627+
final ManagedLedgerConfig config2 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
606628
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
607-
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1);
608-
ManagedLedger ledger = factory.open("my_test_unack_messages", config);
609-
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
629+
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1)
630+
.setUnackedRangesOpenCacheSetEnabled(enabled2);
631+
632+
ManagedLedger ledger1 = factory.open(mlName, config1);
633+
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName);
610634

611635
int totalEntries = 100;
612636
for (int i = 0; i < totalEntries; i++) {
613-
Position p = ledger.addEntry("entry".getBytes());
637+
Position p = ledger1.addEntry("entry".getBytes());
614638
if (i % 2 == 0) {
615-
cursor.delete(p);
639+
cursor1.delete(p);
616640
}
617641
}
642+
log.info("ack ranges: {}", cursor1.getIndividuallyDeletedMessagesSet().size());
618643

619-
LongPairRangeSet<PositionImpl> unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet();
644+
// reopen and recover cursor
645+
ledger1.close();
646+
ManagedLedger ledger2 = factory.open(mlName, config2);
647+
ManagedCursorImpl cursor2 = (ManagedCursorImpl) ledger2.openCursor(cursorName);
620648

621-
ledger.close();
649+
log.info("before: {}", cursor1.getIndividuallyDeletedMessagesSet().asRanges());
650+
log.info("after : {}", cursor2.getIndividuallyDeletedMessagesSet().asRanges());
651+
assertEquals(cursor1.getIndividuallyDeletedMessagesSet().asRanges(), cursor2.getIndividuallyDeletedMessagesSet().asRanges());
652+
assertEquals(cursor1.markDeletePosition, cursor2.markDeletePosition);
622653

623-
// open and recover cursor
624-
ledger = factory.open("my_test_unack_messages", config);
625-
cursor = (ManagedCursorImpl) ledger.openCursor("c1");
654+
ledger2.close();
655+
factory.shutdown();
656+
}
626657

627-
LongPairRangeSet<PositionImpl> unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet();
628-
assertTrue(unackMessagesBefore.equals(unackMessagesAfter));
658+
@DataProvider(name = "booleans")
659+
public Object[][] booleans() {
660+
return new Object[][] {
661+
{true},
662+
{false},
663+
};
664+
}
629665

630-
ledger.close();
666+
@Test(dataProvider = "booleans")
667+
public void testConfigPersistIndividualAckAsLongArray(boolean enable) throws Exception {
668+
final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", "");
669+
final String cursorName = "c1";
670+
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
671+
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
672+
final ManagedLedgerConfig config = new ManagedLedgerConfig()
673+
.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1)
674+
.setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1)
675+
.setMaxUnackedRangesToPersistInMetadataStore(1)
676+
.setUnackedRangesOpenCacheSetEnabled(true).setPersistIndividualAckAsLongArray(enable);
677+
678+
ManagedLedger ledger1 = factory.open(mlName, config);
679+
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName);
680+
681+
// Write entries.
682+
int totalEntries = 100;
683+
List<Position> entries = new ArrayList<>();
684+
for (int i = 0; i < totalEntries; i++) {
685+
Position p = ledger1.addEntry("entry".getBytes());
686+
entries.add(p);
687+
}
688+
// Make ack holes and trigger a mark deletion.
689+
for (int i = totalEntries - 1; i >=0 ; i--) {
690+
if (i % 2 == 0) {
691+
cursor1.delete(entries.get(i));
692+
}
693+
}
694+
cursor1.markDelete(entries.get(9));
695+
Awaitility.await().untilAsserted(() -> {
696+
assertEquals(cursor1.pendingMarkDeleteOps.size(), 0);
697+
});
698+
699+
// Verify: the config affects.
700+
long cursorLedgerLac = cursor1.cursorLedger.getLastAddConfirmed();
701+
LedgerEntry ledgerEntry = cursor1.cursorLedger.readEntries(cursorLedgerLac, cursorLedgerLac).nextElement();
702+
MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry());
703+
if (enable) {
704+
assertNotEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0);
705+
} else {
706+
assertEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0);
707+
}
708+
709+
// cleanup
710+
ledger1.close();
631711
factory.shutdown();
632712
}
633713
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,6 +2146,10 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
21462146
+ " will only be tracked in memory and messages will be redelivered in case of"
21472147
+ " crashes.")
21482148
private int managedLedgerMaxUnackedRangesToPersist = 10000;
2149+
@FieldContext(
2150+
category = CATEGORY_STORAGE_ML,
2151+
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
2152+
private boolean managedLedgerPersistIndividualAckAsLongArray = false;
21492153
@FieldContext(
21502154
category = CATEGORY_STORAGE_ML,
21512155
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1948,6 +1948,8 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
19481948

19491949
managedLedgerConfig
19501950
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
1951+
managedLedgerConfig
1952+
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
19511953
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
19521954
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
19531955
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(

0 commit comments

Comments
 (0)