Skip to content

Commit d5ad2fa

Browse files
committed
continue working on reorganazing network
1 parent a7ba024 commit d5ad2fa

15 files changed

+321
-26
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
2323
import javasabr.mqtt.service.impl.DefaultConnectionService;
2424
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
25+
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
2526
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
2627
import javasabr.mqtt.service.impl.DefaultPublishingService;
2728
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
@@ -41,9 +42,13 @@
4142
import javasabr.mqtt.service.message.handler.impl.SubscribeMqttInMessageHandler;
4243
import javasabr.mqtt.service.message.handler.impl.UnsubscribeMqttInMessageHandler;
4344
import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler;
44-
import javasabr.mqtt.service.publish.handler.impl.Qos0PublishInMessageHandler;
45-
import javasabr.mqtt.service.publish.handler.impl.Qos1PublishInMessageHandler;
46-
import javasabr.mqtt.service.publish.handler.impl.Qos2PublishInMessageHandler;
45+
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
46+
import javasabr.mqtt.service.publish.handler.impl.Qos0MqttPublishInMessageHandler;
47+
import javasabr.mqtt.service.publish.handler.impl.Qos0MqttPublishOutMessageHandler;
48+
import javasabr.mqtt.service.publish.handler.impl.Qos1MqttPublishInMessageHandler;
49+
import javasabr.mqtt.service.publish.handler.impl.Qos1MqttPublishOutMessageHandler;
50+
import javasabr.mqtt.service.publish.handler.impl.Qos2MqttPublishInMessageHandler;
51+
import javasabr.mqtt.service.publish.handler.impl.Qos2MqttPublishOutMessageHandler;
4752
import javasabr.rlib.network.NetworkFactory;
4853
import javasabr.rlib.network.ServerNetworkConfig;
4954
import javasabr.rlib.network.server.ServerNetwork;
@@ -156,29 +161,45 @@ ConnectionService mqttConnectionService(Collection<? extends MqttInMessageHandle
156161
}
157162

158163
@Bean
159-
PublishDeliveringService publishDeliveringService() {
160-
return null;
164+
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(SubscriptionService subscriptionService) {
165+
return new Qos0MqttPublishOutMessageHandler(subscriptionService);
161166
}
162167

163168
@Bean
164-
MqttPublishInMessageHandler qos0PublishInMessageHandler(
169+
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(SubscriptionService subscriptionService) {
170+
return new Qos1MqttPublishOutMessageHandler(subscriptionService);
171+
}
172+
173+
@Bean
174+
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(SubscriptionService subscriptionService) {
175+
return new Qos2MqttPublishOutMessageHandler(subscriptionService);
176+
}
177+
178+
@Bean
179+
PublishDeliveringService publishDeliveringService(
180+
Collection<? extends MqttPublishOutMessageHandler> knownPublishOutHandlers) {
181+
return new DefaultPublishDeliveringService(knownPublishOutHandlers);
182+
}
183+
184+
@Bean
185+
MqttPublishInMessageHandler qos0MqttPublishInMessageHandler(
165186
SubscriptionService subscriptionService,
166187
PublishDeliveringService publishDeliveringService) {
167-
return new Qos0PublishInMessageHandler(subscriptionService, publishDeliveringService);
188+
return new Qos0MqttPublishInMessageHandler(subscriptionService, publishDeliveringService);
168189
}
169190

170191
@Bean
171-
MqttPublishInMessageHandler qos1PublishInMessageHandler(
192+
MqttPublishInMessageHandler qos1MqttPublishInMessageHandler(
172193
SubscriptionService subscriptionService,
173194
PublishDeliveringService publishDeliveringService) {
174-
return new Qos1PublishInMessageHandler(subscriptionService, publishDeliveringService);
195+
return new Qos1MqttPublishInMessageHandler(subscriptionService, publishDeliveringService);
175196
}
176197

177198
@Bean
178-
MqttPublishInMessageHandler qos2PublishInMessageHandler(
199+
MqttPublishInMessageHandler qos2MqttPublishInMessageHandler(
179200
SubscriptionService subscriptionService,
180201
PublishDeliveringService publishDeliveringService) {
181-
return new Qos2PublishInMessageHandler(subscriptionService, publishDeliveringService);
202+
return new Qos2MqttPublishInMessageHandler(subscriptionService, publishDeliveringService);
182203
}
183204

184205
@Bean
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package javasabr.mqtt.service;
22

3-
import javasabr.mqtt.model.ActionResult;
43
import javasabr.mqtt.model.subscriber.SingleSubscriber;
54
import javasabr.mqtt.network.packet.in.PublishInPacket;
5+
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
66

77
public interface PublishDeliveringService {
88

9-
ActionResult startDelivering(PublishInPacket publish, SingleSubscriber subscriber);
9+
PublishHandlingResult startDelivering(PublishInPacket publish, SingleSubscriber subscriber);
1010
}

service/src/main/java/javasabr/mqtt/service/SubscriptionService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
77
import javasabr.mqtt.model.subscriber.SingleSubscriber;
88
import javasabr.mqtt.model.subscriber.SubscribeTopicFilter;
9+
import javasabr.mqtt.model.subscriber.Subscriber;
910
import javasabr.mqtt.model.topic.TopicFilter;
1011
import javasabr.mqtt.model.topic.TopicName;
1112
import javasabr.mqtt.network.MqttClient;
@@ -20,6 +21,8 @@ public interface SubscriptionService {
2021

2122
boolean isValid(TopicName topicName);
2223

24+
MqttClient resolveClient(Subscriber subscriber);
25+
2326
default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
2427
return findSubscribersTo(MutableArray.ofType(SingleSubscriber.class), topicName);
2528
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package javasabr.mqtt.service.impl;
2+
3+
import java.util.Collection;
4+
import javasabr.mqtt.model.QoS;
5+
import javasabr.mqtt.model.subscriber.SingleSubscriber;
6+
import javasabr.mqtt.network.packet.in.PublishInPacket;
7+
import javasabr.mqtt.service.PublishDeliveringService;
8+
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
9+
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
10+
import lombok.AccessLevel;
11+
import lombok.CustomLog;
12+
import lombok.experimental.FieldDefaults;
13+
import org.jspecify.annotations.Nullable;
14+
15+
@CustomLog
16+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
17+
public class DefaultPublishDeliveringService implements PublishDeliveringService {
18+
19+
@Nullable
20+
MqttPublishOutMessageHandler[] publishOutMessageHandlers;
21+
22+
public DefaultPublishDeliveringService(
23+
Collection<? extends MqttPublishOutMessageHandler> knownPublishOutHandlers) {
24+
25+
int maxIndex = knownPublishOutHandlers
26+
.stream()
27+
.map(MqttPublishOutMessageHandler::qos)
28+
.mapToInt(QoS::index)
29+
.max()
30+
.orElse(0);
31+
32+
var publishOutHandlers = new MqttPublishOutMessageHandler[maxIndex + 1];
33+
34+
for (MqttPublishOutMessageHandler knownPublishOutHandler : knownPublishOutHandlers) {
35+
QoS qos = knownPublishOutHandler.qos();
36+
if (publishOutHandlers[qos.index()] != null) {
37+
throw new IllegalArgumentException(
38+
"Found duplicate MqttPublishOutMessageHandler:[" + knownPublishOutHandler + "]");
39+
}
40+
publishOutHandlers[qos.index()] = knownPublishOutHandler;
41+
}
42+
43+
this.publishOutMessageHandlers = publishOutHandlers;
44+
}
45+
46+
@Override
47+
public PublishHandlingResult startDelivering(PublishInPacket publish, SingleSubscriber subscriber) {
48+
try {
49+
//noinspection DataFlowIssue
50+
return publishOutMessageHandlers[publish.getQos().index()].handle(publish, subscriber);
51+
} catch (IndexOutOfBoundsException | NullPointerException ex) {
52+
log.warning(publish, "Received not supported publish message:[%s]"::formatted);
53+
return PublishHandlingResult.UNSPECIFIED_ERROR;
54+
}
55+
}
56+
}
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package javasabr.mqtt.service.publish.handler;
22

3-
import javasabr.mqtt.model.ActionResult;
43
import javasabr.mqtt.model.QoS;
54
import javasabr.mqtt.model.subscriber.SingleSubscriber;
65
import javasabr.mqtt.network.packet.in.PublishInPacket;
@@ -9,5 +8,5 @@ public interface MqttPublishOutMessageHandler {
98

109
QoS qos();
1110

12-
ActionResult handle(PublishInPacket packet, SingleSubscriber subscriber);
11+
PublishHandlingResult handle(PublishInPacket packet, SingleSubscriber subscriber);
1312
}

service/src/main/java/javasabr/mqtt/service/message/handler/PublishHandlingResult.java renamed to service/src/main/java/javasabr/mqtt/service/publish/handler/PublishHandlingResult.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package javasabr.mqtt.service.message.handler;
1+
package javasabr.mqtt.service.publish.handler;
22

33
import javasabr.mqtt.model.reason.code.PublishAckReasonCode;
44
import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode;
@@ -32,7 +32,13 @@ public enum PublishHandlingResult {
3232
PAYLOAD_FORMAT_INVALID(
3333
true,
3434
PublishAckReasonCode.PAYLOAD_FORMAT_INVALID,
35-
PublishReceivedReasonCode.PAYLOAD_FORMAT_INVALID);
35+
PublishReceivedReasonCode.PAYLOAD_FORMAT_INVALID),
36+
37+
// CUSTOM
38+
NOT_EXPECTED_CLIENT(
39+
true,
40+
PublishAckReasonCode.UNSPECIFIED_ERROR,
41+
PublishReceivedReasonCode.UNSPECIFIED_ERROR);
3642

3743
boolean error;
3844
PublishAckReasonCode ackReasonCode;

service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
import javasabr.mqtt.network.packet.in.PublishInPacket;
77
import javasabr.mqtt.service.PublishDeliveringService;
88
import javasabr.mqtt.service.SubscriptionService;
9-
import javasabr.mqtt.service.message.handler.PublishHandlingResult;
109
import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler;
10+
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
1111
import javasabr.rlib.collections.array.Array;
1212
import lombok.AccessLevel;
13+
import lombok.CustomLog;
1314
import lombok.RequiredArgsConstructor;
1415
import lombok.experimental.FieldDefaults;
1516

17+
@CustomLog
1618
@RequiredArgsConstructor
1719
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
1820
public abstract class AbstractMqttPublishInMessageHandler<C extends MqttClient>
@@ -24,6 +26,10 @@ public abstract class AbstractMqttPublishInMessageHandler<C extends MqttClient>
2426

2527
@Override
2628
public void handle(MqttClient client, PublishInPacket packet) {
29+
if (!expectedClient.isInstance(client)) {
30+
log.warning(client, "Not expected client:[%s]"::formatted);
31+
return;
32+
}
2733
handleImpl(expectedClient.cast(client), packet);
2834
}
2935

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package javasabr.mqtt.service.publish.handler.impl;
2+
3+
import javasabr.mqtt.model.MqttProperties;
4+
import javasabr.mqtt.model.subscriber.SingleSubscriber;
5+
import javasabr.mqtt.network.MqttClient;
6+
import javasabr.mqtt.network.packet.in.PublishInPacket;
7+
import javasabr.mqtt.service.SubscriptionService;
8+
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
9+
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
10+
import lombok.AccessLevel;
11+
import lombok.CustomLog;
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.experimental.FieldDefaults;
14+
15+
@CustomLog
16+
@RequiredArgsConstructor
17+
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
18+
public abstract class AbstractMqttPublishOutMessageHandler<C extends MqttClient>
19+
implements MqttPublishOutMessageHandler {
20+
21+
Class<C> expectedClient;
22+
SubscriptionService subscriptionService;
23+
24+
@Override
25+
public PublishHandlingResult handle(PublishInPacket packet, SingleSubscriber subscriber) {
26+
MqttClient mqttClient = subscriptionService.resolveClient(subscriber);
27+
if (!expectedClient.isInstance(mqttClient)) {
28+
log.warning(mqttClient, "Accepted not expected client:[%s]"::formatted);
29+
return PublishHandlingResult.NOT_EXPECTED_CLIENT;
30+
}
31+
return handleImpl(packet, expectedClient.cast(mqttClient));
32+
}
33+
34+
protected abstract PublishHandlingResult handleImpl(PublishInPacket packet, C client) ;
35+
36+
protected void startDelivering(
37+
MqttClient client,
38+
PublishInPacket packet,
39+
int messageId,
40+
boolean duplicate) {
41+
var packetOutFactory = client.packetOutFactory();
42+
client.send(packetOutFactory.newPublish(
43+
messageId,
44+
qos(),
45+
packet.isRetained(),
46+
duplicate,
47+
packet
48+
.getTopicName()
49+
.toString(),
50+
MqttProperties.TOPIC_ALIAS_NOT_SET,
51+
packet.getPayload(),
52+
packet.isPayloadFormatIndicator(),
53+
packet.getResponseTopic(),
54+
packet.getCorrelationData(),
55+
packet.userProperties()));
56+
}
57+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package javasabr.mqtt.service.publish.handler.impl;
2+
3+
import javasabr.mqtt.network.MqttClient;
4+
import javasabr.mqtt.network.MqttSession;
5+
import javasabr.mqtt.network.MqttSession.PendingMessageHandler;
6+
import javasabr.mqtt.network.client.ExternalMqttClient;
7+
import javasabr.mqtt.network.packet.HasPacketId;
8+
import javasabr.mqtt.network.packet.in.PublishInPacket;
9+
import javasabr.mqtt.service.SubscriptionService;
10+
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
11+
import lombok.AccessLevel;
12+
import lombok.experimental.FieldDefaults;
13+
14+
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
15+
public abstract class PersistedMqttPublishOutMessageHandler extends AbstractMqttPublishOutMessageHandler<ExternalMqttClient> {
16+
17+
PendingMessageHandler pendingMessageHandler;
18+
19+
protected PersistedMqttPublishOutMessageHandler(SubscriptionService subscriptionService) {
20+
super(ExternalMqttClient.class, subscriptionService);
21+
this.pendingMessageHandler = new PendingMessageHandler() {
22+
@Override
23+
public boolean handleResponse(MqttClient client, HasPacketId response) {
24+
return handleReceivedResponse(client, response);
25+
}
26+
@Override
27+
public void resend(MqttClient client, PublishInPacket packet, int packetId) {
28+
tryToDeliverAgain(client, packet, packetId);
29+
}
30+
};
31+
}
32+
33+
@Override
34+
protected PublishHandlingResult handleImpl(PublishInPacket packet, ExternalMqttClient client) {
35+
36+
MqttSession session = client.session();
37+
if (session == null) {
38+
return PublishHandlingResult.SKIPPED;
39+
}
40+
// generate new uniq packet id per client
41+
var packetId = session.nextPacketId();
42+
43+
// register waiting async response
44+
session.registerOutPublish(packet, pendingMessageHandler, packetId);
45+
46+
// send publish
47+
startDelivering(client, packet, packetId, false);
48+
49+
return PublishHandlingResult.SUCCESS;
50+
}
51+
52+
protected boolean handleReceivedResponse(MqttClient client, HasPacketId response) {
53+
return false;
54+
}
55+
56+
protected void tryToDeliverAgain(MqttClient client, PublishInPacket packet, int messageId) {
57+
startDelivering(client, packet, messageId, true);
58+
}
59+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import javasabr.mqtt.service.PublishDeliveringService;
66
import javasabr.mqtt.service.SubscriptionService;
77

8-
public class Qos0PublishInMessageHandler extends AbstractMqttPublishInMessageHandler<ExternalMqttClient> {
8+
public class Qos0MqttPublishInMessageHandler extends AbstractMqttPublishInMessageHandler<ExternalMqttClient> {
99

10-
public Qos0PublishInMessageHandler(
10+
public Qos0MqttPublishInMessageHandler(
1111
SubscriptionService subscriptionService,
1212
PublishDeliveringService publishDeliveringService) {
1313
super(ExternalMqttClient.class, subscriptionService, publishDeliveringService);

0 commit comments

Comments
 (0)