Skip to content

Commit 3d954cf

Browse files
authored
[WIP] feat: transfers (#44)
1 parent d0f078e commit 3d954cf

File tree

9 files changed

+336
-25
lines changed

9 files changed

+336
-25
lines changed

config/config.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ config :algora, Oban,
3434
notify_bounty: 1,
3535
notify_tip_intent: 1,
3636
notify_claim: 1,
37+
transfers: 1,
3738
activity_notifier: 1,
3839
activity_mailer: 1
3940
]

lib/algora/admin/admin.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ defmodule Algora.Admin do
22
@moduledoc false
33
import Ecto.Query
44

5+
alias Algora.Accounts.User
6+
alias Algora.Payments
57
alias Algora.Repo
68
alias Algora.Workspace
79
alias Algora.Workspace.Ticket
@@ -72,6 +74,15 @@ defmodule Algora.Admin do
7274
|> Algora.Repo.update()
7375
end
7476

77+
def setup_test_account(user_handle) do
78+
with account_id when is_binary(account_id) <- Algora.config([:stripe, :test_account_id]),
79+
{:ok, user} <- Repo.fetch_by(User, handle: user_handle),
80+
{:ok, acct} <- Payments.create_account(user, "US"),
81+
{:ok, stripe_acct} <- Stripe.Account.retrieve(account_id, []) do
82+
Payments.update_account(acct, stripe_acct)
83+
end
84+
end
85+
7586
defp update_tickets(url, repo_id) do
7687
Repo.update_all(from(t in Ticket, where: fragment("?->>'repository_url' = ?", t.provider_meta, ^url)),
7788
set: [repository_id: repo_id]

lib/algora/bounties/bounties.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ defmodule Algora.Bounties do
556556
|> Enum.map(&LineItem.to_stripe/1)
557557
|> Payments.create_stripe_session(%{
558558
description: description,
559-
metadata: %{"version" => "2", "group_id" => tx_group_id}
559+
metadata: %{"version" => Payments.metadata_version(), "group_id" => tx_group_id}
560560
}) do
561561
{:ok, session.url}
562562
end
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
defmodule Algora.Payments.Jobs.ExecutePendingTransfers do
2+
@moduledoc false
3+
use Oban.Worker,
4+
queue: :transfers,
5+
max_attempts: 1
6+
7+
alias Algora.Payments
8+
9+
@impl Oban.Worker
10+
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
11+
Payments.execute_pending_transfers(user_id)
12+
end
13+
end

lib/algora/payments/payments.ex

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
defmodule Algora.Payments do
22
@moduledoc false
3+
import Ecto.Changeset
34
import Ecto.Query
45

56
alias Algora.Accounts
67
alias Algora.Accounts.User
8+
alias Algora.MoneyUtils
79
alias Algora.Payments.Account
810
alias Algora.Payments.Customer
911
alias Algora.Payments.PaymentMethod
@@ -14,6 +16,8 @@ defmodule Algora.Payments do
1416

1517
require Logger
1618

19+
def metadata_version, do: "2"
20+
1721
def broadcast do
1822
Phoenix.PubSub.broadcast(Algora.PubSub, "payments:all", :payments_updated)
1923
end
@@ -248,4 +252,101 @@ defmodule Algora.Payments do
248252
Repo.delete(account)
249253
end
250254
end
255+
256+
def execute_pending_transfers(user_id) do
257+
pending_amount = get_pending_amount(user_id)
258+
259+
with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: "stripe", payouts_enabled: true),
260+
true <- Money.positive?(pending_amount) do
261+
initialize_and_execute_transfer(user_id, pending_amount, account)
262+
else
263+
_ -> {:ok, nil}
264+
end
265+
end
266+
267+
defp get_pending_amount(user_id) do
268+
total_credits =
269+
Repo.one(
270+
from(t in Transaction,
271+
where: t.user_id == ^user_id,
272+
where: t.type == :credit,
273+
where: t.status == :succeeded,
274+
select: sum(t.net_amount)
275+
)
276+
) || Money.zero(:USD)
277+
278+
total_transfers =
279+
Repo.one(
280+
from(t in Transaction,
281+
where: t.user_id == ^user_id,
282+
where: t.type == :transfer,
283+
where: t.status == :succeeded or t.status == :processing or t.status == :initialized,
284+
select: sum(t.net_amount)
285+
)
286+
) || Money.zero(:USD)
287+
288+
Money.sub!(total_credits, total_transfers)
289+
end
290+
291+
defp initialize_and_execute_transfer(user_id, pending_amount, account) do
292+
with {:ok, transaction} <- initialize_transfer(user_id, pending_amount),
293+
{:ok, transfer} <- execute_transfer(transaction, account) do
294+
broadcast()
295+
{:ok, transfer}
296+
else
297+
error ->
298+
Logger.error("Failed to execute transfer: #{inspect(error)}")
299+
error
300+
end
301+
end
302+
303+
defp initialize_transfer(user_id, pending_amount) do
304+
%Transaction{}
305+
|> change(%{
306+
id: Nanoid.generate(),
307+
provider: "stripe",
308+
type: :transfer,
309+
status: :initialized,
310+
user_id: user_id,
311+
gross_amount: pending_amount,
312+
net_amount: pending_amount,
313+
total_fee: Money.zero(:USD)
314+
})
315+
|> Algora.Validations.validate_positive(:gross_amount)
316+
|> Algora.Validations.validate_positive(:net_amount)
317+
|> foreign_key_constraint(:user_id)
318+
|> Repo.insert()
319+
end
320+
321+
defp execute_transfer(transaction, account) do
322+
# TODO: set other params
323+
# TODO: provide idempotency key
324+
case Algora.Stripe.create_transfer(%{
325+
amount: MoneyUtils.to_minor_units(transaction.net_amount),
326+
currency: MoneyUtils.to_stripe_currency(transaction.net_amount),
327+
destination: account.provider_id,
328+
metadata: %{"version" => metadata_version()}
329+
}) do
330+
{:ok, transfer} ->
331+
# it's fine if this fails since we'll receive a webhook
332+
transaction
333+
|> change(%{
334+
status: :succeeded,
335+
succeeded_at: DateTime.utc_now(),
336+
provider_id: transfer.id,
337+
provider_meta: Util.normalize_struct(transfer)
338+
})
339+
|> Repo.update()
340+
341+
{:ok, transfer}
342+
343+
{:error, error} ->
344+
# TODO: inconsistent state if this fails
345+
transaction
346+
|> change(%{status: :failed})
347+
|> Repo.update()
348+
349+
{:error, error}
350+
end
351+
end
251352
end

lib/algora/shared/money_utils.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ defmodule Algora.MoneyUtils do
1717
amount_int
1818
end
1919

20+
@spec to_stripe_currency(Money.t()) :: String.t()
21+
def to_stripe_currency(money) do
22+
money.currency |> to_string() |> String.downcase()
23+
end
24+
2025
# TODO: Find a way to make this obsolete
2126
# Why does ecto return {currency, amount} instead of Money.t()?
2227
def ensure_money_field(struct, field) do
Lines changed: 75 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,83 @@
11
defmodule AlgoraWeb.Webhooks.StripeController do
22
@behaviour Stripe.WebhookHandler
33

4+
import Ecto.Changeset
45
import Ecto.Query
56

67
alias Algora.Payments
8+
alias Algora.Payments.Jobs.ExecutePendingTransfers
79
alias Algora.Payments.Transaction
810
alias Algora.Repo
11+
alias Algora.Util
912

1013
require Logger
1114

15+
@metadata_version Payments.metadata_version()
16+
1217
@impl true
1318
def handle_event(%Stripe.Event{
1419
type: "charge.succeeded",
15-
data: %{object: %{metadata: %{"version" => "2", "group_id" => group_id}}}
20+
data: %{object: %Stripe.Charge{metadata: %{"version" => @metadata_version, "group_id" => group_id}}}
1621
})
1722
when is_binary(group_id) do
18-
{:ok, count} =
19-
Repo.transact(fn ->
20-
{count, _} =
21-
Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id),
22-
set: [status: :succeeded, succeeded_at: DateTime.utc_now()]
23-
)
24-
25-
# TODO: initiate pending transfers if any recipient has a payout account
26-
# %{transfer_id: transfer_id, user_id: user_id}
27-
# |> Algora.Workers.InitiateTransfer.new()
28-
# |> Oban.insert()
29-
30-
{:ok, count}
31-
end)
32-
33-
if count == 0 do
34-
{:error, :no_transactions_found}
35-
else
36-
Payments.broadcast()
37-
{:ok, nil}
38-
end
23+
Repo.transact(fn ->
24+
update_result =
25+
Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id),
26+
set: [status: :succeeded, succeeded_at: DateTime.utc_now()]
27+
)
28+
29+
# TODO: split into two groups:
30+
# - has active payout account -> execute pending transfers
31+
# - has no active payout account -> notify user to connect payout account
32+
jobs_result =
33+
from(t in Transaction,
34+
where: t.group_id == ^group_id,
35+
where: t.type == :credit,
36+
where: t.status == :succeeded
37+
)
38+
|> Repo.all()
39+
|> Enum.map(fn %{user_id: user_id} -> user_id end)
40+
|> Enum.uniq()
41+
|> Enum.reduce_while(:ok, fn user_id, :ok ->
42+
case %{user_id: user_id}
43+
|> ExecutePendingTransfers.new()
44+
|> Oban.insert() do
45+
{:ok, _job} -> {:cont, :ok}
46+
error -> {:halt, error}
47+
end
48+
end)
49+
50+
with {count, _} when count > 0 <- update_result,
51+
:ok <- jobs_result do
52+
Payments.broadcast()
53+
{:ok, nil}
54+
else
55+
{:error, reason} ->
56+
Logger.error("Failed to update transactions: #{inspect(reason)}")
57+
{:error, :failed_to_update_transactions}
58+
59+
_error ->
60+
Logger.error("Failed to update transactions")
61+
{:error, :failed_to_update_transactions}
62+
end
63+
end)
3964
end
4065

4166
@impl true
42-
def handle_event(%Stripe.Event{type: "transfer.created"} = event) do
43-
Logger.info("Stripe #{event.type} event: #{event.id}")
67+
def handle_event(%Stripe.Event{
68+
type: "transfer.created",
69+
data: %{object: %Stripe.Transfer{metadata: %{"version" => @metadata_version}} = transfer}
70+
}) do
71+
with {:ok, transaction} <- Repo.fetch_by(Transaction, provider: "stripe", provider_id: transfer.id),
72+
{:ok, _transaction} <- maybe_update_transaction(transaction, transfer) do
73+
# TODO: notify user
74+
Payments.broadcast()
75+
{:ok, nil}
76+
else
77+
error ->
78+
Logger.error("Failed to update transaction: #{inspect(error)}")
79+
{:error, :failed_to_update_transaction}
80+
end
4481
end
4582

4683
@impl true
@@ -50,4 +87,18 @@ defmodule AlgoraWeb.Webhooks.StripeController do
5087

5188
@impl true
5289
def handle_event(_event), do: :ok
90+
91+
defp maybe_update_transaction(transaction, transfer) do
92+
if transaction.status == :succeeded do
93+
{:ok, transaction}
94+
else
95+
transaction
96+
|> change(%{
97+
status: :succeeded,
98+
succeeded_at: DateTime.utc_now(),
99+
provider_meta: Util.normalize_struct(transfer)
100+
})
101+
|> Repo.update()
102+
end
103+
end
53104
end

0 commit comments

Comments
 (0)