Skip to content

Commit 06f31fa

Browse files
authored
[improve][pcip] PCIP-4 Improve delayed RPC message handling in pulsar-rpc (#12)
* optimize the delay RPC future * PCIP-4 Improve delayed RPC message handling in pulsar-rpc
1 parent 6de0430 commit 06f31fa

File tree

9 files changed

+612
-41
lines changed

9 files changed

+612
-41
lines changed

pcip/pcip-4.md

Lines changed: 342 additions & 0 deletions
Large diffs are not rendered by default.

pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClient.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.Collections;
1717
import java.util.Map;
1818
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.TimeUnit;
1920
import lombok.NonNull;
2021
import org.apache.pulsar.client.api.Schema;
2122
import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -88,6 +89,62 @@ default CompletableFuture<V> requestAsync(String correlationId, T value) {
8889
*/
8990
CompletableFuture<V> requestAsync(String correlationId, T value, Map<String, Object> config);
9091

92+
/**
93+
* Deliver the message only at or after the specified absolute timestamp.
94+
* Asynchronously sends a request and returns a future that completes with the reply.
95+
*
96+
* @param correlationId A unique identifier for the request.
97+
* @param value The value used to generate the request message
98+
* @param timestamp Absolute timestamp indicating when the message should be delivered to rpc-server.
99+
* @return A CompletableFuture that will complete with the reply value.
100+
*/
101+
default CompletableFuture<V> requestAtAsync(String correlationId, T value, long timestamp) {
102+
return requestAtAsync(correlationId, value, Collections.emptyMap(), timestamp);
103+
}
104+
105+
/**
106+
* Deliver the message only at or after the specified absolute timestamp.
107+
* Asynchronously sends a request and returns a future that completes with the reply.
108+
*
109+
* @param correlationId A unique identifier for the request.
110+
* @param value The value used to generate the request message
111+
* @param config Configuration map for creating a request producer,
112+
* will call {@link TypedMessageBuilder#loadConf(Map)}
113+
* @param timestamp Absolute timestamp indicating when the message should be delivered to rpc-server.
114+
* @return A CompletableFuture that will complete with the reply value.
115+
*/
116+
CompletableFuture<V> requestAtAsync(String correlationId, T value, Map<String, Object> config,
117+
long timestamp);
118+
119+
/**
120+
* Request to deliver the message only after the specified relative delay.
121+
* Asynchronously sends a request and returns a future that completes with the reply.
122+
*
123+
* @param correlationId A unique identifier for the request.
124+
* @param value The value used to generate the request message
125+
* @param delay The amount of delay before the message will be delivered.
126+
* @param unit The time unit for the delay.
127+
* @return A CompletableFuture that will complete with the reply value.
128+
*/
129+
default CompletableFuture<V> requestAfterAsync(String correlationId, T value, long delay, TimeUnit unit) {
130+
return requestAfterAsync(correlationId, value, Collections.emptyMap(), delay, unit);
131+
}
132+
133+
/**
134+
* Request to deliver the message only after the specified relative delay.
135+
* Asynchronously sends a request and returns a future that completes with the reply.
136+
*
137+
* @param correlationId A unique identifier for the request.
138+
* @param value The value used to generate the request message
139+
* @param config Configuration map for creating a request producer,
140+
* will call {@link TypedMessageBuilder#loadConf(Map)}
141+
* @param delay The amount of delay before the message will be delivered.
142+
* @param unit The time unit for the delay.
143+
* @return A CompletableFuture that will complete with the reply value.
144+
*/
145+
CompletableFuture<V> requestAfterAsync(String correlationId, T value, Map<String, Object> config,
146+
long delay, TimeUnit unit);
147+
91148
/**
92149
* Removes a request from the tracking map based on its correlation ID.
93150
*

pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClientImpl.java

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package org.apache.pulsar.rpc.contrib.client;
1515

1616
import static lombok.AccessLevel.PACKAGE;
17+
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME;
1718
import java.io.IOException;
1819
import java.time.Duration;
1920
import java.util.Map;
@@ -104,6 +105,7 @@ public void close() throws PulsarRpcClientException {
104105
}
105106
}
106107

108+
@Override
107109
public V request(String correlationId, T value, Map<String, Object> config) throws PulsarRpcClientException {
108110
try {
109111
return requestAsync(correlationId, value, config).get();
@@ -113,36 +115,80 @@ public V request(String correlationId, T value, Map<String, Object> config) thro
113115
}
114116
}
115117

118+
@Override
116119
public CompletableFuture<V> requestAsync(String correlationId, T value, Map<String, Object> config) {
120+
return internalRequest(correlationId, value, config, -1, -1, null);
121+
}
122+
123+
@Override
124+
public CompletableFuture<V> requestAtAsync(String correlationId, T value, Map<String, Object> config,
125+
long timestamp) {
126+
return internalRequest(correlationId, value, config, timestamp, -1, null);
127+
}
128+
129+
@Override
130+
public CompletableFuture<V> requestAfterAsync(String correlationId, T value, Map<String, Object> config,
131+
long delay, TimeUnit unit) {
132+
return internalRequest(correlationId, value, config, -1, delay, unit);
133+
}
134+
135+
private CompletableFuture<V> internalRequest(String correlationId, T value, Map<String, Object> config,
136+
long timestamp, long delay, TimeUnit unit) {
117137
CompletableFuture<V> replyFuture = new CompletableFuture<>();
118138
long replyTimeoutMillis = replyTimeout.toMillis();
119-
replyFuture.orTimeout(replyTimeoutMillis, TimeUnit.MILLISECONDS)
120-
.exceptionally(e -> {
121-
replyFuture.completeExceptionally(new PulsarRpcClientException(e.getMessage()));
122-
callback.onTimeout(correlationId, e);
123-
removeRequest(correlationId);
124-
return null;
125-
});
126-
pendingRequestsMap.put(correlationId, replyFuture);
127-
128139
TypedMessageBuilder<T> requestMessage = newRequestMessage(correlationId, value, config);
140+
if (timestamp == -1 && delay == -1) {
141+
replyFuture.orTimeout(replyTimeoutMillis, TimeUnit.MILLISECONDS)
142+
.exceptionally(e -> {
143+
replyFuture.completeExceptionally(new PulsarRpcClientException(e.getMessage()));
144+
callback.onTimeout(correlationId, e);
145+
removeRequest(correlationId);
146+
return null;
147+
});
148+
pendingRequestsMap.put(correlationId, replyFuture);
129149

130-
sender.sendRequest(requestMessage, replyTimeoutMillis)
131-
.thenAccept(requestMessageId -> {
132-
if (replyFuture.isCancelled() || replyFuture.isCompletedExceptionally()) {
150+
sender.sendRequest(requestMessage, replyTimeoutMillis)
151+
.thenAccept(requestMessageId -> {
152+
if (replyFuture.isCancelled() || replyFuture.isCompletedExceptionally()) {
153+
removeRequest(correlationId);
154+
} else {
155+
callback.onSendRequestSuccess(correlationId, requestMessageId);
156+
}
157+
}).exceptionally(ex -> {
158+
if (callback != null) {
159+
callback.onSendRequestError(correlationId, ex, replyFuture);
160+
} else {
161+
replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage()));
162+
}
133163
removeRequest(correlationId);
134-
} else {
135-
callback.onSendRequestSuccess(correlationId, requestMessageId);
136-
}
137-
}).exceptionally(ex -> {
138-
if (callback != null) {
139-
callback.onSendRequestError(correlationId, ex, replyFuture);
140-
} else {
141-
replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage()));
142-
}
143-
removeRequest(correlationId);
144-
return null;
145-
});
164+
return null;
165+
});
166+
} else {
167+
// Handle Delayed RPC.
168+
if (pendingRequestsMap.containsKey(correlationId)) {
169+
removeRequest(correlationId);
170+
}
171+
172+
if (timestamp > 0) {
173+
requestMessage.property(REQUEST_DELIVER_AT_TIME, String.valueOf(timestamp));
174+
requestMessage.deliverAt(timestamp);
175+
} else if (delay > 0 && unit != null) {
176+
String delayedAt = String.valueOf(System.currentTimeMillis() + unit.toMillis(delay));
177+
requestMessage.property(REQUEST_DELIVER_AT_TIME, delayedAt);
178+
requestMessage.deliverAfter(delay, unit);
179+
}
180+
sender.sendRequest(requestMessage, replyTimeoutMillis).thenAccept(requestMessageId -> {
181+
callback.onSendRequestSuccess(correlationId, requestMessageId);
182+
}).exceptionally(ex -> {
183+
if (callback != null) {
184+
callback.onSendRequestError(correlationId, ex, replyFuture);
185+
} else {
186+
replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage()));
187+
}
188+
return null;
189+
});
190+
}
191+
146192
return replyFuture;
147193
}
148194

pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/ReplyListener.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
package org.apache.pulsar.rpc.contrib.client;
1515

1616
import static org.apache.pulsar.rpc.contrib.common.Constants.ERROR_MESSAGE;
17+
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME;
18+
import static org.apache.pulsar.rpc.contrib.common.Constants.SERVER_SUB;
1719
import java.util.concurrent.CompletableFuture;
1820
import java.util.concurrent.ConcurrentHashMap;
1921
import lombok.AccessLevel;
@@ -50,17 +52,19 @@ class ReplyListener<V> implements MessageListener<V> {
5052
public void received(Consumer<V> consumer, Message<V> msg) {
5153
String correlationId = msg.getKey();
5254
try {
53-
if (!pendingRequestsMap.containsKey(correlationId)) {
55+
if (!pendingRequestsMap.containsKey(correlationId) && !msg.hasProperty(REQUEST_DELIVER_AT_TIME)) {
5456
log.warn("[{}] [{}] No pending request found for correlationId {}."
5557
+ " This may indicate the message has already been processed or timed out.",
5658
consumer.getTopic(), consumer.getConsumerName(), correlationId);
5759
} else {
58-
CompletableFuture<V> future = pendingRequestsMap.get(correlationId);
60+
CompletableFuture<V> future = pendingRequestsMap.computeIfAbsent(correlationId,
61+
key -> new CompletableFuture<>());
5962
String errorMessage = msg.getProperty(ERROR_MESSAGE);
63+
String serverSub = msg.getProperty(SERVER_SUB);
6064
if (errorMessage != null) {
61-
callBack.onReplyError(correlationId, consumer.getSubscription(), errorMessage, future);
65+
callBack.onReplyError(correlationId, serverSub, errorMessage, future);
6266
} else {
63-
callBack.onReplySuccess(correlationId, consumer.getSubscription(), msg.getValue(), future);
67+
callBack.onReplySuccess(correlationId, serverSub, msg.getValue(), future);
6468
}
6569
}
6670
} finally {

pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/common/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ public class Constants {
2121
public static final String REPLY_TOPIC = "replyTopic";
2222
public static final String ERROR_MESSAGE = "errorMessage";
2323
public static final String SERVER_SUB = "serverSub";
24+
public static final String REQUEST_DELIVER_AT_TIME = "deliverAt";
2425
}

pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/ReplySender.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package org.apache.pulsar.rpc.contrib.server;
1515

1616
import static org.apache.pulsar.rpc.contrib.common.Constants.ERROR_MESSAGE;
17+
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME;
1718
import static org.apache.pulsar.rpc.contrib.common.Constants.SERVER_SUB;
1819
import java.util.function.BiConsumer;
1920
import lombok.AccessLevel;
@@ -50,8 +51,9 @@ class ReplySender<T, V> {
5051
* @param sub The subscriber name involved in this interaction.
5152
*/
5253
@SneakyThrows
53-
void sendReply(String topic, String correlationId, V reply, T value, String sub) {
54-
onSend(topic, correlationId, msg -> msg.value(reply), value, sub);
54+
void sendReply(String topic, String correlationId, V reply, T value, String sub,
55+
long delayedAt) {
56+
onSend(topic, correlationId, msg -> msg.value(reply), value, sub, delayedAt);
5557
}
5658

5759
/**
@@ -64,8 +66,10 @@ void sendReply(String topic, String correlationId, V reply, T value, String sub)
6466
* @param sub The subscriber name involved in this interaction.
6567
*/
6668
@SneakyThrows
67-
void sendErrorReply(String topic, String correlationId, String errorMessage, T value, String sub) {
68-
onSend(topic, correlationId, msg -> msg.property(ERROR_MESSAGE, errorMessage).value(null), value, sub);
69+
void sendErrorReply(String topic, String correlationId, String errorMessage, T value, String sub,
70+
long delayedAt) {
71+
onSend(topic, correlationId, msg -> msg.property(ERROR_MESSAGE, errorMessage).value(null),
72+
value, sub, delayedAt);
6973
}
7074

7175
/**
@@ -79,17 +83,17 @@ void sendErrorReply(String topic, String correlationId, String errorMessage, T v
7983
* @param sub The subscriber name to be included in the message metadata.
8084
*/
8185
@SneakyThrows
82-
void onSend(String topic,
83-
String correlationId,
84-
java.util.function.Consumer<TypedMessageBuilder<V>> consumer,
85-
T value,
86-
String sub) {
86+
void onSend(String topic, String correlationId, java.util.function.Consumer<TypedMessageBuilder<V>> consumer,
87+
T value, String sub, long delayedAt) {
8788
log.debug("Sending {}", correlationId);
8889
Producer<V> producer = pool.borrowObject(topic);
8990
try {
9091
TypedMessageBuilder<V> builder = producer.newMessage()
9192
.key(correlationId)
9293
.property(SERVER_SUB, sub);
94+
if (delayedAt > 0) {
95+
builder.property(REQUEST_DELIVER_AT_TIME, String.valueOf(delayedAt));
96+
}
9397
consumer.accept(builder);
9498
builder.sendAsync()
9599
.exceptionally(e -> {

pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/RequestListener.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package org.apache.pulsar.rpc.contrib.server;
1515

1616
import static org.apache.pulsar.rpc.contrib.common.Constants.REPLY_TOPIC;
17+
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME;
1718
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_TIMEOUT_MILLIS;
1819
import java.util.concurrent.CompletableFuture;
1920
import java.util.concurrent.ExecutionException;
@@ -53,7 +54,7 @@ class RequestListener<T, V> implements MessageListener<T> {
5354
public void received(Consumer<T> consumer, Message<T> msg) {
5455
long replyTimeout = Long.parseLong(msg.getProperty(REQUEST_TIMEOUT_MILLIS))
5556
- (System.currentTimeMillis() - msg.getPublishTime());
56-
if (replyTimeout <= 0) {
57+
if (replyTimeout <= 0 && !msg.hasProperty(REQUEST_DELIVER_AT_TIME)) {
5758
consumer.acknowledgeAsync(msg);
5859
return;
5960
}
@@ -62,12 +63,15 @@ public void received(Consumer<T> consumer, Message<T> msg) {
6263
String requestSubscription = consumer.getSubscription();
6364
String replyTopic = msg.getProperty(REPLY_TOPIC);
6465
T value = msg.getValue();
65-
66+
long delayedAt = msg.hasProperty(REQUEST_DELIVER_AT_TIME)
67+
? Long.parseLong(msg.getProperty(REQUEST_DELIVER_AT_TIME))
68+
+ Long.parseLong(msg.getProperty(REQUEST_TIMEOUT_MILLIS))
69+
: 0;
6670
try {
6771
requestFunction.apply(value)
6872
.orTimeout(replyTimeout, TimeUnit.MILLISECONDS)
6973
.thenAccept(reply -> {
70-
sender.sendReply(replyTopic, correlationId, reply, value, requestSubscription);
74+
sender.sendReply(replyTopic, correlationId, reply, value, requestSubscription, delayedAt);
7175
})
7276
.get();
7377
} catch (ExecutionException e) {
@@ -79,7 +83,7 @@ public void received(Consumer<T> consumer, Message<T> msg) {
7983
log.error("[{}] Error processing request", correlationId, e);
8084
sender.sendErrorReply(replyTopic, correlationId,
8185
cause.getClass().getName() + ": " + cause.getMessage(),
82-
value, requestSubscription);
86+
value, requestSubscription, delayedAt);
8387
}
8488
} catch (InterruptedException e) {
8589
Thread.currentThread().interrupt();

0 commit comments

Comments
 (0)