Skip to content

Commit f38193f

Browse files
Implement aggregate count API for MongoCollection (#84)
* Implement aggregate count API for MongoCollection
1 parent e1195c4 commit f38193f

File tree

6 files changed

+91
-3
lines changed

6 files changed

+91
-3
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorIntegrationTest.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.hypertrace.core.documentstore.utils.Utils.convertJsonToMap;
2222
import static org.hypertrace.core.documentstore.utils.Utils.createDocumentsFromResource;
2323
import static org.hypertrace.core.documentstore.utils.Utils.readFileFromResource;
24+
import static org.junit.jupiter.api.Assertions.assertEquals;
2425
import static org.junit.jupiter.api.Assertions.assertFalse;
2526
import static org.junit.jupiter.api.Assertions.assertThrows;
2627

@@ -52,7 +53,6 @@
5253
import org.hypertrace.core.documentstore.query.Sort;
5354
import org.hypertrace.core.documentstore.query.SortingSpec;
5455
import org.junit.jupiter.api.AfterAll;
55-
import org.junit.jupiter.api.Assertions;
5656
import org.junit.jupiter.api.BeforeAll;
5757
import org.junit.jupiter.api.Test;
5858
import org.testcontainers.containers.GenericContainer;
@@ -103,6 +103,7 @@ public void testFindAll() throws IOException {
103103

104104
Iterator<Document> resultDocs = collection.find(query);
105105
assertSizeEqual(resultDocs, "mongo/collection_data.json");
106+
assertSizeEqual(query, "mongo/collection_data.json");
106107
}
107108

108109
@Test
@@ -137,6 +138,7 @@ public void testFindSimple() throws IOException {
137138

138139
Iterator<Document> resultDocs = collection.find(query);
139140
assertDocsEqual(resultDocs, "mongo/simple_filter_response.json");
141+
assertSizeEqual(query, "mongo/simple_filter_response.json");
140142
}
141143

142144
@Test
@@ -163,6 +165,7 @@ public void testFindWithDuplicateSelections() throws IOException {
163165

164166
Iterator<Document> resultDocs = collection.find(query);
165167
assertDocsEqual(resultDocs, "mongo/simple_filter_response.json");
168+
assertSizeEqual(query, "mongo/simple_filter_response.json");
166169
}
167170

168171
@Test
@@ -202,6 +205,7 @@ public void testFindWithSortingAndPagination() throws IOException {
202205

203206
Iterator<Document> resultDocs = collection.find(query);
204207
assertDocsEqual(resultDocs, "mongo/filter_with_sorting_and_pagination_response.json");
208+
assertSizeEqual(query, "mongo/filter_with_sorting_and_pagination_response.json");
205209
}
206210

207211
@Test
@@ -243,6 +247,7 @@ public void testFindWithDuplicateSortingAndPagination() throws IOException {
243247

244248
Iterator<Document> resultDocs = collection.find(query);
245249
assertDocsEqual(resultDocs, "mongo/filter_with_sorting_and_pagination_response.json");
250+
assertSizeEqual(query, "mongo/filter_with_sorting_and_pagination_response.json");
246251
}
247252

248253
@Test
@@ -279,6 +284,7 @@ public void testFindWithNestedFields() throws IOException {
279284

280285
Iterator<Document> resultDocs = collection.find(query);
281286
assertDocsEqual(resultDocs, "mongo/filter_on_nested_fields_response.json");
287+
assertSizeEqual(query, "mongo/filter_on_nested_fields_response.json");
282288
}
283289

284290
@Test
@@ -287,6 +293,7 @@ public void testAggregateEmpty() throws IOException {
287293

288294
Iterator<Document> resultDocs = collection.aggregate(query);
289295
assertSizeEqual(resultDocs, "mongo/collection_data.json");
296+
assertSizeEqual(query, "mongo/collection_data.json");
290297
}
291298

292299
@Test
@@ -298,6 +305,7 @@ public void testAggregateSimple() throws IOException {
298305

299306
Iterator<Document> resultDocs = collection.aggregate(query);
300307
assertDocsEqual(resultDocs, "mongo/count_response.json");
308+
assertSizeEqual(query, "mongo/count_response.json");
301309
}
302310

303311
@Test
@@ -310,6 +318,7 @@ public void testAggregateWithDuplicateSelections() throws IOException {
310318

311319
Iterator<Document> resultDocs = collection.aggregate(query);
312320
assertDocsEqual(resultDocs, "mongo/count_response.json");
321+
assertSizeEqual(query, "mongo/count_response.json");
313322
}
314323

315324
@Test
@@ -346,6 +355,7 @@ public void testAggregateWithFiltersAndOrdering() throws IOException {
346355

347356
Iterator<Document> resultDocs = collection.aggregate(query);
348357
assertDocsEqual(resultDocs, "mongo/sum_response.json");
358+
assertSizeEqual(query, "mongo/sum_response.json");
349359
}
350360

351361
@Test
@@ -385,6 +395,7 @@ public void testAggregateWithFiltersAndDuplicateOrderingAndDuplicateAggregations
385395

386396
Iterator<Document> resultDocs = collection.aggregate(query);
387397
assertDocsEqual(resultDocs, "mongo/sum_response.json");
398+
assertSizeEqual(query, "mongo/sum_response.json");
388399
}
389400

390401
@Test
@@ -403,6 +414,7 @@ public void testAggregateWithNestedFields() throws IOException {
403414

404415
Iterator<Document> resultDocs = collection.aggregate(query);
405416
assertDocsEqual(resultDocs, "mongo/aggregate_on_nested_fields_response.json");
417+
assertSizeEqual(query, "mongo/aggregate_on_nested_fields_response.json");
406418
}
407419

408420
@Test
@@ -417,6 +429,7 @@ public void testAggregateWithoutAggregationAlias() {
417429
.build();
418430

419431
assertThrows(IllegalArgumentException.class, () -> collection.aggregate(query));
432+
assertThrows(IllegalArgumentException.class, () -> collection.count(query));
420433
}
421434

422435
@Test
@@ -441,6 +454,7 @@ public void testAggregateWithUnsupportedExpressionNesting() {
441454
.build();
442455

443456
assertThrows(UnsupportedOperationException.class, () -> collection.aggregate(query));
457+
assertThrows(UnsupportedOperationException.class, () -> collection.count(query));
444458
}
445459

446460
@Test
@@ -467,6 +481,7 @@ public void testAggregateWithMultipleGroupingLevels() throws IOException {
467481

468482
Iterator<Document> resultDocs = collection.aggregate(query);
469483
assertDocsEqual(resultDocs, "mongo/multi_level_grouping_response.json");
484+
assertSizeEqual(query, "mongo/multi_level_grouping_response.json");
470485
}
471486

472487
@Test
@@ -487,6 +502,7 @@ public void testDistinctCount() throws IOException {
487502

488503
Iterator<Document> resultDocs = collection.aggregate(query);
489504
assertDocsEqual(resultDocs, "mongo/distinct_count_response.json");
505+
assertSizeEqual(query, "mongo/distinct_count_response.json");
490506
}
491507

492508
@Test
@@ -506,6 +522,7 @@ public void testUnnestAndAggregate() throws IOException {
506522

507523
Iterator<Document> iterator = collection.aggregate(query);
508524
assertDocsEqual(iterator, "mongo/aggregate_on_nested_array_reponse.json");
525+
assertSizeEqual(query, "mongo/aggregate_on_nested_array_reponse.json");
509526
}
510527

511528
@Test
@@ -520,6 +537,7 @@ public void testUnnestAndAggregate_preserveEmptyTrue() throws IOException {
520537

521538
Iterator<Document> iterator = collection.aggregate(query);
522539
assertDocsEqual(iterator, "mongo/unwind_preserving_empty_array_response.json");
540+
assertSizeEqual(query, "mongo/unwind_preserving_empty_array_response.json");
523541
}
524542

525543
@Test
@@ -534,6 +552,7 @@ public void testUnnestAndAggregate_preserveEmptyFalse() throws IOException {
534552

535553
Iterator<Document> iterator = collection.aggregate(query);
536554
assertDocsEqual(iterator, "mongo/unwind_not_preserving_empty_array_response.json");
555+
assertSizeEqual(query, "mongo/unwind_not_preserving_empty_array_response.json");
537556
}
538557

539558
@Test
@@ -553,6 +572,7 @@ public void testUnnest() throws IOException {
553572

554573
Iterator<Document> iterator = collection.aggregate(query);
555574
assertDocsEqual(iterator, "mongo/unwind_response.json");
575+
assertSizeEqual(query, "mongo/unwind_response.json");
556576
}
557577

558578
@Test
@@ -581,6 +601,7 @@ public void testFilterAndUnnest() throws IOException {
581601

582602
Iterator<Document> iterator = collection.aggregate(query);
583603
assertDocsEqual(iterator, "mongo/unwind_filter_response.json");
604+
assertSizeEqual(query, "mongo/unwind_filter_response.json");
584605
}
585606

586607
private static void assertDocsEqual(Iterator<Document> documents, String filePath)
@@ -593,7 +614,7 @@ private static void assertDocsEqual(Iterator<Document> documents, String filePat
593614
actual.add(convertDocumentToMap(documents.next()));
594615
}
595616

596-
Assertions.assertEquals(expected, actual);
617+
assertEquals(expected, actual);
597618
}
598619

599620
private static void assertSizeEqual(Iterator<Document> documents, String filePath)
@@ -606,6 +627,13 @@ private static void assertSizeEqual(Iterator<Document> documents, String filePat
606627
documents.next();
607628
}
608629

609-
Assertions.assertEquals(expected, actual);
630+
assertEquals(expected, actual);
631+
}
632+
633+
private static void assertSizeEqual(final Query query, final String filePath) throws IOException {
634+
final long actualSize = collection.count(query);
635+
final String fileContent = readFileFromResource(filePath).orElseThrow();
636+
final long expectedSize = convertJsonToMap(fileContent).size();
637+
assertEquals(expectedSize, actualSize);
610638
}
611639
}

document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,15 @@ public interface Collection {
120120
*/
121121
long total(Query query);
122122

123+
/**
124+
* Count the result-set size of executing the given query. Note that this method is a generic
125+
* version of {@link #count()} and {@link #total(Query)}
126+
*
127+
* @param query The query definition whose result-set size is to be determined
128+
* @return The number of documents conforming to the input query
129+
*/
130+
long count(final org.hypertrace.core.documentstore.query.Query query);
131+
123132
/**
124133
* @param documents to be upserted in bulk
125134
* @return true if the operation succeeded

document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,11 @@ public CloseableIterator<Document> aggregate(
474474
return convertToDocumentIterator(queryExecutor.aggregate(query));
475475
}
476476

477+
@Override
478+
public long count(org.hypertrace.core.documentstore.query.Query query) {
479+
return queryExecutor.count(query);
480+
}
481+
477482
@Override
478483
public boolean delete(Key key) {
479484
DeleteResult deleteResult = collection.deleteOne(this.selectionCriteriaForKey(key));

document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutor.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.hypertrace.core.documentstore.mongo;
22

3+
import static java.lang.Long.parseLong;
34
import static java.util.Collections.singleton;
45
import static java.util.function.Predicate.not;
56
import static org.hypertrace.core.documentstore.mongo.MongoPaginationHelper.applyPagination;
67
import static org.hypertrace.core.documentstore.mongo.MongoPaginationHelper.getLimitClause;
78
import static org.hypertrace.core.documentstore.mongo.MongoPaginationHelper.getSkipClause;
9+
import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.COUNT_ALIAS;
10+
import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.getCountClause;
811
import static org.hypertrace.core.documentstore.mongo.parser.MongoFilterTypeExpressionParser.getFilter;
912
import static org.hypertrace.core.documentstore.mongo.parser.MongoFilterTypeExpressionParser.getFilterClause;
1013
import static org.hypertrace.core.documentstore.mongo.parser.MongoGroupTypeExpressionParser.getGroupClause;
@@ -21,6 +24,7 @@
2124
import java.util.List;
2225
import java.util.function.Function;
2326
import java.util.stream.Collectors;
27+
import java.util.stream.Stream;
2428
import lombok.AllArgsConstructor;
2529
import lombok.extern.slf4j.Slf4j;
2630
import org.bson.conversions.Bson;
@@ -79,6 +83,29 @@ public MongoCursor<BasicDBObject> aggregate(final Query originalQuery) {
7983
return iterable.cursor();
8084
}
8185

86+
public long count(final Query originalQuery) {
87+
final Query query = transformAndLog(originalQuery);
88+
89+
final List<BasicDBObject> pipeline =
90+
Stream.concat(
91+
AGGREGATE_PIPELINE_FUNCTIONS.stream()
92+
.flatMap(function -> function.apply(query).stream()),
93+
Stream.of(getCountClause()))
94+
.filter(not(BasicDBObject::isEmpty))
95+
.collect(Collectors.toList());
96+
97+
logPipeline(pipeline);
98+
final AggregateIterable<BasicDBObject> iterable = collection.aggregate(pipeline);
99+
100+
try (final MongoCursor<BasicDBObject> cursor = iterable.cursor()) {
101+
if (cursor.hasNext()) {
102+
return parseLong(cursor.next().get(COUNT_ALIAS).toString());
103+
}
104+
}
105+
106+
return 0;
107+
}
108+
82109
private void logClauses(
83110
final Query query,
84111
final Bson projection,
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.hypertrace.core.documentstore.mongo.clause;
2+
3+
import static org.hypertrace.core.documentstore.mongo.MongoUtils.PREFIX;
4+
5+
import com.mongodb.BasicDBObject;
6+
7+
public class MongoCountClauseSupplier {
8+
public static final String COUNT_ALIAS = "count";
9+
private static final String COUNT_CLAUSE = PREFIX + "count";
10+
11+
public static BasicDBObject getCountClause() {
12+
return new BasicDBObject(COUNT_CLAUSE, COUNT_ALIAS);
13+
}
14+
}

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ public CloseableIterator<Document> aggregate(
290290
throw new UnsupportedOperationException();
291291
}
292292

293+
@Override
294+
public long count(org.hypertrace.core.documentstore.query.Query query) {
295+
throw new UnsupportedOperationException();
296+
}
297+
293298
@VisibleForTesting
294299
protected PreparedStatement buildPreparedStatement(String sqlQuery, Params params)
295300
throws SQLException, RuntimeException {

0 commit comments

Comments
 (0)