From a1a29c589fd9eb7435c6954cba6a8932eb447a47 Mon Sep 17 00:00:00 2001 From: Rob Rudin Date: Mon, 13 Oct 2025 13:03:20 -0400 Subject: [PATCH] MLE-24717 Some Polaris fixes --- .../com/marklogic/spark/reader/JsonRowDeserializer.java | 5 ++++- .../java/com/marklogic/spark/reader/file/FileUtil.java | 9 ++++++++- .../spark/reader/file/MakeFilePartitionsTest.java | 6 ++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/JsonRowDeserializer.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/JsonRowDeserializer.java index 2d6363ac..fc87ab75 100644 --- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/JsonRowDeserializer.java +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/JsonRowDeserializer.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Objects; /** * Handles deserializing a JSON object into a Spark InternalRow. This is accomplished via Spark's JacksonParser. @@ -44,7 +45,9 @@ public JsonRowDeserializer(StructType schema) { } public InternalRow deserializeJson(String json) { - return this.jacksonParser.parse(json, this.jsonParserCreator, this.utf8StringCreator).head(); + var parsedResult = this.jacksonParser.parse(json, this.jsonParserCreator, this.utf8StringCreator); + Objects.requireNonNull(parsedResult, "The output from parsing the JSON should never be null"); + return parsedResult.head(); } private JacksonParser newJacksonParser(StructType schema) { diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/file/FileUtil.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/file/FileUtil.java index 4264eb00..43bf209b 100644 --- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/file/FileUtil.java +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/file/FileUtil.java @@ -47,11 +47,18 @@ static ZipEntry findNextFileEntry(ZipInputStream zipInputStream) throws IOExcept } static FilePartition[] makeFilePartitions(String[] files, int numPartitions) { - int filesPerPartition = (int) Math.ceil((double) files.length / (double) numPartitions); + if (numPartitions <= 0) { + // Divide-by-zero protection. + numPartitions = 1; + } + + final int filesPerPartition = (int) Math.ceil((double) files.length / (double) numPartitions); if (files.length < numPartitions) { numPartitions = files.length; } + final FilePartition[] partitions = new FilePartition[numPartitions]; + List currentPartition = new ArrayList<>(); int partitionIndex = 0; for (int i = 0; i < files.length; i++) { diff --git a/marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/file/MakeFilePartitionsTest.java b/marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/file/MakeFilePartitionsTest.java index 491d5d6d..6f016400 100644 --- a/marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/file/MakeFilePartitionsTest.java +++ b/marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/file/MakeFilePartitionsTest.java @@ -38,4 +38,10 @@ void morePartitionsThanFiles() { assertEquals("B", partitions[1].getPaths().get(0)); assertEquals("C", partitions[2].getPaths().get(0)); } + + @Test + void zeroPartitions() { + FilePartition[] partitions = FileUtil.makeFilePartitions(new String[]{"A", "B", "C"}, 0); + assertEquals(1, partitions.length, "If a value less than 1 is passed in, partitions should default to 1."); + } }