11// Copyright (c) Microsoft Corporation. All rights reserved.
22// Licensed under the MIT license.
33
4- using System . Collections . Generic ;
5- using System . Threading . Tasks ;
64using FASTER . core ;
75using NUnit . Framework ;
6+ using System . Collections . Generic ;
7+ using System . Threading . Tasks ;
8+ using static FASTER . test . TestUtils ;
89
910namespace FASTER . test
1011{
12+ public struct LocalKeyStructComparer : IFasterEqualityComparer < KeyStruct >
13+ {
14+ internal long ? forceCollisionHash ;
15+
16+ public long GetHashCode64 ( ref KeyStruct key )
17+ {
18+ return forceCollisionHash . HasValue ? forceCollisionHash . Value : Utility . GetHashCode ( key . kfield1 ) ;
19+ }
20+ public bool Equals ( ref KeyStruct k1 , ref KeyStruct k2 )
21+ {
22+ return k1 . kfield1 == k2 . kfield1 && k1 . kfield2 == k2 . kfield2 ;
23+ }
24+
25+ public override string ToString ( ) => $ "forceHashCollision: { forceCollisionHash } ";
26+ }
27+
1128 [ TestFixture ]
1229 class CompletePendingTests
1330 {
1431 private FasterKV < KeyStruct , ValueStruct > fht ;
1532 private IDevice log ;
33+ LocalKeyStructComparer comparer = new ( ) ;
1634
1735 [ SetUp ]
1836 public void Setup ( )
1937 {
2038 // Clean up log files from previous test runs in case they weren't cleaned up
21- TestUtils . DeleteDirectory ( TestUtils . MethodTestDir , wait : true ) ;
39+ DeleteDirectory ( MethodTestDir , wait : true ) ;
2240
23- log = Devices . CreateLogDevice ( $ "{ TestUtils . MethodTestDir } /CompletePendingTests.log", preallocateFile : true , deleteOnClose : true ) ;
24- fht = new FasterKV < KeyStruct , ValueStruct > ( 128 , new LogSettings { LogDevice = log , MemorySizeBits = 29 } ) ;
41+ log = Devices . CreateLogDevice ( $ "{ MethodTestDir } /CompletePendingTests.log", preallocateFile : true , deleteOnClose : true ) ;
42+ fht = new FasterKV < KeyStruct , ValueStruct > ( 128 , new LogSettings { LogDevice = log , MemorySizeBits = 29 } , comparer : comparer ) ;
2543 }
2644
2745 [ TearDown ]
@@ -31,15 +49,15 @@ public void TearDown()
3149 fht = null ;
3250 log ? . Dispose ( ) ;
3351 log = null ;
34- TestUtils . DeleteDirectory ( TestUtils . MethodTestDir , wait : true ) ;
52+ DeleteDirectory ( MethodTestDir , wait : true ) ;
3553 }
3654
3755 const int numRecords = 1000 ;
3856
3957 static KeyStruct NewKeyStruct ( int key ) => new ( ) { kfield1 = key , kfield2 = key + numRecords * 10 } ;
4058 static ValueStruct NewValueStruct ( int key ) => new ( ) { vfield1 = key , vfield2 = key + numRecords * 10 } ;
4159
42- static InputStruct NewInputStruct ( int key ) => new ( ) { ifield1 = key + numRecords * 30 , ifield2 = key + numRecords * 40 } ;
60+ static InputStruct NewInputStruct ( int key ) => new ( ) { ifield1 = key + numRecords * 30 , ifield2 = key + numRecords * 40 } ;
4361 static ContextStruct NewContextStruct ( int key ) => new ( ) { cfield1 = key + numRecords * 50 , cfield2 = key + numRecords * 60 } ;
4462
4563 static void VerifyStructs ( int key , ref KeyStruct keyStruct , ref InputStruct inputStruct , ref OutputStruct outputStruct , ref ContextStruct contextStruct , bool useRMW )
@@ -126,7 +144,7 @@ internal static void VerifyOneNotFound(CompletedOutputIterator<KeyStruct, ValueS
126144
127145 [ Test ]
128146 [ Category ( "FasterKV" ) ]
129- public async ValueTask ReadAndCompleteWithPendingOutput ( [ Values ] bool useRMW , [ Values ] bool isAsync )
147+ public async ValueTask ReadAndCompleteWithPendingOutput ( [ Values ] bool useRMW , [ Values ] bool isAsync )
130148 {
131149 using var session = fht . For ( new FunctionsWithContext < ContextStruct > ( ) ) . NewSession < FunctionsWithContext < ContextStruct > > ( ) ;
132150 Assert . IsNull ( session . completedOutputs ) ; // Do not instantiate until we need it
@@ -214,5 +232,144 @@ public async ValueTask ReadAndCompleteWithPendingOutput([Values]bool useRMW, [Va
214232 Assert . AreEqual ( address , recordMetadata . Address ) ;
215233 }
216234 }
235+
236+ public enum StartAddressMode
237+ {
238+ UseStartAddress ,
239+ NoStartAddress
240+ }
241+
242+ public class PendingReadFunctions < TContext > : FunctionsBase < KeyStruct , ValueStruct , InputStruct , OutputStruct , Empty >
243+ {
244+ public override void ReadCompletionCallback ( ref KeyStruct key , ref InputStruct input , ref OutputStruct output , Empty ctx , Status status , RecordMetadata recordMetadata )
245+ {
246+ Assert . IsTrue ( status . Found ) ;
247+ Assert . AreEqual ( key . kfield1 , output . value . vfield1 ) ;
248+ // Do not compare field2; that's our updated value, and the key won't be found if we change kfield2
249+ }
250+
251+ // Read functions
252+ public override bool SingleReader ( ref KeyStruct key , ref InputStruct input , ref ValueStruct value , ref OutputStruct dst , ref ReadInfo readInfo )
253+ {
254+ Assert . IsFalse ( readInfo . RecordInfo . IsNull ( ) ) ;
255+ dst . value = value ;
256+ return true ;
257+ }
258+
259+ public override bool ConcurrentReader ( ref KeyStruct key , ref InputStruct input , ref ValueStruct value , ref OutputStruct dst , ref ReadInfo readInfo )
260+ => SingleReader ( ref key , ref input , ref value , ref dst , ref readInfo ) ;
261+ }
262+
263+ [ Test ]
264+ [ Category ( "FasterKV" ) ]
265+ public void ReadPendingWithNewSameKey ( [ Values ] StartAddressMode startAddressMode , [ Values ( FlushMode . NoFlush , FlushMode . OnDisk ) ] FlushMode secondRecordFlushMode )
266+ {
267+ const int valueMult = 1000 ;
268+
269+ using var session = fht . For ( new PendingReadFunctions < ContextStruct > ( ) ) . NewSession < PendingReadFunctions < ContextStruct > > ( ) ;
270+
271+ // Store off startAddress before initial upsert
272+ var startAddress = startAddressMode == StartAddressMode . UseStartAddress ? fht . Log . TailAddress : Constants . kInvalidAddress ;
273+
274+ // Insert first record
275+ var firstValue = 0 ; // same as key
276+ var keyStruct = new KeyStruct { kfield1 = firstValue , kfield2 = firstValue * valueMult } ;
277+ var valueStruct = new ValueStruct { vfield1 = firstValue , vfield2 = firstValue * valueMult } ;
278+ session . Upsert ( ref keyStruct , ref valueStruct ) ;
279+
280+ // Flush to make the Read() go pending.
281+ fht . Log . FlushAndEvict ( wait : true ) ;
282+
283+ ReadOptions readOptions = new ( ) { StartAddress = startAddress } ;
284+ var ( status , outputStruct ) = session . Read ( keyStruct , ref readOptions ) ;
285+ Assert . IsTrue ( status . IsPending , $ "Expected status.IsPending: { status } ") ;
286+
287+ // Insert next record with the same key and flush this too if requested.
288+ var secondValue = firstValue + 1 ;
289+ valueStruct . vfield2 = secondValue * valueMult ;
290+ session . Upsert ( ref keyStruct , ref valueStruct ) ;
291+ if ( secondRecordFlushMode == FlushMode . OnDisk )
292+ fht . Log . FlushAndEvict ( wait : true ) ;
293+
294+ session . CompletePendingWithOutputs ( out var completedOutputs , wait : true ) ;
295+ ( status , outputStruct ) = GetSinglePendingResult ( completedOutputs ) ;
296+
297+ if ( startAddressMode == StartAddressMode . UseStartAddress )
298+ Assert . AreEqual ( firstValue * valueMult , outputStruct . value . vfield2 , "UseStartAddress should have returned first value" ) ;
299+ else
300+ Assert . AreEqual ( secondValue * valueMult , outputStruct . value . vfield2 , "NoStartAddress should have returned second value" ) ;
301+ }
302+
303+ [ Test ]
304+ [ Category ( "FasterKV" ) ]
305+ public void ReadPendingWithNewDifferentKeyInChain ( [ Values ] StartAddressMode startAddressMode , [ Values ( FlushMode . NoFlush , FlushMode . OnDisk ) ] FlushMode secondRecordFlushMode )
306+ {
307+ const int valueMult = 1000 ;
308+
309+ using var session = fht . For ( new PendingReadFunctions < ContextStruct > ( ) ) . NewSession < PendingReadFunctions < ContextStruct > > ( ) ;
310+
311+ // Store off startAddress before initial upsert
312+ var startAddress = startAddressMode == StartAddressMode . UseStartAddress ? fht . Log . TailAddress : Constants . kInvalidAddress ;
313+
314+ // Insert first record
315+ var firstValue = 0 ; // same as key
316+ var keyStruct = new KeyStruct { kfield1 = firstValue , kfield2 = firstValue * valueMult } ;
317+ var valueStruct = new ValueStruct { vfield1 = firstValue , vfield2 = firstValue * valueMult } ;
318+ session . Upsert ( ref keyStruct , ref valueStruct ) ;
319+
320+ // Force collisions to test having another key in the chain
321+ comparer . forceCollisionHash = keyStruct . GetHashCode64 ( ref keyStruct ) ;
322+
323+ // Flush to make the Read() go pending.
324+ fht . Log . FlushAndEvict ( wait : true ) ;
325+
326+ ReadOptions readOptions = new ( ) { StartAddress = startAddress } ;
327+ var ( status , outputStruct ) = session . Read ( keyStruct , ref readOptions ) ;
328+ Assert . IsTrue ( status . IsPending , $ "Expected status.IsPending: { status } ") ;
329+
330+ // Insert next record with a different key and flush this too if requested.
331+ var secondValue = firstValue + 1 ;
332+ keyStruct = new ( ) { kfield1 = secondValue , kfield2 = secondValue * valueMult } ;
333+ valueStruct = new ( ) { vfield1 = secondValue , vfield2 = secondValue * valueMult } ;
334+ session . Upsert ( ref keyStruct , ref valueStruct ) ;
335+ if ( secondRecordFlushMode == FlushMode . OnDisk )
336+ fht . Log . FlushAndEvict ( wait : true ) ;
337+
338+ session . CompletePendingWithOutputs ( out var completedOutputs , wait : true ) ;
339+ ( status , outputStruct ) = GetSinglePendingResult ( completedOutputs ) ;
340+
341+ Assert . AreEqual ( firstValue * valueMult , outputStruct . value . vfield2 , "Should have returned first value" ) ;
342+ }
343+
344+ [ Test ]
345+ [ Category ( "FasterKV" ) ]
346+ public void ReadPendingWithNoNewKey ( [ Values ] StartAddressMode startAddressMode )
347+ {
348+ // Basic test of pending read
349+ const int valueMult = 1000 ;
350+
351+ using var session = fht . For ( new PendingReadFunctions < ContextStruct > ( ) ) . NewSession < PendingReadFunctions < ContextStruct > > ( ) ;
352+
353+ // Store off startAddress before initial upsert
354+ var startAddress = startAddressMode == StartAddressMode . UseStartAddress ? fht . Log . TailAddress : Constants . kInvalidAddress ;
355+
356+ // Insert first record
357+ var firstValue = 0 ; // same as key
358+ var keyStruct = new KeyStruct { kfield1 = firstValue , kfield2 = firstValue * valueMult } ;
359+ var valueStruct = new ValueStruct { vfield1 = firstValue , vfield2 = firstValue * valueMult } ;
360+ session . Upsert ( ref keyStruct , ref valueStruct ) ;
361+
362+ // Flush to make the Read() go pending.
363+ fht . Log . FlushAndEvict ( wait : true ) ;
364+
365+ ReadOptions readOptions = new ( ) { StartAddress = startAddress } ;
366+ var ( status , outputStruct ) = session . Read ( keyStruct , ref readOptions ) ;
367+ Assert . IsTrue ( status . IsPending , $ "Expected status.IsPending: { status } ") ;
368+
369+ session . CompletePendingWithOutputs ( out var completedOutputs , wait : true ) ;
370+ ( status , outputStruct ) = GetSinglePendingResult ( completedOutputs ) ;
371+
372+ Assert . AreEqual ( firstValue * valueMult , outputStruct . value . vfield2 , "Should have returned first value" ) ;
373+ }
217374 }
218375}
0 commit comments