diff --git a/lib/algora/comments/comments.ex b/lib/algora/comments/comments.ex deleted file mode 100644 index 394e2f613..000000000 --- a/lib/algora/comments/comments.ex +++ /dev/null @@ -1,34 +0,0 @@ -defmodule Algora.Comments do - @moduledoc false - import Ecto.Query, warn: false - - alias Algora.Comments.CommentCursor - alias Algora.Repo - - def get_comment_cursor(provider, repo_owner, repo_name) do - Repo.get_by(CommentCursor, provider: provider, repo_owner: repo_owner, repo_name: repo_name) - end - - def delete_comment_cursor(provider, repo_owner, repo_name) do - case get_comment_cursor(provider, repo_owner, repo_name) do - nil -> {:error, :cursor_not_found} - cursor -> Repo.delete(cursor) - end - end - - def create_comment_cursor(attrs \\ %{}) do - %CommentCursor{} - |> CommentCursor.changeset(attrs) - |> Repo.insert() - end - - def update_comment_cursor(%CommentCursor{} = comment_cursor, attrs) do - comment_cursor - |> CommentCursor.changeset(attrs) - |> Repo.update() - end - - def list_cursors do - Repo.all(from(p in CommentCursor)) - end -end diff --git a/lib/algora/comments/schemas/comment_cursor.ex b/lib/algora/comments/schemas/comment_cursor.ex deleted file mode 100644 index ec8298318..000000000 --- a/lib/algora/comments/schemas/comment_cursor.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Algora.Comments.CommentCursor do - @moduledoc false - use Algora.Schema - - import Ecto.Changeset - - typed_schema "comment_cursors" do - field :provider, :string - field :repo_owner, :string - field :repo_name, :string - field :timestamp, :utc_datetime_usec - field :last_polled_at, :utc_datetime_usec - field :last_comment_id, :string - - timestamps() - end - - @doc false - def changeset(comment_cursor, attrs) do - comment_cursor - |> cast(attrs, [ - :provider, - :repo_owner, - :repo_name, - :timestamp, - :last_polled_at, - :last_comment_id - ]) - |> generate_id() - |> validate_required([:provider, :repo_owner, :repo_name, :timestamp]) - |> unique_constraint([:provider, 
:repo_owner, :repo_name]) - end -end diff --git a/lib/algora/events/events.ex b/lib/algora/events/events.ex deleted file mode 100644 index 8d1f5862e..000000000 --- a/lib/algora/events/events.ex +++ /dev/null @@ -1,27 +0,0 @@ -defmodule Algora.Events do - @moduledoc false - import Ecto.Query, warn: false - - alias Algora.Events.EventCursor - alias Algora.Repo - - def get_event_cursor(provider, repo_owner, repo_name) do - Repo.get_by(EventCursor, provider: provider, repo_owner: repo_owner, repo_name: repo_name) - end - - def create_event_cursor(attrs \\ %{}) do - %EventCursor{} - |> EventCursor.changeset(attrs) - |> Repo.insert() - end - - def update_event_cursor(%EventCursor{} = event_cursor, attrs) do - event_cursor - |> EventCursor.changeset(attrs) - |> Repo.update() - end - - def list_cursors do - Repo.all(from(p in EventCursor)) - end -end diff --git a/lib/algora/events/schemas/event_cursor.ex b/lib/algora/events/schemas/event_cursor.ex deleted file mode 100644 index 60f676fe8..000000000 --- a/lib/algora/events/schemas/event_cursor.ex +++ /dev/null @@ -1,25 +0,0 @@ -defmodule Algora.Events.EventCursor do - @moduledoc false - use Algora.Schema - - import Ecto.Changeset - - typed_schema "event_cursors" do - field :provider, :string - field :repo_owner, :string - field :repo_name, :string - field :last_event_id, :string - field :last_polled_at, :utc_datetime_usec - - timestamps() - end - - @doc false - def changeset(event_cursor, attrs) do - event_cursor - |> cast(attrs, [:provider, :repo_owner, :repo_name, :last_event_id, :last_polled_at]) - |> generate_id() - |> validate_required([:provider, :repo_owner, :repo_name]) - |> unique_constraint([:provider, :repo_owner, :repo_name]) - end -end diff --git a/lib/algora/integrations/github/poller/comment_consumer.ex b/lib/algora/integrations/github/poller/comment_consumer.ex deleted file mode 100644 index 31dfe7a72..000000000 --- a/lib/algora/integrations/github/poller/comment_consumer.ex +++ /dev/null @@ -1,53 +0,0 @@ 
-defmodule Algora.Github.Poller.CommentConsumer do - @moduledoc false - use Oban.Worker, queue: :comment_consumers - - alias Algora.Accounts - alias Algora.Bounties - alias Algora.Util - - require Logger - - @impl Oban.Worker - def perform(%Oban.Job{ - args: %{"comment" => comment, "command" => encoded_command, "ticket_ref" => encoded_ticket_ref} = _args - }) do - command = Util.base64_to_term!(encoded_command) - ticket_ref = Util.base64_to_term!(encoded_ticket_ref) - - run_command(command, ticket_ref, comment) - end - - defp run_command({:claim, _args}, _ticket_ref, _comment) do - # TODO: implement claim command - :ok - end - - defp run_command({:tip, args}, ticket_ref, _comment) do - Bounties.create_tip_intent(%{ - recipient: args[:recipient], - amount: args[:amount], - ticket_ref: %{owner: ticket_ref[:owner], repo: ticket_ref[:repo], number: ticket_ref[:number]} - }) - end - - defp run_command({:bounty, args}, ticket_ref, comment) do - case Accounts.fetch_user_by(provider_id: to_string(comment["user"]["id"])) do - {:ok, user} -> - Bounties.create_bounty( - %{ - creator: user, - owner: user, - amount: args[:amount], - ticket_ref: %{owner: ticket_ref[:owner], repo: ticket_ref[:repo], number: ticket_ref[:number]} - }, - command_id: comment["id"], - command_source: :comment - ) - - {:error, _reason} = error -> - Logger.error("Failed to create bounty: #{inspect(error)}") - error - end - end -end diff --git a/lib/algora/integrations/github/poller/comments.ex b/lib/algora/integrations/github/poller/comments.ex deleted file mode 100644 index 6332e3e1a..000000000 --- a/lib/algora/integrations/github/poller/comments.ex +++ /dev/null @@ -1,213 +0,0 @@ -defmodule Algora.Github.Poller.Comments do - @moduledoc false - use GenServer - - import Ecto.Query, warn: false - - alias Algora.Comments - alias Algora.Github - alias Algora.Github.Command - alias Algora.Parser - alias Algora.Repo - alias Algora.Util - - require Logger - - @per_page 10 - @poll_interval :timer.seconds(3) - 
- # Client API - def start_link(opts) do - GenServer.start_link(__MODULE__, opts) - end - - def pause(pid) do - GenServer.cast(pid, :pause) - end - - def resume(pid) do - GenServer.cast(pid, :resume) - end - - # Server callbacks - @impl true - def init(opts) do - repo_owner = Keyword.fetch!(opts, :repo_owner) - repo_name = Keyword.fetch!(opts, :repo_name) - - {:ok, - %{ - repo_owner: repo_owner, - repo_name: repo_name, - cursor: nil, - paused: not Algora.config([:auto_start_pollers]) - }, {:continue, :setup}} - end - - @impl true - def handle_continue(:setup, state) do - {:ok, cursor} = get_or_create_cursor(state.repo_owner, state.repo_name) - schedule_poll() - - {:noreply, %{state | cursor: cursor}} - end - - @impl true - def handle_info(:poll, %{paused: true} = state) do - {:noreply, state} - end - - @impl true - def handle_info(:poll, state) do - {:ok, new_state} = poll(state) - schedule_poll() - {:noreply, new_state} - end - - @impl true - def handle_cast(:pause, state) do - {:noreply, %{state | paused: true}} - end - - @impl true - def handle_cast(:resume, %{paused: true} = state) do - schedule_poll() - {:noreply, %{state | paused: false}} - end - - @impl true - def handle_cast(:resume, state) do - {:noreply, state} - end - - @impl true - def handle_call(:get_repo_info, _from, state) do - {:reply, {state.repo_owner, state.repo_name}, state} - end - - @impl true - def handle_call(:is_paused, _from, state) do - {:reply, state.paused, state} - end - - defp schedule_poll do - Process.send_after(self(), :poll, @poll_interval) - end - - def poll(state) do - with {:ok, token} <- get_token(), - {:ok, comments} <- fetch_comments(token, state), - if(length(comments) > 0, do: Logger.debug("Processing #{length(comments)} comments")), - {:ok, updated_cursor} <- process_batch(comments, state.cursor) do - {:ok, %{state | cursor: updated_cursor}} - else - {:error, :no_token_available} -> - Logger.warning("No token available, pausing poller") - {:ok, %{state | paused: true}} - 
- {:error, reason} -> - Logger.error("Failed to fetch comments: #{inspect(reason)}") - {:ok, state} - end - end - - defp process_batch([], comment_cursor), do: {:ok, comment_cursor} - - defp process_batch(comments, comment_cursor) do - Repo.transact(fn -> - with :ok <- process_comments(comments) do - update_last_polled(comment_cursor, List.last(comments)) - end - end) - end - - defp process_comments(comments) do - Enum.reduce_while(comments, :ok, fn comment, _acc -> - case process_comment(comment) do - {:ok, _} -> {:cont, :ok} - error -> {:halt, error} - end - end) - end - - defp fetch_comments(token, state) do - # TODO: ignore comments from bots and GITHUB_BOT_HANDLE - with {:ok, comments} <- - Github.list_repository_comments( - token, - state.repo_owner, - state.repo_name, - per_page: @per_page, - since: DateTime.to_iso8601(state.cursor.timestamp), - sort: "updated", - direction: "asc" - ) do - {:ok, Enum.drop_while(comments, &(to_string(&1["id"]) == state.cursor.last_comment_id))} - end - end - - defp get_or_create_cursor(repo_owner, repo_name) do - case Comments.get_comment_cursor("github", repo_owner, repo_name) do - nil -> - Comments.create_comment_cursor(%{ - provider: "github", - repo_owner: repo_owner, - repo_name: repo_name, - timestamp: DateTime.utc_now() - }) - - comment_cursor -> - {:ok, comment_cursor} - end - end - - defp update_last_polled(comment_cursor, %{"id" => id, "updated_at" => updated_at}) do - with {:ok, updated_at, _} <- DateTime.from_iso8601(updated_at), - {:ok, cursor} <- - Comments.update_comment_cursor(comment_cursor, %{ - timestamp: updated_at, - last_comment_id: to_string(id), - last_polled_at: DateTime.utc_now() - }) do - {:ok, cursor} - else - {:error, reason} -> Logger.error("Failed to update comment cursor: #{inspect(reason)}") - end - end - - defp process_comment(%{"updated_at" => updated_at, "body" => body, "html_url" => html_url} = comment) do - with {:ok, updated_at, _} <- DateTime.from_iso8601(updated_at), - {:ok, 
[ticket_ref: ticket_ref], _, _, _, _} <- Parser.full_ticket_ref(html_url), - {:ok, commands} <- Command.parse(body) do - Logger.info("Latency: #{DateTime.diff(DateTime.utc_now(), updated_at, :second)}s") - - Enum.reduce_while(commands, :ok, fn command, _acc -> - res = - %{ - comment: comment, - command: Util.term_to_base64(command), - ticket_ref: Util.term_to_base64(ticket_ref) - } - |> Github.Poller.CommentConsumer.new() - |> Oban.insert() - - case res do - {:ok, _job} -> {:cont, :ok} - error -> {:halt, error} - end - end) - else - {:error, reason} -> - Logger.error("Failed to parse commands from comment: #{inspect(comment)}. Reason: #{inspect(reason)}") - - :ok - end - end - - defp get_token do - case Github.TokenPool.get_token() do - nil -> {:error, :no_token_available} - token -> {:ok, token} - end - end -end diff --git a/lib/algora/integrations/github/poller/event_consumer.ex b/lib/algora/integrations/github/poller/event_consumer.ex deleted file mode 100644 index 561611800..000000000 --- a/lib/algora/integrations/github/poller/event_consumer.ex +++ /dev/null @@ -1,46 +0,0 @@ -defmodule Algora.Github.Poller.EventConsumer do - @moduledoc false - use Oban.Worker, queue: :event_consumers - - alias Algora.Bounties - alias Algora.Github - alias Algora.Util - alias Algora.Workspace - - require Logger - - @impl Oban.Worker - def perform(%Oban.Job{args: %{"event" => event, "command" => encoded_command} = _args}) do - command = Util.base64_to_term!(encoded_command) - - run_command(command, event) - end - - defp run_command({:bounty, args}, event) do - # TODO: use user's own token if available - token = Github.TokenPool.get_token() - - with {:ok, amount} <- Keyword.fetch(args, :amount), - {:ok, user} <- Workspace.ensure_user(token, extract_actor(event)), - {:ok, _bounty} <- - Bounties.create_bounty(%{ - creator: user, - owner: user, - amount: amount, - ticket_ref: extract_ticket_ref(event) - }) do - :ok - else - {:error, _reason} = error -> - Logger.error("Failed to 
create bounty: #{inspect(error)}") - error - end - end - - defp extract_actor(%{"actor" => %{"login" => login}}), do: login - - defp extract_ticket_ref(%{"repo" => %{"name" => repo_full_name}, "payload" => %{"issue" => %{"number" => number}}}) do - [repo_owner, repo_name] = String.split(repo_full_name, "/") - %{owner: repo_owner, repo: repo_name, number: number} - end -end diff --git a/lib/algora/integrations/github/poller/events.ex b/lib/algora/integrations/github/poller/events.ex deleted file mode 100644 index 78eb38c46..000000000 --- a/lib/algora/integrations/github/poller/events.ex +++ /dev/null @@ -1,228 +0,0 @@ -defmodule Algora.Github.Poller.Events do - @moduledoc false - use GenServer - - import Ecto.Query, warn: false - - alias Algora.Events - alias Algora.Github - alias Algora.Github.Command - alias Algora.Repo - alias Algora.Util - - require Logger - - @per_page 10 - @poll_interval :timer.seconds(1) - - # Client API - def start_link(opts) do - GenServer.start_link(__MODULE__, opts) - end - - def pause(pid) do - GenServer.cast(pid, :pause) - end - - def resume(pid) do - GenServer.cast(pid, :resume) - end - - # Server callbacks - @impl true - def init(opts) do - repo_owner = Keyword.fetch!(opts, :repo_owner) - repo_name = Keyword.fetch!(opts, :repo_name) - backfill_limit = Keyword.get(opts, :backfill_limit, 1) - - {:ok, - %{ - repo_owner: repo_owner, - repo_name: repo_name, - backfill_limit: backfill_limit, - cursor: nil, - paused: not Algora.config([:auto_start_pollers]) - }, {:continue, :setup}} - end - - @impl true - def handle_continue(:setup, state) do - {:ok, cursor} = get_or_create_cursor(state.repo_owner, state.repo_name) - schedule_poll() - - {:noreply, %{state | cursor: cursor}} - end - - @impl true - def handle_info(:poll, %{paused: true} = state) do - {:noreply, state} - end - - @impl true - def handle_info(:poll, state) do - token = Github.TokenPool.get_token() - {:ok, new_state} = poll(token, state) - schedule_poll() - {:noreply, new_state} - 
end - - @impl true - def handle_cast(:pause, state) do - {:noreply, %{state | paused: true}} - end - - @impl true - def handle_cast(:resume, %{paused: true} = state) do - schedule_poll() - {:noreply, %{state | paused: false}} - end - - @impl true - def handle_cast(:resume, state) do - {:noreply, state} - end - - defp schedule_poll do - Process.send_after(self(), :poll, @poll_interval) - end - - def poll(token, state) do - with {:ok, events} <- collect_new_events(token, state), - if(length(events) > 0, do: Logger.debug("Processing #{length(events)} events")), - {:ok, updated_cursor} <- process_batch(events, state.cursor) do - {:ok, %{state | cursor: updated_cursor}} - end - end - - defp filter_new_events(events, acc, state) do - {new_events, _total_count, has_more?} = - Enum.reduce_while( - events, - {[], length(acc), true}, - fn event, {page_acc, total_count, _} -> - if should_continue_processing?(event, state, total_count) do - {:cont, {[event | page_acc], total_count + 1, true}} - else - {:halt, {page_acc, total_count, false}} - end - end - ) - - {Enum.reverse(new_events), has_more?} - end - - defp should_continue_processing?(event, state, total_count) do - cond do - event["id"] == state.cursor.last_event_id -> false - state.cursor.last_event_id != nil -> true - state.backfill_limit == :infinity -> true - total_count + 1 > state.backfill_limit -> false - true -> true - end - end - - defp collect_new_events(token, state, page \\ 1, acc \\ []) do - case fetch_events(token, state, page) do - {:ok, events} -> - {new_events, has_more?} = filter_new_events(events, acc, state) - acc = acc ++ new_events - - if has_more? 
do - collect_new_events(token, state, page + 1, acc) - else - {:ok, acc} - end - - {:error, reason} = error -> - Logger.error("Failed to fetch repository events: #{inspect(reason)}") - error - end - end - - defp process_batch([], event_cursor), do: {:ok, event_cursor} - - defp process_batch(events, event_cursor) do - Repo.transact(fn -> - with :ok <- process_events(events) do - update_last_polled(event_cursor, List.first(events)) - end - end) - end - - defp process_events(events) do - Enum.reduce_while(events, :ok, fn event, _acc -> - case process_event(event) do - {:ok, _} -> {:cont, :ok} - error -> {:halt, error} - end - end) - end - - defp fetch_events(token, state, page) do - Github.list_repository_events( - token, - state.repo_owner, - state.repo_name, - per_page: @per_page, - page: page - ) - end - - defp get_or_create_cursor(repo_owner, repo_name) do - case Events.get_event_cursor("github", repo_owner, repo_name) do - nil -> - Events.create_event_cursor(%{ - provider: "github", - repo_owner: repo_owner, - repo_name: repo_name - }) - - event_cursor -> - {:ok, event_cursor} - end - end - - defp update_last_polled(event_cursor, %{"id" => event_id}) when not is_nil(event_id) do - Events.update_event_cursor(event_cursor, %{ - last_event_id: event_id, - last_polled_at: DateTime.utc_now() - }) - end - - defp process_event(%{"updated_at" => updated_at} = event) do - with {:ok, updated_at, _} <- DateTime.from_iso8601(updated_at), - body = extract_body(event), - {:ok, commands} <- Command.parse(body) do - Logger.info("Latency: #{DateTime.diff(DateTime.utc_now(), updated_at, :second)}s") - - Enum.reduce_while(commands, :ok, fn command, _acc -> - res = - %{event: event, command: Util.term_to_base64(command)} - |> Github.Poller.EventConsumer.new() - |> Oban.insert() - - case res do - {:ok, _job} -> {:cont, :ok} - error -> {:halt, error} - end - end) - else - {:error, reason} -> - Logger.error("Failed to parse commands from event: #{inspect(event)}. 
Reason: #{inspect(reason)}") - - :ok - end - end - - defp extract_body(%{"type" => "IssueCommentEvent", "payload" => %{"action" => action, "comment" => %{"body" => body}}}) - when action in ["created", "edited"] do - body - end - - defp extract_body(%{"type" => "IssuesEvent", "payload" => %{"action" => action, "issue" => %{"body" => body}}}) - when action in ["opened", "edited"] do - body - end - - defp extract_body(_event), do: nil -end diff --git a/priv/repo/migrations/20250330142911_drop_cursors.exs b/priv/repo/migrations/20250330142911_drop_cursors.exs new file mode 100644 index 000000000..222228188 --- /dev/null +++ b/priv/repo/migrations/20250330142911_drop_cursors.exs @@ -0,0 +1,8 @@ +defmodule Algora.Repo.Migrations.DropCursors do + use Ecto.Migration + + def change do + drop table(:comment_cursors) + drop table(:event_cursors) + end +end