Skip to content

Commit 8d963fc

Browse files
Add option to configure change stream behaviour at collection creation time.
Introduce CollectionChangeStreamOptions which allows to define the changeStreamPreAndPostImages of the createCollection command. Original Pull Request: #4193
1 parent c1de745 commit 8d963fc

File tree

4 files changed

+85
-13
lines changed

4 files changed

+85
-13
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/CollectionOptions.java

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,19 @@ public class CollectionOptions {
4646
private @Nullable Collation collation;
4747
private ValidationOptions validationOptions;
4848
private @Nullable TimeSeriesOptions timeSeriesOptions;
49+
private @Nullable CollectionChangeStreamOptions changeStreamOptions;
4950

5051
private CollectionOptions(@Nullable Long size, @Nullable Long maxDocuments, @Nullable Boolean capped,
5152
@Nullable Collation collation, ValidationOptions validationOptions,
52-
@Nullable TimeSeriesOptions timeSeriesOptions) {
53+
@Nullable TimeSeriesOptions timeSeriesOptions, @Nullable CollectionChangeStreamOptions changeStreamOptions) {
5354

5455
this.maxDocuments = maxDocuments;
5556
this.size = size;
5657
this.capped = capped;
5758
this.collation = collation;
5859
this.validationOptions = validationOptions;
5960
this.timeSeriesOptions = timeSeriesOptions;
61+
this.changeStreamOptions = changeStreamOptions;
6062
}
6163

6264
/**
@@ -70,7 +72,7 @@ public static CollectionOptions just(Collation collation) {
7072

7173
Assert.notNull(collation, "Collation must not be null");
7274

73-
return new CollectionOptions(null, null, null, collation, ValidationOptions.none(), null);
75+
return new CollectionOptions(null, null, null, collation, ValidationOptions.none(), null, null);
7476
}
7577

7678
/**
@@ -80,7 +82,7 @@ public static CollectionOptions just(Collation collation) {
8082
* @since 2.0
8183
*/
8284
public static CollectionOptions empty() {
83-
return new CollectionOptions(null, null, null, null, ValidationOptions.none(), null);
85+
return new CollectionOptions(null, null, null, null, ValidationOptions.none(), null, null);
8486
}
8587

8688
/**
@@ -97,6 +99,18 @@ public static CollectionOptions timeSeries(String timeField) {
9799
return empty().timeSeries(TimeSeriesOptions.timeSeries(timeField));
98100
}
99101

102+
/**
103+
* Quick way to set up {@link CollectionOptions} for emitting (pre & post) change events.
104+
*
105+
* @return new instance of {@link CollectionOptions}.
106+
* @see #changeStream(CollectionChangeStreamOptions)
107+
* @see CollectionChangeStreamOptions#preAndPostImages(boolean)
108+
* @since 4.0
109+
*/
110+
public static CollectionOptions emitChangedRevisions() {
111+
return empty().changeStream(CollectionChangeStreamOptions.preAndPostImages(true));
112+
}
113+
100114
/**
101115
* Create new {@link CollectionOptions} with already given settings and capped set to {@literal true}. <br />
102116
* <strong>NOTE</strong> Using capped collections requires defining {@link #size(long)}.
@@ -105,7 +119,7 @@ public static CollectionOptions timeSeries(String timeField) {
105119
* @since 2.0
106120
*/
107121
public CollectionOptions capped() {
108-
return new CollectionOptions(size, maxDocuments, true, collation, validationOptions, null);
122+
return new CollectionOptions(size, maxDocuments, true, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
109123
}
110124

111125
/**
@@ -116,7 +130,7 @@ public CollectionOptions capped() {
116130
* @since 2.0
117131
*/
118132
public CollectionOptions maxDocuments(long maxDocuments) {
119-
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
133+
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
120134
}
121135

122136
/**
@@ -127,7 +141,7 @@ public CollectionOptions maxDocuments(long maxDocuments) {
127141
* @since 2.0
128142
*/
129143
public CollectionOptions size(long size) {
130-
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
144+
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
131145
}
132146

133147
/**
@@ -138,7 +152,7 @@ public CollectionOptions size(long size) {
138152
* @since 2.0
139153
*/
140154
public CollectionOptions collation(@Nullable Collation collation) {
141-
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
155+
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
142156
}
143157

144158
/**
@@ -258,7 +272,7 @@ public CollectionOptions schemaValidationAction(ValidationAction validationActio
258272
public CollectionOptions validation(ValidationOptions validationOptions) {
259273

260274
Assert.notNull(validationOptions, "ValidationOptions must not be null");
261-
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
275+
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
262276
}
263277

264278
/**
@@ -271,7 +285,20 @@ public CollectionOptions validation(ValidationOptions validationOptions) {
271285
public CollectionOptions timeSeries(TimeSeriesOptions timeSeriesOptions) {
272286

273287
Assert.notNull(timeSeriesOptions, "TimeSeriesOptions must not be null");
274-
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
288+
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
289+
}
290+
291+
/**
292+
* Create new {@link CollectionOptions} with the given {@link TimeSeriesOptions}.
293+
*
294+
* @param changeStreamOptions must not be {@literal null}.
295+
* @return new instance of {@link CollectionOptions}.
296+
* @since 3.3
297+
*/
298+
public CollectionOptions changeStream(CollectionChangeStreamOptions changeStreamOptions) {
299+
300+
Assert.notNull(changeStreamOptions, "ChangeStreamOptions must not be null");
301+
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
275302
}
276303

277304
/**
@@ -332,6 +359,16 @@ public Optional<TimeSeriesOptions> getTimeSeriesOptions() {
332359
return Optional.ofNullable(timeSeriesOptions);
333360
}
334361

362+
/**
363+
* Get the {@link CollectionChangeStreamOptions} if available.
364+
*
365+
* @return {@link Optional#empty()} if not specified.
366+
* @since 4.0
367+
*/
368+
public Optional<CollectionChangeStreamOptions> getChangeStreamOptions() {
369+
return Optional.ofNullable(changeStreamOptions);
370+
}
371+
335372
/**
336373
* Encapsulation of ValidationOptions options.
337374
*
@@ -428,6 +465,34 @@ boolean isEmpty() {
428465
}
429466
}
430467

468+
/**
469+
* Encapsulation of options applied to define collections change stream behaviour.
470+
*
471+
* @author Christoph Strobl
472+
* @since 4.0
473+
*/
474+
public static class CollectionChangeStreamOptions {
475+
476+
private final boolean preAndPostImages;
477+
478+
private CollectionChangeStreamOptions(boolean emitChangedRevisions) {
479+
this.preAndPostImages = emitChangedRevisions;
480+
}
481+
482+
/**
483+
* Output the version of a document before and after changes (the document pre- and post-images).
484+
*
485+
* @return new instance of {@link CollectionChangeStreamOptions}.
486+
*/
487+
public static CollectionChangeStreamOptions preAndPostImages(boolean emitChangedRevisions) {
488+
return new CollectionChangeStreamOptions(true);
489+
}
490+
491+
public boolean getPreAndPostImages() {
492+
return preAndPostImages;
493+
}
494+
}
495+
431496
/**
432497
* Options applicable to Time Series collections.
433498
*

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/EntityOperations.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.Optional;
2222

2323
import org.bson.Document;
24-
2524
import org.springframework.core.convert.ConversionService;
2625
import org.springframework.dao.InvalidDataAccessApiUsageException;
2726
import org.springframework.data.convert.CustomConversions;
@@ -57,6 +56,7 @@
5756
import org.springframework.util.ObjectUtils;
5857
import org.springframework.util.StringUtils;
5958

59+
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
6060
import com.mongodb.client.model.CreateCollectionOptions;
6161
import com.mongodb.client.model.TimeSeriesGranularity;
6262
import com.mongodb.client.model.ValidationOptions;
@@ -341,6 +341,9 @@ public CreateCollectionOptions convertToCreateCollectionOptions(@Nullable Collec
341341
result.timeSeriesOptions(options);
342342
});
343343

344+
collectionOptions.getChangeStreamOptions().ifPresent(it -> result
345+
.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(it.getPreAndPostImages())));
346+
344347
return result;
345348
}
346349

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2511,6 +2511,11 @@ protected Document convertToDocument(@Nullable CollectionOptions collectionOptio
25112511
doc.put("timeseries", timeseries);
25122512
});
25132513

2514+
collectionOptions.getChangeStreamOptions().map(it -> new Document("enabled", it.getPreAndPostImages()))
2515+
.ifPresent(it -> {
2516+
doc.put("changeStreamPreAndPostImages", it);
2517+
});
2518+
25142519
return doc;
25152520
}
25162521

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.junit.jupiter.api.extension.ExtendWith;
4747
import org.springframework.data.annotation.Id;
4848
import org.springframework.data.mongodb.core.ChangeStreamOptions;
49+
import org.springframework.data.mongodb.core.CollectionOptions;
4950
import org.springframework.data.mongodb.core.mapping.Field;
5051
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
5152
import org.springframework.data.mongodb.core.messaging.ChangeStreamTask.ChangeStreamEventMessage;
@@ -724,9 +725,7 @@ void readsFullDocumentBeforeChangeWhenOptionDeclaredRequiredAndMongoVersionIsLes
724725
}
725726

726727
private void createUserCollectionWithChangeStreamPreAndPostImagesEnabled() {
727-
CreateCollectionOptions createCollectionOptions = new CreateCollectionOptions();
728-
createCollectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
729-
template.getDb().createCollection("user", createCollectionOptions);
728+
template.createCollection(User.class, CollectionOptions.emitChangedRevisions());
730729
}
731730

732731
@Data

0 commit comments

Comments
 (0)