Commit 892df92

liangyepianzhou and xiangying authored
PCIP-3 Pulsar Extended Transaction API Enhancement Proposal (apache#11)
### Motivation

The motivation behind this proposal is to address the issue of duplicate message consumption when using Fail-over subscription mode with cumulative Ack in Pulsar. Despite the use of transactions, achieving exactly-once semantics has been problematic due to the inherent behavior of cumulative Ack. This issue is particularly challenging to resolve within the constraints of the Pulsar main repository for several reasons:

- **Complexity and Longevity**: Fixing this issue without altering the Client API is complex and time-consuming.
- **Confusing API Usage**: The existing transaction APIs in Pulsar are often confusing, with methods like `abort()` and `commit()` appearing synchronous but functioning asynchronously. This leads to incorrect usage patterns, such as needing to call `abort().get()` and `commit().get()` for proper operation.
- **API Modification Challenges**: Modifying the Client API in the Pulsar main repository is difficult due to the uncertainty of whether a new solution will be perfect and the lengthy API update cycle.

### Modifications

The proposed solution involves designing a new transaction API in the contributor repository that wraps the original Transaction API. This approach offers several benefits:

- **Concise Solution**: By wrapping the original API, the problem can be solved in a straightforward and concise manner.
- **Best Practice Reference**: This wrapping serves as a best practice example that does not disrupt existing users' usage while providing a reference solution for those encountering similar issues.
- **Enhanced Usability**: The new API aims to clarify and optimize the transaction methods, making them more intuitive and less prone to misuse, thus improving the overall usability and clarity of transactional operations in Pulsar.

Co-authored-by: xiangying <[email protected]>
1 parent fd3cdda commit 892df92

12 files changed: +1023 -1 lines changed

pcip/pcip-3.md

Lines changed: 239 additions & 0 deletions
@@ -0,0 +1,239 @@
# PCIP-3: Pulsar Extended Transaction API Enhancement Proposal

# Background knowledge

When users consume messages in Failover subscription mode and acknowledge them with cumulative ack, messages may be consumed more than once. In this case, even with transactions, users cannot achieve exactly-once semantics.

As shown in the figure below, consumer1 and consumer2 start at the same time in failover mode and go through two switches caused by disconnections.

In the end, consumer1 consumed M1, M2, M4, and M5, and consumer2 consumed M1, M2, and M3. M1 and M2 were consumed twice.

![cumulative-ack-issue.png](static/img/pcip-3/cumulative-ack-issue.png)

# Goals

Solve the duplicate consumption caused by cumulative ack by designing a new transaction API.

Why not fix this issue in the Pulsar main repository?

- Fixing the problem without modifying the Client API is complex, and the fix would take a long time.
- The existing transaction APIs are confusing to use in several ways.
  - For example, the names `abort()` and `commit()` suggest synchronous methods, but they are actually asynchronous. In practice, you need to call `abort().get()` and `commit().get()`.
- Modifying the Client API in the Pulsar main repository is difficult, because we cannot be sure that the new solution is perfect, and the API update cycle is long.

The benefit of solving this problem in the contributor repository is:

- By wrapping the original Transaction API, the problem can be solved concisely. The wrapper can serve as a best practice that does not affect existing users, while providing a reference solution for users who run into similar problems.

# High Level Design

Design a new API that keeps the context of message sending and consumption inside the Transaction. This not only solves the duplicate consumption problem but also leaves enough room for possible future optimizations.

- Solve the duplicate consumption problem: acknowledge messages in a batch by individually acking a list of message IDs, instead of using the original cumulative ack.
- Leave room for extension: messages are sent through a message builder created by the Transaction, and consumed messages are recorded in the Transaction. Later optimizations can use this information without changing the interface.

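To make the first point above concrete, here is a minimal, self-contained sketch in plain Java with no Pulsar dependency. It models message positions M1..M5 as the integers 1..5 and compares what a cumulative ack covers with what an individual ack of a recorded list covers. The class `AckSemanticsSketch` and its methods are hypothetical names used only for illustration; they are not part of the proposed API or of Pulsar.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model: message positions M1..M5 are represented as the integers 1..5. */
final class AckSemanticsSketch {

  /** Cumulative ack: acknowledging position p marks every position up to and including p. */
  static Set<Integer> cumulativeAck(int upToPosition) {
    Set<Integer> acked = new TreeSet<>();
    for (int p = 1; p <= upToPosition; p++) {
      acked.add(p);
    }
    return acked;
  }

  /** Individual ack of a list: only the positions the consumer actually recorded are marked. */
  static Set<Integer> individualAck(List<Integer> recorded) {
    return new TreeSet<>(recorded);
  }

  public static void main(String[] args) {
    // Consumer1 from the background example received M1, M2, M4, M5 (never M3).
    List<Integer> receivedByConsumer1 = List.of(1, 2, 4, 5);

    // A cumulative ack of M5 also covers M3, which consumer1 never processed.
    System.out.println("cumulative: " + cumulativeAck(5));
    // Acking the recorded list covers exactly what consumer1 processed.
    System.out.println("individual: " + individualAck(receivedByConsumer1));
  }
}
```

Under this simplified model, the individual ack of recorded IDs never claims a message the consumer did not receive, which is the property the design relies on.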
## Public-facing Changes

### Public API

The `org.apache.pulsar.txn.api.Transaction` interface is an optimized and extended transaction interface designed to improve usability and clarity in the Pulsar contributors' library. It addresses the problem that cumulative ack combined with transactions does not prevent repeated message consumption, and it clarifies ambiguous methods. Key features include:

- Message Recording: Records messages in a transaction without automatic acknowledgment.
- Asynchronous and Synchronous Acknowledgment: Supports both asynchronous and synchronous acknowledgment of all received messages, either for a specific consumer or across all consumers.
- Transactional Message Builder: Creates a new transactional message builder for a given producer to construct and send messages within a transaction context.
- Committing and Aborting Transactions: Offers both asynchronous and synchronous methods to commit or abort a transaction, so that message sends and acknowledgments take effect or are discarded together.
- Transaction ID and State Retrieval: Provides methods to retrieve the unique transaction ID and the current state, which indicates the transaction's lifecycle phase.
```java
package org.apache.pulsar.txn.api;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;

/**
 * Interface representing an optimized and extended transaction interface in the Pulsar
 * contributors' library.
 *
 * <p>This interface provides enhancements and extensions to the base transaction interface in
 * Pulsar. It specifically addresses the issue where using CumulativeAck with transactions could not
 * prevent messages from being consumed repeatedly. Additionally, it clarifies and optimizes
 * ambiguous methods for better usability and clarity.
 */
public interface Transaction {

  /**
   * Records a message in the transaction.
   *
   * <p>This method is used to include a message in the current transaction. The message will not be
   * automatically acknowledged when the transaction is committed. Instead, it must be explicitly
   * acknowledged by calling one of the ack methods.
   *
   * @param messageId the ID of the message to record
   * @param consumer the consumer that received the message
   */
  void recordMsg(MessageId messageId, Consumer<?> consumer);

  /**
   * Asynchronously acknowledges all received messages for a specific consumer in the transaction.
   *
   * <p>This method is used to acknowledge all messages that have been recorded for the specified
   * consumer in the transaction. The acknowledgment is asynchronous, and the future can be used to
   * determine when the operation is complete.
   *
   * @param consumer the consumer that received the messages
   * @return a CompletableFuture that will be completed when the acknowledgment is complete
   */
  CompletableFuture<Void> ackAllReceivedMsgsAsync(Consumer<?> consumer);

  /**
   * Acknowledges all received messages for a specific consumer in the transaction.
   *
   * <p>This method is a synchronous version of {@link #ackAllReceivedMsgsAsync(Consumer)}. It will
   * block until the acknowledgment is complete.
   *
   * @param consumer the consumer that received the messages
   * @throws ExecutionException if the acknowledgment fails
   * @throws InterruptedException if the thread is interrupted while waiting for the acknowledgment
   *     to complete
   */
  void ackAllReceivedMsgs(Consumer<?> consumer) throws ExecutionException, InterruptedException;

  /**
   * Acknowledges all received messages in the transaction.
   *
   * <p>This method is a convenience method that acknowledges all messages across all consumers. It
   * will block until the acknowledgment is complete.
   *
   * @throws ExecutionException if the acknowledgment fails
   * @throws InterruptedException if the thread is interrupted while waiting for the acknowledgment
   *     to complete
   */
  void ackAllReceivedMsgs() throws ExecutionException, InterruptedException;

  /**
   * Asynchronously acknowledges all received messages in the transaction.
   *
   * <p>This method is a convenience method that acknowledges all messages across all consumers. The
   * acknowledgment is asynchronous, and the future can be used to determine when the operation is
   * complete.
   *
   * @return a CompletableFuture that will be completed when the acknowledgment is complete
   */
  CompletableFuture<Void> ackAllReceivedMsgsAsync();

  /**
   * Creates a new transactional message builder for the given producer.
   *
   * <p>This method returns a {@link TypedMessageBuilder} instance that is bound to the specified
   * producer and transaction. The returned message builder can be used to construct and send
   * messages within the context of a transaction.
   *
   * @param producer the producer instance used to send messages
   * @param <T> the type of messages produced by the producer
   * @return a TypedMessageBuilder instance for building transactional messages
   */
  <T> TypedMessageBuilder<T> newTransactionMessage(Producer<T> producer);

  /**
   * Asynchronously commits the transaction.
   *
   * <p>This method is used to commit the transaction, making all sent messages and acknowledgments
   * effective. When the transaction is committed, consumers receive the transaction messages and
   * the pending-ack state becomes ack state. The commit is asynchronous, and the future can be used
   * to determine when the operation is complete.
   *
   * @return a CompletableFuture that will be completed when the commit is complete
   */
  CompletableFuture<Void> commitAsync();

  /**
   * Asynchronously aborts the transaction.
   *
   * <p>This method is used to abort the transaction, discarding all sent messages and
   * acknowledgments. The abort is asynchronous, and the future can be used to determine when the
   * operation is complete.
   *
   * @return a CompletableFuture that will be completed when the abort is complete
   */
  CompletableFuture<Void> abortAsync();

  /**
   * Commits the transaction.
   *
   * <p>This method is a synchronous version of {@link #commitAsync()}. It will block until the
   * commit is complete.
   *
   * @throws ExecutionException if the commit fails
   * @throws InterruptedException if the thread is interrupted while waiting for the commit to
   *     complete
   */
  void commit() throws ExecutionException, InterruptedException;

  /**
   * Aborts the transaction.
   *
   * <p>This method is a synchronous version of {@link #abortAsync()}. It will block until the abort
   * is complete.
   *
   * @throws ExecutionException if the abort fails
   * @throws InterruptedException if the thread is interrupted while waiting for the abort to
   *     complete
   */
  void abort() throws ExecutionException, InterruptedException;

  /**
   * Gets the transaction ID.
   *
   * <p>This method returns the unique identifier for the transaction.
   *
   * @return the transaction ID
   */
  TxnID getTxnID();

  /**
   * Gets the current state of the transaction.
   *
   * <p>This method returns the current state of the transaction, which can be used to determine if
   * the transaction is open, committed, aborted, in error, or timed out.
   *
   * @return the current state of the transaction
   */
  org.apache.pulsar.client.api.transaction.Transaction.State getState();
}
```
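
The Javadoc above describes each synchronous method as a blocking version of its asynchronous counterpart. The following is a minimal sketch of how such a pair could be related, using only `java.util.concurrent`. The class `SyncOverAsyncSketch` and its simulated `commitAsync()` are hypothetical stand-ins, not the implementation contained in this commit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical stand-in for a transaction, used only to show the sync/async pairing. */
final class SyncOverAsyncSketch {
  private final boolean failOnCommit;

  SyncOverAsyncSketch(boolean failOnCommit) {
    this.failOnCommit = failOnCommit;
  }

  /** Asynchronous variant: returns a future that completes when the simulated commit is done. */
  CompletableFuture<Void> commitAsync() {
    return CompletableFuture.runAsync(
        () -> {
          if (failOnCommit) {
            throw new IllegalStateException("simulated commit failure");
          }
        });
  }

  /** Synchronous variant: blocks on the future, so the caller cannot forget to wait. */
  void commit() throws ExecutionException, InterruptedException {
    commitAsync().get();
  }

  public static void main(String[] args) throws Exception {
    // Returns only after the simulated commit has finished.
    new SyncOverAsyncSketch(false).commit();

    try {
      new SyncOverAsyncSketch(true).commit();
    } catch (ExecutionException e) {
      // The failure of the asynchronous operation is available as the cause.
      System.out.println("commit failed: " + e.getCause().getMessage());
    }
  }
}
```

Splitting the names this way means `commit()` and `abort()` behave as their names suggest, while callers who want non-blocking behavior opt in explicitly through the `Async` variants.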

# Get started

## Quick Start

```java
public void transactionDemo() throws Exception {
  String pubTopic = "persistent://public/default/my-pub-topic";
  String subTopic = "persistent://public/default/my-sub-topic";
  String subscription = "my-subscription";

  // Create a Pulsar client instance
  PulsarClient client = SingletonPulsarContainer.createPulsarClient();

  // Use TransactionFactory to create a transaction with a timeout of 5 seconds
  Transaction transaction =
      TransactionFactory.createTransaction(client, 5, TimeUnit.SECONDS).get();

  // Create two producers to send messages to different topics
  Producer<String> producerToPubTopic = client.newProducer(Schema.STRING).topic(pubTopic).create();
  Producer<String> producerToSubTopic = client.newProducer(Schema.STRING).topic(subTopic).create();

  // Create a consumer to receive messages from the subTopic
  Consumer<String> consumerFromSubTopic = client
      .newConsumer(Schema.STRING)
      .subscriptionName(subscription)
      .topic(subTopic)
      .subscribe();

  // Send a message to the subTopic
  producerToSubTopic.send("Hello World");

  // Receive a message
  Message<String> receivedMessage = consumerFromSubTopic.receive();
  MessageId receivedMessageId = receivedMessage.getMessageId();

  // Record the message in the transaction
  transaction.recordMsg(receivedMessageId, consumerFromSubTopic);

  // Use the transaction message builder to forward the received message to the pubTopic
  transaction.newTransactionMessage(producerToPubTopic).value(receivedMessage.getValue()).send();

  // Acknowledge all messages received from the subTopic within the transaction
  transaction.ackAllReceivedMsgs(consumerFromSubTopic);

  // Commit the transaction so that all sent messages and acknowledgments take effect
  transaction.commit();

  // Close the consumer, producers, and client to release resources
  consumerFromSubTopic.close();
  producerToPubTopic.close();
  producerToSubTopic.close();
  client.close();
}
```
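
The quick start commits unconditionally. In practice the work between creating and committing a transaction can fail, and the blocking `commit()` and `abort()` methods make a commit-or-abort pattern straightforward to write. The sketch below shows one possible shape of that pattern in plain Java. The names `CommitOrAbortSketch`, `Txn`, `Work`, and `RecordingTxn` are hypothetical and exist only for this example; `Txn` mirrors just the two blocking calls and is not the interface from this proposal.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a commit-or-abort wrapper around transactional work. */
final class CommitOrAbortSketch {

  /** Minimal stand-in exposing only the two blocking calls the pattern needs. */
  interface Txn {
    void commit() throws Exception;

    void abort() throws Exception;
  }

  /** A unit of work that may fail, for example receive, record, forward, and ack. */
  interface Work {
    void run() throws Exception;
  }

  /** Runs the work, commits on success, and aborts (then rethrows) on failure. */
  static void runInTransaction(Txn txn, Work work) throws Exception {
    try {
      work.run();
    } catch (Exception e) {
      txn.abort(); // discard sends and acknowledgments made in the transaction
      throw e;
    }
    txn.commit();
  }

  /** Test double that records which calls were made. */
  static final class RecordingTxn implements Txn {
    final List<String> calls = new ArrayList<>();

    @Override
    public void commit() {
      calls.add("commit");
    }

    @Override
    public void abort() {
      calls.add("abort");
    }
  }

  public static void main(String[] args) throws Exception {
    RecordingTxn ok = new RecordingTxn();
    runInTransaction(ok, () -> {});
    System.out.println(ok.calls);

    RecordingTxn failing = new RecordingTxn();
    try {
      runInTransaction(failing, () -> {
        throw new IllegalStateException("processing failed");
      });
    } catch (IllegalStateException expected) {
      System.out.println(failing.calls);
    }
  }
}
```

This sketch does not handle a failure of `abort()` itself; a production version would need to decide whether to suppress or report that secondary error.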

pom.xml

Lines changed: 2 additions & 0 deletions
@@ -48,6 +48,8 @@
     <commons-pool.version>2.12.0</commons-pool.version>
     <awaitility.version>4.2.2</awaitility.version>
     <testcontainers.version>1.20.1</testcontainers.version>
+    <junit.version>4.13.1</junit.version>
+    <mockito.version>5.12.0</mockito.version>
   </properties>
 
   <modules>

pulsar-transaction-contrib/pom.xml

Lines changed: 38 additions & 1 deletion
@@ -21,8 +21,45 @@
     <artifactId>pulsar-java-contrib</artifactId>
     <version>1.0.0-SNAPSHOT</version>
   </parent>
-  <inceptionYear>2024</inceptionYear>
 
   <artifactId>pulsar-transaction-contrib</artifactId>
 
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-all</artifactId>
+      <version>${pulsar.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>pulsar</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>17</source>
+          <target>17</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <inceptionYear>2024</inceptionYear>
+
 </project>
