1
1
import BigNumber from 'bignumber.js' ;
2
- import { merge , Observable , of } from 'rxjs' ;
3
- import { filter , map , mergeMap , repeat , take } from 'rxjs/operators' ;
2
+ import { curry } from 'ramda' ;
3
+ import { empty , merge , Observable , of } from 'rxjs' ;
4
+ import { map , mergeMap , repeat , take } from 'rxjs/operators' ;
4
5
import { Config } from '../config' ;
5
6
import { OrderSide } from '../constants' ;
6
7
import { Logger } from '../logger' ;
@@ -27,9 +28,7 @@ type GetOrderBuilderParams = {
27
28
accumulateOrderFillsForQuoteAssetReceived : (
28
29
config : Config
29
30
) => ( source : Observable < SwapSuccess > ) => Observable < BigNumber > ;
30
- quantityAboveMinimum : (
31
- asset : string
32
- ) => ( filledQuantity : BigNumber ) => boolean ;
31
+ minimumQuantity$ : Observable < BigNumber > ;
33
32
store : ArbyStore ;
34
33
} ;
35
34
@@ -38,13 +37,40 @@ type CEXorder = {
38
37
side : OrderSide ;
39
38
} ;
40
39
40
+ const quantityAboveMinimum = curry (
41
+ (
42
+ store : ArbyStore ,
43
+ logger : Logger ,
44
+ assetToTradeOnCEX : string ,
45
+ minimumQuantity$ : Observable < BigNumber > ,
46
+ quantity : BigNumber
47
+ ) => {
48
+ logger . info (
49
+ `Swap success. Accumulated ${ assetToTradeOnCEX } quantity: ${ quantity . toFixed ( ) } `
50
+ ) ;
51
+ store . resetLastOrderUpdatePrice ( ) ;
52
+ return minimumQuantity$ . pipe (
53
+ take ( 1 ) ,
54
+ mergeMap ( minimumQuantity => {
55
+ if ( quantity . isGreaterThanOrEqualTo ( minimumQuantity ) ) {
56
+ return of ( quantity ) ;
57
+ }
58
+ logger . info (
59
+ `Will not execute CEX order because ${ quantity . toFixed ( ) } is below the minimum allowed CEX quantity ${ minimumQuantity . toFixed ( ) } `
60
+ ) ;
61
+ return empty ( ) ;
62
+ } )
63
+ ) ;
64
+ }
65
+ ) ;
66
+
41
67
const getOrderBuilder$ = ( {
42
68
config,
43
69
logger,
44
70
getOpenDEXswapSuccess$,
45
71
accumulateOrderFillsForBaseAssetReceived,
46
72
accumulateOrderFillsForQuoteAssetReceived,
47
- quantityAboveMinimum ,
73
+ minimumQuantity$ ,
48
74
store,
49
75
} : GetOrderBuilderParams ) : Observable < CEXorder > => {
50
76
const {
@@ -59,19 +85,17 @@ const getOrderBuilder$ = ({
59
85
config . CEX_QUOTEASSET === 'BTC'
60
86
? config . CEX_BASEASSET
61
87
: config . CEX_QUOTEASSET ;
88
+ const filterMinimum = quantityAboveMinimum (
89
+ store ,
90
+ logger ,
91
+ assetToTradeOnCEX ,
92
+ minimumQuantity$
93
+ ) ;
62
94
const receivedQuoteAssetOrder$ = receivedQuoteAssetSwapSuccess$ . pipe (
63
95
// accumulate OpenDEX order fills when receiving
64
96
// quote asset
65
97
accumulateOrderFillsForQuoteAssetReceived ( config ) ,
66
- mergeMap ( ( quantity : BigNumber ) => {
67
- logger . info (
68
- `Swap success. Accumulated ${ assetToTradeOnCEX } quantity: ${ quantity . toFixed ( ) } `
69
- ) ;
70
- store . resetLastOrderUpdatePrice ( ) ;
71
- return of ( quantity ) ;
72
- } ) ,
73
- // filter based on minimum CEX order quantity
74
- filter ( quantityAboveMinimum ( assetToTradeOnCEX ) ) ,
98
+ mergeMap ( filterMinimum ) ,
75
99
map ( quantity => {
76
100
return { quantity, side : OrderSide . BUY } ;
77
101
} ) ,
@@ -84,15 +108,7 @@ const getOrderBuilder$ = ({
84
108
// accumulate OpenDEX order fills when receiving
85
109
// quote asset
86
110
accumulateOrderFillsForBaseAssetReceived ( config ) ,
87
- mergeMap ( ( quantity : BigNumber ) => {
88
- logger . info (
89
- `Swap success. Accumulated ${ assetToTradeOnCEX } quantity: ${ quantity . toFixed ( ) } `
90
- ) ;
91
- store . resetLastOrderUpdatePrice ( ) ;
92
- return of ( quantity ) ;
93
- } ) ,
94
- // filter based on minimum CEX order quantity
95
- filter ( quantityAboveMinimum ( assetToTradeOnCEX ) ) ,
111
+ mergeMap ( filterMinimum ) ,
96
112
map ( quantity => {
97
113
return { quantity, side : OrderSide . SELL } ;
98
114
} ) ,
0 commit comments