From 6aab99efd239f9b8a402627455d4f14f84a99109 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 30 May 2025 20:40:32 +0300 Subject: [PATCH 1/4] Add some broker entry cache end-to-end tests to confirm assumptions (cherry picked from commit 5ddff77e083894003db9ba3f5479657e5c3c23b3) --- .../broker/cache/BrokerEntryCacheTest.java | 424 ++++++++++++++++++ .../broker/cache/PulsarLookupProxy.java | 109 +++++ .../pulsar/broker/service/Ipv4Proxy.java | 11 +- 3 files changed, 543 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BrokerEntryCacheTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/PulsarLookupProxy.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BrokerEntryCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BrokerEntryCacheTest.java new file mode 100644 index 0000000000000..ae68d8927f679 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BrokerEntryCacheTest.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.cache; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.Ipv4Proxy; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * This test class current contains test cases that are exploratory in nature and are not intended to be run + * as part of the regular test suite. This might change later. + * The current intent is to show how the PIP-430 caching results in better cache hit rates in catch-up reads. + * The way how catch-up reads are simulated is by disconnecting all broker connections at once using a failure proxy. + * This forces the consumers to reconnect and read from the beginning of the backlog. + */ +@Test(groups = "broker-api") +@Slf4j +public class BrokerEntryCacheTest extends ProducerConsumerBase { + @BeforeMethod + @Override + protected void setup() throws Exception { + this.conf.setClusterName("test"); + internalSetup(); + producerBaseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + + @Override + protected ServiceConfiguration getDefaultConf() { + ServiceConfiguration defaultConf = super.getDefaultConf(); + // use cache eviction by expected read count, this is the default behavior so it's not necessary to set it, + // but it makes the test more explicit + defaultConf.setCacheEvictionByExpectedReadCount(true); + // increase default eviction threshold to reasonable value + defaultConf.setManagedLedgerCacheEvictionTimeThresholdMillis(10000); + // uncomment one of these to compare with existing caching behavior + //defaultConf.setCacheEvictionByExpectedReadCount(false); + //configurePR12258Caching(defaultConf); + //configureCacheEvictionByMarkDeletedPosition(defaultConf); + return defaultConf; + } + + /** + * Configures ServiceConfiguration for cache eviction based on the slowest markDeletedPosition. + * This method disables cache eviction by expected read count and enables eviction by markDeletedPosition. + * Related to https://github.com/apache/pulsar/pull/14985 - "Evicting cache data by the slowest markDeletedPosition" + * + * @param defaultConf ServiceConfiguration instance to be configured + */ + private static void configureCacheEvictionByMarkDeletedPosition(ServiceConfiguration defaultConf) { + defaultConf.setCacheEvictionByExpectedReadCount(false); + defaultConf.setCacheEvictionByMarkDeletedPosition(true); + } + + /** + * Configures ServiceConfiguration with settings to test PR12258 behavior for caching to drain backlog consumers. + * This method sets configurations to enable caching for cursors with backlogged messages. + * To make PR12258 effective, there's an additional change made in the broker codebase to + * activate the cursor when a consumer connects, instead of waiting for the scheduled task to activate it. + * Check org.apache.pulsar.broker.service.persistent.PersistentSubscription#addConsumerInternal method + * for the change. + * @param defaultConf ServiceConfiguration instance to be modified + */ + private static void configurePR12258Caching(ServiceConfiguration defaultConf) { + defaultConf.setCacheEvictionByExpectedReadCount(false); + defaultConf.setManagedLedgerMinimumBacklogCursorsForCaching(1); + defaultConf.setManagedLedgerMinimumBacklogEntriesForCaching(1); + defaultConf.setManagedLedgerMaxBacklogBetweenCursorsForCaching(Integer.MAX_VALUE); + defaultConf.setManagedLedgerCursorBackloggedThreshold(Long.MAX_VALUE); + } + + // change enabled to true to run the test + @Test(enabled = false) + public void testTailingReads() throws Exception { + final String topicName = "persistent://my-property/my-ns/cache-test-topic"; + final String subscriptionName = "test-subscription"; + final int numConsumers = 5; + final int messagesPerSecond = 150; + final int testDurationSeconds = 3; + final int totalMessages = messagesPerSecond * testDurationSeconds; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT64) + .topic(topicName) + .enableBatching(false) + .blockIfQueueFull(true) + .create(); + + // Create consumers on the tail (reading from latest) + Consumer[] consumers = new Consumer[numConsumers]; + for (int i = 0; i < numConsumers; i++) { + consumers[i] = pulsarClient.newConsumer(Schema.INT64) + .topic(topicName) + .subscriptionName(subscriptionName + "-" + i) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Latest) + .subscribe(); + } + + ManagedLedgerFactoryMXBean cacheStats = pulsar.getDefaultManagedLedgerFactory().getCacheStats(); + + // Record initial cache metrics + long initialCacheHits = cacheStats.getCacheHitsTotal(); + long initialCacheMisses = cacheStats.getCacheMissesTotal(); + + // Start producer thread + CountDownLatch producerLatch = new CountDownLatch(1); + Thread producerThread = new Thread(() -> { + try { + long startTime = System.currentTimeMillis(); + int messagesSent = 0; + long messageId = 0; + while (messagesSent < totalMessages) { + long expectedTime = startTime + (messagesSent * 1000L / messagesPerSecond); + long currentTime = System.currentTimeMillis(); + + if (currentTime < expectedTime) { + Thread.sleep(expectedTime - currentTime); + } + + producer.send(messageId++); + messagesSent++; + } + log.info("Producer finished sending {} messages", messagesSent); + } catch (Exception e) { + log.error("Producer error", e); + fail("Producer failed: " + e.getMessage()); + } finally { + producerLatch.countDown(); + } + }); + + // Start consumer threads + CountDownLatch consumersLatch = new CountDownLatch(numConsumers); + for (int i = 0; i < numConsumers; i++) { + final int consumerId = i; + Thread consumerThread = new Thread(() -> { + try { + int messagesReceived = 0; + long startTime = System.currentTimeMillis(); + + while (System.currentTimeMillis() - startTime < (testDurationSeconds + 2) * 1000) { + try { + Message message = + consumers[consumerId].receive(1000, TimeUnit.MILLISECONDS); + if (message != null) { + consumers[consumerId].acknowledge(message); + messagesReceived++; + } + } catch (PulsarClientException.TimeoutException e) { + // Expected timeout, continue + } + } + log.info("Consumer {} received {} messages", consumerId, messagesReceived); + } catch (Exception e) { + log.error("Consumer {} error", consumerId, e); + } finally { + consumersLatch.countDown(); + } + }); + consumerThread.start(); + } + + // Start producer + producerThread.start(); + + // Wait for test completion + assertTrue(producerLatch.await(testDurationSeconds + 5, TimeUnit.SECONDS), + "Producer should complete within timeout"); + assertTrue(consumersLatch.await(testDurationSeconds + 10, TimeUnit.SECONDS), + "Consumers should complete within timeout"); + + // Clean up consumers + for (Consumer consumer : consumers) { + consumer.close(); + } + + // Get final cache metrics + long finalCacheHits = cacheStats.getCacheHitsTotal(); + long finalCacheMisses = cacheStats.getCacheMissesTotal(); + + // Calculate metrics similar to testStorageReadCacheMissesRate + long cacheHitsDelta = finalCacheHits - initialCacheHits; + long cacheMissesDelta = finalCacheMisses - initialCacheMisses; + + log.info("Cache metrics - Hits: {} -> {} (delta: {}), Misses: {} -> {} (delta: {})", + initialCacheHits, finalCacheHits, cacheHitsDelta, + initialCacheMisses, finalCacheMisses, cacheMissesDelta); + + // Verify that cache activity occurred + assertTrue(cacheHitsDelta + cacheMissesDelta > 0, + "Expected cache activity (hits or misses) during the test"); + + // Verify metrics make sense for the workload + assertTrue(cacheHitsDelta >= 0, "Cache hits should not decrease"); + assertTrue(cacheMissesDelta >= 0, "Cache misses should not decrease"); + + // With multiple consumers reading from the tail, we expect some cache activity + // The exact ratio depends on cache size and message patterns + double totalCacheRequests = cacheHitsDelta + cacheMissesDelta; + if (totalCacheRequests > 0) { + double cacheHitRate = cacheHitsDelta / totalCacheRequests; + log.info("Cache hit rate: {}%", String.format("%.2f", cacheHitRate * 100)); + + // With tail consumers, we might expect good cache hit rates + // since recent messages are more likely to be cached + assertTrue(cacheHitRate >= 0.0 && cacheHitRate <= 1.0, + "Cache hit rate should be between 0 and 1"); + } + } + + // change enabled to true to run the test + @Test(enabled = false) + public void testCatchUpReadsWithFailureProxyDisconnectingAllConnections() throws Exception { + final String topicName = "persistent://my-property/my-ns/cache-catchup-test-topic"; + final String subscriptionName = "test-catchup-subscription"; + final int numConsumers = 5; + final int totalMessages = 1000; + final int receiverQueueSize = 50; + + // Wire a failure proxy so that it's possible to disconnect broker connections forcefully + @Cleanup("stop") + Ipv4Proxy failureProxy = new Ipv4Proxy(0, "localhost", pulsar.getBrokerListenPort().get()); + failureProxy.startup(); + @Cleanup("stop") + PulsarLookupProxy lookupProxy = new PulsarLookupProxy(0, pulsar.getWebService().getListenPortHTTP().get(), + pulsar.getBrokerListenPort().get(), failureProxy.getLocalPort()); + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl("http://localhost:" + lookupProxy.getBindPort()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT64) + .topic(topicName) + .enableBatching(false) + .blockIfQueueFull(true) + .create(); + + // Create consumers in paused state with receiver queue size of 50 + Consumer[] consumers = new Consumer[numConsumers]; + for (int i = 0; i < numConsumers; i++) { + consumers[i] = pulsarClient.newConsumer(Schema.INT64) + .topic(topicName) + .subscriptionName(subscriptionName + "-" + i) + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .startPaused(true) // start consumers in paused state + .receiverQueueSize(receiverQueueSize) + .subscribe(); + } + + ManagedLedgerFactoryMXBean cacheStats = pulsar.getDefaultManagedLedgerFactory().getCacheStats(); + + // Record initial cache metrics + long initialCacheHits = cacheStats.getCacheHitsTotal(); + long initialCacheMisses = cacheStats.getCacheMissesTotal(); + + // Produce all messages while consumers are paused + log.info("Starting to produce {} messages", totalMessages); + for (long messageId = 0; messageId < totalMessages; messageId++) { + producer.send(messageId); + } + log.info("Finished producing {} messages", totalMessages); + + // Record cache metrics after production + long afterProductionCacheHits = cacheStats.getCacheHitsTotal(); + long afterProductionCacheMisses = cacheStats.getCacheMissesTotal(); + + // Unpause all consumers + for (Consumer consumer : consumers) { + consumer.resume(); + } + + @Cleanup("interrupt") + Thread failureInjector = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + // Simulate a failure by disconnecting the broker connections + Thread.sleep(2000); // Wait for some messages to be consumed + failureProxy.disconnectFrontChannels(); + log.info("Injected failure by disconnecting all broker connections"); + } catch (InterruptedException e) { + log.info("Failure injector interrupted"); + Thread.currentThread().interrupt(); + } + } + }); + failureInjector.start(); + + // Start consumer threads to read the catch-up messages + CountDownLatch consumersLatch = new CountDownLatch(numConsumers); + int[] messagesReceivedPerConsumer = new int[numConsumers]; + + for (int i = 0; i < numConsumers; i++) { + final int consumerId = i; + Thread consumerThread = new Thread(() -> { + try { + long startTime = System.currentTimeMillis(); + ConcurrentHashMap messagesReceived = new ConcurrentHashMap<>(); + + // Give consumers enough time to catch up + while (messagesReceived.size() < totalMessages && System.currentTimeMillis() - startTime < 30000) { + try { + Message message = consumers[consumerId].receive(1000, TimeUnit.MILLISECONDS); + Thread.sleep(20); // Simulate processing time + if (message != null) { + long messageId = message.getValue(); + messagesReceived.put(messageId, messageId); + consumers[consumerId].acknowledge(message); + } + } catch (PulsarClientException.TimeoutException e) { + // Continue on timeout + } + } + messagesReceivedPerConsumer[consumerId] = messagesReceived.size(); + log.info("Consumer {} received {} messages", consumerId, messagesReceived.size()); + } catch (Exception e) { + log.error("Consumer {} error", consumerId, e); + } finally { + consumersLatch.countDown(); + } + }); + consumerThread.start(); + } + + // Wait for all consumers to complete + assertTrue(consumersLatch.await(60, TimeUnit.SECONDS), + "All consumers should complete catch-up reads within timeout"); + + failureInjector.interrupt(); + failureInjector.join(); + + // Clean up consumers + for (Consumer consumer : consumers) { + consumer.close(); + } + + // Get final cache metrics + long finalCacheHits = cacheStats.getCacheHitsTotal(); + long finalCacheMisses = cacheStats.getCacheMissesTotal(); + + // Calculate metrics + long productionCacheHitsDelta = afterProductionCacheHits - initialCacheHits; + long productionCacheMissesDelta = afterProductionCacheMisses - initialCacheMisses; + long consumptionCacheHitsDelta = finalCacheHits - afterProductionCacheHits; + long consumptionCacheMissesDelta = finalCacheMisses - afterProductionCacheMisses; + + log.info("Production phase - Cache hits delta: {}, Cache misses delta: {}", + productionCacheHitsDelta, productionCacheMissesDelta); + log.info("Consumption phase - Cache hits delta: {}, Cache misses delta: {}", + consumptionCacheHitsDelta, consumptionCacheMissesDelta); + + // Verify all consumers received all messages + for (int i = 0; i < numConsumers; i++) { + assertTrue(messagesReceivedPerConsumer[i] == totalMessages, + String.format("Consumer %d should receive all %d messages, but received %d", + i, totalMessages, messagesReceivedPerConsumer[i])); + } + + // Verify cache activity occurred during consumption + assertTrue(consumptionCacheHitsDelta + consumptionCacheMissesDelta > 0, + "Expected cache activity during catch-up reads"); + + // For catch-up reads, we expect minimal cache misses since messages should be cached + // or efficiently retrieved in sequence + double totalConsumptionCacheRequests = consumptionCacheHitsDelta + consumptionCacheMissesDelta; + if (totalConsumptionCacheRequests > 0) { + double cacheHitRate = consumptionCacheHitsDelta / totalConsumptionCacheRequests; + log.info("Consumption cache hit rate: {}%", String.format("%.2f", cacheHitRate * 100)); + + // For catch-up scenarios, we expect very few cache misses + assertTrue(consumptionCacheMissesDelta == 0 || cacheHitRate > 0.6, + String.format("Expected no cache misses or very high hit rate for catch-up reads. " + + "Cache misses: %d, Hit rate: %.2f%%", + consumptionCacheMissesDelta, cacheHitRate * 100)); + } + + log.info("Catch-up read test completed successfully with {} consumers and {} messages", + numConsumers, totalMessages); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/PulsarLookupProxy.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/PulsarLookupProxy.java new file mode 100644 index 0000000000000..9e92676830d6e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/PulsarLookupProxy.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.cache; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import lombok.SneakyThrows; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.proxy.ProxyServlet; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.Callback; + +/** + * A simple proxy that replaces the localhost Pulsar broker's target port in the response content. + * This is useful for routing requests via Toxiproxy or another failure proxy + * to simulate network failures for broker connections. + * The Pulsar client should use the http url of this proxy in the serviceUrl to achieve this. + */ +public class PulsarLookupProxy { + + private final Server server; + + @SneakyThrows + public PulsarLookupProxy(int bindPort, int brokerHttpPort, int brokerPulsarPort, int failureProxyPulsarPort) { + server = new Server(bindPort); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + server.setHandler(context); + + ServletHolder proxyServlet = + new ServletHolder(new ReplacingProxyServlet(brokerHttpPort, "localhost:" + brokerPulsarPort, + "localhost:" + failureProxyPulsarPort)); + context.addServlet(proxyServlet, "/*"); + + server.start(); + } + + public int getBindPort() { + return Arrays.stream(server.getConnectors()).filter(ServerConnector.class::isInstance) + .map(ServerConnector.class::cast).findFirst().map(connector -> connector.getLocalPort()).orElseThrow(); + } + + @SneakyThrows + public void stop() { + server.stop(); + } + + public static class ReplacingProxyServlet extends ProxyServlet { + private final int brokerHttpPort; + private final String search; + private final String replacement; + + public ReplacingProxyServlet(int brokerHttpPort, String search, String replacement) { + this.brokerHttpPort = brokerHttpPort; + this.search = search; + this.replacement = replacement; + } + + @Override + protected String rewriteTarget(HttpServletRequest clientRequest) { + StringBuilder url = new StringBuilder(); + url.append("http://localhost:" + brokerHttpPort); + url.append(clientRequest.getRequestURI()); + String query = clientRequest.getQueryString(); + if (query != null) { + url.append("?").append(query); + } + return url.toString(); + } + + @Override + protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, + Response proxyResponse, byte[] buffer, int offset, int length, + Callback callback) { + String contentString = new String(buffer, offset, length, StandardCharsets.UTF_8); + String replaced = contentString.replace(search, replacement); + if (replaced.equals(contentString)) { + super.onResponseContent(request, response, proxyResponse, buffer, offset, length, callback); + } else { + byte[] replacedBuffer = replaced.getBytes(StandardCharsets.UTF_8); + response.setHeader("Content-Length", null); + super.onResponseContent(request, response, proxyResponse, replacedBuffer, 0, replacedBuffer.length, + callback); + } + } + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java index a84dab4d17dff..431d3060bbaf8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java @@ -34,15 +34,18 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class Ipv4Proxy { @Getter - private final int localPort; + private int localPort; private final String backendServerHost; private final int backendServerPort; private final EventLoopGroup serverGroup = new NioEventLoopGroup(1); @@ -69,6 +72,12 @@ protected void initChannel(SocketChannel ch) { } }).childOption(ChannelOption.AUTO_READ, false) .bind(localPort).sync(); + if (localServerChannel.isSuccess()) { + localPort = ((InetSocketAddress) localServerChannel.channel().localAddress()).getPort(); + log.info("Proxy started on port: {}", localPort); + } else { + throw new RuntimeException("Failed to bind to port: " + localPort, localServerChannel.cause()); + } } public synchronized void stop() throws InterruptedException{ From a2a155b895e08395869ca82bd2ea593a618b0e4e Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 29 Jul 2025 16:30:31 +0300 Subject: [PATCH 2/4] Add expected read count solution --- .../org/apache/bookkeeper/mledger/Entry.java | 11 + .../mledger/EntryReadCountHandler.java | 28 +++ .../mledger/ManagedLedgerConfig.java | 3 + .../mledger/ReferenceCountedEntry.java | 8 + .../bookkeeper/mledger/impl/EntryImpl.java | 56 ++++- .../impl/EntryReadCountHandlerImpl.java | 57 +++++ .../mledger/impl/ManagedCursorContainer.java | 196 ++++++++---------- .../mledger/impl/ManagedCursorImpl.java | 9 + .../mledger/impl/ManagedLedgerImpl.java | 20 +- .../bookkeeper/mledger/impl/OpAddEntry.java | 21 +- .../mledger/impl/cache/EntryCache.java | 7 +- .../impl/cache/EntryCacheDisabled.java | 7 +- .../impl/cache/PendingReadsManager.java | 20 +- .../impl/cache/RangeCacheRemovalQueue.java | 143 ++++++++++++- .../impl/cache/RangeEntryCacheImpl.java | 40 ++-- ...RangeEntryCacheManagerEvictionHandler.java | 9 +- .../cache/RangeEntryCacheManagerImpl.java | 10 +- .../mledger/impl/EntryCacheManagerTest.java | 1 + .../mledger/impl/EntryCacheTest.java | 48 +++-- .../mledger/impl/EntryImplTest.java | 14 +- .../InflightReadsLimiterIntegrationTest.java | 6 +- .../impl/ManagedCursorContainerTest.java | 35 ++++ .../mledger/impl/ManagedCursorTest.java | 2 +- .../mledger/impl/ManagedLedgerTest.java | 1 + .../impl/cache/PendingReadsManagerTest.java | 112 +++++----- .../mledger/impl/cache/RangeCacheTest.java | 16 +- .../test/MockedBookKeeperTestCase.java | 1 + .../pulsar/broker/ServiceConfiguration.java | 8 + .../pulsar/broker/service/BrokerService.java | 11 +- .../broker/service/EntryAndMetadata.java | 6 + ...tStickyKeyDispatcherMultipleConsumers.java | 4 + .../persistent/PersistentSubscription.java | 6 + .../service/persistent/PersistentTopic.java | 5 + .../buffer/impl/TransactionEntryImpl.java | 6 + .../broker/cache/BacklogConsumerTest.java | 8 + .../MinimumBacklogCacheStrategyTest.java | 1 + .../api/SimpleProducerConsumerTest.java | 152 +++++--------- 37 files changed, 715 insertions(+), 373 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/EntryReadCountHandler.java create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryReadCountHandlerImpl.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java index c243861e9c0f5..cea114b27f54e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java @@ -67,6 +67,17 @@ public interface Entry { */ boolean release(); + /** + * Managed Ledger implementations of EntryImpl should implement this method to return the read count handler + * associated with the entry. + * This handler is used to track how many times the entry has been read and to manage + * the eviction of entries from the broker cache based on their expected read count. + * @return + */ + default EntryReadCountHandler getReadCountHandler() { + return null; + } + /** * Check if this entry is for the given Position. * @param position the position to check against diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/EntryReadCountHandler.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/EntryReadCountHandler.java new file mode 100644 index 0000000000000..858698c3ba5ab --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/EntryReadCountHandler.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +public interface EntryReadCountHandler { + int getExpectedReadCount(); + void incrementExpectedReadCount(); + void markRead(); + default boolean hasExpectedReads() { + return getExpectedReadCount() >= 1; + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index aaa127973a5ec..ad5cbf9ad711f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -84,6 +84,9 @@ public class ManagedLedgerConfig { @Getter @Setter private boolean cacheEvictionByMarkDeletedPosition = false; + @Getter + @Setter + private boolean cacheEvictionByExpectedReadCount = true; private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReferenceCountedEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReferenceCountedEntry.java index 34305e312a06b..69399813312d8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReferenceCountedEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReferenceCountedEntry.java @@ -24,5 +24,13 @@ * An Entry that is also reference counted. */ public interface ReferenceCountedEntry extends Entry, ReferenceCounted { + EntryReadCountHandler getReadCountHandler(); + default boolean hasExpectedReads() { + EntryReadCountHandler readCountHandler = getReadCountHandler(); + if (readCountHandler != null) { + return readCountHandler.hasExpectedReads(); + } + return false; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index c2b8b906e919c..b85a1bc45fea8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.EntryReadCountHandler; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.ReferenceCountedEntry; @@ -48,20 +49,24 @@ protected EntryImpl newObject(Handle handle) { private long entryId; private Position position; ByteBuf data; + private EntryReadCountHandler readCountHandler; + private boolean decreaseReadCountOnRelease = true; private Runnable onDeallocate; - public static EntryImpl create(LedgerEntry ledgerEntry) { + public static EntryImpl create(LedgerEntry ledgerEntry, int expectedReadCount) { EntryImpl entry = RECYCLER.get(); entry.ledgerId = ledgerEntry.getLedgerId(); entry.entryId = ledgerEntry.getEntryId(); entry.data = ledgerEntry.getEntryBuffer(); entry.data.retain(); + entry.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount); entry.setRefCnt(1); return entry; } - public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) { + public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor, + int expectedReadCount) { ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null; if (interceptor != null) { ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate(); @@ -74,7 +79,7 @@ public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor duplicateBuffer.release(); } } - EntryImpl returnEntry = create(ledgerEntry); + EntryImpl returnEntry = create(ledgerEntry, expectedReadCount); if (processorHandle != null) { processorHandle.release(); ledgerEntry.close(); @@ -84,41 +89,66 @@ public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor @VisibleForTesting public static EntryImpl create(long ledgerId, long entryId, byte[] data) { + return create(ledgerId, entryId, data, 0); + } + + @VisibleForTesting + public static EntryImpl create(long ledgerId, long entryId, byte[] data, int expectedReadCount) { EntryImpl entry = RECYCLER.get(); entry.ledgerId = ledgerId; entry.entryId = entryId; entry.data = Unpooled.wrappedBuffer(data); + entry.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount); entry.setRefCnt(1); return entry; } public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) { + return create(ledgerId, entryId, data, 0); + } + + public static EntryImpl create(long ledgerId, long entryId, ByteBuf data, int expectedReadCount) { EntryImpl entry = RECYCLER.get(); entry.ledgerId = ledgerId; entry.entryId = entryId; entry.data = data; entry.data.retain(); + entry.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount); entry.setRefCnt(1); return entry; } - public static EntryImpl create(Position position, ByteBuf data) { + public static EntryImpl create(Position position, ByteBuf data, int expectedReadCount) { EntryImpl entry = RECYCLER.get(); entry.position = PositionFactory.create(position); entry.ledgerId = position.getLedgerId(); entry.entryId = position.getEntryId(); entry.data = data; entry.data.retain(); + entry.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount); + entry.setRefCnt(1); + return entry; + } + + public static EntryImpl createWithRetainedDuplicate(Position position, ByteBuf data, int expectedReadCount) { + EntryImpl entry = RECYCLER.get(); + entry.position = PositionFactory.create(position); + entry.ledgerId = position.getLedgerId(); + entry.entryId = position.getEntryId(); + entry.data = data.retainedDuplicate(); + entry.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount); entry.setRefCnt(1); return entry; } - public static EntryImpl createWithRetainedDuplicate(Position position, ByteBuf data) { + public static EntryImpl createWithRetainedDuplicate(Position position, ByteBuf data, + EntryReadCountHandler entryReadCountHandler) { EntryImpl entry = RECYCLER.get(); entry.position = PositionFactory.create(position); entry.ledgerId = position.getLedgerId(); entry.entryId = position.getEntryId(); entry.data = data.retainedDuplicate(); + entry.readCountHandler = entryReadCountHandler; entry.setRefCnt(1); return entry; } @@ -130,6 +160,7 @@ public static EntryImpl create(EntryImpl other) { entry.ledgerId = other.ledgerId; entry.entryId = other.entryId; entry.data = other.data.retainedDuplicate(); + entry.readCountHandler = other.readCountHandler; entry.setRefCnt(1); return entry; } @@ -140,6 +171,7 @@ public static EntryImpl create(Entry other) { entry.ledgerId = other.getLedgerId(); entry.entryId = other.getEntryId(); entry.data = other.getDataBuffer().retainedDuplicate(); + entry.readCountHandler = other.getReadCountHandler(); entry.setRefCnt(1); return entry; } @@ -227,6 +259,9 @@ public ReferenceCounted touch(Object hint) { @Override protected void deallocate() { + if (decreaseReadCountOnRelease && readCountHandler != null) { + readCountHandler.markRead(); + } // This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it if (onDeallocate != null) { try { @@ -240,6 +275,8 @@ protected void deallocate() { ledgerId = -1; entryId = -1; position = null; + readCountHandler = null; + decreaseReadCountOnRelease = true; recyclerHandle.recycle(this); } @@ -248,6 +285,15 @@ public boolean matchesPosition(Position key) { return key != null && key.compareTo(ledgerId, entryId) == 0; } + @Override + public EntryReadCountHandler getReadCountHandler() { + return readCountHandler; + } + + public void setDecreaseReadCountOnRelease(boolean enabled) { + decreaseReadCountOnRelease = enabled; + } + @Override public String toString() { return getClass().getName() + "@" + System.identityHashCode(this) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryReadCountHandlerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryReadCountHandlerImpl.java new file mode 100644 index 0000000000000..e311a36d49458 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryReadCountHandlerImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.bookkeeper.mledger.EntryReadCountHandler; + +public class EntryReadCountHandlerImpl implements EntryReadCountHandler { + private static final AtomicIntegerFieldUpdater expectedReadCountUpdater = + AtomicIntegerFieldUpdater.newUpdater(EntryReadCountHandlerImpl.class, "expectedReadCount"); + + private volatile int expectedReadCount; + + private EntryReadCountHandlerImpl(int expectedReadCount) { + this.expectedReadCount = expectedReadCount; + } + + public int getExpectedReadCount() { + return expectedReadCount; + } + + @Override + public void incrementExpectedReadCount() { + expectedReadCountUpdater.incrementAndGet(this); + } + + @Override + public void markRead() { + expectedReadCountUpdater.decrementAndGet(this); + } + + /** + * Creates an instance of EntryReadCountHandlerImpl if the expected read count is greater than 0. + * If the expected read count is 0 or less, it returns null. + * @param expectedReadCount the expected read count for the entry + * @return an instance of EntryReadCountHandlerImpl or null + */ + public static EntryReadCountHandlerImpl maybeCreate(int expectedReadCount) { + return expectedReadCount > 0 ? new EntryReadCountHandlerImpl(expectedReadCount) : null; + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index ba901ece51c39..5dea65d01a74f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -19,11 +19,13 @@ package org.apache.bookkeeper.mledger.impl; import static java.util.Objects.requireNonNull; -import java.util.ArrayList; import java.util.Iterator; import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.StampedLock; import lombok.Value; import lombok.experimental.UtilityClass; @@ -36,13 +38,12 @@ *

* The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep. *

- * This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with - * an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented - * in a single array. More details about heap implementations: - * here - *

- * The heap is updated and kept sorted when a cursor is updated. - * + * In addition, it allows to track the cursors that are before a given position, so that we can use this + * information to calculate the expected read count for a read that is about to be performed. When + * cacheEvictionByExpectedReadCount=true, the expected read count + * is used for cached entries to determine how many possible times the entry is going to be read before eviction. + * When the cache fills up, the eviction of entries with positive remaining expected read count will be postponed + * until all other entries are evicted. */ public class ManagedCursorContainer implements Iterable { @@ -65,15 +66,30 @@ public static class CursorInfo { long version; } - private static class Item { + // Counter to keep track of the order in which cursors are added. + // This is used to maintain a stable order of cursors that have the same position. + private final AtomicInteger addOrderCounter = new AtomicInteger(0); + + private static class Item implements Comparable { final ManagedCursor cursor; + final int addOrder; Position position; - int idx; - Item(ManagedCursor cursor, Position position, int idx) { + Item(ManagedCursor cursor, Position position, int addOrder) { this.cursor = cursor; + this.addOrder = addOrder; this.position = position; - this.idx = idx; + } + + @Override + public int compareTo(Item o) { + int positionComparison = this.position.compareTo(o.position); + if (positionComparison != 0) { + return positionComparison; + } + // If positions are equal, compare by add order to maintain a stable order + // for cursors that have the same position. + return Integer.compare(this.addOrder, o.addOrder); } } @@ -137,8 +153,8 @@ public static long getNextVersion(long existingVersion) { public ManagedCursorContainer() {} - // Used to keep track of slowest cursor. - private final ArrayList heap = new ArrayList<>(); + // Used to keep track of slowest cursor and the cursors before a given position. + private final NavigableSet sortedByPosition = new TreeSet<>(); // Maps a cursor to its position in the heap private final ConcurrentMap cursors = new ConcurrentSkipListMap<>(); @@ -160,13 +176,10 @@ public ManagedCursorContainer() {} public void add(ManagedCursor cursor, Position position) { long stamp = rwLock.writeLock(); try { - Item item = new Item(cursor, position, position != null ? heap.size() : -1); + Item item = new Item(cursor, position, addOrderCounter.getAndIncrement()); cursors.put(cursor.getName(), item); if (position != null) { - heap.add(item); - if (heap.size() > 1) { - siftUp(item); - } + sortedByPosition.add(item); } if (cursor.isDurable()) { durableCursorCount++; @@ -192,17 +205,8 @@ public boolean removeCursor(String name) { try { Item item = cursors.remove(name); if (item != null) { - if (item.idx >= 0) { - if (heap.size() == 1) { - heap.clear(); - } else { - // Move the item to the right end of the heap to be removed - Item lastItem = heap.get(heap.size() - 1); - swap(item, lastItem); - heap.remove(item.idx); - // Update the heap - siftDown(lastItem); - } + if (item.position != null) { + sortedByPosition.remove(item); } if (item.cursor.isDurable()) { durableCursorCount--; @@ -235,32 +239,33 @@ public Pair cursorUpdated(ManagedCursor cursor, Position new long stamp = rwLock.writeLock(); try { Item item = cursors.get(cursor.getName()); - if (item == null || item.idx == -1) { + if (item == null) { return null; } - - Position previousSlowestConsumer = heap.get(0).position; - item.position = newPosition; - version = DataVersion.getNextVersion(version); - - if (heap.size() == 1) { - return Pair.of(previousSlowestConsumer, item.position); + Position previousSlowestConsumer = internalSlowestReaderPosition(); + // it is necessary to remove the item from the sorted set + // before updating the position, since otherwise sorting would not work correctly + if (item.position != null) { + sortedByPosition.remove(item); } - - // When the cursor moves forward, we need to push it toward the - // bottom of the tree and push it up if a reset was done - if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) { - siftDown(item); - } else { - siftUp(item); + item.position = newPosition; + // if the new position is not null, we add it back to the sorted set + // to maintain the order of cursors by position + if (newPosition != null) { + sortedByPosition.add(item); } - Position newSlowestConsumer = heap.get(0).position; + version = DataVersion.getNextVersion(version); + Position newSlowestConsumer = internalSlowestReaderPosition(); return Pair.of(previousSlowestConsumer, newSlowestConsumer); } finally { rwLock.unlockWrite(stamp); } } + private Position internalSlowestReaderPosition() { + return !sortedByPosition.isEmpty() ? sortedByPosition.first().position : null; + } + /** * Get the slowest reader position for the cursors that are ordered. * @@ -269,7 +274,7 @@ public Pair cursorUpdated(ManagedCursor cursor, Position new public Position getSlowestReaderPosition() { long stamp = rwLock.readLock(); try { - return heap.isEmpty() ? null : heap.get(0).position; + return internalSlowestReaderPosition(); } finally { rwLock.unlockRead(stamp); } @@ -278,7 +283,7 @@ public Position getSlowestReaderPosition() { public ManagedCursor getSlowestReader() { long stamp = rwLock.readLock(); try { - return heap.isEmpty() ? null : heap.get(0).cursor; + return !sortedByPosition.isEmpty() ? sortedByPosition.first().cursor : null; } finally { rwLock.unlockRead(stamp); } @@ -291,10 +296,10 @@ public ManagedCursor getSlowestReader() { public CursorInfo getCursorWithOldestPosition() { long stamp = rwLock.readLock(); try { - if (heap.isEmpty()) { + if (sortedByPosition.isEmpty()) { return null; } else { - Item item = heap.get(0); + Item item = sortedByPosition.first(); return new CursorInfo(item.cursor, item.position, version); } } finally { @@ -387,74 +392,39 @@ public void remove() { }; } - // ////////////////////// - - /** - * Push the item up towards the root of the tree (the lowest reading position). - */ - private void siftUp(Item item) { - Item parent = getParent(item); - while (item.idx > 0 && parent.position.compareTo(item.position) > 0) { - swap(item, parent); - parent = getParent(item); - } - } - - /** - * Push the item down towards the bottom of the tree (the highest reading position). - */ - private void siftDown(final Item item) { - while (true) { - Item j = null; - Item right = getRight(item); - if (right != null && right.position.compareTo(item.position) < 0) { - Item left = getLeft(item); - if (left != null && left.position.compareTo(right.position) < 0) { - j = left; - } else { - j = right; - } + public int getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor cursor) { + long stamp = rwLock.readLock(); + try { + Item item = cursors.get(cursor.getName()); + if (item == null || item.position == null) { + return 0; } else { - Item left = getLeft(item); - if (left != null && left.position.compareTo(item.position) < 0) { - j = left; + int count = 0; + for (Item o : sortedByPosition) { + if (o.position.compareTo(item.position) > 0) { + break; + } + count++; } + return count; } - - if (j != null) { - swap(item, j); - } else { - break; - } + } finally { + rwLock.unlockRead(stamp); } } - /** - * Swap two items in the heap. - */ - private void swap(Item item1, Item item2) { - int idx1 = item1.idx; - int idx2 = item2.idx; - - heap.set(idx2, item1); - heap.set(idx1, item2); - - // Update the indexes too - item1.idx = idx2; - item2.idx = idx1; - } - - private Item getParent(Item item) { - return heap.get((item.idx - 1) / 2); - } - - private Item getLeft(Item item) { - int i = item.idx * 2 + 1; - return i < heap.size() ? heap.get(i) : null; - } - - private Item getRight(Item item) { - int i = item.idx * 2 + 2; - return i < heap.size() ? heap.get(i) : null; + public int size() { + long stamp = rwLock.tryOptimisticRead(); + int size = cursors.size(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + size = cursors.size(); + } finally { + rwLock.unlockRead(stamp); + } + } + return size; } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 9cba4d863e2d6..ea9251de4d471 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -4086,4 +4086,13 @@ void addWaitingCursorRequested(Runnable addWaitingCursorRunnable) { registeredToWaitingCursors = true; } } + public int getNumberOfCursorsAtSamePositionOrBefore() { + if (ledger.getConfig().isCacheEvictionByExpectedReadCount()) { + return ledger.getNumberOfCursorsAtSamePositionOrBefore(this); + } else if (isCacheReadEntry()) { + return 1; + } else { + return 0; + } + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4b630a6b362df..9830d17fbd7ef 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.IntSupplier; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -506,7 +507,7 @@ protected ManagedLedgerInterceptor.LastEntryHandle createLastEntryHandle(LedgerH entries.getEntry(lh.getLastAddConfirmed()); if (ledgerEntry != null) { promise.complete( - Optional.of(EntryImpl.create(ledgerEntry))); + Optional.of(EntryImpl.create(ledgerEntry, 0))); } else { promise.complete(Optional.empty()); } @@ -2296,7 +2297,7 @@ protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCal protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry, Object ctx) { - boolean shouldCacheEntry = opReadEntry.cursor.isCacheReadEntry(); + IntSupplier expectedReadCount = () -> opReadEntry.cursor.getNumberOfCursorsAtSamePositionOrBefore(); if (config.getReadEntryTimeoutSeconds() > 0) { // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); @@ -2304,9 +2305,9 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, opReadEntry, readOpCount, createdTime, ctx); lastReadCallback = readCallback; - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntry, readCallback, readOpCount); + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount); } else { - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntry, opReadEntry, ctx); + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, opReadEntry, ctx); } } @@ -2468,7 +2469,7 @@ public boolean hasMoreEntries(Position position) { // slowest reader position is earliest mark delete position when cacheEvictionByMarkDeletedPosition=true // it is the earliest read position when cacheEvictionByMarkDeletedPosition=false private void invalidateEntriesUpToSlowestReaderPosition() { - if (entryCache.getSize() <= 0) { + if (entryCache.getSize() <= 0 || config.isCacheEvictionByExpectedReadCount()) { return; } Position slowestReaderPosition = activeCursors.getSlowestReaderPosition(); @@ -2528,7 +2529,7 @@ void onCursorMarkDeletePositionUpdated(ManagedCursorImpl cursor, Position newPos private void updateActiveCursor(ManagedCursorImpl cursor, Position newPosition) { Pair slowestPositions = activeCursors.cursorUpdated(cursor, newPosition); - if (slowestPositions != null + if (!config.isCacheEvictionByExpectedReadCount() && slowestPositions != null && !slowestPositions.getLeft().equals(slowestPositions.getRight())) { invalidateEntriesUpToSlowestReaderPosition(); } @@ -4048,6 +4049,11 @@ private void deactivateCursorByName(String cursorName) { } } + public int getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor cursor) { + return activeCursors.getNumberOfCursorsAtSamePositionOrBefore(cursor); + } + + public void removeWaitingCursor(ManagedCursor cursor) { ((ManagedCursorImpl) cursor).removeWaitingCursorRequested(() -> { // remove only if the cursor has been registered @@ -4814,7 +4820,7 @@ public boolean checkInactiveLedgerAndRollOver() { public void checkCursorsToCacheEntries() { - if (minBacklogCursorsForCaching < 1) { + if (minBacklogCursorsForCaching < 1 || config.isCacheEvictionByExpectedReadCount()) { return; } Iterator it = cursors.iterator(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 2a361fcebf229..2ebc782f3ee75 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -259,13 +259,22 @@ public void run() { long ledgerId = ledger != null ? ledger.getId() : ((Position) ctx).getLedgerId(); // Don't insert to the entry cache for the ShadowManagedLedger - if (!(ml instanceof ShadowManagedLedgerImpl) && ml.hasActiveCursors()) { + if (!(ml instanceof ShadowManagedLedgerImpl)) { + int activeCursorCount = ml.getActiveCursors().size(); // Avoid caching entries if no cursor has been created - EntryImpl entry = EntryImpl.create(ledgerId, entryId, data); - // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling - // insert - ml.entryCache.insert(entry); - entry.release(); + if (activeCursorCount > 0) { + int expectedReadCount = 0; + // only use expectedReadCount if cache eviction is enabled by expected read count + if (ml.getConfig().isCacheEvictionByExpectedReadCount()) { + expectedReadCount = activeCursorCount; + } + EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount); + entry.setDecreaseReadCountOnRelease(false); + // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling + // insert + ml.entryCache.insert(entry); + entry.release(); + } } Position lastEntry = PositionFactory.create(ledgerId, entryId); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java index 5722893983479..b2ebf7430560c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl.cache; +import java.util.function.IntSupplier; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; @@ -78,14 +79,14 @@ public interface EntryCache { * the first entry to read (inclusive) * @param lastEntry * the last entry to read (inclusive) - * @param shouldCacheEntry - * whether the read entry should be cached + * @param expectedReadCount resolves the expected read count for the given entry. When the expected read count is + * >0, the entry can be cached and reused later. * @param callback * the callback object that will be notified when read is done * @param ctx * the context object */ - void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, ReadEntriesCallback callback, Object ctx); /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index afb8091dcb625..9cd63d99f4c85 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.function.IntSupplier; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -67,7 +68,7 @@ public void clear() { } @Override - public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync( ledgerEntries -> { @@ -76,7 +77,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole try { for (LedgerEntry e : ledgerEntries) { // Insert the entries at the end of the list (they will be unsorted for now) - EntryImpl entry = EntryImpl.create(e, interceptor); + EntryImpl entry = EntryImpl.create(e, interceptor, 0); entries.add(entry); totalSize += entry.getLength(); } @@ -109,7 +110,7 @@ public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.Read Iterator iterator = ledgerEntries.iterator(); if (iterator.hasNext()) { LedgerEntry ledgerEntry = iterator.next(); - EntryImpl returnEntry = EntryImpl.create(ledgerEntry, interceptor); + EntryImpl returnEntry = EntryImpl.create(ledgerEntry, interceptor, 0); ml.getMbean().recordReadEntriesOpsCacheMisses(1, returnEntry.getLength()); ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java index c3bd97e5a34f1..5f904f3cf8420 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -275,7 +276,7 @@ private void readEntriesComplete(List callbacks, if (first.startEntry == key.startEntry && first.endEntry == key.endEntry) { // perfect match, no copy, this is the most common case - first.callback.readEntriesComplete((List) entriesToReturn, first.ctx); + first.callback.readEntriesComplete(entriesToReturn, first.ctx); } else { first.callback.readEntriesComplete( keepEntries(entriesToReturn, first.startEntry, first.endEntry), first.ctx); @@ -286,6 +287,8 @@ private void readEntriesComplete(List callbacks, copyEntries(entriesToReturn, callback.startEntry, callback.endEntry), callback.ctx); } for (Entry entry : entriesToReturn) { + // don't decrease the read count when these entries are released + ((EntryImpl) entry).setDecreaseReadCountOnRelease(false); entry.release(); } } @@ -326,7 +329,7 @@ private static List copyEntries(List entriesToReturn, long startEn } - void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + void readEntries(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry); @@ -348,9 +351,9 @@ void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldC continue; } CompletableFuture> readFromLeftFuture = - recursiveReadMissingEntriesAsync(lh, shouldCacheEntry, findBestCandidateOutcome.missingOnLeft); + recursiveReadMissingEntriesAsync(lh, expectedReadCount, findBestCandidateOutcome.missingOnLeft); CompletableFuture> readFromRightFuture = - recursiveReadMissingEntriesAsync(lh, shouldCacheEntry, + recursiveReadMissingEntriesAsync(lh, expectedReadCount, findBestCandidateOutcome.missingOnRight); readFromLeftFuture .thenCombine(readFromMidFuture, (left, mid) -> { @@ -378,20 +381,21 @@ void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldC if (createdByThisThread.get()) { CompletableFuture> readResult = rangeEntryCache.readFromStorage(lh, firstEntry, - lastEntry, shouldCacheEntry); + lastEntry, expectedReadCount); pendingRead.attach(readResult); } } } - private CompletableFuture> recursiveReadMissingEntriesAsync(ReadHandle lh, boolean shouldCacheEntry, + private CompletableFuture> recursiveReadMissingEntriesAsync(ReadHandle lh, + IntSupplier expectedReadCount, PendingReadKey missingKey) { CompletableFuture> future; if (missingKey != null) { future = new CompletableFuture<>(); ReadEntriesCallback callback = new ReadEntriesCallback(future); - rangeEntryCache.asyncReadEntry0(lh, missingKey.startEntry, missingKey.endEntry, - shouldCacheEntry, callback, null, false); + rangeEntryCache.asyncReadEntry0(lh, missingKey.startEntry, missingKey.endEntry, expectedReadCount, callback, + null, false); } else { future = CompletableFuture.completedFuture(Collections.emptyList()); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheRemovalQueue.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheRemovalQueue.java index 31c7e87b21e10..eadbd2ea66158 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheRemovalQueue.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheRemovalQueue.java @@ -19,6 +19,12 @@ package org.apache.bookkeeper.mledger.impl.cache; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.List; +import org.apache.bookkeeper.mledger.ReferenceCountedEntry; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.tuple.Pair; import org.jctools.queues.MpscUnboundedArrayQueue; @@ -32,13 +38,15 @@ class RangeCacheRemovalQueue { private static final int REMOVAL_QUEUE_CHUNK_SIZE = 128 * 1024; private final MpscUnboundedArrayQueue removalQueue = new MpscUnboundedArrayQueue<>( REMOVAL_QUEUE_CHUNK_SIZE); + private final RangeCacheRemovalQueueStash stash = new RangeCacheRemovalQueueStash(); public Pair evictLEntriesBeforeTimestamp(long timestampNanos) { return evictEntries( - (e, c) -> e.timestampNanos < timestampNanos ? EvictionResult.REMOVE : EvictionResult.STOP); + (e, c) -> e.timestampNanos < timestampNanos ? EvictionResult.REMOVE : EvictionResult.STOP, + true); } - public Pair evictLeastAccessedEntries(long sizeToFree) { + public Pair evictLeastAccessedEntries(long sizeToFree, long timestampNanos) { checkArgument(sizeToFree > 0); return evictEntries( (e, c) -> { @@ -46,21 +54,136 @@ public Pair evictLeastAccessedEntries(long sizeToFree) { if (c.removedSize >= sizeToFree) { return EvictionResult.STOP; } + // stash entries that are not evictable and haven't expired + boolean expired = e.timestampNanos < timestampNanos; + // TODO: entry should copy the EntryReadCountHandler reference to avoid recycling issues + if (e.value.hasExpectedReads() && !expired) { + return EvictionResult.STASH; + } return EvictionResult.REMOVE; + }, false); + } + + /** + * Returns the actual size of the removal queue, including entries in the stash. + * This method is used for testing purposes to verify actual size of cached entries. + * This has a performance impact, so it should not be used in production code. + * @return a pair containing the number of entries and their total size in bytes + */ + @VisibleForTesting + public synchronized Pair getNonEvictableSize() { + final MutableInt entries = new MutableInt(0); + final MutableLong bytesSize = new MutableLong(0L); + stash.entries.forEach((entry) -> { + if (entry != null) { + entry.withWriteLock(wrapper -> { + ReferenceCountedEntry value = wrapper.value; + if (value != null && value.hasExpectedReads()) { + entries.increment(); + bytesSize.add(value.getLength()); + } + return null; }); + } + }); + removalQueue.drain((entry) -> { + if (entry != null) { + boolean exists = entry.withWriteLock(wrapper -> { + ReferenceCountedEntry value = wrapper.value; + if (value != null) { + if (value.hasExpectedReads()) { + entries.increment(); + bytesSize.add(wrapper.size); + } + return true; + } + return false; + }); + if (exists) { + // Add the entry to the stash to avoid losing it + stash.add(entry); + } + } + }); + return Pair.of(entries.getValue(), bytesSize.getValue()); } public boolean addEntry(RangeCacheEntryWrapper newWrapper) { return removalQueue.offer(newWrapper); } + class RangeCacheRemovalQueueStash { + // TODO: consider using a more efficient data structure, for example, a linked list of lists + // and keeping a pool of lists to recycle + List entries = new ArrayList<>(); + int size = 0; + int removed = 0; + + public void add(RangeCacheEntryWrapper entry) { + entries.add(entry); + size++; + } + + public boolean evictEntries(EvictionPredicate evictionPredicate, RangeCacheRemovalCounters counters, + boolean processAllEntriesInStash) { + boolean continueEviction = doEvictEntries(evictionPredicate, counters, processAllEntriesInStash); + maybeTrim(); + return continueEviction; + } + + private boolean doEvictEntries(EvictionPredicate evictionPredicate, RangeCacheRemovalCounters counters, + boolean processAllEntriesInStash) { + for (int i = 0; i < entries.size(); i++) { + RangeCacheEntryWrapper entry = entries.get(i); + if (entry == null) { + continue; + } + EvictionResult evictionResult = handleEviction(evictionPredicate, entry, counters); + if (evictionResult.shouldRemoveFromQueue()) { + // mark the entry as deleted + entries.set(i, null); + // recycle the entry after it has been removed + entry.recycle(); + removed++; + } + if (!processAllEntriesInStash && (!evictionResult.isContinueEviction() || Thread.currentThread() + .isInterrupted())) { + return false; + } + } + return true; + } + + void maybeTrim() { + if (removed == size) { + entries.clear(); + size = 0; + removed = 0; + } else if (size > 1000 && removed > size / 2) { + List newEntries = new ArrayList<>(size - removed); + for (RangeCacheEntryWrapper entry : entries) { + if (entry != null) { + newEntries.add(entry); + } + } + entries = newEntries; + size = entries.size(); + removed = 0; + } + } + } + enum EvictionResult { - REMOVE, MISSING, STOP; + REMOVE, STASH, STOP, MISSING; boolean isContinueEviction() { return this != STOP; } + boolean shouldStash() { + return this == STASH; + } + boolean shouldRemoveFromQueue() { return this == REMOVE || this == MISSING; } @@ -78,9 +201,13 @@ interface EvictionPredicate { * @param evictionPredicate the predicate to determine if an entry should be evicted * @return the number of entries and the total size removed from the cache */ - private synchronized Pair evictEntries(EvictionPredicate evictionPredicate) { + private synchronized Pair evictEntries( + EvictionPredicate evictionPredicate, boolean alwaysProcessAllEntriesInStash) { RangeCacheRemovalCounters counters = RangeCacheRemovalCounters.create(); - handleQueue(evictionPredicate, counters); + boolean continueEviction = stash.evictEntries(evictionPredicate, counters, alwaysProcessAllEntriesInStash); + if (continueEviction) { + handleQueue(evictionPredicate, counters); + } return handleRemovalResult(counters); } @@ -92,7 +219,11 @@ private void handleQueue(EvictionPredicate evictionPredicate, break; } EvictionResult evictionResult = handleEviction(evictionPredicate, entry, counters); - if (evictionResult.shouldRemoveFromQueue()) { + if (evictionResult.shouldStash()) { + // remove the peeked entry from the queue + removalQueue.poll(); + stash.add(entry); + } else if (evictionResult.shouldRemoveFromQueue()) { // remove the peeked entry from the queue removalQueue.poll(); // recycle the entry after it has been removed from the queue diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 03aba0e425e7e..d6363ae56673d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; +import java.util.function.IntSupplier; import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -147,7 +148,8 @@ public boolean insert(Entry entry) { cachedData = entry.getDataBuffer().retain(); } - ReferenceCountedEntry cacheEntry = EntryImpl.createWithRetainedDuplicate(position, cachedData); + ReferenceCountedEntry cacheEntry = + EntryImpl.createWithRetainedDuplicate(position, cachedData, entry.getReadCountHandler()); cachedData.release(); if (entries.put(position, cacheEntry)) { totalAddedEntriesSize.add(entryLength); @@ -229,7 +231,7 @@ public void asyncReadEntry(ReadHandle lh, Position position, final ReadEntryCall final Object ctx) { try { asyncReadEntriesByPosition(lh, position, position, 1, - DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY, + () -> DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY ? 1 : 0, new ReadEntriesCallback() { @Override public void readEntriesComplete(List entries, Object ctx) { @@ -256,10 +258,10 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } @Override - public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, final ReadEntriesCallback callback, Object ctx) { try { - asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, true); + asyncReadEntry0(lh, firstEntry, lastEntry, expectedReadCount, callback, ctx, true); } catch (Throwable t) { log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t); // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt @@ -271,18 +273,18 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole } @SuppressWarnings({ "unchecked", "rawtypes" }) - void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, final ReadEntriesCallback callback, Object ctx, boolean acquirePermits) { final long ledgerId = lh.getId(); final int numberOfEntries = (int) (lastEntry - firstEntry) + 1; final Position firstPosition = PositionFactory.create(ledgerId, firstEntry); final Position lastPosition = PositionFactory.create(ledgerId, lastEntry); - asyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, callback, ctx, + asyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, expectedReadCount, callback, ctx, acquirePermits); } void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, - boolean shouldCacheEntry, final ReadEntriesCallback originalCallback, + IntSupplier expectedReadCount, final ReadEntriesCallback originalCallback, Object ctx, boolean acquirePermits) { checkArgument(firstPosition.getLedgerId() == lastPosition.getLedgerId(), "Invalid range. Entries %s and %s should be in the same ledger.", @@ -298,7 +300,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter(); if (!acquirePermits || pendingReadsLimiter.isDisabled()) { - doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, + doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, expectedReadCount, originalCallback, ctx); } else { long estimatedEntrySize = getEstimatedEntrySize(lh); @@ -314,19 +316,19 @@ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position // or timeout ml.getExecutor().execute(() -> { doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, lastPosition, numberOfEntries, - shouldCacheEntry, originalCallback, ctx, handle, estimatedReadSize); + expectedReadCount, originalCallback, ctx, handle, estimatedReadSize); }); }); // permits were immediately available and acquired if (optionalHandle.isPresent()) { doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, lastPosition, numberOfEntries, - shouldCacheEntry, originalCallback, ctx, optionalHandle.get(), estimatedReadSize); + expectedReadCount, originalCallback, ctx, optionalHandle.get(), estimatedReadSize); } } } void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position firstPosition, Position lastPosition, - int numberOfEntries, boolean shouldCacheEntry, + int numberOfEntries, IntSupplier expectedReadCount, final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle, long estimatedReadSize) { if (!handle.success()) { @@ -366,12 +368,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx2) { originalCallback.readEntriesFailed(exception, ctx2); } }; - doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, + doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, expectedReadCount, wrappedCallback, ctx); } void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, - boolean shouldCacheEntry, final ReadEntriesCallback callback, + IntSupplier expectedReadCount, final ReadEntriesCallback callback, Object ctx) { Collection cachedEntries; if (firstPosition.compareTo(lastPosition) == 0) { @@ -412,7 +414,7 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio // Read all the entries from bookkeeper pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), lastPosition.getEntryId(), - shouldCacheEntry, callback, ctx); + expectedReadCount, callback, ctx); } } @@ -436,11 +438,11 @@ private long getAvgEntrySize() { * @param lh the handle * @param firstEntry the first entry * @param lastEntry the last entry - * @param shouldCacheEntry if we should put the entry into the cache + * @param expectedReadCount if we should put the entry into the cache * @return a handle to the operation */ CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, - boolean shouldCacheEntry) { + IntSupplier expectedReadCount) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; CompletableFuture> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry) .thenApply( @@ -451,12 +453,14 @@ CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, l try { // We got the entries, we need to transform them to a List<> type long totalSize = 0; + int expectedReadCountVal = expectedReadCount.getAsInt(); final List entriesToReturn = new ArrayList<>(entriesToRead); for (LedgerEntry e : ledgerEntries) { - EntryImpl entry = EntryImpl.create(e, interceptor); + EntryImpl entry = EntryImpl.create(e, interceptor, expectedReadCountVal); entriesToReturn.add(entry); totalSize += entry.getLength(); - if (shouldCacheEntry) { + // TODO: is it useful to cache entries with expected read count of 1? + if (expectedReadCountVal > 0) { insert(entry); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerEvictionHandler.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerEvictionHandler.java index ff408bbca765b..d999bef6824e7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerEvictionHandler.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerEvictionHandler.java @@ -48,11 +48,12 @@ public void invalidateEntriesBeforeTimestampNanos(long timestamp) { * Force the cache to drop entries to free space. * * @param sizeToFree the total memory size to free + * @param timestamp the timestamp before which entries which aren't evictable will be evicted * @return a pair containing the number of entries evicted and their total size */ - public Pair evictEntries(long sizeToFree) { + public Pair evictEntries(long sizeToFree, long timestamp) { checkArgument(sizeToFree > 0); - Pair evicted = rangeCacheRemovalQueue.evictLeastAccessedEntries(sizeToFree); + Pair evicted = rangeCacheRemovalQueue.evictLeastAccessedEntries(sizeToFree, timestamp); int evictedEntries = evicted.getLeft(); long evictedSize = evicted.getRight(); if (log.isDebugEnabled()) { @@ -64,4 +65,8 @@ public Pair evictEntries(long sizeToFree) { manager.entriesRemoved(evictedSize, evictedEntries); return evicted; } + + public Pair getNonEvictableSize() { + return rangeCacheRemovalQueue.getNonEvictableSize(); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java index 1e81d3166dcfd..57b7124fbf252 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl.cache; +import com.google.common.annotations.VisibleForTesting; import io.opentelemetry.api.OpenTelemetry; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -31,6 +32,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -165,7 +167,8 @@ private void doEvictToWatermarkWhenOverThreshold() { long startTime = System.nanoTime(); log.info("Triggering cache eviction. total size: {} Mb -- Need to discard: {} Mb", currentSize / MB, sizeToEvict / MB); - evictionHandler.evictEntries(sizeToEvict); + long maxTimestampNanos = startTime - mlFactory.getCacheEvictionTimeThreshold(); + evictionHandler.evictEntries(sizeToEvict, maxTimestampNanos); long endTime = System.nanoTime(); double durationMs = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); log.info("Eviction completed. Removed {} Mb in {} ms", (currentSize - this.currentSize.get()) / MB, @@ -193,6 +196,11 @@ public long getSize() { return currentSize.get(); } + @VisibleForTesting + public Pair getNonEvictableSize() { + return evictionHandler.getNonEvictableSize(); + } + @Override public long getMaxSize() { return maxSize; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 9cc4883b88ccb..f4c0da28511a9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -293,6 +293,7 @@ public void verifyHitsMisses() throws Exception { config.setCacheEvictionIntervalMs(1000); ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setCacheEvictionByExpectedReadCount(false); managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(false); @Cleanup("shutdown") diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index a7ce874d7c32d..0fe889a2da5ec 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; +import java.util.function.IntSupplier; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -81,7 +82,7 @@ public void testRead() throws Exception { } when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 9)); - final var entries = readEntry(entryCache, lh, 0, 9, false, null); + final var entries = readEntry(entryCache, lh, 0, 9, () -> 0, null); assertEquals(entries.size(), 10); entries.forEach(Entry::release); @@ -105,7 +106,7 @@ public void testReadMissingBefore() throws Exception { } when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 9)); - final var entries = readEntry(entryCache, lh, 0, 9, false, null); + final var entries = readEntry(entryCache, lh, 0, 9, () -> 0, null); assertEquals(entries.size(), 10); } @@ -124,7 +125,7 @@ public void testReadMissingAfter() throws Exception { } when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 9)); - final var entries = readEntry(entryCache, lh, 0, 9, false, null); + final var entries = readEntry(entryCache, lh, 0, 9, () -> 0, null); assertEquals(entries.size(), 10); } @@ -144,7 +145,7 @@ public void testReadMissingMiddle() throws Exception { entryCache.insert(EntryImpl.create(0, 9, data)); when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 9)); - final var entries = readEntry(entryCache, lh, 0, 9, false, null); + final var entries = readEntry(entryCache, lh, 0, 9, () -> 0, null); assertEquals(entries.size(), 10); } @@ -164,7 +165,7 @@ public void testReadMissingMultiple() throws Exception { entryCache.insert(EntryImpl.create(0, 8, data)); when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 9)); - final var entries = readEntry(entryCache, lh, 0, 9, false, null); + final var entries = readEntry(entryCache, lh, 0, 9, () -> 0, null); assertEquals(entries.size(), 10); } @@ -177,25 +178,25 @@ public void testCachedReadReturnsDifferentByteBuffer() throws Exception { @Cleanup(value = "clear") EntryCache entryCache = cacheManager.getEntryCache(ml); - readEntry(entryCache, lh, 0, 1, true, e -> { + readEntry(entryCache, lh, 0, 1, () -> 1, e -> { assertTrue(e instanceof ManagedLedgerException); assertTrue(e.getMessage().contains("LastConfirmedEntry is null when reading ledger 0")); }); when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(-1, -1)); - readEntry(entryCache, lh, 0, 1, true, e -> { + readEntry(entryCache, lh, 0, 1, () -> 1, e -> { assertTrue(e instanceof ManagedLedgerException); assertTrue(e.getMessage().contains("LastConfirmedEntry is -1:-1 when reading ledger 0")); }); when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 0)); - readEntry(entryCache, lh, 0, 1, true, e -> { + readEntry(entryCache, lh, 0, 1, () -> 1, e -> { assertTrue(e instanceof ManagedLedgerException); assertTrue(e.getMessage().contains("LastConfirmedEntry is 0:0 when reading entry 1")); }); when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 1)); - List cacheMissEntries = readEntry(entryCache, lh, 0, 1, true, null); + List cacheMissEntries = readEntry(entryCache, lh, 0, 1, () -> 1, null); // Ensure first entry is 0 and assertEquals(cacheMissEntries.size(), 2); assertEquals(cacheMissEntries.get(0).getEntryId(), 0); @@ -204,7 +205,7 @@ public void testCachedReadReturnsDifferentByteBuffer() throws Exception { // Move the reader index to simulate consumption cacheMissEntries.get(0).getDataBuffer().readerIndex(10); - List cacheHitEntries = readEntry(entryCache, lh, 0, 1, true, null); + List cacheHitEntries = readEntry(entryCache, lh, 0, 1, () -> 1, null); assertEquals(cacheHitEntries.get(0).getEntryId(), 0); assertEquals(cacheHitEntries.get(0).getDataBuffer().readerIndex(), 0); } @@ -228,7 +229,7 @@ public void testReadWithError() throws Exception { entryCache.insert(EntryImpl.create(0, 2, data)); when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 9)); - readEntry(entryCache, lh, 0, 9, false, e -> + readEntry(entryCache, lh, 0, 9, () -> 0, e -> assertTrue(e instanceof ManagedLedgerException.LedgerNotExistException)); } @@ -252,20 +253,21 @@ static ReadHandle getLedgerHandle() { } private List readEntry(EntryCache entryCache, ReadHandle lh, long firstEntry, long lastEntry, - boolean shouldCacheEntry, Consumer assertion) + IntSupplier expectedReadCount, Consumer assertion) throws InterruptedException { final var future = new CompletableFuture>(); - entryCache.asyncReadEntry(lh, firstEntry, lastEntry, shouldCacheEntry, new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - future.complete(entries); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null); + entryCache.asyncReadEntry(lh, firstEntry, lastEntry, expectedReadCount, + new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + future.complete(entries); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); try { final var entries = future.get(); assertNull(assertion); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryImplTest.java index 618a6f88eccd3..eac61f5d88ec3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryImplTest.java @@ -43,7 +43,7 @@ public void testCreateWithLedgerIdEntryIdAndByteBuf() { ByteBuf data = Unpooled.wrappedBuffer(testData); // When - EntryImpl entry = EntryImpl.create(ledgerId, entryId, data); + EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, 1); try { // Then @@ -65,7 +65,7 @@ public void testCreateWithPositionAndByteBuf() { ByteBuf data = Unpooled.wrappedBuffer(testData); // When - EntryImpl entry = EntryImpl.create(position, data); + EntryImpl entry = EntryImpl.create(position, data, 1); try { // Then @@ -88,7 +88,7 @@ public void testCreateWithRetainedDuplicate() { ByteBuf data = Unpooled.wrappedBuffer(testData); // When - EntryImpl entry = EntryImpl.createWithRetainedDuplicate(position, data); + EntryImpl entry = EntryImpl.createWithRetainedDuplicate(position, data, 1); try { // Then @@ -109,7 +109,7 @@ public void testCreateFromAnotherEntryImpl() { long entryId = 222L; byte[] testData = "original-entry-data".getBytes(); ByteBuf originalData = Unpooled.wrappedBuffer(testData); - EntryImpl originalEntry = EntryImpl.create(ledgerId, entryId, originalData); + EntryImpl originalEntry = EntryImpl.create(ledgerId, entryId, originalData, 1); try { // When @@ -170,7 +170,7 @@ public void testCreateWithEmptyData() { ByteBuf data = Unpooled.EMPTY_BUFFER; // When - EntryImpl entry = EntryImpl.create(ledgerId, entryId, data); + EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, 1); try { // Then @@ -215,7 +215,7 @@ public void testCreateFromEntryImplWhereGetPositionHasBeenCalled() { public void testCreateWithPositionThatIsntImmutable() { // Given Position position = new AckSetPositionImpl(1L, 2L, new long[0]); - EntryImpl entry = EntryImpl.create(position, Unpooled.EMPTY_BUFFER); + EntryImpl entry = EntryImpl.create(position, Unpooled.EMPTY_BUFFER, 1); // Expect that the position is different since it's not immutable assertNotSame(entry.getPosition(), position); @@ -228,7 +228,7 @@ public void testCreateWithPositionThatIsntImmutable() { public void testCreateWithPositionThatIsImmutable() { // Given Position position = PositionFactory.create(1L, 2L); - EntryImpl entry = EntryImpl.create(position, Unpooled.EMPTY_BUFFER); + EntryImpl entry = EntryImpl.create(position, Unpooled.EMPTY_BUFFER, 1); // Expect that the position is same since it's immutable assertSame(entry.getPosition(), position); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java index 17601946355d4..01a8808a2b96c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java @@ -139,7 +139,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { // Initialize "entryCache.estimatedEntrySize" to the correct value. Object ctx = new Object(); SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback(); - entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx); + entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, () -> 1, cb0, ctx); cb0.entries.join(); int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue(); Awaitility.await().untilAsserted(() -> { @@ -153,7 +153,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { SimpleReadEntriesCallback cb1 = new SimpleReadEntriesCallback(); SimpleReadEntriesCallback cb2 = new SimpleReadEntriesCallback(); threadFactory.newThread(() -> { - entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, true, cb1, ctx); + entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, () -> 1, cb1, ctx); }).start(); threadFactory.newThread(() -> { try { @@ -161,7 +161,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { } catch (InterruptedException e) { throw new RuntimeException(e); } - entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx); + entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, () -> 1, cb2, ctx); }).start(); long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 55abd2d3e9d5d..d3e118033ea59 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -871,4 +871,39 @@ public void testSlowestReader() { private static ManagedCursor createCursor(ManagedCursorContainer container, String name, Position position) { return new MockManagedCursor(container, name, position, position, false, true); } + + @Test + public void testCountNumberOfCursorsAtSamePositionOrBefore() { + ManagedCursorContainer container = new ManagedCursorContainer(); + List cursors = IntStream.rangeClosed(1, 1000) + .mapToObj(idx -> createCursor(container, "cursor" + idx, PositionFactory.create(0, idx))) + .collect(Collectors.toList()); + // randomize adding order + Collections.shuffle(cursors); + cursors.forEach(cursor -> container.add(cursor, cursor.getReadPosition())); + for (int i = 1; i <= 1000; i++) { + ManagedCursor cursor = container.get("cursor" + i); + int numberOfCursorsBefore = container.getNumberOfCursorsAtSamePositionOrBefore(cursor); + assertThat(numberOfCursorsBefore).describedAs("cursor:%s", cursor).isEqualTo(i); + } + } + + @Test + public void testCountNumberOfCursorsAtSamePositionOrBefore_SamePosition() { + ManagedCursorContainer container = new ManagedCursorContainer(); + addCursor(container, "cursor1", PositionFactory.create(0, 1)); + addCursor(container, "cursor2", PositionFactory.create(0, 2)); + for (int i = 3; i <= 998; i++) { + addCursor(container, "cursor" + i, PositionFactory.create(0, 3)); + } + addCursor(container, "cursor999", PositionFactory.create(0, 4)); + addCursor(container, "cursor1000", PositionFactory.create(0, 5)); + ManagedCursor cursor = container.get("cursor4"); + assertThat(container.getNumberOfCursorsAtSamePositionOrBefore(cursor)).isEqualTo(998); + } + + private static void addCursor(ManagedCursorContainer container, String name, Position position) { + container.add(createCursor(container, name, position), position); + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index a88605dbb3479..6196a59b174be 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -2932,7 +2932,7 @@ private static EntryImpl createEntry(Position p1) { } private static EntryImpl createEntryAndReleaseBuffer(Position p1, ByteBuf buffer) { - EntryImpl entry = EntryImpl.create(p1, buffer); + EntryImpl entry = EntryImpl.create(p1, buffer, 0); buffer.release(); return entry; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 91cac258bc2a4..f2aa986ac6c83 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -1852,6 +1852,7 @@ public void invalidateEntriesFromCacheByMarkDeletePosition() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); initManagedLedgerConfig(config); config.setCacheEvictionByMarkDeletedPosition(true); + config.setCacheEvictionByExpectedReadCount(false); ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateEntriesFromCacheByMarkDeletePosition", config); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java index 5656aa7cac121..9c6c79eb95b40 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java @@ -21,13 +21,14 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; -import static org.testng.AssertJUnit.assertSame; import io.opentelemetry.api.OpenTelemetry; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -43,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import java.util.function.IntSupplier; import java.util.stream.Collectors; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -112,14 +114,14 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ReadHandle rh = invocationOnMock.getArgument(0); long startEntry = invocationOnMock.getArgument(1); long endEntry = invocationOnMock.getArgument(2); - boolean shouldCacheEntry = invocationOnMock.getArgument(3); + IntSupplier expectedReadCount = invocationOnMock.getArgument(3); AsyncCallbacks.ReadEntriesCallback callback = invocationOnMock.getArgument(4); Object ctx = invocationOnMock.getArgument(5); - pendingReadsManager.readEntries(lh, startEntry, endEntry, shouldCacheEntry, callback, ctx); + pendingReadsManager.readEntries(lh, startEntry, endEntry, expectedReadCount, callback, ctx); return null; } }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(), - anyBoolean(), any(), any(), anyBoolean()); + any(), any(), any(), anyBoolean()); lh = mock(ReadHandle.class); ml = mock(ManagedLedgerImpl.class); @@ -176,17 +178,17 @@ private void verifyRange(List entries, long firstEntry, long endEntry) private static class PreparedReadFromStorage extends CompletableFuture> { final long firstEntry; final long endEntry; - final boolean shouldCacheEntry; + final IntSupplier expectedReadCount; - public PreparedReadFromStorage(long firstEntry, long endEntry, boolean shouldCacheEntry) { + public PreparedReadFromStorage(long firstEntry, long endEntry, IntSupplier expectedReadCount) { this.firstEntry = firstEntry; this.endEntry = endEntry; - this.shouldCacheEntry = shouldCacheEntry; + this.expectedReadCount = expectedReadCount; } @Override public String toString() { - return "PreparedReadFromStorage(" + firstEntry + "," + endEntry + "," + shouldCacheEntry + ")"; + return "PreparedReadFromStorage(" + firstEntry + "," + endEntry + "," + expectedReadCount + ")"; } public void storageReadCompleted() { @@ -195,13 +197,16 @@ public void storageReadCompleted() { } private PreparedReadFromStorage prepareReadFromStorage(ReadHandle lh, RangeEntryCacheImpl rangeEntryCache, - long firstEntry, long endEntry, boolean shouldCacheEntry) { - PreparedReadFromStorage read = new PreparedReadFromStorage(firstEntry, endEntry, shouldCacheEntry); - log.info("prepareReadFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry); - when(rangeEntryCache.readFromStorage(eq(lh), eq(firstEntry), eq(endEntry), eq(shouldCacheEntry))).thenAnswer( + long firstEntry, long endEntry, + IntSupplier expectedReadCount) { + PreparedReadFromStorage read = new PreparedReadFromStorage(firstEntry, endEntry, expectedReadCount); + log.info("prepareReadFromStorage from {} to {} expectedReadCount {}", firstEntry, endEntry, expectedReadCount); + when(rangeEntryCache.readFromStorage(eq(lh), eq(firstEntry), eq(endEntry), + argThat(expectedReadCountArg -> expectedReadCountArg.getAsInt() + == expectedReadCount.getAsInt()))).thenAnswer( (invocationOnMock -> { - log.info("readFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, - shouldCacheEntry); + log.info("readFromStorage from {} to {} expectedReadCount {}", firstEntry, endEntry, + expectedReadCount); entryRangeReadCount.computeIfAbsent(Pair.of(firstEntry, endEntry), __ -> new AtomicInteger(0)) .getAndIncrement(); return read; @@ -215,13 +220,13 @@ public void simpleRead() throws Exception { long firstEntry = 100; long endEntry = 199; - boolean shouldCacheEntry = false; + IntSupplier expectedReadCount = () -> 0; PreparedReadFromStorage read1 = - prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); // complete the read read1.storageReadCompleted(); @@ -240,17 +245,17 @@ public void simpleConcurrentReadPerfectMatch() throws Exception { long firstEntry = 100; long endEntry = 199; - boolean shouldCacheEntry = false; + IntSupplier expectedReadCount = () -> 0; PreparedReadFromStorage read1 = - prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback2, CTX2); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback2, CTX2); // complete the read from BK // only one read completes 2 callbacks @@ -282,19 +287,19 @@ public void simpleConcurrentReadIncluding() throws Exception { long firstEntrySecondRead = firstEntry + 10; long endEntrySecondRead = endEntry - 10; - boolean shouldCacheEntry = false; + IntSupplier expectedReadCount = () -> 0; PreparedReadFromStorage read1 = - prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, - shouldCacheEntry, callback2, CTX2); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, + CTX2); // complete the read from BK // only one read completes 2 callbacks @@ -329,21 +334,21 @@ public void simpleConcurrentReadMissingLeft() throws Exception { long firstEntrySecondRead = firstEntry - 10; long endEntrySecondRead = endEntry; - boolean shouldCacheEntry = false; + IntSupplier expectedReadCount = () -> 0; PreparedReadFromStorage read1 = - prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); PreparedReadFromStorage readForLeft = - prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, - endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, + CTX2); // complete the read from BK read1.storageReadCompleted(); @@ -370,21 +375,21 @@ public void simpleConcurrentReadMissingRight() throws Exception { long firstEntrySecondRead = firstEntry; long endEntrySecondRead = endEntry + 10; - boolean shouldCacheEntry = false; + IntSupplier expectedReadCount = () -> 0; PreparedReadFromStorage read1 = - prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); PreparedReadFromStorage readForRight = - prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, - endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, + CTX2); // complete the read from BK read1.storageReadCompleted(); @@ -411,24 +416,24 @@ public void simpleConcurrentReadMissingBoth() throws Exception { long firstEntrySecondRead = firstEntry - 10; long endEntrySecondRead = endEntry + 10; - boolean shouldCacheEntry = false; + IntSupplier expectedReadCount = () -> 0; PreparedReadFromStorage read1 = - prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); PreparedReadFromStorage readForLeft = - prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, expectedReadCount); PreparedReadFromStorage readForRight = - prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, - endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, + CTX2); // complete the read from BK read1.storageReadCompleted(); @@ -456,21 +461,22 @@ public void simpleConcurrentReadNoMatch() throws Exception { long firstEntrySecondRead = 1000; long endEntrySecondRead = 1099; - boolean shouldCacheEntry = false; + IntSupplier expectedReadCount = () -> 0; PreparedReadFromStorage read1 = - prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, expectedReadCount); PreparedReadFromStorage read2 = - prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry); + prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, endEntrySecondRead, + expectedReadCount); PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, expectedReadCount, callback, CTX); CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntrySecondRead, - endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, expectedReadCount, callback2, + CTX2); read1.storageReadCompleted(); callback.get(); @@ -491,11 +497,11 @@ public void concurrentReadOnOverlappedEntryRanges() throws Exception { final var readFutures = new ArrayList(); final BiConsumer readEntries = (firstEntry, lastEntry) -> { final var callback = new CapturingReadEntriesCallback(); - pendingReadsManager.readEntries(lh, firstEntry, lastEntry, false, callback, CTX); + pendingReadsManager.readEntries(lh, firstEntry, lastEntry, () -> 0, callback, CTX); readFutures.add(callback); }; final BiFunction mockReadFromStorage = (firstEntry, lastEntry) -> - prepareReadFromStorage(lh, rangeEntryCache, firstEntry, lastEntry, false); + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, lastEntry, () -> 0); final var read0 = mockReadFromStorage.apply(10L, 70L); readEntries.accept(10L, 70L); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheTest.java index 1e9e0862e2bb4..c15bc699f1c06 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheTest.java @@ -107,7 +107,7 @@ private static ReferenceCountedEntry createCachedEntry(int i, String str) { } private static ReferenceCountedEntry createCachedEntry(Position position, String str) { - return EntryImpl.create(position, Unpooled.wrappedBuffer(str.getBytes())); + return EntryImpl.create(position, Unpooled.wrappedBuffer(str.getBytes()), 0); } private static Position createPosition(int i) { @@ -237,7 +237,7 @@ public void eviction() { putToCache(cache, 3, "three"); // This should remove the LRU entries: 0, 1 whose combined size is 7 - assertEquals(removalQueue.evictLeastAccessedEntries(5), Pair.of(2, (long) 7)); + assertEquals(removalQueue.evictLeastAccessedEntries(5, 0), Pair.of(2, (long) 7)); assertEquals(cache.getNumberOfEntries(), 2); assertEquals(cache.getSize(), 8); @@ -246,7 +246,7 @@ public void eviction() { assertEquals(cache.get(createPosition(2)).getData(), "two".getBytes()); assertEquals(cache.get(createPosition(3)).getData(), "three".getBytes()); - assertEquals(removalQueue.evictLeastAccessedEntries(100), Pair.of(2, (long) 8)); + assertEquals(removalQueue.evictLeastAccessedEntries(100, 0), Pair.of(2, (long) 8)); assertEquals(cache.getNumberOfEntries(), 0); assertEquals(cache.getSize(), 0); assertNull(cache.get(createPosition(0))); @@ -255,14 +255,14 @@ public void eviction() { assertNull(cache.get(createPosition(3))); try { - removalQueue.evictLeastAccessedEntries(0); + removalQueue.evictLeastAccessedEntries(0, 0); fail("should throw exception"); } catch (IllegalArgumentException e) { // ok } try { - removalQueue.evictLeastAccessedEntries(-1); + removalQueue.evictLeastAccessedEntries(-1, 0); fail("should throw exception"); } catch (IllegalArgumentException e) { // ok @@ -282,19 +282,19 @@ public void evictions() { } assertEquals(cache.getSize(), expectedSize); - Pair res = removalQueue.evictLeastAccessedEntries(1); + Pair res = removalQueue.evictLeastAccessedEntries(1, 0); assertEquals((int) res.getLeft(), 1); assertEquals((long) res.getRight(), 1); expectedSize -= 1; assertEquals(cache.getSize(), expectedSize); - res = removalQueue.evictLeastAccessedEntries(10); + res = removalQueue.evictLeastAccessedEntries(10, 0); assertEquals((int) res.getLeft(), 10); assertEquals((long) res.getRight(), 11); expectedSize -= 11; assertEquals(cache.getSize(), expectedSize); - res = removalQueue.evictLeastAccessedEntries(expectedSize); + res = removalQueue.evictLeastAccessedEntries(expectedSize, 0); assertEquals((int) res.getLeft(), 89); assertEquals((long) res.getRight(), expectedSize); assertEquals(cache.getSize(), 0); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index 862015479567d..f4619955ea18b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -94,6 +94,7 @@ public final void setUp(Method method) throws Exception { } protected ManagedLedgerConfig initManagedLedgerConfig(ManagedLedgerConfig config) { + config.setCacheEvictionByExpectedReadCount(false); return config; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a1c7134eadfd3..f52d741624cc2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3471,6 +3471,14 @@ public double getLoadBalancerBandwidthOutResourceWeight() { ) private boolean cacheEvictionByMarkDeletedPosition = false; + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Evicting cache data by expected read count. Expected read count is calculated by the number of " + + "active cursors with a read position that is behind the position of the cached entry. " + + "This setting will override the cacheEvictionByMarkDeletedPosition setting." + ) + private boolean cacheEvictionByExpectedReadCount = true; + /**** --- Transaction config variables. --- ****/ @FieldContext( category = CATEGORY_TRANSACTION, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f67a16ed8da7e..fc8720acf0533 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2107,9 +2107,14 @@ public CompletableFuture getManagedLedgerConfig(@NonNull To managedLedgerConfig.setInactiveOffloadedLedgerEvictionTime( serviceConfig.getManagedLedgerInactiveOffloadedLedgerEvictionTimeSeconds(), TimeUnit.SECONDS); - - managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( - serviceConfig.isCacheEvictionByMarkDeletedPosition()); + if (serviceConfig.isCacheEvictionByExpectedReadCount()) { + managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(false); + managedLedgerConfig.setCacheEvictionByExpectedReadCount(true); + } else { + managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( + serviceConfig.isCacheEvictionByMarkDeletedPosition()); + managedLedgerConfig.setCacheEvictionByExpectedReadCount(false); + } managedLedgerConfig.setMinimumBacklogCursorsForCaching( serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching()); managedLedgerConfig.setMinimumBacklogEntriesForCaching( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java index cf1802e3853ac..33abddc300b43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java @@ -23,6 +23,7 @@ import java.util.function.ToIntFunction; import lombok.Getter; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.EntryReadCountHandler; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; @@ -136,4 +137,9 @@ public int getCachedStickyKeyHash() { public Entry unwrap() { return entry; } + + @Override + public EntryReadCountHandler getReadCountHandler() { + return entry.getReadCountHandler(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 9e92a2ab40dc1..080f5acbf165f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -466,6 +466,10 @@ private Map> filterAndGroupEntriesForDispatching(List addConsumerInternal(Consumer consumer) { return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> { synchronized (PersistentSubscription.this) { cursor.updateLastActive(); + if (!cursor.isActive() + && (topic.getManagedLedger().getConfig().isCacheEvictionByExpectedReadCount() + || getCursor().getNumberOfEntries() < topic.getBackloggedCursorThresholdEntries())) { + cursor.setActive(); + } + if (IS_FENCED_UPDATER.get(this) == TRUE) { log.warn("Attempting to add consumer {} on a fenced subscription", consumer); return FutureUtil.failedFuture(new SubscriptionFencedException("Subscription is fenced")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5602cd679512b..6cfb346781262 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -237,6 +237,7 @@ public static boolean isDedupCursorName(String name) { private Optional dispatchRateLimiter = Optional.empty(); private final Object dispatchRateLimiterLock = new Object(); private Optional subscribeRateLimiter = Optional.empty(); + @Getter private final long backloggedCursorThresholdEntries; public static final int MESSAGE_RATE_BACKOFF_MS = 1000; @@ -3417,6 +3418,10 @@ public void checkBackloggedCursors() { } private void checkBackloggedCursor(PersistentSubscription subscription) { + // ignore cursor inactivating a cursor based on backlog size when eviction is based on expected read count + if (getManagedLedger().getConfig().isCacheEvictionByExpectedReadCount()) { + return; + } // activate caught up cursor which include consumers if (!subscription.getConsumers().isEmpty() && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionEntryImpl.java index ab18373d859d8..51b4b37081ea0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionEntryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionEntryImpl.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.EntryReadCountHandler; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.transaction.buffer.TransactionEntry; import org.apache.pulsar.client.api.transaction.TxnID; @@ -136,4 +137,9 @@ public long getEntryId() { public boolean release() { return this.entry.release(); } + + @Override + public EntryReadCountHandler getReadCountHandler() { + return entry.getReadCountHandler(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BacklogConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BacklogConsumerTest.java index 5bd2a213523d4..585c14b422bc4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BacklogConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BacklogConsumerTest.java @@ -26,6 +26,7 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -57,6 +58,13 @@ protected void cleanup() throws Exception { internalCleanup(); } + @Override + protected ServiceConfiguration getDefaultConf() { + ServiceConfiguration defaultConf = super.getDefaultConf(); + defaultConf.setCacheEvictionByExpectedReadCount(false); + return defaultConf; + } + @DataProvider(name = "ackReceiptEnabled") public Object[][] ackReceiptEnabled() { return new Object[][] { { true }, { false } }; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/MinimumBacklogCacheStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/MinimumBacklogCacheStrategyTest.java index 3bb068def2806..9505d20831e58 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/MinimumBacklogCacheStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/MinimumBacklogCacheStrategyTest.java @@ -67,6 +67,7 @@ protected ServiceConfiguration getDefaultConf() { defaultConf.setManagedLedgerMinimumBacklogCursorsForCaching(2); defaultConf.setManagedLedgerMinimumBacklogEntriesForCaching(10); defaultConf.setManagedLedgerCacheEvictionTimeThresholdMillis(60 * 1000); + defaultConf.setCacheEvictionByExpectedReadCount(false); return defaultConf; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 9df9ff8386a8e..7500deb56d996 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -23,10 +23,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -82,16 +80,17 @@ import lombok.EqualsAndHashCode; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl; import org.apache.commons.lang3.RandomUtils; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.storage.ManagedLedgerStorage; @@ -154,9 +153,22 @@ protected void setup() throws Exception { super.producerBaseSetup(); } + @Override + protected ServiceConfiguration getDefaultConf() { + ServiceConfiguration defaultConf = super.getDefaultConf(); + configureDefaults(defaultConf); + return defaultConf; + } + + private void configureDefaults(ServiceConfiguration defaultConf) { + // Set the default managed ledger cache eviction time threshold to 60 seconds + defaultConf.setManagedLedgerCacheEvictionTimeThresholdMillis(60000L); + } + @AfterMethod(alwaysRun = true) public void cleanupAfterMethod() throws Exception { try { + pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor(null); pulsar.getConfiguration().setForceDeleteTenantAllowed(true); pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); @@ -173,6 +185,7 @@ public void cleanupAfterMethod() throws Exception { pulsar.getConfiguration().setForceDeleteTenantAllowed(false); pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + configureDefaults(pulsar.getConfiguration()); super.producerBaseSetup(); } catch (Exception | AssertionError e) { log.warn("Failed to clean up state. Restarting broker.", e); @@ -1163,13 +1176,13 @@ protected void beforePulsarStart(PulsarService pulsar) throws Exception { } /** - * Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages - - * EntryCache should be cleaned : Once active subscription consumes messages. + * Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should be used - + * entries should be eligible for eviction : Once active subscription consumes messages. * - * Usecase 2: 2 Active Subscriptions (faster and slower) and slower gets closed - 2 subscribers - Produce Messages - - * 1 faster-subscriber consumes all messages and another slower-subscriber none - EntryCache should have cached - * messages as slower-subscriber has not consumed messages yet - close slower-subscriber - EntryCache should be - * cleared + * Usecase 2: 2 Active Subscriptions (faster and slower) - 2 subscribers - Produce Messages - + * 1 faster-subscriber consumes all messages and another slower-subscriber none - cache should have cached + * messages as slower-subscriber has not consumed messages yet - consume messages in slower-subscriber - + * entries should be eligible for eviction * * @throws Exception */ @@ -1177,19 +1190,26 @@ protected void beforePulsarStart(PulsarService pulsar) throws Exception { public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { log.info("-- Starting {} test --", methodName); - final long batchMessageDelayMs = 100; final int receiverSize = 10; - final String topicName = "cache-topic"; + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/cache-topic"); final String sub1 = "faster-sub1"; final String sub2 = "slower-sub2"; + // Make the test fail if there's a cache miss + pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor((ledgerId, firstEntry, lastEntry, entries) -> { + log.error("Attempting to read from BK when cache should be used. {}:{} to {}:{}", ledgerId, firstEntry, + ledgerId, lastEntry); + return CompletableFuture.failedFuture( + new ManagedLedgerException.NonRecoverableLedgerException( + "Should not read from BK since cache should be used.")); + }); + /************ usecase-1: *************/ // 1. Subscriber Faster subscriber Consumer subscriber1 = pulsarClient.newConsumer() - .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1) + .topic(topic).subscriptionName(sub1) .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe(); - final String topic = "persistent://my-property/my-ns/" + topicName; ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topic); producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); @@ -1200,7 +1220,8 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); - EntryCache entryCache = (EntryCache) FieldUtils.readField(ledger, "entryCache", true); + RangeEntryCacheManagerImpl entryCacheManager = + (RangeEntryCacheManagerImpl) ledger.getFactory().getEntryCacheManager(); Message msg; // 2. Produce messages @@ -1214,8 +1235,8 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { subscriber1.acknowledge(msg); } - // Verify: EntryCache has been invalidated - verify(entryCache, atLeastOnce()).invalidateEntries(any()); + // Verify: entry cache should have been cleared as subscriber1 has consumed all messages + Awaitility.await().untilAsserted(() -> assertEquals(entryCacheManager.getNonEvictableSize(), Pair.of(0, 0L))); // sleep for a second: as ledger.updateCursorRateLimit RateLimiter will allow to invoke cursor-update after a // second @@ -1228,7 +1249,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { /************ usecase-2: *************/ // 1.b Subscriber slower-subscriber Consumer subscriber2 = pulsarClient.newConsumer() - .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub2).subscribe(); + .topic(topic).receiverQueueSize(5).subscriptionName(sub2).subscribe(); // Produce messages final int moreMessages = 10; for (int i = 0; i < receiverSize + moreMessages; i++) { @@ -1250,96 +1271,21 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { msg = subscriber1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS); // Verify: as active-subscriber2 has not consumed messages: EntryCache must have those entries in cache - Awaitility.await().untilAsserted(() -> assertNotEquals(entryCache.getSize(), 0)); - - // 3.b Close subscriber2: which will trigger cache to clear the cache - subscriber2.close(); - - // retry strategically until broker clean up closed subscribers and invalidate all cache entries - retryStrategically((test) -> entryCache.getSize() == 0, 5, 100); - - // Verify: EntryCache should be cleared - assertEquals(entryCache.getSize(), 0); - subscriber1.close(); - log.info("-- Exiting {} test --", methodName); - } - - @Test(timeOut = 100000, dataProvider = "ackReceiptEnabled") - public void testDeactivatingBacklogConsumer(boolean ackReceiptEnabled) throws Exception { - log.info("-- Starting {} test --", methodName); - - final long batchMessageDelayMs = 100; - final int receiverSize = 10; - final String topicName = "cache-topic"; - final String topic = "persistent://my-property/my-ns/" + topicName; - final String sub1 = "faster-sub1"; - final String sub2 = "slower-sub2"; - - // 1. Subscriber Faster subscriber: let it consume all messages immediately - @Cleanup - Consumer subscriber1 = pulsarClient.newConsumer() - .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1) - .isAckReceiptEnabled(ackReceiptEnabled) - .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe(); - // 1.b. Subscriber Slow subscriber: - @Cleanup - Consumer subscriber2 = pulsarClient.newConsumer() - .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub2) - .isAckReceiptEnabled(ackReceiptEnabled) - .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe(); + assertNotEquals(entryCacheManager.getNonEvictableSize(), Pair.of(0, 0L)); - ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topic); - producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS) - .batchingMaxMessages(5); - @Cleanup - Producer producer = producerBuilder.create(); - - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); - ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); - - // reflection to set/get cache-backlog fields value: - final long maxMessageCacheRetentionTimeMillis = conf.getManagedLedgerCacheEvictionTimeThresholdMillis(); - final long maxActiveCursorBacklogEntries = conf.getManagedLedgerCursorBackloggedThreshold(); - - Message msg; - final int totalMsgs = (int) maxActiveCursorBacklogEntries + receiverSize + 1; - // 2. Produce messages - for (int i = 0; i < totalMsgs; i++) { - String message = "my-message-" + i; - producer.send(message.getBytes()); - } - // 3. Consume messages: at Faster subscriber - for (int i = 0; i < totalMsgs; i++) { - msg = subscriber1.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS); - subscriber1.acknowledgeAsync(msg); - } - - // wait : so message can be eligible to to be evict from cache - Thread.sleep(maxMessageCacheRetentionTimeMillis); - - // 4. deactivate subscriber which has built the backlog - topicRef.checkBackloggedCursors(); - Thread.sleep(100); - - // 5. verify: active subscribers - Set activeSubscriber = new HashSet<>(); - ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName())); - assertTrue(activeSubscriber.contains(sub1)); - assertFalse(activeSubscriber.contains(sub2)); - - // 6. consume messages : at slower subscriber - for (int i = 0; i < totalMsgs; i++) { - msg = subscriber2.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS); - subscriber2.acknowledgeAsync(msg); + // Consume messages for subscriber2 + for (int i = 0; i < receiverSize + moreMessages; i++) { + msg = subscriber2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + subscriber2.acknowledge(msg); } - topicRef.checkBackloggedCursors(); + // Verify: EntryCache should be cleared + Awaitility.await().untilAsserted(() -> assertEquals(entryCacheManager.getNonEvictableSize(), Pair.of(0, 0L))); - activeSubscriber.clear(); - ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName())); + subscriber2.close(); + subscriber1.close(); - assertTrue(activeSubscriber.contains(sub1)); - assertTrue(activeSubscriber.contains(sub2)); + log.info("-- Exiting {} test --", methodName); } @Test(timeOut = 5000) From a77e6ec64b05767c27c129fe737da7179f4f7910 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 29 Jul 2025 11:09:44 +0300 Subject: [PATCH 3/4] Add managedLedgerCacheEvictionTimeThresholdMillisMax to different layers --- conf/broker.conf | 14 +++++++- conf/standalone.conf | 14 +++++++- .../mledger/ManagedLedgerFactory.java | 32 ++++++++++++++++--- .../mledger/ManagedLedgerFactoryConfig.java | 16 +++++++++- .../impl/ManagedLedgerFactoryImpl.java | 13 ++++++++ .../mledger/impl/cache/EntryCacheManager.java | 4 +-- .../cache/RangeEntryCacheManagerImpl.java | 8 +++-- .../pulsar/broker/ServiceConfiguration.java | 19 +++++++++-- .../pulsar/broker/service/BrokerService.java | 7 +++- 9 files changed, 111 insertions(+), 16 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index c117596445dd2..5e8b593ecac0b 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1224,9 +1224,21 @@ managedLedgerCacheEvictionWatermark=0.9 # Configure the cache eviction interval in milliseconds for the managed ledger cache managedLedgerCacheEvictionIntervalMs=10 -# All entries that have stayed in cache for more than the configured time, will be evicted +# All entries that have stayed in cache for more than the configured time will be evicted. +# When cacheEvictionByExpectedReadCount is enabled, this threshold applies only to entries +# that have reached their expected read count (i.e., entries that have been read by all +# anticipated consumers). Entries with a positive expected read count use +# managedLedgerCacheEvictionTimeThresholdMillisMax instead. managedLedgerCacheEvictionTimeThresholdMillis=1000 +# Maximum time-to-live in cache for entries that still have pending expected reads. +# This setting is only effective when cacheEvictionByExpectedReadCount is enabled. +# Entries with a positive expected read count (indicating they are anticipated to be +# read by additional consumers) will be retained in cache up to this longer threshold, +# helping avoid cache misses for scenarios like Key_Shared subscription replays, +# catch-up reads, and consumers that temporarily fall behind the tail. +managedLedgerCacheEvictionTimeThresholdMillisMax=5000 + # Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged' # and thus should be set as inactive. managedLedgerCursorBackloggedThreshold=1000 diff --git a/conf/standalone.conf b/conf/standalone.conf index 1797728984629..84cb5abacacec 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -795,9 +795,21 @@ managedLedgerCacheEvictionWatermark=0.9 # Configure the cache eviction frequency for the managed ledger cache (evictions/sec) managedLedgerCacheEvictionFrequency=100.0 -# All entries that have stayed in cache for more than the configured time, will be evicted +# All entries that have stayed in cache for more than the configured time will be evicted. +# When cacheEvictionByExpectedReadCount is enabled, this threshold applies only to entries +# that have reached their expected read count (i.e., entries that have been read by all +# anticipated consumers). Entries with a positive expected read count use +# managedLedgerCacheEvictionTimeThresholdMillisMax instead. managedLedgerCacheEvictionTimeThresholdMillis=1000 +# Maximum time-to-live in cache for entries that still have pending expected reads. +# This setting is only effective when cacheEvictionByExpectedReadCount is enabled. +# Entries with a positive expected read count (indicating they are anticipated to be +# read by additional consumers) will be retained in cache up to this longer threshold, +# helping avoid cache misses for scenarios like Key_Shared subscription replays, +# catch-up reads, and consumers that temporarily fall behind the tail. +managedLedgerCacheEvictionTimeThresholdMillisMax=5000 + # Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged' # and thus should be set as inactive. managedLedgerCursorBackloggedThreshold=1000 diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index d9c887fac468e..58fc1c6da8bff 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -220,17 +220,41 @@ void asyncDelete(String name, CompletableFuture mlConfigFut EntryCacheManager getEntryCacheManager(); /** - * update cache evictionTimeThreshold. - * - * @param cacheEvictionTimeThresholdNanos time threshold for eviction. + * update cache evictionTimeThreshold dynamically. Similar as + * {@link ManagedLedgerFactoryConfig#setCacheEvictionTimeThresholdMillis(long)} + * but the value is in nanos. This inconsistency is kept for backwards compatibility. + * @param cacheEvictionTimeThresholdNanos time threshold in nanos for eviction. */ void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos); /** + * time threshold for eviction. Similar as + * {@link ManagedLedgerFactoryConfig#setCacheEvictionTimeThresholdMillis(long)} + * but the value is in nanos. * @return time threshold for eviction. - * */ + */ long getCacheEvictionTimeThreshold(); + /** + * update cache evictionTimeThresholdMax. Similar as + * {@link ManagedLedgerFactoryConfig#setCacheEvictionTimeThresholdMillisMax(long)} + * but the value is in nanos. This inconsistency is kept for consistency with + * {@link #updateCacheEvictionTimeThreshold(long)}. + * @param cacheEvictionTimeThresholdMaxNanos time threshold in nanos for eviction. + */ + default void updateCacheEvictionTimeThresholdMax(long cacheEvictionTimeThresholdMaxNanos) { + // Default implementation does nothing for backwards compatibility of the ManagedLedgerFactory interface. + // Subclasses can override this method to provide specific behavior. + } + + /** + * max time threshold for eviction. Similar as + * {@link ManagedLedgerFactoryConfig#setCacheEvictionTimeThresholdMillisMax(long)} + * but the value is in nanos. + * @return max time threshold for eviction. + * */ + long getCacheEvictionTimeThresholdMax(); + /** * @return properties of this managedLedger. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index af538262ed44a..f6ff0f1d33af4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -47,10 +47,24 @@ public class ManagedLedgerFactoryConfig { private long cacheEvictionIntervalMs = 10; /** - * All entries that have stayed in cache for more than the configured time, will be evicted. + * All entries that have stayed in cache for more than the configured time will be evicted. + * When cacheEvictionByExpectedReadCount is enabled, this threshold applies only to entries + * that have reached their expected read count (i.e., entries that have been read by all + * anticipated consumers). Entries with a positive expected read count use + * managedLedgerCacheEvictionTimeThresholdMillisMax instead. */ private long cacheEvictionTimeThresholdMillis = 1000; + /** + * Maximum time-to-live in cache for entries that still have pending expected reads. + * This setting is only effective when cacheEvictionByExpectedReadCount is enabled. + * Entries with a positive expected read count (indicating they are anticipated to be + * read by additional consumers) will be retained in cache up to this longer threshold, + * helping avoid cache misses for scenarios like Key_Shared subscription replays, + * catch-up reads, and consumers that temporarily fall behind the tail. + */ + private long cacheEvictionTimeThresholdMillisMax = 5000; + /** * Whether we should make a copy of the entry payloads when inserting in cache. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index e31db8bac46d6..45be8ce260f09 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -135,6 +135,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final ScheduledFuture flushCursorsTask; private volatile long cacheEvictionTimeThresholdNanos; + private volatile long cacheEvictionTimeThresholdNanosMax; private final MetadataStore metadataStore; private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats; @@ -251,6 +252,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS .toNanos(config.getCacheEvictionTimeThresholdMillis()); + this.cacheEvictionTimeThresholdNanosMax = TimeUnit.MILLISECONDS + .toNanos(config.getCacheEvictionTimeThresholdMillisMax()); long evictionTaskInterval = config.getCacheEvictionIntervalMs(); cacheEvictionExecutor.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(this::doCacheEviction), @@ -1152,6 +1155,16 @@ public long getCacheEvictionTimeThreshold(){ return cacheEvictionTimeThresholdNanos; } + @Override + public void updateCacheEvictionTimeThresholdMax(long cacheEvictionTimeThresholdNanosMax){ + this.cacheEvictionTimeThresholdNanosMax = cacheEvictionTimeThresholdNanosMax; + } + + @Override + public long getCacheEvictionTimeThresholdMax(){ + return cacheEvictionTimeThresholdNanosMax; + } + @Override public ManagedLedgerFactoryMXBean getCacheStats() { return this.mbean; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java index 853cb1b87632c..7a4a3b38a4d4c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java @@ -18,10 +18,10 @@ */ package org.apache.bookkeeper.mledger.impl.cache; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.ManagedLedger; public interface EntryCacheManager { - EntryCache getEntryCache(ManagedLedgerImpl ml); + EntryCache getEntryCache(ManagedLedger ml); void removeEntryCache(String name); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java index 57b7124fbf252..4094388965026 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl; @@ -75,14 +76,15 @@ public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory, OrderedSched log.info("Initialized managed-ledger entry cache of {} Mb", maxSize / MB); } - public EntryCache getEntryCache(ManagedLedgerImpl ml) { + public EntryCache getEntryCache(ManagedLedger ml) { if (maxSize == 0) { // Cache is disabled - return new EntryCacheDisabled(ml); + return new EntryCacheDisabled((ManagedLedgerImpl) ml); } EntryCache newEntryCache = - new RangeEntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache(), rangeCacheRemovalQueue); + new RangeEntryCacheImpl(this, (ManagedLedgerImpl) ml, mlFactory.getConfig().isCopyEntriesInCache(), + rangeCacheRemovalQueue); EntryCache currentEntryCache = caches.putIfAbsent(ml.getName(), newEntryCache); if (currentEntryCache != null) { return currentEntryCache; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f52d741624cc2..2ed1acae5b386 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2148,10 +2148,23 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "Configure the cache eviction interval in milliseconds for the managed ledger cache, default is 10ms") private long managedLedgerCacheEvictionIntervalMs = 10; - @FieldContext(category = CATEGORY_STORAGE_ML, - dynamic = true, - doc = "All entries that have stayed in cache for more than the configured time, will be evicted") + @FieldContext(category = CATEGORY_STORAGE_ML, dynamic = true, doc = + "All entries that have stayed in cache for more than the configured time will be evicted. " + + "When cacheEvictionByExpectedReadCount is enabled, this threshold applies only to entries " + + "that have reached their expected read count (i.e., entries that have been read by all " + + "anticipated consumers). Entries with a positive expected read count use " + + "managedLedgerCacheEvictionTimeThresholdMillisMax instead.") private long managedLedgerCacheEvictionTimeThresholdMillis = 1000; + + @FieldContext(category = CATEGORY_STORAGE_ML, dynamic = true, doc = + "Maximum time-to-live in cache for entries that still have pending expected reads. " + + "This setting is only effective when cacheEvictionByExpectedReadCount is enabled. " + + "Entries with a positive expected read count (indicating they are anticipated to be " + + "read by additional consumers) will be retained in cache up to this longer threshold, " + + "helping avoid cache misses for scenarios like Key_Shared subscription replays, " + + "catch-up reads, and consumers that temporarily fall behind the tail.") + private long managedLedgerCacheEvictionTimeThresholdMillisMax = 5000; + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'" + " and thus should be set as inactive.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index fc8720acf0533..4fc6cbeb78c9e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2880,7 +2880,12 @@ private void updateConfigurationAndRegisterListeners() { defaultManagedLedgerFactory.updateCacheEvictionTimeThreshold(MILLISECONDS .toNanos((long) cacheEvictionTimeThresholdMills)); }); - + // add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillisMax dynamic config + registerConfigurationListener( + "managedLedgerCacheEvictionTimeThresholdMillisMax", (cacheEvictionTimeThresholdMillisMax) -> { + defaultManagedLedgerFactory.updateCacheEvictionTimeThresholdMax(MILLISECONDS + .toNanos((long) cacheEvictionTimeThresholdMillisMax)); + }); // add listener to update message-dispatch-rate in msg for topic registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> { From 2945c575d5449b9966129900ef6b07c5e5d89dc1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 29 Jul 2025 16:29:37 +0300 Subject: [PATCH 4/4] Remove maxTimestamp from EntryCacheManager.doCacheEviction method --- .../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 3 +-- .../bookkeeper/mledger/impl/cache/EntryCacheManager.java | 2 +- .../mledger/impl/cache/RangeEntryCacheManagerImpl.java | 3 ++- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 45be8ce260f09..0dcc93f2d73cd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -322,8 +322,7 @@ private synchronized void refreshStats() { @VisibleForTesting public synchronized void doCacheEviction() { - long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos; - entryCacheManager.doCacheEviction(maxTimestamp); + entryCacheManager.doCacheEviction(); } /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java index 7a4a3b38a4d4c..49eea3338f21d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java @@ -37,5 +37,5 @@ public interface EntryCacheManager { double getCacheEvictionWatermark(); - void doCacheEviction(long maxTimestamp); + void doCacheEviction(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java index 4094388965026..c9f3a170c8d17 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java @@ -214,11 +214,12 @@ public double getCacheEvictionWatermark() { } @Override - public void doCacheEviction(long maxTimestamp) { + public void doCacheEviction() { // this method is expected to be called from the cache eviction executor CompletableFuture evictionCompletionFuture = new CompletableFuture<>(); evictionInProgress.set(evictionCompletionFuture); try { + long maxTimestamp = System.nanoTime() - mlFactory.getCacheEvictionTimeThreshold(); evictionHandler.invalidateEntriesBeforeTimestampNanos(maxTimestamp); doEvictToWatermarkWhenOverThreshold(); } finally {