Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions src/Custom/Embeddings/OpenAIEmbedding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,32 +118,36 @@ private static ReadOnlyMemory<float> ConvertToVectorOfFloats(BinaryData binaryDa

// Decode base64 string to bytes.
byte[] bytes = ArrayPool<byte>.Shared.Rent(Base64.GetMaxDecodedFromUtf8Length(base64.Length));
OperationStatus status = Base64.DecodeFromUtf8(base64, bytes.AsSpan(), out int bytesConsumed, out int bytesWritten);
if (status != OperationStatus.Done || bytesWritten % sizeof(float) != 0)
{
ThrowInvalidData();
}

// Interpret bytes as floats
float[] vector = new float[bytesWritten / sizeof(float)];
bytes.AsSpan(0, bytesWritten).CopyTo(MemoryMarshal.AsBytes(vector.AsSpan()));
if (!BitConverter.IsLittleEndian)
{
Span<int> ints = MemoryMarshal.Cast<float, int>(vector.AsSpan());
try
{
OperationStatus status = Base64.DecodeFromUtf8(base64, bytes.AsSpan(), out int bytesConsumed, out int bytesWritten);
if (status != OperationStatus.Done || bytesWritten % sizeof(float) != 0)
{
ThrowInvalidData();
}

// Interpret bytes as floats
float[] vector = new float[bytesWritten / sizeof(float)];
bytes.AsSpan(0, bytesWritten).CopyTo(MemoryMarshal.AsBytes(vector.AsSpan()));
if (!BitConverter.IsLittleEndian)
{
Span<int> ints = MemoryMarshal.Cast<float, int>(vector.AsSpan());
#if NET8_0_OR_GREATER
BinaryPrimitives.ReverseEndianness(ints, ints);
BinaryPrimitives.ReverseEndianness(ints, ints);
#else
for (int i = 0; i < ints.Length; i++)
{
ints[i] = BinaryPrimitives.ReverseEndianness(ints[i]);
}
#endif
}
return new ReadOnlyMemory<float>(vector);
}
finally
{
ArrayPool<byte>.Shared.Return(bytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is OK. FWIW, though, we generally don't consider it a bug to not return an ArrayPool array in exceptional situations.

}

ArrayPool<byte>.Shared.Return(bytes);
return new ReadOnlyMemory<float>(vector);

static void ThrowInvalidData() =>
}
static void ThrowInvalidData() =>
throw new FormatException("The input is not a valid Base64 string of encoded floats.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public AsyncWebsocketMessageResultEnumerator(WebSocket webSocket, CancellationTo

public ValueTask DisposeAsync()
{
ArrayPool<byte>.Shared.Return(_receiveBuffer);
Copy link
Contributor

@stephentoub stephentoub Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If disposal is performed multiple times, this will end up returning the buffer multiple times. That needs to be fixed, assuming this enumerator can be returned out to user code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is on me, as I was under the impression that it was safe to return multiple times. However, looks like I'm incorrect.

Copy link
Contributor

@stephentoub stephentoub Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This use of ArrayPool looks dangerous in general. The enumerator is yielding references to this array. If the consumer holds on to those yielded elements after disposal and reads from them (which is technically fine for them to do), that's a use-after-free bug.

I think the right answer here is to stop using ArrayPool here, i.e. revert the addition of the Return and change the Rent to just be a normal allocation. That won't be any worse than the state of allocation today because the array was already not being returned. If in the future perf problems demonstrate pooling is important, it can be re-evaluated holistically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it may not be as bad as I feared... tracing through where _recieveBuffer goes, it looks like it may not actually be stored into the yielded object but instead just having its data copied out? If so, using an ArrayPool array would be ok. But we still want to ensure it's only returned to the pool once.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #474 for investigation.

_webSocket?.Dispose();
return new ValueTask(Task.CompletedTask);
}
Expand All @@ -50,4 +51,4 @@ public async ValueTask<bool> MoveNextAsync()
Current = ClientResult.FromResponse(websocketPipelineResponse);
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken ca
}
_isSendingAudioStream = true;
}
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
try
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
while (true)
{
int bytesRead = await audio.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
Expand All @@ -75,6 +75,7 @@ public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken ca
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
{
_isSendingAudioStream = false;
Expand All @@ -93,9 +94,9 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT
}
_isSendingAudioStream = true;
}
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
try
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
while (true)
{
int bytesRead = audio.Read(buffer, 0, buffer.Length);
Expand All @@ -113,6 +114,7 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
{
_isSendingAudioStream = false;
Expand Down Expand Up @@ -349,4 +351,4 @@ public void Dispose()
{
WebSocket?.Dispose();
}
}
}
1 change: 0 additions & 1 deletion tests/RealtimeConversation/ConversationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ public async Task AudioWithToolsWorks(TestAudioSendType audioSendType)
{
byte[] allAudioBytes = await File.ReadAllBytesAsync(inputAudioFilePath, CancellationToken);
const int audioSendBufferLength = 8 * 1024;
byte[] audioSendBuffer = ArrayPool<byte>.Shared.Rent(audioSendBufferLength);
for (int readPos = 0; readPos < allAudioBytes.Length; readPos += audioSendBufferLength)
{
int nextSegmentLength = Math.Min(audioSendBufferLength, allAudioBytes.Length - readPos);
Expand Down