diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index d3c53e22ea..7f31df208e 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -226,12 +226,12 @@ public class ConfigOptions { public static final ConfigOption COORDINATOR_IO_POOL_SIZE = key("coordinator.io-pool.size") .intType() - .defaultValue(1) + .defaultValue(10) .withDescription( "The size of the IO thread pool to run blocking operations for coordinator server. " + "This includes discard unnecessary snapshot files. " + "Increase this value if you experience slow unnecessary snapshot files clean. " - + "The default value is 1."); + + "The default value is 10."); // ------------------------------------------------------------------------ // ConfigOptions for Tablet Server diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java index 7b4057f8a3..4b14e91049 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -82,8 +82,6 @@ import com.alibaba.fluss.server.zk.data.TabletServerRegistration; import com.alibaba.fluss.server.zk.data.ZkData.PartitionIdsZNode; import com.alibaba.fluss.server.zk.data.ZkData.TableIdsZNode; -import com.alibaba.fluss.utils.ExecutorUtils; -import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; import com.alibaba.fluss.utils.types.Tuple2; import org.slf4j.Logger; @@ -101,8 +99,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OfflineBucket; @@ -111,7 +107,6 @@ import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica; import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted; import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful; -import static com.alibaba.fluss.utils.Preconditions.checkArgument; import static com.alibaba.fluss.utils.concurrent.FutureUtils.completeFromCallable; /** An implementation for {@link EventProcessor}. */ @@ -137,7 +132,6 @@ public class CoordinatorEventProcessor implements EventProcessor { private final String internalListenerName; private final CompletedSnapshotStoreManager completedSnapshotStoreManager; - private final ExecutorService ioExecutor; // in normal case, it won't be null, but from I can see, it'll only be null in unit test // since the we won't register a coordinator node in zk. @@ -156,7 +150,8 @@ public CoordinatorEventProcessor( CoordinatorChannelManager coordinatorChannelManager, AutoPartitionManager autoPartitionManager, CoordinatorMetricGroup coordinatorMetricGroup, - Configuration conf) { + Configuration conf, + ExecutorService ioExecutor) { this( zooKeeperClient, serverMetadataCache, @@ -164,7 +159,8 @@ public CoordinatorEventProcessor( new CoordinatorContext(), autoPartitionManager, coordinatorMetricGroup, - conf); + conf, + ioExecutor); } public CoordinatorEventProcessor( @@ -174,7 +170,8 @@ public CoordinatorEventProcessor( CoordinatorContext coordinatorContext, AutoPartitionManager autoPartitionManager, CoordinatorMetricGroup coordinatorMetricGroup, - Configuration conf) { + Configuration conf, + ExecutorService ioExecutor) { this.zooKeeperClient = zooKeeperClient; this.serverMetadataCache = serverMetadataCache; this.coordinatorChannelManager = coordinatorChannelManager; @@ -193,11 +190,6 @@ public CoordinatorEventProcessor( zooKeeperClient); this.metadataManager = new MetadataManager(zooKeeperClient, conf); - int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE); - checkArgument(ioExecutorPoolSize > 0, "ioExecutorPoolSize must be positive"); - this.ioExecutor = - Executors.newFixedThreadPool( - ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io")); this.tableManager = new TableManager( metadataManager, @@ -446,8 +438,6 @@ private void onShutdown() { // then stop watchers tableChangeWatcher.stop(); tabletServerChangeWatcher.stop(); - // shutdown io executor - ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor); } @Override diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java index 226b193903..c61e4d8a33 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java @@ -38,6 +38,8 @@ import com.alibaba.fluss.server.zk.ZooKeeperUtils; import com.alibaba.fluss.server.zk.data.CoordinatorAddress; import com.alibaba.fluss.utils.ExceptionUtils; +import com.alibaba.fluss.utils.ExecutorUtils; +import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; import com.alibaba.fluss.utils.concurrent.FutureUtils; import org.slf4j.Logger; @@ -50,6 +52,9 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -111,6 +116,9 @@ public class CoordinatorServer extends ServerBase { @GuardedBy("lock") private AutoPartitionManager autoPartitionManager; + @GuardedBy("lock") + private ExecutorService ioExecutor; + public CoordinatorServer(Configuration conf) { super(conf); validateConfigs(conf); @@ -175,6 +183,11 @@ protected void startServices() throws Exception { new AutoPartitionManager(metadataCache, metadataManager, conf); autoPartitionManager.start(); + int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE); + this.ioExecutor = + Executors.newFixedThreadPool( + ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io")); + // start coordinator event processor after we register coordinator leader to zk // so that the event processor can get the coordinator leader node from zk during start // up. @@ -187,7 +200,8 @@ protected void startServices() throws Exception { coordinatorChannelManager, autoPartitionManager, serverMetricGroup, - conf); + conf, + ioExecutor); coordinatorEventProcessor.startup(); createDefaultDatabase(); @@ -268,6 +282,15 @@ CompletableFuture stopServices() { exception = ExceptionUtils.firstOrSuppressed(t, exception); } + try { + if (ioExecutor != null) { + // shutdown io executor + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor); + } + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + try { if (coordinatorEventProcessor != null) { coordinatorEventProcessor.shutdown(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java index c6a1b6f614..b0b0e9a76e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java @@ -400,6 +400,13 @@ private static void validateConfigs(Configuration conf) { ConfigOptions.TABLET_SERVER_ID.key())); } + if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be greater than or equal 1.", + ConfigOptions.BACKGROUND_THREADS.key())); + } + if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { throw new IllegalConfigurationException( String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 5c8f7afdbb..56ca4af065 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -56,6 +56,7 @@ import com.alibaba.fluss.testutils.common.AllCallbackWrapper; import com.alibaba.fluss.types.DataTypes; import com.alibaba.fluss.utils.ExceptionUtils; +import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; import com.alibaba.fluss.utils.types.Tuple2; import org.junit.jupiter.api.AfterEach; @@ -75,6 +76,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -152,14 +154,7 @@ void beforeEach() throws IOException { new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration()); Configuration conf = new Configuration(); conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); - eventProcessor = - new CoordinatorEventProcessor( - zookeeperClient, - serverMetadataCache, - testCoordinatorChannelManager, - autoPartitionManager, - TestingMetricGroups.COORDINATOR_METRICS, - new Configuration()); + eventProcessor = buildCoordinatorEventProcessor(); eventProcessor.startup(); metadataManager.createDatabase( defaultDatabase, DatabaseDescriptor.builder().build(), false); @@ -221,14 +216,7 @@ void testCreateAndDropTable() throws Exception { metadataManager.dropTable(t2, false); // start the coordinator - eventProcessor = - new CoordinatorEventProcessor( - zookeeperClient, - serverMetadataCache, - testCoordinatorChannelManager, - autoPartitionManager, - TestingMetricGroups.COORDINATOR_METRICS, - new Configuration()); + eventProcessor = buildCoordinatorEventProcessor(); initCoordinatorChannel(); eventProcessor.startup(); // make sure the table can still be deleted successfully @@ -386,14 +374,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception { // let's restart to check everything is ok eventProcessor.shutdown(); - eventProcessor = - new CoordinatorEventProcessor( - zookeeperClient, - serverMetadataCache, - testCoordinatorChannelManager, - autoPartitionManager, - TestingMetricGroups.COORDINATOR_METRICS, - new Configuration()); + eventProcessor = buildCoordinatorEventProcessor(); // in this test case, so make requests to gateway should always be // successful for when start up, it will send request to tablet servers @@ -431,14 +412,7 @@ void testRestartTriggerReplicaToOffline() throws Exception { // let's restart initCoordinatorChannel(); eventProcessor.shutdown(); - eventProcessor = - new CoordinatorEventProcessor( - zookeeperClient, - serverMetadataCache, - testCoordinatorChannelManager, - autoPartitionManager, - TestingMetricGroups.COORDINATOR_METRICS, - new Configuration()); + eventProcessor = buildCoordinatorEventProcessor(); int failedServer = 0; initCoordinatorChannel(failedServer); eventProcessor.startup(); @@ -602,14 +576,7 @@ void testCreateAndDropPartition() throws Exception { metadataManager.dropTable(tablePath, false); // start the coordinator - eventProcessor = - new CoordinatorEventProcessor( - zookeeperClient, - serverMetadataCache, - testCoordinatorChannelManager, - autoPartitionManager, - TestingMetricGroups.COORDINATOR_METRICS, - new Configuration()); + eventProcessor = buildCoordinatorEventProcessor(); initCoordinatorChannel(); eventProcessor.startup(); verifyPartitionDropped(tableId, partition2Id); @@ -656,14 +623,7 @@ void testRestartResumeDropPartition() throws Exception { zookeeperClient.deletePartition(tablePath, partition2Name); // start the coordinator - eventProcessor = - new CoordinatorEventProcessor( - zookeeperClient, - serverMetadataCache, - testCoordinatorChannelManager, - autoPartitionManager, - TestingMetricGroups.COORDINATOR_METRICS, - new Configuration()); + eventProcessor = buildCoordinatorEventProcessor(); initCoordinatorChannel(); eventProcessor.startup(); @@ -677,6 +637,17 @@ void testRestartResumeDropPartition() throws Exception { replicationFactor); } + private CoordinatorEventProcessor buildCoordinatorEventProcessor() { + return new CoordinatorEventProcessor( + zookeeperClient, + serverMetadataCache, + testCoordinatorChannelManager, + autoPartitionManager, + TestingMetricGroups.COORDINATOR_METRICS, + new Configuration(), + Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io"))); + } + private void initCoordinatorChannel() throws Exception { makeSendLeaderAndStopRequestAlwaysSuccess( testCoordinatorChannelManager, diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index 62fd361c94..dd5ebbcc87 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -40,6 +40,7 @@ import com.alibaba.fluss.server.zk.data.LeaderAndIsr; import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets; import com.alibaba.fluss.testutils.common.AllCallbackWrapper; +import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -51,6 +52,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.Executors; import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NewBucket; import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NonExistentBucket; @@ -234,7 +236,9 @@ void testStateChangeToOnline() throws Exception { coordinatorContext, autoPartitionManager, TestingMetricGroups.COORDINATOR_METRICS, - new Configuration()); + new Configuration(), + Executors.newFixedThreadPool( + 1, new ExecutorThreadFactory("test-coordinator-io"))); CoordinatorEventManager eventManager = new CoordinatorEventManager(coordinatorEventProcessor); coordinatorRequestBatch = diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 0a5a049276..56ef99db7e 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -57,7 +57,7 @@ during the Fluss cluster working. |--------------------------|---------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | coordinator.host | String | (None) | The config parameter defining the network address to connect to for communication with the coordinator server. If the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static hostname or address. | | coordinator.port | String | 9123 | The config parameter defining the network port to connect to for communication with the coordinator server. Like 'coordinator.host', if the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static port. Otherwise, the value can be set to "0" for a dynamic service name resolution. The value accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. | -| coordinator.io-pool.size | Integer | 1 | The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 1. | +| coordinator.io-pool.size | Integer | 10 | The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 10. | ## TabletServer