Skip to content

Commit 943bd93

Browse files
vazois and Copilot
authored
Add LFBP Instrumentation, Server Shutdown Ordering, and Test TearDown Hardening (#1632)
* Fix dispose hang in network handler and buffer pool cleanup Two bugs caused LimitedFixedBufferPool.Dispose() to hang indefinitely during server teardown (blocking ClusterResetHardDuringDisklessReplicationAttach): 1. TcpNetworkHandlerBase.Dispose() never called DisposeImpl(), so when a handler thread was blocked synchronously (e.g. in TryBeginDisklessSync), the CTS was never cancelled and activeHandlerCount was never decremented. DisposeActiveHandlers() would spin forever waiting for it to reach 0. 2. GarnetTcpNetworkSender.DisposeNetworkSender() disposed the saeaStack but not the current responseObject, leaking a PoolEntry that was never returned to the pool. LimitedFixedBufferPool.Dispose() then spun forever waiting for totalReferences to reach 0. Also adds PoolEntry source tracking infrastructure (PoolEntryBufferType and PoolOwnerType enums) with DEBUG-only diagnostics that log unreturned buffer details after a 5-second timeout during pool disposal. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * use stopwatch only in debug Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * add concurrent dictionary collection only in debug Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * reduce pool entry types to single byte Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Fix PoolEntry double-dispose in DisposeNetworkSender Use Interlocked.Exchange to atomically take ownership of responseObject and ReturnBuffer it back to the saeaStack before disposal. This ensures the PoolEntry is disposed exactly once when saeaStack.Dispose() iterates all items, and avoids a race with ReturnResponseObject() on the handler thread that could cause Debug.Assert(!disposed) to fail. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * revert dispose calls and elevate logging for LFBP dispose timeout * improve TearDown robustness * propagate ownerType flag across clients * fix ClusterReplicationCheckpointCleanupTest * fix ClusterResetHardDuring tests by using ConfigureAwait for ExceptionInjection * add logger for write/read async in device * separate dispose from close and configure socket to allow rapid connect * make ExceptionInjection ConfigureAwait(false) * wip; restructuring cluster tests to reduce CI duration * add logging for IDevice implementation * log input to AOF for object and unified session functions * fix ClusterSimpleFailoverAuth --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent 44da71b commit 943bd93

36 files changed

+950
-669
lines changed

benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public override void Start()
110110
{
111111
}
112112

113+
/// <inheritdoc />
114+
public override void Close()
115+
{
116+
}
117+
113118
public bool TryCreateMessageConsumer(Span<byte> bytes, INetworkSender networkSender, out IMessageConsumer session)
114119
{
115120
session = null;

libs/client/ClientSession/GarnetClientSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public GarnetClientSession(
127127

128128
this.usingManagedNetworkPool = networkPool != null;
129129
this.networkBufferSettings = networkBufferSettings;
130-
this.networkPool = networkPool ?? networkBufferSettings.CreateBufferPool();
130+
this.networkPool = networkPool ?? networkBufferSettings.CreateBufferPool(ownerType: PoolOwnerType.GarnetClientSession, logger: logger);
131131
this.bufferSizeDigits = NumUtils.CountDigits(this.networkBufferSettings.sendBufferSize);
132132

133133
this.logger = logger;

libs/client/GarnetClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public GarnetClient(
210210
public void Connect(CancellationToken token = default)
211211
{
212212
socket = ConnectSendSocketAsync(timeoutMilliseconds).ConfigureAwait(false).GetAwaiter().GetResult();
213-
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, epoch, logger);
213+
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, epoch, PoolOwnerType.GarnetClient, logger);
214214
networkHandler.StartAsync(sslOptions, EndPoint.ToString(), token).ConfigureAwait(false).GetAwaiter().GetResult();
215215

216216
if (timeoutMilliseconds > 0)
@@ -256,7 +256,7 @@ public void Connect(CancellationToken token = default)
256256
public async Task ConnectAsync(CancellationToken token = default)
257257
{
258258
socket = await ConnectSendSocketAsync(timeoutMilliseconds, token).ConfigureAwait(false);
259-
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, epoch, logger);
259+
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, epoch, PoolOwnerType.GarnetClient, logger);
260260
await networkHandler.StartAsync(sslOptions, EndPoint.ToString(), token).ConfigureAwait(false);
261261

262262
if (timeoutMilliseconds > 0)

libs/client/NetworkWriter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,10 @@ internal sealed class NetworkWriter : IDisposable
7979
/// <summary>
8080
/// Constructor
8181
/// </summary>
82-
public NetworkWriter(GarnetClient serverHook, Socket socket, int messageBufferSize, SslClientAuthenticationOptions sslOptions, out GarnetClientTcpNetworkHandler networkHandler, int sendPageSize, int networkSendThrottleMax, LightEpoch epoch, ILogger logger = null)
82+
public NetworkWriter(GarnetClient serverHook, Socket socket, int messageBufferSize, SslClientAuthenticationOptions sslOptions, out GarnetClientTcpNetworkHandler networkHandler, int sendPageSize, int networkSendThrottleMax, LightEpoch epoch, PoolOwnerType ownerType, ILogger logger = null)
8383
{
8484
this.networkBufferSettings = new NetworkBufferSettings(messageBufferSize, messageBufferSize);
85-
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
85+
this.networkPool = networkBufferSettings.CreateBufferPool(ownerType: ownerType, logger: logger);
8686

8787
if (BufferSize > PageOffset.kPageMask) throw new Exception();
8888
this.networkHandler = networkHandler = new GarnetClientTcpNetworkHandler(serverHook, AsyncFlushPageCallback, socket, networkBufferSettings, networkPool, sslOptions != null, serverHook, networkSendThrottleMax: networkSendThrottleMax, logger: logger);

libs/cluster/Server/Migration/MigrateSessionSlots.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public async Task<bool> MigrateSlotsDriverInline()
2424

2525
#if DEBUG
2626
// Only on Debug mode
27-
ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Migration_Slot_End_Scan_Range_Acquisition).GetAwaiter().GetResult();
27+
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.Migration_Slot_End_Scan_Range_Acquisition).ConfigureAwait(false);
2828
#endif
2929

3030
// Send store

libs/cluster/Server/Migration/MigrationManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public MigrationManager(ClusterProvider clusterProvider, ILogger logger = null)
4343
this.clusterProvider = clusterProvider;
4444
var sendBufferSize = 1 << clusterProvider.serverOptions.PageSizeBits();
4545
this.networkBufferSettings = new NetworkBufferSettings(sendBufferSize, initialReceiveBufferSize);
46-
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
46+
this.networkPool = networkBufferSettings.CreateBufferPool(ownerType: PoolOwnerType.Migration, logger: logger);
4747

4848
logger?.LogInformation("NetworkBufferSettings.sendBufferSize:{sendBufferSize}", networkBufferSettings.sendBufferSize);
4949
logger?.LogInformation("NetworkBufferSettings.initialReceiveBufferSize:{initialReceiveBufferSize}", networkBufferSettings.initialReceiveBufferSize);

libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ public async Task<bool> SendCheckpoint()
310310
const int maxOdcAttempts = 2;
311311
while (true)
312312
{
313+
cts.Token.ThrowIfCancellationRequested();
313314
logger?.LogInformation("AcquireCheckpointEntry iteration {iteration}", iteration);
314315
iteration++;
315316

@@ -338,7 +339,7 @@ public async Task<bool> SendCheckpoint()
338339

339340
#if DEBUG
340341
// Only on Debug mode
341-
await ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Replication_Wait_After_Checkpoint_Acquisition);
342+
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.Replication_Wait_After_Checkpoint_Acquisition).ConfigureAwait(false);
342343
#endif
343344

344345
// Calculate the minimum start address covered by this checkpoint

libs/cluster/Server/Replication/ReplicaOps/ReplicaDisklessSync.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ async Task<string> TryBeginReplicaSync(bool downgradeLock)
128128
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token, resetHandler.Token);
129129

130130
// Exception injection point for testing cluster reset during diskless replication
131-
await ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Replication_InProgress_During_Diskless_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
131+
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.Replication_InProgress_During_Diskless_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
132132

133133
var resp = await gcs.ExecuteAttachSync(syncMetadata.ToByteArray()).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
134134
}

libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ async Task<string> ReplicaSyncAttachTask(bool downgradeLock)
148148
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token, resetHandler.Token);
149149

150150
// Exception injection point for testing cluster reset during disk-based replication
151-
await ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Replication_InProgress_During_DiskBased_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
151+
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.Replication_InProgress_During_DiskBased_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
152152
var resp = await gcs.ExecuteReplicaSync(
153153
nodeId,
154154
PrimaryReplId,

libs/cluster/Server/Replication/ReplicationManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
102102
this.pageSizeBits = storeWrapper.appendOnlyFile == null ? 0 : storeWrapper.appendOnlyFile.UnsafeGetLogPageSizeBits();
103103

104104
networkBufferSettings.Log(logger, nameof(ReplicationManager));
105-
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
105+
this.networkPool = networkBufferSettings.CreateBufferPool(ownerType: PoolOwnerType.Replication, logger: logger);
106106
ValidateNetworkBufferSettings();
107107

108108
aofProcessor = new AofProcessor(storeWrapper, recordToAof: false, clusterProvider: clusterProvider, logger: logger);

0 commit comments

Comments
 (0)