Skip to content

Commit cee57fd

Browse files
authored
chore(config): dynamic change config based on heap size
1 parent 5640433 commit cee57fd

File tree

7 files changed

+22
-11
lines changed

7 files changed

+22
-11
lines changed

core/src/main/java/kafka/automq/AutoMQConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public class AutoMQConfig {
154154
public static final String S3_NETWORK_BASELINE_BANDWIDTH_CONFIG = "s3.network.baseline.bandwidth";
155155
public static final String S3_NETWORK_BASELINE_BANDWIDTH_DOC = "The total available bandwidth for object storage requests. This is used to prevent stream set object compaction and catch-up read from monopolizing normal read and write traffic. Produce and Consume will also separately consume traffic in and traffic out. " +
156156
"For example, suppose this value is set to 100MB/s, and the normal read and write traffic is 80MB/s, then the available traffic for stream set object compaction is 20MB/s.";
157-
public static final long S3_NETWORK_BASELINE_BANDWIDTH = 100 * 1024 * 1024; // 100MB/s
157+
public static final long S3_NETWORK_BASELINE_BANDWIDTH = 1024 * 1024 * 1024; // 1GB/s
158158

159159
public static final String S3_NETWORK_REFILL_PERIOD_MS_CONFIG = "s3.network.refill.period.ms";
160160
public static final String S3_NETWORK_REFILL_PERIOD_MS_DOC = "The network bandwidth token refill period in milliseconds.";

core/src/main/java/kafka/automq/zerozone/SnapshotReadPartitionsManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ public void close() {
282282
});
283283
}
284284

285-
private void run() {
285+
void run() {
286286
eventLoop.execute(this::unsafeRun);
287287
}
288288

core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,10 @@ class ElasticLog(val metaStream: MetaStream,
640640
}
641641

642642
object ElasticLog extends Logging {
643-
private val APPEND_PERMIT = 100 * 1024 * 1024
643+
private val APPEND_PERMIT = Systems.getEnvInt("AUTOMQ_APPEND_PERMIT_SIZE",
644+
// autoscale the append permit size based on heap size, min 100MiB, max 1GiB, every 6GB heap add 100MiB permit
645+
Math.min(1024, 100 * Math.max(1, (Systems.HEAP_MEMORY_SIZE / (1024 * 1024 * 1024) / 6)).asInstanceOf[Int]) * 1024 * 1024
646+
)
644647
private val APPEND_PERMIT_SEMAPHORE = new Semaphore(APPEND_PERMIT)
645648
S3StreamKafkaMetricsManager.setLogAppendPermitNumSupplier(() => APPEND_PERMIT_SEMAPHORE.availablePermits())
646649

core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.automq.stream.api.exceptions.FastReadFailFastException
44
import com.automq.stream.s3.metrics.stats.NetworkStats
55
import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
66
import com.automq.stream.s3.network.{AsyncNetworkBandwidthLimiter, GlobalNetworkBandwidthLimiters, ThrottleStrategy}
7-
import com.automq.stream.utils.FutureUtil
7+
import com.automq.stream.utils.{FutureUtil, Systems}
88
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
99
import kafka.automq.interceptor.{ClientIdKey, ClientIdMetadata, TrafficInterceptor}
1010
import kafka.automq.kafkalinking.KafkaLinkingManager
@@ -124,8 +124,12 @@ class ElasticReplicaManager(
124124
fetchExecutorQueueSizeGaugeMap
125125
})
126126

127-
private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_FAST_NAME) // 200MiB
128-
private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_SLOW_NAME) // 200MiB
127+
private val fetchLimiterSize = Systems.getEnvInt("AUTOMQ_FETCH_LIMITER_SIZE",
128+
// autoscale the fetch limiter size based on heap size, min 200MiB, max 1GiB, every 3GB heap add 100MiB limiter
129+
Math.min(1024, 100 * Math.max(2, (Systems.HEAP_MEMORY_SIZE / (1024 * 1024 * 1024) / 3)).asInstanceOf[Int]) * 1024 * 1024
130+
)
131+
private val fastFetchLimiter = new FairLimiter(fetchLimiterSize, FETCH_LIMITER_FAST_NAME)
132+
private val slowFetchLimiter = new FairLimiter(fetchLimiterSize, FETCH_LIMITER_SLOW_NAME)
129133
private val fetchLimiterWaitingTasksGaugeMap = new util.HashMap[String, Integer]()
130134
S3StreamKafkaMetricsManager.setFetchLimiterWaitingTaskNumSupplier(() => {
131135
fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.waitingThreads())

core/src/test/java/kafka/automq/zerozone/SnapshotReadPartitionsManagerTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import static org.mockito.ArgumentMatchers.anyLong;
6363
import static org.mockito.Mockito.doAnswer;
6464
import static org.mockito.Mockito.mock;
65+
import static org.mockito.Mockito.timeout;
6566
import static org.mockito.Mockito.times;
6667
import static org.mockito.Mockito.verify;
6768
import static org.mockito.Mockito.when;
@@ -152,10 +153,8 @@ public void testSubscriber_zerozonev1() throws Exception {
152153
operationBatch.operations.add(snapshotWithOperation(2, Map.of(4L, 6L), SnapshotOperation.ADD));
153154
subscriber.onNewOperationBatch(operationBatch);
154155

155-
subscriber.unsafeRun();
156-
157-
verify(dataLoader, times(1)).replayWal();
158-
156+
subscriber.run();
157+
verify(dataLoader, timeout(1000L).times(1)).replayWal();
159158
awaitEventLoopClear();
160159

161160
assertEquals(2, subscriber.partitions.size());

s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.automq.stream.s3.objects.ObjectManager;
3535
import com.automq.stream.utils.FutureUtil;
3636
import com.automq.stream.utils.LogSuppressor;
37+
import com.automq.stream.utils.Systems;
3738
import com.automq.stream.utils.Time;
3839
import com.automq.stream.utils.threads.EventLoop;
3940
import com.automq.stream.utils.threads.EventLoopSafe;
@@ -64,7 +65,7 @@
6465
public static final int GET_OBJECT_STEP = 4;
6566
private static final Logger LOGGER = LoggerFactory.getLogger(StreamReader.class);
6667
static final int READAHEAD_SIZE_UNIT = 1024 * 1024 / 2;
67-
private static final int MAX_READAHEAD_SIZE = 32 * 1024 * 1024;
68+
private static final int MAX_READAHEAD_SIZE = Systems.getEnvInt("AUTOMQ_MAX_READAHEAD_SIZE", 32 * 1024 * 1024);
6869
private static final long READAHEAD_RESET_COLD_DOWN_MILLS = TimeUnit.MINUTES.toMillis(1);
6970
private static final long READAHEAD_AVAILABLE_BYTES_THRESHOLD = 32L * 1024 * 1024;
7071
private static final LogSuppressor READAHEAD_RESET_LOG_SUPPRESSOR = new LogSuppressor(LOGGER, 30000);

s3stream/src/main/java/com/automq/stream/utils/Systems.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121

2222
import org.apache.commons.lang3.StringUtils;
2323

24+
import io.netty.util.internal.PlatformDependent;
25+
2426
public class Systems {
2527
public static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
28+
public static final long HEAP_MEMORY_SIZE = Runtime.getRuntime().maxMemory();
29+
public static final long DIRECT_MEMORY_SIZE = PlatformDependent.maxDirectMemory();
2630

2731
public static long getEnvLong(String name, long defaultValue) {
2832
String value = System.getenv(name);

0 commit comments

Comments (0)