Skip to content

Commit e924d5d

Browse files
committed
Revert "bring back publish throttling"
This reverts commit bc6a0a2.
1 parent fa9254b commit e924d5d

File tree

1 file changed

+0
-38
lines changed
  • kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage

1 file changed

+0
-38
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
import com.google.common.annotations.VisibleForTesting;
1717
import com.google.common.collect.Maps;
1818
import io.netty.buffer.ByteBuf;
19-
import io.netty.channel.ChannelHandlerContext;
2019
import io.netty.util.Recycler;
21-
import io.netty.util.concurrent.FastThreadLocal;
2220
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
2321
import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager;
2422
import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
@@ -87,7 +85,6 @@
8785
import org.apache.kafka.common.record.RecordBatch;
8886
import org.apache.kafka.common.requests.FetchResponse;
8987
import org.apache.kafka.common.utils.Time;
90-
import org.apache.pulsar.broker.service.Topic;
9188
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
9289
import org.apache.pulsar.broker.service.plugin.EntryFilter;
9390
import org.apache.pulsar.common.naming.TopicName;
@@ -138,7 +135,6 @@ public CompletableFuture<KeyValueSchemaIds> getSchemaIds(String topic, BytesSche
138135
private final KafkaTopicLookupService kafkaTopicLookupService;
139136

140137
private final List<EntryFilter> entryFilters;
141-
private final boolean preciseTopicPublishRateLimitingEnable;
142138

143139
private final ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer;
144140

@@ -172,7 +168,6 @@ public PartitionLog(KafkaServiceConfiguration kafkaConfig,
172168
this.time = time;
173169
this.topicPartition = topicPartition;
174170
this.fullPartitionName = fullPartitionName;
175-
this.preciseTopicPublishRateLimitingEnable = kafkaConfig.isPreciseTopicPublishRateLimiterEnable();
176171
this.kafkaTopicLookupService = kafkaTopicLookupService;
177172
this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBuffer;
178173
this.recoveryExecutor = recoveryExecutor.chooseThread(fullPartitionName);
@@ -898,8 +893,6 @@ private void publishMessages(final CompletableFuture<Long> appendFuture,
898893
final LogAppendInfo appendInfo,
899894
final EncodeResult encodeResult,
900895
final AppendRecordsContext appendRecordsContext) {
901-
checkAndRecordPublishQuota(persistentTopic, appendInfo.validBytes(),
902-
appendInfo.numMessages(), appendRecordsContext);
903896
if (persistentTopic.isSystemTopic()) {
904897
encodeResult.recycle();
905898
log.error("Not support producing message to system topic: {}", persistentTopic);
@@ -944,37 +937,6 @@ private void publishMessages(final CompletableFuture<Long> appendFuture,
944937
});
945938
}
946939

947-
private void checkAndRecordPublishQuota(Topic topic, int msgSize, int numMessages,
948-
AppendRecordsContext appendRecordsContext) {
949-
final boolean isPublishRateExceeded;
950-
if (preciseTopicPublishRateLimitingEnable) {
951-
boolean isPreciseTopicPublishRateExceeded =
952-
topic.isTopicPublishRateExceeded(numMessages, msgSize);
953-
if (isPreciseTopicPublishRateExceeded) {
954-
topic.disableCnxAutoRead();
955-
return;
956-
}
957-
isPublishRateExceeded = topic.isBrokerPublishRateExceeded();
958-
} else {
959-
if (topic.isResourceGroupRateLimitingEnabled()) {
960-
final boolean resourceGroupPublishRateExceeded =
961-
topic.isResourceGroupPublishRateExceeded(numMessages, msgSize);
962-
if (resourceGroupPublishRateExceeded) {
963-
topic.disableCnxAutoRead();
964-
return;
965-
}
966-
}
967-
isPublishRateExceeded = topic.isPublishRateExceeded();
968-
}
969-
970-
if (isPublishRateExceeded) {
971-
ChannelHandlerContext ctx = appendRecordsContext.getCtx();
972-
if (ctx != null && ctx.channel().config().isAutoRead()) {
973-
ctx.channel().config().setAutoRead(false);
974-
}
975-
}
976-
}
977-
978940
/**
979941
* Publish message to bookkeeper.
980942
* When the message is control message, then it will not do the message deduplication.

0 commit comments

Comments
 (0)