diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
index 7b460ee6b2..06ed54b4f6 100644
--- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
@@ -1,7 +1,9 @@
 package kafka.server.streamaspect
 
 import com.automq.stream.api.exceptions.FastReadFailFastException
+import com.automq.stream.s3.metrics.stats.NetworkStats
 import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
+import com.automq.stream.s3.network.{AsyncNetworkBandwidthLimiter, GlobalNetworkBandwidthLimiters, ThrottleStrategy}
 import com.automq.stream.utils.FutureUtil
 import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
 import kafka.automq.interceptor.{ClientIdKey, ClientIdMetadata, TrafficInterceptor}
@@ -889,14 +891,16 @@
     }
 
     var partitionIndex = 0;
-    while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size) {
+    while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size && fastReadFastFail.get() == null) {
       // In each iteration, we read as many partitions as possible until we reach the maximum bytes limit.
       val readCfArray = readFutureBuffer.get()
       readCfArray.clear()
 
       var assignedBytes = 0 // The total bytes we have assigned to the read requests.
       val availableBytes = remainingBytes.get() // The remaining bytes we can assign to the read requests, used to control the following loop.
-      while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size) {
+      while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size
+        // When there is a fast read exception, quit the loop earlier.
+        && fastReadFastFail.get() == null) {
         // Iterate over the partitions.
         val tp = readPartitionInfo(partitionIndex)._1
         val partitionData = readPartitionInfo(partitionIndex)._2
@@ -971,9 +975,18 @@ class ElasticReplicaManager(
       release()
       throw fastReadFastFail.get()
     }
+    acquireNetworkOutPermit(limitBytes - remainingBytes.get(), if (ReadHint.isFastRead) ThrottleStrategy.TAIL else ThrottleStrategy.CATCH_UP)
     result
   }
 
+  private def acquireNetworkOutPermit(size: Int, throttleStrategy: ThrottleStrategy): Unit = {
+    val start = time.nanoseconds()
+    GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
+      .consume(throttleStrategy, size).join()
+    val networkStats = NetworkStats.getInstance()
+    networkStats.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy).record(time.nanoseconds() - start)
+  }
+
   def handlePartitionFailure(partitionDir: String): Unit = {
     warn(s"Stopping serving partition $partitionDir")
     replicaStateChangeLock synchronized {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index b804752d9d..9736455afb 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -39,7 +39,6 @@
 import com.automq.stream.s3.metrics.stats.NetworkStats;
 import com.automq.stream.s3.metrics.stats.StreamOperationStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
-import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
 import com.automq.stream.s3.network.NetworkBandwidthLimiter;
 import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.streams.StreamManager;
@@ -252,28 +251,19 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
         readLock.lock();
         try {
             CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), logger, "fetch");
-            CompletableFuture<FetchResult> retCf = cf.thenCompose(rs -> {
-                if (networkOutboundLimiter != null) {
-                    long totalSize = 0L;
-                    for (RecordBatch recordBatch : rs.recordBatchList()) {
-                        totalSize += recordBatch.rawPayload().remaining();
-                    }
-                    final long finalSize = totalSize;
-                    long start = System.nanoTime();
-                    ThrottleStrategy throttleStrategy = context.readOptions().prioritizedRead() ? ThrottleStrategy.BYPASS
-                        : (context.readOptions().fastRead() ? ThrottleStrategy.TAIL : ThrottleStrategy.CATCH_UP);
-                    return networkOutboundLimiter.consume(throttleStrategy, totalSize).thenApply(nil -> {
-                        NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy)
-                            .record(TimerUtil.timeElapsedSince(start, TimeUnit.NANOSECONDS));
-                        if (context.readOptions().fastRead()) {
-                            NetworkStats.getInstance().fastReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
-                        } else {
-                            NetworkStats.getInstance().slowReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
-                        }
-                        return rs;
-                    });
+            CompletableFuture<FetchResult> retCf = cf.thenApply(rs -> {
+                // TODO: move the fast / slow read metrics to kafka module.
+                long totalSize = 0L;
+                for (RecordBatch recordBatch : rs.recordBatchList()) {
+                    totalSize += recordBatch.rawPayload().remaining();
+                }
+                final long finalSize = totalSize;
+                if (context.readOptions().fastRead()) {
+                    NetworkStats.getInstance().fastReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
+                } else {
+                    NetworkStats.getInstance().slowReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
                 }
-                return CompletableFuture.completedFuture(rs);
+                return rs;
             });
             pendingFetches.add(retCf);
             pendingFetchTimestamps.push(timerUtil.lastAs(TimeUnit.NANOSECONDS));
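Note on the `ElasticReplicaManager` hunks above: fetch responses are now throttled once per request, by blocking on the global outbound limiter before the response is returned. Below is a minimal standalone Java sketch of that pattern; names mirror the patch, it assumes an `OUTBOUND` limiter has already been registered with `GlobalNetworkBandwidthLimiters` at startup, and the class name and byte sizes in `main` are illustrative only.

```java
import com.automq.stream.s3.metrics.stats.NetworkStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.GlobalNetworkBandwidthLimiters;
import com.automq.stream.s3.network.ThrottleStrategy;

public class OutboundPermitSketch {

    // Blocks until `size` bytes of outbound bandwidth are granted under the given
    // strategy, then records how long the request waited in the limiter queue.
    // This mirrors the new ElasticReplicaManager.acquireNetworkOutPermit.
    static void acquireNetworkOutPermit(int size, ThrottleStrategy throttleStrategy) {
        long start = System.nanoTime();
        GlobalNetworkBandwidthLimiters.instance()
            .get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
            .consume(throttleStrategy, size)
            .join(); // synchronous: the fetch response is held until the permit is granted
        NetworkStats.getInstance()
            .networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy)
            .record(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Tail (fast) reads and catch-up (slow) reads are throttled under
        // separate strategies, as at the patched call site.
        acquireNetworkOutPermit(4096, ThrottleStrategy.TAIL);
        acquireNetworkOutPermit(1 << 20, ThrottleStrategy.CATCH_UP);
    }
}
```

Throttling once per fetch request, rather than once per stream inside `S3Stream.fetch` as before, also means the limiter sees a single aggregate `limitBytes - remainingBytes.get()` consume instead of many small per-stream ones.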
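One behavioral difference between the removed and added throttle sites: the old `S3Stream.fetch` path mapped prioritized reads to `ThrottleStrategy.BYPASS`, while the new `ElasticReplicaManager` call site only distinguishes tail from catch-up reads via `ReadHint.isFastRead`. A hypothetical helper (`strategyFor` is not part of the patch) reproducing the removed selection logic, for comparison:

```java
import com.automq.stream.s3.network.ThrottleStrategy;

public class ThrottleStrategySelection {

    // Hypothetical helper reproducing the selection the removed S3Stream code
    // performed: prioritized reads bypassed the limiter entirely, while tail
    // (fast) and catch-up (slow) reads were throttled under separate strategies.
    static ThrottleStrategy strategyFor(boolean prioritizedRead, boolean fastRead) {
        if (prioritizedRead) {
            return ThrottleStrategy.BYPASS; // skip the bandwidth limiter
        }
        return fastRead ? ThrottleStrategy.TAIL : ThrottleStrategy.CATCH_UP;
    }

    public static void main(String[] args) {
        // The new call site only covers the last two cases:
        // if (ReadHint.isFastRead) ThrottleStrategy.TAIL else ThrottleStrategy.CATCH_UP
        System.out.println(strategyFor(true, true));   // BYPASS
        System.out.println(strategyFor(false, true));  // TAIL
        System.out.println(strategyFor(false, false)); // CATCH_UP
    }
}
```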