Skip to content

Commit f49b9a6

Browse files
committed
implement transfers
1 parent 2961da3 commit f49b9a6

File tree

4 files changed

+129
-35
lines changed

4 files changed

+129
-35
lines changed

config/config.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ config :algora, Oban,
3434
notify_bounty: 1,
3535
notify_tip_intent: 1,
3636
notify_claim: 1,
37+
transfers: 1,
3738
activity_notifier: 1,
3839
activity_mailer: 1
3940
]
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
defmodule Algora.Payments.Jobs.ExecutePendingTransfers do
2+
@moduledoc false
3+
use Oban.Worker, queue: :transfers
4+
5+
import Ecto.Changeset
6+
import Ecto.Query
7+
8+
alias Algora.MoneyUtils
9+
alias Algora.Payments.Account
10+
alias Algora.Payments.Transaction
11+
alias Algora.Repo
12+
alias Algora.Util
13+
14+
require Logger
15+
16+
@impl Oban.Worker
17+
def perform(%Oban.Job{args: %{user_id: user_id, group_id: group_id}}) do
18+
total_credits =
19+
Repo.one(
20+
from(t in Transaction,
21+
where: t.user_id == ^user_id,
22+
where: t.type == :credit,
23+
where: t.status == :succeeded,
24+
select: sum(t.net_amount)
25+
)
26+
) || Money.zero(:USD)
27+
28+
total_transfers =
29+
Repo.one(
30+
from(t in Transaction,
31+
where: t.user_id == ^user_id,
32+
where: t.type == :transfer,
33+
where: t.status == :succeeded or t.status == :processing or t.status == :initialized,
34+
select: sum(t.net_amount)
35+
)
36+
) || Money.zero(:USD)
37+
38+
pending_amount = Money.sub!(total_credits, total_transfers)
39+
40+
with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: :stripe, payouts_enabled: true),
41+
true <- Money.positive?(pending_amount) do
42+
{:ok, transaction} =
43+
%Transaction{}
44+
|> change(%{
45+
provider: "stripe",
46+
type: :credit,
47+
status: :initialized,
48+
user_id: user_id,
49+
gross_amount: pending_amount,
50+
net_amount: pending_amount,
51+
total_fee: Money.zero(:USD),
52+
group_id: group_id
53+
})
54+
|> Algora.Validations.validate_positive(:gross_amount)
55+
|> Algora.Validations.validate_positive(:net_amount)
56+
|> foreign_key_constraint(:user_id)
57+
|> Repo.insert()
58+
59+
Repo.transact(fn ->
60+
# TODO: set other params
61+
# TODO: provide idempotency key
62+
{:ok, transfer} =
63+
Stripe.Transfer.create(%{
64+
amount: MoneyUtils.to_minor_units(pending_amount),
65+
currency: to_string(pending_amount.currency),
66+
destination: account.stripe_account_id
67+
})
68+
69+
{:ok, transaction} =
70+
transaction
71+
|> change(%{
72+
status: if(transfer.status == :succeeded, do: :succeeded, else: :failed),
73+
provider_id: transfer.id,
74+
provider_meta: Util.normalize_struct(transfer)
75+
})
76+
|> Repo.update()
77+
78+
{:ok, transaction}
79+
end)
80+
else
81+
_ -> :ok
82+
end
83+
end
84+
end

lib/algora/payments/jobs/execute_transfer.ex

Lines changed: 0 additions & 11 deletions
This file was deleted.

lib/algora_web/controllers/webhooks/stripe_controller.ex

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,67 @@ defmodule AlgoraWeb.Webhooks.StripeController do
44
import Ecto.Query
55

66
alias Algora.Payments
7-
alias Algora.Payments.Jobs.ExecuteTransfer
7+
alias Algora.Payments.Jobs.ExecutePendingTransfers
88
alias Algora.Payments.Transaction
99
alias Algora.Repo
1010

1111
require Logger
1212

13+
@metadata_version "2"
14+
1315
@impl true
1416
def handle_event(%Stripe.Event{
1517
type: "charge.succeeded",
16-
data: %{object: %{metadata: %{"version" => "2", "group_id" => group_id}}}
18+
data: %{object: %{metadata: %{"version" => @metadata_version, "group_id" => group_id}}}
1719
})
1820
when is_binary(group_id) do
19-
{:ok, count} =
20-
Repo.transact(fn ->
21-
{count, _} =
22-
Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id),
23-
set: [status: :succeeded, succeeded_at: DateTime.utc_now()]
24-
)
25-
26-
# TODO: get pending transfers (recipient with active payout accounts)
27-
transfers = []
28-
29-
Enum.map(transfers, fn %{transfer_id: transfer_id, user_id: user_id} ->
30-
%{transfer_id: transfer_id, user_id: user_id}
31-
|> ExecuteTransfer.new()
32-
|> Oban.insert()
21+
Repo.transact(fn ->
22+
update_result =
23+
Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id),
24+
set: [status: :succeeded, succeeded_at: DateTime.utc_now()]
25+
)
26+
27+
# TODO: split into two groups:
28+
# - has active payout account -> execute pending transfers
29+
# - has no active payout account -> notify user to connect payout account
30+
jobs_result =
31+
from(t in Transaction,
32+
where: t.group_id == ^group_id,
33+
where: t.type == :credit,
34+
where: t.status == :succeeded
35+
)
36+
|> Repo.all()
37+
|> Enum.map(fn %{user_id: user_id} -> user_id end)
38+
|> Enum.uniq()
39+
|> Enum.reduce_while(:ok, fn user_id, :ok ->
40+
case %{user_id: user_id, group_id: group_id}
41+
|> ExecutePendingTransfers.new()
42+
|> Oban.insert() do
43+
{:ok, _job} -> {:cont, :ok}
44+
error -> {:halt, error}
45+
end
3346
end)
3447

35-
{:ok, count}
36-
end)
48+
with {count, _} when count > 0 <- update_result,
49+
:ok <- jobs_result do
50+
Payments.broadcast()
51+
else
52+
{:error, reason} ->
53+
Logger.error("Failed to update transactions: #{inspect(reason)}")
54+
{:error, :failed_to_update_transactions}
3755

38-
if count == 0 do
39-
{:error, :no_transactions_found}
40-
else
41-
Payments.broadcast()
42-
{:ok, nil}
43-
end
56+
_error ->
57+
Logger.error("Failed to update transactions")
58+
{:error, :failed_to_update_transactions}
59+
end
60+
end)
4461
end
4562

4663
@impl true
4764
def handle_event(%Stripe.Event{type: "transfer.created"} = event) do
65+
# TODO: update transaction
66+
# TODO: broadcast
67+
# TODO: notify user
4868
Logger.info("Stripe #{event.type} event: #{event.id}")
4969
end
5070

0 commit comments

Comments
 (0)