@@ -80,7 +80,8 @@ defimpl Enumerable, for: GenEvent.Stream do
80
80
81
81
defp start ( % { manager: manager } = stream ) do
82
82
try do
83
- { :ok , { pid , ref } } = :gen . call ( manager , self ( ) , { :add_process_handler , self ( ) , self ( ) } , :infinity )
83
+ { :ok , { pid , ref } } = :gen . call ( manager , self ( ) ,
84
+ { :add_process_handler , self ( ) , self ( ) , nil } , :infinity )
84
85
mon_ref = Process . monitor ( pid )
85
86
{ pid , ref , mon_ref }
86
87
catch
@@ -92,19 +93,6 @@ defimpl Enumerable, for: GenEvent.Stream do
92
93
self = self ( )
93
94
94
95
receive do
95
- # The handler was removed. Stop iteration, resolve the
96
- # event later. We need to demonitor now, otherwise DOWN
97
- # appears with higher priority in the shutdown process.
98
- { :gen_event_EXIT , ^ self , _reason } = event ->
99
- Process . demonitor ( mon_ref , [ :flush ] )
100
- send ( self , event )
101
- { :halt , { :removed , acc } }
102
-
103
- # The manager died. Stop iteration, resolve the event later.
104
- { :DOWN , ^ mon_ref , _ , _ , _ } = event ->
105
- send ( self , event )
106
- { :halt , { :removed , acc } }
107
-
108
96
# Got an async event.
109
97
{ _from , { ^ pid , ^ ref } , { :notify , event } } ->
110
98
{ [ { :async , pid , ref , event } ] , acc }
@@ -116,6 +104,19 @@ defimpl Enumerable, for: GenEvent.Stream do
116
104
# Got an ack event.
117
105
{ _from , { ^ pid , ^ ref } , { :ack_notify , event } } ->
118
106
{ [ { :ack , pid , ref , event } ] , acc }
107
+
108
+ # The handler was removed. Stop iteration, resolve the
109
+ # event later. We need to demonitor now, otherwise DOWN
110
+ # appears with higher priority in the shutdown process.
111
+ { :gen_event_EXIT , { ^ pid , ^ ref } , _reason } = event ->
112
+ Process . demonitor ( mon_ref , [ :flush ] )
113
+ send ( self , event )
114
+ { :halt , { :removed , acc } }
115
+
116
+ # The manager died. Stop iteration, resolve the event later.
117
+ { :DOWN , ^ mon_ref , _ , _ , _ } = event ->
118
+ send ( self , event )
119
+ { :halt , { :removed , acc } }
119
120
after
120
121
timeout ->
121
122
exit ( { :timeout , { __MODULE__ , :next , [ stream , acc ] } } )
@@ -135,26 +136,20 @@ defimpl Enumerable, for: GenEvent.Stream do
135
136
136
137
# If we reach this branch, the handler was not removed yet,
137
138
# so we trigger a request for doing so.
138
- defp stop ( stream , { pid , _ , _ } = acc ) do
139
- parent = self ( )
140
- _ = Task . start ( fn -> GenEvent . remove_handler ( pid , parent , :shutdown ) end )
139
+ defp stop ( stream , { pid , ref , _ } = acc ) do
140
+ _ = GenEvent . remove_handler ( pid , { pid , ref } , :shutdown )
141
141
stop ( stream , { :removed , acc } )
142
142
end
143
143
144
144
defp wait_for_handler_removal ( pid , ref , mon_ref ) do
145
- self = self ( )
146
-
147
145
receive do
148
- { _from , { ^ pid , ^ ref } , { notify , _event } } when notify in [ :ack_notify , :sync_notify ] ->
149
- send pid , { ref , :done }
150
- wait_for_handler_removal ( pid , ref , mon_ref )
151
- { :gen_event_EXIT , ^ self , reason }
146
+ { :gen_event_EXIT , { ^ pid , ^ ref } , reason }
152
147
when reason == :normal
153
148
when reason == :shutdown
154
149
when tuple_size ( reason ) == 3 and elem ( reason , 0 ) == :swapped ->
155
150
Process . demonitor ( mon_ref , [ :flush ] )
156
151
:ok
157
- { :gen_event_EXIT , ^ self , reason } ->
152
+ { :gen_event_EXIT , { ^ pid , ^ ref } , reason } ->
158
153
Process . demonitor ( mon_ref , [ :flush ] )
159
154
{ :error , reason }
160
155
{ :DOWN , ^ mon_ref , _ , _ , reason } ->
@@ -164,7 +159,7 @@ defimpl Enumerable, for: GenEvent.Stream do
164
159
165
160
defp flush_events ( ref ) do
166
161
receive do
167
- { _from , { _pid , ^ ref } , { : notify, _event } } ->
162
+ { _from , { _pid , ^ ref } , { notify , _event } } when notify in [ :notify , :ack_notify , :sync_notify ] ->
168
163
flush_events ( ref )
169
164
after
170
165
0 -> :ok
0 commit comments