Skip to content

Commit 447b17b

Browse files
committed
continue updating subscribe mechanism
1 parent 6ef2239 commit 447b17b

File tree

22 files changed

+230
-244
lines changed

22 files changed

+230
-244
lines changed

application/src/test/groovy/javasabr/mqtt/broker/application/ConnectSubscribePublishTest.groovy

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
1212
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode
1313

1414
import java.util.concurrent.CompletableFuture
15-
import java.util.concurrent.atomic.AtomicReference
1615

1716
class ConnectSubscribePublishTest extends IntegrationSpecification {
1817

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[versions]
22
# https://gitlab.com/JavaSaBr/maven-repo/-/packages
3-
rlib = "10.0.alpha5"
3+
rlib = "10.0.alpha6"
44
# https://mvnrepository.com/artifact/org.projectlombok/lombok
55
lombok = "1.18.38"
66
# https://mvnrepository.com/artifact/org.jspecify/jspecify
Lines changed: 47 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,162 +1,152 @@
11
package javasabr.mqtt.model.reason.code;
22

3-
import java.util.stream.Stream;
4-
import javasabr.rlib.common.util.ObjectUtils;
3+
import javasabr.rlib.common.util.NumberedEnum;
4+
import javasabr.rlib.common.util.NumberedEnumMap;
5+
import lombok.AccessLevel;
56
import lombok.Getter;
67
import lombok.RequiredArgsConstructor;
8+
import lombok.experimental.Accessors;
9+
import lombok.experimental.FieldDefaults;
710

11+
@Getter
812
@RequiredArgsConstructor
9-
public enum DisconnectReasonCode {
13+
@Accessors(fluent = true, chain = false)
14+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
15+
public enum DisconnectReasonCode implements NumberedEnum<DisconnectReasonCode> {
1016
/**
1117
* Close the connection normally. Do not send the Will Message. Client or Server.
1218
*/
13-
NORMAL_DISCONNECTION((byte) 0x00),
19+
NORMAL_DISCONNECTION(0x00),
1420
/**
1521
* The Client wishes to disconnect but requires that the Server also publishes its Will Message. Client.
1622
*/
17-
DISCONNECT_WITH_WILL_MESSAGE((byte) 0x04),
23+
DISCONNECT_WITH_WILL_MESSAGE(0x04),
1824

1925
// ERRORS
2026

2127
/**
2228
* The Connection is closed but the sender either does not wish to reveal the reason, or none of the other Reason
2329
* Codes apply. Client or Server.
2430
*/
25-
UNSPECIFIED_ERROR((byte) 0x80),
31+
UNSPECIFIED_ERROR(0x80),
2632
/**
2733
* The received packet does not conform to this specification. Client or Server.
2834
*/
29-
MALFORMED_PACKET((byte) 0x81),
35+
MALFORMED_PACKET(0x81),
3036
/**
3137
* An unexpected or out of order packet was received. Client or Server.
3238
*/
33-
PROTOCOL_ERROR((byte) 0x82),
39+
PROTOCOL_ERROR(0x82),
3440
/**
3541
* The packet received is valid but cannot be processed by this implementation. Client or Server.
3642
*/
37-
IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83),
43+
IMPLEMENTATION_SPECIFIC_ERROR(0x83),
3844
/**
3945
* The request is not authorized. Server.
4046
*/
41-
NOT_AUTHORIZED((byte) 0x87),
47+
NOT_AUTHORIZED(0x87),
4248
/**
4349
* The Server is busy and cannot continue processing requests from this Client. Server.
4450
*/
45-
SERVER_BUSY((byte) 0x89),
51+
SERVER_BUSY(0x89),
4652
/**
4753
* The Server is shutting down. Server.
4854
*/
49-
SERVER_SHUTTING_DOWN((byte) 0x8B),
55+
SERVER_SHUTTING_DOWN(0x8B),
5056
/**
5157
* The Connection is closed because no packet has been received for 1.5 times the Keepalive time. Server.
5258
*/
53-
KEEP_ALIVE_TIMEOUT((byte) 0x8D),
59+
KEEP_ALIVE_TIMEOUT(0x8D),
5460
/**
5561
* Another Connection using the same ClientID has connected causing this Connection to be closed. Server.
5662
*/
57-
SESSION_TAKEN_OVER((byte) 0x8E),
63+
SESSION_TAKEN_OVER(0x8E),
5864
/**
5965
* The Topic Filter is correctly formed, but is not accepted by this Sever. Server.
6066
*/
61-
TOPIC_FILTER_INVALID((byte) 0x8F),
67+
TOPIC_FILTER_INVALID(0x8F),
6268
/**
6369
* The Topic Name is correctly formed, but is not accepted by this Client or Server. Client or Server.
6470
*/
65-
TOPIC_NAME_INVALID((byte) 0x90),
71+
TOPIC_NAME_INVALID(0x90),
6672
/**
6773
* The Client or Server has received more than Receive Maximum publication for which it has not sent PUBACK or
6874
* PUBCOMP. Client or Server.
6975
*/
70-
RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93),
76+
RECEIVE_MAXIMUM_EXCEEDED(0x93),
7177
/**
7278
* The Client or Server has received a PUBLISH packet containing a Topic Alias which is greater than the Maximum Topic
7379
* Alias it sent in the CONNECT or CONNACK packet. Client or Server.
7480
*/
75-
TOPIC_ALIAS_INVALID((byte) 0x94),
81+
TOPIC_ALIAS_INVALID(0x94),
7682
/**
7783
* The packet size is greater than Maximum Packet Size for this Client or Server. Client or Server.
7884
*/
79-
PACKET_TOO_LARGE((byte) 0x95),
85+
PACKET_TOO_LARGE(0x95),
8086
/**
8187
* The received data rate is too high. Client or Server.
8288
*/
83-
MESSAGE_RATE_TOO_HIGH((byte) 0x96),
89+
MESSAGE_RATE_TOO_HIGH(0x96),
8490
/**
8591
* An implementation or administrative imposed limit has been exceeded. Client or Server.
8692
*/
87-
QUOTA_EXCEEDED((byte) 0x97),
93+
QUOTA_EXCEEDED(0x97),
8894
/**
8995
* The Connection is closed due to an administrative action. Client or Server.
9096
*/
91-
ADMINISTRATIVE_ACTION((byte) 0x98),
97+
ADMINISTRATIVE_ACTION(0x98),
9298
/**
9399
* The payload format does not match the one specified by the Payload Format Indicator. Client or Server.
94100
*/
95-
PAYLOAD_FORMAT_INVALID((byte) 0x99),
101+
PAYLOAD_FORMAT_INVALID(0x99),
96102
/**
97103
* The Server has does not support retained messages. Server.
98104
*/
99-
RETAIN_NOT_SUPPORTED((byte) 0x9A),
105+
RETAIN_NOT_SUPPORTED(0x9A),
100106
/**
101107
* The Client specified a QoS greater than the QoS specified in a Maximum QoS in the CONNACK. Server.
102108
*/
103-
QOS_NOT_SUPPORTED((byte) 0x9B),
109+
QOS_NOT_SUPPORTED(0x9B),
104110
/**
105111
* The Client should temporarily change its Server. Server.
106112
*/
107-
USE_ANOTHER_SERVER((byte) 0x9C),
113+
USE_ANOTHER_SERVER(0x9C),
108114
/**
109115
* The Server is moved and the Client should permanently change its server location. Server.
110116
*/
111-
SERVER_MOVED((byte) 0x9D),
117+
SERVER_MOVED(0x9D),
112118
/**
113119
* The Server does not support Shared Subscriptions. Server.
114120
*/
115-
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED((byte) 0x9E),
121+
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED(0x9E),
116122
/**
117123
* This connection is closed because the connection rate is too high. Server.
118124
*/
119-
CONNECTION_RATE_EXCEEDED((byte) 0x9F),
125+
CONNECTION_RATE_EXCEEDED(0x9F),
120126
/**
121127
* The maximum connection time authorized for this connection has been exceeded. Server.
122128
*/
123-
MAXIMUM_CONNECT_TIME((byte) 0xA0),
129+
MAXIMUM_CONNECT_TIME(0xA0),
124130
/**
125131
* The Server does not support Subscription Identifiers; the subscription is not accepted. Server.
126132
*/
127-
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1),
133+
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED(0xA1),
128134
/**
129135
* The Server does not support Wildcard Subscriptions; the subscription is not accepted. Server.
130136
*/
131-
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED((byte) 0xA2);
137+
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED(0xA2);
132138

133-
private static final DisconnectReasonCode[] VALUES;
139+
private static final NumberedEnumMap<DisconnectReasonCode> NUMBERED_MAP =
140+
new NumberedEnumMap<>(DisconnectReasonCode.class);
134141

135-
static {
136-
137-
var maxId = Stream
138-
.of(values())
139-
.mapToInt(DisconnectReasonCode::getValue)
140-
.map(value -> Byte.toUnsignedInt((byte) value))
141-
.max()
142-
.orElse(0);
143-
144-
var values = new DisconnectReasonCode[maxId + 1];
145-
146-
for (var value : values()) {
147-
values[Byte.toUnsignedInt(value.value)] = value;
148-
}
149-
150-
VALUES = values;
142+
public static DisconnectReasonCode ofCode(int code) {
143+
return NUMBERED_MAP.require(code);
151144
}
152145

153-
public static DisconnectReasonCode of(int index) {
154-
return ObjectUtils.notNull(
155-
VALUES[index],
156-
index,
157-
arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg));
158-
}
146+
int code;
159147

160-
private @Getter
161-
final byte value;
148+
@Override
149+
public int number() {
150+
return code;
151+
}
162152
}
Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,82 @@
11
package javasabr.mqtt.model.reason.code;
22

3-
import java.util.stream.Stream;
4-
import javasabr.rlib.common.util.ObjectUtils;
3+
import javasabr.rlib.common.util.NumberedEnum;
4+
import javasabr.rlib.common.util.NumberedEnumMap;
5+
import lombok.AccessLevel;
56
import lombok.Getter;
67
import lombok.RequiredArgsConstructor;
8+
import lombok.experimental.Accessors;
9+
import lombok.experimental.FieldDefaults;
710

11+
@Getter
812
@RequiredArgsConstructor
9-
public enum SubscribeAckReasonCode {
13+
@Accessors(fluent = true, chain = false)
14+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
15+
public enum SubscribeAckReasonCode implements NumberedEnum<SubscribeAckReasonCode> {
1016
/**
1117
* The subscription is accepted and the maximum QoS sent will be QoS 0. This might be a lower QoS than was requested.
1218
*/
13-
GRANTED_QOS_0((byte) 0x00),
19+
GRANTED_QOS_0(0x00),
1420
/**
1521
* The subscription is accepted and the maximum QoS sent will be QoS 1. This might be a lower QoS than was requested.
1622
*/
17-
GRANTED_QOS_1((byte) 0x01),
23+
GRANTED_QOS_1(0x01),
1824
/**
1925
* The subscription is accepted and any received QoS will be sent to this subscription.
2026
*/
21-
GRANTED_QOS_2((byte) 0x02),
27+
GRANTED_QOS_2(0x02),
2228

2329
// ERRORS
2430

2531
/**
2632
* The subscription is not accepted and the Server either does not wish to reveal the reason or none of the other
2733
* Reason Codes apply.
2834
*/
29-
UNSPECIFIED_ERROR((byte) 0x80),
35+
UNSPECIFIED_ERROR(0x80),
3036
/**
3137
* The SUBSCRIBE is valid but the Server does not accept it.
3238
*/
33-
IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83),
39+
IMPLEMENTATION_SPECIFIC_ERROR(0x83),
3440
/**
3541
* The Client is not authorized to make this subscription.
3642
*/
37-
NOT_AUTHORIZED((byte) 0x87),
43+
NOT_AUTHORIZED(0x87),
3844
/**
3945
* The Topic Filter is correctly formed but is not allowed for this Client.
4046
*/
41-
TOPIC_FILTER_INVALID((byte) 0x8F),
47+
TOPIC_FILTER_INVALID(0x8F),
4248
/**
4349
* The specified Packet Identifier is already in use.
4450
*/
45-
PACKET_IDENTIFIER_IN_USE((byte) 0x91),
51+
PACKET_IDENTIFIER_IN_USE(0x91),
4652
/**
4753
* An implementation or administrative imposed limit has been exceeded.
4854
*/
49-
QUOTA_EXCEEDED((byte) 0x97),
55+
QUOTA_EXCEEDED(0x97),
5056
/**
5157
* The Server does not support Shared Subscriptions for this Client.
5258
*/
53-
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED((byte) 0x9E),
59+
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED(0x9E),
5460
/**
5561
* The Server does not support Subscription Identifiers; the subscription is not accepted.
5662
*/
57-
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1),
63+
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED(0xA1),
5864
/**
5965
* The Server does not support Wildcard Subscriptions; the subscription is not accepted.
6066
*/
61-
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED((byte) 0xA2);
67+
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED(0xA2);
6268

63-
private static final SubscribeAckReasonCode[] VALUES;
69+
private static final NumberedEnumMap<SubscribeAckReasonCode> NUMBERED_MAP =
70+
new NumberedEnumMap<>(SubscribeAckReasonCode.class);
6471

65-
static {
66-
67-
var maxId = Stream
68-
.of(values())
69-
.mapToInt(SubscribeAckReasonCode::getValue)
70-
.map(value -> Byte.toUnsignedInt((byte) value))
71-
.max()
72-
.orElse(0);
73-
74-
var values = new SubscribeAckReasonCode[maxId + 1];
75-
76-
for (var value : values()) {
77-
values[Byte.toUnsignedInt(value.value)] = value;
78-
}
79-
80-
VALUES = values;
72+
public static SubscribeAckReasonCode ofCode(int code) {
73+
return NUMBERED_MAP.require(code);
8174
}
8275

83-
public static SubscribeAckReasonCode of(int index) {
84-
return ObjectUtils.notNull(
85-
VALUES[index],
86-
index,
87-
arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg));
88-
}
76+
int code;
8977

90-
@Getter
91-
private final byte value;
78+
@Override
79+
public int number() {
80+
return code;
81+
}
9282
}

model/src/main/java/javasabr/mqtt/model/subscribtion/RequestedSubscription.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import javasabr.mqtt.model.QoS;
44
import javasabr.mqtt.model.SubscribeRetainHandling;
5-
import javasabr.mqtt.model.topic.TopicFilter;
65

76
public record RequestedSubscription(
87
/*

network/src/main/java/javasabr/mqtt/network/MqttClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ interface UnsafeMqttClient extends MqttClient {
3030

3131
MqttClientConnectionConfig connectionConfig();
3232

33-
void send(MqttOutMessage packet);
33+
void send(MqttOutMessage message);
3434

35-
CompletableFuture<Boolean> sendWithFeedback(MqttOutMessage packet);
35+
CompletableFuture<Boolean> sendWithFeedback(MqttOutMessage message);
36+
37+
void closeWithReason(MqttOutMessage message);
3638
}

network/src/main/java/javasabr/mqtt/network/impl/AbstractMqttClient.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,21 @@ public AbstractMqttClient(MqttConnection connection, MqttClientReleaseHandler re
4949
}
5050

5151
@Override
52-
public void send(MqttOutMessage packet) {
53-
log.debug(clientId, packet.name(), packet, "[%s] Send to client packet:[%s] %s"::formatted);
54-
connection.send(packet);
52+
public void send(MqttOutMessage message) {
53+
log.debug(clientId, message.name(), message, "[%s] Send to client packet:[%s] %s"::formatted);
54+
connection.send(message);
5555
}
5656

5757
@Override
58-
public CompletableFuture<Boolean> sendWithFeedback(MqttOutMessage packet) {
59-
log.debug(clientId, packet.name(), packet, "[%s] Send to client packet:[%s] %s"::formatted);
60-
return connection.sendWithFeedback(packet);
58+
public CompletableFuture<Boolean> sendWithFeedback(MqttOutMessage message) {
59+
log.debug(clientId, message.name(), message, "[%s] Send to client packet:[%s] %s"::formatted);
60+
return connection.sendWithFeedback(message);
61+
}
62+
63+
@Override
64+
public void closeWithReason(MqttOutMessage message) {
65+
sendWithFeedback(message)
66+
.thenAccept(_ -> connection.close());
6167
}
6268

6369
@Override

0 commit comments

Comments
 (0)