Skip to content

Commit 52474cc

Browse files
Fixed missing aggregation on rowSize estimator (#340)
1 parent 9867cf6 commit 52474cc

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

src/main/scala/services/streaming/throughput/MemoryBoundShaper.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class MemoryBoundShaper(
6161

6262
private def estimateRowSize(schema: Schema): Long =
6363
schema.columns().asScala.map(_.`type`()).foldLeft(0L) { case (agg, tp) =>
64-
tp.typeId() match
64+
val typeSize = tp.typeId() match
6565
// 8L added to each type to hold pointer, since all types are objects
6666
case TypeID.TIME => 4L + 8L
6767
case TypeID.INTEGER => 4L + 8L
@@ -76,6 +76,8 @@ class MemoryBoundShaper(
7676
case TypeID.TIMESTAMP_NANO => 8L + 8L
7777
case _ =>
7878
8L + shaperSettings.meanObjectTypeSizeEstimate.toLong // assume large size for structs, lists, geometry, variant and other less common types
79+
80+
agg + typeSize
7981
}
8082

8183
override def estimateChunkSize: Task[(Elements: Int, ElementSize: Long)] = for

0 commit comments

Comments
 (0)