Commit 0f2977d

fix(zebra): isolate dispatching of jobs per os_image (#625)
## 📝 Description

Isolate dispatching of cloud jobs per `os_image` (on top of `machine_type`)

## ✅ Checklist

- [x] I have tested this change
- [ ] This change requires documentation update

1 parent 45f0aca commit 0f2977d
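
To make the description concrete, here is a minimal in-memory sketch of what isolating by `os_image` on top of `machine_type` means for batching. It is not the Zebra implementation: the module name `DispatchIsolationSketch`, its functions, and the plain-map job shape are hypothetical stand-ins for the database-backed worker. Each distinct `{machine_type, machine_os_image}` pair gets its own batch and its own per-tick limit.

```elixir
defmodule DispatchIsolationSketch do
  # Hypothetical stand-in for the database-backed worker tick.
  # Jobs are plain maps here; the real worker queries Postgres via Ecto.

  # Collect the distinct {machine_type, machine_os_image} pairs.
  def distinct_keys(jobs) do
    jobs
    |> Enum.map(&{&1.machine_type, &1.machine_os_image})
    |> Enum.uniq()
  end

  # Pick up to `limit` job ids for one pair, oldest id first.
  def batch_for(jobs, {machine_type, os_image}, limit) do
    jobs
    |> Enum.filter(&(&1.machine_type == machine_type and &1.machine_os_image == os_image))
    |> Enum.sort_by(& &1.id)
    |> Enum.take(limit)
    |> Enum.map(& &1.id)
  end

  # One tick: every pair is batched independently of the others.
  def tick(jobs, limit) do
    for key <- distinct_keys(jobs), into: %{} do
      {key, batch_for(jobs, key, limit)}
    end
  end
end

jobs = [
  %{id: 1, machine_type: "e1-standard-2", machine_os_image: "ubuntu2404"},
  %{id: 2, machine_type: "e1-standard-2", machine_os_image: "ubuntu2004"},
  %{id: 3, machine_type: "e1-standard-2", machine_os_image: "ubuntu2404"},
  %{id: 4, machine_type: "s1-local-testing", machine_os_image: nil}
]

IO.inspect(DispatchIsolationSketch.tick(jobs, 1))
```

With a limit of 1, the `ubuntu2404` batch holds only job 1, while `ubuntu2004` and the `nil` image each get their own batch. A pair with no available agents therefore cannot use up the batch slots of another pair that shares its machine type.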

5 files changed, +213 -15 lines changed

zebra/AGENTS.md

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+# Repository Guidelines
+
+## Project Structure & Module Organization
+Semaphore’s monorepo mixes Elixir services (`auth/`, `guard/`, `projecthub/`), Go tooling (`bootstrapper/`, `repohub/`), and the Phoenix/React UI in `front/`. Elixir services keep source in `lib/` and ExUnit suites in `test/`. Go utilities follow the `cmd/` entrypoint with reusable code under `pkg/`. Frontend assets, including React components and bundles, live in `front/assets/`. Shared documentation sits in `docs/` and `rfcs/`, while enterprise-specific code is isolated under `ee/`.
+
+## Build, Test, and Development Commands
+Run commands from the relevant service directory. Use `make build` to produce Docker images for CI parity. Execute `make test.ex` (or `make test.ex TEST_FILE=test/<path>.exs`) to run ExUnit suites. For Go modules, run `make test` (`go test ./...`) and add `-race` when debugging concurrency. Frontend checks run through `make test.js`. Start the Phoenix UI locally with `make dev.server`, and consult `LOCAL-DEVELOPMENT.md` for Minikube or dev-container workflows.
+
+## Coding Style & Naming Conventions
+Elixir modules are PascalCase with files in snake_case; tests end in `_test.exs`. Go packages stay lowercase, with exported identifiers PascalCase. React components use PascalCase filenames inside `front/assets/`. Before committing, run `make format.ex`, `make lint` for Go format and static checks, and `make lint.js` for ESLint/Prettier to keep the tree consistent.
+
+## Testing Guidelines
+Favor focused ExUnit `describe` blocks and use tags such as `--only integration` for longer suites. Keep Go tests table-driven in `_test.go` files. Frontend changes require Jest coverage via `make test.js`. Add regression tests alongside fixes and align Phoenix endpoint updates with matching ExUnit cases.
+
+## Commit & Pull Request Guidelines
+Commits follow Conventional Commits, e.g., `feat(auth): add token audit trail`. PRs should explain rationale, outline risk, and link issues when available. Include screenshots or logs for UI or automation changes, and confirm formatters, linters, and relevant `make test*` targets have passed before requesting review.
+
+## Security & Configuration Tips
+Run `make check.ex.deps`, `make check.go.deps`, and `make check.docker` regularly to catch dependency and image issues. Keep secrets in local `.env` files referenced by docker-compose, never checked into the repository, and export `DOCKER_BUILDKIT=1` for reproducible Docker builds. For deeper architectural context, review `DOCUMENTATION.md`.

zebra/DOCUMENTATION.md

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+# Architecture Handbook
+
+## Overview
+Zebra is an Elixir OTP application responsible for job lifecycle orchestration inside Semaphore. It exposes several gRPC services on port `50051`, coordinates queueing and dispatching through background workers, and delegates persistence to a legacy Postgres schema via Ecto.
+
+## Code Layout
+- `lib/zebra/application.ex` boots the supervision tree, wiring caches, the feature flag provider, and optional gRPC servers (public/internal job & task APIs, health checks). Services are toggled with `START_*` env vars.
+- `lib/zebra/apis/` implements gRPC endpoints. Concrete service modules live under `public_job_api/`, `internal_job_api/`, and `internal_task_api/`, with shared helpers in `utils.ex`.
+- `lib/protos/` houses protobuf contracts. Regenerate stubs with `mix grpc.gen` if schemas change upstream.
+- `lib/zebra/models/` defines Ecto schemas and state helpers (`Job`, `Task`, `Project`, etc.). Database access goes through `Zebra.LegacyRepo`, whose migrations live in `priv/legacy_repo/migrations/`.
+- `lib/zebra/workers/` contains GenServer pipelines (`Dispatcher`, `Scheduler`, `TaskFinisher`, callback handlers) driven by `Workers.active/1`. Workers rely on `DbWorker` to poll the legacy database and on Watchman metrics for observability.
+- Support modules include `cache.ex` (Cachex caches), `monitor.ex` (Watchman instrumentation), `machines/` (brownout scheduling), and `feature_hub_provider.ex`.
+
+## Data & External Dependencies
+Endpoints for upstream services (artifacthub, cachehub, projecthub, RBAC, etc.) are configured in `config/*.exs` and default to `localhost:50051` for tests. Workers interact with AMQP (`tackle`), GRPC agents (`HostedAgent`, `SelfHostedAgent`), and feature flags (`FeatureProvider`). Secrets stay out of the repo; load them via `.env` for docker-compose.
+
+## Running & Development
+- `mix deps.get` to install dependencies; `make build` creates Docker images matching CI.
+- `make dev.server` starts the Phoenix UI shelling into this service; alternatively `iex -S mix phx.server` if you need an interactive node.
+- Feature flags require `FEATURE_YAML_PATH` or remote provider credentials before boot.
+
+## Testing & Quality
+- `make test.ex` wraps `mix test`, seeding/migrating the legacy repo automatically. Use `TEST_FILE=test/zebra/...` to scope cases.
+- Workers and gRPC services have fake counterparts under `test/support/fake_servers/` to keep suites hermetic. Extend these fakes when adding new integrations.
+- Linting/formatting: `make format.ex` (mix format + Credo), `make lint` for Go stubs (if any), and `make lint.js` for the Phoenix assets.
+
+## Operations & Troubleshooting
+- Set `WATCHMAN_HOST` to push metrics; logs are structured via Logger with Sentry backend (`Sentry.LoggerBackend`).
+- Scheduler cadence and worker batch sizes live in `config/*.exs` (`Zebra.Workers.Scheduler`, `Dispatcher` timeouts). Tune via env overrides before scaling changes.
+- For local brownout testing, edit `lib/zebra/machines/brownout_schedule.ex` and restart the node; the scheduler pulls updates on boot.

zebra/lib/zebra/workers/db_worker.ex

Lines changed: 33 additions & 15 deletions
@@ -9,6 +9,7 @@ defmodule Zebra.Workers.DbWorker do
     :state_field,
     :state_value,
     :machine_type_field,
+    :machine_os_image_field,
     :machine_type_environment,
     :metric_name,
     :order_by,
@@ -45,16 +46,16 @@ defmodule Zebra.Workers.DbWorker do
     Watchman.benchmark("#{worker.metric_name}.tick.duration", fn ->
       if isolate_machine_types do
         query_machine_types(worker)
-        |> Enum.each(fn machine_type -> tick_(worker, machine_type) end)
+        |> Enum.each(fn machine_type_tuple -> tick_(worker, machine_type_tuple) end)
       else
         tick_(worker)
       end
     end)
   end
 
-  def tick_(worker, machine_type \\ nil) do
-    rows = query_jobs(worker, machine_type)
-    submit_batch_size(worker.metric_name, length(rows), machine_type)
+  def tick_(worker, machine_type_tuple \\ nil) do
+    rows = query_jobs(worker, machine_type_tuple)
+    submit_batch_size(worker.metric_name, length(rows), machine_type_tuple)
 
     parallelism = worker.parallelism || 10
 
@@ -65,8 +66,8 @@ defmodule Zebra.Workers.DbWorker do
 
   defp submit_batch_size(name, v, nil), do: Watchman.submit("#{name}.batch_size", v)
 
-  defp submit_batch_size(name, v, machine_type),
-    do: Watchman.submit({"#{name}.batch_size", [machine_type]}, v)
+  defp submit_batch_size(name, v, {machine_type, machine_os_image}),
+    do: Watchman.submit({"#{name}.batch_size", ["#{machine_type}-#{machine_os_image}"]}, v)
 
   def process(worker, id) do
     Watchman.benchmark("#{worker.metric_name}.process.duration", fn ->
@@ -98,14 +99,15 @@ defmodule Zebra.Workers.DbWorker do
 
   defp query_machine_types(worker) do
     machine_type_environment = worker.machine_type_environment || :all
+    machine_os_image_field = worker.machine_os_image_field
 
     cond do
       machine_type_environment == :all ->
         Repo.all(
           from(r in worker.schema,
             where: field(r, ^worker.state_field) == ^worker.state_value,
-            distinct: r.machine_type,
-            select: r.machine_type
+            distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)],
+            select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)}
           )
         )
 
@@ -114,8 +116,8 @@ defmodule Zebra.Workers.DbWorker do
           from(r in worker.schema,
             where: field(r, ^worker.state_field) == ^worker.state_value,
             where: like(field(r, ^worker.machine_type_field), @self_hosted_prefix),
-            distinct: r.machine_type,
-            select: r.machine_type
+            distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)],
+            select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)}
           )
         )
 
@@ -124,8 +126,8 @@ defmodule Zebra.Workers.DbWorker do
           from(r in worker.schema,
             where: field(r, ^worker.state_field) == ^worker.state_value,
             where: not like(field(r, ^worker.machine_type_field), @self_hosted_prefix),
-            distinct: r.machine_type,
-            select: r.machine_type
+            distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)],
+            select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)}
           )
         )
     end
@@ -175,19 +177,35 @@ defmodule Zebra.Workers.DbWorker do
     end
   end
 
-  defp query_jobs(worker, machine_type) do
+  defp query_jobs(worker, {machine_type, machine_os_image}) do
     order_by = worker.order_by || :id
     order_dir = worker.order_direction || :asc
     records_per_tick = worker.records_per_tick || 100
+    machine_os_image_field = worker.machine_os_image_field
 
-    Repo.all(
+    base_query =
       from(r in worker.schema,
         where: field(r, ^worker.state_field) == ^worker.state_value,
-        where: field(r, ^worker.machine_type_field) == ^machine_type,
+        where: field(r, ^worker.machine_type_field) == ^machine_type
+      )
+
+    filtered_query =
+      maybe_filter_machine_os_image(base_query, machine_os_image_field, machine_os_image)
+
+    Repo.all(
+      from(r in filtered_query,
         order_by: [{^order_dir, ^order_by}],
         select: r.id,
         limit: ^records_per_tick
       )
     )
   end
+
+  defp maybe_filter_machine_os_image(query, field, nil) do
+    from(r in query, where: is_nil(field(r, ^field)))
+  end
+
+  defp maybe_filter_machine_os_image(query, field, value) do
+    from(r in query, where: field(r, ^field) == ^value)
+  end
 end
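
The two `maybe_filter_machine_os_image/3` clauses above reflect how SQL treats `NULL`: an equality comparison involving `NULL` evaluates to unknown rather than true, so rows with a `NULL` image can only be selected with an `IS NULL` check. The sketch below models that rule in memory. The module `NullMatchSketch` and its functions are hypothetical illustrations, not part of Zebra or Ecto.

```elixir
defmodule NullMatchSketch do
  # Models SQL equality: any comparison involving NULL is unknown (nil here),
  # and a WHERE clause keeps a row only when the predicate is exactly true.
  def sql_eq(nil, _), do: nil
  def sql_eq(_, nil), do: nil
  def sql_eq(a, b), do: a == b

  # Naive filter: always uses equality, so it can never match NULL rows.
  def naive_filter(rows, value) do
    Enum.filter(rows, &(sql_eq(&1.machine_os_image, value) == true))
  end

  # Shaped like the two clauses in the diff: nil gets an explicit NULL check,
  # any other value (including the empty string) uses equality.
  def filter(rows, nil), do: Enum.filter(rows, &is_nil(&1.machine_os_image))
  def filter(rows, value), do: naive_filter(rows, value)
end

rows = [
  %{id: 1, machine_os_image: nil},
  %{id: 2, machine_os_image: ""},
  %{id: 3, machine_os_image: "ubuntu2004"}
]

IO.inspect(NullMatchSketch.naive_filter(rows, nil))
IO.inspect(Enum.map(NullMatchSketch.filter(rows, nil), & &1.id))
IO.inspect(Enum.map(NullMatchSketch.filter(rows, ""), & &1.id))
```

The naive filter returns no rows for `nil`, while the two-clause version returns row 1 for `nil` and row 2 for the empty string. The distinct query treats `""` and `NULL` as separate groups in the same way, which matches the "blank or nil" test added later in this commit.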

zebra/lib/zebra/workers/dispatcher.ex

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ defmodule Zebra.Workers.Dispatcher do
       state_field: :aasm_state,
       state_value: Zebra.Models.Job.state_scheduled(),
       machine_type_field: :machine_type,
+      machine_os_image_field: :machine_os_image,
       machine_type_environment: machine_type_environment(),
       order_by: :scheduled_at,
       order_direction: :asc,

zebra/test/zebra/workers/dispatcher_test.exs

Lines changed: 130 additions & 0 deletions
@@ -231,6 +231,136 @@ defmodule Zebra.Workers.DispatcherTest do
       # a NOT_FOUND response from chmura, and stopped trying to occupy agents.
       assert Zebra.Workers.DispatcherTest.Counter.value() == 10
     end
+
+    test "isolates dispatching by both machine_type and os_image" do
+      System.put_env("DISPATCH_SELF_HOSTED_ONLY", "false")
+      System.put_env("DISPATCH_CLOUD_ONLY", "false")
+
+      # we need to have at least 20 for each os_image to ensure that we
+      # don't stop batching when we receive a NOT_FOUND response from chmura
+      # and stop trying to occupy agents.
+
+      ubuntu2404_jobs =
+        Enum.map(1..20, fn _ ->
+          {:ok, job} =
+            Support.Factories.Job.create(:scheduled, %{
+              machine_type: "e1-standard-2",
+              machine_os_image: "ubuntu2404"
+            })
+
+          job
+        end)
+
+      # Create jobs with same machine_type but different os_images
+      ubuntu1804_jobs =
+        Enum.map(1..20, fn _ ->
+          {:ok, job} =
+            Support.Factories.Job.create(:scheduled, %{
+              machine_type: "e1-standard-2",
+              machine_os_image: "ubuntu1804"
+            })
+
+          job
+        end)
+
+      ubuntu2004_jobs =
+        Enum.map(1..20, fn _ ->
+          {:ok, job} =
+            Support.Factories.Job.create(:scheduled, %{
+              machine_type: "e1-standard-2",
+              machine_os_image: "ubuntu2004"
+            })
+
+          job
+        end)
+
+      # Track which os_images were requested
+      agent_requests = Agent.start_link(fn -> [] end)
+
+      GrpcMock.stub(Support.FakeServers.ChmuraApi, :occupy_agent, fn req, _ ->
+        Agent.update(elem(agent_requests, 1), fn list ->
+          [req.machine.os_image | list]
+        end)
+
+        if req.machine.os_image == "ubuntu2404" do
+          raise GRPC.RPCError, status: GRPC.Status.not_found(), message: "No suitable agent found"
+        else
+          %InternalApi.Chmura.OccupyAgentResponse{
+            agent: %InternalApi.Chmura.Agent{
+              id: Ecto.UUID.generate(),
+              ip_address: "1.2.3.4",
+              ssh_port: 80,
+              ctrl_port: 80,
+              auth_token: "asdas"
+            }
+          }
+        end
+      end)
+
+      with_stubbed_http_calls(fn ->
+        Worker.init() |> Zebra.Workers.DbWorker.tick()
+      end)
+
+      # ubuntu1804 and ubuntu2004 jobs should be started
+      (ubuntu1804_jobs ++ ubuntu2004_jobs)
+      |> Enum.each(fn job ->
+        job = Job.reload(job)
+        assert Job.started?(job) == true
+      end)
+
+      # ubuntu2404 jobs should remain scheduled (no agents available)
+      ubuntu2404_jobs
+      |> Enum.each(fn job ->
+        job = Job.reload(job)
+        assert Job.scheduled?(job) == true
+      end)
+
+      # Verify that requests were made with the correct os_images
+      requested_os_images = Agent.get(elem(agent_requests, 1), & &1)
+      assert length(requested_os_images) == 50
+      assert Enum.count(requested_os_images, &(&1 == "ubuntu1804")) == 20
+      assert Enum.count(requested_os_images, &(&1 == "ubuntu2004")) == 20
+      # only one batch requested
+      assert Enum.count(requested_os_images, &(&1 == "ubuntu2404")) == 10
+    end
+
+    test "dispatches self-hosted jobs when os_image is blank or nil" do
+      System.put_env("DISPATCH_SELF_HOSTED_ONLY", "false")
+      System.put_env("DISPATCH_CLOUD_ONLY", "false")
+
+      {:ok, blank_image_job} =
+        Support.Factories.Job.create(:scheduled, %{
+          machine_type: "s1-local-testing",
+          machine_os_image: ""
+        })
+
+      {:ok, nil_image_job} =
+        Support.Factories.Job.create(:scheduled, %{
+          machine_type: "s1-local-testing",
+          machine_os_image: nil
+        })
+
+      response = %InternalApi.SelfHosted.OccupyAgentResponse{
+        agent_id: @agent_id,
+        agent_name: "self-hosted-agent"
+      }
+
+      GrpcMock.stub(Support.FakeServers.SelfHosted, :occupy_agent, fn _, _ -> response end)
+
+      with_stubbed_http_calls(fn ->
+        Worker.init() |> Zebra.Workers.DbWorker.tick()
+      end)
+
+      blank_image_job = Job.reload(blank_image_job)
+      nil_image_job = Job.reload(nil_image_job)
+
+      assert Job.started?(blank_image_job)
+      assert Job.started?(nil_image_job)
+      assert blank_image_job.agent_id == @agent_id
+      assert nil_image_job.agent_id == @agent_id
+      assert blank_image_job.machine_os_image == ""
+      assert nil_image_job.machine_os_image in [nil, ""]
+    end
   end
 
   describe ".process" do
