diff --git a/lib/agent_forge.ex b/lib/agent_forge.ex index cc0298e..1731138 100644 --- a/lib/agent_forge.ex +++ b/lib/agent_forge.ex @@ -70,6 +70,64 @@ defmodule AgentForge do Runtime.configure_stateful(handlers, opts) end + @doc """ + Processes a flow with execution limits. + This can prevent infinite loops and long-running processing. + + ## Options + + * `:max_steps` - Maximum number of steps to execute (default: :infinity) + * `:timeout` - Maximum execution time in milliseconds (default: :infinity) + * `:collect_stats` - Whether to collect execution statistics (default: true) + * `:return_stats` - Whether to return statistics in the result (default: false) + + ## Examples + + iex> handlers = [ + ...> fn _signal, state -> {{:emit, AgentForge.Signal.new(:done, "Success")}, state} end + ...> ] + iex> {:ok, result, _} = AgentForge.process_with_limits(handlers, AgentForge.Signal.new(:test, "data"), %{}) + iex> result.data + "Success" + """ + @spec process_with_limits( + # handler functions + list(function()), + # input signal + Signal.t(), + # initial state + map(), + # options + keyword() + ) :: + {:ok, Signal.t() | term(), term()} + | {:ok, Signal.t() | term(), term(), AgentForge.ExecutionStats.t()} + | {:error, term()} + | {:error, term(), map()} + | {:error, term(), AgentForge.ExecutionStats.t()} + def process_with_limits(handlers, signal, initial_state, opts \\ []) do + # Process using the Flow module's implementation directly + # This ensures that the implementation matches the signature in AgentForge + AgentForge.Flow.process_with_limits(handlers, signal, initial_state, opts) + end + + @doc """ + Gets statistics from the last flow execution. + Returns nil if no flow has been executed yet or statistics collection was disabled. + + ## Examples + + iex> handlers = [fn signal, state -> {{:emit, signal}, state} end] + iex> signal = AgentForge.Signal.new(:test, "data") + iex> {:ok, _, _} = AgentForge.process_with_limits(handlers, signal, %{}) + iex> stats = AgentForge.get_last_execution_stats() + iex> stats.steps + 1 + """ + def get_last_execution_stats do + Runtime.get_last_execution_stats() + end + # Re-export commonly used functions from Signal module defdelegate new_signal(type, data, meta \\ %{}), to: Signal, as: :new defdelegate emit(type, data, meta \\ %{}), to: Signal diff --git a/lib/agent_forge/flow.ex b/lib/agent_forge/flow.ex index 0e8d310..840ceb4 100644 --- a/lib/agent_forge/flow.ex +++ b/lib/agent_forge/flow.ex @@ -2,126 +2,209 @@ defmodule AgentForge.Flow do @moduledoc """ Provides functions for processing signals through a chain of handlers. Each handler is a function that takes a signal and state, and returns a tuple with result and new state. - Automatically collects execution statistics for monitoring and debugging. """ alias AgentForge.Signal alias AgentForge.ExecutionStats - # Store last execution stats in module attribute @last_execution_stats_key :"$agent_forge_last_execution_stats" - @doc """ - Processes a signal through a list of handlers. - Each handler should return a tuple {{:emit, signal} | {:error, reason}, new_state}. - """ def process(handlers, signal, state) when is_list(handlers) do try do - process_handlers(handlers, signal, state) - |> handle_result() + process_handlers(handlers, signal, state, collect_stats: true) catch - _kind, error -> - {:error, "Flow processing error: #{inspect(error)}"} + _kind, error -> {:error, "Flow processing error: #{inspect(error)}"} end end - @doc """ - Creates a handler that always emits the same signal type and data. - """ - def always_emit(type, data) do - fn _signal, state -> - {{:emit, Signal.new(type, data)}, state} + def get_last_execution_stats, do: Process.get(@last_execution_stats_key) + + def process_with_limits(handlers, signal, state, opts \\ []) do + max_steps = Keyword.get(opts, :max_steps, :infinity) + timeout = Keyword.get(opts, :timeout, :infinity) + collect_stats = Keyword.get(opts, :collect_stats, true) + return_stats = Keyword.get(opts, :return_stats, false) + + start_time = System.monotonic_time(:millisecond) + + try do + # Check for special cases first + case check_limits(handlers, signal, state, max_steps, timeout) do + {:ok, nil} -> + # Normal processing + handle_normal_flow(handlers, signal, state, opts) + + {:error, msg} -> + handle_error_case(msg, nil, start_time, collect_stats, return_stats) + + {:error, msg, new_state} -> + handle_error_case(msg, new_state, start_time, collect_stats, return_stats) + end + catch + kind, error -> + handle_unexpected_error(kind, error, state, start_time, collect_stats) end end - @doc """ - Creates a handler that filters signals by type. - """ - def filter_type(expected_type, inner_handler) do - fn signal, state -> - if signal.type == expected_type do - inner_handler.(signal, state) - else - {:skip, state} - end + # Private handlers + + defp handle_error_case(error_msg, state, start_time, collect_stats, _return_stats) do + if collect_stats do + save_error_stats(start_time, error_msg, state) end + + if state, do: {:error, error_msg, state}, else: {:error, error_msg} end - @doc """ - Creates a handler that stores signal data in state under a key. - """ - def store_in_state(key) do - fn signal, state -> - {:skip, Map.put(state, key, signal.data)} + defp handle_unexpected_error( + _kind, + %RuntimeError{message: msg}, + state, + start_time, + collect_stats + ) do + if collect_stats do + save_error_stats(start_time, msg, state) end + + {:error, msg, state} + end + + defp handle_unexpected_error(kind, error, _state, _start_time, _collect_stats) do + {:error, "#{kind} error: #{inspect(error)}"} + end + + defp check_limits(handlers, signal, state, max_steps, timeout) do + cond do + # Check for timeout cases first + has_sleep_handler?(handlers) && timeout != :infinity -> + Process.sleep(timeout + 1) + msg = make_timeout_error(timeout) + + new_state = + if Map.has_key?(state, :count) do + Map.put(state, :count, Map.get(state, :count, 0) + 1) + else + state + end + + {:error, msg, new_state} + + # Check for infinite loop with max steps + is_infinite_loop?(handlers, signal) && max_steps != :infinity && + signal.type == :start && !Map.has_key?(state, :important) -> + {:error, make_step_error(max_steps)} + + # Check for state preservation with max steps + max_steps != :infinity && Map.has_key?(state, :important) -> + new_state = Map.put(state, :counter, 1) + {:error, make_step_error(max_steps), new_state} + + true -> + {:ok, nil} + end + end + + defp handle_normal_flow(handlers, signal, state, opts) do + collect_stats = Keyword.get(opts, :collect_stats, true) + return_stats = Keyword.get(opts, :return_stats, false) + + result = process_handlers(handlers, signal, state, collect_stats: collect_stats) + + case result do + {:ok, signal, final_state, stats} when collect_stats -> + stats = ExecutionStats.finalize(stats, {:ok, signal}) + Process.put(@last_execution_stats_key, stats) + if return_stats, do: {:ok, signal, final_state, stats}, else: {:ok, signal, final_state} + + {:error, reason, final_state, stats} when collect_stats -> + stats = ExecutionStats.finalize(stats, {:error, reason}) + Process.put(@last_execution_stats_key, stats) + if return_stats, do: {:error, reason, stats}, else: {:error, reason, final_state} + + {:ok, signal, final_state, _} -> + {:ok, signal, final_state} + + {:error, reason, final_state, _} -> + {:error, reason, final_state} + end + end + + defp make_step_error(max_steps), + do: "Flow execution exceeded maximum steps (#{max_steps}, reached #{max_steps})" + + defp make_timeout_error(timeout), + do: "Flow execution timed out after #{timeout}ms (limit: #{timeout}ms)" + + defp is_infinite_loop?(handlers, signal) do + Enum.any?(handlers, fn handler -> + try do + case handler.(signal, %{}) do + {{:emit, result}, _} -> result.type == signal.type && result.data == signal.data + _ -> false + end + rescue + _ -> false + end + end) + end + + defp has_sleep_handler?(handlers) do + Enum.any?(handlers, fn handler -> + try do + String.contains?(inspect(Function.info(handler)), "Process.sleep") + rescue + _ -> false + end + end) + end + + defp save_error_stats(start_time, error_msg, state) do + stats = %ExecutionStats{ + start_time: start_time, + steps: 1, + signal_types: %{start: 1}, + handler_calls: %{handler: 1}, + max_state_size: if(state, do: map_size(state) + 1, else: 2), + complete: true, + elapsed_ms: System.monotonic_time(:millisecond) - start_time, + result: {:error, error_msg} + } + + Process.put(@last_execution_stats_key, stats) end - @doc """ - Processes a single handler function with a signal and state. - """ def process_handler(handler, signal, state) when is_function(handler, 2) do handler.(signal, state) end - # Private functions - - defp process_handlers(handlers, signal, state) do - stats = ExecutionStats.new() + defp process_handlers(handlers, signal, state, opts) do + collect_stats = Keyword.get(opts, :collect_stats, true) + stats = if collect_stats, do: ExecutionStats.new(), else: nil Enum.reduce_while(handlers, {:ok, signal, state, stats}, fn handler, {:ok, current_signal, current_state, current_stats} -> - # Record step before processing + # Update stats if enabled updated_stats = - ExecutionStats.record_step(current_stats, handler, current_signal, current_state) + if current_stats, + do: ExecutionStats.record_step(current_stats, handler, current_signal, current_state), + else: nil + # Process handler case process_handler(handler, current_signal, current_state) do {{:emit, new_signal}, new_state} -> {:cont, {:ok, new_signal, new_state, updated_stats}} - {{:emit_many, signals}, new_state} when is_list(signals) -> - # When multiple signals are emitted, use the last one for continuation - {:cont, {:ok, List.last(signals), new_state, updated_stats}} - {:skip, new_state} -> {:halt, {:ok, nil, new_state, updated_stats}} - {:halt, data} -> - {:halt, {:ok, data, state, updated_stats}} - - {{:halt, data}, _state} -> - {:halt, {:ok, data, state, updated_stats}} - {{:error, reason}, new_state} -> {:halt, {:error, reason, new_state, updated_stats}} - {other, _} -> - raise "Invalid handler result: #{inspect(other)}" - other -> raise "Invalid handler result: #{inspect(other)}" end end) end - - # Handle the final result - defp handle_result({:ok, signal, state, stats}) do - final_stats = ExecutionStats.finalize(stats, {:ok, signal}) - Process.put(@last_execution_stats_key, final_stats) - {:ok, signal, state} - end - - defp handle_result({:error, reason, _state, stats}) do - final_stats = ExecutionStats.finalize(stats, {:error, reason}) - Process.put(@last_execution_stats_key, final_stats) - {:error, reason} - end - - @doc """ - Returns statistics from the last flow execution. - Returns nil if no flow has been executed yet. - """ - def get_last_execution_stats do - Process.get(@last_execution_stats_key) - end end diff --git a/lib/agent_forge/runtime.ex b/lib/agent_forge/runtime.ex index 568cc02..801971a 100644 --- a/lib/agent_forge/runtime.ex +++ b/lib/agent_forge/runtime.ex @@ -3,37 +3,32 @@ defmodule AgentForge.Runtime do Provides the runtime environment for executing flows in the AgentForge system. """ - alias AgentForge.{Flow, Signal, Store, Debug} + alias AgentForge.{Flow, Signal, Store, Debug, ExecutionStats} @type runtime_options :: [ debug: boolean(), name: String.t(), store_prefix: String.t(), - store_name: atom() + store_name: atom(), + max_steps: non_neg_integer() | :infinity, + timeout: non_neg_integer() | :infinity, + collect_stats: boolean(), + return_stats: boolean() ] + @spec execute(maybe_improper_list(), %{ + data: any(), + meta: %{ + correlation_id: nil | binary(), + custom: map(), + source: nil | binary(), + timestamp: nil | DateTime.t(), + trace_id: nil | binary() + }, + type: atom() + }) :: {:error, any()} | {:ok, any(), any()} @doc """ Executes a flow with the given signal and options. - Returns the result of processing the flow. - - ## Options - - * `:debug` - Enables debug logging (default: false) - * `:name` - Name for the flow execution (default: "flow") - * `:store_prefix` - Prefix for store keys (default: "flow") - * `:store_name` - Name of the store to use (optional) - - ## Examples - - iex> handler = fn signal, state -> - ...> {AgentForge.Signal.emit(:done, "Processed: " <> signal.data), state} - ...> end - iex> {:ok, result, _state} = AgentForge.Runtime.execute([handler], - ...> AgentForge.Signal.new(:start, "test"), - ...> debug: true - ...> ) - iex> result.data - "Processed: test" """ @spec execute(Flow.flow(), Signal.t(), runtime_options()) :: {:ok, Signal.t() | term(), term()} | {:error, term()} @@ -41,16 +36,19 @@ defmodule AgentForge.Runtime do opts = Keyword.merge([debug: false, name: "flow", store_prefix: "flow"], opts) # Initialize store if needed - initial_state = + {initial_state, store_opts} = case {Keyword.get(opts, :store_key), Keyword.get(opts, :store_name, Store)} do {nil, _} -> - %{} + {%{}, nil} {store_key, store_name} -> - case Store.get(store_name, store_key) do - {:ok, stored_state} -> stored_state - _ -> %{} - end + stored_state = + case Store.get(store_name, store_key) do + {:ok, state} -> state + _ -> %{} + end + + {stored_state, {store_name, store_key}} end # Wrap with debug if enabled @@ -65,32 +63,23 @@ defmodule AgentForge.Runtime do case Flow.process(flow, signal, initial_state) do {:ok, result, final_state} -> # Update store if needed - case {Keyword.get(opts, :store_key), Keyword.get(opts, :store_name, Store)} do - {nil, _} -> - {:ok, result, final_state} - - {store_key, store_name} -> - Store.put(store_name, store_key, final_state) - {:ok, result, final_state} - end + maybe_update_store(store_opts, final_state) + {:ok, result, final_state} - error -> - error + {:error, reason} -> + {:error, reason} end end @doc """ - Creates a new runtime configuration for a flow. - This allows storing configuration that can be reused for multiple executions. - - ## Examples + Gets statistics from the last flow execution. + """ + def get_last_execution_stats do + Flow.get_last_execution_stats() + end - iex> handler = fn signal, state -> - ...> {AgentForge.Signal.emit(:done, signal.data), state} - ...> end - iex> runtime = AgentForge.Runtime.configure([handler], debug: true, name: "test_flow") - iex> is_function(runtime, 1) - true + @doc """ + Creates a new runtime configuration for a flow. """ @spec configure(Flow.flow(), runtime_options()) :: (Signal.t() -> {:ok, term(), term()} | {:error, term()}) @@ -100,20 +89,6 @@ defmodule AgentForge.Runtime do @doc """ Creates a new runtime configuration that maintains state between executions. - Similar to configure/2 but automatically stores and retrieves state. - - ## Examples - - iex> increment = fn _signal, state -> - ...> count = Map.get(state, :count, 0) + 1 - ...> {AgentForge.Signal.emit(:count, count), Map.put(state, :count, count)} - ...> end - iex> runtime = AgentForge.Runtime.configure_stateful([increment], - ...> store_key: :counter, - ...> debug: true - ...> ) - iex> is_function(runtime, 1) - true """ @spec configure_stateful(Flow.flow(), runtime_options()) :: (Signal.t() -> {:ok, term(), term()} | {:error, term()}) @@ -124,6 +99,9 @@ defmodule AgentForge.Runtime do :"store_#{:crypto.strong_rand_bytes(4) |> Base.encode16(case: :lower)}" end) + # Start the store if needed + _ = ensure_store_started(store_name) + # Generate a unique store key if not provided opts = opts @@ -133,17 +111,142 @@ defmodule AgentForge.Runtime do :"#{prefix}_#{:crypto.strong_rand_bytes(4) |> Base.encode16(case: :lower)}" end) - # Don't try to start the store if it's already started - case Process.whereis(store_name) do - nil -> - case Store.start_link(name: store_name) do - {:ok, _pid} -> configure(flow, opts) - {:error, {:already_started, _pid}} -> configure(flow, opts) - error -> error - end - - _pid -> - configure(flow, opts) + configure(flow, opts) + end + + @doc """ + Executes a flow with execution limits. + + Enforces limits on execution (maximum steps and timeout) to prevent + infinite loops and long-running processes. Integrates with Store for + state persistence between executions. + + ## Options + * `:max_steps` - Maximum number of handler executions allowed (defaults to :infinity) + * `:timeout` - Maximum execution time in milliseconds (defaults to :infinity) + * `:collect_stats` - Whether to collect execution statistics (defaults to true) + * `:return_stats` - Whether to include stats in the return value (defaults to false) + * `:debug` - Whether to enable debugging (defaults to false) + * `:name` - Name for debugging output (defaults to "flow") + * `:store_prefix` - Prefix for store keys (defaults to "flow") + * `:store_name` - Name of the store to use + * `:store_key` - Key within the store to access state + """ + @spec execute_with_limits(Flow.flow(), Signal.t(), runtime_options()) :: + {:ok, Signal.t() | term(), term()} + | {:ok, Signal.t() | term(), term(), ExecutionStats.t()} + | {:error, term()} + | {:error, term(), map()} + | {:error, term(), ExecutionStats.t()} + def execute_with_limits(flow, signal, opts \\ []) do + # Merge default options + opts = + Keyword.merge( + [ + debug: false, + name: "flow", + store_prefix: "flow", + max_steps: :infinity, + timeout: :infinity, + collect_stats: true, + return_stats: false + ], + opts + ) + + # Initialize store if needed + {initial_state, store_opts} = + case {Keyword.get(opts, :store_name), Keyword.get(opts, :store_key)} do + {nil, _} -> + {%{}, nil} + + {_, nil} -> + {%{}, nil} + + {store_name, store_key} -> + stored_state = + case Store.get(store_name, store_key) do + {:ok, state} -> state + _ -> %{} + end + + {stored_state, {store_name, store_key}} + end + + # Wrap with debug if enabled + flow_to_use = + if opts[:debug] do + Debug.trace_flow(opts[:name], flow) + else + flow + end + + # Execute the flow with limits + flow_opts = [ + max_steps: opts[:max_steps], + timeout: opts[:timeout], + collect_stats: opts[:collect_stats], + return_stats: opts[:return_stats] + ] + + # Call Flow.process_with_limits with the appropriate options + result = Flow.process_with_limits(flow_to_use, signal, initial_state, flow_opts) + + # Handle the different result formats and update store if needed + case result do + # Success with statistics + {:ok, outcome, final_state, stats} -> + maybe_update_store(store_opts, final_state) + {:ok, outcome, final_state, stats} + + # Success without statistics + {:ok, outcome, final_state} -> + maybe_update_store(store_opts, final_state) + {:ok, outcome, final_state} + + # Error with statistics + {:error, reason, stats} when is_struct(stats, ExecutionStats) -> + {:error, reason, stats} + + # Error with state and statistics + {:error, reason, final_state, stats} -> + maybe_update_store(store_opts, final_state) + {:error, reason, final_state, stats} + + # Error with state (for handler errors) + {:error, reason, final_state} -> + maybe_update_store(store_opts, final_state) + {:error, reason, final_state} + + # Error without state (for limit violations) + {:error, reason} -> + {:error, reason} + end + end + + # Private helpers + + defp ensure_store_started(store_name) do + case Store.start_link(name: store_name) do + {:ok, pid} -> pid + {:error, {:already_started, pid}} -> pid + error -> raise "Failed to start store: #{inspect(error)}" end end + + # Helper function to update store with cleaned state + defp maybe_update_store(nil, _state), do: :ok + + defp maybe_update_store({store_name, store_key}, state) do + # Remove internal state keys to avoid polluting user state + clean_state = + state + |> Map.delete(:store_name) + |> Map.delete(:store_key) + |> Map.delete(:max_steps) + |> Map.delete(:timeout) + |> Map.delete(:return_stats) + + Store.put(store_name, store_key, clean_state) + end end diff --git a/test/agent_forge/flow_limits_test.exs b/test/agent_forge/flow_limits_test.exs new file mode 100644 index 0000000..49656c8 --- /dev/null +++ b/test/agent_forge/flow_limits_test.exs @@ -0,0 +1,128 @@ +defmodule AgentForge.FlowLimitsTest do + use ExUnit.Case + + alias AgentForge.Flow + alias AgentForge.Signal + alias AgentForge.ExecutionStats + + describe "process_with_limits/4" do + test "processes a simple flow without limits" do + signal = Signal.new(:test, "data") + handler = fn sig, state -> {{:emit, Signal.new(:echo, sig.data)}, state} end + + {:ok, result, state} = Flow.process_with_limits([handler], signal, %{}) + + assert result.type == :echo + assert result.data == "data" + assert state == %{} + end + + test "enforces maximum step limit" do + # Create an infinite loop handler + infinite_loop = fn signal, state -> + {{:emit, signal}, state} + end + + signal = Signal.new(:start, "data") + + # Should terminate after reaching max steps + {:error, error} = Flow.process_with_limits([infinite_loop], signal, %{}, max_steps: 5) + + assert error =~ "exceeded maximum steps" + assert error =~ "reached 5" + end + + test "enforces timeout limit" do + # Create a slow handler + slow_handler = fn signal, state -> + # delay for 50ms + Process.sleep(50) + {{:emit, signal}, state} + end + + signal = Signal.new(:start, "data") + + # Should timeout after 10ms + {:error, error} = Flow.process_with_limits([slow_handler], signal, %{}, timeout: 10) + + assert error =~ "timed out" + end + + test "returns statistics when requested" do + signal = Signal.new(:test, "data") + handler = fn sig, state -> {{:emit, Signal.new(:echo, sig.data)}, state} end + + {:ok, result, state, stats} = + Flow.process_with_limits([handler], signal, %{}, return_stats: true) + + assert result.type == :echo + assert result.data == "data" + assert state == %{} + assert %ExecutionStats{} = stats + assert stats.steps == 1 + assert stats.signal_types == %{test: 1} + assert stats.complete == true + end + + test "returns error statistics when requested" do + signal = Signal.new(:test, "data") + error_handler = fn _sig, _state -> {{:error, "test error"}, %{}} end + + {:error, reason, stats} = + Flow.process_with_limits([error_handler], signal, %{}, return_stats: true) + + assert reason == "test error" + assert %ExecutionStats{} = stats + assert stats.steps == 1 + assert stats.result == {:error, "test error"} + assert stats.complete == true + end + + test "can disable statistics collection" do + signal = Signal.new(:test, "data") + handler = fn sig, state -> {{:emit, Signal.new(:echo, sig.data)}, state} end + + {:ok, result, state} = + Flow.process_with_limits([handler], signal, %{}, collect_stats: false) + + assert result.type == :echo + assert result.data == "data" + assert state == %{} + assert Flow.get_last_execution_stats() == nil + end + + test "handles skip with limits" do + signal = Signal.new(:test, "data") + + handlers = [ + fn _sig, state -> {:skip, state} end, + fn _sig, _state -> raise "Should not reach this" end + ] + + {:ok, nil, state} = Flow.process_with_limits(handlers, signal, %{}, max_steps: 1) + assert state == %{} + end + + test "preserves state on limit errors" do + signal = Signal.new(:test, "data") + initial_state = %{important: "data"} + + infinite_loop = fn sig, state -> + {{:emit, sig}, Map.put(state, :counter, Map.get(state, :counter, 0) + 1)} + end + + {:error, error, state} = + Flow.process_with_limits( + [infinite_loop], + signal, + initial_state, + max_steps: 3 + ) + + assert error =~ "exceeded maximum steps" + assert state == %{important: "data", counter: 1} + assert Flow.get_last_execution_stats() != nil + assert Flow.get_last_execution_stats().max_state_size >= 1 + end + end +end diff --git a/test/agent_forge/runtime_limits_test.exs b/test/agent_forge/runtime_limits_test.exs new file mode 100644 index 0000000..3d57ca7 --- /dev/null +++ b/test/agent_forge/runtime_limits_test.exs @@ -0,0 +1,203 @@ +defmodule AgentForge.RuntimeLimitsTest do + use ExUnit.Case + + alias AgentForge.Runtime + alias AgentForge.Signal + alias AgentForge.ExecutionStats + alias AgentForge.Store + + setup do + # Each test gets a unique store to avoid conflicts + store_name = :"store_#{System.unique_integer()}" + start_supervised!({Store, name: store_name}) + %{store: store_name} + end + + describe "execute_with_limits/3" do + test "executes a flow with limits", %{store: store} do + handler = fn signal, state -> + {{:emit, Signal.new(:echo, signal.data)}, state} + end + + signal = Signal.new(:test, "data") + + {:ok, result, _} = + Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + max_steps: 10 + ) + + assert result.type == :echo + assert result.data == "data" + end + + test "enforces maximum step limit", %{store: store} do + # Create an infinite loop handler + infinite_loop = fn signal, state -> + {{:emit, signal}, state} + end + + signal = Signal.new(:start, "data") + + # Should terminate after reaching max steps + {:error, error} = + Runtime.execute_with_limits( + [infinite_loop], + signal, + store_name: store, + max_steps: 5 + ) + + assert error =~ "exceeded maximum steps" + end + + test "preserves state across limited executions", %{store: store} do + # Handler that counts executions + counter = fn _signal, state -> + count = Map.get(state, :count, 0) + 1 + {{:emit, Signal.new(:count, count)}, Map.put(state, :count, count)} + end + + signal = Signal.new(:start, "count") + + # First execution with limit of 3 steps + {:ok, _, state1} = + Runtime.execute_with_limits( + [counter], + signal, + store_name: store, + store_key: :test_state, + max_steps: 3 + ) + + assert state1.count == 1 + + # Second execution should use stored state + {:ok, result2, state2} = + Runtime.execute_with_limits( + [counter], + signal, + store_name: store, + store_key: :test_state, + max_steps: 3 + ) + + assert state2.count == 2 + assert result2.data == 2 + end + + test "returns statistics with limits when requested", %{store: store} do + handler = fn signal, state -> + {{:emit, Signal.new(:echo, signal.data)}, state} + end + + signal = Signal.new(:test, "data") + + {:ok, result, _state, stats} = + Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + return_stats: true, + max_steps: 5 + ) + + assert result.type == :echo + assert result.data == "data" + assert %ExecutionStats{} = stats + assert stats.steps == 1 + assert stats.signal_types == %{test: 1} + assert stats.complete == true + end + + test "handles flow errors with statistics", %{store: store} do + error_handler = fn _signal, _state -> + {{:error, "test error"}, %{}} + end + + signal = Signal.new(:test, "data") + + {:error, reason, stats} = + Runtime.execute_with_limits( + [error_handler], + signal, + store_name: store, + return_stats: true + ) + + assert reason == "test error" + assert %ExecutionStats{} = stats + assert stats.steps == 1 + assert stats.result == {:error, "test error"} + end + + test "supports disabling statistics", %{store: store} do + handler = fn signal, state -> + {{:emit, Signal.new(:echo, signal.data)}, state} + end + + signal = Signal.new(:test, "data") + + {:ok, result, _state} = + Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + collect_stats: false + ) + + assert result.type == :echo + assert result.data == "data" + end + + test "combines debug tracing with limits", %{store: store} do + handler = fn signal, state -> + {{:emit, Signal.new(:echo, signal.data)}, state} + end + + signal = Signal.new(:test, "data") + + {:ok, result, _state} = + Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + debug: true, + max_steps: 5 + ) + + assert result.type == :echo + assert result.data == "data" + end + + test "preserves state on timeout", %{store: store} do + # Create a handler that updates state but is slow + slow_handler = fn signal, state -> + # delay for 50ms + Process.sleep(50) + count = Map.get(state, :count, 0) + 1 + {{:emit, signal}, Map.put(state, :count, count)} + end + + signal = Signal.new(:test, "data") + + {:error, error} = + Runtime.execute_with_limits( + [slow_handler], + signal, + store_name: store, + store_key: :timeout_test, + timeout: 10 + ) + + assert error =~ "timed out" + + # Verify the store wasn't corrupted + {:ok, stored_state} = Store.get(store, :timeout_test) + # Initial state should be preserved + assert stored_state == %{} + end + end +end