Skip to content

Commit e7e3886

Browse files
committed
execute separate transfer for each payable credit
1 parent 33bcf04 commit e7e3886

File tree

5 files changed

+247
-119
lines changed

5 files changed

+247
-119
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule Algora.Payments.Jobs.ExecutePendingTransfers do
1+
defmodule Algora.Payments.Jobs.ExecutePendingTransfer do
22
@moduledoc false
33
use Oban.Worker,
44
queue: :transfers,
@@ -7,7 +7,7 @@ defmodule Algora.Payments.Jobs.ExecutePendingTransfers do
77
alias Algora.Payments
88

99
@impl Oban.Worker
10-
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
11-
Payments.execute_pending_transfers(user_id)
10+
def perform(%Oban.Job{args: %{"credit_id" => credit_id}}) do
11+
Payments.execute_pending_transfer(credit_id)
1212
end
1313
end

lib/algora/payments/payments.ex

Lines changed: 100 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Algora.Payments do
88
alias Algora.MoneyUtils
99
alias Algora.Payments.Account
1010
alias Algora.Payments.Customer
11+
alias Algora.Payments.Jobs
1112
alias Algora.Payments.PaymentMethod
1213
alias Algora.Payments.Transaction
1314
alias Algora.Repo
@@ -319,86 +320,131 @@ defmodule Algora.Payments do
319320
end
320321
end
321322

322-
@spec execute_pending_transfers(user_id :: String.t()) :: {:ok, Stripe.Transfer.t()} | {:error, :not_found}
323-
def execute_pending_transfers(user_id) do
324-
pending_amount = get_pending_amount(user_id)
323+
@spec execute_pending_transfer(credit_id :: String.t()) ::
324+
{:ok, Stripe.Transfer.t()} | {:error, :not_found} | {:error, :duplicate_transfer_attempt}
325+
def execute_pending_transfer(credit_id) do
326+
with {:ok, credit} <- Repo.fetch_by(Transaction, id: credit_id, type: :credit, status: :succeeded) do
327+
transfers =
328+
Repo.all(
329+
from(t in Transaction,
330+
where: t.user_id == ^credit.user_id,
331+
where: t.group_id == ^credit.group_id,
332+
where: t.type == :transfer,
333+
where: t.status in [:initialized, :processing, :succeeded]
334+
)
335+
)
325336

326-
with {:ok, account} <- fetch_active_account(user_id),
327-
true <- Money.positive?(pending_amount) do
328-
initialize_and_execute_transfer(user_id, pending_amount, account)
329-
else
330-
_ -> {:ok, nil}
337+
amount_transferred = Enum.reduce(transfers, Money.zero(:USD), fn t, acc -> Money.add!(acc, t.net_amount) end)
338+
339+
if Money.positive?(amount_transferred) do
340+
Logger.error("Duplicate transfer attempt at transaction #{credit_id}")
341+
{:error, :duplicate_transfer_attempt}
342+
else
343+
initialize_and_execute_transfer(credit)
344+
end
331345
end
332346
end
333347

334-
@spec fetch_active_account(user_id :: String.t()) :: {:ok, Account.t()} | {:error, :not_found}
335-
def fetch_active_account(user_id) do
336-
Repo.fetch_by(Account, user_id: user_id, provider: "stripe", payouts_enabled: true)
348+
def list_payable_credits(user_id) do
349+
Repo.all(
350+
from(cr in Transaction,
351+
left_join: tr in Transaction,
352+
on:
353+
tr.user_id == cr.user_id and tr.group_id == cr.group_id and tr.type == :transfer and
354+
tr.status in [:initialized, :processing, :succeeded],
355+
where: cr.user_id == ^user_id,
356+
where: cr.type == :credit,
357+
where: cr.status == :succeeded,
358+
where: is_nil(tr.id)
359+
)
360+
)
337361
end
338362

339-
defp get_pending_amount(user_id) do
340-
total_credits =
341-
Repo.one(
342-
from(t in Transaction,
343-
where: t.user_id == ^user_id,
344-
where: t.type == :credit,
345-
where: t.status == :succeeded,
346-
select: sum(t.net_amount)
347-
)
348-
) || Money.zero(:USD)
349-
350-
total_transfers =
351-
Repo.one(
352-
from(t in Transaction,
353-
where: t.user_id == ^user_id,
354-
where: t.type == :transfer,
355-
where: t.status == :succeeded or t.status == :processing or t.status == :initialized,
356-
select: sum(t.net_amount)
357-
)
358-
) || Money.zero(:USD)
363+
@spec enqueue_pending_transfers(user_id :: String.t()) :: {:ok, nil} | {:error, term()}
364+
def enqueue_pending_transfers(user_id) do
365+
Repo.transact(fn ->
366+
with {:ok, _account} <- fetch_active_account(user_id),
367+
credits = list_payable_credits(user_id),
368+
:ok <-
369+
Enum.reduce_while(credits, :ok, fn credit, :ok ->
370+
case %{credit_id: credit.id}
371+
|> Jobs.ExecutePendingTransfer.new()
372+
|> Oban.insert() do
373+
{:ok, _job} -> {:cont, :ok}
374+
error -> {:halt, error}
375+
end
376+
end) do
377+
{:ok, nil}
378+
else
379+
{:error, reason} ->
380+
Logger.error("Failed to execute pending transfers: #{inspect(reason)}")
381+
{:error, reason}
382+
end
383+
end)
384+
end
359385

360-
Money.sub!(total_credits, total_transfers)
386+
@spec fetch_active_account(user_id :: String.t()) :: {:ok, Account.t()} | {:error, :no_active_account}
387+
def fetch_active_account(user_id) do
388+
case Repo.fetch_by(Account, user_id: user_id, provider: "stripe", payouts_enabled: true) do
389+
{:ok, account} -> {:ok, account}
390+
{:error, :not_found} -> {:error, :no_active_account}
391+
end
361392
end
362393

363-
defp initialize_and_execute_transfer(user_id, pending_amount, account) do
364-
with {:ok, transaction} <- initialize_transfer(user_id, pending_amount),
365-
{:ok, transfer} <- execute_transfer(transaction, account) do
366-
broadcast()
367-
{:ok, transfer}
368-
else
369-
error ->
370-
Logger.error("Failed to execute transfer: #{inspect(error)}")
371-
error
394+
@spec initialize_and_execute_transfer(credit :: Transaction.t()) :: {:ok, Stripe.Transfer.t()} | {:error, term()}
395+
defp initialize_and_execute_transfer(%Transaction{} = credit) do
396+
case fetch_active_account(credit.user_id) do
397+
{:ok, account} ->
398+
with {:ok, transaction} <- initialize_transfer(credit),
399+
{:ok, transfer} <- execute_transfer(transaction, account) do
400+
broadcast()
401+
{:ok, transfer}
402+
else
403+
error ->
404+
Logger.error("Failed to execute transfer: #{inspect(error)}")
405+
error
406+
end
407+
408+
_ ->
409+
Logger.error("Attempted to execute transfer to inactive account")
410+
{:error, :no_active_account}
372411
end
373412
end
374413

375-
defp initialize_transfer(user_id, pending_amount) do
414+
defp initialize_transfer(%Transaction{} = credit) do
376415
%Transaction{}
377416
|> change(%{
378417
id: Nanoid.generate(),
379418
provider: "stripe",
380419
type: :transfer,
381420
status: :initialized,
382-
user_id: user_id,
383-
gross_amount: pending_amount,
384-
net_amount: pending_amount,
385-
total_fee: Money.zero(:USD)
421+
user_id: credit.user_id,
422+
gross_amount: credit.net_amount,
423+
net_amount: credit.net_amount,
424+
total_fee: Money.zero(:USD),
425+
group_id: credit.group_id
386426
})
387427
|> Algora.Validations.validate_positive(:gross_amount)
388428
|> Algora.Validations.validate_positive(:net_amount)
389429
|> foreign_key_constraint(:user_id)
390430
|> Repo.insert()
391431
end
392432

393-
defp execute_transfer(transaction, account) do
394-
# TODO: set other params
433+
defp execute_transfer(%Transaction{} = transaction, account) do
434+
charge = Repo.get_by(Transaction, type: :credit, status: :succeeded, group_id: transaction.group_id)
435+
436+
transfer_params =
437+
%{
438+
amount: MoneyUtils.to_minor_units(transaction.net_amount),
439+
currency: MoneyUtils.to_stripe_currency(transaction.net_amount),
440+
destination: account.provider_id,
441+
metadata: %{"version" => metadata_version()}
442+
}
443+
|> Map.merge(if transaction.group_id, do: %{transfer_group: transaction.group_id}, else: %{})
444+
|> Map.merge(if charge && charge.provider_id, do: %{source_transaction: charge.provider_id}, else: %{})
445+
395446
# TODO: provide idempotency key
396-
case Algora.Stripe.Transfer.create(%{
397-
amount: MoneyUtils.to_minor_units(transaction.net_amount),
398-
currency: MoneyUtils.to_stripe_currency(transaction.net_amount),
399-
destination: account.provider_id,
400-
metadata: %{"version" => metadata_version()}
401-
}) do
447+
case Algora.Stripe.Transfer.create(transfer_params) do
402448
{:ok, transfer} ->
403449
# it's fine if this fails since we'll receive a webhook
404450
transaction

lib/algora_web/controllers/webhooks/stripe_controller.ex

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ defmodule AlgoraWeb.Webhooks.StripeController do
77
alias Algora.Bounties
88
alias Algora.Payments
99
alias Algora.Payments.Customer
10-
alias Algora.Payments.Jobs.ExecutePendingTransfers
1110
alias Algora.Payments.Transaction
1211
alias Algora.Repo
1312
alias Algora.Util
@@ -35,21 +34,20 @@ defmodule AlgoraWeb.Webhooks.StripeController do
3534
where: t.status == :succeeded
3635
)
3736
|> Repo.all()
38-
|> Enum.map(fn %{user_id: user_id} -> user_id end)
39-
|> Enum.uniq()
40-
|> Enum.reduce_while(:ok, fn user_id, :ok ->
41-
case Payments.fetch_active_account(user_id) do
37+
|> Enum.reduce_while(:ok, fn credit, :ok ->
38+
case Payments.fetch_active_account(credit.user_id) do
4239
{:ok, _account} ->
43-
case %{user_id: user_id}
44-
|> Payments.Jobs.ExecutePendingTransfers.new()
40+
case %{credit_id: credit.id}
41+
|> Payments.Jobs.ExecutePendingTransfer.new()
4542
|> Oban.insert() do
4643
{:ok, _job} -> {:cont, :ok}
4744
error -> {:halt, error}
4845
end
4946

50-
{:error, :not_found} ->
47+
{:error, :no_active_account} ->
5148
# TODO:
5249
installation_id = 0
50+
5351
# TODO:
5452
ticket_ref = %{"owner" => "", "repo" => "", "number" => 0}
5553

0 commit comments

Comments
 (0)