Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3048,17 +3048,28 @@ private ConfigKeys() {
public static final String CONTROLLER_USE_MULTI_REGION_REAL_TIME_TOPIC_SWITCHER_ENABLED =
"controller.use.multi.region.real.time.topic.switcher.enabled";

/**
* Config key for enabling concurrent deletion of store versions in the Venice controller.
* The controller cluster config reads this key as a boolean and falls back to {@code false} when it is not set.
* When it is enabled for a cluster, the admin may create a dedicated thread pool for that cluster and use it to
* delete all versions of a store in parallel instead of one by one.
*/
public static final String ENABLE_CONCURRENTLY_DELETING_STORE_VERSIONS = "enable.concurrent_deleting_store_versions";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's follow the format consistent with other configs in the file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.
I'd also suggest that, if these keys have a logical grouping, we prefix the configurations with it as well. Additionally, some of these configurations can be agnostic to the type of cleanup (e.g., concurrent vs. sequential, and so on).

For example, store.version.cleanup would be the umbrella under which you can put these configs. One way to model it is:

store.version.cleanup.policy
store.version.cleanup.wait.time.ms
store.version.cleanup.worker.pool.size

I am aware that some of the existing configurations may be spread across the file, but this way we can eventually bring all related configs under a single umbrella/prefix, which enables us to parse out configurations and extract feature-relevant configurations with ease.

/**
* Config key for the number of worker threads in the per-cluster thread pool used to delete store versions
* concurrently. The controller cluster config reads this key as an int and falls back to {@code -1} when it is
* not set; a non-positive value causes the thread pool not to be created for that cluster.
*/
public static final String WORKER_THREAD_SIZE_FOR_CONCURRENTLY_DELETING_STORE_VERSIONS =
"worker_thread_size_for_concurrent_deleting_store_versions";

/**
* Config key for the maximum time, in milliseconds, to wait for the concurrent deletion of all versions of a store.
* The controller cluster config reads this key as a long and falls back to {@code 120000} when it is not set; the
* value is used as the timeout when the deletion tasks are submitted to the thread pool.
*/
public static final String MAX_WAIT_TIME_FOR_CONCURRENTLY_DELETING_STORE_VERSIONS_IN_MS =
"max_wait_time_for_concurrently_deleting_store_versions_in_ms";

/**
* Number of consecutive cycles to wait before removing a replica from the lag monitor when it does not have a
* corresponding entry in the local customized view cache. For example, if this config is set to 10 and we are using
* the default sleep interval of 60 seconds, then we will only remove the replica from the lag monitor after at least
* 600 seconds without any corresponding entry in the customized view.
*/
public static final String SERVER_LAG_MONITOR_CLEANUP_CYCLE = "server.lag.monitor.cleanup.cycle";

/**
* Thread pool size for the async store change notifier service that handles store metadata change events.
* Default is 1.
*/
public static final String STORE_CHANGE_NOTIFIER_THREAD_POOL_SIZE = "store.change.notifier.thread.pool.size";

}
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,10 @@ public class VeniceControllerClusterConfig {

private final boolean isSkipHybridStoreRTTopicCompactionPolicyUpdateEnabled;

// Whether store versions may be deleted concurrently for this cluster; read from
// ConfigKeys.ENABLE_CONCURRENTLY_DELETING_STORE_VERSIONS with a default of false.
private final boolean enableConcurrentlyDeletingStoreVersions;
// Number of worker threads for concurrent store version deletion; read from
// ConfigKeys.WORKER_THREAD_SIZE_FOR_CONCURRENTLY_DELETING_STORE_VERSIONS with a default of -1.
private final int workerThreadSizeForConcurrentlyDeletingStoreVersions;
// Maximum wait time, in milliseconds, for concurrent store version deletion; read from
// ConfigKeys.MAX_WAIT_TIME_FOR_CONCURRENTLY_DELETING_STORE_VERSIONS_IN_MS with a default of 120000.
private final long maxWaitTimeForConcurrentlyDeletingStoreVersionsInMs;

public VeniceControllerClusterConfig(VeniceProperties props) {
this.props = props;
this.clusterName = props.getString(CLUSTER_NAME);
Expand Down Expand Up @@ -1277,6 +1281,13 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.useMultiRegionRealTimeTopicSwitcher =
props.getBoolean(ConfigKeys.CONTROLLER_USE_MULTI_REGION_REAL_TIME_TOPIC_SWITCHER_ENABLED, false);

this.enableConcurrentlyDeletingStoreVersions =
props.getBoolean(ConfigKeys.ENABLE_CONCURRENTLY_DELETING_STORE_VERSIONS, false);
this.workerThreadSizeForConcurrentlyDeletingStoreVersions =
props.getInt(ConfigKeys.WORKER_THREAD_SIZE_FOR_CONCURRENTLY_DELETING_STORE_VERSIONS, -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be a case where we don't want to create the threadpool when ENABLE_CONCURRENTLY_DELETING_STORE_VERSIONS is enabled? If so, let's set the default thread count to 1 so we can skip the check in VeniceHelixAdmin L568

this.maxWaitTimeForConcurrentlyDeletingStoreVersionsInMs =
props.getLong(ConfigKeys.MAX_WAIT_TIME_FOR_CONCURRENTLY_DELETING_STORE_VERSIONS_IN_MS, 120000);

this.logClusterConfig();
}

Expand Down Expand Up @@ -2415,6 +2426,18 @@ private void validateHelixCapacities(Integer helixInstanceCapacity, Integer heli
}
}

/**
 * Tells whether concurrent deletion of store versions is enabled for this cluster.
 *
 * @return the value of the {@code enableConcurrentlyDeletingStoreVersions} setting held by this config
 */
public boolean isEnableConcurrentlyDeletingStoreVersions() {
  return this.enableConcurrentlyDeletingStoreVersions;
}

/**
 * Gives the configured number of worker threads for concurrently deleting store versions.
 *
 * @return the configured worker thread count; may be non-positive when it has not been configured
 */
public int getWorkerThreadSizeForConcurrentlyDeletingStoreVersions() {
  return this.workerThreadSizeForConcurrentlyDeletingStoreVersions;
}

/**
 * Gives the maximum time to wait for concurrent deletion of store versions.
 *
 * @return the configured maximum wait time, in milliseconds
 */
public long getMaxWaitTimeForConcurrentlyDeletingStoreVersionsInMs() {
  return this.maxWaitTimeForConcurrentlyDeletingStoreVersionsInMs;
}

/**
 * Gives the log context held by this cluster config.
 *
 * @return the {@code logContext} of this config
 */
public LogContext getLogContext() {
  return this.logContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,14 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
Expand Down Expand Up @@ -491,6 +494,8 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
private final Map<String, DeadStoreStats> deadStoreStatsMap = new VeniceConcurrentHashMap<>();
private final Map<String, LogCompactionStats> logCompactionStatsMap = new VeniceConcurrentHashMap<>();

// Executors used to delete store versions concurrently, keyed by cluster name. A cluster only has an entry when
// concurrent deletion is enabled for it and its configured worker thread size is positive; callers must handle a
// missing (null) entry.
private final Map<String, ExecutorService> executorServiceMap;

// Test only.
public VeniceHelixAdmin(
VeniceControllerMultiClusterConfig multiClusterConfigs,
Expand Down Expand Up @@ -554,6 +559,34 @@ public VeniceHelixAdmin(
this.pubSubTopicRepository = pubSubTopicRepository;
this.sslEnabled = sslEnabled;

// only create the executor service when concurrent deletion is enabled for the cluster
this.executorServiceMap = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's create only one shared executor with configurable size

for (String clusterName: multiClusterConfigs.getClusters()) {
if (multiClusterConfigs.getControllerConfig(clusterName).isEnableConcurrentlyDeletingStoreVersions()) {
int workerThreadSize = multiClusterConfigs.getControllerConfig(clusterName)
.getWorkerThreadSizeForConcurrentlyDeletingStoreVersions();
if (workerThreadSize <= 0) {
// by default, if worker thread size is not configured properly, will skip creating the executor service
LOGGER.warn(
"The worker thread pool size for concurrently deleting store versions is set to a non-positive number: {}. Thread pool will not be created",
multiClusterConfigs.getControllerConfig(clusterName)
.getWorkerThreadSizeForConcurrentlyDeletingStoreVersions());
} else {
this.executorServiceMap.put(
clusterName,
new ThreadPoolExecutor(
workerThreadSize,
workerThreadSize,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new DaemonThreadFactory(
String.format("Venice-Admin-Delete-Version-Task-%s", clusterName),
getLogContext())));
}
}
}

if (sslEnabled) {
try {
String sslFactoryClassName = multiClusterConfigs.getSslFactoryClassName();
Expand Down Expand Up @@ -4122,9 +4155,39 @@ public List<Version> deleteAllVersionsInStore(String clusterName, String storeNa
repository.updateStore(store);
List<Version> deletingVersionSnapshot = new ArrayList<>(store.getVersions());

for (Version version: deletingVersionSnapshot) {
deleteOneStoreVersion(clusterName, version.getStoreName(), version.getNumber());
ExecutorService executorService = executorServiceMap.get(clusterName);
if (executorService != null) {
// delete all versions in parallel
List<Callable<Void>> tasks = new ArrayList<>();
for (Version version: deletingVersionSnapshot) {
tasks.add(() -> {
deleteOneStoreVersion(clusterName, version.getStoreName(), version.getNumber());
return null;
});
}
try {
List<Future<Void>> futures = executorService.invokeAll(
tasks,
getMultiClusterConfigs().getControllerConfig(clusterName)
.getMaxWaitTimeForConcurrentlyDeletingStoreVersionsInMs(),
TimeUnit.MILLISECONDS);
for (Future<Void> future: futures) {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
LOGGER.error(
"Concurrently store deletion: Failed to delete all versions in store: {} in cluster: {}",
storeName,
clusterName,
e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rethrow exception here?

}
} else {
// delete all versions sequentially
for (Version version: deletingVersionSnapshot) {
deleteOneStoreVersion(clusterName, version.getStoreName(), version.getNumber());
}
}

LOGGER.info("Deleted all versions in store: {} in cluster: {}", storeName, clusterName);
return deletingVersionSnapshot;
}
Expand Down Expand Up @@ -7097,6 +7160,12 @@ public void stopVeniceController() {
zkClient.close();
admin.close();
helixAdminClient.close();
for (String cluster: multiClusterConfigs.getClusters()) {
ExecutorService executorService = executorServiceMap.get(cluster);
if (executorService != null) {
executorService.shutdownNow();
}
}
} catch (Exception e) {
throw new VeniceException("Can not stop controller correctly.", e);
}
Expand Down Expand Up @@ -8583,6 +8652,12 @@ public void close() {

long elapsedTime = System.currentTimeMillis() - closeStartTime;
long remainingTime = Math.max(1, HELIX_MANAGER_DISCONNECT_TIMEOUT_MS - elapsedTime);
for (String cluster: multiClusterConfigs.getClusters()) {
ExecutorService executorService = executorServiceMap.get(cluster);
if (executorService != null) {
executorService.shutdownNow();
}
}
helixManagerDisconnectFuture.get(remainingTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error("Timed out waiting for helixManagerDisconnectFuture. Swallowing and moving on.", e);
Expand Down
Loading