|
1 | 1 | package kafka.server.streamaspect
|
2 | 2 |
|
3 | 3 | import com.automq.stream.api.exceptions.FastReadFailFastException
|
| 4 | +import com.automq.stream.s3.metrics.stats.NetworkStats |
4 | 5 | import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
|
| 6 | +import com.automq.stream.s3.network.{AsyncNetworkBandwidthLimiter, GlobalNetworkBandwidthLimiters, ThrottleStrategy} |
5 | 7 | import com.automq.stream.utils.FutureUtil
|
6 | 8 | import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
|
7 | 9 | import kafka.automq.interceptor.{ClientIdKey, ClientIdMetadata, TrafficInterceptor}
|
@@ -889,14 +891,16 @@ class ElasticReplicaManager(
|
889 | 891 | }
|
890 | 892 |
|
891 | 893 | var partitionIndex = 0;
|
892 |
| - while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size) { |
| 894 | + while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size && fastReadFastFail.get() == null) { |
893 | 895 | // In each iteration, we read as many partitions as possible until we reach the maximum bytes limit.
|
894 | 896 | val readCfArray = readFutureBuffer.get()
|
895 | 897 | readCfArray.clear()
|
896 | 898 | var assignedBytes = 0 // The total bytes we have assigned to the read requests.
|
897 | 899 | val availableBytes = remainingBytes.get() // The remaining bytes we can assign to the read requests, used to control the following loop.
|
898 | 900 |
|
899 |
| - while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size) { |
| 901 | + while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size |
| 902 | + // When there is a fast read exception, quit the loop earlier. |
| 903 | + && fastReadFastFail.get() == null) { |
900 | 904 | // Iterate over the partitions.
|
901 | 905 | val tp = readPartitionInfo(partitionIndex)._1
|
902 | 906 | val partitionData = readPartitionInfo(partitionIndex)._2
|
@@ -971,9 +975,18 @@ class ElasticReplicaManager(
|
971 | 975 | release()
|
972 | 976 | throw fastReadFastFail.get()
|
973 | 977 | }
|
| 978 | + acquireNetworkOutPermit(limitBytes - remainingBytes.get(), if (ReadHint.isFastRead) ThrottleStrategy.TAIL else ThrottleStrategy.CATCH_UP) |
974 | 979 | result
|
975 | 980 | }
|
976 | 981 |
|
| 982 | + private def acquireNetworkOutPermit(size: Int, throttleStrategy: ThrottleStrategy): Unit = { |
| 983 | + val start = time.nanoseconds() |
| 984 | + GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND) |
| 985 | + .consume(throttleStrategy, size).join() |
| 986 | + val networkStats = NetworkStats.getInstance() |
| 987 | + networkStats.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy).record(time.nanoseconds() - start) |
| 988 | + } |
| 989 | + |
977 | 990 | def handlePartitionFailure(partitionDir: String): Unit = {
|
978 | 991 | warn(s"Stopping serving partition $partitionDir")
|
979 | 992 | replicaStateChangeLock synchronized {
|
|
0 commit comments