Skip to content

Commit cd88758

Browse files
committed
feat: initial batch fetching temp
1 parent 3f72967 commit cd88758

File tree

2 files changed

+192
-95
lines changed

2 files changed

+192
-95
lines changed

sdk/src/accounts/webSocketDriftClientAccountSubscriber.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ export class WebSocketDriftClientAccountSubscriber
9494
customPerpMarketAccountSubscriber?: new (
9595
accountName: string,
9696
program: Program,
97-
accountPublicKey: PublicKey,
97+
accountPublicKey: PublicKey,
9898
decodeBuffer?: (buffer: Buffer) => any,
9999
resubOpts?: ResubOpts,
100100
commitment?: Commitment

sdk/src/accounts/webSocketProgramAccountSubscriberV2.ts

Lines changed: 191 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ export class WebSocketProgramAccountSubscriberV2<T>
5151
private accountsCurrentlyPolling: Set<string> = new Set(); // Track which accounts are being polled
5252
private batchPollingTimeout?: ReturnType<typeof setTimeout>; // Single timeout for batch polling
5353

54+
// For batching initial account fetches
55+
private accountsPendingInitialMonitorFetch: Set<string> = new Set(); // Track accounts waiting for initial monitor fetch
56+
private initialFetchTimeout?: ReturnType<typeof setTimeout>; // Single timeout for initial monitoring batch fetch
57+
private initialFetchBufferMs: number = 1000; // Buffer time to collect accounts for initial monitoring fetch
58+
5459
public constructor(
5560
subscriptionName: string,
5661
accountDiscriminator: string,
@@ -149,7 +154,10 @@ export class WebSocketProgramAccountSubscriberV2<T>
149154
filters: this.options.filters.map((filter) => {
150155
// Convert filter bytes from base58 to base64 if needed
151156
let bytes = filter.memcmp.bytes;
152-
if (typeof bytes === 'string' && /^[1-9A-HJ-NP-Za-km-z]+$/.test(bytes)) {
157+
if (
158+
typeof bytes === 'string' &&
159+
/^[1-9A-HJ-NP-Za-km-z]+$/.test(bytes)
160+
) {
153161
// Looks like base58 - convert to base64
154162
const decoded = bs58.decode(bytes);
155163
bytes = Buffer.from(decoded).toString('base64');
@@ -309,13 +317,11 @@ export class WebSocketProgramAccountSubscriberV2<T>
309317
currentTime - lastNotificationTime >= this.pollingIntervalMs
310318
) {
311319
console.debug(
312-
'No recent WS notification, starting polling for account',
320+
'No recent WS notification, starting initial fetch for account',
313321
accountIdString
314322
);
315-
// No recent WS notification, start polling
316-
await this.pollAccount(accountIdString);
317-
// Schedule next poll
318-
this.startPollingForAccount(accountIdString);
323+
// No recent WS notification, add to pending initial fetch
324+
this.addToInitialFetchBatch(accountIdString);
319325
} else {
320326
// We received a WS notification recently, continue monitoring
321327
this.startMonitoringForAccount(accountIdString);
@@ -336,6 +342,7 @@ export class WebSocketProgramAccountSubscriberV2<T>
336342
}
337343

338344
private startBatchPolling(): void {
345+
console.debug('Scheduling batch polling');
339346
// Clear existing batch polling timeout
340347
if (this.batchPollingTimeout) {
341348
clearTimeout(this.batchPollingTimeout);
@@ -357,95 +364,10 @@ export class WebSocketProgramAccountSubscriberV2<T>
357364
return;
358365
}
359366

360-
console.debug(
361-
'Polling all accounts',
362-
accountsToPoll.length,
363-
'accounts'
364-
);
365-
366-
// Fetch all accounts in a single batch request
367-
const accountAddresses = accountsToPoll.map(
368-
(accountId) => accountId as Address
369-
);
370-
const rpcResponse = await this.rpc
371-
.getMultipleAccounts(accountAddresses, {
372-
commitment: this.options.commitment as GillCommitment,
373-
encoding: 'base64',
374-
})
375-
.send();
367+
console.debug('Polling all accounts', accountsToPoll.length, 'accounts');
376368

377-
const currentSlot = Number(rpcResponse.context.slot);
378-
379-
// Process each account response
380-
for (let i = 0; i < accountsToPoll.length; i++) {
381-
const accountIdString = accountsToPoll[i];
382-
const accountInfo = rpcResponse.value[i];
383-
384-
if (!accountInfo) {
385-
continue;
386-
}
387-
388-
const existingBufferAndSlot =
389-
this.bufferAndSlotMap.get(accountIdString);
390-
391-
if (!existingBufferAndSlot) {
392-
// Account not in our map yet, add it
393-
let newBuffer: Buffer | undefined = undefined;
394-
if (accountInfo.data) {
395-
if (Array.isArray(accountInfo.data)) {
396-
const [data, encoding] = accountInfo.data;
397-
newBuffer = Buffer.from(data, encoding);
398-
}
399-
}
400-
401-
if (newBuffer) {
402-
this.bufferAndSlotMap.set(accountIdString, {
403-
buffer: newBuffer,
404-
slot: currentSlot,
405-
});
406-
const account = this.decodeBuffer(
407-
this.accountDiscriminator,
408-
newBuffer
409-
);
410-
const accountId = new PublicKey(accountIdString);
411-
this.onChange(accountId, account, { slot: currentSlot }, newBuffer);
412-
}
413-
continue;
414-
}
415-
416-
// Check if we missed an update
417-
if (currentSlot > existingBufferAndSlot.slot) {
418-
let newBuffer: Buffer | undefined = undefined;
419-
if (accountInfo.data) {
420-
if (Array.isArray(accountInfo.data)) {
421-
const [data, encoding] = accountInfo.data;
422-
if (encoding === ('base58' as any)) {
423-
newBuffer = Buffer.from(bs58.decode(data));
424-
} else {
425-
newBuffer = Buffer.from(data, 'base64');
426-
}
427-
}
428-
}
429-
430-
// Check if buffer has changed
431-
if (
432-
newBuffer &&
433-
(!existingBufferAndSlot.buffer ||
434-
!newBuffer.equals(existingBufferAndSlot.buffer))
435-
) {
436-
if (this.resubOpts?.logResubMessages) {
437-
console.log(
438-
`[${this.subscriptionName}] Batch polling detected missed update for account ${accountIdString}, resubscribing`
439-
);
440-
}
441-
// We missed an update, resubscribe
442-
await this.unsubscribe(true);
443-
this.receivingData = false;
444-
await this.subscribe(this.onChange);
445-
return;
446-
}
447-
}
448-
}
369+
// Use the shared batch fetch method
370+
await this.fetchAccountsBatch(accountsToPoll);
449371
} catch (error) {
450372
if (this.resubOpts?.logResubMessages) {
451373
console.log(
@@ -642,6 +564,172 @@ export class WebSocketProgramAccountSubscriberV2<T>
642564
}
643565
}
644566

567+
private addToInitialFetchBatch(accountIdString: string): void {
568+
// Add account to pending initial monitor fetch set
569+
this.accountsPendingInitialMonitorFetch.add(accountIdString);
570+
571+
// If this is the first account in the batch, start the buffer timer
572+
if (this.accountsPendingInitialMonitorFetch.size === 1) {
573+
this.startInitialFetchBuffer();
574+
}
575+
}
576+
577+
private startInitialFetchBuffer(): void {
578+
// Clear existing initial fetch timeout
579+
if (this.initialFetchTimeout) {
580+
clearTimeout(this.initialFetchTimeout);
581+
}
582+
583+
// Set up buffer timeout to collect accounts
584+
this.initialFetchTimeout = setTimeout(async () => {
585+
await this.processInitialFetchBatch();
586+
}, this.initialFetchBufferMs);
587+
}
588+
589+
private async processInitialFetchBatch(): Promise<void> {
590+
try {
591+
// Get all accounts pending initial monitor fetch
592+
const accountsToFetch = Array.from(
593+
this.accountsPendingInitialMonitorFetch
594+
);
595+
if (accountsToFetch.length === 0) {
596+
return;
597+
}
598+
599+
console.debug(
600+
'Processing initial monitor fetch batch',
601+
accountsToFetch.length,
602+
'accounts'
603+
);
604+
605+
// Use the same batch logic as pollAllAccounts
606+
await this.fetchAccountsBatch(accountsToFetch);
607+
608+
// Move accounts to polling set and start batch polling
609+
accountsToFetch.forEach((accountIdString) => {
610+
this.accountsCurrentlyPolling.add(accountIdString);
611+
});
612+
613+
// Clear the pending set
614+
this.accountsPendingInitialMonitorFetch.clear();
615+
616+
// If this is the first account being polled, start batch polling
617+
if (this.accountsCurrentlyPolling.size === accountsToFetch.length) {
618+
this.startBatchPolling();
619+
} else {
620+
console.debug(
621+
'Not starting initial batch polling, we think it is not the first account being polled',
622+
this.accountsCurrentlyPolling.size,
623+
'accounts currently polling',
624+
accountsToFetch.length,
625+
'accounts to fetch'
626+
);
627+
}
628+
} catch (error) {
629+
if (this.resubOpts?.logResubMessages) {
630+
console.log(
631+
`[${this.subscriptionName}] Error processing initial monitor fetch batch:`,
632+
error
633+
);
634+
}
635+
}
636+
}
637+
638+
private async fetchAccountsBatch(accountIds: string[]): Promise<void> {
639+
try {
640+
// Fetch all accounts in a single batch request
641+
const accountAddresses = accountIds.map(
642+
(accountId) => accountId as Address
643+
);
644+
const rpcResponse = await this.rpc
645+
.getMultipleAccounts(accountAddresses, {
646+
commitment: this.options.commitment as GillCommitment,
647+
encoding: 'base64',
648+
})
649+
.send();
650+
651+
const currentSlot = Number(rpcResponse.context.slot);
652+
653+
// Process each account response
654+
for (let i = 0; i < accountIds.length; i++) {
655+
const accountIdString = accountIds[i];
656+
const accountInfo = rpcResponse.value[i];
657+
658+
if (!accountInfo) {
659+
continue;
660+
}
661+
662+
const existingBufferAndSlot =
663+
this.bufferAndSlotMap.get(accountIdString);
664+
665+
if (!existingBufferAndSlot) {
666+
// Account not in our map yet, add it
667+
let newBuffer: Buffer | undefined = undefined;
668+
if (accountInfo.data) {
669+
if (Array.isArray(accountInfo.data)) {
670+
const [data, encoding] = accountInfo.data;
671+
newBuffer = Buffer.from(data, encoding);
672+
}
673+
}
674+
675+
if (newBuffer) {
676+
this.bufferAndSlotMap.set(accountIdString, {
677+
buffer: newBuffer,
678+
slot: currentSlot,
679+
});
680+
const account = this.decodeBuffer(
681+
this.accountDiscriminator,
682+
newBuffer
683+
);
684+
const accountId = new PublicKey(accountIdString);
685+
this.onChange(accountId, account, { slot: currentSlot }, newBuffer);
686+
}
687+
continue;
688+
}
689+
690+
// Check if we missed an update
691+
if (currentSlot > existingBufferAndSlot.slot) {
692+
let newBuffer: Buffer | undefined = undefined;
693+
if (accountInfo.data) {
694+
if (Array.isArray(accountInfo.data)) {
695+
const [data, encoding] = accountInfo.data;
696+
if (encoding === ('base58' as any)) {
697+
newBuffer = Buffer.from(bs58.decode(data));
698+
} else {
699+
newBuffer = Buffer.from(data, 'base64');
700+
}
701+
}
702+
}
703+
704+
// Check if buffer has changed
705+
if (
706+
newBuffer &&
707+
(!existingBufferAndSlot.buffer ||
708+
!newBuffer.equals(existingBufferAndSlot.buffer))
709+
) {
710+
if (this.resubOpts?.logResubMessages) {
711+
console.log(
712+
`[${this.subscriptionName}] Initial fetch detected missed update for account ${accountIdString}, resubscribing`
713+
);
714+
}
715+
// We missed an update, resubscribe
716+
await this.unsubscribe(true);
717+
this.receivingData = false;
718+
await this.subscribe(this.onChange);
719+
return;
720+
}
721+
}
722+
}
723+
} catch (error) {
724+
if (this.resubOpts?.logResubMessages) {
725+
console.log(
726+
`[${this.subscriptionName}] Error fetching accounts batch:`,
727+
error
728+
);
729+
}
730+
}
731+
}
732+
645733
private clearPollingTimeouts(): void {
646734
this.pollingTimeouts.forEach((timeoutId) => {
647735
clearTimeout(timeoutId);
@@ -654,8 +742,17 @@ export class WebSocketProgramAccountSubscriberV2<T>
654742
this.batchPollingTimeout = undefined;
655743
}
656744

745+
// Clear initial fetch timeout
746+
if (this.initialFetchTimeout) {
747+
clearTimeout(this.initialFetchTimeout);
748+
this.initialFetchTimeout = undefined;
749+
}
750+
657751
// Clear accounts currently polling
658752
this.accountsCurrentlyPolling.clear();
753+
754+
// Clear accounts pending initial monitor fetch
755+
this.accountsPendingInitialMonitorFetch.clear();
659756
}
660757

661758
unsubscribe(onResub = false): Promise<void> {

0 commit comments

Comments
 (0)