diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 137be071d672f..375ab72a82491 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1503,7 +1503,12 @@ protected void internalResetCursor(Position proposedReadPosition, final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback; - final Position newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition); + final Position newMarkDeletePosition; + if (isCompactionCursor()) { + newMarkDeletePosition = markDeletePosition; + } else { + newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition); + } Runnable alignAcknowledgeStatusAfterPersisted = () -> { // Correct the variable "messagesConsumedCounter". @@ -1593,7 +1598,6 @@ public void operationFailed(ManagedLedgerException exception) { persistentMarkDeletePosition = null; inProgressMarkDeletePersistPosition = null; - lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null); internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() { @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java index ddfe8825a8888..7aba181cb4464 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.compaction; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.io.IOException; import java.time.Duration; @@ -59,6 +60,8 @@ */ public abstract class AbstractTwoPhaseCompactor extends Compactor { + @VisibleForTesting + static Runnable injectionAfterSeekInPhaseTwo = () -> {}; private static final Logger log = LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class); protected static final int MAX_OUTSTANDING = 500; protected final Duration phaseOneLoopReadTimeout; @@ -188,6 +191,7 @@ private CompletableFuture phaseTwoSeekThenLoop(RawReader reader, MessageId CompletableFuture promise = new CompletableFuture<>(); reader.seekAsync(from).thenCompose((v) -> { + injectionAfterSeekInPhaseTwo.run(); Semaphore outstanding = new Semaphore(MAX_OUTSTANDING); CompletableFuture loopPromise = new CompletableFuture<>(); phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise, MessageId.earliest); @@ -436,4 +440,4 @@ protected static class PhaseOneResult { public long getPhaseOneLoopReadTimeoutInSeconds() { return phaseOneLoopReadTimeout.getSeconds(); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 889eb2b8a35e1..077cf9d0b11b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -57,6 +57,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -109,7 +110,8 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.testng.annotations.AfterMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -121,7 +123,13 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { protected BookKeeper bk; private PublishingOrderCompactor compactor; - @BeforeMethod + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setDispatcherMaxReadBatchSize(1); + } + + @BeforeClass @Override public void setup() throws Exception { super.internalSetup(); @@ -139,7 +147,7 @@ public void setup() throws Exception { compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, compactionScheduler); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override public void cleanup() throws Exception { super.internalCleanup(); @@ -149,6 +157,12 @@ public void cleanup() throws Exception { } } + @BeforeMethod(alwaysRun = true) + public void beforeMethod() throws Exception { + admin.namespaces().removeRetention("my-tenant/my-ns"); + AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {}; + } + protected long compact(String topic) throws ExecutionException, InterruptedException { return compactor.compact(topic).get(); } @@ -165,7 +179,7 @@ protected PublishingOrderCompactor getCompactor() { @Test public void testCompaction() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compaction"; final int numMessages = 20; final int maxKeys = 10; @@ -229,7 +243,7 @@ public void testCompaction() throws Exception { @Test public void testCompactionWithReader() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compaction-with-reader"; final int numMessages = 20; final int maxKeys = 10; @@ -290,7 +304,7 @@ public void testCompactionWithReader() throws Exception { @Test public void testReadCompactedBeforeCompaction() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/read-compacted-before-compaction"; Producer producer = pulsarClient.newProducer() .topic(topic) @@ -330,7 +344,7 @@ public void testReadCompactedBeforeCompaction() throws Exception { @Test public void testReadEntriesAfterCompaction() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/read-entries-after-compaction"; Producer producer = pulsarClient.newProducer() .topic(topic) @@ -361,7 +375,7 @@ public void testReadEntriesAfterCompaction() throws Exception { @Test public void testSeekEarliestAfterCompaction() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/seek-earliest-after-compaction"; Producer producer = pulsarClient.newProducer() .topic(topic) @@ -402,7 +416,7 @@ public void testSeekEarliestAfterCompaction() throws Exception { @Test public void testBrokerRestartAfterCompaction() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/test-restart-after-compaction"; Producer producer = pulsarClient.newProducer() .topic(topic) @@ -444,7 +458,7 @@ public void testBrokerRestartAfterCompaction() throws Exception { @Test public void testCompactEmptyTopic() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compact-empty-topic"; Producer producer = pulsarClient.newProducer() .topic(topic) @@ -467,7 +481,7 @@ public void testCompactEmptyTopic() throws Exception { @Test public void testFirstMessageRetained() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/first-message-retained"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -508,7 +522,7 @@ public void testFirstMessageRetained() throws Exception { @Test public void testBatchMessageIdsDontChange() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/batch-message-ids-dont-change"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -571,7 +585,7 @@ public void testBatchMessageIdsDontChange() throws Exception { @Test public void testBatchMessageWithNullValue() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/batch-message-with-null-value"; pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .receiverQueueSize(1).readCompacted(true).subscribe().close(); @@ -625,7 +639,7 @@ public void testBatchMessageWithNullValue() throws Exception { @Test public void testWholeBatchCompactedOut() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/whole-batch-compacted-out"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -670,7 +684,7 @@ public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Excepti restartBroker(); FieldUtils.writeField(compactor, "topicCompactionRetainNullKey", retainNullKey, true); - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/key-less-messages-pass-through-" + retainNullKey; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -737,7 +751,7 @@ public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Excepti @Test public void testEmptyPayloadDeletes() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/empty-payload-deletes"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -819,7 +833,7 @@ public void testEmptyPayloadDeletes() throws Exception { @Test public void testEmptyPayloadDeletesWhenCompressed() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/empty-payload-deletes-when-compressed"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -900,7 +914,7 @@ public void testEmptyPayloadDeletesWhenCompressed() throws Exception { @Test public void testCompactorReadsCompacted() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compactor-reads-compacted"; // capture opened ledgers Set ledgersOpened = Sets.newConcurrentHashSet(); @@ -952,6 +966,12 @@ public void testCompactorReadsCompacted() throws Exception { assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); ledgersOpened.clear(); + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create()) { + producerNormal.newMessage() + .key("key2") + .value("my-message".getBytes()) + .send(); + } // force broker to close resources for topic pulsar.getBrokerService().getTopicReference(topic).get().close(false).get(); @@ -1000,7 +1020,7 @@ public void testCompactorReadsCompacted() throws Exception { @Test public void testCompactCompressedNoBatch() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compact-compressed-no-batch"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -1039,7 +1059,7 @@ public void testCompactCompressedNoBatch() throws Exception { @Test public void testCompactCompressedBatching() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compact-compressed-batching"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -1118,7 +1138,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe @Test public void testCompactEncryptedNoBatch() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compact-encrypted-no-batch"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -1160,7 +1180,7 @@ public void testCompactEncryptedNoBatch() throws Exception { @Test public void testCompactEncryptedBatching() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compact-encrypted-batching"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -1216,7 +1236,7 @@ public void testCompactEncryptedBatching() throws Exception { @Test public void testCompactEncryptedAndCompressedNoBatch() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compact-encrypted-and-compressed-no-batch"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -1259,7 +1279,7 @@ public void testCompactEncryptedAndCompressedNoBatch() throws Exception { @Test public void testCompactEncryptedAndCompressedBatching() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compact-encrypted-and-compressed-batching"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -1317,7 +1337,7 @@ public void testCompactEncryptedAndCompressedBatching() throws Exception { @Test public void testEmptyPayloadDeletesWhenEncrypted() throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/empty-payload-deletes-when-encrypted"; // subscribe before sending anything, so that we get all messages pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") @@ -1413,7 +1433,7 @@ public static Object[][] lastDeletedBatching() { @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") public void testCompactionWithLastDeletedKey(boolean batching) throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/compaction-with-last-deleted-key-" + batching; Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(batching) .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); @@ -1439,7 +1459,7 @@ public void testCompactionWithLastDeletedKey(boolean batching) throws Exception @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") public void testEmptyCompactionLedger(boolean batching) throws Exception { - String topic = "persistent://my-tenant/my-ns/my-topic1"; + String topic = "persistent://my-tenant/my-ns/empty-compaction-ledger-" + batching; Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(batching) .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); @@ -2347,7 +2367,7 @@ public void testAcknowledgeWithReconnection() throws Exception { @Test(timeOut = 120 * 1000) public void testConcurrentCompactionAndTopicDelete() throws Exception { - final String topicName = newUniqueName("persistent://my-tenant/my-ns/tp"); + final String topicName = newUniqueName("persistent://my-tenant/my-ns/concurrent-compaction-topic-delete"); admin.topics().createNonPartitionedTopic(topicName); // Load up the topic. Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); @@ -2460,4 +2480,68 @@ public void testEarliestSubsAfterRollover() throws Exception { assertEquals(results, expected); } + + @Test + public void testPhaseTwoInterruption() throws Exception { + // Set infinite retention to retain all original ledgers + admin.namespaces().setRetention("my-tenant/my-ns", new RetentionPolicies(-1, -1)); + final var topic = "persistent://my-tenant/my-ns/phase-two-interruption"; + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + final BiConsumer send = (key, value) -> { + final var msgId = producer.newMessage().key(key).value(value).sendAsync().join(); + log.info("Sent {} => {} to {}", key, value, msgId); + }; + + send.accept("key-0", "value"); + for (int i = 0; i < 3; i++) { + send.accept("key-1", "value-" + i); + } + + triggerAndWaitCompaction(topic); // update the compaction horizon + + AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> { + // Simulate the case when the topic is closed during compaction phase two + CompletableFuture.runAsync(() -> { + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).join() + .orElseThrow(); + persistentTopic.close().join(); + }); + }; + // Send a new message so that the compaction won't be skipped + send.accept("key-2", "value-0"); + send.accept("key-2", "value-1"); + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> assertFalse(pulsar.getBrokerService().getTopics() + .containsKey(TopicName.get(topic).toString()))); + + AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {}; + + // Messages of "key-2" are not compacted due to the injected failure, but the previous messages are read from + // the compacted ledger rather than the original ledger. + verifyReadKeyValues(topic, true, List.of("key-0", "value", "key-1", "value-2", "key-2", "value-0", "key-2", + "value-1")); + // The original ledger still exists so old values of "key-1" can be read + verifyReadKeyValues(topic, false, List.of("key-0", "value", "key-1", "value-0", "key-1", "value-1", "key-1", + "value-2", "key-2", "value-0", "key-2", "value-1")); + } + + private void verifyReadKeyValues(String topic, boolean readCompacted, List expectedKeyValues) + throws Exception { + @Cleanup final var reader = pulsarClient.newReader(Schema.STRING).topic(topic).readCompacted(readCompacted) + .startMessageId(MessageId.earliest).create(); + final var keyValues = new ArrayList(); + while (reader.hasMessageAvailable()) { + final var msg = reader.readNext(); + keyValues.add(msg.getKey()); + keyValues.add(msg.getValue()); + } + assertEquals(keyValues, expectedKeyValues, + readCompacted + " " + String.join(",", keyValues.toArray(new String[0]))); + } + + private void triggerAndWaitCompaction(String topic) throws Exception { + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> assertEquals( + admin.topics().compactionStatus(topic).status, LongRunningProcessStatus.Status.SUCCESS)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java index 4cdd195d49328..2ca03c55b30f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java @@ -20,18 +20,23 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl.MSG_COMPRESSION_TYPE; import static org.testng.Assert.assertEquals; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; @@ -45,45 +50,49 @@ import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = "flaky") -public class StrategicCompactionTest extends CompactionTest { +public class StrategicCompactionTest extends MockedPulsarServiceBaseTest { + + protected ScheduledExecutorService compactionScheduler; + protected BookKeeper bk; private TopicCompactionStrategy strategy; private StrategicTwoPhaseCompactor compactor; - @BeforeMethod + @BeforeClass @Override public void setup() throws Exception { - super.setup(); + super.internalSetup(); + compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null).get(); compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy(); } + @AfterClass(alwaysRun = true) @Override - protected long compact(String topic) throws ExecutionException, InterruptedException { - return (long) compactor.compact(topic, strategy).get(); - } - - @Override - protected long compact(String topic, CryptoKeyReader cryptoKeyReader) - throws ExecutionException, InterruptedException { - return (long) compactor.compact(topic, strategy, cryptoKeyReader).get(); + public void cleanup() throws Exception { + super.internalCleanup(); + bk.close(); + if (compactionScheduler != null) { + compactionScheduler.shutdownNow(); + } } - @Override - protected PublishingOrderCompactor getCompactor() { - return compactor; + private long compact(String topic) throws ExecutionException, InterruptedException { + return (long) compactor.compact(topic, strategy).get(); } - @Test public void testNumericOrderCompaction() throws Exception { strategy = new NumericOrderCompactionStrategy(); - String topic = "persistent://my-property/use/my-ns/my-topic1"; + String topic = "persistent://my-property/use/my-ns/numeric-order-compaction"; final int numMessages = 50; final int maxKeys = 5;