Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ CEX_BASEASSET=BTC
CEX_QUOTEASSET=USDT
TEST_CENTRALIZED_EXCHANGE_BASEASSET_BALANCE=10
TEST_CENTRALIZED_EXCHANGE_QUOTEASSET_BALANCE=100000
LIVE_CEX=false
TEST_MODE=true
12,137 changes: 12,041 additions & 96 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
"moment": "2.26.0",
"ramda": "0.27.0",
"rxjs": "6.5.5",
"sequelize": "^6.3.5",
"sqlite3": "^5.0.0",
"uuid": "8.1.0",
"winston": "3.2.1",
"ws": "7.3.0"
Expand Down
20 changes: 16 additions & 4 deletions src/arby.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { startArby } from '../src/arby';
import { Config } from '../src/config';
import { InitCEXResponse } from './centralized/ccxt/init';
import { getLoggers } from './test-utils';
import { InitDBResponse } from './db/db';

let testScheduler: TestScheduler;

Expand All @@ -15,6 +16,7 @@ type AssertStartArbyParams = {
shutdown$: string;
cleanup$: string;
initCEX$: string;
initDB$: string;
};
verifyMarkets?: () => boolean;
};
Expand All @@ -39,13 +41,19 @@ const assertStartArby = ({
InitCEXResponse
>;
};
const initDB$ = () => {
return (cold(inputEvents.initDB$) as unknown) as Observable<
InitDBResponse
>;
};
const arby$ = startArby({
config$,
getLoggers,
shutdown$,
trade$: getTrade$,
cleanup$,
initCEX$,
initDB$,
verifyMarkets: verifyMarkets ? verifyMarkets : () => true,
});
expectObservable(arby$).toBe(expected, undefined, { message: 'error' });
Expand All @@ -63,11 +71,12 @@ describe('startArby', () => {
const inputEvents = {
config$: '1000ms a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: 'b',
shutdown$: '',
cleanup$: '',
};
const expected = '2s b';
const expected = '3s b';
assertStartArby({
inputEvents,
expected,
Expand All @@ -78,11 +87,12 @@ describe('startArby', () => {
const inputEvents = {
config$: '1000ms a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: 'b',
shutdown$: '',
cleanup$: '',
};
const expected = '2s #';
const expected = '3s #';
assertStartArby({
inputEvents,
expected,
Expand All @@ -96,11 +106,12 @@ describe('startArby', () => {
const inputEvents = {
config$: 'a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: '500ms b',
shutdown$: '10s c',
cleanup$: '2s a',
};
const expected = '1500ms b 11499ms a';
const expected = '2500ms b 11499ms a';
assertStartArby({
inputEvents,
expected,
Expand All @@ -111,11 +122,12 @@ describe('startArby', () => {
const inputEvents = {
config$: 'a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: '500ms #',
shutdown$: '10s c',
cleanup$: '2s a',
};
const expected = '3500ms a';
const expected = '4500ms a';
assertStartArby({
inputEvents,
expected,
Expand Down
96 changes: 58 additions & 38 deletions src/arby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import { getNewTrade$, GetTradeParams } from './trade/trade';
import { getStartShutdown$ } from './utils';
import { Dictionary, Market } from 'ccxt';
import { verifyMarkets } from './centralized/verify-markets';
import { initDB$, InitDBparams, InitDBResponse, closeDB$ } from './db/db';
import { OrderRepository } from './db/order-repository';

type StartArbyParams = {
config$: Observable<Config>;
Expand All @@ -43,6 +45,10 @@ type StartArbyParams = {
loadMarkets$,
}: InitCEXparams) => Observable<InitCEXResponse>;
verifyMarkets: (config: Config, CEXmarkets: Dictionary<Market>) => boolean;
initDB$: ({
dataDir: string,
logger: Logger,
}: InitDBparams) => Observable<InitDBResponse>;
};

const logConfig = (config: Config, logger: Logger) => {
Expand Down Expand Up @@ -86,54 +92,67 @@ export const startArby = ({
trade$,
cleanup$,
initCEX$,
initDB$,
verifyMarkets,
}: StartArbyParams): Observable<any> => {
const store = getArbyStore();
return config$.pipe(
mergeMap(config => {
const CEX$ = initCEX$({
config,
loadMarkets$,
getExchange,
const loggers = getLoggers(config);
const db$ = initDB$({
logger: loggers.db,
dataDir: config.DATA_DIR,
});
return CEX$.pipe(
mergeMap(({ markets: CEXmarkets, exchange: CEX }) => {
const loggers = getLoggers(config);
loggers.global.info('Starting. Hello, Arby.');
logConfig(config, loggers.global);
verifyMarkets(config, CEXmarkets);
const tradeComplete$ = trade$({
return db$.pipe(
mergeMap((models: InitDBResponse) => {
const CEX$ = initCEX$({
config,
loggers,
getOpenDEXcomplete$,
shutdown$,
getCentralizedExchangeOrder$,
catchOpenDEXerror,
getCentralizedExchangePrice$,
CEX,
store,
}).pipe(takeUntil(shutdown$));
return concat(
tradeComplete$,
cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
CEX,
})
).pipe(
catchError(e => {
loggers.global.info(
`Unrecoverable error: ${JSON.stringify(e)} - cleaning up.`
);
return cleanup$({
loadMarkets$,
getExchange,
});
return CEX$.pipe(
mergeMap(({ markets: CEXmarkets, exchange: CEX }) => {
loggers.global.info('Starting. Hello, Arby.');
logConfig(config, loggers.global);
verifyMarkets(config, CEXmarkets);
const orderRepository = new OrderRepository(models, loggers.db);
const tradeComplete$ = trade$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
getOpenDEXcomplete$,
shutdown$,
getCentralizedExchangeOrder$,
catchOpenDEXerror,
getCentralizedExchangePrice$,
CEX,
});
store,
saveOrder$: orderRepository.saveOrder$,
}).pipe(takeUntil(shutdown$));
return concat(
tradeComplete$,
cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
closeDB$,
CEX,
})
).pipe(
catchError(e => {
loggers.global.info(
`Unrecoverable error: ${JSON.stringify(e)} - cleaning up.`
);
return cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
CEX,
closeDB$,
});
})
);
})
);
})
Expand All @@ -155,6 +174,7 @@ if (!module.parent) {
cleanup$: getCleanup$,
initCEX$,
verifyMarkets,
initDB$,
}).subscribe({
error: error => {
if (error.message) {
Expand Down
5 changes: 5 additions & 0 deletions src/centralized/execute-order.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { OrderSide } from '../constants';
import { Config } from '../config';
import { Observable } from 'rxjs';
import { Order, Exchange } from 'ccxt';
import { OrderInstance } from '../db/order';

let testScheduler: TestScheduler;

Expand All @@ -25,6 +26,9 @@ const assertExecuteCEXorder = (
const createOrder$ = () => {
return (cold(inputEvents.createOrder$) as unknown) as Observable<Order>;
};
const saveOrder$ = () => {
return (cold('') as unknown) as Observable<OrderInstance>;
};
const CEX = (null as unknown) as Exchange;
const CEXorder$ = executeCEXorder$({
CEX,
Expand All @@ -33,6 +37,7 @@ const assertExecuteCEXorder = (
price: inputEvents.price,
order: inputEvents.order,
createOrder$,
saveOrder$,
});
expectObservable(CEXorder$, inputEvents.unsubscribe).toBe(expected, {
a: null,
Expand Down
11 changes: 8 additions & 3 deletions src/centralized/execute-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import { Config } from '../config';
import { Logger } from '../logger';
import { CreateOrderParams } from './ccxt/create-order';
import { CEXorder } from './order-builder';
import { SaveOrderParams } from '../db/order-repository';
import { OrderInstance } from '../db/order';

type ExecuteCEXorderParams = {
CEX: Exchange;
config: Config;
logger: Logger;
price: BigNumber;
order: CEXorder;
saveOrder$: ({ order }: SaveOrderParams) => Observable<OrderInstance>;
createOrder$: ({
config,
exchange,
Expand All @@ -35,6 +38,7 @@ const executeCEXorder$ = ({
price,
order,
createOrder$,
saveOrder$,
}: ExecuteCEXorderParams): Observable<null> => {
if (!config.TEST_MODE) {
logger.info(
Expand All @@ -46,11 +50,12 @@ const executeCEXorder$ = ({
side: order.side,
quantity: order.quantity,
}).pipe(
tap(order =>
tap(order => {
logger.info(
`Centralized exchange order finished: ${JSON.stringify(order)}`
)
),
);
saveOrder$({ order });
}),
catchError((e, caught) => {
logger.warn(`Failed to execute CEX order: ${e}. Retrying in 1000ms`);
return timer(1000).pipe(mergeMapTo(caught));
Expand Down
5 changes: 5 additions & 0 deletions src/centralized/order.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { CEXorder } from './order-builder';
import BigNumber from 'bignumber.js';
import { Exchange } from 'ccxt';
import { getArbyStore } from '../store';
import { OrderInstance } from '../db/order';

let testScheduler: TestScheduler;

Expand Down Expand Up @@ -34,6 +35,9 @@ const assertCentralizedExchangeOrder = (
const centralizedExchangePrice$ = (cold(
inputEvents.centralizedExchangePrice$
) as unknown) as Observable<BigNumber>;
const saveOrder$ = () => {
return (cold('') as unknown) as Observable<OrderInstance>;
};
const CEX = (null as unknown) as Exchange;
const deriveCEXorderQuantity = (order: any) => order;
const store = getArbyStore();
Expand All @@ -46,6 +50,7 @@ const assertCentralizedExchangeOrder = (
centralizedExchangePrice$,
deriveCEXorderQuantity,
store,
saveOrder$,
});
expectObservable(centralizedExchangeOrder$, inputEvents.unsubscribe).toBe(
expected
Expand Down
6 changes: 6 additions & 0 deletions src/centralized/order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { ExecuteCEXorderParams } from './execute-order';
import { quantityAboveMinimum } from './minimum-order-quantity-filter';
import { CEXorder, GetOrderBuilderParams } from './order-builder';
import { ArbyStore } from 'src/store';
import { SaveOrderParams } from '../db/order-repository';
import { OrderInstance } from '../db/order';

type GetCentralizedExchangeOrderParams = {
CEX: Exchange;
Expand All @@ -28,6 +30,7 @@ type GetCentralizedExchangeOrderParams = {
logger,
price,
order,
saveOrder$,
}: ExecuteCEXorderParams) => Observable<null>;
getOrderBuilder$: ({
config,
Expand All @@ -43,12 +46,14 @@ type GetCentralizedExchangeOrderParams = {
config: Config
) => CEXorder;
store: ArbyStore;
saveOrder$: ({ order }: SaveOrderParams) => Observable<OrderInstance>;
};

const getCentralizedExchangeOrder$ = ({
CEX,
logger,
config,
saveOrder$,
executeCEXorder$,
getOrderBuilder$,
centralizedExchangePrice$,
Expand Down Expand Up @@ -79,6 +84,7 @@ const getCentralizedExchangeOrder$ = ({
logger,
price,
order: deriveCEXorderQuantity(order, price, config),
saveOrder$,
});
})
);
Expand Down
Loading