@@ -16,23 +16,21 @@ defmodule AgentForge.Flow do
1616 @ type flow :: ( Signal . t ( ) , map ( ) -> { term ( ) , map ( ) } ) | [ ( Signal . t ( ) , map ( ) -> { term ( ) , map ( ) } ) ]
1717
1818 def process ( handlers , signal , state ) when is_list ( handlers ) do
19- try do
20- # Call process_with_limits with default option to not return statistics
21- # This ensures backward compatibility with existing code
22- case process_with_limits ( handlers , signal , state , return_stats: false ) do
23- { :ok , result , new_state } ->
24- { :ok , result , new_state }
25-
26- { :error , reason , _state } ->
27- # Maintain original error format for backward compatibility
28- { :error , reason }
29-
30- other ->
31- other
32- end
33- catch
34- _kind , error -> { :error , "Flow processing error: #{ inspect ( error ) } " }
19+ # Call process_with_limits with default option to not return statistics
20+ # This ensures backward compatibility with existing code
21+ case process_with_limits ( handlers , signal , state , return_stats: false ) do
22+ { :ok , result , new_state } ->
23+ { :ok , result , new_state }
24+
25+ { :error , reason , _state } ->
26+ # Maintain original error format for backward compatibility
27+ { :error , reason }
28+
29+ other ->
30+ other
3531 end
32+ rescue
33+ error -> { :error , "Flow processing error: #{ inspect ( error ) } " }
3634 end
3735
3836 def get_last_execution_stats , do: Process . get ( @ last_execution_stats_key )
@@ -69,27 +67,15 @@ defmodule AgentForge.Flow do
6967 """
7068 def process_with_limits ( handlers , signal , state , opts \\ [ ] ) when is_list ( handlers ) do
7169 # Extract options
72- timeout_ms = Keyword . get ( opts , :timeout_ms , 30000 )
70+ timeout_ms = Keyword . get ( opts , :timeout_ms , 30_000 )
7371 collect_stats = Keyword . get ( opts , :collect_stats , true )
7472 return_stats = Keyword . get ( opts , :return_stats , false )
7573
7674 # Initialize statistics if enabled
7775 stats = if collect_stats , do: ExecutionStats . new ( ) , else: nil
7876
7977 # Create a task to process the signal with timeout
80- # Use try-catch to wrap processing logic to ensure exceptions are properly caught
81- task =
82- Task . async ( fn ->
83- try do
84- process_with_stats ( handlers , signal , state , stats )
85- catch
86- # Explicitly catch exceptions and convert them to appropriate error results
87- _kind , error ->
88- error_message = "Flow processing error: #{ inspect ( error ) } "
89- error_result = { :error , error_message , state , stats }
90- { error_result , stats }
91- end
92- end )
78+ task = Task . async ( fn -> execute_flow_safely ( handlers , signal , state , stats ) end )
9379
9480 # Wait for the task to complete or timeout
9581 case Task . yield ( task , timeout_ms ) || Task . shutdown ( task ) do
@@ -120,33 +106,12 @@ defmodule AgentForge.Flow do
120106 updated_stats =
121107 ExecutionStats . record_step ( current_stats , handler , current_signal , current_state )
122108
123- # Process handler
124- case process_handler ( handler , current_signal , current_state ) do
125- { { :emit , new_signal } , new_state } ->
126- { :cont , { :ok , new_signal , new_state , updated_stats } }
127-
128- { { :emit_many , signals } , new_state } when is_list ( signals ) ->
129- # When multiple signals are emitted, use the last one for continuation
130- { :cont , { :ok , List . last ( signals ) , new_state , updated_stats } }
131-
132- { :skip , new_state } ->
133- { :halt , { :ok , nil , new_state , updated_stats } }
134-
135- { :halt , data } ->
136- { :halt , { :ok , data , current_state , updated_stats } }
137-
138- { { :halt , data } , _state } ->
139- { :halt , { :ok , data , current_state , updated_stats } }
140-
141- { { :error , reason } , new_state } ->
142- { :halt , { :error , reason , new_state , updated_stats } }
143-
144- { other , _ } ->
145- raise "Invalid handler result: #{ inspect ( other ) } "
146-
147- other ->
148- raise "Invalid handler result: #{ inspect ( other ) } "
149- end
109+ # Process handler and handle result
110+ handle_process_result (
111+ process_handler ( handler , current_signal , current_state ) ,
112+ current_state ,
113+ updated_stats
114+ )
150115 end )
151116
152117 # Extract stats from result
@@ -307,4 +272,49 @@ defmodule AgentForge.Flow do
307272 { :skip , Map . put ( state , key , signal . data ) }
308273 end
309274 end
275+
276+ # Safely executes a flow with exception handling
277+ defp execute_flow_safely ( handlers , signal , state , stats ) do
278+ process_with_stats ( handlers , signal , state , stats )
279+ rescue
280+ error ->
281+ error_message = "Flow processing error: #{ inspect ( error ) } "
282+ error_result = { :error , error_message , state , stats }
283+ { error_result , stats }
284+ catch
285+ _kind , error ->
286+ error_message = "Flow processing error: #{ inspect ( error ) } "
287+ error_result = { :error , error_message , state , stats }
288+ { error_result , stats }
289+ end
290+
291+ # Helper function to handle results from process_handler
292+ defp handle_process_result ( result , current_state , updated_stats ) do
293+ case result do
294+ { { :emit , new_signal } , new_state } ->
295+ { :cont , { :ok , new_signal , new_state , updated_stats } }
296+
297+ { { :emit_many , signals } , new_state } when is_list ( signals ) ->
298+ # When multiple signals are emitted, use the last one for continuation
299+ { :cont , { :ok , List . last ( signals ) , new_state , updated_stats } }
300+
301+ { :skip , new_state } ->
302+ { :halt , { :ok , nil , new_state , updated_stats } }
303+
304+ { :halt , data } ->
305+ { :halt , { :ok , data , current_state , updated_stats } }
306+
307+ { { :halt , data } , _state } ->
308+ { :halt , { :ok , data , current_state , updated_stats } }
309+
310+ { { :error , reason } , new_state } ->
311+ { :halt , { :error , reason , new_state , updated_stats } }
312+
313+ { other , _ } ->
314+ raise "Invalid handler result: #{ inspect ( other ) } "
315+
316+ other ->
317+ raise "Invalid handler result: #{ inspect ( other ) } "
318+ end
319+ end
310320end
0 commit comments