Commit 5520dde

fix(s3stream): fix the network out over-consumed (#2752)
Consider the following scenario: a Fetch request contains partitions P1 and P2. P1's data is in the LogCache, while P2's is not.

1. A fast read is attempted first. P1 returns data and consumes Network Out, while P2 throws a FastReadFailFastException.
2. Because of that exception, the entire Fetch falls back to a slow read. Both P1 and P2 return data and consume Network Out, so the Network Out already charged for P1 in step 1 is charged again.

Solution: move the S3Stream Network Out consumption up to ElasticReplicaManager, so that Network Out traffic is not over-consumed when a Fetch mixes tail-read and catch-up-read partitions.

Signed-off-by: Robin Han <[email protected]>
1 parent c5672b3 commit 5520dde
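As a rough illustration of the double charge, here is a small self-contained sketch. It is not AutoMQ code; the 1 MiB partition sizes and the class name are invented for the example, and only the control flow mirrors the scenario in the commit message.

    // Hypothetical sketch of the Network Out accounting described above.
    public class OverConsumptionSketch {
        public static void main(String[] args) {
            final long mib = 1L << 20;
            long chargedBytes = 0;

            // Fast-read attempt: P1 is served from LogCache and charges Network Out;
            // P2 is not cached and fails fast with a FastReadFailFastException.
            chargedBytes += mib;            // P1 charged once
            boolean fastReadFailed = true;  // P2 hit the fail-fast path

            // The whole Fetch then retries as a slow read, so P1 is charged again.
            if (fastReadFailed) {
                chargedBytes += mib;        // P1 charged a second time -- the over-consumption
                chargedBytes += mib;        // P2 charged once, which is correct
            }

            // 3 MiB charged for 2 MiB actually returned to the client.
            System.out.println("charged = " + chargedBytes + " bytes");
        }
    }

After this commit, S3Stream.fetch no longer touches the outbound limiter; ElasticReplicaManager charges it exactly once per Fetch with the bytes actually assembled for the response (limitBytes - remainingBytes).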

File tree

2 files changed: +27 -24 lines


core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala

Lines changed: 15 additions & 2 deletions
@@ -1,7 +1,9 @@
 package kafka.server.streamaspect
 
 import com.automq.stream.api.exceptions.FastReadFailFastException
+import com.automq.stream.s3.metrics.stats.NetworkStats
 import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
+import com.automq.stream.s3.network.{AsyncNetworkBandwidthLimiter, GlobalNetworkBandwidthLimiters, ThrottleStrategy}
 import com.automq.stream.utils.FutureUtil
 import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
 import kafka.automq.interceptor.{ClientIdKey, ClientIdMetadata, TrafficInterceptor}
@@ -889,14 +891,16 @@ class ElasticReplicaManager(
     }
 
     var partitionIndex = 0;
-    while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size) {
+    while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size && fastReadFastFail.get() == null) {
       // In each iteration, we read as many partitions as possible until we reach the maximum bytes limit.
       val readCfArray = readFutureBuffer.get()
       readCfArray.clear()
       var assignedBytes = 0 // The total bytes we have assigned to the read requests.
       val availableBytes = remainingBytes.get() // The remaining bytes we can assign to the read requests, used to control the following loop.
 
-      while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size) {
+      while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size
+        // When there is a fast read exception, quit the loop earlier.
+        && fastReadFastFail.get() == null) {
         // Iterate over the partitions.
         val tp = readPartitionInfo(partitionIndex)._1
         val partitionData = readPartitionInfo(partitionIndex)._2
@@ -971,9 +975,18 @@ class ElasticReplicaManager(
       release()
       throw fastReadFastFail.get()
     }
+    acquireNetworkOutPermit(limitBytes - remainingBytes.get(), if (ReadHint.isFastRead) ThrottleStrategy.TAIL else ThrottleStrategy.CATCH_UP)
     result
   }
 
+  private def acquireNetworkOutPermit(size: Int, throttleStrategy: ThrottleStrategy): Unit = {
+    val start = time.nanoseconds()
+    GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
+      .consume(throttleStrategy, size).join()
+    val networkStats = NetworkStats.getInstance()
+    networkStats.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy).record(time.nanoseconds() - start)
+  }
+
   def handlePartitionFailure(partitionDir: String): Unit = {
     warn(s"Stopping serving partition $partitionDir")
     replicaStateChangeLock synchronized {
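For reference, the acquisition pattern introduced by acquireNetworkOutPermit can be sketched as a standalone Java snippet. This is only an illustrative sketch: it assumes the global limiters were initialized at broker startup, reuses the calls that appear in the diff above (GlobalNetworkBandwidthLimiters.instance(), get(), consume(), join()), and the class and method names here are made up.

    import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
    import com.automq.stream.s3.network.GlobalNetworkBandwidthLimiters;
    import com.automq.stream.s3.network.ThrottleStrategy;

    // Hypothetical helper mirroring the pattern in acquireNetworkOutPermit:
    // charge the global outbound limiter once per Fetch response.
    public class NetworkOutPermitSketch {
        static void chargeOnce(long responseBytes, boolean fastRead) {
            // Tail reads and catch-up reads are throttled under different strategies,
            // mirroring the "if (ReadHint.isFastRead) TAIL else CATCH_UP" choice in the Scala code.
            ThrottleStrategy strategy = fastRead ? ThrottleStrategy.TAIL : ThrottleStrategy.CATCH_UP;

            // Block until the outbound limiter grants the requested bytes.
            GlobalNetworkBandwidthLimiters.instance()
                .get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
                .consume(strategy, responseBytes)
                .join();
        }
    }

Because the charge now happens once, after the per-partition reads have completed, a Fetch that mixes cached (tail-read) and uncached (catch-up-read) partitions is no longer billed twice for the cached ones.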

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

Lines changed: 12 additions & 22 deletions
@@ -39,7 +39,6 @@
 import com.automq.stream.s3.metrics.stats.NetworkStats;
 import com.automq.stream.s3.metrics.stats.StreamOperationStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
-import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
 import com.automq.stream.s3.network.NetworkBandwidthLimiter;
 import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.streams.StreamManager;
@@ -252,28 +251,19 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
         readLock.lock();
         try {
             CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), logger, "fetch");
-            CompletableFuture<FetchResult> retCf = cf.thenCompose(rs -> {
-                if (networkOutboundLimiter != null) {
-                    long totalSize = 0L;
-                    for (RecordBatch recordBatch : rs.recordBatchList()) {
-                        totalSize += recordBatch.rawPayload().remaining();
-                    }
-                    final long finalSize = totalSize;
-                    long start = System.nanoTime();
-                    ThrottleStrategy throttleStrategy = context.readOptions().prioritizedRead() ? ThrottleStrategy.BYPASS
-                        : (context.readOptions().fastRead() ? ThrottleStrategy.TAIL : ThrottleStrategy.CATCH_UP);
-                    return networkOutboundLimiter.consume(throttleStrategy, totalSize).thenApply(nil -> {
-                        NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy)
-                            .record(TimerUtil.timeElapsedSince(start, TimeUnit.NANOSECONDS));
-                        if (context.readOptions().fastRead()) {
-                            NetworkStats.getInstance().fastReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
-                        } else {
-                            NetworkStats.getInstance().slowReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
-                        }
-                        return rs;
-                    });
+            CompletableFuture<FetchResult> retCf = cf.thenApply(rs -> {
+                // TODO: move the fast / slow read metrics to kafka module.
+                long totalSize = 0L;
+                for (RecordBatch recordBatch : rs.recordBatchList()) {
+                    totalSize += recordBatch.rawPayload().remaining();
+                }
+                final long finalSize = totalSize;
+                if (context.readOptions().fastRead()) {
+                    NetworkStats.getInstance().fastReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
+                } else {
+                    NetworkStats.getInstance().slowReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
                 }
-                return CompletableFuture.completedFuture(rs);
+                return rs;
             });
             pendingFetches.add(retCf);
             pendingFetchTimestamps.push(timerUtil.lastAs(TimeUnit.NANOSECONDS));
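With the limiter call removed, nothing asynchronous is left in this callback, which is why cf.thenCompose becomes cf.thenApply. The shape of the remaining logic can be shown with plain JDK types; this sketch does not use AutoMQ classes, and the sizes are arbitrary.

    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    // Standalone illustration of the synchronous mapping left in S3Stream.fetch:
    // sum the payload sizes for metrics; no throttling and no nested future.
    public class FetchSizeSketch {
        public static void main(String[] args) {
            CompletableFuture<List<ByteBuffer>> fetch = CompletableFuture.completedFuture(
                List.of(ByteBuffer.allocate(512), ByteBuffer.allocate(1024)));

            CompletableFuture<Long> totalSize = fetch.thenApply(batches -> {
                long size = 0L;
                for (ByteBuffer payload : batches) {
                    size += payload.remaining(); // stands in for recordBatch.rawPayload().remaining()
                }
                return size; // only byte accounting remains in the callback
            });

            System.out.println("fetched bytes = " + totalSize.join()); // 1536
        }
    }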
