 
 package org.apache.spark.sql.comet.execution.arrow
 
-import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
+import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.TaskContext
@@ -29,12 +29,10 @@ import org.apache.spark.sql.comet.util.Utils
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch}
 
+import org.apache.comet.CometArrowAllocator
 import org.apache.comet.vector.NativeUtil
 
 object CometArrowConverters extends Logging {
-  // TODO: we should reuse the same root allocator in the comet code base?
-  private val rootAllocator: BufferAllocator = new RootAllocator(Long.MaxValue)
-
   // This is similar how Spark converts internal row to Arrow format except that it is transforming
   // the result batch to Comet's ColumnarBatch instead of serialized bytes.
   // There's another big difference that Comet may consume the ColumnarBatch by exporting it to
@@ -56,7 +54,7 @@ object CometArrowConverters extends Logging {
     protected val arrowSchema: Schema = Utils.toArrowSchema(schema, timeZoneId)
     // Reuse the same root allocator here.
     protected val allocator: BufferAllocator =
-      rootAllocator.newChildAllocator(s"to ${this.getClass.getSimpleName}", 0, Long.MaxValue)
+      CometArrowAllocator.newChildAllocator(s"to ${this.getClass.getSimpleName}", 0, Long.MaxValue)
    protected val root: VectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, allocator)
    protected val arrowWriter: ArrowWriter = ArrowWriter.create(root)