Skip to content

Commit 4d3316a

Browse files
authored
Diskless Sync Updates (#1329)
* Add a SemaphoreSlim to WaitForSyncCompletion in diskless replication, replacing the busy-wait loop
* Use the TimeSpan type for ReplicaSyncTimeout and ReplicaAttachTimeout
* Make slot migration (MigrateSlotsDriverInline) async
* Propagate the cancellation token into the migration scan functions
* Add a more descriptive message to the unit test assertion
* Add more logging and fix a bug in the object store SingleReader (the TryHashAndStore check was inverted)
1 parent abc1710 commit 4d3316a

File tree

15 files changed

+76
-60
lines changed

15 files changed

+76
-60
lines changed

libs/cluster/Server/Migration/MigrateOperation.cs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,8 @@ internal sealed partial class MigrateOperation
2424

2525
public GarnetClientSession Client => gcs;
2626

27+
public void ThrowIfCancelled() => session._cts.Token.ThrowIfCancellationRequested();
28+
2729
public bool Contains(int slot) => session._sslots.Contains(slot);
2830

2931
public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchSize = 1 << 18)

libs/cluster/Server/Migration/MigrateScanFunctions.cs

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -13,9 +13,9 @@ internal sealed unsafe partial class MigrateSession
1313
#region mainStoreScan
1414
internal sealed unsafe class MainStoreScan : IScanIteratorFunctions<SpanByte, SpanByte>
1515
{
16-
readonly MigrateSession.MigrateOperation mss;
16+
readonly MigrateOperation mss;
1717

18-
internal MainStoreScan(MigrateSession.MigrateOperation mss)
18+
internal MainStoreScan(MigrateOperation mss)
1919
{
2020
this.mss = mss;
2121
}
@@ -30,12 +30,14 @@ public unsafe bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMeta
3030
{
3131
cursorRecordResult = CursorRecordResult.Accept; // default; not used here
3232

33+
mss.ThrowIfCancelled();
34+
3335
// Do not send key if it is expired
3436
if (ClusterSession.Expired(ref value))
3537
return true;
3638

3739
var s = HashSlotUtils.HashSlot(ref key);
38-
// Check if key belongs to slot that is being migrated
40+
// Check if key belongs to slot that is being migrated and if it can be added to our buffer
3941
if (mss.Contains(s) && !mss.sketch.TryHashAndStore(key.AsSpan()))
4042
return false;
4143

@@ -50,9 +52,9 @@ public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadat
5052
#region objectStoreScan
5153
internal sealed unsafe class ObjectStoreScan : IScanIteratorFunctions<byte[], IGarnetObject>
5254
{
53-
readonly MigrateSession.MigrateOperation mss;
55+
readonly MigrateOperation mss;
5456

55-
internal ObjectStoreScan(MigrateSession.MigrateOperation mss)
57+
internal ObjectStoreScan(MigrateOperation mss)
5658
{
5759
this.mss = mss;
5860
}
@@ -70,13 +72,15 @@ public unsafe bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordM
7072
{
7173
cursorRecordResult = CursorRecordResult.Accept; // default; not used here
7274

75+
mss.ThrowIfCancelled();
76+
7377
// Do not send key if it is expired
7478
if (ClusterSession.Expired(ref value))
7579
return true;
7680

7781
var s = HashSlotUtils.HashSlot(key);
78-
// Check if key belongs to slot that is being migrated
79-
if (mss.Contains(s) && mss.sketch.TryHashAndStore(key.AsSpan()))
82+
// Check if key belongs to slot that is being migrated and if it can be added to our buffer
83+
if (mss.Contains(s) && !mss.sketch.TryHashAndStore(key.AsSpan()))
8084
return false;
8185

8286
return true;

libs/cluster/Server/Migration/MigrateSessionSlots.cs

Lines changed: 23 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -11,13 +11,13 @@
1111

1212
namespace Garnet.cluster
1313
{
14-
internal sealed unsafe partial class MigrateSession : IDisposable
14+
internal sealed partial class MigrateSession : IDisposable
1515
{
1616
/// <summary>
1717
/// Migrate Slots inline driver
1818
/// </summary>
1919
/// <returns></returns>
20-
public bool MigrateSlotsDriverInline()
20+
public async Task<bool> MigrateSlotsDriverInline()
2121
{
2222
var storeBeginAddress = clusterProvider.storeWrapper.store.Log.BeginAddress;
2323
var storeTailAddress = clusterProvider.storeWrapper.store.Log.TailAddress;
@@ -29,20 +29,24 @@ public bool MigrateSlotsDriverInline()
2929
#endif
3030

3131
// Send main store
32-
CreateAndRunMigrateTasks(StoreType.Main, storeBeginAddress, storeTailAddress, mainStorePageSize);
32+
logger?.LogWarning("Store migrate scan range [{storeBeginAddress}, {storeTailAddress}]", storeBeginAddress, storeTailAddress);
33+
var success = await CreateAndRunMigrateTasks(StoreType.Main, storeBeginAddress, storeTailAddress, mainStorePageSize);
34+
if (!success) return false;
3335

3436
// Send object store
3537
if (!clusterProvider.serverOptions.DisableObjects)
3638
{
3739
var objectStoreBeginAddress = clusterProvider.storeWrapper.objectStore.Log.BeginAddress;
3840
var objectStoreTailAddress = clusterProvider.storeWrapper.objectStore.Log.TailAddress;
3941
var objectStorePageSize = 1 << clusterProvider.serverOptions.ObjectStorePageSizeBits();
40-
CreateAndRunMigrateTasks(StoreType.Object, objectStoreBeginAddress, objectStoreTailAddress, objectStorePageSize);
42+
logger?.LogWarning("Object Store migrate scan range [{objectStoreBeginAddress}, {objectStoreTailAddress}]", objectStoreBeginAddress, objectStoreTailAddress);
43+
success = await CreateAndRunMigrateTasks(StoreType.Object, objectStoreBeginAddress, objectStoreTailAddress, objectStorePageSize);
44+
if (!success) return false;
4145
}
4246

4347
return true;
4448

45-
void CreateAndRunMigrateTasks(StoreType storeType, long beginAddress, long tailAddress, int pageSize)
49+
async Task<bool> CreateAndRunMigrateTasks(StoreType storeType, long beginAddress, long tailAddress, int pageSize)
4650
{
4751
logger?.LogTrace("{method} > [{storeType}] Scan in range ({BeginAddress},{TailAddress})", nameof(CreateAndRunMigrateTasks), storeType, beginAddress, tailAddress);
4852
var migrateOperationRunners = new Task[clusterProvider.serverOptions.ParallelMigrateTaskCount];
@@ -54,7 +58,17 @@ void CreateAndRunMigrateTasks(StoreType storeType, long beginAddress, long tailA
5458
i++;
5559
}
5660

57-
Task.WaitAll(migrateOperationRunners, _cts.Token);
61+
try
62+
{
63+
await Task.WhenAll(migrateOperationRunners).WaitAsync(_timeout, _cts.Token).ConfigureAwait(false);
64+
}
65+
catch (Exception ex)
66+
{
67+
logger?.LogError(ex, "{CreateAndRunMigrateTasks}: {storeType} {beginAddress} {tailAddress} {pageSize}", nameof(CreateAndRunMigrateTasks), storeType, beginAddress, tailAddress, pageSize);
68+
_cts.Cancel();
69+
return false;
70+
}
71+
return true;
5872
}
5973

6074
Task<bool> ScanStoreTask(int taskId, StoreType storeType, long beginAddress, long tailAddress, int pageSize)
@@ -70,6 +84,7 @@ Task<bool> ScanStoreTask(int taskId, StoreType storeType, long beginAddress, lon
7084
return Task.FromResult(false);
7185

7286
var cursor = workerStartAddress;
87+
logger?.LogWarning("<{StoreType}:{taskId}> migrate scan range [{workerStartAddress}, {workerEndAddress}]", storeType, taskId, workerStartAddress, workerEndAddress);
7388
while (true)
7489
{
7590
var current = cursor;
@@ -80,8 +95,7 @@ Task<bool> ScanStoreTask(int taskId, StoreType storeType, long beginAddress, lon
8095
// Stop if no keys have been found
8196
if (migrateOperation.sketch.argSliceVector.IsEmpty) break;
8297

83-
var currentEnd = current;
84-
logger?.LogTrace("[{taskId}> Scan from {cursor} to {current} and discovered {count} keys",
98+
logger?.LogWarning("[{taskId}> Scan from {cursor} to {current} and discovered {count} keys",
8599
taskId, cursor, current, migrateOperation.sketch.argSliceVector.Count);
86100

87101
// Transition EPSM to MIGRATING
@@ -98,6 +112,7 @@ Task<bool> ScanStoreTask(int taskId, StoreType storeType, long beginAddress, lon
98112
// Deleting keys (Currently gathering keys from push-scan and deleting them outside)
99113
migrateOperation.DeleteKeys();
100114

115+
// Clear keys from buffer
101116
migrateOperation.sketch.Clear();
102117
cursor = current;
103118
}

libs/cluster/Server/Migration/MigrationDriver.cs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,7 @@ private async Task BeginAsyncMigrationTask()
8080

8181
#region migrateData
8282
// Migrate actual data
83-
if (!MigrateSlotsDriverInline())
83+
if (!await MigrateSlotsDriverInline())
8484
{
8585
logger?.LogError("MigrateSlotsDriver failed");
8686
TryRecoverFromFailure();

libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicaSyncSession.cs

Lines changed: 14 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT license.
33

44
using System;
5+
using System.Diagnostics;
56
using System.Threading.Tasks;
67
using Garnet.common;
78
using Microsoft.Extensions.Logging;
@@ -130,8 +131,17 @@ public void SendAndResetIterationBuffer()
130131
public void SetStatus(SyncStatus status, string error = null)
131132
{
132133
ssInfo.error = error;
133-
// NOTE: set this last to signal state change
134+
// NOTE: set this after error to signal complete state change
134135
ssInfo.syncStatus = status;
136+
137+
// Signal Release for WaitForSyncCompletion call
138+
switch (status)
139+
{
140+
case SyncStatus.SUCCESS:
141+
case SyncStatus.FAILED:
142+
signalCompletion.Release();
143+
break;
144+
}
135145
}
136146

137147
/// <summary>
@@ -151,7 +161,7 @@ public void SetFlushTask(Task<string> task)
151161
return false;
152162
}
153163
return true;
154-
}, TaskContinuationOptions.OnlyOnRanToCompletion).WaitAsync(replicaSyncTimeout, token);
164+
}, TaskContinuationOptions.OnlyOnRanToCompletion).WaitAsync(storeWrapper.serverOptions.ReplicaSyncTimeout, token);
155165
}
156166
}
157167

@@ -181,11 +191,8 @@ public async Task WaitForSyncCompletion()
181191
{
182192
try
183193
{
184-
while (ssInfo.syncStatus is not SyncStatus.SUCCESS and not SyncStatus.FAILED)
185-
{
186-
token.ThrowIfCancellationRequested();
187-
await Task.Yield();
188-
}
194+
await signalCompletion.WaitAsync(token);
195+
Debug.Assert(ssInfo.syncStatus is SyncStatus.SUCCESS or SyncStatus.FAILED);
189196
}
190197
catch (Exception ex)
191198
{

libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSyncManager.cs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ public ReplicationSyncManager(ClusterProvider clusterProvider, ILogger logger =
3232
this.logger = logger;
3333

3434
var opts = clusterProvider.serverOptions;
35-
replicaSyncTimeout = opts.ReplicaSyncTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(opts.ReplicaSyncTimeout);
35+
replicaSyncTimeout = opts.ReplicaSyncTimeout;
3636
cts = new();
3737
}
3838

@@ -87,7 +87,7 @@ public async Task WaitForFlush()
8787
/// <returns></returns>
8888
public bool AddReplicaSyncSession(SyncMetadata replicaSyncMetadata, out ReplicaSyncSession replicaSyncSession)
8989
{
90-
replicaSyncSession = new ReplicaSyncSession(ClusterProvider.storeWrapper, ClusterProvider, replicaSyncMetadata, replicaSyncTimeout, cts.Token, logger: logger);
90+
replicaSyncSession = new ReplicaSyncSession(ClusterProvider.storeWrapper, ClusterProvider, replicaSyncMetadata, cts.Token, logger: logger);
9191
replicaSyncSession.SetStatus(SyncStatus.INITIALIZING);
9292
try
9393
{

libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs

Lines changed: 11 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@ internal sealed partial class ReplicaSyncSession(
1818
StoreWrapper storeWrapper,
1919
ClusterProvider clusterProvider,
2020
SyncMetadata replicaSyncMetadata = null,
21-
TimeSpan replicaSyncTimeout = default,
2221
CancellationToken token = default,
2322
string replicaNodeId = null,
2423
string replicaAssignedPrimaryId = null,
@@ -30,11 +29,10 @@ internal sealed partial class ReplicaSyncSession(
3029
readonly StoreWrapper storeWrapper = storeWrapper;
3130
readonly ClusterProvider clusterProvider = clusterProvider;
3231
public readonly SyncMetadata replicaSyncMetadata = replicaSyncMetadata;
33-
readonly TimeSpan replicaSyncTimeout = replicaSyncTimeout;
3432
readonly CancellationToken token = token;
3533
readonly CancellationTokenSource cts = new();
3634
SectorAlignedBufferPool bufferPool = null;
37-
readonly SemaphoreSlim semaphore = new(0);
35+
readonly SemaphoreSlim signalCompletion = new(0);
3836

3937
public readonly string replicaNodeId = replicaNodeId;
4038
public readonly string replicaAssignedPrimaryId = replicaAssignedPrimaryId;
@@ -55,7 +53,7 @@ public void Dispose()
5553
AofSyncTask = null;
5654
cts.Cancel();
5755
cts.Dispose();
58-
semaphore?.Dispose();
56+
signalCompletion?.Dispose();
5957
bufferPool?.Free();
6058
}
6159

@@ -135,7 +133,7 @@ public async Task<bool> SendCheckpoint()
135133
(localEntry, aofSyncTaskInfo) = await AcquireCheckpointEntry();
136134
logger?.LogInformation("Checkpoint search completed");
137135

138-
gcs.Connect((int)replicaSyncTimeout.TotalMilliseconds);
136+
gcs.Connect((int)storeWrapper.serverOptions.ReplicaSyncTimeout.TotalMilliseconds);
139137

140138
long index_size = -1;
141139
long obj_index_size = -1;
@@ -288,7 +286,7 @@ public async Task<bool> SendCheckpoint()
288286
clusterProvider.replicationManager.PrimaryReplId,
289287
localEntry.ToByteArray(),
290288
beginAddress,
291-
checkpointAofBeginAddress).WaitAsync(replicaSyncTimeout, cts.Token).ConfigureAwait(false);
289+
checkpointAofBeginAddress).WaitAsync(storeWrapper.serverOptions.ReplicaSyncTimeout, cts.Token).ConfigureAwait(false);
292290
var syncFromAofAddress = long.Parse(resp);
293291

294292
// Assert that AOF address the replica will be requesting can be served, except in case of:
@@ -455,7 +453,7 @@ private async Task SendCheckpointMetadata(GarnetClientSession gcs, GarnetCluster
455453
}
456454
}
457455

458-
var resp = await gcs.ExecuteSendCkptMetadata(fileToken.ToByteArray(), (int)fileType, checkpointMetadata).WaitAsync(replicaSyncTimeout, cts.Token).ConfigureAwait(false);
456+
var resp = await gcs.ExecuteSendCkptMetadata(fileToken.ToByteArray(), (int)fileType, checkpointMetadata).WaitAsync(storeWrapper.serverOptions.ReplicaSyncTimeout, cts.Token).ConfigureAwait(false);
459457
if (!resp.Equals("OK"))
460458
{
461459
logger?.LogError("Primary error at SendCheckpointMetadata {resp}", resp);
@@ -495,7 +493,7 @@ private async Task SendFileSegments(GarnetClientSession gcs, Guid token, Checkpo
495493
(int)(endAddress - startAddress);
496494
var (pbuffer, readBytes) = await ReadInto(device, (ulong)startAddress, num_bytes).ConfigureAwait(false);
497495

498-
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.GetSlice(readBytes)).WaitAsync(replicaSyncTimeout, cts.Token).ConfigureAwait(false);
496+
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.GetSlice(readBytes)).WaitAsync(storeWrapper.serverOptions.ReplicaSyncTimeout, cts.Token).ConfigureAwait(false);
499497
if (!resp.Equals("OK"))
500498
{
501499
logger?.LogError("Primary error at SendFileSegments {type} {resp}", type, resp);
@@ -506,7 +504,7 @@ private async Task SendFileSegments(GarnetClientSession gcs, Guid token, Checkpo
506504
}
507505

508506
// Send last empty package to indicate end of transmission and let replica dispose IDevice
509-
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, []).WaitAsync(replicaSyncTimeout, cts.Token).ConfigureAwait(false);
507+
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, []).WaitAsync(storeWrapper.serverOptions.ReplicaSyncTimeout, cts.Token).ConfigureAwait(false);
510508
if (!resp.Equals("OK"))
511509
{
512510
logger?.LogError("Primary error at SendFileSegments {type} {resp}", type, resp);
@@ -541,7 +539,7 @@ private async Task SendObjectFiles(GarnetClientSession gcs, Guid token, Checkpoi
541539
var (pbuffer, readBytes) = await ReadInto(device, (ulong)startAddress, num_bytes, segment).ConfigureAwait(false);
542540

543541
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.GetSlice(readBytes), segment).
544-
WaitAsync(replicaSyncTimeout, cts.Token).ConfigureAwait(false);
542+
WaitAsync(storeWrapper.serverOptions.ReplicaSyncTimeout, cts.Token).ConfigureAwait(false);
545543
if (!resp.Equals("OK"))
546544
{
547545
logger?.LogError("Primary error at SendFileSegments {type} {resp}", type, resp);
@@ -552,7 +550,7 @@ private async Task SendObjectFiles(GarnetClientSession gcs, Guid token, Checkpoi
552550
startAddress += readBytes;
553551
}
554552

555-
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, 0L, []).WaitAsync(replicaSyncTimeout, cts.Token).ConfigureAwait(false);
553+
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, 0L, []).WaitAsync(storeWrapper.serverOptions.ReplicaSyncTimeout, cts.Token).ConfigureAwait(false);
556554
if (!resp.Equals("OK"))
557555
{
558556
logger?.LogError("Primary error at SendFileSegments {type} {resp}", type, resp);
@@ -590,7 +588,7 @@ private async Task SendObjectFiles(GarnetClientSession gcs, Guid token, Checkpoi
590588
else
591589
device.ReadAsync(segmentId, address, (IntPtr)pbuffer.aligned_pointer, (uint)numBytesToRead, IOCallback, null);
592590
}
593-
await semaphore.WaitAsync(replicaSyncTimeout, cts.Token).ConfigureAwait(false);
591+
await signalCompletion.WaitAsync(storeWrapper.serverOptions.ReplicaSyncTimeout, cts.Token).ConfigureAwait(false);
594592
return (pbuffer, (int)numBytesToRead);
595593
}
596594

@@ -601,7 +599,7 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context)
601599
var errorMessage = Tsavorite.core.Utility.GetCallbackErrorMessage(errorCode, numBytes, context);
602600
logger?.LogError("[ReplicaSyncSession] OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage);
603601
}
604-
semaphore.Release();
602+
signalCompletion.Release();
605603
}
606604
}
607605

libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSessionTaskStore.cs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@
22
// Licensed under the MIT license.
33

44
using System;
5-
using System.Threading;
65
using Garnet.common;
76
using Garnet.server;
87
using Microsoft.Extensions.Logging;
@@ -121,8 +120,7 @@ public bool TryAddReplicaSyncSession(ReplicaSyncSession session)
121120
/// <returns></returns>
122121
public bool TryAddReplicaSyncSession(string replicaNodeId, string replicaAssignedPrimaryId, CheckpointEntry replicaCheckpointEntry, long replicaAofBeginAddress, long replicaAofTailAddress)
123122
{
124-
var replicaSyncTimeout = clusterProvider.serverOptions.ReplicaSyncTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(clusterProvider.serverOptions.ReplicaSyncTimeout);
125-
var retSession = new ReplicaSyncSession(storeWrapper, clusterProvider, replicaSyncMetadata: null, replicaSyncTimeout: replicaSyncTimeout, token: default, replicaNodeId, replicaAssignedPrimaryId, replicaCheckpointEntry, replicaAofBeginAddress, replicaAofTailAddress, logger);
123+
var retSession = new ReplicaSyncSession(storeWrapper, clusterProvider, replicaSyncMetadata: null, token: default, replicaNodeId, replicaAssignedPrimaryId, replicaCheckpointEntry, replicaAofBeginAddress, replicaAofTailAddress, logger);
126124
var success = false;
127125
try
128126
{

0 commit comments

Comments
 (0)