Skip to content

Commit 06bfe93

Browse files
committed
feat: process imports in batches
1 parent be3a5af commit 06bfe93

File tree

8 files changed

+99
-70
lines changed

8 files changed

+99
-70
lines changed

lib/algora/admin/admin.ex

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,16 @@ defmodule Algora.Admin do
123123
else
124124
query
125125
|> Repo.stream()
126-
|> Enum.each(fn user ->
127-
%{provider_login: user.provider_login}
126+
|> Enum.chunk_every(10)
127+
|> Enum.each(fn users ->
128+
logins = Enum.map(users, fn user -> user.provider_login end)
129+
130+
%{provider_logins: logins}
128131
|> FetchTopContributions.new()
129132
|> Oban.insert()
130133
|> case do
131-
{:ok, _job} -> IO.puts("Enqueued job for #{user.provider_login}")
132-
{:error, error} -> IO.puts("Failed to enqueue job for #{user.provider_login}: #{inspect(error)}")
134+
{:ok, _job} -> IO.puts("Enqueued job for #{logins}")
135+
{:error, error} -> IO.puts("Failed to enqueue job for #{logins}: #{inspect(error)}")
133136
end
134137
end)
135138
end
@@ -1043,13 +1046,17 @@ defmodule Algora.Admin do
10431046
:ok
10441047

10451048
{:ok, stargazers} ->
1046-
Enum.each(stargazers, fn user ->
1047-
%{provider_login: user["login"], repo_id: repo_id}
1049+
stargazers
1050+
|> Enum.chunk_every(10)
1051+
|> Enum.each(fn users ->
1052+
logins = Enum.map(users, fn user -> user["login"] end)
1053+
1054+
%{provider_logins: logins, repo_id: repo_id}
10481055
|> ImportStargazer.new()
10491056
|> Oban.insert()
10501057
|> case do
1051-
{:ok, _job} -> Logger.info("Enqueued job for #{user["login"]}")
1052-
{:error, error} -> Logger.error("Failed to enqueue job for #{user["login"]}: #{inspect(error)}")
1058+
{:ok, _job} -> Logger.info("Enqueued job for #{logins}")
1059+
{:error, error} -> Logger.error("Failed to enqueue job for #{logins}: #{inspect(error)}")
10531060
end
10541061
end)
10551062

lib/algora/cloud.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
defmodule Algora.Cloud do
22
@moduledoc false
33

4-
def top_contributions(github_handle) do
5-
call(AlgoraCloud, :top_contributions, [github_handle])
4+
def top_contributions(github_handles) do
5+
call(AlgoraCloud, :top_contributions, [github_handles])
66
end
77

88
def list_top_matches(opts \\ []) do

lib/algora/workspace/jobs/fetch_top_contributions.ex

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ defmodule Algora.Workspace.Jobs.FetchTopContributions do
22
@moduledoc false
33
use Oban.Worker,
44
queue: :fetch_top_contributions,
5-
max_attempts: 3,
6-
# 30 days
7-
unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
5+
max_attempts: 3
86

97
alias Algora.Github
108

119
@impl Oban.Worker
12-
def perform(%Oban.Job{args: %{"provider_login" => provider_login}}) do
13-
Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_login)
10+
def perform(%Oban.Job{args: %{"provider_logins" => provider_logins}}) do
11+
Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_logins)
1412
end
1513

1614
def timeout(_), do: :timer.seconds(30)

lib/algora/workspace/jobs/import_stargazer.ex

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@ defmodule Algora.Workspace.Jobs.ImportStargazer do
22
@moduledoc false
33
use Oban.Worker,
44
queue: :fetch_top_contributions,
5-
max_attempts: 3,
6-
# 30 days
7-
unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
5+
max_attempts: 3
86

97
alias Algora.Github
108
alias Algora.Repo
119
alias Algora.Workspace.Stargazer
1210

1311
@impl Oban.Worker
14-
def perform(%Oban.Job{args: %{"provider_login" => provider_login, "repo_id" => repo_id}}) do
15-
with {:ok, user} <- Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_login) do
16-
%Stargazer{}
17-
|> Stargazer.changeset(%{user_id: user.id, repository_id: repo_id})
18-
|> Repo.insert()
12+
def perform(%Oban.Job{args: %{"provider_logins" => provider_logins, "repo_id" => repo_id}}) do
13+
with {:ok, users} <- Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_logins) do
14+
Repo.insert_all(Stargazer, Enum.map(users, fn user -> %{user_id: user.id, repository_id: repo_id} end))
1915
end
2016
end
2117

lib/algora/workspace/jobs/sync_contribution.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ defmodule Algora.Workspace.Jobs.SyncContribution do
22
@moduledoc false
33
use Oban.Worker,
44
queue: :sync_contribution,
5-
max_attempts: 3,
6-
# 30 days
7-
unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
5+
max_attempts: 3
86

97
alias Algora.Github
108
alias Algora.Workspace

lib/algora/workspace/workspace.ex

Lines changed: 63 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -698,72 +698,97 @@ defmodule Algora.Workspace do
698698
end
699699
end
700700

701-
def fetch_top_contributions(token, provider_login) do
702-
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_login),
703-
{:ok, user} <- ensure_user(token, provider_login),
704-
:ok <- add_contributions(token, user.id, contributions) do
701+
def fetch_top_contributions(token, provider_logins) when is_list(provider_logins) do
702+
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_logins),
703+
{:ok, users} <- ensure_users(token, provider_logins),
704+
:ok <- add_contributions(token, users, contributions) do
705705
{:ok, contributions}
706706
else
707707
{:error, reason} ->
708-
Logger.error("Failed to fetch contributions for #{provider_login}: #{inspect(reason)}")
708+
Logger.error("Failed to fetch contributions for #{inspect(provider_logins)}: #{inspect(reason)}")
709709
{:error, reason}
710710
end
711711
end
712712

713-
def fetch_top_contributions_async(token, provider_login) do
714-
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_login),
715-
{:ok, user} <- ensure_user(token, provider_login),
716-
{:ok, _} <- add_contributions_async(token, user.id, contributions) do
717-
{:ok, user}
713+
def fetch_top_contributions_async(token, provider_logins) when is_list(provider_logins) do
714+
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_logins),
715+
{:ok, users} <- ensure_users(token, provider_logins),
716+
:ok <- add_contributions_async(token, users, contributions) do
717+
{:ok, users}
718718
else
719719
{:error, reason} ->
720-
Logger.error("Failed to fetch contributions for #{provider_login}: #{inspect(reason)}")
720+
Logger.error("Failed to fetch contributions for #{inspect(provider_logins)}: #{inspect(reason)}")
721721
{:error, reason}
722722
end
723723
end
724724

725-
def add_contributions_async(_token, user_id, opts) do
726-
Repo.transact(fn ->
727-
Enum.reduce_while(opts, :ok, fn contribution, _ ->
728-
case %{
729-
"user_id" => user_id,
730-
"repo_full_name" => contribution.repo_name,
731-
"contribution_count" => contribution.contribution_count
732-
}
733-
|> Jobs.SyncContribution.new()
734-
|> Oban.insert() do
735-
{:ok, _job} -> {:cont, :ok}
736-
error -> {:halt, error}
725+
defp add_contributions_async(_token, users, contributions) do
726+
users_map = Enum.group_by(users, & &1.provider_login)
727+
728+
results =
729+
Enum.map(contributions, fn contribution ->
730+
case users_map[contribution.provider_login] do
731+
[user] ->
732+
%{
733+
"user_id" => user.id,
734+
"repo_full_name" => contribution.repo_name,
735+
"contribution_count" => contribution.contribution_count
736+
}
737+
|> Jobs.SyncContribution.new()
738+
|> Oban.insert()
739+
740+
_ ->
741+
{:error, :user_not_found}
737742
end
738743
end)
739-
end)
744+
745+
if Enum.any?(results, &match?({:ok, _}, &1)), do: :ok, else: {:error, :failed}
740746
end
741747

742-
def add_contributions(token, user_id, opts) do
748+
defp add_contributions(token, users, contributions) do
749+
users_map = Enum.group_by(users, & &1.provider_login)
750+
743751
results =
744-
Enum.map(opts, fn %{repo_name: repo_name, contribution_count: contribution_count} ->
745-
add_contribution(%{
746-
token: token,
747-
user_id: user_id,
748-
repo_full_name: repo_name,
749-
contribution_count: contribution_count
750-
})
752+
Enum.map(contributions, fn contribution ->
753+
case users_map[contribution.provider_login] do
754+
[user] ->
755+
add_contribution(%{
756+
token: token,
757+
user_id: user.id,
758+
repo_full_name: contribution.repo_name,
759+
contribution_count: contribution.contribution_count
760+
})
761+
762+
_ ->
763+
{:error, :user_not_found}
764+
end
751765
end)
752766

753767
if Enum.any?(results, fn result -> result == :ok end), do: :ok, else: {:error, :failed}
754768
end
755769

756-
def add_contribution(%{
757-
token: token,
758-
user_id: user_id,
759-
repo_full_name: repo_full_name,
760-
contribution_count: contribution_count
761-
}) do
770+
defp add_contribution(%{
771+
token: token,
772+
user_id: user_id,
773+
repo_full_name: repo_full_name,
774+
contribution_count: contribution_count
775+
}) do
762776
with [repo_owner, repo_name] <- String.split(repo_full_name, "/"),
763777
{:ok, repo} <- ensure_repository(token, repo_owner, repo_name),
764778
{:ok, _tech_stack} <- ensure_repo_tech_stack(token, repo),
765779
{:ok, _contribution} <- upsert_user_contribution(user_id, repo.id, contribution_count) do
766780
:ok
767781
end
768782
end
783+
784+
def ensure_users(token, provider_logins) do
785+
Repo.transact(fn ->
786+
provider_logins
787+
|> Enum.map(&ensure_user(token, &1))
788+
|> Enum.reduce_while({:ok, []}, fn
789+
{:ok, user}, {:ok, users} -> {:cont, {:ok, [user | users]}}
790+
{:error, reason}, _ -> {:halt, {:error, reason}}
791+
end)
792+
end)
793+
end
769794
end

lib/algora_web/controllers/oauth_callback_controller.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ defmodule AlgoraWeb.OAuthCallbackController do
2626
{:ok, info} <- Github.OAuth.exchange_access_token(code: code, state: state),
2727
%{info: info, primary_email: primary, emails: emails, token: token} = info,
2828
{:ok, user} <- Accounts.register_github_user(conn.assigns[:current_user], primary, info, emails, token) do
29-
%{provider_login: info["login"]}
29+
%{provider_logins: [info["login"]]}
3030
|> Algora.Workspace.Jobs.FetchTopContributions.new()
3131
|> Oban.insert()
3232

lib/algora_web/live/org/job_live.ex

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -927,14 +927,19 @@ defmodule AlgoraWeb.Org.JobLive do
927927

928928
if Enum.any?(users_without_contributions) do
929929
Task.start(fn ->
930+
# Process in batches of 10 users
930931
users_without_contributions
932+
|> Enum.chunk_every(10)
931933
|> Task.async_stream(
932-
fn handle ->
933-
broadcast(socket.assigns.job, {:contributions_fetching, handle})
934+
fn handles ->
935+
Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_fetching, &1}))
934936

935-
case Algora.Workspace.fetch_top_contributions(Algora.Admin.token(), handle) do
936-
{:ok, contributions} -> broadcast(socket.assigns.job, {:contributions_fetched, handle, contributions})
937-
{:error, _reason} -> broadcast(socket.assigns.job, {:contributions_failed, handle})
937+
case Algora.Workspace.fetch_top_contributions(Algora.Admin.token(), handles) do
938+
{:ok, contributions} ->
939+
Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_fetched, &1, contributions}))
940+
941+
{:error, _reason} ->
942+
Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_failed, &1}))
938943
end
939944
end,
940945
timeout: length(users_without_contributions) * 60_000,

0 commit comments

Comments
 (0)