Skip to content

Commit af57add

Browse files
authored
feat: auto-retry failed GitHub deliveries (#108)
- Added a new `DeliverySupervisor` and `DeliveryPoller` to poll GitHub webhook deliveries.
- Introduced `DeliveryConsumer` to redeliver failed deliveries, with a maximum attempt limit.
- Created the `SyncCursor` schema and associated functions for managing sync state.
- Updated the GitHub integration with new delivery-related callbacks and methods.
- Added a database migration for the `sync_cursors` table, which tracks delivery polling state.
1 parent 46772f7 commit af57add

File tree

13 files changed

+417
-2
lines changed

13 files changed

+417
-2
lines changed

config/config.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ config :algora, Oban,
4141
event_consumers: 1,
4242
comment_consumers: 1,
4343
search_consumers: 1,
44+
delivery_consumers: 1,
4445
github_og_image: 5,
4546
notify_bounty: 1,
4647
notify_tip_intent: 1,

lib/algora/integrations/github/behaviour.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ defmodule Algora.Github.Behaviour do
33

44
@type token :: String.t()
55

6+
# App-webhook delivery endpoints. Callers pass the delivery `"id"` field taken
# directly from the decoded API payload, so the identifier may arrive as an
# integer as well as a string; both are accepted.
@callback get_delivery(integer() | String.t()) :: {:ok, map()} | {:error, String.t()}
@callback list_deliveries(keyword()) :: {:ok, [map()]} | {:error, String.t()}
@callback redeliver(integer() | String.t()) :: {:ok, map()} | {:error, String.t()}
69
@callback get_issue(token(), String.t(), String.t(), integer()) :: {:ok, map()} | {:error, String.t()}
710
@callback get_repository(token(), String.t(), String.t()) :: {:ok, map()} | {:error, String.t()}
811
@callback get_repository(token(), integer()) :: {:ok, map()} | {:error, String.t()}

lib/algora/integrations/github/client.ex

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,21 @@ defmodule Algora.Github.Client do
120120

121121
defp build_query(opts), do: if(opts == [], do: "", else: "?" <> URI.encode_query(opts))
122122

123+
# Fetches a single app-webhook delivery by its identifier, authenticating
# through `fetch_with_jwt/1`.
@impl true
def get_delivery(delivery_id) do
  path = "/app/hook/deliveries/#{delivery_id}"
  fetch_with_jwt(path)
end
127+
128+
# Lists app-webhook deliveries. `opts` is encoded into the query string by
# `build_query/1` (an empty list yields no query string).
@impl true
def list_deliveries(opts \\ []) do
  query = build_query(opts)
  fetch_with_jwt("/app/hook/deliveries" <> query)
end
132+
133+
# Requests a new delivery attempt for the given delivery by POSTing to its
# `attempts` endpoint.
@impl true
def redeliver(delivery_id) do
  path = "/app/hook/deliveries/#{delivery_id}/attempts"
  fetch_with_jwt(path, "POST")
end
137+
123138
@impl true
124139
def get_issue(access_token, owner, repo, number) do
125140
fetch(access_token, "/repos/#{owner}/#{repo}/issues/#{number}")

lib/algora/integrations/github/github.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ defmodule Algora.Github do
6767
end
6868
end
6969

70+
# Delegates to the configured GitHub client.
@impl true
def get_delivery(delivery_id) do
  client().get_delivery(delivery_id)
end
72+
73+
# Delegates to the configured GitHub client; `opts` defaults to no options.
@impl true
def list_deliveries(opts \\ []) do
  client().list_deliveries(opts)
end
75+
76+
# Delegates to the configured GitHub client.
@impl true
def redeliver(delivery_id) do
  client().redeliver(delivery_id)
end
78+
7079
@impl true
7180
def get_repository(token, owner, repo), do: client().get_repository(token, owner, repo)
7281

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
defmodule Algora.Github.Poller.Deliveries do
  @moduledoc false
  use GenServer

  import Ecto.Query, warn: false

  alias Algora.Github
  alias Algora.Repo
  alias Algora.Sync

  require Logger

  @per_page 100
  @poll_interval :timer.seconds(10)

  # Client API

  # Starts a poller. `opts` must contain `:provider`.
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  # Asynchronously stops the poll loop of the poller at `pid`.
  def pause(pid) do
    GenServer.cast(pid, :pause)
  end

  # Asynchronously restarts the poll loop of the poller at `pid`.
  def resume(pid) do
    GenServer.cast(pid, :resume)
  end

  # Server callbacks

  @impl true
  def init(opts) do
    provider = Keyword.fetch!(opts, :provider)

    {:ok,
     %{
       provider: provider,
       cursor: nil,
       # Reference of the pending `:poll` timer, if any. Tracked so that a
       # pause/resume cycle can never leave two timers running at once.
       timer: nil,
       paused: not Algora.config([:auto_start_pollers])
     }, {:continue, :setup}}
  end

  @impl true
  def handle_continue(:setup, state) do
    {:ok, cursor} = get_or_create_cursor(state.provider, "deliveries")

    {:noreply, schedule_poll(%{state | cursor: cursor})}
  end

  @impl true
  def handle_info(:poll, %{paused: true} = state) do
    # The timer fired while paused: drop the tick and do not reschedule.
    {:noreply, %{state | timer: nil}}
  end

  @impl true
  def handle_info(:poll, state) do
    {:ok, new_state} = poll(state)
    {:noreply, schedule_poll(new_state)}
  end

  @impl true
  def handle_cast(:pause, state) do
    {:noreply, %{cancel_poll(state) | paused: true}}
  end

  @impl true
  def handle_cast(:resume, %{paused: true} = state) do
    {:noreply, schedule_poll(%{state | paused: false})}
  end

  @impl true
  def handle_cast(:resume, state) do
    {:noreply, state}
  end

  @impl true
  def handle_call(:get_provider, _from, state) do
    {:reply, state.provider, state}
  end

  @impl true
  def handle_call(:is_paused, _from, state) do
    {:reply, state.paused, state}
  end

  # Cancels any pending timer before arming a new one, so at most one `:poll`
  # timer exists per process. Returns the state holding the new timer ref.
  defp schedule_poll(state) do
    state = cancel_poll(state)
    %{state | timer: Process.send_after(self(), :poll, @poll_interval)}
  end

  defp cancel_poll(%{timer: nil} = state), do: state

  defp cancel_poll(%{timer: timer} = state) do
    Process.cancel_timer(timer)
    %{state | timer: nil}
  end

  # Runs one polling round: fetches deliveries, enqueues redeliveries for the
  # failed ones newer than the cursor, and advances the cursor.
  #
  # Always returns `{:ok, state}`. On failure the error is logged and the
  # state is returned unchanged so the next tick retries from the same cursor.
  def poll(state) do
    with {:ok, deliveries} <- fetch_deliveries(state),
         :ok <- log_batch_size(deliveries),
         {:ok, updated_cursor} <- process_batch(deliveries, state) do
      {:ok, %{state | cursor: updated_cursor}}
    else
      {:error, reason} ->
        Logger.error("Failed to fetch deliveries: #{inspect(reason)}")
        {:ok, state}
    end
  end

  defp log_batch_size([]), do: :ok

  defp log_batch_size(deliveries) do
    Logger.info("Processing #{length(deliveries)} deliveries")
  end

  defp process_batch([], state), do: {:ok, state.cursor}

  defp process_batch(deliveries, state) do
    # Enqueueing and the cursor update share one transaction so the cursor
    # only advances when every job insert succeeded.
    Repo.transact(fn ->
      with :ok <- process_deliveries(deliveries, state) do
        update_last_polled(state.cursor, List.first(deliveries))
      end
    end)
  end

  # Processes deliveries in order, stopping at the first failure and returning
  # it; returns `:ok` when all of them were handled.
  defp process_deliveries(deliveries, state) do
    Enum.reduce_while(deliveries, :ok, fn delivery, _acc ->
      case process_delivery(delivery, state) do
        {:ok, _} -> {:cont, :ok}
        error -> {:halt, error}
      end
    end)
  end

  defp fetch_deliveries(_state) do
    # TODO: paginate via the next and previous page cursors in the link header
    Github.list_deliveries(per_page: @per_page)
  end

  defp get_or_create_cursor(provider, resource) do
    case Sync.get_sync_cursor(provider, resource) do
      nil ->
        Sync.create_sync_cursor(%{provider: provider, resource: resource, timestamp: DateTime.utc_now()})

      sync_cursor ->
        {:ok, sync_cursor}
    end
  end

  # Moves the cursor to the `"delivered_at"` timestamp of `delivery` and
  # records the poll time. Returns `{:ok, cursor}` or `{:error, reason}`.
  defp update_last_polled(sync_cursor, %{"delivered_at" => timestamp}) do
    with {:ok, timestamp, _offset} <- DateTime.from_iso8601(timestamp),
         {:ok, cursor} <-
           Sync.update_sync_cursor(sync_cursor, %{
             timestamp: timestamp,
             last_polled_at: DateTime.utc_now()
           }) do
      {:ok, cursor}
    else
      {:error, reason} = error ->
        Logger.error("Failed to update sync cursor: #{inspect(reason)}")
        # Hand the error tuple back (not Logger's `:ok`) so callers matching
        # on `{:ok, _}` / `{:error, _}` can react to the failure.
        error
    end
  end

  # Enqueues a redelivery job when `delivery` is newer than the cursor and did
  # not succeed. Returns `{:ok, _}` when the delivery was skipped or enqueued,
  # or the error returned by `Oban.insert/1`.
  defp process_delivery(%{"delivered_at" => delivered_at_iso} = delivery, state)
       when is_binary(delivered_at_iso) do
    case DateTime.from_iso8601(delivered_at_iso) do
      {:ok, delivered_at, _offset} ->
        if needs_redelivery?(delivery, delivered_at, state.cursor.timestamp) do
          Logger.info("Enqueuing redelivery #{delivery["id"]}")

          %{delivery: delivery}
          |> Github.Poller.DeliveryConsumer.new()
          |> Oban.insert()
        else
          {:ok, nil}
        end

      {:error, reason} ->
        Logger.error("Failed to parse delivery: #{inspect(delivery)}. Reason: #{inspect(reason)}")

        {:ok, nil}
    end
  end

  # A delivery without a string "delivered_at" cannot be compared against the
  # cursor; log and skip it instead of crashing the poller.
  defp process_delivery(delivery, _state) do
    Logger.error("Failed to parse delivery: #{inspect(delivery)}. Reason: #{inspect(:missing_delivered_at)}")

    {:ok, nil}
  end

  # True when the delivery is strictly newer than the cursor and its status
  # code is not below 400. A missing (nil) status code is treated as a failure
  # because nil does not compare as less than 400 in Erlang term ordering.
  defp needs_redelivery?(delivery, delivered_at, cursor_timestamp) do
    DateTime.after?(delivered_at, cursor_timestamp) and not (delivery["status_code"] < 400)
  end
end
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
defmodule Algora.Github.Poller.DeliveryConsumer do
  @moduledoc false
  use Oban.Worker, queue: :delivery_consumers

  import Ecto.Query

  alias Algora.Github
  alias Algora.Repo

  require Logger

  @max_attempts 3

  # Asks GitHub to redeliver the webhook delivery stored in the job args.
  #
  # The attempt number is derived from how many rows in `oban_jobs` carry the
  # same delivery GUID (the running job included). While that count is within
  # `@max_attempts` the result of `Github.redeliver/1` is returned; otherwise
  # the job is discarded.
  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"delivery" => delivery} = _args}) do
    attempts_count = count_jobs_for_guid(delivery["guid"])

    if attempts_count <= @max_attempts do
      Logger.info("Redelivering delivery #{delivery["id"]} (attempt #{attempts_count})")
      Github.redeliver(delivery["id"])
    else
      Logger.warning("Max attempts reached for delivery #{delivery["id"]}")
      :discard
    end
  end

  # Counts the jobs whose `delivery` argument has the given GUID.
  defp count_jobs_for_guid(guid) do
    Repo.one(
      from(j in "oban_jobs",
        where: fragment("(args->>'delivery')::jsonb->>'guid' = ?", ^guid),
        select: count(j.id)
      )
    ) || 0
  end
end
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
defmodule Algora.Github.Poller.DeliverySupervisor do
  @moduledoc false
  use DynamicSupervisor

  alias Algora.Github.Poller.Deliveries, as: DeliveryPoller
  alias Algora.Sync

  require Logger

  # Client API
  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl DynamicSupervisor
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  # Starts one poller per cursor returned by `Sync.list_cursors/0`.
  # Always returns `:ok`; individual start results are not inspected.
  def start_children do
    Sync.list_cursors()
    |> Task.async_stream(
      fn cursor -> add_provider(cursor.provider) end,
      max_concurrency: 100,
      ordered: false
    )
    |> Stream.run()

    :ok
  end

  # Starts a poller for `provider` under this supervisor. Extra `opts` are
  # appended to the poller's start options.
  def add_provider(provider \\ "github", opts \\ []) do
    DynamicSupervisor.start_child(__MODULE__, {DeliveryPoller, [provider: provider] ++ opts})
  end

  # Stops the poller for `provider`, or returns `{:error, :not_found}`.
  def terminate_child(provider) do
    case find_child(provider) do
      {_id, pid, _type, _modules} -> DynamicSupervisor.terminate_child(__MODULE__, pid)
      nil -> {:error, :not_found}
    end
  end

  # Stops the poller for `provider` and deletes its "deliveries" sync cursor.
  def remove_provider(provider \\ "github") do
    with :ok <- terminate_child(provider),
         {:ok, _cursor} <- Sync.delete_sync_cursor(provider, "deliveries") do
      :ok
    end
  end

  # Returns the `which_children/0` entry of the poller serving `provider`, or
  # `nil` when there is none.
  def find_child(provider) do
    Enum.find(which_children(), fn
      {_id, pid, _type, _modules} when is_pid(pid) ->
        GenServer.call(pid, :get_provider) == provider

      # The pid slot is `:restarting` while a child is being restarted; such
      # an entry cannot be queried, so it never matches.
      _not_running ->
        false
    end)
  end

  # Pauses the poller for `provider`, or returns `{:error, :not_found}`.
  def pause(provider) do
    case find_child(provider) do
      {_id, pid, _type, _modules} -> DeliveryPoller.pause(pid)
      nil -> {:error, :not_found}
    end
  end

  # Resumes the poller for `provider`, or returns `{:error, :not_found}`.
  def resume(provider) do
    case find_child(provider) do
      {_id, pid, _type, _modules} -> DeliveryPoller.resume(pid)
      nil -> {:error, :not_found}
    end
  end

  def pause_all do
    Enum.each(which_children(), fn {_, pid, _, _} -> DeliveryPoller.pause(pid) end)
  end

  def resume_all do
    Enum.each(which_children(), fn {_, pid, _, _} -> DeliveryPoller.resume(pid) end)
  end

  def which_children do
    DynamicSupervisor.which_children(__MODULE__)
  end
end

lib/algora/integrations/github/poller/root_supervisor.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ defmodule Algora.Github.Poller.RootSupervisor do
22
@moduledoc false
33
use Supervisor
44

5+
alias Algora.Github.Poller.DeliverySupervisor
56
alias Algora.Github.Poller.SearchSupervisor
67

78
def start_link(init_arg) do
@@ -16,6 +17,11 @@ defmodule Algora.Github.Poller.RootSupervisor do
1617
{Task, &SearchSupervisor.start_children/0},
1718
id: :search_supervisor,
1819
restart: :transient
20+
),
21+
Supervisor.child_spec(
22+
{Task, &DeliverySupervisor.start_children/0},
23+
id: :delivery_supervisor,
24+
restart: :transient
1925
)
2026
]
2127

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
defmodule Algora.Sync.SyncCursor do
  @moduledoc false
  use Algora.Schema

  import Ecto.Changeset

  @castable_fields [:provider, :resource, :timestamp, :last_polled_at]
  @required_fields [:provider, :resource, :timestamp]

  typed_schema "sync_cursors" do
    field :provider, :string
    field :resource, :string
    field :timestamp, :utc_datetime_usec
    field :last_polled_at, :utc_datetime_usec

    timestamps()
  end

  # Builds a changeset for a sync cursor: casts the permitted fields, assigns
  # an id via `generate_id/1`, requires provider/resource/timestamp, and
  # declares the (provider, resource) unique constraint.
  @doc false
  def changeset(sync_cursor, attrs) do
    casted = cast(sync_cursor, attrs, @castable_fields)

    casted
    |> generate_id()
    |> validate_required(@required_fields)
    |> unique_constraint([:provider, :resource])
  end
end

0 commit comments

Comments
 (0)