Skip to content

Commit 5fd0f1c

Browse files
authored
Merge pull request #167 from eclipse-thingweb/discovery-interfaces
feat!: rework discoverer APIs
2 parents 669333a + 77e39d6 commit 5fd0f1c

File tree

16 files changed

+315
-202
lines changed

16 files changed

+315
-202
lines changed

lib/core.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
/// runtime used for consuming, exposing, and discovering Things.
1010
library core;
1111

12+
// TODO(JKRhb): Reorganize top-level core package into smaller packages.
1213
export "src/core/definitions.dart";
1314
export "src/core/exceptions.dart";
15+
export "src/core/extensions.dart";
1416
export "src/core/implementation.dart";
1517
export "src/core/protocol_interfaces.dart";
1618
export "src/core/scripting_api.dart";

lib/src/binding_coap/coap_client.dart

Lines changed: 33 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ coap.PskCredentialsCallback? _createPskCallback(
7373
}
7474

7575
/// A [ProtocolClient] for the Constrained Application Protocol (CoAP).
76-
final class CoapClient extends ProtocolClient {
76+
final class CoapClient extends ProtocolClient
77+
with DirectDiscoverer, MulticastDiscoverer, CoreLinkFormatDiscoverer {
7778
/// Creates a new [CoapClient] based on an optional [CoapConfig].
7879
CoapClient({
7980
CoapConfig? coapConfig,
@@ -446,67 +447,22 @@ final class CoapClient extends ProtocolClient {
446447
@override
447448
Future<void> stop() async {}
448449

449-
Stream<DiscoveryContent> _discoverFromMulticast(
450-
coap.CoapClient client,
451-
Uri uri,
452-
) async* {
453-
final streamController = StreamController<DiscoveryContent>();
454-
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
455-
(data) {
456-
streamController.add(data.determineDiscoveryContent(uri.scheme));
457-
},
458-
onError: streamController.addError,
459-
onDone: () async {
460-
await streamController.close();
461-
},
462-
);
463-
464-
final content = _sendDiscoveryRequest(
465-
uri,
466-
coap.RequestMethod.get,
467-
form: null,
468-
accept: coap.CoapMediaType.applicationTdJson,
469-
multicastResponseHandler: multicastResponseHandler,
470-
);
471-
unawaited(content);
472-
yield* streamController.stream;
473-
}
474-
475-
Stream<DiscoveryContent> _discoverFromUnicast(
476-
coap.CoapClient client,
477-
Uri uri,
478-
) async* {
479-
yield await _sendDiscoveryRequest(
480-
uri,
481-
coap.RequestMethod.get,
482-
form: null,
483-
accept: coap.CoapMediaType.applicationTdJson,
484-
);
485-
}
486-
487450
@override
488-
Stream<DiscoveryContent> discoverDirectly(
489-
Uri uri, {
490-
bool disableMulticast = false,
491-
}) async* {
492-
final client = coap.CoapClient(uri);
493-
494-
if (uri.isMulticastAddress) {
495-
if (!disableMulticast) {
496-
yield* _discoverFromMulticast(client, uri);
497-
}
498-
} else {
499-
yield* _discoverFromUnicast(client, uri);
500-
}
501-
}
451+
Future<DiscoveryContent> discoverDirectly(Uri uri) async =>
452+
_sendDiscoveryRequest(
453+
uri,
454+
coap.RequestMethod.get,
455+
form: null,
456+
accept: coap.CoapMediaType.applicationTdJson,
457+
);
502458

503459
@override
504460
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) async* {
505461
coap.CoapMulticastResponseHandler? multicastResponseHandler;
506462
final streamController = StreamController<DiscoveryContent>();
507463

508464
// TODO: Replace once https://github.com/shamblett/coap/pull/129 is merged
509-
if (uri.isMulticastAddress) {
465+
if (uri.hasMulticastAddress) {
510466
multicastResponseHandler = coap.CoapMulticastResponseHandler(
511467
(data) {
512468
streamController.add(data.determineDiscoveryContent(uri.scheme));
@@ -526,18 +482,34 @@ final class CoapClient extends ProtocolClient {
526482
multicastResponseHandler: multicastResponseHandler,
527483
);
528484

529-
if (uri.isMulticastAddress) {
485+
if (uri.hasMulticastAddress) {
530486
yield* streamController.stream;
531487
} else {
532488
yield content;
533489
}
534490
}
535491

536492
@override
537-
Future<Content> requestThingDescription(Uri url) async => _sendRequest(
538-
url,
539-
coap.RequestMethod.get,
540-
form: null,
541-
accept: coap.CoapMediaType.applicationTdJson,
542-
);
493+
Stream<Content> discoverViaMulticast(Uri uri) async* {
494+
final streamController = StreamController<DiscoveryContent>();
495+
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
496+
(data) {
497+
streamController.add(data.determineDiscoveryContent(uri.scheme));
498+
},
499+
onError: streamController.addError,
500+
onDone: () async {
501+
await streamController.close();
502+
},
503+
);
504+
505+
final content = _sendDiscoveryRequest(
506+
uri,
507+
coap.RequestMethod.get,
508+
form: null,
509+
accept: coap.CoapMediaType.applicationTdJson,
510+
multicastResponseHandler: multicastResponseHandler,
511+
);
512+
unawaited(content);
513+
yield* streamController.stream;
514+
}
543515
}

lib/src/binding_coap/coap_extensions.dart

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
//
55
// SPDX-License-Identifier: BSD-3-Clause
66

7-
import "dart:io";
87
import "dart:typed_data";
98

109
import "package:cbor/cbor.dart";
@@ -16,15 +15,6 @@ import "../../core.dart" hide PskCredentials;
1615
import "coap_binding_exception.dart";
1716
import "coap_definitions.dart";
1817

19-
/// Extension which makes it easier to handle [Uri]s containing
20-
/// [InternetAddress]es.
21-
extension InternetAddressMethods on Uri {
22-
/// Checks whether the host of this [Uri] is a multicast [InternetAddress].
23-
bool get isMulticastAddress {
24-
return InternetAddress.tryParse(host)?.isMulticast ?? false;
25-
}
26-
}
27-
2818
/// CoAP-specific extensions for the [AugmentedForm] class.
2919
extension CoapFormExtension on AugmentedForm {
3020
T? _obtainVocabularyTerm<T>(String vocabularyTerm) {

lib/src/binding_http/http_client.dart

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ const _authorizationHeader = "Authorization";
3636
/// [RFC 7616]: https://datatracker.ietf.org/doc/html/rfc7616
3737
/// [RFC 6750]: https://datatracker.ietf.org/doc/html/rfc6750
3838
/// [`ComboSecurityScheme`]: https://w3c.github.io/wot-thing-description/#combosecurityscheme
39-
final class HttpClient extends ProtocolClient {
39+
final class HttpClient extends ProtocolClient
40+
with DirectDiscoverer, CoreLinkFormatDiscoverer {
4041
/// Creates a new [HttpClient].
4142
HttpClient({
4243
AsyncClientSecurityCallback<BasicCredentials>? basicCredentialsCallback,
@@ -304,13 +305,13 @@ final class HttpClient extends ProtocolClient {
304305
}
305306

306307
@override
307-
Stream<DiscoveryContent> discoverDirectly(
308+
Future<DiscoveryContent> discoverDirectly(
308309
Uri uri, {
309310
bool disableMulticast = false,
310-
}) async* {
311+
}) async {
311312
final request = Request(HttpRequestMethod.get.methodName, uri);
312313

313-
yield await _sendDiscoveryRequest(
314+
return _sendDiscoveryRequest(
314315
request,
315316
acceptHeaderValue: "application/td+json",
316317
);
@@ -327,18 +328,4 @@ final class HttpClient extends ProtocolClient {
327328

328329
yield encodedLinks;
329330
}
330-
331-
@override
332-
Future<Content> requestThingDescription(Uri url) async {
333-
final request = Request(HttpRequestMethod.get.methodName, url);
334-
const tdContentType = "application/td+json";
335-
request.headers["Accept"] = tdContentType;
336-
337-
final response = await _client.send(request);
338-
339-
return Content(
340-
response.headers["Content-Type"] ?? tdContentType,
341-
response.stream,
342-
);
343-
}
344331
}

lib/src/binding_mqtt/constants.dart

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ const mqttContextUri = "http://www.example.org/mqtt-binding#";
3232
/// The default prefix used in MQTT-related compact URIs (CURIEs) in TDs.
3333
const defaultMqttPrefix = "mqv";
3434

35-
/// Default timeout length used for reading properties and discovering TDs.
36-
const defaultTimeout = Duration(seconds: 10);
35+
/// Default timeout length used for reading properties.
36+
const defaultReadTimeout = Duration(seconds: 10);
3737

3838
/// Default duration MQTT connections are kept alive in seconds.
3939
const defaultKeepAlivePeriod = 20;
@@ -43,9 +43,3 @@ const defaultKeepAlivePeriod = 20;
4343
///
4444
/// Evaluates to `'application/octet-stream'.
4545
const defaultContentType = "application/octet-stream";
46-
47-
/// Content type used for the Content objects returned by discovery using MQTT.
48-
///
49-
/// Evaluates to `application/td+json`.
50-
// TODO: Should probably be redefined globally
51-
const discoveryContentType = "application/td+json";

lib/src/binding_mqtt/mqtt_client.dart

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import "mqtt_subscription.dart";
2121
/// [ProtocolClient] for supporting the MQTT protocol.
2222
///
2323
/// Currently, only MQTT version 3.1.1 is supported.
24-
final class MqttClient extends ProtocolClient {
24+
final class MqttClient extends ProtocolClient with MqttDiscoverer {
2525
/// Constructor.
2626
MqttClient({
2727
MqttConfig? mqttConfig,
@@ -199,22 +199,13 @@ final class MqttClient extends ProtocolClient {
199199
}
200200

201201
@override
202-
Stream<DiscoveryContent> discoverDirectly(
203-
Uri uri, {
204-
bool disableMulticast = false,
202+
Stream<Content> performMqttDiscovery(
203+
Uri brokerUri, {
204+
required String discoveryTopic,
205+
required String expectedContentType,
206+
required Duration discoveryTimeout,
205207
}) async* {
206-
final client = await _connect(uri, null);
207-
const discoveryTopic = "wot/td/#";
208-
209-
final streamController = StreamController<DiscoveryContent>();
210-
211-
Timer(
212-
_mqttConfig.discoveryTimeout,
213-
() async {
214-
client.disconnect();
215-
await streamController.close();
216-
},
217-
);
208+
final client = await _connect(brokerUri, null);
218209

219210
// TODO: Revisit QoS value and subscription check
220211
if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) {
@@ -223,36 +214,29 @@ final class MqttClient extends ProtocolClient {
223214
);
224215
}
225216

226-
client.updates?.listen(
227-
(messages) {
228-
for (final message in messages) {
229-
final publishedMessage = message.payload as MqttPublishMessage;
230-
final payload = publishedMessage.payload.message;
231-
232-
streamController.add(
233-
DiscoveryContent(
234-
discoveryContentType,
235-
Stream.value(payload),
236-
uri,
237-
),
238-
);
239-
}
217+
final receivedMessageStream = client.updates;
218+
if (receivedMessageStream == null) {
219+
throw MqttBindingException(
220+
"Subscription to topic $discoveryTopic failed",
221+
);
222+
}
223+
224+
Timer(
225+
discoveryTimeout,
226+
() async {
227+
client.disconnect();
240228
},
241-
cancelOnError: false,
242229
);
243230

244-
yield* streamController.stream;
245-
}
246-
247-
@override
248-
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) {
249-
// TODO: implement discoverWithCoreLinkFormat
250-
throw UnimplementedError();
251-
}
231+
await for (final receivedMessageList in receivedMessageStream) {
232+
for (final receivedMessage in receivedMessageList) {
233+
final mqttMessage = receivedMessage.payload;
234+
if (mqttMessage is MqttPublishMessage) {
235+
final messagePayload = mqttMessage.payload.message;
252236

253-
@override
254-
Future<Content> requestThingDescription(Uri url) {
255-
// TODO: implement requestThingDescription
256-
throw UnimplementedError();
237+
yield Content(expectedContentType, Stream.value(messagePayload));
238+
}
239+
}
240+
}
257241
}
258242
}

lib/src/binding_mqtt/mqtt_config.dart

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@ import "constants.dart";
1414
/// The default [QoS] values for the different operation types will be used if
1515
/// no Quality of Service is defined in the respective form.
1616
///
17-
/// If no [readTimeout] or [discoveryTimeout] is defined, a [defaultTimeout] of
18-
/// 10 seconds will be used. Furthermore, the [keepAlivePeriod] defaults to 20
19-
/// seconds.
17+
/// If no [readTimeout] is defined, a [defaultReadTimeout] of
18+
/// 10 seconds will be used.
19+
/// Furthermore, the [keepAlivePeriod] defaults to a [defaultKeepAlivePeriod] of
20+
/// 20 seconds.
2021
class MqttConfig {
2122
/// Creates a new [MqttConfig] object.
2223
MqttConfig({
2324
this.defaultReadQoS = QoS.atMostOnce,
2425
this.defaultWriteQoS = QoS.atMostOnce,
2526
this.defaultActionQoS = QoS.atMostOnce,
2627
this.defaultSubscribeQoS = QoS.atLeastOnce,
27-
this.readTimeout = defaultTimeout,
28-
this.discoveryTimeout = defaultTimeout,
28+
this.readTimeout = defaultReadTimeout,
2929
this.keepAlivePeriod = defaultKeepAlivePeriod,
3030
});
3131

@@ -50,11 +50,6 @@ class MqttConfig {
5050
/// If no value has been read until the timeout has expired, the operation
5151
/// will be canceled.
5252
final Duration readTimeout;
53-
54-
/// Timeout value used for discovery using MQTT.
55-
///
56-
/// The discovery process will be aborted once the timeout has expired.
57-
final Duration discoveryTimeout;
5853
}
5954

6055
/// Enum for indicating the default Quality of Service (QoS) that should be used

lib/src/core/extensions.dart

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
//
5+
// SPDX-License-Identifier: BSD-3-Clause
6+
7+
/// Sub-library for extensions used by `dart_wot`.
8+
library extensions;
9+
10+
export "extensions/uri_extensions.dart";
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
//
5+
// SPDX-License-Identifier: BSD-3-Clause
6+
7+
import "dart:io";
8+
9+
/// Extension that makes it easier to handle [Uri]s which potentially contain
10+
/// [InternetAddress]es.
11+
extension InternetAddressMethodExtension on Uri {
12+
/// Checks whether the host of this [Uri] is a multicast [InternetAddress].
13+
bool get hasMulticastAddress {
14+
return InternetAddress.tryParse(host)?.isMulticast ?? false;
15+
}
16+
}

0 commit comments

Comments
 (0)