Skip to content

Commit 990bc9d

Browse files
author
Karl Ranna
authored
Merge pull request #80 from ExchangeUnion/fix/price-throttle-update
fix: price throttle update
2 parents fd905ec + c33b53b commit 990bc9d

File tree

13 files changed

+175
-28
lines changed

13 files changed

+175
-28
lines changed

src/arby.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { removeOpenDEXorders$ } from './opendex/remove-orders';
1515
import { getCleanup$, GetCleanupParams } from './trade/cleanup';
1616
import { getNewTrade$, GetTradeParams } from './trade/trade';
1717
import { getStartShutdown$ } from './utils';
18+
import { getArbyStore } from './store';
1819

1920
type StartArbyParams = {
2021
config$: Observable<Config>;
@@ -74,6 +75,7 @@ export const startArby = ({
7475
cleanup$,
7576
initBinance$,
7677
}: StartArbyParams): Observable<any> => {
78+
const store = getArbyStore();
7779
return config$.pipe(
7880
mergeMap(config => {
7981
const CEX$ = initBinance$({
@@ -95,6 +97,7 @@ export const startArby = ({
9597
catchOpenDEXerror,
9698
getCentralizedExchangePrice$,
9799
CEX,
100+
store,
98101
}).pipe(takeUntil(shutdown$));
99102
return concat(
100103
tradeComplete$,

src/centralized/order-builder.spec.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import { getOrderBuilder$, CEXorder } from './order-builder';
2-
import { TestScheduler } from 'rxjs/testing';
3-
import { testConfig, getLoggers } from '../test-utils';
4-
import { Observable } from 'rxjs';
5-
import { SwapSuccess } from '../proto/xudrpc_pb';
6-
import { OrderSide, Asset } from '../constants';
71
import BigNumber from 'bignumber.js';
2+
import { Observable } from 'rxjs';
3+
import { TestScheduler } from 'rxjs/testing';
84
import { Config } from '../config';
5+
import { Asset, OrderSide } from '../constants';
6+
import { SwapSuccess } from '../proto/xudrpc_pb';
7+
import { ArbyStore, getArbyStore } from '../store';
8+
import { getLoggers, testConfig } from '../test-utils';
9+
import { CEXorder, getOrderBuilder$ } from './order-builder';
910

1011
let testScheduler: TestScheduler;
1112

@@ -28,7 +29,8 @@ const assertOrderBuilder = (
2829
a: CEXorder;
2930
},
3031
config: Config,
31-
expectedAssetToTradeOnCEX: Asset
32+
expectedAssetToTradeOnCEX: Asset,
33+
store?: ArbyStore
3234
) => {
3335
testScheduler.run(helpers => {
3436
const { cold, expectObservable } = helpers;
@@ -59,6 +61,7 @@ const assertOrderBuilder = (
5961
accumulateOrderFillsForBaseAssetReceived: accumulateOrderFillsForAssetReceived,
6062
accumulateOrderFillsForQuoteAssetReceived: accumulateOrderFillsForAssetReceived,
6163
quantityAboveMinimum,
64+
store: store ? store : getArbyStore(),
6265
});
6366
expectObservable(orderBuilder$, inputEvents.unsubscribe).toBe(
6467
expected,
@@ -83,6 +86,7 @@ describe('getCentralizedExchangeOrder$', () => {
8386
});
8487

8588
it('accumulates buy and sell orders for ETHBTC', () => {
89+
expect.assertions(6);
8690
const inputEvents = {
8791
receivedBaseAssetSwapSuccess$: '1s a',
8892
receivedQuoteAssetSwapSuccess$: '1400ms b',
@@ -117,17 +121,24 @@ describe('getCentralizedExchangeOrder$', () => {
117121
QUOTEASSET,
118122
};
119123
const expectedAssetToTradeOnCEX = BASEASSET;
124+
const store = {
125+
...getArbyStore(),
126+
...{ resetLastOrderUpdatePrice: jest.fn() },
127+
};
120128
assertOrderBuilder(
121129
inputEvents,
122130
inputValues,
123131
expected,
124132
expectedValues,
125133
config,
126-
expectedAssetToTradeOnCEX
134+
expectedAssetToTradeOnCEX,
135+
store
127136
);
137+
expect(store.resetLastOrderUpdatePrice).toHaveBeenCalledTimes(2);
128138
});
129139

130140
it('accumulates buy and sell orders for BTCUSDT', () => {
141+
expect.assertions(6);
131142
const inputEvents = {
132143
receivedBaseAssetSwapSuccess$: '1s a',
133144
receivedQuoteAssetSwapSuccess$: '1400ms b',
@@ -162,13 +173,19 @@ describe('getCentralizedExchangeOrder$', () => {
162173
QUOTEASSET,
163174
};
164175
const expectedAssetToTradeOnCEX = QUOTEASSET;
176+
const store = {
177+
...getArbyStore(),
178+
...{ resetLastOrderUpdatePrice: jest.fn() },
179+
};
165180
assertOrderBuilder(
166181
inputEvents,
167182
inputValues,
168183
expected,
169184
expectedValues,
170185
config,
171-
expectedAssetToTradeOnCEX
186+
expectedAssetToTradeOnCEX,
187+
store
172188
);
189+
expect(store.resetLastOrderUpdatePrice).toHaveBeenCalledTimes(2);
173190
});
174191
});

src/centralized/order-builder.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import BigNumber from 'bignumber.js';
2-
import { merge, Observable } from 'rxjs';
3-
import { filter, map, repeat, take, tap } from 'rxjs/operators';
2+
import { merge, Observable, of } from 'rxjs';
3+
import { filter, map, repeat, take, tap, mergeMap } from 'rxjs/operators';
44
import { Config } from '../config';
55
import { OrderSide, Asset } from '../constants';
66
import { Logger } from '../logger';
@@ -11,6 +11,7 @@ import {
1111
import { getXudClient$ } from '../opendex/xud/client';
1212
import { subscribeXudSwaps$ } from '../opendex/xud/subscribe-swaps';
1313
import { SwapSuccess } from '../proto/xudrpc_pb';
14+
import { ArbyStore } from '../store';
1415

1516
type GetOrderBuilderParams = {
1617
config: Config;
@@ -29,6 +30,7 @@ type GetOrderBuilderParams = {
2930
quantityAboveMinimum: (
3031
asset: Asset
3132
) => (filledQuantity: BigNumber) => boolean;
33+
store: ArbyStore;
3234
};
3335

3436
type CEXorder = {
@@ -43,6 +45,7 @@ const getOrderBuilder$ = ({
4345
accumulateOrderFillsForBaseAssetReceived,
4446
accumulateOrderFillsForQuoteAssetReceived,
4547
quantityAboveMinimum,
48+
store,
4649
}: GetOrderBuilderParams): Observable<CEXorder> => {
4750
const {
4851
receivedBaseAssetSwapSuccess$,
@@ -58,10 +61,12 @@ const getOrderBuilder$ = ({
5861
// accumulate OpenDEX order fills when receiving
5962
// quote asset
6063
accumulateOrderFillsForQuoteAssetReceived(config),
61-
tap((quantity: BigNumber) => {
64+
mergeMap((quantity: BigNumber) => {
6265
logger.info(
6366
`Swap success. Accumulated ${assetToTradeOnCEX} quantity: ${quantity.toFixed()}`
6467
);
68+
store.resetLastOrderUpdatePrice();
69+
return of(quantity);
6570
}),
6671
// filter based on minimum CEX order quantity
6772
filter(quantityAboveMinimum(assetToTradeOnCEX)),

src/centralized/order.spec.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { getCentralizedExchangeOrder$ } from './order';
55
import { CEXorder } from './order-builder';
66
import BigNumber from 'bignumber.js';
77
import { Exchange } from 'ccxt';
8+
import { getArbyStore } from '../store';
89

910
let testScheduler: TestScheduler;
1011

@@ -35,6 +36,7 @@ const assertCentralizedExchangeOrder = (
3536
) as unknown) as Observable<BigNumber>;
3637
const CEX = (null as unknown) as Exchange;
3738
const deriveCEXorderQuantity = (order: any) => order;
39+
const store = getArbyStore();
3840
const centralizedExchangeOrder$ = getCentralizedExchangeOrder$({
3941
CEX,
4042
logger: getLoggers().centralized,
@@ -43,6 +45,7 @@ const assertCentralizedExchangeOrder = (
4345
executeCEXorder$,
4446
centralizedExchangePrice$,
4547
deriveCEXorderQuantity,
48+
store,
4649
});
4750
expectObservable(centralizedExchangeOrder$, inputEvents.unsubscribe).toBe(
4851
expected

src/centralized/order.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { createOrder$ } from './ccxt/create-order';
1818
import { ExecuteCEXorderParams } from './execute-order';
1919
import { quantityAboveMinimum } from './minimum-order-quantity-filter';
2020
import { CEXorder, GetOrderBuilderParams } from './order-builder';
21+
import { ArbyStore } from 'src/store';
2122

2223
type GetCentralizedExchangeOrderParams = {
2324
CEX: Exchange;
@@ -41,6 +42,7 @@ type GetCentralizedExchangeOrderParams = {
4142
price: BigNumber,
4243
config: Config
4344
) => CEXorder;
45+
store: ArbyStore;
4446
};
4547

4648
const getCentralizedExchangeOrder$ = ({
@@ -51,6 +53,7 @@ const getCentralizedExchangeOrder$ = ({
5153
getOrderBuilder$,
5254
centralizedExchangePrice$,
5355
deriveCEXorderQuantity,
56+
store,
5457
}: GetCentralizedExchangeOrderParams): Observable<null> => {
5558
return getOrderBuilder$({
5659
config,
@@ -59,6 +62,7 @@ const getCentralizedExchangeOrder$ = ({
5962
accumulateOrderFillsForBaseAssetReceived,
6063
accumulateOrderFillsForQuoteAssetReceived,
6164
quantityAboveMinimum,
65+
store,
6266
}).pipe(
6367
withLatestFrom(
6468
centralizedExchangePrice$.pipe(

src/opendex/catch-error.spec.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
import { status } from '@grpc/grpc-js';
2+
import { AuthenticationError, Exchange } from 'ccxt';
13
import { TestScheduler } from 'rxjs/testing';
24
import { errors } from '../opendex/errors';
3-
import { getLoggers, TestError, testConfig } from '../test-utils';
5+
import { ArbyStore, getArbyStore } from '../store';
6+
import { getLoggers, testConfig, TestError } from '../test-utils';
47
import { catchOpenDEXerror } from './catch-error';
5-
import { status } from '@grpc/grpc-js';
6-
import { AuthenticationError, Exchange } from 'ccxt';
7-
88
let testScheduler: TestScheduler;
99
const testSchedulerSetup = () => {
1010
testScheduler = new TestScheduler((actual, expected) => {
@@ -22,6 +22,7 @@ type AssertCatchOpenDEXerrorParams = {
2222
};
2323
expectedError?: TestError;
2424
unsubscribe: string;
25+
store?: ArbyStore;
2526
};
2627

2728
const assertCatchOpenDEXerror = ({
@@ -31,6 +32,7 @@ const assertCatchOpenDEXerror = ({
3132
inputError,
3233
expectedError,
3334
unsubscribe,
35+
store,
3436
}: AssertCatchOpenDEXerrorParams) => {
3537
testScheduler.run(helpers => {
3638
const { cold, expectObservable, expectSubscriptions } = helpers;
@@ -43,7 +45,8 @@ const assertCatchOpenDEXerror = ({
4345
getLoggers(),
4446
config,
4547
getCleanup$,
46-
CEX
48+
CEX,
49+
store ? store : getArbyStore()
4750
)(input$);
4851
expectObservable(output$, unsubscribe).toBe(
4952
expected,
@@ -186,23 +189,30 @@ describe('catchOpenDEXerror', () => {
186189
});
187190
});
188191

189-
it('cancels orders, retries CENTRALIZED_EXCHANGE_PRICE_FEED_ERROR', () => {
190-
expect.assertions(ASSERTIONS_PER_TEST);
192+
it('cancels orders, updates store lastPriceUpdate, retries CENTRALIZED_EXCHANGE_PRICE_FEED_ERROR', () => {
193+
// 1 extra assertion after assertCatchOpenDEXerror
194+
expect.assertions(ASSERTIONS_PER_TEST + 1);
191195
const inputEvents = '1s #';
192196
const inputError = errors.CENTRALIZED_EXCHANGE_PRICE_FEED_ERROR;
193197
const expected = '';
194198
const expectedSubscriptions = {
195199
input$: ['^ 999ms !', '7001ms ^ 999ms !'],
196200
cleanup$: ['1s ^ 1s !', '8001ms ^ 1s !'],
197201
};
202+
const store = {
203+
...getArbyStore(),
204+
...{ resetLastOrderUpdatePrice: jest.fn() },
205+
};
198206
const unsubscribe = '10s !';
199207
assertCatchOpenDEXerror({
200208
inputEvents,
201209
inputError,
202210
expected,
203211
unsubscribe,
204212
expectedSubscriptions,
213+
store,
205214
});
215+
expect(store.resetLastOrderUpdatePrice).toHaveBeenCalledTimes(2);
206216
});
207217

208218
it('retries recoverable gRPC errors', () => {

src/opendex/catch-error.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { status } from '@grpc/grpc-js';
22
import { AuthenticationError, Exchange } from 'ccxt';
33
import { concat, Observable, throwError, timer } from 'rxjs';
44
import { ignoreElements, mergeMap, retryWhen } from 'rxjs/operators';
5+
import { ArbyStore } from 'src/store';
56
import { removeCEXorders$ } from '../centralized/remove-orders';
67
import { Config } from '../config';
78
import { MAX_RETRY_ATTEMPS, RETRY_INTERVAL } from '../constants';
@@ -19,7 +20,8 @@ const catchOpenDEXerror = (
1920
removeOpenDEXorders$,
2021
removeCEXorders$,
2122
}: GetCleanupParams) => Observable<unknown>,
22-
CEX: Exchange
23+
CEX: Exchange,
24+
store: ArbyStore
2325
) => {
2426
return (source: Observable<any>) => {
2527
return source.pipe(
@@ -63,6 +65,7 @@ const catchOpenDEXerror = (
6365
e.code === errorCodes.CENTRALIZED_EXCHANGE_PRICE_FEED_ERROR
6466
) {
6567
logMessage(loggers.centralized);
68+
store.resetLastOrderUpdatePrice();
6669
return concat(
6770
getCleanup$({
6871
config,

src/opendex/complete.spec.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import BigNumber from 'bignumber.js';
22
import { Exchange } from 'ccxt';
33
import { Observable } from 'rxjs';
44
import { TestScheduler } from 'rxjs/testing';
5+
import { getArbyStore } from '../store';
56
import { getLoggers, testConfig } from '../test-utils';
67
import { TradeInfo } from '../trade/info';
78
import { getOpenDEXcomplete$ } from './complete';
@@ -39,13 +40,15 @@ const assertGetOpenDEXcomplete = (
3940
BigNumber
4041
>;
4142
const CEX = (null as unknown) as Exchange;
43+
const store = getArbyStore();
4244
const trade$ = getOpenDEXcomplete$({
4345
CEX,
4446
config: testConfig(),
4547
loggers: getLoggers(),
4648
tradeInfo$: getTradeInfo$,
4749
createOpenDEXorders$,
4850
centralizedExchangePrice$,
51+
store,
4952
});
5053
expectObservable(trade$).toBe(expected, { a: true });
5154
});

src/opendex/complete.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { BigNumber } from 'bignumber.js';
22
import { Exchange } from 'ccxt';
3-
import { BehaviorSubject, empty, Observable, of } from 'rxjs';
3+
import { empty, Observable, of } from 'rxjs';
44
import { exhaustMap, mergeMap, take } from 'rxjs/operators';
55
import { getCentralizedExchangeAssets$ } from '../centralized/assets';
66
import { Config } from '../config';
77
import { Loggers } from '../logger';
8+
import { ArbyStore } from '../store';
89
import {
910
GetTradeInfoParams,
1011
TradeInfo,
@@ -39,6 +40,7 @@ type GetOpenDEXcompleteParams = {
3940
createXudOrder$,
4041
}: CreateOpenDEXordersParams) => Observable<boolean>;
4142
centralizedExchangePrice$: Observable<BigNumber>;
43+
store: ArbyStore;
4244
};
4345

4446
const getOpenDEXcomplete$ = ({
@@ -48,6 +50,7 @@ const getOpenDEXcomplete$ = ({
4850
tradeInfo$,
4951
createOpenDEXorders$,
5052
centralizedExchangePrice$,
53+
store,
5154
}: GetOpenDEXcompleteParams): Observable<boolean> => {
5255
const openDEXassetsWithConfig = (config: Config) => {
5356
return getOpenDEXassets$({
@@ -60,7 +63,6 @@ const getOpenDEXcomplete$ = ({
6063
xudTradingLimits$: getXudTradingLimits$,
6164
});
6265
};
63-
const lastPriceUpdateStore = new BehaviorSubject(new BigNumber('0'));
6466
return tradeInfo$({
6567
config,
6668
loggers,
@@ -74,7 +76,7 @@ const getOpenDEXcomplete$ = ({
7476
// is already in progress
7577
exhaustMap((tradeInfo: TradeInfo) => {
7678
const getTradeInfo = () => tradeInfo;
77-
return lastPriceUpdateStore.pipe(
79+
return store.selectState('lastOrderUpdatePrice').pipe(
7880
take(1),
7981
mergeMap((lastPriceUpdate: BigNumber) => {
8082
if (shouldCreateOpenDEXorders(tradeInfo.price, lastPriceUpdate)) {
@@ -89,7 +91,7 @@ const getOpenDEXcomplete$ = ({
8991
}).pipe(
9092
mergeMap(() => {
9193
// store the last price update
92-
lastPriceUpdateStore.next(tradeInfo.price);
94+
store.updateLastOrderUpdatePrice(tradeInfo.price);
9395
return of(true);
9496
})
9597
);

0 commit comments

Comments
 (0)