Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
e1db784
Simpler state machine for checkpointing
badrishc Feb 24, 2025
49b9c16
cleanup
badrishc Feb 24, 2025
d111602
updates
badrishc Feb 25, 2025
9f838f7
remove dead code
badrishc Feb 25, 2025
c907987
updates
badrishc Feb 26, 2025
e715503
updates
badrishc Feb 26, 2025
7529ade
updates
badrishc Feb 26, 2025
6da12ce
update
badrishc Feb 27, 2025
e24720c
kill code
badrishc Feb 27, 2025
1a8ba12
updates
badrishc Feb 27, 2025
a1b87a9
simplify LightEpoch
badrishc Feb 27, 2025
69c07e9
move epvs to test
badrishc Feb 27, 2025
6ec5233
nits
badrishc Feb 27, 2025
9599202
updates
badrishc Feb 27, 2025
a24e7e6
updates
badrishc Feb 28, 2025
223abf6
formatting
badrishc Feb 28, 2025
4d17abb
fix garnet
badrishc Feb 28, 2025
cc4e677
Merge remote-tracking branch 'origin/main' into badrishc/state-machin…
badrishc Feb 28, 2025
2b830b1
nit
badrishc Feb 28, 2025
d92360a
comments
badrishc Feb 28, 2025
8145607
remove manualLockingActive
badrishc Feb 28, 2025
4ba164c
update the barrier condition and remove checkpoint version switch bar…
badrishc Feb 28, 2025
aa36bf4
remove INTERMEDIATE state
badrishc Mar 1, 2025
6119e97
Merge remote-tracking branch 'origin/main' into badrishc/state-machin…
badrishc Mar 1, 2025
528c227
Remove CPR_SHIFT_DETECTED and LatchDestination.Retry
badrishc Mar 1, 2025
2067cb9
add black box test for checkpointing version switch state machine
badrishc Mar 3, 2025
174ae9a
add transaction test
badrishc Mar 3, 2025
8f72d00
clean the test
badrishc Mar 3, 2025
d304ecd
cleanup
badrishc Mar 3, 2025
36a5e18
Refactor the phases of various machines
badrishc Mar 4, 2025
0c68988
format
badrishc Mar 4, 2025
b4763c9
Merge remote-tracking branch 'origin/main' into badrishc/state-machin…
badrishc Mar 4, 2025
efd8bd2
initial commit
badrishc Mar 4, 2025
511fe75
remove sessionName
badrishc Mar 6, 2025
8ed6101
update LightEpoch based on PR comment
badrishc Mar 6, 2025
17f04ac
fix break
badrishc Mar 6, 2025
6dfc383
Use session-local isAcquiredLockable as signal for threads to decide …
badrishc Mar 6, 2025
f1ffaec
address review comments
badrishc Mar 6, 2025
3e3a377
nit
badrishc Mar 6, 2025
7c40678
Merge from base
badrishc Mar 6, 2025
cdd91f9
Merge remote-tracking branch 'origin/main' into badrishc/state-machin…
badrishc Mar 7, 2025
aa3113c
Merge branch 'badrishc/state-machine-v2' into badrishc/two-store-chec…
badrishc Mar 7, 2025
1c49869
minor code move
badrishc Mar 7, 2025
443b8ab
use shared epoch across stores
badrishc Mar 7, 2025
3071098
nit
badrishc Mar 7, 2025
aa4b628
Merge from main
badrishc Mar 7, 2025
f8a1899
add unified checkpointing logic to garnet
badrishc Mar 8, 2025
9930388
Merge remote-tracking branch 'origin/main' into badrishc/two-store-ch…
badrishc Mar 8, 2025
b6df00c
nit
badrishc Mar 8, 2025
5b4c646
fix
badrishc Mar 8, 2025
3e8c2c7
use correct SMD
badrishc Mar 8, 2025
576f92d
nit
badrishc Mar 8, 2025
76430c1
fix
badrishc Mar 8, 2025
7b7bec5
nit
badrishc Mar 8, 2025
bbad4f0
fix test as versions are different due to unified ckpt
badrishc Mar 8, 2025
af870fb
add comment
badrishc Mar 8, 2025
2bccee7
remove targetVersion from checkpoint API, versions always progress by 1.
badrishc Mar 9, 2025
cde0a07
non-working state
badrishc Mar 10, 2025
4f6f529
updates
badrishc Mar 10, 2025
135adde
nits
badrishc Mar 10, 2025
e563edf
fix state machine
badrishc Mar 10, 2025
eb3ffe3
fixes
badrishc Mar 10, 2025
1d573ce
fixes
badrishc Mar 10, 2025
7cacb54
fix
badrishc Mar 10, 2025
d105638
format
badrishc Mar 10, 2025
7dfadda
Merge remote-tracking branch 'origin/main' into badrishc/two-store-ch…
badrishc Mar 10, 2025
5afca31
add assert for safe index growth with locks
badrishc Mar 10, 2025
319fa96
Reinstating cpr_shift_detected instead of barrier'ing threads on PREP…
badrishc Mar 11, 2025
dd16582
fixes based on fuzz test
badrishc Mar 11, 2025
f001933
Merge remote-tracking branch 'origin/main' into badrishc/two-store-ch…
badrishc Mar 11, 2025
a9ae134
improve SMD test to use timing fuzzing
badrishc Mar 11, 2025
f40086d
add test for index grow, fix bugs
badrishc Mar 11, 2025
de1b28e
updates and clean up, improve the test to be multi-iteration.
badrishc Mar 11, 2025
2498616
fix test
badrishc Mar 11, 2025
53df55e
remove isAsync, clarify comment
badrishc Mar 12, 2025
a4934ad
sigh, fix format.
badrishc Mar 12, 2025
037daf7
nit
badrishc Mar 12, 2025
b526e53
Update cluster logic to handle { checkpoint_start, (v) and (v+1) reco…
badrishc Mar 12, 2025
de60cb2
update comment
badrishc Mar 12, 2025
da4a451
Merge with latest main
badrishc Mar 12, 2025
3941303
address comment
badrishc Mar 13, 2025
e9136c7
nit
badrishc Mar 13, 2025
5b65d5c
add comment
badrishc Mar 13, 2025
0b1f42c
fix comment
badrishc Mar 13, 2025
ece2ad1
Merge remote-tracking branch 'origin/main' into badrishc/two-store-ch…
badrishc Mar 13, 2025
3634644
delete checkpoints only after both stores commit
badrishc Mar 14, 2025
239a183
recover to least common version.
badrishc Mar 14, 2025
594a743
handle the case where object store was disabled at checkpointing time
badrishc Mar 14, 2025
7048016
Merge remote-tracking branch 'origin/main' into badrishc/two-store-ch…
badrishc Mar 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,7 @@ public ClusterConfig BumpLocalNodeConfigEpoch()
/// Check if sender has same local worker epoch as the receiver node and resolve collision.
/// </summary>
/// <param name="senderConfig">Incoming configuration object.</param>
/// <param name="logger">Optional logger instance; defaults to null.</param>
/// <returns>ClusterConfig object with updates.</returns>
public ClusterConfig HandleConfigEpochCollision(ClusterConfig senderConfig, ILogger logger = null)
{
Expand Down
8 changes: 7 additions & 1 deletion libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ public void OnCheckpointInitiated(out long CheckpointCoveredAofAddress)
{
Debug.Assert(serverOptions.EnableCluster);
if (serverOptions.EnableAOF && clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA)
CheckpointCoveredAofAddress = replicationManager.ReplicationOffset;
{
// When the replica takes a checkpoint on encountering the checkpoint end marker, it needs to truncate the AOF only
// until the checkpoint start marker. Otherwise, we will be left with an AOF that starts at the checkpoint end marker.
// ReplicationCheckpointStartOffset is set by { ReplicaReplayTask.Consume -> AofProcessor.ProcessAofRecordInternal } when
// it encounters the checkpoint start marker.
CheckpointCoveredAofAddress = replicationManager.ReplicationCheckpointStartOffset;
}
else
CheckpointCoveredAofAddress = storeWrapper.appendOnlyFile.TailAddress;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,6 @@ public void OnStop(bool completed, long numberOfRecords, bool isMainStore, long
// Wait for flush and response to complete
replicationSyncManager.WaitForFlush().GetAwaiter().GetResult();

// Enqueue commit end marker
var entryType = isMainStore ? AofEntryType.MainStoreStreamingCheckpointEndCommit : AofEntryType.ObjectStoreStreamingCheckpointEndCommit;
replicationSyncManager.ClusterProvider.storeWrapper.EnqueueCommit(entryType, targetVersion);

logger?.LogTrace("{OnStop} {store} {numberOfRecords} {targetVersion}",
nameof(OnStop), isMainStore ? "MAIN STORE" : "OBJECT STORE", numberOfRecords, targetVersion);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress,
var payloadLength = storeWrapper.appendOnlyFile.UnsafeGetLength(ptr);
if (payloadLength > 0)
{
aofProcessor.ProcessAofRecordInternal(ptr + entryLength, payloadLength, true);
aofProcessor.ProcessAofRecordInternal(ptr + entryLength, payloadLength, true, out var isCheckpointStart);
// Encountered checkpoint start marker, log the ReplicationCheckpointStartOffset so we know the correct AOF truncation
// point when we take a checkpoint at the checkpoint end marker
if (isCheckpointStart)
ReplicationCheckpointStartOffset = ReplicationOffset;
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
}
else if (payloadLength < 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ internal sealed class ReplicationLogCheckpointManager(
public string RecoveredReplicationId = string.Empty;

readonly bool isMainStore = isMainStore;
public Action<bool, long, long> checkpointVersionShift;
public Action<bool, long, long, bool> checkpointVersionShiftStart;
public Action<bool, long, long, bool> checkpointVersionShiftEnd;

readonly bool safelyRemoveOutdated = removeOutdated;

public override void CheckpointVersionShift(long oldVersion, long newVersion)
{
checkpointVersionShift?.Invoke(isMainStore, oldVersion, newVersion);
}
/// <summary>
/// Forwards the start of a checkpoint version shift to the registered
/// <c>checkpointVersionShiftStart</c> callback, tagging the call with this manager's
/// <c>isMainStore</c> flag. Does nothing when no callback has been registered.
/// </summary>
/// <param name="oldVersion">Version passed through to the callback as the old version.</param>
/// <param name="newVersion">Version passed through to the callback as the new version.</param>
/// <param name="isStreaming">Streaming flag passed through to the callback unchanged.</param>
public override void CheckpointVersionShiftStart(long oldVersion, long newVersion, bool isStreaming)
{
    // Read the delegate once so the null check and the call see the same instance
    var onShiftStart = checkpointVersionShiftStart;
    if (onShiftStart != null)
        onShiftStart(isMainStore, oldVersion, newVersion, isStreaming);
}

/// <summary>
/// Forwards the end of a checkpoint version shift to the registered
/// <c>checkpointVersionShiftEnd</c> callback, tagging the call with this manager's
/// <c>isMainStore</c> flag. Does nothing when no callback has been registered.
/// </summary>
/// <param name="oldVersion">Version passed through to the callback as the old version.</param>
/// <param name="newVersion">Version passed through to the callback as the new version.</param>
/// <param name="isStreaming">Streaming flag passed through to the callback unchanged.</param>
public override void CheckpointVersionShiftEnd(long oldVersion, long newVersion, bool isStreaming)
{
    // Read the delegate once so the null check and the call see the same instance
    var onShiftEnd = checkpointVersionShiftEnd;
    if (onShiftEnd != null)
        onShiftEnd(isMainStore, oldVersion, newVersion, isStreaming);
}

/// <summary>
/// Asks the device factory to delete the log checkpoint base that the checkpoint naming
/// scheme derives from <paramref name="logToken"/>.
/// </summary>
/// <param name="logToken">Token identifying the log checkpoint to delete.</param>
public void DeleteLogCheckpoint(Guid logToken)
{
    var logCheckpointBase = checkpointNamingScheme.LogCheckpointBase(logToken);
    deviceFactory.Delete(logCheckpointBase);
}
Expand Down
54 changes: 47 additions & 7 deletions libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public long ReplicationOffset
set { replicationOffset = value; }
}

/// <summary>
/// Replication offset corresponding to the checkpoint start marker. We will truncate only to this point after taking a checkpoint (the checkpoint
/// is taken only when we encounter a checkpoint end marker).
/// </summary>
public long ReplicationCheckpointStartOffset;

/// <summary>
/// Replication offset until which AOF address is valid for old primary if failover has occurred
/// </summary>
Expand Down Expand Up @@ -112,9 +118,13 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
ReplicationOffset = 0;

// Register the checkpoint version shift callbacks on the checkpoint managers of all stores
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).checkpointVersionShift = CheckpointVersionShift;
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).checkpointVersionShiftStart = CheckpointVersionShiftStart;
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).checkpointVersionShiftEnd = CheckpointVersionShiftEnd;
if (storeWrapper.objectStore != null)
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShift = CheckpointVersionShift;
{
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShiftStart = CheckpointVersionShiftStart;
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShiftEnd = CheckpointVersionShiftEnd;
}

// If this node starts as replica, it cannot serve requests until it is connected to primary
if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA && clusterProvider.serverOptions.Recover && !StartRecovery(RecoveryStatus.InitializeRecover))
Expand Down Expand Up @@ -157,14 +167,44 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null

public string GetBufferPoolStats() => networkPool.GetStats();

void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion)
/// <summary>
/// Callback for the start of a checkpoint version shift: enqueues the matching
/// checkpoint start marker via <c>storeWrapper.EnqueueCommit</c>.
/// </summary>
/// <param name="isMainStore">True when reported for the main store, false for the object store.</param>
/// <param name="oldVersion">Version before the shift (not used by this method).</param>
/// <param name="newVersion">Version after the shift; recorded with the enqueued marker.</param>
/// <param name="isStreaming">True for a streaming checkpoint, which uses per-store markers.</param>
void CheckpointVersionShiftStart(bool isMainStore, long oldVersion, long newVersion, bool isStreaming)
{
    // Nothing is enqueued when the local node is a replica
    var localRole = clusterProvider.clusterManager.CurrentConfig.LocalNodeRole;
    if (localRole == NodeRole.REPLICA)
        return;

    if (!isStreaming)
    {
        // We enqueue a single checkpoint start marker, since we have unified checkpointing
        if (isMainStore)
            storeWrapper.EnqueueCommit(AofEntryType.CheckpointStartCommit, newVersion);
        return;
    }

    // Streaming checkpoints use a separate start marker per store
    var startMarker = isMainStore
        ? AofEntryType.MainStoreStreamingCheckpointStartCommit
        : AofEntryType.ObjectStoreStreamingCheckpointStartCommit;
    storeWrapper.EnqueueCommit(startMarker, newVersion);
}

/// <summary>
/// Callback for the end of a checkpoint version shift: enqueues the matching
/// checkpoint end marker via <c>storeWrapper.EnqueueCommit</c>.
/// Only an end marker is enqueued here; start markers are enqueued by
/// <c>CheckpointVersionShiftStart</c>.
/// </summary>
/// <param name="isMainStore">True when reported for the main store, false for the object store.</param>
/// <param name="oldVersion">Version before the shift (not used by this method).</param>
/// <param name="newVersion">Version after the shift; recorded with the enqueued marker.</param>
/// <param name="isStreaming">True for a streaming checkpoint, which uses per-store markers.</param>
void CheckpointVersionShiftEnd(bool isMainStore, long oldVersion, long newVersion, bool isStreaming)
{
    // Nothing is enqueued when the local node is a replica
    if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA)
        return;

    if (isStreaming)
    {
        // Streaming checkpoints use a separate end marker per store
        if (isMainStore)
            storeWrapper.EnqueueCommit(AofEntryType.MainStoreStreamingCheckpointEndCommit, newVersion);
        else
            storeWrapper.EnqueueCommit(AofEntryType.ObjectStoreStreamingCheckpointEndCommit, newVersion);
    }
    else
    {
        // We enqueue a single checkpoint end marker, since we have unified checkpointing
        if (isMainStore)
            storeWrapper.EnqueueCommit(AofEntryType.CheckpointEndCommit, newVersion);
    }
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ private void InitializeServer()
if (!setMax && !ThreadPool.SetMaxThreads(maxThreads, maxCPThreads))
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {maxThreads}, {maxCPThreads}");

opts.Initialize(loggerFactory);
CreateMainStore(clusterFactory, out var checkpointDir);
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);

Expand Down Expand Up @@ -324,7 +325,7 @@ private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandMana
objectStoreSizeTracker = null;
if (!opts.DisableObjects)
{
objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
objKvSettings = opts.GetObjectStoreSettings(loggerFactory,
out var objHeapMemorySize, out var objReadCacheHeapMemorySize);

// Run checkpoint on its own thread to control p99
Expand Down
36 changes: 20 additions & 16 deletions libs/server/AOF/AofEntryType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,34 @@ public enum AofEntryType : byte
/// </summary>
TxnAbort = 0x22,
/// <summary>
/// Checkpoint for main store start
/// Checkpoint start marker for unified checkpoint
/// </summary>
MainStoreCheckpointStartCommit = 0x30,
CheckpointStartCommit = 0x30,
/// <summary>
/// Checkpoint for object store start
/// Checkpoint end marker for unified checkpoint
/// </summary>
ObjectStoreCheckpointStartCommit = 0x31,
/// <summary>
/// Checkpoint for main store end
/// </summary>
MainStoreCheckpointEndCommit = 0x32,
CheckpointEndCommit = 0x32,
/// <summary>
/// Checkpoint for object store end
/// </summary>
ObjectStoreCheckpointEndCommit = 0x33,
/// <summary>
/// Streaming checkpoint for main store start
/// Streaming checkpoint start marker for main store
/// </summary>
MainStoreStreamingCheckpointStartCommit = 0x40,
/// <summary>
/// Streaming checkpoint for object store start
/// Streaming checkpoint start marker for object store
/// </summary>
ObjectStoreStreamingCheckpointStartCommit = 0x41,
/// <summary>
/// Streaming checkpoint for main store end
/// Streaming checkpoint end marker for main store
/// </summary>
MainStoreStreamingCheckpointEndCommit = 0x42,
/// <summary>
/// Streaming checkpoint for object store end
/// Streaming checkpoint end marker for object store
/// </summary>
ObjectStoreStreamingCheckpointEndCommit = 0x43,
/// <summary>
/// StoredProcedure
/// </summary>
StoredProcedure = 0x50,

/// <summary>
/// Flush all
/// </summary>
Expand All @@ -85,6 +78,17 @@ public enum AofEntryType : byte
/// Flush db
/// </summary>
FlushDb = 0x61,

#region Deprecated markers
/// <summary>
/// Deprecated with unified checkpointing: Checkpoint for object store start
/// </summary>
ObjectStoreCheckpointStartCommit = 0x31,
/// <summary>
/// Deprecated with unified checkpointing: Checkpoint for object store end
/// </summary>
ObjectStoreCheckpointEndCommit = 0x33,
#endregion
}

internal enum AofStoreType : byte
Expand Down
2 changes: 1 addition & 1 deletion libs/server/AOF/AofHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct AofHeader
// * Layout, size, contents of this struct
// * Any of the AofEntryType or AofStoreType enums' existing value mappings
// * SpanByte format or header
const byte AofHeaderVersion = 1;
const byte AofHeaderVersion = 2;

/// <summary>
/// Version of AOF
Expand Down
Loading