Skip to content

Commit 7ba3451

Browse files
committed
feat: separate multi grpc subscriber into v2 and test script
1 parent 999287d commit 7ba3451

File tree

6 files changed

+374
-142
lines changed

6 files changed

+374
-142
lines changed

sdk/scripts/client-test.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { DriftClient } from '../src/driftClient';
2+
import { grpcDriftClientAccountSubscriberV2 } from '../src/accounts/grpcDriftClientAccountSubscriberV2';
3+
import { Connection, Keypair, PublicKey } from '@solana/web3.js';
4+
import { DriftClientConfig } from '../src/driftClientConfig';
5+
import { decodeName, DRIFT_PROGRAM_ID, Wallet } from '../src';
6+
import { CommitmentLevel } from '@triton-one/yellowstone-grpc';
7+
import dotenv from 'dotenv';
8+
9+
const GRPC_ENDPOINT = process.env.GRPC_ENDPOINT;
10+
const TOKEN = process.env.TOKEN;
11+
12+
async function initializeGrpcDriftClientV2() {
13+
const connection = new Connection('https://api.mainnet-beta.solana.com');
14+
const wallet = new Wallet(new Keypair());
15+
dotenv.config({ path: '../' });
16+
const config: DriftClientConfig = {
17+
connection,
18+
wallet,
19+
programID: new PublicKey(DRIFT_PROGRAM_ID),
20+
accountSubscription: {
21+
type: 'grpc',
22+
grpcConfigs: {
23+
endpoint: GRPC_ENDPOINT,
24+
token: TOKEN,
25+
commitmentLevel: 'confirmed' as unknown as CommitmentLevel,
26+
channelOptions: {
27+
'grpc.keepalive_time_ms': 10_000,
28+
'grpc.keepalive_timeout_ms': 1_000,
29+
'grpc.keepalive_permit_without_calls': 1,
30+
},
31+
},
32+
driftClientAccountSubscriber: grpcDriftClientAccountSubscriberV2,
33+
},
34+
perpMarketIndexes: [0, 1, 2], // Example market indexes
35+
spotMarketIndexes: [0, 1, 2], // Example market indexes
36+
oracleInfos: [], // Add oracle information if needed
37+
};
38+
39+
const driftClient = new DriftClient(config);
40+
41+
let perpMarketUpdateCount = 0;
42+
let spotMarketUpdateCount = 0;
43+
let oraclePriceUpdateCount = 0;
44+
let userAccountUpdateCount = 0;
45+
46+
const updatePromise = new Promise<void>((resolve) => {
47+
driftClient.accountSubscriber.eventEmitter.on('perpMarketAccountUpdate', (data) => {
48+
console.log('Perp market account update:', decodeName(data.name));
49+
perpMarketUpdateCount++;
50+
if (perpMarketUpdateCount >= 10 && spotMarketUpdateCount >= 10 && oraclePriceUpdateCount >= 10 && userAccountUpdateCount >= 2) {
51+
resolve();
52+
}
53+
});
54+
55+
driftClient.accountSubscriber.eventEmitter.on('spotMarketAccountUpdate', (data) => {
56+
console.log('Spot market account update:', decodeName(data.name));
57+
spotMarketUpdateCount++;
58+
if (perpMarketUpdateCount >= 10 && spotMarketUpdateCount >= 10 && oraclePriceUpdateCount >= 10 && userAccountUpdateCount >= 2) {
59+
resolve();
60+
}
61+
});
62+
63+
driftClient.accountSubscriber.eventEmitter.on('oraclePriceUpdate', (data) => {
64+
console.log('Oracle price update:', data.toBase58());
65+
oraclePriceUpdateCount++;
66+
if (perpMarketUpdateCount >= 10 && spotMarketUpdateCount >= 10 && oraclePriceUpdateCount >= 10 && userAccountUpdateCount >= 2) {
67+
resolve();
68+
}
69+
});
70+
71+
driftClient.accountSubscriber.eventEmitter.on('userAccountUpdate', (data) => {
72+
console.log('User account update:', decodeName(data.name));
73+
userAccountUpdateCount++;
74+
if (perpMarketUpdateCount >= 10 && spotMarketUpdateCount >= 10 && oraclePriceUpdateCount >= 10 && userAccountUpdateCount >= 2) {
75+
resolve();
76+
}
77+
});
78+
});
79+
80+
await driftClient.subscribe();
81+
console.log('DriftClient initialized and listening for updates.');
82+
83+
await updatePromise;
84+
console.log('Received required number of updates.');
85+
}
86+
87+
initializeDriftClient().catch(console.error);

sdk/src/accounts/grpcDriftClientAccountSubscriber.ts

Lines changed: 5 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { WebSocketDriftClientAccountSubscriber } from './webSocketDriftClientAccountSubscriber';
22
import { OracleInfo, OraclePriceData } from '../oracles/types';
33
import { Program } from '@coral-xyz/anchor';
4-
import { PublicKey } from '@solana/web3.js';
54
import { findAllMarketAndOracles } from '../config';
65
import {
76
getDriftStateAccountPublicKey,
@@ -10,15 +9,11 @@ import {
109
} from '../addresses/pda';
1110
import { DelistedMarketSetting, GrpcConfigs, ResubOpts } from './types';
1211
import { grpcAccountSubscriber } from './grpcAccountSubscriber';
13-
import { grpcMultiAccountSubscriber } from './grpcMultiAccountSubscriber';
1412
import { PerpMarketAccount, SpotMarketAccount, StateAccount } from '../types';
1513
import { getOracleId } from '../oracles/oracleId';
1614

17-
export class gprcDriftClientAccountSubscriber extends WebSocketDriftClientAccountSubscriber {
15+
export class grpcDriftClientAccountSubscriber extends WebSocketDriftClientAccountSubscriber {
1816
private grpcConfigs: GrpcConfigs;
19-
private perpMarketsSubscriber?: grpcMultiAccountSubscriber<PerpMarketAccount>;
20-
private spotMarketsSubscriber?: grpcMultiAccountSubscriber<SpotMarketAccount>;
21-
private oracleMultiSubscriber?: grpcMultiAccountSubscriber<OraclePriceData>;
2217

2318
constructor(
2419
grpcConfigs: GrpcConfigs,
@@ -99,10 +94,12 @@ export class gprcDriftClientAccountSubscriber extends WebSocketDriftClientAccoun
9994
// set initial data to avoid spamming getAccountInfo calls in webSocketAccountSubscriber
10095
await this.setInitialData();
10196

102-
// subscribe to perp + spot markets (separate) and oracles
10397
await Promise.all([
98+
// subscribe to market accounts
10499
this.subscribeToPerpMarketAccounts(),
100+
// subscribe to spot market accounts
105101
this.subscribeToSpotMarketAccounts(),
102+
// subscribe to oracles
106103
this.subscribeToOracles(),
107104
]);
108105

@@ -123,128 +120,6 @@ export class gprcDriftClientAccountSubscriber extends WebSocketDriftClientAccoun
123120
return true;
124121
}
125122

126-
override async subscribeToPerpMarketAccounts(): Promise<boolean> {
127-
const perpMarketPubkeys = await Promise.all(
128-
this.perpMarketIndexes.map((marketIndex) =>
129-
getPerpMarketPublicKey(this.program.programId, marketIndex)
130-
)
131-
);
132-
133-
this.perpMarketsSubscriber =
134-
await grpcMultiAccountSubscriber.create<PerpMarketAccount>(
135-
this.grpcConfigs,
136-
'PerpMarket',
137-
this.program,
138-
undefined,
139-
this.resubOpts
140-
);
141-
await this.perpMarketsSubscriber.subscribe(
142-
perpMarketPubkeys,
143-
(_accountId, data) => {
144-
this.eventEmitter.emit(
145-
'perpMarketAccountUpdate',
146-
data as PerpMarketAccount
147-
);
148-
this.eventEmitter.emit('update');
149-
}
150-
);
151-
152-
return true;
153-
}
154-
155-
override async subscribeToSpotMarketAccounts(): Promise<boolean> {
156-
const spotMarketPubkeys = await Promise.all(
157-
this.spotMarketIndexes.map((marketIndex) =>
158-
getSpotMarketPublicKey(this.program.programId, marketIndex)
159-
)
160-
);
161-
162-
this.spotMarketsSubscriber =
163-
await grpcMultiAccountSubscriber.create<SpotMarketAccount>(
164-
this.grpcConfigs,
165-
'SpotMarket',
166-
this.program,
167-
undefined,
168-
this.resubOpts
169-
);
170-
await this.spotMarketsSubscriber.subscribe(
171-
spotMarketPubkeys,
172-
(_accountId, data) => {
173-
this.eventEmitter.emit(
174-
'spotMarketAccountUpdate',
175-
data as SpotMarketAccount
176-
);
177-
this.eventEmitter.emit('update');
178-
}
179-
);
180-
181-
return true;
182-
}
183-
184-
override async subscribeToOracles(): Promise<boolean> {
185-
// Build list of unique oracle pubkeys and a lookup for sources
186-
const uniqueOraclePubkeys = new Map<string, OracleInfo>();
187-
for (const info of this.oracleInfos) {
188-
const id = getOracleId(info.publicKey, info.source);
189-
if (
190-
!uniqueOraclePubkeys.has(id) &&
191-
!info.publicKey.equals((PublicKey as any).default)
192-
) {
193-
uniqueOraclePubkeys.set(id, info);
194-
}
195-
}
196-
197-
const oraclePubkeys = Array.from(uniqueOraclePubkeys.values()).map(
198-
(i) => i.publicKey
199-
);
200-
const pubkeyToSource = new Map<string, OracleInfo['source']>(
201-
Array.from(uniqueOraclePubkeys.values()).map((i) => [
202-
i.publicKey.toBase58(),
203-
i.source,
204-
])
205-
);
206-
207-
this.oracleMultiSubscriber =
208-
await grpcMultiAccountSubscriber.create<OraclePriceData>(
209-
this.grpcConfigs,
210-
'oracle',
211-
this.program,
212-
(buffer: Buffer, pubkey?: string) => {
213-
if (!pubkey) {
214-
throw new Error('Oracle pubkey missing in decode');
215-
}
216-
const source = pubkeyToSource.get(pubkey);
217-
const client = this.oracleClientCache.get(
218-
source,
219-
this.program.provider.connection,
220-
this.program
221-
);
222-
return client.getOraclePriceDataFromBuffer(buffer);
223-
},
224-
this.resubOpts
225-
);
226-
227-
await this.oracleMultiSubscriber.subscribe(
228-
oraclePubkeys,
229-
(accountId, data) => {
230-
const source = pubkeyToSource.get(accountId.toBase58());
231-
this.eventEmitter.emit('oraclePriceUpdate', accountId, source, data);
232-
this.eventEmitter.emit('update');
233-
}
234-
);
235-
236-
return true;
237-
}
238-
239-
async unsubscribeFromOracles(): Promise<void> {
240-
if (this.oracleMultiSubscriber) {
241-
await this.oracleMultiSubscriber.unsubscribe();
242-
this.oracleMultiSubscriber = undefined;
243-
return;
244-
}
245-
await super.unsubscribeFromOracles();
246-
}
247-
248123
override async subscribeToSpotMarketAccount(
249124
marketIndex: number
250125
): Promise<boolean> {
@@ -329,4 +204,4 @@ export class gprcDriftClientAccountSubscriber extends WebSocketDriftClientAccoun
329204
this.oracleSubscribers.set(oracleId, accountSubscriber);
330205
return true;
331206
}
332-
}
207+
}

0 commit comments

Comments
 (0)