Skip to content

Commit 94afb2e

Browse files
author
xiangying
committed
just stash
1 parent 484c192 commit 94afb2e

File tree

8 files changed

+340
-43
lines changed

8 files changed

+340
-43
lines changed

pcip/pcip-4.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
<!--
2+
RULES
3+
* Never place a link to an external site like Google Doc. The proposal should be in this issue entirely.
4+
* Use a spelling and grammar checker tools if available for you (there are plenty of free ones).
5+
6+
PROPOSAL HEALTH CHECK
7+
I can read the design document and understand the problem statement and what you plan to change *without* resorting to a couple of hours of code reading just to start having a high level understanding of the change.
8+
9+
IMAGES
10+
If you need the diagrams, please create a folder named pcip-XXX under the [pcip/static/img](https://github.com/apache/pulsar-java-contrib/tree/master/pcip/static/img) path and put the images in.
11+
12+
THIS COMMENTS
13+
Please remove them when done.
14+
-->
15+
16+
# PCIP-XXX: Proposal title
17+
18+
# Background knowledge
19+
20+
<!--
21+
Describes all the knowledge you need to know in order to understand all the other sections in this PCIP
22+
23+
* Give a high level explanation on all concepts you will be using throughout this document. For example, if you want to talk about Persistent Subscriptions, explain briefly (1 paragraph) what this is. If you're going to talk about Transaction Buffer, explain briefly what this is.
24+
If you're going to change something specific, then go into more detail about it and how it works.
25+
* Provide links where possible if a person wants to dig deeper into the background information.
26+
27+
DON'T
28+
* Do not include links *instead* explanation. Do provide links for further explanation.
29+
30+
EXAMPLES
31+
* See [PCIP-2](https://github.com/apache/pulsar-java-contrib/pull/6), Background section to get an understanding on how you add the background knowledge needed.
32+
(They also included the motivation there, but ignore it as we place that in Motivation section explicitly).
33+
-->
34+
35+
# Motivation
36+
37+
<!--
38+
Describe the problem this proposal is trying to solve.
39+
40+
* Explain what is the problem you're trying to solve - current situation.
41+
* This section is the "Why" of your proposal.
42+
-->
43+
44+
# Goals
45+
46+
## In Scope
47+
48+
<!--
49+
What this PCIP intend to achieve once It's integrated into Pulsar.
50+
Why does it benefit Pulsar.
51+
-->
52+
53+
## Out of Scope
54+
55+
<!--
56+
Describe what you have decided to keep out of scope, perhaps left for a different PCIP/s.
57+
-->
58+
59+
60+
# High Level Design
61+
62+
<!--
63+
Describe the design of your solution in *high level*.
64+
Describe the solution end to end, from a birds-eye view.
65+
Don't go into implementation details in this section.
66+
67+
I should be able to finish reading from beginning of the PCIP to here (including) and understand the feature and
68+
how you intend to solve it, end to end.
69+
70+
DON'T
71+
* Avoid code snippets, unless it's essential to explain your intent.
72+
-->
73+
74+
# Detailed Design
75+
76+
## Design & Implementation Details
77+
78+
<!--
79+
This is the section where you dive into the details. It can be:
80+
* Concrete class names and their roles and responsibility, including methods.
81+
* Code snippets of existing code.
82+
* Interface names and its methods.
83+
* ...
84+
-->
85+
86+
## Public-facing Changes
87+
88+
<!--
89+
Describe the additions you plan to make for each public facing component.
90+
Remove the sections you are not changing.
91+
Clearly mark any changes which are BREAKING backward compatability.
92+
-->
93+
94+
### Public API
95+
<!--
96+
When adding a new endpoint to the REST API, please make sure to document the following:
97+
98+
* path
99+
* query parameters
100+
* HTTP body parameters, usually as JSON.
101+
* Response codes, and for each what they mean.
102+
For each response code, please include a detailed description of the response body JSON, specifying each field and what it means.
103+
This is the place to document the errors.
104+
-->
105+
106+
### Configuration
107+
108+
### CLI
109+
110+
# Get started
111+
112+
## Quick Start
113+
114+
<!--
115+
Introduce how to use it and teach users how to use it quickly
116+
-->

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
<maven.compiler.release>17</maven.compiler.release>
3636
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3737
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
38-
<pulsar.version>3.3.1</pulsar.version>
38+
<pulsar.version>4.1.0-SNAPSHOT</pulsar.version>
3939
<maven-checkstyle-plugin.version>3.4.0</maven-checkstyle-plugin.version>
4040
<puppycrawl.checkstyle.version>8.45.1</puppycrawl.checkstyle.version>
4141
<spotless-maven-plugin.version>2.43.0</spotless-maven-plugin.version>
@@ -50,6 +50,7 @@
5050
<testcontainers.version>1.20.1</testcontainers.version>
5151
<junit.version>4.13.1</junit.version>
5252
<mockito.version>5.12.0</mockito.version>
53+
<caffeine.version>3.2.0</caffeine.version>
5354
</properties>
5455

5556
<modules>

pulsar-client-common-contrib/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<dependency>
3131
<groupId>org.apache.pulsar</groupId>
3232
<artifactId>pulsar-client-all</artifactId>
33-
<version>${pulsar.version}}</version>
33+
<version>${pulsar.version}</version>
3434
</dependency>
3535
<dependency>
3636
<groupId>com.github.ben-manes.caffeine</groupId>

pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ private List<Message<T>> readMessages(Reader<T> reader,
144144

145145
@Override
146146
public void ack(long offset, int partition) throws PulsarClientException {
147-
String partitionTopic = buildPartitionTopic(topic, partition);
147+
String partitionTopic = partition >=0 ? buildPartitionTopic(topic, partition) : topic;
148148
Consumer<T> consumer = consumerMap.get(partitionTopic);
149149
if (consumer == null) {
150150
throw new PulsarClientException("Consumer not found for partition: " + partition);
@@ -159,14 +159,14 @@ public void ack(long offset, int partition) throws PulsarClientException {
159159

160160
@Override
161161
public long searchOffset(int partition, long timestamp) throws PulsarAdminException {
162-
String partitionTopic = buildPartitionTopic(topic, partition);
162+
String partitionTopic = partition >=0 ? buildPartitionTopic(topic, partition) : topic;
163163
MessageIdAdv messageId = (MessageIdAdv) pulsarAdmin.topics().getMessageIdByTimestamp(partitionTopic, timestamp);
164164
return extractMessageIndex(partitionTopic, messageId);
165165
}
166166

167167
@Override
168168
public long getConsumeStats(int partition) throws PulsarAdminException {
169-
String partitionTopic = buildPartitionTopic(topic, partition);
169+
String partitionTopic = partition >=0 ? buildPartitionTopic(topic, partition) : topic;
170170
String messageIdStr = pulsarAdmin.topics()
171171
.getPartitionedInternalStats(topic)
172172
.partitions.get(partitionTopic)
@@ -199,7 +199,6 @@ private void closeResources() {
199199
}
200200
}
201201

202-
// Validation helpers
203202
private void validatePullParameters(int maxMessages, int maxBytes) {
204203
if (maxMessages <= 0) {
205204
throw new IllegalArgumentException("maxMessages must be positive");

pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Objects;
88
import java.util.concurrent.ConcurrentHashMap;
99
import org.apache.pulsar.client.admin.PulsarAdmin;
10+
import org.apache.pulsar.client.admin.PulsarAdminException;
1011
import org.apache.pulsar.client.api.MessageId;
1112

1213
/**
@@ -48,15 +49,8 @@ private LoadingCache<Long, MessageId> createCache(String partitionTopic) {
4849
/**
4950
* TODO: Implement after https://github.com/apache/pulsar/pull/24220 is merged
5051
*/
51-
private MessageId loadMessageId(String partitionTopic, Long offset) {
52-
// Placeholder implementation
53-
// try {
54-
// return pulsarAdmin.topics().getMessageIDByOffsetAndPartitionID(partitionTopic, offset);
55-
// } catch (PulsarAdminException e) {
56-
// throw new CompletionException("Failed to load message ID for "
57-
// + partitionTopic + " offset " + offset, e);
58-
// }
59-
return null;
52+
private MessageId loadMessageId(String partitionTopic, Long offset) throws PulsarAdminException {
53+
return pulsarAdmin.topics().getMessageIdByIndex(partitionTopic, offset);
6054
}
6155

6256
public MessageId getMessageIdByOffset(String partitionTopic, long offset) {

pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,8 @@ public Reader<T> getReader(String partitionTopic, long offset) {
116116

117117
// Allows users to proactively return the reader
118118
public void releaseReader(String partitionTopic, long offset, Reader<T> reader) {
119-
// Assumes the partitionTopic entry exists
120119
readerCacheMap.computeIfPresent(partitionTopic, (pt, cache) -> {
121-
if (!reader.isConnected()) {
122-
closeReaderSilently(reader);
123-
} else {
120+
if (reader.isConnected()) {
124121
cache.put(offset, reader);
125122
}
126123
return cache;

pulsar-client-common-contrib/src/test/java/DemoTest.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)