Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,22 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover
// Used to delete old checkpoints and cleanup and also cleanup during attachment to new primary
replicationManager.AddCheckpointEntry(entry, storeType, full);

// Truncate AOF
SafeTruncateAOF(CheckpointCoveredAofAddress);
}

/// <inheritdoc />
public void SafeTruncateAOF(long truncateUntil)
{
if (clusterManager.CurrentConfig.LocalNodeRole == NodeRole.PRIMARY)
_ = replicationManager.SafeTruncateAof(CheckpointCoveredAofAddress);
_ = replicationManager.SafeTruncateAof(truncateUntil);
else
{
if (serverOptions.FastAofTruncate)
storeWrapper.appendOnlyFile?.UnsafeShiftBeginAddress(CheckpointCoveredAofAddress, truncateLog: true);
storeWrapper.appendOnlyFile?.UnsafeShiftBeginAddress(truncateUntil, truncateLog: true);
else
{
storeWrapper.appendOnlyFile?.TruncateUntil(CheckpointCoveredAofAddress);
storeWrapper.appendOnlyFile?.TruncateUntil(truncateUntil);
if (!serverOptions.EnableFastCommit) storeWrapper.appendOnlyFile?.Commit();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async Task PrepareForSync()
else
{
// Reset replica database in preparation for full sync
Sessions[i].SetFlushTask(Sessions[i].ExecuteAsync(["FLUSHALL"]));
Sessions[i].SetFlushTask(Sessions[i].ExecuteAsync(["CLUSTER", "FLUSHALL"]));
}
}
catch (Exception ex)
Expand Down
1 change: 1 addition & 0 deletions libs/cluster/Session/ClusterCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ private void ProcessClusterCommands(RespCommand command, out bool invalidParamet
RespCommand.CLUSTER_FAILOVER => NetworkClusterFailover(out invalidParameters),
RespCommand.CLUSTER_FAILREPLICATIONOFFSET => NetworkClusterFailReplicationOffset(out invalidParameters),
RespCommand.CLUSTER_FAILSTOPWRITES => NetworkClusterFailStopWrites(out invalidParameters),
RespCommand.CLUSTER_FLUSHALL => NetworkClusterFlushAll(out invalidParameters),
RespCommand.CLUSTER_FORGET => NetworkClusterForget(out invalidParameters),
RespCommand.CLUSTER_GOSSIP => NetworkClusterGossip(out invalidParameters),
RespCommand.CLUSTER_GETKEYSINSLOT => NetworkClusterGetKeysInSlot(out invalidParameters),
Expand Down
23 changes: 23 additions & 0 deletions libs/cluster/Session/RespClusterReplicationCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,5 +478,28 @@ private bool NetworkClusterSync(out bool invalidParameters)

return true;
}

/// <summary>
/// Implements CLUSTER FLUSHALL
/// </summary>
/// <returns></returns>
private bool NetworkClusterFlushAll(out bool invalidParameters)
{
invalidParameters = false;

// Expecting exactly 0 arguments
if (parseState.Count != 0)
{
invalidParameters = true;
return true;
}

// Flush all keys
clusterProvider.storeWrapper.ExecuteFlushDb(RespCommand.FLUSHALL, false, 0);

while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
return true;
}
}
}
7 changes: 3 additions & 4 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,15 @@
{
"Command": "ACL_GETUSER",
"Name": "ACL|GETUSER",
"Summary": "Returns the rules defined for an ACL user.",
"Summary": "Lists the ACL rules of a user.",
"Group": "Server",
"Complexity": "O(1) amortized time considering the typical user.",
"Complexity": "O(N). Where N is the number of password, command and pattern rules that the user has.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "USERNAME",
"DisplayText": "username",
"Type": "String",
"ArgumentFlags": "Multiple"
"Type": "String"
}
]
},
Expand Down
26 changes: 15 additions & 11 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,6 @@
"response_policy:all_succeeded"
]
},
{
"Command": "ACL_GETUSER",
"Name": "ACL|GETUSER",
"Arity": 3,
"Flags": "Admin, Loading, NoScript, Stale",
"AclCategories": "Admin, Dangerous, Slow",
"Tips": [
"request_policy:all_nodes",
"response_policy:all_succeeded"
]
},
{
"Command": "ACL_LIST",
"Name": "ACL|LIST",
Expand Down Expand Up @@ -83,6 +72,13 @@
"Arity": 2,
"Flags": "Loading, NoScript, Stale",
"AclCategories": "Slow"
},
{
"Command": "ACL_GETUSER",
"Name": "ACL|GETUSER",
"Arity": 3,
"Flags": "Admin, Loading, NoScript, Stale",
"AclCategories": "Admin, Dangerous, Slow"
}
]
},
Expand Down Expand Up @@ -953,6 +949,14 @@
"Arity": 4,
"Flags": "Admin, NoMulti, NoScript",
"AclCategories": "Admin, Dangerous, Slow, Garnet"
},
{
"Command": "CLUSTER_FLUSHALL",
"Name": "CLUSTER|FLUSHALL",
"IsInternal": true,
"Arity": 2,
"Flags": "Admin, NoMulti, NoScript",
"AclCategories": "Admin, Dangerous, Slow, Garnet"
}
]
},
Expand Down
11 changes: 10 additions & 1 deletion libs/server/AOF/AofEntryType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public enum AofEntryType : byte
/// StoredProcedure
/// </summary>
StoredProcedure = 0x50,
/// <summary>
/// Flush all
/// </summary>
FlushAll = 0x60,
/// <summary>
/// Flush db
/// </summary>
FlushDb = 0x61,
}

internal enum AofStoreType : byte
Expand All @@ -85,6 +93,7 @@ internal enum AofStoreType : byte
ObjectStoreType = 0x1,
TxnType = 0x2,
ReplicationType = 0x3,
CheckpointType = 0x4
CheckpointType = 0x4,
FlushDbType = 0x5,
}
}
10 changes: 10 additions & 0 deletions libs/server/AOF/AofHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ struct AofHeader
/// </summary>
[FieldOffset(12)]
public int sessionID;
/// <summary>
/// Unsafe truncate log (used with FLUSH command)
/// </summary>
[FieldOffset(1)]
public byte unsafeTruncateLog;
/// <summary>
/// Database ID (used with FLUSH command)
/// </summary>
[FieldOffset(3)]
public byte databaseId;

public AofHeader()
{
Expand Down
8 changes: 8 additions & 0 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic
if (asReplica && header.storeVersion > storeWrapper.store.CurrentVersion)
storeWrapper.objectStore.SetVersion(header.storeVersion);
break;
case AofEntryType.FlushAll:
storeWrapper.ExecuteFlushDb(RespCommand.FLUSHALL, unsafeTruncateLog: header.unsafeTruncateLog == 1, databaseId: header.databaseId);
break;
case AofEntryType.FlushDb:
storeWrapper.ExecuteFlushDb(RespCommand.FLUSHDB, unsafeTruncateLog: header.unsafeTruncateLog == 1, databaseId: header.databaseId);
break;
case AofEntryType.ObjectStoreStreamingCheckpointEndCommit:
Debug.Assert(storeWrapper.serverOptions.ReplicaDisklessSync);
break;
Expand Down Expand Up @@ -411,6 +417,7 @@ bool SkipRecord(AofHeader header)
AofStoreType.TxnType => false,
AofStoreType.ReplicationType => false,
AofStoreType.CheckpointType => false,
AofStoreType.FlushDbType => false,
_ => throw new GarnetException($"Unknown AOF header store type {storeType}"),
};
}
Expand All @@ -424,6 +431,7 @@ static AofStoreType ToAofStoreType(AofEntryType type)
AofEntryType.TxnStart or AofEntryType.TxnCommit or AofEntryType.TxnAbort or AofEntryType.StoredProcedure => AofStoreType.TxnType,
AofEntryType.MainStoreCheckpointStartCommit or AofEntryType.ObjectStoreCheckpointStartCommit or AofEntryType.MainStoreStreamingCheckpointStartCommit or AofEntryType.ObjectStoreStreamingCheckpointStartCommit => AofStoreType.CheckpointType,
AofEntryType.MainStoreCheckpointEndCommit or AofEntryType.ObjectStoreCheckpointEndCommit or AofEntryType.MainStoreStreamingCheckpointEndCommit or AofEntryType.ObjectStoreStreamingCheckpointEndCommit => AofStoreType.CheckpointType,
AofEntryType.FlushAll or AofEntryType.FlushDb => AofStoreType.FlushDbType,
_ => throw new GarnetException($"Conversion to AofStoreType not possible for {type}"),
};
}
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Cluster/IClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ public interface IClusterProvider : IDisposable
/// </summary>
void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCoveredAofAddress, Guid storeCheckpointToken, Guid objectStoreCheckpointToken);

/// <summary>
/// Safe truncate AOF until address
/// </summary>
/// <param name="truncateUntil"></param>
void SafeTruncateAOF(long truncateUntil);

/// <summary>
/// Start cluster operations
/// </summary>
Expand Down
26 changes: 17 additions & 9 deletions libs/server/Resp/BasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,13 @@ private bool NetworkFLUSHDB()
return AbortWithWrongNumberOfArguments(nameof(RespCommand.FLUSHDB));
}

if (storeWrapper.serverOptions.EnableCluster && storeWrapper.clusterProvider.IsReplica() && !clusterSession.ReadWriteSession)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_FLUSHALL_READONLY_REPLICA, ref dcurr, dend))
SendAndReset();
return true;
}

FlushDb(RespCommand.FLUSHDB);

return true;
Expand All @@ -902,11 +909,18 @@ private bool NetworkFLUSHDB()
/// </summary>
private bool NetworkFLUSHALL()
{
if (parseState.Count > 2)
if (parseState.Count > 3)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.FLUSHALL));
}

if (storeWrapper.serverOptions.EnableCluster && storeWrapper.clusterProvider.IsReplica() && !clusterSession.ReadWriteSession)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_FLUSHALL_READONLY_REPLICA, ref dcurr, dend))
SendAndReset();
return true;
}

// Since Garnet currently only supports a single database,
// FLUSHALL and FLUSHDB share the same logic
FlushDb(RespCommand.FLUSHALL);
Expand Down Expand Up @@ -1663,21 +1677,15 @@ void FlushDb(RespCommand cmd)
}

if (async)
Task.Run(() => ExecuteFlushDb(unsafeTruncateLog)).ConfigureAwait(false);
Task.Run(() => storeWrapper.ExecuteFlushDb(cmd, unsafeTruncateLog, 0)).ConfigureAwait(false);
else
ExecuteFlushDb(unsafeTruncateLog);
storeWrapper.ExecuteFlushDb(cmd, unsafeTruncateLog, 0);

logger?.LogInformation($"Running {nameof(cmd)} {{async}} {{mode}}", async ? "async" : "sync", unsafeTruncateLog ? " with unsafetruncatelog." : string.Empty);
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}

void ExecuteFlushDb(bool unsafeTruncateLog)
{
storeWrapper.store.Log.ShiftBeginAddress(storeWrapper.store.Log.TailAddress, truncateLog: unsafeTruncateLog);
storeWrapper.objectStore?.Log.ShiftBeginAddress(storeWrapper.objectStore.Log.TailAddress, truncateLog: unsafeTruncateLog);
}

/// <summary>
/// Writes a string describing the given session into the string builder.
/// Does not append a new line.
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_INVALID_CLIENT_UNBLOCK_REASON => "ERR CLIENT UNBLOCK reason should be TIMEOUT or ERROR"u8;
public static ReadOnlySpan<byte> RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK => "UNBLOCKED client unblocked via CLIENT UNBLOCK"u8;
public static ReadOnlySpan<byte> RESP_ERR_INVALID_ETAG => "ETAG must be a numerical value greater than or equal to 0"u8;
public static ReadOnlySpan<byte> RESP_ERR_FLUSHALL_READONLY_REPLICA => "ERR You can't write against a read only replica."u8;
public static ReadOnlySpan<byte> RESP_ERR_DEUBG_DISALLOWED =>
@"ERR DEBUG command not allowed. If the EnableDebugCommand option is set to ""local"", you can run it from a local connection, otherwise you need to set this option in the configuration file, and then restart the server."u8;

Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ public enum RespCommand : ushort
CLUSTER_FAILOVER,
CLUSTER_FAILREPLICATIONOFFSET,
CLUSTER_FAILSTOPWRITES,
CLUSTER_FLUSHALL,
CLUSTER_FORGET,
CLUSTER_GETKEYSINSLOT,
CLUSTER_GOSSIP,
Expand Down Expand Up @@ -2106,6 +2107,10 @@ private RespCommand SlowParseCommand(ref int count, ref ReadOnlySpan<byte> speci
{
return RespCommand.CLUSTER_FAILSTOPWRITES;
}
else if (subCommand.SequenceEqual(CmdStrings.FLUSHALL))
{
return RespCommand.CLUSTER_FLUSHALL;
}
else if (subCommand.SequenceEqual(CmdStrings.SETCONFIGEPOCH))
{
return RespCommand.CLUSTER_SETCONFIGEPOCH;
Expand Down
25 changes: 25 additions & 0 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -938,5 +938,30 @@ public bool HasKeysInSlots(List<int> slots)

return false;
}

public void ExecuteFlushDb(RespCommand cmd, bool unsafeTruncateLog, byte databaseId)
{
store.Log.ShiftBeginAddress(store.Log.TailAddress, truncateLog: unsafeTruncateLog);
objectStore?.Log.ShiftBeginAddress(objectStore.Log.TailAddress, truncateLog: unsafeTruncateLog);

if (serverOptions.EnableCluster && serverOptions.EnableAOF)
{
clusterProvider.SafeTruncateAOF(appendOnlyFile.TailAddress);
if (clusterProvider.IsPrimary())
{
AofHeader header = new()
{
opType = cmd == RespCommand.FLUSHDB ? AofEntryType.FlushDb : AofEntryType.FlushAll,
storeVersion = 0,
sessionID = -1,
unsafeTruncateLog = unsafeTruncateLog ? (byte)0 : (byte)1,
databaseId = databaseId
};
appendOnlyFile?.Enqueue(header, out _);
}
}
else
appendOnlyFile?.TruncateUntil(appendOnlyFile.TailAddress);
}
}
}
7 changes: 7 additions & 0 deletions playground/CommandInfoUpdater/GarnetCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,13 @@
"Summary": "Processes a forwarded published message from a node in the same shard",
"Group": "Cluster",
"Complexity": "O(1)"
},
{
"Command": "CLUSTER_FLUSHALL",
"Name": "CLUSTER|FLUSHALL",
"Summary": "Sent by primary to replica to force to FLUSH its database",
"Group": "Cluster",
"Complexity": "O(1)"
}
]
}
Expand Down
13 changes: 13 additions & 0 deletions playground/CommandInfoUpdater/GarnetCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@
"KeySpecifications": null,
"SubCommands": null
},
{
"Command": "CLUSTER_FLUSHALL",
"Name": "CLUSTER|FLUSHALL",
"IsInternal": true,
"Arity": 2,
"Flags": "Admin, NoScript, NoMulti",
"FirstKey": 0,
"LastKey": 0,
"Step": 0,
"AclCategories": "Admin, Dangerous, Slow, Garnet",
"KeySpecifications": null,
"SubCommands": null
},
{
"Command": "CLUSTER_FAILREPLICATIONOFFSET",
"Name": "CLUSTER|FAILREPLICATIONOFFSET",
Expand Down
2 changes: 2 additions & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class SupportedCommand
[
new("ACL|CAT", RespCommand.ACL_CAT),
new("ACL|DELUSER", RespCommand.ACL_DELUSER),
new("ACL|GETUSER", RespCommand.ACL_GETUSER),
new("ACL|LIST", RespCommand.ACL_LIST),
new("ACL|LOAD", RespCommand.ACL_LOAD),
new("ACL|SAVE", RespCommand.ACL_SAVE),
Expand Down Expand Up @@ -71,6 +72,7 @@ public class SupportedCommand
new("CLUSTER|FAILOVER", RespCommand.CLUSTER_FAILOVER),
new("CLUSTER|FAILREPLICATIONOFFSET", RespCommand.CLUSTER_FAILREPLICATIONOFFSET),
new("CLUSTER|FAILSTOPWRITES", RespCommand.CLUSTER_FAILSTOPWRITES),
new("CLUSTER|FLUSHALL", RespCommand.CLUSTER_FLUSHALL),
new("CLUSTER|FORGET", RespCommand.CLUSTER_FORGET),
new("CLUSTER|GETKEYSINSLOT", RespCommand.CLUSTER_GETKEYSINSLOT),
new("CLUSTER|GOSSIP", RespCommand.CLUSTER_GOSSIP),
Expand Down
Loading