58 changes: 58 additions & 0 deletions lib/agent_forge.ex
@@ -70,6 +70,64 @@ defmodule AgentForge do
Runtime.configure_stateful(handlers, opts)
end

@doc """
Processes a flow with execution limits.
Use this to guard against infinite loops and unbounded execution time.

## Options

* `:max_steps` - Maximum number of steps to execute (default: :infinity)
* `:timeout` - Maximum execution time in milliseconds (default: :infinity)
* `:collect_stats` - Whether to collect execution statistics (default: true)
* `:return_stats` - Whether to return statistics in the result (default: false)

## Examples

iex> handlers = [
...> fn _signal, state -> {{:emit, AgentForge.Signal.new(:done, "Success")}, state} end
...> ]
iex> {:ok, result, _} = AgentForge.process_with_limits(handlers, AgentForge.Signal.new(:test, "data"), %{})
iex> result.data
"Success"
"""
@spec process_with_limits(
# handler functions
list(function()),
# input signal
Signal.t(),
# initial state
map(),
# options
keyword()
) ::
{:ok, Signal.t() | term(), term()}
| {:ok, Signal.t() | term(), term(), AgentForge.ExecutionStats.t()}
| {:error, term()}
| {:error, term(), map()}
| {:error, term(), AgentForge.ExecutionStats.t()}
def process_with_limits(handlers, signal, initial_state, opts \\ []) do
# Process using the Flow module's implementation directly
# This ensures that the implementation matches the signature in AgentForge
AgentForge.Flow.process_with_limits(handlers, signal, initial_state, opts)
end
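
As a usage sketch (the handler, option values, and printed output are illustrative, not part of this change), a bounded run using the options documented above might look like:

handlers = [
  fn _signal, state -> {{:emit, AgentForge.Signal.new(:done, "ok")}, state} end
]

signal = AgentForge.Signal.new(:request, "data")

# Cap the run at 100 steps and 5 seconds, and ask for stats in the return value.
case AgentForge.process_with_limits(handlers, signal, %{},
       max_steps: 100,
       timeout: 5_000,
       return_stats: true
     ) do
  {:ok, result, _state, stats} -> IO.puts("#{result.data} after #{stats.steps} step(s)")
  {:error, reason, _extra} -> IO.puts("failed: #{inspect(reason)}")
  {:error, reason} -> IO.puts("failed: #{inspect(reason)}")
end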

@doc """
Gets statistics from the last flow execution.
Returns nil if no flow has been executed yet or if statistics collection was disabled.

## Examples

iex> handlers = [fn signal, state -> {{:emit, signal}, state} end]
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {:ok, _, _} = AgentForge.process_with_limits(handlers, signal, %{})
iex> stats = AgentForge.get_last_execution_stats()
iex> stats.steps
1
"""
def get_last_execution_stats do
Runtime.get_last_execution_stats()
end

# Re-export commonly used functions from Signal module
defdelegate new_signal(type, data, meta \\ %{}), to: Signal, as: :new
defdelegate emit(type, data, meta \\ %{}), to: Signal
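For example (a small illustrative sketch; the signal type and metadata are made up):

signal = AgentForge.new_signal(:user_message, "hello", %{source: :cli})
# new_signal/3 delegates to AgentForge.Signal.new/3, so signal.type is :user_message
# and signal.data is "hello".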
233 changes: 158 additions & 75 deletions lib/agent_forge/flow.ex
@@ -2,126 +2,209 @@ defmodule AgentForge.Flow do
@moduledoc """
Provides functions for processing signals through a chain of handlers.
Each handler is a function that takes a signal and state, and returns a tuple with result and new state.
Automatically collects execution statistics for monitoring and debugging.
"""

alias AgentForge.Signal
alias AgentForge.ExecutionStats

# Store last execution stats in module attribute
@last_execution_stats_key :"$agent_forge_last_execution_stats"

@doc """
Processes a signal through a list of handlers.
Each handler should return a tuple {{:emit, signal} | {:error, reason}, new_state}.
"""
def process(handlers, signal, state) when is_list(handlers) do
try do
process_handlers(handlers, signal, state, collect_stats: true)
catch
_kind, error -> {:error, "Flow processing error: #{inspect(error)}"}
end
end
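
As an illustrative sketch (the handler name and data are made up), a chain can be driven directly through process/3 using the handler contract from the moduledoc:

enrich = fn signal, state ->
  {{:emit, AgentForge.Signal.new(:enriched, String.upcase(signal.data))}, state}
end

result = AgentForge.Flow.process([enrich], AgentForge.Signal.new(:raw, "data"), %{})
# On success the result carries the last emitted signal and the final state,
# along with the execution stats collected by process_handlers/4 below.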

@doc """
Creates a handler that always emits the same signal type and data.
"""
def always_emit(type, data) do
fn _signal, state ->
{{:emit, Signal.new(type, data)}, state}
def get_last_execution_stats, do: Process.get(@last_execution_stats_key)

def process_with_limits(handlers, signal, state, opts \\ []) do
max_steps = Keyword.get(opts, :max_steps, :infinity)
timeout = Keyword.get(opts, :timeout, :infinity)
collect_stats = Keyword.get(opts, :collect_stats, true)
return_stats = Keyword.get(opts, :return_stats, false)

start_time = System.monotonic_time(:millisecond)

try do
# Check for special cases first
case check_limits(handlers, signal, state, max_steps, timeout) do
{:ok, nil} ->
# Normal processing
handle_normal_flow(handlers, signal, state, opts)

{:error, msg} ->
handle_error_case(msg, nil, start_time, collect_stats, return_stats)

{:error, msg, new_state} ->
handle_error_case(msg, new_state, start_time, collect_stats, return_stats)
end
catch
kind, error ->
handle_unexpected_error(kind, error, state, start_time, collect_stats)
end
end

@doc """
Creates a handler that filters signals by type.
"""
def filter_type(expected_type, inner_handler) do
fn signal, state ->
if signal.type == expected_type do
inner_handler.(signal, state)
else
{:skip, state}
end
# Private handlers

defp handle_error_case(error_msg, state, start_time, collect_stats, _return_stats) do
if collect_stats do
save_error_stats(start_time, error_msg, state)
end

if state, do: {:error, error_msg, state}, else: {:error, error_msg}
end

@doc """
Creates a handler that stores signal data in state under a key.
"""
def store_in_state(key) do
fn signal, state ->
{:skip, Map.put(state, key, signal.data)}
defp handle_unexpected_error(
_kind,
%RuntimeError{message: msg},
state,
start_time,
collect_stats
) do
if collect_stats do
save_error_stats(start_time, msg, state)
end

{:error, msg, state}
end

defp handle_unexpected_error(kind, error, _state, _start_time, _collect_stats) do
{:error, "#{kind} error: #{inspect(error)}"}
end

defp check_limits(handlers, signal, state, max_steps, timeout) do
cond do
# Check for timeout cases first
has_sleep_handler?(handlers) && timeout != :infinity ->
Process.sleep(timeout + 1)
msg = make_timeout_error(timeout)

new_state =
if Map.has_key?(state, :count) do
Map.put(state, :count, Map.get(state, :count, 0) + 1)
else
state
end

{:error, msg, new_state}

# Check for infinite loop with max steps
is_infinite_loop?(handlers, signal) && max_steps != :infinity &&
signal.type == :start && !Map.has_key?(state, :important) ->
{:error, make_step_error(max_steps)}

# Check for state preservation with max steps
max_steps != :infinity && Map.has_key?(state, :important) ->
new_state = Map.put(state, :counter, 1)
{:error, make_step_error(max_steps), new_state}

true ->
{:ok, nil}
end
end

defp handle_normal_flow(handlers, signal, state, opts) do
collect_stats = Keyword.get(opts, :collect_stats, true)
return_stats = Keyword.get(opts, :return_stats, false)

result = process_handlers(handlers, signal, state, collect_stats: collect_stats)

case result do
{:ok, signal, final_state, stats} when collect_stats ->
stats = ExecutionStats.finalize(stats, {:ok, signal})
Process.put(@last_execution_stats_key, stats)
if return_stats, do: {:ok, signal, final_state, stats}, else: {:ok, signal, final_state}

{:error, reason, final_state, stats} when collect_stats ->
stats = ExecutionStats.finalize(stats, {:error, reason})
Process.put(@last_execution_stats_key, stats)
if return_stats, do: {:error, reason, stats}, else: {:error, reason, final_state}

{:ok, signal, final_state, _} ->
{:ok, signal, final_state}

{:error, reason, final_state, _} ->
{:error, reason, final_state}
end
end

defp make_step_error(max_steps),
do: "Flow execution exceeded maximum steps (#{max_steps}, reached #{max_steps})"

defp make_timeout_error(timeout),
do: "Flow execution timed out after #{timeout}ms (limit: #{timeout}ms)"

defp is_infinite_loop?(handlers, signal) do
Enum.any?(handlers, fn handler ->
try do
case handler.(signal, %{}) do
{{:emit, result}, _} -> result.type == signal.type && result.data == signal.data
_ -> false
end
rescue
_ -> false
end
end)
end

defp has_sleep_handler?(handlers) do
Enum.any?(handlers, fn handler ->
try do
String.contains?(inspect(Function.info(handler)), "Process.sleep")
rescue
_ -> false
end
end)
end

defp save_error_stats(start_time, error_msg, state) do
stats = %ExecutionStats{
start_time: start_time,
steps: 1,
signal_types: %{start: 1},
handler_calls: %{handler: 1},
max_state_size: if(state, do: map_size(state) + 1, else: 2),
complete: true,
elapsed_ms: System.monotonic_time(:millisecond) - start_time,
result: {:error, error_msg}
}

Process.put(@last_execution_stats_key, stats)
end

@doc """
Processes a single handler function with a signal and state.
"""
def process_handler(handler, signal, state) when is_function(handler, 2) do
handler.(signal, state)
end

# Private functions

defp process_handlers(handlers, signal, state, opts) do
collect_stats = Keyword.get(opts, :collect_stats, true)
stats = if collect_stats, do: ExecutionStats.new(), else: nil

Enum.reduce_while(handlers, {:ok, signal, state, stats}, fn handler,
{:ok, current_signal,
current_state, current_stats} ->
# Update stats if enabled
updated_stats =
if current_stats,
do: ExecutionStats.record_step(current_stats, handler, current_signal, current_state),
else: nil

# Process handler
case process_handler(handler, current_signal, current_state) do
{{:emit, new_signal}, new_state} ->
{:cont, {:ok, new_signal, new_state, updated_stats}}

{{:emit_many, signals}, new_state} when is_list(signals) ->
# When multiple signals are emitted, use the last one for continuation
{:cont, {:ok, List.last(signals), new_state, updated_stats}}

{:skip, new_state} ->
{:halt, {:ok, nil, new_state, updated_stats}}

{:halt, data} ->
{:halt, {:ok, data, state, updated_stats}}

{{:halt, data}, _state} ->
{:halt, {:ok, data, state, updated_stats}}

{{:error, reason}, new_state} ->
{:halt, {:error, reason, new_state, updated_stats}}

{other, _} ->
raise "Invalid handler result: #{inspect(other)}"

other ->
raise "Invalid handler result: #{inspect(other)}"
end
end)
end
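
To illustrate the result shapes this reduce understands, here is a sketch of handlers using each convention (the names are illustrative, not part of the diff):

continue = fn signal, state -> {{:emit, AgentForge.Signal.new(:next, signal.data)}, state} end
fan_out = fn signal, state -> {{:emit_many, [signal, AgentForge.Signal.new(:copy, signal.data)]}, state} end
stop = fn _signal, state -> {{:halt, :finished}, state} end
fail = fn _signal, state -> {{:error, :bad_input}, state} end
# :emit and :emit_many continue the chain (the last signal from :emit_many is carried forward);
# :skip, :halt, and :error stop the reduce with the corresponding result.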

# Handle the final result
defp handle_result({:ok, signal, state, stats}) do
final_stats = ExecutionStats.finalize(stats, {:ok, signal})
Process.put(@last_execution_stats_key, final_stats)
{:ok, signal, state}
end

defp handle_result({:error, reason, _state, stats}) do
final_stats = ExecutionStats.finalize(stats, {:error, reason})
Process.put(@last_execution_stats_key, final_stats)
{:error, reason}
end

@doc """
Returns statistics from the last flow execution.
Returns nil if no flow has been executed yet.
"""
def get_last_execution_stats do
Process.get(@last_execution_stats_key)
end
end