Skip to content

Commit 40f7711

Browse files
committed
fix(periodic_scheduler): handle partial unsuspend failures
1 parent 22b57bf commit 40f7711

File tree

3 files changed

+130
-13
lines changed

3 files changed

+130
-13
lines changed

periodic_scheduler/scheduler/docker-compose.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
app:
3-
build:
3+
build:
44
context: ../..
55
dockerfile: periodic_scheduler/scheduler/Dockerfile
66
target: dev
@@ -26,6 +26,8 @@ services:
2626
tty: true
2727
volumes:
2828
- ../:/app:delegated
29+
- /app/scheduler/_build
30+
- /app/scheduler/deps
2931
working_dir: "/app/scheduler"
3032

3133
ciapp:
@@ -39,7 +41,7 @@ services:
3941
SSH_AUTH_SOCK: ${SSH_AUTH_SOCK}
4042
depends_on:
4143
postgres:
42-
condition: service_healthy
44+
condition: service_healthy
4345
rabbitmq:
4446
condition: service_healthy
4547
profiles: ["ci"]

periodic_scheduler/scheduler/lib/scheduler/events_consumers/org_unblocked.ex

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked do
3939
defp unsuspend_batch(org_id, batch_no) do
4040
with {:ok, periodics} <- PeriodicsQueries.get_all_from_org(org_id, batch_no),
4141
{:periodics_found, true} <- {:periodics_found, length(periodics) > 0},
42-
{:ok, _periodics} <- unsuspend_periodics(periodics) do
42+
{:ok, %{failed: failed}} <- unsuspend_periodics(periodics) do
43+
if failed != [] do
44+
failed
45+
|> LT.warn("Failed to unsuspend some periodics for organization #{org_id}")
46+
end
47+
4348
unsuspend_batch(org_id, batch_no + 1)
4449
else
4550
{:periodics_found, false} ->
@@ -51,13 +56,27 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked do
5156
end
5257

5358
# Diff view of the rewrite of unsuspend_periodics/1. Rows tagged `-` are the
# removed implementation, which halted at the first result that was not
# `{:ok, periodic}` and returned that result. Rows tagged `+` are the
# replacement, which calls unsuspend_periodic/1 for every periodic and returns
# `{:ok, %{unsuspended: [...], failed: [...]}}`.
defp unsuspend_periodics(periodics) do
54-
periodics
55-
|> Enum.reduce_while({:ok, []}, fn periodic, {:ok, results} ->
56-
case unsuspend_periodic(periodic) do
57-
{:ok, periodic} -> {:cont, {:ok, results ++ [periodic]}}
58-
error -> {:halt, error}
59-
end
60-
end)
59+
# Sort each outcome into :unsuspended or :failed instead of halting.
result =
60+
Enum.reduce(periodics, %{unsuspended: [], failed: []}, fn periodic, acc ->
61+
case unsuspend_periodic(periodic) do
62+
{:ok, periodic} ->
63+
%{acc | unsuspended: [periodic | acc.unsuspended]}
64+
65+
{:error, reason} ->
66+
failed_entry = %{id: periodic.id, reason: reason}
67+
%{acc | failed: [failed_entry | acc.failed]}
68+
69+
# Any other return shape is also recorded as a failure, with the raw
# value stored as the reason.
other ->
70+
failed_entry = %{id: periodic.id, reason: other}
71+
%{acc | failed: [failed_entry | acc.failed]}
72+
end
73+
end)
74+
75+
# Entries were prepended during the reduce, so both lists are reversed to
# restore input order. The result is always tagged :ok; failures are reported
# through the :failed list only.
{:ok,
76+
%{
77+
unsuspended: Enum.reverse(result.unsuspended),
78+
failed: Enum.reverse(result.failed)
79+
}}
6180
end
6281

6382
defp unsuspend_periodic(periodic) do

periodic_scheduler/scheduler/test/events_consumers/org_unblocked_test.exs

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,50 @@
11
defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do
22
use ExUnit.Case
33

4+
import Ecto.Query, only: [from: 2]
5+
46
alias InternalApi.Organization.OrganizationUnblocked
57
alias Scheduler.Periodics.Model.PeriodicsQueries
8+
alias Scheduler.Periodics.Model.Periodics
9+
alias Scheduler.PeriodicsRepo
610
alias Scheduler.EventsConsumers.OrgUnblocked
711
alias Scheduler.Workers.QuantumScheduler
812
alias Scheduler.Actions
913
alias Util.Proto
1014

15+
@grpc_port 50_057
16+
@mocked_services [Test.MockFeatureService]
17+
18+
# Module-wide setup: starts the mocked gRPC services on @grpc_port and the
# OrgUnblocked consumer, and registers an on_exit callback that stops both.
setup_all do
19+
GRPC.Server.start(@mocked_services, @grpc_port)
20+
# Crashes the setup (match error) if the consumer cannot be started.
{:ok, consumer_pid} = start_org_unblocked_consumer()
21+
22+
on_exit(fn ->
23+
stop_org_unblocked_consumer(consumer_pid)
24+
GRPC.Server.stop(@mocked_services)
25+
end)
26+
27+
:ok
28+
end
29+
1130
# Per-test setup: truncates the DB, purges the "unblocked" queue, seeds three
# id sets, starts QuantumScheduler under the test supervisor, sets the GitHub
# env vars, and resets the feature-service mock. The row tagged `-` is the
# previous return value, which exposed only ids_1 and ids_2.
setup do
1231
Test.Helpers.truncate_db()
1332

1433
Test.Helpers.purge_queue("unblocked")
1534

1635
ids_1 = Test.Helpers.seed_front_db()
1736
ids_2 = Test.Helpers.seed_front_db()
37+
ids_3 = Test.Helpers.seed_front_db()
1838

1939
start_supervised!(QuantumScheduler)
2040

2141
System.put_env("GITHUB_APP_ID", "client_id")
2242
System.put_env("GITHUB_SECRET_ID", "client_secret")
2343

24-
{:ok, %{ids_1: ids_1, ids_2: ids_2}}
44+
reset_mock_feature_service()
45+
# NOTE(review): the meaning of the "just_run" response is defined by the
# mock feature service, which is not visible here — confirm.
mock_feature_response("just_run")
46+
47+
{:ok, %{ids_1: ids_1, ids_2: ids_2, ids_3: ids_3}}
2548
end
2649

2750
test "valid message received => all periodics from the org are unsuspended", ctx do
@@ -44,15 +67,40 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do
4467
event = Proto.deep_new!(OrganizationUnblocked, %{org_id: ctx.ids_1.org_id})
4568
encoded = OrganizationUnblocked.encode(event)
4669

47-
assert {:ok, _pid} = OrgUnblocked.start_link()
48-
4970
Tackle.publish(encoded, exchange_params())
5071

5172
:timer.sleep(2_000)
5273

5374
assert_only_periodics_from_unblocked_org_unsuspended(periodics, ctx.ids_1.org_id)
5475
end
5576

77+
# Regression test for partial failures: creates three periodics for the ids_3
# organization, overwrites the second one's `at` value with an invalid string,
# publishes an OrganizationUnblocked event, and checks that the first and third
# periodics still end up unsuspended with a Quantum job registered.
test "invalid cron expression does not block unsuspending other periodics", ctx do
78+
[{:ok, first_id}, {:ok, invalid_id}, {:ok, third_id}] =
79+
1..3
80+
|> Enum.map(&create_periodic(ctx.ids_3, &1))
81+
82+
invalidate_cron_expression(invalid_id)
83+
84+
event = Proto.deep_new!(OrganizationUnblocked, %{org_id: ctx.ids_3.org_id})
85+
encoded = OrganizationUnblocked.encode(event)
86+
87+
Tackle.publish(encoded, exchange_params())
88+
89+
# NOTE(review): fixed 2s wait, presumably to let the consumer process the
# message asynchronously — polling for the expected state would be less
# timing-sensitive.
:timer.sleep(2_000)
90+
91+
assert {:ok, first_periodic} = PeriodicsQueries.get_by_id(first_id)
92+
assert first_periodic.suspended == false
93+
assert nil != first_id |> String.to_atom() |> QuantumScheduler.find_job()
94+
95+
# The invalidated periodic is expected to be marked unsuspended in the DB
# while having no job registered in QuantumScheduler.
assert {:ok, invalid_periodic} = PeriodicsQueries.get_by_id(invalid_id)
96+
assert invalid_periodic.suspended == false
97+
assert nil == invalid_id |> String.to_atom() |> QuantumScheduler.find_job()
98+
99+
assert {:ok, third_periodic} = PeriodicsQueries.get_by_id(third_id)
100+
assert third_periodic.suspended == false
101+
assert nil != third_id |> String.to_atom() |> QuantumScheduler.find_job()
102+
end
103+
56104
defp create_periodic(ids, ind) do
57105
{:ok, id} =
58106
%{
@@ -96,11 +144,59 @@ defmodule Scheduler.EventsConsumers.OrgUnblocked.Test do
96144
end)
97145
end
98146

147+
# Sets the `at` field of the periodic with the given id to "invalid cron" via
# PeriodicsRepo.update_all/2.
# NOTE(review): presumably `at` holds the cron expression and update_all is
# used to bypass changeset validation — confirm against the Periodics schema.
defp invalidate_cron_expression(id) do
148+
from(p in Periodics, where: p.id == ^id)
149+
|> PeriodicsRepo.update_all(set: [at: "invalid cron"])
150+
end
151+
99152
# Publish options passed to Tackle.publish/2 by the tests: the RabbitMQ URL
# from the RABBITMQ_URL environment variable (nil when unset), the
# "organization_exchange" exchange and the "unblocked" routing key.
defp exchange_params() do
100153
%{
101154
url: System.get_env("RABBITMQ_URL"),
102155
exchange: "organization_exchange",
103156
routing_key: "unblocked"
104157
}
105158
end
159+
160+
# Starts the OrgUnblocked consumer and returns `{:ok, pid}`. An
# already-running consumer is treated as success and its pid is returned; any
# other result from start_link/0 is passed through unchanged.
defp start_org_unblocked_consumer do
161+
case OrgUnblocked.start_link() do
162+
{:ok, pid} ->
163+
wait_for_org_unblocked_consumer()
164+
{:ok, pid}
165+
166+
{:error, {:already_started, pid}} ->
167+
wait_for_org_unblocked_consumer()
168+
{:ok, pid}
169+
170+
error ->
171+
error
172+
end
173+
end
174+
175+
# Blocks the calling process for a fixed 200 ms after the consumer is started.
# NOTE(review): this is a timing assumption, not a readiness check — confirm
# 200 ms is sufficient on slow CI hosts.
defp wait_for_org_unblocked_consumer do
176+
# Give Tackle time to register a default consumer before publishing
177+
Process.sleep(200)
178+
end
179+
180+
# Stops the consumer process if it is still alive; returns :ok when the
# process is already dead.
defp stop_org_unblocked_consumer(pid) when is_pid(pid) do
181+
if Process.alive?(pid) do
182+
GenServer.stop(pid)
183+
else
184+
:ok
185+
end
186+
end
187+
188+
# Fallback clause: any non-pid argument is a no-op.
defp stop_org_unblocked_consumer(_), do: :ok
189+
190+
# Clears the Cachex cache named Scheduler.FeatureHubProvider and sets the
# :feature_api_grpc_endpoint app env of :scheduler to localhost on @grpc_port,
# the port the mocked gRPC services are started on in setup_all.
defp reset_mock_feature_service() do
191+
Cachex.clear(Elixir.Scheduler.FeatureHubProvider)
192+
193+
Application.put_env(
194+
:scheduler,
195+
:feature_api_grpc_endpoint,
196+
"localhost:#{inspect(@grpc_port)}"
197+
)
198+
end
199+
200+
# Stores `value` under :mock_feature_service_response in the :scheduler app env.
# NOTE(review): presumably read by Test.MockFeatureService when answering
# requests — confirm in the mock's implementation.
defp mock_feature_response(value),
201+
do: Application.put_env(:scheduler, :mock_feature_service_response, value)
106202
end

0 commit comments

Comments
 (0)