Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions lib/algora/admin/admin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,16 @@ defmodule Algora.Admin do
else
query
|> Repo.stream()
|> Enum.each(fn user ->
%{provider_login: user.provider_login}
|> Enum.chunk_every(10)
|> Enum.each(fn users ->
logins = Enum.map(users, fn user -> user.provider_login end)

%{provider_logins: logins}
|> FetchTopContributions.new()
|> Oban.insert()
|> case do
{:ok, _job} -> IO.puts("Enqueued job for #{user.provider_login}")
{:error, error} -> IO.puts("Failed to enqueue job for #{user.provider_login}: #{inspect(error)}")
{:ok, _job} -> IO.puts("Enqueued job for #{logins}")
{:error, error} -> IO.puts("Failed to enqueue job for #{logins}: #{inspect(error)}")
end
end)
end
Expand Down Expand Up @@ -1043,13 +1046,17 @@ defmodule Algora.Admin do
:ok

{:ok, stargazers} ->
Enum.each(stargazers, fn user ->
%{provider_login: user["login"], repo_id: repo_id}
stargazers
|> Enum.chunk_every(10)
|> Enum.each(fn users ->
logins = Enum.map(users, fn user -> user["login"] end)

%{provider_logins: logins, repo_id: repo_id}
|> ImportStargazer.new()
|> Oban.insert()
|> case do
{:ok, _job} -> Logger.info("Enqueued job for #{user["login"]}")
{:error, error} -> Logger.error("Failed to enqueue job for #{user["login"]}: #{inspect(error)}")
{:ok, _job} -> Logger.info("Enqueued job for #{logins}")
{:error, error} -> Logger.error("Failed to enqueue job for #{logins}: #{inspect(error)}")
end
end)

Expand Down
4 changes: 2 additions & 2 deletions lib/algora/cloud.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule Algora.Cloud do
@moduledoc false

def top_contributions(github_handle) do
call(AlgoraCloud, :top_contributions, [github_handle])
def top_contributions(github_handles) do
call(AlgoraCloud, :top_contributions, [github_handles])
end

def list_top_matches(opts \\ []) do
Expand Down
8 changes: 3 additions & 5 deletions lib/algora/workspace/jobs/fetch_top_contributions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ defmodule Algora.Workspace.Jobs.FetchTopContributions do
@moduledoc false
use Oban.Worker,
queue: :fetch_top_contributions,
max_attempts: 3,
# 30 days
unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
max_attempts: 3

alias Algora.Github

@impl Oban.Worker
def perform(%Oban.Job{args: %{"provider_login" => provider_login}}) do
Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_login)
def perform(%Oban.Job{args: %{"provider_logins" => provider_logins}}) do
Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_logins)
end

def timeout(_), do: :timer.seconds(30)
Expand Down
30 changes: 22 additions & 8 deletions lib/algora/workspace/jobs/import_stargazer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,34 @@ defmodule Algora.Workspace.Jobs.ImportStargazer do
@moduledoc false
use Oban.Worker,
queue: :fetch_top_contributions,
max_attempts: 3,
# 30 days
unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
max_attempts: 3

alias Algora.Github
alias Algora.Repo
alias Algora.Workspace.Stargazer

@impl Oban.Worker
def perform(%Oban.Job{args: %{"provider_login" => provider_login, "repo_id" => repo_id}}) do
with {:ok, user} <- Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_login) do
%Stargazer{}
|> Stargazer.changeset(%{user_id: user.id, repository_id: repo_id})
|> Repo.insert()
def perform(%Oban.Job{args: %{"provider_logins" => provider_logins, "repo_id" => repo_id}}) do
with {:ok, users} <- Algora.Workspace.fetch_top_contributions_async(Github.TokenPool.get_token(), provider_logins) do
{count, _} =
Repo.insert_all(
Stargazer,
Enum.map(users, fn user ->
%{
id: Nanoid.generate(),
inserted_at: DateTime.utc_now(),
updated_at: DateTime.utc_now(),
user_id: user.id,
repository_id: repo_id
}
end)
)

if count > 0 do
:ok
else
{:error, :insert_all_failed}
end
end
end

Expand Down
4 changes: 1 addition & 3 deletions lib/algora/workspace/jobs/sync_contribution.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ defmodule Algora.Workspace.Jobs.SyncContribution do
@moduledoc false
use Oban.Worker,
queue: :sync_contribution,
max_attempts: 3,
# 30 days
unique: [period: 30 * 24 * 60 * 60, fields: [:args]]
max_attempts: 3

alias Algora.Github
alias Algora.Workspace
Expand Down
101 changes: 63 additions & 38 deletions lib/algora/workspace/workspace.ex
Original file line number Diff line number Diff line change
Expand Up @@ -698,72 +698,97 @@ defmodule Algora.Workspace do
end
end

def fetch_top_contributions(token, provider_login) do
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_login),
{:ok, user} <- ensure_user(token, provider_login),
:ok <- add_contributions(token, user.id, contributions) do
def fetch_top_contributions(token, provider_logins) when is_list(provider_logins) do
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_logins),
{:ok, users} <- ensure_users(token, provider_logins),
:ok <- add_contributions(token, users, contributions) do
{:ok, contributions}
else
{:error, reason} ->
Logger.error("Failed to fetch contributions for #{provider_login}: #{inspect(reason)}")
Logger.error("Failed to fetch contributions for #{inspect(provider_logins)}: #{inspect(reason)}")
{:error, reason}
end
end

def fetch_top_contributions_async(token, provider_login) do
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_login),
{:ok, user} <- ensure_user(token, provider_login),
{:ok, _} <- add_contributions_async(token, user.id, contributions) do
{:ok, user}
def fetch_top_contributions_async(token, provider_logins) when is_list(provider_logins) do
with {:ok, contributions} <- Algora.Cloud.top_contributions(provider_logins),
{:ok, users} <- ensure_users(token, provider_logins),
:ok <- add_contributions_async(token, users, contributions) do
{:ok, users}
else
{:error, reason} ->
Logger.error("Failed to fetch contributions for #{provider_login}: #{inspect(reason)}")
Logger.error("Failed to fetch contributions for #{inspect(provider_logins)}: #{inspect(reason)}")
{:error, reason}
end
end

def add_contributions_async(_token, user_id, opts) do
Repo.transact(fn ->
Enum.reduce_while(opts, :ok, fn contribution, _ ->
case %{
"user_id" => user_id,
"repo_full_name" => contribution.repo_name,
"contribution_count" => contribution.contribution_count
}
|> Jobs.SyncContribution.new()
|> Oban.insert() do
{:ok, _job} -> {:cont, :ok}
error -> {:halt, error}
defp add_contributions_async(_token, users, contributions) do
users_map = Enum.group_by(users, & &1.provider_login)

results =
Enum.map(contributions, fn contribution ->
case users_map[contribution.provider_login] do
[user] ->
%{
"user_id" => user.id,
"repo_full_name" => contribution.repo_name,
"contribution_count" => contribution.contribution_count
}
|> Jobs.SyncContribution.new()
|> Oban.insert()

_ ->
{:error, :user_not_found}
end
end)
end)

if Enum.any?(results, &match?({:ok, _}, &1)), do: :ok, else: {:error, :failed}
end

def add_contributions(token, user_id, opts) do
defp add_contributions(token, users, contributions) do
users_map = Enum.group_by(users, & &1.provider_login)

results =
Enum.map(opts, fn %{repo_name: repo_name, contribution_count: contribution_count} ->
add_contribution(%{
token: token,
user_id: user_id,
repo_full_name: repo_name,
contribution_count: contribution_count
})
Enum.map(contributions, fn contribution ->
case users_map[contribution.provider_login] do
[user] ->
add_contribution(%{
token: token,
user_id: user.id,
repo_full_name: contribution.repo_name,
contribution_count: contribution.contribution_count
})

_ ->
{:error, :user_not_found}
end
end)

if Enum.any?(results, fn result -> result == :ok end), do: :ok, else: {:error, :failed}
end

def add_contribution(%{
token: token,
user_id: user_id,
repo_full_name: repo_full_name,
contribution_count: contribution_count
}) do
defp add_contribution(%{
token: token,
user_id: user_id,
repo_full_name: repo_full_name,
contribution_count: contribution_count
}) do
with [repo_owner, repo_name] <- String.split(repo_full_name, "/"),
{:ok, repo} <- ensure_repository(token, repo_owner, repo_name),
{:ok, _tech_stack} <- ensure_repo_tech_stack(token, repo),
{:ok, _contribution} <- upsert_user_contribution(user_id, repo.id, contribution_count) do
:ok
end
end

def ensure_users(token, provider_logins) do
Repo.transact(fn ->
provider_logins
|> Enum.map(&ensure_user(token, &1))
|> Enum.reduce_while({:ok, []}, fn
{:ok, user}, {:ok, users} -> {:cont, {:ok, [user | users]}}
{:error, reason}, _ -> {:halt, {:error, reason}}
end)
end)
end
end
2 changes: 1 addition & 1 deletion lib/algora_web/controllers/oauth_callback_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule AlgoraWeb.OAuthCallbackController do
{:ok, info} <- Github.OAuth.exchange_access_token(code: code, state: state),
%{info: info, primary_email: primary, emails: emails, token: token} = info,
{:ok, user} <- Accounts.register_github_user(conn.assigns[:current_user], primary, info, emails, token) do
%{provider_login: info["login"]}
%{provider_logins: [info["login"]]}
|> Algora.Workspace.Jobs.FetchTopContributions.new()
|> Oban.insert()

Expand Down
15 changes: 10 additions & 5 deletions lib/algora_web/live/org/job_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -927,14 +927,19 @@ defmodule AlgoraWeb.Org.JobLive do

if Enum.any?(users_without_contributions) do
Task.start(fn ->
# Process in batches of 10 users
users_without_contributions
|> Enum.chunk_every(10)
|> Task.async_stream(
fn handle ->
broadcast(socket.assigns.job, {:contributions_fetching, handle})
fn handles ->
Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_fetching, &1}))

case Algora.Workspace.fetch_top_contributions(Algora.Admin.token(), handle) do
{:ok, contributions} -> broadcast(socket.assigns.job, {:contributions_fetched, handle, contributions})
{:error, _reason} -> broadcast(socket.assigns.job, {:contributions_failed, handle})
case Algora.Workspace.fetch_top_contributions(Algora.Admin.token(), handles) do
{:ok, contributions} ->
Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_fetched, &1, contributions}))

{:error, _reason} ->
Enum.each(handles, &broadcast(socket.assigns.job, {:contributions_failed, &1}))
end
end,
timeout: length(users_without_contributions) * 60_000,
Expand Down