diff --git a/docs/configuration.md b/docs/configuration.md
index 3fa79822..c39ba7a9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -189,8 +189,8 @@ The following options control how the connector writes rows as documents to MarkLogic
 | spark.marklogic.write.abortOnFailure | Whether the Spark job should abort if a batch fails to be written; defaults to `true`. |
 | spark.marklogic.write.batchSize | The number of documents written in a call to MarkLogic; defaults to 100. |
 | spark.marklogic.write.collections | Comma-delimited string of collection names to add to each document. |
+| spark.marklogic.write.documentType | Forces a document type when MarkLogic does not recognize a URI extension; must be one of `JSON`, `XML`, or `TEXT`. |
 | spark.marklogic.write.permissions | Comma-delimited string of role names and capabilities to add to each document - e.g. role1,read,role2,update,role3,execute . |
-| spark.marklogic.write.fileRows.documentType | Forces a document type when MarkLogic does not recognize a URI extension; must be one of `JSON`, `XML`, or `TEXT`. |
 | spark.marklogic.write.jsonRootName | As of 2.3.0, specifies a root field name when writing JSON documents based on arbitrary rows. |
 | spark.marklogic.write.temporalCollection | Name of a temporal collection to assign each document to. |
 | spark.marklogic.write.threadCount | The number of threads used across all partitions to send documents to MarkLogic; defaults to 4. |
diff --git a/docs/reading-data/reading-files/generic-file-support.md b/docs/reading-data/reading-files/generic-file-support.md
index 5838879a..89b1df7a 100644
--- a/docs/reading-data/reading-files/generic-file-support.md
+++ b/docs/reading-data/reading-files/generic-file-support.md
@@ -92,10 +92,10 @@ If you are writing files with extensions that MarkLogic does not recognize based
 you can force a document type for each file with an unrecognized extension:
 
 ```
-    .option("spark.marklogic.write.fileRows.documentType", "JSON")
+    .option("spark.marklogic.write.documentType", "JSON")
 ```
 
-The `spark.marklogic.write.fileRows.documentType` option supports values of `JSON`, `XML`, and `TEXT`.
+The `spark.marklogic.write.documentType` option supports values of `JSON`, `XML`, and `TEXT`.
 
 Please see [the guide on writing data](../../writing.md) for information on how "file rows" can then be written to MarkLogic as documents.
 
diff --git a/docs/writing.md b/docs/writing.md
index d9625c2b..99dc9276 100644
--- a/docs/writing.md
+++ b/docs/writing.md
@@ -49,7 +49,7 @@ The URI can then be further adjusted as described in the "Controlling document U
 This feature allows for ingesting files of any type. The MarkLogic REST API will
 [determine the document type](https://docs.marklogic.com/guide/rest-dev/intro#id_53367) based on the URI extension, if
 MarkLogic recognizes it. If MarkLogic does not recognize the extension, and you wish to force a document type on each of
-the documents, you can set the `spark.marklogic.write.fileRows.documentType` option to one of `XML`, `JSON`, or `TEXT`.
+the documents, you can set the `spark.marklogic.write.documentType` option to one of `XML`, `JSON`, or `TEXT`.
 
 ### Writing document rows
 
diff --git a/gradle.properties b/gradle.properties
index 8f8af585..151d37de 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -25,3 +25,6 @@ org.gradle.java.installations.paths=/users/ml/builder/java/jdk-11.0.2,/home/buil
 semaphoreHost=changeme
 semaphoreApiKey=changeme
 semaphorePath=/cls/dev/cs1/
+
+# Bumping this up, as Gradle often runs into heap space issues when building the project with Spark 4.
+org.gradle.jvmargs=-Xmx4g
diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/Options.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/Options.java
index c3160f16..fd859228 100644
--- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/Options.java
+++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/Options.java
@@ -355,16 +355,6 @@ public abstract class Options {
     public static final String WRITE_GRAPH = "spark.marklogic.write.graph";
     public static final String WRITE_GRAPH_OVERRIDE = "spark.marklogic.write.graphOverride";
 
-    /**
-     * For writing rows adhering to Spark's binaryFile schema - https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html .
-     *
-     * @deprecated since 2.3.0
-     */
-    @Deprecated(since = "2.3.0", forRemoval = true)
-    // We don't need Sonar to remind us of this deprecation.
-    @SuppressWarnings("java:S1133")
-    public static final String WRITE_FILE_ROWS_DOCUMENT_TYPE = "spark.marklogic.write.fileRows.documentType";
-
     // Forces a document type when writing rows corresponding to our document row schema. Used when the URI extension
     // does not result in MarkLogic choosing the correct document type.
     public static final String WRITE_DOCUMENT_TYPE = "spark.marklogic.write.documentType";
diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/FileRowConverter.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/FileRowConverter.java
index e707e3ee..3dbbe854 100644
--- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/FileRowConverter.java
+++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/FileRowConverter.java
@@ -19,7 +19,7 @@ import java.util.stream.Stream;
 
 /**
- * Knows how to build a document from a row corresponding to our {@code FileRowSchema}.
+ * Knows how to build a document from a row produced by Spark's binaryFile data source.
  */
 class FileRowConverter implements RowConverter {
 
@@ -47,16 +47,10 @@ public Iterator getRemainingDocumentInputs() {
         return Stream.empty().iterator();
     }
 
-    @SuppressWarnings({"deprecation", "removal"})
     private void forceFormatIfNecessary(BytesHandle content) {
         Format format = writeContext.getDocumentFormat();
         if (format != null) {
             content.withFormat(format);
-        } else {
-            format = writeContext.getDeprecatedFileRowsDocumentFormat();
-            if (format != null) {
-                content.withFormat(format);
-            }
         }
     }
diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/WriteContext.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/WriteContext.java
index 170650c2..28ea0913 100644
--- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/WriteContext.java
+++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/WriteContext.java
@@ -181,28 +181,6 @@ public Format getDocumentFormat() {
         return null;
     }
 
-    /**
-     * @deprecated since 2.3.0; users should use getDocumentFormat instead.
-     */
-    @Deprecated(since = "2.3.0")
-    // We don't need Sonar to remind us of this deprecation.
-    @SuppressWarnings({"java:S1133", "removal"})
-    Format getDeprecatedFileRowsDocumentFormat() {
-        final String deprecatedOption = Options.WRITE_FILE_ROWS_DOCUMENT_TYPE;
-        if (hasOption(deprecatedOption)) {
-            String value = getStringOption(deprecatedOption);
-            Objects.requireNonNull(value);
-            try {
-                return Format.valueOf(value.toUpperCase());
-            } catch (IllegalArgumentException e) {
-                String message = "Invalid value for %s: %s; must be one of 'JSON', 'XML', or 'TEXT'.";
-                String optionAlias = getOptionNameForMessage(deprecatedOption);
-                throw new ConnectorException(String.format(message, optionAlias, value));
-            }
-        }
-        return null;
-    }
-
     /**
      * The URI template approach will typically be used with rows with an "arbitrary" schema where each column value
      * may be useful in constructing a URI.
diff --git a/marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties b/marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties
index f3997e1d..7f0ad14a 100644
--- a/marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties
+++ b/marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties
@@ -1,3 +1,4 @@
+# Copyright (c) 2023-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 # Defines various messages for the connector. Intended to be inherited and overridden by the ETL tool via
 # marklogic-spark-messages_en.properties, where each option name can be associated with a CLI option in the ETL tool.
 spark.marklogic.client.uri=
@@ -7,7 +8,6 @@ spark.marklogic.read.numPartitions=
 spark.marklogic.read.noOpticQuery=No Optic query found; must define spark.marklogic.read.opticQuery
 spark.marklogic.write.batchSize=
 spark.marklogic.write.documentType=
-spark.marklogic.write.fileRows.documentType=
 spark.marklogic.write.graph=
 spark.marklogic.write.graphOverride=
 spark.marklogic.write.jsonRootName=
diff --git a/marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java b/marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java
index 9afce5d7..21072809 100644
--- a/marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java
+++ b/marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java
@@ -315,11 +315,21 @@ void transformThrowsError() {
             .load();
 
         SparkException ex = assertThrows(SparkException.class, dataset::count);
-        assertTrue(ex.getMessage().contains("This is an intentional error for testing purposes."),
+        String message = ex.getMessage();
+
+        if (message.contains("java.io.InterruptedIOException")) {
+            // This test can sometimes fail due to a timeout when run by Jenkins. No idea why that happens.
+            // If that happens, we try again.
+            logger.warn("Unexpected timeout error, will try again: {}", message);
+            ex = assertThrows(SparkException.class, dataset::count);
+            message = ex.getMessage();
+        }
+
+        assertTrue(message.contains("This is an intentional error for testing purposes."),
             "When the transform throws an error, our connector throws a ConnectorException, but Spark seems to wrap " +
                 "its stacktrace into a SparkException, such that we can't access the original ConnectorException " +
                 "object. But the transform error should be in the error message. " +
-                "Actual message: " + ex.getMessage());
+                "Actual message: " + message);
     }
 
     private DataFrameReader startRead() {
diff --git a/marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/WriteFileRowsTest.java b/marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/WriteFileRowsTest.java
index 6bbc0e4c..cdff43f2 100644
--- a/marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/WriteFileRowsTest.java
+++ b/marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/WriteFileRowsTest.java
@@ -114,7 +114,6 @@ void uriTemplate() {
     }
 
     @Test
-    @Deprecated
     void forceDocumentType() {
         newSparkSession()
             .read()
@@ -124,7 +123,7 @@ void forceDocumentType() {
             .format(CONNECTOR_IDENTIFIER)
            .options(defaultWriteOptions())
             // Verifies that the value gets capitalized.
-            .option(Options.WRITE_FILE_ROWS_DOCUMENT_TYPE, "jSoN")
+            .option(Options.WRITE_DOCUMENT_TYPE, "jSoN")
             .option(Options.WRITE_COLLECTIONS, "json-unrecognized-extension")
             .mode(SaveMode.Append)
             .save();
@@ -138,7 +137,6 @@ void forceDocumentType() {
     }
 
     @Test
-    @Deprecated
     void invalidDocumentType() {
         DataFrameWriter writer = newSparkSession()
             .read()
@@ -147,13 +145,13 @@ void invalidDocumentType() {
             .write()
             .format(CONNECTOR_IDENTIFIER)
             .option(Options.CLIENT_URI, makeClientUri())
-            .option(Options.WRITE_FILE_ROWS_DOCUMENT_TYPE, "not valid")
+            .option(Options.WRITE_DOCUMENT_TYPE, "not valid")
             .mode(SaveMode.Append);
 
         SparkException ex = assertThrows(SparkException.class, writer::save);
         assertTrue(ex.getCause() instanceof ConnectorException);
         ConnectorException ce = (ConnectorException) ex.getCause();
-        assertEquals("Invalid value for " + Options.WRITE_FILE_ROWS_DOCUMENT_TYPE + ": not valid; " +
+        assertEquals("Invalid value for " + Options.WRITE_DOCUMENT_TYPE + ": not valid; " +
             "must be one of 'JSON', 'XML', or 'TEXT'.", ce.getMessage());
    }