Skip to content

Commit 8801394

Browse files
committed
Document and improve PubAck handling
This improves the doc around how to back-pressure the publication of Publish commands to avoid buffer overflow in QoS 1+ cases. Implements the ask pattern for this purpose.
1 parent 3a7f0ae commit 8801394

File tree

5 files changed

+82
-13
lines changed

5 files changed

+82
-13
lines changed

docs/src/main/paradox/mqtt-streaming.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,17 @@ Note that the `Publish` command is not offered to the command flow given MQTT Qo
5858
session is told to perform `Publish` given that it can retry continuously with buffering until a command
5959
flow is established.
6060

61-
We filter the events received as there will be ACKs to our connect, subscribe and publish. The collected event
62-
is the publication to the topic we just subscribed to.
61+
We scan the events received as there will be ACKs to our connect, subscribe and publish. The collected event
62+
is the publication to the topic we just subscribed to, and we only publish that once the server has acknowledged the
63+
original publish that we sent. We do this to illustrate how objects can be carried with certain commands, namely the
64+
connect, subscribe and publish ones. These objects, in our example a `Promise[Done]`, can be used to correlate a reply event
65+
with its corresponding command. We complete the promise carried through with our publish command when we
66+
receive its ACK. You can use a similar approach to provide back-pressure the queuing of publish commands by submitting them
67+
only when any previous one has been acknowledged by the server. If you do not do this then you can easily overflow buffers
68+
when publishing.
69+
70+
> Consider using `QoSAtMostOnceDelivery` i.e. "best effort" for your use case as the trade-off with other QoS approaches
71+
> yields higher latency and greater code complexity. IoT use-cases are often best-effort and tolerate occasional loss.
6372
6473
To shut down the flow after use, the command queue `commands` is completed and after its completion the `session` is shut down.
6574

mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/javadsl/MqttSession.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
package akka.stream.alpakka.mqtt.streaming
66
package javadsl
77

8+
import java.util.concurrent.CompletionStage
9+
810
import akka.NotUsed
911
import akka.actor.ActorSystem
1012
import akka.stream.Materializer
@@ -16,6 +18,8 @@ import akka.stream.alpakka.mqtt.streaming.scaladsl.{
1618
}
1719
import akka.stream.javadsl.Source
1820

21+
import scala.compat.java8.FutureConverters._
22+
1923
/**
2024
* Represents MQTT session state for both clients or servers. Session
2125
* state can survive across connections i.e. their lifetime is
@@ -32,6 +36,18 @@ abstract class MqttSession {
3236
*/
3337
def tell[A](cp: Command[A]): Unit
3438

39+
/**
40+
* Ask the session to perform a command regardless of the state it is
41+
* in. This is important for sending Publish messages in particular,
42+
* as a connection may not have been established with a session.
43+
* @param cp The command to perform
44+
* @tparam A The type of any carry for the command.
45+
* @return A future indicating when the command has completed. Completion
46+
* is defined as when it has been acknowledged by the recipient
47+
* endpoint.
48+
*/
49+
def ask[A](cp: Command[A]): CompletionStage[A]
50+
3551
/**
3652
* Shutdown the session gracefully
3753
*/
@@ -47,6 +63,9 @@ abstract class MqttClientSession extends MqttSession {
4763
override def tell[A](cp: Command[A]): Unit =
4864
underlying ! cp
4965

66+
override def ask[A](cp: Command[A]): CompletionStage[A] =
67+
(underlying ? cp).toJava
68+
5069
override def shutdown(): Unit =
5170
underlying.shutdown()
5271
}
@@ -91,6 +110,9 @@ abstract class MqttServerSession extends MqttSession {
91110
override def tell[A](cp: Command[A]): Unit =
92111
underlying ! cp
93112

113+
override def ask[A](cp: Command[A]): CompletionStage[A] =
114+
(underlying ? cp).toJava
115+
94116
override def shutdown(): Unit =
95117
underlying.shutdown()
96118
}

mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,31 @@ abstract class MqttSession {
5555
*/
5656
def ![A](cp: Command[A]): Unit
5757

58+
/**
59+
* Ask the session to perform a command regardless of the state it is
60+
* in. This is important for sending Publish messages in particular,
61+
* as a connection may not have been established with a session.
62+
* @param cp The command to perform
63+
* @tparam A The type of any carry for the command.
64+
* @return A future indicating when the command has completed. Completion
65+
* is defined as when it has been acknowledged by the recipient
66+
* endpoint.
67+
*/
68+
final def ask[A](cp: Command[A]): Future[A] =
69+
this ? cp
70+
71+
/**
72+
* Ask the session to perform a command regardless of the state it is
73+
* in. This is important for sending Publish messages in particular,
74+
* as a connection may not have been established with a session.
75+
* @param cp The command to perform
76+
* @tparam A The type of any carry for the command.
77+
* @return A future indicating when the command has completed. Completion
78+
* is defined as when it has been acknowledged by the recipient
79+
* endpoint.
80+
*/
81+
def ?[A](cp: Command[A]): Future[A]
82+
5883
/**
5984
* Shutdown the session gracefully
6085
*/
@@ -171,6 +196,9 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat:
171196
case c: Command[A] => throw new IllegalStateException(c + " is not a client command that can be sent directly")
172197
}
173198

199+
override def ?[A](cp: Command[A]): Future[A] =
200+
???
201+
174202
override def shutdown(): Unit = {
175203
system.stop(clientConnector.toClassic)
176204
system.stop(consumerPacketRouter.toClassic)
@@ -513,6 +541,9 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat:
513541
case c: Command[A] => throw new IllegalStateException(c + " is not a server command that can be sent directly")
514542
}
515543

544+
override def ?[A](cp: Command[A]): Future[A] =
545+
???
546+
516547
override def shutdown(): Unit = {
517548
system.stop(serverConnector.toClassic)
518549
system.stop(consumerPacketRouter.toClassic)

mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,20 @@ public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
129129
SourceQueueWithComplete<Command<Object>> commands = run.first();
130130
commands.offer(new Command<>(new Connect(clientId, ConnectFlags.CleanSession())));
131131
commands.offer(new Command<>(new Subscribe(topic)));
132-
session.tell(
133-
new Command<>(
134-
new Publish(
135-
ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
136-
topic,
137-
ByteString.fromString("ohi"))));
132+
CompletionStage<Done> publishDone =
133+
session.ask(
134+
new Command<>(
135+
new Publish(
136+
ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
137+
topic,
138+
ByteString.fromString("ohi")),
139+
Done.getInstance()));
138140
// #run-streaming-flow
139141

140-
CompletionStage<Publish> event = run.second();
141-
Publish publishEvent = event.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
142+
publishDone.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
143+
144+
CompletionStage<Publish> events = run.second();
145+
Publish publishEvent = events.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
142146
assertEquals(publishEvent.topicName(), topic);
143147
assertEquals(publishEvent.payload(), ByteString.fromString("ohi"));
144148

mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,14 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit
7070

7171
commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
7272
commands.offer(Command(Subscribe(topic)))
73-
session ! Command(
74-
Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi"))
75-
)
73+
val publishDone = session ? Command(
74+
Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")),
75+
Done
76+
)
77+
7678
//#run-streaming-flow
7779

80+
publishDone.futureValue shouldBe Done
7881
events.futureValue match {
7982
case Publish(_, `topic`, _, bytes) => bytes shouldBe ByteString("ohi")
8083
case e => fail("Unexpected event: " + e)

0 commit comments

Comments
 (0)