Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[versions]
# https://gitlab.com/JavaSaBr/maven-repo/-/packages
rlib = "10.0.alpha6"
rlib = "10.0.alpha8"
# https://mvnrepository.com/artifact/org.projectlombok/lombok
lombok = "1.18.38"
# https://mvnrepository.com/artifact/org.jspecify/jspecify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,29 @@ public record MqttClientConnectionConfig(
int topicAliasMaxValue,
int keepAlive,
boolean requestResponseInformation,
boolean requestProblemInformation) {}
boolean requestProblemInformation) {

public boolean subscriptionIdAvailable() {
return server.subscriptionIdAvailable();
}

public boolean retainAvailable() {
return server.retainAvailable();
}

public boolean wildcardSubscriptionAvailable() {
return server.wildcardSubscriptionAvailable();
}

public boolean sharedSubscriptionAvailable() {
return server.sharedSubscriptionAvailable();
}

public boolean sessionsEnabled() {
return server.sessionsEnabled();
}

public int maxTopicLevels() {
return server.maxTopicLevels();
}
}
4 changes: 1 addition & 3 deletions model/src/main/java/javasabr/mqtt/model/MqttProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ public interface MqttProperties {
int TOPIC_ALIAS_MAX = 0xFFFF;
int TOPIC_ALIAS_NOT_SET = 0;

int SUBSCRIPTION_ID_UNDEFINED = 0;

int MESSAGE_ID_UNDEFINED = -1;
int SUBSCRIPTION_ID_IS_NOT_SET = 0;
int MESSAGE_ID_IS_NOT_SET = 0;

boolean SESSIONS_ENABLED_DEFAULT = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,23 @@ public MqttServerConnectionConfig withSharedSubscriptionAvailable(boolean shared
subscriptionIdAvailable,
sharedSubscriptionAvailable);
}

public MqttServerConnectionConfig withSubscriptionIdAvailable(boolean subscriptionIdAvailable) {
return new MqttServerConnectionConfig(
maxQos,
maxMessageSize,
maxStringLength,
maxBinarySize,
maxTopicLevels,
minKeepAliveTime,
receiveMaxPublishes,
topicAliasMaxValue,
defaultSessionExpiryInterval,
keepAliveEnabled,
sessionsEnabled,
retainAvailable,
wildcardSubscriptionAvailable,
subscriptionIdAvailable,
sharedSubscriptionAvailable);
}
}
22 changes: 15 additions & 7 deletions model/src/main/java/javasabr/mqtt/model/QoS.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package javasabr.mqtt.model;

import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
import javasabr.rlib.common.util.NumberedEnum;
import javasabr.rlib.common.util.NumberedEnumMap;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand All @@ -11,22 +13,28 @@
@RequiredArgsConstructor
@Accessors(fluent = true, chain = false)
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public enum QoS {
public enum QoS implements NumberedEnum<QoS> {
AT_MOST_ONCE(0, SubscribeAckReasonCode.GRANTED_QOS_0),
AT_LEAST_ONCE(1, SubscribeAckReasonCode.GRANTED_QOS_1),
EXACTLY_ONCE(2, SubscribeAckReasonCode.GRANTED_QOS_2),
INVALID(3, SubscribeAckReasonCode.IMPLEMENTATION_SPECIFIC_ERROR);

private static final QoS[] VALUES = values();
private static final NumberedEnumMap<QoS> NUMBERED_MAP =
new NumberedEnumMap<>(QoS.class);

public static QoS ofCode(int level) {
if (level < 0 || level > EXACTLY_ONCE.ordinal()) {
return INVALID;
} else {
return VALUES[level];
}
return NUMBERED_MAP.resolve(level, QoS.INVALID);
}

int level;
SubscribeAckReasonCode subscribeAckReasonCode;

@Override
public int number() {
return level;
}

public QoS lower(QoS alternative) {
return level > alternative.level ? alternative : this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public record Subscription(
public static Subscription minimal(TopicFilter topicFilter, QoS qos) {
return new Subscription(
topicFilter,
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,
qos,
SubscribeRetainHandling.SEND,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import javasabr.rlib.common.ThreadSafe;
import lombok.AccessLevel;
import lombok.experimental.FieldDefaults;
import org.jspecify.annotations.Nullable;

@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class ConcurrentTopicTree implements ThreadSafe {
Expand All @@ -20,8 +21,9 @@ public ConcurrentTopicTree() {
this.rootNode = new TopicNode();
}

public void subscribe(SubscriptionOwner owner, Subscription subscription) {
rootNode.subscribe(0, owner, subscription, subscription.topicFilter());
@Nullable
public SingleSubscriber subscribe(SubscriptionOwner owner, Subscription subscription) {
return rootNode.subscribe(0, owner, subscription, subscription.topicFilter());
}

public boolean unsubscribe(SubscriptionOwner owner, TopicFilter topicFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ class TopicNode extends TopicTreeBase {
@Nullable
volatile LockableArray<Subscriber> subscribers;

public void subscribe(int level, SubscriptionOwner owner, Subscription subscription, TopicFilter topicFilter) {
/**
* @return the previous subscription from the same owner
*/
@Nullable
public SingleSubscriber subscribe(int level, SubscriptionOwner owner, Subscription subscription, TopicFilter topicFilter) {
if (level == topicFilter.levelsCount()) {
addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
return;
return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
}
TopicNode childNode = getOrCreateChildNode(topicFilter.segment(level));
childNode.subscribe(level + 1, owner, subscription, topicFilter);
return childNode.subscribe(level + 1, owner, subscription, topicFilter);
}

public boolean unsubscribe(int level, SubscriptionOwner owner, TopicFilter topicFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
abstract class TopicTreeBase {

protected static void addSubscriber(
/**
* @return previous subscriber with the same owner
*/
@Nullable
protected static SingleSubscriber addSubscriber(
LockableArray<Subscriber> subscribers,
SubscriptionOwner owner,
Subscription subscription,
Expand All @@ -29,14 +33,30 @@ protected static void addSubscriber(
try {
if (topicFilter instanceof SharedTopicFilter stf) {
addSharedSubscriber(subscribers, owner, subscription, stf);
return null;
} else {
SingleSubscriber previous = removePreviousIfExist(subscribers, owner);
subscribers.add(new SingleSubscriber(owner, subscription));
return previous;
}
} finally {
subscribers.writeUnlock(stamp);
}
}

@Nullable
private static SingleSubscriber removePreviousIfExist(
LockableArray<Subscriber> subscribers,
SubscriptionOwner owner) {
int index = subscribers.indexOf(Subscriber::resolveOwner, owner);
if (index < 0) {
return null;
}
return subscribers
.remove(index)
.resolveSingle();
}

private static void addSharedSubscriber(
LockableArray<Subscriber> subscribers,
SubscriptionOwner owner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,94 @@ class TopicTreeTest extends UnitSpecification {
!id3WasUnsubscribed
}

def "should replace the same subscriptions"() {
given:
ConcurrentTopicTree topicTree = new ConcurrentTopicTree()
def owner1 = makeOwner("id1")
def originalSub = makeSubscription('topic/name1')
def replacementSub = makeSubscription('topic/name1')
topicTree.subscribe(makeOwner("id2"), makeSubscription('topic/name1'))
topicTree.subscribe(makeOwner("id3"), makeSubscription('topic/name1'))
when:
def previous = topicTree.subscribe(owner1, originalSub)
def matched = topicTree
.matches(TopicName.valueOf("topic/name1"))
.toSet()
then:
matched.size() == 3
previous == null;
when:
previous = topicTree.subscribe(owner1, replacementSub)
matched = topicTree
.matches(TopicName.valueOf("topic/name1"))
.toSet()
then:
matched.size() == 3
matched.first().subscription() == replacementSub
previous != null
previous.subscription() == originalSub
}

def "should extend shared subscription group on multiply subscribing by the same topic"() {
given:
ConcurrentTopicTree topicTree = new ConcurrentTopicTree()
def owner1 = makeOwner("id1")
def owner2 = makeOwner("id2")
topicTree.subscribe(owner1, makeSharedSubscription('$share/group1/topic/name1'))
topicTree.subscribe(owner2, makeSharedSubscription('$share/group1/topic/name1'))
when:
def matched = topicTree
.matches(TopicName.valueOf("topic/name1"))
.toSet()
then:
matched.size() == 1
matched.first().owner() == owner2
when:
matched = topicTree
.matches(TopicName.valueOf("topic/name1"))
.toSet()
then:
matched.size() == 1
matched.first().owner() == owner1
when:
matched = topicTree
.matches(TopicName.valueOf("topic/name1"))
.toSet()
then:
matched.size() == 1
matched.first().owner() == owner2
when:
topicTree.subscribe(owner1, makeSharedSubscription('$share/group1/topic/name1'))
matched = topicTree
.matches(TopicName.valueOf("topic/name1"))
.toSet()
then:
matched.size() == 1
matched.first().owner() == owner2
when:
matched = topicTree
.matches(TopicName.valueOf("topic/name1"))
.toSet()
then:
matched.size() == 1
matched.first().owner() == owner1
when:
matched = topicTree
.matches(TopicName.valueOf("topic/name1"))
.toSet()
then:
matched.size() == 1
matched.first().owner() == owner1
}

static def makeOwner(String id) {
return new TestSubscriptionOwner(id)
}

static def makeSubscription(String topicFilter) {
return new Subscription(
TopicFilter.valueOf(topicFilter),
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,
QoS.AT_LEAST_ONCE,
SubscribeRetainHandling.SEND,
true,
Expand All @@ -556,7 +636,7 @@ class TopicTreeTest extends UnitSpecification {
static def makeSharedSubscription(String topicFilter) {
return new Subscription(
SharedTopicFilter.valueOf(topicFilter),
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,
QoS.AT_LEAST_ONCE,
SubscribeRetainHandling.SEND,
true,
Expand All @@ -566,7 +646,7 @@ class TopicTreeTest extends UnitSpecification {
static def makeSubscription(String topicFilter, int qos) {
return new Subscription(
TopicFilter.valueOf(topicFilter),
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,
QoS.ofCode(qos),
SubscribeRetainHandling.SEND,
true,
Expand Down
9 changes: 8 additions & 1 deletion network/src/main/java/javasabr/mqtt/network/MqttClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import javasabr.mqtt.model.subscribtion.SubscriptionOwner;
import javasabr.mqtt.network.message.out.ConnectAckMqtt311OutMessage;
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.session.MqttSession;
import org.jspecify.annotations.Nullable;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -32,7 +33,13 @@ interface UnsafeMqttClient extends MqttClient {

void send(MqttOutMessage message);

/**
* @return the feature with result of delivering the message
*/
CompletableFuture<Boolean> sendWithFeedback(MqttOutMessage message);

void closeWithReason(MqttOutMessage message);
/**
* @return the feature with result of delivering the reason before closing
*/
CompletableFuture<Boolean> closeWithReason(MqttOutMessage message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ private NetworkPacketReader createPacketReader() {
return new MqttMessageReader(
this,
this::updateLastActivity,
this::handleReceivedPacket,
this::handleReceivedValidPacket,
this::handleReceivedInvalidPacket,
maxPacketsByRead);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import javasabr.mqtt.model.MqttClientConnectionConfig;
import javasabr.mqtt.network.MqttClient.UnsafeMqttClient;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.MqttSession;
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
import javasabr.mqtt.network.message.out.ConnectAckMqtt311OutMessage;
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.session.MqttSession;
import lombok.AccessLevel;
import lombok.CustomLog;
import lombok.Getter;
Expand Down Expand Up @@ -61,9 +61,12 @@ public CompletableFuture<Boolean> sendWithFeedback(MqttOutMessage message) {
}

@Override
public void closeWithReason(MqttOutMessage message) {
sendWithFeedback(message)
.thenAccept(_ -> connection.close());
public CompletableFuture<Boolean> closeWithReason(MqttOutMessage message) {
return sendWithFeedback(message)
.thenApply(sent -> {
connection.close();
return sent;
});
}

@Override
Expand Down
Loading
Loading