Skip to content

Commit fcdd7a1

Browse files
Handle ServerUnreachableException and and ServerNotFoundException as NO_SUB for transient session (#120)
THIS IS A TRADE-OFF! Reason: A network partition (which should be rare compared to a server crash) may also cause SUE and SNFE, leading to unexpected clearing of route information. Therefore, manual intervention is required if this happens.
1 parent eb63c2e commit fcdd7a1

File tree

4 files changed

+150
-13
lines changed

4 files changed

+150
-13
lines changed

bifromq-mqtt/bifromq-mqtt-broker-client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,10 @@
4141
<groupId>org.slf4j</groupId>
4242
<artifactId>slf4j-api</artifactId>
4343
</dependency>
44+
<dependency>
45+
<groupId>org.mockito</groupId>
46+
<artifactId>mockito-core</artifactId>
47+
<scope>test</scope>
48+
</dependency>
4449
</dependencies>
4550
</project>

bifromq-mqtt/bifromq-mqtt-broker-client/src/main/java/com/baidu/bifromq/mqtt/inbox/MqttBrokerClient.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import static java.util.Collections.emptyMap;
1818

1919
import com.baidu.bifromq.baserpc.IRPCClient;
20+
import com.baidu.bifromq.baserpc.exception.ServerNotFoundException;
21+
import com.baidu.bifromq.baserpc.exception.ServerUnreachableException;
2022
import com.baidu.bifromq.mqtt.inbox.rpc.proto.OnlineInboxBrokerGrpc;
2123
import com.baidu.bifromq.mqtt.inbox.rpc.proto.SubReply;
2224
import com.baidu.bifromq.mqtt.inbox.rpc.proto.SubRequest;
@@ -25,12 +27,13 @@
2527
import com.baidu.bifromq.mqtt.inbox.rpc.proto.WriteReply;
2628
import com.baidu.bifromq.mqtt.inbox.rpc.proto.WriteRequest;
2729
import com.baidu.bifromq.mqtt.inbox.util.DeliveryGroupKeyUtil;
28-
import com.baidu.bifromq.plugin.subbroker.DeliveryReply;
29-
import com.baidu.bifromq.plugin.subbroker.DeliveryRequest;
30-
import com.baidu.bifromq.plugin.subbroker.IDeliverer;
30+
import com.baidu.bifromq.plugin.subbroker.*;
31+
import com.baidu.bifromq.type.MatchInfo;
3132
import com.baidu.bifromq.type.QoS;
3233
import com.google.common.base.Preconditions;
3334
import io.reactivex.rxjava3.core.Observable;
35+
36+
import java.util.*;
3437
import java.util.concurrent.CompletableFuture;
3538
import java.util.concurrent.atomic.AtomicBoolean;
3639
import lombok.extern.slf4j.Slf4j;
@@ -40,14 +43,8 @@ final class MqttBrokerClient implements IMqttBrokerClient {
4043
private final AtomicBoolean hasStopped = new AtomicBoolean();
4144
private final IRPCClient rpcClient;
4245

43-
MqttBrokerClient(MqttBrokerClientBuilder builder) {
44-
this.rpcClient = IRPCClient.newBuilder()
45-
.bluePrint(RPCBluePrint.INSTANCE)
46-
.executor(builder.executor)
47-
.eventLoopGroup(builder.eventLoopGroup)
48-
.sslContext(builder.sslContext)
49-
.crdtService(builder.crdtService)
50-
.build();
46+
MqttBrokerClient(IRPCClient rpcClient) {
47+
this.rpcClient = rpcClient;
5148
}
5249

5350
public IDeliverer open(String delivererKey) {
@@ -127,7 +124,33 @@ public CompletableFuture<DeliveryReply> deliver(DeliveryRequest request) {
127124
.setReqId(reqId)
128125
.setRequest(request)
129126
.build())
130-
.thenApply(WriteReply::getReply);
127+
.thenApply(WriteReply::getReply)
128+
.handle((reply, ex) -> {
129+
if (ex != null) {
130+
if (ex.getCause() instanceof ServerUnreachableException
131+
|| ex.getCause() instanceof ServerNotFoundException) {
132+
DeliveryReply.Builder replyBuilder = DeliveryReply.newBuilder();
133+
Map<String, DeliveryResults> resultsMap = new HashMap<>();
134+
for (Map.Entry<String, DeliveryPackage> entry : request.getPackageMap().entrySet()) {
135+
String tenantId = entry.getKey();
136+
Set<MatchInfo> noSub = new HashSet<>();
137+
DeliveryResults.Builder deliveryResultsBuilder = DeliveryResults.newBuilder();
138+
for (DeliveryPack pack: entry.getValue().getPackList()) {
139+
noSub.addAll(pack.getMatchInfoList());
140+
}
141+
noSub.forEach(matchInfo -> deliveryResultsBuilder.addResult(DeliveryResult.newBuilder()
142+
.setMatchInfo(matchInfo)
143+
.setCode(DeliveryResult.Code.NO_SUB)
144+
.build()));
145+
resultsMap.put(tenantId, deliveryResultsBuilder.build());
146+
}
147+
return replyBuilder.putAllResult(resultsMap).build();
148+
} else {
149+
throw new RuntimeException(ex);
150+
}
151+
}
152+
return reply;
153+
});
131154
}
132155

133156
@Override

bifromq-mqtt/bifromq-mqtt-broker-client/src/main/java/com/baidu/bifromq/mqtt/inbox/MqttBrokerClientBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.baidu.bifromq.mqtt.inbox;
1515

1616
import com.baidu.bifromq.basecrdt.service.ICRDTService;
17+
import com.baidu.bifromq.baserpc.IRPCClient;
1718
import io.netty.channel.EventLoopGroup;
1819
import io.netty.handler.ssl.SslContext;
1920
import java.util.concurrent.Executor;
@@ -32,6 +33,12 @@ public final class MqttBrokerClientBuilder implements IMqttBrokerClientBuilder {
3233
Executor executor;
3334

3435
public IMqttBrokerClient build() {
35-
return new MqttBrokerClient(this);
36+
return new MqttBrokerClient(IRPCClient.newBuilder()
37+
.bluePrint(RPCBluePrint.INSTANCE)
38+
.executor(executor)
39+
.eventLoopGroup(eventLoopGroup)
40+
.sslContext(sslContext)
41+
.crdtService(crdtService)
42+
.build());
3643
}
3744
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package com.baidu.bifromq.mqtt.inbox;
2+
3+
import com.baidu.bifromq.baserpc.IRPCClient;
4+
import com.baidu.bifromq.baserpc.exception.ServerNotFoundException;
5+
import com.baidu.bifromq.mqtt.inbox.rpc.proto.OnlineInboxBrokerGrpc;
6+
import com.baidu.bifromq.mqtt.inbox.rpc.proto.WriteReply;
7+
import com.baidu.bifromq.mqtt.inbox.rpc.proto.WriteRequest;
8+
import com.baidu.bifromq.mqtt.inbox.util.DeliveryGroupKeyUtil;
9+
import com.baidu.bifromq.plugin.subbroker.*;
10+
import com.baidu.bifromq.type.MatchInfo;
11+
import org.mockito.Mock;
12+
import org.mockito.MockitoAnnotations;
13+
import org.testng.annotations.AfterMethod;
14+
import org.testng.annotations.BeforeMethod;
15+
import org.testng.annotations.Test;
16+
17+
import java.util.concurrent.CompletableFuture;
18+
19+
import static java.util.Collections.emptyMap;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.when;
22+
23+
public class MqttBrokerClientTest {
24+
private AutoCloseable closeable;
25+
@Mock
26+
private IRPCClient rpcClient;
27+
private MqttBrokerClient mqttBrokerClient;
28+
@Mock
29+
private IDeliverer deliverer;
30+
@Mock
31+
private IRPCClient.IRequestPipeline<WriteRequest, WriteReply> pipeline;
32+
private final String delivererKey = "test:DelivererKey";
33+
private final String tenantId = "testTenantId";
34+
private final MatchInfo matchInfo = MatchInfo.newBuilder()
35+
.setTopicFilter("testTopicFilter")
36+
.setReceiverId("testReceiverId")
37+
.build();
38+
private final DeliveryRequest request = DeliveryRequest.newBuilder()
39+
.putPackage(tenantId, DeliveryPackage.newBuilder()
40+
.addPack(DeliveryPack.newBuilder()
41+
.addMatchInfo(matchInfo)
42+
.addMatchInfo(matchInfo).build())
43+
.build())
44+
.build();
45+
46+
@BeforeMethod
47+
public void setup() {
48+
closeable = MockitoAnnotations.openMocks(this);
49+
mqttBrokerClient = new MqttBrokerClient(rpcClient);
50+
}
51+
52+
@AfterMethod
53+
public void teardown() throws Exception {
54+
closeable.close();
55+
}
56+
57+
@Test
58+
public void deliveryPipelineWithOK() {
59+
CompletableFuture<WriteReply> future = new CompletableFuture<>();
60+
future.complete(WriteReply.newBuilder()
61+
.setReply(DeliveryReply.newBuilder()
62+
.putResult(tenantId, DeliveryResults.newBuilder()
63+
.addResult(DeliveryResult.newBuilder()
64+
.setCode(DeliveryResult.Code.OK)
65+
.setMatchInfo(matchInfo)
66+
.build())
67+
.build())
68+
.build())
69+
.build());
70+
71+
when(rpcClient.createRequestPipeline("", DeliveryGroupKeyUtil.parseServerId(delivererKey), "",
72+
emptyMap(), OnlineInboxBrokerGrpc.getWriteMethod())).thenReturn(pipeline);
73+
when(pipeline.invoke(any())).thenReturn(future);
74+
75+
deliverer = mqttBrokerClient.open(delivererKey);
76+
DeliveryReply reply = deliverer.deliver(request).join();
77+
DeliveryResults results = reply.getResultMap().get(tenantId);
78+
assert results != null;
79+
DeliveryResult result = results.getResult(0);
80+
assert result.getCode() == DeliveryResult.Code.OK;
81+
assert result.getMatchInfo().equals(matchInfo);
82+
}
83+
84+
@Test
85+
public void deliveryPipelineWithSpecialException() {
86+
CompletableFuture<WriteReply> future = new CompletableFuture<>();
87+
future.completeExceptionally(new ServerNotFoundException("Test exception"));
88+
89+
when(rpcClient.createRequestPipeline("", DeliveryGroupKeyUtil.parseServerId(delivererKey), "",
90+
emptyMap(), OnlineInboxBrokerGrpc.getWriteMethod())).thenReturn(pipeline);
91+
when(pipeline.invoke(any())).thenReturn(future);
92+
93+
deliverer = mqttBrokerClient.open(delivererKey);
94+
DeliveryReply reply = deliverer.deliver(request).join();
95+
DeliveryResults results = reply.getResultMap().get(tenantId);
96+
assert results != null;
97+
assert results.getResultList().size() == 1;
98+
DeliveryResult result = results.getResult(0);
99+
assert result.getCode() == DeliveryResult.Code.NO_SUB;
100+
assert result.getMatchInfo().equals(matchInfo);
101+
}
102+
}

0 commit comments

Comments
 (0)