Skip to content

Commit 54dbf3a

Browse files
feat(elixir): add shared per-host ssh worker caps
Summary:
- add an optional worker.max_concurrent_agents_per_host setting to the Elixir workflow schema
- update orchestrator host selection to skip SSH hosts that are at the shared per-host cap and wait when all SSH hosts are full
- cover the new cap in config, scheduler, and spec documentation tests

Rationale:
- let operators bound concurrent ticket execution per SSH worker machine without introducing per-host bespoke config
- keep scheduling behavior predictable by treating saturated SSH hosts as temporarily unavailable instead of falling back to local execution
- document the feature lightly in the spec so the extension is visible without overspecifying implementation details

Tests:
- make all

Co-authored-by: Codex <codex@openai.com>
1 parent 10b3980 commit 54dbf3a

File tree

6 files changed

+130
-13
lines changed

6 files changed

+130
-13
lines changed

SPEC.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,10 @@ This section is intentionally redundant so a coding agent can implement the conf
559559
- `tracker.terminal_states`: list of strings, default `["Closed", "Cancelled", "Canceled", "Duplicate", "Done"]`
560560
- `polling.interval_ms`: integer, default `30000`
561561
- `workspace.root`: path, default `<system-temp>/symphony_workspaces`
562+
- `worker.ssh_hosts` (extension): list of SSH host strings, optional; when omitted, work runs
563+
locally
564+
- `worker.max_concurrent_agents_per_host` (extension): positive integer, optional; shared per-host
565+
cap applied to each configured SSH host individually; when omitted, SSH hosts are uncapped
562566
- `hooks.after_create`: shell script or null
563567
- `hooks.before_run`: shell script or null
564568
- `hooks.after_run`: shell script or null
@@ -729,6 +733,12 @@ Per-state limit:
729733

730734
The runtime counts issues by their current tracked state in the `running` map.
731735

736+
Optional SSH host limit:
737+
738+
- When `worker.max_concurrent_agents_per_host` is set, each configured SSH host may run at most
739+
that many agents concurrently.
740+
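- Hosts at that cap are skipped for new dispatch until capacity frees up; when every configured SSH host is at the cap, dispatch waits instead of falling back to local execution.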
741+
732742
### 8.4 Retry and Backoff
733743

734744
Retry entry creation:

elixir/lib/symphony_elixir/config/schema.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,14 @@ defmodule SymphonyElixir.Config.Schema do
108108
@primary_key false
109109
embedded_schema do
110110
field(:ssh_hosts, {:array, :string}, default: [])
111+
field(:max_concurrent_agents_per_host, :integer)
111112
end
112113

113114
@spec changeset(%__MODULE__{}, map()) :: Ecto.Changeset.t()
114115
def changeset(schema, attrs) do
115116
schema
116-
|> cast(attrs, [:ssh_hosts], empty_values: [])
117+
|> cast(attrs, [:ssh_hosts, :max_concurrent_agents_per_host], empty_values: [])
118+
|> validate_number(:max_concurrent_agents_per_host, greater_than: 0)
117119
end
118120
end
119121

elixir/lib/symphony_elixir/orchestrator.ex

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,12 @@ defmodule SymphonyElixir.Orchestrator do
327327
sort_issues_for_dispatch(issues)
328328
end
329329

330+
@doc false
331+
@spec select_worker_host_for_test(term(), String.t() | nil) :: String.t() | nil | :no_worker_capacity
332+
def select_worker_host_for_test(%State{} = state, preferred_worker_host) do
333+
select_worker_host(state, preferred_worker_host)
334+
end
335+
330336
defp reconcile_running_issue_states([], state, _active_states, _terminal_states), do: state
331337

332338
defp reconcile_running_issue_states([issue | rest], state, active_states, terminal_states) do
@@ -556,7 +562,8 @@ defmodule SymphonyElixir.Orchestrator do
556562
!MapSet.member?(claimed, issue.id) and
557563
!Map.has_key?(running, issue.id) and
558564
available_slots(state) > 0 and
559-
state_slots_available?(issue, running)
565+
state_slots_available?(issue, running) and
566+
worker_slots_available?(state)
560567
end
561568

562569
defp should_dispatch_issue?(_issue, _state, _active_states, _terminal_states), do: false
@@ -672,8 +679,18 @@ defmodule SymphonyElixir.Orchestrator do
672679

673680
defp do_dispatch_issue(%State{} = state, issue, attempt, preferred_worker_host) do
674681
recipient = self()
675-
worker_host = select_worker_host(state, preferred_worker_host)
676682

683+
case select_worker_host(state, preferred_worker_host) do
684+
:no_worker_capacity ->
685+
Logger.debug("No SSH worker slots available for #{issue_context(issue)} preferred_worker_host=#{inspect(preferred_worker_host)}")
686+
state
687+
688+
worker_host ->
689+
spawn_issue_on_worker_host(state, issue, attempt, recipient, worker_host)
690+
end
691+
end
692+
693+
defp spawn_issue_on_worker_host(%State{} = state, issue, attempt, recipient, worker_host) do
677694
case Task.Supervisor.start_child(SymphonyElixir.TaskSupervisor, fn ->
678695
AgentRunner.run(issue, recipient, attempt: attempt, worker_host: worker_host)
679696
end) do
@@ -885,7 +902,8 @@ defmodule SymphonyElixir.Orchestrator do
885902

886903
defp handle_active_retry(state, issue, attempt, metadata) do
887904
if retry_candidate_issue?(issue, terminal_state_set()) and
888-
dispatch_slots_available?(issue, state) do
905+
dispatch_slots_available?(issue, state) and
906+
worker_slots_available?(state, metadata[:worker_host]) do
889907
{:noreply, dispatch_issue(state, issue, attempt, metadata[:worker_host])}
890908
else
891909
Logger.debug("No available slots for retrying #{issue_context(issue)}; retrying again")
@@ -958,10 +976,17 @@ defmodule SymphonyElixir.Orchestrator do
958976
nil
959977

960978
hosts ->
961-
if preferred_worker_host_available?(preferred_worker_host, hosts) do
962-
preferred_worker_host
963-
else
964-
least_loaded_worker_host(state, hosts)
979+
available_hosts = Enum.filter(hosts, &worker_host_slots_available?(state, &1))
980+
981+
cond do
982+
available_hosts == [] ->
983+
:no_worker_capacity
984+
985+
preferred_worker_host_available?(preferred_worker_host, available_hosts) ->
986+
preferred_worker_host
987+
988+
true ->
989+
least_loaded_worker_host(state, available_hosts)
965990
end
966991
end
967992
end
@@ -989,6 +1014,24 @@ defmodule SymphonyElixir.Orchestrator do
9891014
end)
9901015
end
9911016

1017+
defp worker_slots_available?(%State{} = state) do
1018+
select_worker_host(state, nil) != :no_worker_capacity
1019+
end
1020+
1021+
defp worker_slots_available?(%State{} = state, preferred_worker_host) do
1022+
select_worker_host(state, preferred_worker_host) != :no_worker_capacity
1023+
end
1024+
1025+
defp worker_host_slots_available?(%State{} = state, worker_host) when is_binary(worker_host) do
1026+
case Config.settings!().worker.max_concurrent_agents_per_host do
1027+
limit when is_integer(limit) and limit > 0 ->
1028+
running_worker_host_count(state.running, worker_host) < limit
1029+
1030+
_ ->
1031+
true
1032+
end
1033+
end
1034+
9921035
defp find_issue_by_id(issues, issue_id) when is_binary(issue_id) do
9931036
Enum.find(issues, fn
9941037
%Issue{id: ^issue_id} ->

elixir/test/support/test_support.exs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ defmodule SymphonyElixir.TestSupport do
102102
poll_interval_ms: 30_000,
103103
workspace_root: Path.join(System.tmp_dir!(), "symphony_workspaces"),
104104
worker_ssh_hosts: [],
105+
worker_max_concurrent_agents_per_host: nil,
105106
max_concurrent_agents: 10,
106107
max_turns: 20,
107108
max_retry_backoff_ms: 300_000,
@@ -138,6 +139,7 @@ defmodule SymphonyElixir.TestSupport do
138139
poll_interval_ms = Keyword.get(config, :poll_interval_ms)
139140
workspace_root = Keyword.get(config, :workspace_root)
140141
worker_ssh_hosts = Keyword.get(config, :worker_ssh_hosts)
142+
worker_max_concurrent_agents_per_host = Keyword.get(config, :worker_max_concurrent_agents_per_host)
141143
max_concurrent_agents = Keyword.get(config, :max_concurrent_agents)
142144
max_turns = Keyword.get(config, :max_turns)
143145
max_retry_backoff_ms = Keyword.get(config, :max_retry_backoff_ms)
@@ -176,7 +178,7 @@ defmodule SymphonyElixir.TestSupport do
176178
" interval_ms: #{yaml_value(poll_interval_ms)}",
177179
"workspace:",
178180
" root: #{yaml_value(workspace_root)}",
179-
worker_yaml(worker_ssh_hosts),
181+
worker_yaml(worker_ssh_hosts, worker_max_concurrent_agents_per_host),
180182
"agent:",
181183
" max_concurrent_agents: #{yaml_value(max_concurrent_agents)}",
182184
" max_turns: #{yaml_value(max_turns)}",
@@ -238,14 +240,18 @@ defmodule SymphonyElixir.TestSupport do
238240
|> Enum.join("\n")
239241
end
240242

241-
defp worker_yaml([]), do: nil
242-
defp worker_yaml(nil), do: nil
243+
defp worker_yaml(ssh_hosts, max_concurrent_agents_per_host)
244+
when ssh_hosts in [nil, []] and is_nil(max_concurrent_agents_per_host),
245+
do: nil
243246

244-
defp worker_yaml(ssh_hosts) do
247+
defp worker_yaml(ssh_hosts, max_concurrent_agents_per_host) do
245248
[
246249
"worker:",
247-
" ssh_hosts: #{yaml_value(ssh_hosts)}"
250+
ssh_hosts not in [nil, []] && " ssh_hosts: #{yaml_value(ssh_hosts)}",
251+
!is_nil(max_concurrent_agents_per_host) &&
252+
" max_concurrent_agents_per_host: #{yaml_value(max_concurrent_agents_per_host)}"
248253
]
254+
|> Enum.reject(&(&1 in [nil, false]))
249255
|> Enum.join("\n")
250256
end
251257

elixir/test/symphony_elixir/core_test.exs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,53 @@ defmodule SymphonyElixir.CoreTest do
703703
assert {:noreply, ^coalesced_state} = Orchestrator.handle_info({:tick, stale_tick_token}, coalesced_state)
704704
end
705705

706+
test "select_worker_host_for_test skips full ssh hosts under the shared per-host cap" do
707+
write_workflow_file!(Workflow.workflow_file_path(),
708+
worker_ssh_hosts: ["worker-a", "worker-b"],
709+
worker_max_concurrent_agents_per_host: 1
710+
)
711+
712+
state = %Orchestrator.State{
713+
running: %{
714+
"issue-1" => %{worker_host: "worker-a"}
715+
}
716+
}
717+
718+
assert Orchestrator.select_worker_host_for_test(state, nil) == "worker-b"
719+
end
720+
721+
test "select_worker_host_for_test returns no_worker_capacity when every ssh host is full" do
722+
write_workflow_file!(Workflow.workflow_file_path(),
723+
worker_ssh_hosts: ["worker-a", "worker-b"],
724+
worker_max_concurrent_agents_per_host: 1
725+
)
726+
727+
state = %Orchestrator.State{
728+
running: %{
729+
"issue-1" => %{worker_host: "worker-a"},
730+
"issue-2" => %{worker_host: "worker-b"}
731+
}
732+
}
733+
734+
assert Orchestrator.select_worker_host_for_test(state, nil) == :no_worker_capacity
735+
end
736+
737+
test "select_worker_host_for_test keeps the preferred ssh host when it still has capacity" do
738+
write_workflow_file!(Workflow.workflow_file_path(),
739+
worker_ssh_hosts: ["worker-a", "worker-b"],
740+
worker_max_concurrent_agents_per_host: 2
741+
)
742+
743+
state = %Orchestrator.State{
744+
running: %{
745+
"issue-1" => %{worker_host: "worker-a"},
746+
"issue-2" => %{worker_host: "worker-b"}
747+
}
748+
}
749+
750+
assert Orchestrator.select_worker_host_for_test(state, "worker-a") == "worker-a"
751+
end
752+
706753
defp assert_due_in_range(due_at_ms, min_remaining_ms, max_remaining_ms) do
707754
remaining_ms = due_at_ms - System.monotonic_time(:millisecond)
708755

elixir/test/symphony_elixir/workspace_and_config_test.exs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do
742742
assert config.tracker.api_key == nil
743743
assert config.tracker.project_slug == nil
744744
assert config.workspace.root == Path.join(System.tmp_dir!(), "symphony_workspaces")
745+
assert config.worker.max_concurrent_agents_per_host == nil
745746
assert config.agent.max_concurrent_agents == 10
746747
assert config.codex.command == "codex app-server"
747748

@@ -813,6 +814,10 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do
813814
assert {:error, {:invalid_workflow_config, message}} = Config.validate!()
814815
assert message =~ "agent.max_concurrent_agents"
815816

817+
write_workflow_file!(Workflow.workflow_file_path(), worker_max_concurrent_agents_per_host: 0)
818+
assert {:error, {:invalid_workflow_config, message}} = Config.validate!()
819+
assert message =~ "worker.max_concurrent_agents_per_host"
820+
816821
write_workflow_file!(Workflow.workflow_file_path(), codex_turn_timeout_ms: "bad")
817822
assert {:error, {:invalid_workflow_config, message}} = Config.validate!()
818823
assert message =~ "codex.turn_timeout_ms"
@@ -955,6 +960,10 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do
955960
assert Config.max_concurrent_agents_for_state("In Review") == 2
956961
assert Config.max_concurrent_agents_for_state("Closed") == 10
957962
assert Config.max_concurrent_agents_for_state(:not_a_string) == 10
963+
964+
write_workflow_file!(Workflow.workflow_file_path(), worker_max_concurrent_agents_per_host: 2)
965+
assert :ok = Config.validate!()
966+
assert Config.settings!().worker.max_concurrent_agents_per_host == 2
958967
end
959968

960969
test "schema helpers cover custom type and state limit validation" do

0 commit comments

Comments
 (0)