Skip to content

Commit 2b861dc

Browse files
committed
continue working on reorganazing network
1 parent 38c5e45 commit 2b861dc

File tree

10 files changed

+171
-1
lines changed

10 files changed

+171
-1
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,18 @@
99
import javasabr.mqtt.network.MqttConnection;
1010
import javasabr.mqtt.network.MqttConnectionFactory;
1111
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
12+
import javasabr.mqtt.network.handler.PublishInHandler;
1213
import javasabr.mqtt.service.AuthenticationService;
1314
import javasabr.mqtt.service.ClientIdRegistry;
1415
import javasabr.mqtt.service.CredentialSource;
1516
import javasabr.mqtt.service.MqttConnectionService;
1617
import javasabr.mqtt.service.MqttSessionService;
18+
import javasabr.mqtt.service.PublishingService;
1719
import javasabr.mqtt.service.SubscriptionService;
1820
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
1921
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
2022
import javasabr.mqtt.service.impl.DefaultMqttConnectionService;
23+
import javasabr.mqtt.service.impl.DefaultPublishingService;
2124
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
2225
import javasabr.mqtt.service.impl.FileCredentialsSource;
2326
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
@@ -26,6 +29,13 @@
2629
import javasabr.mqtt.service.impl.SimpleSubscriptionService;
2730
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
2831
import javasabr.mqtt.service.message.handler.impl.ConnectInMqttInMessageHandler;
32+
import javasabr.mqtt.service.message.handler.impl.DisconnectMqttInMessageHandler;
33+
import javasabr.mqtt.service.message.handler.impl.PendingResponseMqttInMessageHandler;
34+
import javasabr.mqtt.service.message.handler.impl.PublishAckMqttInMessageHandler;
35+
import javasabr.mqtt.service.message.handler.impl.PublishCompleteMqttInMessageHandler;
36+
import javasabr.mqtt.service.message.handler.impl.PublishMqttInMessageHandler;
37+
import javasabr.mqtt.service.message.handler.impl.PublishReceiveMqttInMessageHandler;
38+
import javasabr.mqtt.service.message.handler.impl.PublishReleaseMqttInMessageHandler;
2939
import javasabr.rlib.network.NetworkFactory;
3040
import javasabr.rlib.network.ServerNetworkConfig;
3141
import javasabr.rlib.network.server.ServerNetwork;
@@ -74,6 +84,11 @@ SubscriptionService subscriptionService() {
7484
return new SimpleSubscriptionService();
7585
}
7686

87+
@Bean
88+
PublishingService publishingService() {
89+
return new DefaultPublishingService(new PublishInHandler[0]);
90+
}
91+
7792
@Bean
7893
MqttInMessageHandler connectInMqttInMessageHandler(
7994
ClientIdRegistry clientIdRegistry,
@@ -87,6 +102,36 @@ MqttInMessageHandler connectInMqttInMessageHandler(
87102
subscriptionService);
88103
}
89104

105+
@Bean
106+
MqttInMessageHandler publishAckMqttInMessageHandler() {
107+
return new PublishAckMqttInMessageHandler();
108+
}
109+
110+
@Bean
111+
MqttInMessageHandler publishCompleteMqttInMessageHandler() {
112+
return new PublishCompleteMqttInMessageHandler();
113+
}
114+
115+
@Bean
116+
MqttInMessageHandler publishMqttInMessageHandler(PublishingService publishingService) {
117+
return new PublishMqttInMessageHandler(publishingService);
118+
}
119+
120+
@Bean
121+
MqttInMessageHandler publishReceiveMqttInMessageHandler() {
122+
return new PublishReceiveMqttInMessageHandler();
123+
}
124+
125+
@Bean
126+
MqttInMessageHandler publishReleaseMqttInMessageHandler() {
127+
return new PublishReleaseMqttInMessageHandler();
128+
}
129+
130+
@Bean
131+
MqttInMessageHandler disconnectMqttInMessageHandler() {
132+
return new DisconnectMqttInMessageHandler();
133+
}
134+
90135
@Bean
91136
MqttConnectionService mqttConnectionService(Collection<? extends MqttInMessageHandler> inMessageHandlers) {
92137
return new DefaultMqttConnectionService(inMessageHandlers);

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[versions]
22
# https://gitlab.com/JavaSaBr/maven-repo/-/packages
3-
rlib = "10.0.alpha4"
3+
rlib = "10.0.alpha5"
44
# https://mvnrepository.com/artifact/org.jetbrains/annotations
55
jetbrains-annotations = "26.0.2"
66
# https://mvnrepository.com/artifact/org.projectlombok/lombok

service/src/main/java/javasabr/mqtt/service/impl/DefaultMqttConnectionService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public DefaultMqttConnectionService(Collection<? extends MqttInMessageHandler> k
3131
}
3232

3333
this.inMessageHandlers = inMessageHandlers;
34+
log.info(knownInMessageHandlers.size(), "Registered [%s] MqttInMessageHandlers"::formatted);
3435
}
3536

3637
@Override
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package javasabr.mqtt.service.message.handler.impl;
2+
3+
import javasabr.mqtt.network.MqttConnection;
4+
import javasabr.mqtt.network.MqttSession;
5+
import javasabr.mqtt.network.client.ExternalMqttClient;
6+
import javasabr.mqtt.network.packet.HasPacketId;
7+
import javasabr.mqtt.network.packet.in.MqttReadablePacket;
8+
9+
public abstract class PendingResponseMqttInMessageHandler<P extends MqttReadablePacket & HasPacketId>
10+
extends AbstractMqttInMessageHandler<ExternalMqttClient, P> {
11+
12+
protected PendingResponseMqttInMessageHandler(Class<P> expectedNetworkPacket) {
13+
super(ExternalMqttClient.class, expectedNetworkPacket);
14+
}
15+
16+
@Override
17+
protected void processReceived(MqttConnection connection, ExternalMqttClient client, P networkPacket) {
18+
MqttSession session = client.session();
19+
if (session != null) {
20+
session.updateOutPendingPacket(client, networkPacket);
21+
}
22+
}
23+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package javasabr.mqtt.service.message.handler.impl;
2+
3+
import javasabr.mqtt.network.packet.MqttPacketType;
4+
import javasabr.mqtt.network.packet.in.PublishAckInPacket;
5+
6+
public class PublishAckMqttInMessageHandler extends PendingResponseMqttInMessageHandler<PublishAckInPacket> {
7+
8+
public PublishAckMqttInMessageHandler() {
9+
super(PublishAckInPacket.class);
10+
}
11+
12+
@Override
13+
public int packetType() {
14+
return MqttPacketType.PUBLISH_ACK.typeIndex();
15+
}
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package javasabr.mqtt.service.message.handler.impl;
2+
3+
import javasabr.mqtt.network.packet.MqttPacketType;
4+
import javasabr.mqtt.network.packet.in.PublishCompleteInPacket;
5+
6+
public class PublishCompleteMqttInMessageHandler extends PendingResponseMqttInMessageHandler<PublishCompleteInPacket> {
7+
8+
public PublishCompleteMqttInMessageHandler() {
9+
super(PublishCompleteInPacket.class);
10+
}
11+
12+
@Override
13+
public int packetType() {
14+
return MqttPacketType.PUBLISH_COMPLETED.typeIndex();
15+
}
16+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package javasabr.mqtt.service.message.handler.impl;
2+
3+
import javasabr.mqtt.network.MqttConnection;
4+
import javasabr.mqtt.network.client.ExternalMqttClient;
5+
import javasabr.mqtt.network.packet.MqttPacketType;
6+
import javasabr.mqtt.network.packet.in.PublishInPacket;
7+
import javasabr.mqtt.service.PublishingService;
8+
import lombok.AccessLevel;
9+
import lombok.experimental.FieldDefaults;
10+
11+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
12+
public class PublishMqttInMessageHandler extends AbstractMqttInMessageHandler<ExternalMqttClient, PublishInPacket> {
13+
14+
PublishingService publishingService;
15+
16+
public PublishMqttInMessageHandler(PublishingService publishingService) {
17+
super(ExternalMqttClient.class, PublishInPacket.class);
18+
this.publishingService = publishingService;
19+
}
20+
21+
@Override
22+
protected void processReceived(
23+
MqttConnection connection,
24+
ExternalMqttClient client,
25+
PublishInPacket networkPacket) {
26+
publishingService.publish(client, networkPacket);
27+
}
28+
29+
@Override
30+
public int packetType() {
31+
return MqttPacketType.PUBLISH.typeIndex();
32+
}
33+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package javasabr.mqtt.service.message.handler.impl;
2+
3+
import javasabr.mqtt.network.packet.MqttPacketType;
4+
import javasabr.mqtt.network.packet.in.PublishReceivedInPacket;
5+
6+
public class PublishReceiveMqttInMessageHandler extends PendingResponseMqttInMessageHandler<PublishReceivedInPacket> {
7+
8+
public PublishReceiveMqttInMessageHandler() {
9+
super(PublishReceivedInPacket.class);
10+
}
11+
12+
@Override
13+
public int packetType() {
14+
return MqttPacketType.PUBLISH_RECEIVED.typeIndex();
15+
}
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package javasabr.mqtt.service.message.handler.impl;
2+
3+
import javasabr.mqtt.network.packet.MqttPacketType;
4+
import javasabr.mqtt.network.packet.in.PublishReleaseInPacket;
5+
6+
public class PublishReleaseMqttInMessageHandler extends PendingResponseMqttInMessageHandler<PublishReleaseInPacket> {
7+
8+
public PublishReleaseMqttInMessageHandler() {
9+
super(PublishReleaseInPacket.class);
10+
}
11+
12+
@Override
13+
public int packetType() {
14+
return MqttPacketType.PUBLISH_RELEASED.typeIndex();
15+
}
16+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
@NullMarked
2+
package javasabr.mqtt.service.message.handler.impl;
3+
4+
import org.jspecify.annotations.NullMarked;

0 commit comments

Comments
 (0)