Skip to content

Commit fd3cdda

Browse files
authored
[improve][pcip] PCIP-2 Distributed RPC framework implemented by the Pulsar client (apache#6)
**Feedback is not positive due to PIP-371.** apache/pulsar#23143 apache/pulsar#23194 **We need to implement this distributed RPC framework in a way that does not intrude into the pulsar core library. Therefore, we need to use two topics, one is the request topic and the other is the reply topic. The client side sends RPC requests to the request topic, the server side receives request message and performs customized processing, and finally sends them to the reply topic. The client receives the reply message and returns.** ### Motivation <!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. --> As we known,Pulsar's current **asynchronous** publish-subscribe model serves well for decoupled message distribution, but it lacks a native mechanism for handling **synchronous** interactions typical of Remote Procedure Calls (RPC). This request-reply model can greatly enhance the utility of Pulsar. We can then use Pulsar as RPC. Why would we use Pulsar for this RPC call? - **Implement RPC using Apache Pulsar. Requests can be sent through a client, received by one or more servers and processed in parallel. Finally, the server returns all processing results after processing, and the client can perform summary and other operations after receiving them.** - **This proposal to achieve the function is `request`. `Request` and existing send function of pulsar can be mixed to same topic. This means that the user can choose, and the call to the server side (consumer) can be asynchronous or synchronous, which is controlled by the user flexibly.** - You can directly use Pulsar's own delaying messages, that is, you can execute RPC regularly. - You can directly use Pulsar's own load balancing mechanism. - You can directly use Pulsar's own message consumption throttling mechanism. - You can directly use Pulsar's own expansion and contraction mechanism. - You can directly use Pulsar's own message call tracking, monitoring, and logging mechanisms. ### Modifications ![RPC.drawio](https://github.com/user-attachments/assets/91208b7b-7f65-4a22-8bfb-fc3161e4ec18)
1 parent 033bcc7 commit fd3cdda

32 files changed

+3485
-1
lines changed

pcip/pcip-2.md

Lines changed: 718 additions & 0 deletions
Large diffs are not rendered by default.
242 KB
Loading

pom.xml

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
<org.testing.version>7.10.2</org.testing.version>
4646
<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
4747
<google-java-format.version>1.10.0</google-java-format.version>
48+
<commons-pool.version>2.12.0</commons-pool.version>
49+
<awaitility.version>4.2.2</awaitility.version>
50+
<testcontainers.version>1.20.1</testcontainers.version>
4851
</properties>
4952

5053
<modules>
@@ -57,6 +60,7 @@
5760
<module>pulsar-transaction-contrib</module>
5861
<module>pulsar-metrics-contrib</module>
5962
<module>pulsar-auth-contrib</module>
63+
<module>pulsar-rpc-contrib</module>
6064
</modules>
6165

6266
<dependencyManagement>
@@ -75,6 +79,22 @@
7579
<type>pom</type>
7680
<scope>import</scope>
7781
</dependency>
82+
<dependency>
83+
<groupId>org.apache.commons</groupId>
84+
<artifactId>commons-pool2</artifactId>
85+
<version>${commons-pool.version}</version>
86+
</dependency>
87+
<dependency>
88+
<groupId>org.awaitility</groupId>
89+
<artifactId>awaitility</artifactId>
90+
<version>${awaitility.version}</version>
91+
<scope>test</scope>
92+
</dependency>
93+
<dependency>
94+
<groupId>org.testcontainers</groupId>
95+
<artifactId>pulsar</artifactId>
96+
<version>${testcontainers.version}</version>
97+
</dependency>
7898
</dependencies>
7999
</dependencyManagement>
80100

@@ -84,7 +104,6 @@
84104
<artifactId>lombok</artifactId>
85105
<version>${lombok.version}</version>
86106
</dependency>
87-
88107
<dependency>
89108
<groupId>org.slf4j</groupId>
90109
<artifactId>slf4j-api</artifactId>
@@ -100,6 +119,12 @@
100119
<version>${org.testing.version}</version>
101120
<scope>test</scope>
102121
</dependency>
122+
<dependency>
123+
<groupId>org.awaitility</groupId>
124+
<artifactId>awaitility</artifactId>
125+
<version>${awaitility.version}</version>
126+
<scope>test</scope>
127+
</dependency>
103128
</dependencies>
104129
<build>
105130
<plugins>

pulsar-rpc-contrib/pom.xml

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
16+
-->
17+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
19+
<modelVersion>4.0.0</modelVersion>
20+
21+
<parent>
22+
<groupId>org.apache</groupId>
23+
<artifactId>pulsar-java-contrib</artifactId>
24+
<version>1.0.0-SNAPSHOT</version>
25+
</parent>
26+
<inceptionYear>2024</inceptionYear>
27+
28+
<artifactId>pulsar-rpc-contrib</artifactId>
29+
30+
<dependencies>
31+
<dependency>
32+
<groupId>org.apache.pulsar</groupId>
33+
<artifactId>pulsar-client-admin</artifactId>
34+
<scope>test</scope>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.apache.pulsar</groupId>
38+
<artifactId>pulsar-client</artifactId>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.apache.commons</groupId>
42+
<artifactId>commons-pool2</artifactId>
43+
</dependency>
44+
<dependency>
45+
<groupId>org.awaitility</groupId>
46+
<artifactId>awaitility</artifactId>
47+
<scope>test</scope>
48+
</dependency>
49+
<dependency>
50+
<groupId>org.testcontainers</groupId>
51+
<artifactId>pulsar</artifactId>
52+
</dependency>
53+
</dependencies>
54+
55+
<build>
56+
<resources>
57+
<resource>
58+
<directory>src/main/resources</directory>
59+
<filtering>true</filtering>
60+
</resource>
61+
</resources>
62+
</build>
63+
64+
</project>
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.pulsar.rpc.contrib.client;
15+
16+
import java.util.concurrent.CompletableFuture;
17+
import lombok.extern.slf4j.Slf4j;
18+
import org.apache.pulsar.client.api.Consumer;
19+
import org.apache.pulsar.client.api.Message;
20+
import org.apache.pulsar.client.api.MessageId;
21+
22+
/**
23+
* Default implementation of {@link RequestCallBack} that handles callback events for Pulsar RPC communications.
24+
*/
25+
@Slf4j
26+
public class DefaultRequestCallBack<V> implements RequestCallBack<V> {
27+
28+
@Override
29+
public void onSendRequestSuccess(String correlationId, MessageId messageId) {
30+
31+
}
32+
33+
@Override
34+
public void onSendRequestError(String correlationId, Throwable t,
35+
CompletableFuture<V> replyFuture) {
36+
replyFuture.completeExceptionally(t);
37+
}
38+
39+
@Override
40+
public void onReplySuccess(String correlationId, String subscription,
41+
V value, CompletableFuture<V> replyFuture) {
42+
replyFuture.complete(value);
43+
}
44+
45+
@Override
46+
public void onReplyError(String correlationId, String subscription,
47+
String errorMessage, CompletableFuture<V> replyFuture) {
48+
replyFuture.completeExceptionally(new Exception(errorMessage));
49+
}
50+
51+
@Override
52+
public void onTimeout(String correlationId, Throwable t) {
53+
54+
}
55+
56+
@Override
57+
public void onReplyMessageAckFailed(String correlationId, Consumer<V> consumer, Message<V> msg, Throwable t) {
58+
consumer.acknowledgeAsync(msg.getMessageId()).exceptionally(ex -> {
59+
log.warn("<onReplyMessageAckFailed> [{}] [{}] Acknowledging message {} failed again.",
60+
msg.getTopicName(), correlationId, msg.getMessageId(), ex);
61+
return null;
62+
});
63+
}
64+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.pulsar.rpc.contrib.client;
15+
16+
import java.util.Collections;
17+
import java.util.Map;
18+
import java.util.concurrent.CompletableFuture;
19+
import lombok.NonNull;
20+
import org.apache.pulsar.client.api.Schema;
21+
import org.apache.pulsar.client.api.TypedMessageBuilder;
22+
import org.apache.pulsar.rpc.contrib.common.PulsarRpcClientException;
23+
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
24+
25+
/**
26+
* Provides the functionality to send asynchronous requests and handle replies using Apache Pulsar as the
27+
* messaging system. This client manages request-response interactions ensuring that messages are sent
28+
* to the correct topics and handling responses through callbacks.
29+
*
30+
* @param <T> The type of the request messages.
31+
* @param <V> The type of the reply messages.
32+
*/
33+
public interface PulsarRpcClient<T, V> extends AutoCloseable {
34+
35+
/**
36+
* Creates a builder for configuring a new {@link PulsarRpcClient}.
37+
*
38+
* @return A new instance of {@link PulsarRpcClientBuilder}.
39+
*/
40+
static <T, V> PulsarRpcClientBuilder<T, V> builder(@NonNull Schema<T> requestSchema,
41+
@NonNull Schema<V> replySchema) {
42+
return new PulsarRpcClientBuilderImpl<>(requestSchema, replySchema);
43+
}
44+
45+
/**
46+
* Synchronously sends a request and waits for the replies.
47+
*
48+
* @param correlationId A unique identifier for the request.
49+
* @param value The value used to generate the request message
50+
* @return The reply value.
51+
* @throws PulsarRpcClientException if an error occurs during the request or while waiting for the reply.
52+
*/
53+
default V request(String correlationId, T value) throws PulsarRpcClientException {
54+
return request(correlationId, value, Collections.emptyMap());
55+
}
56+
57+
/**
58+
* Synchronously sends a request and waits for the replies.
59+
*
60+
* @param correlationId A unique identifier for the request.
61+
* @param value The value used to generate the request message
62+
* @param config Configuration map for creating a request producer,
63+
* will call {@link TypedMessageBuilder#loadConf(Map)}
64+
* @return The reply value.
65+
* @throws PulsarRpcClientException if an error occurs during the request or while waiting for the reply.
66+
*/
67+
V request(String correlationId, T value, Map<String, Object> config) throws PulsarRpcClientException;
68+
69+
/**
70+
* Asynchronously sends a request and returns a future that completes with the reply.
71+
*
72+
* @param correlationId A unique identifier for the request.
73+
* @param value The value used to generate the request message
74+
* @return A CompletableFuture that will complete with the reply value.
75+
*/
76+
default CompletableFuture<V> requestAsync(String correlationId, T value) {
77+
return requestAsync(correlationId, value, Collections.emptyMap());
78+
}
79+
80+
/**
81+
* Asynchronously sends a request and returns a future that completes with the reply.
82+
*
83+
* @param correlationId A unique identifier for the request.
84+
* @param value The value used to generate the request message
85+
* @param config Configuration map for creating a request producer,
86+
* will call {@link TypedMessageBuilder#loadConf(Map)}
87+
* @return A CompletableFuture that will complete with the reply value.
88+
*/
89+
CompletableFuture<V> requestAsync(String correlationId, T value, Map<String, Object> config);
90+
91+
/**
92+
* Removes a request from the tracking map based on its correlation ID.
93+
*
94+
* <p>When this method is executed, ReplyListener the received message will not be processed again.
95+
* You need to make sure that this request has been processed through the callback, or you need to resend it.
96+
*
97+
* @param correlationId The correlation ID of the request to remove.
98+
*/
99+
void removeRequest(String correlationId);
100+
101+
@VisibleForTesting
102+
int pendingRequestSize();
103+
104+
/**
105+
* Closes this client and releases any resources associated with it. This includes closing any active
106+
* producers and consumers and clearing pending requests.
107+
*
108+
* @throws PulsarRpcClientException if there is an error during the closing process.
109+
*/
110+
@Override
111+
void close() throws PulsarRpcClientException;
112+
}

0 commit comments

Comments
 (0)