Skip to content

Commit 04243e9

Browse files
authored
fix: cleanup potential mem leaks on grpc v2 (#1963)
* fix: cleanup potential mem leaks on grpc v2 * fix: lint and prettier * fix: lint again * feat: higher load grpc client test * fix: lint
1 parent 7c34e42 commit 04243e9

File tree

3 files changed

+269
-4
lines changed

3 files changed

+269
-4
lines changed
File renamed without changes.
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
import { DriftClient } from '../src/driftClient';
2+
import { grpcDriftClientAccountSubscriberV2 } from '../src/accounts/grpcDriftClientAccountSubscriberV2';
3+
import { Connection, Keypair, PublicKey } from '@solana/web3.js';
4+
import { DriftClientConfig } from '../src/driftClientConfig';
5+
import {
6+
DRIFT_PROGRAM_ID,
7+
PerpMarketAccount,
8+
SpotMarketAccount,
9+
Wallet,
10+
OracleInfo,
11+
decodeName,
12+
} from '../src';
13+
import { CommitmentLevel } from '@triton-one/yellowstone-grpc';
14+
import dotenv from 'dotenv';
15+
import {
16+
AnchorProvider,
17+
Idl,
18+
Program,
19+
ProgramAccount,
20+
} from '@coral-xyz/anchor';
21+
import driftIDL from '../src/idl/drift.json';
22+
23+
const GRPC_ENDPOINT = process.env.GRPC_ENDPOINT;
24+
const TOKEN = process.env.TOKEN;
25+
const RPC_ENDPOINT = process.env.RPC_ENDPOINT;
26+
27+
async function initializeSingleGrpcClient() {
28+
console.log('🚀 Initializing single gRPC Drift Client...');
29+
30+
const connection = new Connection(RPC_ENDPOINT);
31+
const wallet = new Wallet(new Keypair());
32+
dotenv.config({ path: '../' });
33+
34+
const programId = new PublicKey(DRIFT_PROGRAM_ID);
35+
const provider = new AnchorProvider(
36+
connection,
37+
// @ts-ignore
38+
wallet,
39+
{
40+
commitment: 'processed',
41+
}
42+
);
43+
44+
const program = new Program(driftIDL as Idl, programId, provider);
45+
46+
// Get perp market accounts
47+
const allPerpMarketProgramAccounts =
48+
(await program.account.perpMarket.all()) as ProgramAccount<PerpMarketAccount>[];
49+
const perpMarketProgramAccounts = allPerpMarketProgramAccounts.filter((val) =>
50+
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15].includes(
51+
val.account.marketIndex
52+
)
53+
);
54+
const perpMarketIndexes = perpMarketProgramAccounts.map(
55+
(val) => val.account.marketIndex
56+
);
57+
58+
// Get spot market accounts
59+
const allSpotMarketProgramAccounts =
60+
(await program.account.spotMarket.all()) as ProgramAccount<SpotMarketAccount>[];
61+
const spotMarketProgramAccounts = allSpotMarketProgramAccounts.filter((val) =>
62+
[0, 1, 2, 3, 4, 5].includes(val.account.marketIndex)
63+
);
64+
const spotMarketIndexes = spotMarketProgramAccounts.map(
65+
(val) => val.account.marketIndex
66+
);
67+
68+
// Get oracle infos
69+
const seen = new Set<string>();
70+
const oracleInfos: OracleInfo[] = [];
71+
for (const acct of perpMarketProgramAccounts) {
72+
const key = `${acct.account.amm.oracle.toBase58()}-${
73+
Object.keys(acct.account.amm.oracleSource)[0]
74+
}`;
75+
if (!seen.has(key)) {
76+
seen.add(key);
77+
oracleInfos.push({
78+
publicKey: acct.account.amm.oracle,
79+
source: acct.account.amm.oracleSource,
80+
});
81+
}
82+
}
83+
for (const acct of spotMarketProgramAccounts) {
84+
const key = `${acct.account.oracle.toBase58()}-${
85+
Object.keys(acct.account.oracleSource)[0]
86+
}`;
87+
if (!seen.has(key)) {
88+
seen.add(key);
89+
oracleInfos.push({
90+
publicKey: acct.account.oracle,
91+
source: acct.account.oracleSource,
92+
});
93+
}
94+
}
95+
96+
console.log(`📊 Markets: ${perpMarketIndexes.length} perp, ${spotMarketIndexes.length} spot`);
97+
console.log(`🔮 Oracles: ${oracleInfos.length}`);
98+
99+
const baseAccountSubscription = {
100+
type: 'grpc' as const,
101+
grpcConfigs: {
102+
endpoint: GRPC_ENDPOINT,
103+
token: TOKEN,
104+
commitmentLevel: CommitmentLevel.PROCESSED,
105+
channelOptions: {
106+
'grpc.keepalive_time_ms': 10_000,
107+
'grpc.keepalive_timeout_ms': 1_000,
108+
'grpc.keepalive_permit_without_calls': 1,
109+
},
110+
},
111+
};
112+
113+
const config: DriftClientConfig = {
114+
connection,
115+
wallet,
116+
programID: new PublicKey(DRIFT_PROGRAM_ID),
117+
accountSubscription: {
118+
...baseAccountSubscription,
119+
driftClientAccountSubscriber: grpcDriftClientAccountSubscriberV2,
120+
},
121+
perpMarketIndexes,
122+
spotMarketIndexes,
123+
oracleInfos,
124+
};
125+
126+
const client = new DriftClient(config);
127+
128+
// Set up event listeners
129+
const eventCounts = {
130+
stateAccountUpdate: 0,
131+
perpMarketAccountUpdate: 0,
132+
spotMarketAccountUpdate: 0,
133+
oraclePriceUpdate: 0,
134+
update: 0,
135+
};
136+
137+
console.log('🎧 Setting up event listeners...');
138+
139+
client.eventEmitter.on('stateAccountUpdate', (_data) => {
140+
eventCounts.stateAccountUpdate++;
141+
});
142+
143+
client.eventEmitter.on('perpMarketAccountUpdate', (_data) => {
144+
eventCounts.perpMarketAccountUpdate++;
145+
});
146+
147+
client.eventEmitter.on('spotMarketAccountUpdate', (_data) => {
148+
eventCounts.spotMarketAccountUpdate++;
149+
});
150+
151+
client.eventEmitter.on('oraclePriceUpdate', (_publicKey, _source, _data) => {
152+
eventCounts.oraclePriceUpdate++;
153+
});
154+
155+
client.accountSubscriber.eventEmitter.on('update', () => {
156+
eventCounts.update++;
157+
});
158+
159+
// Subscribe
160+
console.log('🔗 Subscribing to accounts...');
161+
await client.subscribe();
162+
163+
console.log('✅ Client subscribed successfully!');
164+
console.log('🚀 Starting high-load testing (50 reads/sec per perp market)...');
165+
166+
// High-frequency load testing - 50 reads per second per perp market
167+
const loadTestInterval = setInterval(async () => {
168+
try {
169+
// Test getPerpMarketAccount for each perp market (50 times per second per market)
170+
for (const marketIndex of perpMarketIndexes) {
171+
const perpMarketAccount = client.getPerpMarketAccount(marketIndex);
172+
console.log("perpMarketAccount name: ", decodeName(perpMarketAccount.name));
173+
console.log("perpMarketAccount data: ", JSON.stringify({
174+
marketIndex: perpMarketAccount.marketIndex,
175+
name: decodeName(perpMarketAccount.name),
176+
baseAssetReserve: perpMarketAccount.amm.baseAssetReserve.toString(),
177+
quoteAssetReserve: perpMarketAccount.amm.quoteAssetReserve.toString()
178+
}));
179+
}
180+
181+
// Test getMMOracleDataForPerpMarket for each perp market (50 times per second per market)
182+
for (const marketIndex of perpMarketIndexes) {
183+
try {
184+
const oracleData = client.getMMOracleDataForPerpMarket(marketIndex);
185+
console.log("oracleData price: ", oracleData.price.toString());
186+
console.log("oracleData: ", JSON.stringify({
187+
price: oracleData.price.toString(),
188+
confidence: oracleData.confidence?.toString(),
189+
slot: oracleData.slot?.toString()
190+
}));
191+
} catch (error) {
192+
// Ignore errors for load testing
193+
}
194+
}
195+
} catch (error) {
196+
console.error('Load test error:', error);
197+
}
198+
}, 20); // 50 times per second = 1000ms / 50 = 20ms interval
199+
200+
// Log periodic stats
201+
const statsInterval = setInterval(() => {
202+
console.log('\n📈 Event Counts:', eventCounts);
203+
console.log(`⏱️ Client subscribed: ${client.isSubscribed}`);
204+
console.log(`🔗 Account subscriber subscribed: ${client.accountSubscriber.isSubscribed}`);
205+
console.log(`🔥 Load: ${perpMarketIndexes.length * 50 * 2} reads/sec (${perpMarketIndexes.length} markets × 50 getPerpMarketAccount + 50 getMMOracleDataForPerpMarket)`);
206+
}, 5000);
207+
208+
// Handle shutdown signals - just exit without cleanup since they never unsubscribe
209+
process.on('SIGINT', () => {
210+
console.log('\n🛑 Shutting down...');
211+
clearInterval(loadTestInterval);
212+
clearInterval(statsInterval);
213+
process.exit(0);
214+
});
215+
216+
process.on('SIGTERM', () => {
217+
console.log('\n🛑 Shutting down...');
218+
clearInterval(loadTestInterval);
219+
clearInterval(statsInterval);
220+
process.exit(0);
221+
});
222+
223+
return client;
224+
}
225+
226+
initializeSingleGrpcClient().catch(console.error);

sdk/src/accounts/grpcDriftClientAccountSubscriberV2.ts

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,11 @@ export class grpcDriftClientAccountSubscriberV2
227227
o.source === oracleInfo.source &&
228228
o.publicKey.equals(oracleInfo.publicKey)
229229
);
230-
if (!exists) {
231-
this.oracleInfos = this.oracleInfos.concat(oracleInfo);
230+
if (exists) {
231+
return true; // Already exists, don't add duplicate
232232
}
233233

234+
this.oracleInfos = this.oracleInfos.concat(oracleInfo);
234235
this.oracleMultiSubscriber?.addAccounts([oracleInfo.publicKey]);
235236

236237
return true;
@@ -708,11 +709,37 @@ export class grpcDriftClientAccountSubscriberV2
708709
await this.perpMarketsSubscriber.removeAccounts(
709710
perpMarketPubkeysToRemove
710711
);
712+
// Clean up the mapping for removed perp markets
713+
for (const pubkey of perpMarketPubkeysToRemove) {
714+
const pubkeyString = pubkey.toBase58();
715+
for (const [
716+
marketIndex,
717+
accountPubkey,
718+
] of this.perpMarketIndexToAccountPubkeyMap.entries()) {
719+
if (accountPubkey === pubkeyString) {
720+
this.perpMarketIndexToAccountPubkeyMap.delete(marketIndex);
721+
this.perpOracleMap.delete(marketIndex);
722+
this.perpOracleStringMap.delete(marketIndex);
723+
break;
724+
}
725+
}
726+
}
711727
}
712728

713729
// Remove accounts in batches - oracles
714730
if (oraclePubkeysToRemove.length > 0) {
715731
await this.oracleMultiSubscriber.removeAccounts(oraclePubkeysToRemove);
732+
// Clean up oracle data for removed oracles by finding their sources
733+
for (const pubkey of oraclePubkeysToRemove) {
734+
// Find the oracle source by checking oracleInfos
735+
const oracleInfo = this.oracleInfos.find((info) =>
736+
info.publicKey.equals(pubkey)
737+
);
738+
if (oracleInfo) {
739+
const oracleId = getOracleId(pubkey, oracleInfo.source);
740+
this.oracleIdToOracleDataMap.delete(oracleId);
741+
}
742+
}
716743
}
717744
}
718745

@@ -731,13 +758,25 @@ export class grpcDriftClientAccountSubscriberV2
731758
}
732759

733760
async unsubscribe(): Promise<void> {
734-
if (this.isSubscribed) {
761+
if (!this.isSubscribed) {
735762
return;
736763
}
737764

738-
await this.stateAccountSubscriber.unsubscribe();
765+
this.isSubscribed = false;
766+
this.isSubscribing = false;
767+
768+
await this.stateAccountSubscriber?.unsubscribe();
739769
await this.unsubscribeFromOracles();
740770
await this.perpMarketsSubscriber?.unsubscribe();
741771
await this.spotMarketsSubscriber?.unsubscribe();
772+
773+
// Clean up all maps to prevent memory leaks
774+
this.perpMarketIndexToAccountPubkeyMap.clear();
775+
this.spotMarketIndexToAccountPubkeyMap.clear();
776+
this.oracleIdToOracleDataMap.clear();
777+
this.perpOracleMap.clear();
778+
this.perpOracleStringMap.clear();
779+
this.spotOracleMap.clear();
780+
this.spotOracleStringMap.clear();
742781
}
743782
}

0 commit comments

Comments
 (0)