Skip to content

Commit 82ef279

Browse files
authored
replace ConcurrentOpenHashMap with ConcurrentHashmap (#1367)
1 parent c5a7b05 commit 82ef279

File tree

4 files changed

+10
-9
lines changed

4 files changed

+10
-9
lines changed

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
import static io.streamnative.pulsar.handlers.amqp.utils.ExchangeUtil.isDefaultExchange;
2020

2121
import io.streamnative.pulsar.handlers.amqp.common.exception.AoPException;
22-
import java.util.List;
2322
import java.util.Map;
23+
import java.util.Set;
2424
import java.util.concurrent.CompletableFuture;
2525
import lombok.extern.slf4j.Slf4j;
26+
import org.apache.commons.collections4.CollectionUtils;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
2829
import org.apache.pulsar.common.naming.NamespaceName;
@@ -168,8 +169,8 @@ public CompletableFuture<Integer> exchangeBound(NamespaceName namespaceName, Str
168169
if (null == amqpExchange) {
169170
replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND;
170171
} else {
171-
List<String> subs = amqpExchange.getTopic().getSubscriptions().keys();
172-
if (null == subs || subs.isEmpty()) {
172+
Set<String> subs = amqpExchange.getTopic().getSubscriptions().keySet();
173+
if (CollectionUtils.isEmpty(subs)) {
173174
replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
174175
} else {
175176
replyCode = ExchangeBoundOkBody.OK;

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.Set;
3535
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ConcurrentHashMap;
3637
import java.util.concurrent.ExecutorService;
3738
import lombok.AllArgsConstructor;
3839
import lombok.Data;
@@ -54,7 +55,6 @@
5455
import org.apache.pulsar.common.naming.TopicDomain;
5556
import org.apache.pulsar.common.naming.TopicName;
5657
import org.apache.pulsar.common.util.FutureUtil;
57-
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
5858

5959
/**
6060
* Persistent Exchange.
@@ -72,7 +72,7 @@ public class PersistentExchange extends AbstractAmqpExchange {
7272
private static final String BINDINGS = "BINDINGS";
7373

7474
private PersistentTopic persistentTopic;
75-
private final ConcurrentOpenHashMap<String, CompletableFuture<ManagedCursor>> cursors;
75+
private final ConcurrentHashMap<String, CompletableFuture<ManagedCursor>> cursors;
7676
private AmqpExchangeReplicator messageReplicator;
7777
private AmqpEntryWriter amqpEntryWriter;
7878

@@ -97,7 +97,7 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis
9797
super(exchangeName, type, Sets.newConcurrentHashSet(), durable, autoDelete, internal, arguments);
9898
this.persistentTopic = persistentTopic;
9999
topicNameValidate();
100-
cursors = new ConcurrentOpenHashMap<>(16, 1);
100+
cursors = new ConcurrentHashMap<>(16, 1);
101101
for (ManagedCursor cursor : persistentTopic.getManagedLedger().getCursors()) {
102102
cursors.put(cursor.getName(), CompletableFuture.completedFuture(cursor));
103103
log.info("PersistentExchange {} recover cursor {}", persistentTopic.getName(), cursor.toString());

amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.Optional;
3535
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ConcurrentHashMap;
3637
import java.util.concurrent.TimeUnit;
3738
import lombok.extern.log4j.Log4j2;
3839
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -59,7 +60,6 @@
5960
import org.apache.pulsar.common.naming.TopicName;
6061
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
6162
import org.apache.pulsar.common.policies.data.Policies;
62-
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
6363
import org.apache.qpid.server.protocol.ProtocolVersion;
6464
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
6565
import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
@@ -189,7 +189,7 @@ private void initMockAmqpTopicManager(){
189189
Mockito.any(), Mockito.anyBoolean(), Mockito.any())).thenReturn(subFuture);
190190
when(subscription.getDispatcher()).thenReturn(mock(Dispatcher.class));
191191
when(subscription.addConsumer(Mockito.any())).thenReturn(CompletableFuture.completedFuture(null));
192-
when(persistentTopic.getSubscriptions()).thenReturn(new ConcurrentOpenHashMap<>());
192+
when(persistentTopic.getSubscriptions()).thenReturn(new ConcurrentHashMap<>());
193193
ManagedLedger managedLedger = mock(ManagedLedgerImpl.class);
194194
when(managedLedger.getCursors()).thenReturn(new ManagedCursorContainer());
195195
when(persistentTopic.getManagedLedger()).thenReturn(managedLedger);

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
<project.compiler.release>${maven.compiler.target}</project.compiler.release>
4141

4242
<!-- dependencies -->
43-
<pulsar.version>4.0.0-ursa-4-SNAPSHOT</pulsar.version>
43+
<pulsar.version>4.0.0-ursa-10-SNAPSHOT</pulsar.version>
4444
<qpid-protocol-plugin.version>8.0.0</qpid-protocol-plugin.version>
4545
<rabbitmq.version>5.8.0</rabbitmq.version>
4646

0 commit comments

Comments
 (0)