26
26
import org .apache .comet .vector .NativeUtil ;
27
27
28
28
/**
29
- * An iterator that can be used to get batches of Arrow arrays from a Spark iterator of
30
- * ColumnarBatch. It will consume input iterator and return Arrow arrays by addresses. This is
31
- * called by native code to retrieve Arrow arrays from Spark through JNI.
29
+ * Iterator for fetching batches from JVM to native code. Usually called via JNI from native
30
+ * ScanExec.
31
+ *
32
+ * <p>Batches are owned by the JVM. Native code can safely access the batch after calling {@code next}, but
33
+ * the native code must not retain references to the batch because the next call to {@code hasNext} will
34
+ * signal to the JVM that the batch can be closed.
32
35
*/
33
36
public class CometBatchIterator {
34
- final Iterator <ColumnarBatch > input ;
35
- final NativeUtil nativeUtil ;
37
+ private final Iterator <ColumnarBatch > input ;
38
+ private final NativeUtil nativeUtil ;
39
+ private ColumnarBatch previousBatch = null ;
36
40
private ColumnarBatch currentBatch = null ;
37
41
38
42
CometBatchIterator (Iterator <ColumnarBatch > input , NativeUtil nativeUtil ) {
@@ -41,11 +45,16 @@ public class CometBatchIterator {
41
45
}
42
46
43
47
/**
44
- * Fetch the next input batch.
48
+ * Fetch the next input batch and allow the previous batch to be closed (this may not happen
49
+ * immediately).
45
50
*
46
51
* @return Number of rows in next batch or -1 if no batches left.
47
52
*/
48
53
public int hasNext () {
54
+
55
+ // release reference to previous batch
56
+ previousBatch = null ;
57
+
49
58
if (currentBatch == null ) {
50
59
if (input .hasNext ()) {
51
60
currentBatch = input .next ();
@@ -59,7 +68,7 @@ public int hasNext() {
59
68
}
60
69
61
70
/**
62
- * Get the next batches of Arrow arrays.
71
+ * Get the next batch of Arrow arrays.
63
72
*
64
73
* @param arrayAddrs The addresses of the ArrowArray structures.
65
74
* @param schemaAddrs The addresses of the ArrowSchema structures.
@@ -69,8 +78,16 @@ public int next(long[] arrayAddrs, long[] schemaAddrs) {
69
78
if (currentBatch == null ) {
70
79
return -1 ;
71
80
}
81
+
82
+ // export the batch using the Arrow C Data Interface
72
83
int numRows = nativeUtil .exportBatch (arrayAddrs , schemaAddrs , currentBatch );
84
+
85
+ // keep a reference to the exported batch so that it doesn't get garbage collected
86
+ // while the native code is still processing it
87
+ previousBatch = currentBatch ;
88
+
73
89
currentBatch = null ;
90
+
74
91
return numRows ;
75
92
}
76
93
}
0 commit comments