1+ // Copyright (c) Microsoft Corporation.
2+ // Licensed under the MIT license.
3+
4+ using System ;
5+ using System . Diagnostics ;
6+ using System . Runtime . CompilerServices ;
7+ using Garnet . common ;
8+ using Microsoft . Extensions . Logging ;
9+ using Tsavorite . core ;
10+
11+ namespace Garnet . server
12+ {
13+ using MainStoreAllocator = SpanByteAllocator < StoreFunctions < SpanByte , SpanByte , SpanByteComparer , SpanByteRecordDisposer > > ;
14+ using MainStoreFunctions = StoreFunctions < SpanByte , SpanByte , SpanByteComparer , SpanByteRecordDisposer > ;
15+
16+ using ObjectStoreAllocator = GenericAllocator < byte [ ] , IGarnetObject , StoreFunctions < byte [ ] , IGarnetObject , ByteArrayKeyComparer , DefaultRecordDisposer < byte [ ] , IGarnetObject > > > ;
17+ using ObjectStoreFunctions = StoreFunctions < byte [ ] , IGarnetObject , ByteArrayKeyComparer , DefaultRecordDisposer < byte [ ] , IGarnetObject > > ;
18+
19+ public sealed unsafe partial class AofProcessor
20+ {
21+ /// <summary>
22+ /// Coordinates the replay of Append-Only File (AOF) operations, including transaction processing, fuzzy region
23+ /// handling, and stored procedure execution.
24+ /// </summary>
25+ /// <remarks>This class is responsible for managing the replay context, processing transaction
26+ /// groups, and handling operations within fuzzy regions. It provides methods to add, replay, and process
27+ /// transactions and operations, ensuring consistency and correctness during AOF replay. The <see
28+ /// cref="AofReplayCoordinator"/> is designed to work with an <see cref="AofProcessor"/> to facilitate the
29+ /// replay of operations.</remarks>
30+ /// <param name="aofProcessor"></param>
31+ /// <param name="logger"></param>
32+ public class AofReplayCoordinator ( AofProcessor aofProcessor , ILogger logger = null ) : IDisposable
33+ {
34+ readonly AofProcessor aofProcessor = aofProcessor ;
35+ readonly AofReplayContext aofReplayContext = InitializeReplayContext ( ) ;
36+ public AofReplayContext GetReplayContext ( ) => aofReplayContext ;
37+ readonly ILogger logger = logger ;
38+
39+ internal static AofReplayContext InitializeReplayContext ( )
40+ {
41+ return new AofReplayContext ( ) ;
42+ }
43+
44+ /// <summary>
45+ /// Dispose
46+ /// </summary>
47+ public void Dispose ( )
48+ {
49+ aofReplayContext . output . MemoryOwner ? . Dispose ( ) ;
50+ }
51+
52+ /// <summary>
53+ /// Get fuzzy region buffer count
54+ /// </summary>
55+ /// <returns></returns>
56+ internal int FuzzyRegionBufferCount ( ) => aofReplayContext . fuzzyRegionOps . Count ;
57+
58+ /// <summary>
59+ /// Clear fuzzy region buffer
60+ /// </summary>
61+ internal void ClearFuzzyRegionBuffer ( ) => aofReplayContext . fuzzyRegionOps . Clear ( ) ;
62+
63+ /// <summary>
64+ /// Add single operation to fuzzy region buffer
65+ /// </summary>
66+ /// <param name="entry"></param>
67+ internal unsafe void AddFuzzyRegionOperation ( ReadOnlySpan < byte > entry ) => aofReplayContext . fuzzyRegionOps . Add ( entry . ToArray ( ) ) ;
68+
69+ /// <summary>
70+ /// This method will perform one of the following
71+ /// 1. TxnStart: Create a new transaction group
72+ /// 2. TxnCommit: Replay or buffer transaction group depending if we are in fuzzyRegion.
73+ /// 3. TxnAbort: Clear corresponding sublog replay buffer.
74+ /// 4. Default: Add an operation to an existing transaction group
75+ /// </summary>
76+ /// <param name="ptr"></param>
77+ /// <param name="length"></param>
78+ /// <param name="asReplica"></param>
79+ /// <returns>Returns true if a txn operation was processed and added otherwise false</returns>
80+ /// <exception cref="GarnetException"></exception>
81+ internal unsafe bool AddOrReplayTransactionOperation ( byte * ptr , int length , bool asReplica )
82+ {
83+ var header = * ( AofHeader * ) ptr ;
84+ var replayContext = GetReplayContext ( ) ;
85+ // First try to process this as an existing transaction
86+ if ( aofReplayContext . activeTxns . TryGetValue ( header . sessionID , out var group ) )
87+ {
88+ switch ( header . opType )
89+ {
90+ case AofEntryType . TxnStart :
91+ throw new GarnetException ( "No nested transactions expected" ) ;
92+ case AofEntryType . TxnAbort :
93+ ClearSessionTxn ( ) ;
94+ break ;
95+ case AofEntryType . TxnCommit :
96+ if ( replayContext . inFuzzyRegion )
97+ {
98+ // If in fuzzy region we want to record the commit marker and
99+ // buffer the transaction group for later replay
100+ var commitMarker = new ReadOnlySpan < byte > ( ptr , length ) ;
101+ aofReplayContext . AddToFuzzyRegionBuffer ( group , commitMarker ) ;
102+ }
103+ else
104+ {
105+ // Otherwise process transaction group immediately
106+ ProcessTransactionGroup ( ptr , asReplica , group ) ;
107+ }
108+
109+ // We want to clear and remove in both cases to make space for next txn from session
110+ ClearSessionTxn ( ) ;
111+ break ;
112+ case AofEntryType . StoredProcedure :
113+ throw new GarnetException ( $ "Unexpected AOF header operation type { header . opType } within transaction") ;
114+ default :
115+ group . operations . Add ( new ReadOnlySpan < byte > ( ptr , length ) . ToArray ( ) ) ;
116+ break ;
117+ }
118+
119+ void ClearSessionTxn ( )
120+ {
121+ aofReplayContext . activeTxns [ header . sessionID ] . Clear ( ) ;
122+ _ = aofReplayContext . activeTxns . Remove ( header . sessionID ) ;
123+ }
124+
125+ return true ;
126+ }
127+
128+ // See if you have detected a txn
129+ switch ( header . opType )
130+ {
131+ case AofEntryType . TxnStart :
132+ aofReplayContext . AddTransactionGroup ( header . sessionID ) ;
133+ break ;
134+ case AofEntryType . TxnAbort :
135+ case AofEntryType . TxnCommit :
136+ // We encountered a transaction end without start - this could happen because we truncated the AOF
137+ // after a checkpoint, and the transaction belonged to the previous version. It can safely
138+ // be ignored.
139+ break ;
140+ default :
141+ // Continue processing
142+ return false ;
143+ }
144+
145+ // Processed this record successfully
146+ return true ;
147+ }
148+
149+ /// <summary>
150+ /// Process fuzzy region operations if any
151+ /// </summary>
152+ /// <param name="storeVersion"></param>
153+ /// <param name="asReplica"></param>
154+ internal void ProcessFuzzyRegionOperations ( long storeVersion , bool asReplica )
155+ {
156+ var fuzzyRegionOps = aofReplayContext . fuzzyRegionOps ;
157+ if ( fuzzyRegionOps . Count > 0 )
158+ logger ? . LogInformation ( "Replaying {fuzzyRegionBufferCount} records from fuzzy region for checkpoint {newVersion}" , fuzzyRegionOps . Count , storeVersion ) ;
159+ foreach ( var entry in fuzzyRegionOps )
160+ {
161+ fixed ( byte * entryPtr = entry )
162+ _ = aofProcessor . ReplayOp ( aofProcessor . basicContext , aofProcessor . objectStoreBasicContext , entryPtr , entry . Length , asReplica ) ;
163+ }
164+ }
165+
166+ /// <summary>
167+ /// Process fuzzy region transaction groups
168+ /// </summary>
169+ /// <param name="ptr"></param>
170+ /// <param name="asReplica"></param>
171+ internal void ProcessFuzzyRegionTransactionGroup ( byte * ptr , bool asReplica )
172+ {
173+ Debug . Assert ( aofReplayContext . txnGroupBuffer != null ) ;
174+ // Process transaction groups in FIFO order
175+ if ( aofReplayContext . txnGroupBuffer . Count > 0 )
176+ {
177+ var txnGroup = aofReplayContext . txnGroupBuffer . Dequeue ( ) ;
178+ ProcessTransactionGroup ( ptr , asReplica , txnGroup ) ;
179+ }
180+ }
181+
182+ /// <summary>
183+ /// Process provided transaction group
184+ /// </summary>
185+ /// <param name="asReplica"></param>
186+ /// <param name="txnGroup"></param>
187+ internal void ProcessTransactionGroup ( byte * ptr , bool asReplica , TransactionGroup txnGroup )
188+ {
189+ if ( ! asReplica )
190+ {
191+ // If recovering reads will not expose partial transactions so we can replay without locking.
192+ // Also we don't have to synchronize replay of sublogs because write ordering has been established at the time of enqueue.
193+ ProcessTransactionGroupOperations ( aofProcessor , aofProcessor . basicContext , aofProcessor . objectStoreBasicContext , txnGroup , asReplica ) ;
194+ }
195+ else
196+ {
197+ var txnManager = aofProcessor . respServerSession . txnManager ;
198+
199+ // Start by saving transaction keys for locking
200+ SaveTransactionGroupKeysToLock ( txnManager , txnGroup ) ;
201+
202+ // Start transaction
203+ _ = txnManager . Run ( internal_txn : true ) ;
204+
205+ // Process in parallel transaction group
206+ ProcessTransactionGroupOperations ( aofProcessor , txnManager . LockableContext , txnManager . ObjectStoreLockableContext , txnGroup , asReplica ) ;
207+
208+ // NOTE:
209+ // This txnManager instance is taken from a session with StoreWrapper(recordToAof=false).
210+ // For this reason its internal appendOnlyFile instance is null.
211+ // Hence this commit will not write into the replica's Aof file as it is required.
212+ Debug . Assert ( ! txnManager . AofEnabled ) ;
213+ txnManager . Commit ( true ) ;
214+ }
215+
216+ // Helper to iterate of transaction keys and add them to lockset
217+ static unsafe void SaveTransactionGroupKeysToLock ( TransactionManager txnManager , TransactionGroup txnGroup )
218+ {
219+ foreach ( var entry in txnGroup . operations )
220+ {
221+ ref var key = ref Unsafe . NullRef < SpanByte > ( ) ;
222+ fixed ( byte * entryPtr = entry )
223+ {
224+ var header = * ( AofHeader * ) entryPtr ;
225+ var isObject = false ;
226+ switch ( header . opType )
227+ {
228+ case AofEntryType . StoreUpsert :
229+ case AofEntryType . StoreRMW :
230+ case AofEntryType . StoreDelete :
231+ key = ref Unsafe . AsRef < SpanByte > ( entryPtr + sizeof ( AofHeader ) ) ;
232+ isObject = false ;
233+ break ;
234+ case AofEntryType . ObjectStoreUpsert :
235+ case AofEntryType . ObjectStoreRMW :
236+ case AofEntryType . ObjectStoreDelete :
237+ key = ref Unsafe . AsRef < SpanByte > ( entryPtr + sizeof ( AofHeader ) ) ;
238+ isObject = true ;
239+ break ;
240+ default :
241+ throw new GarnetException ( $ "Invalid replay operation { header . opType } within transaction") ;
242+ }
243+
244+ // Add key to the lockset
245+ txnManager . SaveKeyEntryToLock ( ArgSlice . FromPinnedSpan ( key . AsReadOnlySpan ( ) ) , isObject : isObject , LockType . Exclusive ) ;
246+ }
247+ }
248+ }
249+
250+ // Process transaction
251+ static void ProcessTransactionGroupOperations < TContext , TObjectContext > ( AofProcessor aofProcessor , TContext context , TObjectContext objectContext , TransactionGroup txnGroup , bool asReplica )
252+ where TContext : ITsavoriteContext < SpanByte , SpanByte , RawStringInput , SpanByteAndMemory , long , MainSessionFunctions , MainStoreFunctions , MainStoreAllocator >
253+ where TObjectContext : ITsavoriteContext < byte [ ] , IGarnetObject , ObjectInput , GarnetObjectStoreOutput , long , ObjectSessionFunctions , ObjectStoreFunctions , ObjectStoreAllocator >
254+ {
255+ foreach ( var entry in txnGroup . operations )
256+ {
257+ fixed ( byte * entryPtr = entry )
258+ _ = aofProcessor . ReplayOp ( context , objectContext , entryPtr , entry . Length , asReplica : asReplica ) ;
259+ }
260+ }
261+ }
262+
263+ /// <summary>
264+ /// Replay StoredProc wrapper for single and sharded logs
265+ /// </summary>
266+ /// <param name="id"></param>
267+ /// <param name="ptr"></param>
268+ internal void ReplayStoredProc ( byte id , byte * ptr )
269+ {
270+ StoredProcRunnerBase ( id , ptr ) ;
271+
272+ // Based run stored proc method used of legacy single log implementation
273+ void StoredProcRunnerBase ( byte id , byte * ptr )
274+ {
275+ var curr = ptr + sizeof ( AofHeader ) ;
276+
277+ // Reconstructing CustomProcedureInput
278+ _ = aofReplayContext . customProcInput . DeserializeFrom ( curr ) ;
279+
280+ // Run the stored procedure with the reconstructed input
281+ var output = aofReplayContext . output ;
282+ _ = aofProcessor . respServerSession . RunTransactionProc ( id , ref aofReplayContext . customProcInput , ref output , isRecovering : true ) ;
283+ }
284+ }
285+ }
286+ }
287+ }
0 commit comments