Skip to content

Commit c4b5941

Browse files
mrjoe7 and rozza committed
Streaming added full document before change support
SPARK-449 Original PR: #140 --------- Co-authored-by: Ross Lawley <[email protected]>
1 parent c97fdcc commit c4b5941

File tree

6 files changed

+135
-0
lines changed

6 files changed

+135
-0
lines changed

src/integrationTest/java/com/mongodb/spark/sql/connector/mongodb/MongoSparkConnectorTestCase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ public boolean isAtLeastFiveDotZero() {
101101
return getMaxWireVersion() >= 12;
102102
}
103103

104+
public boolean isAtLeastSixDotZero() {
105+
return getMaxWireVersion() >= 17;
106+
}
107+
104108
public boolean isAtLeastSevenDotZero() {
105109
return getMaxWireVersion() >= 21;
106110
}

src/integrationTest/java/com/mongodb/spark/sql/connector/read/AbstractMongoStreamTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939

4040
import com.mongodb.client.MongoCollection;
4141
import com.mongodb.client.MongoDatabase;
42+
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
43+
import com.mongodb.client.model.CreateCollectionOptions;
4244
import com.mongodb.client.model.Filters;
4345
import com.mongodb.client.model.InsertManyOptions;
4446
import com.mongodb.client.model.Updates;
@@ -54,6 +56,7 @@
5456
import java.util.HashMap;
5557
import java.util.List;
5658
import java.util.Map;
59+
import java.util.Objects;
5760
import java.util.Set;
5861
import java.util.concurrent.TimeoutException;
5962
import java.util.function.BiConsumer;
@@ -304,6 +307,60 @@ void testStreamWithPublishFullDocumentOnly(final String collectionsConfigModeStr
304307
msg)));
305308
}
306309

310+
@Test
311+
void testStreamFullDocumentBeforeChange() {
312+
assumeTrue(supportsChangeStreams());
313+
assumeTrue(isAtLeastSixDotZero());
314+
315+
CollectionsConfig.Type collectionsConfigType = CollectionsConfig.Type.SINGLE;
316+
testIdentifier = computeTestIdentifier("FullDocBeforeChange", collectionsConfigType);
317+
318+
testStreamingQuery(
319+
createMongoConfig(collectionsConfigType)
320+
.withOption(
321+
ReadConfig.READ_PREFIX
322+
+ ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG,
323+
"required"),
324+
DOCUMENT_BEFORE_CHANGE_SCHEMA,
325+
withSourceDb(
326+
"Create the collection",
327+
(msg, db) -> db.createCollection(
328+
collectionName(),
329+
new CreateCollectionOptions()
330+
.changeStreamPreAndPostImagesOptions(
331+
new ChangeStreamPreAndPostImagesOptions(true)))),
332+
withSource("inserting 0-25", (msg, coll) -> coll.insertMany(createDocuments(0, 25))),
333+
withMemorySink("Expected to see 25 documents", (msg, ds) -> {
334+
List<Row> rows = ds.collectAsList();
335+
assertEquals(25, rows.size(), msg);
336+
assertTrue(
337+
rows.stream()
338+
.map(r -> r.getString(r.fieldIndex("fullDocumentBeforeChange")))
339+
.allMatch(Objects::isNull),
340+
msg);
341+
}),
342+
withSource(
343+
"Updating all",
344+
(msg, coll) ->
345+
coll.updateMany(new BsonDocument(), Updates.set("a", new BsonString("a")))),
346+
withMemorySink(
347+
"Expecting to see 50 documents and the last 25 have fullDocumentBeforeChange",
348+
(msg, ds) -> {
349+
List<Row> rows = ds.collectAsList();
350+
assertEquals(50, rows.size());
351+
assertTrue(
352+
rows.subList(0, 24).stream()
353+
.map(r -> r.getString(r.fieldIndex("fullDocumentBeforeChange")))
354+
.allMatch(Objects::isNull),
355+
msg);
356+
assertTrue(
357+
rows.subList(25, 50).stream()
358+
.map(r -> r.getString(r.fieldIndex("fullDocumentBeforeChange")))
359+
.noneMatch(Objects::isNull),
360+
msg);
361+
}));
362+
}
363+
307364
@ParameterizedTest
308365
@ValueSource(strings = {"SINGLE", "MULTIPLE", "ALL"})
309366
void testStreamPublishFullDocumentOnlyHandlesCollectionDrop(
@@ -707,6 +764,11 @@ void testReadsWithParseMode() {
707764
createStructField("clusterTime", DataTypes.StringType, false),
708765
createStructField("fullDocument", DataTypes.StringType, true)));
709766

767+
/**
 * Schema used when reading change events together with their pre-image: non-nullable
 * {@code operationType} and {@code clusterTime} strings plus a nullable
 * {@code fullDocumentBeforeChange} string.
 */
private static final StructType DOCUMENT_BEFORE_CHANGE_SCHEMA =
    createStructType(
        asList(
            createStructField("operationType", DataTypes.StringType, false),
            createStructField("clusterTime", DataTypes.StringType, false),
            createStructField("fullDocumentBeforeChange", DataTypes.StringType, true)));
771+
710772
@SafeVarargs
711773
private final void testStreamingQuery(
712774
final MongoConfig mongoConfig,

src/main/java/com/mongodb/spark/sql/connector/config/ReadConfig.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.util.Collections.unmodifiableList;
2424

2525
import com.mongodb.client.model.changestream.FullDocument;
26+
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
2627
import com.mongodb.spark.sql.connector.exceptions.ConfigException;
2728
import com.mongodb.spark.sql.connector.read.partitioner.Partitioner;
2829
import java.util.HashMap;
@@ -288,6 +289,35 @@ static ParseMode fromString(final String userParseMode) {
288289

289290
private static final String STREAM_LOOKUP_FULL_DOCUMENT_DEFAULT = FullDocument.DEFAULT.getValue();
290291

292+
/**
293+
* Streaming full document <strong>before change</strong> configuration.
294+
*
295+
* <p>Determines what to return as the pre-image of the document during replace, update, or delete operations
296+
* when using a MongoDB Change Stream.
297+
*
298+
* <p>Only applies if the MongoDB server is configured to capture pre-images.
299+
* See: <a href="https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre--and-post-images">
300+
* Change streams lookup full document before change</a> for further details.
301+
*
302+
* <p>Possible values:
303+
* <ul>
304+
* <li><strong>"default"</strong> - Uses the server's default behavior for the <code>fullDocumentBeforeChange</code> field.</li>
305+
* <li><strong>"off"</strong> - Do not include the pre-image of the document in the change stream event.</li>
306+
* <li><strong>"whenAvailable"</strong> - Include the pre-image of the modified document if available; otherwise, omit it.</li>
307+
* <li><strong>"required"</strong> - Include the pre-image, and raise an error if it is not available.</li>
308+
* </ul>
309+
*
310+
* <p>Configuration: {@value}
311+
*
312+
* <p>Default: "default" - the server's default behavior for the <code>fullDocumentBeforeChange</code> field.
313+
* @since 10.6
314+
*/
315+
public static final String STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG =
316+
"change.stream.lookup.full.document.before.change";
317+
318+
private static final String STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_DEFAULT =
319+
FullDocumentBeforeChange.DEFAULT.getValue();
320+
291321
enum StreamingStartupMode {
292322
LATEST,
293323
TIMESTAMP;
@@ -492,6 +522,19 @@ public FullDocument getStreamFullDocument() {
492522
}
493523
}
494524

525+
/** @return the stream full document before change configuration or 'default' if not set.
526+
* @since 10.6
527+
*/
528+
public FullDocumentBeforeChange getStreamFullDocumentBeforeChange() {
529+
try {
530+
return FullDocumentBeforeChange.fromString(getOrDefault(
531+
STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG,
532+
STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_DEFAULT));
533+
} catch (IllegalArgumentException e) {
534+
throw new ConfigException(e);
535+
}
536+
}
537+
495538
/** @return true if should drop any malformed rows */
496539
public boolean dropMalformed() {
497540
return parseMode == ParseMode.DROPMALFORMED;

src/main/java/com/mongodb/spark/sql/connector/read/MongoContinuousPartitionReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ private MongoChangeStreamCursor<BsonDocument> getCursor() {
196196
}
197197
changeStreamIterable
198198
.fullDocument(readConfig.getStreamFullDocument())
199+
.fullDocumentBeforeChange(readConfig.getStreamFullDocumentBeforeChange())
199200
.comment(readConfig.getComment());
200201
changeStreamIterable = lastOffset.applyToChangeStreamIterable(changeStreamIterable);
201202

src/main/java/com/mongodb/spark/sql/connector/read/MongoMicroBatchPartitionReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ private MongoChangeStreamCursor<BsonDocument> getCursor() {
185185
}
186186
changeStreamIterable
187187
.fullDocument(readConfig.getStreamFullDocument())
188+
.fullDocumentBeforeChange(readConfig.getStreamFullDocumentBeforeChange())
188189
.comment(readConfig.getComment());
189190
if (partition.getStartOffsetTimestamp().getTime() >= 0) {
190191
changeStreamIterable.startAtOperationTime(partition.getStartOffsetTimestamp());

src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import com.mongodb.WriteConcern;
3030
import com.mongodb.client.model.changestream.FullDocument;
31+
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
3132
import com.mongodb.spark.sql.connector.exceptions.ConfigException;
3233
import java.util.HashMap;
3334
import java.util.Map;
@@ -324,6 +325,29 @@ void testReadConfigStreamFullDocument() {
324325
assertEquals(readConfig.getStreamFullDocument(), FullDocument.UPDATE_LOOKUP);
325326
}
326327

328+
@Test
329+
void testReadConfigStreamFullDocumentBeforeChange() {
330+
ReadConfig readConfig = MongoConfig.readConfig(CONFIG_MAP);
331+
assertEquals(readConfig.getStreamFullDocumentBeforeChange(), FullDocumentBeforeChange.DEFAULT);
332+
333+
readConfig =
334+
readConfig.withOption(ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, "off");
335+
assertEquals(readConfig.getStreamFullDocumentBeforeChange(), FullDocumentBeforeChange.OFF);
336+
337+
readConfig = readConfig.withOption(
338+
ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, "whenAvailable");
339+
assertEquals(
340+
readConfig.getStreamFullDocumentBeforeChange(), FullDocumentBeforeChange.WHEN_AVAILABLE);
341+
342+
readConfig = readConfig.withOption(
343+
ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, "required");
344+
assertEquals(readConfig.getStreamFullDocumentBeforeChange(), FullDocumentBeforeChange.REQUIRED);
345+
346+
readConfig = readConfig.withOption(
347+
ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, "INVALID");
348+
assertThrows(ConfigException.class, readConfig::getStreamFullDocumentBeforeChange);
349+
}
350+
327351
@Test
328352
void testReadConfigSchemaHints() {
329353
ReadConfig readConfig =

0 commit comments

Comments
 (0)