Skip to content

Commit 8821d3b

Browse files
committed
[server] Coordinator use 'server.background.threads' instead of 'coordinator.io-pool.size' as the default executor config
1 parent d20612e commit 8821d3b

File tree

7 files changed

+63
-79
lines changed

7 files changed

+63
-79
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,16 +166,6 @@ public class ConfigOptions {
166166
+ " (“50100,50101”), ranges (“50100-50200”) or a combination"
167167
+ " of both.");
168168

169-
public static final ConfigOption<Integer> COORDINATOR_IO_POOL_SIZE =
170-
key("coordinator.io-pool.size")
171-
.intType()
172-
.defaultValue(1)
173-
.withDescription(
174-
"The size of the IO thread pool to run blocking operations for coordinator server. "
175-
+ "This includes discard unnecessary snapshot files. "
176-
+ "Increase this value if you experience slow unnecessary snapshot files clean. "
177-
+ "The default value is 1.");
178-
179169
// ------------------------------------------------------------------------
180170
// ConfigOptions for Tablet Server
181171
// ------------------------------------------------------------------------

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@
8080
import com.alibaba.fluss.server.zk.data.TabletServerRegistration;
8181
import com.alibaba.fluss.server.zk.data.ZkData.PartitionIdsZNode;
8282
import com.alibaba.fluss.server.zk.data.ZkData.TableIdsZNode;
83-
import com.alibaba.fluss.utils.ExecutorUtils;
84-
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
8583
import com.alibaba.fluss.utils.types.Tuple2;
8684

8785
import org.slf4j.Logger;
@@ -99,8 +97,6 @@
9997
import java.util.Set;
10098
import java.util.concurrent.CompletableFuture;
10199
import java.util.concurrent.ExecutorService;
102-
import java.util.concurrent.Executors;
103-
import java.util.concurrent.TimeUnit;
104100
import java.util.stream.Collectors;
105101

106102
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
@@ -109,7 +105,6 @@
109105
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
110106
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
111107
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
112-
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
113108
import static com.alibaba.fluss.utils.concurrent.FutureUtils.completeFromCallable;
114109

115110
/** An implementation for {@link EventProcessor}. */
@@ -134,7 +129,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
134129
private final CoordinatorMetricGroup coordinatorMetricGroup;
135130

136131
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
137-
private final ExecutorService ioExecutor;
138132

139133
// in normal case, it won't be null, but from I can see, it'll only be null in unit test
140134
// since the we won't register a coordinator node in zk.
@@ -153,15 +147,17 @@ public CoordinatorEventProcessor(
153147
CoordinatorChannelManager coordinatorChannelManager,
154148
AutoPartitionManager autoPartitionManager,
155149
CoordinatorMetricGroup coordinatorMetricGroup,
156-
Configuration conf) {
150+
Configuration conf,
151+
ExecutorService ioExecutor) {
157152
this(
158153
zooKeeperClient,
159154
serverMetadataCache,
160155
coordinatorChannelManager,
161156
new CoordinatorContext(),
162157
autoPartitionManager,
163158
coordinatorMetricGroup,
164-
conf);
159+
conf,
160+
ioExecutor);
165161
}
166162

167163
public CoordinatorEventProcessor(
@@ -171,7 +167,8 @@ public CoordinatorEventProcessor(
171167
CoordinatorContext coordinatorContext,
172168
AutoPartitionManager autoPartitionManager,
173169
CoordinatorMetricGroup coordinatorMetricGroup,
174-
Configuration conf) {
170+
Configuration conf,
171+
ExecutorService ioExecutor) {
175172
this.zooKeeperClient = zooKeeperClient;
176173
this.serverMetadataCache = serverMetadataCache;
177174
this.coordinatorChannelManager = coordinatorChannelManager;
@@ -190,11 +187,6 @@ public CoordinatorEventProcessor(
190187
zooKeeperClient);
191188
this.metadataManager = new MetadataManager(zooKeeperClient, conf);
192189

193-
int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
194-
checkArgument(ioExecutorPoolSize > 0, "ioExecutorPoolSize must be positive");
195-
this.ioExecutor =
196-
Executors.newFixedThreadPool(
197-
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
198190
this.tableManager =
199191
new TableManager(
200192
metadataManager,
@@ -430,8 +422,6 @@ private void onShutdown() {
430422
// then stop watchers
431423
tableChangeWatcher.stop();
432424
tabletServerChangeWatcher.stop();
433-
// shutdown io executor
434-
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
435425
}
436426

437427
@Override

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import com.alibaba.fluss.server.zk.ZooKeeperUtils;
3737
import com.alibaba.fluss.server.zk.data.CoordinatorAddress;
3838
import com.alibaba.fluss.utils.ExceptionUtils;
39+
import com.alibaba.fluss.utils.ExecutorUtils;
40+
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
3941
import com.alibaba.fluss.utils.concurrent.FutureUtils;
4042

4143
import org.slf4j.Logger;
@@ -48,6 +50,9 @@
4850
import java.util.List;
4951
import java.util.UUID;
5052
import java.util.concurrent.CompletableFuture;
53+
import java.util.concurrent.ExecutorService;
54+
import java.util.concurrent.Executors;
55+
import java.util.concurrent.TimeUnit;
5156
import java.util.concurrent.atomic.AtomicBoolean;
5257

5358
/**
@@ -109,6 +114,9 @@ public class CoordinatorServer extends ServerBase {
109114
@GuardedBy("lock")
110115
private AutoPartitionManager autoPartitionManager;
111116

117+
@GuardedBy("lock")
118+
private ExecutorService ioExecutor;
119+
112120
public CoordinatorServer(Configuration conf) {
113121
super(conf);
114122
validateConfigs(conf);
@@ -174,6 +182,11 @@ protected void startServices() throws Exception {
174182
new AutoPartitionManager(metadataCache, metadataManager, conf);
175183
autoPartitionManager.start();
176184

185+
int ioExecutorPoolSize = conf.get(ConfigOptions.BACKGROUND_THREADS);
186+
this.ioExecutor =
187+
Executors.newFixedThreadPool(
188+
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
189+
177190
// start coordinator event processor after we register coordinator leader to zk
178191
// so that the event processor can get the coordinator leader node from zk during start
179192
// up.
@@ -186,7 +199,8 @@ protected void startServices() throws Exception {
186199
coordinatorChannelManager,
187200
autoPartitionManager,
188201
serverMetricGroup,
189-
conf);
202+
conf,
203+
ioExecutor);
190204
coordinatorEventProcessor.startup();
191205

192206
createDefaultDatabase();
@@ -265,6 +279,15 @@ CompletableFuture<Void> stopServices() {
265279
exception = ExceptionUtils.firstOrSuppressed(t, exception);
266280
}
267281

282+
try {
283+
if (ioExecutor != null) {
284+
// shutdown io executor
285+
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
286+
}
287+
} catch (Throwable t) {
288+
exception = ExceptionUtils.firstOrSuppressed(t, exception);
289+
}
290+
268291
try {
269292
if (coordinatorEventProcessor != null) {
270293
coordinatorEventProcessor.shutdown();
@@ -358,11 +381,11 @@ private static void validateConfigs(Configuration conf) {
358381
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()));
359382
}
360383

361-
if (conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE) < 1) {
384+
if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
362385
throw new IllegalConfigurationException(
363386
String.format(
364387
"Invalid configuration for %s, it must be greater than or equal 1.",
365-
ConfigOptions.COORDINATOR_IO_POOL_SIZE.key()));
388+
ConfigOptions.BACKGROUND_THREADS.key()));
366389
}
367390

368391
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,13 @@ private static void validateConfigs(Configuration conf) {
392392
ConfigOptions.TABLET_SERVER_ID.key()));
393393
}
394394

395+
if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
396+
throw new IllegalConfigurationException(
397+
String.format(
398+
"Invalid configuration for %s, it must be greater than or equal 1.",
399+
ConfigOptions.BACKGROUND_THREADS.key()));
400+
}
401+
395402
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
396403
throw new IllegalConfigurationException(
397404
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java

Lines changed: 19 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
5656
import com.alibaba.fluss.types.DataTypes;
5757
import com.alibaba.fluss.utils.ExceptionUtils;
58+
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
5859
import com.alibaba.fluss.utils.types.Tuple2;
5960

6061
import org.junit.jupiter.api.AfterEach;
@@ -74,6 +75,7 @@
7475
import java.util.Optional;
7576
import java.util.Set;
7677
import java.util.concurrent.CompletableFuture;
78+
import java.util.concurrent.Executors;
7779
import java.util.concurrent.TimeUnit;
7880
import java.util.function.Consumer;
7981
import java.util.function.Function;
@@ -146,14 +148,7 @@ void beforeEach() throws IOException {
146148
new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration());
147149
Configuration conf = new Configuration();
148150
conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data");
149-
eventProcessor =
150-
new CoordinatorEventProcessor(
151-
zookeeperClient,
152-
serverMetadataCache,
153-
testCoordinatorChannelManager,
154-
autoPartitionManager,
155-
TestingMetricGroups.COORDINATOR_METRICS,
156-
new Configuration());
151+
eventProcessor = buildCoordinatorEventProcessor();
157152
eventProcessor.startup();
158153
metadataManager.createDatabase(
159154
defaultDatabase, DatabaseDescriptor.builder().build(), false);
@@ -215,14 +210,7 @@ void testCreateAndDropTable() throws Exception {
215210
metadataManager.dropTable(t2, false);
216211

217212
// start the coordinator
218-
eventProcessor =
219-
new CoordinatorEventProcessor(
220-
zookeeperClient,
221-
serverMetadataCache,
222-
testCoordinatorChannelManager,
223-
autoPartitionManager,
224-
TestingMetricGroups.COORDINATOR_METRICS,
225-
new Configuration());
213+
eventProcessor = buildCoordinatorEventProcessor();
226214
initCoordinatorChannel();
227215
eventProcessor.startup();
228216
// make sure the table can still be deleted successfully
@@ -377,14 +365,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception {
377365

378366
// let's restart to check everything is ok
379367
eventProcessor.shutdown();
380-
eventProcessor =
381-
new CoordinatorEventProcessor(
382-
zookeeperClient,
383-
serverMetadataCache,
384-
testCoordinatorChannelManager,
385-
autoPartitionManager,
386-
TestingMetricGroups.COORDINATOR_METRICS,
387-
new Configuration());
368+
eventProcessor = buildCoordinatorEventProcessor();
388369

389370
// in this test case, so make requests to gateway should always be
390371
// successful for when start up, it will send request to tablet servers
@@ -422,14 +403,7 @@ void testRestartTriggerReplicaToOffline() throws Exception {
422403
// let's restart
423404
initCoordinatorChannel();
424405
eventProcessor.shutdown();
425-
eventProcessor =
426-
new CoordinatorEventProcessor(
427-
zookeeperClient,
428-
serverMetadataCache,
429-
testCoordinatorChannelManager,
430-
autoPartitionManager,
431-
TestingMetricGroups.COORDINATOR_METRICS,
432-
new Configuration());
406+
eventProcessor = buildCoordinatorEventProcessor();
433407
int failedServer = 0;
434408
initCoordinatorChannel(failedServer);
435409
eventProcessor.startup();
@@ -593,14 +567,7 @@ void testCreateAndDropPartition() throws Exception {
593567
metadataManager.dropTable(tablePath, false);
594568

595569
// start the coordinator
596-
eventProcessor =
597-
new CoordinatorEventProcessor(
598-
zookeeperClient,
599-
serverMetadataCache,
600-
testCoordinatorChannelManager,
601-
autoPartitionManager,
602-
TestingMetricGroups.COORDINATOR_METRICS,
603-
new Configuration());
570+
eventProcessor = buildCoordinatorEventProcessor();
604571
initCoordinatorChannel();
605572
eventProcessor.startup();
606573
verifyPartitionDropped(tableId, partition2Id);
@@ -647,14 +614,7 @@ void testRestartResumeDropPartition() throws Exception {
647614
zookeeperClient.deletePartition(tablePath, partition2Name);
648615

649616
// start the coordinator
650-
eventProcessor =
651-
new CoordinatorEventProcessor(
652-
zookeeperClient,
653-
serverMetadataCache,
654-
testCoordinatorChannelManager,
655-
autoPartitionManager,
656-
TestingMetricGroups.COORDINATOR_METRICS,
657-
new Configuration());
617+
eventProcessor = buildCoordinatorEventProcessor();
658618
initCoordinatorChannel();
659619
eventProcessor.startup();
660620

@@ -668,6 +628,17 @@ void testRestartResumeDropPartition() throws Exception {
668628
replicationFactor);
669629
}
670630

631+
private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
632+
return new CoordinatorEventProcessor(
633+
zookeeperClient,
634+
serverMetadataCache,
635+
testCoordinatorChannelManager,
636+
autoPartitionManager,
637+
TestingMetricGroups.COORDINATOR_METRICS,
638+
new Configuration(),
639+
Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io")));
640+
}
641+
671642
private void initCoordinatorChannel() throws Exception {
672643
makeSendLeaderAndStopRequestAlwaysSuccess(
673644
testCoordinatorChannelManager,

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
4141
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets;
4242
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
43+
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
4344

4445
import org.junit.jupiter.api.BeforeAll;
4546
import org.junit.jupiter.api.BeforeEach;
@@ -51,6 +52,7 @@
5152
import java.util.Arrays;
5253
import java.util.Collections;
5354
import java.util.Optional;
55+
import java.util.concurrent.Executors;
5456

5557
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NewBucket;
5658
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NonExistentBucket;
@@ -234,7 +236,9 @@ void testStateChangeToOnline() throws Exception {
234236
coordinatorContext,
235237
autoPartitionManager,
236238
TestingMetricGroups.COORDINATOR_METRICS,
237-
new Configuration());
239+
new Configuration(),
240+
Executors.newFixedThreadPool(
241+
1, new ExecutorThreadFactory("test-coordinator-io")));
238242
CoordinatorEventManager eventManager =
239243
new CoordinatorEventManager(coordinatorEventProcessor);
240244
coordinatorRequestBatch =

website/docs/maintenance/configuration.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ during the Fluss cluster working.
5757
|--------------------------|---------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
5858
| coordinator.host | String | (None) | The config parameter defining the network address to connect to for communication with the coordinator server. If the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static hostname or address. |
5959
| coordinator.port | String | 9123 | The config parameter defining the network port to connect to for communication with the coordinator server. Like 'coordinator.host', if the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static port. Otherwise, the value can be set to "0" for a dynamic service name resolution. The value accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. |
60-
| coordinator.io-pool.size | Integer | 1 | The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 1. |
6160
6261
## TabletServer
6362

0 commit comments

Comments
 (0)