Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public NotFoundException(Throwable t) {
}
}

public static class BookKeeperNotSupportedException extends PulsarServerException {

public BookKeeperNotSupportedException() {
super("ManagedLedgerStorage#getDefaultStorageClass does not return a BookkeeperManagedLedgerStorageClass "
+ "instance");
}
}

public static PulsarServerException from(Throwable throwable) {
if (throwable instanceof CompletionException) {
return from(throwable.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper";
private BookkeeperManagedLedgerStorageClass defaultStorageClass;
private BookKeeperClientFactory bookkeeperProvider;
private ManagedLedgerFactory managedLedgerFactory;
private BookKeeper defaultBkClient;
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
Expand All @@ -61,9 +62,18 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {

@Override
public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
BookKeeperClientFactory bookkeeperProvider,
EventLoopGroup eventLoopGroup,
OpenTelemetry openTelemetry) throws Exception {
OpenTelemetry openTelemetry) {
throw new IllegalStateException("The initialize method should not be called for the built-in storage");
}

@VisibleForTesting
public ManagedLedgerClientFactory() {
}

public ManagedLedgerClientFactory(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
EventLoopGroup eventLoopGroup,
OpenTelemetry openTelemetry) throws Exception {
this.bookkeeperProvider = new BookKeeperClientFactoryImpl();
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
Expand Down Expand Up @@ -153,6 +163,11 @@ public StatsProvider getStatsProvider() {
return statsProvider;
}

@Override
public BookKeeperClientFactory getBookKeeperClientFactory() {
return bookkeeperProvider;
}

@Override
public BookKeeper getBookKeeperClient() {
return defaultBkClient;
Expand Down Expand Up @@ -229,6 +244,7 @@ public void close() throws IOException {
log.warn("Failed to close bookkeeper-client for policy {}", policy, e);
}
});
bookkeeperProvider.close();
log.info("Closed BookKeeper client");
} catch (Exception e) {
log.warn(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
Expand Down Expand Up @@ -166,6 +165,7 @@
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.DisabledTopicCompactionService;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
import org.apache.pulsar.compaction.TopicCompactionService;
Expand Down Expand Up @@ -218,7 +218,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private WebService webService = null;
private WebSocketService webSocketService = null;
private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
private BookKeeperClientFactory bkClientFactory;
protected CompactionServiceFactory compactionServiceFactory;
private StrategicTwoPhaseCompactor strategicCompactor;
private ResourceUsageTransportManager resourceUsageTransportManager;
Expand Down Expand Up @@ -590,11 +589,6 @@ public CompletableFuture<Void> closeAsync() {
this.managedLedgerStorage = null;
}

if (bkClientFactory != null) {
this.bkClientFactory.close();
this.bkClientFactory = null;
}

closeLeaderElectionService();

if (adminClient != null) {
Expand Down Expand Up @@ -879,8 +873,6 @@ public void start() throws PulsarServerException {
protocolHandlers.initialize(config);

// Now we are ready to start services
this.bkClientFactory = newBookKeeperClientFactory();

managedLedgerStorage = newManagedLedgerStorage();

this.brokerService = newBrokerService(this);
Expand Down Expand Up @@ -1105,10 +1097,12 @@ protected OrderedExecutor newOrderedExecutor() {

@VisibleForTesting
protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception {
return ManagedLedgerStorage.create(
config, localMetadataStore,
bkClientFactory, ioEventLoopGroup, openTelemetry.getOpenTelemetryService().getOpenTelemetry()
);
final var openTelemetry = this.openTelemetry.getOpenTelemetryService().getOpenTelemetry();
if (config.getManagedLedgerStorageClassName().equals(ManagedLedgerClientFactory.class.getName())) {
return new ManagedLedgerClientFactory(config, localMetadataStore, ioEventLoopGroup, openTelemetry);
} else {
return ManagedLedgerStorage.create(config, localMetadataStore, openTelemetry);
}
}

@VisibleForTesting
Expand Down Expand Up @@ -1516,16 +1510,19 @@ public WorkerService getWorkerService() throws UnsupportedOperationException {
+ "is not enabled, probably functionsWorkerEnabled is set to false"));
}

public BookKeeper getBookKeeperClient() {
ManagedLedgerStorageClass defaultStorageClass = getManagedLedgerStorage().getDefaultStorageClass();
if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass bkStorageClass) {
return bkStorageClass.getBookKeeperClient();
public Optional<BookKeeper> getOptionalBookKeeperClient() {
final var defaultStorage = managedLedgerStorage.getDefaultStorageClass();
if (defaultStorage instanceof BookkeeperManagedLedgerStorageClass bkStorage) {
return Optional.of(bkStorage.getBookKeeperClient());
} else {
// TODO: Refactor code to support other than default bookkeeper based storage class
throw new UnsupportedOperationException("BookKeeper client is not available");
return Optional.empty();
}
}

public BookKeeper getBookKeeperClient() throws PulsarServerException {
return getOptionalBookKeeperClient().orElseThrow(PulsarServerException.BookKeeperNotSupportedException::new);
}

public ManagedLedgerFactory getDefaultManagedLedgerFactory() {
return getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory();
}
Expand Down Expand Up @@ -1609,12 +1606,12 @@ private SchemaStorage createAndStartSchemaStorage() throws Exception {
return schemaStorage;
}

public BookKeeperClientFactory newBookKeeperClientFactory() {
return new BookKeeperClientFactoryImpl();
}

public BookKeeperClientFactory getBookKeeperClientFactory() {
return bkClientFactory;
public BookKeeperClientFactory getBookKeeperClientFactory() throws PulsarServerException {
if (managedLedgerStorage.getDefaultStorageClass() instanceof BookkeeperManagedLedgerStorageClass bkStorage) {
return bkStorage.getBookKeeperClientFactory();
} else {
throw new PulsarServerException.BookKeeperNotSupportedException();
}
}

public synchronized ScheduledExecutorService getCompactorExecutor() {
Expand All @@ -1637,17 +1634,15 @@ public Compactor getNullableCompactor() {
return null;
}

public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException {
return new StrategicTwoPhaseCompactor(this.getConfiguration(),
getClient(), getBookKeeperClient(),
getCompactorExecutor());
}

public synchronized StrategicTwoPhaseCompactor getStrategicCompactor() throws PulsarServerException {
public synchronized Optional<StrategicTwoPhaseCompactor> getStrategicCompactor() throws PulsarServerException {
if (getOptionalBookKeeperClient().isEmpty()) {
return Optional.empty();
}
if (this.strategicCompactor == null) {
this.strategicCompactor = newStrategicCompactor();
this.strategicCompactor = new StrategicTwoPhaseCompactor(this.getConfiguration(), getClient(),
getOptionalBookKeeperClient().get(), getCompactorExecutor());
}
return this.strategicCompactor;
return Optional.of(this.strategicCompactor);
}

protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
Expand Down Expand Up @@ -2076,6 +2071,11 @@ private CompactionServiceFactory loadCompactionServiceFactory() {
var compactionServiceFactory =
Reflections.createInstance(compactionServiceFactoryClassName, CompactionServiceFactory.class,
Thread.currentThread().getContextClassLoader());
if (getOptionalBookKeeperClient().isEmpty()
&& PulsarCompactionServiceFactory.class.isAssignableFrom(compactionServiceFactory.getClass())) {
LOG.warn("BookKeeper client is not available, fallback to a disabled compaction service");
return new DisabledTopicCompactionService();
}
compactionServiceFactory.initialize(this).join();
return compactionServiceFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -90,10 +89,6 @@ public abstract class AdminResource extends PulsarWebResource {
protected NamespaceName namespaceName;
protected TopicName topicName;

protected BookKeeper bookKeeper() {
return pulsar().getBookKeeperClient();
}

/**
* Get the domain of the topic (whether it's persistent or non-persistent).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ public void getBookiesRackInfo(@Suspended final AsyncResponse asyncResponse) {
public BookiesClusterInfo getAllBookies() throws Exception {
validateSuperUserAccess();

BookKeeper bookKeeper = bookKeeper();
BookKeeper bookKeeper = pulsar().getOptionalBookKeeperClient().orElse(null);
if (bookKeeper == null) {
return BookiesClusterInfo.builder().bookies(List.of()).build();
}
MetadataClientDriver metadataClientDriver = bookKeeper.getMetadataClientDriver();
RegistrationClient registrationClient = metadataClientDriver.getRegistrationClient();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
*/
package org.apache.pulsar.broker.delayed;

import java.io.IOException;
import lombok.experimental.UtilityClass;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.util.Reflections;

@UtilityClass
public class DelayedDeliveryTrackerLoader {
public static DelayedDeliveryTrackerFactory loadDelayedDeliveryTrackerFactory(PulsarService pulsarService)
throws IOException {
throws PulsarServerException {
try {
ServiceConfiguration conf = pulsarService.getConfiguration();
DelayedDeliveryTrackerFactory factory =
Expand All @@ -36,7 +36,7 @@ public static DelayedDeliveryTrackerFactory loadDelayedDeliveryTrackerFactory(Pu
factory.initialize(pulsarService);
return factory;
} catch (Exception e) {
throw new IOException(e);
throw PulsarServerException.from(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2904,8 +2904,9 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
getLedgerMetadataFutures.add(completableFuture);
CompletableFuture<LedgerMetadata> metadataFuture = null;
try {
metadataFuture = brokerService.getPulsar().getBookKeeperClient()
.getLedgerMetadata(ledgerId);
metadataFuture = brokerService.getPulsar().getOptionalBookKeeperClient()
.map(bkClient -> bkClient.getLedgerMetadata(ledgerId))
.orElse(null);
} catch (NullPointerException e) {
// related to bookkeeper issue https://github.com/apache/bookkeeper/issues/2741
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -4025,7 +4026,8 @@ public synchronized void triggerCompaction()

if (strategicCompactionMap.containsKey(topic)) {
currentCompaction = brokerService.pulsar().getStrategicCompactor()
.compact(topic, strategicCompactionMap.get(topic));
.map(__ -> __.compact(topic, strategicCompactionMap.get(topic)))
.orElse(CompletableFuture.completedFuture(COMPACTION_NEVER_RUN));
} else {
currentCompaction = topicCompactionService.compact().thenApply(x -> null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.broker.BookKeeperClientFactory;

/**
* ManagedLedgerStorageClass represents a configured instance of ManagedLedgerFactory for managed ledgers.
Expand All @@ -39,4 +40,9 @@ public interface BookkeeperManagedLedgerStorageClass extends ManagedLedgerStorag
* @return the stats provider.
*/
StatsProvider getStatsProvider();

/**
* Return the bookkeeper client factory instance that can be used to create a bookkeeper client instance.
*/
BookKeeperClientFactory getBookKeeperClientFactory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
*/
package org.apache.pulsar.broker.storage;

import io.netty.channel.EventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.classification.InterfaceAudience.Private;
import org.apache.pulsar.common.classification.InterfaceStability.Unstable;
Expand All @@ -47,13 +45,12 @@ public interface ManagedLedgerStorage extends AutoCloseable {
* Initialize the managed ledger storage.
*
* @param conf service config
* @param bookkeeperProvider bookkeeper provider
* @param metadataStore the metadata store used in Pulsar
* @param openTelemetry the OpenTelemetry instance used in Pulsar
* @throws Exception
*/
void initialize(ServiceConfiguration conf,
MetadataStoreExtended metadataStore,
BookKeeperClientFactory bookkeeperProvider,
EventLoopGroup eventLoopGroup,
OpenTelemetry openTelemetry) throws Exception;

/**
Expand Down Expand Up @@ -85,21 +82,20 @@ default ManagedLedgerStorageClass getDefaultStorageClass() {
void close() throws IOException;

/**
* Initialize the {@link ManagedLedgerStorage} from the provided resources.
* Initialize the {@link ManagedLedgerStorage} from the provided resources if it's not the built-in storage.
*
* @param conf service config
* @param bkProvider bookkeeper client provider
* @param metadataStore the metadata store used in Pulsar
* @param openTelemetry the OpenTelemetry instance used in Pulsar
* @return the initialized managed ledger storage.
*/
static ManagedLedgerStorage create(ServiceConfiguration conf,
MetadataStoreExtended metadataStore,
BookKeeperClientFactory bkProvider,
EventLoopGroup eventLoopGroup,
OpenTelemetry openTelemetry) throws Exception {
ManagedLedgerStorage storage =
Reflections.createInstance(conf.getManagedLedgerStorageClassName(), ManagedLedgerStorage.class,
Thread.currentThread().getContextClassLoader());
storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup, openTelemetry);
storage.initialize(conf, metadataStore, openTelemetry);
return storage;
}
}
Loading
Loading