Skip to content

Commit 079a754

Browse files
committed
feat: adding sequelize along with new db type 'CexOrder' (#85)
1 parent 8bb5b1d commit 079a754

File tree

10 files changed

+11539
-138
lines changed

10 files changed

+11539
-138
lines changed

.env-example

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ CEX_BASEASSET=BTC
1313
CEX_QUOTEASSET=USDT
1414
TEST_CENTRALIZED_EXCHANGE_BASEASSET_BALANCE=10
1515
TEST_CENTRALIZED_EXCHANGE_QUOTEASSET_BALANCE=100000
16-
LIVE_CEX=false
16+
TEST_MODE=true

package-lock.json

Lines changed: 11240 additions & 95 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
"moment": "2.26.0",
6060
"ramda": "0.27.0",
6161
"rxjs": "6.5.5",
62+
"sequelize": "^6.3.5",
63+
"sqlite3": "^5.0.0",
6264
"uuid": "8.1.0",
6365
"winston": "3.2.1",
6466
"ws": "7.3.0"

src/arby.spec.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { startArby } from '../src/arby';
44
import { Config } from '../src/config';
55
import { InitCEXResponse } from './centralized/ccxt/init';
66
import { getLoggers } from './test-utils';
7+
import { InitDBResponse } from './db/db';
78

89
let testScheduler: TestScheduler;
910

@@ -15,6 +16,7 @@ type AssertStartArbyParams = {
1516
shutdown$: string;
1617
cleanup$: string;
1718
initCEX$: string;
19+
initDB$: string;
1820
};
1921
verifyMarkets?: () => boolean;
2022
};
@@ -39,13 +41,19 @@ const assertStartArby = ({
3941
InitCEXResponse
4042
>;
4143
};
44+
const initDB$ = () => {
45+
return (cold(inputEvents.initDB$) as unknown) as Observable<
46+
InitDBResponse
47+
>;
48+
};
4249
const arby$ = startArby({
4350
config$,
4451
getLoggers,
4552
shutdown$,
4653
trade$: getTrade$,
4754
cleanup$,
4855
initCEX$,
56+
initDB$,
4957
verifyMarkets: verifyMarkets ? verifyMarkets : () => true,
5058
});
5159
expectObservable(arby$).toBe(expected, undefined, { message: 'error' });
@@ -63,11 +71,12 @@ describe('startArby', () => {
6371
const inputEvents = {
6472
config$: '1000ms a',
6573
initCEX$: '1s a',
74+
initDB$: '1s a',
6675
getTrade$: 'b',
6776
shutdown$: '',
6877
cleanup$: '',
6978
};
70-
const expected = '2s b';
79+
const expected = '3s b';
7180
assertStartArby({
7281
inputEvents,
7382
expected,
@@ -78,11 +87,12 @@ describe('startArby', () => {
7887
const inputEvents = {
7988
config$: '1000ms a',
8089
initCEX$: '1s a',
90+
initDB$: '1s a',
8191
getTrade$: 'b',
8292
shutdown$: '',
8393
cleanup$: '',
8494
};
85-
const expected = '2s #';
95+
const expected = '3s #';
8696
assertStartArby({
8797
inputEvents,
8898
expected,
@@ -96,11 +106,12 @@ describe('startArby', () => {
96106
const inputEvents = {
97107
config$: 'a',
98108
initCEX$: '1s a',
109+
initDB$: '1s a',
99110
getTrade$: '500ms b',
100111
shutdown$: '10s c',
101112
cleanup$: '2s a',
102113
};
103-
const expected = '1500ms b 11499ms a';
114+
const expected = '2500ms b 11499ms a';
104115
assertStartArby({
105116
inputEvents,
106117
expected,
@@ -111,11 +122,12 @@ describe('startArby', () => {
111122
const inputEvents = {
112123
config$: 'a',
113124
initCEX$: '1s a',
125+
initDB$: '1s a',
114126
getTrade$: '500ms #',
115127
shutdown$: '10s c',
116128
cleanup$: '2s a',
117129
};
118-
const expected = '3500ms a';
130+
const expected = '4500ms a';
119131
assertStartArby({
120132
inputEvents,
121133
expected,

src/arby.ts

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { getNewTrade$, GetTradeParams } from './trade/trade';
2121
import { getStartShutdown$ } from './utils';
2222
import { Dictionary, Market } from 'ccxt';
2323
import { verifyMarkets } from './centralized/verify-markets';
24+
import { initDB$, InitDBparams, InitDBResponse } from './db/db';
2425

2526
type StartArbyParams = {
2627
config$: Observable<Config>;
@@ -43,6 +44,10 @@ type StartArbyParams = {
4344
loadMarkets$,
4445
}: InitCEXparams) => Observable<InitCEXResponse>;
4546
verifyMarkets: (config: Config, CEXmarkets: Dictionary<Market>) => boolean;
47+
initDB$: ({
48+
dataDir: string,
49+
logger: Logger,
50+
}: InitDBparams) => Observable<InitDBResponse>;
4651
};
4752

4853
const logConfig = (config: Config, logger: Logger) => {
@@ -86,54 +91,63 @@ export const startArby = ({
8691
trade$,
8792
cleanup$,
8893
initCEX$,
94+
initDB$,
8995
verifyMarkets,
9096
}: StartArbyParams): Observable<any> => {
9197
const store = getArbyStore();
9298
return config$.pipe(
9399
mergeMap(config => {
94-
const CEX$ = initCEX$({
95-
config,
96-
loadMarkets$,
97-
getExchange,
100+
const loggers = getLoggers(config);
101+
const db$ = initDB$({
102+
logger: loggers.db,
103+
dataDir: config.DATA_DIR,
98104
});
99-
return CEX$.pipe(
100-
mergeMap(({ markets: CEXmarkets, exchange: CEX }) => {
101-
const loggers = getLoggers(config);
102-
loggers.global.info('Starting. Hello, Arby.');
103-
logConfig(config, loggers.global);
104-
verifyMarkets(config, CEXmarkets);
105-
const tradeComplete$ = trade$({
105+
return db$.pipe(
106+
mergeMap(() => {
107+
const CEX$ = initCEX$({
106108
config,
107-
loggers,
108-
getOpenDEXcomplete$,
109-
shutdown$,
110-
getCentralizedExchangeOrder$,
111-
catchOpenDEXerror,
112-
getCentralizedExchangePrice$,
113-
CEX,
114-
store,
115-
}).pipe(takeUntil(shutdown$));
116-
return concat(
117-
tradeComplete$,
118-
cleanup$({
119-
config,
120-
loggers,
121-
removeOpenDEXorders$,
122-
removeCEXorders$,
123-
CEX,
124-
})
125-
).pipe(
126-
catchError(e => {
127-
loggers.global.info(
128-
`Unrecoverable error: ${JSON.stringify(e)} - cleaning up.`
129-
);
130-
return cleanup$({
109+
loadMarkets$,
110+
getExchange,
111+
});
112+
return CEX$.pipe(
113+
mergeMap(({ markets: CEXmarkets, exchange: CEX }) => {
114+
loggers.global.info('Starting. Hello, Arby.');
115+
logConfig(config, loggers.global);
116+
verifyMarkets(config, CEXmarkets);
117+
const tradeComplete$ = trade$({
131118
config,
132119
loggers,
133-
removeOpenDEXorders$,
134-
removeCEXorders$,
120+
getOpenDEXcomplete$,
121+
shutdown$,
122+
getCentralizedExchangeOrder$,
123+
catchOpenDEXerror,
124+
getCentralizedExchangePrice$,
135125
CEX,
136-
});
126+
store,
127+
}).pipe(takeUntil(shutdown$));
128+
return concat(
129+
tradeComplete$,
130+
cleanup$({
131+
config,
132+
loggers,
133+
removeOpenDEXorders$,
134+
removeCEXorders$,
135+
CEX,
136+
})
137+
).pipe(
138+
catchError(e => {
139+
loggers.global.info(
140+
`Unrecoverable error: ${JSON.stringify(e)} - cleaning up.`
141+
);
142+
return cleanup$({
143+
config,
144+
loggers,
145+
removeOpenDEXorders$,
146+
removeCEXorders$,
147+
CEX,
148+
});
149+
})
150+
);
137151
})
138152
);
139153
})
@@ -155,6 +169,7 @@ if (!module.parent) {
155169
cleanup$: getCleanup$,
156170
initCEX$,
157171
verifyMarkets,
172+
initDB$,
158173
}).subscribe({
159174
error: error => {
160175
if (error.message) {

src/db/db.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import { from, Observable, of } from 'rxjs';
2+
import { Sequelize, ModelCtor } from 'sequelize';
3+
import { Order, OrderInstance } from './order';
4+
import { Fee, FeeInstance } from './fee';
5+
import { Trade, TradeInstance } from './trade';
6+
import { Logger } from '../logger';
7+
import { mergeMap } from 'rxjs/operators';
8+
9+
type InitDBparams = {
10+
dataDir?: string;
11+
logger: Logger;
12+
};
13+
14+
type InitDBResponse = {
15+
Order: ModelCtor<OrderInstance>;
16+
Fee: ModelCtor<FeeInstance>;
17+
Trade: ModelCtor<TradeInstance>;
18+
};
19+
20+
const createModels = (sequelize: Sequelize): InitDBResponse => {
21+
const models = {
22+
Order: Order(sequelize),
23+
Fee: Fee(sequelize),
24+
Trade: Trade(sequelize),
25+
};
26+
27+
models.Order.hasMany(models.Trade, {
28+
as: 'orderTrades',
29+
foreignKey: 'orderId',
30+
constraints: true,
31+
});
32+
models.Trade.belongsTo(models.Order, {
33+
as: 'orderInstance',
34+
constraints: true,
35+
foreignKey: 'orderId',
36+
});
37+
38+
models.Fee.belongsTo(models.Order, {
39+
foreignKey: 'orderId',
40+
constraints: true,
41+
});
42+
models.Fee.belongsTo(models.Trade, {
43+
foreignKey: 'tradeId',
44+
constraints: true,
45+
});
46+
47+
return models;
48+
};
49+
50+
const initDB$ = ({
51+
logger,
52+
dataDir,
53+
}: InitDBparams): Observable<InitDBResponse> => {
54+
const sequelize = new Sequelize({
55+
storage: dataDir ? `${dataDir}/arby.db` : undefined,
56+
logging: logger.trace,
57+
dialect: 'sqlite',
58+
});
59+
60+
return of(createModels(sequelize)).pipe(
61+
mergeMap(models => {
62+
return from(sequelize.authenticate()).pipe(
63+
mergeMap(() => {
64+
return from(models.Order.sync()).pipe(
65+
mergeMap(() => {
66+
return from(models.Trade.sync()).pipe(
67+
mergeMap(() => {
68+
return from(models.Fee.sync()).pipe(mergeMap(() => of(models)));
69+
})
70+
);
71+
})
72+
);
73+
})
74+
);
75+
})
76+
);
77+
};
78+
79+
export { initDB$, InitDBparams, InitDBResponse };

src/db/fee.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import {
2+
DataTypes,
3+
Model,
4+
ModelAttributes,
5+
ModelOptions,
6+
Sequelize,
7+
} from 'sequelize';
8+
import { ModelCtor } from 'sequelize/types/lib/model';
9+
10+
type FeeAttributes = {
11+
orderId?: string;
12+
tradeId?: string;
13+
type: 'taker' | 'maker';
14+
currency: string;
15+
rate: number;
16+
cost: number;
17+
};
18+
19+
export interface FeeInstance extends Model<FeeAttributes>, FeeAttributes {}
20+
21+
export function Fee(sequelize: Sequelize): ModelCtor<FeeInstance> {
22+
const attributes: ModelAttributes<FeeInstance> = {
23+
orderId: { type: DataTypes.STRING, allowNull: true },
24+
tradeId: { type: DataTypes.STRING, allowNull: true },
25+
type: { type: DataTypes.STRING, allowNull: false },
26+
currency: { type: DataTypes.STRING, allowNull: false },
27+
rate: { type: DataTypes.DOUBLE, allowNull: false },
28+
cost: { type: DataTypes.DOUBLE, allowNull: false },
29+
};
30+
31+
const options: ModelOptions = {
32+
tableName: 'fees',
33+
timestamps: false,
34+
};
35+
36+
return sequelize.define<FeeInstance>('Fee', attributes, options);
37+
}

0 commit comments

Comments
 (0)