diff --git a/config/config.exs b/config/config.exs index fb41d5efc..cfedce233 100644 --- a/config/config.exs +++ b/config/config.exs @@ -41,6 +41,7 @@ config :algora, Oban, event_consumers: 1, comment_consumers: 1, search_consumers: 1, + delivery_consumers: 1, github_og_image: 5, notify_bounty: 1, notify_tip_intent: 1, diff --git a/lib/algora/integrations/github/behaviour.ex b/lib/algora/integrations/github/behaviour.ex index fb0fac37f..3beff859c 100644 --- a/lib/algora/integrations/github/behaviour.ex +++ b/lib/algora/integrations/github/behaviour.ex @@ -3,6 +3,9 @@ defmodule Algora.Github.Behaviour do @type token :: String.t() + @callback get_delivery(String.t()) :: {:ok, map()} | {:error, String.t()} + @callback list_deliveries(keyword()) :: {:ok, [map()]} | {:error, String.t()} + @callback redeliver(String.t()) :: {:ok, map()} | {:error, String.t()} @callback get_issue(token(), String.t(), String.t(), integer()) :: {:ok, map()} | {:error, String.t()} @callback get_repository(token(), String.t(), String.t()) :: {:ok, map()} | {:error, String.t()} @callback get_repository(token(), integer()) :: {:ok, map()} | {:error, String.t()} diff --git a/lib/algora/integrations/github/client.ex b/lib/algora/integrations/github/client.ex index ab100b692..d3efc941b 100644 --- a/lib/algora/integrations/github/client.ex +++ b/lib/algora/integrations/github/client.ex @@ -120,6 +120,21 @@ defmodule Algora.Github.Client do defp build_query(opts), do: if(opts == [], do: "", else: "?" <> URI.encode_query(opts)) + @impl true + def get_delivery(delivery_id) do + fetch_with_jwt("/app/hook/deliveries/#{delivery_id}") + end + + @impl true + def list_deliveries(opts \\ []) do + fetch_with_jwt("/app/hook/deliveries#{build_query(opts)}") + end + + @impl true + def redeliver(delivery_id) do + fetch_with_jwt("/app/hook/deliveries/#{delivery_id}/attempts", "POST") + end + @impl true def get_issue(access_token, owner, repo, number) do fetch(access_token, "/repos/#{owner}/#{repo}/issues/#{number}") diff --git a/lib/algora/integrations/github/github.ex b/lib/algora/integrations/github/github.ex index 868770537..a5e52e9d8 100644 --- a/lib/algora/integrations/github/github.ex +++ b/lib/algora/integrations/github/github.ex @@ -67,6 +67,15 @@ defmodule Algora.Github do end end + @impl true + def get_delivery(delivery_id), do: client().get_delivery(delivery_id) + + @impl true + def list_deliveries(opts \\ []), do: client().list_deliveries(opts) + + @impl true + def redeliver(delivery_id), do: client().redeliver(delivery_id) + @impl true def get_repository(token, owner, repo), do: client().get_repository(token, owner, repo) diff --git a/lib/algora/integrations/github/poller/deliveries.ex b/lib/algora/integrations/github/poller/deliveries.ex new file mode 100644 index 000000000..d3460188c --- /dev/null +++ b/lib/algora/integrations/github/poller/deliveries.ex @@ -0,0 +1,177 @@ +defmodule Algora.Github.Poller.Deliveries do + @moduledoc false + use GenServer + + import Ecto.Query, warn: false + + alias Algora.Github + alias Algora.Repo + alias Algora.Sync + + require Logger + + @per_page 100 + @poll_interval :timer.seconds(10) + + # Client API + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + def pause(pid) do + GenServer.cast(pid, :pause) + end + + def resume(pid) do + GenServer.cast(pid, :resume) + end + + # Server callbacks + @impl true + def init(opts) do + provider = Keyword.fetch!(opts, :provider) + + {:ok, + %{ + provider: provider, + cursor: nil, + paused: not Algora.config([:auto_start_pollers]) + }, {:continue, :setup}} + end + + @impl true + def handle_continue(:setup, state) do + {:ok, cursor} = get_or_create_cursor(state.provider, "deliveries") + schedule_poll() + + {:noreply, %{state | cursor: cursor}} + end + + @impl true + def handle_info(:poll, %{paused: true} = state) do + {:noreply, state} + end + + @impl true + def handle_info(:poll, state) do + {:ok, new_state} = poll(state) + schedule_poll() + {:noreply, new_state} + end + + @impl true + def handle_cast(:pause, state) do + {:noreply, %{state | paused: true}} + end + + @impl true + def handle_cast(:resume, %{paused: true} = state) do + schedule_poll() + {:noreply, %{state | paused: false}} + end + + @impl true + def handle_cast(:resume, state) do + {:noreply, state} + end + + @impl true + def handle_call(:get_provider, _from, state) do + {:reply, state.provider, state} + end + + @impl true + def handle_call(:is_paused, _from, state) do + {:reply, state.paused, state} + end + + defp schedule_poll do + Process.send_after(self(), :poll, @poll_interval) + end + + def poll(state) do + with {:ok, deliveries} <- fetch_deliveries(state), + if(length(deliveries) > 0, do: Logger.info("Processing #{length(deliveries)} deliveries")), + {:ok, updated_cursor} <- process_batch(deliveries, state) do + {:ok, %{state | cursor: updated_cursor}} + else + {:error, reason} -> + Logger.error("Failed to fetch deliveries: #{inspect(reason)}") + {:ok, state} + end + end + + defp process_batch([], state), do: {:ok, state.cursor} + + defp process_batch(deliveries, state) do + Repo.transact(fn -> + with :ok <- process_deliveries(deliveries, state) do + update_last_polled(state.cursor, List.first(deliveries)) + end + end) + end + + defp process_deliveries(deliveries, state) do + Enum.reduce_while(deliveries, :ok, fn delivery, _acc -> + case process_delivery(delivery, state) do + {:ok, _} -> {:cont, :ok} + error -> {:halt, error} + end + end) + end + + defp fetch_deliveries(_state) do + # TODO: paginate via the next and previous page cursors in the link header + Github.list_deliveries(per_page: @per_page) + end + + defp get_or_create_cursor(provider, resource) do + case Sync.get_sync_cursor(provider, resource) do + nil -> + Sync.create_sync_cursor(%{provider: provider, resource: resource, timestamp: DateTime.utc_now()}) + + sync_cursor -> + {:ok, sync_cursor} + end + end + + defp update_last_polled(sync_cursor, %{"delivered_at" => timestamp}) do + with {:ok, timestamp, _} <- DateTime.from_iso8601(timestamp), + {:ok, cursor} <- + Sync.update_sync_cursor(sync_cursor, %{ + timestamp: timestamp, + last_polled_at: DateTime.utc_now() + }) do + {:ok, cursor} + else + {:error, reason} -> Logger.error("Failed to update sync cursor: #{inspect(reason)}") + end + end + + defp process_delivery(delivery, state) do + case DateTime.from_iso8601(delivery["delivered_at"]) do + {:ok, delivered_at, _} -> + skip_reason = + cond do + not DateTime.after?(delivered_at, state.cursor.timestamp) -> :already_processed + delivery["status_code"] < 400 -> :status_ok + true -> nil + end + + if skip_reason do + {:ok, nil} + else + dbg("Enqueuing redelivery #{delivery["id"]}") + + %{delivery: delivery} + |> Github.Poller.DeliveryConsumer.new() + |> Oban.insert() + end + + {:error, reason} -> + Logger.error("Failed to parse delivery: #{inspect(delivery)}. Reason: #{inspect(reason)}") + + {:ok, nil} + end + end +end diff --git a/lib/algora/integrations/github/poller/delivery_consumer.ex b/lib/algora/integrations/github/poller/delivery_consumer.ex new file mode 100644 index 000000000..2a3545505 --- /dev/null +++ b/lib/algora/integrations/github/poller/delivery_consumer.ex @@ -0,0 +1,32 @@ +defmodule Algora.Github.Poller.DeliveryConsumer do + @moduledoc false + use Oban.Worker, queue: :delivery_consumers + + import Ecto.Query + + alias Algora.Github + alias Algora.Repo + + require Logger + + @max_attempts 3 + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"delivery" => delivery} = _args}) do + attempts_count = + Repo.one( + from(j in "oban_jobs", + where: fragment("(args->>'delivery')::jsonb->>'guid' = ?", ^delivery["guid"]), + select: count(j.id) + ) + ) || 0 + + if attempts_count <= @max_attempts do + dbg("Redelivering delivery #{delivery["id"]} (attempt #{attempts_count})") + Github.redeliver(delivery["id"]) + else + dbg("Max attempts reached for delivery #{delivery["id"]}") + :discard + end + end +end diff --git a/lib/algora/integrations/github/poller/delivery_supervisor.ex b/lib/algora/integrations/github/poller/delivery_supervisor.ex new file mode 100644 index 000000000..e2a110439 --- /dev/null +++ b/lib/algora/integrations/github/poller/delivery_supervisor.ex @@ -0,0 +1,85 @@ +defmodule Algora.Github.Poller.DeliverySupervisor do + @moduledoc false + use DynamicSupervisor + + alias Algora.Github.Poller.Deliveries, as: DeliveryPoller + alias Algora.Sync + + require Logger + + # Client API + def start_link(init_arg) do + DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + @impl DynamicSupervisor + def init(_init_arg) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + def start_children do + Sync.list_cursors() + |> Task.async_stream( + fn cursor -> add_provider(cursor.provider) end, + max_concurrency: 100, + ordered: false + ) + |> Stream.run() + + :ok + end + + def add_provider(provider \\ "github", opts \\ []) do + DynamicSupervisor.start_child(__MODULE__, {DeliveryPoller, [provider: provider] ++ opts}) + end + + def terminate_child(provider) do + case find_child(provider) do + {_id, pid, _type, _modules} -> DynamicSupervisor.terminate_child(__MODULE__, pid) + nil -> {:error, :not_found} + end + end + + def remove_provider(provider \\ "github") do + with :ok <- terminate_child(provider), + {:ok, _cursor} <- Sync.delete_sync_cursor(provider, "deliveries") do + :ok + end + end + + def find_child(provider) do + Enum.find(which_children(), fn {_, pid, _, _} -> + GenServer.call(pid, :get_provider) == provider + end) + end + + def pause(provider) do + provider + |> find_child() + |> case do + {_, pid, _, _} -> DeliveryPoller.pause(pid) + nil -> {:error, :not_found} + end + end + + def resume(provider) do + provider + |> find_child() + |> case do + {_, pid, _, _} -> DeliveryPoller.resume(pid) + nil -> {:error, :not_found} + end + end + + def pause_all do + Enum.each(which_children(), fn {_, pid, _, _} -> DeliveryPoller.pause(pid) end) + end + + def resume_all do + Enum.each(which_children(), fn {_, pid, _, _} -> DeliveryPoller.resume(pid) end) + end + + def which_children do + DynamicSupervisor.which_children(__MODULE__) + end +end diff --git a/lib/algora/integrations/github/poller/root_supervisor.ex b/lib/algora/integrations/github/poller/root_supervisor.ex index 99586305b..d1fdaab7c 100644 --- a/lib/algora/integrations/github/poller/root_supervisor.ex +++ b/lib/algora/integrations/github/poller/root_supervisor.ex @@ -2,6 +2,7 @@ defmodule Algora.Github.Poller.RootSupervisor do @moduledoc false use Supervisor + alias Algora.Github.Poller.DeliverySupervisor alias Algora.Github.Poller.SearchSupervisor def start_link(init_arg) do @@ -16,6 +17,11 @@ defmodule Algora.Github.Poller.RootSupervisor do {Task, &SearchSupervisor.start_children/0}, id: :search_supervisor, restart: :transient + ), + Supervisor.child_spec( + {Task, &DeliverySupervisor.start_children/0}, + id: :delivery_supervisor, + restart: :transient ) ] diff --git a/lib/algora/sync/schemas/sync_cursor.ex b/lib/algora/sync/schemas/sync_cursor.ex new file mode 100644 index 000000000..1ef990768 --- /dev/null +++ b/lib/algora/sync/schemas/sync_cursor.ex @@ -0,0 +1,24 @@ +defmodule Algora.Sync.SyncCursor do + @moduledoc false + use Algora.Schema + + import Ecto.Changeset + + typed_schema "sync_cursors" do + field :provider, :string + field :resource, :string + field :timestamp, :utc_datetime_usec + field :last_polled_at, :utc_datetime_usec + + timestamps() + end + + @doc false + def changeset(search_cursor, attrs) do + search_cursor + |> cast(attrs, [:provider, :resource, :timestamp, :last_polled_at]) + |> generate_id() + |> validate_required([:provider, :resource, :timestamp]) + |> unique_constraint([:provider, :resource]) + end +end diff --git a/lib/algora/sync/search.ex b/lib/algora/sync/search.ex new file mode 100644 index 000000000..ece0c1850 --- /dev/null +++ b/lib/algora/sync/search.ex @@ -0,0 +1,34 @@ +defmodule Algora.Sync do + @moduledoc false + import Ecto.Query, warn: false + + alias Algora.Repo + alias Algora.Sync.SyncCursor + + def get_sync_cursor(provider, resource) do + Repo.get_by(SyncCursor, provider: provider, resource: resource) + end + + def delete_sync_cursor(provider, resource) do + case get_sync_cursor(provider, resource) do + nil -> {:error, :cursor_not_found} + cursor -> Repo.delete(cursor) + end + end + + def create_sync_cursor(attrs \\ %{}) do + %SyncCursor{} + |> SyncCursor.changeset(attrs) + |> Repo.insert() + end + + def update_sync_cursor(%SyncCursor{} = sync_cursor, attrs) do + sync_cursor + |> SyncCursor.changeset(attrs) + |> Repo.update() + end + + def list_cursors do + Repo.all(from(p in SyncCursor)) + end +end diff --git a/lib/algora_web/controllers/webhooks/github_controller.ex b/lib/algora_web/controllers/webhooks/github_controller.ex index d88454dda..142d69842 100644 --- a/lib/algora_web/controllers/webhooks/github_controller.ex +++ b/lib/algora_web/controllers/webhooks/github_controller.ex @@ -22,8 +22,6 @@ defmodule AlgoraWeb.Webhooks.GithubController do require Logger - # TODO: auto-retry failed deliveries with backoff - def process_delivery(webhook) do with :ok <- ensure_human_author(webhook), {:ok, commands} <- process_commands(webhook), diff --git a/priv/repo/migrations/20250329132007_create_sync_cursors.exs b/priv/repo/migrations/20250329132007_create_sync_cursors.exs new file mode 100644 index 000000000..70f8d6fd2 --- /dev/null +++ b/priv/repo/migrations/20250329132007_create_sync_cursors.exs @@ -0,0 +1,16 @@ +defmodule Algora.Repo.Migrations.CreateSyncCursors do + use Ecto.Migration + + def change do + create table(:sync_cursors) do + add :provider, :string, null: false + add :resource, :string, null: false + add :timestamp, :utc_datetime_usec, null: false + add :last_polled_at, :utc_datetime_usec + + timestamps() + end + + create unique_index(:sync_cursors, [:provider, :resource]) + end +end diff --git a/test/support/github_mock.ex b/test/support/github_mock.ex index a380f3437..834cde246 100644 --- a/test/support/github_mock.ex +++ b/test/support/github_mock.ex @@ -4,6 +4,21 @@ defmodule Algora.Support.GithubMock do defp random_id(n \\ 1000), do: :rand.uniform(n) + @impl true + def get_delivery(id) do + {:ok, %{"id" => id}} + end + + @impl true + def list_deliveries(_opts \\ []) do + {:ok, []} + end + + @impl true + def redeliver(_id) do + {:ok, %{"id" => random_id()}} + end + @impl true def get_issue(_access_token, owner, repo, number) do {:ok,