Skip to content

Commit c03dc6c

Browse files
authored
Return shared ArrayPool (#459)
* Update OpenAIEmbedding.cs * Update AsyncWebsocketMessageEnumerator.cs * Update RealtimeConversationSession.cs * Update ConversationTests.cs Not used * Move to finally
1 parent 195e201 commit c03dc6c

File tree

4 files changed

+30
-24
lines changed

4 files changed

+30
-24
lines changed

src/Custom/Embeddings/OpenAIEmbedding.cs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -118,32 +118,36 @@ private static ReadOnlyMemory<float> ConvertToVectorOfFloats(BinaryData binaryDa
118118

119119
// Decode base64 string to bytes.
120120
byte[] bytes = ArrayPool<byte>.Shared.Rent(Base64.GetMaxDecodedFromUtf8Length(base64.Length));
121-
OperationStatus status = Base64.DecodeFromUtf8(base64, bytes.AsSpan(), out int bytesConsumed, out int bytesWritten);
122-
if (status != OperationStatus.Done || bytesWritten % sizeof(float) != 0)
123-
{
124-
ThrowInvalidData();
125-
}
126-
127-
// Interpret bytes as floats
128-
float[] vector = new float[bytesWritten / sizeof(float)];
129-
bytes.AsSpan(0, bytesWritten).CopyTo(MemoryMarshal.AsBytes(vector.AsSpan()));
130-
if (!BitConverter.IsLittleEndian)
131-
{
132-
Span<int> ints = MemoryMarshal.Cast<float, int>(vector.AsSpan());
121+
try
122+
{
123+
OperationStatus status = Base64.DecodeFromUtf8(base64, bytes.AsSpan(), out int bytesConsumed, out int bytesWritten);
124+
if (status != OperationStatus.Done || bytesWritten % sizeof(float) != 0)
125+
{
126+
ThrowInvalidData();
127+
}
128+
129+
// Interpret bytes as floats
130+
float[] vector = new float[bytesWritten / sizeof(float)];
131+
bytes.AsSpan(0, bytesWritten).CopyTo(MemoryMarshal.AsBytes(vector.AsSpan()));
132+
if (!BitConverter.IsLittleEndian)
133+
{
134+
Span<int> ints = MemoryMarshal.Cast<float, int>(vector.AsSpan());
133135
#if NET8_0_OR_GREATER
134-
BinaryPrimitives.ReverseEndianness(ints, ints);
136+
BinaryPrimitives.ReverseEndianness(ints, ints);
135137
#else
136138
for (int i = 0; i < ints.Length; i++)
137139
{
138140
ints[i] = BinaryPrimitives.ReverseEndianness(ints[i]);
139141
}
140142
#endif
143+
}
144+
return new ReadOnlyMemory<float>(vector);
145+
}
146+
finally
147+
{
148+
ArrayPool<byte>.Shared.Return(bytes);
141149
}
142-
143-
ArrayPool<byte>.Shared.Return(bytes);
144-
return new ReadOnlyMemory<float>(vector);
145-
146-
static void ThrowInvalidData() =>
150+
}
151+
static void ThrowInvalidData() =>
147152
throw new FormatException("The input is not a valid Base64 string of encoded floats.");
148-
}
149153
}

src/Custom/RealtimeConversation/Internal/AsyncWebsocketMessageEnumerator.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public AsyncWebsocketMessageResultEnumerator(WebSocket webSocket, CancellationTo
2626

2727
public ValueTask DisposeAsync()
2828
{
29+
ArrayPool<byte>.Shared.Return(_receiveBuffer);
2930
_webSocket?.Dispose();
3031
return new ValueTask(Task.CompletedTask);
3132
}
@@ -50,4 +51,4 @@ public async ValueTask<bool> MoveNextAsync()
5051
Current = ClientResult.FromResponse(websocketPipelineResponse);
5152
return true;
5253
}
53-
}
54+
}

src/Custom/RealtimeConversation/RealtimeConversationSession.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken ca
5555
}
5656
_isSendingAudioStream = true;
5757
}
58+
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
5859
try
5960
{
60-
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
6161
while (true)
6262
{
6363
int bytesRead = await audio.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
@@ -75,6 +75,7 @@ public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken ca
7575
}
7676
finally
7777
{
78+
ArrayPool<byte>.Shared.Return(buffer);
7879
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
7980
{
8081
_isSendingAudioStream = false;
@@ -93,9 +94,9 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT
9394
}
9495
_isSendingAudioStream = true;
9596
}
97+
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
9698
try
9799
{
98-
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
99100
while (true)
100101
{
101102
int bytesRead = audio.Read(buffer, 0, buffer.Length);
@@ -113,6 +114,7 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT
113114
}
114115
finally
115116
{
117+
ArrayPool<byte>.Shared.Return(buffer);
116118
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
117119
{
118120
_isSendingAudioStream = false;
@@ -349,4 +351,4 @@ public void Dispose()
349351
{
350352
WebSocket?.Dispose();
351353
}
352-
}
354+
}

tests/RealtimeConversation/ConversationTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,6 @@ public async Task AudioWithToolsWorks(TestAudioSendType audioSendType)
341341
{
342342
byte[] allAudioBytes = await File.ReadAllBytesAsync(inputAudioFilePath, CancellationToken);
343343
const int audioSendBufferLength = 8 * 1024;
344-
byte[] audioSendBuffer = ArrayPool<byte>.Shared.Rent(audioSendBufferLength);
345344
for (int readPos = 0; readPos < allAudioBytes.Length; readPos += audioSendBufferLength)
346345
{
347346
int nextSegmentLength = Math.Min(audioSendBufferLength, allAudioBytes.Length - readPos);

0 commit comments

Comments
 (0)