@@ -119,6 +119,8 @@ def update(time, old_value, new_value)
119
119
end
120
120
1
121
121
end
122
+ expect ( subject . await_for ( 1 ) ) . to eq true
123
+ expect ( subject ) . to_not be_failed
122
124
end
123
125
124
126
specify 'when the action raises an error the value will not change' do
@@ -372,7 +374,7 @@ def update(time, old_value, new_value)
372
374
373
375
subject . restart ( 42 )
374
376
latch . wait ( 0.1 )
375
- sleep ( 0.1 )
377
+ expect ( subject . await_for ( 1 ) ) . to eq true
376
378
expect ( subject . value ) . to eq expected
377
379
end
378
380
@@ -409,6 +411,7 @@ def update(time, old_value, new_value)
409
411
end
410
412
411
413
latch . wait ( 2 )
414
+ expect ( subject . await_for ( 1 ) ) . to eq true
412
415
expect ( actual ) . to eq expected
413
416
end
414
417
@@ -438,25 +441,31 @@ def update(time, old_value, new_value)
438
441
it 'returns true when the job is post' do
439
442
subject = Agent . new ( 0 )
440
443
expect ( subject . send { nil } ) . to be true
444
+ expect ( subject . await_for ( 1 ) ) . to eq true
441
445
end
442
446
443
447
it 'returns false when #failed?' do
444
448
subject = Agent . new ( 0 )
445
449
allow ( subject ) . to receive ( :failed? ) . and_return ( true )
446
450
expect ( subject . send { nil } ) . to be false
451
+ expect ( subject . await_for ( 1 ) ) . to eq true
447
452
end
448
453
449
454
it 'posts to the global fast executor' do
450
455
subject = Agent . new ( 0 )
451
456
expect ( subject ) . to receive ( :enqueue_action_job ) . with ( anything , anything , Concurrent . global_fast_executor ) . and_call_original
452
457
subject . send { nil }
458
+ expect ( subject . await_for ( 1 ) ) . to eq true
453
459
end
454
460
455
461
it 'does not wait for the action to process' do
456
462
job_done = false
457
463
subject = Agent . new ( 0 )
458
- subject . send { sleep ( 5 ) ; job_done = true }
464
+ latch = CountDownLatch . new
465
+ subject . send { latch . wait ; job_done = true }
459
466
expect ( job_done ) . to be false
467
+ latch . count_down
468
+ expect ( subject . await_for ( 1 ) ) . to eq true
460
469
end
461
470
end
462
471
@@ -465,6 +474,7 @@ def update(time, old_value, new_value)
465
474
it 'returns true when the job is post' do
466
475
subject = Agent . new ( 0 )
467
476
expect ( subject . send! { nil } ) . to be true
477
+ expect ( subject . await_for ( 1 ) ) . to eq true
468
478
end
469
479
470
480
it 'raises an error when #failed?' do
@@ -479,13 +489,17 @@ def update(time, old_value, new_value)
479
489
subject = Agent . new ( 0 )
480
490
expect ( subject ) . to receive ( :enqueue_action_job ) . with ( anything , anything , Concurrent . global_fast_executor ) . and_call_original
481
491
subject . send! { nil }
492
+ expect ( subject . await_for ( 1 ) ) . to eq true
482
493
end
483
494
484
495
it 'does not wait for the action to process' do
485
496
job_done = false
486
497
subject = Agent . new ( 0 )
487
- subject . send! { sleep ( 5 ) ; job_done = true }
498
+ latch = CountDownLatch . new
499
+ subject . send! { latch . wait ; job_done = true }
488
500
expect ( job_done ) . to be false
501
+ latch . count_down
502
+ expect ( subject . await_for ( 1 ) ) . to eq true
489
503
end
490
504
end
491
505
@@ -494,6 +508,7 @@ def update(time, old_value, new_value)
494
508
it 'returns true when the job is post' do
495
509
subject = Agent . new ( 0 )
496
510
expect ( subject . send_off { nil } ) . to be true
511
+ expect ( subject . await_for ( 1 ) ) . to eq true
497
512
end
498
513
499
514
it 'returns false when #failed?' do
@@ -506,13 +521,17 @@ def update(time, old_value, new_value)
506
521
subject = Agent . new ( 0 )
507
522
expect ( subject ) . to receive ( :enqueue_action_job ) . with ( anything , anything , Concurrent . global_io_executor ) . and_call_original
508
523
subject . send_off { nil }
524
+ expect ( subject . await_for ( 1 ) ) . to eq true
509
525
end
510
526
511
527
it 'does not wait for the action to process' do
512
528
job_done = false
513
529
subject = Agent . new ( 0 )
514
- subject . send_off { sleep ( 5 ) ; job_done = true }
530
+ latch = CountDownLatch . new
531
+ subject . send_off { latch . wait ; job_done = true }
515
532
expect ( job_done ) . to be false
533
+ latch . count_down
534
+ expect ( subject . await_for ( 1 ) ) . to eq true
516
535
end
517
536
end
518
537
@@ -521,6 +540,7 @@ def update(time, old_value, new_value)
521
540
it 'returns true when the job is post' do
522
541
subject = Agent . new ( 0 )
523
542
expect ( subject . send_off! { nil } ) . to be true
543
+ expect ( subject . await_for ( 1 ) ) . to eq true
524
544
end
525
545
526
546
it 'raises an error when #failed?' do
@@ -535,13 +555,17 @@ def update(time, old_value, new_value)
535
555
subject = Agent . new ( 0 )
536
556
expect ( subject ) . to receive ( :enqueue_action_job ) . with ( anything , anything , Concurrent . global_io_executor ) . and_call_original
537
557
subject . send_off! { nil }
558
+ expect ( subject . await_for ( 1 ) ) . to eq true
538
559
end
539
560
540
561
it 'does not wait for the action to process' do
541
562
job_done = false
542
563
subject = Agent . new ( 0 )
543
- subject . send_off! { sleep ( 5 ) ; job_done = true }
564
+ latch = CountDownLatch . new
565
+ subject . send_off! { latch . wait ; job_done = true }
544
566
expect ( job_done ) . to be false
567
+ latch . count_down
568
+ expect ( subject . await_for ( 1 ) ) . to eq true
545
569
end
546
570
end
547
571
@@ -563,13 +587,6 @@ def update(time, old_value, new_value)
563
587
subject = Agent . new ( 0 )
564
588
subject . send_via ( immediate ) { nil }
565
589
end
566
-
567
- it 'does not wait for the action to process' do
568
- job_done = false
569
- subject = Agent . new ( 0 )
570
- subject . send_via ( executor ) { sleep ( 5 ) ; job_done = true }
571
- expect ( job_done ) . to be false
572
- end
573
590
end
574
591
575
592
context 'with #send_via!' do
@@ -592,20 +609,14 @@ def update(time, old_value, new_value)
592
609
subject = Agent . new ( 0 )
593
610
subject . send_via! ( immediate ) { nil }
594
611
end
595
-
596
- it 'does not wait for the action to process' do
597
- job_done = false
598
- subject = Agent . new ( 0 )
599
- subject . send_via! ( executor ) { sleep ( 5 ) ; job_done = true }
600
- expect ( job_done ) . to be false
601
- end
602
612
end
603
613
604
614
context 'with #post' do
605
615
606
616
it 'returns true when the job is post' do
607
617
subject = Agent . new ( 0 )
608
618
expect ( subject . post { nil } ) . to be true
619
+ expect ( subject . await_for ( 1 ) ) . to eq true
609
620
end
610
621
611
622
it 'returns false when #failed?' do
@@ -618,13 +629,17 @@ def update(time, old_value, new_value)
618
629
subject = Agent . new ( 0 )
619
630
expect ( subject ) . to receive ( :enqueue_action_job ) . with ( anything , anything , Concurrent . global_io_executor ) . and_call_original
620
631
subject . post { nil }
632
+ expect ( subject . await_for ( 1 ) ) . to eq true
621
633
end
622
634
623
635
it 'does not wait for the action to process' do
624
636
job_done = false
625
637
subject = Agent . new ( 0 )
626
- subject . post { sleep ( 5 ) ; job_done = true }
638
+ latch = CountDownLatch . new
639
+ subject . post { latch . wait ; job_done = true }
627
640
expect ( job_done ) . to be false
641
+ latch . count_down
642
+ expect ( subject . await_for ( 1 ) ) . to eq true
628
643
end
629
644
end
630
645
@@ -633,6 +648,7 @@ def update(time, old_value, new_value)
633
648
it 'returns self when the job is post' do
634
649
subject = Agent . new ( 0 )
635
650
expect ( subject << proc { nil } ) . to be subject
651
+ expect ( subject . await_for ( 1 ) ) . to eq true
636
652
end
637
653
638
654
it 'returns self when #failed?' do
@@ -642,16 +658,20 @@ def update(time, old_value, new_value)
642
658
end
643
659
644
660
it 'posts to the global io executor' do
645
- expect ( Concurrent . global_io_executor ) . to receive ( :post ) . with ( any_args ) . and_call_original
646
661
subject = Agent . new ( 0 )
662
+ expect ( subject ) . to receive ( :enqueue_action_job ) . with ( anything , anything , Concurrent . global_io_executor ) . and_call_original
647
663
subject << proc { nil }
664
+ expect ( subject . await_for ( 1 ) ) . to eq true
648
665
end
649
666
650
667
it 'does not wait for the action to process' do
651
668
job_done = false
652
669
subject = Agent . new ( 0 )
653
- subject << proc { sleep ( 5 ) ; job_done = true }
670
+ latch = CountDownLatch . new
671
+ subject << proc { latch . wait ; job_done = true }
654
672
expect ( job_done ) . to be false
673
+ latch . count_down
674
+ expect ( subject . await_for ( 1 ) ) . to eq true
655
675
end
656
676
end
657
677
end
@@ -707,6 +727,7 @@ def update(time, old_value, new_value)
707
727
subject . restart ( 42 , clear_actions : true )
708
728
result = end_latch . wait ( 0.1 )
709
729
expect ( result ) . to be false
730
+ expect ( subject . await_for ( 1 ) ) . to eq true
710
731
end
711
732
712
733
it 'does not clear the action queue when :clear_actions is false' do
@@ -723,6 +744,7 @@ def update(time, old_value, new_value)
723
744
subject . restart ( 42 , clear_actions : false )
724
745
result = end_latch . wait ( 3 )
725
746
expect ( result ) . to be true
747
+ expect ( subject . await_for ( 1 ) ) . to eq true
726
748
end
727
749
728
750
it 'does not clear the action queue when :clear_actions is not given' do
@@ -739,6 +761,7 @@ def update(time, old_value, new_value)
739
761
subject . restart ( 42 )
740
762
result = end_latch . wait ( 3 )
741
763
expect ( result ) . to be true
764
+ expect ( subject . await_for ( 1 ) ) . to eq true
742
765
end
743
766
744
767
it 'resumes action processing if actions are enqueued' do
@@ -759,6 +782,7 @@ def update(time, old_value, new_value)
759
782
760
783
subject . restart ( 42 , clear_actions : false )
761
784
expect ( finish_latch . wait ( 5 ) ) . to be true
785
+ expect ( subject . await_for ( 1 ) ) . to eq true
762
786
end
763
787
764
788
it 'does not trigger observation' do
@@ -876,14 +900,16 @@ def update(time, old_value, new_value)
876
900
877
901
it 'does not block on actions from other threads' do
878
902
latch = Concurrent ::CountDownLatch . new
903
+ finish = Concurrent ::CountDownLatch . new
879
904
subject = Agent . new ( 0 )
880
905
in_thread do
881
- subject . send_via ( executor ) { sleep }
906
+ subject . send_via ( executor ) { finish . wait }
882
907
latch . count_down
883
908
end
884
909
885
910
latch . wait ( 0.1 )
886
- expect ( subject . await ) . to be_truthy
911
+ expect ( subject . await_for ( 1 ) ) . to eq true
912
+ finish . count_down
887
913
end
888
914
889
915
it 'blocks indefinitely' do
@@ -930,14 +956,16 @@ def update(time, old_value, new_value)
930
956
931
957
it 'does not block on actions from other threads' do
932
958
latch = Concurrent ::CountDownLatch . new
959
+ finish = Concurrent ::CountDownLatch . new
933
960
subject = Agent . new ( 0 )
934
961
in_thread do
935
- subject . send_via ( executor ) { sleep }
962
+ subject . send_via ( executor ) { finish . wait }
936
963
latch . count_down
937
964
end
938
965
939
966
latch . wait ( 0.1 )
940
967
expect ( subject . await_for ( 0.1 ) ) . to be true
968
+ finish . count_down
941
969
end
942
970
943
971
it 'returns true when all prior actions have processed' , notravis : true do
@@ -952,6 +980,7 @@ def update(time, old_value, new_value)
952
980
subject . send_via ( executor ) { sleep ( 1 ) }
953
981
5 . times { subject . send_via ( executor ) { nil } }
954
982
expect ( subject . await_for ( 0.1 ) ) . to be false
983
+ expect ( subject . await_for ( 5 ) ) . to eq true
955
984
end
956
985
957
986
it 'returns false if restarted with :clear_actions true' , notravis : true do
@@ -978,14 +1007,16 @@ def update(time, old_value, new_value)
978
1007
979
1008
it 'does not block on actions from other threads' do
980
1009
latch = Concurrent ::CountDownLatch . new
1010
+ finish = Concurrent ::CountDownLatch . new
981
1011
subject = Agent . new ( 0 )
982
1012
in_thread do
983
- subject . send_via ( executor ) { sleep }
1013
+ subject . send_via ( executor ) { finish . wait }
984
1014
latch . count_down
985
1015
end
986
1016
987
1017
latch . wait ( 0.1 )
988
1018
expect ( subject . await_for! ( 0.1 ) ) . to be true
1019
+ finish . count_down
989
1020
end
990
1021
991
1022
it 'returns true when all prior actions have processed' do
@@ -1002,6 +1033,7 @@ def update(time, old_value, new_value)
1002
1033
expect {
1003
1034
subject . await_for! ( 0.1 )
1004
1035
} . to raise_error ( Concurrent ::TimeoutError )
1036
+ expect ( subject . await_for ( 5 ) ) . to eq true
1005
1037
end
1006
1038
1007
1039
it 'raises an error if restarted with :clear_actions true' , notravis : true do
@@ -1034,14 +1066,16 @@ def update(time, old_value, new_value)
1034
1066
1035
1067
it 'does not block on actions from other threads' do
1036
1068
latch = Concurrent ::CountDownLatch . new
1069
+ finish = Concurrent ::CountDownLatch . new
1037
1070
subject = Agent . new ( 0 )
1038
1071
in_thread do
1039
- subject . send_via ( executor ) { sleep }
1072
+ subject . send_via ( executor ) { finish . wait }
1040
1073
latch . count_down
1041
1074
end
1042
1075
1043
1076
latch . wait ( 0.1 )
1044
1077
expect ( subject . wait ( 0.1 ) ) . to be true
1078
+ finish . count_down
1045
1079
end
1046
1080
1047
1081
it 'blocks indefinitely when timeout is nil' do
@@ -1082,6 +1116,7 @@ def update(time, old_value, new_value)
1082
1116
subject . send_via ( executor ) { sleep ( 1 ) }
1083
1117
5 . times { subject . send_via ( executor ) { nil } }
1084
1118
expect ( subject . wait ( 0.1 ) ) . to be false
1119
+ expect ( subject . wait ( 5 ) ) . to eq true
1085
1120
end
1086
1121
1087
1122
it 'returns false when timeout is given and restarted with :clear_actions true' , notravis : true do
@@ -1133,6 +1168,7 @@ def update(time, old_value, new_value)
1133
1168
agents . each { |agent | agent . send_via ( executor ) { sleep ( 0.3 ) } }
1134
1169
ok = Agent . await_for ( 0.1 , *agents )
1135
1170
expect ( ok ) . to be false
1171
+ expect ( Agent . await_for! ( 1 , *agents ) ) . to eq true
1136
1172
end
1137
1173
end
1138
1174
@@ -1159,6 +1195,7 @@ def update(time, old_value, new_value)
1159
1195
expect {
1160
1196
Agent . await_for! ( 0.1 , *agents )
1161
1197
} . to raise_error ( Concurrent ::TimeoutError )
1198
+ expect ( Agent . await_for! ( 1 , *agents ) ) . to eq true
1162
1199
end
1163
1200
end
1164
1201
end
0 commit comments