Skip to content

Commit 6f3a71d

Browse files
authored
fix: Fix potential resource leak in native shuffle block reader (apache#2247)
1 parent 0bb3fc6 commit 6f3a71d

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.apache.comet.vector.NativeUtil
3636
* and use Arrow FFI to return the Arrow record batch.
3737
*/
3838
case class NativeBatchDecoderIterator(
39-
var in: InputStream,
39+
in: InputStream,
4040
taskContext: TaskContext,
4141
decodeTime: SQLMetric)
4242
extends Iterator[ColumnarBatch] {
@@ -45,6 +45,7 @@ case class NativeBatchDecoderIterator(
4545
private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
4646
private val native = new Native()
4747
private val nativeUtil = new NativeUtil()
48+
private val tracingEnabled = CometConf.COMET_TRACING_ENABLED.get()
4849
private var currentBatch: ColumnarBatch = null
4950
private var batch = fetchNext()
5051

@@ -167,7 +168,7 @@ case class NativeBatchDecoderIterator(
167168
bytesToRead.toInt,
168169
arrayAddrs,
169170
schemaAddrs,
170-
CometConf.COMET_TRACING_ENABLED.get())
171+
tracingEnabled)
171172
})
172173
decodeTime.add(System.nanoTime() - startTime)
173174

@@ -182,6 +183,7 @@ case class NativeBatchDecoderIterator(
182183
currentBatch = null
183184
}
184185
in.close()
186+
nativeUtil.close()
185187
resetDataBuf()
186188
isClosed = true
187189
}

0 commit comments

Comments
 (0)