Skip to content

Commit a605066

Browse files
authored
fix(s3stream): remove unnecessary config check (#2084)
Signed-off-by: Shichao Nie <[email protected]>
1 parent b4f0adf commit a605066

File tree

2 files changed

+14
-10
lines changed

2 files changed

+14
-10
lines changed

s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ protected AbstractObjectStorage(
122122
if (!manualMergeRead) {
123123
scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS);
124124
}
125-
checkConfig();
126125
S3StreamMetricsManager.registerInflightS3ReadQuotaSupplier(inflightReadLimiter::availablePermits, currentIndex);
127126
S3StreamMetricsManager.registerInflightS3WriteQuotaSupplier(inflightWriteLimiter::availablePermits, currentIndex);
128127

@@ -663,15 +662,6 @@ static int getMaxObjectStorageConcurrency() {
663662
return Math.max(MIN_CONCURRENCY, Math.min(cpuCores * DEFAULT_CONCURRENCY_PER_CORE, MAX_CONCURRENCY));
664663
}
665664

666-
private void checkConfig() {
667-
if (this.networkInboundBandwidthLimiter != null) {
668-
if (this.networkInboundBandwidthLimiter.getMaxTokens() < Writer.MIN_PART_SIZE) {
669-
throw new IllegalArgumentException(String.format("Network inbound burst bandwidth limit %d must be no less than min part size %d",
670-
this.networkInboundBandwidthLimiter.getMaxTokens(), Writer.MIN_PART_SIZE));
671-
}
672-
}
673-
}
674-
675665
/**
676666
* Acquire read permit, permit will auto release when cf complete.
677667
*

s3stream/src/test/java/com/automq/stream/s3/AsyncNetworkBandwidthLimiterTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,18 @@ public void testThrottleConsumeWithPriority1() {
152152
result.join();
153153
cf2.join();
154154
}
155+
156+
@Test
157+
@Timeout(10)
158+
public void testThrottleConsumeWithLargeChunk() {
159+
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 1024 * 1024, 100);
160+
CompletableFuture<Void> bypassCf = bucket.consume(ThrottleStrategy.BYPASS, 1024 * 1024);
161+
CompletableFuture<Void> cf = bucket.consume(ThrottleStrategy.CATCH_UP, 10 * 1024 * 1024);
162+
CompletableFuture<Void> result = cf.whenComplete((v, e) -> {
163+
Assertions.assertNull(e);
164+
Assertions.assertEquals(0, bucket.getAvailableTokens());
165+
Assertions.assertTrue(bypassCf.isDone());
166+
});
167+
result.join();
168+
}
155169
}

0 commit comments

Comments
 (0)