Skip to content

Commit e1e40a6

Browse files
committed
Refactored MqttStatefulIncomingPublish + MqttMatchingPublishFlows -> MqttStatefulPublishWithFlows
1 object with all incoming state, separated from message
1 parent 7087f48 commit e1e40a6

File tree

14 files changed

+344
-352
lines changed

14 files changed

+344
-352
lines changed

src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt3/Mqtt3PublishDecoder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.hivemq.client.internal.mqtt.codec.decoder.MqttMessageDecoder;
2222
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
2323
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
24-
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulIncomingPublish;
24+
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
2525
import com.hivemq.client.internal.mqtt.message.publish.mqtt3.Mqtt3PublishView;
2626
import com.hivemq.client.internal.util.ByteBufferUtil;
2727
import com.hivemq.client.mqtt.datatypes.MqttQos;
@@ -47,7 +47,7 @@ public class Mqtt3PublishDecoder implements MqttMessageDecoder {
4747
Mqtt3PublishDecoder() {}
4848

4949
@Override
50-
public @NotNull MqttStatefulIncomingPublish decode(
50+
public @NotNull MqttStatefulPublish decode(
5151
final int flags, final @NotNull ByteBuf in, final @NotNull MqttDecoderContext context)
5252
throws MqttDecoderException {
5353

src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt5/Mqtt5PublishDecoder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.hivemq.client.internal.mqtt.codec.decoder.MqttMessageDecoder;
2222
import com.hivemq.client.internal.mqtt.datatypes.*;
2323
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
24-
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulIncomingPublish;
24+
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
2525
import com.hivemq.client.internal.util.ByteBufferUtil;
2626
import com.hivemq.client.internal.util.Utf8Util;
2727
import com.hivemq.client.internal.util.collections.ImmutableIntList;
@@ -54,7 +54,7 @@ public class Mqtt5PublishDecoder implements MqttMessageDecoder {
5454
Mqtt5PublishDecoder() {}
5555

5656
@Override
57-
public @NotNull MqttStatefulIncomingPublish decode(
57+
public @NotNull MqttStatefulPublish decode(
5858
final int flags, final @NotNull ByteBuf in, final @NotNull MqttDecoderContext context)
5959
throws MqttDecoderException {
6060

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishConfirmable.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,20 @@
2626
*/
2727
class MqttIncomingPublishConfirmable implements Confirmable, Runnable {
2828

29-
private final long id;
3029
private final @NotNull MqttIncomingPublishFlow flow;
31-
private final @NotNull MqttMatchingPublishFlows flows;
30+
private final @NotNull MqttStatefulPublishWithFlows publishWithFlows;
3231
private final @NotNull AtomicBoolean confirmed = new AtomicBoolean(false);
3332

3433
MqttIncomingPublishConfirmable(
35-
final long id, final @NotNull MqttIncomingPublishFlow flow, final @NotNull MqttMatchingPublishFlows flows) {
34+
final @NotNull MqttIncomingPublishFlow flow, final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
3635

37-
this.id = id;
3836
this.flow = flow;
39-
this.flows = flows;
37+
this.publishWithFlows = publishWithFlows;
4038
}
4139

4240
@Override
4341
public long getId() {
44-
return id;
42+
return publishWithFlows.id;
4543
}
4644

4745
@Override
@@ -55,7 +53,7 @@ public boolean confirm() {
5553

5654
@Override
5755
public void run() {
58-
flows.acknowledge(flow);
56+
publishWithFlows.acknowledge(flow);
5957
}
6058

6159
static class Qos0 implements Confirmable {

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlows.java

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.hivemq.client.internal.annotations.NotThreadSafe;
2020
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicFilterImpl;
2121
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
22-
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
2322
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
2423
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscription;
2524
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
@@ -118,29 +117,21 @@ void cancelGlobal(final @NotNull MqttGlobalIncomingPublishFlow flow) {
118117
}
119118
}
120119

121-
@NotNull MqttMatchingPublishFlows findMatching(final @NotNull MqttStatefulPublish publish) {
122-
final MqttMatchingPublishFlows matchingFlows = new MqttMatchingPublishFlows();
123-
findMatching(publish, matchingFlows);
124-
return matchingFlows;
125-
}
126-
127-
void findMatching(
128-
final @NotNull MqttStatefulPublish publish, final @NotNull MqttMatchingPublishFlows matchingFlows) {
129-
130-
subscribedFlows.findMatching(publish.stateless().getTopic(), matchingFlows);
131-
if (matchingFlows.subscriptionFound) {
132-
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.SUBSCRIBED.ordinal()]);
120+
void findMatching(final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
121+
subscribedFlows.findMatching(publishWithFlows);
122+
if (publishWithFlows.subscriptionFound) {
123+
add(publishWithFlows, globalFlows[MqttGlobalPublishFilter.SUBSCRIBED.ordinal()]);
133124
} else {
134-
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.UNSOLICITED.ordinal()]);
125+
add(publishWithFlows, globalFlows[MqttGlobalPublishFilter.UNSOLICITED.ordinal()]);
135126
}
136-
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.ALL.ordinal()]);
137-
if (matchingFlows.isEmpty()) {
138-
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.REMAINING.ordinal()]);
127+
add(publishWithFlows, globalFlows[MqttGlobalPublishFilter.ALL.ordinal()]);
128+
if (publishWithFlows.isEmpty()) {
129+
add(publishWithFlows, globalFlows[MqttGlobalPublishFilter.REMAINING.ordinal()]);
139130
}
140131
}
141132

142133
private static void add(
143-
final @NotNull MqttMatchingPublishFlows matchingPublishFlows,
134+
final @NotNull MqttStatefulPublishWithFlows matchingPublishFlows,
144135
final @Nullable HandleList<MqttGlobalIncomingPublishFlow> globalFlows) {
145136

146137
if (globalFlows != null) {

src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishService.java

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.hivemq.client.internal.logging.InternalLoggerFactory;
2323
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
2424
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
25-
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulIncomingPublish;
2625
import com.hivemq.client.internal.util.collections.ChunkedArrayQueue;
2726
import com.hivemq.client.internal.util.collections.HandleList.Handle;
2827
import com.hivemq.client.mqtt.datatypes.MqttQos;
@@ -61,13 +60,12 @@ class MqttIncomingPublishService {
6160
}
6261

6362
@CallByThread("Netty EventLoop")
64-
void onPublishQos0(final @NotNull MqttStatefulIncomingPublish publish, final int receiveMaximum) {
65-
if (qos0Queue.size() >= (2 * receiveMaximum)) { // TODO receiveMaximum
63+
void onPublishQos0(final @NotNull MqttStatefulPublishWithFlows publishWithFlows, final int receiveMaximum) {
64+
if (qos0Queue.size() >= receiveMaximum) { // TODO receiveMaximum
6665
LOGGER.warn("QoS 0 publish message dropped.");
6766
if (QOS_0_DROP_OLDEST) {
6867
qos0It.reset();
69-
qos0It.next();
70-
final MqttMatchingPublishFlows flows = (MqttMatchingPublishFlows) qos0It.next();
68+
final MqttStatefulPublishWithFlows flows = (MqttStatefulPublishWithFlows) qos0It.next();
7169
qos0It.remove();
7270
for (Handle<MqttIncomingPublishFlow> h = flows.getFirst(); h != null; h = h.getNext()) {
7371
if (h.getElement().dereference() == 0) {
@@ -78,43 +76,40 @@ void onPublishQos0(final @NotNull MqttStatefulIncomingPublish publish, final int
7876
return;
7977
}
8078
}
81-
final MqttMatchingPublishFlows flows = onPublish(publish);
82-
if (!flows.isEmpty()) {
83-
qos0Queue.offer(publish);
84-
qos0Queue.offer(flows);
79+
onPublish(publishWithFlows);
80+
if (!publishWithFlows.isEmpty()) {
81+
qos0Queue.offer(publishWithFlows);
8582
}
8683
}
8784

8885
@CallByThread("Netty EventLoop")
89-
boolean onPublishQos1Or2(final @NotNull MqttStatefulIncomingPublish publish, final int receiveMaximum) {
90-
if (qos1Or2Queue.size() >= (2 * receiveMaximum)) {
86+
boolean onPublishQos1Or2(final @NotNull MqttStatefulPublishWithFlows publishWithFlows, final int receiveMaximum) {
87+
if (qos1Or2Queue.size() >= receiveMaximum) {
9188
return false; // flow control error
9289
}
93-
publish.setId(nextQoS1Or2PublishId++);
94-
final MqttMatchingPublishFlows flows = onPublish(publish);
95-
if (qos1Or2Queue.isEmpty() && flows.isEmpty() && flows.areAcknowledged()) {
96-
incomingQosHandler.ack(publish);
90+
publishWithFlows.id = nextQoS1Or2PublishId++;
91+
onPublish(publishWithFlows);
92+
if (qos1Or2Queue.isEmpty() && publishWithFlows.isEmpty() && publishWithFlows.areAcknowledged()) {
93+
incomingQosHandler.ack(publishWithFlows);
9794
} else {
98-
qos1Or2Queue.offer(publish);
99-
qos1Or2Queue.offer(flows);
95+
qos1Or2Queue.offer(publishWithFlows);
10096
}
10197
return true;
10298
}
10399

104100
@CallByThread("Netty EventLoop")
105-
private @NotNull MqttMatchingPublishFlows onPublish(final @NotNull MqttStatefulIncomingPublish publish) {
106-
final MqttMatchingPublishFlows flows = incomingPublishFlows.findMatching(publish);
107-
if (flows.isEmpty()) {
108-
LOGGER.warn("No publish flow registered for {}.", publish);
101+
private void onPublish(final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
102+
incomingPublishFlows.findMatching(publishWithFlows);
103+
if (publishWithFlows.isEmpty()) {
104+
LOGGER.warn("No publish flow registered for {}.", publishWithFlows.publish);
109105
}
110106
drain();
111-
for (Handle<MqttIncomingPublishFlow> h = flows.getFirst(); h != null; h = h.getNext()) {
107+
for (Handle<MqttIncomingPublishFlow> h = publishWithFlows.getFirst(); h != null; h = h.getNext()) {
112108
if (h.getElement().reference() == 1) {
113109
referencedFlowCount++;
114110
}
115111
}
116-
emit(publish, flows);
117-
return flows;
112+
emit(publishWithFlows);
118113
}
119114

120115
@CallByThread("Netty EventLoop")
@@ -124,22 +119,20 @@ void drain() {
124119

125120
qos1Or2It.reset();
126121
while (qos1Or2It.hasNext()) {
127-
final MqttStatefulIncomingPublish publish = (MqttStatefulIncomingPublish) qos1Or2It.next();
128-
final MqttMatchingPublishFlows flows = (MqttMatchingPublishFlows) qos1Or2It.next();
129-
emit(publish, flows);
130-
if ((qos1Or2It.getIterated() == 2) && flows.isEmpty() && flows.areAcknowledged()) {
122+
final MqttStatefulPublishWithFlows publishWithFlows = (MqttStatefulPublishWithFlows) qos1Or2It.next();
123+
emit(publishWithFlows);
124+
if ((qos1Or2It.getIterated() == 1) && publishWithFlows.isEmpty() && publishWithFlows.areAcknowledged()) {
131125
qos1Or2It.remove();
132-
incomingQosHandler.ack(publish);
126+
incomingQosHandler.ack(publishWithFlows);
133127
} else if (blockingFlowCount == referencedFlowCount) {
134128
return;
135129
}
136130
}
137131
qos0It.reset();
138132
while (qos0It.hasNext()) {
139-
final MqttStatefulIncomingPublish publish = (MqttStatefulIncomingPublish) qos0It.next();
140-
final MqttMatchingPublishFlows flows = (MqttMatchingPublishFlows) qos0It.next();
141-
emit(publish, flows);
142-
if ((qos0It.getIterated() == 2) && flows.isEmpty()) {
133+
final MqttStatefulPublishWithFlows publishWithFlows = (MqttStatefulPublishWithFlows) qos0It.next();
134+
emit(publishWithFlows);
135+
if ((qos0It.getIterated() == 1) && publishWithFlows.isEmpty()) {
143136
qos0It.remove();
144137
} else if (blockingFlowCount == referencedFlowCount) {
145138
return;
@@ -148,32 +141,30 @@ void drain() {
148141
}
149142

150143
@CallByThread("Netty EventLoop")
151-
private void emit(
152-
final @NotNull MqttStatefulIncomingPublish statefulPublish, final @NotNull MqttMatchingPublishFlows flows) {
153-
154-
for (Handle<MqttIncomingPublishFlow> h = flows.getFirst(); h != null; h = h.getNext()) {
144+
private void emit(final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
145+
for (Handle<MqttIncomingPublishFlow> h = publishWithFlows.getFirst(); h != null; h = h.getNext()) {
155146
final MqttIncomingPublishFlow flow = h.getElement();
156147

157148
if (flow.isCancelled()) {
158-
flows.remove(h);
149+
publishWithFlows.remove(h);
159150
if (flow.dereference() == 0) {
160151
referencedFlowCount--;
161152
}
162153
} else {
163154
final long requested = flow.requested(runIndex);
164155
if (requested > 0) {
165-
MqttPublish publish = statefulPublish.stateless();
156+
MqttPublish publish = publishWithFlows.publish.stateless();
166157
if (flow.manualAcknowledgement) {
167158
final Confirmable confirmable;
168159
if (publish.getQos() == MqttQos.AT_MOST_ONCE) {
169160
confirmable = new MqttIncomingPublishConfirmable.Qos0();
170161
} else {
171-
confirmable = new MqttIncomingPublishConfirmable(statefulPublish.getId(), flow, flows);
162+
confirmable = new MqttIncomingPublishConfirmable(flow, publishWithFlows);
172163
}
173164
publish = publish.withConfirmable(confirmable);
174165
}
175166
flow.onNext(publish);
176-
flows.remove(h);
167+
publishWithFlows.remove(h);
177168
if (flow.dereference() == 0) {
178169
referencedFlowCount--;
179170
flow.checkDone();

0 commit comments

Comments
 (0)