Skip to content

Commit bce7e91

Browse files
GH-41110: [C#] Handle empty stream in ArrowStreamReaderImplementation (#47098)
### Rationale for this change Fixing #41110. ### What changes are included in this PR? Handle empty stream in `ArrowStreamReaderImplementation`. Similar changes have *not* been made to `ArrowMemoryReaderImplementation` or `ArrowFileReaderImplementation`. ### Are these changes tested? Two basic unit tests have been created to validate the new behavior. This might not be sufficient to cover all cases where an empty stream should be handled without an exception occurring. Original change by @ voidstar69; this takes his change and applies the PR feedback to it. * GitHub Issue: #41110 Lead-authored-by: voidstar69 <[email protected]> Co-authored-by: Curt Hagenlocher <[email protected]> Signed-off-by: Curt Hagenlocher <[email protected]>
1 parent 3aedf1e commit bce7e91

File tree

2 files changed

+44
-7
lines changed

2 files changed

+44
-7
lines changed

csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ protected async ValueTask<RecordBatch> ReadRecordBatchAsync(CancellationToken ca
6868

6969
protected async ValueTask<ReadResult> ReadMessageAsync(CancellationToken cancellationToken)
7070
{
71-
int messageLength = await ReadMessageLengthAsync(throwOnFullRead: false, cancellationToken)
71+
int messageLength = await ReadMessageLengthAsync(throwOnFullRead: false, returnOnEmptyStream: false, cancellationToken)
7272
.ConfigureAwait(false);
7373

7474
if (messageLength == 0)
@@ -116,8 +116,7 @@ protected RecordBatch ReadRecordBatch()
116116

117117
protected ReadResult ReadMessage()
118118
{
119-
int messageLength = ReadMessageLength(throwOnFullRead: false);
120-
119+
int messageLength = ReadMessageLength(throwOnFullRead: false, returnOnEmptyStream: false);
121120
if (messageLength == 0)
122121
{
123122
// reached end
@@ -160,8 +159,12 @@ public override async ValueTask ReadSchemaAsync(CancellationToken cancellationTo
160159
}
161160

162161
// Figure out length of schema
163-
int schemaMessageLength = await ReadMessageLengthAsync(throwOnFullRead: true, cancellationToken)
162+
int schemaMessageLength = await ReadMessageLengthAsync(throwOnFullRead: true, returnOnEmptyStream: true, cancellationToken)
164163
.ConfigureAwait(false);
164+
if (schemaMessageLength == 0)
165+
{
166+
return;
167+
}
165168

166169
using (ArrayPool<byte>.Shared.RentReturn(schemaMessageLength, out Memory<byte> buff))
167170
{
@@ -182,7 +185,11 @@ public override void ReadSchema()
182185
}
183186

184187
// Figure out length of schema
185-
int schemaMessageLength = ReadMessageLength(throwOnFullRead: true);
188+
int schemaMessageLength = ReadMessageLength(throwOnFullRead: true, returnOnEmptyStream: true);
189+
if (schemaMessageLength == 0)
190+
{
191+
return;
192+
}
186193

187194
using (ArrayPool<byte>.Shared.RentReturn(schemaMessageLength, out Memory<byte> buff))
188195
{
@@ -194,13 +201,17 @@ public override void ReadSchema()
194201
}
195202
}
196203

197-
private async ValueTask<int> ReadMessageLengthAsync(bool throwOnFullRead, CancellationToken cancellationToken = default)
204+
private async ValueTask<int> ReadMessageLengthAsync(bool throwOnFullRead, bool returnOnEmptyStream, CancellationToken cancellationToken = default)
198205
{
199206
int messageLength = 0;
200207
using (ArrayPool<byte>.Shared.RentReturn(4, out Memory<byte> lengthBuffer))
201208
{
202209
int bytesRead = await BaseStream.ReadFullBufferAsync(lengthBuffer, cancellationToken)
203210
.ConfigureAwait(false);
211+
if (bytesRead == 0 && returnOnEmptyStream)
212+
{
213+
return 0;
214+
}
204215
if (throwOnFullRead)
205216
{
206217
EnsureFullRead(lengthBuffer, bytesRead);
@@ -233,12 +244,16 @@ private async ValueTask<int> ReadMessageLengthAsync(bool throwOnFullRead, Cancel
233244
return messageLength;
234245
}
235246

236-
private int ReadMessageLength(bool throwOnFullRead)
247+
private int ReadMessageLength(bool throwOnFullRead, bool returnOnEmptyStream)
237248
{
238249
int messageLength = 0;
239250
using (ArrayPool<byte>.Shared.RentReturn(4, out Memory<byte> lengthBuffer))
240251
{
241252
int bytesRead = BaseStream.ReadFullBuffer(lengthBuffer);
253+
if (bytesRead == 0 && returnOnEmptyStream)
254+
{
255+
return 0;
256+
}
242257
if (throwOnFullRead)
243258
{
244259
EnsureFullRead(lengthBuffer, bytesRead);

csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,17 @@ private static async Task TestReaderFromMemory(
131131
await verificationFunc(reader, originalBatch);
132132
}
133133

134+
[Fact]
135+
public void ReadRecordBatch_EmptyStream()
136+
{
137+
using (MemoryStream stream = new())
138+
{
139+
ArrowStreamReader reader = new(stream);
140+
RecordBatch readBatch = reader.ReadNextRecordBatch();
141+
Assert.Null(readBatch);
142+
}
143+
}
144+
134145
[Theory]
135146
[InlineData(true, true)]
136147
[InlineData(true, false)]
@@ -145,6 +156,17 @@ await TestReaderFromStream((reader, originalBatch) =>
145156
}, writeEnd, createDictionaryArray);
146157
}
147158

159+
[Fact]
160+
public async Task ReadRecordBatchAsync_EmptyStream()
161+
{
162+
using (MemoryStream stream = new())
163+
{
164+
ArrowStreamReader reader = new(stream);
165+
RecordBatch readBatch = await reader.ReadNextRecordBatchAsync();
166+
Assert.Null(readBatch);
167+
}
168+
}
169+
148170
[Theory]
149171
[InlineData(true, true)]
150172
[InlineData(true, false)]

0 commit comments

Comments
 (0)