Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ CEX_BASEASSET=BTC
CEX_QUOTEASSET=USDT
TEST_CENTRALIZED_EXCHANGE_BASEASSET_BALANCE=10
TEST_CENTRALIZED_EXCHANGE_QUOTEASSET_BALANCE=100000
LIVE_CEX=false
TEST_MODE=true
12,137 changes: 12,041 additions & 96 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
"moment": "2.26.0",
"ramda": "0.27.0",
"rxjs": "6.5.5",
"sequelize": "^6.3.5",
"sqlite3": "^5.0.0",
"uuid": "8.1.0",
"winston": "3.2.1",
"ws": "7.3.0"
Expand Down
20 changes: 16 additions & 4 deletions src/arby.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { startArby } from '../src/arby';
import { Config } from '../src/config';
import { InitCEXResponse } from './centralized/ccxt/init';
import { getLoggers } from './test-utils';
import { InitDBResponse } from './db/db';

let testScheduler: TestScheduler;

Expand All @@ -15,6 +16,7 @@ type AssertStartArbyParams = {
shutdown$: string;
cleanup$: string;
initCEX$: string;
initDB$: string;
};
verifyMarkets?: () => boolean;
};
Expand All @@ -39,13 +41,19 @@ const assertStartArby = ({
InitCEXResponse
>;
};
const initDB$ = () => {
return (cold(inputEvents.initDB$) as unknown) as Observable<
InitDBResponse
>;
};
const arby$ = startArby({
config$,
getLoggers,
shutdown$,
trade$: getTrade$,
cleanup$,
initCEX$,
initDB$,
verifyMarkets: verifyMarkets ? verifyMarkets : () => true,
});
expectObservable(arby$).toBe(expected, undefined, { message: 'error' });
Expand All @@ -63,11 +71,12 @@ describe('startArby', () => {
const inputEvents = {
config$: '1000ms a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: 'b',
shutdown$: '',
cleanup$: '',
};
const expected = '2s b';
const expected = '3s b';
assertStartArby({
inputEvents,
expected,
Expand All @@ -78,11 +87,12 @@ describe('startArby', () => {
const inputEvents = {
config$: '1000ms a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: 'b',
shutdown$: '',
cleanup$: '',
};
const expected = '2s #';
const expected = '3s #';
assertStartArby({
inputEvents,
expected,
Expand All @@ -96,11 +106,12 @@ describe('startArby', () => {
const inputEvents = {
config$: 'a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: '500ms b',
shutdown$: '10s c',
cleanup$: '2s a',
};
const expected = '1500ms b 11499ms a';
const expected = '2500ms b 11499ms a';
assertStartArby({
inputEvents,
expected,
Expand All @@ -111,11 +122,12 @@ describe('startArby', () => {
const inputEvents = {
config$: 'a',
initCEX$: '1s a',
initDB$: '1s a',
getTrade$: '500ms #',
shutdown$: '10s c',
cleanup$: '2s a',
};
const expected = '3500ms a';
const expected = '4500ms a';
assertStartArby({
inputEvents,
expected,
Expand Down
126 changes: 77 additions & 49 deletions src/arby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import { getNewTrade$, GetTradeParams } from './trade/trade';
import { getStartShutdown$ } from './utils';
import { Dictionary, Market } from 'ccxt';
import { verifyMarkets } from './centralized/verify-markets';
import { initDB$, InitDBparams, InitDBResponse, closeDB$ } from './db/db';
import { saveOrder$ } from './db/order-repository';

type StartArbyParams = {
config$: Observable<Config>;
Expand All @@ -32,6 +34,8 @@ type StartArbyParams = {
getCentralizedExchangeOrder$,
getOpenDEXcomplete$,
shutdown$,
models,
saveOrder$,
}: GetTradeParams) => Observable<boolean>;
cleanup$: ({
config,
Expand All @@ -43,6 +47,10 @@ type StartArbyParams = {
loadMarkets$,
}: InitCEXparams) => Observable<InitCEXResponse>;
verifyMarkets: (config: Config, CEXmarkets: Dictionary<Market>) => boolean;
initDB$: ({
dataDir: string,
logger: Logger,
}: InitDBparams) => Observable<InitDBResponse>;
};

const logConfig = (config: Config, logger: Logger) => {
Expand Down Expand Up @@ -86,54 +94,65 @@ export const startArby = ({
trade$,
cleanup$,
initCEX$,
initDB$,
verifyMarkets,
}: StartArbyParams): Observable<any> => {
const store = getArbyStore();
return config$.pipe(
mergeMap(config => {
const CEX$ = initCEX$({
config,
loadMarkets$,
getExchange,
const loggers = getLoggers(config);
const db$ = initDB$({
logger: loggers.db,
dataDir: config.DATA_DIR,
});
return CEX$.pipe(
mergeMap(({ markets: CEXmarkets, exchange: CEX }) => {
const loggers = getLoggers(config);
loggers.global.info('Starting. Hello, Arby.');
logConfig(config, loggers.global);
verifyMarkets(config, CEXmarkets);
const tradeComplete$ = trade$({
return db$.pipe(
mergeMap((models: InitDBResponse) => {
const CEX$ = initCEX$({
config,
loggers,
getOpenDEXcomplete$,
shutdown$,
getCentralizedExchangeOrder$,
catchOpenDEXerror,
getCentralizedExchangePrice$,
CEX,
store,
}).pipe(takeUntil(shutdown$));
return concat(
tradeComplete$,
cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
CEX,
})
).pipe(
catchError(e => {
loggers.global.info(
`Unrecoverable error: ${JSON.stringify(e)} - cleaning up.`
);
return cleanup$({
loadMarkets$,
getExchange,
});
return CEX$.pipe(
mergeMap(({ markets: CEXmarkets, exchange: CEX }) => {
loggers.global.info('Starting. Hello, Arby.');
logConfig(config, loggers.global);
verifyMarkets(config, CEXmarkets);
const tradeComplete$ = trade$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
getOpenDEXcomplete$,
shutdown$,
getCentralizedExchangeOrder$,
catchOpenDEXerror,
getCentralizedExchangePrice$,
CEX,
});
store,
saveOrder$,
models,
}).pipe(takeUntil(shutdown$));
return concat(
tradeComplete$,
cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
CEX,
})
).pipe(
catchError(e => {
loggers.global.info(
`Unrecoverable error: ${JSON.stringify(e)} - cleaning up.`
);
return cleanup$({
config,
loggers,
removeOpenDEXorders$,
removeCEXorders$,
CEX,
});
})
);
})
);
})
Expand All @@ -155,15 +174,24 @@ if (!module.parent) {
cleanup$: getCleanup$,
initCEX$,
verifyMarkets,
}).subscribe({
error: error => {
if (error.message) {
console.log(`Error: ${error.message}`);
} else {
console.log(error);
}
process.exit(1);
},
complete: () => console.log('Shutdown complete. Goodbye, Arby.'),
});
initDB$,
})
.pipe(
mergeMap(() => {
return closeDB$();
})
)
.subscribe({
error: error => {
if (error.message) {
console.log(`Error: ${error.message}`);
} else {
console.log(error);
}
process.exit(1);
},
complete: () => {
console.log('Shutdown complete. Goodbye, Arby.');
},
});
}
13 changes: 12 additions & 1 deletion src/centralized/execute-order.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { TestScheduler } from 'rxjs/testing';
import { executeCEXorder$ } from './execute-order';
import { getLoggers, testConfig } from '../test-utils';
import { getLoggers, getModels, testConfig } from '../test-utils';
import BigNumber from 'bignumber.js';
import { CEXorder } from './order-builder';
import { OrderSide } from '../constants';
import { Config } from '../config';
import { Observable } from 'rxjs';
import { Order, Exchange } from 'ccxt';
import { OrderInstance } from '../db/order';
import { InitDBResponse } from '../db/db';

let testScheduler: TestScheduler;

Expand All @@ -15,6 +17,7 @@ const assertExecuteCEXorder = (
config: Config;
price: BigNumber;
order: CEXorder;
models: InitDBResponse;
createOrder$: string;
unsubscribe?: string;
},
Expand All @@ -25,6 +28,9 @@ const assertExecuteCEXorder = (
const createOrder$ = () => {
return (cold(inputEvents.createOrder$) as unknown) as Observable<Order>;
};
const saveOrder$ = () => {
return (cold('') as unknown) as Observable<OrderInstance>;
};
const CEX = (null as unknown) as Exchange;
const CEXorder$ = executeCEXorder$({
CEX,
Expand All @@ -33,6 +39,8 @@ const assertExecuteCEXorder = (
price: inputEvents.price,
order: inputEvents.order,
createOrder$,
saveOrder$,
models: inputEvents.models,
});
expectObservable(CEXorder$, inputEvents.unsubscribe).toBe(expected, {
a: null,
Expand All @@ -56,6 +64,7 @@ describe('executeCEXorder$', () => {
quantity: new BigNumber('0.001'),
side: OrderSide.BUY,
},
models: getModels(),
};
const expected = '5s (a|)';
assertExecuteCEXorder(inputEvents, expected);
Expand All @@ -74,6 +83,7 @@ describe('executeCEXorder$', () => {
quantity: new BigNumber('0.001'),
side: OrderSide.BUY,
},
models: getModels(),
};
const expected = '1s (a|)';
assertExecuteCEXorder(inputEvents, expected);
Expand All @@ -93,6 +103,7 @@ describe('executeCEXorder$', () => {
side: OrderSide.BUY,
},
unsubscribe: '4s !',
models: getModels(),
};
const expected = '';
assertExecuteCEXorder(inputEvents, expected);
Expand Down
18 changes: 15 additions & 3 deletions src/centralized/execute-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,22 @@ import { Config } from '../config';
import { Logger } from '../logger';
import { CreateOrderParams } from './ccxt/create-order';
import { CEXorder } from './order-builder';
import { SaveOrderParams } from '../db/order-repository';
import { OrderInstance } from '../db/order';
import { InitDBResponse } from '../db/db';

type ExecuteCEXorderParams = {
CEX: Exchange;
config: Config;
logger: Logger;
price: BigNumber;
order: CEXorder;
saveOrder$: ({
order,
logger,
models,
}: SaveOrderParams) => Observable<OrderInstance>;
models: InitDBResponse;
createOrder$: ({
config,
exchange,
Expand All @@ -35,6 +44,8 @@ const executeCEXorder$ = ({
price,
order,
createOrder$,
saveOrder$,
models,
}: ExecuteCEXorderParams): Observable<null> => {
if (!config.TEST_MODE) {
logger.info(
Expand All @@ -46,11 +57,12 @@ const executeCEXorder$ = ({
side: order.side,
quantity: order.quantity,
}).pipe(
tap(order =>
tap(order => {
logger.info(
`Centralized exchange order finished: ${JSON.stringify(order)}`
)
),
);
saveOrder$({ order, logger, models }).subscribe();
}),
catchError((e, caught) => {
logger.warn(`Failed to execute CEX order: ${e}. Retrying in 1000ms`);
return timer(1000).pipe(mergeMapTo(caught));
Expand Down
Loading