Skip to content

Commit 6b4d418

Browse files
ankitchoudhary111Ankit ChoudharyAnkit Choudharyskjindal93
authored
Use UpdateMany() instead of UpdateOne() for bulk updates (#53)
* used updateMany() instead of updateOne() for bulk update operations * added implementation for postgres * added bulkUpdateSubDoc * added integration tests * changed return value for bulkUpdateSubDocs from int to BulkUpdateResult * Update document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java Co-authored-by: SJ <48863181+skjindal93@users.noreply.github.com> Co-authored-by: Ankit Choudhary <ankitchoudhary@Ankits-MacBook-Pro.local> Co-authored-by: Ankit Choudhary <ankitchoudhary@192.168.2.102> Co-authored-by: SJ <48863181+skjindal93@users.noreply.github.com>
1 parent 125a600 commit 6b4d418

File tree

4 files changed

+164
-3
lines changed

4 files changed

+164
-3
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.hypertrace.core.documentstore.utils.CreateUpdateTestThread.FAILURE;
44
import static org.hypertrace.core.documentstore.utils.CreateUpdateTestThread.SUCCESS;
55
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
import static org.junit.jupiter.api.Assertions.assertFalse;
67
import static org.junit.jupiter.api.Assertions.assertThrows;
78
import static org.junit.jupiter.api.Assertions.assertTrue;
89

@@ -629,6 +630,81 @@ public void testSubDocumentUpdate(String dataStoreName) throws IOException {
629630
Assertions.assertEquals(expected, OBJECT_MAPPER.writeValueAsString(jsonNode));
630631
}
631632

633+
@ParameterizedTest
634+
@MethodSource("databaseContextProvider")
635+
public void bulkUpdateSubDocForNonExistingDocuments(String dataStoreName) throws Exception {
636+
Datastore datastore = datastoreMap.get(dataStoreName);
637+
Collection collection = datastore.getCollection(COLLECTION_NAME);
638+
Map<Key, Map<String, Document>> toUpdate = new HashMap<>();
639+
Key key1 = new SingleValueKey("tenant-1", "testKey1");
640+
Key key2 = new SingleValueKey("tenant-2", "testKey2");
641+
Map<String, Document> subDoc1 = new HashMap<>(), subDoc2 = new HashMap<>();
642+
subDoc1.put("subDocPath1", Utils.createDocument("timestamp", "100"));
643+
subDoc2.put("subDocPath2", Utils.createDocument("timestamp", "100"));
644+
toUpdate.put(key1, subDoc1);
645+
toUpdate.put(key2, subDoc2);
646+
BulkUpdateResult bulkUpdateResult = collection.bulkUpdateSubDocs(toUpdate);
647+
long result = bulkUpdateResult.getUpdatedCount();
648+
assertEquals(0, result);
649+
}
650+
651+
@ParameterizedTest
652+
@MethodSource("databaseContextProvider")
653+
public void bulkUpdateSubDocForEmptyMap(String dataStoreName) throws Exception {
654+
Datastore datastore = datastoreMap.get(dataStoreName);
655+
Collection collection = datastore.getCollection(COLLECTION_NAME);
656+
Map<Key, Map<String, Document>> toUpdate = new HashMap<>();
657+
BulkUpdateResult bulkUpdateResult = collection.bulkUpdateSubDocs(toUpdate);
658+
long result = bulkUpdateResult.getUpdatedCount();
659+
assertEquals(0, result);
660+
}
661+
662+
@ParameterizedTest
663+
@MethodSource("databaseContextProvider")
664+
public void bulkUpdateSubDocOnlyForExistingDocuments(String dataStoreName) throws Exception {
665+
Datastore datastore = datastoreMap.get(dataStoreName);
666+
Collection collection = datastore.getCollection(COLLECTION_NAME);
667+
Key key1 = new SingleValueKey("tenant-1", "testKey1");
668+
Key key2 = new SingleValueKey("tenant-2", "testKey2");
669+
Key key3 = new SingleValueKey("tenant-3", "testKey3");
670+
collection.upsert(key1, Utils.createDocument("foo1", "bar1"));
671+
collection.upsert(key3, Utils.createDocument("foo3", "bar3"));
672+
673+
Map<Key, Map<String, Document>> toUpdate = new HashMap<>();
674+
Map<String, Document> subDoc1 = new HashMap<>(),
675+
subDoc2 = new HashMap<>(),
676+
subDoc3 = new HashMap<>();
677+
subDoc1.put("subDocPath1", Utils.createDocument("nested1", "100"));
678+
subDoc2.put("subDocPath2", Utils.createDocument("nested2", "100"));
679+
// update on already existing subDocPath
680+
subDoc3.put("foo3", Utils.createDocument("nested3", "100"));
681+
toUpdate.put(key1, subDoc1);
682+
toUpdate.put(key2, subDoc2);
683+
toUpdate.put(key3, subDoc3);
684+
BulkUpdateResult bulkUpdateResult = collection.bulkUpdateSubDocs(toUpdate);
685+
long result = bulkUpdateResult.getUpdatedCount();
686+
assertEquals(2, result);
687+
688+
Query query = new Query();
689+
query.setFilter(new Filter(Op.EQ, "_id", key1.toString()));
690+
Iterator<Document> it = collection.search(query);
691+
JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson());
692+
String nestedTimestamp = root.findValue("subDocPath1").toString();
693+
assertEquals("{\"nested1\":\"100\"}", nestedTimestamp);
694+
695+
query = new Query();
696+
query.setFilter(new Filter(Op.EQ, "_id", key3.toString()));
697+
it = collection.search(query);
698+
root = OBJECT_MAPPER.readTree(it.next().toJson());
699+
nestedTimestamp = root.findValue("foo3").toString();
700+
assertEquals("{\"nested3\":\"100\"}", nestedTimestamp);
701+
702+
query = new Query();
703+
query.setFilter(new Filter(Op.EQ, "_id", key2.toString()));
704+
it = collection.search(query);
705+
assertFalse(it.hasNext());
706+
}
707+
632708
@ParameterizedTest
633709
@MethodSource("databaseContextProvider")
634710
public void testSubDocumentDelete(String dataStoreName) throws IOException {

document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,19 @@ public interface Collection {
3333
* @param key Unique key of the document in the collection.
3434
* @param subDocPath Path to the sub document that needs to be updated
3535
* @param subDocument Sub document that needs to be updated at the above path
36+
* @deprecated use {@link #bulkUpdateSubDocs(Map)} ()} instead.
3637
*/
38+
@Deprecated
3739
boolean updateSubDoc(Key key, String subDocPath, Document subDocument);
3840

41+
/**
42+
* Updates sub documents
43+
*
44+
* @param documents contains the mapping of key and the corresponding sub doc update queries
45+
* @return the update count or -1 if there is any exception
46+
*/
47+
BulkUpdateResult bulkUpdateSubDocs(Map<Key, Map<String, Document>> documents) throws Exception;
48+
3949
/**
4050
* Search for documents matching the query
4151
*
@@ -53,7 +63,7 @@ public interface Collection {
5363
boolean delete(Key key);
5464

5565
/**
56-
* Deletes a sub documents
66+
* Deletes a sub document
5767
*
5868
* @param key Unique key of the document in the collection
5969
* @param subDocPath Path to the sub document that needs to be updated

document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.mongodb.client.model.FindOneAndUpdateOptions;
1717
import com.mongodb.client.model.Projections;
1818
import com.mongodb.client.model.ReturnDocument;
19+
import com.mongodb.client.model.UpdateManyModel;
1920
import com.mongodb.client.model.UpdateOneModel;
2021
import com.mongodb.client.model.UpdateOptions;
2122
import com.mongodb.client.result.DeleteResult;
@@ -283,17 +284,55 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) {
283284
UpdateResult writeResult =
284285
collection.updateOne(selectionCriteriaForKey(key), setObject, new UpdateOptions());
285286
if (LOGGER.isDebugEnabled()) {
286-
LOGGER.debug("Write result: " + writeResult.toString());
287+
LOGGER.debug("Write result: " + writeResult);
287288
}
288289
// TODO:look into the writeResult to ensure it was successful. Was not easy to find this from
289290
// docs.
290291
return true;
291292
} catch (IOException e) {
292-
LOGGER.error("Exception inserting document. key: {} content:{}", key, subDocument);
293+
LOGGER.error("Exception updating document. key: {} content:{}", key, subDocument);
293294
return false;
294295
}
295296
}
296297

298+
@Override
299+
public BulkUpdateResult bulkUpdateSubDocs(Map<Key, Map<String, Document>> documents)
300+
throws Exception {
301+
List<UpdateManyModel<BasicDBObject>> bulkWriteUpdate = new ArrayList<>();
302+
for (Key key : documents.keySet()) {
303+
Map<String, Document> subDocuments = documents.get(key);
304+
List<BasicDBObject> updateOperations = new ArrayList<>();
305+
for (String subDocPath : subDocuments.keySet()) {
306+
Document subDocument = subDocuments.get(subDocPath);
307+
String jsonString = subDocument.toJson();
308+
try {
309+
JsonNode jsonNode = MAPPER.readTree(jsonString);
310+
// escape "." and "$" in field names since Mongo DB does not like them
311+
JsonNode sanitizedJsonNode = recursiveClone(jsonNode, this::encodeKey);
312+
BasicDBObject dbObject =
313+
new BasicDBObject(
314+
subDocPath, BasicDBObject.parse(MAPPER.writeValueAsString(sanitizedJsonNode)));
315+
dbObject.append(LAST_UPDATED_TIME, System.currentTimeMillis());
316+
BasicDBObject setObject = new BasicDBObject("$set", dbObject);
317+
updateOperations.add(setObject);
318+
} catch (Exception e) {
319+
LOGGER.error("Exception updating document. key: {} content:{}", key, subDocument);
320+
throw e;
321+
}
322+
}
323+
bulkWriteUpdate.add(
324+
new UpdateManyModel(selectionCriteriaForKey(key), updateOperations, new UpdateOptions()));
325+
}
326+
if (bulkWriteUpdate.isEmpty()) {
327+
return new BulkUpdateResult(0);
328+
}
329+
BulkWriteResult writeResult = collection.bulkWrite(bulkWriteUpdate);
330+
if (LOGGER.isDebugEnabled()) {
331+
LOGGER.debug("Write result: " + writeResult);
332+
}
333+
return new BulkUpdateResult(writeResult.getModifiedCount());
334+
}
335+
297336
private JsonNode recursiveClone(JsonNode src, Function<String, String> function) {
298337
if (!src.isObject()) {
299338
return src;

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,42 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) {
189189
return false;
190190
}
191191

192+
@Override
193+
public BulkUpdateResult bulkUpdateSubDocs(Map<Key, Map<String, Document>> documents)
194+
throws Exception {
195+
String updateSubDocSQL =
196+
String.format(
197+
"UPDATE %s SET %s=jsonb_set(%s, ?::text[], ?::jsonb) WHERE %s = ?",
198+
collectionName, DOCUMENT, DOCUMENT, ID);
199+
try {
200+
PreparedStatement preparedStatement = client.prepareStatement(updateSubDocSQL);
201+
for (Key key : documents.keySet()) {
202+
Map<String, Document> subDocuments = documents.get(key);
203+
for (String subDocPath : subDocuments.keySet()) {
204+
Document subDocument = subDocuments.get(subDocPath);
205+
String jsonSubDocPath = getJsonSubDocPath(subDocPath);
206+
String jsonString = subDocument.toJson();
207+
preparedStatement.setString(1, jsonSubDocPath);
208+
preparedStatement.setString(2, jsonString);
209+
preparedStatement.setString(3, key.toString());
210+
preparedStatement.addBatch();
211+
}
212+
}
213+
int[] updateCounts = preparedStatement.executeBatch();
214+
if (LOGGER.isDebugEnabled()) {
215+
LOGGER.debug("Write result: {}", updateCounts);
216+
}
217+
int totalUpdateCount = 0;
218+
for (int update : updateCounts) {
219+
totalUpdateCount += update;
220+
}
221+
return new BulkUpdateResult(totalUpdateCount);
222+
} catch (SQLException e) {
223+
LOGGER.error("SQLException updating sub document.", e);
224+
throw e;
225+
}
226+
}
227+
192228
@Override
193229
public Iterator<Document> search(Query query) {
194230
String filters = null;

0 commit comments

Comments
 (0)