Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ CEX_BASEASSET=BTC
CEX_QUOTEASSET=USDT
TEST_CENTRALIZED_EXCHANGE_BASEASSET_BALANCE=10
TEST_CENTRALIZED_EXCHANGE_QUOTEASSET_BALANCE=100000
LIVE_CEX=false
TEST_MODE=true
12,137 changes: 12,041 additions & 96 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
"moment": "2.26.0",
"ramda": "0.27.0",
"rxjs": "6.5.5",
"sequelize": "^6.3.5",
"sqlite3": "^5.0.0",
"uuid": "8.1.0",
"winston": "3.2.1",
"ws": "7.3.0"
Expand Down
20 changes: 16 additions & 4 deletions src/arby.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { startArby } from '../src/arby';
import { Config } from '../src/config';
import { InitCEXResponse } from './centralized/ccxt/init';
import { getLoggers } from './test-utils';
import { InitDBResponse } from './db/db';

let testScheduler: TestScheduler;

Expand All @@ -15,6 +16,7 @@ type AssertStartArbyParams = {
shutdown$: string;
cleanup$: string;
initCEX$: string;
initDB$: string;
};
verifyMarkets?: () => boolean;
};
Expand All @@ -39,13 +41,19 @@ const assertStartArby = ({
InitCEXResponse
>;
};
const initDB$ = () => {
return (cold(inputEvents.initDB$) as unknown) as Observable<
InitDBResponse
>;
};
const arby$ = startArby({
config$,
getLoggers,
shutdown$,
trade$: getTrade$,
cleanup$,
initCEX$,
initDB$,
verifyMarkets: verifyMarkets ? verifyMarkets : () => true,
});
expectObservable(arby$).toBe(expected, undefined, { message: 'error' });
Expand All @@ -63,11 +71,12 @@ describe('startArby', () => {
const inputEvents = {
config$: '1000ms a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: 'b',
shutdown$: '',
cleanup$: '',
};
const expected = '2s b';
const expected = '3s b';
assertStartArby({
inputEvents,
expected,
Expand All @@ -78,11 +87,12 @@ describe('startArby', () => {
const inputEvents = {
config$: '1000ms a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: 'b',
shutdown$: '',
cleanup$: '',
};
const expected = '2s #';
const expected = '3s #';
assertStartArby({
inputEvents,
expected,
Expand All @@ -96,11 +106,12 @@ describe('startArby', () => {
const inputEvents = {
config$: 'a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: '500ms b',
shutdown$: '10s c',
cleanup$: '2s a',
};
const expected = '1500ms b 11499ms a';
const expected = '2500ms b 11499ms a';
assertStartArby({
inputEvents,
expected,
Expand All @@ -111,11 +122,12 @@ describe('startArby', () => {
const inputEvents = {
config$: 'a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: '500ms #',
shutdown$: '10s c',
cleanup$: '2s a',
};
const expected = '3500ms a';
const expected = '4500ms a';
assertStartArby({
inputEvents,
expected,
Expand Down
110 changes: 70 additions & 40 deletions src/arby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import { getNewTrade$, GetTradeParams } from './trade/trade';
import { getStartShutdown$ } from './utils';
import { Dictionary, Market } from 'ccxt';
import { verifyMarkets } from './centralized/verify-markets';
import { initDB$, InitDBparams, InitDBResponse, closeDB$ } from './db/db';
import { saveOrder$ } from './db/order-repository';

type StartArbyParams = {
config$: Observable<Config>;
Expand All @@ -32,6 +34,8 @@ type StartArbyParams = {
getCentralizedExchangeOrder$,
getOpenDEXcomplete$,
shutdown$,
models,
saveOrder$,
}: GetTradeParams) => Observable<boolean>;
cleanup$: ({
config,
Expand All @@ -43,6 +47,10 @@ type StartArbyParams = {
loadMarkets$,
}: InitCEXparams) => Observable<InitCEXResponse>;
verifyMarkets: (config: Config, CEXmarkets: Dictionary<Market>) => boolean;
initDB$: ({
dataDir: string,
logger: Logger,
}: InitDBparams) => Observable<InitDBResponse>;
};

const logConfig = (config: Config, logger: Logger) => {
Expand Down Expand Up @@ -86,54 +94,65 @@ export const startArby = ({
trade$,
cleanup$,
initCEX$,
initDB$,
verifyMarkets,
}: StartArbyParams): Observable<any> => {
const store = getArbyStore();
return config$.pipe(
mergeMap(config => {
const CEX$ = initCEX$({
config,
loadMarkets$,
getExchange,
const loggers = getLoggers(config);
const db$ = initDB$({
logger: loggers.db,
dataDir: config.DATA_DIR,
});
return CEX$.pipe(
mergeMap(({ markets: CEXmarkets, exchange: CEX }) => {
const loggers = getLoggers(config);
loggers.global.info('Starting. Hello, Arby.');
logConfig(config, loggers.global);
verifyMarkets(config, CEXmarkets);
const tradeComplete$ = trade$({
return db$.pipe(
mergeMap((models: InitDBResponse) => {
const CEX$ = initCEX$({
config,
loggers,
getOpenDEXcomplete$,
shutdown$,
getCentralizedExchangeOrder$,
catchOpenDEXerror,
getCentralizedExchangePrice$,
CEX,
store,
}).pipe(takeUntil(shutdown$));
return concat(
tradeComplete$,
cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
CEX,
})
).pipe(
catchError(e => {
loggers.global.info(
`Unrecoverable error: ${JSON.stringify(e)} - cleaning up.`
);
return cleanup$({
loadMarkets$,
getExchange,
});
return CEX$.pipe(
mergeMap(({ markets: CEXmarkets, exchange: CEX }) => {
loggers.global.info('Starting. Hello, Arby.');
logConfig(config, loggers.global);
verifyMarkets(config, CEXmarkets);
const tradeComplete$ = trade$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
getOpenDEXcomplete$,
shutdown$,
getCentralizedExchangeOrder$,
catchOpenDEXerror,
getCentralizedExchangePrice$,
CEX,
});
store,
saveOrder$,
models,
}).pipe(takeUntil(shutdown$));
return concat(
tradeComplete$,
cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
CEX,
})
).pipe(
catchError(e => {
loggers.global.info(
`Unrecoverable error: ${JSON.stringify(e)} - cleaning up.`
);
return cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
CEX,
});
})
);
})
);
})
Expand All @@ -155,15 +174,26 @@ if (!module.parent) {
cleanup$: getCleanup$,
initCEX$,
verifyMarkets,
initDB$,
}).subscribe({
error: error => {
if (error.message) {
console.log(`Error: ${error.message}`);
} else {
console.log(error);
}
process.exit(1);
closeDB$().subscribe({
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever you need to subscribe within a subscribe callback you should probably be using a transformation operation instead (such as mergeMap).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this by adding a pipe to startArby observable and then calling closeDB observable within mergeMap, but I'm really unsure if it worked, because if I put console.log statements inside to mergeMap (that I call closeDB), they're never getting called, and never printing, can you please check @ERKarl

I followed here: https://rxjs-dev.firebaseapp.com/api/operators/mergeMap

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete: () => {
process.exit(1);
},
});
},
complete: () => {
closeDB$().subscribe({
complete: () => {
console.log('Shutdown complete. Goodbye, Arby.');
},
});
},
complete: () => console.log('Shutdown complete. Goodbye, Arby.'),
});
}
13 changes: 12 additions & 1 deletion src/centralized/execute-order.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { TestScheduler } from 'rxjs/testing';
import { executeCEXorder$ } from './execute-order';
import { getLoggers, testConfig } from '../test-utils';
import { getLoggers, getModels, testConfig } from '../test-utils';
import BigNumber from 'bignumber.js';
import { CEXorder } from './order-builder';
import { OrderSide } from '../constants';
import { Config } from '../config';
import { Observable } from 'rxjs';
import { Order, Exchange } from 'ccxt';
import { OrderInstance } from '../db/order';
import { InitDBResponse } from '../db/db';

let testScheduler: TestScheduler;

Expand All @@ -15,6 +17,7 @@ const assertExecuteCEXorder = (
config: Config;
price: BigNumber;
order: CEXorder;
models: InitDBResponse;
createOrder$: string;
unsubscribe?: string;
},
Expand All @@ -25,6 +28,9 @@ const assertExecuteCEXorder = (
const createOrder$ = () => {
return (cold(inputEvents.createOrder$) as unknown) as Observable<Order>;
};
const saveOrder$ = () => {
return (cold('') as unknown) as Observable<OrderInstance>;
};
const CEX = (null as unknown) as Exchange;
const CEXorder$ = executeCEXorder$({
CEX,
Expand All @@ -33,6 +39,8 @@ const assertExecuteCEXorder = (
price: inputEvents.price,
order: inputEvents.order,
createOrder$,
saveOrder$,
models: inputEvents.models,
});
expectObservable(CEXorder$, inputEvents.unsubscribe).toBe(expected, {
a: null,
Expand All @@ -56,6 +64,7 @@ describe('executeCEXorder$', () => {
quantity: new BigNumber('0.001'),
side: OrderSide.BUY,
},
models: getModels(),
};
const expected = '5s (a|)';
assertExecuteCEXorder(inputEvents, expected);
Expand All @@ -74,6 +83,7 @@ describe('executeCEXorder$', () => {
quantity: new BigNumber('0.001'),
side: OrderSide.BUY,
},
models: getModels(),
};
const expected = '1s (a|)';
assertExecuteCEXorder(inputEvents, expected);
Expand All @@ -93,6 +103,7 @@ describe('executeCEXorder$', () => {
side: OrderSide.BUY,
},
unsubscribe: '4s !',
models: getModels(),
};
const expected = '';
assertExecuteCEXorder(inputEvents, expected);
Expand Down
18 changes: 15 additions & 3 deletions src/centralized/execute-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,22 @@ import { Config } from '../config';
import { Logger } from '../logger';
import { CreateOrderParams } from './ccxt/create-order';
import { CEXorder } from './order-builder';
import { SaveOrderParams } from '../db/order-repository';
import { OrderInstance } from '../db/order';
import { InitDBResponse } from '../db/db';

type ExecuteCEXorderParams = {
CEX: Exchange;
config: Config;
logger: Logger;
price: BigNumber;
order: CEXorder;
saveOrder$: ({
order,
logger,
models,
}: SaveOrderParams) => Observable<OrderInstance>;
models: InitDBResponse;
createOrder$: ({
config,
exchange,
Expand All @@ -35,6 +44,8 @@ const executeCEXorder$ = ({
price,
order,
createOrder$,
saveOrder$,
models,
}: ExecuteCEXorderParams): Observable<null> => {
if (!config.TEST_MODE) {
logger.info(
Expand All @@ -46,11 +57,12 @@ const executeCEXorder$ = ({
side: order.side,
quantity: order.quantity,
}).pipe(
tap(order =>
tap(order => {
logger.info(
`Centralized exchange order finished: ${JSON.stringify(order)}`
)
),
);
saveOrder$({ order, logger, models }).subscribe();
}),
catchError((e, caught) => {
logger.warn(`Failed to execute CEX order: ${e}. Retrying in 1000ms`);
return timer(1000).pipe(mergeMapTo(caught));
Expand Down
Loading