From f67613077844a98935806f2467b06ff0037d041f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 13 May 2025 20:43:09 +0800 Subject: [PATCH 1/3] [improve][pip] PIP-418: Determine the behaviors for components that rely on BookKeeper when BookKeeper is not used --- pip/pip-418.md | 174 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 pip/pip-418.md diff --git a/pip/pip-418.md b/pip/pip-418.md new file mode 100644 index 0000000000000..02bddb2b13d5a --- /dev/null +++ b/pip/pip-418.md @@ -0,0 +1,174 @@ +# PIP-418: Determine the behaviors for components that rely on BookKeeper when BookKeeper is not used + +# Background knowledge + +`PulsarService` has a method for other classes to access the BookKeeper client (`BookKeeper`): + +```java + public BookKeeper getBookKeeperClient() { + ManagedLedgerStorageClass defaultStorageClass = getManagedLedgerStorage().getDefaultStorageClass(); + if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass bkStorageClass) { + return bkStorageClass.getBookKeeperClient(); + } else { + // TODO: Refactor code to support other than default bookkeeper based storage class + throw new UnsupportedOperationException("BookKeeper client is not available"); + } + } +``` + +The BookKeeper client is obtained from the `BookkeeperManagedLedgerStorageClass` interface, which extends the `ManagedLedgerStorageClass` interface. This interface represents the actual storage layer of `ManagedLedgerStorage`. + +```java + Collection getStorageClasses(); +``` + +The following components share the same `BookKeeper` instance: +- `PulsarCompactionServiceFactory`: The default compaction service configured by `compactionServiceFactoryClassName` +- `EventTimeCompactionServiceFactory`: Another built-in compaction service introduced by PIP-215 +- `StrategicTwoPhaseCompactor`: The compactor used only for the `loadbalancer-service-unit-state` topic (introduced by PIP-352) + +This `BookKeeper` instance is also used in: +- `AdminResource#bookKeeper`: Handles the `/bookies/all` REST API +- `PersistentTopic#getInternalStats`: Uses the `BookKeeper` object to get a list of ledgers + +`PulsarService` also has another method to create a new BookKeeper client from the factory class (`BookKeeperClientFactory`): + +```java + public BookKeeperClientFactory getBookKeeperClientFactory() { + return bkClientFactory; + } +``` + +The following components leverages this factory to create their own `BookKeeper` instances: +- `BookkeeperSchemaStorageFactory`: The default schema registry storage configured by `schemaRegistryStorageClassName` +- `BookkeeperBucketSnapshotStorage`: A non-default built-in delayed delivery tracker configured by `delayedDeliveryTrackerFactoryClassName` + +# Motivation + +Pulsar's pluggable managed ledger interface (`ManagedLedgerStorage`) allows a customized storage layer, such as S3. However, in practice, implementing a custom `ManagedLedgerStorage` is insufficient. At least, we have to customize: +- Schema Registry: The default implementation relies on BookKeeper. +- Topic Policies Service: The default implementation relies on system topics, which may not function correctly when the compaction service is enabled. + +Even if a storage layer that does not rely on BookKeeper is configured, Pulsar still sets a `BookKeeperClientFactory` for components like the schema registry. While `PulsarService#getBookKeeperClient` throws an exception if the default storage class is not a `BookkeeperManagedLedgerStorageClass`, the compaction service will not fail immediately. This is because the compaction service is only created when a topic is initialized (in `PulsarService#newTopicCompactionService`). + +When BookKeeper is not the default storage, the behaviors are not clear for these components unless looking into the code. This proposal aims at a clear definition of the behavior. + +# Goals + +## In Scope + +Define the behavior for components that rely on BookKeeper when BookKeeper is not the default storage. + +# High Level Design + +When BookKeeper is not the default storage, + +| Component | Behavior | +| :- | :- | +| Schema Registry | Fail fast | +| Delayed Delivery Tracker | Fail fast if it's `BucketDelayedDeliveryTrackerFactory` | +| Compaction Service | Fallback to a dummy implementation | +| `/bookies/all` REST API | Return an empty bookie list | +| `PersistentTopic#getInternalStats` | Return an empty ledger list in stats | + +Here we don't fail fast for the compaction service because whether to perform compaction does not affect the correctness. It only affects the storage size. + +# Detailed Design + +## Design & Implementation Details + +Simplify the `ManagedLedgerStorage#initialize` method by removing the `BookKeeperClientFactory` and `EventLoopGroup` parameters: + +```java +public interface ManagedLedgerStorage extends AutoCloseable { + + void initialize(ServiceConfiguration conf, + MetadataStoreExtended metadataStore, + OpenTelemetry openTelemetry) throws Exception; +``` + +The reason to have these parameters is that `BookKeeperClientFactory` is maintained in `PulsarService` and creating a `BookKeeper` instance requires a `EventLoopGroup` object. + +However, these BookKeeper related parameters are not useful for a `ManagedLedgerStorage` implementation. Even if it's another BookKeeper based implementation, it should create its own BookKeeper client objects according to the configuration (`conf`). In addition, the `EventLoopGroup` instance passed to the method represents the `pulsar-io` thread pool, which is coupled with Pulsar's internal details. The custom implementation should create is own thread pool. + +Since the built-in implementation still requires an `EventLoopGroup` instance, we can skip calling `initialize` and pass the arguments directly to the constructor: + +```java +if (config.getManagedLedgerStorageClassName().equals(ManagedLedgerClientFactory.class.getName())) { + return new ManagedLedgerClientFactory(config, localMetadataStore, ioEventLoopGroup, openTelemetry); +} else { + // create() will pass these arguments to initialize() + return ManagedLedgerStorage.create(config, localMetadataStore, openTelemetry); +} +``` + +Add a new method to `BookkeeperManagedLedgerStorageClass`: + +```java +public interface BookkeeperManagedLedgerStorageClass extends ManagedLedgerStorageClass { + /* ... */ + BookKeeperClientFactory getBookKeeperClientFactory(); +} +``` + +Then the `bkClientFactory` field will move from `PulsarService` to `ManagedLedgerClientFactory`. + +`PulsarService` still maintains the BookKeeper client getters but they should always access via the `ManagedLedgerStorage` object: + +```java + public Optional getOptionalBookKeeperClient() { + final var defaultStorage = managedLedgerStorage.getDefaultStorageClass(); + if (defaultStorage instanceof BookkeeperManagedLedgerStorageClass bkStorage) { + return Optional.of(bkStorage.getBookKeeperClient()); + } else { + return Optional.empty(); + } + } + + public BookKeeper getBookKeeperClient() throws PulsarServerException { + return getOptionalBookKeeperClient().orElseThrow(PulsarServerException.BookKeeperNotSupportedException::new); + } + + public BookKeeperClientFactory getBookKeeperClientFactory() throws PulsarServerException { + if (managedLedgerStorage.getDefaultStorageClass() instanceof BookkeeperManagedLedgerStorageClass bkStorage) { + return bkStorage.getBookKeeperClientFactory(); + } else { + throw new PulsarServerException.BookKeeperNotSupportedException(); + } + } +``` + +```java + public static class BookKeeperNotSupportedException extends PulsarServerException { +``` + +For fail-fast cases, this new exception will be thrown. + +Specifically, this proposal provides a dummy compaction service implementation: + +```java +public class DisabledTopicCompactionService implements CompactionServiceFactory { +``` + +When a built-in compaction service is configured but BookKeeper is not the storage, fallback to this implementation with a warning log: + +```java +if (getOptionalBookKeeperClient().isEmpty() + && PulsarCompactionServiceFactory.class.isAssignableFrom(compactionServiceFactory.getClass())) { + LOG.warn("BookKeeper client is not available, fallback to a disabled compaction service"); + return new DisabledTopicCompactionService(); +} +``` + +# Backward & Forward Compatibility + +This proposal only breaks some unstable managed ledger related interfaces that might have impact on the plugin implementation. + +If the custom `ManagedLedgerStorage` implementation already rely on the `pulsar-io` thread pool (the previous `eventLoopGroup` parameter), it has to create a separated `EventLoopGroup` instance instead. But it should be a very rare case and it's not a good practice to reuse the `pulsar-io` thread pool. + +# Alternatives + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: From 7a858f3c07f5d05f6fae2c98573be8bef60ccf3b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 13 May 2025 20:52:58 +0800 Subject: [PATCH 2/3] Add discussion thread --- pip/pip-418.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-418.md b/pip/pip-418.md index 02bddb2b13d5a..9e2a9cf3132c3 100644 --- a/pip/pip-418.md +++ b/pip/pip-418.md @@ -170,5 +170,5 @@ If the custom `ManagedLedgerStorage` implementation already rely on the `pulsar- # Links -* Mailing List discussion thread: +* Mailing List discussion thread: https://lists.apache.org/thread/vbgw7ntszkx36mvq5vhk80qlzvtl909r * Mailing List voting thread: From e2b35ec4549d0b6d7f3122d21f250557095dfd97 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Jun 2025 16:15:22 +0800 Subject: [PATCH 3/3] Update pip/pip-418.md Co-authored-by: Penghui Li --- pip/pip-418.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-418.md b/pip/pip-418.md index 9e2a9cf3132c3..9887acf81b5c0 100644 --- a/pip/pip-418.md +++ b/pip/pip-418.md @@ -89,7 +89,7 @@ public interface ManagedLedgerStorage extends AutoCloseable { The reason to have these parameters is that `BookKeeperClientFactory` is maintained in `PulsarService` and creating a `BookKeeper` instance requires a `EventLoopGroup` object. -However, these BookKeeper related parameters are not useful for a `ManagedLedgerStorage` implementation. Even if it's another BookKeeper based implementation, it should create its own BookKeeper client objects according to the configuration (`conf`). In addition, the `EventLoopGroup` instance passed to the method represents the `pulsar-io` thread pool, which is coupled with Pulsar's internal details. The custom implementation should create is own thread pool. +However, these BookKeeper related parameters are not useful for a `ManagedLedgerStorage` implementation. Even if it's another BookKeeper based implementation, it should create its own BookKeeper client objects according to the configuration (`conf`). In addition, the `EventLoopGroup` instance passed to the method represents the `pulsar-io` thread pool, which is coupled with Pulsar's internal details. The custom implementation should create its own thread pool. Since the built-in implementation still requires an `EventLoopGroup` instance, we can skip calling `initialize` and pass the arguments directly to the constructor: