
Commit f657566

perf: optimize compute.Take for fewer memory allocations (apache#557)
### Rationale for this change

When writing data to partitioned Iceberg tables using `iceberg-go`'s new partitioned write capability, I found that Arrow's `compute.Take` operation was making those writes significantly slower than unpartitioned writes. Causes:

- The `VarBinaryImpl` function here in `arrow-go` was pre-allocating the data buffer with only `meanValueLen` (the size of just one string/buffer), not the total size needed for all strings. For my use case (writing 100k rows at a time) this caused 15x buffer reallocations as the buffer incrementally scaled up. For my Iceberg table with 20+ string/binary columns of various sizes, this is significant overhead.
- When `VarBinaryImpl` allocated additional space for new items, it added only the exact amount needed for the new value. This caused O(n) reallocations.

### What changes are included in this PR?

1. Pre-allocate upfront with a better estimate of the necessary buffer size to eliminate repeated reallocations. I somewhat arbitrarily chose a cap of 16 MB as a guess at what would be effective at reducing the number of allocations, while staying cognizant of the library being used in many bespoke scenarios and not wanting to cause massive memory spikes. For my use case I never hit this 16 MB threshold, and it could be smaller. I am curious for your input on whether there should be a cap at all, or what a reasonable cap would be.
2. Use exponential growth for additional allocations, giving O(log n) total reallocations.

### Are these changes tested?

No unit tests. However, with a dedicated reproducing script that mimics my use case with different write sizes (on a MacBook Pro M3 Max + 64 GB RAM), average of 3 `table.Append()` calls:

| Configuration | 100k rows | 500k rows | 1M rows | 2.5M rows | 10M rows |
|---------------|-----------|-----------|---------|-----------|----------|
| **Before**    | 4.1s      | 53.3s     | didn't try | didn't try | didn't try |
| **Change 1**  | 336ms     | 2.41s     | 8.75s   | cancelled after 5 mins | didn't try |
| **Change 2**  | 227ms     | 897ms     | 1.72s   | 4.10s     | 17.1s    |

### Are there any user-facing changes?

No; just more performant.
1 parent a9b0be4 commit f657566
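For context on the call path being optimized, here is a minimal sketch of a `compute.Take` on a string column, which routes variable-length data through `VarBinaryImpl`. It uses the same `compute.TakeArray` entry point as the benchmarks added below; the import paths assume the arrow-go v18 module layout.

```go
// Minimal sketch (not part of this commit): take rows from a string column
// by index. Import paths assume github.com/apache/arrow-go/v18.
package main

import (
	"context"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/compute"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	ctx := compute.WithAllocator(context.Background(), mem)

	// A small string column.
	sb := array.NewStringBuilder(mem)
	defer sb.Release()
	sb.AppendValues([]string{"a", "bb", "ccc", "dddd"}, nil)
	values := sb.NewArray()
	defer values.Release()

	// Indices that reorder the column (reversal maximizes data movement).
	ib := array.NewInt64Builder(mem)
	defer ib.Release()
	ib.AppendValues([]int64{3, 2, 1, 0}, nil)
	indices := ib.NewArray()
	defer indices.Release()

	result, err := compute.TakeArray(ctx, values, indices)
	if err != nil {
		panic(err)
	}
	defer result.Release()
	fmt.Println(result) // ["dddd" "ccc" "bb" "a"]
}
```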

2 files changed (+237, −2 lines)

**`arrow/compute/internal/kernels/vector_selection.go`** (22 additions, 2 deletions)
```diff
@@ -1486,7 +1486,12 @@ func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecS
 	if values.Len > 0 {
 		dataLength := rawOffsets[values.Len] - rawOffsets[0]
 		meanValueLen := float64(dataLength) / float64(values.Len)
-		dataBuilder.reserve(int(meanValueLen))
+		estimatedTotalSize := int(meanValueLen * float64(outputLength))
+
+		// Cap the pre-allocation at a reasonable size
+		const maxPreAlloc = 16777216 // 16 MB
+		estimatedTotalSize = min(estimatedTotalSize, maxPreAlloc)
+		dataBuilder.reserve(estimatedTotalSize)
 	}
 
 	offsetBuilder.reserve(int(outputLength) + 1)
```
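To make the first change concrete with illustrative numbers (the 40-byte mean is a made-up figure, not from the PR): taking 100k rows whose mean value length is 40 bytes now reserves `int(40.0 * 100000)` = 4,000,000 bytes (~4 MB, under the 16 MB cap) in one step, where the old code reserved only `int(40.0)` = 40 bytes up front and grew the buffer incrementally from there.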
```diff
@@ -1503,7 +1508,22 @@ func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecS
 			}
 			offset += valSize
 			if valSize > OffsetT(spaceAvail) {
-				dataBuilder.reserve(int(valSize))
+				// Calculate how much total capacity we need
+				needed := dataBuilder.len() + int(valSize)
+				newCap := dataBuilder.cap()
+
+				// Double capacity until we have enough space
+				// This gives us O(log n) reallocations instead of O(n)
+				if newCap == 0 {
+					newCap = int(valSize)
+				}
+				for newCap < needed {
+					newCap = newCap * 2
+				}
+
+				// Reserve the additional capacity
+				additional := newCap - dataBuilder.len()
+				dataBuilder.reserve(additional)
 				spaceAvail = dataBuilder.cap() - dataBuilder.len()
 			}
 			dataBuilder.unsafeAppendSlice(rawData[valOffset : valOffset+valSize])
```
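The O(log n) versus O(n) claim in this hunk can be sanity-checked with a standalone sketch (not arrow-go code) that simply counts how often each growth policy has to reallocate:

```go
// Standalone illustration (not arrow-go code): count reallocations when
// appending n values of k bytes each, growing capacity exactly vs. doubling.
package main

import "fmt"

func main() {
	const n, k = 100_000, 20 // e.g. 100k values of ~20 bytes

	exactGrowth, doubling := 0, 0
	capExact, capDouble := 0, 0
	used := 0
	for i := 0; i < n; i++ {
		used += k
		if used > capExact { // grow by exactly what's needed
			capExact = used
			exactGrowth++ // fires on every append: O(n)
		}
		for used > capDouble { // double until the data fits
			if capDouble == 0 {
				capDouble = k
			} else {
				capDouble *= 2
			}
			doubling++ // fires ~log2(n) times total
		}
	}
	fmt.Println("exact growth reallocations:", exactGrowth) // 100000
	fmt.Println("doubling reallocations:    ", doubling)    // 18
}
```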

**`arrow/compute/vector_selection_test.go`** (215 additions, 0 deletions)
```diff
@@ -1650,3 +1650,218 @@ func TestFilterKernels(t *testing.T) {
 	suite.Run(t, new(FilterKernelWithChunked))
 	suite.Run(t, new(FilterKernelWithTable))
 }
+
+// Benchmark tests for Take operation with variable-length data
+// These benchmarks test the performance improvements from buffer pre-allocation
+// in VarBinaryImpl for string/binary data reorganization (e.g., partitioning).
+
+func BenchmarkTakeString(b *testing.B) {
+	// Test various batch sizes and string lengths
+	benchmarks := []struct {
+		name       string
+		numRows    int64
+		avgStrLen  int
+		stringType arrow.DataType
+	}{
+		{"SmallBatch_ShortStrings", 1000, 10, arrow.BinaryTypes.String},
+		{"MediumBatch_ShortStrings", 10000, 10, arrow.BinaryTypes.String},
+		{"LargeBatch_ShortStrings", 50000, 10, arrow.BinaryTypes.String},
+		{"XLargeBatch_ShortStrings", 100000, 10, arrow.BinaryTypes.String},
+		{"SmallBatch_MediumStrings", 1000, 50, arrow.BinaryTypes.String},
+		{"MediumBatch_MediumStrings", 10000, 50, arrow.BinaryTypes.String},
+		{"LargeBatch_MediumStrings", 50000, 50, arrow.BinaryTypes.String},
+		{"XLargeBatch_MediumStrings", 100000, 50, arrow.BinaryTypes.String},
+		{"LargeBatch_ShortStrings_Large", 50000, 10, arrow.BinaryTypes.LargeString},
+		{"XLargeBatch_MediumStrings_Large", 100000, 50, arrow.BinaryTypes.LargeString},
+	}
+
+	for _, bm := range benchmarks {
+		b.Run(bm.name, func(b *testing.B) {
+			mem := memory.NewGoAllocator()
+			ctx := compute.WithAllocator(context.Background(), mem)
+
+			// Create source array with strings of specified average length
+			bldr := array.NewBuilder(mem, bm.stringType)
+			defer bldr.Release()
+
+			// Generate test data
+			for i := int64(0); i < bm.numRows; i++ {
+				// Create varied string content
+				str := fmt.Sprintf("value_%d_%s", i, strings.Repeat("x", bm.avgStrLen-10))
+				switch b := bldr.(type) {
+				case *array.StringBuilder:
+					b.Append(str)
+				case *array.LargeStringBuilder:
+					b.Append(str)
+				}
+			}
+			values := bldr.NewArray()
+			defer values.Release()
+
+			// Create indices that simulate partitioning/reorganization
+			// Use a pattern that would be common in partitioned writes:
+			// reverse order to maximize data movement
+			indicesBldr := array.NewInt64Builder(mem)
+			defer indicesBldr.Release()
+			for i := bm.numRows - 1; i >= 0; i-- {
+				indicesBldr.Append(i)
+			}
+			indices := indicesBldr.NewArray()
+			defer indices.Release()
+
+			// Reset timer after setup
+			b.ResetTimer()
+			b.ReportAllocs()
+
+			// Run benchmark
+			for i := 0; i < b.N; i++ {
+				result, err := compute.TakeArray(ctx, values, indices)
+				if err != nil {
+					b.Fatal(err)
+				}
+				result.Release()
+			}
+
+			// Report throughput
+			b.ReportMetric(float64(bm.numRows*int64(b.N))/b.Elapsed().Seconds(), "rows/sec")
+		})
+	}
+}
+
+func BenchmarkTakeStringPartitionPattern(b *testing.B) {
+	// Simulate real-world partitioning workload where data is reorganized
+	// into multiple partitions (e.g., by timestamp month + host)
+	mem := memory.NewGoAllocator()
+	ctx := compute.WithAllocator(context.Background(), mem)
+
+	const numRows = 50000
+	const numPartitions = 8
+	const avgStrLen = 20
+
+	// Create source data
+	bldr := array.NewLargeStringBuilder(mem)
+	defer bldr.Release()
+	for i := 0; i < numRows; i++ {
+		str := fmt.Sprintf("host_%d_path_%s", i%100, strings.Repeat("x", avgStrLen-15))
+		bldr.Append(str)
+	}
+	values := bldr.NewArray()
+	defer values.Release()
+
+	// Create indices that simulate partitioning by interleaving
+	// (every Nth row goes to partition N)
+	indicesBldr := array.NewInt64Builder(mem)
+	defer indicesBldr.Release()
+	for partition := 0; partition < numPartitions; partition++ {
+		for i := partition; i < numRows; i += numPartitions {
+			indicesBldr.Append(int64(i))
+		}
+	}
+	indices := indicesBldr.NewArray()
+	defer indices.Release()
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	for i := 0; i < b.N; i++ {
+		result, err := compute.TakeArray(ctx, values, indices)
+		if err != nil {
+			b.Fatal(err)
+		}
+		result.Release()
+	}
+
+	b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}
+
+func BenchmarkTakeMultiColumn(b *testing.B) {
+	// Benchmark Take on a record batch with multiple string columns
+	// to simulate real-world use cases (e.g., CloudFront logs with 20+ string columns)
+	mem := memory.NewGoAllocator()
+	ctx := compute.WithAllocator(context.Background(), mem)
+
+	const numRows = 50000
+	const numStringCols = 20
+	const avgStrLen = 15
+
+	// Build schema with multiple string columns
+	fields := make([]arrow.Field, numStringCols+3)
+	for i := 0; i < numStringCols; i++ {
+		fields[i] = arrow.Field{
+			Name: fmt.Sprintf("string_col_%d", i),
+			Type: arrow.BinaryTypes.LargeString,
+		}
+	}
+	fields[numStringCols] = arrow.Field{Name: "int_col", Type: arrow.PrimitiveTypes.Int64}
+	fields[numStringCols+1] = arrow.Field{Name: "float_col", Type: arrow.PrimitiveTypes.Float64}
+	fields[numStringCols+2] = arrow.Field{Name: "timestamp_col", Type: arrow.FixedWidthTypes.Timestamp_us}
+	schema := arrow.NewSchema(fields, nil)
+
+	// Build arrays
+	arrays := make([]arrow.Array, len(fields))
+	defer func() {
+		for _, arr := range arrays {
+			if arr != nil {
+				arr.Release()
+			}
+		}
+	}()
+
+	// Create string columns
+	for col := 0; col < numStringCols; col++ {
+		bldr := array.NewLargeStringBuilder(mem)
+		for i := 0; i < numRows; i++ {
+			str := fmt.Sprintf("col%d_val%d_%s", col, i, strings.Repeat("x", avgStrLen-15))
+			bldr.Append(str)
+		}
+		arrays[col] = bldr.NewArray()
+		bldr.Release()
+	}
+
+	// Create numeric columns
+	intBldr := array.NewInt64Builder(mem)
+	floatBldr := array.NewFloat64Builder(mem)
+	tsBldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_us.(*arrow.TimestampType))
+	for i := 0; i < numRows; i++ {
+		intBldr.Append(int64(i))
+		floatBldr.Append(float64(i) * 1.5)
+		tsBldr.Append(arrow.Timestamp(1700000000000000 + int64(i)*1000000))
+	}
+	arrays[numStringCols] = intBldr.NewArray()
+	arrays[numStringCols+1] = floatBldr.NewArray()
+	arrays[numStringCols+2] = tsBldr.NewArray()
+	intBldr.Release()
+	floatBldr.Release()
+	tsBldr.Release()
+
+	// Create record batch
+	batch := array.NewRecordBatch(schema, arrays, numRows)
+	defer batch.Release()
+
+	// Create indices for partitioning pattern
+	indicesBldr := array.NewInt64Builder(mem)
+	defer indicesBldr.Release()
+	// Reverse order to maximize data movement
+	for i := numRows - 1; i >= 0; i-- {
+		indicesBldr.Append(int64(i))
+	}
+	indices := indicesBldr.NewArray()
+	defer indices.Release()
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	for i := 0; i < b.N; i++ {
+		batchDatum := compute.NewDatum(batch)
+		indicesDatum := compute.NewDatum(indices)
+		result, err := compute.Take(ctx, *compute.DefaultTakeOptions(), batchDatum, indicesDatum)
+		if err != nil {
+			b.Fatal(err)
+		}
+		result.Release()
+		batchDatum.Release()
+		indicesDatum.Release()
+	}
+
+	b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}
```
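These benchmarks run in the usual Go way, e.g. `go test -bench BenchmarkTake ./arrow/compute/` from the repository root; since they call `b.ReportAllocs()`, allocations per op (the figure this change is meant to reduce) are reported alongside the custom rows/sec metric.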
