Skip to content

Commit 271a9f0

Browse files
committed
continue updating subscription service
1 parent 6b31d26 commit 271a9f0

File tree

10 files changed

+126
-20
lines changed

10 files changed

+126
-20
lines changed

application/src/test/groovy/javasabr/mqtt/broker/application/ConnectSubscribePublishTest.groovy

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicReference
1515

1616
class ConnectSubscribePublishTest extends IntegrationSpecification {
1717

18-
def "publisher should publish message QoS 0 using mqtt 3.1.1"() {
18+
def "should deliver publish message QoS 0 using mqtt 3.1.1"() {
1919
given:
2020
def received = new AtomicReference<Mqtt3Publish>()
2121
def subscriber = buildExternalMqtt311Client()
@@ -26,7 +26,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
2626
publisher.connect().join()
2727
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
2828
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
29-
Thread.sleep(100)
3029
then:
3130
noExceptionThrown()
3231
subscribeResult != null
@@ -43,7 +42,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
4342
publisher.disconnect().join()
4443
}
4544

46-
def "publisher should publish message QoS 0 using mqtt 5"() {
45+
def "should deliver publish message QoS 0 using mqtt 5"() {
4746
given:
4847
def received = new AtomicReference<Mqtt5Publish>()
4948
def subscriber = buildExternalMqtt5Client()
@@ -54,7 +53,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
5453
publisher.connect().join()
5554
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
5655
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
57-
Thread.sleep(100)
5856
then:
5957
noExceptionThrown()
6058
subscribeResult != null
@@ -71,7 +69,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
7169
publisher.disconnect().join()
7270
}
7371

74-
def "publisher should publish message QoS 1 using mqtt 3.1.1"() {
72+
def "should deliver publish message QoS 1 using mqtt 3.1.1"() {
7573
given:
7674
def received = new AtomicReference<Mqtt3Publish>()
7775
def subscriber = buildExternalMqtt311Client()
@@ -82,7 +80,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
8280
publisher.connect().join()
8381
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
8482
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
85-
Thread.sleep(100)
8683
then:
8784
noExceptionThrown()
8885
subscribeResult != null

application/src/test/resources/log4j2.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@
1515
<Logger name="javasabr.mqtt.model.network.impl.DefaultMqttSession" level="DEBUG" additivity="false">
1616
<AppenderRef ref="BrokerConsoleTest"/>
1717
</Logger>
18+
<Logger name="javasabr.mqtt.service.impl.DefaultPublishReceivingService" level="DEBUG" additivity="false">
19+
<AppenderRef ref="BrokerConsoleTest"/>
20+
</Logger>
21+
<Logger name="javasabr.mqtt.service.publish.handler.impl.AbstractMqttPublishInMessageHandler" level="DEBUG" additivity="false">
22+
<AppenderRef ref="BrokerConsoleTest"/>
23+
</Logger>
24+
<Logger name="javasabr.mqtt.service.publish.handler.impl.Qos1MqttPublishInMessageHandler" level="DEBUG" additivity="false">
25+
<AppenderRef ref="BrokerConsoleTest"/>
26+
</Logger>
1827
<Root level="INFO">
1928
<AppenderRef ref="BrokerConsoleTest"/>
2029
</Root>

model/src/main/java/javasabr/mqtt/model/publishing/Publish.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package javasabr.mqtt.model.publishing;
22

3+
import javasabr.mqtt.base.util.DebugUtils;
34
import javasabr.mqtt.model.PayloadFormat;
45
import javasabr.mqtt.model.QoS;
56
import javasabr.mqtt.model.TrackableMessage;
@@ -25,6 +26,10 @@ public record Publish(
2526
PayloadFormat payloadFormat,
2627
Array<StringPair> userProperties) implements TrackableMessage {
2728

29+
static {
30+
DebugUtils.registerIncludedFields("topicName", "messageId", "qos", "topicAlias", "payloadFormat");
31+
}
32+
2833
public Publish withDuplicated() {
2934
if (duplicated) {
3035
return this;
@@ -63,4 +68,9 @@ public Publish with(int messageId, QoS qos, boolean duplicated, int topicAlias)
6368
payloadFormat,
6469
userProperties);
6570
}
71+
72+
@Override
73+
public String toString() {
74+
return DebugUtils.toJsonString(this);
75+
}
6676
}

model/src/test/groovy/javasabr/mqtt/model/topic/tree/TopicTreeTest.groovy

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,80 @@ class TopicTreeTest extends UnitSpecification {
462462
(group1.contains(matched[1]) && group2.contains(matched[0]))
463463
(group1.contains(matched2[0]) && group2.contains(matched2[1])) ||
464464
(group1.contains(matched2[1]) && group2.contains(matched2[0]))
465+
}
466+
467+
def "should subscribe and unsubscribe simple topic correctly correctly"() {
468+
given:
469+
ConcurrentTopicTree topicTree = new ConcurrentTopicTree()
470+
topicTree.subscribe(makeOwner("id1"), makeSubscription('topic/name1'))
471+
topicTree.subscribe(makeOwner("id2"), makeSubscription('topic/name1'))
472+
topicTree.subscribe(makeOwner("id3"), makeSubscription('topic/name1'))
473+
when:
474+
def matched = topicTree
475+
.matches(TopicName.valueOf("topic/name1"))
476+
.collect { it.owner().toString() }
477+
.toSet()
478+
then:
479+
matched.size() == 3
480+
when:
481+
def id2WasUnsubscribed = topicTree.unsubscribe(makeOwner("id2"), TopicFilter.valueOf('topic/name1'))
482+
def id3WasUnsubscribed = topicTree.unsubscribe(makeOwner("id3"), TopicFilter.valueOf('topic/name1'))
483+
matched = topicTree
484+
.matches(TopicName.valueOf("topic/name1"))
485+
.collect { it.owner().toString() }
486+
.toSet()
487+
then:
488+
matched.size() == 1
489+
id2WasUnsubscribed
490+
id3WasUnsubscribed
491+
when:
492+
def id1WasUnsubscribed = topicTree.unsubscribe(makeOwner("id1"), TopicFilter.valueOf('topic/name1'))
493+
id3WasUnsubscribed = topicTree.unsubscribe(makeOwner("id3"), TopicFilter.valueOf('topic/name1'))
494+
matched = topicTree
495+
.matches(TopicName.valueOf("topic/name1"))
496+
.collect { it.owner().toString() }
497+
.toSet()
498+
then:
499+
matched.size() == 0
500+
id1WasUnsubscribed
501+
!id3WasUnsubscribed
502+
}
465503

504+
def "should subscribe and unsubscribe shared topic correctly correctly"() {
505+
given:
506+
ConcurrentTopicTree topicTree = new ConcurrentTopicTree()
507+
topicTree.subscribe(makeOwner("id1"), makeSharedSubscription('$share/group1/topic/name1'))
508+
topicTree.subscribe(makeOwner("id2"), makeSharedSubscription('$share/group1/topic/name1'))
509+
topicTree.subscribe(makeOwner("id3"), makeSharedSubscription('$share/group1/topic/name1'))
510+
when:
511+
def matched = topicTree
512+
.matches(TopicName.valueOf("topic/name1"))
513+
.collect { it.owner().toString() }
514+
.toSet()
515+
then:
516+
matched.size() == 1
517+
when:
518+
def id2WasUnsubscribed = topicTree.unsubscribe(makeOwner("id2"), SharedTopicFilter.valueOf('$share/group1/topic/name1'))
519+
def id3WasUnsubscribed = topicTree.unsubscribe(makeOwner("id3"), SharedTopicFilter.valueOf('$share/group1/topic/name1'))
520+
matched = topicTree
521+
.matches(TopicName.valueOf("topic/name1"))
522+
.collect { it.owner().toString() }
523+
.toSet()
524+
then:
525+
matched.size() == 1
526+
id2WasUnsubscribed
527+
id3WasUnsubscribed
528+
when:
529+
def id1WasUnsubscribed = topicTree.unsubscribe(makeOwner("id1"), SharedTopicFilter.valueOf('$share/group1/topic/name1'))
530+
id3WasUnsubscribed = topicTree.unsubscribe(makeOwner("id3"), SharedTopicFilter.valueOf('$share/group1/topic/name1'))
531+
matched = topicTree
532+
.matches(TopicName.valueOf("topic/name1"))
533+
.collect { it.owner().toString() }
534+
.toSet()
535+
then:
536+
matched.size() == 0
537+
id1WasUnsubscribed
538+
!id3WasUnsubscribed
466539
}
467540

468541
static def makeOwner(String id) {

network/src/main/java/javasabr/mqtt/network/message/in/PublishMqttInMessage.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,14 @@ public class PublishMqttInMessage extends TrackableMqttInMessage {
3232
private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PUBLISH.ordinal();
3333

3434
static {
35-
DebugUtils.registerIncludedFields("topicName", "qos", "duplicate", "messageId");
35+
DebugUtils.registerIncludedFields(
36+
"rawTopicName",
37+
"topicAlias",
38+
"qos",
39+
"duplicate",
40+
"messageId",
41+
"messageExpiryInterval",
42+
"payloadFormat");
3643
}
3744

3845
private static final Set<MqttMessageProperty> AVAILABLE_PROPERTIES = EnumSet.of(
Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
11
package javasabr.mqtt.network.message.out;
22

3-
import java.nio.ByteBuffer;
43
import javasabr.mqtt.network.MqttConnection;
54
import javasabr.mqtt.network.message.MqttMessageType;
65
import lombok.AccessLevel;
7-
import lombok.RequiredArgsConstructor;
86
import lombok.experimental.FieldDefaults;
97

108
/**
119
* Publish acknowledgement.
1210
*/
13-
@RequiredArgsConstructor
1411
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
15-
public class PublishAckMqtt311OutMessage extends MqttOutMessage {
12+
public class PublishAckMqtt311OutMessage extends TrackableMqttOutMessage {
1613

1714
private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PUBLISH_ACK.ordinal();
1815

16+
public PublishAckMqtt311OutMessage(int messageId) {
17+
super(messageId);
18+
}
19+
1920
/**
2021
* Packet Identifier from the PUBLISH packet that is being acknowledged.
22+
* {@link TrackableMqttOutMessage#messageId}
2123
*/
22-
int messageId;
24+
//int messageId;
2325

2426
@Override
2527
public int expectedLength(MqttConnection connection) {
@@ -30,10 +32,4 @@ public int expectedLength(MqttConnection connection) {
3032
protected byte messageType() {
3133
return MESSAGE_TYPE;
3234
}
33-
34-
@Override
35-
protected void writeVariableHeader(MqttConnection connection, ByteBuffer buffer) {
36-
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718045
37-
buffer.putShort((short) messageId);
38-
}
3935
}

service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishReceivingService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,13 @@ public DefaultPublishReceivingService(
4545

4646
@Override
4747
public void processReceivedPublish(MqttClient client, Publish publish) {
48+
log.debug(client.clientId(), publish, "[%s] Processing received publish: [%s]"::formatted);
4849
QoS qos = publish.qos();
4950
try {
5051
//noinspection DataFlowIssue
5152
publishInHandlers[qos.level()].handle(client, publish);
5253
} catch (IndexOutOfBoundsException | NullPointerException ex) {
53-
log.warning(publish, "Received not supported publish message:[%s]"::formatted);
54+
log.warning(client.clientId(), publish, "[%s] Received not supported publish message:[%s]"::formatted);
5455
}
5556
}
5657

service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ protected void processReceived(
5353
client.send(messageOutFactoryService
5454
.resolveFactory(client)
5555
.newPublishAck(messageId, PublishAckReasonCode.PACKET_IDENTIFIER_IN_USE));
56+
log.warning(client.clientId(), messageId, "[%s] Client provided already in use messageId:[%s]..."::formatted);
5657
return;
5758
}
5859

@@ -62,12 +63,12 @@ protected void processReceived(
6263
client.send(messageOutFactoryService
6364
.resolveFactory(client)
6465
.newPublishAck(messageId, PublishAckReasonCode.TOPIC_NAME_INVALID));
66+
log.warning(client.clientId(), rawTopicName, "[%s] Client provided invalid topic name:[%s]..."::formatted);
6567
return;
6668
}
6769

6870
TopicName topicName = topicService.createTopicName(client, rawTopicName);
6971

70-
7172
// TODO
7273
byte[] payload = message.payload();
7374
int topicAlias = message.topicAlias();

service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,16 @@ protected void handleImpl(C client, Publish publish) {
3737
TopicName topicName = publish.topicName();
3838
Array<SingleSubscriber> subscribers = subscriptionService.findSubscribers(topicName);
3939
if (subscribers.isEmpty()) {
40+
log.debug(client.clientId(), publish, "[%s] Not found any subscriber for publish: [%s]"::formatted);
4041
handleEmptySubscriptions(client, publish);
4142
return;
4243
}
4344

4445
for (SingleSubscriber subscriber : subscribers) {
4546
PublishHandlingResult checkResult = checkSubscriber(client, publish, subscriber);
4647
if (checkResult.error()) {
48+
log.debug(client.clientId(), checkResult, subscriber,
49+
"[%s] Found error:[%s] for subscriber:[%s] during checking"::formatted);
4750
handleError(client, publish, checkResult);
4851
return;
4952
}
@@ -61,8 +64,12 @@ protected void handleImpl(C client, Publish publish) {
6164
}
6265

6366
if (errorResult != null) {
67+
log.debug(client.clientId(), errorResult,
68+
"[%s] Found final error:[%s] during processing publish"::formatted);
6469
handleError(client, publish, errorResult);
6570
} else {
71+
log.debug(client.clientId(), count,
72+
"[%s] Successfully started delivering publish to [%s] subscribers"::formatted);
6673
handleSuccessfulResult(client, publish, count);
6774
}
6875
}

service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishInMessageHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
import javasabr.mqtt.service.SubscriptionService;
1010
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
1111
import lombok.AccessLevel;
12+
import lombok.CustomLog;
1213
import lombok.experimental.FieldDefaults;
1314

15+
@CustomLog
1416
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
1517
public class Qos1MqttPublishInMessageHandler extends Qos0MqttPublishInMessageHandler {
1618

@@ -32,6 +34,7 @@ public QoS qos() {
3234
@Override
3335
protected void handleEmptySubscriptions(ExternalMqttClient client, Publish publish) {
3436
super.handleEmptySubscriptions(client, publish);
37+
log.debug(client.clientId(), "[%s] Send PUBACK after not found any subscriber..."::formatted);
3538
client.send(messageOutFactoryService
3639
.resolveFactory(client)
3740
.newPublishAck(publish.messageId(), PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS));
@@ -40,6 +43,7 @@ protected void handleEmptySubscriptions(ExternalMqttClient client, Publish publi
4043
@Override
4144
protected void handleError(ExternalMqttClient client, Publish publish, PublishHandlingResult handlingResult) {
4245
super.handleError(client, publish, handlingResult);
46+
log.debug(client.clientId(), "[%s] Send PUBACK after failed processing publish..."::formatted);
4347
client.send(messageOutFactoryService
4448
.resolveFactory(client)
4549
.newPublishAck(publish.messageId(), handlingResult.ackReasonCode()));
@@ -48,6 +52,7 @@ protected void handleError(ExternalMqttClient client, Publish publish, PublishHa
4852
@Override
4953
protected void handleSuccessfulResult(ExternalMqttClient client, Publish publish, int subscribers) {
5054
super.handleSuccessfulResult(client, publish, subscribers);
55+
log.debug(client.clientId(), "[%s] Send PUBACK after successful processing publish..."::formatted);
5156
client.send(messageOutFactoryService
5257
.resolveFactory(client)
5358
.newPublishAck(publish.messageId(), PublishAckReasonCode.SUCCESS));

0 commit comments

Comments
 (0)