-
Notifications
You must be signed in to change notification settings - Fork 646
Implement graceful shutdown for Garnet server ( #1382 ) #1551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 32 commits
5901c4f
9ea9339
7731df6
597ec57
9e168f4
67a7f2e
5b25407
476d629
9bf52de
11d115b
3b3df07
2489995
9d5017a
5eb025c
ff37e9b
c542789
efd4c75
0b113a7
19b1a13
d77bd6b
f14fb2c
c8da690
2e9a53b
30fc80d
3177001
920c328
8ea0c24
bf05d23
01a2547
65b1391
5145094
cf9c997
514456c
db633e3
de4e462
74f1c0a
7bb2ab2
7cb1e9c
2530260
784f630
bc6580d
e9b0a3e
aaaf45b
714c2f6
08e48a2
c6fcb19
4ccad8e
d37ad7c
60faf96
e0616b7
36a85b9
d29c716
b7d4d25
42d7bd2
e6be406
1a2fff7
fccbbd0
6c883a2
6d97896
9d98495
b682fff
34ebabd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
| using System.Runtime.InteropServices; | ||
| using System.Text; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Garnet.cluster; | ||
| using Garnet.common; | ||
| using Garnet.networking; | ||
|
|
@@ -431,6 +432,179 @@ public void Start() | |
| Console.WriteLine("* Ready to accept connections"); | ||
| } | ||
|
|
||
/// <summary>
/// Performs graceful shutdown of the server.
/// Stops accepting new connections, waits for active connections to complete, commits AOF, and takes checkpoint if needed.
/// </summary>
/// <remarks>
/// This method does not throw: cancellation and unexpected errors are logged and swallowed so that
/// the caller's own shutdown path can continue. If <paramref name="token"/> is cancelled, any steps
/// that have not run yet (including data finalization) are skipped.
/// </remarks>
/// <param name="timeout">Timeout for waiting on active connections (default: 30 seconds)</param>
/// <param name="token">Cancellation token; cancelling it requests a forced (non-graceful) shutdown</param>
/// <returns>Task representing the async shutdown operation</returns>
public async Task ShutdownAsync(TimeSpan? timeout = null, CancellationToken token = default)
{
    var shutdownTimeout = timeout ?? TimeSpan.FromSeconds(30);

    try
    {
        // Stop accepting new connections first
        StopListening();

        // Wait for existing connections to complete
        await WaitForActiveConnectionsAsync(shutdownTimeout, token).ConfigureAwait(false);

        // Commit AOF and take checkpoint if needed
        await FinalizeDataAsync(token).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Force shutdown requested: leave a trace so operators can tell that
        // the remaining graceful-shutdown steps did not run.
        logger?.LogWarning("Graceful shutdown was cancelled before completing; remaining shutdown steps were skipped.");
    }
    catch (Exception ex)
    {
        logger?.LogError(ex, "Error during graceful shutdown");
    }
}
|
|
||
/// <summary>
/// Asks every configured server to stop accepting new connections.
/// A failure on one server is logged as a warning and does not prevent the remaining servers from being stopped.
/// </summary>
private void StopListening()
{
    if (servers is null)
        return;

    logger?.LogDebug("Stopping listeners to prevent new connections...");

    foreach (var listener in servers)
    {
        try
        {
            // Entries may be null; skip those silently
            listener?.StopListening();
        }
        catch (Exception stopError)
        {
            logger?.LogWarning(stopError, "Error stopping listener");
        }
    }
}
|
|
||
/// <summary>
/// Waits for active connections to complete within the specified timeout.
/// </summary>
/// <remarks>
/// Polls the active connection count with an increasing delay (50 ms, 300 ms, then every 1000 ms).
/// If reading the connection count fails, the error is logged and polling restarts after 500 ms,
/// still bounded by the timeout. Reaching the timeout is logged as a warning and is not an error.
/// </remarks>
/// <param name="timeout">Maximum time to wait for connections to drain</param>
/// <param name="token">External cancellation token</param>
/// <exception cref="OperationCanceledException">Thrown when <paramref name="token"/> is cancelled</exception>
private async Task WaitForActiveConnectionsAsync(TimeSpan timeout, CancellationToken token)
{
    if (servers == null) return;

    // Linked token: cancelled by either the external token or the timeout
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
    cts.CancelAfter(timeout);

    var delays = new[] { 50, 300, 1000 };
    var delayIndex = 0;

    try
    {
        while (true)
        {
            // Route cancellation seen between polls through the same handlers as
            // cancellation seen inside Task.Delay, so it is never silently dropped.
            cts.Token.ThrowIfCancellationRequested();

            int currentDelay;
            try
            {
                var activeConnections = GetActiveConnectionCount();
                if (activeConnections == 0)
                {
                    logger?.LogInformation("All connections have been closed gracefully.");
                    return;
                }

                logger?.LogInformation("Waiting for {ActiveConnections} active connections to complete...", activeConnections);

                currentDelay = delays[delayIndex];
                if (delayIndex < delays.Length - 1) delayIndex++;
            }
            catch (Exception ex)
            {
                // A failed poll must not end the wait: restart the back-off and retry
                logger?.LogWarning(ex, "Error checking active connections");
                delayIndex = 0;
                currentDelay = 500;
            }

            await Task.Delay(currentDelay, cts.Token).ConfigureAwait(false);
        }
    }
    catch (OperationCanceledException) when (token.IsCancellationRequested)
    {
        // External cancellation: let the caller decide how to proceed
        throw;
    }
    catch (OperationCanceledException)
    {
        // Timeout reached
        logger?.LogWarning("Timeout reached after {TimeoutSeconds} seconds. Some connections may still be active.",
            timeout.TotalSeconds);
    }
}
|
|
||
/// <summary>
/// Sums the active connection counts reported by the server instances.
/// Only servers deriving from GarnetServerBase contribute to the total.
/// </summary>
/// <returns>Total number of active connections, or 0 when no servers are configured</returns>
private long GetActiveConnectionCount()
{
    if (servers == null)
        return 0;

    long total = 0;
    foreach (var candidate in servers)
    {
        // Other server implementations (and null entries) are ignored
        if (candidate is not GarnetServerBase serverBase)
            continue;

        total += serverBase.get_conn_active();
    }

    return total;
}
|
|
||
/// <summary>
/// Commits AOF and takes checkpoint for data durability during shutdown.
/// </summary>
/// <remarks>
/// Each step runs only when the corresponding option (EnableAOF / EnableStorageTier) is set.
/// Failures in a step are logged and do not prevent the next step from running; cancellation
/// is propagated so that a cancelled shutdown does not go on to attempt further work.
/// </remarks>
/// <param name="token">Cancellation token passed to the commit and checkpoint operations</param>
/// <exception cref="OperationCanceledException">Thrown when <paramref name="token"/> is cancelled during a step</exception>
private async Task FinalizeDataAsync(CancellationToken token)
{
    // Commit AOF before checkpoint/shutdown
    if (opts.EnableAOF)
    {
        logger?.LogDebug("Committing AOF before shutdown...");
        try
        {
            var commitSuccess = await Store.CommitAOFAsync(token).ConfigureAwait(false);
            if (commitSuccess)
            {
                logger?.LogDebug("AOF committed successfully.");
            }
            else
            {
                logger?.LogInformation("AOF commit skipped (another commit in progress or replica mode).");
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation is a request to stop, not a commit failure
            throw;
        }
        catch (Exception ex)
        {
            logger?.LogError(ex, "Error committing AOF during shutdown");
        }
    }

    // Take checkpoint for tiered storage
    if (opts.EnableStorageTier)
    {
        logger?.LogDebug("Taking checkpoint for tiered storage...");
        try
        {
            var checkpointSuccess = Store.TakeCheckpoint(background: false, token);
            if (checkpointSuccess)
            {
                logger?.LogDebug("Checkpoint completed successfully.");
            }
            else
            {
                logger?.LogInformation("Checkpoint skipped (another checkpoint in progress or replica mode).");
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation is a request to stop, not a checkpoint failure
            throw;
        }
        catch (Exception ex)
        {
            logger?.LogError(ex, "Error taking checkpoint during shutdown");
        }
    }
}
|
|
||
| /// <summary> | ||
| /// Dispose store (including log and checkpoint directory) | ||
| /// </summary> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ public class GarnetServerTcp : GarnetServerBase, IServerHook | |
| readonly int networkConnectionLimit; | ||
| readonly string unixSocketPath; | ||
| readonly UnixFileMode unixSocketPermission; | ||
| volatile bool isListening; | ||
|
|
||
| /// <inheritdoc/> | ||
| public override IEnumerable<IMessageConsumer> ActiveConsumers() | ||
|
|
@@ -117,19 +118,43 @@ public override void Start() | |
| } | ||
|
|
||
| listenSocket.Listen(512); | ||
| isListening = true; | ||
| if (!listenSocket.AcceptAsync(acceptEventArg)) | ||
| AcceptEventArg_Completed(null, acceptEventArg); | ||
| } | ||
|
|
||
/// <inheritdoc />
public override void StopListening()
{
    // Only act while the listener is marked as active; otherwise this call is a no-op
    if (isListening)
    {
        // Clear the flag before closing so the accept loop stops re-arming itself
        isListening = false;

        try
        {
            // Closing the listen socket stops new connections from being accepted.
            // Per the original author's note, a pending AcceptAsync then completes with an error.
            listenSocket.Close();
            logger?.LogDebug("Stopped accepting new connections on {endpoint}", EndPoint);
        }
        catch (Exception closeError)
        {
            logger?.LogDebug(closeError, "Error closing listen socket on {endpoint}", EndPoint);
        }
    }
}
|
|
||
| private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e) | ||
| { | ||
| try | ||
| { | ||
| do | ||
| { | ||
| // Check isListening flag before processing and before calling AcceptAsync again | ||
|
||
| if (!isListening) break; | ||
|
|
||
| if (!HandleNewConnection(e)) break; | ||
| e.AcceptSocket = null; | ||
| } while (!listenSocket.AcceptAsync(e)); | ||
| } while (isListening && !listenSocket.AcceptAsync(e)); | ||
yuseok-kim-edushare marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| // socket disposed | ||
| catch (ObjectDisposedException) { } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can take noSave as an argument here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a `noSave` argument to `ShutdownAsync` in e9b0a3e,
but I don't yet have a good idea of how to call it with `noSave = true` from within Garnet.
I will think about that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#1004 requests the RESP SHUTDOWN command, among others.
My `ShutdownAsync` can serve as the baseline for implementing RESP SHUTDOWN SAVE | NOSAVE.