Skip to content

Commit a7ba024

Browse files
committed
continue working on reorganazing network
1 parent 2b861dc commit a7ba024

File tree

39 files changed

+728
-115
lines changed

39 files changed

+728
-115
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@
1212
import javasabr.mqtt.network.handler.PublishInHandler;
1313
import javasabr.mqtt.service.AuthenticationService;
1414
import javasabr.mqtt.service.ClientIdRegistry;
15+
import javasabr.mqtt.service.ConnectionService;
1516
import javasabr.mqtt.service.CredentialSource;
16-
import javasabr.mqtt.service.MqttConnectionService;
1717
import javasabr.mqtt.service.MqttSessionService;
18+
import javasabr.mqtt.service.PublishDeliveringService;
19+
import javasabr.mqtt.service.PublishReceivingService;
1820
import javasabr.mqtt.service.PublishingService;
1921
import javasabr.mqtt.service.SubscriptionService;
2022
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
23+
import javasabr.mqtt.service.impl.DefaultConnectionService;
2124
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
22-
import javasabr.mqtt.service.impl.DefaultMqttConnectionService;
25+
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
2326
import javasabr.mqtt.service.impl.DefaultPublishingService;
2427
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
2528
import javasabr.mqtt.service.impl.FileCredentialsSource;
@@ -30,12 +33,17 @@
3033
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
3134
import javasabr.mqtt.service.message.handler.impl.ConnectInMqttInMessageHandler;
3235
import javasabr.mqtt.service.message.handler.impl.DisconnectMqttInMessageHandler;
33-
import javasabr.mqtt.service.message.handler.impl.PendingResponseMqttInMessageHandler;
3436
import javasabr.mqtt.service.message.handler.impl.PublishAckMqttInMessageHandler;
3537
import javasabr.mqtt.service.message.handler.impl.PublishCompleteMqttInMessageHandler;
3638
import javasabr.mqtt.service.message.handler.impl.PublishMqttInMessageHandler;
3739
import javasabr.mqtt.service.message.handler.impl.PublishReceiveMqttInMessageHandler;
3840
import javasabr.mqtt.service.message.handler.impl.PublishReleaseMqttInMessageHandler;
41+
import javasabr.mqtt.service.message.handler.impl.SubscribeMqttInMessageHandler;
42+
import javasabr.mqtt.service.message.handler.impl.UnsubscribeMqttInMessageHandler;
43+
import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler;
44+
import javasabr.mqtt.service.publish.handler.impl.Qos0PublishInMessageHandler;
45+
import javasabr.mqtt.service.publish.handler.impl.Qos1PublishInMessageHandler;
46+
import javasabr.mqtt.service.publish.handler.impl.Qos2PublishInMessageHandler;
3947
import javasabr.rlib.network.NetworkFactory;
4048
import javasabr.rlib.network.ServerNetworkConfig;
4149
import javasabr.rlib.network.server.ServerNetwork;
@@ -133,8 +141,50 @@ MqttInMessageHandler disconnectMqttInMessageHandler() {
133141
}
134142

135143
@Bean
136-
MqttConnectionService mqttConnectionService(Collection<? extends MqttInMessageHandler> inMessageHandlers) {
137-
return new DefaultMqttConnectionService(inMessageHandlers);
144+
MqttInMessageHandler subscribeMqttInMessageHandler(SubscriptionService subscriptionService) {
145+
return new SubscribeMqttInMessageHandler(subscriptionService);
146+
}
147+
148+
@Bean
149+
MqttInMessageHandler unsubscribeMqttInMessageHandler(SubscriptionService subscriptionService) {
150+
return new UnsubscribeMqttInMessageHandler(subscriptionService);
151+
}
152+
153+
@Bean
154+
ConnectionService mqttConnectionService(Collection<? extends MqttInMessageHandler> inMessageHandlers) {
155+
return new DefaultConnectionService(inMessageHandlers);
156+
}
157+
158+
@Bean
159+
PublishDeliveringService publishDeliveringService() {
160+
return null;
161+
}
162+
163+
@Bean
164+
MqttPublishInMessageHandler qos0PublishInMessageHandler(
165+
SubscriptionService subscriptionService,
166+
PublishDeliveringService publishDeliveringService) {
167+
return new Qos0PublishInMessageHandler(subscriptionService, publishDeliveringService);
168+
}
169+
170+
@Bean
171+
MqttPublishInMessageHandler qos1PublishInMessageHandler(
172+
SubscriptionService subscriptionService,
173+
PublishDeliveringService publishDeliveringService) {
174+
return new Qos1PublishInMessageHandler(subscriptionService, publishDeliveringService);
175+
}
176+
177+
@Bean
178+
MqttPublishInMessageHandler qos2PublishInMessageHandler(
179+
SubscriptionService subscriptionService,
180+
PublishDeliveringService publishDeliveringService) {
181+
return new Qos2PublishInMessageHandler(subscriptionService, publishDeliveringService);
182+
}
183+
184+
@Bean
185+
PublishReceivingService publishReceivingService(
186+
Collection<? extends MqttPublishInMessageHandler> knownPublishInHandlers) {
187+
return new DefaultPublishReceivingService(knownPublishInHandlers);
138188
}
139189

140190
@Bean
@@ -250,11 +300,11 @@ ServerNetwork<MqttConnection> externalNetwork(
250300
@Bean
251301
ApplicationListener<ApplicationStartedEvent> externalNetworkStarter(
252302
ServerNetwork<MqttConnection> externalNetwork,
253-
MqttConnectionService mqttConnectionService,
303+
ConnectionService connectionService,
254304
InetSocketAddress externalNetworkAddress) {
255305
return _ -> {
256306
externalNetwork.start(externalNetworkAddress);
257-
externalNetwork.onAccept(mqttConnectionService::processAcceptedConnection);
307+
externalNetwork.onAccept(connectionService::processAcceptedConnection);
258308
log.info(externalNetworkAddress, "Started external MQTT network by address:[%s]"::formatted);
259309
};
260310
}
Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
package javasabr.mqtt.model;
22

33
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
4+
import lombok.AccessLevel;
45
import lombok.Getter;
56
import lombok.RequiredArgsConstructor;
7+
import lombok.experimental.Accessors;
8+
import lombok.experimental.FieldDefaults;
69

710
@Getter
811
@RequiredArgsConstructor
12+
@Accessors(fluent = true, chain = false)
13+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
914
public enum QoS {
10-
AT_MOST_ONCE(SubscribeAckReasonCode.GRANTED_QOS_0),
11-
AT_LEAST_ONCE(SubscribeAckReasonCode.GRANTED_QOS_1),
12-
EXACTLY_ONCE(SubscribeAckReasonCode.GRANTED_QOS_2),
13-
INVALID(SubscribeAckReasonCode.IMPLEMENTATION_SPECIFIC_ERROR);
15+
AT_MOST_ONCE(0, SubscribeAckReasonCode.GRANTED_QOS_0),
16+
AT_LEAST_ONCE(1, SubscribeAckReasonCode.GRANTED_QOS_1),
17+
EXACTLY_ONCE(2, SubscribeAckReasonCode.GRANTED_QOS_2),
18+
INVALID(3, SubscribeAckReasonCode.IMPLEMENTATION_SPECIFIC_ERROR);
1419

1520
private static final QoS[] VALUES = values();
1621

@@ -22,5 +27,6 @@ public static QoS of(int level) {
2227
}
2328
}
2429

25-
private final SubscribeAckReasonCode subscribeAckReasonCode;
30+
int index;
31+
SubscribeAckReasonCode subscribeAckReasonCode;
2632
}

model/src/main/java/javasabr/mqtt/model/reason/code/PublishAckReasonCode.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,16 @@
22

33
import java.util.stream.Stream;
44
import javasabr.rlib.common.util.ObjectUtils;
5+
import lombok.AccessLevel;
56
import lombok.Getter;
67
import lombok.RequiredArgsConstructor;
8+
import lombok.experimental.Accessors;
9+
import lombok.experimental.FieldDefaults;
710

11+
@Getter
812
@RequiredArgsConstructor
13+
@Accessors(fluent = true, chain = false)
14+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
915
public enum PublishAckReasonCode {
1016
/**
1117
* The message is accepted. Publication of the QoS 1 message proceeds.
@@ -54,14 +60,14 @@ public enum PublishAckReasonCode {
5460

5561
static {
5662

57-
var maxId = Stream
63+
int maxValue = Stream
5864
.of(values())
59-
.mapToInt(PublishAckReasonCode::getValue)
65+
.mapToInt(PublishAckReasonCode::value)
6066
.map(value -> Byte.toUnsignedInt((byte) value))
6167
.max()
6268
.orElse(0);
6369

64-
var values = new PublishAckReasonCode[maxId + 1];
70+
var values = new PublishAckReasonCode[maxValue + 1];
6571

6672
for (var value : values()) {
6773
values[Byte.toUnsignedInt(value.value)] = value;
@@ -70,13 +76,12 @@ public enum PublishAckReasonCode {
7076
VALUES = values;
7177
}
7278

73-
public static PublishAckReasonCode of(int index) {
79+
public static PublishAckReasonCode ofValue(int value) {
7480
return ObjectUtils.notNull(
75-
VALUES[index],
76-
index,
81+
VALUES[value],
82+
value,
7783
arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg));
7884
}
7985

80-
@Getter
81-
private final byte value;
86+
byte value;
8287
}

model/src/main/java/javasabr/mqtt/model/reason/code/PublishReceivedReasonCode.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@
22

33
import java.util.stream.Stream;
44
import javasabr.rlib.common.util.ObjectUtils;
5+
import lombok.AccessLevel;
56
import lombok.Getter;
67
import lombok.RequiredArgsConstructor;
8+
import lombok.experimental.Accessors;
9+
import lombok.experimental.FieldDefaults;
710

11+
@Getter
812
@RequiredArgsConstructor
13+
@Accessors(fluent = true, chain = false)
14+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
915
public enum PublishReceivedReasonCode {
1016

1117
/**
12-
* The message is accepted. Publication of the QoS 2 message proceeds..
18+
* The message is accepted. Publication of the QoS 2 message proceeds...
1319
*/
1420
SUCCESS((byte) 0x00),
1521
/**
@@ -52,14 +58,14 @@ public enum PublishReceivedReasonCode {
5258

5359
static {
5460

55-
var maxId = Stream
61+
int maxValue = Stream
5662
.of(values())
57-
.mapToInt(PublishReceivedReasonCode::getValue)
63+
.mapToInt(PublishReceivedReasonCode::value)
5864
.map(value -> Byte.toUnsignedInt((byte) value))
5965
.max()
6066
.orElse(0);
6167

62-
var values = new PublishReceivedReasonCode[maxId + 1];
68+
var values = new PublishReceivedReasonCode[maxValue + 1];
6369

6470
for (var value : values()) {
6571
values[Byte.toUnsignedInt(value.value)] = value;
@@ -68,13 +74,12 @@ public enum PublishReceivedReasonCode {
6874
VALUES = values;
6975
}
7076

71-
public static PublishReceivedReasonCode of(int index) {
77+
public static PublishReceivedReasonCode ofValue(int index) {
7278
return ObjectUtils.notNull(
7379
VALUES[index],
7480
index,
7581
arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg));
7682
}
7783

78-
@Getter
79-
private final byte value;
84+
byte value;
8085
}

network/src/main/java/javasabr/mqtt/network/MqttSession.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ interface UnsafeMqttSession extends MqttSession {
1919
void onRestored();
2020
}
2121

22-
interface PendingPacketHandler {
22+
interface PendingMessageHandler {
2323

2424
/**
2525
* @return true if pending packet can be removed.
@@ -48,9 +48,9 @@ default void resend(MqttClient client, PublishInPacket packet, int packetId) {}
4848

4949
boolean hasOutPending(int packetId);
5050

51-
void registerOutPublish(PublishInPacket publish, PendingPacketHandler handler, int packetId);
51+
void registerOutPublish(PublishInPacket publish, PendingMessageHandler handler, int packetId);
5252

53-
void registerInPublish(PublishInPacket publish, PendingPacketHandler handler, int packetId);
53+
void registerInPublish(PublishInPacket publish, PendingMessageHandler handler, int packetId);
5454

5555
void updateOutPendingPacket(MqttClient client, HasPacketId response);
5656

network/src/main/java/javasabr/mqtt/network/impl/DefaultMqttSession.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ public class DefaultMqttSession implements UnsafeMqttSession {
3030
@AllArgsConstructor
3131
private static class PendingPublish {
3232
private final PublishInPacket publish;
33-
private final PendingPacketHandler handler;
33+
private final PendingMessageHandler handler;
3434
private final int packetId;
3535
}
3636

3737
private static void registerPublish(
3838
PublishInPacket publish,
39-
PendingPacketHandler handler,
39+
PendingMessageHandler handler,
4040
int packetId,
4141
LockableArray<PendingPublish> pendingPublishes) {
4242
PendingPublish pendingPublish = new PendingPublish(publish, handler, packetId);
@@ -114,12 +114,12 @@ public String clientId() {
114114
}
115115

116116
@Override
117-
public void registerOutPublish(PublishInPacket publish, PendingPacketHandler handler, int packetId) {
117+
public void registerOutPublish(PublishInPacket publish, PendingMessageHandler handler, int packetId) {
118118
registerPublish(publish, handler, packetId, pendingOutPublishes);
119119
}
120120

121121
@Override
122-
public void registerInPublish(PublishInPacket publish, PendingPacketHandler handler, int packetId) {
122+
public void registerInPublish(PublishInPacket publish, PendingMessageHandler handler, int packetId) {
123123
registerPublish(publish, handler, packetId, pendingInPublishes);
124124
}
125125

@@ -165,7 +165,7 @@ public void resendPendingPackets(MqttClient mqttClient) {
165165
.iterations()
166166
.forEach(
167167
mqttClient, (pendingPublish, client) -> {
168-
PendingPacketHandler handler = pendingPublish.handler;
168+
PendingMessageHandler handler = pendingPublish.handler;
169169
handler.resend(client, pendingPublish.publish, pendingPublish.packetId);
170170
});
171171
} finally {

network/src/main/java/javasabr/mqtt/network/packet/in/PublishAckInPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer)
7272

7373
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901123
7474
if (connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining()) {
75-
reasonCode = PublishAckReasonCode.of(readByteUnsigned(buffer));
75+
reasonCode = PublishAckReasonCode.ofValue(readByteUnsigned(buffer));
7676
}
7777
}
7878

network/src/main/java/javasabr/mqtt/network/packet/in/PublishReceivedInPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer)
6969

7070
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901143
7171
if (connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining()) {
72-
reasonCode = PublishReceivedReasonCode.of(readByteUnsigned(buffer));
72+
reasonCode = PublishReceivedReasonCode.ofValue(readByteUnsigned(buffer));
7373
}
7474
}
7575

service/src/main/java/javasabr/mqtt/service/MqttConnectionService.java renamed to service/src/main/java/javasabr/mqtt/service/ConnectionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import javasabr.mqtt.network.MqttConnection;
44

5-
public interface MqttConnectionService {
5+
public interface ConnectionService {
66

77
void processAcceptedConnection(MqttConnection connection);
88
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package javasabr.mqtt.service;
2+
3+
import javasabr.mqtt.model.ActionResult;
4+
import javasabr.mqtt.model.subscriber.SingleSubscriber;
5+
import javasabr.mqtt.network.packet.in.PublishInPacket;
6+
7+
public interface PublishDeliveringService {
8+
9+
ActionResult startDelivering(PublishInPacket publish, SingleSubscriber subscriber);
10+
}

0 commit comments

Comments
 (0)