Skip to content

Commit 87003f9

Browse files
authored
Merge pull request #84 from eclipse-thingweb/explore-directory
feat: implement exploreDirectory method
2 parents 836ab1c + e921b65 commit 87003f9

File tree

6 files changed

+457
-45
lines changed

6 files changed

+457
-45
lines changed

lib/src/core/thing_discovery.dart

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,3 +375,72 @@ extension _FlatStreamExtension<T> on Stream<Stream<T>> {
375375
}
376376
}
377377
}
378+
379+
/// Implemention of the [scripting_api.ThingDiscoveryProcess] interface.
380+
class ThingDiscoveryProcess extends Stream<ThingDescription>
381+
implements scripting_api.ThingDiscoveryProcess {
382+
/// Constructs a new [ThingDiscoveryProcess].
383+
///
384+
/// Accepts a [_thingDescriptionStream], which is filtered by an optional
385+
/// [thingFilter].
386+
ThingDiscoveryProcess(
387+
this._thingDescriptionStream,
388+
this.thingFilter,
389+
);
390+
391+
StreamSubscription<ThingDescription>? _streamSubscription;
392+
393+
final Stream<ThingDescription> _thingDescriptionStream;
394+
395+
var _done = false;
396+
397+
@override
398+
bool get done => _done;
399+
400+
Exception? _error;
401+
402+
@override
403+
Exception? get error => _error;
404+
405+
@override
406+
final scripting_api.ThingFilter? thingFilter;
407+
408+
@override
409+
StreamSubscription<ThingDescription> listen(
410+
void Function(ThingDescription event)? onData, {
411+
Function? onError,
412+
void Function()? onDone,
413+
bool? cancelOnError,
414+
}) {
415+
final streamSubscription = _thingDescriptionStream.listen(
416+
onData,
417+
onError: (error, stackTrace) {
418+
if (error is Exception) {
419+
_error = error;
420+
// ignore: avoid_dynamic_calls
421+
onError?.call(error, stackTrace);
422+
}
423+
},
424+
onDone: () {
425+
_done = true;
426+
onDone?.call();
427+
},
428+
cancelOnError: cancelOnError,
429+
);
430+
431+
_streamSubscription = streamSubscription;
432+
433+
return streamSubscription;
434+
}
435+
436+
@override
437+
Future<void> stop() async {
438+
if (done) {
439+
return;
440+
}
441+
442+
await _streamSubscription?.cancel();
443+
444+
_done = true;
445+
}
446+
}

lib/src/core/wot.dart

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
//
55
// SPDX-License-Identifier: BSD-3-Clause
66

7+
import 'dart:async';
8+
79
import '../../scripting_api.dart' as scripting_api;
810
import '../definitions/thing_description.dart';
911
import '../scripting_api/discovery/discovery_method.dart';
1012
import 'consumed_thing.dart';
1113
import 'exposed_thing.dart';
1214
import 'servient.dart';
13-
import 'thing_discovery.dart' show ThingDiscovery;
15+
import 'thing_discovery.dart'
16+
show DiscoveryException, ThingDiscovery, ThingDiscoveryProcess;
1417

1518
/// This [Exception] is thrown if an error during the consumption of a
1619
/// [ThingDescription] occurs.
@@ -95,4 +98,93 @@ class WoT implements scripting_api.WoT {
9598
Future<ThingDescription> requestThingDescription(Uri url) {
9699
return _servient.requestThingDescription(url);
97100
}
101+
102+
@override
103+
Future<scripting_api.ThingDiscoveryProcess> exploreDirectory(
104+
Uri url, [
105+
scripting_api.ThingFilter? filter,
106+
]) async {
107+
final thingDescription = await requestThingDescription(url);
108+
109+
if (!thingDescription.isValidDirectoryThingDescription) {
110+
throw DiscoveryException(
111+
'Encountered an invalid Directory Thing Description',
112+
);
113+
}
114+
115+
final consumedDirectoryThing = await consume(thingDescription);
116+
117+
final interactionOutput =
118+
await consumedDirectoryThing.readProperty('things');
119+
final rawThingDescriptions = await interactionOutput.value();
120+
121+
if (rawThingDescriptions is! List<dynamic>) {
122+
throw DiscoveryException(
123+
'Expected an array of Thing Descriptions but received an '
124+
'invalid output instead.',
125+
);
126+
}
127+
128+
final thingDescriptionStream = Stream.fromIterable(
129+
rawThingDescriptions.whereType<Map<String, dynamic>>(),
130+
).toThingDescriptionStream();
131+
132+
return ThingDiscoveryProcess(thingDescriptionStream, filter);
133+
}
134+
}
135+
136+
extension _DirectoryValidationExtension on ThingDescription {
137+
bool get isValidDirectoryThingDescription {
138+
final atTypes = atType;
139+
140+
if (atTypes == null) {
141+
return false;
142+
}
143+
144+
const discoveryContextUri = 'https://www.w3.org/2022/wot/discovery';
145+
const type = 'ThingDirectory';
146+
const fullIri = '$discoveryContextUri#$type';
147+
148+
if (atTypes.contains(fullIri)) {
149+
return true;
150+
}
151+
152+
return context.contains((value: discoveryContextUri, key: null)) &&
153+
atTypes.contains(type);
154+
}
155+
}
156+
157+
extension _DirectoryTdDeserializationExtension on Stream<Map<String, dynamic>> {
158+
Stream<ThingDescription> toThingDescriptionStream() {
159+
const streamTransformer = StreamTransformer(_transformerMethod);
160+
161+
return transform(streamTransformer);
162+
}
163+
164+
static StreamSubscription<ThingDescription> _transformerMethod(
165+
Stream<Map<String, dynamic>> rawThingDescriptionStream,
166+
bool cancelOnError,
167+
) {
168+
final streamController = StreamController<ThingDescription>();
169+
170+
final streamSubscription = rawThingDescriptionStream.listen(
171+
(rawThingDescription) {
172+
try {
173+
streamController.add(ThingDescription.fromJson(rawThingDescription));
174+
} on Exception catch (exception) {
175+
streamController.addError(exception);
176+
}
177+
},
178+
onDone: streamController.close,
179+
onError: streamController.addError,
180+
cancelOnError: cancelOnError,
181+
);
182+
183+
streamController
184+
..onPause = streamSubscription.pause
185+
..onResume = streamSubscription.resume
186+
..onCancel = streamSubscription.cancel;
187+
188+
return streamController.stream.listen(null);
189+
}
98190
}

lib/src/definitions/thing_description.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class ThingDescription {
175175
final Set<String> parsedFields = {};
176176

177177
context.addAll(json.parseContext(prefixMapping, parsedFields));
178+
atType = json.parseArrayField('@type', parsedFields);
178179
title = json.parseRequiredField<String>('title', parsedFields);
179180
titles.addAll(json.parseMapField<String>('titles', parsedFields) ?? {});
180181
description = json.parseField<String>('description', parsedFields);

lib/src/scripting_api/discovery/thing_discovery.dart

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,23 @@ abstract interface class ThingDiscovery implements Stream<ThingDescription> {
2222
/// Stops the discovery process.
2323
void stop();
2424
}
25+
26+
/// Provides the properties and methods controlling the discovery process, and
27+
/// returning the results.
28+
abstract interface class ThingDiscoveryProcess
29+
implements Stream<ThingDescription> {
30+
/// Optional filter that can applied during the discovery process.
31+
ThingFilter? get thingFilter;
32+
33+
/// `true` if the discovery has been stopped or completed with no more results
34+
/// to report.
35+
bool get done;
36+
37+
/// Represents the last error that occurred during the discovery process.
38+
///
39+
/// Typically used for critical errors that stop discovery.
40+
Exception? get error;
41+
42+
/// Stops the discovery process.
43+
Future<void> stop();
44+
}

lib/src/scripting_api/wot.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ abstract interface class WoT {
3434
/// Requests a [ThingDescription] from the given [url].
3535
Future<ThingDescription> requestThingDescription(Uri url);
3636

37+
/// Starts the discovery process that given a TD Directory [url], will provide
38+
/// [ThingDescription] objects for Thing Descriptions that match an optional
39+
/// [filter] argument of type [ThingFilter].
40+
Future<ThingDiscoveryProcess> exploreDirectory(
41+
Uri url, [
42+
ThingFilter? filter,
43+
]);
44+
3745
/// Discovers [ThingDescription]s from a given [url] using the specified
3846
/// [method].
3947
///

0 commit comments

Comments
 (0)