Skip to content

Commit 44bb791

Browse files
committed
feat!: rework discoverer APIs
1 parent 405757d commit 44bb791

File tree

9 files changed

+153
-187
lines changed

9 files changed

+153
-187
lines changed

lib/src/binding_coap/coap_client.dart

Lines changed: 31 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ coap.PskCredentialsCallback? _createPskCallback(
7373
}
7474

7575
/// A [ProtocolClient] for the Constrained Application Protocol (CoAP).
76-
final class CoapClient extends ProtocolClient {
76+
final class CoapClient extends ProtocolClient
77+
with DirectDiscoverer, MulticastDiscoverer, CoreLinkFormatDiscoverer {
7778
/// Creates a new [CoapClient] based on an optional [CoapConfig].
7879
CoapClient({
7980
CoapConfig? coapConfig,
@@ -446,59 +447,14 @@ final class CoapClient extends ProtocolClient {
446447
@override
447448
Future<void> stop() async {}
448449

449-
Stream<DiscoveryContent> _discoverFromMulticast(
450-
coap.CoapClient client,
451-
Uri uri,
452-
) async* {
453-
final streamController = StreamController<DiscoveryContent>();
454-
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
455-
(data) {
456-
streamController.add(data.determineDiscoveryContent(uri.scheme));
457-
},
458-
onError: streamController.addError,
459-
onDone: () async {
460-
await streamController.close();
461-
},
462-
);
463-
464-
final content = _sendDiscoveryRequest(
465-
uri,
466-
coap.RequestMethod.get,
467-
form: null,
468-
accept: coap.CoapMediaType.applicationTdJson,
469-
multicastResponseHandler: multicastResponseHandler,
470-
);
471-
unawaited(content);
472-
yield* streamController.stream;
473-
}
474-
475-
Stream<DiscoveryContent> _discoverFromUnicast(
476-
coap.CoapClient client,
477-
Uri uri,
478-
) async* {
479-
yield await _sendDiscoveryRequest(
480-
uri,
481-
coap.RequestMethod.get,
482-
form: null,
483-
accept: coap.CoapMediaType.applicationTdJson,
484-
);
485-
}
486-
487450
@override
488-
Stream<DiscoveryContent> discoverDirectly(
489-
Uri uri, {
490-
bool disableMulticast = false,
491-
}) async* {
492-
final client = coap.CoapClient(uri);
493-
494-
if (uri.isMulticastAddress) {
495-
if (!disableMulticast) {
496-
yield* _discoverFromMulticast(client, uri);
497-
}
498-
} else {
499-
yield* _discoverFromUnicast(client, uri);
500-
}
501-
}
451+
Future<DiscoveryContent> discoverDirectly(Uri uri) async =>
452+
_sendDiscoveryRequest(
453+
uri,
454+
coap.RequestMethod.get,
455+
form: null,
456+
accept: coap.CoapMediaType.applicationTdJson,
457+
);
502458

503459
@override
504460
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) async* {
@@ -534,10 +490,26 @@ final class CoapClient extends ProtocolClient {
534490
}
535491

536492
@override
537-
Future<Content> requestThingDescription(Uri url) async => _sendRequest(
538-
url,
539-
coap.RequestMethod.get,
540-
form: null,
541-
accept: coap.CoapMediaType.applicationTdJson,
542-
);
493+
Stream<Content> discoverViaMulticast(Uri uri) async* {
494+
final streamController = StreamController<DiscoveryContent>();
495+
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
496+
(data) {
497+
streamController.add(data.determineDiscoveryContent(uri.scheme));
498+
},
499+
onError: streamController.addError,
500+
onDone: () async {
501+
await streamController.close();
502+
},
503+
);
504+
505+
final content = _sendDiscoveryRequest(
506+
uri,
507+
coap.RequestMethod.get,
508+
form: null,
509+
accept: coap.CoapMediaType.applicationTdJson,
510+
multicastResponseHandler: multicastResponseHandler,
511+
);
512+
unawaited(content);
513+
yield* streamController.stream;
514+
}
543515
}

lib/src/binding_http/http_client.dart

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ const _authorizationHeader = "Authorization";
3636
/// [RFC 7616]: https://datatracker.ietf.org/doc/html/rfc7616
3737
/// [RFC 6750]: https://datatracker.ietf.org/doc/html/rfc6750
3838
/// [`ComboSecurityScheme`]: https://w3c.github.io/wot-thing-description/#combosecurityscheme
39-
final class HttpClient extends ProtocolClient {
39+
final class HttpClient extends ProtocolClient
40+
with DirectDiscoverer, CoreLinkFormatDiscoverer {
4041
/// Creates a new [HttpClient].
4142
HttpClient({
4243
AsyncClientSecurityCallback<BasicCredentials>? basicCredentialsCallback,
@@ -304,13 +305,13 @@ final class HttpClient extends ProtocolClient {
304305
}
305306

306307
@override
307-
Stream<DiscoveryContent> discoverDirectly(
308+
Future<DiscoveryContent> discoverDirectly(
308309
Uri uri, {
309310
bool disableMulticast = false,
310-
}) async* {
311+
}) async {
311312
final request = Request(HttpRequestMethod.get.methodName, uri);
312313

313-
yield await _sendDiscoveryRequest(
314+
return _sendDiscoveryRequest(
314315
request,
315316
acceptHeaderValue: "application/td+json",
316317
);
@@ -327,18 +328,4 @@ final class HttpClient extends ProtocolClient {
327328

328329
yield encodedLinks;
329330
}
330-
331-
@override
332-
Future<Content> requestThingDescription(Uri url) async {
333-
final request = Request(HttpRequestMethod.get.methodName, url);
334-
const tdContentType = "application/td+json";
335-
request.headers["Accept"] = tdContentType;
336-
337-
final response = await _client.send(request);
338-
339-
return Content(
340-
response.headers["Content-Type"] ?? tdContentType,
341-
response.stream,
342-
);
343-
}
344331
}

lib/src/binding_mqtt/mqtt_client.dart

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -197,62 +197,4 @@ final class MqttClient extends ProtocolClient {
197197

198198
return MqttSubscription(form, client, complete, next: next, error: error);
199199
}
200-
201-
@override
202-
Stream<DiscoveryContent> discoverDirectly(
203-
Uri uri, {
204-
bool disableMulticast = false,
205-
}) async* {
206-
final client = await _connect(uri, null);
207-
const discoveryTopic = "wot/td/#";
208-
209-
final streamController = StreamController<DiscoveryContent>();
210-
211-
Timer(
212-
_mqttConfig.discoveryTimeout,
213-
() async {
214-
client.disconnect();
215-
await streamController.close();
216-
},
217-
);
218-
219-
// TODO: Revisit QoS value and subscription check
220-
if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) {
221-
throw MqttBindingException(
222-
"Subscription to topic $discoveryTopic failed",
223-
);
224-
}
225-
226-
client.updates?.listen(
227-
(messages) {
228-
for (final message in messages) {
229-
final publishedMessage = message.payload as MqttPublishMessage;
230-
final payload = publishedMessage.payload.message;
231-
232-
streamController.add(
233-
DiscoveryContent(
234-
discoveryContentType,
235-
Stream.value(payload),
236-
uri,
237-
),
238-
);
239-
}
240-
},
241-
cancelOnError: false,
242-
);
243-
244-
yield* streamController.stream;
245-
}
246-
247-
@override
248-
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) {
249-
// TODO: implement discoverWithCoreLinkFormat
250-
throw UnimplementedError();
251-
}
252-
253-
@override
254-
Future<Content> requestThingDescription(Uri url) {
255-
// TODO: implement requestThingDescription
256-
throw UnimplementedError();
257-
}
258200
}

lib/src/core/implementation/servient.dart

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,16 @@ class InternalServient implements Servient {
280280

281281
/// Requests a [ThingDescription] from a [url].
282282
Future<ThingDescription> requestThingDescription(Uri url) async {
283-
final client = clientFor(url.scheme);
284-
final content = await client.requestThingDescription(url);
283+
final uriScheme = url.scheme;
284+
final client = clientFor(uriScheme);
285285

286+
if (client is! DirectDiscoverer) {
287+
throw DiscoveryException(
288+
"Client with URI scheme $uriScheme does not support direct discovery.",
289+
);
290+
}
291+
292+
final content = await client.discoverDirectly(url);
286293
final dataSchemaValue = await contentSerdes.contentToValue(content, null);
287294

288295
if (dataSchemaValue

lib/src/core/implementation/thing_discovery.dart

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import "package:multicast_dns/multicast_dns.dart";
1212

1313
import "../definitions.dart";
1414
import "../exceptions.dart";
15+
import "../extensions.dart";
1516
import "../protocol_interfaces.dart";
1617
import "../scripting_api.dart" as scripting_api;
1718

@@ -63,7 +64,11 @@ class ThingDiscovery extends Stream<ThingDescription>
6364
):
6465
yield* _discoverFromCoreResourceDirectory(uri, discoveryType);
6566
case DirectConfiguration(:final uri):
66-
yield* Stream.fromFuture(_servient.requestThingDescription(uri));
67+
if (!uri.hasMulticastAddress) {
68+
yield* Stream.fromFuture(_servient.requestThingDescription(uri));
69+
} else {
70+
yield* _performMulticastDiscovery(uri);
71+
}
6772
case ExploreDirectoryConfiguration(:final uri, :final thingFilter):
6873
final thingDiscoveryProcess = await _servient.exploreDirectory(
6974
uri,
@@ -190,7 +195,18 @@ class ThingDiscovery extends Stream<ThingDescription>
190195
Uri uri,
191196
String resourceType,
192197
) async* {
193-
final client = _clientForUriScheme(uri.scheme);
198+
final uriScheme = uri.scheme;
199+
final client = _clientForUriScheme(uriScheme);
200+
201+
if (client is! CoreLinkFormatDiscoverer) {
202+
yield* Stream.error(
203+
DiscoveryException(
204+
"Client for URI scheme $uriScheme does not support Core Link Format "
205+
"Discovery.",
206+
),
207+
);
208+
return;
209+
}
194210

195211
await for (final coreWebLink in client.discoverWithCoreLinkFormat(uri)) {
196212
try {
@@ -251,7 +267,7 @@ class ThingDiscovery extends Stream<ThingDescription>
251267

252268
if (dataSchemaValue is! scripting_api.DataSchemaValue<String>) {
253269
throw DiscoveryException(
254-
"Could not parse Thing Description obtained from $sourceUri",
270+
"Could not parse CoRE web links obtained from $sourceUri",
255271
);
256272
}
257273

@@ -319,6 +335,45 @@ class ThingDiscovery extends Stream<ThingDescription>
319335

320336
return Map.fromEntries(recordsList);
321337
}
338+
339+
Stream<ThingDescription> _performMulticastDiscovery(Uri uri) async* {
340+
final client = _clientForUriScheme(uri.scheme);
341+
342+
if (client is MulticastDiscoverer) {
343+
final contentStream = client.discoverViaMulticast(uri);
344+
yield* _transformContentStreamToThingDescriptions(contentStream);
345+
}
346+
}
347+
348+
Stream<ThingDescription> _transformContentStreamToThingDescriptions(
349+
Stream<Content> contentStream,
350+
) async* {
351+
await for (final content in contentStream) {
352+
try {
353+
final thingDescription =
354+
await _convertContentToThingDescription(content);
355+
yield thingDescription;
356+
} on Exception catch (exception) {
357+
yield* Stream.error(exception);
358+
}
359+
}
360+
}
361+
362+
Future<ThingDescription> _convertContentToThingDescription(
363+
Content content,
364+
) async {
365+
final dataSchemaValue =
366+
await _servient.contentSerdes.contentToValue(content, null);
367+
368+
if (dataSchemaValue is scripting_api.ObjectValue) {
369+
return dataSchemaValue.value.toThingDescription();
370+
}
371+
372+
throw ValidationException(
373+
"Encountered wrong datatype ${dataSchemaValue.runtimeType} that cannot "
374+
"be processed as a Thing Description.",
375+
);
376+
}
322377
}
323378

324379
extension _CoreLinkFormatExtension on String {

lib/src/core/protocol_interfaces.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66

77
export "protocol_interfaces/protocol_client.dart";
88
export "protocol_interfaces/protocol_client_factory.dart";
9+
export "protocol_interfaces/protocol_discoverer.dart";
910
export "protocol_interfaces/protocol_server.dart";
1011
export "protocol_interfaces/protocol_subscription.dart";

lib/src/core/protocol_interfaces/protocol_client.dart

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,6 @@ abstract base class ProtocolClient {
1515
/// Stops this [ProtocolClient].
1616
Future<void> stop();
1717

18-
/// Discovers one or more Thing Descriptions from a [uri], returning a
19-
/// [Stream] of [Content].
20-
///
21-
/// Allows the caller to explicitly [disableMulticast], overriding the
22-
/// multicast settings in the config of the underlying binding implementation.
23-
Stream<DiscoveryContent> discoverDirectly(
24-
Uri uri, {
25-
bool disableMulticast = false,
26-
});
27-
28-
/// Discovers links using the CoRE Link Format (see [RFC 6690]) from a [uri],
29-
/// encoded as a [Stream] of [Content].
30-
///
31-
/// This method will also be used for discovery from CoRE Resource
32-
/// Directories ([RFC 9176]).
33-
///
34-
/// If the [uri]'s path is empty, then `/.well-known/core` will be set as a
35-
/// default value.
36-
///
37-
/// Certain protocols (like CoAP) might also use multicast for this discovery
38-
/// method if the underlying binding implementation supports it and if it is
39-
/// activated in the config.
40-
///
41-
/// [RFC 6690]: https://datatracker.ietf.org/doc/html/rfc6690
42-
/// [RFC 9176]: https://datatracker.ietf.org/doc/html/rfc9176
43-
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri);
44-
4518
/// Requests the client to perform a `readproperty` operation on a [form].
4619
Future<Content> readResource(AugmentedForm form);
4720

@@ -61,7 +34,4 @@ abstract base class ProtocolClient {
6134
void Function(Exception error)? error,
6235
required void Function() complete,
6336
});
64-
65-
/// Requests a Thing Description as [Content] from a [url].
66-
Future<Content> requestThingDescription(Uri url);
6737
}

0 commit comments

Comments
 (0)