Skip to content

Commit 4e5caba

Browse files
author
Karl Ranna
authored
pull price feed from Kraken (#112)
1 parent 103a93b commit 4e5caba

File tree

6 files changed

+220
-46
lines changed

6 files changed

+220
-46
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`getCentralizedExchangePrice$ errors for unknown exchange 1`] = `"Could not get price feed for unknown exchange: ASDF"`;

src/centralized/binance-price.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import BigNumber from 'bignumber.js';
2+
import { Observable, throwError } from 'rxjs';
3+
import { catchError, share, distinctUntilChanged } from 'rxjs/operators';
4+
import WebSocket from 'ws';
5+
import { errors } from '../opendex/errors';
6+
import { GetPriceParams } from './exchange-price';
7+
8+
const getBinancePrice$ = ({
9+
config,
10+
logger,
11+
}: GetPriceParams): Observable<BigNumber> => {
12+
const priceObservable: Observable<BigNumber> = new Observable(observer => {
13+
const tradingPair = `${config.CEX_BASEASSET}${config.CEX_QUOTEASSET}`;
14+
const url = `wss://stream.binance.com:9443/ws/${tradingPair.toLowerCase()}@aggTrade`;
15+
const socket = new WebSocket(url);
16+
socket.onopen = () => {
17+
logger.trace(`${tradingPair} established connection to ${url}`);
18+
};
19+
socket.on('error', e => {
20+
observer.error(e);
21+
});
22+
const heartbeat = () => {
23+
logger.trace(`heartbeat from ${tradingPair} socket`);
24+
};
25+
socket.onclose = (event: WebSocket.CloseEvent) => {
26+
if (event.reason) {
27+
logger.trace(
28+
`${tradingPair} stream closed with reason: ${event.reason}`
29+
);
30+
} else {
31+
logger.trace(`${tradingPair} stream closed`);
32+
}
33+
};
34+
socket.on('ping', heartbeat);
35+
socket.on('open', heartbeat);
36+
socket.onmessage = (event: WebSocket.MessageEvent) => {
37+
const aggTrade = JSON.parse(event.data.toString());
38+
const { p: priceString } = aggTrade;
39+
const price = new BigNumber(priceString);
40+
observer.next(price);
41+
};
42+
return () => {
43+
socket.terminate();
44+
};
45+
});
46+
return priceObservable.pipe(
47+
catchError(() => {
48+
return throwError(errors.CENTRALIZED_EXCHANGE_PRICE_FEED_ERROR);
49+
}),
50+
distinctUntilChanged((a, b) => a.isEqualTo(b)),
51+
share()
52+
);
53+
};
54+
55+
export { getBinancePrice$ };
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { testConfig, getLoggers } from '../test-utils';
2+
import { getCentralizedExchangePrice$ } from './exchange-price';
3+
4+
describe('getCentralizedExchangePrice$', () => {
5+
it('returns Binance price stream', () => {
6+
const config = {
7+
...testConfig(),
8+
...{ CEX: 'BINANCE' },
9+
};
10+
const getBinancePrice$ = jest.fn();
11+
const getKrakenPrice$ = jest.fn();
12+
const logger = getLoggers().global;
13+
getCentralizedExchangePrice$({
14+
config,
15+
logger,
16+
getBinancePrice$,
17+
getKrakenPrice$,
18+
});
19+
expect(getBinancePrice$).toHaveBeenCalledTimes(1);
20+
});
21+
22+
it('returns Kraken price stream', () => {
23+
const config = {
24+
...testConfig(),
25+
...{ CEX: 'KRAKEN' },
26+
};
27+
const getBinancePrice$ = jest.fn();
28+
const getKrakenPrice$ = jest.fn();
29+
const logger = getLoggers().global;
30+
getCentralizedExchangePrice$({
31+
config,
32+
logger,
33+
getBinancePrice$,
34+
getKrakenPrice$,
35+
});
36+
expect(getKrakenPrice$).toHaveBeenCalledTimes(1);
37+
});
38+
39+
it('errors for unknown exchange', () => {
40+
const config = {
41+
...testConfig(),
42+
...{ CEX: 'ASDF' },
43+
};
44+
const getBinancePrice$ = jest.fn();
45+
const getKrakenPrice$ = jest.fn();
46+
const logger = getLoggers().global;
47+
expect(() => {
48+
getCentralizedExchangePrice$({
49+
config,
50+
logger,
51+
getBinancePrice$,
52+
getKrakenPrice$,
53+
});
54+
}).toThrowErrorMatchingSnapshot();
55+
});
56+
});

src/centralized/exchange-price.ts

Lines changed: 31 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,46 @@
11
import BigNumber from 'bignumber.js';
2-
import { Observable, throwError } from 'rxjs';
3-
import { catchError, share, distinctUntilChanged } from 'rxjs/operators';
4-
import WebSocket from 'ws';
2+
import { Observable } from 'rxjs';
53
import { Config } from '../config';
64
import { Logger } from '../logger';
7-
import { errors } from '../opendex/errors';
5+
6+
type GetPriceParams = {
7+
config: Config;
8+
logger: Logger;
9+
};
810

911
type CentralizedExchangePriceParams = {
1012
config: Config;
1113
logger: Logger;
14+
getKrakenPrice$: ({
15+
config,
16+
logger,
17+
}: GetPriceParams) => Observable<BigNumber>;
18+
getBinancePrice$: ({
19+
config,
20+
logger,
21+
}: GetPriceParams) => Observable<BigNumber>;
1222
};
1323

1424
const getCentralizedExchangePrice$ = ({
1525
config,
1626
logger,
27+
getKrakenPrice$,
28+
getBinancePrice$,
1729
}: CentralizedExchangePriceParams): Observable<BigNumber> => {
18-
const priceObservable: Observable<BigNumber> = new Observable(observer => {
19-
const tradingPair = `${config.CEX_BASEASSET}${config.CEX_QUOTEASSET}`;
20-
const url = `wss://stream.binance.com:9443/ws/${tradingPair.toLowerCase()}@aggTrade`;
21-
const socket = new WebSocket(url);
22-
socket.onopen = () => {
23-
logger.trace(`${tradingPair} established connection to ${url}`);
24-
};
25-
socket.on('error', e => {
26-
observer.error(e);
27-
});
28-
const heartbeat = () => {
29-
logger.trace(`heartbeat from ${tradingPair} socket`);
30-
};
31-
socket.onclose = (event: WebSocket.CloseEvent) => {
32-
if (event.reason) {
33-
logger.trace(
34-
`${tradingPair} stream closed with reason: ${event.reason}`
35-
);
36-
} else {
37-
logger.trace(`${tradingPair} stream closed`);
38-
}
39-
};
40-
socket.on('ping', heartbeat);
41-
socket.on('open', heartbeat);
42-
socket.onmessage = (event: WebSocket.MessageEvent) => {
43-
const aggTrade = JSON.parse(event.data.toString());
44-
const { p: priceString } = aggTrade;
45-
const price = new BigNumber(priceString);
46-
observer.next(price);
47-
};
48-
return () => {
49-
socket.terminate();
50-
};
51-
});
52-
return priceObservable.pipe(
53-
catchError(() => {
54-
return throwError(errors.CENTRALIZED_EXCHANGE_PRICE_FEED_ERROR);
55-
}),
56-
distinctUntilChanged((a, b) => a.isEqualTo(b)),
57-
share()
58-
);
30+
switch (config.CEX) {
31+
case 'BINANCE':
32+
return getBinancePrice$({ config, logger });
33+
case 'KRAKEN':
34+
return getKrakenPrice$({ config, logger });
35+
default:
36+
throw new Error(
37+
`Could not get price feed for unknown exchange: ${config.CEX}`
38+
);
39+
}
5940
};
6041

61-
export { getCentralizedExchangePrice$, CentralizedExchangePriceParams };
42+
export {
43+
getCentralizedExchangePrice$,
44+
CentralizedExchangePriceParams,
45+
GetPriceParams,
46+
};

src/centralized/kraken-price.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import BigNumber from 'bignumber.js';
2+
import { Observable, throwError } from 'rxjs';
3+
import { catchError, distinctUntilChanged, share } from 'rxjs/operators';
4+
import WebSocket from 'ws';
5+
import { errors } from '../opendex/errors';
6+
import { GetPriceParams } from './exchange-price';
7+
8+
const getKrakenPrice$ = ({
9+
config,
10+
logger,
11+
}: GetPriceParams): Observable<BigNumber> => {
12+
const priceObservable: Observable<BigNumber> = new Observable(observer => {
13+
const tradingPair = `${config.CEX_BASEASSET}/${config.CEX_QUOTEASSET}`;
14+
const url = 'wss://ws.kraken.com';
15+
const socket = new WebSocket(url);
16+
socket.onopen = () => {
17+
logger.trace(`${tradingPair} established connection to ${url}`);
18+
};
19+
socket.on('error', e => {
20+
observer.error(e);
21+
});
22+
const heartbeat = () => {
23+
logger.trace(`heartbeat from ${tradingPair} socket`);
24+
};
25+
socket.onclose = (event: WebSocket.CloseEvent) => {
26+
if (event.reason) {
27+
logger.trace(
28+
`${tradingPair} stream closed with reason: ${event.reason}`
29+
);
30+
} else {
31+
logger.trace(`${tradingPair} stream closed`);
32+
}
33+
};
34+
socket.on('ping', heartbeat);
35+
socket.on('open', () => {
36+
const subscribeTrade = {
37+
event: 'subscribe',
38+
pair: [tradingPair],
39+
subscription: {
40+
name: 'trade',
41+
},
42+
};
43+
socket.send(JSON.stringify(subscribeTrade));
44+
heartbeat();
45+
});
46+
let channelID = -1;
47+
socket.onmessage = (event: WebSocket.MessageEvent) => {
48+
const eventData = JSON.parse(event.data.toString());
49+
if (eventData.channelID) {
50+
channelID = eventData.channelID;
51+
}
52+
if (eventData[0] === channelID) {
53+
const priceString = eventData[1][0][0];
54+
const price = new BigNumber(priceString);
55+
observer.next(price);
56+
}
57+
};
58+
return () => {
59+
socket.terminate();
60+
};
61+
});
62+
return priceObservable.pipe(
63+
catchError(() => {
64+
return throwError(errors.CENTRALIZED_EXCHANGE_PRICE_FEED_ERROR);
65+
}),
66+
distinctUntilChanged((a, b) => a.isEqualTo(b)),
67+
share()
68+
);
69+
};
70+
71+
export { getKrakenPrice$ };

src/trade/trade.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import { createOpenDEXorders$ } from '../opendex/create-orders';
1414
import { ArbyStore } from '../store';
1515
import { getCleanup$, GetCleanupParams } from './cleanup';
1616
import { getTradeInfo$ } from './info';
17+
import { getKrakenPrice$ } from '../centralized/kraken-price';
18+
import { getBinancePrice$ } from '../centralized/binance-price';
1719

1820
type GetTradeParams = {
1921
config: Config;
@@ -63,6 +65,8 @@ const getNewTrade$ = ({
6365
const centralizedExchangePrice$ = getCentralizedExchangePrice$({
6466
config,
6567
logger: loggers.centralized,
68+
getBinancePrice$,
69+
getKrakenPrice$,
6670
});
6771
return merge(
6872
getOpenDEXcomplete$({

0 commit comments

Comments
 (0)