docs/configuration.md: 2 changes (1 addition, 1 deletion)
@@ -189,8 +189,8 @@ The following options control how the connector writes rows as documents to MarkLogic
| spark.marklogic.write.abortOnFailure | Whether the Spark job should abort if a batch fails to be written; defaults to `true`. |
| spark.marklogic.write.batchSize | The number of documents written in a call to MarkLogic; defaults to 100. |
| spark.marklogic.write.collections | Comma-delimited string of collection names to add to each document. |
| spark.marklogic.write.documentType | Forces a document type when MarkLogic does not recognize a URI extension; must be one of `JSON`, `XML`, or `TEXT`. |
| spark.marklogic.write.permissions | Comma-delimited string of role names and capabilities to add to each document - e.g. role1,read,role2,update,role3,execute. |
| spark.marklogic.write.fileRows.documentType | Forces a document type when MarkLogic does not recognize a URI extension; must be one of `JSON`, `XML`, or `TEXT`. |
| spark.marklogic.write.jsonRootName | As of 2.3.0, specifies a root field name when writing JSON documents based on arbitrary rows. |
| spark.marklogic.write.temporalCollection | Name of a temporal collection to assign each document to. |
| spark.marklogic.write.threadCount | The number of threads used across all partitions to send documents to MarkLogic; defaults to 4. |
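
For context on how the write options documented above fit together, the following is a minimal sketch of a write that combines several of them. It is illustrative only and not part of this change: the data source name `marklogic`, the `spark.marklogic.client.uri` value, the input path, and the option values shown are assumed placeholders.

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class WriteOptionsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // Placeholder input; any Dataset<Row> can be written as documents.
        Dataset<Row> rows = spark.read().json("path/to/input.json");

        rows.write()
            .format("marklogic") // assumed identifier for the connector's data source
            .option("spark.marklogic.client.uri", "user:password@localhost:8000") // placeholder connection string
            .option("spark.marklogic.write.batchSize", "100")
            .option("spark.marklogic.write.threadCount", "4")
            .option("spark.marklogic.write.collections", "example-collection")
            .option("spark.marklogic.write.permissions", "rest-reader,read,rest-writer,update")
            .mode(SaveMode.Append)
            .save();
    }
}
```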
docs/reading-data/reading-files/generic-file-support.md: 4 changes (2 additions, 2 deletions)
@@ -92,10 +92,10 @@ If you are writing files with extensions that MarkLogic does not recognize based
you can force a document type for each file with an unrecognized extension:

```
.option("spark.marklogic.write.fileRows.documentType", "JSON")
.option("spark.marklogic.write.documentType", "JSON")
```

The `spark.marklogic.write.fileRows.documentType` option supports values of `JSON`, `XML`, and `TEXT`.
The `spark.marklogic.write.documentType` option supports values of `JSON`, `XML`, and `TEXT`.

Please see [the guide on writing data](../../writing.md) for information on how "file rows" can then be written to
MarkLogic as documents.
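
As a concrete illustration of the flow described above, here is a minimal, hedged sketch that reads files through Spark's binaryFile data source and forces the document type on write because the file extension is not one MarkLogic recognizes. The data source name `marklogic`, the connection string, and the `.unknown` extension and paths are assumptions for illustration, not part of this change.

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class ForceDocumentTypeSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // Read "file rows" via Spark's binaryFile data source; ".unknown" stands in for
        // any extension that MarkLogic cannot map to a document type on its own.
        Dataset<Row> fileRows = spark.read()
            .format("binaryFile")
            .load("path/to/files/*.unknown");

        fileRows.write()
            .format("marklogic") // assumed identifier for the connector's data source
            .option("spark.marklogic.client.uri", "user:password@localhost:8000") // placeholder connection string
            // Force each document to be treated as JSON since the extension is unrecognized.
            .option("spark.marklogic.write.documentType", "JSON")
            .mode(SaveMode.Append)
            .save();
    }
}
```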
docs/writing.md: 2 changes (1 addition, 1 deletion)
@@ -49,7 +49,7 @@ The URI can then be further adjusted as described in the "Controlling document U
This feature allows for ingesting files of any type. The MarkLogic REST API will
[determine the document type](https://docs.marklogic.com/guide/rest-dev/intro#id_53367) based on the URI extension, if
MarkLogic recognizes it. If MarkLogic does not recognize the extension, and you wish to force a document type on each of
the documents, you can set the `spark.marklogic.write.fileRows.documentType` option to one of `XML`, `JSON`, or `TEXT`.
the documents, you can set the `spark.marklogic.write.documentType` option to one of `XML`, `JSON`, or `TEXT`.

### Writing document rows

gradle.properties: 3 changes (3 additions, 0 deletions)
@@ -25,3 +25,6 @@ org.gradle.java.installations.paths=/users/ml/builder/java/jdk-11.0.2,/home/buil
semaphoreHost=changeme
semaphoreApiKey=changeme
semaphorePath=/cls/dev/cs1/

# Bumping this up because, with Spark 4, Gradle often runs into heap space issues when trying to build the project.
org.gradle.jvmargs=-Xmx4g
@@ -355,16 +355,6 @@ public abstract class Options {
public static final String WRITE_GRAPH = "spark.marklogic.write.graph";
public static final String WRITE_GRAPH_OVERRIDE = "spark.marklogic.write.graphOverride";

/**
* For writing rows adhering to Spark's binaryFile schema - https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html .
*
* @deprecated since 2.3.0
*/
@Deprecated(since = "2.3.0", forRemoval = true)
// We don't need Sonar to remind us of this deprecation.
@SuppressWarnings("java:S1133")
public static final String WRITE_FILE_ROWS_DOCUMENT_TYPE = "spark.marklogic.write.fileRows.documentType";

// Forces a document type when writing rows corresponding to our document row schema. Used when the URI extension
// does not result in MarkLogic choosing the correct document type.
public static final String WRITE_DOCUMENT_TYPE = "spark.marklogic.write.documentType";
@@ -19,7 +19,7 @@
import java.util.stream.Stream;

/**
* Knows how to build a document from a row corresponding to our {@code FileRowSchema}.
* Knows how to build a document from a row corresponding to a row from Spark's binaryFile data source.
*/
class FileRowConverter implements RowConverter {

@@ -47,16 +47,10 @@ public Iterator<DocumentInputs> getRemainingDocumentInputs() {
return Stream.<DocumentInputs>empty().iterator();
}

@SuppressWarnings({"deprecation", "removal"})
private void forceFormatIfNecessary(BytesHandle content) {
Format format = writeContext.getDocumentFormat();
if (format != null) {
content.withFormat(format);
} else {
format = writeContext.getDeprecatedFileRowsDocumentFormat();
if (format != null) {
content.withFormat(format);
}
}
}

@@ -181,28 +181,6 @@ public Format getDocumentFormat() {
return null;
}

/**
* @deprecated since 2.3.0; users should use getDocumentFormat instead.
*/
@Deprecated(since = "2.3.0")
// We don't need Sonar to remind us of this deprecation.
@SuppressWarnings({"java:S1133", "removal"})
Format getDeprecatedFileRowsDocumentFormat() {
final String deprecatedOption = Options.WRITE_FILE_ROWS_DOCUMENT_TYPE;
if (hasOption(deprecatedOption)) {
String value = getStringOption(deprecatedOption);
Objects.requireNonNull(value);
try {
return Format.valueOf(value.toUpperCase());
} catch (IllegalArgumentException e) {
String message = "Invalid value for %s: %s; must be one of 'JSON', 'XML', or 'TEXT'.";
String optionAlias = getOptionNameForMessage(deprecatedOption);
throw new ConnectorException(String.format(message, optionAlias, value));
}
}
return null;
}

/**
* The URI template approach will typically be used with rows with an "arbitrary" schema where each column value
* may be useful in constructing a URI.
@@ -1,3 +1,4 @@
# Copyright (c) 2023-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
# Defines various messages for the connector. Intended to be inherited and overridden by the ETL tool via
# marklogic-spark-messages_en.properties, where each option name can be associated with a CLI option in the ETL tool.
spark.marklogic.client.uri=
@@ -7,7 +8,6 @@ spark.marklogic.read.numPartitions=
spark.marklogic.read.noOpticQuery=No Optic query found; must define spark.marklogic.read.opticQuery
spark.marklogic.write.batchSize=
spark.marklogic.write.documentType=
spark.marklogic.write.fileRows.documentType=
spark.marklogic.write.graph=
spark.marklogic.write.graphOverride=
spark.marklogic.write.jsonRootName=
@@ -315,11 +315,21 @@ void transformThrowsError() {
.load();

SparkException ex = assertThrows(SparkException.class, dataset::count);
assertTrue(ex.getMessage().contains("This is an intentional error for testing purposes."),
String message = ex.getMessage();

if (message.contains("java.io.InterruptedIOException")) {
// This test can sometimes fail due to a timeout while being run by Jenkins. No idea why that happens.
// In the event that this happens, we try again.
logger.warn("Unexpected timeout error, will try again: {}", message);
ex = assertThrows(SparkException.class, dataset::count);
message = ex.getMessage();
}

assertTrue(message.contains("This is an intentional error for testing purposes."),
"When the transform throws an error, our connector throws a ConnectorException, but Spark seems to wrap " +
"its stacktrace into a SparkException, such that we can't access the original ConnectorException " +
"object. But the transform error should be in the error message. " +
"Actual message: " + ex.getMessage());
"Actual message: " + message);
}

private DataFrameReader startRead() {
@@ -114,7 +114,6 @@ void uriTemplate() {
}

@Test
@Deprecated
void forceDocumentType() {
newSparkSession()
.read()
@@ -124,7 +123,7 @@ void forceDocumentType() {
.format(CONNECTOR_IDENTIFIER)
.options(defaultWriteOptions())
// Verifies that the value gets capitalized.
.option(Options.WRITE_FILE_ROWS_DOCUMENT_TYPE, "jSoN")
.option(Options.WRITE_DOCUMENT_TYPE, "jSoN")
.option(Options.WRITE_COLLECTIONS, "json-unrecognized-extension")
.mode(SaveMode.Append)
.save();
@@ -138,7 +137,6 @@ void forceDocumentType() {
}

@Test
@Deprecated
void invalidDocumentType() {
DataFrameWriter writer = newSparkSession()
.read()
@@ -147,13 +145,13 @@ void invalidDocumentType() {
.write()
.format(CONNECTOR_IDENTIFIER)
.option(Options.CLIENT_URI, makeClientUri())
.option(Options.WRITE_FILE_ROWS_DOCUMENT_TYPE, "not valid")
.option(Options.WRITE_DOCUMENT_TYPE, "not valid")
.mode(SaveMode.Append);

SparkException ex = assertThrows(SparkException.class, writer::save);
assertTrue(ex.getCause() instanceof ConnectorException);
ConnectorException ce = (ConnectorException) ex.getCause();
assertEquals("Invalid value for " + Options.WRITE_FILE_ROWS_DOCUMENT_TYPE + ": not valid; " +
assertEquals("Invalid value for " + Options.WRITE_DOCUMENT_TYPE + ": not valid; " +
"must be one of 'JSON', 'XML', or 'TEXT'.", ce.getMessage());
}
