Skip to content

Commit 6b7f3d1

Browse files
committed
continue rework network part
1 parent d7fd657 commit 6b7f3d1

31 files changed

+313
-205
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerConfig.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
import javasabr.mqtt.service.AuthenticationService;
55
import javasabr.mqtt.service.ClientIdRegistry;
66
import javasabr.mqtt.service.CredentialSource;
7-
import javasabr.mqtt.service.MqttSessionService;
7+
import javasabr.mqtt.service.SessionService;
88
import javasabr.mqtt.service.SubscriptionService;
99
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
1010
import javasabr.mqtt.service.impl.FileCredentialsSource;
1111
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
12-
import javasabr.mqtt.service.impl.InMemoryMqttSessionService;
12+
import javasabr.mqtt.service.impl.InMemorySessionService;
1313
import javasabr.mqtt.service.impl.SimpleAuthenticationService;
1414
import javasabr.mqtt.service.impl.SimpleSubscriptionService;
1515
import lombok.RequiredArgsConstructor;
@@ -35,8 +35,8 @@ ClientIdRegistry clientIdRegistry() {
3535
}
3636

3737
@Bean
38-
MqttSessionService mqttSessionService() {
39-
return new InMemoryMqttSessionService(env.getProperty("sessions.clean.thread.interval", int.class, 60000));
38+
SessionService mqttSessionService() {
39+
return new InMemorySessionService(env.getProperty("sessions.clean.thread.interval", int.class, 60000));
4040
}
4141

4242
@Bean
@@ -54,9 +54,9 @@ AuthenticationService authenticationService(CredentialSource credentialSource) {
5454
@Bean
5555
MqttClientReleaseHandler mqttClientReleaseHandler(
5656
ClientIdRegistry clientIdRegistry,
57-
MqttSessionService mqttSessionService,
57+
SessionService sessionService,
5858
SubscriptionService subscriptionService) {
59-
return new ExternalMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, subscriptionService);
59+
return new ExternalMqttClientReleaseHandler(clientIdRegistry, sessionService, subscriptionService);
6060
}
6161

6262
@Bean

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,21 @@
1313
import javasabr.mqtt.service.ClientIdRegistry;
1414
import javasabr.mqtt.service.ConnectionService;
1515
import javasabr.mqtt.service.CredentialSource;
16-
import javasabr.mqtt.service.MqttSessionService;
16+
import javasabr.mqtt.service.MessageOutFactoryService;
17+
import javasabr.mqtt.service.SessionService;
1718
import javasabr.mqtt.service.PublishDeliveringService;
1819
import javasabr.mqtt.service.PublishReceivingService;
1920
import javasabr.mqtt.service.SubscriptionService;
2021
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
2122
import javasabr.mqtt.service.impl.DefaultConnectionService;
2223
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
24+
import javasabr.mqtt.service.impl.DefaultMessageOutFactoryService;
2325
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
2426
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
2527
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
2628
import javasabr.mqtt.service.impl.FileCredentialsSource;
2729
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
28-
import javasabr.mqtt.service.impl.InMemoryMqttSessionService;
30+
import javasabr.mqtt.service.impl.InMemorySessionService;
2931
import javasabr.mqtt.service.impl.SimpleAuthenticationService;
3032
import javasabr.mqtt.service.impl.SimpleSubscriptionService;
3133
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
@@ -38,6 +40,9 @@
3840
import javasabr.mqtt.service.message.handler.impl.PublishReleaseMqttInMessageHandler;
3941
import javasabr.mqtt.service.message.handler.impl.SubscribeMqttInMessageHandler;
4042
import javasabr.mqtt.service.message.handler.impl.UnsubscribeMqttInMessageHandler;
43+
import javasabr.mqtt.service.message.out.factory.Mqtt311MessageOutFactory;
44+
import javasabr.mqtt.service.message.out.factory.Mqtt5MessageOutFactory;
45+
import javasabr.mqtt.service.message.out.factory.MessageOutFactory;
4146
import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler;
4247
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
4348
import javasabr.mqtt.service.publish.handler.impl.Qos0MqttPublishInMessageHandler;
@@ -71,9 +76,9 @@ ClientIdRegistry clientIdRegistry(Environment env) {
7176
}
7277

7378
@Bean
74-
MqttSessionService mqttSessionService(
79+
SessionService mqttSessionService(
7580
@Value("${sessions.clean.thread.interval:60000}") int cleanInterval) {
76-
return new InMemoryMqttSessionService(cleanInterval);
81+
return new InMemorySessionService(cleanInterval);
7782
}
7883

7984
@Bean
@@ -94,17 +99,35 @@ SubscriptionService subscriptionService() {
9499
return new SimpleSubscriptionService();
95100
}
96101

102+
@Bean
103+
MessageOutFactory mqtt311MessageOutFactory() {
104+
return new Mqtt311MessageOutFactory();
105+
}
106+
107+
@Bean
108+
MessageOutFactory mqtt5MessageOutFactory() {
109+
return new Mqtt5MessageOutFactory();
110+
}
111+
112+
@Bean
113+
MessageOutFactoryService mqttMessageOutFactoryService(
114+
Collection<? extends MessageOutFactory> knownFactories) {
115+
return new DefaultMessageOutFactoryService(knownFactories);
116+
}
117+
97118
@Bean
98119
MqttInMessageHandler connectInMqttInMessageHandler(
99120
ClientIdRegistry clientIdRegistry,
100121
AuthenticationService authenticationService,
101-
MqttSessionService mqttSessionService,
102-
SubscriptionService subscriptionService) {
122+
SessionService sessionService,
123+
SubscriptionService subscriptionService,
124+
MessageOutFactoryService messageOutFactoryService) {
103125
return new ConnectInMqttInMessageHandler(
104126
clientIdRegistry,
105127
authenticationService,
106-
mqttSessionService,
107-
subscriptionService);
128+
sessionService,
129+
subscriptionService,
130+
messageOutFactoryService);
108131
}
109132

110133
@Bean
@@ -138,13 +161,17 @@ MqttInMessageHandler disconnectMqttInMessageHandler() {
138161
}
139162

140163
@Bean
141-
MqttInMessageHandler subscribeMqttInMessageHandler(SubscriptionService subscriptionService) {
142-
return new SubscribeMqttInMessageHandler(subscriptionService);
164+
MqttInMessageHandler subscribeMqttInMessageHandler(
165+
SubscriptionService subscriptionService,
166+
MessageOutFactoryService messageOutFactoryService) {
167+
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService);
143168
}
144169

145170
@Bean
146-
MqttInMessageHandler unsubscribeMqttInMessageHandler(SubscriptionService subscriptionService) {
147-
return new UnsubscribeMqttInMessageHandler(subscriptionService);
171+
MqttInMessageHandler unsubscribeMqttInMessageHandler(
172+
SubscriptionService subscriptionService,
173+
MessageOutFactoryService messageOutFactoryService) {
174+
return new UnsubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService);
148175
}
149176

150177
@Bean
@@ -153,18 +180,24 @@ ConnectionService mqttConnectionService(Collection<? extends MqttInMessageHandle
153180
}
154181

155182
@Bean
156-
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(SubscriptionService subscriptionService) {
157-
return new Qos0MqttPublishOutMessageHandler(subscriptionService);
183+
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(
184+
SubscriptionService subscriptionService,
185+
MessageOutFactoryService messageOutFactoryService) {
186+
return new Qos0MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
158187
}
159188

160189
@Bean
161-
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(SubscriptionService subscriptionService) {
162-
return new Qos1MqttPublishOutMessageHandler(subscriptionService);
190+
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(
191+
SubscriptionService subscriptionService,
192+
MessageOutFactoryService messageOutFactoryService) {
193+
return new Qos1MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
163194
}
164195

165196
@Bean
166-
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(SubscriptionService subscriptionService) {
167-
return new Qos2MqttPublishOutMessageHandler(subscriptionService);
197+
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(
198+
SubscriptionService subscriptionService,
199+
MessageOutFactoryService messageOutFactoryService) {
200+
return new Qos2MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
168201
}
169202

170203
@Bean
@@ -183,15 +216,23 @@ MqttPublishInMessageHandler qos0MqttPublishInMessageHandler(
183216
@Bean
184217
MqttPublishInMessageHandler qos1MqttPublishInMessageHandler(
185218
SubscriptionService subscriptionService,
186-
PublishDeliveringService publishDeliveringService) {
187-
return new Qos1MqttPublishInMessageHandler(subscriptionService, publishDeliveringService);
219+
PublishDeliveringService publishDeliveringService,
220+
MessageOutFactoryService messageOutFactoryService) {
221+
return new Qos1MqttPublishInMessageHandler(
222+
subscriptionService,
223+
publishDeliveringService,
224+
messageOutFactoryService);
188225
}
189226

190227
@Bean
191228
MqttPublishInMessageHandler qos2MqttPublishInMessageHandler(
192229
SubscriptionService subscriptionService,
193-
PublishDeliveringService publishDeliveringService) {
194-
return new Qos2MqttPublishInMessageHandler(subscriptionService, publishDeliveringService);
230+
PublishDeliveringService publishDeliveringService,
231+
MessageOutFactoryService messageOutFactoryService) {
232+
return new Qos2MqttPublishInMessageHandler(
233+
subscriptionService,
234+
publishDeliveringService,
235+
messageOutFactoryService);
195236
}
196237

197238
@Bean
@@ -203,9 +244,9 @@ PublishReceivingService publishReceivingService(
203244
@Bean
204245
MqttClientReleaseHandler externalMqttClientReleaseHandler(
205246
ClientIdRegistry clientIdRegistry,
206-
MqttSessionService mqttSessionService,
247+
SessionService sessionService,
207248
SubscriptionService subscriptionService) {
208-
return new ExternalMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, subscriptionService);
249+
return new ExternalMqttClientReleaseHandler(clientIdRegistry, sessionService, subscriptionService);
209250
}
210251

211252
@Bean

application/src/main/java/javasabr/mqtt/broker/application/config/MqttNetworkConfig.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -128,24 +128,6 @@ InetSocketAddress internalNetworkAddress(
128128
return address;
129129
}
130130

131-
@Bean
132-
Consumer<MqttConnection> externalConnectionConsumer() {
133-
return mqttConnection -> {
134-
log.info(mqttConnection.remoteAddress(), "[%s] Accepted external connection"::formatted);
135-
var client = (UnsafeMqttClient) mqttConnection.client();
136-
mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet));
137-
};
138-
}
139-
140-
@Bean
141-
Consumer<MqttConnection> internalConnectionConsumer() {
142-
return mqttConnection -> {
143-
log.info(mqttConnection.remoteAddress(), "[%s] Accepted internal connection"::formatted);
144-
var client = (UnsafeMqttClient) mqttConnection.client();
145-
mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet));
146-
};
147-
}
148-
149131
@Bean
150132
MqttServerConnectionConfig externalConnectionConfig() {
151133
return new MqttServerConnectionConfig(

application/src/test/groovy/javasabr/mqtt/application/integration/PublishRetryTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ import javasabr.mqtt.network.packet.in.PublishInPacket
1212
import javasabr.mqtt.network.packet.in.PublishReleaseInPacket
1313
import javasabr.mqtt.network.packet.in.SubscribeAckInPacket
1414
import javasabr.mqtt.network.packet.out.*
15-
import javasabr.mqtt.service.MqttSessionService
15+
import javasabr.mqtt.service.SessionService
1616
import javasabr.rlib.collections.array.Array
1717
import org.springframework.beans.factory.annotation.Autowired
1818

1919
class PublishRetryTest extends IntegrationSpecification {
2020

2121
@Autowired
22-
MqttSessionService mqttSessionService
22+
SessionService mqttSessionService
2323

2424
def "mqtt 3.1.1 client should be generate session with one pending QoS 1 packet"() {
2525
given:
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@ package javasabr.mqtt.application.integration.service
33
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode
44
import javasabr.mqtt.application.integration.IntegrationSpecification
55
import javasabr.mqtt.service.ClientIdRegistry
6-
import javasabr.mqtt.service.MqttSessionService
6+
import javasabr.mqtt.service.SessionService
77
import org.springframework.beans.factory.annotation.Autowired
88

9-
class MqttSessionServiceTest extends IntegrationSpecification {
9+
class SessionServiceTest extends IntegrationSpecification {
1010

1111
@Autowired
1212
ClientIdRegistry clientIdRegistry
1313

1414
@Autowired
15-
MqttSessionService mqttSessionService
15+
SessionService mqttSessionService
1616

1717
def "subscriber should create and re-use mqtt session"() {
1818
given:

network/src/main/java/javasabr/mqtt/network/MqttClient.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
import java.util.concurrent.CompletableFuture;
44
import javasabr.mqtt.model.MqttClientConnectionConfig;
55
import javasabr.mqtt.model.MqttUser;
6-
import javasabr.mqtt.model.reason.code.ConnectAckReasonCode;
7-
import javasabr.mqtt.network.out.MqttPacketOutFactory;
8-
import javasabr.mqtt.network.packet.in.MqttReadablePacket;
6+
import javasabr.mqtt.network.packet.out.ConnectAck311OutPacket;
97
import javasabr.mqtt.network.packet.out.MqttWritablePacket;
108
import org.jspecify.annotations.Nullable;
119
import reactor.core.publisher.Mono;
@@ -16,19 +14,15 @@ interface UnsafeMqttClient extends MqttClient {
1614

1715
MqttConnection connection();
1816

19-
void handle(MqttReadablePacket packet);
20-
2117
void clientId(String clientId);
2218

2319
void session(@Nullable MqttSession session);
2420

25-
void reject(ConnectAckReasonCode reasonCode);
21+
void reject(ConnectAck311OutPacket connectAsk);
2622

2723
Mono<?> release();
2824
}
2925

30-
MqttPacketOutFactory packetOutFactory();
31-
3226
String clientId();
3327

3428
@Nullable

network/src/main/java/javasabr/mqtt/network/client/AbstractMqttClient.java

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,11 @@
44
import java.util.concurrent.atomic.AtomicBoolean;
55
import javasabr.mqtt.base.utils.DebugUtils;
66
import javasabr.mqtt.model.MqttClientConnectionConfig;
7-
import javasabr.mqtt.model.reason.code.ConnectAckReasonCode;
87
import javasabr.mqtt.network.MqttClient.UnsafeMqttClient;
98
import javasabr.mqtt.network.MqttConnection;
109
import javasabr.mqtt.network.MqttSession;
1110
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
12-
import javasabr.mqtt.network.handler.PacketInHandler;
13-
import javasabr.mqtt.network.out.MqttPacketOutFactories;
14-
import javasabr.mqtt.network.out.MqttPacketOutFactory;
15-
import javasabr.mqtt.network.packet.in.MqttReadablePacket;
11+
import javasabr.mqtt.network.packet.out.ConnectAck311OutPacket;
1612
import javasabr.mqtt.network.packet.out.MqttWritablePacket;
1713
import lombok.AccessLevel;
1814
import lombok.CustomLog;
@@ -52,17 +48,6 @@ public AbstractMqttClient(MqttConnection connection, MqttClientReleaseHandler re
5248
this.clientId = connection.remoteAddress();
5349
}
5450

55-
@Override
56-
public void handle(MqttReadablePacket packet) {
57-
log.debug(clientId, packet.name(), packet, "[%s] Received packet:[%s] %s"::formatted);
58-
PacketInHandler packetHandler = null;//connection.packetHandlers()[packet.packetType()];
59-
if (packetHandler != null) {
60-
packetHandler.handle(this, packet);
61-
} else {
62-
log.warning(clientId, packet.name(), packet, "[%s] No packet handler for packet:[%s] %s"::formatted);
63-
}
64-
}
65-
6651
@Override
6752
public void send(MqttWritablePacket packet) {
6853
log.debug(clientId, packet.name(), packet, "[%s] Send to client packet:[%s] %s"::formatted);
@@ -75,19 +60,13 @@ public CompletableFuture<Boolean> sendWithFeedback(MqttWritablePacket packet) {
7560
return connection.sendWithFeedback(packet);
7661
}
7762

78-
public void reject(ConnectAckReasonCode reasonCode) {
63+
@Override
64+
public void reject(ConnectAck311OutPacket connectAsk) {
7965
connection
80-
.sendWithFeedback(packetOutFactory().newConnectAck(this, reasonCode))
66+
.sendWithFeedback(connectAsk)
8167
.thenAccept(_ -> connection.close());
8268
}
8369

84-
@Override
85-
public MqttPacketOutFactory packetOutFactory() {
86-
return MqttPacketOutFactories.of(connection
87-
.clientConnectionConfig()
88-
.mqttVersion());
89-
}
90-
9170
@Override
9271
public Mono<?> release() {
9372
if (released.compareAndSet(false, true)) {

network/src/main/java/javasabr/mqtt/network/handler/PacketInHandler.java

Lines changed: 0 additions & 11 deletions
This file was deleted.

network/src/main/java/javasabr/mqtt/network/handler/client/package-info.java

Lines changed: 0 additions & 4 deletions
This file was deleted.

network/src/main/java/javasabr/mqtt/network/handler/packet/in/package-info.java

Lines changed: 0 additions & 4 deletions
This file was deleted.

0 commit comments

Comments
 (0)