Skip to content

Commit cfc9c08

Browse files
authored
Implement Bulk Update for FlatPostgresCollection (#278)
1 parent ef0d310 commit cfc9c08

File tree

3 files changed

+501
-14
lines changed

3 files changed

+501
-14
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java

Lines changed: 327 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2584,10 +2584,16 @@ void testUpdateUnsupportedOperator() {
25842584
assertThrows(
25852585
IllegalArgumentException.class, () -> flatCollection.update(query, updates, options));
25862586
}
2587+
}
2588+
2589+
@Nested
2590+
@DisplayName("Bulk Update Operations")
2591+
class BulkUpdateTests {
25872592

25882593
@Test
2589-
@DisplayName("Should throw UnsupportedOperationException for bulkUpdate")
2590-
void testBulkUpdate() {
2594+
@DisplayName("Should update multiple rows and return AFTER_UPDATE documents")
2595+
void testBulkUpdateWithAfterUpdateReturn() throws Exception {
2596+
// Filter: price > 5 should match multiple rows (IDs 1, 2, 3, 5, 6, 7, 8)
25912597
Query query =
25922598
Query.builder()
25932599
.setFilter(
@@ -2597,14 +2603,330 @@ void testBulkUpdate() {
25972603
ConstantExpression.of(5)))
25982604
.build();
25992605

2600-
List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("price", 100));
2606+
List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("quantity", 999));
2607+
2608+
UpdateOptions options =
2609+
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();
2610+
2611+
CloseableIterator<Document> resultIterator =
2612+
flatCollection.bulkUpdate(query, updates, options);
2613+
2614+
List<Document> results = new ArrayList<>();
2615+
while (resultIterator.hasNext()) {
2616+
results.add(resultIterator.next());
2617+
}
2618+
resultIterator.close();
2619+
2620+
assertTrue(results.size() > 1, "Should return multiple updated documents");
2621+
2622+
for (Document doc : results) {
2623+
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
2624+
assertEquals(999, json.get("quantity").asInt(), "All docs should have updated quantity");
2625+
}
2626+
2627+
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
2628+
try (Connection conn = pgDatastore.getPostgresClient();
2629+
PreparedStatement ps =
2630+
conn.prepareStatement(
2631+
String.format(
2632+
"SELECT COUNT(*) FROM \"%s\" WHERE \"quantity\" = 999",
2633+
FLAT_COLLECTION_NAME));
2634+
ResultSet rs = ps.executeQuery()) {
2635+
assertTrue(rs.next());
2636+
assertEquals(results.size(), rs.getInt(1), "DB should have same number of updated rows");
2637+
}
2638+
}
2639+
2640+
@Test
2641+
@DisplayName("Should update multiple rows and return BEFORE_UPDATE documents")
2642+
void testBulkUpdateWithBeforeUpdateReturn() throws Exception {
2643+
// First, get the original quantities for verification
2644+
Map<String, Integer> originalQuantities = new HashMap<>();
2645+
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
2646+
try (Connection conn = pgDatastore.getPostgresClient();
2647+
PreparedStatement ps =
2648+
conn.prepareStatement(
2649+
String.format(
2650+
"SELECT \"id\", \"quantity\" FROM \"%s\" WHERE \"price\" > 10",
2651+
FLAT_COLLECTION_NAME));
2652+
ResultSet rs = ps.executeQuery()) {
2653+
while (rs.next()) {
2654+
originalQuantities.put(rs.getString("id"), rs.getInt("quantity"));
2655+
}
2656+
}
2657+
2658+
// Filter: price > 10 should match a subset of rows
2659+
Query query =
2660+
Query.builder()
2661+
.setFilter(
2662+
RelationalExpression.of(
2663+
IdentifierExpression.of("price"),
2664+
RelationalOperator.GT,
2665+
ConstantExpression.of(10)))
2666+
.build();
2667+
2668+
List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("quantity", 888));
2669+
2670+
UpdateOptions options =
2671+
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.BEFORE_UPDATE).build();
2672+
2673+
CloseableIterator<Document> resultIterator =
2674+
flatCollection.bulkUpdate(query, updates, options);
2675+
2676+
List<Document> results = new ArrayList<>();
2677+
while (resultIterator.hasNext()) {
2678+
results.add(resultIterator.next());
2679+
}
2680+
resultIterator.close();
2681+
2682+
// Verify the returned documents have the ORIGINAL quantities (before update)
2683+
for (Document doc : results) {
2684+
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
2685+
String id = json.get("id").asText();
2686+
int returnedQuantity = json.get("quantity").asInt();
2687+
2688+
assertTrue(originalQuantities.containsKey(id), "Returned doc ID should be in original set");
2689+
assertEquals(
2690+
originalQuantities.get(id).intValue(),
2691+
returnedQuantity,
2692+
"Returned quantity should be the ORIGINAL value");
2693+
}
2694+
2695+
// But database should have the NEW value
2696+
try (Connection conn = pgDatastore.getPostgresClient();
2697+
PreparedStatement ps =
2698+
conn.prepareStatement(
2699+
String.format(
2700+
"SELECT \"quantity\" FROM \"%s\" WHERE \"price\" > 10",
2701+
FLAT_COLLECTION_NAME));
2702+
ResultSet rs = ps.executeQuery()) {
2703+
while (rs.next()) {
2704+
assertEquals(888, rs.getInt("quantity"), "DB should have the updated value");
2705+
}
2706+
}
2707+
}
2708+
2709+
@Test
2710+
@DisplayName("Should return empty iterator when ReturnDocumentType is NONE")
2711+
void testBulkUpdateWithNoneReturn() throws Exception {
2712+
Query query =
2713+
Query.builder()
2714+
.setFilter(
2715+
RelationalExpression.of(
2716+
IdentifierExpression.of("id"),
2717+
RelationalOperator.EQ,
2718+
ConstantExpression.of("1")))
2719+
.build();
2720+
2721+
List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("price", 123));
2722+
2723+
UpdateOptions options =
2724+
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.NONE).build();
2725+
2726+
CloseableIterator<Document> resultIterator =
2727+
flatCollection.bulkUpdate(query, updates, options);
2728+
2729+
// Should return empty iterator
2730+
assertFalse(resultIterator.hasNext(), "Should return empty iterator for NONE return type");
2731+
resultIterator.close();
2732+
2733+
// But database should still be updated
2734+
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
2735+
try (Connection conn = pgDatastore.getPostgresClient();
2736+
PreparedStatement ps =
2737+
conn.prepareStatement(
2738+
String.format(
2739+
"SELECT \"price\" FROM \"%s\" WHERE \"id\" = '1'", FLAT_COLLECTION_NAME));
2740+
ResultSet rs = ps.executeQuery()) {
2741+
assertTrue(rs.next());
2742+
assertEquals(123, rs.getInt("price"));
2743+
}
2744+
}
2745+
2746+
@Test
2747+
@DisplayName("Should return empty iterator when filter matches no documents")
2748+
void testBulkUpdateNoMatchingDocuments() throws Exception {
2749+
Query query =
2750+
Query.builder()
2751+
.setFilter(
2752+
RelationalExpression.of(
2753+
IdentifierExpression.of("id"),
2754+
RelationalOperator.EQ,
2755+
ConstantExpression.of("non-existent-id")))
2756+
.build();
2757+
2758+
List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("price", 999));
2759+
2760+
UpdateOptions options =
2761+
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();
2762+
2763+
CloseableIterator<Document> resultIterator =
2764+
flatCollection.bulkUpdate(query, updates, options);
2765+
2766+
assertFalse(resultIterator.hasNext(), "Should return empty iterator when no docs match");
2767+
resultIterator.close();
2768+
}
2769+
2770+
@Test
2771+
@DisplayName("Should update with multiple SubDocumentUpdates")
2772+
void testBulkUpdateMultipleFields() throws Exception {
2773+
// Update item = "Soap" (IDs 1, 5, 8)
2774+
Query query =
2775+
Query.builder()
2776+
.setFilter(
2777+
RelationalExpression.of(
2778+
IdentifierExpression.of("item"),
2779+
RelationalOperator.EQ,
2780+
ConstantExpression.of("Soap")))
2781+
.build();
2782+
2783+
// Update both price and quantity
2784+
List<SubDocumentUpdate> updates =
2785+
List.of(SubDocumentUpdate.of("price", 50), SubDocumentUpdate.of("quantity", 200));
2786+
2787+
UpdateOptions options =
2788+
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();
2789+
2790+
CloseableIterator<Document> resultIterator =
2791+
flatCollection.bulkUpdate(query, updates, options);
2792+
2793+
List<Document> results = new ArrayList<>();
2794+
while (resultIterator.hasNext()) {
2795+
results.add(resultIterator.next());
2796+
}
2797+
resultIterator.close();
2798+
2799+
assertEquals(3, results.size(), "Should return 3 Soap items");
2800+
2801+
for (Document doc : results) {
2802+
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
2803+
assertEquals("Soap", json.get("item").asText());
2804+
assertEquals(50, json.get("price").asInt());
2805+
assertEquals(200, json.get("quantity").asInt());
2806+
}
2807+
}
2808+
2809+
@Test
2810+
@DisplayName("Should update nested JSONB paths for multiple documents")
2811+
void testBulkUpdateNestedJsonbPath() throws Exception {
2812+
// Documents with props JSONB: IDs 1, 3, 5, 7
2813+
Query query =
2814+
Query.builder()
2815+
.setFilter(
2816+
RelationalExpression.of(
2817+
IdentifierExpression.of("item"),
2818+
RelationalOperator.EQ,
2819+
ConstantExpression.of("Soap")))
2820+
.build();
2821+
2822+
List<SubDocumentUpdate> updates =
2823+
List.of(SubDocumentUpdate.of("props.brand", "BulkUpdatedBrand"));
2824+
2825+
UpdateOptions options =
2826+
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();
2827+
2828+
CloseableIterator<Document> resultIterator =
2829+
flatCollection.bulkUpdate(query, updates, options);
2830+
2831+
List<Document> results = new ArrayList<>();
2832+
while (resultIterator.hasNext()) {
2833+
results.add(resultIterator.next());
2834+
}
2835+
resultIterator.close();
2836+
2837+
// Verify all returned documents have updated props.brand
2838+
for (Document doc : results) {
2839+
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
2840+
JsonNode props = json.get("props");
2841+
if (props != null && !props.isNull()) {
2842+
assertEquals(
2843+
"BulkUpdatedBrand", props.get("brand").asText(), "props.brand should be updated");
2844+
}
2845+
}
2846+
}
2847+
2848+
@Test
2849+
@DisplayName("Should throw IllegalArgumentException for empty updates")
2850+
void testBulkUpdateEmptyUpdates() {
2851+
Query query =
2852+
Query.builder()
2853+
.setFilter(
2854+
RelationalExpression.of(
2855+
IdentifierExpression.of("id"),
2856+
RelationalOperator.EQ,
2857+
ConstantExpression.of("1")))
2858+
.build();
2859+
2860+
List<SubDocumentUpdate> emptyUpdates = Collections.emptyList();
26012861

26022862
UpdateOptions options =
26032863
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();
26042864

26052865
assertThrows(
2606-
UnsupportedOperationException.class,
2607-
() -> flatCollection.bulkUpdate(query, updates, options));
2866+
IllegalArgumentException.class,
2867+
() -> flatCollection.bulkUpdate(query, emptyUpdates, options));
2868+
}
2869+
2870+
@Test
2871+
@DisplayName("Should skip non-existent column with default SKIP strategy")
2872+
void testBulkUpdateNonExistentColumnWithSkipStrategy() throws Exception {
2873+
Query query =
2874+
Query.builder()
2875+
.setFilter(
2876+
RelationalExpression.of(
2877+
IdentifierExpression.of("id"),
2878+
RelationalOperator.EQ,
2879+
ConstantExpression.of("1")))
2880+
.build();
2881+
2882+
// Mix of valid and invalid (non-existent) column paths
2883+
List<SubDocumentUpdate> updates =
2884+
List.of(
2885+
SubDocumentUpdate.of("price", 111),
2886+
SubDocumentUpdate.of("nonExistentColumn", "someValue"));
2887+
2888+
UpdateOptions options =
2889+
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();
2890+
2891+
// Default strategy is SKIP - should not throw, just skip the non-existent column
2892+
CloseableIterator<Document> resultIterator =
2893+
flatCollection.bulkUpdate(query, updates, options);
2894+
2895+
List<Document> results = new ArrayList<>();
2896+
while (resultIterator.hasNext()) {
2897+
results.add(resultIterator.next());
2898+
}
2899+
resultIterator.close();
2900+
2901+
assertEquals(1, results.size());
2902+
JsonNode json = OBJECT_MAPPER.readTree(results.get(0).toJson());
2903+
assertEquals(111, json.get("price").asInt(), "Valid column should be updated");
2904+
assertFalse(json.has("nonExistentColumn"), "Non-existent column should not appear");
2905+
}
2906+
2907+
@Test
2908+
@DisplayName("Should throw exception for non-existent column with THROW strategy")
2909+
void testBulkUpdateNonExistentColumnWithThrowStrategy() {
2910+
Collection collectionWithThrowStrategy =
2911+
getFlatCollectionWithStrategy(MissingColumnStrategy.THROW.toString());
2912+
2913+
Query query =
2914+
Query.builder()
2915+
.setFilter(
2916+
RelationalExpression.of(
2917+
IdentifierExpression.of("id"),
2918+
RelationalOperator.EQ,
2919+
ConstantExpression.of("1")))
2920+
.build();
2921+
2922+
List<SubDocumentUpdate> updates =
2923+
List.of(SubDocumentUpdate.of("nonExistentColumn", "someValue"));
2924+
2925+
UpdateOptions options =
2926+
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();
2927+
2928+
assertThrows(
2929+
IOException.class, () -> collectionWithThrowStrategy.bulkUpdate(query, updates, options));
26082930
}
26092931
}
26102932

0 commit comments

Comments
 (0)