Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ config :algora, Oban,
event_consumers: 1,
comment_consumers: 1,
search_consumers: 1,
delivery_consumers: 1,
github_og_image: 5,
notify_bounty: 1,
notify_tip_intent: 1,
Expand Down
3 changes: 3 additions & 0 deletions lib/algora/integrations/github/behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ defmodule Algora.Github.Behaviour do

@type token :: String.t()

@callback get_delivery(String.t()) :: {:ok, map()} | {:error, String.t()}
@callback list_deliveries(keyword()) :: {:ok, [map()]} | {:error, String.t()}
@callback redeliver(String.t()) :: {:ok, map()} | {:error, String.t()}
@callback get_issue(token(), String.t(), String.t(), integer()) :: {:ok, map()} | {:error, String.t()}
@callback get_repository(token(), String.t(), String.t()) :: {:ok, map()} | {:error, String.t()}
@callback get_repository(token(), integer()) :: {:ok, map()} | {:error, String.t()}
Expand Down
15 changes: 15 additions & 0 deletions lib/algora/integrations/github/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,21 @@ defmodule Algora.Github.Client do

defp build_query(opts), do: if(opts == [], do: "", else: "?" <> URI.encode_query(opts))

@impl true
def get_delivery(delivery_id) do
fetch_with_jwt("/app/hook/deliveries/#{delivery_id}")
end

@impl true
def list_deliveries(opts \\ []) do
fetch_with_jwt("/app/hook/deliveries#{build_query(opts)}")
end

@impl true
def redeliver(delivery_id) do
fetch_with_jwt("/app/hook/deliveries/#{delivery_id}/attempts", "POST")
end

@impl true
def get_issue(access_token, owner, repo, number) do
fetch(access_token, "/repos/#{owner}/#{repo}/issues/#{number}")
Expand Down
9 changes: 9 additions & 0 deletions lib/algora/integrations/github/github.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ defmodule Algora.Github do
end
end

@impl true
def get_delivery(delivery_id), do: client().get_delivery(delivery_id)

@impl true
def list_deliveries(opts \\ []), do: client().list_deliveries(opts)

@impl true
def redeliver(delivery_id), do: client().redeliver(delivery_id)

@impl true
def get_repository(token, owner, repo), do: client().get_repository(token, owner, repo)

Expand Down
177 changes: 177 additions & 0 deletions lib/algora/integrations/github/poller/deliveries.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
defmodule Algora.Github.Poller.Deliveries do
@moduledoc false
use GenServer

import Ecto.Query, warn: false

alias Algora.Github
alias Algora.Repo
alias Algora.Sync

require Logger

@per_page 100
@poll_interval :timer.seconds(10)

# Client API
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end

def pause(pid) do
GenServer.cast(pid, :pause)
end

def resume(pid) do
GenServer.cast(pid, :resume)
end

# Server callbacks
@impl true
def init(opts) do
provider = Keyword.fetch!(opts, :provider)

{:ok,
%{
provider: provider,
cursor: nil,
paused: not Algora.config([:auto_start_pollers])
}, {:continue, :setup}}
end

@impl true
def handle_continue(:setup, state) do
{:ok, cursor} = get_or_create_cursor(state.provider, "deliveries")
schedule_poll()

{:noreply, %{state | cursor: cursor}}
end

@impl true
def handle_info(:poll, %{paused: true} = state) do
{:noreply, state}
end

@impl true
def handle_info(:poll, state) do
{:ok, new_state} = poll(state)
schedule_poll()
{:noreply, new_state}
end

@impl true
def handle_cast(:pause, state) do
{:noreply, %{state | paused: true}}
end

@impl true
def handle_cast(:resume, %{paused: true} = state) do
schedule_poll()
{:noreply, %{state | paused: false}}
end

@impl true
def handle_cast(:resume, state) do
{:noreply, state}
end

@impl true
def handle_call(:get_provider, _from, state) do
{:reply, state.provider, state}
end

@impl true
def handle_call(:is_paused, _from, state) do
{:reply, state.paused, state}
end

defp schedule_poll do
Process.send_after(self(), :poll, @poll_interval)
end

def poll(state) do
with {:ok, deliveries} <- fetch_deliveries(state),
if(length(deliveries) > 0, do: Logger.info("Processing #{length(deliveries)} deliveries")),
{:ok, updated_cursor} <- process_batch(deliveries, state) do
{:ok, %{state | cursor: updated_cursor}}
else
{:error, reason} ->
Logger.error("Failed to fetch deliveries: #{inspect(reason)}")
{:ok, state}
end
end

defp process_batch([], state), do: {:ok, state.cursor}

defp process_batch(deliveries, state) do
Repo.transact(fn ->
with :ok <- process_deliveries(deliveries, state) do
update_last_polled(state.cursor, List.first(deliveries))
end
end)
end

defp process_deliveries(deliveries, state) do
Enum.reduce_while(deliveries, :ok, fn delivery, _acc ->
case process_delivery(delivery, state) do
{:ok, _} -> {:cont, :ok}
error -> {:halt, error}
end
end)
end

defp fetch_deliveries(_state) do
# TODO: paginate via the next and previous page cursors in the link header
Github.list_deliveries(per_page: @per_page)
end

defp get_or_create_cursor(provider, resource) do
case Sync.get_sync_cursor(provider, resource) do
nil ->
Sync.create_sync_cursor(%{provider: provider, resource: resource, timestamp: DateTime.utc_now()})

sync_cursor ->
{:ok, sync_cursor}
end
end

defp update_last_polled(sync_cursor, %{"delivered_at" => timestamp}) do
with {:ok, timestamp, _} <- DateTime.from_iso8601(timestamp),
{:ok, cursor} <-
Sync.update_sync_cursor(sync_cursor, %{
timestamp: timestamp,
last_polled_at: DateTime.utc_now()
}) do
{:ok, cursor}
else
{:error, reason} -> Logger.error("Failed to update sync cursor: #{inspect(reason)}")
end
end

defp process_delivery(delivery, state) do
case DateTime.from_iso8601(delivery["delivered_at"]) do
{:ok, delivered_at, _} ->
skip_reason =
cond do
not DateTime.after?(delivered_at, state.cursor.timestamp) -> :already_processed
delivery["status_code"] < 400 -> :status_ok
true -> nil
end

if skip_reason do
{:ok, nil}
else
dbg("Enqueuing redelivery #{delivery["id"]}")

%{delivery: delivery}
|> Github.Poller.DeliveryConsumer.new()
|> Oban.insert()
end

{:error, reason} ->
Logger.error("Failed to parse delivery: #{inspect(delivery)}. Reason: #{inspect(reason)}")

{:ok, nil}
end
end
end
32 changes: 32 additions & 0 deletions lib/algora/integrations/github/poller/delivery_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Algora.Github.Poller.DeliveryConsumer do
@moduledoc false
use Oban.Worker, queue: :delivery_consumers

import Ecto.Query

alias Algora.Github
alias Algora.Repo

require Logger

@max_attempts 3

@impl Oban.Worker
def perform(%Oban.Job{args: %{"delivery" => delivery} = _args}) do
attempts_count =
Repo.one(
from(j in "oban_jobs",
where: fragment("(args->>'delivery')::jsonb->>'guid' = ?", ^delivery["guid"]),
select: count(j.id)
)
) || 0

if attempts_count <= @max_attempts do
dbg("Redelivering delivery #{delivery["id"]} (attempt #{attempts_count})")
Github.redeliver(delivery["id"])
else
dbg("Max attempts reached for delivery #{delivery["id"]}")
:discard
end
end
end
85 changes: 85 additions & 0 deletions lib/algora/integrations/github/poller/delivery_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule Algora.Github.Poller.DeliverySupervisor do
@moduledoc false
use DynamicSupervisor

alias Algora.Github.Poller.Deliveries, as: DeliveryPoller
alias Algora.Sync

require Logger

# Client API
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

@impl DynamicSupervisor
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end

def start_children do
Sync.list_cursors()
|> Task.async_stream(
fn cursor -> add_provider(cursor.provider) end,
max_concurrency: 100,
ordered: false
)
|> Stream.run()

:ok
end

def add_provider(provider \\ "github", opts \\ []) do
DynamicSupervisor.start_child(__MODULE__, {DeliveryPoller, [provider: provider] ++ opts})
end

def terminate_child(provider) do
case find_child(provider) do
{_id, pid, _type, _modules} -> DynamicSupervisor.terminate_child(__MODULE__, pid)
nil -> {:error, :not_found}
end
end

def remove_provider(provider \\ "github") do
with :ok <- terminate_child(provider),
{:ok, _cursor} <- Sync.delete_sync_cursor(provider, "deliveries") do
:ok
end
end

def find_child(provider) do
Enum.find(which_children(), fn {_, pid, _, _} ->
GenServer.call(pid, :get_provider) == provider
end)
end

def pause(provider) do
provider
|> find_child()
|> case do
{_, pid, _, _} -> DeliveryPoller.pause(pid)
nil -> {:error, :not_found}
end
end

def resume(provider) do
provider
|> find_child()
|> case do
{_, pid, _, _} -> DeliveryPoller.resume(pid)
nil -> {:error, :not_found}
end
end

def pause_all do
Enum.each(which_children(), fn {_, pid, _, _} -> DeliveryPoller.pause(pid) end)
end

def resume_all do
Enum.each(which_children(), fn {_, pid, _, _} -> DeliveryPoller.resume(pid) end)
end

def which_children do
DynamicSupervisor.which_children(__MODULE__)
end
end
6 changes: 6 additions & 0 deletions lib/algora/integrations/github/poller/root_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Algora.Github.Poller.RootSupervisor do
@moduledoc false
use Supervisor

alias Algora.Github.Poller.DeliverySupervisor
alias Algora.Github.Poller.SearchSupervisor

def start_link(init_arg) do
Expand All @@ -16,6 +17,11 @@ defmodule Algora.Github.Poller.RootSupervisor do
{Task, &SearchSupervisor.start_children/0},
id: :search_supervisor,
restart: :transient
),
Supervisor.child_spec(
{Task, &DeliverySupervisor.start_children/0},
id: :delivery_supervisor,
restart: :transient
)
]

Expand Down
24 changes: 24 additions & 0 deletions lib/algora/sync/schemas/sync_cursor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Algora.Sync.SyncCursor do
@moduledoc false
use Algora.Schema

import Ecto.Changeset

typed_schema "sync_cursors" do
field :provider, :string
field :resource, :string
field :timestamp, :utc_datetime_usec
field :last_polled_at, :utc_datetime_usec

timestamps()
end

@doc false
def changeset(search_cursor, attrs) do
search_cursor
|> cast(attrs, [:provider, :resource, :timestamp, :last_polled_at])
|> generate_id()
|> validate_required([:provider, :resource, :timestamp])
|> unique_constraint([:provider, :resource])
end
end
Loading