Skip to content

Commit 9970540

Browse files
committed
feat: chunk stuff account requests
1 parent 157edac commit 9970540

File tree

1 file changed

+88
-72
lines changed

1 file changed

+88
-72
lines changed

sdk/src/accounts/webSocketProgramAccountSubscriberV2.ts

Lines changed: 88 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -718,87 +718,103 @@ export class WebSocketProgramAccountSubscriberV2<T>
718718

719719
private async fetchAccountsBatch(accountIds: string[]): Promise<void> {
720720
try {
721-
// Fetch all accounts in a single batch request
722-
const accountAddresses = accountIds.map(
723-
(accountId) => accountId as Address
724-
);
725-
const rpcResponse = await this.rpc
726-
.getMultipleAccounts(accountAddresses, {
727-
commitment: this.options.commitment as GillCommitment,
728-
encoding: 'base64',
729-
})
730-
.send();
731-
732-
const currentSlot = Number(rpcResponse.context.slot);
733-
734-
// Process each account response
735-
for (let i = 0; i < accountIds.length; i++) {
736-
const accountIdString = accountIds[i];
737-
const accountInfo = rpcResponse.value[i];
738-
739-
if (!accountInfo) {
740-
continue;
741-
}
742-
743-
const existingBufferAndSlot =
744-
this.bufferAndSlotMap.get(accountIdString);
721+
// Chunk account IDs into groups of 100 (getMultipleAccounts limit)
722+
const chunkSize = 100;
723+
const chunks: string[][] = [];
724+
for (let i = 0; i < accountIds.length; i += chunkSize) {
725+
chunks.push(accountIds.slice(i, i + chunkSize));
726+
}
745727

746-
if (!existingBufferAndSlot) {
747-
// Account not in our map yet, add it
748-
let newBuffer: Buffer | undefined = undefined;
749-
if (accountInfo.data) {
750-
if (Array.isArray(accountInfo.data)) {
751-
const [data, encoding] = accountInfo.data;
752-
newBuffer = Buffer.from(data, encoding);
728+
// Process all chunks concurrently
729+
await Promise.all(
730+
chunks.map(async (chunk) => {
731+
const accountAddresses = chunk.map(
732+
(accountId) => accountId as Address
733+
);
734+
const rpcResponse = await this.rpc
735+
.getMultipleAccounts(accountAddresses, {
736+
commitment: this.options.commitment as GillCommitment,
737+
encoding: 'base64',
738+
})
739+
.send();
740+
741+
const currentSlot = Number(rpcResponse.context.slot);
742+
743+
// Process each account response in this chunk
744+
for (let i = 0; i < chunk.length; i++) {
745+
const accountIdString = chunk[i];
746+
const accountInfo = rpcResponse.value[i];
747+
748+
if (!accountInfo) {
749+
continue;
753750
}
754-
}
755751

756-
if (newBuffer) {
757-
this.bufferAndSlotMap.set(accountIdString, {
758-
buffer: newBuffer,
759-
slot: currentSlot,
760-
});
761-
const account = this.decodeBuffer(
762-
this.accountDiscriminator,
763-
newBuffer
764-
);
765-
const accountId = new PublicKey(accountIdString);
766-
this.onChange(accountId, account, { slot: currentSlot }, newBuffer);
767-
}
768-
continue;
769-
}
752+
const existingBufferAndSlot =
753+
this.bufferAndSlotMap.get(accountIdString);
754+
755+
if (!existingBufferAndSlot) {
756+
// Account not in our map yet, add it
757+
let newBuffer: Buffer | undefined = undefined;
758+
if (accountInfo.data) {
759+
if (Array.isArray(accountInfo.data)) {
760+
const [data, encoding] = accountInfo.data;
761+
newBuffer = Buffer.from(data, encoding);
762+
}
763+
}
770764

771-
// Check if we missed an update
772-
if (currentSlot > existingBufferAndSlot.slot) {
773-
let newBuffer: Buffer | undefined = undefined;
774-
if (accountInfo.data) {
775-
if (Array.isArray(accountInfo.data)) {
776-
const [data, encoding] = accountInfo.data;
777-
if (encoding === ('base58' as any)) {
778-
newBuffer = Buffer.from(bs58.decode(data));
779-
} else {
780-
newBuffer = Buffer.from(data, 'base64');
765+
if (newBuffer) {
766+
this.bufferAndSlotMap.set(accountIdString, {
767+
buffer: newBuffer,
768+
slot: currentSlot,
769+
});
770+
const account = this.decodeBuffer(
771+
this.accountDiscriminator,
772+
newBuffer
773+
);
774+
const accountId = new PublicKey(accountIdString);
775+
this.onChange(
776+
accountId,
777+
account,
778+
{ slot: currentSlot },
779+
newBuffer
780+
);
781781
}
782+
continue;
782783
}
783-
}
784784

785-
// Check if buffer has changed
786-
if (
787-
newBuffer &&
788-
(!existingBufferAndSlot.buffer ||
789-
!newBuffer.equals(existingBufferAndSlot.buffer))
790-
) {
791-
if (this.resubOpts?.logResubMessages) {
792-
console.log(
793-
`[${this.subscriptionName}] Batch polling detected missed update for account ${accountIdString}, signaling resubscription`
794-
);
785+
// Check if we missed an update
786+
if (currentSlot > existingBufferAndSlot.slot) {
787+
let newBuffer: Buffer | undefined = undefined;
788+
if (accountInfo.data) {
789+
if (Array.isArray(accountInfo.data)) {
790+
const [data, encoding] = accountInfo.data;
791+
if (encoding === ('base58' as any)) {
792+
newBuffer = Buffer.from(bs58.decode(data));
793+
} else {
794+
newBuffer = Buffer.from(data, 'base64');
795+
}
796+
}
797+
}
798+
799+
// Check if buffer has changed
800+
if (
801+
newBuffer &&
802+
(!existingBufferAndSlot.buffer ||
803+
!newBuffer.equals(existingBufferAndSlot.buffer))
804+
) {
805+
if (this.resubOpts?.logResubMessages) {
806+
console.log(
807+
`[${this.subscriptionName}] Batch polling detected missed update for account ${accountIdString}, signaling resubscription`
808+
);
809+
}
810+
// Signal missed change instead of immediately resubscribing
811+
this.signalMissedChange(accountIdString);
812+
return;
813+
}
795814
}
796-
// Signal missed change instead of immediately resubscribing
797-
this.signalMissedChange(accountIdString);
798-
return;
799815
}
800-
}
801-
}
816+
})
817+
);
802818
} catch (error) {
803819
if (this.resubOpts?.logResubMessages) {
804820
console.log(

0 commit comments

Comments
 (0)