Skip to content

Commit fa9254b

Browse files
committed
Code changes for pulsar 4.0
1 parent c5d04e4 commit fa9254b

File tree

12 files changed

+57
-59
lines changed

12 files changed

+57
-59
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import java.util.stream.Collectors;
4141
import lombok.extern.slf4j.Slf4j;
4242
import org.apache.bookkeeper.mledger.Position;
43-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
43+
import org.apache.bookkeeper.mledger.PositionFactory;
4444
import org.apache.kafka.common.Node;
4545
import org.apache.kafka.common.config.ConfigResource;
4646
import org.apache.kafka.common.errors.InvalidPartitionsException;
@@ -291,7 +291,7 @@ public void truncateTopic(String topicToDelete,
291291
errorConsumer.accept("Cannot find position");
292292
return;
293293
}
294-
if (position.equals(PositionImpl.LATEST)) {
294+
if (position.equals(PositionFactory.LATEST)) {
295295
admin.topics()
296296
.truncateAsync(topicToDelete)
297297
.thenRun(() -> {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2323
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2424
import lombok.extern.slf4j.Slf4j;
25-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
25+
import org.apache.bookkeeper.mledger.Position;
26+
import org.apache.bookkeeper.mledger.PositionFactory;
2627
import org.apache.kafka.common.TopicPartition;
2728
import org.apache.kafka.common.message.FetchRequestData;
2829

@@ -112,12 +113,12 @@ public boolean tryComplete() {
112113
if (partitionLog == null) {
113114
return true;
114115
}
115-
PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition();
116-
if (currLastPosition.compareTo(PositionImpl.EARLIEST) == 0) {
116+
Position currLastPosition = partitionLog.getLastPosition();
117+
if (currLastPosition.compareTo(PositionFactory.EARLIEST) == 0) {
117118
HAS_ERROR_UPDATER.set(this, true);
118119
return forceComplete();
119120
}
120-
PositionImpl lastPosition = (PositionImpl) result.lastPosition();
121+
Position lastPosition = result.lastPosition();
121122
if (currLastPosition.compareTo(lastPosition) > 0) {
122123
int diffBytes = (int) (fetchMaxBytes - bytesReadable);
123124
if (diffBytes != fetchMaxBytes) {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@
9090
import org.apache.bookkeeper.mledger.ManagedLedgerException;
9191
import org.apache.bookkeeper.mledger.Position;
9292
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
93-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
9493
import org.apache.commons.collections4.ListUtils;
9594
import org.apache.commons.lang3.NotImplementedException;
9695
import org.apache.commons.lang3.tuple.Pair;
@@ -841,13 +840,13 @@ private void startSendOperationForThrottling(long msgSize) {
841840
}
842841
disableCnxAutoRead();
843842
autoReadDisabledPublishBufferLimiting = true;
844-
setPausedConnections(pulsarService, 1);
843+
setPausedConnections(pulsarService);
845844
}
846845
}
847846

848847
@VisibleForTesting
849-
public static void setPausedConnections(PulsarService pulsarService, int numConnections) {
850-
pulsarService.getBrokerService().pausedConnections(numConnections);
848+
public static void setPausedConnections(PulsarService pulsarService) {
849+
pulsarService.getBrokerService().recordConnectionPaused();
851850
}
852851

853852
private void completeSendOperationForThrottling(long msgSize) {
@@ -859,13 +858,13 @@ private void completeSendOperationForThrottling(long msgSize) {
859858
}
860859
autoReadDisabledPublishBufferLimiting = false;
861860
enableCnxAutoRead();
862-
resumePausedConnections(pulsarService, 1);
861+
resumePausedConnections(pulsarService);
863862
}
864863
}
865864

866865
@VisibleForTesting
867-
public static void resumePausedConnections(PulsarService pulsarService, int numConnections) {
868-
pulsarService.getBrokerService().resumedConnections(numConnections);
866+
public static void resumePausedConnections(PulsarService pulsarService) {
867+
pulsarService.getBrokerService().recordConnectionResumed();
869868
}
870869

871870
@Override
@@ -1286,15 +1285,15 @@ private CompletableFuture<Pair<Errors, Long>> fetchOffset(String topicName, long
12861285
}
12871286
PersistentTopic perTopic = perTopicOpt.get();
12881287
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) perTopic.getManagedLedger();
1289-
PositionImpl lac = (PositionImpl) managedLedger.getLastConfirmedEntry();
1288+
Position lac = managedLedger.getLastConfirmedEntry();
12901289
if (lac == null) {
12911290
log.error("[{}] Unexpected LastConfirmedEntry for topic {}, managed ledger: {}",
12921291
ctx, perTopic.getName(), managedLedger.getName());
12931292
partitionData.complete(Pair.of(Errors.UNKNOWN_SERVER_ERROR, -1L));
12941293
return;
12951294
}
12961295
if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
1297-
PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
1296+
Position position = managedLedger.getLastConfirmedEntry();
12981297
if (log.isDebugEnabled()) {
12991298
log.debug("Get latest position for topic {} time {}. result: {}",
13001299
perTopic.getName(), timestamp, position);
@@ -1303,7 +1302,7 @@ private CompletableFuture<Pair<Errors, Long>> fetchOffset(String topicName, long
13031302
partitionData.complete(Pair.of(Errors.NONE, offset));
13041303

13051304
} else if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
1306-
PositionImpl position = OffsetFinder.getFirstValidPosition(managedLedger);
1305+
Position position = OffsetFinder.getFirstValidPosition(managedLedger);
13071306
if (position == null) {
13081307
log.error("[{}] Failed to find first valid position for topic {}", ctx, perTopic.getName());
13091308
partitionData.complete(Pair.of(Errors.UNKNOWN_SERVER_ERROR, -1L));
@@ -1347,7 +1346,7 @@ private CompletableFuture<Pair<Errors, Long>> fetchOffset(String topicName, long
13471346

13481347
private void fetchOffsetByTimestamp(CompletableFuture<Pair<Errors, Long>> partitionData,
13491348
ManagedLedgerImpl managedLedger,
1350-
PositionImpl lac,
1349+
Position lac,
13511350
long timestamp,
13521351
String topic) {
13531352
// find with real wanted timestamp
@@ -1356,7 +1355,7 @@ private void fetchOffsetByTimestamp(CompletableFuture<Pair<Errors, Long>> partit
13561355
offsetFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() {
13571356
@Override
13581357
public void findEntryComplete(Position position, Object ctx) {
1359-
PositionImpl finalPosition;
1358+
Position finalPosition;
13601359
if (position == null) {
13611360
finalPosition = OffsetFinder.getFirstValidPosition(managedLedger);
13621361
if (finalPosition == null) {
@@ -1366,7 +1365,7 @@ public void findEntryComplete(Position position, Object ctx) {
13661365
return;
13671366
}
13681367
} else {
1369-
finalPosition = (PositionImpl) position;
1368+
finalPosition = position;
13701369
}
13711370

13721371

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import org.apache.bookkeeper.mledger.ManagedLedger;
3434
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3535
import org.apache.bookkeeper.mledger.Position;
36+
import org.apache.bookkeeper.mledger.PositionFactory;
3637
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
37-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
3838
import org.apache.commons.codec.digest.DigestUtils;
3939
import org.apache.commons.lang3.tuple.Pair;
4040
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -260,7 +260,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
260260
+ "-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
261261

262262
// get previous position, because NonDurableCursor is read from next position.
263-
final PositionImpl previous = ((ManagedLedgerImpl) ledger).getPreviousPosition((PositionImpl) position);
263+
final Position previous = ((ManagedLedgerImpl) ledger).getPreviousPosition(position);
264264
if (log.isDebugEnabled()) {
265265
log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}",
266266
description, cursorName, offset, position, previous);
@@ -309,7 +309,7 @@ public CompletableFuture<Position> findPositionForIndex(Long offset) {
309309
final ManagedLedger ledger = topic.getManagedLedger();
310310

311311
return MessageMetadataUtils.asyncFindPosition(ledger, offset, skipMessagesWithoutIndex).thenApply(position -> {
312-
PositionImpl lastConfirmedEntry = (PositionImpl) ledger.getLastConfirmedEntry();
312+
Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
313313
log.info("Found position {} for offset {}, lastConfirmedEntry {}", position, offset, lastConfirmedEntry);
314314
if (position == null) {
315315
return null;
@@ -318,7 +318,7 @@ public CompletableFuture<Position> findPositionForIndex(Long offset) {
318318
&& Objects.equals(lastConfirmedEntry.getNext(), position)) {
319319
log.debug("Found position {} for offset {}, LAC {} -> RETURN LATEST",
320320
position, offset, lastConfirmedEntry);
321-
return PositionImpl.LATEST;
321+
return PositionFactory.LATEST;
322322
} else {
323323
return position;
324324
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import lombok.Setter;
5151
import lombok.experimental.Accessors;
5252
import lombok.extern.slf4j.Slf4j;
53-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
53+
import org.apache.bookkeeper.mledger.Position;
5454
import org.apache.commons.lang3.StringUtils;
5555
import org.apache.kafka.common.TopicPartition;
5656
import org.apache.pulsar.common.schema.KeyValue;
@@ -167,7 +167,7 @@ public static class GroupSummary {
167167
*/
168168
@Data
169169
static class CommitRecordMetadataAndOffset {
170-
private final Optional<PositionImpl> appendedPosition;
170+
private final Optional<Position> appendedPosition;
171171
private final OffsetAndMetadata offsetAndMetadata;
172172

173173
public boolean olderThan(CommitRecordMetadataAndOffset that) {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@
6262
import lombok.extern.slf4j.Slf4j;
6363
import org.apache.bookkeeper.common.concurrent.FutureUtils;
6464
import org.apache.bookkeeper.common.util.MathUtils;
65-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
65+
import org.apache.bookkeeper.mledger.Position;
66+
import org.apache.bookkeeper.mledger.PositionFactory;
6667
import org.apache.commons.lang3.tuple.Triple;
6768
import org.apache.kafka.common.TopicPartition;
6869
import org.apache.kafka.common.protocol.Errors;
@@ -484,7 +485,7 @@ public CompletableFuture<Map<TopicPartition, Errors>> storeOffsets(
484485
filteredOffsetMetadata.forEach((tp, offsetAndMetadata) -> {
485486
CommitRecordMetadataAndOffset commitRecordMetadataAndOffset =
486487
new CommitRecordMetadataAndOffset(
487-
Optional.of(new PositionImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId())),
488+
Optional.of(PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId())),
488489
offsetAndMetadata
489490
);
490491
if (isTxnOffsetCommit) {
@@ -715,7 +716,7 @@ private void processOffsetMessage(Message<ByteBuffer> msg,
715716
pendingOffsets.remove(batch.producerId());
716717
}
717718
} else {
718-
Optional<PositionImpl> batchBaseOffset = Optional.empty();
719+
Optional<Position> batchBaseOffset = Optional.empty();
719720
for (Record record : batch) {
720721
if (!record.hasKey()) {
721722
// It throws an exception here in Kafka. However, the exception will be caught and processed
@@ -724,7 +725,7 @@ private void processOffsetMessage(Message<ByteBuffer> msg,
724725
continue;
725726
}
726727
if (batchBaseOffset.isEmpty()) {
727-
batchBaseOffset = Optional.of(new PositionImpl(0, record.offset()));
728+
batchBaseOffset = Optional.of(PositionFactory.create(0, record.offset()));
728729
}
729730
BaseKey bk = readMessageKey(record.key());
730731
if (log.isTraceEnabled()) {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.netty.buffer.ByteBuf;
1919
import io.netty.channel.ChannelHandlerContext;
2020
import io.netty.util.Recycler;
21+
import io.netty.util.concurrent.FastThreadLocal;
2122
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
2223
import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager;
2324
import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
@@ -64,9 +65,9 @@
6465
import org.apache.bookkeeper.mledger.ManagedLedger;
6566
import org.apache.bookkeeper.mledger.ManagedLedgerException;
6667
import org.apache.bookkeeper.mledger.Position;
68+
import org.apache.bookkeeper.mledger.PositionFactory;
6769
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
6870
import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
69-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
7071
import org.apache.bookkeeper.stats.OpStatsLogger;
7172
import org.apache.commons.compress.utils.Lists;
7273
import org.apache.commons.lang3.tuple.Pair;
@@ -92,6 +93,7 @@
9293
import org.apache.pulsar.common.naming.TopicName;
9394
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
9495
import org.apache.pulsar.common.util.FutureUtil;
96+
import org.jetbrains.annotations.NotNull;
9597

9698
/**
9799
* Analyze result.
@@ -368,7 +370,7 @@ public static ReadRecordsResult empty(long highWatermark,
368370
}
369371

370372
public static ReadRecordsResult error(Errors errors, PartitionLog partitionLog) {
371-
return ReadRecordsResult.error(PositionImpl.EARLIEST, errors, partitionLog);
373+
return ReadRecordsResult.error(PositionFactory.EARLIEST, errors, partitionLog);
372374
}
373375

374376
public static ReadRecordsResult error(Position position, Errors errors, PartitionLog partitionLog) {
@@ -767,7 +769,7 @@ private static byte getCompatibleMagic(short apiVersion) {
767769

768770
private Position getLastPositionFromEntries(List<Entry> entries) {
769771
if (entries == null || entries.isEmpty()) {
770-
return PositionImpl.EARLIEST;
772+
return PositionFactory.EARLIEST;
771773
}
772774
return entries.get(entries.size() - 1).getPosition();
773775
}
@@ -823,7 +825,7 @@ private CompletableFuture<List<Entry>> readEntries(final ManagedCursor cursor,
823825
public void readEntriesComplete(List<Entry> entries, Object ctx) {
824826
if (!entries.isEmpty()) {
825827
final Entry lastEntry = entries.get(entries.size() - 1);
826-
final PositionImpl currentPosition = PositionImpl.get(
828+
final Position currentPosition = PositionFactory.create(
827829
lastEntry.getLedgerId(), lastEntry.getEntryId());
828830

829831
try {
@@ -868,13 +870,13 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
868870
MathUtils.elapsedNanos(startReadingMessagesNanos), TimeUnit.NANOSECONDS);
869871
readFuture.completeExceptionally(exception);
870872
}
871-
}, null, PositionImpl.LATEST);
873+
}, null, PositionFactory.LATEST);
872874

873875
return readFuture;
874876
}
875877

876878
// commit the offset, so backlog not affect by this cursor.
877-
private static void commitOffset(NonDurableCursorImpl cursor, PositionImpl currentPosition) {
879+
private static void commitOffset(NonDurableCursorImpl cursor, Position currentPosition) {
878880
cursor.asyncMarkDelete(currentPosition, new AsyncCallbacks.MarkDeleteCallback() {
879881
@Override
880882
public void markDeleteComplete(Object ctx) {
@@ -1148,9 +1150,9 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
11481150
}
11491151
log.info("{} numberOfEntries={}", fullPartitionName, numberOfEntries);
11501152
// this is a DUMMY entry with -1
1151-
PositionImpl firstPosition = managedLedger.getFirstPosition();
1153+
Position firstPosition = managedLedger.getFirstPosition();
11521154
// look for the first entry with data
1153-
PositionImpl nextValidPosition = managedLedger.getNextValidPosition(firstPosition);
1155+
Position nextValidPosition = managedLedger.getNextValidPosition(firstPosition);
11541156

11551157
fetchOldestAvailableIndexFromTopicReadNext(future, managedLedger, nextValidPosition);
11561158

@@ -1159,7 +1161,7 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
11591161
}
11601162

11611163
private void fetchOldestAvailableIndexFromTopicReadNext(CompletableFuture<Long> future,
1162-
ManagedLedgerImpl managedLedger, PositionImpl position) {
1164+
ManagedLedgerImpl managedLedger, Position position) {
11631165
managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
11641166
@Override
11651167
public void readEntryComplete(Entry entry, Object ctx) {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.bookkeeper.mledger.ManagedLedgerException;
2727
import org.apache.bookkeeper.mledger.Position;
2828
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
29-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
3029
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
3130
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
3231
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -66,7 +65,7 @@ public static long getPublishTime(final ByteBuf byteBuf) throws MetadataCorrupte
6665
}
6766

6867
public static CompletableFuture<Long> getOffsetOfPosition(ManagedLedgerImpl managedLedger,
69-
PositionImpl position,
68+
Position position,
7069
boolean needCheckMore,
7170
long timestamp,
7271
boolean skipMessagesWithoutIndex) {

0 commit comments

Comments
 (0)