Skip to content

Commit 07348c9

Browse files
Bulk Update for Postgres (#89)
* WIP * Remove changes to bulkUpdateOnArrayValue * Spotless * Handle BatchUpdateException * Formatting * Reorder private methods * Log update counts * Log write results * moved whenBulkUpdatingExistingRecords_thenExpectOnlyRecordsWhoseConditionsMatchToBeUpdated to DocStoreTest.java * Move remaining tests * Removed unwanted test * removed unwanted files * Rename method * spotless * Removed unused variable * Addressed changes Co-authored-by: anubhav-sharma13 <[email protected]>
1 parent 431e7c0 commit 07348c9

File tree

3 files changed

+194
-74
lines changed

3 files changed

+194
-74
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1641,6 +1641,80 @@ public void testQueryV1FunctionalSelectionExpressionWithNestedFieldWithAlias(Str
16411641
"mongo/test_selection_expression_nested_fields_alias_result.json");
16421642
}
16431643

1644+
@ParameterizedTest
1645+
@MethodSource("databaseContextProvider")
1646+
public void whenBulkUpdatingNonExistentRecords_thenExpectNothingToBeUpdatedOrCreated(
1647+
String datastoreName) throws Exception {
1648+
Datastore datastore = datastoreMap.get(datastoreName);
1649+
Collection collection = datastore.getCollection(COLLECTION_NAME);
1650+
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
1651+
objectNode.put("foo1", "bar1");
1652+
objectNode.put("timestamp", 100);
1653+
1654+
List<BulkUpdateRequest> toUpdate = new ArrayList<>();
1655+
toUpdate.add(
1656+
new BulkUpdateRequest(
1657+
new SingleValueKey("tenant-1", "testKey1"),
1658+
new JSONDocument(objectNode),
1659+
new Filter(Op.LT, "timestamp", 100)));
1660+
toUpdate.add(
1661+
new BulkUpdateRequest(
1662+
new SingleValueKey("tenant-1", "testKey2"),
1663+
new JSONDocument(objectNode),
1664+
new Filter(Op.LT, "timestamp", 100)));
1665+
1666+
BulkUpdateResult result = collection.bulkUpdate(toUpdate);
1667+
Assertions.assertEquals(0, result.getUpdatedCount());
1668+
1669+
Query query = new Query();
1670+
query.setFilter(
1671+
new Filter(Op.EQ, "_id", new SingleValueKey("tenant-1", "testKey1").toString()));
1672+
Iterator<Document> it = collection.search(query);
1673+
assertFalse(it.hasNext());
1674+
}
1675+
1676+
@ParameterizedTest
1677+
@MethodSource("databaseContextProvider")
1678+
public void whenBulkUpdatingExistingRecords_thenExpectOnlyRecordsWhoseConditionsMatchToBeUpdated(
1679+
String dataStoreName) throws Exception {
1680+
Datastore datastore = datastoreMap.get(dataStoreName);
1681+
Collection collection = datastore.getCollection(COLLECTION_NAME);
1682+
ObjectNode persistedObject = OBJECT_MAPPER.createObjectNode();
1683+
persistedObject.put("foo1", "bar1");
1684+
persistedObject.put("timestamp", 90);
1685+
1686+
collection.create(
1687+
new SingleValueKey("tenant-1", "testKey1"), new JSONDocument(persistedObject));
1688+
1689+
ObjectNode updatedObject = OBJECT_MAPPER.createObjectNode();
1690+
updatedObject.put("foo1", "bar1");
1691+
updatedObject.put("timestamp", 110);
1692+
1693+
List<BulkUpdateRequest> toUpdate = new ArrayList<>();
1694+
toUpdate.add(
1695+
new BulkUpdateRequest(
1696+
new SingleValueKey("tenant-1", "testKey1"),
1697+
new JSONDocument(updatedObject),
1698+
new Filter(Op.LT, "timestamp", 100)));
1699+
1700+
toUpdate.add(
1701+
new BulkUpdateRequest(
1702+
new SingleValueKey("tenant-1", "testKey2"),
1703+
new JSONDocument(updatedObject),
1704+
new Filter(Op.LT, "timestamp", 100)));
1705+
1706+
BulkUpdateResult result = collection.bulkUpdate(toUpdate);
1707+
Assertions.assertEquals(1, result.getUpdatedCount());
1708+
1709+
Query query = new Query();
1710+
query.setFilter(
1711+
new Filter(Op.EQ, "_id", new SingleValueKey("tenant-1", "testKey1").toString()));
1712+
Iterator<Document> it = collection.search(query);
1713+
JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson());
1714+
Long timestamp = root.findValue("timestamp").asLong();
1715+
Assertions.assertEquals(110, timestamp);
1716+
}
1717+
16441718
private Map<String, List<CreateUpdateTestThread>> executeCreateUpdateThreads(
16451719
Collection collection, Operation operation, int numThreads, SingleValueKey documentKey) {
16461720
List<CreateUpdateTestThread> threads = new ArrayList<CreateUpdateTestThread>();

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import static org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest.Operation.REMOVE;
55
import static org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest.Operation.SET;
66
import static org.junit.jupiter.api.Assertions.assertEquals;
7-
import static org.junit.jupiter.api.Assertions.assertFalse;
87
import static org.junit.jupiter.api.Assertions.assertNotEquals;
98
import static org.junit.jupiter.api.Assertions.assertNotNull;
109
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -37,7 +36,6 @@
3736
import java.util.stream.Collectors;
3837
import org.apache.commons.lang3.tuple.ImmutablePair;
3938
import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest;
40-
import org.hypertrace.core.documentstore.BulkUpdateRequest;
4139
import org.hypertrace.core.documentstore.BulkUpdateResult;
4240
import org.hypertrace.core.documentstore.Collection;
4341
import org.hypertrace.core.documentstore.Datastore;
@@ -165,76 +163,6 @@ public void testUpsertAndReturn() throws IOException {
165163
assertEquals("bar2", node.get("foo2").asText());
166164
}
167165

168-
@Test
169-
public void whenBulkUpdatingNonExistentRecords_thenExpectNothingToBeUpdatedOrCreated()
170-
throws Exception {
171-
Collection collection = datastore.getCollection(COLLECTION_NAME);
172-
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
173-
objectNode.put("foo1", "bar1");
174-
objectNode.put("timestamp", 100);
175-
176-
List<BulkUpdateRequest> toUpdate = new ArrayList<>();
177-
toUpdate.add(
178-
new BulkUpdateRequest(
179-
new SingleValueKey("tenant-1", "testKey1"),
180-
new JSONDocument(objectNode),
181-
new Filter(Op.LT, "timestamp", 100)));
182-
toUpdate.add(
183-
new BulkUpdateRequest(
184-
new SingleValueKey("tenant-1", "testKey2"),
185-
new JSONDocument(objectNode),
186-
new Filter(Op.LT, "timestamp", 100)));
187-
188-
BulkUpdateResult result = collection.bulkUpdate(toUpdate);
189-
Assertions.assertEquals(0, result.getUpdatedCount());
190-
191-
Query query = new Query();
192-
query.setFilter(
193-
new Filter(Op.EQ, "_id", new SingleValueKey("tenant-1", "testKey1").toString()));
194-
Iterator<Document> it = collection.search(query);
195-
assertFalse(it.hasNext());
196-
}
197-
198-
@Test
199-
public void whenBulkUpdatingExistingRecords_thenExpectOnlyRecordsWhoseConditionsMatchToBeUpdated()
200-
throws Exception {
201-
Collection collection = datastore.getCollection(COLLECTION_NAME);
202-
ObjectNode persistedObject = OBJECT_MAPPER.createObjectNode();
203-
persistedObject.put("foo1", "bar1");
204-
persistedObject.put("timestamp", 90);
205-
206-
collection.create(
207-
new SingleValueKey("tenant-1", "testKey1"), new JSONDocument(persistedObject));
208-
209-
ObjectNode updatedObject = OBJECT_MAPPER.createObjectNode();
210-
updatedObject.put("foo1", "bar1");
211-
updatedObject.put("timestamp", 110);
212-
213-
List<BulkUpdateRequest> toUpdate = new ArrayList<>();
214-
toUpdate.add(
215-
new BulkUpdateRequest(
216-
new SingleValueKey("tenant-1", "testKey1"),
217-
new JSONDocument(updatedObject),
218-
new Filter(Op.LT, "timestamp", 100)));
219-
220-
toUpdate.add(
221-
new BulkUpdateRequest(
222-
new SingleValueKey("tenant-1", "testKey2"),
223-
new JSONDocument(updatedObject),
224-
new Filter(Op.LT, "timestamp", 100)));
225-
226-
BulkUpdateResult result = collection.bulkUpdate(toUpdate);
227-
Assertions.assertEquals(1, result.getUpdatedCount());
228-
229-
Query query = new Query();
230-
query.setFilter(
231-
new Filter(Op.EQ, "_id", new SingleValueKey("tenant-1", "testKey1").toString()));
232-
Iterator<Document> it = collection.search(query);
233-
JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson());
234-
Long timestamp = root.findValue("timestamp").asLong();
235-
Assertions.assertEquals(110, timestamp);
236-
}
237-
238166
@Test
239167
public void testBulkUpsertAndVerifyUpdatedTime() throws IOException {
240168
Collection collection = datastore.getCollection(COLLECTION_NAME);

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,36 @@ public CreateResult create(Key key, Document document) throws IOException {
134134

135135
@Override
136136
public BulkUpdateResult bulkUpdate(List<BulkUpdateRequest> bulkUpdateRequests) throws Exception {
137-
throw new UnsupportedOperationException();
137+
138+
List<BulkUpdateRequest> requestsWithFilter =
139+
bulkUpdateRequests.stream()
140+
.filter(req -> req.getFilter() != null)
141+
.collect(Collectors.toList());
142+
143+
List<BulkUpdateRequest> requestsWithoutFilter =
144+
bulkUpdateRequests.stream()
145+
.filter(req -> req.getFilter() == null)
146+
.collect(Collectors.toList());
147+
148+
try {
149+
long totalUpdateCountA = bulkUpdateRequestsWithFilter(requestsWithFilter);
150+
151+
long totalUpdateCountB = bulkUpdateRequestsWithoutFilter(requestsWithoutFilter);
152+
153+
if (LOGGER.isDebugEnabled()) {
154+
LOGGER.debug(
155+
"Write results for whole bulkUpdate {}", totalUpdateCountA + totalUpdateCountB);
156+
}
157+
158+
return new BulkUpdateResult(totalUpdateCountA + totalUpdateCountB);
159+
160+
} catch (IOException e) {
161+
LOGGER.error(
162+
"Exception while executing bulk update, total docs updated {}",
163+
e.getMessage(),
164+
e.getCause());
165+
throw e;
166+
}
138167
}
139168

140169
/**
@@ -323,6 +352,13 @@ private CloseableIterator<Document> executeQueryV1(
323352
protected PreparedStatement buildPreparedStatement(String sqlQuery, Params params)
324353
throws SQLException, RuntimeException {
325354
PreparedStatement preparedStatement = client.prepareStatement(sqlQuery);
355+
enrichPreparedStatementWithParams(preparedStatement, params);
356+
return preparedStatement;
357+
}
358+
359+
@VisibleForTesting
360+
protected void enrichPreparedStatementWithParams(
361+
PreparedStatement preparedStatement, Params params) throws RuntimeException {
326362
params
327363
.getObjectParams()
328364
.forEach(
@@ -337,7 +373,6 @@ protected PreparedStatement buildPreparedStatement(String sqlQuery, Params param
337373
LOGGER.error("SQLException setting Param. key: {}, value: {}", k, v);
338374
}
339375
});
340-
return preparedStatement;
341376
}
342377

343378
private boolean isValidType(Object v) {
@@ -546,6 +581,89 @@ private int[] bulkUpsertImpl(Map<Key, Document> documents) throws SQLException,
546581
}
547582
}
548583

584+
private long bulkUpdateRequestsWithFilter(List<BulkUpdateRequest> requests) throws IOException {
585+
// Note: We cannot batch statements as the filter clause can be difference for each request. So
586+
// we need one PreparedStatement for each request. We try to update the batch on a best-effort
587+
// basis. That is, if any update fails, then we still try the other ones.
588+
long totalRowsUpdated = 0;
589+
Exception sampleException = null;
590+
591+
for (BulkUpdateRequest request : requests) {
592+
Key key = request.getKey();
593+
Document document = request.getDocument();
594+
Filter filter = request.getFilter();
595+
596+
try {
597+
totalRowsUpdated += update(key, document, filter).getUpdatedCount();
598+
} catch (IOException e) {
599+
sampleException = e;
600+
LOGGER.error("SQLException updating document. key: {} content: {}", key, document, e);
601+
}
602+
}
603+
if (LOGGER.isDebugEnabled()) {
604+
LOGGER.debug("Write result for bulkUpdateWithoutFilter {}", totalRowsUpdated);
605+
}
606+
if (sampleException != null) {
607+
throw new IOException(String.valueOf(totalRowsUpdated), sampleException);
608+
}
609+
return totalRowsUpdated;
610+
}
611+
612+
private long bulkUpdateRequestsWithoutFilter(List<BulkUpdateRequest> requestsWithoutFilter)
613+
throws IOException {
614+
// We can batch all requests here since the query is the same.
615+
long totalRowsUpdated = 0;
616+
try {
617+
618+
PreparedStatement ps = client.prepareStatement(getUpdateSQL());
619+
620+
for (BulkUpdateRequest req : requestsWithoutFilter) {
621+
Key key = req.getKey();
622+
Document document = req.getDocument();
623+
624+
String jsonString = prepareDocument(key, document);
625+
Params.Builder paramsBuilder = Params.newBuilder();
626+
paramsBuilder.addObjectParam(key.toString());
627+
paramsBuilder.addObjectParam(jsonString);
628+
629+
enrichPreparedStatementWithParams(ps, paramsBuilder.build());
630+
631+
ps.addBatch();
632+
}
633+
634+
int[] updateCounts = ps.executeBatch();
635+
636+
totalRowsUpdated = Arrays.stream(updateCounts).filter(updateCount -> updateCount >= 0).sum();
637+
638+
if (LOGGER.isDebugEnabled()) {
639+
LOGGER.debug("Write result: {}", totalRowsUpdated);
640+
}
641+
642+
return totalRowsUpdated;
643+
644+
} catch (BatchUpdateException e) {
645+
totalRowsUpdated =
646+
Arrays.stream(e.getUpdateCounts()).filter(updateCount -> updateCount >= 0).sum();
647+
LOGGER.error(
648+
"BatchUpdateException while executing batch. Total rows updated: {}",
649+
totalRowsUpdated,
650+
e);
651+
throw new IOException(String.valueOf(totalRowsUpdated), e);
652+
653+
} catch (SQLException e) {
654+
LOGGER.error(
655+
"SQLException bulk updating documents (without filters). SQLState: {} Error Code:{}",
656+
e.getSQLState(),
657+
e.getErrorCode(),
658+
e);
659+
throw new IOException(String.valueOf(totalRowsUpdated), e);
660+
661+
} catch (IOException e) {
662+
LOGGER.error("IOException during bulk update requests without filter", e);
663+
throw new IOException(String.valueOf(totalRowsUpdated), e);
664+
}
665+
}
666+
549667
@Override
550668
public CloseableIterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Document> documents)
551669
throws IOException {

0 commit comments

Comments
 (0)