@@ -42,8 +42,8 @@ public class TestFixture<TEntryPoint> : IAsyncLifetime
4242 where TEntryPoint : class
4343{
4444 private readonly WebApplicationFactory < TEntryPoint > _factory ;
45- private int Timeout => 300 ; // Second
46- private ITestHarness TestHarness => ServiceProvider ? . GetTestHarness ( ) ;
45+ private int Timeout => 120 ; // Second
46+ public ITestHarness TestHarness => ServiceProvider ? . GetTestHarness ( ) ;
4747 private Action < IServiceCollection > TestRegistrationServices { get ; set ; }
4848 private PostgreSqlContainer PostgresTestcontainer ;
4949 private PostgreSqlContainer PostgresPersistTestContainer ;
@@ -136,10 +136,14 @@ public async Task InitializeAsync()
136136 {
137137 CancellationTokenSource = new CancellationTokenSource ( ) ;
138138 await StartTestContainerAsync ( ) ;
139+
140+ await TestHarness . Start ( ) ;
139141 }
140142
141143 public async Task DisposeAsync ( )
142144 {
145+ await TestHarness . Stop ( ) ;
146+
143147 await StopTestContainerAsync ( ) ;
144148 await _factory . DisposeAsync ( ) ;
145149 await CancellationTokenSource . CancelAsync ( ) ;
@@ -201,83 +205,71 @@ public Task SendAsync(IRequest request)
201205 public async Task Publish < TMessage > ( TMessage message , CancellationToken cancellationToken = default )
202206 where TMessage : class , IEvent
203207 {
208+ // Use harness bus to ensure publish happens only after the bus is started.
204209 await TestHarness . Bus . Publish ( message , cancellationToken ) ;
205210 }
206211
207212 public async Task < bool > WaitForPublishing < TMessage > ( CancellationToken cancellationToken = default )
208213 where TMessage : class , IEvent
209214 {
210- var result = await WaitUntilConditionMet ( async ( ) =>
211- {
212- var published = await TestHarness . Published . Any < TMessage > ( cancellationToken ) ;
213-
214- return published ;
215- } ) ;
216-
217- return result ;
218- }
219-
220- public async Task < bool > WaitForConsuming < TMessage > ( CancellationToken cancellationToken = default )
221- where TMessage : class , IEvent
222- {
223- var result = await WaitUntilConditionMet ( async ( ) =>
224- {
225- var consumed = await TestHarness . Consumed . Any < TMessage > ( cancellationToken ) ;
215+ var result = await WaitUntilConditionMet (
216+ async ( ) =>
217+ {
218+ var published = await TestHarness . Published . Any < TMessage > ( cancellationToken ) ;
226219
227- return consumed ;
228- } ) ;
220+ return published ;
221+ } ,
222+ cancellationToken : cancellationToken
223+ ) ;
229224
230225 return result ;
231226 }
232227
233- public async Task < bool > ShouldProcessedPersistInternalCommand < TInternalCommand > (
228+ public Task < bool > WaitUntilAsync (
229+ Func < Task < bool > > condition ,
230+ TimeSpan ? timeout = null ,
231+ TimeSpan ? pollInterval = null ,
234232 CancellationToken cancellationToken = default
235233 )
236- where TInternalCommand : class , IInternalCommand
237234 {
238- var result = await WaitUntilConditionMet ( async ( ) =>
239- {
240- return await ExecuteScopeAsync ( async sp =>
241- {
242- var persistMessageProcessor = sp . GetService < IPersistMessageProcessor > ( ) ;
235+ var effectiveTimeout = timeout ?? TimeSpan . FromSeconds ( Timeout ) ;
236+ var effectivePollInterval = pollInterval ?? TimeSpan . FromMilliseconds ( 200 ) ;
243237
244- Guard . Against . Null ( persistMessageProcessor , nameof ( persistMessageProcessor ) ) ;
245-
246- var filter = await persistMessageProcessor . GetByFilterAsync ( x =>
247- x . DeliveryType == MessageDeliveryType . Internal && typeof ( TInternalCommand ) . ToString ( ) == x . DataType
248- ) ;
249-
250- var res = filter . Any ( x => x . MessageStatus == MessageStatus . Processed ) ;
251-
252- return res ;
253- } ) ;
254- } ) ;
255-
256- return result ;
238+ return WaitUntilConditionMet (
239+ conditionToMet : async ( ) =>
240+ {
241+ cancellationToken . ThrowIfCancellationRequested ( ) ;
242+ return await condition ( ) ;
243+ } ,
244+ timeoutSecond : ( int ) Math . Ceiling ( effectiveTimeout . TotalSeconds ) ,
245+ pollInterval : effectivePollInterval ,
246+ cancellationToken : cancellationToken
247+ ) ;
257248 }
258249
259250 // Ref: https://tech.energyhelpline.com/in-memory-testing-with-masstransit/
260- private async Task < bool > WaitUntilConditionMet ( Func < Task < bool > > conditionToMet , int ? timeoutSecond = null )
251+ private async Task < bool > WaitUntilConditionMet (
252+ Func < Task < bool > > conditionToMet ,
253+ int ? timeoutSecond = null ,
254+ TimeSpan ? pollInterval = null ,
255+ CancellationToken cancellationToken = default
256+ )
261257 {
262258 var time = timeoutSecond ?? Timeout ;
259+ var delay = pollInterval ?? TimeSpan . FromMilliseconds ( 100 ) ;
263260
264- var startTime = DateTime . Now ;
265- var timeoutExpired = false ;
266- var meet = await conditionToMet . Invoke ( ) ;
267-
268- while ( ! meet )
261+ var startTime = DateTime . UtcNow ;
262+ while ( DateTime . UtcNow - startTime <= TimeSpan . FromSeconds ( time ) )
269263 {
270- if ( timeoutExpired )
271- {
272- return false ;
273- }
264+ cancellationToken . ThrowIfCancellationRequested ( ) ;
265+
266+ if ( await conditionToMet . Invoke ( ) )
267+ return true ;
274268
275- await Task . Delay ( 100 ) ;
276- meet = await conditionToMet . Invoke ( ) ;
277- timeoutExpired = DateTime . Now - startTime > TimeSpan . FromSeconds ( time ) ;
269+ await Task . Delay ( delay , cancellationToken ) ;
278270 }
279271
280- return true ;
272+ return false ;
281273 }
282274
283275 private async Task StartTestContainerAsync ( )
@@ -288,17 +280,14 @@ private async Task StartTestContainerAsync()
288280 MongoDbTestContainer = TestContainers . MongoTestContainer ( ) ;
289281 EventStoreDbTestContainer = TestContainers . EventStoreTestContainer ( ) ;
290282
291- // Start containers in parallel for speed
292283 await Task . WhenAll (
293284 MongoDbTestContainer . StartAsync ( ) ,
294285 PostgresTestcontainer . StartAsync ( ) ,
295286 PostgresPersistTestContainer . StartAsync ( ) ,
296287 EventStoreDbTestContainer . StartAsync ( )
297288 ) ;
298289
299- // Start RabbitMQ last and wait extra time
300290 await RabbitMqTestContainer . StartAsync ( ) ;
301- await Task . Delay ( 5000 ) ; // Give RabbitMQ extra time to initialize
302291 }
303292
304293 private async Task StopTestContainerAsync ( )
@@ -321,7 +310,7 @@ private void AddCustomAppSettings(IConfigurationBuilder configuration)
321310 new ( "PostgresOptions:ConnectionString:Identity" , PostgresTestcontainer . GetConnectionString ( ) ) ,
322311 new ( "PostgresOptions:ConnectionString:Passenger" , PostgresTestcontainer . GetConnectionString ( ) ) ,
323312 new ( "PersistMessageOptions:ConnectionString" , PostgresPersistTestContainer . GetConnectionString ( ) ) ,
324- new ( "RabbitMqOptions:HostName" , RabbitMqTestContainer . Hostname ) ,
313+ new ( "RabbitMqOptions:HostName" , "127.0.0.1" ) ,
325314 new ( "RabbitMqOptions:UserName" , TestContainers . RabbitMqContainerConfiguration . UserName ) ,
326315 new ( "RabbitMqOptions:Password" , TestContainers . RabbitMqContainerConfiguration . Password ) ,
327316 new (
0 commit comments