Skip to content

Commit fc3cc5c

Browse files
committed
feat: introduce new MQTT discovery method
1 parent 44bb791 commit fc3cc5c

File tree

4 files changed

+141
-1
lines changed

4 files changed

+141
-1
lines changed

lib/src/binding_mqtt/mqtt_client.dart

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import "mqtt_subscription.dart";
2121
/// [ProtocolClient] for supporting the MQTT protocol.
2222
///
2323
/// Currently, only MQTT version 3.1.1 is supported.
24-
final class MqttClient extends ProtocolClient {
24+
final class MqttClient extends ProtocolClient with MqttDiscoverer {
2525
/// Constructor.
2626
MqttClient({
2727
MqttConfig? mqttConfig,
@@ -197,4 +197,46 @@ final class MqttClient extends ProtocolClient {
197197

198198
return MqttSubscription(form, client, complete, next: next, error: error);
199199
}
200+
201+
@override
202+
Stream<Content> performMqttDiscovery(
203+
Uri brokerUri, {
204+
required String discoveryTopic,
205+
required String expectedContentType,
206+
required Duration discoveryTimeout,
207+
}) async* {
208+
final client = await _connect(brokerUri, null);
209+
210+
// TODO: Revisit QoS value and subscription check
211+
if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) {
212+
throw MqttBindingException(
213+
"Subscription to topic $discoveryTopic failed",
214+
);
215+
}
216+
217+
final receivedMessageStream = client.updates;
218+
if (receivedMessageStream == null) {
219+
throw MqttBindingException(
220+
"Subscription to topic $discoveryTopic failed",
221+
);
222+
}
223+
224+
Timer(
225+
discoveryTimeout,
226+
() async {
227+
client.disconnect();
228+
},
229+
);
230+
231+
await for (final receivedMessageList in receivedMessageStream) {
232+
for (final receivedMessage in receivedMessageList) {
233+
final mqttMessage = receivedMessage.payload;
234+
if (mqttMessage is MqttPublishMessage) {
235+
final messagePayload = mqttMessage.payload.message;
236+
237+
yield Content(expectedContentType, Stream.value(messagePayload));
238+
}
239+
}
240+
}
241+
}
200242
}

lib/src/core/implementation/discovery/discovery_configuration.dart

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,45 @@ final class ExploreDirectoryConfiguration extends DiscoveryConfiguration {
138138
final int? limit;
139139
}
140140

141+
/// Experimental [DiscoveryConfiguration] that is used to perform discovery with
142+
/// the MQTT protocol.
143+
@experimental
144+
final class MqttDiscoveryConfiguration extends DiscoveryConfiguration {
145+
/// Instantiates a new [DiscoveryConfiguration] for MQTT.
146+
const MqttDiscoveryConfiguration(
147+
this.brokerUri, {
148+
this.discoveryTopic = "wot/td/#",
149+
this.expectedContentType = "application/td+json",
150+
this.discoveryTimeout = const Duration(seconds: 5),
151+
});
152+
153+
/// [Uri] of the broker the
154+
final Uri brokerUri;
155+
156+
/// The topic that will be used for performing the discovery process.
157+
///
158+
/// If a wildcard topic is used, then the discovery process may return more
159+
/// than one TD.
160+
///
161+
/// Defaults to `wot/td/#`.
162+
final String discoveryTopic;
163+
164+
/// The Thing Description content type that is expected during the discovery
165+
/// process.
166+
///
167+
/// Data that is received during the discovery process that is not
168+
/// deserializable using the content type provided here will be ignored.
169+
///
170+
/// Defaults to `application/td+json`.
171+
final String expectedContentType;
172+
173+
/// Time period after which the MQTT discovery process is going to be
174+
/// cancelled.
175+
///
176+
/// Defaults to five seconds.
177+
final Duration discoveryTimeout;
178+
}
179+
141180
/// Base class for configuring discovery mechanisms that involve a two-step
142181
/// approach.
143182
///

lib/src/core/implementation/thing_discovery.dart

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,18 @@ class ThingDiscovery extends Stream<ThingDescription>
7575
thingFilter: thingFilter,
7676
);
7777
yield* thingDiscoveryProcess;
78+
case MqttDiscoveryConfiguration(
79+
:final brokerUri,
80+
:final discoveryTopic,
81+
:final expectedContentType,
82+
:final discoveryTimeout,
83+
):
84+
yield* _performMqttDiscovery(
85+
brokerUri,
86+
discoveryTopic,
87+
expectedContentType,
88+
discoveryTimeout,
89+
);
7890
}
7991
}
8092
}
@@ -250,6 +262,34 @@ class ThingDiscovery extends Stream<ThingDescription>
250262
);
251263
}
252264

265+
Stream<ThingDescription> _performMqttDiscovery(
266+
Uri brokerUri,
267+
String discoveryTopic,
268+
String expectedContentType,
269+
Duration discoveryTimeout,
270+
) async* {
271+
final uriScheme = brokerUri.scheme;
272+
final client = _clientForUriScheme(uriScheme);
273+
274+
if (client is! MqttDiscoverer) {
275+
yield* Stream.error(
276+
DiscoveryException(
277+
"Client for URI scheme $uriScheme does not support MQTT Discovery.",
278+
),
279+
);
280+
return;
281+
}
282+
283+
final contentStream = client.performMqttDiscovery(
284+
brokerUri,
285+
discoveryTopic: discoveryTopic,
286+
expectedContentType: expectedContentType,
287+
discoveryTimeout: discoveryTimeout,
288+
);
289+
290+
yield* _transformContentStreamToThingDescriptions(contentStream);
291+
}
292+
253293
@override
254294
Future<void> stop() async {
255295
final stopFutures = _clients.values.map((client) => client.stop());

lib/src/core/protocol_interfaces/protocol_discoverer.dart

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,22 @@ base mixin CoreLinkFormatDiscoverer on ProtocolClient {
4545
@experimental
4646
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri);
4747
}
48+
49+
/// Interface for performing experimental discovery using the MQTT protocol.
50+
@experimental
51+
base mixin MqttDiscoverer on ProtocolClient {
52+
/// Performs discovery of Thing Descriptions using the MQTT protocol via the
53+
/// given [brokerUri].
54+
///
55+
/// By default, the [discoveryTopic] `wot/td/#` will be used as discussed in
56+
/// [this issue].
57+
///
58+
/// [this issue]: https://github.com/w3c/wot-discovery/issues/134
59+
@experimental
60+
Stream<Content> performMqttDiscovery(
61+
Uri brokerUri, {
62+
required String discoveryTopic,
63+
required String expectedContentType,
64+
required Duration discoveryTimeout,
65+
});
66+
}

0 commit comments

Comments
 (0)