@@ -8,6 +8,7 @@ defmodule Algora.Payments do
8
8
alias Algora.MoneyUtils
9
9
alias Algora.Payments.Account
10
10
alias Algora.Payments.Customer
11
+ alias Algora.Payments.Jobs
11
12
alias Algora.Payments.PaymentMethod
12
13
alias Algora.Payments.Transaction
13
14
alias Algora.Repo
@@ -319,86 +320,131 @@ defmodule Algora.Payments do
319
320
end
320
321
end
321
322
322
- @ spec execute_pending_transfers ( user_id :: String . t ( ) ) :: { :ok , Stripe.Transfer . t ( ) } | { :error , :not_found }
323
- def execute_pending_transfers ( user_id ) do
324
- pending_amount = get_pending_amount ( user_id )
323
+ @ spec execute_pending_transfer ( credit_id :: String . t ( ) ) ::
324
+ { :ok , Stripe.Transfer . t ( ) } | { :error , :not_found } | { :error , :duplicate_transfer_attempt }
325
+ def execute_pending_transfer ( credit_id ) do
326
+ with { :ok , credit } <- Repo . fetch_by ( Transaction , id: credit_id , type: :credit , status: :succeeded ) do
327
+ transfers =
328
+ Repo . all (
329
+ from ( t in Transaction ,
330
+ where: t . user_id == ^ credit . user_id ,
331
+ where: t . group_id == ^ credit . group_id ,
332
+ where: t . type == :transfer ,
333
+ where: t . status in [ :initialized , :processing , :succeeded ]
334
+ )
335
+ )
325
336
326
- with { :ok , account } <- fetch_active_account ( user_id ) ,
327
- true <- Money . positive? ( pending_amount ) do
328
- initialize_and_execute_transfer ( user_id , pending_amount , account )
329
- else
330
- _ -> { :ok , nil }
337
+ amount_transferred = Enum . reduce ( transfers , Money . zero ( :USD ) , fn t , acc -> Money . add! ( acc , t . net_amount ) end )
338
+
339
+ if Money . positive? ( amount_transferred ) do
340
+ Logger . error ( "Duplicate transfer attempt at transaction #{ credit_id } " )
341
+ { :error , :duplicate_transfer_attempt }
342
+ else
343
+ initialize_and_execute_transfer ( credit )
344
+ end
331
345
end
332
346
end
333
347
334
- @ spec fetch_active_account ( user_id :: String . t ( ) ) :: { :ok , Account . t ( ) } | { :error , :not_found }
335
- def fetch_active_account ( user_id ) do
336
- Repo . fetch_by ( Account , user_id: user_id , provider: "stripe" , payouts_enabled: true )
348
+ def list_payable_credits ( user_id ) do
349
+ Repo . all (
350
+ from ( cr in Transaction ,
351
+ left_join: tr in Transaction ,
352
+ on:
353
+ tr . user_id == cr . user_id and tr . group_id == cr . group_id and tr . type == :transfer and
354
+ tr . status in [ :initialized , :processing , :succeeded ] ,
355
+ where: cr . user_id == ^ user_id ,
356
+ where: cr . type == :credit ,
357
+ where: cr . status == :succeeded ,
358
+ where: is_nil ( tr . id )
359
+ )
360
+ )
337
361
end
338
362
339
- defp get_pending_amount ( user_id ) do
340
- total_credits =
341
- Repo . one (
342
- from ( t in Transaction ,
343
- where: t . user_id == ^ user_id ,
344
- where: t . type == :credit ,
345
- where: t . status == :succeeded ,
346
- select: sum ( t . net_amount )
347
- )
348
- ) || Money . zero ( :USD )
349
-
350
- total_transfers =
351
- Repo . one (
352
- from ( t in Transaction ,
353
- where: t . user_id == ^ user_id ,
354
- where: t . type == :transfer ,
355
- where: t . status == :succeeded or t . status == :processing or t . status == :initialized ,
356
- select: sum ( t . net_amount )
357
- )
358
- ) || Money . zero ( :USD )
363
+ @ spec enqueue_pending_transfers ( user_id :: String . t ( ) ) :: { :ok , nil } | { :error , term ( ) }
364
+ def enqueue_pending_transfers ( user_id ) do
365
+ Repo . transact ( fn ->
366
+ with { :ok , _account } <- fetch_active_account ( user_id ) ,
367
+ credits = list_payable_credits ( user_id ) ,
368
+ :ok <-
369
+ Enum . reduce_while ( credits , :ok , fn credit , :ok ->
370
+ case % { credit_id: credit . id }
371
+ |> Jobs.ExecutePendingTransfer . new ( )
372
+ |> Oban . insert ( ) do
373
+ { :ok , _job } -> { :cont , :ok }
374
+ error -> { :halt , error }
375
+ end
376
+ end ) do
377
+ { :ok , nil }
378
+ else
379
+ { :error , reason } ->
380
+ Logger . error ( "Failed to execute pending transfers: #{ inspect ( reason ) } " )
381
+ { :error , reason }
382
+ end
383
+ end )
384
+ end
359
385
360
- Money . sub! ( total_credits , total_transfers )
386
+ @ spec fetch_active_account ( user_id :: String . t ( ) ) :: { :ok , Account . t ( ) } | { :error , :no_active_account }
387
+ def fetch_active_account ( user_id ) do
388
+ case Repo . fetch_by ( Account , user_id: user_id , provider: "stripe" , payouts_enabled: true ) do
389
+ { :ok , account } -> { :ok , account }
390
+ { :error , :not_found } -> { :error , :no_active_account }
391
+ end
361
392
end
362
393
363
- defp initialize_and_execute_transfer ( user_id , pending_amount , account ) do
364
- with { :ok , transaction } <- initialize_transfer ( user_id , pending_amount ) ,
365
- { :ok , transfer } <- execute_transfer ( transaction , account ) do
366
- broadcast ( )
367
- { :ok , transfer }
368
- else
369
- error ->
370
- Logger . error ( "Failed to execute transfer: #{ inspect ( error ) } " )
371
- error
394
+ @ spec initialize_and_execute_transfer ( credit :: Transaction . t ( ) ) :: { :ok , Stripe.Transfer . t ( ) } | { :error , term ( ) }
395
+ defp initialize_and_execute_transfer ( % Transaction { } = credit ) do
396
+ case fetch_active_account ( credit . user_id ) do
397
+ { :ok , account } ->
398
+ with { :ok , transaction } <- initialize_transfer ( credit ) ,
399
+ { :ok , transfer } <- execute_transfer ( transaction , account ) do
400
+ broadcast ( )
401
+ { :ok , transfer }
402
+ else
403
+ error ->
404
+ Logger . error ( "Failed to execute transfer: #{ inspect ( error ) } " )
405
+ error
406
+ end
407
+
408
+ _ ->
409
+ Logger . error ( "Attempted to execute transfer to inactive account" )
410
+ { :error , :no_active_account }
372
411
end
373
412
end
374
413
375
- defp initialize_transfer ( user_id , pending_amount ) do
414
+ defp initialize_transfer ( % Transaction { } = credit ) do
376
415
% Transaction { }
377
416
|> change ( % {
378
417
id: Nanoid . generate ( ) ,
379
418
provider: "stripe" ,
380
419
type: :transfer ,
381
420
status: :initialized ,
382
- user_id: user_id ,
383
- gross_amount: pending_amount ,
384
- net_amount: pending_amount ,
385
- total_fee: Money . zero ( :USD )
421
+ user_id: credit . user_id ,
422
+ gross_amount: credit . net_amount ,
423
+ net_amount: credit . net_amount ,
424
+ total_fee: Money . zero ( :USD ) ,
425
+ group_id: credit . group_id
386
426
} )
387
427
|> Algora.Validations . validate_positive ( :gross_amount )
388
428
|> Algora.Validations . validate_positive ( :net_amount )
389
429
|> foreign_key_constraint ( :user_id )
390
430
|> Repo . insert ( )
391
431
end
392
432
393
- defp execute_transfer ( transaction , account ) do
394
- # TODO: set other params
433
+ defp execute_transfer ( % Transaction { } = transaction , account ) do
434
+ charge = Repo . get_by ( Transaction , type: :credit , status: :succeeded , group_id: transaction . group_id )
435
+
436
+ transfer_params =
437
+ % {
438
+ amount: MoneyUtils . to_minor_units ( transaction . net_amount ) ,
439
+ currency: MoneyUtils . to_stripe_currency ( transaction . net_amount ) ,
440
+ destination: account . provider_id ,
441
+ metadata: % { "version" => metadata_version ( ) }
442
+ }
443
+ |> Map . merge ( if transaction . group_id , do: % { transfer_group: transaction . group_id } , else: % { } )
444
+ |> Map . merge ( if charge && charge . provider_id , do: % { source_transaction: charge . provider_id } , else: % { } )
445
+
395
446
# TODO: provide idempotency key
396
- case Algora.Stripe.Transfer . create ( % {
397
- amount: MoneyUtils . to_minor_units ( transaction . net_amount ) ,
398
- currency: MoneyUtils . to_stripe_currency ( transaction . net_amount ) ,
399
- destination: account . provider_id ,
400
- metadata: % { "version" => metadata_version ( ) }
401
- } ) do
447
+ case Algora.Stripe.Transfer . create ( transfer_params ) do
402
448
{ :ok , transfer } ->
403
449
# it's fine if this fails since we'll receive a webhook
404
450
transaction
0 commit comments