@@ -519,18 +519,22 @@ defimpl Enumerable, for: GenEvent do
519
519
receive do
520
520
# The subscription process gave us a go.
521
521
{ :UP , ^ mon_ref , manager_pid } ->
522
- { mon_ref , manager_pid }
522
+ { mon_ref , mon_pid , manager_pid }
523
523
# The subscription process died due to an abnormal reason.
524
524
{ :DOWN , ^ mon_ref , _ , _ , reason } ->
525
525
exit ( { reason , { __MODULE__ , :start , [ stream ] } } )
526
526
end
527
527
end
528
528
529
- defp next ( % { timeout: timeout } = stream , { mon_ref , manager_pid } = acc ) do
529
+ defp next ( % { timeout: timeout } = stream , { mon_ref , mon_pid , manager_pid } = acc ) do
530
+ # If :DOWN is received must resend it to self so that stop/2 can receive it
531
+ # and know that the handler has been removed.
530
532
receive do
531
533
{ :DOWN , ^ mon_ref , _ , _ , :normal } ->
534
+ send ( self ( ) , { :DOWN , mon_ref , :process , mon_pid , :normal } )
532
535
nil
533
536
{ :DOWN , ^ mon_ref , _ , _ , reason } ->
537
+ send ( self ( ) , { :DOWN , mon_ref , :process , mon_pid , :normal } )
534
538
exit ( { reason , { __MODULE__ , :next , [ stream , acc ] } } )
535
539
{ ^ mon_ref , sync_ref , event } ->
536
540
{ { sync_ref , manager_pid , event } , acc }
@@ -540,9 +544,15 @@ defimpl Enumerable, for: GenEvent do
540
544
end
541
545
end
542
546
543
- defp stop ( % { id: id } , { mon_ref , manager_pid } ) do
544
- remove_handler ( mon_ref , manager_pid , id )
545
- flush_events ( mon_ref )
547
+ defp stop ( % { mode: mode } = stream , { mon_ref , mon_pid , manager_pid } = acc ) do
548
+ case remove_handler ( mon_ref , mon_pid , manager_pid ) do
549
+ :ok when mode == :async ->
550
+ flush_events ( mon_ref )
551
+ :ok ->
552
+ :ok
553
+ { :error , reason } ->
554
+ exit ( { reason , { __MODULE__ , :stop , [ stream , acc ] } } )
555
+ end
546
556
end
547
557
548
558
defp add_handler ( mode , manager , id , duration ) do
@@ -574,6 +584,10 @@ defimpl Enumerable, for: GenEvent do
574
584
{ :UP , ^ mon_ref , manager_pid } ->
575
585
send ( parent , { :UP , mon_ref , manager_pid } )
576
586
receive do
587
+ # The stream has finished, remove the handler.
588
+ { :DONE , ^ mon_ref } ->
589
+ exit_handler ( manager_pid , parent_ref , cancel )
590
+
577
591
# If the parent died, we can exit normally.
578
592
{ :DOWN , ^ parent_ref , _ , _ , _ } ->
579
593
exit ( :normal )
@@ -599,36 +613,43 @@ defimpl Enumerable, for: GenEvent do
599
613
defp cancel_ref ( nil , mon_ref ) , do: mon_ref
600
614
defp cancel_ref ( id , mon_ref ) , do: { id , mon_ref }
601
615
602
- defp remove_handler ( mon_ref , manager_pid , id ) do
603
- Process . demonitor ( mon_ref , [ :flush ] )
604
- handler = { __MODULE__ , cancel_ref ( id , mon_ref ) }
605
-
606
- { _pid , ref } = spawn_monitor fn ->
607
- try do
608
- # handler may nolonger be there, if it is the removal will cause the monitor
609
- # process to exit. If this returns successfuly then no more events will be
610
- # forwarded.
611
- _ = :gen_event . delete_handler ( manager_pid , handler , :remove_handler )
612
- catch
613
- # Do not want to overide the exit reason of the mon_pid so catch errors.
614
- # However if the exit is due to a disconnection, exit because messages could
615
- # leak if the nodes are reconnected before the manager on the other node
616
- # removes the handler. In this case it is very likely that the mon_pid
617
- # exited with the same reason.
618
- :exit , reason when reason != { :nodedown , node ( manager_pid ) } ->
619
- :ok
620
- end
616
+ defp exit_handler ( manager_pid , parent_ref , cancel ) do
617
+ # Send exit signal so manager removes handler.
618
+ Process . exit ( manager_pid , :shutdown )
619
+ receive do
620
+ # If the parent died, we can exit normally.
621
+ { :DOWN , ^ parent_ref , _ , _ , _ } ->
622
+ exit ( :normal )
623
+
624
+ # Probably the reason is :shutdown, which occurs when the manager receives
625
+ # an exit signal from a handler supervising process. However whatever the
626
+ # reason the handler has been removed so it is ok.
627
+ { :gen_event_EXIT , { __MODULE__ , ^ cancel } , _ } ->
628
+ exit ( :normal )
629
+
630
+ # The connection broke, perhaps the handler might try to forward events
631
+ # before it removes the handler, so must exit abnormally.
632
+ { :EXIT , ^ manager_pid , :noconnection } ->
633
+ exit ( { :nodedown , node ( manager_pid ) } )
634
+
635
+ # The manager has exited but don't exit abnormally as the handler has died
636
+ # with the manager and all expected events have been handled. This is ok.
637
+ { :EXIT , ^ manager_pid , _ } ->
638
+ exit ( :normal )
621
639
end
640
+ end
622
641
642
+ defp remove_handler ( mon_ref , mon_pid , manager_pid ) do
643
+ send ( mon_pid , { :DONE , mon_ref } )
623
644
receive do
624
645
{ ^ mon_ref , sync , _ } when sync != nil ->
625
646
send ( manager_pid , { sync , :done } )
626
- Process . demonitor ( ref , [ :flush ] )
647
+ Process . demonitor ( mon_ref , [ :flush ] )
627
648
:ok
628
- { :DOWN , ^ ref , _ , _ , :normal } ->
649
+ { :DOWN , ^ mon_ref , _ , _ , :normal } ->
629
650
:ok
630
- { :DOWN , ^ ref , _ , _ , other } ->
631
- exit ( other )
651
+ { :DOWN , ^ mon_ref , _ , _ , reason } ->
652
+ { :error , reason }
632
653
end
633
654
end
634
655
0 commit comments