From a831dbcb772f5cc3a3ee78d4fb237572dfca7418 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 6 Jun 2025 11:54:41 +0300 Subject: [PATCH] Make managedLedgerMaxBatchDeletedIndexToPersist configurable and document other related configs --- conf/broker.conf | 34 +++++++++++++------ conf/standalone.conf | 22 +++++++++--- .../mledger/ManagedLedgerConfig.java | 10 ++++++ .../pulsar/broker/ServiceConfiguration.java | 24 ++++++++++--- .../pulsar/broker/service/BrokerService.java | 2 ++ 5 files changed, 73 insertions(+), 19 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 5c1485aa9e9e2..1888758e3f771 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1280,6 +1280,14 @@ managedLedgerCursorMaxEntriesPerLedger=50000 # Max time before triggering a rollover on a cursor ledger managedLedgerCursorRolloverTimeInSeconds=14400 +# Maximum amount of memory used hold data read from storage (or from the cache). +# This mechanism prevents the broker to have too many concurrent +# reads from storage and fall into Out of Memory errors in case +# of multiple concurrent reads to multiple concurrent consumers. +# Set 0 in order to disable the feature. +# +managedLedgerMaxReadsInFlightSizeInMB=0 + # Max number of "acknowledgment holes" that are going to be persistently stored. # When acknowledging out of order, a consumer will leave holes that are supposed # to be quickly filled by acking all the messages. The information of which @@ -1289,13 +1297,22 @@ managedLedgerCursorRolloverTimeInSeconds=14400 # crashes. managedLedgerMaxUnackedRangesToPersist=10000 -# Maximum amount of memory used hold data read from storage (or from the cache). -# This mechanism prevents the broker to have too many concurrent -# reads from storage and fall into Out of Memory errors in case -# of multiple concurrent reads to multiple concurrent consumers. -# Set 0 in order to disable the feature. -# -managedLedgerMaxReadsInFlightSizeInMB=0 +# Maximum number of partially acknowledged batch messages per subscription that will have their batch +# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true. +# When this limit is exceeded, remaining batch message containing the batch deleted indexes will +# only be tracked in memory. In case of broker restarts or load balancing events, the batch +# deleted indexes will be cleared while redelivering the messages to consumers. +managedLedgerMaxBatchDeletedIndexToPersist=10000 + +# When storing acknowledgement state, choose a more compact serialization format that stores +# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires +# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective. +managedLedgerPersistIndividualAckAsLongArray=true + +# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position" +# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged +# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages. +managedLedgerUnackedRangesOpenCacheSetEnabled=true # Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into @@ -1777,9 +1794,6 @@ narExtractionDirectory= # Maximum prefetch rounds for ledger reading for offloading managedLedgerOffloadPrefetchRounds=1 -# Use Open Range-Set to cache unacked messages -managedLedgerUnackedRangesOpenCacheSetEnabled=true - # For Amazon S3 ledger offload, AWS region s3ManagedLedgerOffloadRegion= diff --git a/conf/standalone.conf b/conf/standalone.conf index 40b88d185c517..e890191b313a3 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -827,6 +827,23 @@ managedLedgerMaxSizePerLedgerMbytes=2048 # crashes. managedLedgerMaxUnackedRangesToPersist=10000 +# Maximum number of partially acknowledged batch messages per subscription that will have their batch +# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true. +# When this limit is exceeded, remaining batch message containing the batch deleted indexes will +# only be tracked in memory. In case of broker restarts or load balancing events, the batch +# deleted indexes will be cleared while redelivering the messages to consumers. +managedLedgerMaxBatchDeletedIndexToPersist=10000 + +# When storing acknowledgement state, choose a more compact serialization format that stores +# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires +# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective. +managedLedgerPersistIndividualAckAsLongArray=true + +# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position" +# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged +# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages. +managedLedgerUnackedRangesOpenCacheSetEnabled=true + # Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into # MetadataStore. @@ -861,9 +878,6 @@ managedLedgerMinimumBacklogEntriesForCaching=1000 # Maximum backlog entry difference to prevent caching entries that can't be reused. managedLedgerMaxBacklogBetweenCursorsForCaching=1000 -# Use Open Range-Set to cache unacked messages -managedLedgerUnackedRangesOpenCacheSetEnabled=true - # Managed ledger prometheus stats latency rollover seconds (default: 60s) managedLedgerPrometheusStatsLatencyRolloverSeconds=60 @@ -1362,4 +1376,4 @@ topicCompactionRetainNullKey=false # If value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory", # will create topic compaction service based on message eventTime. # By default compaction service is based on message publishing order. -compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory \ No newline at end of file +compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index c1cbf0ad704fe..aaa127973a5ec 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -502,6 +502,16 @@ public int getMaxBatchDeletedIndexToPersist() { return maxBatchDeletedIndexToPersist; } + /** + * Set max batch deleted index that will be persisted and recovered. + * + * @param maxBatchDeletedIndexToPersist + * max batch deleted index that will be persisted and recovered. + */ + public void setMaxBatchDeletedIndexToPersist(int maxBatchDeletedIndexToPersist) { + this.maxBatchDeletedIndexToPersist = maxBatchDeletedIndexToPersist; + } + public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() { return persistentUnackedRangesWithMultipleEntriesEnabled; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index af6e2d1418059..6698a1df10882 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2270,10 +2270,22 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + " will only be tracked in memory and messages will be redelivered in case of" + " crashes.") private int managedLedgerMaxUnackedRangesToPersist = 10000; - @FieldContext( - category = CATEGORY_STORAGE_ML, - doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate") + + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Maximum number of partially acknowledged batch messages per subscription that will have their batch " + + "deleted indexes persisted. Batch deleted index state is handled when " + + "acknowledgmentAtBatchIndexLevelEnabled=true.\n\n" + + "When this limit is exceeded, remaining batch message containing the batch deleted indexes will " + + "only be tracked in memory. In case of broker restarts or load balancing events, the batch " + + "deleted indexes will be cleared while redelivering the messages to consumers.") + private int managedLedgerMaxBatchDeletedIndexToPersist = 10000; + + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "When storing acknowledgement state, choose a more compact serialization format that stores" + + " individual acknowledgements as a bitmap which is serialized to an array of long values.\n\n" + + "NOTE: This setting requires managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.") private boolean managedLedgerPersistIndividualAckAsLongArray = true; + @FieldContext( category = CATEGORY_STORAGE_ML, doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" " @@ -2296,8 +2308,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000; @FieldContext( category = CATEGORY_STORAGE_OFFLOADING, - doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)" - ) + doc = "When set to true, a BitSet will be used to track acknowledged messages that come after the \"mark " + + "delete position\" for each subscription.\n\nRoaringBitmap is used as a memory efficient BitSet " + + "implementation for the acknowledged messages tracking. Unacknowledged ranges are the message " + + "ranges excluding the acknowledged messages.") private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true; @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index cba1385621261..307deab8647da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2056,6 +2056,8 @@ public CompletableFuture getManagedLedgerConfig(@NonNull To managedLedgerConfig .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); + managedLedgerConfig.setMaxBatchDeletedIndexToPersist( + serviceConfig.getManagedLedgerMaxBatchDeletedIndexToPersist()); managedLedgerConfig .setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray()); managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(