Skip to content

Commit db5fe88

Browse files
committed
add oban jobs to fetch top contributions
1 parent e0f3874 commit db5fe88

File tree

6 files changed

+151
-46
lines changed

6 files changed

+151
-46
lines changed

config/config.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ config :algora, Oban,
5757
activity_notifier: 1,
5858
activity_mailer: 1,
5959
activity_discord: 10,
60-
campaign_emails: 1
60+
campaign_emails: 1,
61+
fetch_top_contributions: 3,
62+
sync_contribution: 3
6163
]
6264

6365
# Configures the mailer

lib/algora/admin/admin.ex

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,40 @@ defmodule Algora.Admin do
2222

2323
require Logger
2424

25-
def add_contributions(handle, opts) do
26-
token = token()
27-
28-
case Workspace.ensure_user(token, handle) do
29-
{:ok, user} ->
30-
results =
31-
Enum.map(opts, fn opts ->
32-
with %{repo_name: repo_name, contribution_count: contribution_count} <- opts,
33-
[repo_owner, repo_name] <- String.split(repo_name, "/"),
34-
dbg("fetching repo #{repo_owner}/#{repo_name}"),
35-
{:ok, repo} <- Workspace.ensure_repository(token, repo_owner, repo_name),
36-
{:ok, _tech_stack} <- Workspace.ensure_repo_tech_stack(token, repo),
37-
{:ok, _contribution} <- Algora.Workspace.upsert_user_contribution(user, repo, contribution_count) do
38-
:ok
39-
end
40-
end)
25+
def sync_contributions(opts \\ []) do
26+
query =
27+
User
28+
|> where([u], not is_nil(u.handle))
29+
|> where([u], not is_nil(u.provider_login))
30+
|> where([u], fragment("not exists (select 1 from user_contributions where user_contributions.user_id = ?)", u.id))
4131

42-
if Enum.any?(results, fn result -> result == :ok end), do: :ok, else: {:error, :failed}
32+
query =
33+
if handles = opts[:handles] do
34+
where(query, [u], u.handle in ^handles)
35+
else
36+
query
37+
end
4338

44-
{:error, reason} ->
45-
Logger.error("Failed to add contributions for #{handle}: #{inspect(reason)}")
46-
{:error, reason}
47-
end
39+
Repo.transaction(
40+
fn ->
41+
query
42+
|> Repo.stream()
43+
|> Enum.each(fn user ->
44+
if opts[:dry_run] do
45+
IO.puts("Enqueued job for #{user.provider_login}")
46+
else
47+
%{provider_login: user.provider_login}
48+
|> Workspace.Jobs.FetchTopContributions.new()
49+
|> Oban.insert()
50+
|> case do
51+
{:ok, _job} -> IO.puts("Enqueued job for #{user.provider_login}")
52+
{:error, error} -> IO.puts("Failed to enqueue job for #{user.provider_login}: #{inspect(error)}")
53+
end
54+
end
55+
end)
56+
end,
57+
timeout: :infinity
58+
)
4859
end
4960

5061
def release_payment(tx_id) do
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
defmodule Algora.Workspace.Jobs.FetchTopContributions do
2+
@moduledoc false
3+
use Oban.Worker,
4+
queue: :fetch_top_contributions,
5+
max_attempts: 3,
6+
# 30 days
7+
unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
8+
9+
@impl Oban.Worker
10+
def perform(%Oban.Job{args: %{"provider_login" => provider_login}}) do
11+
Algora.Workspace.fetch_top_contributions_async(provider_login)
12+
end
13+
14+
def timeout(_), do: :timer.seconds(10)
15+
end
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
defmodule Algora.Workspace.Jobs.SyncContribution do
2+
@moduledoc false
3+
use Oban.Worker,
4+
queue: :sync_contribution,
5+
max_attempts: 3,
6+
# 30 days
7+
unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
8+
9+
alias Algora.Workspace
10+
11+
@impl Oban.Worker
12+
def perform(%Oban.Job{
13+
args: %{"user_id" => user_id, "repo_full_name" => repo_full_name, "contribution_count" => contribution_count}
14+
}) do
15+
token = Algora.Admin.token()
16+
17+
with [repo_owner, repo_name] <- String.split(repo_full_name, "/"),
18+
{:ok, repo} <- Workspace.ensure_repository(token, repo_owner, repo_name),
19+
{:ok, _tech_stack} <- Workspace.ensure_repo_tech_stack(token, repo),
20+
{:ok, _contribution} <- Workspace.upsert_user_contribution(user_id, repo.id, contribution_count) do
21+
:ok
22+
end
23+
end
24+
end

lib/algora/workspace/workspace.ex

Lines changed: 74 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -131,21 +131,13 @@ defmodule Algora.Workspace do
131131
preload: [user: u]
132132
)
133133

134-
res =
135-
case Repo.one(repository_query) do
136-
%Repository{} = repository -> {:ok, repository}
137-
nil -> create_repository_from_github(token, owner, repo)
138-
end
139-
140-
case res do
141-
{:ok, repository} -> maybe_schedule_og_image_update(repository)
142-
error -> error
134+
case Repo.one(repository_query) do
135+
%Repository{} = repository -> {:ok, repository}
136+
nil -> create_repository_from_github(token, owner, repo)
143137
end
144-
145-
res
146138
end
147139

148-
defp maybe_schedule_og_image_update(%Repository{} = repository) do
140+
def maybe_schedule_og_image_update(%Repository{} = repository) do
149141
one_day_ago = DateTime.add(DateTime.utc_now(), -1, :day)
150142

151143
needs_update? =
@@ -646,19 +638,84 @@ defmodule Algora.Workspace do
646638
Repo.all(query)
647639
end
648640

649-
@spec upsert_user_contribution(User.t(), Repository.t(), integer()) ::
641+
@spec upsert_user_contribution(String.t(), String.t(), integer()) ::
650642
{:ok, UserContribution.t()} | {:error, Ecto.Changeset.t()}
651-
def upsert_user_contribution(%User{} = user, %Repository{} = repository, contribution_count) do
643+
def upsert_user_contribution(user_id, repository_id, contribution_count) do
652644
attrs = %{
653-
user_id: user.id,
654-
repository_id: repository.id,
645+
user_id: user_id,
646+
repository_id: repository_id,
655647
contribution_count: contribution_count,
656648
last_fetched_at: DateTime.utc_now()
657649
}
658650

659-
case Repo.get_by(UserContribution, user_id: user.id, repository_id: repository.id) do
651+
case Repo.get_by(UserContribution, user_id: user_id, repository_id: repository_id) do
660652
nil -> %UserContribution{} |> UserContribution.changeset(attrs) |> Repo.insert()
661653
contribution -> contribution |> UserContribution.changeset(attrs) |> Repo.update()
662654
end
663655
end
656+
657+
def fetch_top_contributions(provider_login) do
658+
token = Algora.Admin.token()
659+
660+
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_login),
661+
{:ok, user} <- ensure_user(token, provider_login),
662+
:ok <- add_contributions(user.id, contributions) do
663+
{:ok, contributions}
664+
else
665+
{:error, reason} ->
666+
Logger.error("Failed to fetch contributions for #{provider_login}: #{inspect(reason)}")
667+
{:error, reason}
668+
end
669+
end
670+
671+
def fetch_top_contributions_async(provider_login) do
672+
token = Algora.Admin.token()
673+
674+
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_login),
675+
{:ok, user} <- ensure_user(token, provider_login),
676+
{:ok, _} <- add_contributions_async(user.id, contributions) do
677+
{:ok, nil}
678+
else
679+
{:error, reason} ->
680+
Logger.error("Failed to fetch contributions for #{provider_login}: #{inspect(reason)}")
681+
{:error, reason}
682+
end
683+
end
684+
685+
def add_contributions_async(user_id, opts) do
686+
Repo.transact(fn ->
687+
Enum.reduce_while(opts, :ok, fn contribution, _ ->
688+
case %{
689+
"user_id" => user_id,
690+
"repo_full_name" => contribution.repo_name,
691+
"contribution_count" => contribution.contribution_count
692+
}
693+
|> Jobs.SyncContribution.new()
694+
|> Oban.insert() do
695+
{:ok, _job} -> {:cont, :ok}
696+
error -> {:halt, error}
697+
end
698+
end)
699+
end)
700+
end
701+
702+
def add_contributions(user_id, opts) do
703+
results =
704+
Enum.map(opts, fn %{repo_name: repo_name, contribution_count: contribution_count} ->
705+
add_contribution(%{user_id: user_id, repo_full_name: repo_name, contribution_count: contribution_count})
706+
end)
707+
708+
if Enum.any?(results, fn result -> result == :ok end), do: :ok, else: {:error, :failed}
709+
end
710+
711+
def add_contribution(%{user_id: user_id, repo_full_name: repo_full_name, contribution_count: contribution_count}) do
712+
token = Algora.Admin.token()
713+
714+
with [repo_owner, repo_name] <- String.split(repo_full_name, "/"),
715+
{:ok, repo} <- ensure_repository(token, repo_owner, repo_name),
716+
{:ok, _tech_stack} <- ensure_repo_tech_stack(token, repo),
717+
{:ok, _contribution} <- upsert_user_contribution(user_id, repo.id, contribution_count) do
718+
:ok
719+
end
720+
end
664721
end

lib/algora_web/live/org/job_live.ex

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -729,13 +729,9 @@ defmodule AlgoraWeb.Org.JobLive do
729729
fn handle ->
730730
broadcast(socket.assigns.job, {:contributions_fetching, handle})
731731

732-
with {:ok, contributions} <- Algora.Cloud.top_contributions(handle),
733-
:ok <- Algora.Admin.add_contributions(handle, contributions) do
734-
broadcast(socket.assigns.job, {:contributions_fetched, handle, contributions})
735-
else
736-
{:error, reason} ->
737-
Logger.error("Failed to fetch contributions for #{handle}: #{inspect(reason)}")
738-
broadcast(socket.assigns.job, {:contributions_failed, handle})
732+
case Algora.Workspace.fetch_top_contributions(handle) do
733+
{:ok, contributions} -> broadcast(socket.assigns.job, {:contributions_fetched, handle, contributions})
734+
{:error, _reason} -> broadcast(socket.assigns.job, {:contributions_failed, handle})
739735
end
740736
end,
741737
timeout: length(users_without_contributions) * 60_000,

0 commit comments

Comments
 (0)