Skip to content

Commit 1cabcb3

Browse files
m-v-w and adamreeve authored
GH-46189 [C#] Use pooled buffers in ArrowStreamWriter (#46190)
### Rationale for this change

`ArrowStreamWriter` should not make large heap allocations for temporary buffers. Pooled buffers should be used. All allocations made by `MemoryAllocator` should be disposed before they are finalized.

### What changes are included in this PR?

- `ArrowStreamWriter` makes sure all temporary buffers are disposed.
- `ArrowStreamWriter` uses `MemoryAllocator` for decompression buffers instead of `MemoryStream`.
- Added `TryCompress` method to `ICompressionCodec` and implemented it.

### Are these changes tested?

Yes.

- `MemoryOwnerDisposal` in `Apache.Arrow.Compression.Tests.ArrowStreamWriterTests`
- `MemoryOwnerDisposalSlicedArray` in `Apache.Arrow.Tests.ArrowStreamWriterTests`

### Are there any user-facing changes?

Yes. `ICompressionCodec` requires the implementation of a new method `TryCompress`.

**This PR includes breaking changes to public APIs.** `ICompressionCodec` requires the implementation of a new method `TryCompress`.

* GitHub Issue: #46189

Lead-authored-by: Michael <[email protected]>
Co-authored-by: Adam Reeve <[email protected]>
Signed-off-by: Curt Hagenlocher <[email protected]>
1 parent 0d71897 commit 1cabcb3

File tree

8 files changed

+268
-98
lines changed

8 files changed

+268
-98
lines changed

csharp/src/Apache.Arrow.Compression/Lz4CompressionCodec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public Lz4CompressionCodec(int? compressionLevel = null)
3333
{
3434
_settings = new LZ4EncoderSettings
3535
{
36-
CompressionLevel = (LZ4Level) compressionLevel,
36+
CompressionLevel = (LZ4Level) compressionLevel
3737
};
3838
}
3939
else

csharp/src/Apache.Arrow.Compression/ZstdCompressionCodec.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
namespace Apache.Arrow.Compression
2222
{
23-
internal sealed class ZstdCompressionCodec : ICompressionCodec
23+
internal sealed class ZstdCompressionCodec : ITryCompressionCodec
2424
{
2525
private readonly Decompressor _decompressor;
2626
private readonly Compressor _compressor;
@@ -52,6 +52,11 @@ public void Compress(ReadOnlyMemory<byte> source, Stream destination)
5252
compressor.Write(source.Span);
5353
}
5454

55+
public bool TryCompress(ReadOnlyMemory<byte> source, Memory<byte> destination, out int bytesWritten)
56+
{
57+
return _compressor.TryWrap(source.Span, destination.Span, out bytesWritten);
58+
}
59+
5560
public void Dispose()
5661
{
5762
_decompressor.Dispose();

csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs

Lines changed: 127 additions & 89 deletions
Large diffs are not rendered by default.

csharp/src/Apache.Arrow/Ipc/ITryCompressionCodec.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using System;
17+
18+
namespace Apache.Arrow.Ipc
19+
{
20+
public interface ITryCompressionCodec : ICompressionCodec
21+
{
22+
/// <summary>
23+
/// Try to write compressed data to a fixed length memory span
24+
/// </summary>
25+
/// <param name="source">The data to compress</param>
26+
/// <param name="destination">Memory to write compressed data to</param>
27+
/// <param name="bytesWritten">The number of bytes written to the destination</param>
28+
/// <returns>true if compression was successful, false if the destination buffer is too small</returns>
29+
bool TryCompress(ReadOnlyMemory<byte> source, Memory<byte> destination, out int bytesWritten);
30+
31+
}
32+
}

csharp/test/Apache.Arrow.Compression.Tests/ArrowStreamReaderTests.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using Apache.Arrow.Ipc;
1717
using System;
1818
using System.Reflection;
19+
using Apache.Arrow.Tests;
1920
using Xunit;
2021

2122
namespace Apache.Arrow.Compression.Tests
@@ -65,6 +66,26 @@ public void ErrorReadingCompressedStreamWithoutCodecFactory()
6566
Assert.Contains("no ICompressionCodecFactory has been configured", exception.Message);
6667
}
6768

69+
[Theory]
70+
[InlineData("ipc_lz4_compression.arrow_stream")]
71+
[InlineData("ipc_zstd_compression.arrow_stream")]
72+
public void MemoryPoolDisposedOnReadCompressedIpcStream(string fileName)
73+
{
74+
var assembly = Assembly.GetExecutingAssembly();
75+
using var stream = assembly.GetManifestResourceStream($"Apache.Arrow.Compression.Tests.Resources.{fileName}");
76+
Assert.NotNull(stream);
77+
var codecFactory = new CompressionCodecFactory();
78+
var allocator = new TestMemoryAllocator();
79+
using var reader = new ArrowStreamReader(stream, allocator, codecFactory, false);
80+
using (var recordBatch = reader.ReadNextRecordBatch())
81+
{
82+
VerifyCompressedIpcFileBatch(recordBatch);
83+
}
84+
Assert.True(allocator.Statistics.Allocations > 0);
85+
Assert.Equal(0, allocator.Rented);
86+
87+
}
88+
6889
private static void VerifyCompressedIpcFileBatch(RecordBatch batch)
6990
{
7091
var intArray = (Int32Array) batch.Column("integers");

csharp/test/Apache.Arrow.Compression.Tests/ArrowStreamWriterTests.cs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using System.IO;
1919
using System.Threading.Tasks;
2020
using Apache.Arrow.Ipc;
21+
using Apache.Arrow.Memory;
2122
using Apache.Arrow.Tests;
2223
using K4os.Compression.LZ4;
2324
using Xunit;
@@ -111,6 +112,20 @@ public void ThrowsForInvalidCompressionLevel(CompressionCodecType codec)
111112
writer.WriteEnd();
112113
});
113114
}
115+
[Theory]
116+
[InlineData(CompressionCodecType.Zstd)]
117+
[InlineData(CompressionCodecType.Lz4Frame)]
118+
public async Task MemoryOwnerDisposal(CompressionCodecType codec)
119+
{
120+
var allocator = new TestMemoryAllocator();
121+
var originalBatch = TestData.CreateSampleRecordBatch(length: 100);
122+
var options = new IpcOptions() { CompressionCodecFactory = new CompressionCodecFactory(), CompressionCodec = codec };
123+
await TestRoundTripRecordBatchesAsync(new List<RecordBatch> () {originalBatch}, options, options.CompressionCodecFactory,
124+
allocator);
125+
Assert.True(allocator.Statistics.Allocations > 0);
126+
// make sure all memory allocated by the writer was disposed
127+
Assert.Equal(0,allocator.Rented);
128+
}
114129

115130
private static void TestRoundTripRecordBatches(
116131
IReadOnlyList<RecordBatch> originalBatches, IpcOptions options, ICompressionCodecFactory codecFactory)
@@ -147,11 +162,11 @@ private static void TestRoundTripRecordBatches(
147162
}
148163

149164
private static async Task TestRoundTripRecordBatchesAsync(
150-
IReadOnlyList<RecordBatch> originalBatches, IpcOptions options, ICompressionCodecFactory codecFactory)
165+
IReadOnlyList<RecordBatch> originalBatches, IpcOptions options, ICompressionCodecFactory codecFactory, MemoryAllocator writerAllocator = null)
151166
{
152167
using var stream = new MemoryStream();
153168

154-
using (var writer = new ArrowStreamWriter(stream, originalBatches[0].Schema, leaveOpen: true, options))
169+
using (var writer = new ArrowStreamWriter(stream, originalBatches[0].Schema, leaveOpen: true, options, writerAllocator))
155170
{
156171
foreach (var originalBatch in originalBatches)
157172
{

csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using System.Net.Sockets;
2323
using System.Threading.Tasks;
2424
using Apache.Arrow.Ipc;
25+
using Apache.Arrow.Memory;
2526
using Apache.Arrow.Types;
2627
using Xunit;
2728

@@ -239,7 +240,7 @@ private static void TestRoundTripRecordBatches(List<RecordBatch> originalBatches
239240
{
240241
using (MemoryStream stream = new MemoryStream())
241242
{
242-
using (var writer = new ArrowStreamWriter(stream, originalBatches[0].Schema, leaveOpen: true, options))
243+
using (var writer = new ArrowStreamWriter(stream, originalBatches[0].Schema, leaveOpen: true, options, new TestMemoryAllocator()))
243244
{
244245
foreach (RecordBatch originalBatch in originalBatches)
245246
{
@@ -261,11 +262,11 @@ private static void TestRoundTripRecordBatches(List<RecordBatch> originalBatches
261262
}
262263
}
263264

264-
private static async Task TestRoundTripRecordBatchesAsync(List<RecordBatch> originalBatches, IpcOptions options = null, bool strictCompare = true)
265+
private static async Task TestRoundTripRecordBatchesAsync(List<RecordBatch> originalBatches, IpcOptions options = null, bool strictCompare = true, MemoryAllocator memoryAllocator = null)
265266
{
266267
using (MemoryStream stream = new MemoryStream())
267268
{
268-
using (var writer = new ArrowStreamWriter(stream, originalBatches[0].Schema, leaveOpen: true, options))
269+
using (var writer = new ArrowStreamWriter(stream, originalBatches[0].Schema, leaveOpen: true, options, memoryAllocator ?? new TestMemoryAllocator()))
269270
{
270271
foreach (RecordBatch originalBatch in originalBatches)
271272
{
@@ -715,5 +716,24 @@ public async Task WritesEmptyFileAsync()
715716
Assert.Null(readBatch);
716717
SchemaComparer.Compare(originalBatch.Schema, reader.Schema);
717718
}
719+
720+
721+
[Theory]
722+
[InlineData(0, 45)]
723+
[InlineData(3, 45)]
724+
[InlineData(16, 45)]
725+
public async Task MemoryOwnerDisposalSlicedArray(int sliceOffset, int sliceLength)
726+
{
727+
var originalBatch = TestData.CreateSampleRecordBatch(length: 100);
728+
var slicedArrays = originalBatch.Arrays
729+
.Select(array => ArrowArrayFactory.Slice(array, sliceOffset, sliceLength))
730+
.ToList();
731+
var slicedBatch = new RecordBatch(originalBatch.Schema, slicedArrays, sliceLength);
732+
var allocator = new TestMemoryAllocator();
733+
await TestRoundTripRecordBatchesAsync(new List<RecordBatch> () {slicedBatch}, null, false, allocator);
734+
if(sliceOffset % 8 != 0)
735+
Assert.True(allocator.Statistics.Allocations > 0);
736+
Assert.Equal(0,allocator.Rented);
737+
}
718738
}
719739
}

csharp/test/Apache.Arrow.Tests/TestMemoryAllocator.cs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,56 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16+
using System;
1617
using Apache.Arrow.Memory;
1718
using System.Buffers;
19+
using System.Threading;
1820

1921
namespace Apache.Arrow.Tests
2022
{
2123
public class TestMemoryAllocator : MemoryAllocator
2224
{
25+
private int _rented = 0;
26+
public int Rented => _rented;
27+
2328
protected override IMemoryOwner<byte> AllocateInternal(int length, out int bytesAllocated)
2429
{
25-
bytesAllocated = length;
26-
return MemoryPool<byte>.Shared.Rent(length);
30+
var mem = MemoryPool<byte>.Shared.Rent(length);
31+
bytesAllocated = mem.Memory.Length;
32+
Interlocked.Increment(ref _rented);
33+
return new TestMemoryOwner(mem, this);
34+
}
35+
36+
private class TestMemoryOwner : IMemoryOwner<byte>
37+
{
38+
private readonly IMemoryOwner<byte> _inner;
39+
private readonly TestMemoryAllocator _allocator;
40+
private bool _disposed;
41+
42+
public TestMemoryOwner(IMemoryOwner<byte> inner, TestMemoryAllocator allocator)
43+
{
44+
_inner = inner;
45+
_allocator = allocator;
46+
}
47+
48+
public Memory<byte> Memory
49+
{
50+
get
51+
{
52+
if (_disposed)
53+
throw new ObjectDisposedException(nameof(TestMemoryOwner));
54+
return _inner.Memory;
55+
}
56+
}
57+
58+
public void Dispose()
59+
{
60+
if (_disposed)
61+
return;
62+
_disposed = true;
63+
Interlocked.Decrement(ref _allocator._rented);
64+
_inner?.Dispose();
65+
}
2766
}
2867
}
2968
}

0 commit comments

Comments
 (0)