
Commit 5e51ef6

Merge pull request #484 from marklogic/feature/medium-polaris
Fixing medium Polaris issues
2 parents f8993b0 + b884e63

16 files changed: +61 -64 lines changed

marklogic-spark-api/src/main/java/com/marklogic/spark/core/extraction/TikaTextExtractor.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -15,6 +15,7 @@
 import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 public class TikaTextExtractor implements TextExtractor {
@@ -29,6 +30,7 @@ public Optional<ExtractionResult> extractText(DocumentInputs inputs) {
             return Optional.empty();
         }
 
+        Objects.requireNonNull(inputs.getContentAsBytes());
         try (ByteArrayInputStream stream = new ByteArrayInputStream(inputs.getContentAsBytes())) {
             Metadata metadata = new Metadata();
             String extractedText = tika.parseToString(stream, metadata);
```
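Most of the changes in this commit follow the same pattern: a value that the static analyzer (Polaris) flags as possibly null is passed through `Objects.requireNonNull` before use, so a missing value fails fast at the guard rather than deeper inside library code. A minimal, self-contained sketch of the pattern (names are illustrative, not from the connector):

```java
import java.util.Objects;

public class NullGuardSketch {

    public static void main(String[] args) {
        byte[] content = loadContent();
        // Fails immediately with a NullPointerException if content is null,
        // rather than somewhere inside downstream parsing code. It also
        // documents the non-null expectation for static analysis.
        Objects.requireNonNull(content, "content must not be null");
        System.out.println(content.length + " bytes");
    }

    // Hypothetical stand-in for a call like inputs.getContentAsBytes().
    private static byte[] loadContent() {
        return new byte[]{1, 2, 3};
    }
}
```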

marklogic-spark-api/src/main/java/com/marklogic/spark/dom/DOMHelper.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@
 import javax.xml.xpath.XPathFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.StringReader;
+import java.util.Objects;
 
 /**
  * Simplifies operations with the Java DOM API.
@@ -55,6 +56,7 @@ public Document extractDocument(AbstractWriteHandle handle, String sourceUri) {
         }
 
         String xml = HandleAccessor.contentAsString(handle);
+        Objects.requireNonNull(xml);
         return parseXmlString(xml, sourceUri);
     }
 
```

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/document/DocumentContext.java

Lines changed: 7 additions & 2 deletions
```diff
@@ -11,6 +11,7 @@
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.util.Map;
+import java.util.Objects;
 
 class DocumentContext extends ContextSupport {
 
@@ -42,7 +43,9 @@ SearchQueryDefinition buildSearchQuery(DatabaseClient client) {
         // REST API allows commas in URIs, but not newlines, so that's safe to use as a delimiter.
         String[] uris = null;
         if (hasOption(Options.READ_DOCUMENTS_URIS)) {
-            uris = getStringOption(Options.READ_DOCUMENTS_URIS).split("\n");
+            String value = getStringOption(Options.READ_DOCUMENTS_URIS);
+            Objects.requireNonNull(value);
+            uris = value.split("\n");
         }
         return new SearchQueryBuilder()
             .withStringQuery(props.get(Options.READ_DOCUMENTS_STRING_QUERY))
@@ -61,7 +64,9 @@ SearchQueryDefinition buildTriplesSearchQuery(DatabaseClient client) {
         final Map<String, String> props = getProperties();
         String[] uris = null;
         if (hasOption(Options.READ_TRIPLES_URIS)) {
-            uris = getStringOption(Options.READ_TRIPLES_URIS).split("\n");
+            String value = getStringOption(Options.READ_TRIPLES_URIS);
+            Objects.requireNonNull(value);
+            uris = value.split("\n");
         }
         return new SearchQueryBuilder()
             .withStringQuery(props.get(Options.READ_TRIPLES_STRING_QUERY))
```
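The comment in the first hunk explains the delimiter choice: commas are legal inside URIs, so a comma-separated list could split a single URI apart, while newlines cannot appear in URIs accepted by the REST API. A small sketch of the guarded split (values are hypothetical):

```java
import java.util.Objects;

public class UriSplitSketch {

    public static void main(String[] args) {
        // The comma inside the first URI survives because "\n" is the
        // delimiter; splitting on "," would break that URI apart.
        String value = "/docs/a,b.json\n/docs/c.json";
        Objects.requireNonNull(value);
        String[] uris = value.split("\n");
        for (String uri : uris) {
            System.out.println(uri); // "/docs/a,b.json" then "/docs/c.json"
        }
    }
}
```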

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/document/DocumentRowSchema.java

Lines changed: 8 additions & 2 deletions
```diff
@@ -11,6 +11,8 @@
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Objects;
+
 public abstract class DocumentRowSchema {
 
     public static final StructType SCHEMA = new StructType()
@@ -71,16 +73,19 @@ public static DocumentMetadataHandle makeDocumentMetadata(InternalRow row) {
     private static void addCollectionsToMetadata(InternalRow row, DocumentMetadataHandle metadata) {
         if (!row.isNullAt(3)) {
             ArrayData collections = row.getArray(3);
+            Objects.requireNonNull(collections);
             for (int i = 0; i < collections.numElements(); i++) {
-                String value = collections.get(i, DataTypes.StringType).toString();
-                metadata.getCollections().add(value);
+                Object value = collections.get(i, DataTypes.StringType);
+                Objects.requireNonNull(value);
+                metadata.getCollections().add(value.toString());
             }
         }
     }
 
     private static void addPermissionsToMetadata(InternalRow row, DocumentMetadataHandle metadata) {
         if (!row.isNullAt(4)) {
             MapData permissions = row.getMap(4);
+            Objects.requireNonNull(permissions);
             ArrayData roles = permissions.keyArray();
             ArrayData capabilities = permissions.valueArray();
             for (int i = 0; i < roles.numElements(); i++) {
@@ -109,6 +114,7 @@ private static void addPropertiesToMetadata(InternalRow row, DocumentMetadataHan
     private static void addMetadataValuesToMetadata(InternalRow row, DocumentMetadataHandle metadata) {
         if (!row.isNullAt(7)) {
             MapData properties = row.getMap(7);
+            Objects.requireNonNull(properties);
             ArrayData keys = properties.keyArray();
             ArrayData values = properties.valueArray();
             for (int i = 0; i < keys.numElements(); i++) {
```
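The two-step rewrite in addCollectionsToMetadata matters because Catalyst's `ArrayData.get(i, DataTypes.StringType)` returns an `Object` (a `UTF8String` for string columns) that can in principle be null; splitting the call lets the guard run before `toString()`. A rough sketch, assuming Spark's catalyst classes are on the classpath:

```java
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.Objects;

public class ArrayDataSketch {

    public static void main(String[] args) {
        // Catalyst stores string values as UTF8String instances.
        ArrayData collections = new GenericArrayData(new Object[]{
            UTF8String.fromString("collection-1"),
            UTF8String.fromString("collection-2")
        });
        for (int i = 0; i < collections.numElements(); i++) {
            // Guard before calling toString(), mirroring the change above.
            Object value = collections.get(i, DataTypes.StringType);
            Objects.requireNonNull(value);
            System.out.println(value.toString());
        }
    }
}
```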

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 4 additions & 1 deletion
```diff
@@ -24,6 +24,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -116,7 +117,9 @@ public InternalRow get() {
         DocumentRecord document = this.currentDocumentPage.next();
         DocumentRowBuilder builder = new DocumentRowBuilder(requestedMetadata).withUri(document.getUri());
         if (this.contentWasRequested) {
-            builder.withContent(document.getContent(new BytesHandle()).get());
+            BytesHandle content = document.getContent(new BytesHandle());
+            Objects.requireNonNull(content);
+            builder.withContent(content.get());
             builder.withFormat(document.getFormat() != null ? document.getFormat().toString() : Format.UNKNOWN.toString());
         }
         if (!requestedMetadata.isEmpty()) {
```

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/document/OpticTriplesReader.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -25,6 +25,7 @@
 import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Reads triples from a batch of document URIs via the Optic fromTriples data accessor.
@@ -105,6 +106,7 @@ public void close() {
     }
 
     private void readNextBatchOfTriples(List<String> uris) {
+        Objects.requireNonNull(uris);
         PlanBuilder.ModifyPlan plan = op
             .fromTriples(op.pattern(op.col("subject"), op.col("predicate"), op.col(OBJECT_COLUMN), op.graphCol(GRAPH_COLUMN)))
             .where(op.cts.documentQuery(op.xs.stringSeq(uris.toArray(new String[0]))));
```

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -16,6 +16,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
@@ -173,7 +174,7 @@ private boolean readMetadataFollowedByContent() throws IOException {
 
         // We still do this to get the stream ready to read the next entry.
         ZipEntry contentZipEntry = FileUtil.findNextFileEntry(currentZipInputStream);
-
+        Objects.requireNonNull(contentZipEntry);
         DocumentRowBuilder rowBuilder = new DocumentRowBuilder(this.metadataCategories)
             .withUri(contentZipEntry.getName())
             .withMetadata(metadata);
```

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/file/MlcpMetadataConverter.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -17,6 +17,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Handles converting an MLCP metadata document, generated when creating an MLCP archive, into a
@@ -54,6 +55,9 @@ private Format getFormat(Element mlcpMetadata) {
         Element format = mlcpMetadata.getChild("format");
         if (format != null && format.getChild("name") != null) {
             String value = format.getChildText("name");
+            if (value == null) {
+                return null;
+            }
             // MLCP uses "text()" for an unknown reason.
             if (value.startsWith("text")) {
                 value = "text";
@@ -137,6 +141,7 @@ private void addPermissions(Element mlcpMetadata, DocumentMetadataHandle restMet
         Element perms = this.saxBuilder.build(new StringReader(permString.getText())).getRootElement();
         for (Element perm : perms.getChildren("permission", SECURITY_NAMESPACE)) {
             String capability = perm.getChildText("capability", SECURITY_NAMESPACE);
+            Objects.requireNonNull(capability);
             DocumentMetadataHandle.Capability cap = DocumentMetadataHandle.Capability.valueOf(capability.toUpperCase());
             String roleId = perm.getChildText("role-id", SECURITY_NAMESPACE);
             String roleName = roleIdsToNames.get(roleId);
```
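The guard before `Capability.valueOf` is worth a note: `Enum.valueOf` throws NullPointerException on a null name and IllegalArgumentException on an unknown one, so `requireNonNull` surfaces the null case right where the MLCP metadata is read. A small sketch (the capability string is hypothetical):

```java
import com.marklogic.client.io.DocumentMetadataHandle;

import java.util.Objects;

public class CapabilitySketch {

    public static void main(String[] args) {
        // MLCP metadata stores lower-case names, so upper-casing is needed
        // before resolving the enum constant.
        String capability = "read";
        Objects.requireNonNull(capability);
        DocumentMetadataHandle.Capability cap =
            DocumentMetadataHandle.Capability.valueOf(capability.toUpperCase());
        System.out.println(cap); // READ
    }
}
```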

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/file/xml/UriElementExtractingReader.java

Lines changed: 8 additions & 8 deletions
```diff
@@ -13,24 +13,24 @@
  */
 class UriElementExtractingReader extends StreamReaderDelegate {
 
-    private XMLStreamReader source;
+    private XMLStreamReader reader;
     private final String uriNamespace;
     private final String uriElement;
 
     // Used to track when the URI element is detected.
     private boolean isReadingUriElement;
     private String uriValue;
 
-    UriElementExtractingReader(XMLStreamReader source, String uriNamespace, String uriElement) {
-        super(source);
-        this.source = source;
+    UriElementExtractingReader(XMLStreamReader reader, String uriNamespace, String uriElement) {
+        super(reader);
+        this.reader = reader;
         this.uriNamespace = uriNamespace;
         this.uriElement = uriElement;
     }
 
     @Override
     public int next() throws XMLStreamException {
-        int value = source.next();
+        int value = super.next();
         if (value == XMLStreamConstants.START_ELEMENT) {
             // Only use the first instance of the URI element that is found.
             if (matchesUriElement() && this.uriValue == null) {
@@ -39,7 +39,7 @@ public int next() throws XMLStreamException {
             }
         } else if (value == XMLStreamConstants.CHARACTERS) {
             if (this.isReadingUriElement) {
-                this.uriValue += source.getText();
+                this.uriValue += reader.getText();
             }
         } else if (value == XMLStreamConstants.END_ELEMENT && this.isReadingUriElement && matchesUriElement()) {
             this.isReadingUriElement = false;
@@ -48,8 +48,8 @@ public int next() throws XMLStreamException {
     }
 
     private boolean matchesUriElement() {
-        return source.getLocalName().equals(uriElement) &&
-            (this.uriNamespace == null || this.uriNamespace.equals(source.getNamespaceURI()));
+        return reader.getLocalName().equals(uriElement) &&
+            (this.uriNamespace == null || this.uriNamespace.equals(reader.getNamespaceURI()));
     }
 
     String getUriValue() {
```
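The rename from `source` to `reader` is cosmetic, but the switch from `source.next()` to `super.next()` is substantive: `StreamReaderDelegate` forwards each call to the reader it wraps, so going through `super` keeps the delegation chain intact if another delegate is ever layered in between. A minimal sketch of the same pattern, assuming only the JDK's StAX API:

```java
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.stream.util.StreamReaderDelegate;

// Counts START_ELEMENT events while leaving all other behavior to the
// wrapped reader. Calling super.next() rather than a captured reference
// preserves the delegation chain.
class ElementCountingReader extends StreamReaderDelegate {

    private int elementCount;

    ElementCountingReader(XMLStreamReader reader) {
        super(reader);
    }

    @Override
    public int next() throws XMLStreamException {
        int event = super.next();
        if (event == XMLStreamConstants.START_ELEMENT) {
            elementCount++;
        }
        return event;
    }

    int getElementCount() {
        return elementCount;
    }
}
```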

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/optic/PlanAnalyzer.java

Lines changed: 4 additions & 3 deletions
```diff
@@ -71,7 +71,7 @@ private PlanAnalysis readRowsInSingleCallToMarkLogic(String dslQuery) {
         return new PlanAnalysis(plan, Arrays.asList(PlanAnalysis.Partition.singleCallPartition()), 0);
     }
 
-    static List<PlanAnalysis.Partition> calculatePartitions(long rowCount, long userPartitionCount, long userBatchSize) {
+    static List<PlanAnalysis.Partition> calculatePartitions(final long rowCount, final long userPartitionCount, final long userBatchSize) {
         final long batchSize = userBatchSize > 0 ? userBatchSize : Long.parseLong("-1");
 
         long bucketsPerPartition = calculateBucketsPerPartition(rowCount, userPartitionCount, batchSize);
@@ -91,8 +91,9 @@ static List<PlanAnalysis.Partition> calculatePartitions(long rowCount, long user
      * The number of buckets per partition is always the same, as the random distribution of row IDs means we don't know
      * how rows will be distributed across buckets.
      */
-    private static long calculateBucketsPerPartition(long rowCount, long userPartitionCount, long batchSize) {
-        double rawBucketsPerPartition = ((double) rowCount / userPartitionCount) / batchSize;
+    private static long calculateBucketsPerPartition(final long rowCount, final long userPartitionCount, final long batchSize) {
+        final long divisor = userPartitionCount == 0 ? 1 : userPartitionCount;
+        double rawBucketsPerPartition = ((double) rowCount / divisor) / batchSize;
         // ceil is used here to ensure that given the batch size, a bucket typically will not have more rows in it
         // than the batch size. That's not guaranteed, as row IDs could have a distribution such that many rows are in
         // one particular bucket.
```
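The new divisor guard avoids a division by zero when no partition count is supplied. Because the dividend is cast to double, a zero divisor would not throw but would produce Infinity, which a ceiling and long cast then turn into a nonsense bucket count. A rough sketch of the arithmetic, assuming the method rounds up with Math.ceil as its comment describes:

```java
public class BucketMathSketch {

    static long calculateBucketsPerPartition(long rowCount, long userPartitionCount, long batchSize) {
        // Guard mirrors the fix above: treat a zero partition count as 1.
        final long divisor = userPartitionCount == 0 ? 1 : userPartitionCount;
        double rawBucketsPerPartition = ((double) rowCount / divisor) / batchSize;
        return (long) Math.ceil(rawBucketsPerPartition);
    }

    public static void main(String[] args) {
        // ceil((1,000,000 / 4) / 10,000) = 25 buckets per partition.
        System.out.println(calculateBucketsPerPartition(1_000_000, 4, 10_000)); // 25
        // With a zero partition count the guard substitutes 1:
        // ceil((1,000,000 / 1) / 10,000) = 100.
        System.out.println(calculateBucketsPerPartition(1_000_000, 0, 10_000)); // 100
    }
}
```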
