Skip to content

Commit 77224e1

Browse files
committed
feat: implement contributor upsert functionality and job processing
- Added `upsert_contributor/3` function to handle contributor data updates or inserts. - Introduced `ImportContributor` job to fetch and upsert contributor data from GitHub. - Created `ProcessContributors` job to process contributor data and enqueue import jobs. - Updated configuration for `sync_contribution` to a new value.
1 parent 70f061a commit 77224e1

File tree

4 files changed

+108
-3
lines changed

4 files changed

+108
-3
lines changed

config/config.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ config :algora, Oban,
6262
notify_transfer: 1,
6363
prompt_payout_connect: 1,
6464
activity_discord: 1,
65-
sync_contribution: 1
65+
sync_contribution: 20
6666
]
6767

6868
# Configures the mailer
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
defmodule Algora.Workspace.Jobs.ImportContributor do
2+
@moduledoc false
3+
use Oban.Worker,
4+
queue: :fetch_top_contributions,
5+
max_attempts: 3
6+
7+
alias Algora.Github
8+
9+
@impl Oban.Worker
10+
def perform(%Oban.Job{args: %{"contributors_data" => contributors_data, "repo_id" => repo_id}}) do
11+
token = Github.TokenPool.get_token()
12+
provider_logins = Enum.map(contributors_data, & &1["provider_login"])
13+
14+
with {:ok, users} <- Algora.Workspace.fetch_top_contributions_async(token, provider_logins) do
15+
contributions_map =
16+
Map.new(contributors_data, fn contributor -> {contributor["provider_login"], contributor["contributions"]} end)
17+
18+
for user <- users do
19+
contributions = Map.get(contributions_map, user.provider_login, 0)
20+
{:ok, _} = Algora.Workspace.upsert_contributor(user.id, repo_id, contributions)
21+
{:ok, _} = Algora.Workspace.upsert_user_contribution(user.id, repo_id, contributions)
22+
end
23+
24+
:ok
25+
end
26+
end
27+
end
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
defmodule Algora.Workspace.Jobs.ProcessContributors do
2+
@moduledoc false
3+
use Oban.Worker,
4+
queue: :sync_contribution,
5+
max_attempts: 3
6+
7+
alias Algora.Github
8+
alias Algora.Workspace
9+
alias Algora.Workspace.Jobs.ImportContributor
10+
11+
require Logger
12+
13+
@impl Oban.Worker
14+
def perform(%Oban.Job{args: %{"contributors_data" => contributors_data, "repo_id" => repo_id}}) do
15+
token = Github.TokenPool.get_token()
16+
17+
jobs =
18+
contributors_data
19+
|> Enum.reduce([], fn contributor_data, acc ->
20+
provider_login = contributor_data["login"]
21+
contributions = contributor_data["contributions"]
22+
23+
case Workspace.ensure_user(token, provider_login) do
24+
{:ok, user} ->
25+
[%{provider_login: user.provider_login, contributions: contributions} | acc]
26+
27+
{:error, reason} ->
28+
Logger.error("Failed to fetch user #{provider_login}: #{reason}")
29+
acc
30+
end
31+
end)
32+
|> Enum.chunk_every(10)
33+
|> Enum.map(fn contributors -> ImportContributor.new(%{contributors_data: contributors, repo_id: repo_id}) end)
34+
|> Oban.insert_all()
35+
36+
case jobs do
37+
[] ->
38+
Logger.warning("No contributors to process for #{repo_id}")
39+
40+
_ ->
41+
:ok
42+
end
43+
end
44+
end

lib/algora/workspace/workspace.ex

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,31 @@ defmodule Algora.Workspace do
713713
Repo.aggregate(query, :sum, :contribution_count) || 0
714714
end
715715

716+
@spec upsert_contributor(String.t(), String.t(), integer()) ::
717+
{:ok, Contributor.t()} | {:error, Ecto.Changeset.t()}
718+
def upsert_contributor(user_id, repository_id, contribution_count) do
719+
attrs = %{
720+
user_id: user_id,
721+
repository_id: repository_id,
722+
contribution_count: contribution_count,
723+
last_fetched_at: DateTime.utc_now()
724+
}
725+
726+
case Repo.get_by(Contributor, user_id: user_id, repository_id: repository_id) do
727+
nil ->
728+
%Contributor{} |> Contributor.changeset(attrs) |> Repo.insert()
729+
730+
contributor ->
731+
if contributor.contributions < contribution_count do
732+
contributor
733+
|> Contributor.changeset(attrs)
734+
|> Repo.update()
735+
else
736+
{:ok, contributor}
737+
end
738+
end
739+
end
740+
716741
@spec upsert_user_contribution(String.t(), String.t(), integer()) ::
717742
{:ok, UserContribution.t()} | {:error, Ecto.Changeset.t()}
718743
def upsert_user_contribution(user_id, repository_id, contribution_count) do
@@ -724,8 +749,17 @@ defmodule Algora.Workspace do
724749
}
725750

726751
case Repo.get_by(UserContribution, user_id: user_id, repository_id: repository_id) do
727-
nil -> %UserContribution{} |> UserContribution.changeset(attrs) |> Repo.insert()
728-
contribution -> contribution |> UserContribution.changeset(attrs) |> Repo.update()
752+
nil ->
753+
%UserContribution{} |> UserContribution.changeset(attrs) |> Repo.insert()
754+
755+
contribution ->
756+
if contribution.contribution_count < contribution_count do
757+
contribution
758+
|> UserContribution.changeset(attrs)
759+
|> Repo.update()
760+
else
761+
{:ok, contribution}
762+
end
729763
end
730764
end
731765

0 commit comments

Comments
 (0)