diff --git a/config/config.exs b/config/config.exs index 0995b6e60..837054558 100644 --- a/config/config.exs +++ b/config/config.exs @@ -34,6 +34,7 @@ config :algora, Oban, notify_bounty: 1, notify_tip_intent: 1, notify_claim: 1, + transfers: 1, activity_notifier: 1, activity_mailer: 1 ] diff --git a/lib/algora/admin/admin.ex b/lib/algora/admin/admin.ex index 84589ddea..7d10dd8a0 100644 --- a/lib/algora/admin/admin.ex +++ b/lib/algora/admin/admin.ex @@ -2,6 +2,8 @@ defmodule Algora.Admin do @moduledoc false import Ecto.Query + alias Algora.Accounts.User + alias Algora.Payments alias Algora.Repo alias Algora.Workspace alias Algora.Workspace.Ticket @@ -72,6 +74,15 @@ defmodule Algora.Admin do |> Algora.Repo.update() end + def setup_test_account(user_handle) do + with account_id when is_binary(account_id) <- Algora.config([:stripe, :test_account_id]), + {:ok, user} <- Repo.fetch_by(User, handle: user_handle), + {:ok, acct} <- Payments.create_account(user, "US"), + {:ok, stripe_acct} <- Stripe.Account.retrieve(account_id, []) do + Payments.update_account(acct, stripe_acct) + end + end + defp update_tickets(url, repo_id) do Repo.update_all(from(t in Ticket, where: fragment("?->>'repository_url' = ?", t.provider_meta, ^url)), set: [repository_id: repo_id] diff --git a/lib/algora/bounties/bounties.ex b/lib/algora/bounties/bounties.ex index a75200c4b..a8d69b112 100644 --- a/lib/algora/bounties/bounties.ex +++ b/lib/algora/bounties/bounties.ex @@ -556,7 +556,7 @@ defmodule Algora.Bounties do |> Enum.map(&LineItem.to_stripe/1) |> Payments.create_stripe_session(%{ description: description, - metadata: %{"version" => "2", "group_id" => tx_group_id} + metadata: %{"version" => Payments.metadata_version(), "group_id" => tx_group_id} }) do {:ok, session.url} end diff --git a/lib/algora/payments/jobs/execute_pending_transfers.ex b/lib/algora/payments/jobs/execute_pending_transfers.ex new file mode 100644 index 000000000..2117e4e4d --- /dev/null +++ b/lib/algora/payments/jobs/execute_pending_transfers.ex @@ -0,0 +1,13 @@ +defmodule Algora.Payments.Jobs.ExecutePendingTransfers do + @moduledoc false + use Oban.Worker, + queue: :transfers, + max_attempts: 1 + + alias Algora.Payments + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"user_id" => user_id}}) do + Payments.execute_pending_transfers(user_id) + end +end diff --git a/lib/algora/payments/payments.ex b/lib/algora/payments/payments.ex index 96d167c75..0050566f4 100644 --- a/lib/algora/payments/payments.ex +++ b/lib/algora/payments/payments.ex @@ -1,9 +1,11 @@ defmodule Algora.Payments do @moduledoc false + import Ecto.Changeset import Ecto.Query alias Algora.Accounts alias Algora.Accounts.User + alias Algora.MoneyUtils alias Algora.Payments.Account alias Algora.Payments.Customer alias Algora.Payments.PaymentMethod @@ -14,6 +16,8 @@ defmodule Algora.Payments do require Logger + def metadata_version, do: "2" + def broadcast do Phoenix.PubSub.broadcast(Algora.PubSub, "payments:all", :payments_updated) end @@ -248,4 +252,101 @@ defmodule Algora.Payments do Repo.delete(account) end end + + def execute_pending_transfers(user_id) do + pending_amount = get_pending_amount(user_id) + + with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: "stripe", payouts_enabled: true), + true <- Money.positive?(pending_amount) do + initialize_and_execute_transfer(user_id, pending_amount, account) + else + _ -> {:ok, nil} + end + end + + defp get_pending_amount(user_id) do + total_credits = + Repo.one( + from(t in Transaction, + where: t.user_id == ^user_id, + where: t.type == :credit, + where: t.status == :succeeded, + select: sum(t.net_amount) + ) + ) || Money.zero(:USD) + + total_transfers = + Repo.one( + from(t in Transaction, + where: t.user_id == ^user_id, + where: t.type == :transfer, + where: t.status == :succeeded or t.status == :processing or t.status == :initialized, + select: sum(t.net_amount) + ) + ) || Money.zero(:USD) + + Money.sub!(total_credits, total_transfers) + end + + defp initialize_and_execute_transfer(user_id, pending_amount, account) do + with {:ok, transaction} <- initialize_transfer(user_id, pending_amount), + {:ok, transfer} <- execute_transfer(transaction, account) do + broadcast() + {:ok, transfer} + else + error -> + Logger.error("Failed to execute transfer: #{inspect(error)}") + error + end + end + + defp initialize_transfer(user_id, pending_amount) do + %Transaction{} + |> change(%{ + id: Nanoid.generate(), + provider: "stripe", + type: :transfer, + status: :initialized, + user_id: user_id, + gross_amount: pending_amount, + net_amount: pending_amount, + total_fee: Money.zero(:USD) + }) + |> Algora.Validations.validate_positive(:gross_amount) + |> Algora.Validations.validate_positive(:net_amount) + |> foreign_key_constraint(:user_id) + |> Repo.insert() + end + + defp execute_transfer(transaction, account) do + # TODO: set other params + # TODO: provide idempotency key + case Algora.Stripe.create_transfer(%{ + amount: MoneyUtils.to_minor_units(transaction.net_amount), + currency: MoneyUtils.to_stripe_currency(transaction.net_amount), + destination: account.provider_id, + metadata: %{"version" => metadata_version()} + }) do + {:ok, transfer} -> + # it's fine if this fails since we'll receive a webhook + transaction + |> change(%{ + status: :succeeded, + succeeded_at: DateTime.utc_now(), + provider_id: transfer.id, + provider_meta: Util.normalize_struct(transfer) + }) + |> Repo.update() + + {:ok, transfer} + + {:error, error} -> + # TODO: inconsistent state if this fails + transaction + |> change(%{status: :failed}) + |> Repo.update() + + {:error, error} + end + end end diff --git a/lib/algora/shared/money_utils.ex b/lib/algora/shared/money_utils.ex index 0666de989..4dd5aa4fe 100644 --- a/lib/algora/shared/money_utils.ex +++ b/lib/algora/shared/money_utils.ex @@ -17,6 +17,11 @@ defmodule Algora.MoneyUtils do amount_int end + @spec to_stripe_currency(Money.t()) :: String.t() + def to_stripe_currency(money) do + money.currency |> to_string() |> String.downcase() + end + # TODO: Find a way to make this obsolete # Why does ecto return {currency, amount} instead of Money.t()? def ensure_money_field(struct, field) do diff --git a/lib/algora_web/controllers/webhooks/stripe_controller.ex b/lib/algora_web/controllers/webhooks/stripe_controller.ex index 125b3bd2d..cacffd8a6 100644 --- a/lib/algora_web/controllers/webhooks/stripe_controller.ex +++ b/lib/algora_web/controllers/webhooks/stripe_controller.ex @@ -1,46 +1,83 @@ defmodule AlgoraWeb.Webhooks.StripeController do @behaviour Stripe.WebhookHandler + import Ecto.Changeset import Ecto.Query alias Algora.Payments + alias Algora.Payments.Jobs.ExecutePendingTransfers alias Algora.Payments.Transaction alias Algora.Repo + alias Algora.Util require Logger + @metadata_version Payments.metadata_version() + @impl true def handle_event(%Stripe.Event{ type: "charge.succeeded", - data: %{object: %{metadata: %{"version" => "2", "group_id" => group_id}}} + data: %{object: %Stripe.Charge{metadata: %{"version" => @metadata_version, "group_id" => group_id}}} }) when is_binary(group_id) do - {:ok, count} = - Repo.transact(fn -> - {count, _} = - Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id), - set: [status: :succeeded, succeeded_at: DateTime.utc_now()] - ) - - # TODO: initiate pending transfers if any recipient has a payout account - # %{transfer_id: transfer_id, user_id: user_id} - # |> Algora.Workers.InitiateTransfer.new() - # |> Oban.insert() - - {:ok, count} - end) - - if count == 0 do - {:error, :no_transactions_found} - else - Payments.broadcast() - {:ok, nil} - end + Repo.transact(fn -> + update_result = + Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id), + set: [status: :succeeded, succeeded_at: DateTime.utc_now()] + ) + + # TODO: split into two groups: + # - has active payout account -> execute pending transfers + # - has no active payout account -> notify user to connect payout account + jobs_result = + from(t in Transaction, + where: t.group_id == ^group_id, + where: t.type == :credit, + where: t.status == :succeeded + ) + |> Repo.all() + |> Enum.map(fn %{user_id: user_id} -> user_id end) + |> Enum.uniq() + |> Enum.reduce_while(:ok, fn user_id, :ok -> + case %{user_id: user_id} + |> ExecutePendingTransfers.new() + |> Oban.insert() do + {:ok, _job} -> {:cont, :ok} + error -> {:halt, error} + end + end) + + with {count, _} when count > 0 <- update_result, + :ok <- jobs_result do + Payments.broadcast() + {:ok, nil} + else + {:error, reason} -> + Logger.error("Failed to update transactions: #{inspect(reason)}") + {:error, :failed_to_update_transactions} + + _error -> + Logger.error("Failed to update transactions") + {:error, :failed_to_update_transactions} + end + end) end @impl true - def handle_event(%Stripe.Event{type: "transfer.created"} = event) do - Logger.info("Stripe #{event.type} event: #{event.id}") + def handle_event(%Stripe.Event{ + type: "transfer.created", + data: %{object: %Stripe.Transfer{metadata: %{"version" => @metadata_version}} = transfer} + }) do + with {:ok, transaction} <- Repo.fetch_by(Transaction, provider: "stripe", provider_id: transfer.id), + {:ok, _transaction} <- maybe_update_transaction(transaction, transfer) do + # TODO: notify user + Payments.broadcast() + {:ok, nil} + else + error -> + Logger.error("Failed to update transaction: #{inspect(error)}") + {:error, :failed_to_update_transaction} + end end @impl true @@ -50,4 +87,18 @@ defmodule AlgoraWeb.Webhooks.StripeController do @impl true def handle_event(_event), do: :ok + + defp maybe_update_transaction(transaction, transfer) do + if transaction.status == :succeeded do + {:ok, transaction} + else + transaction + |> change(%{ + status: :succeeded, + succeeded_at: DateTime.utc_now(), + provider_meta: Util.normalize_struct(transfer) + }) + |> Repo.update() + end + end end diff --git a/test/algora/payments_test.exs b/test/algora/payments_test.exs new file mode 100644 index 000000000..081a3137f --- /dev/null +++ b/test/algora/payments_test.exs @@ -0,0 +1,127 @@ +defmodule Algora.PaymentsTest do + use Algora.DataCase + + import Mox + + alias Algora.Payments + alias Algora.Payments.Account + alias Algora.Payments.Transaction + alias Algora.Repo + + setup :verify_on_exit! + + describe "execute_pending_transfers/1" do + setup do + user = insert(:user) + account = insert(:account, user: user) + + {:ok, user: user, account: account} + end + + test "executes transfer when user has positive balance", %{user: user, account: account} do + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(1, :USD) + ) + + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(2, :USD) + ) + + stripe_transfer_id = "tr_#{Nanoid.generate()}" + + expect(Algora.StripeMock, :create_transfer, fn params -> + {:ok, + %{ + id: stripe_transfer_id, + amount: params.amount, + currency: params.currency, + destination: params.destination + }} + end) + + assert {:ok, transfer} = Payments.execute_pending_transfers(user.id) + assert transfer.id == stripe_transfer_id + assert transfer.amount == 100 + 200 + assert transfer.currency == "usd" + assert transfer.destination == account.provider_id + + transfer_tx = Repo.get_by(Transaction, provider_id: stripe_transfer_id) + assert transfer_tx.status == :succeeded + assert transfer_tx.type == :transfer + assert transfer_tx.provider == "stripe" + assert transfer_tx.provider_meta["id"] == stripe_transfer_id + assert Money.equal?(transfer_tx.net_amount, Money.new(1 + 2, :USD)) + assert Money.equal?(transfer_tx.gross_amount, Money.new(1 + 2, :USD)) + assert Money.equal?(transfer_tx.total_fee, Money.new(0, :USD)) + end + + test "does nothing when user has positive unconfirmed balance", %{user: user} do + insert(:transaction, + user: user, + type: :credit, + status: :processing, + net_amount: Money.new(1, :USD) + ) + + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + assert Transaction |> where([t], t.type == :transfer) |> Repo.aggregate(:count) == 0 + end + + test "does nothing when user has zero balance", %{user: user} do + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + assert Repo.aggregate(Transaction, :count) == 0 + end + + test "does nothing when user has payouts disabled", %{user: user, account: account} do + account |> change(payouts_enabled: false) |> Repo.update() + + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(1, :USD) + ) + + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + assert Transaction |> where([t], t.type == :transfer) |> Repo.aggregate(:count) == 0 + end + + test "does nothing when user has no stripe account", %{user: user} do + Repo.delete_all(Account) + + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(1, :USD) + ) + + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + assert Transaction |> where([t], t.type == :transfer) |> Repo.aggregate(:count) == 0 + end + + test "handles failed stripe transfers", %{user: user} do + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(1, :USD) + ) + + expect(Algora.StripeMock, :create_transfer, fn _params -> + {:error, %{message: "Insufficient funds"}} + end) + + assert {:error, _} = Payments.execute_pending_transfers(user.id) + + transfer_tx = Repo.one(from t in Transaction, where: t.type == :transfer) + assert transfer_tx.status == :failed + end + end +end diff --git a/test/support/factory.ex b/test/support/factory.ex index 4a6b71844..7386b9ecb 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -108,9 +108,11 @@ defmodule Algora.Factory do id: Nanoid.generate(), provider: "stripe", provider_id: sequence(:acct, &"acct_#{&1}"), + provider_meta: %{}, name: "Kevin 'The Carver'", details_submitted: true, charges_enabled: true, + payouts_enabled: true, service_agreement: "recipient", country: "US", type: :express,