Skip to content

Commit 13f0a81

Browse files
committed
feat: new grpc call for subscribring alerts such as low balance (#864)
1 parent 74a59dc commit 13f0a81

File tree

18 files changed

+1871
-351
lines changed

18 files changed

+1871
-351
lines changed

docs/api.md

Lines changed: 75 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { ServiceError, status } from 'grpc';
2+
import { Arguments, Argv } from 'yargs';
3+
import { XudClient } from '../../proto/xudrpc_grpc_pb';
4+
import * as xudrpc from '../../proto/xudrpc_pb';
5+
import { setTimeoutPromise } from '../../utils/utils';
6+
import { loadXudClient } from '../command';
7+
import { AlertType } from '../../constants/enums';
8+
9+
export const command = 'subscribealerts [--pretty]';
10+
11+
export const describe = 'subscribe alerts such as low balance';
12+
13+
export const builder = (argv: Argv) => argv
14+
.option('pretty', {
15+
type: 'boolean',
16+
})
17+
.example('$0 subscribealerts', 'prints alert payload in a JSON structure')
18+
.example('$0 closechannel --pretty', 'prints alert message only');
19+
20+
export const handler = async (argv: Arguments) => {
21+
await ensureConnection(argv, true);
22+
};
23+
24+
let client: XudClient;
25+
26+
const ensureConnection = async (argv: Arguments, printError?: boolean) => {
27+
if (!client) {
28+
client = await loadXudClient(argv);
29+
}
30+
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
31+
if (error) {
32+
if (error.message === 'Failed to connect before the deadline') {
33+
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
34+
process.exit(1);
35+
}
36+
37+
if (printError) console.error(`${error.name}: ${error.message}`);
38+
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
39+
} else {
40+
console.log('Successfully connected, subscribing for alerts');
41+
subscribeAlerts(argv);
42+
}
43+
});
44+
};
45+
46+
const subscribeAlerts = (argv: Arguments<any>) => {
47+
const request = new xudrpc.SubscribeAlertsRequest();
48+
const alertsSubscription = client.subscribeAlerts(request);
49+
50+
alertsSubscription.on('data', (alert: xudrpc.Alert) => {
51+
if (argv.pretty) {
52+
console.log(`${AlertType[alert.getType()]}: ${alert.getMessage()}`);
53+
} else {
54+
console.log(JSON.stringify(alert, undefined, 2));
55+
}
56+
});
57+
alertsSubscription.on('end', reconnect.bind(undefined, argv));
58+
alertsSubscription.on('error', async (err: ServiceError) => {
59+
if (err.code === status.UNIMPLEMENTED) {
60+
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
61+
process.exit(1);
62+
}
63+
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
64+
await setTimeoutPromise(1000);
65+
await ensureConnection(argv);
66+
});
67+
};
68+
69+
const reconnect = async (argv: Arguments) => {
70+
console.log('Stream has closed, trying to reconnect');
71+
await ensureConnection(argv, false);
72+
};

lib/connextclient/ConnextClient.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import assert from 'assert';
22
import http from 'http';
3-
import { SwapClientType, SwapRole, SwapState } from '../constants/enums';
3+
import { ChannelBalanceSide, SwapClientType, SwapRole, SwapState } from '../constants/enums';
44
import { CurrencyInstance } from '../db/types';
55
import { XudError } from '../types';
66
import Logger from '../Logger';
@@ -14,7 +14,7 @@ import SwapClient, {
1414
PaymentStatus,
1515
WithdrawArguments,
1616
} from '../swaps/SwapClient';
17-
import { SwapDeal, CloseChannelParams, OpenChannelParams, SwapCapacities } from '../swaps/types';
17+
import { SwapDeal, CloseChannelParams, OpenChannelParams, SwapCapacities, ChannelBalanceAlert } from '../swaps/types';
1818
import { UnitConverter } from '../utils/UnitConverter';
1919
import errors, { errorCodes } from './errors';
2020
import {
@@ -46,13 +46,15 @@ interface ConnextClient {
4646
on(event: 'htlcAccepted', listener: (rHash: string, amount: number, currency: string) => void): this;
4747
on(event: 'connectionVerified', listener: (swapClientInfo: SwapClientInfo) => void): this;
4848
on(event: 'depositConfirmed', listener: (hash: string) => void): this;
49+
on(event: 'lowBalance', listener: (alert: ChannelBalanceAlert) => void): this;
4950
once(event: 'initialized', listener: () => void): this;
5051
emit(event: 'htlcAccepted', rHash: string, amount: number, currency: string): boolean;
5152
emit(event: 'connectionVerified', swapClientInfo: SwapClientInfo): boolean;
5253
emit(event: 'initialized'): boolean;
5354
emit(event: 'preimage', preimageRequest: ProvidePreimageEvent): void;
5455
emit(event: 'transferReceived', transferReceivedRequest: TransferReceivedEvent): void;
5556
emit(event: 'depositConfirmed', hash: string): void;
57+
emit(event: 'lowBalance', alert: ChannelBalanceAlert): boolean;
5658
}
5759

5860
/**
@@ -332,6 +334,35 @@ class ConnextClient extends SwapClient {
332334
channelBalancePromises.push(this.channelBalance(currency));
333335
}
334336
await Promise.all(channelBalancePromises);
337+
338+
for (const [currency, address] of this.tokenAddresses) {
339+
const remoteBalance = this.inboundAmounts.get(currency) || 0;
340+
const localBalance = this.outboundAmounts.get(currency) || 0;
341+
const totalBalance = remoteBalance + localBalance;
342+
const alertThreshold = totalBalance * 0.1;
343+
344+
if (localBalance < alertThreshold) {
345+
this.emit('lowBalance', {
346+
totalBalance,
347+
currency,
348+
channelPoint: address,
349+
side: ChannelBalanceSide.Local,
350+
sideBalance: localBalance,
351+
bound: 10,
352+
});
353+
}
354+
355+
if (remoteBalance < alertThreshold) {
356+
this.emit('lowBalance', {
357+
totalBalance,
358+
currency,
359+
channelPoint: address,
360+
side: ChannelBalanceSide.Remote,
361+
sideBalance: remoteBalance,
362+
bound: 10,
363+
});
364+
}
365+
}
335366
} catch (e) {
336367
this.logger.error('failed to update total outbound capacity', e);
337368
}

lib/constants/enums.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,12 @@ export enum DisconnectionReason {
180180
AuthFailureInvalidSignature = 12,
181181
WireProtocolErr = 13,
182182
}
183+
184+
export enum AlertType {
185+
LowBalance = 0,
186+
}
187+
188+
export enum ChannelBalanceSide {
189+
Remote = 0,
190+
Local = 1,
191+
}

lib/grpc/GrpcService.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
import grpc, { ServerWritableStream, status } from 'grpc';
33
import { fromEvent } from 'rxjs';
44
import { take } from 'rxjs/operators';
5-
import { SwapFailureReason } from '../constants/enums';
5+
import { AlertType, SwapFailureReason } from '../constants/enums';
66
import { LndInfo } from '../lndclient/types';
77
import { isOwnOrder, Order, OrderPortion, PlaceOrderEventType, PlaceOrderResult } from '../orderbook/types';
88
import * as xudrpc from '../proto/xudrpc_pb';
99
import Service from '../service/Service';
1010
import { ServiceOrder, ServicePlaceOrderEvent } from '../service/types';
11-
import { SwapAccepted, SwapFailure, SwapSuccess } from '../swaps/types';
11+
import { ChannelBalanceAlert, SwapAccepted, SwapFailure, SwapSuccess } from '../swaps/types';
1212
import getGrpcError from './getGrpcError';
1313

1414
/**
@@ -870,6 +870,31 @@ class GrpcService {
870870
}
871871
}
872872

873+
/*
874+
* See [[Service.subscribeAlerts]]
875+
*/
876+
public subscribeAlerts: grpc.handleServerStreamingCall<xudrpc.SubscribeAlertsRequest, xudrpc.Alert> = (call) => {
877+
if (!this.isReady(this.service, call)) {
878+
return;
879+
}
880+
881+
const cancelled$ = getCancelled$(call);
882+
this.service.subscribeAlerts((type: AlertType, message: string, payload: ChannelBalanceAlert) => {
883+
const alert = new xudrpc.Alert();
884+
alert.setType(type as number);
885+
alert.setMessage(message);
886+
const channelBalanceAlert = new xudrpc.ChannelBalanceAlert();
887+
channelBalanceAlert.setBound(payload.bound);
888+
channelBalanceAlert.setChannelPoint(payload.channelPoint);
889+
channelBalanceAlert.setSide(payload.side as number);
890+
channelBalanceAlert.setSideBalance(payload.sideBalance);
891+
channelBalanceAlert.setTotalBalance(payload.totalBalance);
892+
alert.setBalanceAlert(channelBalanceAlert);
893+
call.write(alert);
894+
},
895+
cancelled$);
896+
}
897+
873898
/*
874899
* See [[Service.subscribeOrders]]
875900
*/

0 commit comments

Comments
 (0)