@@ -17,10 +17,12 @@
package com.mongodb.spark.sql.connector.write;

import static com.mongodb.spark.sql.connector.config.MongoConfig.COMMENT_CONFIG;
import static com.mongodb.spark.sql.connector.config.WriteConfig.IGNORE_DUPLICATES_ON_INSERT_CONFIG;
import static com.mongodb.spark.sql.connector.interop.JavaScala.asJava;
import static java.util.Arrays.asList;
import static org.apache.spark.sql.types.DataTypes.createStructField;
import static org.apache.spark.sql.types.DataTypes.createStructType;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -29,6 +31,7 @@

import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.TimeSeriesGranularity;
import com.mongodb.client.model.TimeSeriesOptions;
import com.mongodb.spark.sql.connector.config.MongoConfig;
@@ -38,6 +41,7 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.spark.SparkException;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
@@ -111,6 +115,83 @@ void testIgnoreNullValues() {
dataWithoutNulls.stream().map(BsonDocument::parse).collect(Collectors.toList()));
}

@Test
void testIgnoreDuplicates() {
// Given
List<String> dataWithDuplicates = asList(
"{'_id': 1, 'name': 'Bilbo Baggins'}",
"{'_id': 2, 'name': 'Gandalf'}",
"{'_id': 1, 'name': 'Bilbo Baggins'}",
"{'_id': 3, 'name': 'Thorin'}",
"{'_id': 2, 'name': 'Gandalf'}",
"{'_id': 4, 'name': 'Balin'}",
"{'_id': 3, 'name': 'Thorin'}",
"{'_id': 5, 'name': 'Kíli'}",
"{'_id': 5, 'name': 'Balin'}",
"{'_id': 6, 'name': 'Bombur'}");

List<String> dataWithoutDuplicates = asList(
"{'_id': 1, 'name': 'Bilbo Baggins'}",
"{'_id': 2, 'name': 'Gandalf'}",
"{'_id': 3, 'name': 'Thorin'}",
"{'_id': 4, 'name': 'Balin'}",
"{'_id': 5, 'name': 'Kíli'}",
"{'_id': 6, 'name': 'Bombur'}");

SparkSession spark = getOrCreateSparkSession();

DataFrameWriter<Row> dfw = spark
.read()
.json(spark.createDataset(dataWithDuplicates, Encoders.STRING()))
.write()
.format("mongodb")
.option(WriteConfig.MAX_BATCH_SIZE_CONFIG, 4)
.mode("Append")
.option(WriteConfig.OPERATION_TYPE_CONFIG, "insert")
.option(IGNORE_DUPLICATES_ON_INSERT_CONFIG, "true");

// Then, with the default ordered bulk writes, ignoring duplicates does not apply and the save throws
assertThrows(SparkException.class, dfw::save);

// Then, with unordered bulk writes, ignoring duplicates applies and the save does not throw
getCollection().deleteMany(BsonDocument.parse("{}"));
assertDoesNotThrow(
() -> dfw.option(WriteConfig.ORDERED_BULK_OPERATION_CONFIG, false).save());
assertCollection(
dataWithoutDuplicates.stream().map(BsonDocument::parse).collect(Collectors.toList()));

// Given a collection with a unique index on 'name'
getCollection().deleteMany(BsonDocument.parse("{}"));
getCollection().createIndex(BsonDocument.parse("{name: 1}"), new IndexOptions().unique(true));

List<String> dataWithDuplicatesNoIds = asList(
"{'name': 'Bilbo Baggins'}",
"{'name': 'Gandalf'}",
"{'name': 'Bilbo Baggins'}",
"{'name': 'Thorin'}",
"{'name': 'Gandalf'}",
"{'name': 'Balin'}",
"{'name': 'Thorin'}",
"{'name': 'Kíli'}",
"{'name': 'Balin'}",
"{'name': 'Bombur'}");

DataFrameWriter<Row> dfwNoIds = spark
.read()
.json(spark.createDataset(dataWithDuplicatesNoIds, Encoders.STRING()))
.write()
.format("mongodb")
.option(WriteConfig.MAX_BATCH_SIZE_CONFIG, 4)
.option(WriteConfig.OPERATION_TYPE_CONFIG, "insert")
.option(WriteConfig.ORDERED_BULK_OPERATION_CONFIG, false)
.option(IGNORE_DUPLICATES_ON_INSERT_CONFIG, "true")
.mode("Append");

// Then, using unordered inserts with ignore duplicates, the save does not throw and only the unique names are inserted
assertDoesNotThrow(() -> dfwNoIds.save());
assertEquals(6, getCollection().countDocuments());
}

@Test
void testTimeseriesSupport() {
assumeTrue(isAtLeastFiveDotZero());
@@ -243,6 +243,25 @@ public String toString() {

private static final boolean IGNORE_NULL_VALUES_DEFAULT = false;

/**
* Ignore duplicate values when inserting.
*
* <p><strong>Note:</strong> requires <em>unordered inserts</em> to be configured, otherwise this configuration is ignored.</p>
*
* <p>Configuration: {@value}
*
* <p>Default: {@value IGNORE_DUPLICATES_ON_INSERT_DEFAULT}
*
* <p>If true, duplicate key exceptions raised during unordered bulk inserts will be ignored.
*
* @since 10.6
* @see #getOperationType()
* @see #isOrdered()
*/
public static final String IGNORE_DUPLICATES_ON_INSERT_CONFIG = "ignoreDuplicatesOnInsert";

private static final boolean IGNORE_DUPLICATES_ON_INSERT_DEFAULT = false;

private final WriteConcern writeConcern;
private final OperationType operationType;

@@ -319,6 +338,29 @@ public boolean ignoreNullValues() {
return getBoolean(IGNORE_NULL_VALUES_CONFIG, IGNORE_NULL_VALUES_DEFAULT);
}

/**
* @return true if duplicates should be ignored on insert; requires the option to be enabled, the operation type to be 'insert' and the bulk write to be unordered.
* @since 10.6
* @see #getOperationType()
* @see #isOrdered()
*/
public boolean ignoreDuplicatesOnInsert() {
boolean ignoreDuplicatesOnInsert =
getBoolean(IGNORE_DUPLICATES_ON_INSERT_CONFIG, IGNORE_DUPLICATES_ON_INSERT_DEFAULT);
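// The option only takes effect for unordered bulk writes using the insert operation type.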
boolean allowIgnoreDuplicatesOnInserts =
ignoreDuplicatesOnInsert && !isOrdered() && getOperationType() == OperationType.INSERT;
if (ignoreDuplicatesOnInsert && !allowIgnoreDuplicatesOnInserts) {
LOGGER.warn(
"{}: is configured but requires unordered inserts. Current settings: {} = {} and {} = {}.",
IGNORE_DUPLICATES_ON_INSERT_CONFIG,
ORDERED_BULK_OPERATION_CONFIG,
isOrdered(),
OPERATION_TYPE_CONFIG,
operationType);
}
return allowIgnoreDuplicatesOnInserts;
}

@Override
CollectionsConfig parseAndValidateCollectionsConfig() {
CollectionsConfig collectionsConfig = super.parseAndValidateCollectionsConfig();
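For reference, a minimal usage sketch of the new option from a user's Spark job. This is an illustration rather than part of the change; it assumes an existing Dataset<Row> named df and the usual WriteConfig import.

// Hypothetical usage sketch: enable ignoring duplicate key errors on insert.
// Assumes: import com.mongodb.spark.sql.connector.config.WriteConfig;
// The option only applies to unordered bulk writes with the "insert" operation
// type; otherwise the connector logs a warning and the option has no effect.
df.write()
    .format("mongodb")
    .mode("Append")
    .option(WriteConfig.OPERATION_TYPE_CONFIG, "insert") // "operationType"
    .option(WriteConfig.ORDERED_BULK_OPERATION_CONFIG, "false") // "ordered"
    .option(WriteConfig.IGNORE_DUPLICATES_ON_INSERT_CONFIG, "true") // "ignoreDuplicatesOnInsert"
    .save();
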
@@ -18,6 +18,7 @@

import static java.lang.String.format;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
@@ -186,18 +187,32 @@ private void releaseClient() {
}

private void writeModels() {
if (writeModelList.size() > 0) {
if (!writeModelList.isEmpty()) {
LOGGER.debug(
"Writing batch of {} operations to: {}. PartitionId: {}, TaskId: {}.",
writeModelList.size(),
writeConfig.getNamespace().getFullName(),
partitionId,
taskId);
getMongoClient()
.getDatabase(writeConfig.getDatabaseName())
.getCollection(writeConfig.getCollectionName(), BsonDocument.class)
.withWriteConcern(writeConfig.getWriteConcern())
.bulkWrite(writeModelList, bulkWriteOptions);

try {
getMongoClient()
.getDatabase(writeConfig.getDatabaseName())
.getCollection(writeConfig.getCollectionName(), BsonDocument.class)
.withWriteConcern(writeConfig.getWriteConcern())
.bulkWrite(writeModelList, bulkWriteOptions);
} catch (MongoBulkWriteException ex) {
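// Error code 11000 is MongoDB's duplicate key error; swallow the failure only when
// every write error in the batch is a duplicate key and the option is enabled.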
if (writeConfig.ignoreDuplicatesOnInsert()
&& ex.getWriteErrors().stream().allMatch(e -> e.getCode() == 11000)) {
LOGGER.info(
"Writing batch. PartitionId: {}, TaskId: {}. Duplicates found on insert and they were ignored as requested: {}",
partitionId,
taskId,
ex.getWriteErrors());
} else {
throw ex;
}
}
writeModelList.clear();
}
}
@@ -159,6 +159,30 @@ void testWriteConfigConvertJson() {
WriteConfig.ConvertJson.OBJECT_OR_ARRAY_ONLY);
}

@Test
void testWriteConfigIgnoreDuplicatesOnInsert() {
WriteConfig writeConfig = MongoConfig.createConfig(CONFIG_MAP).toWriteConfig();
// Default
assertFalse(writeConfig.ignoreDuplicatesOnInsert());
assertFalse(
writeConfig.withOption("ignoreDuplicatesOnInsert", "False").ignoreDuplicatesOnInsert());

// Missing other required settings
assertFalse(
writeConfig.withOption("ignoreDuplicatesOnInsert", "True").ignoreDuplicatesOnInsert());
assertFalse(writeConfig
.withOption("ignoreDuplicatesOnInsert", "True")
.withOption("ordered", "False")
.ignoreDuplicatesOnInsert());

// With all required settings
assertTrue(writeConfig
.withOption("ignoreDuplicatesOnInsert", "True")
.withOption("ordered", "False")
.withOption("operationType", "insert")
.ignoreDuplicatesOnInsert());
}

@Test
void testMongoConfigOptionsParsing() {
MongoConfig mongoConfig = MongoConfig.readConfig(OPTIONS_CONFIG_MAP);