Skip to content

Commit c8f71d7

Browse files
authored
feat(datastore): Add QueryPredicate to Observe (#1332)
1 parent b5a1c3a commit c8f71d7

File tree

5 files changed

+57
-10
lines changed

5 files changed

+57
-10
lines changed

packages/amplify_datastore/example/integration_test/observe_test.dart

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,51 @@ void main() {
7575
await Amplify.DataStore.save(updatedBlog);
7676
await Amplify.DataStore.delete(updatedBlog);
7777
});
78+
79+
testWidgets('observe with query predicates returns all matches',
80+
(WidgetTester tester) async {
81+
Blog blog = Blog(name: 'blog');
82+
Blog updatedBlog = blog.copyWith(name: 'updated blog');
83+
84+
var eventItemStream = Amplify.DataStore.observe(Blog.classType,
85+
where: Blog.NAME.ne("not a blog name"))
86+
.map((event) => event.item);
87+
expectLater(
88+
eventItemStream,
89+
emitsInOrder(
90+
[
91+
blog,
92+
updatedBlog,
93+
updatedBlog,
94+
],
95+
),
96+
);
97+
98+
await Amplify.DataStore.save(blog);
99+
await Amplify.DataStore.save(updatedBlog);
100+
await Amplify.DataStore.delete(updatedBlog);
101+
});
102+
103+
testWidgets('observe with query predicates filters out non matches',
104+
(WidgetTester tester) async {
105+
Blog blog = Blog(name: 'matching blog');
106+
Blog updatedBlog = blog.copyWith(name: 'updated blog');
107+
Blog otherBlog = Blog(name: 'matching blog 2');
108+
109+
var eventItemStream = Amplify.DataStore.observe(Blog.classType,
110+
where: Blog.NAME.contains("matching"))
111+
.map((event) => event.item);
112+
expectLater(
113+
eventItemStream,
114+
emitsInOrder(
115+
[blog, otherBlog],
116+
),
117+
);
118+
119+
await Amplify.DataStore.save(blog);
120+
await Amplify.DataStore.save(updatedBlog);
121+
await Amplify.DataStore.delete(updatedBlog);
122+
await Amplify.DataStore.save(otherBlog);
123+
});
78124
});
79125
}

packages/amplify_datastore/lib/amplify_datastore.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,9 @@ class AmplifyDataStore extends DataStorePluginInterface {
122122
}
123123

124124
@override
125-
Stream<SubscriptionEvent<T>> observe<T extends Model>(
126-
ModelType<T> modelType) {
127-
return _instance.observe(modelType);
125+
Stream<SubscriptionEvent<T>> observe<T extends Model>(ModelType<T> modelType,
126+
{QueryPredicate? where}) {
127+
return _instance.observe(modelType, where: where);
128128
}
129129

130130
@override

packages/amplify_datastore/lib/method_channel_datastore.dart

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,8 @@ class AmplifyDataStoreMethodChannel extends AmplifyDataStore {
178178
}
179179

180180
@override
181-
Stream<SubscriptionEvent<T>> observe<T extends Model>(
182-
ModelType<T> modelType) async* {
181+
Stream<SubscriptionEvent<T>> observe<T extends Model>(ModelType<T> modelType,
182+
{QueryPredicate? where}) async* {
183183
await _setUpObserveIfNeeded();
184184

185185
// Step #1. Open the event channel if it's not already open. Note
@@ -200,6 +200,7 @@ class AmplifyDataStoreMethodChannel extends AmplifyDataStore {
200200
// Step #3. Deserialize events and return new broadcast stream
201201
yield* filteredStream
202202
.map((event) => SubscriptionEvent.fromMap(event, modelType))
203+
.where((event) => where == null || where.evaluate(event.item))
203204
.asBroadcastStream()
204205
.cast<SubscriptionEvent<T>>();
205206
}

packages/amplify_datastore_plugin_interface/lib/amplify_datastore_plugin_interface.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ abstract class DataStorePluginInterface extends AmplifyPluginInterface {
121121
throw UnimplementedError('save() has not been implemented');
122122
}
123123

124-
Stream<SubscriptionEvent<T>> observe<T extends Model>(
125-
ModelType<T> modelType) {
124+
Stream<SubscriptionEvent<T>> observe<T extends Model>(ModelType<T> modelType,
125+
{QueryPredicate? where}) {
126126
throw UnimplementedError('observe() has not been implemented.');
127127
}
128128

packages/amplify_flutter/lib/src/categories/amplify_datastore_category.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ class DataStoreCategory {
9292
}
9393

9494
/// Observe changes on the specified [modelType].
95-
Stream<SubscriptionEvent<T>> observe<T extends Model>(
96-
ModelType<T> modelType) {
95+
Stream<SubscriptionEvent<T>> observe<T extends Model>(ModelType<T> modelType,
96+
{QueryPredicate? where}) {
9797
return plugins.length == 1
98-
? plugins[0].observe(modelType)
98+
? plugins[0].observe(modelType, where: where)
9999
: throw _pluginNotAddedException('DataStore');
100100
}
101101

0 commit comments

Comments
 (0)