diff --git a/src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs b/src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs
index 6479885..a12dea0 100644
--- a/src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs
+++ b/src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs
@@ -617,6 +617,10 @@ private static unsafe void OnComplete(int reqSeq, IntPtr state, CompletionReason
finally
{
requestContext.Release();
+
+ // NOTE: We need to dispose the request context in the thread pool.
+ // If we call Dispose on the native thread, we will release the native handles and crash.
+ ThreadPool.UnsafeQueueUserWorkItem(static r => ((RequestContext)r).Dispose(), requestContext);
}
}
diff --git a/src/YetAnotherHttpHandler/RequestContext.cs b/src/YetAnotherHttpHandler/RequestContext.cs
index 431fd40..da78b09 100644
--- a/src/YetAnotherHttpHandler/RequestContext.cs
+++ b/src/YetAnotherHttpHandler/RequestContext.cs
@@ -21,6 +21,7 @@ internal class RequestContext : IDisposable
private readonly bool _hasRequestContextHandleRef;
private GCHandle _handle;
+ private bool _handleReleased;
internal YahaContextSafeHandle _ctxHandle;
internal YahaRequestContextSafeHandle _requestContextHandle;
@@ -69,6 +70,9 @@ public void Allocate()
}
}
+ ///
+ /// Release the managed allocation of RequestContext. This method does not release the native handle, so Dispose must be called separately.
+ ///
public void Release()
{
Debug.Assert(_handle.IsAllocated);
@@ -77,7 +81,7 @@ public void Release()
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Releasing state");
_handle.Free();
_handle = default;
- _fullyCompleted.Set();
+ _fullyCompleted.Set(); // Finalizer thread can release the native handles.
}
}
@@ -125,6 +129,7 @@ private async Task RunReadRequestLoopAsync(CancellationToken cancellationToken)
finally
{
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Completing RunReadRequestLoopAsync");
+ TryReleaseNativeHandles();
}
}
@@ -309,8 +314,34 @@ public void CompleteAsFailed(string errorMessage, uint h2ErrorCode)
_cancellationTokenSource.Cancel(); // Stop reading the request body.
}
+ private void TryReleaseNativeHandles()
+ {
+ Debug.Assert(!_handle.IsAllocated);
+ UnsafeUtilities.RequireRunningOnManagedThread();
+
+ lock (_handleLock)
+ {
+ if (_handleReleased)
+ {
+ return;
+ }
+
+ if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Releasing native handles");
+ // RequestContextHandle can be released after all the processes using it are complete.
+ if (_hasRequestContextHandleRef)
+ {
+ Debug.Assert(!_requestContextHandle.IsClosed);
+ _requestContextHandle.DangerousRelease();
+ }
+ _requestContextHandle.Dispose();
+
+ _handleReleased = true;
+ }
+ }
+
public void Dispose()
{
+ UnsafeUtilities.RequireRunningOnManagedThread();
Dispose(true);
GC.SuppressFinalize(this);
}
@@ -342,13 +373,7 @@ private void Dispose(bool disposing)
_fullyCompleted.Wait();
}
- // RequestContextHandle can be released after all the processes using it are complete.
- if (_hasRequestContextHandleRef)
- {
- Debug.Assert(!_requestContextHandle.IsClosed);
- _requestContextHandle.DangerousRelease();
- }
- _requestContextHandle.Dispose();
+ TryReleaseNativeHandles();
}
}
}
\ No newline at end of file
diff --git a/src/YetAnotherHttpHandler/ResponseContext.cs b/src/YetAnotherHttpHandler/ResponseContext.cs
index 993845d..fa9d706 100644
--- a/src/YetAnotherHttpHandler/ResponseContext.cs
+++ b/src/YetAnotherHttpHandler/ResponseContext.cs
@@ -115,6 +115,7 @@ public void Complete()
WaitForLatestFlush();
_pipe.Writer.Complete();
_completed = true;
+ _tokenRegistration.Dispose();
}
}
@@ -150,6 +151,7 @@ public void CompleteAsFailed(string errorMessage, uint h2ErrorCode)
WaitForLatestFlush();
_pipe.Writer.Complete(ex);
_completed = true;
+ _tokenRegistration.Dispose();
}
}
@@ -164,6 +166,7 @@ public void Cancel()
WaitForLatestFlush();
_pipe.Writer.Complete(new OperationCanceledException(_cancellationToken));
_completed = true;
+ _tokenRegistration.Dispose();
}
}
diff --git a/src/YetAnotherHttpHandler/UnsafeUtilities.cs b/src/YetAnotherHttpHandler/UnsafeUtilities.cs
index 06cc7f9..7b584d2 100644
--- a/src/YetAnotherHttpHandler/UnsafeUtilities.cs
+++ b/src/YetAnotherHttpHandler/UnsafeUtilities.cs
@@ -1,6 +1,6 @@
using System;
using System.Buffers;
-using System.Collections.Generic;
+using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
@@ -59,6 +59,73 @@ public static bool EqualsIgnoreCase(ref byte left, ref byte right, uint length)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
static bool IsAsciiCodePoint(uint value) => value <= 0x7Fu;
}
+
+ [Conditional("DEBUG")]
+ public static void RequireRunningOnManagedThread()
+ {
+ // NOTE: This check logic is working only on Windows.
+ if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ return;
+ }
+
+ var threadName = GetCurrentThreadName();
+ if (threadName == "tokio-runtime-worker")
+ {
+ Environment.FailFast($"The current thread is the tokio worker thread.");
+ }
+
+ static string GetCurrentThreadName()
+ {
+ const uint THREAD_QUERY_LIMITED_INFORMATION = 0x0800;
+
+ var threadId = GetCurrentThreadId();
+ var threadName = string.Empty;
+ var threadHandle = OpenThread(THREAD_QUERY_LIMITED_INFORMATION, false, threadId);
+
+ if (threadHandle != IntPtr.Zero)
+ {
+ try
+ {
+ IntPtr threadDescriptionPtr;
+ var result = GetThreadDescription(threadHandle, out threadDescriptionPtr);
+
+ if (result >= 0 && threadDescriptionPtr != IntPtr.Zero)
+ {
+ try
+ {
+ threadName = Marshal.PtrToStringUni(threadDescriptionPtr);
+ }
+ finally
+ {
+ LocalFree(threadDescriptionPtr);
+ }
+ }
+ }
+ finally
+ {
+ CloseHandle(threadHandle);
+ }
+ }
+
+ return threadName ?? string.Empty;
+
+ [DllImport("kernel32.dll", SetLastError = true)]
+ static extern bool CloseHandle(IntPtr hObject);
+
+ [DllImport("kernel32.dll", SetLastError = true)]
+ static extern uint GetCurrentThreadId();
+
+ [DllImport("kernel32.dll", SetLastError = true)]
+ static extern IntPtr OpenThread(uint dwDesiredAccess, bool bInheritHandle, uint dwThreadId);
+
+ [DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Unicode)]
+ static extern int GetThreadDescription(IntPtr hThread, out IntPtr ppszThreadDescription);
+
+ [DllImport("kernel32.dll")]
+ static extern IntPtr LocalFree(IntPtr hMem);
+ }
+ }
}
internal readonly ref struct TempUtf8String
diff --git a/test/YetAnotherHttpHandler.Test/Helpers/NativeLibraryResolver.cs b/test/YetAnotherHttpHandler.Test/Helpers/NativeLibraryResolver.cs
index c14af5f..48c425c 100644
--- a/test/YetAnotherHttpHandler.Test/Helpers/NativeLibraryResolver.cs
+++ b/test/YetAnotherHttpHandler.Test/Helpers/NativeLibraryResolver.cs
@@ -10,6 +10,11 @@ public static void Initialize()
{
NativeLibrary.SetDllImportResolver(typeof(Cysharp.Net.Http.YetAnotherHttpHandler).Assembly, (name, assembly, path) =>
{
+ if (!name.Contains("yaha_native") && !name.Contains("Cysharp.Net.Http.YetAnotherHttpHandler.Native"))
+ {
+ return nint.Zero;
+ }
+
var ext = "";
var prefix = "";
var platform = "";
diff --git a/test/YetAnotherHttpHandler.Test/Helpers/TestWebAppServer.cs b/test/YetAnotherHttpHandler.Test/Helpers/TestWebAppServer.cs
index 2e15a37..f39238b 100644
--- a/test/YetAnotherHttpHandler.Test/Helpers/TestWebAppServer.cs
+++ b/test/YetAnotherHttpHandler.Test/Helpers/TestWebAppServer.cs
@@ -1,5 +1,7 @@
+using System.Collections.Concurrent;
using System.Diagnostics;
using Microsoft.AspNetCore.Builder;
+using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection;
@@ -14,12 +16,17 @@ public class TestWebAppServer : IAsyncDisposable
private readonly Task _appTask;
private readonly IHostApplicationLifetime _appLifetime;
private readonly TaskCompletionSource _waitForAppStarted;
+ private readonly ConcurrentDictionary _activeConnectionsById = new();
+ private int _activeConnections;
public int Port { get; }
public bool IsSecure { get; }
public string BaseUri => $"{(IsSecure ? "https" : "http")}://localhost:{Port}";
+ public int ActiveConnections => _activeConnections;
+ public IReadOnlyList ActiveConnectionIds => _activeConnectionsById.Keys.ToArray();
+
private TestWebAppServer(int port, TestWebAppServerListenMode listenMode, ITestOutputHelper? testOutputHelper, Func webAppBuilder, Action? configure)
{
Port = port;
@@ -51,6 +58,21 @@ TestWebAppServerListenMode.SecureHttp2Only or
{
listenOptions.UseHttps();
}
+
+ listenOptions.Use(async (ctx, next) =>
+ {
+ try
+ {
+ Interlocked.Increment(ref _activeConnections);
+ _activeConnectionsById.TryAdd(ctx.ConnectionId, true);
+ await next();
+ }
+ finally
+ {
+ Interlocked.Decrement(ref _activeConnections);
+ _activeConnectionsById.TryRemove(ctx.ConnectionId, out _);
+ }
+ });
});
});
if (testOutputHelper is not null)
diff --git a/test/YetAnotherHttpHandler.Test/Http2TestBase.cs b/test/YetAnotherHttpHandler.Test/Http2TestBase.cs
index b0fae66..fde6c7d 100644
--- a/test/YetAnotherHttpHandler.Test/Http2TestBase.cs
+++ b/test/YetAnotherHttpHandler.Test/Http2TestBase.cs
@@ -307,7 +307,7 @@ public async Task Cancel_Post_SendingBody()
public async Task Cancel_Post_SendingBody_Duplex()
{
// Arrange
- using var httpHandler = CreateHandler();
+ /*using*/ var httpHandler = CreateHandler();
var httpClient = new HttpClient(httpHandler);
await using var server = await LaunchServerAsync();
@@ -320,13 +320,29 @@ public async Task Cancel_Post_SendingBody_Duplex()
Version = HttpVersion.Version20,
Content = content,
};
+
var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeoutToken);
+ var connectionId = response.Headers.TryGetValues("x-connection-id", out var values) ? string.Join(',', values) : string.Empty;
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
var ex = await Record.ExceptionAsync(async () => await response.Content.ReadAsByteArrayAsync(cts.Token).WaitAsync(TimeoutToken));
+ pipe.Writer.Complete();
+ httpHandler.Dispose();
+ httpClient.Dispose();
+ response = null;
+ httpHandler = null;
+ httpClient = null;
+
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ Thread.Sleep(100);
+ GC.Collect();
+ Thread.Sleep(100);
+
// Assert
var operationCanceledException = Assert.IsAssignableFrom(ex);
Assert.Equal(cts.Token, operationCanceledException.CancellationToken);
+ Assert.DoesNotContain(connectionId, server.ActiveConnectionIds);
}
#endif
diff --git a/test/YetAnotherHttpHandler.Test/TestServerForHttp1AndHttp2.cs b/test/YetAnotherHttpHandler.Test/TestServerForHttp1AndHttp2.cs
index d41a28a..5845e64 100644
--- a/test/YetAnotherHttpHandler.Test/TestServerForHttp1AndHttp2.cs
+++ b/test/YetAnotherHttpHandler.Test/TestServerForHttp1AndHttp2.cs
@@ -24,6 +24,13 @@ public static WebApplication BuildApplication(WebApplicationBuilder builder)
var app = builder.Build();
+ // ConnectionId header
+ app.Use((ctx, next) =>
+ {
+ ctx.Response.Headers["x-connection-id"] = ctx.Connection.Id;
+ return next(ctx);
+ });
+
// SessionState
app.Use((ctx, next) =>
{
diff --git a/test/YetAnotherHttpHandler.Test/YetAnotherHttpHandlerTest.cs b/test/YetAnotherHttpHandler.Test/YetAnotherHttpHandlerTest.cs
index 1aba330..87b2fb0 100644
--- a/test/YetAnotherHttpHandler.Test/YetAnotherHttpHandlerTest.cs
+++ b/test/YetAnotherHttpHandler.Test/YetAnotherHttpHandlerTest.cs
@@ -110,7 +110,8 @@ public async Task InitializationFailure()
}
// NOTE: Currently, this test can only be run on Windows.
- [Fact(Skip = "Due to the state remaining from Http2Test.Cancel_Post_SendingBody_Duplex, this test fails. Enable it after fixing that issue.")]
+ [Fact]
+ [OSSkipCondition(OperatingSystems.MacOSX | OperatingSystems.Linux)]
public async Task SetWorkerThreads()
{
GC.Collect();