Skip to content

Commit 40e0635

Browse files
authored
Reset state machine on exception during operation (#1497)
* Fix exception paths in state machine driver * update version * improvement * make last version stuff private fields * refactor
1 parent 52e8b3d commit 40e0635

File tree

4 files changed

+80
-26
lines changed

4 files changed

+80
-26
lines changed

Version.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<Project>
22
<!-- VersionPrefix property for builds and packages -->
33
<PropertyGroup>
4-
<VersionPrefix>1.0.91</VersionPrefix>
4+
<VersionPrefix>1.0.92</VersionPrefix>
55
</PropertyGroup>
66
</Project>

libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSMTask.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ public virtual void GlobalBeforeEnteringState(SystemState next, StateMachineDriv
4747
store.CheckpointVersionShiftEnd(lastVersion, next.Version, isStreaming);
4848

4949
Debug.Assert(stateMachineDriver.GetNumActiveTransactions(lastVersion) == 0, $"Active transactions in last version: {stateMachineDriver.GetNumActiveTransactions(lastVersion)}");
50-
stateMachineDriver.lastVersionTransactionsDone = null;
51-
stateMachineDriver.lastVersion = 0;
50+
stateMachineDriver.ResetLastVersion();
5251
// Grab final logical address (end of fuzzy region)
5352
store._hybridLogCheckpoint.info.finalLogicalAddress = store.hlogBase.GetTailAddress();
5453

@@ -93,13 +92,7 @@ public virtual void GlobalAfterEnteringState(SystemState next, StateMachineDrive
9392
case Phase.IN_PROGRESS:
9493
// State machine should wait for active transactions in the last version to complete (drain out).
9594
// Note that we allow new transactions to process in parallel.
96-
if (stateMachineDriver.GetNumActiveTransactions(lastVersion) > 0)
97-
{
98-
stateMachineDriver.lastVersion = lastVersion;
99-
stateMachineDriver.lastVersionTransactionsDone = new(0);
100-
}
101-
if (stateMachineDriver.GetNumActiveTransactions(lastVersion) > 0)
102-
stateMachineDriver.AddToWaitingList(stateMachineDriver.lastVersionTransactionsDone);
95+
stateMachineDriver.TrackLastVersion(lastVersion);
10396
break;
10497
}
10598
}

libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSMTask.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ public void GlobalBeforeEnteringState(SystemState next, StateMachineDriver state
3333
// Verify full transaction barrier
3434
Debug.Assert(stateMachineDriver.GetNumActiveTransactions(lastVersion) == 0);
3535
Debug.Assert(stateMachineDriver.GetNumActiveTransactions(next.Version) == 0);
36-
stateMachineDriver.lastVersionTransactionsDone = null;
37-
stateMachineDriver.lastVersion = 0;
36+
stateMachineDriver.ResetLastVersion();
3837

3938
// Set up the transition to new version of HT
4039
var numChunks = (int)(store.state[store.resizeInfo.version].size / Constants.kSizeofChunk);
@@ -70,13 +69,7 @@ public void GlobalAfterEnteringState(SystemState next, StateMachineDriver stateM
7069
case Phase.PREPARE_GROW:
7170
// State machine should wait for active transactions in the last version to complete (drain out).
7271
// Note that we DO NOT allow new transactions to start in PREPARE_GROW (i.e., this is a full barrier)
73-
if (stateMachineDriver.GetNumActiveTransactions(lastVersion) > 0)
74-
{
75-
stateMachineDriver.lastVersion = lastVersion;
76-
stateMachineDriver.lastVersionTransactionsDone = new(0);
77-
}
78-
if (stateMachineDriver.GetNumActiveTransactions(lastVersion) > 0)
79-
stateMachineDriver.AddToWaitingList(stateMachineDriver.lastVersionTransactionsDone);
72+
stateMachineDriver.TrackLastVersion(lastVersion);
8073
break;
8174

8275
case Phase.IN_PROGRESS_GROW:

libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ public class StateMachineDriver
2121
TaskCompletionSource<bool> stateMachineCompleted;
2222
// All threads have entered the given state
2323
SemaphoreSlim waitForTransitionIn;
24+
Exception waitForTransitionInException;
2425
// All threads have exited the given state
2526
SemaphoreSlim waitForTransitionOut;
2627
// Transactions drained in last version
27-
public long lastVersion;
28-
public SemaphoreSlim lastVersionTransactionsDone;
28+
long lastVersion;
29+
SemaphoreSlim lastVersionTransactionsDone;
2930
List<IStateMachineCallback> callbacks;
3031
readonly LightEpoch epoch;
3132
readonly ILogger logger;
@@ -55,13 +56,35 @@ void DecrementActiveTransactions(long txnVersion)
5556
{
5657
if (Interlocked.Decrement(ref NumActiveTransactions[txnVersion & 0x1]) == 0)
5758
{
58-
if (lastVersionTransactionsDone != null && txnVersion == lastVersion)
59+
var _lastVersionTransactionsDone = lastVersionTransactionsDone;
60+
if (_lastVersionTransactionsDone != null && txnVersion == lastVersion)
5961
{
60-
lastVersionTransactionsDone.Release();
62+
_lastVersionTransactionsDone.Release();
6163
}
6264
}
6365
}
6466

67+
internal void TrackLastVersion(long version)
68+
{
69+
if (GetNumActiveTransactions(version) > 0)
70+
{
71+
// Set version number first, then create semaphore
72+
lastVersion = version;
73+
lastVersionTransactionsDone = new(0);
74+
}
75+
76+
// We have to re-check the number of active transactions after assigning lastVersion and lastVersionTransactionsDone
77+
if (GetNumActiveTransactions(version) > 0)
78+
AddToWaitingList(lastVersionTransactionsDone);
79+
}
80+
81+
internal void ResetLastVersion()
82+
{
83+
// First null semaphore, then reset version number
84+
lastVersionTransactionsDone = null;
85+
lastVersion = 0;
86+
}
87+
6588
/// <summary>
6689
/// Acquire a transaction version - this should be called before
6790
/// BeginLockable is called for all sessions in the transaction.
@@ -211,7 +234,7 @@ void GlobalStateMachineStep(SystemState expectedState)
211234
// Release waiters for new phase
212235
_ = waitForTransitionOut?.Release(int.MaxValue);
213236

214-
// Write new semaphore
237+
// Write new semaphores
215238
waitForTransitionOut = new SemaphoreSlim(0);
216239
waitForTransitionIn = new SemaphoreSlim(0);
217240

@@ -261,13 +284,31 @@ public async Task WaitForCompletion(SystemState currentState)
261284

262285
void MakeTransitionWorker(SystemState nextState)
263286
{
264-
stateMachine.GlobalAfterEnteringState(nextState, this);
265-
waitForTransitionIn.Release(int.MaxValue);
287+
try
288+
{
289+
stateMachine.GlobalAfterEnteringState(nextState, this);
290+
}
291+
catch (Exception e)
292+
{
293+
// Store the exception to be thrown by state machine driver
294+
// We do not throw here as this epoch action may be executed in a different thread context
295+
waitForTransitionInException = e;
296+
297+
logger?.LogError(e, "Exception in state machine transition worker");
298+
}
299+
finally
300+
{
301+
waitForTransitionIn.Release(int.MaxValue);
302+
}
266303
}
267304

268305
async Task ProcessWaitingListAsync(CancellationToken token = default)
269306
{
270307
await waitForTransitionIn.WaitAsync(token);
308+
if (waitForTransitionInException != null)
309+
{
310+
throw waitForTransitionInException;
311+
}
271312
foreach (var waiter in waitingList)
272313
{
273314
await waiter.WaitAsync(token);
@@ -288,6 +329,7 @@ async Task RunStateMachine(CancellationToken token = default)
288329
}
289330
catch (Exception e)
290331
{
332+
FastForwardStateMachineToRest();
291333
logger?.LogError(e, "Exception in state machine");
292334
ex = e;
293335
throw;
@@ -311,5 +353,31 @@ async Task RunStateMachine(CancellationToken token = default)
311353
}
312354
}
313355
}
356+
357+
void FastForwardStateMachineToRest()
358+
{
359+
// Move system state to the next REST phase
360+
while (systemState.Phase != Phase.REST)
361+
{
362+
systemState.Word = stateMachine.NextState(systemState).Word;
363+
}
364+
365+
// Reset last version
366+
ResetLastVersion();
367+
368+
// Release any waiters on existing transition-out semaphore
369+
if (waitForTransitionOut?.CurrentCount == 0)
370+
_ = waitForTransitionOut?.Release(int.MaxValue);
371+
372+
// Clear semaphores
373+
waitForTransitionOut = null;
374+
waitForTransitionIn = null;
375+
376+
// Clear exception if any
377+
waitForTransitionInException = null;
378+
379+
// Clear waiting list
380+
waitingList.Clear();
381+
}
314382
}
315383
}

0 commit comments

Comments
 (0)