Skip to content

Commit fab1d64

Browse files
authored
Add post-operation hooks and epoch-safe AOF logging (#1508)
* Add post-operation hooks and epoch-safe AOF logging * Introduce PostUpsertOperation, PostRMWOperation, and PostDeleteOperation hooks to ISessionFunctions, enabling actions after upsert, RMW, and delete but before record unlock. * Add IEpochAccessor interface for epoch management by Post*Operation hooks. * Refactor TsavoriteLog.Enqueue to accept an epoch accessor from caller layer, ensuring we correctly suspend/resume the outer epoch during blocking log appends. * Move AOF write logic in Garnet.server to post-operation hooks, using a UserData flag to defer logging until after the main operation (but before unlocking). * Make UpsertInfo, RMWInfo, and DeleteInfo carry a UserData byte for operation flags. * Update existing ISessionFunctions implementations with no-op post-operation hooks. * Enums UpsertAction, RMWAction, and DeleteAction are now byte-sized. These changes ensure that if we block waiting for AllocateBlock during AOF logging in Tsavorite, we will not be holding the outer (TsavoriteKV-level) epoch, thereby preventing a epoch table full situation for other normal operation (GET/SET) threads. * format, update version * fix bug * fixes * nits * use constant for NeedAofLog
1 parent e0bf18d commit fab1d64

File tree

24 files changed

+378
-115
lines changed

24 files changed

+378
-115
lines changed

libs/server/Storage/Functions/MainStore/DeleteMethods.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public bool SingleDeleter(ref SpanByte key, ref SpanByte value, ref DeleteInfo d
2222
public void PostSingleDeleter(ref SpanByte key, ref DeleteInfo deleteInfo)
2323
{
2424
if (functionsState.appendOnlyFile != null)
25-
WriteLogDelete(ref key, deleteInfo.Version, deleteInfo.SessionID);
25+
deleteInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
2626
}
2727

2828
/// <inheritdoc />
@@ -32,8 +32,18 @@ public bool ConcurrentDeleter(ref SpanByte key, ref SpanByte value, ref DeleteIn
3232
if (!deleteInfo.RecordInfo.Modified)
3333
functionsState.watchVersionMap.IncrementVersion(deleteInfo.KeyHash);
3434
if (functionsState.appendOnlyFile != null)
35-
WriteLogDelete(ref key, deleteInfo.Version, deleteInfo.SessionID);
35+
deleteInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
3636
return true;
3737
}
38+
39+
/// <inheritdoc />
40+
public void PostDeleteOperation<TEpochAccessor>(ref SpanByte key, ref DeleteInfo deleteInfo, TEpochAccessor epochAccessor)
41+
where TEpochAccessor : IEpochAccessor
42+
{
43+
if ((deleteInfo.UserData & NeedAofLog) == NeedAofLog) // Check if we need to write to AOF
44+
{
45+
WriteLogDelete(ref key, deleteInfo.Version, deleteInfo.SessionID, epochAccessor);
46+
}
47+
}
3848
}
3949
}

libs/server/Storage/Functions/MainStore/MainSessionFunctions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace Garnet.server
1010
/// </summary>
1111
public readonly unsafe partial struct MainSessionFunctions : ISessionFunctions<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long>
1212
{
13+
const byte NeedAofLog = 0x1;
1314
readonly FunctionsState functionsState;
1415

1516
/// <summary>

libs/server/Storage/Functions/MainStore/PrivateMethods.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,8 @@ static void WriteValAndEtagToDst(int desiredLength, ref ReadOnlySpan<byte> value
725725
/// a. ConcurrentWriter
726726
/// b. PostSingleWriter
727727
/// </summary>
728-
void WriteLogUpsert(ref SpanByte key, ref RawStringInput input, ref SpanByte value, long version, int sessionId)
728+
void WriteLogUpsert<TEpochAccessor>(ref SpanByte key, ref RawStringInput input, ref SpanByte value, long version, int sessionId, TEpochAccessor epochAccessor)
729+
where TEpochAccessor : IEpochAccessor
729730
{
730731
if (functionsState.StoredProcMode) return;
731732

@@ -737,7 +738,7 @@ void WriteLogUpsert(ref SpanByte key, ref RawStringInput input, ref SpanByte val
737738

738739
functionsState.appendOnlyFile.Enqueue(
739740
new AofHeader { opType = AofEntryType.StoreUpsert, storeVersion = version, sessionID = sessionId },
740-
ref key, ref value, ref input, out _);
741+
ref key, ref value, ref input, epochAccessor, out _);
741742
}
742743

743744
/// <summary>
@@ -746,26 +747,28 @@ void WriteLogUpsert(ref SpanByte key, ref RawStringInput input, ref SpanByte val
746747
/// b. InPlaceUpdater
747748
/// c. PostCopyUpdater
748749
/// </summary>
749-
void WriteLogRMW(ref SpanByte key, ref RawStringInput input, long version, int sessionId)
750+
void WriteLogRMW<TEpochAccessor>(ref SpanByte key, ref RawStringInput input, long version, int sessionId, TEpochAccessor epochAccessor)
751+
where TEpochAccessor : IEpochAccessor
750752
{
751753
if (functionsState.StoredProcMode) return;
752754
input.header.flags |= RespInputFlags.Deterministic;
753755

754756
functionsState.appendOnlyFile.Enqueue(
755757
new AofHeader { opType = AofEntryType.StoreRMW, storeVersion = version, sessionID = sessionId },
756-
ref key, ref input, out _);
758+
ref key, ref input, epochAccessor, out _);
757759
}
758760

759761
/// <summary>
760762
/// Logging Delete from
761763
/// a. ConcurrentDeleter
762764
/// b. PostSingleDeleter
763765
/// </summary>
764-
void WriteLogDelete(ref SpanByte key, long version, int sessionID)
766+
void WriteLogDelete<TEpochAccessor>(ref SpanByte key, long version, int sessionID, TEpochAccessor epochAccessor)
767+
where TEpochAccessor : IEpochAccessor
765768
{
766769
if (functionsState.StoredProcMode) return;
767770
SpanByte def = default;
768-
functionsState.appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.StoreDelete, storeVersion = version, sessionID = sessionID }, ref key, ref def, out _);
771+
functionsState.appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.StoreDelete, storeVersion = version, sessionID = sessionID }, ref key, ref def, epochAccessor, out _);
769772
}
770773

771774
BitFieldCmdArgs GetBitFieldArguments(ref RawStringInput input)

libs/server/Storage/Functions/MainStore/RMWMethods.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ public void PostInitialUpdater(ref SpanByte key, ref RawStringInput input, ref S
297297
if (functionsState.appendOnlyFile != null)
298298
{
299299
input.header.SetExpiredFlag();
300-
WriteLogRMW(ref key, ref input, rmwInfo.Version, rmwInfo.SessionID);
300+
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
301301
}
302302
}
303303

@@ -314,7 +314,7 @@ public bool InPlaceUpdater(ref SpanByte key, ref RawStringInput input, ref SpanB
314314
if (!rmwInfo.RecordInfo.Modified)
315315
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
316316
if (functionsState.appendOnlyFile != null)
317-
WriteLogRMW(ref key, ref input, rmwInfo.Version, rmwInfo.SessionID);
317+
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
318318
return true;
319319
case IPUResult.NotUpdated:
320320
default:
@@ -1397,8 +1397,18 @@ public bool PostCopyUpdater(ref SpanByte key, ref RawStringInput input, ref Span
13971397
{
13981398
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
13991399
if (functionsState.appendOnlyFile != null)
1400-
WriteLogRMW(ref key, ref input, rmwInfo.Version, rmwInfo.SessionID);
1400+
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
14011401
return true;
14021402
}
1403+
1404+
/// <inheritdoc />
1405+
public void PostRMWOperation<TEpochAccessor>(ref SpanByte key, ref RawStringInput input, ref RMWInfo rmwInfo, TEpochAccessor epochAccessor)
1406+
where TEpochAccessor : IEpochAccessor
1407+
{
1408+
if ((rmwInfo.UserData & NeedAofLog) == NeedAofLog) // Check if we need to write to AOF
1409+
{
1410+
WriteLogRMW(ref key, ref input, rmwInfo.Version, rmwInfo.SessionID, epochAccessor);
1411+
}
1412+
}
14031413
}
14041414
}

libs/server/Storage/Functions/MainStore/UpsertMethods.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public void PostSingleWriter(ref SpanByte key, ref RawStringInput input, ref Spa
2323
{
2424
functionsState.watchVersionMap.IncrementVersion(upsertInfo.KeyHash);
2525
if (reason == WriteReason.Upsert && functionsState.appendOnlyFile != null)
26-
WriteLogUpsert(ref key, ref input, ref src, upsertInfo.Version, upsertInfo.SessionID);
26+
upsertInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
2727
}
2828

2929
/// <inheritdoc />
@@ -36,13 +36,23 @@ public bool ConcurrentWriter(ref SpanByte key, ref RawStringInput input, ref Spa
3636
if (!upsertInfo.RecordInfo.Modified)
3737
functionsState.watchVersionMap.IncrementVersion(upsertInfo.KeyHash);
3838
if (functionsState.appendOnlyFile != null)
39-
WriteLogUpsert(ref key, ref input, ref src, upsertInfo.Version, upsertInfo.SessionID);
39+
upsertInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
4040
return true;
4141
}
4242
return false;
4343
}
4444

4545
static bool ConcurrentWriterWorker(ref SpanByte src, ref SpanByte dst, ref RawStringInput input, ref UpsertInfo upsertInfo, ref RecordInfo recordInfo)
4646
=> SpanByteFunctions<RawStringInput, SpanByteAndMemory, long>.DoSafeCopy(ref src, ref dst, ref upsertInfo, ref recordInfo, input.arg1);
47+
48+
/// <inheritdoc />
49+
public void PostUpsertOperation<TEpochAccessor>(ref SpanByte key, ref RawStringInput input, ref SpanByte src, ref UpsertInfo upsertInfo, TEpochAccessor epochAccessor)
50+
where TEpochAccessor : IEpochAccessor
51+
{
52+
if ((upsertInfo.UserData & NeedAofLog) == NeedAofLog) // Check if we need to write to AOF
53+
{
54+
WriteLogUpsert(ref key, ref input, ref src, upsertInfo.Version, upsertInfo.SessionID, epochAccessor);
55+
}
56+
}
4757
}
4858
}

libs/server/Storage/Functions/ObjectStore/DeleteMethods.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public void PostSingleDeleter(ref byte[] key, ref DeleteInfo deleteInfo)
2020
if (!deleteInfo.RecordInfo.Modified)
2121
functionsState.watchVersionMap.IncrementVersion(deleteInfo.KeyHash);
2222
if (functionsState.appendOnlyFile != null)
23-
WriteLogDelete(ref key, deleteInfo.Version, deleteInfo.SessionID);
23+
deleteInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
2424
}
2525

2626
/// <inheritdoc />
@@ -29,10 +29,20 @@ public bool ConcurrentDeleter(ref byte[] key, ref IGarnetObject value, ref Delet
2929
if (!deleteInfo.RecordInfo.Modified)
3030
functionsState.watchVersionMap.IncrementVersion(deleteInfo.KeyHash);
3131
if (functionsState.appendOnlyFile != null)
32-
WriteLogDelete(ref key, deleteInfo.Version, deleteInfo.SessionID);
32+
deleteInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
3333
functionsState.objectStoreSizeTracker?.AddTrackedSize(-value.Size);
3434
value = null;
3535
return true;
3636
}
37+
38+
/// <inheritdoc />
39+
public void PostDeleteOperation<TEpochAccessor>(ref byte[] key, ref DeleteInfo deleteInfo, TEpochAccessor epochAccessor)
40+
where TEpochAccessor : IEpochAccessor
41+
{
42+
if ((deleteInfo.UserData & NeedAofLog) == NeedAofLog) // Check if we need to write to AOF
43+
{
44+
WriteLogDelete(ref key, deleteInfo.Version, deleteInfo.SessionID, epochAccessor);
45+
}
46+
}
3747
}
3848
}

libs/server/Storage/Functions/ObjectStore/ObjectSessionFunctions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace Garnet.server
1010
/// </summary>
1111
public readonly unsafe partial struct ObjectSessionFunctions : ISessionFunctions<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long>
1212
{
13+
const byte NeedAofLog = 0x1;
1314
readonly FunctionsState functionsState;
1415

1516
/// <summary>

libs/server/Storage/Functions/ObjectStore/PrivateMethods.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ namespace Garnet.server
2020
/// a. ConcurrentWriter
2121
/// b. PostSingleWriter
2222
/// </summary>
23-
void WriteLogUpsert(ref byte[] key, ref ObjectInput input, ref IGarnetObject value, long version, int sessionID)
23+
void WriteLogUpsert<TEpochAccessor>(ref byte[] key, ref ObjectInput input, ref IGarnetObject value, long version, int sessionID, TEpochAccessor epochAccessor)
24+
where TEpochAccessor : IEpochAccessor
2425
{
2526
if (functionsState.StoredProcMode) return;
2627
input.header.flags |= RespInputFlags.Deterministic;
@@ -35,7 +36,7 @@ void WriteLogUpsert(ref byte[] key, ref ObjectInput input, ref IGarnetObject val
3536

3637
functionsState.appendOnlyFile.Enqueue(
3738
new AofHeader { opType = AofEntryType.ObjectStoreUpsert, storeVersion = version, sessionID = sessionID },
38-
ref keySB, ref valSB, out _);
39+
ref keySB, ref valSB, epochAccessor, out _);
3940
}
4041
}
4142
}
@@ -46,7 +47,8 @@ void WriteLogUpsert(ref byte[] key, ref ObjectInput input, ref IGarnetObject val
4647
/// b. InPlaceUpdater
4748
/// c. PostCopyUpdater
4849
/// </summary>
49-
void WriteLogRMW(ref byte[] key, ref ObjectInput input, long version, int sessionID)
50+
void WriteLogRMW<TEpochAccessor>(ref byte[] key, ref ObjectInput input, long version, int sessionID, TEpochAccessor epochAccessor)
51+
where TEpochAccessor : IEpochAccessor
5052
{
5153
if (functionsState.StoredProcMode) return;
5254
input.header.flags |= RespInputFlags.Deterministic;
@@ -58,7 +60,7 @@ void WriteLogRMW(ref byte[] key, ref ObjectInput input, long version, int sessio
5860

5961
functionsState.appendOnlyFile.Enqueue(
6062
new AofHeader { opType = AofEntryType.ObjectStoreRMW, storeVersion = version, sessionID = sessionID },
61-
ref sbKey, ref input, out _);
63+
ref sbKey, ref input, epochAccessor, out _);
6264
}
6365
}
6466

@@ -67,15 +69,18 @@ void WriteLogRMW(ref byte[] key, ref ObjectInput input, long version, int sessio
6769
/// a. ConcurrentDeleter
6870
/// b. PostSingleDeleter
6971
/// </summary>
70-
void WriteLogDelete(ref byte[] key, long version, int sessionID)
72+
void WriteLogDelete<TEpochAccessor>(ref byte[] key, long version, int sessionID, TEpochAccessor epochAccessor)
73+
where TEpochAccessor : IEpochAccessor
7174
{
7275
if (functionsState.StoredProcMode) return;
7376
fixed (byte* ptr = key)
7477
{
7578
var keySB = SpanByte.FromPinnedPointer(ptr, key.Length);
7679
SpanByte valSB = default;
7780

78-
functionsState.appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.ObjectStoreDelete, storeVersion = version, sessionID = sessionID }, ref keySB, ref valSB, out _);
81+
functionsState.appendOnlyFile.Enqueue(
82+
new AofHeader { opType = AofEntryType.ObjectStoreDelete, storeVersion = version, sessionID = sessionID },
83+
ref keySB, ref valSB, epochAccessor, out _);
7984
}
8085
}
8186

libs/server/Storage/Functions/ObjectStore/RMWMethods.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void PostInitialUpdater(ref byte[] key, ref ObjectInput input, ref IGarne
8181
if (functionsState.appendOnlyFile != null)
8282
{
8383
input.header.SetExpiredFlag();
84-
WriteLogRMW(ref key, ref input, rmwInfo.Version, rmwInfo.SessionID);
84+
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
8585
}
8686

8787
functionsState.objectStoreSizeTracker?.AddTrackedSize(MemoryUtils.CalculateKeyValueSize(key, value));
@@ -94,7 +94,8 @@ public bool InPlaceUpdater(ref byte[] key, ref ObjectInput input, ref IGarnetObj
9494
{
9595
if (!rmwInfo.RecordInfo.Modified)
9696
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
97-
if (functionsState.appendOnlyFile != null) WriteLogRMW(ref key, ref input, rmwInfo.Version, rmwInfo.SessionID);
97+
if (functionsState.appendOnlyFile != null)
98+
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
9899
functionsState.objectStoreSizeTracker?.AddTrackedSize(sizeChange);
99100
return true;
100101
}
@@ -271,8 +272,18 @@ public bool PostCopyUpdater(ref byte[] key, ref ObjectInput input, ref IGarnetOb
271272
functionsState.objectStoreSizeTracker?.AddTrackedSize(sizeAdjustment);
272273

273274
if (functionsState.appendOnlyFile != null)
274-
WriteLogRMW(ref key, ref input, rmwInfo.Version, rmwInfo.SessionID);
275+
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
275276
return true;
276277
}
278+
279+
/// <inheritdoc />
280+
public void PostRMWOperation<TEpochAccessor>(ref byte[] key, ref ObjectInput input, ref RMWInfo rmwInfo, TEpochAccessor epochAccessor)
281+
where TEpochAccessor : IEpochAccessor
282+
{
283+
if ((rmwInfo.UserData & NeedAofLog) == NeedAofLog) // Check if we need to write to AOF
284+
{
285+
WriteLogRMW(ref key, ref input, rmwInfo.Version, rmwInfo.SessionID, epochAccessor);
286+
}
287+
}
277288
}
278289
}

libs/server/Storage/Functions/ObjectStore/UpsertMethods.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public void PostSingleWriter(ref byte[] key, ref ObjectInput input, ref IGarnetO
2323
if (reason != WriteReason.CopyToTail)
2424
functionsState.watchVersionMap.IncrementVersion(upsertInfo.KeyHash);
2525
if (reason == WriteReason.Upsert && functionsState.appendOnlyFile != null)
26-
WriteLogUpsert(ref key, ref input, ref src, upsertInfo.Version, upsertInfo.SessionID);
26+
upsertInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
2727

2828
if (reason == WriteReason.CopyToReadCache)
2929
functionsState.objectStoreSizeTracker?.AddReadCacheTrackedSize(MemoryUtils.CalculateKeyValueSize(key, src));
@@ -38,9 +38,19 @@ public bool ConcurrentWriter(ref byte[] key, ref ObjectInput input, ref IGarnetO
3838
if (!upsertInfo.RecordInfo.Modified)
3939
functionsState.watchVersionMap.IncrementVersion(upsertInfo.KeyHash);
4040
if (functionsState.appendOnlyFile != null)
41-
WriteLogUpsert(ref key, ref input, ref src, upsertInfo.Version, upsertInfo.SessionID);
41+
upsertInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
4242
functionsState.objectStoreSizeTracker?.AddTrackedSize(dst.Size - src.Size);
4343
return true;
4444
}
45+
46+
/// <inheritdoc />
47+
public void PostUpsertOperation<TEpochAccessor>(ref byte[] key, ref ObjectInput input, ref IGarnetObject src, ref UpsertInfo upsertInfo, TEpochAccessor epochAccessor)
48+
where TEpochAccessor : IEpochAccessor
49+
{
50+
if ((upsertInfo.UserData & NeedAofLog) == NeedAofLog) // Check if we need to write to AOF
51+
{
52+
WriteLogUpsert(ref key, ref input, ref src, upsertInfo.Version, upsertInfo.SessionID, epochAccessor);
53+
}
54+
}
4555
}
4656
}

0 commit comments

Comments
 (0)