
Commit fefb9f3

Fixing a few high Polaris issues
Hoping I finally got synchronized logging working without Polaris complaining.
1 parent eb979d1 commit fefb9f3


5 files changed: +51 -30 lines changed


docker-compose.yaml

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-name: marklogic_spark
+name: docker-tests-marklogic-spark
 
 services:

marklogic-spark-connector/src/main/java/com/marklogic/spark/ProgressLogger.java

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2025 MarkLogic Corporation. All Rights Reserved.
+ */
+package com.marklogic.spark;
+
+import java.io.Serializable;
+
+/**
+ * Stateful class that is intended to be used in a singleton manner with synchronized access to its one method.
+ */
+class ProgressLogger implements Serializable {
+
+    static final long serialVersionUID = 1L;
+
+    private final long progressInterval;
+    private final String message;
+
+    private long progressCounter;
+    private long nextProgressInterval;
+
+    ProgressLogger(long progressInterval, String message) {
+        this.progressInterval = progressInterval;
+        this.message = message;
+        this.nextProgressInterval = progressInterval;
+    }
+
+    void logProgressIfNecessary(long itemCount) {
+        if (Util.MAIN_LOGGER.isInfoEnabled() && progressInterval > 0) {
+            this.progressCounter += itemCount;
+            if (progressCounter >= nextProgressInterval) {
+                Util.MAIN_LOGGER.info(message, nextProgressInterval);
+                nextProgressInterval += progressInterval;
+            }
+        }
+    }
+}
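
As a quick illustration of how the interval logic above behaves (a walkthrough only, not code from this commit; it assumes a message containing an SLF4J-style "{}" placeholder and a caller in the same package that holds the lock required by the Javadoc):

    // Illustration only. Assumes INFO logging is enabled on Util.MAIN_LOGGER.
    ProgressLogger progress = new ProgressLogger(10000, "Documents read: {}");
    progress.logProgressIfNecessary(6000);  // counter = 6,000  -> below 10,000, nothing logged
    progress.logProgressIfNecessary(6000);  // counter = 12,000 -> crosses 10,000, logs "Documents read: 10000", next threshold is 20,000
    progress.logProgressIfNecessary(9000);  // counter = 21,000 -> crosses 20,000, logs "Documents read: 20000"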

marklogic-spark-connector/src/main/java/com/marklogic/spark/ReadProgressLogger.java

Lines changed: 4 additions & 13 deletions
@@ -4,7 +4,6 @@
 package com.marklogic.spark;
 
 import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handles the progress counter for any operation involving reading from MarkLogic. A Spark job/application can only have
@@ -15,25 +14,17 @@ public class ReadProgressLogger implements Serializable {
 
     static final long serialVersionUID = 1L;
 
-    private static final AtomicLong progressCounter = new AtomicLong(0);
     private static final Object lock = new Object();
-    private static long progressInterval;
-    private static long nextProgressInterval;
-    private static String message;
+    private static ProgressLogger progressLogger;
 
     public static void initialize(long progressInterval, String message) {
-        progressCounter.set(0);
-        ReadProgressLogger.progressInterval = progressInterval;
-        nextProgressInterval = progressInterval;
-        ReadProgressLogger.message = message;
+        progressLogger = new ProgressLogger(progressInterval, message);
     }
 
     public static void logProgressIfNecessary(long itemCount) {
-        if (Util.MAIN_LOGGER.isInfoEnabled() && progressInterval > 0
-            && progressCounter.addAndGet(itemCount) >= nextProgressInterval) {
+        if (Util.MAIN_LOGGER.isInfoEnabled() && progressLogger != null) {
             synchronized (lock) {
-                Util.MAIN_LOGGER.info(message, nextProgressInterval);
-                nextProgressInterval += progressInterval;
+                progressLogger.logProgressIfNecessary(itemCount);
             }
         }
     }
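
A hypothetical caller-side sketch (not part of this diff) of how the two public static methods are meant to be used: initialize once when the read operation starts, then report counts from each reader. Only the method signatures come from the class above; the interval, message, and batch sizes are invented for illustration.

    import com.marklogic.spark.ReadProgressLogger;

    public class ReadProgressExample {
        public static void main(String[] args) {
            // Once, before any rows are read; the message is assumed to use an SLF4J "{}" placeholder.
            ReadProgressLogger.initialize(10000, "Documents read: {}");
            for (int batch = 0; batch < 5; batch++) {
                // From each reader, after each batch of rows.
                ReadProgressLogger.logProgressIfNecessary(3000);
            }
            // With these numbers, "Documents read: 10000" is logged once the running count reaches 12,000,
            // assuming INFO logging is enabled on the connector's main logger.
        }
    }

Because all of the mutable counter state now lives inside ProgressLogger and is only touched while holding lock, the old check-then-act sequence that mixed an AtomicLong read with updates made inside a separate synchronized block is gone, which appears to be what the Polaris findings were about.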

marklogic-spark-connector/src/main/java/com/marklogic/spark/WriteProgressLogger.java

Lines changed: 4 additions & 13 deletions
@@ -4,7 +4,6 @@
 package com.marklogic.spark;
 
 import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handles the progress counter for any operation involving writing to MarkLogic. A Spark job/application can only have
@@ -15,25 +14,17 @@ public class WriteProgressLogger implements Serializable {
 
     static final long serialVersionUID = 1L;
 
-    private static final AtomicLong progressCounter = new AtomicLong(0);
     private static final Object lock = new Object();
-    private static long progressInterval;
-    private static long nextProgressInterval;
-    private static String message;
+    private static ProgressLogger progressLogger;
 
     public static void initialize(long progressInterval, String message) {
-        progressCounter.set(0);
-        WriteProgressLogger.progressInterval = progressInterval;
-        nextProgressInterval = progressInterval;
-        WriteProgressLogger.message = message;
+        progressLogger = new ProgressLogger(progressInterval, message);
     }
 
     public static void logProgressIfNecessary(long itemCount) {
-        if (Util.MAIN_LOGGER.isInfoEnabled() && progressInterval > 0
-            && progressCounter.addAndGet(itemCount) >= nextProgressInterval) {
+        if (Util.MAIN_LOGGER.isInfoEnabled() && progressLogger != null) {
             synchronized (lock) {
-                Util.MAIN_LOGGER.info(message, nextProgressInterval);
-                nextProgressInterval += progressInterval;
+                progressLogger.logProgressIfNecessary(itemCount);
             }
         }
     }

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/document/OpticTriplesReader.java

Lines changed: 6 additions & 3 deletions
@@ -8,9 +8,11 @@
 import com.marklogic.client.query.SearchQueryDefinition;
 import com.marklogic.client.row.RowManager;
 import com.marklogic.client.row.RowRecord;
+import com.marklogic.client.row.RowSet;
 import com.marklogic.client.type.PlanColumn;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.ReadProgressLogger;
+import org.apache.commons.io.IOUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -46,6 +48,7 @@ class OpticTriplesReader implements PartitionReader<InternalRow> {
     private final long batchSize;
     private long progressCounter;
 
+    private RowSet<RowRecord> currentRowSet;
     private Iterator<RowRecord> currentRowIterator;
 
     public OpticTriplesReader(ForestPartition forestPartition, DocumentContext context) {
@@ -98,7 +101,7 @@ public InternalRow get() {
 
     @Override
     public void close() {
-        // Nothing to close.
+        IOUtils.closeQuietly(this.currentRowSet);
     }
 
     private void readNextBatchOfTriples(List<String> uris) {
@@ -112,8 +115,8 @@ private void readNextBatchOfTriples(List<String> uris) {
         }
 
         plan = bindDatatypeAndLang(plan);
-
-        currentRowIterator = rowManager.resultRows(plan).iterator();
+        this.currentRowSet = rowManager.resultRows(plan);
+        this.currentRowIterator = this.currentRowSet.iterator();
     }
 
     /**
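
The last two hunks keep a reference to the RowSet returned by rowManager.resultRows(plan) so that close() can release it, instead of discarding everything but the iterator. A minimal sketch of that shape, with the class and method names below invented for illustration (RowSet, RowRecord, and IOUtils are the types actually used in the diff):

    import com.marklogic.client.row.RowRecord;
    import com.marklogic.client.row.RowSet;
    import org.apache.commons.io.IOUtils;

    import java.util.Iterator;

    class RowSetHolder { // hypothetical name, not the connector's class

        private RowSet<RowRecord> currentRowSet;
        private Iterator<RowRecord> currentRowIterator;

        // Called with the RowSet obtained from RowManager.resultRows(plan), as in the diff.
        void startBatch(RowSet<RowRecord> rowSet) {
            this.currentRowSet = rowSet;                 // keep the handle, not just its iterator
            this.currentRowIterator = this.currentRowSet.iterator();
        }

        void close() {
            // closeQuietly tolerates null, so this is safe even if no batch was ever started.
            IOUtils.closeQuietly(this.currentRowSet);
        }
    }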

0 commit comments
