Skip to content

Commit 84ab3b9

Browse files
authored
KAFKA-17031: Make RLM thread pool configurations public and fix default handling (apache#17499)
According to KIP-950, remote.log.manager.thread.pool.size should be marked as deprecated and replaced by two new configurations: remote.log.manager.copier.thread.pool.size and remote.log.manager.expiration.thread.pool.size. Fix default handling so that -1 works as expected. Reviewers: Luke Chen <[email protected]>, Gaurav Narula <[email protected]>, Satish Duggana <[email protected]>, Colin P. McCabe <[email protected]>
1 parent e3751a8 commit 84ab3b9

File tree

3 files changed

+39
-18
lines changed

3 files changed

+39
-18
lines changed

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,6 +1105,8 @@ class KafkaConfigTest {
11051105
case RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // ignore string
11061106
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
11071107
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1108+
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
1109+
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
11081110
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
11091111
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
11101112
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.config.AbstractConfig;
2020
import org.apache.kafka.common.config.ConfigDef;
21+
import org.apache.kafka.common.config.ConfigException;
2122

2223
import java.util.Collections;
2324
import java.util.Map;
@@ -93,20 +94,29 @@ public final class RemoteLogManagerConfig {
9394
public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
9495

9596
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size";
96-
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks to copy " +
97+
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Deprecated. Size of the thread pool used in scheduling tasks to copy " +
9798
"segments, fetch remote log indexes and clean up remote log segments.";
9899
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
99100

101+
private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The default value of -1 means that this will be set to the configured value of " +
102+
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
103+
private static final ConfigDef.Validator REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with(
104+
(name, value) -> {
105+
if ((int) value < -1 || (int) value == 0) throw new ConfigException(name, value, "Value can be -1 or greater than 0");
106+
},
107+
() -> REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK
108+
);
109+
100110
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size";
101-
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in " +
102-
"scheduling tasks to copy segments.";
103-
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10;
111+
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
112+
"to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
113+
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = -1;
104114

105115
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size";
106-
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in" +
107-
" scheduling tasks to clean up remote log segments.";
108-
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
109-
116+
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
117+
"to clean up remote log segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
118+
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1;
119+
110120
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms";
111121
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " +
112122
"segments, and clean up remote log segments.";
@@ -257,16 +267,16 @@ public static ConfigDef configDef() {
257267
atLeast(1),
258268
MEDIUM,
259269
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
260-
.defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
270+
.define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
261271
INT,
262272
DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
263-
atLeast(1),
273+
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
264274
MEDIUM,
265275
REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
266-
.defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
276+
.define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
267277
INT,
268278
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
269-
atLeast(1),
279+
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
270280
MEDIUM,
271281
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
272282
.define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
@@ -395,11 +405,13 @@ public int remoteLogManagerThreadPoolSize() {
395405
}
396406

397407
public int remoteLogManagerCopierThreadPoolSize() {
398-
return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
408+
int size = config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
409+
return size == -1 ? remoteLogManagerThreadPoolSize() : size;
399410
}
400411

401412
public int remoteLogManagerExpirationThreadPoolSize() {
402-
return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
413+
int size = config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
414+
return size == -1 ? remoteLogManagerThreadPoolSize() : size;
403415
}
404416

405417
public long remoteLogManagerTaskIntervalMs() {

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import static org.junit.jupiter.api.Assertions.assertThrows;
3030

3131
public class RemoteLogManagerConfigTest {
32-
3332
@Test
3433
public void testValidConfigs() {
3534
String rsmPrefix = "__custom.rsm.";
@@ -56,6 +55,16 @@ public void testDefaultConfigs() {
5655
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
5756
}
5857

58+
@Test
59+
public void testThreadPoolDefaults() {
60+
// Even with empty properties, RemoteLogManagerConfig has default values
61+
Map<String, Object> emptyProps = new HashMap<>();
62+
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
63+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
64+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
65+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
66+
}
67+
5968
@Test
6069
public void testValidateEmptyStringConfig() {
6170
// Test with a empty string props should throw ConfigException
@@ -65,7 +74,6 @@ public void testValidateEmptyStringConfig() {
6574
}
6675

6776
private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {
68-
6977
Map<String, Object> props = new HashMap<>();
7078
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
7179
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
@@ -108,7 +116,6 @@ private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {
108116
}
109117

110118
private static class RLMTestConfig extends AbstractConfig {
111-
112119
private final RemoteLogManagerConfig rlmConfig;
113120

114121
public RLMTestConfig(Map<?, ?> originals) {
@@ -120,4 +127,4 @@ public RemoteLogManagerConfig remoteLogManagerConfig() {
120127
return rlmConfig;
121128
}
122129
}
123-
}
130+
}

0 commit comments

Comments
 (0)