Skip to content

Commit 025b80d

Browse files
author
xiangying
committed
适配小红书的Pulsar客户端版本
1 parent 2a988fa commit 025b80d

File tree

4 files changed

+32
-18
lines changed

4 files changed

+32
-18
lines changed

pom.xml

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,27 @@
2929
<packaging>pom</packaging>
3030
<name>Pulsar Java Contrib</name>
3131

32+
<distributionManagement>
33+
<snapshotRepository>
34+
<id>snapshots</id>
35+
<name>maven snapshot repository</name>
36+
<url>https://artifactory.devops.xiaohongshu.com/artifactory/maven-snapshots/</url>
37+
</snapshotRepository>
38+
<repository>
39+
<id>releases</id>
40+
<name>maven releases repository</name>
41+
<url>https://artifactory.devops.xiaohongshu.com/artifactory/maven-releases/</url>
42+
</repository>
43+
</distributionManagement>
44+
3245
<properties>
3346
<lombok.version>1.18.32</lombok.version>
3447
<slf4j.version>2.0.13</slf4j.version>
3548
<maven.compiler.release>17</maven.compiler.release>
3649
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3750
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
38-
<pulsar.version>3.3.1</pulsar.version>
39-
<maven-checkstyle-plugin.version>3.4.0</maven-checkstyle-plugin.version>
51+
<pulsar.version>3.0.5-r0.0.5</pulsar.version>
52+
<maven-checkstyle-plugin.version>3.3.0</maven-checkstyle-plugin.version>
4053
<puppycrawl.checkstyle.version>8.45.1</puppycrawl.checkstyle.version>
4154
<spotless-maven-plugin.version>2.43.0</spotless-maven-plugin.version>
4255
<maven-compiler-plugin>3.13.0</maven-compiler-plugin>
@@ -75,13 +88,6 @@
7588
<type>pom</type>
7689
<scope>import</scope>
7790
</dependency>
78-
<dependency>
79-
<groupId>org.apache.pulsar</groupId>
80-
<artifactId>pulsar-bom</artifactId>
81-
<version>${pulsar.version}</version>
82-
<type>pom</type>
83-
<scope>import</scope>
84-
</dependency>
8591
<dependency>
8692
<groupId>org.apache.commons</groupId>
8793
<artifactId>commons-pool2</artifactId>

pulsar-client-common-contrib/pom.xml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
<version>1.0.0-SNAPSHOT</version>
2424
</parent>
2525
<inceptionYear>2024</inceptionYear>
26+
<properties>
27+
<maven.compiler.release>8</maven.compiler.release>
28+
</properties>
2629

2730
<artifactId>pulsar-client-common-contrib</artifactId>
2831

2932
<dependencies>
3033
<dependency>
3134
<groupId>org.apache.pulsar</groupId>
32-
<artifactId>pulsar-client-all</artifactId>
35+
<artifactId>pulsar-client</artifactId>
3336
<version>${pulsar.version}</version>
3437
</dependency>
3538
<dependency>

pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,20 +79,23 @@ private static long extractMessageIndex(String topic, MessageIdAdv messageId, St
7979
}
8080

8181
private static long getMessageIndex(String topic, MessageIdAdv messageId, String brokerCluster,
82-
PulsarAdmin pulsarAdmin) throws PulsarAdminException {
83-
List<Message<byte[]>> messages =
84-
pulsarAdmin.topics().getMessagesById(topic, messageId.getLedgerId(), messageId.getEntryId());
82+
PulsarAdmin pulsarAdmin) throws PulsarAdminException {
83+
Message<byte[]> message =
84+
pulsarAdmin.topics().getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId());
8585

86-
if (messages == null || messages.isEmpty()) {
87-
throw new PulsarAdminException("No messages found for " + messageId+ " in topic " + topic);
86+
if (message == null) {
87+
throw new PulsarAdminException("No messages found for " + messageId + " in topic " + topic);
8888
}
8989

90-
return messages.stream().map(Message::getIndex).filter(Optional::isPresent).mapToLong(opt -> {
91-
long index = opt.get();
90+
Optional<Long> indexOptional = message.getIndex();
91+
if (indexOptional.isPresent()) {
92+
long index = indexOptional.get();
9293
OffsetToMessageIdCacheProvider.getOrCreateCache(pulsarAdmin, brokerCluster)
9394
.putMessageIdByOffset(topic, index, messageId);
9495
return index;
95-
}).findFirst().orElseThrow(() -> new PulsarAdminException("Missing message index in " + messageId));
96+
} else {
97+
throw new PulsarAdminException("Message index not found for " + messageId + " in topic " + topic);
98+
}
9699
}
97100

98101
private static long processMessageId(String topic, MessageIdAdv messageId, String brokerCluster,

pulsar-rpc-contrib/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
<dependency>
3232
<groupId>org.apache.pulsar</groupId>
3333
<artifactId>pulsar-client-admin</artifactId>
34+
<version>${pulsar.version}</version>
3435
<scope>test</scope>
3536
</dependency>
3637
<dependency>
3738
<groupId>org.apache.pulsar</groupId>
3839
<artifactId>pulsar-client</artifactId>
40+
<version>${pulsar.version}</version>
3941
</dependency>
4042
<dependency>
4143
<groupId>org.apache.commons</groupId>

0 commit comments

Comments
 (0)