Skip to content

Commit f460df3

Browse files
Correct the atomic update Postgres implementation (#130)
1 parent 27ed49c commit f460df3

File tree

7 files changed

+151
-17
lines changed

7 files changed

+151
-17
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,7 +1240,7 @@ public void testFindWithSortingAndPagination(String datastoreName) throws IOExce
12401240

12411241
@ParameterizedTest
12421242
@MethodSource("databaseContextBoth")
1243-
public void testUpdateAtomicWithFilter(final String datastoreName)
1243+
public void testAtomicUpdateWithFilter(final String datastoreName)
12441244
throws IOException, ExecutionException, InterruptedException {
12451245
final Collection collection = getCollection(datastoreName, UPDATABLE_COLLECTION_NAME);
12461246
createCollectionData("mongo/updatable_collection_data.json", UPDATABLE_COLLECTION_NAME);
@@ -1308,7 +1308,72 @@ public void testUpdateAtomicWithFilter(final String datastoreName)
13081308
.addSelection(IdentifierExpression.of("props.brand"), "brand")
13091309
.addSort(IdentifierExpression.of("_id"), ASC)
13101310
.build()),
1311-
"mongo/updateable_collection_data_after_atomic_update.json",
1311+
"mongo/updatable_collection_data_after_atomic_update.json",
1312+
9);
1313+
}
1314+
1315+
@ParameterizedTest
1316+
@MethodSource("databaseContextBoth")
1317+
public void testAtomicUpdateSameDocumentWithFilter(final String datastoreName)
1318+
throws IOException, ExecutionException, InterruptedException {
1319+
final Collection collection = getCollection(datastoreName, UPDATABLE_COLLECTION_NAME);
1320+
createCollectionData("mongo/updatable_collection_data.json", UPDATABLE_COLLECTION_NAME);
1321+
1322+
final Query query =
1323+
Query.builder()
1324+
.setFilter(
1325+
RelationalExpression.of(
1326+
IdentifierExpression.of("item"), EQ, ConstantExpression.of("Soap")))
1327+
.addSort(SortingSpec.of(IdentifierExpression.of("price"), ASC))
1328+
.addSort(SortingSpec.of(IdentifierExpression.of("date"), DESC))
1329+
.addSelection(IdentifierExpression.of("quantity"))
1330+
.addSelection(IdentifierExpression.of("price"))
1331+
.addSelection(IdentifierExpression.of("date"))
1332+
.addSelection(IdentifierExpression.of("props"))
1333+
.build();
1334+
final SubDocumentUpdate dateUpdate = SubDocumentUpdate.of("date", "2022-08-09T18:53:17Z");
1335+
final SubDocumentUpdate quantityUpdate = SubDocumentUpdate.of("quantity", 1000);
1336+
final SubDocumentUpdate propsUpdate =
1337+
SubDocumentUpdate.of(
1338+
"props", SubDocumentValue.of(new JSONDocument("{\"brand\": \"Dettol\"}")));
1339+
1340+
final Random random = new Random();
1341+
final Callable<Optional<Document>> callable =
1342+
() -> {
1343+
MILLISECONDS.sleep(random.nextInt(1000));
1344+
return collection.update(query, List.of(dateUpdate, quantityUpdate, propsUpdate));
1345+
};
1346+
1347+
final ExecutorService executor = Executors.newFixedThreadPool(2);
1348+
final Future<Optional<Document>> future1 = executor.submit(callable);
1349+
final Future<Optional<Document>> future2 = executor.submit(callable);
1350+
1351+
final Optional<Document> doc1Optional = future1.get();
1352+
final Optional<Document> doc2Optional = future2.get();
1353+
1354+
assertTrue(doc1Optional.isPresent());
1355+
assertTrue(doc2Optional.isPresent());
1356+
1357+
final Document document1 = doc1Optional.get();
1358+
final Document document2 = doc2Optional.get();
1359+
1360+
assertNotEquals(document1, document2);
1361+
assertDocsAndSizeEqualWithoutOrder(
1362+
datastoreName,
1363+
List.of(document1, document2).iterator(),
1364+
2,
1365+
"mongo/atomic_update_same_document_response.json");
1366+
assertDocsAndSizeEqual(
1367+
collection.find(
1368+
Query.builder()
1369+
.addSelection(IdentifierExpression.of("item"))
1370+
.addSelection(IdentifierExpression.of("price"))
1371+
.addSelection(IdentifierExpression.of("quantity"))
1372+
.addSelection(IdentifierExpression.of("date"))
1373+
.addSelection(IdentifierExpression.of("props.brand"), "brand")
1374+
.addSort(IdentifierExpression.of("_id"), ASC)
1375+
.build()),
1376+
"mongo/updatable_collection_data_after_atomic_update_same_document.json",
13121377
9);
13131378
}
13141379

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[
2+
{
3+
"date": "2022-08-09T18:53:17Z",
4+
"quantity": 1000,
5+
"price": 10,
6+
"props": {
7+
"brand": "Dettol"
8+
}
9+
},
10+
{
11+
"date": "2016-02-06T20:20:13Z",
12+
"price": 10,
13+
"quantity": 5
14+
}
15+
]

document-store/src/integrationTest/resources/mongo/updateable_collection_data_after_atomic_update.json renamed to document-store/src/integrationTest/resources/mongo/updatable_collection_data_after_atomic_update.json

File renamed without changes.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
[
2+
{
3+
"date": "2014-03-01T08:00:00Z",
4+
"item": "Soap",
5+
"price": 10,
6+
"quantity": 2,
7+
"brand": "Dettol"
8+
},
9+
{
10+
"date": "2014-03-01T09:00:00Z",
11+
"item": "Mirror",
12+
"price": 20,
13+
"quantity": 1
14+
},
15+
{
16+
"date": "2014-03-15T09:00:00Z",
17+
"item": "Shampoo",
18+
"price": 5,
19+
"quantity": 10,
20+
"brand": "Sunsilk"
21+
},
22+
{
23+
"date": "2014-04-04T11:21:39.736Z",
24+
"item": "Shampoo",
25+
"price": 5,
26+
"quantity": 20
27+
},
28+
{
29+
"date": "2014-04-04T21:23:13.331Z",
30+
"item": "Soap",
31+
"price": 20,
32+
"quantity": 5,
33+
"brand": "Lifebuoy"
34+
},
35+
{
36+
"date": "2015-06-04T05:08:13Z",
37+
"item": "Comb",
38+
"price": 7.5,
39+
"quantity": 5
40+
},
41+
{
42+
"date": "2015-09-10T08:43:00Z",
43+
"item": "Comb",
44+
"price": 7.5,
45+
"quantity": 10
46+
},
47+
{
48+
"date": "2022-08-09T18:53:17Z",
49+
"item": "Soap",
50+
"price": 10,
51+
"quantity": 1000,
52+
"brand": "Dettol"
53+
},
54+
{
55+
"item": "Soap",
56+
"price": 88,
57+
"quantity": 50,
58+
"date": "2023-08-09T18:53:17Z"
59+
}
60+
]

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ public Optional<Document> update(
411411
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser parser =
412412
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
413413
collectionName, query);
414-
final String selectQuery = parser.buildSelectQueryForUpdateSkippingLocked();
414+
final String selectQuery = parser.buildSelectQueryForUpdate();
415415

416416
try (final PreparedStatement pStmt = connection.prepareStatement(selectQuery)) {
417417
enrichPreparedStatementWithParams(pStmt, parser.getParamsBuilder().build());

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public String parse() {
8787
return sqlBuilder.toString();
8888
}
8989

90-
public String buildSelectQueryForUpdateSkippingLocked() {
90+
public String buildSelectQueryForUpdate() {
9191
final String selections = getSelectionsWithImplicitId();
9292
final Optional<String> optionalOrderBy = parseOrderBy();
9393
final Optional<String> optionalFilter = parseFilter();
@@ -98,7 +98,7 @@ public String buildSelectQueryForUpdateSkippingLocked() {
9898
optionalFilter.ifPresent(filter -> queryBuilder.append(" WHERE ").append(filter));
9999
optionalOrderBy.ifPresent(orderBy -> queryBuilder.append(" ORDER BY ").append(orderBy));
100100
queryBuilder.append(" LIMIT 1");
101-
queryBuilder.append(" FOR UPDATE SKIP LOCKED");
101+
queryBuilder.append(" FOR UPDATE");
102102

103103
return queryBuilder.toString();
104104
}

document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException {
9191
+ "document->'price' ASC NULLS FIRST,"
9292
+ "document->'date' DESC NULLS LAST "
9393
+ "LIMIT 1 "
94-
+ "FOR UPDATE "
95-
+ "SKIP LOCKED",
94+
+ "FOR UPDATE",
9695
COLLECTION_NAME)))
9796
.thenReturn(mockSelectPreparedStatement);
9897
when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet);
@@ -142,8 +141,7 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException {
142141
+ "document->'price' ASC NULLS FIRST,"
143142
+ "document->'date' DESC NULLS LAST "
144143
+ "LIMIT 1 "
145-
+ "FOR UPDATE "
146-
+ "SKIP LOCKED",
144+
+ "FOR UPDATE",
147145
COLLECTION_NAME));
148146
verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap");
149147
verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z");
@@ -199,8 +197,7 @@ void testUpdateAtomicWithFilter_emptyResults() throws IOException, SQLException
199197
+ "document->'price' ASC NULLS FIRST,"
200198
+ "document->'date' DESC NULLS LAST "
201199
+ "LIMIT 1 "
202-
+ "FOR UPDATE "
203-
+ "SKIP LOCKED",
200+
+ "FOR UPDATE",
204201
COLLECTION_NAME)))
205202
.thenReturn(mockSelectPreparedStatement);
206203
when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet);
@@ -228,8 +225,7 @@ void testUpdateAtomicWithFilter_emptyResults() throws IOException, SQLException
228225
+ "document->'price' ASC NULLS FIRST,"
229226
+ "document->'date' DESC NULLS LAST "
230227
+ "LIMIT 1 "
231-
+ "FOR UPDATE "
232-
+ "SKIP LOCKED",
228+
+ "FOR UPDATE",
233229
COLLECTION_NAME));
234230
verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap");
235231
verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z");
@@ -267,8 +263,7 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception {
267263
+ "document->'price' ASC NULLS FIRST,"
268264
+ "document->'date' DESC NULLS LAST "
269265
+ "LIMIT 1 "
270-
+ "FOR UPDATE "
271-
+ "SKIP LOCKED",
266+
+ "FOR UPDATE",
272267
COLLECTION_NAME)))
273268
.thenReturn(mockSelectPreparedStatement);
274269
when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet);
@@ -315,8 +310,7 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception {
315310
+ "document->'price' ASC NULLS FIRST,"
316311
+ "document->'date' DESC NULLS LAST "
317312
+ "LIMIT 1 "
318-
+ "FOR UPDATE "
319-
+ "SKIP LOCKED",
313+
+ "FOR UPDATE",
320314
COLLECTION_NAME));
321315
verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap");
322316
verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z");

0 commit comments

Comments
 (0)