Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ce26903
Fix dispose hang in network handler and buffer pool cleanup
vazois Mar 16, 2026
ea99150
use stopwatch only in debug
vazois Mar 16, 2026
35627a1
add concurrent dictionary collection only in debug
vazois Mar 16, 2026
4a247aa
reduce pool entry types to single byte
vazois Mar 16, 2026
42c5d0a
Fix PoolEntry double-dispose in DisposeNetworkSender
vazois Mar 16, 2026
8a901fa
revert dispose calls and elevate logging for LFBP dispose timeout
vazois Mar 18, 2026
e113972
improve TearDown robustness
vazois Mar 19, 2026
5a35660
propagate ownerType flag across clients
vazois Mar 19, 2026
0e2117e
fix ClusterReplicationCheckpointCleanupTest
vazois Mar 16, 2026
19ba56b
fix ClusterResetHardDuring tests by using ConfigureAwait for Exceptio…
vazois Mar 18, 2026
e65a0e9
add logger for write/read async in device
vazois Mar 19, 2026
797cd90
separate dispose from close and configure socket to allow rapid connect
vazois Mar 20, 2026
dcd7d06
make ExceptionInjection ConfigureAwait(false)
vazois Mar 20, 2026
9a2eb04
wip; restructuring cluster tests to reduce CI duration
vazois Mar 20, 2026
4ace653
Merge remote-tracking branch 'origin/dev' into vazois/fix-lfbp-dispos…
vazois Mar 23, 2026
6df4bd8
add logging for IDevice implementation
vazois Mar 24, 2026
438b64e
Merge remote-tracking branch 'upstream/dev' into vazois/fix-lfbp-disp…
vazois Mar 24, 2026
f5582d1
log input to AOF for object and unified session functions
vazois Mar 24, 2026
a90535a
fix ClusterSimpleFailoverAuth
vazois Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public override void Start()
{
}

/// <inheritdoc />
public override void Close() { /* intentionally a no-op, like the empty Start() override above */ }

public bool TryCreateMessageConsumer(Span<byte> bytes, INetworkSender networkSender, out IMessageConsumer session)
{
session = null;
Expand Down
2 changes: 1 addition & 1 deletion libs/client/ClientSession/GarnetClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public GarnetClientSession(

this.usingManagedNetworkPool = networkPool != null;
this.networkBufferSettings = networkBufferSettings;
this.networkPool = networkPool ?? networkBufferSettings.CreateBufferPool();
this.networkPool = networkPool ?? networkBufferSettings.CreateBufferPool(ownerType: PoolOwnerType.GarnetClientSession, logger: logger);
this.bufferSizeDigits = NumUtils.CountDigits(this.networkBufferSettings.sendBufferSize);

this.logger = logger;
Expand Down
4 changes: 2 additions & 2 deletions libs/client/GarnetClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public GarnetClient(
public void Connect(CancellationToken token = default)
{
socket = ConnectSendSocketAsync(timeoutMilliseconds).ConfigureAwait(false).GetAwaiter().GetResult();
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, epoch, logger);
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, epoch, PoolOwnerType.GarnetClient, logger);
networkHandler.StartAsync(sslOptions, EndPoint.ToString(), token).ConfigureAwait(false).GetAwaiter().GetResult();

if (timeoutMilliseconds > 0)
Expand Down Expand Up @@ -256,7 +256,7 @@ public void Connect(CancellationToken token = default)
public async Task ConnectAsync(CancellationToken token = default)
{
socket = await ConnectSendSocketAsync(timeoutMilliseconds, token).ConfigureAwait(false);
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, epoch, logger);
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, epoch, PoolOwnerType.GarnetClient, logger);
await networkHandler.StartAsync(sslOptions, EndPoint.ToString(), token).ConfigureAwait(false);

if (timeoutMilliseconds > 0)
Expand Down
4 changes: 2 additions & 2 deletions libs/client/NetworkWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ internal sealed class NetworkWriter : IDisposable
/// <summary>
/// Constructor
/// </summary>
public NetworkWriter(GarnetClient serverHook, Socket socket, int messageBufferSize, SslClientAuthenticationOptions sslOptions, out GarnetClientTcpNetworkHandler networkHandler, int sendPageSize, int networkSendThrottleMax, LightEpoch epoch, ILogger logger = null)
public NetworkWriter(GarnetClient serverHook, Socket socket, int messageBufferSize, SslClientAuthenticationOptions sslOptions, out GarnetClientTcpNetworkHandler networkHandler, int sendPageSize, int networkSendThrottleMax, LightEpoch epoch, PoolOwnerType ownerType, ILogger logger = null)
{
this.networkBufferSettings = new NetworkBufferSettings(messageBufferSize, messageBufferSize);
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
this.networkPool = networkBufferSettings.CreateBufferPool(ownerType: ownerType, logger: logger);

if (BufferSize > PageOffset.kPageMask) throw new Exception();
this.networkHandler = networkHandler = new GarnetClientTcpNetworkHandler(serverHook, AsyncFlushPageCallback, socket, networkBufferSettings, networkPool, sslOptions != null, serverHook, networkSendThrottleMax: networkSendThrottleMax, logger: logger);
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Migration/MigrateSessionSlots.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task<bool> MigrateSlotsDriverInline()

#if DEBUG
// Only on Debug mode
ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Migration_Slot_End_Scan_Range_Acquisition).GetAwaiter().GetResult();
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.Migration_Slot_End_Scan_Range_Acquisition).ConfigureAwait(false);
#endif

// Send store
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Migration/MigrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public MigrationManager(ClusterProvider clusterProvider, ILogger logger = null)
this.clusterProvider = clusterProvider;
var sendBufferSize = 1 << clusterProvider.serverOptions.PageSizeBits();
this.networkBufferSettings = new NetworkBufferSettings(sendBufferSize, initialReceiveBufferSize);
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
this.networkPool = networkBufferSettings.CreateBufferPool(ownerType: PoolOwnerType.Migration, logger: logger);

logger?.LogInformation("NetworkBufferSettings.sendBufferSize:{sendBufferSize}", networkBufferSettings.sendBufferSize);
logger?.LogInformation("NetworkBufferSettings.initialReceiveBufferSize:{initialReceiveBufferSize}", networkBufferSettings.initialReceiveBufferSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ public async Task<bool> SendCheckpoint()
const int maxOdcAttempts = 2;
while (true)
{
cts.Token.ThrowIfCancellationRequested();
logger?.LogInformation("AcquireCheckpointEntry iteration {iteration}", iteration);
iteration++;

Expand Down Expand Up @@ -338,7 +339,7 @@ public async Task<bool> SendCheckpoint()

#if DEBUG
// Only on Debug mode
await ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Replication_Wait_After_Checkpoint_Acquisition);
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.Replication_Wait_After_Checkpoint_Acquisition).ConfigureAwait(false);
#endif

// Calculate the minimum start address covered by this checkpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async Task<string> TryBeginReplicaSync(bool downgradeLock)
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token, resetHandler.Token);

// Exception injection point for testing cluster reset during diskless replication
await ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Replication_InProgress_During_Diskless_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.Replication_InProgress_During_Diskless_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);

var resp = await gcs.ExecuteAttachSync(syncMetadata.ToByteArray()).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async Task<string> ReplicaSyncAttachTask(bool downgradeLock)
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token, resetHandler.Token);

// Exception injection point for testing cluster reset during disk-based replication
await ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Replication_InProgress_During_DiskBased_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.Replication_InProgress_During_DiskBased_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
var resp = await gcs.ExecuteReplicaSync(
nodeId,
PrimaryReplId,
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
this.pageSizeBits = storeWrapper.appendOnlyFile == null ? 0 : storeWrapper.appendOnlyFile.UnsafeGetLogPageSizeBits();

networkBufferSettings.Log(logger, nameof(ReplicationManager));
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
this.networkPool = networkBufferSettings.CreateBufferPool(ownerType: PoolOwnerType.Replication, logger: logger);
ValidateNetworkBufferSettings();

aofProcessor = new AofProcessor(storeWrapper, recordToAof: false, clusterProvider: clusterProvider, logger: logger);
Expand Down
53 changes: 47 additions & 6 deletions libs/common/ExceptionInjectionHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace Garnet.common
/// </summary>
public static class ExceptionInjectionHelper
{
/// <summary>
/// Guards replacement of <see cref="update"/>; waiters also read the current task while holding it.
/// Declared readonly so the monitor object can never be swapped out from under a thread that holds it.
/// </summary>
static readonly object @lock = new();

/// <summary>
/// Completed and replaced by a fresh instance each time an injection flag is enabled or disabled,
/// so that pending waiters wake up and re-check their condition.
/// </summary>
static TaskCompletionSource<bool> update = new(TaskCreationOptions.RunContinuationsAsynchronously);

/// <summary>
/// Array of exception injection types
/// </summary>
Expand All @@ -39,14 +42,34 @@ public static void EnableException(ExceptionInjectionType exceptionType)
}

ExceptionInjectionTypes[(int)exceptionType] = true;

TaskCompletionSource<bool> release;

lock (@lock)
{
release = update;
update = new(TaskCreationOptions.RunContinuationsAsynchronously);
}
_ = release.TrySetResult(true);
}

/// <summary>
/// Disable exception scenario (NOTE: tests must always call this at the end of the test so that a leftover flag does not break subsequent tests)
/// </summary>
/// <param name="exceptionType"></param>
[Conditional("DEBUG")]
public static void DisableException(ExceptionInjectionType exceptionType) => ExceptionInjectionTypes[(int)exceptionType] = false;
public static void DisableException(ExceptionInjectionType exceptionType)
{
ExceptionInjectionTypes[(int)exceptionType] = false;
TaskCompletionSource<bool> release;

lock (@lock)
{
release = update;
update = new(TaskCreationOptions.RunContinuationsAsynchronously);
}
_ = release.TrySetResult(true);
}

/// <summary>
/// Trigger exception scenario (NOTE: add this to the location where the exception should be emulated/triggered)
Expand Down Expand Up @@ -89,7 +112,7 @@ public static bool TriggerCondition(ExceptionInjectionType exceptionType)
/// </summary>
/// <param name="exceptionType"></param>
/// <returns></returns>
public static async Task WaitOnSet(ExceptionInjectionType exceptionType)
public static async Task ResetAndWaitAsync(ExceptionInjectionType exceptionType)
{
if (exceptionType == ExceptionInjectionType.None)
{
Expand All @@ -98,10 +121,19 @@ public static async Task WaitOnSet(ExceptionInjectionType exceptionType)

if (IsEnabled(exceptionType))
{
// Reset and wait to signaled to go forward
// Reset and wait to be signaled to go forward
DisableException(exceptionType);
while (!IsEnabled(exceptionType))
await Task.Yield();
{
Task task;
lock (@lock)
{
if (IsEnabled(exceptionType))
break;
task = update.Task;
}
await task.ConfigureAwait(false);
}
}
}

Expand All @@ -112,8 +144,17 @@ public static async Task WaitOnSet(ExceptionInjectionType exceptionType)
/// <returns></returns>
public static async Task WaitOnClearAsync(ExceptionInjectionType exceptionType)
{
while (ExceptionInjectionTypes[(int)exceptionType])
await Task.Yield();
while (IsEnabled(exceptionType))
{
Task task;
lock (@lock)
{
if (!IsEnabled(exceptionType))
break;
task = update.Task;
}
await task.ConfigureAwait(false);
}
}
}
}
3 changes: 2 additions & 1 deletion libs/common/LightClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class LightClient : ClientBase, IServerHook, IMessageConsumer
/// <param name="onResponseDelegateUnsafe">Callback that takes in a byte array and length, and returns the number of bytes read and the number of requests processed</param>
/// <param name="BufferSize">Message buffer size.</param>
/// <param name="sslOptions">SSL options</param>
/// <param name="logger">ILogger instance</param>
public unsafe LightClient(
EndPoint endpoint,
int opType,
Expand All @@ -60,7 +61,7 @@ public unsafe LightClient(
: base(endpoint, BufferSize)
{
this.networkBufferSettings = new NetworkBufferSettings(BufferSize, BufferSize);
this.networkPool = networkBufferSettings.CreateBufferPool();
this.networkPool = networkBufferSettings.CreateBufferPool(ownerType: PoolOwnerType.LightClient, logger: logger);
this.onResponseDelegateUnsafe = onResponseDelegateUnsafe ?? new OnResponseDelegateUnsafe(DefaultLightReceiveUnsafe);
this.opType = opType;
this.BufferSize = BufferSize;
Expand Down
68 changes: 59 additions & 9 deletions libs/common/Memory/LimitedFixedBufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// Licensed under the MIT license.

using System;
#if DEBUG
using System.Collections.Concurrent;
#endif
using System.Diagnostics;
using System.Numerics;
using System.Runtime.CompilerServices;
Expand All @@ -26,6 +29,11 @@ public sealed class LimitedFixedBufferPool : IDisposable
readonly int maxAllocationSize;
readonly ILogger logger;

/// <summary>
/// Pool owner type, packed into byte 1 of each <see cref="PoolEntry.source"/>.
/// </summary>
readonly int ownerByte;

/// <summary>
/// Min allocation size
/// </summary>
Expand All @@ -41,16 +49,29 @@ public sealed class LimitedFixedBufferPool : IDisposable
/// </summary>
int totalOutOfBoundAllocations;

#if DEBUG
/// <summary>
/// Tracks all outstanding (checked-out) pool entries for leak diagnosis.
/// </summary>
readonly ConcurrentDictionary<PoolEntry, byte> outstandingEntries = new();

/// <summary>
/// Timeout in milliseconds for Dispose to wait before logging outstanding entries.
/// </summary>
const int DisposeWaitDiagnosticMs = 5_000;
#endif

/// <summary>
/// Constructor
/// </summary>
public LimitedFixedBufferPool(int minAllocationSize, int maxEntriesPerLevel = 16, int numLevels = 4, ILogger logger = null)
public LimitedFixedBufferPool(int minAllocationSize, int maxEntriesPerLevel = 16, int numLevels = 4, PoolOwnerType ownerType = PoolOwnerType.Unknown, ILogger logger = null)
{
this.minAllocationSize = minAllocationSize;
this.maxAllocationSize = minAllocationSize << (numLevels - 1);
this.maxEntriesPerLevel = maxEntriesPerLevel;
this.numLevels = numLevels;
this.logger = logger;
this.ownerByte = (int)ownerType << 8;
pool = new PoolLevel[numLevels];
}

Expand Down Expand Up @@ -85,6 +106,9 @@ public bool Validate(NetworkBufferSettings settings)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Return(PoolEntry buffer)
{
#if DEBUG
outstandingEntries.TryRemove(buffer, out _);
#endif
var level = Position(buffer.entry.Length);
if (level >= 0)
{
Expand All @@ -107,9 +131,10 @@ public void Return(PoolEntry buffer)
/// Get buffer
/// </summary>
/// <param name="size"></param>
/// <param name="bufferType">Identifies the caller for leak diagnosis.</param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe PoolEntry Get(int size)
public unsafe PoolEntry Get(int size, PoolEntryBufferType bufferType = PoolEntryBufferType.Unknown)
{
if (Interlocked.Increment(ref totalReferences) < 0)
{
Expand All @@ -118,6 +143,8 @@ public unsafe PoolEntry Get(int size)
return null;
}

var source = ownerByte | (int)bufferType;

var level = Position(size);
if (level == -1) Interlocked.Increment(ref totalOutOfBoundAllocations);

Expand All @@ -132,10 +159,19 @@ public unsafe PoolEntry Get(int size)
{
Interlocked.Decrement(ref pool[level].size);
page.Reuse();
page.source = source;
#if DEBUG
outstandingEntries[page] = 0;
#endif
return page;
}
}
return new PoolEntry(size, this);
var entry = new PoolEntry(size, this);
entry.source = source;
#if DEBUG
outstandingEntries[entry] = 0;
#endif
return entry;
}

/// <summary>
Expand All @@ -157,23 +193,37 @@ public void Purge()
}

/// <summary>
/// Dipose pool entries from all levels
/// Dispose pool entries from all levels
/// NOTE:
/// This is used to destroy the instance and reclaim all allocated buffer pool entries.
/// As a consequence it spin waits until totalReferences goes back down to 0 and blocks any future allocations.
/// In DEBUG builds, logs outstanding unreturned entries after a timeout for leak diagnosis.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Dispose()
{
#if HANGDETECT
int count = 0;
#if DEBUG
var sw = Stopwatch.StartNew();
var diagnosed = false;
#endif
while (totalReferences > int.MinValue &&
Interlocked.CompareExchange(ref totalReferences, int.MinValue, 0) != 0)
{
#if HANGDETECT
if (++count % 10000 == 0)
logger?.LogTrace("Dispose iteration {count}, {activeHandlerCount}", count, activeHandlerCount);
#if DEBUG
if (!diagnosed && sw.ElapsedMilliseconds > DisposeWaitDiagnosticMs)
{
diagnosed = true;
var remaining = totalReferences;
var ownerType = (PoolOwnerType)(ownerByte >> 8);
logger?.LogError("LimitedFixedBufferPool.Dispose blocked with {remaining} unreturned references (poolOwner={ownerType}). Outstanding entries:", remaining, ownerType);
foreach (var kvp in outstandingEntries)
{
var entryBufferType = (PoolEntryBufferType)(kvp.Key.source & 0xFF);
var entryOwnerType = (PoolOwnerType)((kvp.Key.source >> 8) & 0xFF);
logger?.LogCritical(" Unreturned buffer: ownerType={ownerType}, bufferType={bufferType}, size={size}",
entryOwnerType, entryBufferType, kvp.Key.entry.Length);
}
}
#endif
Thread.Yield();
}
Expand Down
6 changes: 6 additions & 0 deletions libs/common/Memory/PoolEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ public unsafe class PoolEntry : IDisposable
readonly LimitedFixedBufferPool pool;
bool disposed;

/// <summary>
/// Packed source identifier: low byte = <see cref="PoolEntryBufferType"/>, byte 1 = <see cref="PoolOwnerType"/>.
/// Set when the entry is acquired via <see cref="LimitedFixedBufferPool.Get"/>.
/// </summary>
internal int source;

/// <summary>
/// Constructor
/// </summary>
Expand Down
Loading
Loading