Skip to content

Commit 8c715ec

Browse files
Add CborReaderProxy to support process CBOR stream in chunks (#3939)
1 parent 5b586e7 commit 8c715ec

File tree

5 files changed

+749
-73
lines changed

5 files changed

+749
-73
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
using Amazon.Runtime.Internal.Util;
16+
using System;
17+
using System.Buffers;
18+
using System.Collections.Generic;
19+
using System.Formats.Cbor;
20+
using System.IO;
21+
22+
namespace Amazon.Extensions.CborProtocol.Internal
23+
{
24+
/// <summary>
25+
/// A streaming CBOR reader that processes large CBOR data streams in chunks without loading
26+
/// the entire payload into memory at once. This class wraps <see cref="CborReader"/>
27+
/// to provide streaming capabilities while maintaining the same reading interface.
28+
/// </summary>
29+
public class CborStreamReader : IDisposable
30+
{
31+
/// <summary>
/// Identifies the kind of CBOR container (map or array) that has been opened
/// through this reader, so nesting can be tracked by <see cref="CborStreamReader"/> itself.
/// </summary>
private enum CborContainerType
{
    Map,
    Array
}

private static readonly ILogger _logger = Logger.GetLogger(typeof(CborStreamReader));

// Containers opened via ReadStartMap/ReadStartArray that have not been closed yet.
private readonly Stack<CborContainerType> _nestingStack = new Stack<CborContainerType>();

// Source of the CBOR bytes. This class reads from it but never disposes it.
private readonly Stream _stream;

// Buffer rented from ArrayPool<byte>.Shared; returned in Dispose (or replaced when grown).
private byte[] _buffer;

// Reader positioned over the first _currentChunkSize bytes of _buffer.
private CborReader _internalCborReader;

// Number of valid bytes currently held at the start of _buffer.
private int _currentChunkSize;

/// <summary>
/// Initializes a new instance of the <see cref="CborStreamReader"/> class that reads CBOR data
/// from the specified stream. The first chunk is read from the stream immediately.
/// </summary>
/// <param name="stream">The input stream containing CBOR-formatted data.
/// The stream must be readable and remain open for the lifetime of the reader.</param>
/// <exception cref="ArgumentNullException">Thrown when the stream parameter is null.</exception>
/// <remarks>
/// The reader uses a configurable initial buffer size (from <see cref="AWSConfigs.CborReaderInitialBufferSize"/>)
/// and will automatically resize the buffer if needed to handle larger CBOR items.
/// </remarks>
public CborStreamReader(Stream stream)
{
    _stream = stream ?? throw new ArgumentNullException(nameof(stream));
    _buffer = ArrayPool<byte>.Shared.Rent(AWSConfigs.CborReaderInitialBufferSize);

    try
    {
        _currentChunkSize = _stream.Read(_buffer, 0, _buffer.Length);
        var memorySlice = new ReadOnlyMemory<byte>(_buffer, 0, _currentChunkSize);

        // We must allow multiple root values because when refilling the new chunk is just a fragment of the whole stream.
        _internalCborReader = new CborReader(memorySlice, allowMultipleRootLevelValues: true);
    }
    catch
    {
        // The caller never receives an instance to Dispose when the constructor throws,
        // so hand the rented buffer back to the pool here before propagating.
        ArrayPool<byte>.Shared.Return(_buffer);
        _buffer = null;
        throw;
    }
}
70+
71+
/// <summary>
/// Called when a read operation needs more data than is currently buffered. Moves the
/// unread bytes (optionally minus a skipped prefix) to the start of the buffer, grows the
/// buffer when the unread bytes already fill it, appends a new chunk from the stream and
/// re-points the inner <see cref="CborReader"/> at the combined data.
/// </summary>
/// <param name="bytesToSkip">Number of unread bytes to discard before keeping the rest
/// (e.g., 1 to skip the CBOR break byte 0xFF). Must not exceed the unread byte count.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="bytesToSkip"/> is negative or larger than the number of unread bytes.
/// </exception>
/// <exception cref="CborContentException">
/// Thrown when nothing was skipped, unread bytes remain, and the stream has no more data,
/// i.e. the pending data item can never be completed.
/// </exception>
private void RefillBuffer(int bytesToSkip = 0)
{
    int unreadCount = _internalCborReader.BytesRemaining;

    if (bytesToSkip < 0 || bytesToSkip > unreadCount)
    {
        // Skipping past the buffered data would produce a negative length/offset below.
        throw new ArgumentOutOfRangeException(nameof(bytesToSkip));
    }

    // Unread bytes sit at the tail of the valid region; step over the skipped prefix.
    int leftoverStartIndex = (_currentChunkSize - unreadCount) + bytesToSkip;
    int leftoverBytesCount = unreadCount - bytesToSkip;

    if (leftoverBytesCount >= _buffer.Length)
    {
        // The leftover data completely fills the buffer: rent a larger one and move the data over.
        int newSize = _buffer.Length * 2;
        var newBuffer = ArrayPool<byte>.Shared.Rent(newSize);

        Buffer.BlockCopy(_buffer, leftoverStartIndex, newBuffer, 0, leftoverBytesCount);
        ArrayPool<byte>.Shared.Return(_buffer);
        _buffer = newBuffer;
    }
    else if (leftoverBytesCount > 0)
    {
        // Shift leftovers (after skipping) to the beginning of the buffer.
        Buffer.BlockCopy(_buffer, leftoverStartIndex, _buffer, 0, leftoverBytesCount);
    }

    // Read from the stream into the free space after the leftovers.
    int bytesReadFromStream = _stream.Read(_buffer, leftoverBytesCount, _buffer.Length - leftoverBytesCount);

    // Update the total size of valid data in our buffer.
    _currentChunkSize = leftoverBytesCount + bytesReadFromStream;

    // Re-point the inner reader BEFORE any throw below. The buffer has already been
    // rearranged, so leaving the reader on the old slice would make _currentChunkSize and
    // BytesRemaining disagree if a caller catches the exception and refills again.
    _internalCborReader.Reset(new ReadOnlyMemory<byte>(_buffer, 0, _currentChunkSize));

    // Truncation: a read needed more data, we still hold an incomplete item, and the stream is empty.
    // When bytes were skipped the reader did make progress, so the remaining bytes may be perfectly
    // valid trailing data (for example the break byte of an enclosing container); a later read will
    // report truncation if they are not.
    if (bytesReadFromStream == 0 && leftoverBytesCount > 0 && bytesToSkip == 0)
    {
        throw new CborContentException("Stream ended unexpectedly with an incomplete CBOR data item.");
    }

    _logger.DebugFormat("Buffer refilled: read {0} byte(s), total in buffer now: {1}.", bytesReadFromStream, _currentChunkSize);
}
121+
122+
/// <summary>
/// Runs <paramref name="readOperation"/> against the inner reader. Each time it throws a
/// <see cref="CborContentException"/>, the buffer is refilled and the operation is attempted again.
/// </summary>
/// <typeparam name="T">The return type of the CBOR read operation.</typeparam>
/// <param name="readOperation">A delegate representing the read operation to execute.</param>
/// <returns>The result of the read operation.</returns>
/// <exception cref="CborContentException">
/// Thrown if the buffer is empty with nothing left to read, if too many retries are attempted,
/// or if the refill itself reports that the stream ended unexpectedly.
/// </exception>
private T ExecuteRead<T>(Func<CborReader, T> readOperation)
{
    const int maxRetries = 64;

    // "attempt" is the 1-based number of the try currently running; it doubles as the
    // count of failures once we are inside the catch block.
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return readOperation(_internalCborReader);
        }
        catch (CborContentException ex)
        {
            bool nothingBuffered = _currentChunkSize == 0;
            bool nothingUnread = _internalCborReader.BytesRemaining == 0;
            if (nothingBuffered && nothingUnread)
            {
                // Fail fast if we’ve already consumed all input and nothing remains to refill.
                throw;
            }

            if (attempt > maxRetries)
            {
                throw new CborContentException("Too many retries during CBOR stream parsing. Possible malformed or infinite data.", ex);
            }

            _logger.Debug(ex, "CborContentException caught (attempt #{0}), attempting to refill buffer.", attempt);

            // Pull in more data, then loop around to retry the operation.
            RefillBuffer();
        }
    }
}
161+
162+
163+
/// <summary>
/// Reports whether the next unread byte in the buffer is 0xFF, the "break" marker that
/// terminates an indefinite-length map or array. Returns false when no unread byte is buffered.
/// </summary>
private bool IsNextByteEndOfContainer()
{
    // Index of the first byte the inner reader has not consumed yet.
    int nextIndex = _currentChunkSize - _internalCborReader.BytesRemaining;

    return nextIndex < _currentChunkSize && _buffer[nextIndex] == 0xFF;
}
171+
172+
/// <summary>
/// Shared implementation of <see cref="ReadEndMap"/> and <see cref="ReadEndArray"/>: closes the
/// innermost container tracked on the nesting stack.
/// </summary>
/// <param name="expectedType">The container kind the caller is closing; must match the top of the nesting stack.</param>
/// <param name="expectedEndState">The inner reader state that signals the end of that container kind.</param>
/// <param name="readEndAction">Invoked on the inner reader to consume the end of the container when it reports <paramref name="expectedEndState"/>.</param>
/// <exception cref="CborContentException">
/// Thrown when the nesting stack does not have <paramref name="expectedType"/> on top, or when the
/// inner reader reports a state that is neither the expected end state nor <see cref="CborReaderState.Finished"/>.
/// </exception>
private void ReadEndContainer(CborContainerType expectedType, CborReaderState expectedEndState, Action<CborReader> readEndAction)
{
    string containerName = expectedType.ToString().ToLowerInvariant();

    // This check depends only on our own bookkeeping, so refilling the buffer can never change
    // its outcome. Throw it outside ExecuteRead so it is not swallowed by the retry loop.
    if (_nestingStack.Count == 0 || _nestingStack.Peek() != expectedType)
    {
        throw new CborContentException($"Unexpected end of {containerName}.");
    }

    bool ended = ExecuteRead(r =>
    {
        var state = CborReaderState.Finished;
        try
        {
            state = r.PeekState();
        }
        catch (CborContentException)
        {
            // This exception is expected in two cases:
            // 1. When we've reached the end of the current CBOR buffer chunk and need to refill.
            // 2. When the current buffer does not contain the start of the array/map we're trying to end.
            // In both cases we keep treating the state as `Finished` and trigger a buffer refill below.
            // This is not a true error; it is logged at Debug level to help trace how we arrived here.
            _logger.DebugFormat("CborContentException caught during PeekState while expecting end of {0}", containerName);
        }

        if (state == expectedEndState)
        {
            // The inner reader still knows about this container: let it consume the end itself.
            readEndAction(r);
            _nestingStack.Pop();
            return true;
        }

        if (state == CborReaderState.Finished)
        {
            // A break marker (0xFF) must be skipped by hand; a definite-length
            // map/array has no terminator byte, so nothing is skipped.
            RefillBuffer(IsNextByteEndOfContainer() ? 1 : 0);
            _nestingStack.Pop();
            return true;
        }

        // Some other item is next. Peeking again after a refill would report the same thing,
        // so signal failure to the caller instead of throwing into the retry loop.
        return false;
    });

    if (!ended)
    {
        throw new CborContentException($"Expected end of {containerName} but could not parse it.");
    }
}
218+
219+
220+
/// <summary>
/// Closes the map most recently opened with <see cref="ReadStartMap"/>.
/// </summary>
/// <exception cref="CborContentException">Thrown when the end of a map cannot be read at the current position.</exception>
public void ReadEndMap()
    => ReadEndContainer(CborContainerType.Map, CborReaderState.EndMap, inner => inner.ReadEndMap());
224+
225+
/// <summary>
/// Closes the array most recently opened with <see cref="ReadStartArray"/>.
/// </summary>
/// <exception cref="CborContentException">Thrown when the end of an array cannot be read at the current position.</exception>
public void ReadEndArray()
    => ReadEndContainer(CborContainerType.Array, CborReaderState.EndArray, inner => inner.ReadEndArray());
229+
230+
231+
/// <summary>
/// Reads the start of a map and records it on the nesting stack.
/// </summary>
/// <returns>The value returned by <see cref="CborReader.ReadStartMap"/> on the inner reader.</returns>
public int? ReadStartMap()
{
    return ExecuteRead(inner =>
    {
        int? declaredLength = inner.ReadStartMap();

        // Only reached when the inner read succeeded, so a retried attempt never pushes twice.
        _nestingStack.Push(CborContainerType.Map);
        return declaredLength;
    });
}
237+
238+
/// <summary>
/// Reads the start of an array and records it on the nesting stack.
/// </summary>
/// <returns>The value returned by <see cref="CborReader.ReadStartArray"/> on the inner reader.</returns>
public int? ReadStartArray()
{
    return ExecuteRead(inner =>
    {
        int? declaredLength = inner.ReadStartArray();

        // Only reached when the inner read succeeded, so a retried attempt never pushes twice.
        _nestingStack.Push(CborContainerType.Array);
        return declaredLength;
    });
}
244+
245+
246+
/// <summary>
/// Returns the state of the next CBOR data item without consuming it.
/// </summary>
/// <returns>
/// The state reported by the inner reader, or <see cref="CborReaderState.EndMap"/> /
/// <see cref="CborReaderState.EndArray"/> when the inner reader rejects a break byte (0xFF)
/// that closes a container tracked on this reader's own nesting stack.
/// </returns>
/// <exception cref="CborContentException">
/// Thrown when the inner reader cannot determine the state and refilling the buffer does not help.
/// </exception>
public CborReaderState PeekState()
{
    return ExecuteRead(r =>
    {
        try
        {
            return r.PeekState();
        }
        catch (CborContentException ex)
        {
            // Translate a Break code to the appropriate container end state based on our own
            // nesting stack, but only when the next unread byte really is the break marker.
            // The inner reader also throws when it simply ran out of buffered data inside a
            // container; treating that as "end of container" would close it prematurely.
            if (_nestingStack.Count > 0 && IsNextByteEndOfContainer())
            {
                var inferredState = _nestingStack.Peek() == CborContainerType.Map
                    ? CborReaderState.EndMap
                    : CborReaderState.EndArray;

                _logger.Debug(ex, "CborContentException during PeekState interpreted as {0} due to nesting stack.", inferredState);
                return inferredState;
            }

            // Either a genuine error or more data is needed: rethrow so ExecuteRead can
            // refill the buffer and retry, or give up.
            throw;
        }
    });
}
272+
273+
// Each member below forwards to the same-named member of the inner CborReader through
// ExecuteRead, which refills the buffer and retries when a CborContentException is thrown.

/// <summary>Forwards to <see cref="CborReader.ReadTextString"/> on the inner reader.</summary>
public string ReadTextString() => ExecuteRead(inner => inner.ReadTextString());

/// <summary>Forwards to <see cref="CborReader.ReadInt32"/> on the inner reader.</summary>
public int ReadInt32() => ExecuteRead(inner => inner.ReadInt32());

/// <summary>Forwards to <see cref="CborReader.ReadInt64"/> on the inner reader.</summary>
public long ReadInt64() => ExecuteRead(inner => inner.ReadInt64());

/// <summary>Forwards to <see cref="CborReader.ReadDecimal"/> on the inner reader.</summary>
public decimal ReadDecimal() => ExecuteRead(inner => inner.ReadDecimal());

/// <summary>Forwards to <see cref="CborReader.ReadDouble"/> on the inner reader.</summary>
public double ReadDouble() => ExecuteRead(inner => inner.ReadDouble());

/// <summary>Forwards to <see cref="CborReader.ReadBoolean"/> on the inner reader.</summary>
public bool ReadBoolean() => ExecuteRead(inner => inner.ReadBoolean());

/// <summary>Forwards to <see cref="CborReader.ReadSingle"/> on the inner reader.</summary>
public float ReadSingle() => ExecuteRead(inner => inner.ReadSingle());

/// <summary>Forwards to <see cref="CborReader.ReadTag"/> on the inner reader.</summary>
public CborTag ReadTag() => ExecuteRead(inner => inner.ReadTag());

/// <summary>Forwards to <see cref="CborReader.ReadByteString"/> on the inner reader.</summary>
public byte[] ReadByteString() => ExecuteRead(inner => inner.ReadByteString());

/// <summary>Forwards to <see cref="CborReader.ReadNull"/> on the inner reader.</summary>
public void ReadNull()
{
    // ExecuteRead needs a return value; the boolean is a placeholder and is discarded.
    ExecuteRead(inner =>
    {
        inner.ReadNull();
        return true;
    });
}

/// <summary>Forwards to <see cref="CborReader.SkipValue"/> on the inner reader.</summary>
public void SkipValue()
{
    // ExecuteRead needs a return value; the boolean is a placeholder and is discarded.
    ExecuteRead(inner =>
    {
        inner.SkipValue();
        return true;
    });
}

/// <summary>
/// Gets the depth reported by the inner reader.
/// </summary>
/// <remarks>
/// NOTE(review): this is the inner reader's own depth, not the size of this class's nesting
/// stack; the two presumably diverge after a buffer refill resets the inner reader - confirm.
/// </remarks>
public int CurrentDepth
{
    get { return _internalCborReader.CurrentDepth; }
}
285+
286+
/// <summary>
/// Returns the rented buffer to the shared array pool. Calling this more than once is harmless.
/// The underlying stream is left untouched.
/// </summary>
public void Dispose()
{
    byte[] rented = _buffer;
    if (rented == null)
    {
        // Already disposed: nothing left to hand back.
        return;
    }

    ArrayPool<byte>.Shared.Return(rented);
    _buffer = null;
}
294+
}
295+
}

0 commit comments

Comments
 (0)