
Commit d50499e

fix(s3stream): fix network out over-consumption (#2752) (#2753)
Consider the following scenario:

1. A Fetch request contains partitions P1 and P2. The data of P1 is in the LogCache, while the data of P2 is not.
2. A fast read is attempted first: P1 returns data and consumes Network Out, while P2 throws a FastReadFailFastException.
3. Because of that exception, the whole Fetch falls back to a slow read, in which both P1 and P2 return data and consume Network Out.
4. As a result, the Network Out consumed in step 2 is counted a second time.

Solution: move the network out consumption from S3Stream up to ElasticReplicaManager, so that network out traffic is not over-consumed when a Fetch mixes tail-read and catch-up-read partitions (a sketch of the double counting follows below).
1 parent 8641ba8 commit d50499e
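To make the over-consumption concrete, here is a minimal, self-contained sketch. It is not the AutoMQ code: the class, method, and partition names are hypothetical, and a plain byte counter stands in for the outbound bandwidth limiter. It simulates a Fetch over P1 (in LogCache) and P2 (not in LogCache) and compares charging the limiter per partition inside the stream layer with charging it once per Fetch in the replica manager, which is the approach this commit takes.

// Hypothetical sketch, not AutoMQ code: models why per-partition charging double-counts
// when a fast read partially succeeds and the whole Fetch is retried as a slow read.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class NetworkOutAccountingSketch {

    /** Stands in for the outbound bandwidth limiter; it only counts consumed bytes. */
    static final AtomicLong NETWORK_OUT = new AtomicLong();

    static class FastReadFailFastException extends RuntimeException {}

    /** P1 is cached (tail read succeeds), P2 is not (fast read fails fast). */
    static int readPartition(String partition, boolean fastRead) {
        boolean cached = partition.equals("P1");
        if (fastRead && !cached) {
            throw new FastReadFailFastException();
        }
        return 1024; // pretend each partition returns 1 KiB
    }

    /** Old behaviour: every partition read charges the limiter immediately. */
    static Map<String, Integer> fetchChargingPerPartition(boolean fastRead) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String p : new String[] {"P1", "P2"}) {
            int bytes = readPartition(p, fastRead); // may throw for P2 on the fast read
            NETWORK_OUT.addAndGet(bytes);           // charged inside the "stream" layer
            result.put(p, bytes);
        }
        return result;
    }

    /** New behaviour: read first, then charge the limiter once for the final result. */
    static Map<String, Integer> fetchChargingOnce(boolean fastRead) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String p : new String[] {"P1", "P2"}) {
            result.put(p, readPartition(p, fastRead));
        }
        // One charge per Fetch, after the returned size is known
        // (in the real fix this also picks TAIL vs CATCH_UP throttling).
        NETWORK_OUT.addAndGet(result.values().stream().mapToInt(Integer::intValue).sum());
        return result;
    }

    /** Runs one Fetch: fast read attempt, then a slow read retry if the fast read fails fast. */
    static long runFetch(Function<Boolean, Map<String, Integer>> fetch) {
        NETWORK_OUT.set(0);
        try {
            fetch.apply(true);  // fast read attempt
        } catch (FastReadFailFastException e) {
            fetch.apply(false); // whole Fetch retried as a slow read
        }
        return NETWORK_OUT.get();
    }

    public static void main(String[] args) {
        System.out.println("per-partition charging: "
            + runFetch(NetworkOutAccountingSketch::fetchChargingPerPartition) + " bytes"); // 3072: P1 charged twice
        System.out.println("single charge per fetch: "
            + runFetch(NetworkOutAccountingSketch::fetchChargingOnce) + " bytes");         // 2048
    }
}

In this toy run the per-partition variant charges 3072 bytes for 2048 bytes actually returned, because P1 is charged on the fast-read attempt and again on the slow-read retry; the single-charge variant charges 2048 bytes.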

File tree: 2 files changed, +27 -24 lines changed

core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala

Lines changed: 15 additions & 2 deletions
@@ -1,7 +1,9 @@
 package kafka.server.streamaspect
 
 import com.automq.stream.api.exceptions.FastReadFailFastException
+import com.automq.stream.s3.metrics.stats.NetworkStats
 import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
+import com.automq.stream.s3.network.{AsyncNetworkBandwidthLimiter, GlobalNetworkBandwidthLimiters, ThrottleStrategy}
 import com.automq.stream.utils.FutureUtil
 import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
 import kafka.automq.interceptor.{ClientIdKey, ClientIdMetadata, TrafficInterceptor}
@@ -889,14 +891,16 @@ class ElasticReplicaManager(
     }
 
     var partitionIndex = 0;
-    while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size) {
+    while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size && fastReadFastFail.get() == null) {
       // In each iteration, we read as many partitions as possible until we reach the maximum bytes limit.
       val readCfArray = readFutureBuffer.get()
       readCfArray.clear()
       var assignedBytes = 0 // The total bytes we have assigned to the read requests.
       val availableBytes = remainingBytes.get() // The remaining bytes we can assign to the read requests, used to control the following loop.
 
-      while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size) {
+      while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size
+        // When there is a fast read exception, quit the loop earlier.
+        && fastReadFastFail.get() == null) {
         // Iterate over the partitions.
         val tp = readPartitionInfo(partitionIndex)._1
         val partitionData = readPartitionInfo(partitionIndex)._2
@@ -971,9 +975,18 @@ class ElasticReplicaManager(
       release()
       throw fastReadFastFail.get()
     }
+    acquireNetworkOutPermit(limitBytes - remainingBytes.get(), if (ReadHint.isFastRead) ThrottleStrategy.TAIL else ThrottleStrategy.CATCH_UP)
     result
   }
 
+  private def acquireNetworkOutPermit(size: Int, throttleStrategy: ThrottleStrategy): Unit = {
+    val start = time.nanoseconds()
+    GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
+      .consume(throttleStrategy, size).join()
+    val networkStats = NetworkStats.getInstance()
+    networkStats.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy).record(time.nanoseconds() - start)
+  }
+
   def handlePartitionFailure(partitionDir: String): Unit = {
     warn(s"Stopping serving partition $partitionDir")
     replicaStateChangeLock synchronized {

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

Lines changed: 12 additions & 22 deletions
@@ -39,7 +39,6 @@
 import com.automq.stream.s3.metrics.stats.NetworkStats;
 import com.automq.stream.s3.metrics.stats.StreamOperationStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
-import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
 import com.automq.stream.s3.network.NetworkBandwidthLimiter;
 import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.streams.StreamManager;
@@ -240,28 +239,19 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
         readLock.lock();
         try {
             CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
-            CompletableFuture<FetchResult> retCf = cf.thenCompose(rs -> {
-                if (networkOutboundLimiter != null) {
-                    long totalSize = 0L;
-                    for (RecordBatch recordBatch : rs.recordBatchList()) {
-                        totalSize += recordBatch.rawPayload().remaining();
-                    }
-                    final long finalSize = totalSize;
-                    long start = System.nanoTime();
-                    ThrottleStrategy throttleStrategy = context.readOptions().prioritizedRead() ? ThrottleStrategy.BYPASS
-                        : (context.readOptions().fastRead() ? ThrottleStrategy.TAIL : ThrottleStrategy.CATCH_UP);
-                    return networkOutboundLimiter.consume(throttleStrategy, totalSize).thenApply(nil -> {
-                        NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy)
-                            .record(TimerUtil.timeElapsedSince(start, TimeUnit.NANOSECONDS));
-                        if (context.readOptions().fastRead()) {
-                            NetworkStats.getInstance().fastReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
-                        } else {
-                            NetworkStats.getInstance().slowReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
-                        }
-                        return rs;
-                    });
+            CompletableFuture<FetchResult> retCf = cf.thenApply(rs -> {
+                // TODO: move the fast / slow read metrics to kafka module.
+                long totalSize = 0L;
+                for (RecordBatch recordBatch : rs.recordBatchList()) {
+                    totalSize += recordBatch.rawPayload().remaining();
+                }
+                final long finalSize = totalSize;
+                if (context.readOptions().fastRead()) {
+                    NetworkStats.getInstance().fastReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
+                } else {
+                    NetworkStats.getInstance().slowReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
                 }
-                return CompletableFuture.completedFuture(rs);
+                return rs;
             });
             pendingFetches.add(retCf);
             pendingFetchTimestamps.push(timerUtil.lastAs(TimeUnit.NANOSECONDS));
