@@ -21,6 +21,7 @@ import { getNewTrade$, GetTradeParams } from './trade/trade';
21
21
import { getStartShutdown$ } from './utils' ;
22
22
import { Dictionary , Market } from 'ccxt' ;
23
23
import { verifyMarkets } from './centralized/verify-markets' ;
24
+ import { initDB$ , InitDBparams , InitDBResponse } from './db/db' ;
24
25
25
26
type StartArbyParams = {
26
27
config$ : Observable < Config > ;
@@ -43,6 +44,10 @@ type StartArbyParams = {
43
44
loadMarkets$,
44
45
} : InitCEXparams ) => Observable < InitCEXResponse > ;
45
46
verifyMarkets : ( config : Config , CEXmarkets : Dictionary < Market > ) => boolean ;
47
+ initDB$ : ( {
48
+ dataDir : string ,
49
+ logger : Logger ,
50
+ } : InitDBparams ) => Observable < InitDBResponse > ;
46
51
} ;
47
52
48
53
const logConfig = ( config : Config , logger : Logger ) => {
@@ -86,54 +91,63 @@ export const startArby = ({
86
91
trade$,
87
92
cleanup$,
88
93
initCEX$,
94
+ initDB$,
89
95
verifyMarkets,
90
96
} : StartArbyParams ) : Observable < any > => {
91
97
const store = getArbyStore ( ) ;
92
98
return config$ . pipe (
93
99
mergeMap ( config => {
94
- const CEX$ = initCEX$ ( {
95
- config ,
96
- loadMarkets$ ,
97
- getExchange ,
100
+ const loggers = getLoggers ( config ) ;
101
+ const db$ = initDB$ ( {
102
+ logger : loggers . db ,
103
+ dataDir : config . DATA_DIR ,
98
104
} ) ;
99
- return CEX$ . pipe (
100
- mergeMap ( ( { markets : CEXmarkets , exchange : CEX } ) => {
101
- const loggers = getLoggers ( config ) ;
102
- loggers . global . info ( 'Starting. Hello, Arby.' ) ;
103
- logConfig ( config , loggers . global ) ;
104
- verifyMarkets ( config , CEXmarkets ) ;
105
- const tradeComplete$ = trade$ ( {
105
+ return db$ . pipe (
106
+ mergeMap ( ( ) => {
107
+ const CEX$ = initCEX$ ( {
106
108
config,
107
- loggers,
108
- getOpenDEXcomplete$,
109
- shutdown$,
110
- getCentralizedExchangeOrder$,
111
- catchOpenDEXerror,
112
- getCentralizedExchangePrice$,
113
- CEX ,
114
- store,
115
- } ) . pipe ( takeUntil ( shutdown$ ) ) ;
116
- return concat (
117
- tradeComplete$ ,
118
- cleanup$ ( {
119
- config,
120
- loggers,
121
- removeOpenDEXorders$,
122
- removeCEXorders$,
123
- CEX ,
124
- } )
125
- ) . pipe (
126
- catchError ( e => {
127
- loggers . global . info (
128
- `Unrecoverable error: ${ JSON . stringify ( e ) } - cleaning up.`
129
- ) ;
130
- return cleanup$ ( {
109
+ loadMarkets$,
110
+ getExchange,
111
+ } ) ;
112
+ return CEX$ . pipe (
113
+ mergeMap ( ( { markets : CEXmarkets , exchange : CEX } ) => {
114
+ loggers . global . info ( 'Starting. Hello, Arby.' ) ;
115
+ logConfig ( config , loggers . global ) ;
116
+ verifyMarkets ( config , CEXmarkets ) ;
117
+ const tradeComplete$ = trade$ ( {
131
118
config,
132
119
loggers,
133
- removeOpenDEXorders$,
134
- removeCEXorders$,
120
+ getOpenDEXcomplete$,
121
+ shutdown$,
122
+ getCentralizedExchangeOrder$,
123
+ catchOpenDEXerror,
124
+ getCentralizedExchangePrice$,
135
125
CEX ,
136
- } ) ;
126
+ store,
127
+ } ) . pipe ( takeUntil ( shutdown$ ) ) ;
128
+ return concat (
129
+ tradeComplete$ ,
130
+ cleanup$ ( {
131
+ config,
132
+ loggers,
133
+ removeOpenDEXorders$,
134
+ removeCEXorders$,
135
+ CEX ,
136
+ } )
137
+ ) . pipe (
138
+ catchError ( e => {
139
+ loggers . global . info (
140
+ `Unrecoverable error: ${ JSON . stringify ( e ) } - cleaning up.`
141
+ ) ;
142
+ return cleanup$ ( {
143
+ config,
144
+ loggers,
145
+ removeOpenDEXorders$,
146
+ removeCEXorders$,
147
+ CEX ,
148
+ } ) ;
149
+ } )
150
+ ) ;
137
151
} )
138
152
) ;
139
153
} )
@@ -155,6 +169,7 @@ if (!module.parent) {
155
169
cleanup$ : getCleanup$ ,
156
170
initCEX$,
157
171
verifyMarkets,
172
+ initDB$,
158
173
} ) . subscribe ( {
159
174
error : error => {
160
175
if ( error . message ) {
0 commit comments