Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public override void Start()
{
}

/// <inheritdoc />
public override void Close()
{
}

public bool TryCreateMessageConsumer(Span<byte> bytes, INetworkSender networkSender, out IMessageConsumer session)
{
session = null;
Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "10.0.103",
"version": "10.0.201",
"rollForward": "latestMajor",
"allowPrerelease": false
}
Expand Down
7 changes: 7 additions & 0 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,14 @@ public void Dispose(bool deleteDir = true)

private void InternalDispose()
{
// Phase 1: Stop listening on all servers to free ports immediately.
for (var i = 0; i < servers.Length; i++)
servers[i]?.Close();

// Phase 2: Dispose the provider (storage engine shutdown — may take time).
Provider?.Dispose();

// Phase 3: Drain active handlers and clean up remaining resources.
for (var i = 0; i < servers.Length; i++)
servers[i]?.Dispose();
subscribeBroker?.Dispose();
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Lua/LuaTimeoutManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ internal Registration RegisterForTimeout(SessionScriptCache cache)
goto tryAgain;
}

// Other threads might update registrations, so check that before returning
// Other threads might update registrations, so check that before returning
checkUnmodified:
if ((updatedRegistrations = Interlocked.CompareExchange(ref registrations, curRegistrations, curRegistrations)) != curRegistrations)
{
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Metrics/Info/GarnetInfoMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ class GarnetInfoMetrics
_ => true
})];

/// <summary>
/// All info sections excluding module-generated ones.
/// </summary>
public static readonly HashSet<InfoMetricsType> AllInfoSet = [.. DefaultInfo.Where(e => e != InfoMetricsType.MODULES)];

MetricsItem[] serverInfo = null;
MetricsItem[] memoryInfo = null;
MetricsItem[] clusterInfo = null;
Expand Down
23 changes: 13 additions & 10 deletions libs/server/Metrics/Info/InfoCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ private bool NetworkINFO()
reset = true;
else if (sbSection.EqualsUpperCaseSpanIgnoringCase("HELP"u8))
help = true;
else if (!sbSection.EqualsUpperCaseSpanIgnoringCase("ALL"u8))
else if (sbSection.EqualsUpperCaseSpanIgnoringCase("ALL"u8))
sections.UnionWith(GarnetInfoMetrics.AllInfoSet);
else if (sbSection.EqualsUpperCaseSpanIgnoringCase("DEFAULT"u8))
sections.UnionWith(GarnetInfoMetrics.DefaultInfo);
else if (sbSection.EqualsUpperCaseSpanIgnoringCase("EVERYTHING"u8))
sections.UnionWith(GarnetInfoMetrics.DefaultInfo);
else if (parseState.TryGetInfoMetricsType(i, out var sectionType))
{
if (parseState.TryGetInfoMetricsType(i, out var sectionType))
{
sections.Add(sectionType);
}
else
{
invalid = true;
invalidSection = parseState.GetString(i);
}
sections.Add(sectionType);
}
else
{
invalid = true;
invalidSection = parseState.GetString(i);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion libs/server/Metrics/Info/InfoHelp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class InfoHelp
{
internal const string HELP = "HELP";
internal const string ALL = "ALL";
internal const string DEFAULT = "DEFAULT";
internal const string EVERYTHING = "EVERYTHING";
internal const string RESET = "RESET";

public static List<string> GetInfoTypeHelpMessage()
Expand All @@ -33,7 +35,9 @@ public static List<string> GetInfoTypeHelpMessage()
$"{nameof(InfoMetricsType.KEYSPACE)}: Database related statistics.",
$"{nameof(InfoMetricsType.MODULES)}: Information related to loaded modules.",
$"{nameof(InfoMetricsType.HLOGSCAN)}: Distribution of records in main store's hybrid log in-memory portion.",
$"{nameof(ALL)}: Return all informational sections.",
$"{nameof(ALL)}: Return all informational sections (excluding module generated ones).",
$"{nameof(DEFAULT)}: Return the default set of informational sections.",
$"{nameof(EVERYTHING)}: Return all informational sections including module generated ones.",
$"{nameof(HELP)}: Print this help message.",
$"{nameof(RESET)}: Reset stats.",
"\r\n",
Expand Down
3 changes: 3 additions & 0 deletions libs/server/Servers/GarnetServerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ public bool AddSession(WireFormat protocol, ref ISessionProvider provider, INetw
/// <inheritdoc />
public abstract void Start();

/// <inheritdoc />
public abstract void Close();

/// <inheritdoc />
public virtual void Dispose()
{
Expand Down
33 changes: 28 additions & 5 deletions libs/server/Servers/GarnetServerTcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,47 @@ public GarnetServerTcp(
this.unixSocketPath = unixSocketPath;
this.unixSocketPermission = unixSocketPermission;

listenSocket = endpoint switch
if (endpoint is UnixDomainSocketEndPoint unix)
{
UnixDomainSocketEndPoint unix => new Socket(unix.AddressFamily, SocketType.Stream, ProtocolType.Unspecified),
// UDS Initialization & Cleanup
listenSocket = new Socket(unix.AddressFamily, SocketType.Stream, ProtocolType.Unspecified);
var socketPath = unix.ToString();
if (File.Exists(socketPath))
{
File.Delete(socketPath);
}
}
else
{
// TCP Initialization & Port Reuse
listenSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

_ => new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
};
// Set reuse BEFORE Bind to handle TIME_WAIT states
listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
}

acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += AcceptEventArg_Completed;
}

/// <summary>
/// Stop listening for new connections. Frees the listening port
/// without waiting for active connections to drain.
/// </summary>
public override void Close()
{
listenSocket.Close();
}

/// <summary>
/// Dispose
/// </summary>
public override void Dispose()
{
base.Dispose();
// Close listening socket to free the port and stop accepting new connections.
// This also prevents new connections from arriving while DisposeActiveHandlers drains existing ones.
listenSocket.Dispose();
base.Dispose();
acceptEventArg.UserToken = null;
acceptEventArg.Dispose();
networkPool?.Dispose();
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Servers/IGarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,11 @@ public interface IGarnetServer : IDisposable
/// Start server
/// </summary>
public void Start();

/// <summary>
/// Stop listening for new connections. Frees the listening port
/// without waiting for active connections to drain.
/// </summary>
public void Close();
}
}
7 changes: 4 additions & 3 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,11 @@ private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator

// Drain any pending pushes. We have ended the iteration; we know there are no more matching records, so drop through to end it and return false.
if (numPending > 0)
bContext.CompletePending(wait: true);
_ = bContext.CompletePending(wait: true);

IterationComplete:
if (resetCursor) cursor = 0;
IterationComplete:
if (resetCursor)
cursor = 0;
scanFunctions.OnStop(false, scanCursorState.acceptedCount);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ internal OperationStatus ContinuePendingRMW<TInput, TOutput, TContext, TSessionF
TransientXUnlock<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref key, ref stackCtx);
}

// Must do this *after* Unlocking.
// Must do this *after* Unlocking.
CheckRetry:
if (!HandleImmediateRetryStatus(status, sessionFunctions, ref pendingContext))
return status;
Expand Down
12 changes: 6 additions & 6 deletions playground/Bitmap/BitOp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private static void __bitop_simdX128_and(byte* dstBitmap, long dstLen, byte* src
#endregion

#region fillDstTail
fillTail:
fillTail:
if (dstLen > srcLen)
{
batchSize = 8 * 4;
Expand Down Expand Up @@ -530,7 +530,7 @@ private static void __bitop_simdX128_and_long(byte* dstBitmap, long dstLen, byte
#endregion

#region fillDstTail
fillTail:
fillTail:
if (dstLen > srcLen)
{
batchSize = 8 * 4;
Expand Down Expand Up @@ -729,7 +729,7 @@ private static void __bitop_simdX256_and(byte* dstBitmap, long dstLen, byte* src
#endregion

#region fillDstTail
fillTail:
fillTail:
if (dstLen > srcLen)
{
batchSize = 8 * 4;
Expand Down Expand Up @@ -828,7 +828,7 @@ private static void __bitop_multikey_scalar_and(byte* dstPtr, int dstLen, byte**
*(long*)dstCurr = d00;
dstCurr += batchSize;
}
#endregion
#endregion

fillTail:
#region scalar_1x1
Expand Down Expand Up @@ -1046,7 +1046,7 @@ private static void __bitop_multikey_simdX128_and(byte* dstPtr, int dstLen, byte
*(long*)dstCurr = d00;
dstCurr += batchSize;
}
#endregion
#endregion

fillTail:
#region scalar_1x1
Expand Down Expand Up @@ -1266,7 +1266,7 @@ private static void __bitop_multikey_simdX256_and(byte* dstPtr, int dstLen, byte
*(long*)dstCurr = d00;
dstCurr += batchSize;
}
#endregion
#endregion

fillTail:
#region scalar_1x1
Expand Down
Loading
Loading