@@ -270,5 +270,49 @@ [I] public void WaitBulkAllThrowsAndIsCaught()
270270 e . Message . Should ( ) . Be ( "boom" ) ;
271271 }
272272 }
273+
274+ [ I ]
275+ public void ForEachAsyncReleasesProcessedItemsInMemory ( )
276+ {
277+ WeakReference < SmallObject > deallocReference = null ;
278+ SmallObject obj = null ;
279+
280+ var lazyCollection = GetLazyCollection (
281+ weakRef => deallocReference = weakRef ,
282+ delegate { } , //...
283+ delegate { } , //Making sure that all of the objects have gone through pipeline
284+ delegate { } , //so that the first one can be deallocated
285+ delegate { } , //Various GC roots prevent several of previous (2 or 3)
286+ delegate { } , //items in the lazy Enumerable from deallocation during forced GC
287+ delegate { } , //...
288+ delegate {
289+ GC . Collect ( 2 , GCCollectionMode . Forced , true ) ;
290+ deallocReference . TryGetTarget ( out obj ) ;
291+ }
292+ ) ;
293+
294+ var index = CreateIndexName ( ) ;
295+ var observableBulk = this . _client . BulkAll ( lazyCollection , f => f
296+ . MaxDegreeOfParallelism ( 1 )
297+ . Size ( 1 )
298+ . Index ( index )
299+ . BufferToBulk ( ( r , buffer ) => r . IndexMany ( buffer ) ) ) ;
300+
301+ observableBulk . Wait ( TimeSpan . FromSeconds ( 30 ) , delegate { } ) ;
302+
303+ deallocReference . Should ( ) . NotBeNull ( ) ;
304+ obj . Should ( ) . BeNull ( ) ;
305+ }
306+
307+ private IEnumerable < SmallObject > GetLazyCollection ( params Action < WeakReference < SmallObject > > [ ] getFirstObjectCallBack )
308+ {
309+ var counter = 0 ;
310+ foreach ( var callback in getFirstObjectCallBack )
311+ {
312+ var obj = new SmallObject { Id = ++ counter } ;
313+ callback ( new WeakReference < SmallObject > ( obj ) ) ;
314+ yield return obj ;
315+ }
316+ }
273317 }
274318}
0 commit comments