Skip to content

Commit 35fb715

Browse files
authored
fix(datastore): fix selective sync expressions to run DDB query when possible (#1100)
* fix(datastore): wrap provided syncexpression in 'and' operator to query items * fix(datastore): refactor InitialSyncOperation tests * fix(datastore): update docs * fix(datastore): move optimization logic down to sync query * fix(datastore): optimized sync, add more tests * fix(datastore): optimized sync, querygroup filter test
1 parent b70c98a commit 35fb715

File tree

5 files changed

+329
-157
lines changed

5 files changed

+329
-157
lines changed

AmplifyPlugins/Core/AWSPluginsCore/Model/GraphQLRequest/GraphQLRequest+AnyModelWithSync.swift

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ extension GraphQLRequest: ModelSyncGraphQLRequestFactory {
170170
var documentBuilder = ModelBasedGraphQLDocumentBuilder(modelSchema: modelSchema,
171171
operationType: .query)
172172
documentBuilder.add(decorator: DirectiveNameDecorator(type: .sync))
173-
if let predicate = predicate {
173+
if let predicate = optimizePredicate(predicate) {
174174
documentBuilder.add(decorator: FilterDecorator(filter: predicate.graphQLFilter(for: modelSchema)))
175175
}
176176
documentBuilder.add(decorator: PaginationDecorator(limit: limit, nextToken: nextToken))
@@ -207,4 +207,22 @@ extension GraphQLRequest: ModelSyncGraphQLRequestFactory {
207207
responseType: MutationSyncResult.self,
208208
decodePath: document.name)
209209
}
210+
211+
/// This function tries to optimize provided `QueryPredicate` to perform a DynamoDB query instead of a scan.
212+
/// Wrapping the predicate with a group AND enables AppSync to perform the optimization.
213+
/// If the provided predicate is already a QueryPredicateGroup, this is not needed.
214+
/// If the provided group is of type AND, the optimization will occur.
215+
/// If the top level group is OR or NOT, the optimization is not possible anyway.
216+
private static func optimizePredicate(_ predicate: QueryPredicate?) -> QueryPredicate? {
217+
guard let predicate = predicate else {
218+
return nil
219+
}
220+
if predicate as? QueryPredicateGroup != nil {
221+
return predicate
222+
} else if let predicate = predicate as? QueryPredicateConstant,
223+
predicate == .all {
224+
return predicate
225+
}
226+
return QueryPredicateGroup(type: .and, predicates: [predicate])
227+
}
210228
}

AmplifyPlugins/Core/AWSPluginsCoreTests/Model/GraphQLRequest/GraphQLRequestAnyModelWithSyncTests.swift

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,65 @@ class GraphQLRequestAnyModelWithSyncTests: XCTestCase {
275275
XCTAssertEqual(variables["lastSync"] as? Int, lastSync)
276276
}
277277

278+
func testOptimizedSyncQueryGraphQLRequestWithFilter() {
279+
let modelType = Post.self as Model.Type
280+
let nextToken = "nextToken"
281+
let limit = 100
282+
let lastSync = 123
283+
let postId = "123"
284+
let predicate = Post.CodingKeys.id.eq(postId)
285+
let request = GraphQLRequest<SyncQueryResult>.syncQuery(
286+
modelSchema: modelType.schema,
287+
where: predicate,
288+
limit: limit,
289+
nextToken: nextToken,
290+
lastSync: lastSync)
291+
292+
guard let variables = request.variables else {
293+
XCTFail("The request doesn't contain variables")
294+
return
295+
}
296+
guard variables["filter"] != nil, let filter = variables["filter"] as? [String: Any] else {
297+
XCTFail("The request doesn't contain a filter")
298+
return
299+
}
300+
301+
XCTAssertEqual(variables["limit"] as? Int, limit)
302+
XCTAssertEqual(variables["nextToken"] as? String, nextToken)
303+
XCTAssertNotNil(filter)
304+
XCTAssertNotNil(filter["and"])
305+
}
306+
307+
func testSyncQueryGraphQLRequestWithPredicateGroupFilter() {
308+
let modelType = Post.self as Model.Type
309+
let nextToken = "nextToken"
310+
let limit = 100
311+
let lastSync = 123
312+
let postId = "123"
313+
let altPostId = "456"
314+
let predicate = Post.CodingKeys.id.eq(postId) || Post.CodingKeys.id.eq(altPostId)
315+
let request = GraphQLRequest<SyncQueryResult>.syncQuery(
316+
modelSchema: modelType.schema,
317+
where: predicate,
318+
limit: limit,
319+
nextToken: nextToken,
320+
lastSync: lastSync)
321+
322+
guard let variables = request.variables else {
323+
XCTFail("The request doesn't contain variables")
324+
return
325+
}
326+
guard variables["filter"] != nil, let filter = variables["filter"] as? [String: Any] else {
327+
XCTFail("The request doesn't contain a filter")
328+
return
329+
}
330+
331+
XCTAssertEqual(variables["limit"] as? Int, limit)
332+
XCTAssertEqual(variables["nextToken"] as? String, nextToken)
333+
XCTAssertNotNil(filter)
334+
XCTAssertNotNil(filter["or"])
335+
}
336+
278337
func testUpdateMutationWithEmptyFilter() {
279338
let post = Post(title: "title", content: "content", createdAt: .now())
280339
let documentStringValue = """
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import XCTest
9+
import SQLite
10+
import Combine
11+
12+
@testable import Amplify
13+
@testable import AmplifyTestCommon
14+
@testable import AWSDataStoreCategoryPlugin
15+
@testable import AWSPluginsCore
16+
17+
class InitialSyncOperationSyncExpressionTests: XCTestCase {
18+
typealias APIPluginQueryResponder = QueryRequestListenerResponder<PaginatedList<AnyModel>>
19+
20+
var storageAdapter: StorageEngineAdapter!
21+
var apiPlugin = MockAPICategoryPlugin()
22+
let reconciliationQueue = MockReconciliationQueue()
23+
var syncStartedReceived: XCTestExpectation!
24+
var syncCompletionReceived: XCTestExpectation!
25+
var finishedReceived: XCTestExpectation!
26+
var apiWasQueried: XCTestExpectation!
27+
28+
override func setUpWithError() throws {
29+
storageAdapter = try SQLiteStorageEngineAdapter(connection: Connection(.inMemory))
30+
try storageAdapter.setUp(modelSchemas: StorageEngine.systemModelSchemas + [MockSynced.schema])
31+
syncStartedReceived = expectation(description: "Sync started received, sync operation started")
32+
syncCompletionReceived = expectation(description: "Sync completion received, sync operation is complete")
33+
finishedReceived = expectation(description: "InitialSyncOperation finishe offering items")
34+
apiWasQueried = expectation(description: "API was queried with sync expression")
35+
}
36+
37+
func initialSyncOperation(withSyncExpression syncExpression: DataStoreSyncExpression,
38+
responder: APIPluginQueryResponder) -> InitialSyncOperation {
39+
apiPlugin.responders[.queryRequestListener] = responder
40+
let configuration = DataStoreConfiguration.custom(syncPageSize: 10, syncExpressions: [syncExpression])
41+
return InitialSyncOperation(
42+
modelSchema: MockSynced.schema,
43+
api: apiPlugin,
44+
reconciliationQueue: reconciliationQueue,
45+
storageAdapter: storageAdapter,
46+
dataStoreConfiguration: configuration)
47+
}
48+
49+
func testBaseQueryWithBasicSyncExpression() throws {
50+
let responder = APIPluginQueryResponder { request, listener in
51+
XCTAssertEqual(request.document, """
52+
query SyncMockSynceds($filter: ModelMockSyncedFilterInput, $limit: Int) {
53+
syncMockSynceds(filter: $filter, limit: $limit) {
54+
items {
55+
id
56+
__typename
57+
_version
58+
_deleted
59+
_lastChangedAt
60+
}
61+
nextToken
62+
startedAt
63+
}
64+
}
65+
""")
66+
guard let filter = request.variables?["filter"] as? [String: Any?] else {
67+
XCTFail("Unable to get filter")
68+
return nil
69+
}
70+
guard let group = filter["and"] as? [[String: Any?]] else {
71+
XCTFail("Unable to find 'and' group")
72+
return nil
73+
}
74+
75+
guard let key = group[0]["id"] as? [String: Any?] else {
76+
XCTFail("Unable to get id from filter")
77+
return nil
78+
}
79+
guard let value = key["eq"] as? String else {
80+
XCTFail("Unable to get eq from key")
81+
return nil
82+
}
83+
XCTAssertEqual(value, "id-123")
84+
85+
let list = PaginatedList<AnyModel>(items: [], nextToken: nil, startedAt: nil)
86+
let event: GraphQLOperation<PaginatedList<AnyModel>>.OperationResult = .success(.success(list))
87+
listener?(event)
88+
self.apiWasQueried.fulfill()
89+
return nil
90+
}
91+
92+
let syncExpression = DataStoreSyncExpression.syncExpression(MockSynced.schema, where: {
93+
MockSynced.keys.id == "id-123"
94+
})
95+
let operation = initialSyncOperation(withSyncExpression: syncExpression, responder: responder)
96+
97+
let sink = operation
98+
.publisher
99+
.sink(receiveCompletion: { _ in
100+
self.syncCompletionReceived.fulfill()
101+
}, receiveValue: { [self] value in
102+
switch value {
103+
case .started(modelName: let modelName, syncType: let syncType):
104+
XCTAssertEqual(modelName, "MockSynced")
105+
XCTAssertEqual(syncType, .fullSync)
106+
syncStartedReceived.fulfill()
107+
case .finished(modelName: let modelName):
108+
XCTAssertEqual(modelName, "MockSynced")
109+
finishedReceived.fulfill()
110+
default:
111+
break
112+
}
113+
})
114+
115+
operation.main()
116+
117+
waitForExpectations(timeout: 1)
118+
sink.cancel()
119+
}
120+
121+
func testBaseQueryWithFilterSyncExpression() throws {
122+
let responder = APIPluginQueryResponder { request, listener in
123+
XCTAssertEqual(request.document, """
124+
query SyncMockSynceds($filter: ModelMockSyncedFilterInput, $limit: Int) {
125+
syncMockSynceds(filter: $filter, limit: $limit) {
126+
items {
127+
id
128+
__typename
129+
_version
130+
_deleted
131+
_lastChangedAt
132+
}
133+
nextToken
134+
startedAt
135+
}
136+
}
137+
""")
138+
guard let filter = request.variables?["filter"] as? [String: Any?] else {
139+
XCTFail("Unable to get filter")
140+
return nil
141+
}
142+
guard let group = filter["or"] as? [[String: Any?]] else {
143+
XCTFail("Unable to find 'or' group")
144+
return nil
145+
}
146+
147+
guard let key = group[0]["id"] as? [String: Any?] else {
148+
XCTFail("Unable to get id from filter")
149+
return nil
150+
}
151+
guard let value = key["eq"] as? String else {
152+
XCTFail("Unable to get eq from key")
153+
return nil
154+
}
155+
XCTAssertEqual(value, "id-123")
156+
157+
let list = PaginatedList<AnyModel>(items: [], nextToken: nil, startedAt: nil)
158+
let event: GraphQLOperation<PaginatedList<AnyModel>>.OperationResult = .success(.success(list))
159+
listener?(event)
160+
self.apiWasQueried.fulfill()
161+
return nil
162+
}
163+
164+
let syncExpression = DataStoreSyncExpression.syncExpression(MockSynced.schema, where: {
165+
MockSynced.keys.id == "id-123" || MockSynced.keys.id == "id-456"
166+
})
167+
let operation = initialSyncOperation(withSyncExpression: syncExpression, responder: responder)
168+
169+
let sink = operation
170+
.publisher
171+
.sink(receiveCompletion: { _ in
172+
self.syncCompletionReceived.fulfill()
173+
}, receiveValue: { [self] value in
174+
switch value {
175+
case .started(modelName: let modelName, syncType: let syncType):
176+
XCTAssertEqual(modelName, "MockSynced")
177+
XCTAssertEqual(syncType, .fullSync)
178+
syncStartedReceived.fulfill()
179+
case .finished(modelName: let modelName):
180+
XCTAssertEqual(modelName, "MockSynced")
181+
finishedReceived.fulfill()
182+
default:
183+
break
184+
}
185+
})
186+
187+
operation.main()
188+
189+
waitForExpectations(timeout: 1)
190+
sink.cancel()
191+
}
192+
193+
func testBaseQueryWithSyncExpressionConstantAll() throws {
194+
let responder = APIPluginQueryResponder { request, listener in
195+
XCTAssertEqual(request.document, """
196+
query SyncMockSynceds($limit: Int) {
197+
syncMockSynceds(limit: $limit) {
198+
items {
199+
id
200+
__typename
201+
_version
202+
_deleted
203+
_lastChangedAt
204+
}
205+
nextToken
206+
startedAt
207+
}
208+
}
209+
""")
210+
XCTAssertNil(request.variables?["filter"])
211+
212+
let list = PaginatedList<AnyModel>(items: [], nextToken: nil, startedAt: nil)
213+
let event: GraphQLOperation<PaginatedList<AnyModel>>.OperationResult = .success(.success(list))
214+
listener?(event)
215+
self.apiWasQueried.fulfill()
216+
return nil
217+
}
218+
219+
let syncExpression = DataStoreSyncExpression.syncExpression(MockSynced.schema, where: {
220+
QueryPredicateConstant.all
221+
})
222+
let operation = initialSyncOperation(withSyncExpression: syncExpression, responder: responder)
223+
224+
let sink = operation
225+
.publisher
226+
.sink(receiveCompletion: { _ in
227+
self.syncCompletionReceived.fulfill()
228+
}, receiveValue: { [self] value in
229+
switch value {
230+
case .started(modelName: let modelName, syncType: let syncType):
231+
XCTAssertEqual(modelName, "MockSynced")
232+
XCTAssertEqual(syncType, .fullSync)
233+
syncStartedReceived.fulfill()
234+
case .finished(modelName: let modelName):
235+
XCTAssertEqual(modelName, "MockSynced")
236+
finishedReceived.fulfill()
237+
default:
238+
break
239+
}
240+
})
241+
242+
operation.main()
243+
244+
waitForExpectations(timeout: 1)
245+
sink.cancel()
246+
}
247+
}

0 commit comments

Comments
 (0)