Skip to content

Commit 87a2952

Browse files
committed
Merge branch 'feature-broker-22' into feature-broker-25
# Conflicts: # src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java # src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java # src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java # src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java # src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos1PublishInHandler.java # src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos2PublishInHandler.java # src/main/java/com/ss/mqtt/broker/model/SubscribeTopicFilter.java
2 parents da788c0 + d1a25a1 commit 87a2952

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1103
-366
lines changed

build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,15 @@ allprojects {
8686
tasks.withType(Test) {
8787
maxParallelForks = 2
8888
forkEvery = 100
89+
jvmArgs += "--enable-preview"
90+
}
91+
92+
tasks.withType(JavaCompile) {
93+
options.compilerArgs += "--enable-preview"
94+
}
95+
96+
tasks.withType(GroovyCompile) {
97+
options.forkOptions.jvmArgs += "--enable-preview"
8998
}
9099

91100
processResources {

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ private interface ChannelFactory extends
101101
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(
102102
clientIdRegistry,
103103
authenticationService,
104-
mqttSessionService
104+
mqttSessionService,
105+
subscriptionService
105106
);
106107
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
107108
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
@@ -118,9 +119,14 @@ private interface ChannelFactory extends
118119
@Bean
119120
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
120121
@NotNull ClientIdRegistry clientIdRegistry,
121-
@NotNull MqttSessionService mqttSessionService
122+
@NotNull MqttSessionService mqttSessionService,
123+
@NotNull SubscriptionService subscriptionService
122124
) {
123-
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService);
125+
return new DefaultMqttClientReleaseHandler(
126+
clientIdRegistry,
127+
mqttSessionService,
128+
subscriptionService
129+
);
124130
}
125131

126132
@Bean
@@ -158,7 +164,7 @@ private interface ChannelFactory extends
158164

159165
@Bean
160166
@NotNull SubscriptionService subscriptionService() {
161-
return new SimpleSubscriptionService(new SimpleSubscriptions());
167+
return new SimpleSubscriptionService();
162168
}
163169

164170
@Bean
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.ss.mqtt.broker.exception;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public class InconsistentSubscriptionStateException extends RuntimeException {
6+
7+
public InconsistentSubscriptionStateException(@NotNull String message) {
8+
super(message);
9+
}
10+
11+
public InconsistentSubscriptionStateException(@NotNull Throwable cause) {
12+
super(cause);
13+
}
14+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
package com.ss.mqtt.broker.exception;
22

3+
import lombok.NoArgsConstructor;
4+
5+
@NoArgsConstructor
36
public class MalformedPacketMqttException extends MqttException {}

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
55
import com.ss.mqtt.broker.service.ClientIdRegistry;
66
import com.ss.mqtt.broker.service.MqttSessionService;
7+
import com.ss.mqtt.broker.service.SubscriptionService;
78
import com.ss.rlib.common.util.StringUtils;
89
import lombok.RequiredArgsConstructor;
910
import lombok.extern.log4j.Log4j2;
@@ -17,6 +18,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
1718

1819
private final @NotNull ClientIdRegistry clientIdRegistry;
1920
private final @NotNull MqttSessionService sessionService;
21+
private final @NotNull SubscriptionService subscriptionService;
2022

2123
@Override
2224
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
@@ -40,9 +42,12 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
4042

4143
Mono<?> asyncActions = null;
4244

43-
if (session != null && client.getConnectionConfig().isSessionsEnabled()) {
44-
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
45-
client.setSession(null);
45+
if (session != null) {
46+
subscriptionService.cleanSubscriptions(client, session);
47+
if (client.getConnectionConfig().isSessionsEnabled()) {
48+
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
49+
client.setSession(null);
50+
}
4651
}
4752

4853
if (asyncActions != null) {

src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
44
import com.ss.mqtt.broker.service.ClientIdRegistry;
55
import com.ss.mqtt.broker.service.MqttSessionService;
6+
import com.ss.mqtt.broker.service.SubscriptionService;
67
import org.jetbrains.annotations.NotNull;
78

89
public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {
910

1011
public DefaultMqttClientReleaseHandler(
1112
@NotNull ClientIdRegistry clientIdRegistry,
12-
@NotNull MqttSessionService sessionService
13+
@NotNull MqttSessionService sessionService,
14+
@NotNull SubscriptionService subscriptionService
1315
) {
14-
super(clientIdRegistry, sessionService);
16+
super(clientIdRegistry, sessionService, subscriptionService);
1517
}
1618
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
1111
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1212
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
13-
import com.ss.mqtt.broker.service.AuthenticationService;
14-
import com.ss.mqtt.broker.service.ClientIdRegistry;
15-
import com.ss.mqtt.broker.service.MqttSessionService;
13+
import com.ss.mqtt.broker.service.*;
1614
import com.ss.rlib.common.util.StringUtils;
1715
import lombok.RequiredArgsConstructor;
1816
import lombok.extern.log4j.Log4j2;
@@ -26,6 +24,7 @@ public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClie
2624
private final @NotNull ClientIdRegistry clientIdRegistry;
2725
private final @NotNull AuthenticationService authenticationService;
2826
private final @NotNull MqttSessionService mqttSessionService;
27+
private final @NotNull SubscriptionService subscriptionService;
2928

3029
@Override
3130
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -137,6 +136,8 @@ private Mono<Boolean> onConnected(
137136
packet.getReceiveMax()
138137
);
139138

139+
subscriptionService.restoreSubscriptions(client, session);
140+
140141
return Mono.fromFuture(client.sendWithFeedback(response)
141142
.thenApply(result -> {
142143

src/main/java/com/ss/mqtt/broker/handler/publish/in/AbstractPublishInHandler.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.QoS;
6+
import com.ss.mqtt.broker.model.Subscriber;
7+
import com.ss.mqtt.broker.network.client.MqttClient;
8+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
59
import com.ss.mqtt.broker.service.SubscriptionService;
610
import lombok.RequiredArgsConstructor;
711
import org.jetbrains.annotations.NotNull;
@@ -12,7 +16,30 @@ abstract class AbstractPublishInHandler implements PublishInHandler {
1216
protected final @NotNull SubscriptionService subscriptionService;
1317
protected final @NotNull PublishOutHandler[] publishOutHandlers;
1418

15-
protected @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
19+
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20+
handleResult(client, packet, subscriptionService.forEachTopicSubscriber(
21+
packet.getTopicName(),
22+
packet,
23+
this::sendToSubscriber
24+
));
25+
}
26+
27+
private @NotNull ActionResult sendToSubscriber(
28+
@NotNull Subscriber subscriber,
29+
@NotNull PublishInPacket packet
30+
) {
31+
return publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
32+
}
33+
34+
private @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
1635
return publishOutHandlers[qos.ordinal()];
1736
}
37+
38+
protected void handleResult(
39+
@NotNull MqttClient client,
40+
@NotNull PublishInPacket packet,
41+
@NotNull ActionResult result
42+
) {
43+
// nothing to do
44+
}
1845
}
Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4-
import com.ss.mqtt.broker.network.client.MqttClient;
5-
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
64
import com.ss.mqtt.broker.service.SubscriptionService;
75
import org.jetbrains.annotations.NotNull;
86

@@ -14,14 +12,4 @@ public Qos0PublishInHandler(
1412
) {
1513
super(subscriptionService, publishOutHandlers);
1614
}
17-
18-
@Override
19-
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20-
21-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
22-
23-
for (var subscriber : subscribers) {
24-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
25-
}
26-
}
2715
}

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos1PublishInHandler.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode;
56
import com.ss.mqtt.broker.network.client.MqttClient;
67
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
@@ -19,20 +20,40 @@ public Qos1PublishInHandler(
1920
@Override
2021
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
2122

22-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
23+
var session = client.getSession();
2324

24-
for (var subscriber : subscribers) {
25-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
25+
// it means this client was already closed
26+
if (session == null) {
27+
return;
2628
}
2729

28-
var reasonCode = subscribers.isEmpty() ?
29-
PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS : PublishAckReasonCode.SUCCESS;
30+
super.handle(client, packet);
31+
}
32+
33+
@Override
34+
protected void handleResult(
35+
@NotNull MqttClient client,
36+
@NotNull PublishInPacket packet,
37+
@NotNull ActionResult result
38+
) {
3039

31-
var ackPacket = client.getPacketOutFactory().newPublishAck(
40+
PublishAckReasonCode reasonCode;
41+
42+
switch (result) {
43+
case EMPTY:
44+
reasonCode = PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
45+
break;
46+
case SUCCESS:
47+
reasonCode = PublishAckReasonCode.SUCCESS;
48+
break;
49+
default:
50+
reasonCode = PublishAckReasonCode.UNSPECIFIED_ERROR;
51+
break;
52+
}
53+
54+
client.send(client.getPacketOutFactory().newPublishAck(
3255
packet.getPacketId(),
3356
reasonCode
34-
);
35-
36-
client.send(ackPacket);
57+
));
3758
}
3859
}

0 commit comments

Comments
 (0)