Skip to content

Commit c98a140

Browse files
committed
saveOrder$ added to executeOrder of cex
1 parent 56655ec commit c98a140

File tree

10 files changed

+127
-5
lines changed

10 files changed

+127
-5
lines changed

src/arby.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { getStartShutdown$ } from './utils';
2222
import { Dictionary, Market } from 'ccxt';
2323
import { verifyMarkets } from './centralized/verify-markets';
2424
import { initDB$, InitDBparams, InitDBResponse } from './db/db';
25+
import { OrderRepository } from './db/order-repository';
2526

2627
type StartArbyParams = {
2728
config$: Observable<Config>;
@@ -103,7 +104,7 @@ export const startArby = ({
103104
dataDir: config.DATA_DIR,
104105
});
105106
return db$.pipe(
106-
mergeMap(() => {
107+
mergeMap((models: InitDBResponse) => {
107108
const CEX$ = initCEX$({
108109
config,
109110
loadMarkets$,
@@ -114,6 +115,7 @@ export const startArby = ({
114115
loggers.global.info('Starting. Hello, Arby.');
115116
logConfig(config, loggers.global);
116117
verifyMarkets(config, CEXmarkets);
118+
const orderRepository = new OrderRepository(models, loggers.db);
117119
const tradeComplete$ = trade$({
118120
config,
119121
loggers,
@@ -124,6 +126,7 @@ export const startArby = ({
124126
getCentralizedExchangePrice$,
125127
CEX,
126128
store,
129+
saveOrder$: orderRepository.saveOrder$,
127130
}).pipe(takeUntil(shutdown$));
128131
return concat(
129132
tradeComplete$,

src/centralized/execute-order.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { OrderSide } from '../constants';
77
import { Config } from '../config';
88
import { Observable } from 'rxjs';
99
import { Order, Exchange } from 'ccxt';
10+
import { OrderInstance } from '../db/order';
1011

1112
let testScheduler: TestScheduler;
1213

@@ -25,6 +26,9 @@ const assertExecuteCEXorder = (
2526
const createOrder$ = () => {
2627
return (cold(inputEvents.createOrder$) as unknown) as Observable<Order>;
2728
};
29+
const saveOrder$ = () => {
30+
return (cold('') as unknown) as Observable<OrderInstance>;
31+
};
2832
const CEX = (null as unknown) as Exchange;
2933
const CEXorder$ = executeCEXorder$({
3034
CEX,
@@ -33,6 +37,7 @@ const assertExecuteCEXorder = (
3337
price: inputEvents.price,
3438
order: inputEvents.order,
3539
createOrder$,
40+
saveOrder$,
3641
});
3742
expectObservable(CEXorder$, inputEvents.unsubscribe).toBe(expected, {
3843
a: null,

src/centralized/execute-order.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@ import { Config } from '../config';
1313
import { Logger } from '../logger';
1414
import { CreateOrderParams } from './ccxt/create-order';
1515
import { CEXorder } from './order-builder';
16+
import { SaveOrderParams } from '../db/order-repository';
17+
import { OrderInstance } from '../db/order';
1618

1719
type ExecuteCEXorderParams = {
1820
CEX: Exchange;
1921
config: Config;
2022
logger: Logger;
2123
price: BigNumber;
2224
order: CEXorder;
25+
saveOrder$: ({ order }: SaveOrderParams) => Observable<OrderInstance>;
2326
createOrder$: ({
2427
config,
2528
exchange,
@@ -35,6 +38,7 @@ const executeCEXorder$ = ({
3538
price,
3639
order,
3740
createOrder$,
41+
saveOrder$,
3842
}: ExecuteCEXorderParams): Observable<null> => {
3943
if (!config.TEST_MODE) {
4044
logger.info(
@@ -46,11 +50,12 @@ const executeCEXorder$ = ({
4650
side: order.side,
4751
quantity: order.quantity,
4852
}).pipe(
49-
tap(order =>
53+
tap(order => {
5054
logger.info(
5155
`Centralized exchange order finished: ${JSON.stringify(order)}`
52-
)
53-
),
56+
);
57+
saveOrder$({ order });
58+
}),
5459
catchError((e, caught) => {
5560
logger.warn(`Failed to execute CEX order: ${e}. Retrying in 1000ms`);
5661
return timer(1000).pipe(mergeMapTo(caught));

src/centralized/order.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { CEXorder } from './order-builder';
66
import BigNumber from 'bignumber.js';
77
import { Exchange } from 'ccxt';
88
import { getArbyStore } from '../store';
9+
import { OrderInstance } from '../db/order';
910

1011
let testScheduler: TestScheduler;
1112

@@ -34,6 +35,9 @@ const assertCentralizedExchangeOrder = (
3435
const centralizedExchangePrice$ = (cold(
3536
inputEvents.centralizedExchangePrice$
3637
) as unknown) as Observable<BigNumber>;
38+
const saveOrder$ = () => {
39+
return (cold('') as unknown) as Observable<OrderInstance>;
40+
};
3741
const CEX = (null as unknown) as Exchange;
3842
const deriveCEXorderQuantity = (order: any) => order;
3943
const store = getArbyStore();
@@ -46,6 +50,7 @@ const assertCentralizedExchangeOrder = (
4650
centralizedExchangePrice$,
4751
deriveCEXorderQuantity,
4852
store,
53+
saveOrder$,
4954
});
5055
expectObservable(centralizedExchangeOrder$, inputEvents.unsubscribe).toBe(
5156
expected

src/centralized/order.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import { ExecuteCEXorderParams } from './execute-order';
1919
import { quantityAboveMinimum } from './minimum-order-quantity-filter';
2020
import { CEXorder, GetOrderBuilderParams } from './order-builder';
2121
import { ArbyStore } from 'src/store';
22+
import { SaveOrderParams } from '../db/order-repository';
23+
import { OrderInstance } from '../db/order';
2224

2325
type GetCentralizedExchangeOrderParams = {
2426
CEX: Exchange;
@@ -28,6 +30,7 @@ type GetCentralizedExchangeOrderParams = {
2830
logger,
2931
price,
3032
order,
33+
saveOrder$,
3134
}: ExecuteCEXorderParams) => Observable<null>;
3235
getOrderBuilder$: ({
3336
config,
@@ -43,12 +46,14 @@ type GetCentralizedExchangeOrderParams = {
4346
config: Config
4447
) => CEXorder;
4548
store: ArbyStore;
49+
saveOrder$: ({ order }: SaveOrderParams) => Observable<OrderInstance>;
4650
};
4751

4852
const getCentralizedExchangeOrder$ = ({
4953
CEX,
5054
logger,
5155
config,
56+
saveOrder$,
5257
executeCEXorder$,
5358
getOrderBuilder$,
5459
centralizedExchangePrice$,
@@ -79,6 +84,7 @@ const getCentralizedExchangeOrder$ = ({
7984
logger,
8085
price,
8186
order: deriveCEXorderQuantity(order, price, config),
87+
saveOrder$,
8288
});
8389
})
8490
);

src/db/cex-repository.ts

Whitespace-only changes.

src/db/order-repository.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { Logger } from '../logger';
2+
import { InitDBResponse } from './db';
3+
import { from, Observable } from 'rxjs';
4+
import { Order, Trade } from 'ccxt';
5+
import { OrderAttributes, OrderInstance } from './order';
6+
import { map, mergeMap } from 'rxjs/operators';
7+
import { TradeAttributes } from './trade';
8+
9+
type SaveOrderParams = {
10+
order: Order;
11+
};
12+
13+
class OrderRepository {
14+
constructor(private models: InitDBResponse, private logger: Logger) {}
15+
16+
public saveOrder$ = ({
17+
order,
18+
}: SaveOrderParams): Observable<OrderInstance> => {
19+
this.logger.trace(`Saving order ${JSON.stringify(order)} into database`);
20+
21+
const arbyOrder = this.convertToArbyOrder(order);
22+
const arbyTrades = this.convertToArbyTrades(order.trades, order.id);
23+
// TODO async
24+
return from(this.models.Order.create(arbyOrder)).pipe(
25+
mergeMap(order => {
26+
return from(this.models.Trade.bulkCreate(arbyTrades)).pipe(
27+
map(() => {
28+
this.logger.trace(
29+
`Order with id ${order.id} has been successfully saved`
30+
);
31+
return order;
32+
})
33+
);
34+
})
35+
);
36+
};
37+
38+
private convertToArbyOrder(order: Order): OrderAttributes {
39+
return {
40+
id: order.id,
41+
datetime: order.datetime,
42+
timestamp: order.timestamp,
43+
lastTradeTimestamp: order.lastTradeTimestamp,
44+
status: order.status,
45+
symbol: order.symbol,
46+
type: order.type,
47+
side: order.side,
48+
price: order.price,
49+
average: order.average,
50+
amount: order.amount,
51+
filled: order.filled,
52+
remaining: order.remaining,
53+
cost: order.cost,
54+
info: order.info,
55+
feeType: order.fee.type,
56+
feeCurrency: order.fee.currency,
57+
feeRate: order.fee.rate,
58+
feeCost: order.fee.cost,
59+
};
60+
}
61+
62+
private convertToArbyTrades(
63+
trades: Trade[],
64+
orderId: string
65+
): TradeAttributes[] {
66+
const result: TradeAttributes[] = [];
67+
trades.forEach(trade => {
68+
result.push({
69+
id: trade.id,
70+
orderId,
71+
amount: trade.amount,
72+
datetime: trade.datetime,
73+
info: trade.info,
74+
price: trade.price,
75+
timestamp: trade.timestamp,
76+
type: trade.type,
77+
side: trade.side,
78+
symbol: trade.symbol,
79+
takerOrMaker: trade.takerOrMaker,
80+
cost: trade.cost,
81+
});
82+
});
83+
84+
return result;
85+
}
86+
}
87+
88+
export { OrderRepository, SaveOrderParams };

src/db/trade.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
} from 'sequelize';
88
import { ModelCtor } from 'sequelize/types/lib/model';
99

10-
type TradeAttributes = {
10+
export type TradeAttributes = {
1111
id: string;
1212
orderId: string;
1313
amount: number;

src/trade/trade.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { TestScheduler } from 'rxjs/testing';
66
import { getArbyStore } from '../store';
77
import { getLoggers, testConfig, TestError } from '../test-utils';
88
import { getNewTrade$ } from './trade';
9+
import { OrderInstance } from '../db/order';
910

1011
let testScheduler: TestScheduler;
1112
const testSchedulerSetup = () => {
@@ -73,6 +74,9 @@ const assertGetTrade = ({
7374
const getCentralizedExchangePrice$ = () => {
7475
return (cold('') as unknown) as Observable<BigNumber>;
7576
};
77+
const saveOrder$ = () => {
78+
return (cold('') as unknown) as Observable<OrderInstance>;
79+
};
7680
const CEX = (null as unknown) as Exchange;
7781
const store = getArbyStore();
7882
const trade$ = getNewTrade$({
@@ -85,6 +89,7 @@ const assertGetTrade = ({
8589
catchOpenDEXerror,
8690
getCentralizedExchangePrice$,
8791
store,
92+
saveOrder$,
8893
});
8994
expectObservable(trade$).toBe(expected, { a: true }, expectedError);
9095
});

src/trade/trade.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import { getCleanup$, GetCleanupParams } from './cleanup';
1616
import { getTradeInfo$ } from './info';
1717
import { getKrakenPrice$ } from '../centralized/kraken-price';
1818
import { getBinancePrice$ } from '../centralized/binance-price';
19+
import { SaveOrderParams } from '../db/order-repository';
20+
import { OrderInstance } from '../db/order';
1921

2022
type GetTradeParams = {
2123
config: Config;
@@ -49,6 +51,7 @@ type GetTradeParams = {
4951
}: CentralizedExchangePriceParams) => Observable<BigNumber>;
5052
CEX: Exchange;
5153
store: ArbyStore;
54+
saveOrder$: ({ order }: SaveOrderParams) => Observable<OrderInstance>;
5255
};
5356

5457
const getNewTrade$ = ({
@@ -61,6 +64,7 @@ const getNewTrade$ = ({
6164
getCentralizedExchangePrice$,
6265
CEX,
6366
store,
67+
saveOrder$,
6468
}: GetTradeParams): Observable<boolean> => {
6569
const centralizedExchangePrice$ = getCentralizedExchangePrice$({
6670
config,
@@ -90,6 +94,7 @@ const getNewTrade$ = ({
9094
centralizedExchangePrice$,
9195
deriveCEXorderQuantity,
9296
store,
97+
saveOrder$,
9398
})
9499
).pipe(
95100
tap(() => {

0 commit comments

Comments
 (0)