@@ -226,12 +226,12 @@ public class ConfigOptions {
public static final ConfigOption<Integer> COORDINATOR_IO_POOL_SIZE =
key("coordinator.io-pool.size")
.intType()
.defaultValue(1)
.defaultValue(10)
.withDescription(
"The size of the IO thread pool to run blocking operations for coordinator server. "
+ "This includes discard unnecessary snapshot files. "
+ "Increase this value if you experience slow unnecessary snapshot files clean. "
+ "The default value is 1.");
+ "The default value is 10.");

// ------------------------------------------------------------------------
// ConfigOptions for Tablet Server
@@ -82,8 +82,6 @@
import com.alibaba.fluss.server.zk.data.TabletServerRegistration;
import com.alibaba.fluss.server.zk.data.ZkData.PartitionIdsZNode;
import com.alibaba.fluss.server.zk.data.ZkData.TableIdsZNode;
import com.alibaba.fluss.utils.ExecutorUtils;
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
import com.alibaba.fluss.utils.types.Tuple2;

import org.slf4j.Logger;
@@ -101,8 +99,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
@@ -111,7 +107,6 @@
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
import static com.alibaba.fluss.utils.concurrent.FutureUtils.completeFromCallable;

/** An implementation for {@link EventProcessor}. */
@@ -137,7 +132,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
private final String internalListenerName;

private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
private final ExecutorService ioExecutor;

// in normal case, it won't be null, but from I can see, it'll only be null in unit test
// since the we won't register a coordinator node in zk.
@@ -156,15 +150,17 @@ public CoordinatorEventProcessor(
CoordinatorChannelManager coordinatorChannelManager,
AutoPartitionManager autoPartitionManager,
CoordinatorMetricGroup coordinatorMetricGroup,
Configuration conf) {
Configuration conf,
ExecutorService ioExecutor) {
this(
zooKeeperClient,
serverMetadataCache,
coordinatorChannelManager,
new CoordinatorContext(),
autoPartitionManager,
coordinatorMetricGroup,
conf);
conf,
ioExecutor);
}

public CoordinatorEventProcessor(
@@ -174,7 +170,8 @@ public CoordinatorEventProcessor(
CoordinatorContext coordinatorContext,
AutoPartitionManager autoPartitionManager,
CoordinatorMetricGroup coordinatorMetricGroup,
Configuration conf) {
Configuration conf,
ExecutorService ioExecutor) {
this.zooKeeperClient = zooKeeperClient;
this.serverMetadataCache = serverMetadataCache;
this.coordinatorChannelManager = coordinatorChannelManager;
@@ -193,11 +190,6 @@ public CoordinatorEventProcessor(
zooKeeperClient);
this.metadataManager = new MetadataManager(zooKeeperClient, conf);

int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
checkArgument(ioExecutorPoolSize > 0, "ioExecutorPoolSize must be positive");
this.ioExecutor =
Executors.newFixedThreadPool(
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
this.tableManager =
new TableManager(
metadataManager,
@@ -446,8 +438,6 @@ private void onShutdown() {
// then stop watchers
tableChangeWatcher.stop();
tabletServerChangeWatcher.stop();
// shutdown io executor
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
}

@Override
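Taken together, these hunks move ownership of the IO pool out of the event processor: the processor now just receives an ExecutorService through its constructor, and whoever creates it is responsible for shutting it down. Below is a condensed sketch of the new wiring, using only names visible in this diff; collaborators such as zooKeeperClient and autoPartitionManager are assumed to already exist (as they do in CoordinatorServer.startServices()), so this is an illustration rather than the exact server code.

```java
// The owner builds the pool from coordinator.io-pool.size and injects it.
ExecutorService ioExecutor =
        Executors.newFixedThreadPool(
                conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE),
                new ExecutorThreadFactory("coordinator-io"));

CoordinatorEventProcessor processor =
        new CoordinatorEventProcessor(
                zooKeeperClient,
                serverMetadataCache,
                coordinatorChannelManager,
                autoPartitionManager,
                coordinatorMetricGroup,
                conf,
                ioExecutor);
processor.startup();

// On shutdown the owner drains the pool itself, mirroring the order used in
// CoordinatorServer.stopServices() in this PR.
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
processor.shutdown();
```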
@@ -38,6 +38,8 @@
import com.alibaba.fluss.server.zk.ZooKeeperUtils;
import com.alibaba.fluss.server.zk.data.CoordinatorAddress;
import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.ExecutorUtils;
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
import com.alibaba.fluss.utils.concurrent.FutureUtils;

import org.slf4j.Logger;
@@ -50,6 +52,9 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
@@ -111,6 +116,9 @@ public class CoordinatorServer extends ServerBase {
@GuardedBy("lock")
private AutoPartitionManager autoPartitionManager;

@GuardedBy("lock")
private ExecutorService ioExecutor;

public CoordinatorServer(Configuration conf) {
super(conf);
validateConfigs(conf);
@@ -175,6 +183,11 @@ protected void startServices() throws Exception {
new AutoPartitionManager(metadataCache, metadataManager, conf);
autoPartitionManager.start();

int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
this.ioExecutor =
Executors.newFixedThreadPool(
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));

// start coordinator event processor after we register coordinator leader to zk
// so that the event processor can get the coordinator leader node from zk during start
// up.
@@ -187,7 +200,8 @@ protected void startServices() throws Exception {
coordinatorChannelManager,
autoPartitionManager,
serverMetricGroup,
conf);
conf,
ioExecutor);
coordinatorEventProcessor.startup();

createDefaultDatabase();
@@ -268,6 +282,15 @@ CompletableFuture<Void> stopServices() {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

try {
if (ioExecutor != null) {
// shutdown io executor
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
}
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

try {
if (coordinatorEventProcessor != null) {
coordinatorEventProcessor.shutdown();
@@ -400,6 +400,13 @@ private static void validateConfigs(Configuration conf) {
ConfigOptions.TABLET_SERVER_ID.key()));
}

if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal 1.",
ConfigOptions.BACKGROUND_THREADS.key()));
}

Review comment (Member): COORDINATOR_IO_POOL_SIZE ?

Reply (Contributor Author): TabletServer uses BACKGROUND_THREADS as its IO-pool parameter. I just noticed there was no such check before, so I added it here.

if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
throw new IllegalConfigurationException(
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
@@ -56,6 +56,7 @@
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
import com.alibaba.fluss.types.DataTypes;
import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
import com.alibaba.fluss.utils.types.Tuple2;

import org.junit.jupiter.api.AfterEach;
@@ -75,6 +76,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -152,14 +154,7 @@ void beforeEach() throws IOException {
new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration());
Configuration conf = new Configuration();
conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data");
eventProcessor =
new CoordinatorEventProcessor(
zookeeperClient,
serverMetadataCache,
testCoordinatorChannelManager,
autoPartitionManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration());
eventProcessor = buildCoordinatorEventProcessor();
eventProcessor.startup();
metadataManager.createDatabase(
defaultDatabase, DatabaseDescriptor.builder().build(), false);
@@ -221,14 +216,7 @@ void testCreateAndDropTable() throws Exception {
metadataManager.dropTable(t2, false);

// start the coordinator
eventProcessor =
new CoordinatorEventProcessor(
zookeeperClient,
serverMetadataCache,
testCoordinatorChannelManager,
autoPartitionManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration());
eventProcessor = buildCoordinatorEventProcessor();
initCoordinatorChannel();
eventProcessor.startup();
// make sure the table can still be deleted successfully
@@ -386,14 +374,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception {

// let's restart to check everything is ok
eventProcessor.shutdown();
eventProcessor =
new CoordinatorEventProcessor(
zookeeperClient,
serverMetadataCache,
testCoordinatorChannelManager,
autoPartitionManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration());
eventProcessor = buildCoordinatorEventProcessor();

// in this test case, so make requests to gateway should always be
// successful for when start up, it will send request to tablet servers
@@ -431,14 +412,7 @@ void testRestartTriggerReplicaToOffline() throws Exception {
// let's restart
initCoordinatorChannel();
eventProcessor.shutdown();
eventProcessor =
new CoordinatorEventProcessor(
zookeeperClient,
serverMetadataCache,
testCoordinatorChannelManager,
autoPartitionManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration());
eventProcessor = buildCoordinatorEventProcessor();
int failedServer = 0;
initCoordinatorChannel(failedServer);
eventProcessor.startup();
@@ -602,14 +576,7 @@ void testCreateAndDropPartition() throws Exception {
metadataManager.dropTable(tablePath, false);

// start the coordinator
eventProcessor =
new CoordinatorEventProcessor(
zookeeperClient,
serverMetadataCache,
testCoordinatorChannelManager,
autoPartitionManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration());
eventProcessor = buildCoordinatorEventProcessor();
initCoordinatorChannel();
eventProcessor.startup();
verifyPartitionDropped(tableId, partition2Id);
@@ -656,14 +623,7 @@ void testRestartResumeDropPartition() throws Exception {
zookeeperClient.deletePartition(tablePath, partition2Name);

// start the coordinator
eventProcessor =
new CoordinatorEventProcessor(
zookeeperClient,
serverMetadataCache,
testCoordinatorChannelManager,
autoPartitionManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration());
eventProcessor = buildCoordinatorEventProcessor();
initCoordinatorChannel();
eventProcessor.startup();

@@ -677,6 +637,17 @@
replicationFactor);
}

private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
return new CoordinatorEventProcessor(
zookeeperClient,
serverMetadataCache,
testCoordinatorChannelManager,
autoPartitionManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration(),
Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io")));
}

private void initCoordinatorChannel() throws Exception {
makeSendLeaderAndStopRequestAlwaysSuccess(
testCoordinatorChannelManager,
@@ -40,6 +40,7 @@
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets;
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -51,6 +52,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.Executors;

import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NewBucket;
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NonExistentBucket;
@@ -234,7 +236,9 @@ void testStateChangeToOnline() throws Exception {
coordinatorContext,
autoPartitionManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration());
new Configuration(),
Executors.newFixedThreadPool(
1, new ExecutorThreadFactory("test-coordinator-io")));
CoordinatorEventManager eventManager =
new CoordinatorEventManager(coordinatorEventProcessor);
coordinatorRequestBatch =
website/docs/maintenance/configuration.md (1 addition, 1 deletion)
@@ -57,7 +57,7 @@ during the Fluss cluster working.
|--------------------------|---------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| coordinator.host | String | (None) | The config parameter defining the network address to connect to for communication with the coordinator server. If the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static hostname or address. |
| coordinator.port | String | 9123 | The config parameter defining the network port to connect to for communication with the coordinator server. Like 'coordinator.host', if the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static port. Otherwise, the value can be set to "0" for a dynamic service name resolution. The value accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. |
| coordinator.io-pool.size | Integer | 1 | The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 1. |
| coordinator.io-pool.size | Integer | 10 | The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 10. |
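As a quick illustration of how the bumped default behaves, the sketch below resolves the option from an empty Configuration and then overrides it programmatically. The set(...) call is an assumed Flink-style accessor and is not part of this change; in a real deployment the key would normally be set in the cluster configuration file instead.

```java
Configuration conf = new Configuration();
// Nothing set explicitly -> the new default of 10 applies.
int defaultPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);   // 10

// Hypothetical override (assumed accessor, not taken from this PR): raise the pool
// size if cleanup of obsolete snapshot files is observed to lag.
conf.set(ConfigOptions.COORDINATOR_IO_POOL_SIZE, 16);
int tunedPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);     // 16
```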

## TabletServer
