Skip to content

Commit 32676de

Browse files
committed
Merge remote-tracking branch 'github/develop' into feature-broker-22
# Conflicts: # build.gradle # src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos0PublishOutHandler.java # src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java # src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos2PublishOutHandler.java # src/main/java/com/ss/mqtt/broker/model/MqttSession.java # src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java
2 parents e47597e + e6f9fa7 commit 32676de

39 files changed

+1013
-296
lines changed

build.gradle

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.7.0"
35+
rlibVersion = "9.8.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.2.0.RELEASE'
3838
springVersion = '5.1.6.RELEASE'
@@ -86,6 +86,15 @@ allprojects {
8686
tasks.withType(Test) {
8787
maxParallelForks = 2
8888
forkEvery = 100
89+
jvmArgs += "--enable-preview"
90+
}
91+
92+
tasks.withType(JavaCompile) {
93+
options.compilerArgs += "--enable-preview"
94+
}
95+
96+
tasks.withType(GroovyCompile) {
97+
options.forkOptions.jvmArgs += "--enable-preview"
8998
}
9099

91100
processResources {

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ private interface ChannelFactory extends
103103
clientIdRegistry,
104104
authenticationService,
105105
mqttSessionService,
106-
publishRetryService
106+
publishRetryService,
107+
subscriptionService
107108
);
108109
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
109110
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
@@ -121,9 +122,15 @@ private interface ChannelFactory extends
121122
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
122123
@NotNull ClientIdRegistry clientIdRegistry,
123124
@NotNull MqttSessionService mqttSessionService,
124-
@NotNull PublishRetryService publishRetryService
125+
@NotNull PublishRetryService publishRetryService,
126+
@NotNull SubscriptionService subscriptionService
125127
) {
126-
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService);
128+
return new DefaultMqttClientReleaseHandler(
129+
clientIdRegistry,
130+
mqttSessionService,
131+
publishRetryService,
132+
subscriptionService
133+
);
127134
}
128135

129136
@Bean
@@ -169,7 +176,7 @@ private interface ChannelFactory extends
169176

170177
@Bean
171178
@NotNull SubscriptionService subscriptionService() {
172-
return new SimpleSubscriptionService(new SimpleSubscriptions());
179+
return new SimpleSubscriptionService();
173180
}
174181

175182
@Bean
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.ss.mqtt.broker.exception;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public class InconsistentSubscriptionStateException extends RuntimeException {
6+
7+
public InconsistentSubscriptionStateException(@NotNull String message) {
8+
super(message);
9+
}
10+
11+
public InconsistentSubscriptionStateException(@NotNull Throwable cause) {
12+
super(cause);
13+
}
14+
}

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package com.ss.mqtt.broker.handler.client;
22

3-
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
43
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
4+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
55
import com.ss.mqtt.broker.service.ClientIdRegistry;
66
import com.ss.mqtt.broker.service.MqttSessionService;
77
import com.ss.mqtt.broker.service.PublishRetryService;
8+
import com.ss.mqtt.broker.service.SubscriptionService;
89
import com.ss.rlib.common.util.StringUtils;
910
import lombok.RequiredArgsConstructor;
1011
import lombok.extern.log4j.Log4j2;
@@ -19,6 +20,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
1920
private final @NotNull ClientIdRegistry clientIdRegistry;
2021
private final @NotNull MqttSessionService sessionService;
2122
private final @NotNull PublishRetryService publishRetryService;
23+
private final @NotNull SubscriptionService subscriptionService;
2224

2325
@Override
2426
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
@@ -41,9 +43,12 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
4143

4244
Mono<?> asyncActions = null;
4345

44-
if (session != null && client.getConnectionConfig().isSessionsEnabled()) {
45-
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
46-
client.setSession(null);
46+
if (session != null) {
47+
subscriptionService.cleanSubscriptions(client, session);
48+
if (client.getConnectionConfig().isSessionsEnabled()) {
49+
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
50+
client.setSession(null);
51+
}
4752
}
4853

4954
if (asyncActions != null) {

src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@
44
import com.ss.mqtt.broker.service.ClientIdRegistry;
55
import com.ss.mqtt.broker.service.MqttSessionService;
66
import com.ss.mqtt.broker.service.PublishRetryService;
7+
import com.ss.mqtt.broker.service.SubscriptionService;
78
import org.jetbrains.annotations.NotNull;
89

910
public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {
1011

1112
public DefaultMqttClientReleaseHandler(
1213
@NotNull ClientIdRegistry clientIdRegistry,
1314
@NotNull MqttSessionService sessionService,
14-
@NotNull PublishRetryService publishRetryService
15+
@NotNull PublishRetryService publishRetryService,
16+
@NotNull SubscriptionService subscriptionService
1517
) {
16-
super(clientIdRegistry, sessionService, publishRetryService);
18+
super(clientIdRegistry, sessionService, publishRetryService, subscriptionService);
1719
}
1820
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
package com.ss.mqtt.broker.handler.packet.in;
22

3+
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
34
import static com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD;
45
import static com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID;
5-
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
66
import static com.ss.mqtt.broker.util.ReactorUtils.ifTrue;
77
import com.ss.mqtt.broker.exception.ConnectionRejectException;
88
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
9-
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
109
import com.ss.mqtt.broker.model.MqttSession;
10+
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
1111
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1212
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
13-
import com.ss.mqtt.broker.service.AuthenticationService;
14-
import com.ss.mqtt.broker.service.ClientIdRegistry;
15-
import com.ss.mqtt.broker.service.MqttSessionService;
16-
import com.ss.mqtt.broker.service.PublishRetryService;
13+
import com.ss.mqtt.broker.service.*;
1714
import com.ss.rlib.common.util.StringUtils;
1815
import lombok.RequiredArgsConstructor;
1916
import org.jetbrains.annotations.NotNull;
@@ -26,6 +23,7 @@ public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClie
2623
private final @NotNull AuthenticationService authenticationService;
2724
private final @NotNull MqttSessionService mqttSessionService;
2825
private final @NotNull PublishRetryService publishRetryService;
26+
private final @NotNull SubscriptionService subscriptionService;
2927

3028
@Override
3129
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -139,6 +137,8 @@ private Mono<Boolean> onConnected(
139137

140138
publishRetryService.register(client);
141139

140+
subscriptionService.restoreSubscriptions(client, session);
141+
142142
return Mono.just(Boolean.TRUE);
143143
}
144144

src/main/java/com/ss/mqtt/broker/handler/publish/in/AbstractPublishInHandler.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.QoS;
6+
import com.ss.mqtt.broker.model.Subscriber;
7+
import com.ss.mqtt.broker.network.client.MqttClient;
8+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
59
import com.ss.mqtt.broker.service.SubscriptionService;
610
import lombok.RequiredArgsConstructor;
711
import org.jetbrains.annotations.NotNull;
@@ -12,7 +16,30 @@ abstract class AbstractPublishInHandler implements PublishInHandler {
1216
protected final @NotNull SubscriptionService subscriptionService;
1317
protected final @NotNull PublishOutHandler[] publishOutHandlers;
1418

15-
protected @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
19+
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20+
handleResult(client, packet, subscriptionService.forEachTopicSubscriber(
21+
packet.getTopicName(),
22+
packet,
23+
this::sendToSubscriber
24+
));
25+
}
26+
27+
private @NotNull ActionResult sendToSubscriber(
28+
@NotNull Subscriber subscriber,
29+
@NotNull PublishInPacket packet
30+
) {
31+
return publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
32+
}
33+
34+
private @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
1635
return publishOutHandlers[qos.ordinal()];
1736
}
37+
38+
protected void handleResult(
39+
@NotNull MqttClient client,
40+
@NotNull PublishInPacket packet,
41+
@NotNull ActionResult result
42+
) {
43+
// nothing to do
44+
}
1845
}
Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4-
import com.ss.mqtt.broker.network.client.MqttClient;
5-
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
64
import com.ss.mqtt.broker.service.SubscriptionService;
75
import org.jetbrains.annotations.NotNull;
86

@@ -14,14 +12,4 @@ public Qos0PublishInHandler(
1412
) {
1513
super(subscriptionService, publishOutHandlers);
1614
}
17-
18-
@Override
19-
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20-
21-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
22-
23-
for (var subscriber : subscribers) {
24-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
25-
}
26-
}
2715
}

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos1PublishInHandler.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode;
56
import com.ss.mqtt.broker.network.client.MqttClient;
67
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
@@ -19,21 +20,33 @@ public Qos1PublishInHandler(
1920
@Override
2021
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
2122

22-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
23+
var session = client.getSession();
2324

24-
for (var subscriber : subscribers) {
25-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
25+
// it means this client was already closed
26+
if (session == null) {
27+
return;
2628
}
2729

28-
var reasonCode = subscribers.isEmpty() ?
29-
PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS : PublishAckReasonCode.SUCCESS;
30+
super.handle(client, packet);
31+
}
32+
33+
@Override
34+
protected void handleResult(
35+
@NotNull MqttClient client,
36+
@NotNull PublishInPacket packet,
37+
@NotNull ActionResult result
38+
) {
39+
40+
var reasonCode = switch (result) {
41+
case EMPTY -> PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
42+
case SUCCESS -> PublishAckReasonCode.SUCCESS;
43+
default -> PublishAckReasonCode.UNSPECIFIED_ERROR;
44+
};
3045

31-
var ackPacket = client.getPacketOutFactory().newPublishAck(
46+
client.send(client.getPacketOutFactory().newPublishAck(
3247
client,
3348
packet.getPacketId(),
3449
reasonCode
35-
);
36-
37-
client.send(ackPacket);
50+
));
3851
}
3952
}

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos2PublishInHandler.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.MqttSession;
56
import com.ss.mqtt.broker.model.reason.code.PublishCompletedReasonCode;
67
import com.ss.mqtt.broker.model.reason.code.PublishReceivedReasonCode;
@@ -38,20 +39,23 @@ public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet)
3839
}
3940
}
4041

41-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
42-
43-
for (var subscriber : subscribers) {
44-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
45-
}
42+
super.handle(client, packet);
43+
}
4644

47-
var reasonCode = subscribers.isEmpty() ?
48-
PublishReceivedReasonCode.NO_MATCHING_SUBSCRIBERS : PublishReceivedReasonCode.SUCCESS;
45+
@Override
46+
protected void handleResult(
47+
@NotNull MqttClient client,
48+
@NotNull PublishInPacket packet,
49+
@NotNull ActionResult result
50+
) {
4951

50-
var packetId = packet.getPacketId();
51-
session.registerInPublish(packet, this, packetId);
52+
var reasonCode = switch (result) {
53+
case EMPTY -> PublishReceivedReasonCode.NO_MATCHING_SUBSCRIBERS;
54+
case SUCCESS -> PublishReceivedReasonCode.SUCCESS;
55+
default -> PublishReceivedReasonCode.UNSPECIFIED_ERROR;
56+
};
5257

53-
var packetOutFactory = client.getPacketOutFactory();
54-
client.send(packetOutFactory.newPublishReceived(
58+
client.send(client.getPacketOutFactory().newPublishReceived(
5559
client,
5660
packet.getPacketId(),
5761
reasonCode

0 commit comments

Comments
 (0)