Skip to content

Commit 1c6ab82

Browse files
authored
Compatible mater code (#1093)
1 parent 567ab94 commit 1c6ab82

File tree

3 files changed

+8
-6
lines changed

3 files changed

+8
-6
lines changed

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
package io.streamnative.pulsar.handlers.amqp;
1515

1616
import io.netty.channel.ChannelHandlerContext;
17+
import java.util.Optional;
1718
import org.apache.pulsar.broker.PulsarService;
19+
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
1820
import org.apache.pulsar.broker.service.Consumer;
1921
import org.apache.pulsar.broker.service.ServerCnx;
2022

@@ -30,7 +32,7 @@ public AmqpPulsarServerCnx(PulsarService pulsar, ChannelHandlerContext ctx) {
3032
}
3133

3234
@Override
33-
public void closeConsumer(Consumer consumer) {
35+
public void closeConsumer(Consumer consumer, Optional<BrokerLookupData> assignedBrokerLookupData) {
3436
// avoid close the connection when closing the consumer
3537
}
3638
}

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.streamnative.pulsar.handlers.amqp.AmqpProtocolHandler;
1717
import java.io.Closeable;
1818
import java.io.IOException;
19-
import java.net.InetSocketAddress;
2019
import java.net.URI;
2120
import java.net.URISyntaxException;
2221
import java.util.List;
@@ -27,6 +26,7 @@
2726
import org.apache.commons.lang3.tuple.Pair;
2827
import org.apache.pulsar.broker.PulsarService;
2928
import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
29+
import org.apache.pulsar.client.impl.LookupTopicResult;
3030
import org.apache.pulsar.client.impl.PulsarClientImpl;
3131
import org.apache.pulsar.common.naming.TopicName;
3232
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
@@ -53,19 +53,19 @@ public CompletableFuture<Pair<String, Integer>> findBroker(TopicName topicName,
5353
CompletableFuture<Pair<String, Integer>> lookupResult = new CompletableFuture<>();
5454

5555
// lookup the broker for the given topic
56-
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> lookup =
56+
CompletableFuture<LookupTopicResult> lookup =
5757
pulsarClient.getLookup().getBroker(topicName);
5858
lookup.whenComplete((result, throwable) -> {
5959
if (throwable != null) {
6060
lookupResult.completeExceptionally(throwable);
6161
return;
6262
}
63-
if (result == null || result.getLeft() == null) {
63+
if (result == null || result.getLogicalAddress() == null) {
6464
lookupResult.completeExceptionally(new ProxyException(
6565
"Unable to resolve the broker for the topic: " + topicName));
6666
return;
6767
}
68-
String hostAndPort = result.getLeft().getHostName() + ":" + result.getLeft().getPort();
68+
String hostAndPort = result.getLogicalAddress().getHostName() + ":" + result.getLogicalAddress().getPort();
6969

7070
// fetch the protocol handler data
7171
List<LoadManagerReport> brokers = metadataStoreCacheLoader.getAvailableBrokers();

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
<project.compiler.release>${maven.compiler.target}</project.compiler.release>
4141

4242
<!-- dependencies -->
43-
<pulsar.version>3.0.0.1-SNAPSHOT</pulsar.version>
43+
<pulsar.version>3.2.0-SNAPSHOT</pulsar.version>
4444
<qpid-protocol-plugin.version>8.0.0</qpid-protocol-plugin.version>
4545
<rabbitmq.version>5.8.0</rabbitmq.version>
4646

0 commit comments

Comments
 (0)