From 06bfe9370a9e882351962cbb3eceb516f3ea8616 Mon Sep 17 00:00:00 2001
From: zafer
Date: Mon, 19 May 2025 02:34:27 +0300
Subject: [PATCH 1/3] feat: process imports in batches

---
Review notes (this region is not part of the commit message):

- admin.ex: the Repo.stream/1 result is chunked with Stream.chunk_every/2 so
  rows are still consumed lazily instead of being materialized before the
  first job is enqueued. Batches are logged with inspect/1, because
  interpolating a list of binaries concatenates them without a separator.
- workspace.ex: ensure_users/2 maps lazily, so reduce_while's :halt really
  stops further ensure_user/2 calls after the first error.
  add_contribution/1 stays public: it was public before this patch and
  callers outside this diff may rely on it (presumably the SyncContribution
  job, which is enqueued with exactly these args -- TODO confirm).
- job_live.ex: each handle is broadcast only its own contributions (grouped
  by provider_login) instead of the whole batch's list. Assumes the handles
  use the same casing as contribution.provider_login -- TODO confirm.
- The blob ids on the `index` lines of the three files above still describe
  the original submission.
- Not changed, worth a follow-up: FetchTopContributions.timeout/1 is still
  30 seconds although one job now handles up to 10 logins.

 lib/algora/admin/admin.ex                     | 23 ++--
 lib/algora/cloud.ex                           |  4 +-
 .../workspace/jobs/fetch_top_contributions.ex |  8 +-
 lib/algora/workspace/jobs/import_stargazer.ex | 12 +--
 .../workspace/jobs/sync_contribution.ex       |  4 +-
 lib/algora/workspace/workspace.ex             | 89 +++++++++++------
 .../controllers/oauth_callback_controller.ex  |  2 +-
 lib/algora_web/live/org/job_live.ex           | 16 ++-
 8 files changed, 94 insertions(+), 64 deletions(-)

diff --git a/lib/algora/admin/admin.ex b/lib/algora/admin/admin.ex
index a90a00570..36aaeb1c1 100644
--- a/lib/algora/admin/admin.ex
+++ b/lib/algora/admin/admin.ex
@@ -123,13 +123,16 @@ defmodule Algora.Admin do
     else
       query
       |> Repo.stream()
-      |> Enum.each(fn user ->
-        %{provider_login: user.provider_login}
+      |> Stream.chunk_every(10)
+      |> Enum.each(fn users ->
+        logins = Enum.map(users, fn user -> user.provider_login end)
+
+        %{provider_logins: logins}
         |> FetchTopContributions.new()
         |> Oban.insert()
         |> case do
-          {:ok, _job} -> IO.puts("Enqueued job for #{user.provider_login}")
-          {:error, error} -> IO.puts("Failed to enqueue job for #{user.provider_login}: #{inspect(error)}")
+          {:ok, _job} -> IO.puts("Enqueued job for #{inspect(logins)}")
+          {:error, error} -> IO.puts("Failed to enqueue job for #{inspect(logins)}: #{inspect(error)}")
         end
       end)
     end
@@ -1043,13 +1046,17 @@ defmodule Algora.Admin do
         :ok
 
       {:ok, stargazers} ->
-        Enum.each(stargazers, fn user ->
-          %{provider_login: user["login"], repo_id: repo_id}
+        stargazers
+        |> Enum.chunk_every(10)
+        |> Enum.each(fn users ->
+          logins = Enum.map(users, fn user -> user["login"] end)
+
+          %{provider_logins: logins, repo_id: repo_id}
           |> ImportStargazer.new()
           |> Oban.insert()
           |> case do
-            {:ok, _job} -> Logger.info("Enqueued job for #{user["login"]}")
-            {:error, error} -> Logger.error("Failed to enqueue job for #{user["login"]}: #{inspect(error)}")
+            {:ok, _job} -> Logger.info("Enqueued job for #{inspect(logins)}")
+            {:error, error} -> Logger.error("Failed to enqueue job for #{inspect(logins)}: #{inspect(error)}")
           end
         end)
 
diff --git a/lib/algora/cloud.ex b/lib/algora/cloud.ex
index 442302214..1627a8a93 100644
--- a/lib/algora/cloud.ex
+++ b/lib/algora/cloud.ex
@@ -1,8 +1,8 @@
 defmodule Algora.Cloud do
   @moduledoc false
 
-  def top_contributions(github_handle) do
-    call(AlgoraCloud, :top_contributions, [github_handle])
+  def top_contributions(github_handles) do
+    call(AlgoraCloud, :top_contributions, [github_handles])
   end
 
   def list_top_matches(opts \\ []) do
diff --git a/lib/algora/workspace/jobs/fetch_top_contributions.ex b/lib/algora/workspace/jobs/fetch_top_contributions.ex
index 2cbec30e2..a7d20947d 100644
--- a/lib/algora/workspace/jobs/fetch_top_contributions.ex
+++ b/lib/algora/workspace/jobs/fetch_top_contributions.ex
@@ -2,15 +2,13 @@ defmodule Algora.Workspace.Jobs.FetchTopContributions do
   @moduledoc false
   use Oban.Worker,
     queue: :fetch_top_contributions,
-    max_attempts: 3,
-    # 30 days
-    unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
+    max_attempts: 3
 
   alias Algora.Github
 
   @impl Oban.Worker
-  def perform(%Oban.Job{args: %{"provider_login" => provider_login}}) do
-    Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_login)
+  def perform(%Oban.Job{args: %{"provider_logins" => provider_logins}}) do
+    Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_logins)
   end
 
   def timeout(_), do: :timer.seconds(30)
diff --git a/lib/algora/workspace/jobs/import_stargazer.ex b/lib/algora/workspace/jobs/import_stargazer.ex
index 947abe94e..9e56f15ca 100644
--- a/lib/algora/workspace/jobs/import_stargazer.ex
+++ b/lib/algora/workspace/jobs/import_stargazer.ex
@@ -2,20 +2,16 @@ defmodule Algora.Workspace.Jobs.ImportStargazer do
   @moduledoc false
   use Oban.Worker,
     queue: :fetch_top_contributions,
-    max_attempts: 3,
-    # 30 days
-    unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
+    max_attempts: 3
 
   alias Algora.Github
   alias Algora.Repo
   alias Algora.Workspace.Stargazer
 
   @impl Oban.Worker
-  def perform(%Oban.Job{args: %{"provider_login" => provider_login, "repo_id" => repo_id}}) do
-    with {:ok, user} <- Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_login) do
-      %Stargazer{}
-      |> Stargazer.changeset(%{user_id: user.id, repository_id: repo_id})
-      |> Repo.insert()
+  def perform(%Oban.Job{args: %{"provider_logins" => provider_logins, "repo_id" => repo_id}}) do
+    with {:ok, users} <- Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_logins) do
+      Repo.insert_all(Stargazer, Enum.map(users, fn user -> %{user_id: user.id, repository_id: repo_id} end))
     end
   end
 
diff --git a/lib/algora/workspace/jobs/sync_contribution.ex b/lib/algora/workspace/jobs/sync_contribution.ex
index 9974d5067..bc86c1877 100644
--- a/lib/algora/workspace/jobs/sync_contribution.ex
+++ b/lib/algora/workspace/jobs/sync_contribution.ex
@@ -2,9 +2,7 @@ defmodule Algora.Workspace.Jobs.SyncContribution do
   @moduledoc false
   use Oban.Worker,
     queue: :sync_contribution,
-    max_attempts: 3,
-    # 30 days
-    unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
+    max_attempts: 3
 
   alias Algora.Github
   alias Algora.Workspace
diff --git a/lib/algora/workspace/workspace.ex b/lib/algora/workspace/workspace.ex
index c8e90492c..a47233b1e 100644
--- a/lib/algora/workspace/workspace.ex
+++ b/lib/algora/workspace/workspace.ex
@@ -698,67 +698,81 @@ defmodule Algora.Workspace do
     end
   end
 
-  def fetch_top_contributions(token, provider_login) do
-    with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_login),
-         {:ok, user} <- ensure_user(token, provider_login),
-         :ok <- add_contributions(token, user.id, contributions) do
+  def fetch_top_contributions(token, provider_logins) when is_list(provider_logins) do
+    with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_logins),
+         {:ok, users} <- ensure_users(token, provider_logins),
+         :ok <- add_contributions(token, users, contributions) do
       {:ok, contributions}
     else
       {:error, reason} ->
-        Logger.error("Failed to fetch contributions for #{provider_login}: #{inspect(reason)}")
+        Logger.error("Failed to fetch contributions for #{inspect(provider_logins)}: #{inspect(reason)}")
         {:error, reason}
     end
   end
 
-  def fetch_top_contributions_async(token, provider_login) do
-    with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_login),
-         {:ok, user} <- ensure_user(token, provider_login),
-         {:ok, _} <- add_contributions_async(token, user.id, contributions) do
-      {:ok, user}
+  def fetch_top_contributions_async(token, provider_logins) when is_list(provider_logins) do
+    with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_logins),
+         {:ok, users} <- ensure_users(token, provider_logins),
+         :ok <- add_contributions_async(token, users, contributions) do
+      {:ok, users}
     else
       {:error, reason} ->
-        Logger.error("Failed to fetch contributions for #{provider_login}: #{inspect(reason)}")
+        Logger.error("Failed to fetch contributions for #{inspect(provider_logins)}: #{inspect(reason)}")
         {:error, reason}
     end
   end
 
-  def add_contributions_async(_token, user_id, opts) do
-    Repo.transact(fn ->
-      Enum.reduce_while(opts, :ok, fn contribution, _ ->
-        case %{
-               "user_id" => user_id,
-               "repo_full_name" => contribution.repo_name,
-               "contribution_count" => contribution.contribution_count
-             }
-             |> Jobs.SyncContribution.new()
-             |> Oban.insert() do
-          {:ok, _job} -> {:cont, :ok}
-          error -> {:halt, error}
+  defp add_contributions_async(_token, users, contributions) do
+    users_map = Enum.group_by(users, & &1.provider_login)
+
+    results =
+      Enum.map(contributions, fn contribution ->
+        case users_map[contribution.provider_login] do
+          [user] ->
+            %{
+              "user_id" => user.id,
+              "repo_full_name" => contribution.repo_name,
+              "contribution_count" => contribution.contribution_count
+            }
+            |> Jobs.SyncContribution.new()
+            |> Oban.insert()
+
+          _ ->
+            {:error, :user_not_found}
         end
       end)
-    end)
+
+    if Enum.any?(results, &match?({:ok, _}, &1)), do: :ok, else: {:error, :failed}
   end
 
-  def add_contributions(token, user_id, opts) do
+  defp add_contributions(token, users, contributions) do
+    users_map = Enum.group_by(users, & &1.provider_login)
+
     results =
-      Enum.map(opts, fn %{repo_name: repo_name, contribution_count: contribution_count} ->
-        add_contribution(%{
-          token: token,
-          user_id: user_id,
-          repo_full_name: repo_name,
-          contribution_count: contribution_count
-        })
+      Enum.map(contributions, fn contribution ->
+        case users_map[contribution.provider_login] do
+          [user] ->
+            add_contribution(%{
+              token: token,
+              user_id: user.id,
+              repo_full_name: contribution.repo_name,
+              contribution_count: contribution.contribution_count
+            })
+
+          _ ->
+            {:error, :user_not_found}
+        end
       end)
 
     if Enum.any?(results, fn result -> result == :ok end), do: :ok, else: {:error, :failed}
   end
 
   def add_contribution(%{
         token: token,
         user_id: user_id,
         repo_full_name: repo_full_name,
         contribution_count: contribution_count
       }) do
     with [repo_owner, repo_name] <- String.split(repo_full_name, "/"),
          {:ok, repo} <- ensure_repository(token, repo_owner, repo_name),
          {:ok, _tech_stack} <- ensure_repo_tech_stack(token, repo),
@@ -766,4 +780,15 @@ defmodule Algora.Workspace do
       :ok
     end
   end
+
+  def ensure_users(token, provider_logins) do
+    Repo.transact(fn ->
+      provider_logins
+      |> Stream.map(&ensure_user(token, &1))
+      |> Enum.reduce_while({:ok, []}, fn
+        {:ok, user}, {:ok, users} -> {:cont, {:ok, [user | users]}}
+        {:error, reason}, _ -> {:halt, {:error, reason}}
+      end)
+    end)
+  end
 end
diff --git a/lib/algora_web/controllers/oauth_callback_controller.ex b/lib/algora_web/controllers/oauth_callback_controller.ex
index d3e9ab442..d1c7bc964 100644
--- a/lib/algora_web/controllers/oauth_callback_controller.ex
+++ b/lib/algora_web/controllers/oauth_callback_controller.ex
@@ -26,7 +26,7 @@ defmodule AlgoraWeb.OAuthCallbackController do
          {:ok, info} <- Github.OAuth.exchange_access_token(code: code, state: state),
          %{info: info, primary_email: primary, emails: emails, token: token} = info,
          {:ok, user} <- Accounts.register_github_user(conn.assigns[:current_user], primary, info, emails, token) do
-      %{provider_login: info["login"]}
+      %{provider_logins: [info["login"]]}
       |> Algora.Workspace.Jobs.FetchTopContributions.new()
       |> Oban.insert()
 
diff --git a/lib/algora_web/live/org/job_live.ex b/lib/algora_web/live/org/job_live.ex
index 1bab653d3..db4b577fd 100644
--- a/lib/algora_web/live/org/job_live.ex
+++ b/lib/algora_web/live/org/job_live.ex
@@ -927,14 +927,20 @@ defmodule AlgoraWeb.Org.JobLive do
 
     if Enum.any?(users_without_contributions) do
       Task.start(fn ->
+        # Process in batches of 10 users
         users_without_contributions
+        |> Enum.chunk_every(10)
         |> Task.async_stream(
-          fn handle ->
-            broadcast(socket.assigns.job, {:contributions_fetching, handle})
+          fn handles ->
+            Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_fetching, &1}))
 
-            case Algora.Workspace.fetch_top_contributions(Algora.Admin.token(), handle) do
-              {:ok, contributions} -> broadcast(socket.assigns.job, {:contributions_fetched, handle, contributions})
-              {:error, _reason} -> broadcast(socket.assigns.job, {:contributions_failed, handle})
+            case Algora.Workspace.fetch_top_contributions(Algora.Admin.token(), handles) do
+              {:ok, contributions} ->
+                by_login = Enum.group_by(contributions, & &1.provider_login)
+                Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_fetched, &1, by_login[&1] || []}))
+
+              {:error, _reason} ->
+                Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_failed, &1}))
             end
           end,
           timeout: length(users_without_contributions) * 60_000,

From e95a8d44d63b54552be32615628382391bf20e3d Mon Sep 17 00:00:00 2001
From: zafer
Date: Mon, 19 May 2025 02:40:53 +0300
Subject: [PATCH 2/3] fix: batch insert

---
 lib/algora/workspace/jobs/import_stargazer.ex | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/lib/algora/workspace/jobs/import_stargazer.ex b/lib/algora/workspace/jobs/import_stargazer.ex
index 9e56f15ca..3d5eefb63 100644
--- a/lib/algora/workspace/jobs/import_stargazer.ex
+++ b/lib/algora/workspace/jobs/import_stargazer.ex
@@ -11,7 +11,18 @@ defmodule Algora.Workspace.Jobs.ImportStargazer do
   @impl Oban.Worker
   def perform(%Oban.Job{args: %{"provider_logins" => provider_logins, "repo_id" => repo_id}}) do
     with {:ok, users} <- Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_logins) do
-      Repo.insert_all(Stargazer, Enum.map(users, fn user -> %{user_id: user.id, repository_id: repo_id} end))
+      Repo.insert_all(
+        Stargazer,
+        Enum.map(users, fn user ->
+          %{
+            id: Nanoid.generate(),
+            inserted_at: DateTime.utc_now(),
+            updated_at: DateTime.utc_now(),
+            user_id: user.id,
+            repository_id: repo_id
+          }
+        end)
+      )
     end
   end
 

From f4e99c348f7a162a3b94f65fc5b88d7d0150430b Mon Sep 17 00:00:00 2001
From: zafer
Date: Mon, 19 May 2025 02:44:57 +0300
Subject: [PATCH 3/3] return proper ecto response

---
 lib/algora/workspace/jobs/import_stargazer.ex | 31 ++++++++++++-------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git a/lib/algora/workspace/jobs/import_stargazer.ex b/lib/algora/workspace/jobs/import_stargazer.ex
index 3d5eefb63..e95bd6e9c 100644
--- a/lib/algora/workspace/jobs/import_stargazer.ex
+++ b/lib/algora/workspace/jobs/import_stargazer.ex
@@ -11,18 +11,25 @@ defmodule Algora.Workspace.Jobs.ImportStargazer do
   @impl Oban.Worker
   def perform(%Oban.Job{args: %{"provider_logins" => provider_logins, "repo_id" => repo_id}}) do
     with {:ok, users} <- Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_logins) do
-      Repo.insert_all(
-        Stargazer,
-        Enum.map(users, fn user ->
-          %{
-            id: Nanoid.generate(),
-            inserted_at: DateTime.utc_now(),
-            updated_at: DateTime.utc_now(),
-            user_id: user.id,
-            repository_id: repo_id
-          }
-        end)
-      )
+      {count, _} =
+        Repo.insert_all(
+          Stargazer,
+          Enum.map(users, fn user ->
+            %{
+              id: Nanoid.generate(),
+              inserted_at: DateTime.utc_now(),
+              updated_at: DateTime.utc_now(),
+              user_id: user.id,
+              repository_id: repo_id
+            }
+          end)
+        )
+
+      if count > 0 do
+        :ok
+      else
+        {:error, :insert_all_failed}
+      end
     end
   end
 