Skip to content

Commit ae35db7

Browse files
authored
feat: Reset data buf of NativeBatchDecoderIterator on close (#2235)
* Reset data buf of NativeBatchDecoderIterator on close * address comment * address comment
1 parent 28d874c commit ae35db7

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ case class NativeBatchDecoderIterator(
4848
private var currentBatch: ColumnarBatch = null
4949
private var batch = fetchNext()
5050

51-
import NativeBatchDecoderIterator.threadLocalDataBuf
51+
import NativeBatchDecoderIterator._
5252

5353
if (taskContext != null) {
5454
taskContext.addTaskCompletionListener[Unit](_ => {
@@ -182,14 +182,24 @@ case class NativeBatchDecoderIterator(
182182
currentBatch = null
183183
}
184184
in.close()
185+
resetDataBuf()
185186
isClosed = true
186187
}
187188
}
188189
}
189190
}
190191

191192
object NativeBatchDecoderIterator {
193+
194+
private val INITIAL_BUFFER_SIZE = 128 * 1024
195+
192196
private val threadLocalDataBuf: ThreadLocal[ByteBuffer] = ThreadLocal.withInitial(() => {
193-
ByteBuffer.allocateDirect(128 * 1024)
197+
ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE)
194198
})
199+
200+
private def resetDataBuf(): Unit = {
201+
if (threadLocalDataBuf.get().capacity() > INITIAL_BUFFER_SIZE) {
202+
threadLocalDataBuf.set(ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE))
203+
}
204+
}
195205
}

0 commit comments

Comments
 (0)