Skip to content

Commit 662f90b

Browse files
Move rechunk after throttle (#341)
1 parent 52474cc commit 662f90b

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

src/main/scala/services/streaming/throughput/base/ThroughputShaper.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ trait ThroughputShaper:
2626
burst <- estimateShapeBurst(chunkSize.Elements, chunkSize.ElementSize)
2727
rate <- estimateShapeRate(chunkSize.Elements, chunkSize.ElementSize)
2828
_ <- zlog(
29-
"Shaping stream using chunkSize %s, burst %s and rate %s/%s (chunks/s)",
29+
"Shaping stream using chunkSize %s, burst %s and rate %s/%s (elements/s)",
3030
chunkSize.Elements.toString,
3131
burst.toString,
3232
rate.Elements.toString,
@@ -35,5 +35,5 @@ trait ThroughputShaper:
3535
yield (Size = chunkSize, Burst = burst, Rate = rate)
3636
}
3737
.flatMap { case (size, burst, rate) =>
38-
stream.rechunk(size.Elements).throttleShape(rate.Elements, rate.Period, burst)(estimateChunkCost)
38+
stream.throttleShape(rate.Elements, rate.Period, burst)(estimateChunkCost).rechunk(size.Elements)
3939
}

0 commit comments

Comments
 (0)