44import com .marklogic .client .eval .EvalResultIterator ;
55import com .marklogic .client .eval .ServerEvaluationCall ;
66import com .marklogic .spark .Options ;
7+ import com .marklogic .spark .ReadProgressLogger ;
78import com .marklogic .spark .reader .JsonRowDeserializer ;
89import org .apache .spark .sql .catalyst .InternalRow ;
910import org .apache .spark .sql .catalyst .expressions .GenericInternalRow ;
@@ -19,6 +20,10 @@ class CustomCodePartitionReader implements PartitionReader<InternalRow> {
1920 private final JsonRowDeserializer jsonRowDeserializer ;
2021 private final DatabaseClient databaseClient ;
2122
23+ // Only needed for logging progress.
24+ private final long batchSize ;
25+ private long progressCounter ;
26+
2227 public CustomCodePartitionReader (CustomCodeContext customCodeContext , String partition ) {
2328 this .databaseClient = customCodeContext .connectToMarkLogic ();
2429 this .serverEvaluationCall = customCodeContext .buildCall (
@@ -31,6 +36,8 @@ public CustomCodePartitionReader(CustomCodeContext customCodeContext, String par
3136 this .serverEvaluationCall .addVariable ("PARTITION" , partition );
3237 }
3338
39+ this .batchSize = customCodeContext .getNumericOption (Options .READ_BATCH_SIZE , 1 , 1 );
40+
3441 this .isCustomSchema = customCodeContext .isCustomSchema ();
3542 this .jsonRowDeserializer = new JsonRowDeserializer (customCodeContext .getSchema ());
3643 }
@@ -49,6 +56,11 @@ public InternalRow get() {
4956 if (this .isCustomSchema ) {
5057 return this .jsonRowDeserializer .deserializeJson (val );
5158 }
59+ progressCounter ++;
60+ if (progressCounter >= batchSize ) {
61+ ReadProgressLogger .logProgressIfNecessary (progressCounter );
62+ progressCounter = 0 ;
63+ }
5264 return new GenericInternalRow (new Object []{UTF8String .fromString (val )});
5365 }
5466
0 commit comments