diff --git a/periodic_scheduler/AGENTS.md b/periodic_scheduler/AGENTS.md new file mode 100644 index 000000000..8e154f3e9 --- /dev/null +++ b/periodic_scheduler/AGENTS.md @@ -0,0 +1,22 @@ +# Repository Guidelines + +## Project Structure & Module Organization +The repository hosts three Elixir projects. `scheduler/` is the Quantum-based engine that persists and enqueues periodic workflows; its runtime config lives in `scheduler/config/`, business logic in `scheduler/lib/`, tests in `scheduler/test/`, and deployment assets in `scheduler/helm/` and `scheduler/docker-compose.yml`. `definition_validator/` mirrors the standard Mix layout (`lib/`, `config/`, `test/`) and adds Docker-friendly tooling plus deployment manifests. `spec/` stores canonical workflow specifications under `spec/priv/*.yml` and publishes JSON schema artifacts into `spec/publish/`. + +## Build, Test, and Development Commands +Run service-specific commands from within each directory: +- `cd scheduler && mix deps.get && mix test` compiles the scheduler and executes the ExUnit suite; run `mix ecto.create && mix ecto.migrate` beforehand to prep the database, or `make test.ex` for the Dockerized CI-equivalent run that also brings up Postgres and RabbitMQ. +- `cd definition_validator && make unit.test` runs the validator tests inside the standard Elixir container, while `make console CMD="mix run --no-halt"` opens an interactive shell with the project mounted. +- `cd spec && make test` executes the Mix tests in the same container wrapper, and `make publish` regenerates the JSON copies of each YAML schema before shipping. + +## Coding Style & Naming Conventions +Elixir code uses snake_case filenames, PascalCase modules, and descriptive function names. Format before committing (`mix format`), and lint services that include Credo (`definition_validator/Makefile` exposes `make lint`). Keep schema filenames in `spec/priv` lowercase with hyphenated versions only when mirroring public API names. + +## Testing Guidelines +Unit and integration suites rely on ExUnit. Add regression cases under the matching `test/` subtree, group examples with `describe` blocks, and tag slow paths (`@tag :integration`) where useful. For data-dependent scheduler features, seed fixtures through `scheduler/test/support`, and update spec fixtures with `make publish` whenever you change a YAML contract. + +## Commit & Pull Request Guidelines +Follow Conventional Commits (e.g., `feat(scheduler): add retry window`). Each pull request should explain the workflow impact, list verification steps (`mix test`, `make unit.test`, `make publish`), and link Semaphore issues. Include logs or screenshots whenever the change alters scheduling behavior or schema output. Keep scopes tight and ensure linters plus relevant `mix test` invocations pass locally before requesting review. + +## Security & Configuration Tips +Never commit secrets; instead rely on `.env` files referenced by `docker-compose.yml` or pass credentials through the Make targets. Proto files under `scheduler/lib/internal_api` are generated via `scripts/internal_protos.sh`, so ensure your SSH agent holds a key that can clone `git@github.com:renderedtext/internal_api.git`. Prefer Docker BuildKit (`export DOCKER_BUILDKIT=1`) for reproducible images.
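The Testing Guidelines above reference `describe` blocks and the `@tag :integration` convention; a minimal sketch of what that shape could look like follows. It is purely illustrative (module name, file path, and assertions are invented), not code from this repository:

```elixir
# test/scheduler/example_window_test.exs (hypothetical file, shown only to
# illustrate the describe/tag conventions mentioned above)
defmodule Scheduler.ExampleWindowTest do
  use ExUnit.Case, async: true

  describe "window arithmetic" do
    test "a fast, pure regression case lives in a plain test" do
      assert Enum.sum([10, 20, 30]) == 60
    end
  end

  describe "database-backed behaviour" do
    # Slow or infrastructure-dependent paths get tagged so CI can
    # include or exclude them explicitly.
    @tag :integration
    test "placeholder for a case that needs seeded fixtures" do
      assert true
    end
  end
end
```

With tags in place, `mix test --only integration` or `mix test --exclude integration` is enough to split the fast and slow suites.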
diff --git a/periodic_scheduler/scheduler/AGENTS.md b/periodic_scheduler/scheduler/AGENTS.md new file mode 100644 index 000000000..b0a6ecd70 --- /dev/null +++ b/periodic_scheduler/scheduler/AGENTS.md @@ -0,0 +1,23 @@ +# Repository Guidelines + +## Project Structure & Module Organization +Runtime code lives under `lib/`, split by domain boundaries (`scheduler/`, `actions/`, `workers/`). Persistence details sit in `lib/scheduler/periodics` and `lib/scheduler/front_db` with schemas mirroring tables. Configuration defaults are in `config/config.exs`, with environment overrides in `config/{dev,test,prod}.exs`. Database migrations and seeds are under `priv/{periodics_repo,front_repo}/`, while `test/` mirrors `lib/` one-to-one for ExUnit coverage. Docker, release, and deployment assets reside in `docker-compose.yml`, `rel/`, and `helm/`. Shared automation scripts sit inside `scripts/`. For a deeper walkthrough, treat `DOCUMENTATION.md` as the go-to triage guide before diving into tasks. + +## Build, Test, and Development Commands +- `mix deps.get && mix compile` installs dependencies and compiles the application locally. +- `MIX_ENV=test make test.ex.setup` prepares the Postgres schema inside Docker for integration tests. +- `mix test` or `mix test --cover` runs the ExUnit suite (JUnit formatter available via `MIX_ENV=test mix test --formatter JUnitFormatter` for CI). +- `mix credo --strict` runs linting, and `mix format` enforces the formatter prior to commits. +- `docker compose up app` boots the scheduler in a container with RabbitMQ/Postgres defaults from the `.env` files. + +## Coding Style & Naming Conventions +Follow the default Elixir formatter (2-space indentation, pipe-first style). Modules use `Scheduler.*` namespaces that map to folders (e.g., `Scheduler.Periodic.Job` ⇔ `lib/scheduler/periodic/job.ex`). Functions and macros are snake_case verbs, and test files end in `_test.exs`. Keep public modules documented with `@moduledoc` and prefer pattern matching + guard clauses over nested conditionals. Run `mix format && mix credo` before every push. + +## Testing Guidelines +Author unit tests alongside the code they cover, as `_test.exs` files under `test/`. Use ExUnit’s `describe` blocks per function and tag integration tests with `@moduletag :integration` so CI can filter. Ensure tests for new DB queries use the data factories from `test/support`. Aim to maintain or improve coverage reported by `mix test --cover`; add regression tests for every bugfix. + +## Commit & Pull Request Guidelines +Commits typically follow `type(scope): imperative summary (#issue)` as seen in `git log` (e.g., `fix(secrethub): align cache headers (#705)`). Keep commits focused and reference Jira/GitHub IDs in the summary. Pull requests must describe motivation, list test evidence (`mix test`, manual steps), and link related issues or design docs. Include screenshots or logs when UI/API behavior changes and request reviews from domain owners noted in CODEOWNERS. + +## Security & Configuration Tips +Never commit `.env` files or credentials; rely on the provided Docker defaults and override locally via `config/dev.secret.exs`. When touching `scripts/internal_protos.sh` or `pb.gen`, confirm you have VPN + GitHub access before cloning `renderedtext/internal_api`. Validate all scheduler configuration changes against `config/runtime.exs` to avoid breaking production start-up.
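As a concrete, purely hypothetical illustration of the namespace ⇔ path mapping, `@moduledoc`, and pattern-matching conventions described above (this module does not exist in the repository):

```elixir
# lib/scheduler/periodic/window.ex (hypothetical module, mirroring the
# Scheduler.* namespace-to-path convention described above)
defmodule Scheduler.Periodic.Window do
  @moduledoc """
  Pure helpers for reasoning about scheduling windows.
  """

  @doc "Checks whether a minute falls inside an inclusive {from, to} window."
  # Pattern matching plus guard clauses instead of nested conditionals.
  def within?(minute, _window) when not is_integer(minute), do: {:error, :invalid_minute}
  def within?(minute, _window) when minute < 0, do: {:error, :invalid_minute}
  def within?(minute, {from, to}) when minute >= from and minute <= to, do: {:ok, true}
  def within?(_minute, {_from, _to}), do: {:ok, false}
end
```

Following the same convention, its test would live at `test/scheduler/periodic/window_test.exs`.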
diff --git a/periodic_scheduler/scheduler/DOCUMENTATION.md b/periodic_scheduler/scheduler/DOCUMENTATION.md new file mode 100644 index 000000000..9c93b923d --- /dev/null +++ b/periodic_scheduler/scheduler/DOCUMENTATION.md @@ -0,0 +1,46 @@ +# Repository Architecture Notes + +## Purpose & High-Level Flow +- This Mix project (`mix.exs`) hosts Semaphore’s periodic workflow scheduler. It exposes a gRPC API (`lib/scheduler/grpc_server.ex`, `lib/scheduler/health_check_server.ex`) that the control plane uses to CRUD periodic definitions and trigger runs. +- `Scheduler.Application` boots the critical OTP tree: both Postgres repos, the gRPC servers, Quantum-based cron scheduler, the dynamic schedule task supervisor, initializer, and the RabbitMQ consumers that react to org block/unblock events. +- At runtime, `Scheduler.Workers.QuantumScheduler` converts cron strings into Quantum jobs that call `Scheduler.Actions.start_schedule_task/2`. Each run spawns a `Scheduler.Workers.ScheduleTask` process to orchestrate workflow execution via downstream APIs. + +## Directory Cheat Sheet +- `lib/scheduler/*` holds service code grouped by concern (actions, workers, clients, repos, DB models, utils). +- `lib/internal_api` mirrors protobuf/gRPC stubs pulled from `renderedtext/internal_api`. Run `make pb.gen` if proto inputs change. +- `priv/periodics_repo` and `priv/front_repo` store migrations for the two databases. `lib/scheduler/periodics` and `lib/scheduler/front_db` carry the matching schemas/query modules. +- `test/` mirrors `lib/` one-to-one; heavier integration helpers sit under `test/support`. +- Deploy tooling: `Dockerfile`, `docker-compose.yml`, `helm/`, `rel/`. Helper scripts live in `scripts/` (notably `internal_protos.sh` and `vagrant_sudo`). + +## Runtime Components +- **Actions layer** (`lib/scheduler/actions*.ex`): thin service layer invoked by gRPC endpoints. Each action module (ApplyImpl, ListImpl, PersistImpl, etc.) encapsulates validation, DB access, and calls to external services. Metrics are emitted via Watchman (see counters in `Scheduler.Actions`). +- **Workers**: `Initializer` pre-warms Quantum jobs by paging through `Scheduler.Periodics.Model.PeriodicsQueries`. `QuantumScheduler` owns long-lived cron jobs, while `ScheduleTaskManager` supervises short-lived `ScheduleTask` processes that call downstream APIs (`WorkflowClient`, `ProjecthubClient`, `RepositoryClient`, `RepoProxyClient`). +- **Messaging**: `Scheduler.EventsConsumers.OrgBlocked` / `OrgUnblocked` subscribe to RabbitMQ via Tackle. They suspend/resume Quantum jobs in batches per organization. +- **Feature flags & metrics**: `FeatureProvider.init/1` picks either YAML-based flags (when `FEATURE_YAML_PATH` exists) or the gRPC-driven `Scheduler.FeatureHubProvider`. Observability is wired through `watchman` and `vmstats` (`Scheduler.VmstatsSink`, `config/config.exs`). + +## Data & Persistence +- `Scheduler.PeriodicsRepo` targets the `periodics_*` database (cron definitions, triggers, delete requests). Key schemas live under `lib/scheduler/periodics/model/`. +- `Scheduler.FrontRepo` connects to the `front_*` DB for project/org metadata (`lib/scheduler/front_db/*`). +- Trigger history is modeled in `lib/scheduler/periodics_triggers/model`, offering both offset (`Scrivener`) and keyset (`Paginator`) pagination helpers. +- Soft-delete pipeline: requests are staged through `lib/scheduler/delete_requests/model` and eventually cleared by workers. 
+- Config lives in `config/*.exs` with prod overrides in `config/runtime.exs`; most secrets arrive via env vars (`DB_*`, `RABBITMQ_URL`, `INTERNAL_API_URL_*`). + +## Build, Test, and Common Commands +- `mix deps.get && mix compile` – install deps and compile locally. +- `mix test` (optionally `--cover` or `--only integration`) – ExUnit suite; reports land in `./out/test-reports.xml` when `JUnitFormatter` is enabled. +- `MIX_ENV=test make test.ex.setup` – boot Postgres (docker compose) and run migrations + seeds for integration specs. +- `mix credo --strict` and `mix format` – lint/format gates prior to commits. +- `docker compose up app` – run the scheduler plus backing services via the provided compose file; adjust `.env` as needed. +- `make pb.gen` – clone `renderedtext/internal_api`, regenerate protobuf stubs into `lib/internal_api`. + +## External Dependencies & Touchpoints +- gRPC backends: feature API, repository hub, repo proxy, project hub, and workflow API. Their endpoints are injected via `INTERNAL_API_URL_*` variables (see `config/runtime.exs`). +- RabbitMQ exchange `organization_exchange` (routing keys `blocked` / `unblocked`) throttles scheduling when an org’s billing status changes. +- FeatureProvider cache uses Cachex (started in `Scheduler.Application` for non-Test environments) with 10-minute TTL. +- Metrics flow to Watchman/StatsD (namespaced by `METRICS_*` env vars); VM stats are emitted every 10s. + +## Tips for Future Changes +- Touching cron semantics? Update both Quantum job creation (`QuantumScheduler`) and the validation logic inside `Scheduler.Actions.ApplyImpl` / `PersistImpl`. +- When adjusting DB queries, remember both pagination strategies (`paginate_offset` and `paginate_keyset`) and the mirrored tests in `test/periodics_*`. +- New gRPC fields require regenerating protos (`make pb.gen`) and updating the transformation helpers in `Scheduler.Grpc.Server`. +- Long-running tasks should go through `ScheduleTaskManager` to benefit from supervision and Watchman metrics; avoid blocking the Quantum scheduler process. diff --git a/periodic_scheduler/scheduler/config/runtime.exs b/periodic_scheduler/scheduler/config/runtime.exs index b3f8fbca9..f96299f4d 100644 --- a/periodic_scheduler/scheduler/config/runtime.exs +++ b/periodic_scheduler/scheduler/config/runtime.exs @@ -52,8 +52,7 @@ if config_env() == :prod do ]} path -> - {FeatureProvider.YamlProvider, - [yaml_path: get_env!.("FEATURE_YAML_PATH"), agent_name: :feature_provider_agent]} + {FeatureProvider.YamlProvider, [yaml_path: path, agent_name: :feature_provider_agent]} end config :scheduler, diff --git a/periodic_scheduler/scheduler/docker-compose.yml b/periodic_scheduler/scheduler/docker-compose.yml index bca2db703..9f21c265a 100644 --- a/periodic_scheduler/scheduler/docker-compose.yml +++ b/periodic_scheduler/scheduler/docker-compose.yml @@ -1,6 +1,6 @@ services: app: - build: + build: context: ../..
dockerfile: periodic_scheduler/scheduler/Dockerfile target: dev @@ -26,6 +26,8 @@ services: tty: true volumes: - ../:/app:delegated + - /app/scheduler/_build + - /app/scheduler/deps working_dir: "/app/scheduler" ciapp: @@ -39,7 +41,7 @@ services: SSH_AUTH_SOCK: ${SSH_AUTH_SOCK} depends_on: postgres: - condition: service_healthy + condition: service_healthy rabbitmq: condition: service_healthy profiles: ["ci"] diff --git a/periodic_scheduler/scheduler/lib/scheduler/events_consumers/org_unblocked.ex b/periodic_scheduler/scheduler/lib/scheduler/events_consumers/org_unblocked.ex index b66418a87..16ea44957 100644 --- a/periodic_scheduler/scheduler/lib/scheduler/events_consumers/org_unblocked.ex +++ b/periodic_scheduler/scheduler/lib/scheduler/events_consumers/org_unblocked.ex @@ -39,7 +39,15 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked do defp unsuspend_batch(org_id, batch_no) do with {:ok, periodics} <- PeriodicsQueries.get_all_from_org(org_id, batch_no), {:periodics_found, true} <- {:periodics_found, length(periodics) > 0}, - {:ok, _periodics} <- unsuspend_periodics(periodics) do + {:ok, %{failed: failed}} <- unsuspend_periodics(periodics) do + if failed != [] do + failed_count = length(failed) + Watchman.submit("scheduler.org_unblocked.unsuspend.failure", failed_count) + + failed + |> LT.warn("Failed to unsuspend some periodics for organization #{org_id}") + end + unsuspend_batch(org_id, batch_no + 1) else {:periodics_found, false} -> @@ -51,13 +59,27 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked do end defp unsuspend_periodics(periodics) do - periodics - |> Enum.reduce_while({:ok, []}, fn periodic, {:ok, results} -> - case unsuspend_periodic(periodic) do - {:ok, periodic} -> {:cont, {:ok, results ++ [periodic]}} - error -> {:halt, error} - end - end) + result = + Enum.reduce(periodics, %{unsuspended: [], failed: []}, fn periodic, acc -> + case unsuspend_periodic(periodic) do + {:ok, periodic} -> + %{acc | unsuspended: [periodic | acc.unsuspended]} + + {:error, reason} -> + failed_entry = %{id: periodic.id, reason: reason} + %{acc | failed: [failed_entry | acc.failed]} + + other -> + failed_entry = %{id: periodic.id, reason: other} + %{acc | failed: [failed_entry | acc.failed]} + end + end) + + {:ok, + %{ + unsuspended: Enum.reverse(result.unsuspended), + failed: Enum.reverse(result.failed) + }} end defp unsuspend_periodic(periodic) do diff --git a/periodic_scheduler/scheduler/lib/scheduler/workers/quantum_scheduler.ex b/periodic_scheduler/scheduler/lib/scheduler/workers/quantum_scheduler.ex index e7e0013d3..a416557a4 100644 --- a/periodic_scheduler/scheduler/lib/scheduler/workers/quantum_scheduler.ex +++ b/periodic_scheduler/scheduler/lib/scheduler/workers/quantum_scheduler.ex @@ -26,7 +26,17 @@ defmodule Scheduler.Workers.QuantumScheduler do |> ToTuple.ok() end - defp add_random_second(at_string) do + defp add_random_second(at_string) when is_binary(at_string) do + at_string + |> String.trim() + |> do_add_random_second() + end + + defp add_random_second(_), do: {:error, :missing_cron_expression} + + defp do_add_random_second(""), do: {:error, :missing_cron_expression} + + defp do_add_random_second(at_string) do with {:ok, schedule} <- Crontab.CronExpression.Parser.parse(at_string), rand_sec <- :rand.uniform(60) - 1, schedule <- Map.merge(schedule, %{extended: true, second: [rand_sec]}) do diff --git a/periodic_scheduler/scheduler/test/events_consumers/org_unblocked_test.exs b/periodic_scheduler/scheduler/test/events_consumers/org_unblocked_test.exs index 821d8020e..fd9ed83ec 
100644 --- a/periodic_scheduler/scheduler/test/events_consumers/org_unblocked_test.exs +++ b/periodic_scheduler/scheduler/test/events_consumers/org_unblocked_test.exs @@ -1,13 +1,32 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do use ExUnit.Case + import Ecto.Query, only: [from: 2] + alias InternalApi.Organization.OrganizationUnblocked alias Scheduler.Periodics.Model.PeriodicsQueries + alias Scheduler.Periodics.Model.Periodics + alias Scheduler.PeriodicsRepo alias Scheduler.EventsConsumers.OrgUnblocked alias Scheduler.Workers.QuantumScheduler alias Scheduler.Actions alias Util.Proto + @grpc_port 50_057 + @mocked_services [Test.MockFeatureService] + + setup_all do + GRPC.Server.start(@mocked_services, @grpc_port) + {:ok, consumer_pid} = start_org_unblocked_consumer() + + on_exit(fn -> + stop_org_unblocked_consumer(consumer_pid) + GRPC.Server.stop(@mocked_services) + end) + + :ok + end + setup do Test.Helpers.truncate_db() @@ -15,13 +34,17 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do ids_1 = Test.Helpers.seed_front_db() ids_2 = Test.Helpers.seed_front_db() + ids_3 = Test.Helpers.seed_front_db() start_supervised!(QuantumScheduler) System.put_env("GITHUB_APP_ID", "client_id") System.put_env("GITHUB_SECRET_ID", "client_secret") - {:ok, %{ids_1: ids_1, ids_2: ids_2}} + reset_mock_feature_service() + mock_feature_response("just_run") + + {:ok, %{ids_1: ids_1, ids_2: ids_2, ids_3: ids_3}} end test "valid message received => all periodics from the org are unsuspended", ctx do @@ -44,8 +67,6 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do event = Proto.deep_new!(OrganizationUnblocked, %{org_id: ctx.ids_1.org_id}) encoded = OrganizationUnblocked.encode(event) - assert {:ok, _pid} = OrgUnblocked.start_link() - Tackle.publish(encoded, exchange_params()) :timer.sleep(2_000) @@ -53,6 +74,55 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do assert_only_periodics_from_unblocked_org_unsuspended(periodics, ctx.ids_1.org_id) end + test "invalid cron expression does not block unsuspending other periodics", ctx do + [{:ok, first_id}, {:ok, invalid_id}, {:ok, nil_cron_id}, {:ok, third_id}] = + 1..4 + |> Enum.map(&create_periodic(ctx.ids_3, &1)) + + invalidate_cron_expression(invalid_id) + remove_cron_expression(nil_cron_id) + + event = Proto.deep_new!(OrganizationUnblocked, %{org_id: ctx.ids_3.org_id}) + encoded = OrganizationUnblocked.encode(event) + + Tackle.publish(encoded, exchange_params()) + + :timer.sleep(2_000) + + assert {:ok, first_periodic} = PeriodicsQueries.get_by_id(first_id) + assert first_periodic.suspended == false + assert nil != first_id |> String.to_atom() |> QuantumScheduler.find_job() + + assert {:ok, invalid_periodic} = PeriodicsQueries.get_by_id(invalid_id) + assert invalid_periodic.suspended == false + assert nil == invalid_id |> String.to_atom() |> QuantumScheduler.find_job() + + assert {:ok, nil_cron_periodic} = PeriodicsQueries.get_by_id(nil_cron_id) + assert nil_cron_periodic.suspended == false + assert nil == nil_cron_id |> String.to_atom() |> QuantumScheduler.find_job() + + assert {:ok, third_periodic} = PeriodicsQueries.get_by_id(third_id) + assert third_periodic.suspended == false + assert nil != third_id |> String.to_atom() |> QuantumScheduler.find_job() + end + + test "returns error when database is unavailable", ctx do + repo_supervisor = Scheduler.Supervisor + repo_child = Scheduler.PeriodicsRepo + + on_exit(fn -> + if Process.whereis(repo_child) == nil do + {:ok, _pid} = Supervisor.start_child(repo_supervisor, {repo_child, 
[]}) + end + end) + + assert :ok = Supervisor.terminate_child(repo_supervisor, repo_child) + assert :ok = Supervisor.delete_child(repo_supervisor, repo_child) + + assert {:error, _reason} = + OrgUnblocked.unsuspend_periodics_from_org({:ok, %{org_id: ctx.ids_1.org_id}}) + end + defp create_periodic(ids, ind) do {:ok, id} = %{ @@ -96,6 +166,16 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do end) end + defp invalidate_cron_expression(id) do + from(p in Periodics, where: p.id == ^id) + |> PeriodicsRepo.update_all(set: [at: "invalid cron"]) + end + + defp remove_cron_expression(id) do + from(p in Periodics, where: p.id == ^id) + |> PeriodicsRepo.update_all(set: [at: nil]) + end + defp exchange_params() do %{ url: System.get_env("RABBITMQ_URL"), @@ -103,4 +183,47 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do routing_key: "unblocked" } end + + defp start_org_unblocked_consumer do + case OrgUnblocked.start_link() do + {:ok, pid} -> + wait_for_org_unblocked_consumer() + {:ok, pid} + + {:error, {:already_started, pid}} -> + wait_for_org_unblocked_consumer() + {:ok, pid} + + error -> + error + end + end + + defp wait_for_org_unblocked_consumer do + # Give Tackle time to register a default consumer before publishing + Process.sleep(200) + end + + defp stop_org_unblocked_consumer(pid) when is_pid(pid) do + if Process.alive?(pid) do + GenServer.stop(pid) + else + :ok + end + end + + defp stop_org_unblocked_consumer(_), do: :ok + + defp reset_mock_feature_service() do + Cachex.clear(Elixir.Scheduler.FeatureHubProvider) + + Application.put_env( + :scheduler, + :feature_api_grpc_endpoint, + "localhost:#{inspect(@grpc_port)}" + ) + end + + defp mock_feature_response(value), + do: Application.put_env(:scheduler, :mock_feature_service_response, value) end
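A note on the `add_random_second/1` change further up: it normalizes the cron string before handing it to the parser, so `nil`, empty, or whitespace-only values fail fast with `{:error, :missing_cron_expression}` instead of crashing the parse. Below is a standalone sketch of that guard-then-parse pattern, assuming the same `crontab` dependency the scheduler already uses; the module name is hypothetical and it mirrors, rather than replaces, the code in the diff:

```elixir
# Hypothetical helper mirroring the guard-then-parse shape used by
# Scheduler.Workers.QuantumScheduler.add_random_second/1 in the diff above.
defmodule CronGuardSketch do
  def parse(at_string) when is_binary(at_string) do
    case String.trim(at_string) do
      "" -> {:error, :missing_cron_expression}
      trimmed -> Crontab.CronExpression.Parser.parse(trimmed)
    end
  end

  # nil, atoms, integers, etc. are rejected before they ever reach the parser.
  def parse(_other), do: {:error, :missing_cron_expression}
end

# CronGuardSketch.parse("*/5 * * * *") #=> {:ok, %Crontab.CronExpression{...}}
# CronGuardSketch.parse("   ")         #=> {:error, :missing_cron_expression}
# CronGuardSketch.parse(nil)           #=> {:error, :missing_cron_expression}
```

In the real worker, the parsed expression then receives `extended: true` and a random `second` in `0..59` (via `:rand.uniform(60) - 1`), presumably so jobs registered for the same minute do not all fire at the exact same second.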