From 2961da30eb1645b3349f9bd98bf8ebbdc99cc64b Mon Sep 17 00:00:00 2001 From: zafer Date: Sun, 26 Jan 2025 12:31:08 +0300 Subject: [PATCH 01/11] init transfers --- lib/algora/payments/jobs/execute_transfer.ex | 11 +++++++++++ .../controllers/webhooks/stripe_controller.ex | 13 +++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) create mode 100644 lib/algora/payments/jobs/execute_transfer.ex diff --git a/lib/algora/payments/jobs/execute_transfer.ex b/lib/algora/payments/jobs/execute_transfer.ex new file mode 100644 index 000000000..7ea924234 --- /dev/null +++ b/lib/algora/payments/jobs/execute_transfer.ex @@ -0,0 +1,11 @@ +defmodule Algora.Payments.Jobs.ExecuteTransfer do + @moduledoc false + use Oban.Worker, queue: :execute_transfer + + @impl Oban.Worker + def perform(%Oban.Job{args: %{transfer_id: transfer_id, user_id: user_id}}) do + # TODO: execute transfer + dbg("executing transfer #{transfer_id} for user #{user_id}") + {:error, :not_implemented} + end +end diff --git a/lib/algora_web/controllers/webhooks/stripe_controller.ex b/lib/algora_web/controllers/webhooks/stripe_controller.ex index 125b3bd2d..2b0cf1c03 100644 --- a/lib/algora_web/controllers/webhooks/stripe_controller.ex +++ b/lib/algora_web/controllers/webhooks/stripe_controller.ex @@ -4,6 +4,7 @@ defmodule AlgoraWeb.Webhooks.StripeController do import Ecto.Query alias Algora.Payments + alias Algora.Payments.Jobs.ExecuteTransfer alias Algora.Payments.Transaction alias Algora.Repo @@ -22,10 +23,14 @@ defmodule AlgoraWeb.Webhooks.StripeController do set: [status: :succeeded, succeeded_at: DateTime.utc_now()] ) - # TODO: initiate pending transfers if any recipient has a payout account - # %{transfer_id: transfer_id, user_id: user_id} - # |> Algora.Workers.InitiateTransfer.new() - # |> Oban.insert() + # TODO: get pending transfers (recipient with active payout accounts) + transfers = [] + + Enum.map(transfers, fn %{transfer_id: transfer_id, user_id: user_id} -> + %{transfer_id: transfer_id, user_id: user_id} + |> ExecuteTransfer.new() + |> Oban.insert() + end) {:ok, count} end) From f49b9a60b2e0585de3cf610a2958074228634640 Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 00:27:26 +0300 Subject: [PATCH 02/11] implement transfers --- config/config.exs | 1 + .../jobs/execute_pending_transfers.ex | 84 +++++++++++++++++++ lib/algora/payments/jobs/execute_transfer.ex | 11 --- .../controllers/webhooks/stripe_controller.ex | 68 +++++++++------ 4 files changed, 129 insertions(+), 35 deletions(-) create mode 100644 lib/algora/payments/jobs/execute_pending_transfers.ex delete mode 100644 lib/algora/payments/jobs/execute_transfer.ex diff --git a/config/config.exs b/config/config.exs index 0995b6e60..837054558 100644 --- a/config/config.exs +++ b/config/config.exs @@ -34,6 +34,7 @@ config :algora, Oban, notify_bounty: 1, notify_tip_intent: 1, notify_claim: 1, + transfers: 1, activity_notifier: 1, activity_mailer: 1 ] diff --git a/lib/algora/payments/jobs/execute_pending_transfers.ex b/lib/algora/payments/jobs/execute_pending_transfers.ex new file mode 100644 index 000000000..b3d65a9e5 --- /dev/null +++ b/lib/algora/payments/jobs/execute_pending_transfers.ex @@ -0,0 +1,84 @@ +defmodule Algora.Payments.Jobs.ExecutePendingTransfers do + @moduledoc false + use Oban.Worker, queue: :transfers + + import Ecto.Changeset + import Ecto.Query + + alias Algora.MoneyUtils + alias Algora.Payments.Account + alias Algora.Payments.Transaction + alias Algora.Repo + alias Algora.Util + + require Logger + + @impl Oban.Worker + def perform(%Oban.Job{args: %{user_id: user_id, group_id: group_id}}) do + total_credits = + Repo.one( + from(t in Transaction, + where: t.user_id == ^user_id, + where: t.type == :credit, + where: t.status == :succeeded, + select: sum(t.net_amount) + ) + ) || Money.zero(:USD) + + total_transfers = + Repo.one( + from(t in Transaction, + where: t.user_id == ^user_id, + where: t.type == :transfer, + where: t.status == :succeeded or t.status == :processing or t.status == :initialized, + select: sum(t.net_amount) + ) + ) || Money.zero(:USD) + + pending_amount = Money.sub!(total_credits, total_transfers) + + with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: :stripe, payouts_enabled: true), + true <- Money.positive?(pending_amount) do + {:ok, transaction} = + %Transaction{} + |> change(%{ + provider: "stripe", + type: :credit, + status: :initialized, + user_id: user_id, + gross_amount: pending_amount, + net_amount: pending_amount, + total_fee: Money.zero(:USD), + group_id: group_id + }) + |> Algora.Validations.validate_positive(:gross_amount) + |> Algora.Validations.validate_positive(:net_amount) + |> foreign_key_constraint(:user_id) + |> Repo.insert() + + Repo.transact(fn -> + # TODO: set other params + # TODO: provide idempotency key + {:ok, transfer} = + Stripe.Transfer.create(%{ + amount: MoneyUtils.to_minor_units(pending_amount), + currency: to_string(pending_amount.currency), + destination: account.stripe_account_id + }) + + {:ok, transaction} = + transaction + |> change(%{ + status: if(transfer.status == :succeeded, do: :succeeded, else: :failed), + provider_id: transfer.id, + provider_meta: Util.normalize_struct(transfer) + }) + |> Repo.update() + + {:ok, transaction} + end) + else + _ -> :ok + end + end +end diff --git a/lib/algora/payments/jobs/execute_transfer.ex b/lib/algora/payments/jobs/execute_transfer.ex deleted file mode 100644 index 7ea924234..000000000 --- a/lib/algora/payments/jobs/execute_transfer.ex +++ /dev/null @@ -1,11 +0,0 @@ -defmodule Algora.Payments.Jobs.ExecuteTransfer do - @moduledoc false - use Oban.Worker, queue: :execute_transfer - - @impl Oban.Worker - def perform(%Oban.Job{args: %{transfer_id: transfer_id, user_id: user_id}}) do - # TODO: execute transfer - dbg("executing transfer #{transfer_id} for user #{user_id}") - {:error, :not_implemented} - end -end diff --git a/lib/algora_web/controllers/webhooks/stripe_controller.ex b/lib/algora_web/controllers/webhooks/stripe_controller.ex index 2b0cf1c03..0b816f178 100644 --- a/lib/algora_web/controllers/webhooks/stripe_controller.ex +++ b/lib/algora_web/controllers/webhooks/stripe_controller.ex @@ -4,47 +4,67 @@ defmodule AlgoraWeb.Webhooks.StripeController do import Ecto.Query alias Algora.Payments - alias Algora.Payments.Jobs.ExecuteTransfer + alias Algora.Payments.Jobs.ExecutePendingTransfers alias Algora.Payments.Transaction alias Algora.Repo require Logger + @metadata_version "2" + @impl true def handle_event(%Stripe.Event{ type: "charge.succeeded", - data: %{object: %{metadata: %{"version" => "2", "group_id" => group_id}}} + data: %{object: %{metadata: %{"version" => @metadata_version, "group_id" => group_id}}} }) when is_binary(group_id) do - {:ok, count} = - Repo.transact(fn -> - {count, _} = - Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id), - set: [status: :succeeded, succeeded_at: DateTime.utc_now()] - ) - - # TODO: get pending transfers (recipient with active payout accounts) - transfers = [] - - Enum.map(transfers, fn %{transfer_id: transfer_id, user_id: user_id} -> - %{transfer_id: transfer_id, user_id: user_id} - |> ExecuteTransfer.new() - |> Oban.insert() + Repo.transact(fn -> + update_result = + Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id), + set: [status: :succeeded, succeeded_at: DateTime.utc_now()] + ) + + # TODO: split into two groups: + # - has active payout account -> execute pending transfers + # - has no active payout account -> notify user to connect payout account + jobs_result = + from(t in Transaction, + where: t.group_id == ^group_id, + where: t.type == :credit, + where: t.status == :succeeded + ) + |> Repo.all() + |> Enum.map(fn %{user_id: user_id} -> user_id end) + |> Enum.uniq() + |> Enum.reduce_while(:ok, fn user_id, :ok -> + case %{user_id: user_id, group_id: group_id} + |> ExecutePendingTransfers.new() + |> Oban.insert() do + {:ok, _job} -> {:cont, :ok} + error -> {:halt, error} + end end) - {:ok, count} - end) + with {count, _} when count > 0 <- update_result, + :ok <- jobs_result do + Payments.broadcast() + else + {:error, reason} -> + Logger.error("Failed to update transactions: #{inspect(reason)}") + {:error, :failed_to_update_transactions} - if count == 0 do - {:error, :no_transactions_found} - else - Payments.broadcast() - {:ok, nil} - end + _error -> + Logger.error("Failed to update transactions") + {:error, :failed_to_update_transactions} + end + end) end @impl true def handle_event(%Stripe.Event{type: "transfer.created"} = event) do + # TODO: update transaction + # TODO: broadcast + # TODO: notify user Logger.info("Stripe #{event.type} event: #{event.id}") end From 0d7ca835a2b350fab89755ab3a1b00b874b17c96 Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 00:38:08 +0300 Subject: [PATCH 03/11] clean up module --- .../jobs/execute_pending_transfers.ex | 108 +++++++++++------- 1 file changed, 67 insertions(+), 41 deletions(-) diff --git a/lib/algora/payments/jobs/execute_pending_transfers.ex b/lib/algora/payments/jobs/execute_pending_transfers.ex index b3d65a9e5..90a0cfd0e 100644 --- a/lib/algora/payments/jobs/execute_pending_transfers.ex +++ b/lib/algora/payments/jobs/execute_pending_transfers.ex @@ -1,6 +1,8 @@ defmodule Algora.Payments.Jobs.ExecutePendingTransfers do @moduledoc false - use Oban.Worker, queue: :transfers + use Oban.Worker, + queue: :transfers, + max_attempts: 1 import Ecto.Changeset import Ecto.Query @@ -15,6 +17,17 @@ defmodule Algora.Payments.Jobs.ExecutePendingTransfers do @impl Oban.Worker def perform(%Oban.Job{args: %{user_id: user_id, group_id: group_id}}) do + pending_amount = get_pending_amount(user_id) + + with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: :stripe, payouts_enabled: true), + true <- Money.positive?(pending_amount) do + initialize_and_execute_transfer(user_id, group_id, pending_amount, account) + else + _ -> :ok + end + end + + defp get_pending_amount(user_id) do total_credits = Repo.one( from(t in Transaction, @@ -35,50 +48,63 @@ defmodule Algora.Payments.Jobs.ExecutePendingTransfers do ) ) || Money.zero(:USD) - pending_amount = Money.sub!(total_credits, total_transfers) + Money.sub!(total_credits, total_transfers) + end - with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: :stripe, payouts_enabled: true), - true <- Money.positive?(pending_amount) do - {:ok, transaction} = - %Transaction{} - |> change(%{ - provider: "stripe", - type: :credit, - status: :initialized, - user_id: user_id, - gross_amount: pending_amount, - net_amount: pending_amount, - total_fee: Money.zero(:USD), - group_id: group_id - }) - |> Algora.Validations.validate_positive(:gross_amount) - |> Algora.Validations.validate_positive(:net_amount) - |> foreign_key_constraint(:user_id) - |> Repo.insert() + defp initialize_and_execute_transfer(user_id, group_id, pending_amount, account) do + with {:ok, transaction} <- initialize_transfer(user_id, group_id, pending_amount), + {:ok, transfer} <- execute_transfer(transaction, account) do + {:ok, transfer} + else + error -> + Logger.error("Failed to execute transfer: #{inspect(error)}") + error + end + end - Repo.transact(fn -> - # TODO: set other params - # TODO: provide idempotency key - {:ok, transfer} = - Stripe.Transfer.create(%{ - amount: MoneyUtils.to_minor_units(pending_amount), - currency: to_string(pending_amount.currency), - destination: account.stripe_account_id - }) + defp initialize_transfer(user_id, group_id, pending_amount) do + %Transaction{} + |> change(%{ + provider: "stripe", + type: :credit, + status: :initialized, + user_id: user_id, + gross_amount: pending_amount, + net_amount: pending_amount, + total_fee: Money.zero(:USD), + group_id: group_id + }) + |> Algora.Validations.validate_positive(:gross_amount) + |> Algora.Validations.validate_positive(:net_amount) + |> foreign_key_constraint(:user_id) + |> Repo.insert() + end - {:ok, transaction} = - transaction - |> change(%{ - status: if(transfer.status == :succeeded, do: :succeeded, else: :failed), - provider_id: transfer.id, - provider_meta: Util.normalize_struct(transfer) - }) - |> Repo.update() + defp execute_transfer(transaction, account) do + # TODO: set other params + # TODO: provide idempotency key + case Stripe.Transfer.create(%{ + amount: MoneyUtils.to_minor_units(transaction.net_amount), + currency: to_string(transaction.net_amount.currency), + destination: account.stripe_account_id + }) do + {:ok, transfer} -> + # it's fine if this fails since we'll receive a webhook + _result = try_update_transaction(transaction, transfer) + {:ok, transfer} - {:ok, transaction} - end) - else - _ -> :ok + {:error, error} -> + {:error, error} end end + + defp try_update_transaction(transaction, transfer) do + transaction + |> change(%{ + status: if(transfer.status == :succeeded, do: :succeeded, else: :failed), + provider_id: transfer.id, + provider_meta: Util.normalize_struct(transfer) + }) + |> Repo.update() + end end From 1d59dc962e834baed0a3ecb00cd5302f436d24d1 Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 20:32:54 +0300 Subject: [PATCH 04/11] move function to Payments module --- .../jobs/execute_pending_transfers.ex | 103 +----------------- lib/algora/payments/payments.ex | 94 ++++++++++++++++ .../controllers/webhooks/stripe_controller.ex | 2 +- 3 files changed, 98 insertions(+), 101 deletions(-) diff --git a/lib/algora/payments/jobs/execute_pending_transfers.ex b/lib/algora/payments/jobs/execute_pending_transfers.ex index 90a0cfd0e..f38fee414 100644 --- a/lib/algora/payments/jobs/execute_pending_transfers.ex +++ b/lib/algora/payments/jobs/execute_pending_transfers.ex @@ -4,107 +4,10 @@ defmodule Algora.Payments.Jobs.ExecutePendingTransfers do queue: :transfers, max_attempts: 1 - import Ecto.Changeset - import Ecto.Query - - alias Algora.MoneyUtils - alias Algora.Payments.Account - alias Algora.Payments.Transaction - alias Algora.Repo - alias Algora.Util - - require Logger + alias Algora.Payments @impl Oban.Worker - def perform(%Oban.Job{args: %{user_id: user_id, group_id: group_id}}) do - pending_amount = get_pending_amount(user_id) - - with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: :stripe, payouts_enabled: true), - true <- Money.positive?(pending_amount) do - initialize_and_execute_transfer(user_id, group_id, pending_amount, account) - else - _ -> :ok - end - end - - defp get_pending_amount(user_id) do - total_credits = - Repo.one( - from(t in Transaction, - where: t.user_id == ^user_id, - where: t.type == :credit, - where: t.status == :succeeded, - select: sum(t.net_amount) - ) - ) || Money.zero(:USD) - - total_transfers = - Repo.one( - from(t in Transaction, - where: t.user_id == ^user_id, - where: t.type == :transfer, - where: t.status == :succeeded or t.status == :processing or t.status == :initialized, - select: sum(t.net_amount) - ) - ) || Money.zero(:USD) - - Money.sub!(total_credits, total_transfers) - end - - defp initialize_and_execute_transfer(user_id, group_id, pending_amount, account) do - with {:ok, transaction} <- initialize_transfer(user_id, group_id, pending_amount), - {:ok, transfer} <- execute_transfer(transaction, account) do - {:ok, transfer} - else - error -> - Logger.error("Failed to execute transfer: #{inspect(error)}") - error - end - end - - defp initialize_transfer(user_id, group_id, pending_amount) do - %Transaction{} - |> change(%{ - provider: "stripe", - type: :credit, - status: :initialized, - user_id: user_id, - gross_amount: pending_amount, - net_amount: pending_amount, - total_fee: Money.zero(:USD), - group_id: group_id - }) - |> Algora.Validations.validate_positive(:gross_amount) - |> Algora.Validations.validate_positive(:net_amount) - |> foreign_key_constraint(:user_id) - |> Repo.insert() - end - - defp execute_transfer(transaction, account) do - # TODO: set other params - # TODO: provide idempotency key - case Stripe.Transfer.create(%{ - amount: MoneyUtils.to_minor_units(transaction.net_amount), - currency: to_string(transaction.net_amount.currency), - destination: account.stripe_account_id - }) do - {:ok, transfer} -> - # it's fine if this fails since we'll receive a webhook - _result = try_update_transaction(transaction, transfer) - {:ok, transfer} - - {:error, error} -> - {:error, error} - end - end - - defp try_update_transaction(transaction, transfer) do - transaction - |> change(%{ - status: if(transfer.status == :succeeded, do: :succeeded, else: :failed), - provider_id: transfer.id, - provider_meta: Util.normalize_struct(transfer) - }) - |> Repo.update() + def perform(%Oban.Job{args: %{user_id: user_id}}) do + Payments.execute_pending_transfers(user_id) end end diff --git a/lib/algora/payments/payments.ex b/lib/algora/payments/payments.ex index 96d167c75..4206381e4 100644 --- a/lib/algora/payments/payments.ex +++ b/lib/algora/payments/payments.ex @@ -1,9 +1,11 @@ defmodule Algora.Payments do @moduledoc false + import Ecto.Changeset import Ecto.Query alias Algora.Accounts alias Algora.Accounts.User + alias Algora.MoneyUtils alias Algora.Payments.Account alias Algora.Payments.Customer alias Algora.Payments.PaymentMethod @@ -248,4 +250,96 @@ defmodule Algora.Payments do Repo.delete(account) end end + + def execute_pending_transfers(user_id) do + pending_amount = get_pending_amount(user_id) + + with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: "stripe", payouts_enabled: true), + true <- Money.positive?(pending_amount) do + initialize_and_execute_transfer(user_id, pending_amount, account) + else + _ -> {:ok, nil} + end + end + + defp get_pending_amount(user_id) do + total_credits = + Repo.one( + from(t in Transaction, + where: t.user_id == ^user_id, + where: t.type == :credit, + where: t.status == :succeeded, + select: sum(t.net_amount) + ) + ) || Money.zero(:USD) + + total_transfers = + Repo.one( + from(t in Transaction, + where: t.user_id == ^user_id, + where: t.type == :transfer, + where: t.status == :succeeded or t.status == :processing or t.status == :initialized, + select: sum(t.net_amount) + ) + ) || Money.zero(:USD) + + Money.sub!(total_credits, total_transfers) + end + + defp initialize_and_execute_transfer(user_id, pending_amount, account) do + with {:ok, transaction} <- initialize_transfer(user_id, pending_amount), + {:ok, transfer} <- execute_transfer(transaction, account) do + {:ok, transfer} + else + error -> + Logger.error("Failed to execute transfer: #{inspect(error)}") + error + end + end + + defp initialize_transfer(user_id, pending_amount) do + %Transaction{} + |> change(%{ + id: Nanoid.generate(), + provider: "stripe", + type: :transfer, + status: :initialized, + user_id: user_id, + gross_amount: pending_amount, + net_amount: pending_amount, + total_fee: Money.zero(:USD) + }) + |> Algora.Validations.validate_positive(:gross_amount) + |> Algora.Validations.validate_positive(:net_amount) + |> foreign_key_constraint(:user_id) + |> Repo.insert() + end + + defp execute_transfer(transaction, account) do + # TODO: set other params + # TODO: provide idempotency key + case Algora.Stripe.create_transfer(%{ + amount: MoneyUtils.to_minor_units(transaction.net_amount), + currency: to_string(transaction.net_amount.currency), + destination: account.provider_id + }) do + {:ok, transfer} -> + # it's fine if this fails since we'll receive a webhook + _result = try_update_transaction(transaction, transfer) + {:ok, transfer} + + {:error, error} -> + {:error, error} + end + end + + defp try_update_transaction(transaction, transfer) do + transaction + |> change(%{ + status: if(transfer.status == :succeeded, do: :succeeded, else: :failed), + provider_id: transfer.id, + provider_meta: Util.normalize_struct(transfer) + }) + |> Repo.update() + end end diff --git a/lib/algora_web/controllers/webhooks/stripe_controller.ex b/lib/algora_web/controllers/webhooks/stripe_controller.ex index 0b816f178..1e80343b2 100644 --- a/lib/algora_web/controllers/webhooks/stripe_controller.ex +++ b/lib/algora_web/controllers/webhooks/stripe_controller.ex @@ -37,7 +37,7 @@ defmodule AlgoraWeb.Webhooks.StripeController do |> Enum.map(fn %{user_id: user_id} -> user_id end) |> Enum.uniq() |> Enum.reduce_while(:ok, fn user_id, :ok -> - case %{user_id: user_id, group_id: group_id} + case %{user_id: user_id} |> ExecutePendingTransfers.new() |> Oban.insert() do {:ok, _job} -> {:cont, :ok} From ea3b1d34496545b0e9ef89786cd73e1ef8b41a4e Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 20:33:17 +0300 Subject: [PATCH 05/11] add tests --- test/algora/payments_test.exs | 100 ++++++++++++++++++++++++++++++++++ test/support/factory.ex | 2 + 2 files changed, 102 insertions(+) create mode 100644 test/algora/payments_test.exs diff --git a/test/algora/payments_test.exs b/test/algora/payments_test.exs new file mode 100644 index 000000000..f5fbff1ee --- /dev/null +++ b/test/algora/payments_test.exs @@ -0,0 +1,100 @@ +defmodule Algora.PaymentsTest do + use Algora.DataCase + + import Mox + + alias Algora.Payments + alias Algora.Payments.Account + alias Algora.Payments.Transaction + alias Algora.Repo + + setup :verify_on_exit! + + describe "perform/1" do + setup do + user = insert(:user) + account = insert(:account, user: user) + + {:ok, user: user, account: account} + end + + test "executes transfer when there are pending credits", %{user: user} do + # Create a successful credit transaction + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(1, :USD) + ) + + # Mock Stripe transfer creation + expect(Algora.StripeMock, :create_transfer, fn params -> + assert params.amount == 100 + assert params.currency == "USD" + + {:ok, + %{ + id: "tr_123", + status: :succeeded, + amount: 100, + currency: "USD" + }} + end) + + assert {:ok, _transfer} = Payments.execute_pending_transfers(user.id) + + # Verify transfer transaction was created + transfer_tx = Repo.get_by(Transaction, provider_id: "tr_123") + assert transfer_tx.status == :succeeded + assert transfer_tx.type == :transfer + assert Money.equal?(transfer_tx.net_amount, Money.new(1, :USD)) + end + + test "does nothing when user has no pending credits", %{user: user} do + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + + assert Repo.aggregate(Transaction, :count) == 0 + end + + test "does nothing when user has no stripe account", %{user: user} do + # Delete the account created in setup + Repo.delete_all(Account) + + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(1000, :USD) + ) + + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + + # Verify no transfer was created + transfer_count = + Transaction + |> where([t], t.type == :transfer) + |> Repo.aggregate(:count) + + assert transfer_count == 0 + end + + test "handles failed stripe transfers", %{user: user} do + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(1000, :USD) + ) + + expect(Algora.StripeMock, :create_transfer, fn _params -> + {:error, %{message: "Insufficient funds"}} + end) + + assert {:error, _} = Payments.execute_pending_transfers(user.id) + + # Verify transfer transaction status + transfer_tx = Repo.one(from t in Transaction, where: t.type == :transfer) + assert transfer_tx.status == :initialized + end + end +end diff --git a/test/support/factory.ex b/test/support/factory.ex index 4a6b71844..7386b9ecb 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -108,9 +108,11 @@ defmodule Algora.Factory do id: Nanoid.generate(), provider: "stripe", provider_id: sequence(:acct, &"acct_#{&1}"), + provider_meta: %{}, name: "Kevin 'The Carver'", details_submitted: true, charges_enabled: true, + payouts_enabled: true, service_agreement: "recipient", country: "US", type: :express, From eeeab32103b4243cdb0f886d060b17498181f60b Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 20:49:22 +0300 Subject: [PATCH 06/11] update tests --- lib/algora/payments/payments.ex | 22 +++++----- lib/algora/shared/money_utils.ex | 5 +++ test/algora/payments_test.exs | 72 +++++++++++++++++++------------- 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/lib/algora/payments/payments.ex b/lib/algora/payments/payments.ex index 4206381e4..4d9dcde9b 100644 --- a/lib/algora/payments/payments.ex +++ b/lib/algora/payments/payments.ex @@ -320,26 +320,24 @@ defmodule Algora.Payments do # TODO: provide idempotency key case Algora.Stripe.create_transfer(%{ amount: MoneyUtils.to_minor_units(transaction.net_amount), - currency: to_string(transaction.net_amount.currency), + currency: MoneyUtils.to_stripe_currency(transaction.net_amount), destination: account.provider_id }) do {:ok, transfer} -> # it's fine if this fails since we'll receive a webhook - _result = try_update_transaction(transaction, transfer) + transaction + |> change(%{status: :succeeded, provider_id: transfer.id, provider_meta: Util.normalize_struct(transfer)}) + |> Repo.update() + {:ok, transfer} {:error, error} -> + # TODO: inconsistent state if this fails + transaction + |> change(%{status: :failed}) + |> Repo.update() + {:error, error} end end - - defp try_update_transaction(transaction, transfer) do - transaction - |> change(%{ - status: if(transfer.status == :succeeded, do: :succeeded, else: :failed), - provider_id: transfer.id, - provider_meta: Util.normalize_struct(transfer) - }) - |> Repo.update() - end end diff --git a/lib/algora/shared/money_utils.ex b/lib/algora/shared/money_utils.ex index 0666de989..4dd5aa4fe 100644 --- a/lib/algora/shared/money_utils.ex +++ b/lib/algora/shared/money_utils.ex @@ -17,6 +17,11 @@ defmodule Algora.MoneyUtils do amount_int end + @spec to_stripe_currency(Money.t()) :: String.t() + def to_stripe_currency(money) do + money.currency |> to_string() |> String.downcase() + end + # TODO: Find a way to make this obsolete # Why does ecto return {currency, amount} instead of Money.t()? def ensure_money_field(struct, field) do diff --git a/test/algora/payments_test.exs b/test/algora/payments_test.exs index f5fbff1ee..f8c7ae72a 100644 --- a/test/algora/payments_test.exs +++ b/test/algora/payments_test.exs @@ -10,7 +10,7 @@ defmodule Algora.PaymentsTest do setup :verify_on_exit! - describe "perform/1" do + describe "execute_pending_transfers/1" do setup do user = insert(:user) account = insert(:account, user: user) @@ -18,8 +18,7 @@ defmodule Algora.PaymentsTest do {:ok, user: user, account: account} end - test "executes transfer when there are pending credits", %{user: user} do - # Create a successful credit transaction + test "executes transfer when there are pending credits", %{user: user, account: account} do insert(:transaction, user: user, type: :credit, @@ -27,27 +26,39 @@ defmodule Algora.PaymentsTest do net_amount: Money.new(1, :USD) ) - # Mock Stripe transfer creation - expect(Algora.StripeMock, :create_transfer, fn params -> - assert params.amount == 100 - assert params.currency == "USD" + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(2, :USD) + ) + stripe_transfer_id = "tr_#{Nanoid.generate()}" + + expect(Algora.StripeMock, :create_transfer, fn params -> {:ok, %{ - id: "tr_123", - status: :succeeded, - amount: 100, - currency: "USD" + id: stripe_transfer_id, + amount: params.amount, + currency: params.currency, + destination: params.destination }} end) - assert {:ok, _transfer} = Payments.execute_pending_transfers(user.id) + assert {:ok, transfer} = Payments.execute_pending_transfers(user.id) + assert transfer.id == stripe_transfer_id + assert transfer.amount == 100 + 200 + assert transfer.currency == "usd" + assert transfer.destination == account.provider_id - # Verify transfer transaction was created - transfer_tx = Repo.get_by(Transaction, provider_id: "tr_123") + transfer_tx = Repo.get_by(Transaction, provider_id: stripe_transfer_id) assert transfer_tx.status == :succeeded assert transfer_tx.type == :transfer - assert Money.equal?(transfer_tx.net_amount, Money.new(1, :USD)) + assert transfer_tx.provider == "stripe" + assert transfer_tx.provider_meta["id"] == stripe_transfer_id + assert Money.equal?(transfer_tx.net_amount, Money.new(1 + 2, :USD)) + assert Money.equal?(transfer_tx.gross_amount, Money.new(1 + 2, :USD)) + assert Money.equal?(transfer_tx.total_fee, Money.new(0, :USD)) end test "does nothing when user has no pending credits", %{user: user} do @@ -56,26 +67,32 @@ defmodule Algora.PaymentsTest do assert Repo.aggregate(Transaction, :count) == 0 end - test "does nothing when user has no stripe account", %{user: user} do - # Delete the account created in setup - Repo.delete_all(Account) + test "does nothing when user has payouts disabled", %{user: user, account: account} do + account |> change(payouts_enabled: false) |> Repo.update() insert(:transaction, user: user, type: :credit, status: :succeeded, - net_amount: Money.new(1000, :USD) + net_amount: Money.new(1, :USD) ) assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + assert Transaction |> where([t], t.type == :transfer) |> Repo.aggregate(:count) == 0 + end - # Verify no transfer was created - transfer_count = - Transaction - |> where([t], t.type == :transfer) - |> Repo.aggregate(:count) + test "does nothing when user has no stripe account", %{user: user} do + Repo.delete_all(Account) - assert transfer_count == 0 + insert(:transaction, + user: user, + type: :credit, + status: :succeeded, + net_amount: Money.new(1, :USD) + ) + + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + assert Transaction |> where([t], t.type == :transfer) |> Repo.aggregate(:count) == 0 end test "handles failed stripe transfers", %{user: user} do @@ -83,7 +100,7 @@ defmodule Algora.PaymentsTest do user: user, type: :credit, status: :succeeded, - net_amount: Money.new(1000, :USD) + net_amount: Money.new(1, :USD) ) expect(Algora.StripeMock, :create_transfer, fn _params -> @@ -92,9 +109,8 @@ defmodule Algora.PaymentsTest do assert {:error, _} = Payments.execute_pending_transfers(user.id) - # Verify transfer transaction status transfer_tx = Repo.one(from t in Transaction, where: t.type == :transfer) - assert transfer_tx.status == :initialized + assert transfer_tx.status == :failed end end end From 6e667d8c54fd40dafdd9307bf84697e4be942c33 Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 20:52:26 +0300 Subject: [PATCH 07/11] update tests --- test/algora/payments_test.exs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/test/algora/payments_test.exs b/test/algora/payments_test.exs index f8c7ae72a..081a3137f 100644 --- a/test/algora/payments_test.exs +++ b/test/algora/payments_test.exs @@ -18,7 +18,7 @@ defmodule Algora.PaymentsTest do {:ok, user: user, account: account} end - test "executes transfer when there are pending credits", %{user: user, account: account} do + test "executes transfer when user has positive balance", %{user: user, account: account} do insert(:transaction, user: user, type: :credit, @@ -61,9 +61,20 @@ defmodule Algora.PaymentsTest do assert Money.equal?(transfer_tx.total_fee, Money.new(0, :USD)) end - test "does nothing when user has no pending credits", %{user: user} do + test "does nothing when user has positive unconfirmed balance", %{user: user} do + insert(:transaction, + user: user, + type: :credit, + status: :processing, + net_amount: Money.new(1, :USD) + ) + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) + assert Transaction |> where([t], t.type == :transfer) |> Repo.aggregate(:count) == 0 + end + test "does nothing when user has zero balance", %{user: user} do + assert {:ok, nil} = Payments.execute_pending_transfers(user.id) assert Repo.aggregate(Transaction, :count) == 0 end From 4736997501f6967c5f0d9c608ab838e26e8f8cc5 Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 21:16:19 +0300 Subject: [PATCH 08/11] minor fixes --- lib/algora/payments/jobs/execute_pending_transfers.ex | 2 +- lib/algora/payments/payments.ex | 1 + lib/algora_web/controllers/webhooks/stripe_controller.ex | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/algora/payments/jobs/execute_pending_transfers.ex b/lib/algora/payments/jobs/execute_pending_transfers.ex index f38fee414..2117e4e4d 100644 --- a/lib/algora/payments/jobs/execute_pending_transfers.ex +++ b/lib/algora/payments/jobs/execute_pending_transfers.ex @@ -7,7 +7,7 @@ defmodule Algora.Payments.Jobs.ExecutePendingTransfers do alias Algora.Payments @impl Oban.Worker - def perform(%Oban.Job{args: %{user_id: user_id}}) do + def perform(%Oban.Job{args: %{"user_id" => user_id}}) do Payments.execute_pending_transfers(user_id) end end diff --git a/lib/algora/payments/payments.ex b/lib/algora/payments/payments.ex index 4d9dcde9b..24981d3d4 100644 --- a/lib/algora/payments/payments.ex +++ b/lib/algora/payments/payments.ex @@ -289,6 +289,7 @@ defmodule Algora.Payments do defp initialize_and_execute_transfer(user_id, pending_amount, account) do with {:ok, transaction} <- initialize_transfer(user_id, pending_amount), {:ok, transfer} <- execute_transfer(transaction, account) do + broadcast() {:ok, transfer} else error -> diff --git a/lib/algora_web/controllers/webhooks/stripe_controller.ex b/lib/algora_web/controllers/webhooks/stripe_controller.ex index 1e80343b2..38b062754 100644 --- a/lib/algora_web/controllers/webhooks/stripe_controller.ex +++ b/lib/algora_web/controllers/webhooks/stripe_controller.ex @@ -48,6 +48,7 @@ defmodule AlgoraWeb.Webhooks.StripeController do with {count, _} when count > 0 <- update_result, :ok <- jobs_result do Payments.broadcast() + {:ok, nil} else {:error, reason} -> Logger.error("Failed to update transactions: #{inspect(reason)}") From 30622edafcad71c617b8055dfe0dac09a28f2f7f Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 21:16:38 +0300 Subject: [PATCH 09/11] add helper to set up test account --- lib/algora/admin/admin.ex | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/algora/admin/admin.ex b/lib/algora/admin/admin.ex index 84589ddea..d4cfe6959 100644 --- a/lib/algora/admin/admin.ex +++ b/lib/algora/admin/admin.ex @@ -2,6 +2,8 @@ defmodule Algora.Admin do @moduledoc false import Ecto.Query + alias Algora.Accounts.User + alias Algora.Payments alias Algora.Repo alias Algora.Workspace alias Algora.Workspace.Ticket @@ -72,6 +74,15 @@ defmodule Algora.Admin do |> Algora.Repo.update() end + def setup_test_account(user_handle) do + with account_id when is_binary(account_id) <- Algora.config([:stripe, :test_account_id]), + {:ok, user} <- Repo.fetch_by(User, handle: user_handle), + {:ok, acct} <- Payments.create_account(user, "US"), + {:ok, stripe_acct} <- Stripe.Account.retrieve(account_id) do + Payments.update_account(acct, stripe_acct) + end + end + defp update_tickets(url, repo_id) do Repo.update_all(from(t in Ticket, where: fragment("?->>'repository_url' = ?", t.provider_meta, ^url)), set: [repository_id: repo_id] From 1583e4cb949fc55a8312fe2de309d9c7fe8983fa Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 21:38:02 +0300 Subject: [PATCH 10/11] handle transfer.created webhook --- lib/algora/bounties/bounties.ex | 2 +- lib/algora/payments/payments.ex | 12 +++++- .../controllers/webhooks/stripe_controller.ex | 39 +++++++++++++++---- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/lib/algora/bounties/bounties.ex b/lib/algora/bounties/bounties.ex index a75200c4b..a8d69b112 100644 --- a/lib/algora/bounties/bounties.ex +++ b/lib/algora/bounties/bounties.ex @@ -556,7 +556,7 @@ defmodule Algora.Bounties do |> Enum.map(&LineItem.to_stripe/1) |> Payments.create_stripe_session(%{ description: description, - metadata: %{"version" => "2", "group_id" => tx_group_id} + metadata: %{"version" => Payments.metadata_version(), "group_id" => tx_group_id} }) do {:ok, session.url} end diff --git a/lib/algora/payments/payments.ex b/lib/algora/payments/payments.ex index 24981d3d4..0050566f4 100644 --- a/lib/algora/payments/payments.ex +++ b/lib/algora/payments/payments.ex @@ -16,6 +16,8 @@ defmodule Algora.Payments do require Logger + def metadata_version, do: "2" + def broadcast do Phoenix.PubSub.broadcast(Algora.PubSub, "payments:all", :payments_updated) end @@ -322,12 +324,18 @@ defmodule Algora.Payments do case Algora.Stripe.create_transfer(%{ amount: MoneyUtils.to_minor_units(transaction.net_amount), currency: MoneyUtils.to_stripe_currency(transaction.net_amount), - destination: account.provider_id + destination: account.provider_id, + metadata: %{"version" => metadata_version()} }) do {:ok, transfer} -> # it's fine if this fails since we'll receive a webhook transaction - |> change(%{status: :succeeded, provider_id: transfer.id, provider_meta: Util.normalize_struct(transfer)}) + |> change(%{ + status: :succeeded, + succeeded_at: DateTime.utc_now(), + provider_id: transfer.id, + provider_meta: Util.normalize_struct(transfer) + }) |> Repo.update() {:ok, transfer} diff --git a/lib/algora_web/controllers/webhooks/stripe_controller.ex b/lib/algora_web/controllers/webhooks/stripe_controller.ex index 38b062754..cacffd8a6 100644 --- a/lib/algora_web/controllers/webhooks/stripe_controller.ex +++ b/lib/algora_web/controllers/webhooks/stripe_controller.ex @@ -1,21 +1,23 @@ defmodule AlgoraWeb.Webhooks.StripeController do @behaviour Stripe.WebhookHandler + import Ecto.Changeset import Ecto.Query alias Algora.Payments alias Algora.Payments.Jobs.ExecutePendingTransfers alias Algora.Payments.Transaction alias Algora.Repo + alias Algora.Util require Logger - @metadata_version "2" + @metadata_version Payments.metadata_version() @impl true def handle_event(%Stripe.Event{ type: "charge.succeeded", - data: %{object: %{metadata: %{"version" => @metadata_version, "group_id" => group_id}}} + data: %{object: %Stripe.Charge{metadata: %{"version" => @metadata_version, "group_id" => group_id}}} }) when is_binary(group_id) do Repo.transact(fn -> @@ -62,11 +64,20 @@ defmodule AlgoraWeb.Webhooks.StripeController do end @impl true - def handle_event(%Stripe.Event{type: "transfer.created"} = event) do - # TODO: update transaction - # TODO: broadcast - # TODO: notify user - Logger.info("Stripe #{event.type} event: #{event.id}") + def handle_event(%Stripe.Event{ + type: "transfer.created", + data: %{object: %Stripe.Transfer{metadata: %{"version" => @metadata_version}} = transfer} + }) do + with {:ok, transaction} <- Repo.fetch_by(Transaction, provider: "stripe", provider_id: transfer.id), + {:ok, _transaction} <- maybe_update_transaction(transaction, transfer) do + # TODO: notify user + Payments.broadcast() + {:ok, nil} + else + error -> + Logger.error("Failed to update transaction: #{inspect(error)}") + {:error, :failed_to_update_transaction} + end end @impl true @@ -76,4 +87,18 @@ defmodule AlgoraWeb.Webhooks.StripeController do @impl true def handle_event(_event), do: :ok + + defp maybe_update_transaction(transaction, transfer) do + if transaction.status == :succeeded do + {:ok, transaction} + else + transaction + |> change(%{ + status: :succeeded, + succeeded_at: DateTime.utc_now(), + provider_meta: Util.normalize_struct(transfer) + }) + |> Repo.update() + end + end end From 5944e2ba9959db2967ee9b39d9fd87490a0cb7e4 Mon Sep 17 00:00:00 2001 From: zafer Date: Mon, 27 Jan 2025 21:41:54 +0300 Subject: [PATCH 11/11] satisfy dialyzer --- lib/algora/admin/admin.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/algora/admin/admin.ex b/lib/algora/admin/admin.ex index d4cfe6959..7d10dd8a0 100644 --- a/lib/algora/admin/admin.ex +++ b/lib/algora/admin/admin.ex @@ -78,7 +78,7 @@ defmodule Algora.Admin do with account_id when is_binary(account_id) <- Algora.config([:stripe, :test_account_id]), {:ok, user} <- Repo.fetch_by(User, handle: user_handle), {:ok, acct} <- Payments.create_account(user, "US"), - {:ok, stripe_acct} <- Stripe.Account.retrieve(account_id) do + {:ok, stripe_acct} <- Stripe.Account.retrieve(account_id, []) do Payments.update_account(acct, stripe_acct) end end