diff --git a/src/integrationTest/java/com/mongodb/spark/sql/connector/write/MongoSparkConnectorWriteTest.java b/src/integrationTest/java/com/mongodb/spark/sql/connector/write/MongoSparkConnectorWriteTest.java
index 8353c924..838f6c1e 100644
--- a/src/integrationTest/java/com/mongodb/spark/sql/connector/write/MongoSparkConnectorWriteTest.java
+++ b/src/integrationTest/java/com/mongodb/spark/sql/connector/write/MongoSparkConnectorWriteTest.java
@@ -17,10 +17,12 @@
 package com.mongodb.spark.sql.connector.write;

 import static com.mongodb.spark.sql.connector.config.MongoConfig.COMMENT_CONFIG;
+import static com.mongodb.spark.sql.connector.config.WriteConfig.IGNORE_DUPLICATES_ON_INSERT_CONFIG;
 import static com.mongodb.spark.sql.connector.interop.JavaScala.asJava;
 import static java.util.Arrays.asList;
 import static org.apache.spark.sql.types.DataTypes.createStructField;
 import static org.apache.spark.sql.types.DataTypes.createStructType;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -29,6 +31,7 @@

 import com.mongodb.client.model.CountOptions;
 import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.IndexOptions;
 import com.mongodb.client.model.TimeSeriesGranularity;
 import com.mongodb.client.model.TimeSeriesOptions;
 import com.mongodb.spark.sql.connector.config.MongoConfig;
@@ -38,6 +41,7 @@
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
@@ -111,6 +115,83 @@ void testIgnoreNullValues() {
         dataWithoutNulls.stream().map(BsonDocument::parse).collect(Collectors.toList()));
   }

+  @Test
+  void testIgnoreDuplicates() {
+    // Given
+    List<String> dataWithDuplicates = asList(
+        "{'_id': 1, 'name': 'Bilbo Baggins'}",
+        "{'_id': 2, 'name': 'Gandalf'}",
+        "{'_id': 1, 'name': 'Bilbo Baggins'}",
+        "{'_id': 3, 'name': 'Thorin'}",
+        "{'_id': 2, 'name': 'Gandalf'}",
+        "{'_id': 4, 'name': 'Balin'}",
+        "{'_id': 3, 'name': 'Thorin'}",
+        "{'_id': 5, 'name': 'Kíli'}",
+        "{'_id': 5, 'name': 'Balin'}",
+        "{'_id': 6, 'name': 'Bombur'}");
+
+    List<String> dataWithoutDuplicates = asList(
+        "{'_id': 1, 'name': 'Bilbo Baggins'}",
+        "{'_id': 2, 'name': 'Gandalf'}",
+        "{'_id': 3, 'name': 'Thorin'}",
+        "{'_id': 4, 'name': 'Balin'}",
+        "{'_id': 5, 'name': 'Kíli'}",
+        "{'_id': 6, 'name': 'Bombur'}");
+
+    SparkSession spark = getOrCreateSparkSession();
+
+    DataFrameWriter<Row> dfw = spark
+        .read()
+        .json(spark.createDataset(dataWithDuplicates, Encoders.STRING()))
+        .write()
+        .format("mongodb")
+        .option(WriteConfig.MAX_BATCH_SIZE_CONFIG, 4)
+        .mode("Append")
+        .option(WriteConfig.OPERATION_TYPE_CONFIG, "insert")
+        .option(IGNORE_DUPLICATES_ON_INSERT_CONFIG, "true");
+
+    // Then using ordered inserts with ignore duplicates it throws an error
+    assertThrows(SparkException.class, dfw::save);
+
+    // Then using unordered inserts with ignore duplicates it does not throw
+    getCollection().deleteMany(BsonDocument.parse("{}"));
+    assertDoesNotThrow(
+        () -> dfw.option(WriteConfig.ORDERED_BULK_OPERATION_CONFIG, false).save());
+    assertCollection(
+        dataWithoutDuplicates.stream().map(BsonDocument::parse).collect(Collectors.toList()));
+
+    // Given a collection with unique names index
+    getCollection().deleteMany(BsonDocument.parse("{}"));
+    getCollection().createIndex(BsonDocument.parse("{name: 1}"), new IndexOptions().unique(true));
+
+    List<String> dataWithDuplicatesNoIds = asList(
+        "{'name': 'Bilbo Baggins'}",
+        "{'name': 'Gandalf'}",
+        "{'name': 'Bilbo Baggins'}",
+        "{'name': 'Thorin'}",
+        "{'name': 'Gandalf'}",
+        "{'name': 'Balin'}",
+        "{'name': 'Thorin'}",
+        "{'name': 'Kíli'}",
+        "{'name': 'Balin'}",
+        "{'name': 'Bombur'}");
+
+    DataFrameWriter<Row> dfwNoIds = spark
+        .read()
+        .json(spark.createDataset(dataWithDuplicatesNoIds, Encoders.STRING()))
+        .write()
+        .format("mongodb")
+        .option(WriteConfig.MAX_BATCH_SIZE_CONFIG, 4)
+        .option(WriteConfig.OPERATION_TYPE_CONFIG, "insert")
+        .option(WriteConfig.ORDERED_BULK_OPERATION_CONFIG, false)
+        .option(IGNORE_DUPLICATES_ON_INSERT_CONFIG, "true")
+        .mode("Append");
+
+    // Then using unordered inserts with ignore duplicates does not throw
+    assertDoesNotThrow(() -> dfwNoIds.save());
+    assertEquals(6, getCollection().countDocuments());
+  }
+
   @Test
   void testTimeseriesSupport() {
     assumeTrue(isAtLeastFiveDotZero());
diff --git a/src/main/java/com/mongodb/spark/sql/connector/config/WriteConfig.java b/src/main/java/com/mongodb/spark/sql/connector/config/WriteConfig.java
index e5d0082b..658f6341 100644
--- a/src/main/java/com/mongodb/spark/sql/connector/config/WriteConfig.java
+++ b/src/main/java/com/mongodb/spark/sql/connector/config/WriteConfig.java
@@ -243,6 +243,25 @@ public String toString() {

   private static final boolean IGNORE_NULL_VALUES_DEFAULT = false;

+  /**
+   * Ignore duplicate values when inserting.
+   *
+   * <p>Note: requires unordered inserts to be configured, otherwise this configuration is ignored.
+   *
+   * <p>Configuration: {@value}
+   *
+   * <p>Default: {@value IGNORE_DUPLICATES_ON_INSERT_DEFAULT}
+   *
+   * <p>If true, and doing unordered bulk inserts, duplicate key exceptions will be ignored.
+   *
+   * @since 10.6
+   * @see #getOperationType()
+   * @see #isOrdered()
+   */
+  public static final String IGNORE_DUPLICATES_ON_INSERT_CONFIG = "ignoreDuplicatesOnInsert";
+
+  private static final boolean IGNORE_DUPLICATES_ON_INSERT_DEFAULT = false;
+
   private final WriteConcern writeConcern;

   private final OperationType operationType;
@@ -319,6 +338,29 @@ public boolean ignoreNullValues() {
     return getBoolean(IGNORE_NULL_VALUES_CONFIG, IGNORE_NULL_VALUES_DEFAULT);
   }

+  /**
+   * @return true if ignoring duplicates on insert, the operation type is 'insert' and the bulk write is unordered.
+   * @since 10.6
+   * @see #getOperationType()
+   * @see #isOrdered()
+   */
+  public boolean ignoreDuplicatesOnInsert() {
+    boolean ignoreDuplicatesOnInsert =
+        getBoolean(IGNORE_DUPLICATES_ON_INSERT_CONFIG, IGNORE_DUPLICATES_ON_INSERT_DEFAULT);
+    boolean allowIgnoreDuplicatesOnInserts =
+        ignoreDuplicatesOnInsert && !isOrdered() && getOperationType() == OperationType.INSERT;
+    if (ignoreDuplicatesOnInsert && !allowIgnoreDuplicatesOnInserts) {
+      LOGGER.warn(
+          "{}: is configured but requires unordered inserts. Current settings: {} = {} and {} = {}.",
+          IGNORE_DUPLICATES_ON_INSERT_CONFIG,
+          ORDERED_BULK_OPERATION_CONFIG,
+          isOrdered(),
+          OPERATION_TYPE_CONFIG,
+          operationType);
+    }
+    return allowIgnoreDuplicatesOnInserts;
+  }
+
   @Override
   CollectionsConfig parseAndValidateCollectionsConfig() {
     CollectionsConfig collectionsConfig = super.parseAndValidateCollectionsConfig();
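Not part of the patch: a minimal usage sketch of the new option from a Spark job. The class name, app name and input path are hypothetical; the option keys ("operationType", "ordered", "ignoreDuplicatesOnInsert") are the string values of the configuration constants above, and the connection URI, database and collection are assumed to be supplied through the connector's usual write settings.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class IgnoreDuplicatesOnInsertExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("ignore-duplicates-example").getOrCreate();

    // Hypothetical input whose rows may repeat the same _id value.
    Dataset<Row> people = spark.read().json("people.json");

    people.write()
        .format("mongodb")
        .mode("append")
        .option("operationType", "insert")          // only plain inserts are affected
        .option("ordered", "false")                 // unordered bulk writes are required
        .option("ignoreDuplicatesOnInsert", "true") // duplicate key write errors are then swallowed
        .save();

    spark.stop();
  }
}

If "ordered" is left at its default of true, the option is inert and a warning is logged, which is exactly what the WriteConfig unit test further down asserts.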
diff --git a/src/main/java/com/mongodb/spark/sql/connector/write/MongoDataWriter.java b/src/main/java/com/mongodb/spark/sql/connector/write/MongoDataWriter.java
index 12325e68..67e7eabc 100644
--- a/src/main/java/com/mongodb/spark/sql/connector/write/MongoDataWriter.java
+++ b/src/main/java/com/mongodb/spark/sql/connector/write/MongoDataWriter.java
@@ -18,6 +18,7 @@

 import static java.lang.String.format;

+import com.mongodb.MongoBulkWriteException;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.model.BulkWriteOptions;
 import com.mongodb.client.model.InsertOneModel;
@@ -186,18 +187,32 @@ private void releaseClient() {
   }

   private void writeModels() {
-    if (writeModelList.size() > 0) {
+    if (!writeModelList.isEmpty()) {
       LOGGER.debug(
           "Writing batch of {} operations to: {}. PartitionId: {}, TaskId: {}.",
           writeModelList.size(),
           writeConfig.getNamespace().getFullName(),
           partitionId,
           taskId);
-      getMongoClient()
-          .getDatabase(writeConfig.getDatabaseName())
-          .getCollection(writeConfig.getCollectionName(), BsonDocument.class)
-          .withWriteConcern(writeConfig.getWriteConcern())
-          .bulkWrite(writeModelList, bulkWriteOptions);
+
+      try {
+        getMongoClient()
+            .getDatabase(writeConfig.getDatabaseName())
+            .getCollection(writeConfig.getCollectionName(), BsonDocument.class)
+            .withWriteConcern(writeConfig.getWriteConcern())
+            .bulkWrite(writeModelList, bulkWriteOptions);
+      } catch (MongoBulkWriteException ex) {
+        if (writeConfig.ignoreDuplicatesOnInsert()
+            && ex.getWriteErrors().stream().allMatch(e -> e.getCode() == 11000)) {
+          LOGGER.info(
+              "Writing batch. PartitionId: {}, TaskId: {}. "
+                  + "Duplicates found on insert and they were ignored as requested: {}",
+              partitionId,
+              taskId,
+              ex.getWriteErrors());
+        } else {
+          throw ex;
+        }
+      }
       writeModelList.clear();
     }
   }
diff --git a/src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java b/src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java
index 22e8ed20..8fd3fe9e 100644
--- a/src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java
+++ b/src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java
@@ -159,6 +159,30 @@ void testWriteConfigConvertJson() {
         WriteConfig.ConvertJson.OBJECT_OR_ARRAY_ONLY);
   }

+  @Test
+  void testWriteConfigIgnoreDuplicatesOnInsert() {
+    WriteConfig writeConfig = MongoConfig.createConfig(CONFIG_MAP).toWriteConfig();
+    // Default
+    assertFalse(writeConfig.ignoreDuplicatesOnInsert());
+    assertFalse(
+        writeConfig.withOption("ignoreDuplicatesOnInsert", "False").ignoreDuplicatesOnInsert());
+
+    // Missing other required settings
+    assertFalse(
+        writeConfig.withOption("ignoreDuplicatesOnInsert", "True").ignoreDuplicatesOnInsert());
+    assertFalse(writeConfig
+        .withOption("ignoreDuplicatesOnInsert", "True")
+        .withOption("ordered", "False")
+        .ignoreDuplicatesOnInsert());
+
+    // With all required settings
+    assertTrue(writeConfig
+        .withOption("ignoreDuplicatesOnInsert", "True")
+        .withOption("ordered", "False")
+        .withOption("operationType", "insert")
+        .ignoreDuplicatesOnInsert());
+  }
+
   @Test
   void testMongoConfigOptionsParsing() {
     MongoConfig mongoConfig = MongoConfig.readConfig(OPTIONS_CONFIG_MAP);
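Also outside the patch, for context on the error code the new catch block in MongoDataWriter filters on: a standalone Java driver sketch (hypothetical connection string, database and collection names) that runs an unordered bulk insert containing a duplicate _id. The server reports the collision as a write error with code 11000 while the non-conflicting inserts still succeed, which is the situation the connector now tolerates when ignoreDuplicatesOnInsert is enabled.

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.List;
import org.bson.Document;

public final class DuplicateKeyProbe {
  public static void main(String[] args) {
    // Hypothetical connection string, database and collection names.
    try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
      MongoCollection<Document> coll =
          client.getDatabase("test").getCollection("duplicateKeyProbe");
      coll.drop();

      List<WriteModel<Document>> models = new ArrayList<>();
      models.add(new InsertOneModel<>(new Document("_id", 1)));
      models.add(new InsertOneModel<>(new Document("_id", 1))); // duplicate _id
      models.add(new InsertOneModel<>(new Document("_id", 2)));

      try {
        // Unordered, so the writes that can succeed are still applied.
        coll.bulkWrite(models, new BulkWriteOptions().ordered(false));
      } catch (MongoBulkWriteException e) {
        // Duplicate key violations surface as write errors with code 11000; this is the
        // code the patched MongoDataWriter checks before swallowing the exception.
        e.getWriteErrors()
            .forEach(err -> System.out.println(err.getCode() + ": " + err.getMessage()));
      }

      // Both distinct _id values are present despite the duplicate in the batch.
      System.out.println("documents inserted: " + coll.countDocuments());
    }
  }
}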