diff --git a/sdk/src/accounts/README_WebSocketAccountSubscriberV2.md b/sdk/src/accounts/README_WebSocketAccountSubscriberV2.md index f53ebc133..b15b05f35 100644 --- a/sdk/src/accounts/README_WebSocketAccountSubscriberV2.md +++ b/sdk/src/accounts/README_WebSocketAccountSubscriberV2.md @@ -31,6 +31,46 @@ await subscriber.subscribe((data) => { await subscriber.unsubscribe(); ``` +### Polling Instead of Resubscribing + +For accounts that rarely update (like long-tail markets), you can use polling instead of resubscribing to reduce resource usage: + +```typescript +const resubOpts = { + resubTimeoutMs: 30000, // 30 seconds + logResubMessages: true, + usePollingInsteadOfResub: true, // Enable polling mode + pollingIntervalMs: 30000, // Poll every 30 seconds (optional, defaults to 30000) +}; + +const subscriber = new WebSocketAccountSubscriberV2( + 'perpMarket', // account name + program, + marketPublicKey, + undefined, // decodeBuffer + resubOpts +); +``` + +**How it works:** +1. Initially subscribes to WebSocket updates +2. If no WebSocket data is received for `resubTimeoutMs` (30s), switches to polling mode if `usePollingInsteadOfResub` is specified true, else just resubscribes(unsub, sub). +3. Polls every `pollingIntervalMs` (30s) to check for updates by: + - Storing current account buffer state + - Fetching latest account data + - Comparing buffers to detect any missed updates +4. If polling detects new data (indicating missed WebSocket events): + - Immediately stops polling + - Resubscribes to WebSocket to restore real-time updates + - This helps recover from degraded WebSocket connections +5. If a WebSocket event is received while polling: + - Polling is automatically stopped + - System continues with normal WebSocket updates +6. This approach provides: + - Efficient handling of rarely-updated accounts + - Automatic recovery from WebSocket connection issues + - Seamless fallback between polling and WebSocket modes + ## Implementation Details ### Gill Integration @@ -52,3 +92,4 @@ const { rpc, rpcSubscriptions } = createSolanaClient({ 3. **Address Handling**: Converts `PublicKey` to gill's `Address` type for compatibility 4. **Response Formatting**: Converts gill responses to match the expected `AccountInfo` format 5. **Abort Signal**: Utilizes AbortSignal nodejs/web class to shutdown websocket connection synchronously +6. **Polling Mode**: Optional polling mechanism for accounts that rarely update diff --git a/sdk/src/accounts/types.ts b/sdk/src/accounts/types.ts index 26bb9cfb4..98ab7133f 100644 --- a/sdk/src/accounts/types.ts +++ b/sdk/src/accounts/types.ts @@ -202,6 +202,9 @@ export type DataAndSlot = { export type ResubOpts = { resubTimeoutMs?: number; logResubMessages?: boolean; + // New options for polling-based resubscription + usePollingInsteadOfResub?: boolean; + pollingIntervalMs?: number; }; export interface UserStatsAccountEvents { diff --git a/sdk/src/accounts/webSocketAccountSubscriberV2.ts b/sdk/src/accounts/webSocketAccountSubscriberV2.ts index 9068f1b15..cf7f73fa1 100644 --- a/sdk/src/accounts/webSocketAccountSubscriberV2.ts +++ b/sdk/src/accounts/webSocketAccountSubscriberV2.ts @@ -8,16 +8,67 @@ import { AnchorProvider, Program } from '@coral-xyz/anchor'; import { capitalize } from './utils'; import { AccountInfoBase, - AccountInfoWithBase58EncodedData, AccountInfoWithBase64EncodedData, + AccountInfoWithBase58EncodedData, createSolanaClient, isAddress, + Rpc, + RpcSubscriptions, + SolanaRpcSubscriptionsApi, type Address, type Commitment, } from 'gill'; import { PublicKey } from '@solana/web3.js'; import bs58 from 'bs58'; +/** + * WebSocketAccountSubscriberV2 + * + * High-level overview + * - WebSocket-first subscriber for a single Solana account with optional + * polling safeguards when the WS feed goes quiet. + * - Emits decoded updates via `onChange` and maintains the latest + * `{buffer, slot}` and decoded `{data, slot}` internally. + * + * Why polling if this is a WebSocket subscriber? + * - Under real-world conditions, WS notifications can stall or get dropped. + * - When `resubOpts.resubTimeoutMs` elapses without WS data, you can either: + * - resubscribe to the WS stream (default), or + * - enable `resubOpts.usePollingInsteadOfResub` to start polling this single + * account via RPC to check for missed changes. + * - Polling compares the fetched buffer to the last known buffer. If different + * at an equal-or-later slot, it indicates a missed update and we resubscribe + * to WS to restore a clean stream. + * + * Initial fetch (on subscribe) + * - On `subscribe()`, we do a one-time RPC `fetch()` to seed internal state and + * emit the latest account state, ensuring consumers start from ground truth + * even before WS events arrive. + * + * Continuous polling (opt-in) + * - If `usePollingInsteadOfResub` is set, the inactivity timeout triggers a + * polling loop that periodically `fetch()`es the account and checks for + * changes. On change, polling stops and we resubscribe to WS. + * - If not set (default), the inactivity timeout immediately triggers a WS + * resubscription (no polling loop). + * + * Account focus + * - This class tracks exactly one account — the one passed to the constructor — + * which is by definition the account the consumer cares about. The extra + * logic is narrowly scoped to this account to minimize overhead. + * + * Tuning knobs + * - `resubOpts.resubTimeoutMs`: WS inactivity threshold before fallback. + * - `resubOpts.usePollingInsteadOfResub`: toggle polling vs immediate resub. + * - `resubOpts.pollingIntervalMs`: polling cadence (default 30s). + * - `resubOpts.logResubMessages`: verbose logs for diagnostics. + * - `commitment`: WS/RPC commitment used for reads and notifications. + * - `decodeBufferFn`: optional custom decode; defaults to Anchor coder. + * + * Implementation notes + * - Uses `gill` for both WS (`rpcSubscriptions`) and RPC (`rpc`) to match the + * program provider’s RPC endpoint. Handles base58/base64 encoded data. + */ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { dataAndSlot?: DataAndSlot; bufferAndSlot?: BufferAndSlot; @@ -35,6 +86,7 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { isUnsubscribing = false; timeoutId?: ReturnType; + pollingTimeoutId?: ReturnType; receivingData: boolean; @@ -45,13 +97,28 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { >['rpcSubscriptions']; private abortController?: AbortController; + /** + * Create a single-account WebSocket subscriber with optional polling fallback. + * + * @param accountName Name of the Anchor account type (used for default decode). + * @param program Anchor `Program` used for decoding and provider access. + * @param accountPublicKey Public key of the account to track. + * @param decodeBuffer Optional custom decode function; if omitted, uses + * program coder to decode `accountName`. + * @param resubOpts Resubscription/polling options. See class docs. + * @param commitment Commitment for WS and RPC operations. + * @param rpcSubscriptions Optional override/injection for testing. + * @param rpc Optional override/injection for testing. + */ public constructor( accountName: string, program: Program, accountPublicKey: PublicKey, decodeBuffer?: (buffer: Buffer) => T, resubOpts?: ResubOpts, - commitment?: Commitment + commitment?: Commitment, + rpcSubscriptions?: RpcSubscriptions & string, + rpc?: Rpc ) { this.accountName = accountName; this.logAccountName = `${accountName}-${accountPublicKey.toBase58()}-ws-acct-subscriber-v2`; @@ -81,17 +148,44 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { ((this.program.provider as AnchorProvider).opts.commitment as Commitment); // Initialize gill client using the same RPC URL as the program provider - const rpcUrl = (this.program.provider as AnchorProvider).connection - .rpcEndpoint; - const { rpc, rpcSubscriptions } = createSolanaClient({ - urlOrMoniker: rpcUrl, - }); - this.rpc = rpc; - this.rpcSubscriptions = rpcSubscriptions; + + this.rpc = rpc + ? rpc + : (() => { + const rpcUrl = (this.program.provider as AnchorProvider).connection + .rpcEndpoint; + const { rpc } = createSolanaClient({ + urlOrMoniker: rpcUrl, + }); + return rpc; + })(); + this.rpcSubscriptions = rpcSubscriptions + ? rpcSubscriptions + : (() => { + const rpcUrl = (this.program.provider as AnchorProvider).connection + .rpcEndpoint; + const { rpcSubscriptions } = createSolanaClient({ + urlOrMoniker: rpcUrl, + }); + return rpcSubscriptions; + })(); } - private async handleNotificationLoop(subscription: AsyncIterable) { + private async handleNotificationLoop( + subscriptionPromise: Promise> + ) { + const subscription = await subscriptionPromise; for await (const notification of subscription) { + // If we're currently polling and receive a WebSocket event, stop polling + if (this.pollingTimeoutId) { + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.logAccountName}] Received WebSocket event while polling, stopping polling` + ); + } + this.stopPolling(); + } + if (this.resubOpts?.resubTimeoutMs) { this.receivingData = true; clearTimeout(this.timeoutId); @@ -104,6 +198,19 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { } async subscribe(onChange: (data: T) => void): Promise { + /** + * Start the WebSocket subscription and (optionally) setup inactivity + * fallback. + * + * Flow + * - If we do not have initial state, perform a one-time `fetch()` to seed + * internal buffers and emit current data. + * - Subscribe to account notifications via WS. + * - If `resubOpts.resubTimeoutMs` is set, schedule an inactivity timeout. + * When it fires: + * - if `usePollingInsteadOfResub` is true, start polling loop; + * - otherwise, resubscribe to WS immediately. + */ if (this.listenerId != null || this.isUnsubscribing) { if (this.resubOpts?.logResubMessages) { console.log( @@ -132,7 +239,7 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { // Subscribe to account changes using gill's rpcSubscriptions const pubkey = this.accountPublicKey.toBase58(); if (isAddress(pubkey)) { - const subscription = await this.rpcSubscriptions + const subscriptionPromise = this.rpcSubscriptions .accountNotifications(pubkey, { commitment: this.commitment, encoding: 'base64', @@ -141,8 +248,8 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { abortSignal: abortController.signal, }); - // Start notification loop without awaiting - this.handleNotificationLoop(subscription); + // Start notification loop with the subscription promise + this.handleNotificationLoop(subscriptionPromise); } } @@ -159,6 +266,11 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { } protected setTimeout(): void { + /** + * Schedule inactivity handling. If WS is quiet for + * `resubOpts.resubTimeoutMs` and `receivingData` is true, trigger either + * a polling loop or a resubscribe depending on options. + */ if (!this.onChange) { throw new Error('onChange callback function must be set'); } @@ -175,31 +287,113 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { } if (this.receivingData) { + if (this.resubOpts?.usePollingInsteadOfResub) { + // Use polling instead of resubscribing + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.logAccountName}] No ws data in ${this.resubOpts.resubTimeoutMs}ms, starting polling - listenerId=${this.listenerId}` + ); + } + this.startPolling(); + } else { + // Original resubscribe behavior + if (this.resubOpts?.logResubMessages) { + console.log( + `No ws data from ${this.logAccountName} in ${this.resubOpts.resubTimeoutMs}ms, resubscribing - listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}` + ); + } + await this.unsubscribe(true); + this.receivingData = false; + await this.subscribe(this.onChange); + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.logAccountName}] Resubscribe completed - receivingData=${this.receivingData}, listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}` + ); + } + } + } else { if (this.resubOpts?.logResubMessages) { console.log( - `No ws data from ${this.logAccountName} in ${this.resubOpts.resubTimeoutMs}ms, resubscribing - listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}` + `[${this.logAccountName}] Timeout fired but receivingData=false, skipping resubscribe` ); } - await this.unsubscribe(true); - this.receivingData = false; - await this.subscribe(this.onChange); + } + }, + this.resubOpts?.resubTimeoutMs + ); + } + + /** + * Start the polling loop (single-account). + * - Periodically calls `fetch()` and compares buffers to detect changes. + * - On detected change, stops polling and resubscribes to WS. + */ + private startPolling(): void { + const pollingInterval = this.resubOpts?.pollingIntervalMs || 30000; // Default to 30s + + const poll = async () => { + if (this.isUnsubscribing) { + return; + } + + try { + // Store current data and buffer before polling + const currentBuffer = this.bufferAndSlot?.buffer; + + // Fetch latest account data + await this.fetch(); + + // Check if we got new data by comparing buffers + const newBuffer = this.bufferAndSlot?.buffer; + const hasNewData = + newBuffer && (!currentBuffer || !newBuffer.equals(currentBuffer)); + + if (hasNewData) { + // New data received, stop polling and resubscribe to websocket if (this.resubOpts?.logResubMessages) { console.log( - `[${this.logAccountName}] Resubscribe completed - receivingData=${this.receivingData}, listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}` + `[${this.logAccountName}] Polling detected account data change, resubscribing to websocket` ); } + this.stopPolling(); + this.receivingData = false; + await this.subscribe(this.onChange); } else { + // No new data, continue polling if (this.resubOpts?.logResubMessages) { console.log( - `[${this.logAccountName}] Timeout fired but receivingData=false, skipping resubscribe` + `[${this.logAccountName}] Polling found no account changes, continuing to poll every ${pollingInterval}ms` ); } + this.pollingTimeoutId = setTimeout(poll, pollingInterval); } - }, - this.resubOpts?.resubTimeoutMs - ); + } catch (error) { + if (this.resubOpts?.logResubMessages) { + console.error( + `[${this.logAccountName}] Error during polling:`, + error + ); + } + // On error, continue polling + this.pollingTimeoutId = setTimeout(poll, pollingInterval); + } + }; + + // Start polling immediately + poll(); } + private stopPolling(): void { + if (this.pollingTimeoutId) { + clearTimeout(this.pollingTimeoutId); + this.pollingTimeoutId = undefined; + } + } + + /** + * Fetch the current account state via RPC and process it through the same + * decoding and update pathway as WS notifications. + */ async fetch(): Promise { // Use gill's rpc for fetching account info const accountAddress = this.accountPublicKey.toBase58() as Address; @@ -294,6 +488,11 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { } unsubscribe(onResub = false): Promise { + /** + * Stop timers, polling, and WS subscription. + * - When called during a resubscribe (`onResub=true`), we preserve + * `resubOpts.resubTimeoutMs` for the restarted subscription. + */ if (!onResub && this.resubOpts) { this.resubOpts.resubTimeoutMs = undefined; } @@ -301,6 +500,9 @@ export class WebSocketAccountSubscriberV2 implements AccountSubscriber { clearTimeout(this.timeoutId); this.timeoutId = undefined; + // Stop polling if active + this.stopPolling(); + // Abort the WebSocket subscription if (this.abortController) { this.abortController.abort('unsubscribing'); diff --git a/sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts b/sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts new file mode 100644 index 000000000..f172083d1 --- /dev/null +++ b/sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts @@ -0,0 +1,720 @@ +import { + AccountSubscriber, + DataAndSlot, + DelistedMarketSetting, + DriftClientAccountEvents, + DriftClientAccountSubscriber, + NotSubscribedError, + ResubOpts, +} from './types'; +import { + isVariant, + PerpMarketAccount, + SpotMarketAccount, + StateAccount, +} from '../types'; +import { Program } from '@coral-xyz/anchor'; +import StrictEventEmitter from 'strict-event-emitter-types'; +import { EventEmitter } from 'events'; +import { + getDriftStateAccountPublicKey, + getPerpMarketPublicKey, + getSpotMarketPublicKey, +} from '../addresses/pda'; +import { Context, PublicKey } from '@solana/web3.js'; +import { + Commitment, + SolanaRpcSubscriptionsApi, + Rpc, + RpcSubscriptions, + createSolanaClient, +} from 'gill'; +import { OracleInfo, OraclePriceData } from '../oracles/types'; +import { OracleClientCache } from '../oracles/oracleClientCache'; +import { QUOTE_ORACLE_PRICE_DATA } from '../oracles/quoteAssetOracleClient'; +import { findAllMarketAndOracles } from '../config'; +import { findDelistedPerpMarketsAndOracles } from './utils'; +import { + getOracleId, + getPublicKeyAndSourceFromOracleId, +} from '../oracles/oracleId'; +import { OracleSource } from '../types'; +import { + getPerpMarketAccountsFilter, + getSpotMarketAccountsFilter, +} from '../memcmp'; +import { WebSocketProgramAccountSubscriberV2 } from './webSocketProgramAccountSubscriberV2'; +import { WebSocketAccountSubscriberV2 } from './webSocketAccountSubscriberV2'; +const ORACLE_DEFAULT_ID = getOracleId( + PublicKey.default, + OracleSource.QUOTE_ASSET +); + +export class WebSocketDriftClientAccountSubscriberV2 + implements DriftClientAccountSubscriber +{ + isSubscribed: boolean; + program: Program; + commitment?: Commitment; + perpMarketIndexes: number[]; + spotMarketIndexes: number[]; + oracleInfos: OracleInfo[]; + oracleClientCache = new OracleClientCache(); + + resubOpts?: ResubOpts; + shouldFindAllMarketsAndOracles: boolean; + skipInitialData: boolean = true; + + eventEmitter: StrictEventEmitter; + stateAccountSubscriber?: WebSocketAccountSubscriberV2; + perpMarketAllAccountsSubscriber: WebSocketProgramAccountSubscriberV2; + perpMarketAccountLatestData = new Map< + number, + DataAndSlot + >(); + spotMarketAllAccountsSubscriber: WebSocketProgramAccountSubscriberV2; + spotMarketAccountLatestData = new Map< + number, + DataAndSlot + >(); + perpOracleMap = new Map(); + perpOracleStringMap = new Map(); + spotOracleMap = new Map(); + spotOracleStringMap = new Map(); + oracleSubscribers = new Map>(); + delistedMarketSetting: DelistedMarketSetting; + + initialPerpMarketAccountData: Map; + initialSpotMarketAccountData: Map; + initialOraclePriceData: Map; + + protected isSubscribing = false; + protected subscriptionPromise: Promise; + protected subscriptionPromiseResolver: (val: boolean) => void; + + private rpc: Rpc; + private rpcSubscriptions: RpcSubscriptions & + string; + + public constructor( + program: Program, + perpMarketIndexes: number[], + spotMarketIndexes: number[], + oracleInfos: OracleInfo[], + shouldFindAllMarketsAndOracles: boolean, + delistedMarketSetting: DelistedMarketSetting, + resubOpts?: ResubOpts, + commitment?: Commitment, + skipInitialData?: boolean + ) { + this.isSubscribed = false; + this.program = program; + this.eventEmitter = new EventEmitter(); + this.perpMarketIndexes = perpMarketIndexes; + this.spotMarketIndexes = spotMarketIndexes; + this.oracleInfos = oracleInfos; + this.shouldFindAllMarketsAndOracles = shouldFindAllMarketsAndOracles; + this.delistedMarketSetting = delistedMarketSetting; + this.resubOpts = resubOpts; + this.commitment = commitment; + this.skipInitialData = skipInitialData ?? false; + + const { rpc, rpcSubscriptions } = createSolanaClient({ + urlOrMoniker: this.program.provider.connection.rpcEndpoint, + }); + this.rpc = rpc; + this.rpcSubscriptions = rpcSubscriptions; + } + + public async subscribe(): Promise { + const startTime = performance.now(); + if (this.isSubscribed) { + console.log( + `[PROFILING] WebSocketDriftClientAccountSubscriberV2.subscribe() skipped - already subscribed` + ); + return true; + } + + if (this.isSubscribing) { + console.log( + `[PROFILING] WebSocketDriftClientAccountSubscriberV2.subscribe() waiting for existing subscription` + ); + return await this.subscriptionPromise; + } + + this.isSubscribing = true; + + this.subscriptionPromise = new Promise((res) => { + this.subscriptionPromiseResolver = res; + }); + + const [perpMarketAccountPubkeys, spotMarketAccountPubkeys] = + await Promise.all([ + Promise.all( + this.perpMarketIndexes.map((marketIndex) => + getPerpMarketPublicKey(this.program.programId, marketIndex) + ) + ), + Promise.all( + this.spotMarketIndexes.map((marketIndex) => + getSpotMarketPublicKey(this.program.programId, marketIndex) + ) + ), + ]); + + // Profile findAllMarketsAndOracles if needed + let findAllMarketsDuration = 0; + if (this.shouldFindAllMarketsAndOracles) { + const findAllMarketsStartTime = performance.now(); + const { + perpMarketIndexes, + perpMarketAccounts, + spotMarketIndexes, + spotMarketAccounts, + oracleInfos, + } = await findAllMarketAndOracles(this.program); + this.perpMarketIndexes = perpMarketIndexes; + this.spotMarketIndexes = spotMarketIndexes; + this.oracleInfos = oracleInfos; + // front run and set the initial data here to save extra gma call in set initial data + this.initialPerpMarketAccountData = new Map( + perpMarketAccounts.map((market) => [market.marketIndex, market]) + ); + this.initialSpotMarketAccountData = new Map( + spotMarketAccounts.map((market) => [market.marketIndex, market]) + ); + const findAllMarketsEndTime = performance.now(); + findAllMarketsDuration = findAllMarketsEndTime - findAllMarketsStartTime; + console.log( + `[PROFILING] findAllMarketAndOracles completed in ${findAllMarketsDuration.toFixed( + 2 + )}ms (${perpMarketAccounts.length} perp markets, ${ + spotMarketAccounts.length + } spot markets, ${oracleInfos.length} oracles)` + ); + } else { + console.log( + `[PROFILING] findAllMarketAndOracles skipped (shouldFindAllMarketsAndOracles=false)` + ); + } + // Create subscribers + this.perpMarketAllAccountsSubscriber = + new WebSocketProgramAccountSubscriberV2( + 'PerpMarketAccountsSubscriber', + 'PerpMarket', + this.program, + this.program.account.perpMarket.coder.accounts.decodeUnchecked.bind( + this.program.account.perpMarket.coder.accounts + ), + { + filters: [getPerpMarketAccountsFilter()], + commitment: this.commitment, + }, + this.resubOpts, + perpMarketAccountPubkeys // because we pass these in, it will monitor these accounts and fetch them right away + ); + + this.spotMarketAllAccountsSubscriber = + new WebSocketProgramAccountSubscriberV2( + 'SpotMarketAccountsSubscriber', + 'SpotMarket', + this.program, + this.program.account.spotMarket.coder.accounts.decodeUnchecked.bind( + this.program.account.spotMarket.coder.accounts + ), + { + filters: [getSpotMarketAccountsFilter()], + commitment: this.commitment, + }, + this.resubOpts, + spotMarketAccountPubkeys // because we pass these in, it will monitor these accounts and fetch them right away + ); + + // Run all subscriptions in parallel + await Promise.all([ + // Perp market subscription + this.perpMarketAllAccountsSubscriber.subscribe( + ( + _accountId: PublicKey, + data: PerpMarketAccount, + context: Context, + _buffer: Buffer + ) => { + if ( + this.delistedMarketSetting !== DelistedMarketSetting.Subscribe && + isVariant(data.status, 'delisted') + ) { + return; + } + this.perpMarketAccountLatestData.set(data.marketIndex, { + data, + slot: context.slot, + }); + this.eventEmitter.emit('perpMarketAccountUpdate', data); + this.eventEmitter.emit('update'); + } + ), + // Spot market subscription + this.spotMarketAllAccountsSubscriber.subscribe( + ( + _accountId: PublicKey, + data: SpotMarketAccount, + context: Context, + _buffer: Buffer + ) => { + if ( + this.delistedMarketSetting !== DelistedMarketSetting.Subscribe && + isVariant(data.status, 'delisted') + ) { + return; + } + this.spotMarketAccountLatestData.set(data.marketIndex, { + data, + slot: context.slot, + }); + this.eventEmitter.emit('spotMarketAccountUpdate', data); + this.eventEmitter.emit('update'); + } + ), + // State account subscription + (async () => { + const statePublicKey = await getDriftStateAccountPublicKey( + this.program.programId + ); + this.stateAccountSubscriber = new WebSocketAccountSubscriberV2( + 'state', + this.program, + statePublicKey, + undefined, + undefined, + this.commitment as Commitment, + this.rpcSubscriptions, + this.rpc + ); + await Promise.all([ + this.stateAccountSubscriber.fetch(), + this.stateAccountSubscriber.subscribe((data: StateAccount) => { + this.eventEmitter.emit('stateAccountUpdate', data); + this.eventEmitter.emit('update'); + }), + ]); + })(), + (async () => { + await this.setInitialData(); + const subscribeToOraclesStartTime = performance.now(); + await this.subscribeToOracles(); + const subscribeToOraclesEndTime = performance.now(); + const duration = + subscribeToOraclesEndTime - subscribeToOraclesStartTime; + return duration; + })(), + ]); + + const initialPerpMarketDataFromLatestData = new Map( + Array.from(this.perpMarketAccountLatestData.values()).map((data) => [ + data.data.marketIndex, + data.data, + ]) + ); + const initialSpotMarketDataFromLatestData = new Map( + Array.from(this.spotMarketAccountLatestData.values()).map((data) => [ + data.data.marketIndex, + data.data, + ]) + ); + this.initialPerpMarketAccountData = initialPerpMarketDataFromLatestData; + this.initialSpotMarketAccountData = initialSpotMarketDataFromLatestData; + + this.eventEmitter.emit('update'); + + await this.handleDelistedMarketOracles(); + + await Promise.all([this.setPerpOracleMap(), this.setSpotOracleMap()]); + + this.isSubscribing = false; + this.isSubscribed = true; + this.subscriptionPromiseResolver(true); + + // delete initial data + this.removeInitialData(); + + const totalDuration = performance.now() - startTime; + console.log( + `[PROFILING] WebSocketDriftClientAccountSubscriberV2.subscribe() completed in ${totalDuration.toFixed( + 2 + )}ms` + ); + + return true; + } + + chunks = (array: readonly T[], size: number): T[][] => { + const result: T[][] = []; + for (let i = 0; i < array.length; i += size) { + result.push(array.slice(i, i + size)); + } + return result; + }; + + public async fetch(): Promise { + await this.setInitialData(); + } + + /** + * This is a no-op method that always returns true. + * Unlike the previous implementation, we don't need to manually subscribe to individual perp markets + * because we automatically receive updates for all program account changes via a single websocket subscription. + * This means any new perp markets will automatically be included without explicit subscription. + * @param marketIndex The perp market index to add (unused) + * @returns Promise that resolves to true + */ + public addPerpMarket(_marketIndex: number): Promise { + return Promise.resolve(true); + } + + /** + * This is a no-op method that always returns true. + * Unlike the previous implementation, we don't need to manually subscribe to individual spot markets + * because we automatically receive updates for all program account changes via a single websocket subscription. + * This means any new spot markets will automatically be included without explicit subscription. + * @param marketIndex The spot market index to add (unused) + * @returns Promise that resolves to true + */ + public addSpotMarket(_marketIndex: number): Promise { + return Promise.resolve(true); + } + + // TODO: need more options to skip loading perp market and spot market data. Because of how we fetch within the program account subscribers, I am commenting this all out + async setInitialData(): Promise { + const connection = this.program.provider.connection; + // Profile oracle initial data setup + const oracleSetupStartTime = performance.now(); + const oracleAccountPubkeyChunks = this.chunks( + this.oracleInfos.map((oracleInfo) => oracleInfo.publicKey), + 100 + ); + const oracleAccountInfos = ( + await Promise.all( + oracleAccountPubkeyChunks.map((oracleAccountPublicKeysChunk) => + connection.getMultipleAccountsInfo(oracleAccountPublicKeysChunk) + ) + ) + ).flat(); + this.initialOraclePriceData = new Map( + this.oracleInfos.reduce((result, oracleInfo, i) => { + if (!oracleAccountInfos[i]) { + return result; + } + + const oracleClient = this.oracleClientCache.get( + oracleInfo.source, + connection, + this.program + ); + + const oraclePriceData = oracleClient.getOraclePriceDataFromBuffer( + oracleAccountInfos[i].data + ); + + result.push([ + getOracleId(oracleInfo.publicKey, oracleInfo.source), + oraclePriceData, + ]); + return result; + }, []) + ); + const oracleSetupEndTime = performance.now(); + const oracleSetupDuration = oracleSetupEndTime - oracleSetupStartTime; + if (this.resubOpts?.logResubMessages) { + console.log( + `[PROFILING] Oracle initial data setup completed in ${oracleSetupDuration.toFixed( + 2 + )}ms (${this.initialOraclePriceData.size} oracles)` + ); + } + + // emit initial oracle price data + Array.from(this.initialOraclePriceData.entries()).forEach( + ([oracleId, oraclePriceData]) => { + const { publicKey, source } = + getPublicKeyAndSourceFromOracleId(oracleId); + this.eventEmitter.emit( + 'oraclePriceUpdate', + publicKey, + source, + oraclePriceData + ); + } + ); + this.eventEmitter.emit('update'); + } + + removeInitialData() { + this.initialPerpMarketAccountData = new Map(); + this.initialSpotMarketAccountData = new Map(); + this.initialOraclePriceData = new Map(); + } + + async subscribeToOracles(): Promise { + const startTime = performance.now(); + + // Filter out default oracles and duplicates to avoid unnecessary subscriptions + const validOracleInfos = this.oracleInfos.filter( + (oracleInfo) => + !this.oracleSubscribers.has( + getOracleId(oracleInfo.publicKey, oracleInfo.source) + ) + ); + + await Promise.all( + validOracleInfos.map((oracleInfo) => this.subscribeToOracle(oracleInfo)) + ); + + const totalDuration = performance.now() - startTime; + console.log( + `[PROFILING] subscribeToOracles() completed in ${totalDuration.toFixed( + 2 + )}ms` + ); + + return true; + } + + async subscribeToOracle(oracleInfo: OracleInfo): Promise { + const oracleId = getOracleId(oracleInfo.publicKey, oracleInfo.source); + + const client = this.oracleClientCache.get( + oracleInfo.source, + this.program.provider.connection, + this.program + ); + const accountSubscriber = new WebSocketAccountSubscriberV2( + 'oracle', + this.program, + oracleInfo.publicKey, + (buffer: Buffer) => { + return client.getOraclePriceDataFromBuffer(buffer); + }, + this.resubOpts, + this.commitment, + this.rpcSubscriptions, + this.rpc + ); + const initialOraclePriceData = this.initialOraclePriceData?.get(oracleId); + if (initialOraclePriceData) { + accountSubscriber.setData(initialOraclePriceData); + } + await accountSubscriber.subscribe((data: OraclePriceData) => { + this.eventEmitter.emit( + 'oraclePriceUpdate', + oracleInfo.publicKey, + oracleInfo.source, + data + ); + this.eventEmitter.emit('update'); + }); + + this.oracleSubscribers.set(oracleId, accountSubscriber); + + return true; + } + + async unsubscribeFromMarketAccounts(): Promise { + await this.perpMarketAllAccountsSubscriber.unsubscribe(); + } + + async unsubscribeFromSpotMarketAccounts(): Promise { + await this.spotMarketAllAccountsSubscriber.unsubscribe(); + } + + async unsubscribeFromOracles(): Promise { + await Promise.all( + Array.from(this.oracleSubscribers.values()).map((accountSubscriber) => + accountSubscriber.unsubscribe() + ) + ); + } + + public async unsubscribe(): Promise { + if (!this.isSubscribed) { + return; + } + + await Promise.all([ + this.stateAccountSubscriber?.unsubscribe(), + this.unsubscribeFromMarketAccounts(), + this.unsubscribeFromSpotMarketAccounts(), + this.unsubscribeFromOracles(), + ]); + + this.isSubscribed = false; + } + + async addOracle(oracleInfo: OracleInfo): Promise { + const oracleId = getOracleId(oracleInfo.publicKey, oracleInfo.source); + if (this.oracleSubscribers.has(oracleId)) { + return true; + } + + if (oracleInfo.publicKey.equals(PublicKey.default)) { + return true; + } + + return this.subscribeToOracle(oracleInfo); + } + + async setPerpOracleMap() { + const perpMarkets = this.getMarketAccountsAndSlots(); + const addOraclePromises = []; + for (const perpMarket of perpMarkets) { + if (!perpMarket || !perpMarket.data) { + continue; + } + const perpMarketAccount = perpMarket.data; + const perpMarketIndex = perpMarketAccount.marketIndex; + const oracle = perpMarketAccount.amm.oracle; + const oracleId = getOracleId(oracle, perpMarket.data.amm.oracleSource); + if (!this.oracleSubscribers.has(oracleId)) { + addOraclePromises.push( + this.addOracle({ + publicKey: oracle, + source: perpMarket.data.amm.oracleSource, + }) + ); + } + this.perpOracleMap.set(perpMarketIndex, oracle); + this.perpOracleStringMap.set(perpMarketIndex, oracleId); + } + await Promise.all(addOraclePromises); + } + + async setSpotOracleMap() { + const spotMarkets = this.getSpotMarketAccountsAndSlots(); + const addOraclePromises = []; + for (const spotMarket of spotMarkets) { + if (!spotMarket || !spotMarket.data) { + continue; + } + const spotMarketAccount = spotMarket.data; + const spotMarketIndex = spotMarketAccount.marketIndex; + const oracle = spotMarketAccount.oracle; + const oracleId = getOracleId(oracle, spotMarketAccount.oracleSource); + if (!this.oracleSubscribers.has(oracleId)) { + addOraclePromises.push( + this.addOracle({ + publicKey: oracle, + source: spotMarketAccount.oracleSource, + }) + ); + } + this.spotOracleMap.set(spotMarketIndex, oracle); + this.spotOracleStringMap.set(spotMarketIndex, oracleId); + } + await Promise.all(addOraclePromises); + } + + async handleDelistedMarketOracles(): Promise { + if (this.delistedMarketSetting === DelistedMarketSetting.Subscribe) { + return; + } + + const { oracles } = findDelistedPerpMarketsAndOracles( + this.getMarketAccountsAndSlots(), + this.getSpotMarketAccountsAndSlots() + ); + + for (const oracle of oracles) { + const oracleId = getOracleId(oracle.publicKey, oracle.source); + if (this.oracleSubscribers.has(oracleId)) { + await this.oracleSubscribers.get(oracleId).unsubscribe(); + if (this.delistedMarketSetting === DelistedMarketSetting.Discard) { + this.oracleSubscribers.delete(oracleId); + } + } + } + } + + assertIsSubscribed(): void { + if (!this.isSubscribed) { + throw new NotSubscribedError( + 'You must call `subscribe` before using this function' + ); + } + } + + public getStateAccountAndSlot(): DataAndSlot { + this.assertIsSubscribed(); + return this.stateAccountSubscriber.dataAndSlot; + } + + public getMarketAccountAndSlot( + marketIndex: number + ): DataAndSlot | undefined { + this.assertIsSubscribed(); + return this.perpMarketAccountLatestData.get(marketIndex); + } + + public getMarketAccountsAndSlots(): DataAndSlot[] { + return Array.from(this.perpMarketAccountLatestData.values()); + } + + public getSpotMarketAccountAndSlot( + marketIndex: number + ): DataAndSlot | undefined { + this.assertIsSubscribed(); + return this.spotMarketAccountLatestData.get(marketIndex); + } + + public getSpotMarketAccountsAndSlots(): DataAndSlot[] { + return Array.from(this.spotMarketAccountLatestData.values()); + } + + public getOraclePriceDataAndSlot( + oracleId: string + ): DataAndSlot | undefined { + this.assertIsSubscribed(); + if (oracleId === ORACLE_DEFAULT_ID) { + return { + data: QUOTE_ORACLE_PRICE_DATA, + slot: 0, + }; + } + return this.oracleSubscribers.get(oracleId).dataAndSlot; + } + + public getOraclePriceDataAndSlotForPerpMarket( + marketIndex: number + ): DataAndSlot | undefined { + const perpMarketAccount = this.getMarketAccountAndSlot(marketIndex); + const oracle = this.perpOracleMap.get(marketIndex); + const oracleId = this.perpOracleStringMap.get(marketIndex); + if (!perpMarketAccount || !oracleId) { + return undefined; + } + + if (!perpMarketAccount.data.amm.oracle.equals(oracle)) { + // If the oracle has changed, we need to update the oracle map in background + this.setPerpOracleMap(); + } + + return this.getOraclePriceDataAndSlot(oracleId); + } + + public getOraclePriceDataAndSlotForSpotMarket( + marketIndex: number + ): DataAndSlot | undefined { + const spotMarketAccount = this.getSpotMarketAccountAndSlot(marketIndex); + const oracle = this.spotOracleMap.get(marketIndex); + const oracleId = this.spotOracleStringMap.get(marketIndex); + if (!spotMarketAccount || !oracleId) { + return undefined; + } + + if (!spotMarketAccount.data.oracle.equals(oracle)) { + // If the oracle has changed, we need to update the oracle map in background + this.setSpotOracleMap(); + } + + return this.getOraclePriceDataAndSlot(oracleId); + } +} diff --git a/sdk/src/accounts/webSocketProgramAccountSubscriberV2.ts b/sdk/src/accounts/webSocketProgramAccountSubscriberV2.ts index 1176b8658..5781cc7fb 100644 --- a/sdk/src/accounts/webSocketProgramAccountSubscriberV2.ts +++ b/sdk/src/accounts/webSocketProgramAccountSubscriberV2.ts @@ -7,11 +7,92 @@ import { AccountInfoWithBase64EncodedData, createSolanaClient, isAddress, + Lamports, + Slot, type Address, type Commitment as GillCommitment, } from 'gill'; import bs58 from 'bs58'; +type ProgramAccountSubscriptionAsyncIterable = AsyncIterable< + Readonly<{ + context: Readonly<{ + slot: Slot; + }>; + value: Readonly<{ + account: Readonly<{ + executable: boolean; + lamports: Lamports; + owner: Address; + rentEpoch: bigint; + space: bigint; + }> & + Readonly; + pubkey: Address; + }>; + }> +>; +/** + * WebSocketProgramAccountSubscriberV2 + * + * High-level overview + * - WebSocket-first subscriber for Solana program accounts that also layers in + * targeted polling to detect missed updates reliably. + * - Emits decoded account updates via the provided `onChange` callback. + * - Designed to focus extra work on the specific accounts the consumer cares + * about ("monitored accounts") while keeping baseline WS behavior for the + * full program subscription. + * + * Why polling if this is a WebSocket subscriber? + * - WS infra can stall, drop, or reorder notifications under network stress or + * provider hiccups. When that happens, critical account changes can be missed. + * - To mitigate this, the class maintains a small set of accounts(provided via constructor) to monitor + * and uses light polling to verify whether a WS change was missed. + * - If polling detects a newer slot with different data than the last seen + * buffer, a centralized resubscription is triggered to restore a clean stream. + * + * Initial polling (on subscribe) + * - On `subscribe()`, we first perform a single batched fetch of all monitored + * accounts ("initial monitor fetch"). + * - Purpose: seed the internal `bufferAndSlotMap` and emit the latest state so + * consumers have up-to-date data immediately, even before WS events arrive. + * - This step does not decide resubscription; it only establishes ground truth. + * + * Continuous polling (only for monitored accounts) + * - After seeding, each monitored account is put into a monitoring cycle: + * 1) If no WS notification for an account is observed for `pollingIntervalMs`, + * we enqueue it for a batched fetch (buffered for a short window). + * 2) Once an account enters the "currently polling" set, a shared batch poll + * runs every `pollingIntervalMs` across all such accounts. + * 3) If WS notifications resume for an account, that account is removed from + * the polling set and returns to passive monitoring. + * - Polling compares the newly fetched buffer with the last stored buffer at a + * later slot. A difference indicates a missed update; we schedule a single + * resubscription (coalesced across accounts) to re-sync. + * + * Accounts the consumer cares about + * - Provide accounts up-front via the constructor `accountsToMonitor`, or add + * them dynamically with `addAccountToMonitor()` and remove with + * `removeAccountFromMonitor()`. + * - Only these accounts incur additional polling safeguards; other accounts are + * still processed from the WS stream normally. + * + * Resubscription strategy + * - Missed updates from any monitored account are coalesced and trigger a single + * resubscription after a short delay. This avoids rapid churn. + * - If `resubOpts.resubTimeoutMs` is set, an inactivity timer also performs a + * batch check of monitored accounts. If a missed update is found, the same + * centralized resubscription flow is used. + * + * Tuning knobs + * - `setPollingInterval(ms)`: adjust how often monitoring/polling runs + * (default 30s). Shorter = faster detection, higher RPC load. + * - `initialFetchBufferMs` (internal): small delay to batch initial monitoring + * fetch requests, minimizing RPC calls when many accounts are added at once. + * - Batch size for `getMultipleAccounts` is limited to 100, requests are chunked + * and processed concurrently. + */ + export class WebSocketProgramAccountSubscriberV2 implements ProgramAccountSubscriber { @@ -51,6 +132,16 @@ export class WebSocketProgramAccountSubscriberV2 private accountsCurrentlyPolling: Set = new Set(); // Track which accounts are being polled private batchPollingTimeout?: ReturnType; // Single timeout for batch polling + // For batching initial account fetches + private accountsPendingInitialMonitorFetch: Set = new Set(); // Track accounts waiting for initial monitor fetch + private initialFetchTimeout?: ReturnType; // Single timeout for initial monitoring batch fetch + private initialFetchBufferMs: number = 1000; // Buffer time to collect accounts for initial monitoring fetch + + // Centralized resubscription handling + private missedChangeDetected = false; // Flag to track if any missed change was detected + private resubscriptionTimeout?: ReturnType; // Timeout for delayed resubscription + private accountsWithMissedUpdates: Set = new Set(); // Track which accounts had missed updates + public constructor( subscriptionName: string, accountDiscriminator: string, @@ -92,6 +183,30 @@ export class WebSocketProgramAccountSubscriberV2 this.rpcSubscriptions = rpcSubscriptions; } + private async handleNotificationLoop( + notificationPromise: Promise + ) { + const subscriptionIterable = await notificationPromise; + for await (const notification of subscriptionIterable) { + if (this.resubOpts?.resubTimeoutMs) { + this.receivingData = true; + clearTimeout(this.timeoutId); + this.handleRpcResponse( + notification.context, + notification.value.pubkey, + notification.value.account.data + ); + this.setTimeout(); + } else { + this.handleRpcResponse( + notification.context, + notification.value.pubkey, + notification.value.account.data + ); + } + } + } + async subscribe( onChange: ( accountId: PublicKey, @@ -100,62 +215,91 @@ export class WebSocketProgramAccountSubscriberV2 buffer: Buffer ) => void ): Promise { + /** + * Start the WebSocket subscription and initialize polling safeguards. + * + * Flow + * - Seeds all monitored accounts with a single batched RPC fetch and emits + * their current state. + * - Subscribes to program notifications via WS using gill. + * - If `resubOpts.resubTimeoutMs` is set, starts an inactivity timer that + * batch-checks monitored accounts when WS goes quiet. + * - Begins monitoring for accounts that may need polling when WS + * notifications are not observed within `pollingIntervalMs`. + * + * @param onChange Callback invoked with decoded account data when an update + * is detected (via WS or batch RPC fetch). + */ + const startTime = performance.now(); if (this.listenerId != null || this.isUnsubscribing) { return; } + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.subscriptionName}] initializing subscription. This many monitored accounts: ${this.accountsToMonitor.size}` + ); + } + this.onChange = onChange; + // initial fetch of monitored data - only fetch and populate, don't check for missed changes + await this.fetchAndPopulateAllMonitoredAccounts(); + // Create abort controller for proper cleanup const abortController = new AbortController(); this.abortController = abortController; + this.listenerId = Math.random(); // Unique ID for logging purposes + + if (this.resubOpts?.resubTimeoutMs) { + this.receivingData = true; + this.setTimeout(); + } + // Subscribe to program account changes using gill's rpcSubscriptions const programId = this.program.programId.toBase58(); if (isAddress(programId)) { - const subscription = await this.rpcSubscriptions + const subscriptionPromise = this.rpcSubscriptions .programNotifications(programId, { commitment: this.options.commitment as GillCommitment, encoding: 'base64', - filters: this.options.filters.map((filter) => ({ - memcmp: { - offset: BigInt(filter.memcmp.offset), - bytes: filter.memcmp.bytes as any, - encoding: 'base64' as const, - }, - })), + filters: this.options.filters.map((filter) => { + // Convert filter bytes from base58 to base64 if needed + let bytes = filter.memcmp.bytes; + if ( + typeof bytes === 'string' && + /^[1-9A-HJ-NP-Za-km-z]+$/.test(bytes) + ) { + // Looks like base58 - convert to base64 + const decoded = bs58.decode(bytes); + bytes = Buffer.from(decoded).toString('base64'); + } + + return { + memcmp: { + offset: BigInt(filter.memcmp.offset), + bytes: bytes as any, + encoding: 'base64' as const, + }, + }; + }), }) .subscribe({ abortSignal: abortController.signal, }); - for await (const notification of subscription) { - if (this.resubOpts?.resubTimeoutMs) { - this.receivingData = true; - clearTimeout(this.timeoutId); - this.handleRpcResponse( - notification.context, - notification.value.account - ); - this.setTimeout(); - } else { - this.handleRpcResponse( - notification.context, - notification.value.account - ); - } - } - } - - this.listenerId = Math.random(); // Unique ID for logging purposes - - if (this.resubOpts?.resubTimeoutMs) { - this.receivingData = true; - this.setTimeout(); + // Start notification loop without awaiting + this.handleNotificationLoop(subscriptionPromise); + // Start monitoring for accounts that may need polling if no WS event is received + this.startMonitoringForAccounts(); } - - // Start monitoring for accounts that may need polling if no WS event is received - this.startMonitoringForAccounts(); + const endTime = performance.now(); + console.log( + `[PROFILING] ${this.subscriptionName}.subscribe() completed in ${ + endTime - startTime + }ms` + ); } protected setTimeout(): void { @@ -172,12 +316,21 @@ export class WebSocketProgramAccountSubscriberV2 if (this.receivingData) { if (this.resubOpts?.logResubMessages) { console.log( - `No ws data from ${this.subscriptionName} in ${this.resubOpts?.resubTimeoutMs}ms, resubscribing` + `No ws data from ${this.subscriptionName} in ${this.resubOpts?.resubTimeoutMs}ms, checking for missed changes` ); } - await this.unsubscribe(true); - this.receivingData = false; - await this.subscribe(this.onChange); + + // Check for missed changes in monitored accounts + const missedChangeDetected = await this.fetchAllMonitoredAccounts(); + + if (missedChangeDetected) { + // Signal missed change with a generic identifier since we don't have specific account IDs from this context + this.signalMissedChange('timeout-check'); + } else { + // No missed changes, continue monitoring + this.receivingData = false; + this.setTimeout(); + } } }, this.resubOpts?.resubTimeoutMs @@ -186,19 +339,23 @@ export class WebSocketProgramAccountSubscriberV2 handleRpcResponse( context: { slot: bigint }, + accountId: Address, accountInfo?: AccountInfoBase & - (AccountInfoWithBase58EncodedData | AccountInfoWithBase64EncodedData) + ( + | AccountInfoWithBase58EncodedData + | AccountInfoWithBase64EncodedData + )['data'] ): void { const newSlot = Number(context.slot); let newBuffer: Buffer | undefined = undefined; if (accountInfo) { // Extract data from gill response - if (accountInfo.data) { + if (accountInfo) { // Handle different data formats from gill - if (Array.isArray(accountInfo.data)) { + if (Array.isArray(accountInfo)) { // If it's a tuple [data, encoding] - const [data, encoding] = accountInfo.data; + const [data, encoding] = accountInfo; if (encoding === ('base58' as any)) { // Convert base58 to buffer using bs58 @@ -210,12 +367,7 @@ export class WebSocketProgramAccountSubscriberV2 } } - // Convert gill's account key to PublicKey - // Note: accountInfo doesn't have a key property, we need to get it from the notification - // For now, we'll use a placeholder - this needs to be fixed based on the actual gill API - const accountId = new PublicKey('11111111111111111111111111111111'); // Placeholder - const accountIdString = accountId.toBase58(); - + const accountIdString = accountId.toString(); const existingBufferAndSlot = this.bufferAndSlotMap.get(accountIdString); // Track WebSocket notification time for this account @@ -242,7 +394,8 @@ export class WebSocketProgramAccountSubscriberV2 slot: newSlot, }); const account = this.decodeBuffer(this.accountDiscriminator, newBuffer); - this.onChange(accountId, account, { slot: newSlot }, newBuffer); + const accountIdPubkey = new PublicKey(accountId.toString()); + this.onChange(accountIdPubkey, account, { slot: newSlot }, newBuffer); } return; } @@ -258,7 +411,12 @@ export class WebSocketProgramAccountSubscriberV2 slot: newSlot, }); const account = this.decodeBuffer(this.accountDiscriminator, newBuffer); - this.onChange(accountId, account, { slot: newSlot }, newBuffer); + this.onChange( + new PublicKey(accountId.toString()), + account, + { slot: newSlot }, + newBuffer + ); } } @@ -290,10 +448,14 @@ export class WebSocketProgramAccountSubscriberV2 !lastNotificationTime || currentTime - lastNotificationTime >= this.pollingIntervalMs ) { - // No recent WS notification, start polling - await this.pollAccount(accountIdString); - // Schedule next poll - this.startPollingForAccount(accountIdString); + if (this.resubOpts?.logResubMessages) { + console.debug( + `[${this.subscriptionName}] No recent WS notification, starting initial fetch for account`, + accountIdString + ); + } + // No recent WS notification, add to pending initial fetch + this.addToInitialFetchBatch(accountIdString); } else { // We received a WS notification recently, continue monitoring this.startMonitoringForAccount(accountIdString); @@ -303,17 +465,10 @@ export class WebSocketProgramAccountSubscriberV2 this.pollingTimeouts.set(accountIdString, timeoutId); } - private startPollingForAccount(accountIdString: string): void { - // Add account to polling set - this.accountsCurrentlyPolling.add(accountIdString); - - // If this is the first account being polled, start batch polling - if (this.accountsCurrentlyPolling.size === 1) { - this.startBatchPolling(); - } - } - private startBatchPolling(): void { + if (this.resubOpts?.logResubMessages) { + console.debug(`[${this.subscriptionName}] Scheduling batch polling`); + } // Clear existing batch polling timeout if (this.batchPollingTimeout) { clearTimeout(this.batchPollingTimeout); @@ -335,8 +490,40 @@ export class WebSocketProgramAccountSubscriberV2 return; } + if (this.resubOpts?.logResubMessages) { + console.debug( + `[${this.subscriptionName}] Polling all accounts`, + accountsToPoll.length, + 'accounts' + ); + } + + // Use the shared batch fetch method + await this.fetchAccountsBatch(accountsToPoll); + } catch (error) { + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.subscriptionName}] Error batch polling accounts:`, + error + ); + } + } + } + + /** + * Fetches and populates all monitored accounts data without checking for missed changes + * This is used during initial subscription to populate data + */ + private async fetchAndPopulateAllMonitoredAccounts(): Promise { + try { + // Get all accounts currently being polled + const accountsToMonitor = Array.from(this.accountsToMonitor); + if (accountsToMonitor.length === 0) { + return; + } + // Fetch all accounts in a single batch request - const accountAddresses = accountsToPoll.map( + const accountAddresses = accountsToMonitor.map( (accountId) => accountId as Address ); const rpcResponse = await this.rpc @@ -349,8 +536,8 @@ export class WebSocketProgramAccountSubscriberV2 const currentSlot = Number(rpcResponse.context.slot); // Process each account response - for (let i = 0; i < accountsToPoll.length; i++) { - const accountIdString = accountsToPoll[i]; + for (let i = 0; i < accountsToMonitor.length; i++) { + const accountIdString = accountsToMonitor[i]; const accountInfo = rpcResponse.value[i]; if (!accountInfo) { @@ -385,7 +572,7 @@ export class WebSocketProgramAccountSubscriberV2 continue; } - // Check if we missed an update + // For initial population, just update the slot if we have newer data if (currentSlot > existingBufferAndSlot.slot) { let newBuffer: Buffer | undefined = undefined; if (accountInfo.data) { @@ -399,56 +586,74 @@ export class WebSocketProgramAccountSubscriberV2 } } - // Check if buffer has changed - if ( - newBuffer && - (!existingBufferAndSlot.buffer || - !newBuffer.equals(existingBufferAndSlot.buffer)) - ) { - if (this.resubOpts?.logResubMessages) { - console.log( - `[${this.subscriptionName}] Batch polling detected missed update for account ${accountIdString}, resubscribing` - ); - } - // We missed an update, resubscribe - await this.unsubscribe(true); - this.receivingData = false; - await this.subscribe(this.onChange); - return; + // Update with newer data if available + if (newBuffer) { + this.bufferAndSlotMap.set(accountIdString, { + buffer: newBuffer, + slot: currentSlot, + }); + const account = this.decodeBuffer( + this.accountDiscriminator, + newBuffer + ); + const accountId = new PublicKey(accountIdString); + this.onChange(accountId, account, { slot: currentSlot }, newBuffer); } } } } catch (error) { if (this.resubOpts?.logResubMessages) { console.log( - `[${this.subscriptionName}] Error batch polling accounts:`, + `[${this.subscriptionName}] Error fetching and populating monitored accounts:`, error ); } } } - private async pollAccount(accountIdString: string): Promise { + /** + * Fetches all monitored accounts and checks for missed changes + * Returns true if a missed change was detected and resubscription is needed + */ + private async fetchAllMonitoredAccounts(): Promise { try { - // Fetch current account data using gill's rpc - const accountAddress = accountIdString as Address; + // Get all accounts currently being polled + const accountsToMonitor = Array.from(this.accountsToMonitor); + if (accountsToMonitor.length === 0) { + return false; + } + + // Fetch all accounts in a single batch request + const accountAddresses = accountsToMonitor.map( + (accountId) => accountId as Address + ); const rpcResponse = await this.rpc - .getAccountInfo(accountAddress, { + .getMultipleAccounts(accountAddresses, { commitment: this.options.commitment as GillCommitment, encoding: 'base64', }) .send(); const currentSlot = Number(rpcResponse.context.slot); - const existingBufferAndSlot = this.bufferAndSlotMap.get(accountIdString); - if (!existingBufferAndSlot) { - // Account not in our map yet, add it - if (rpcResponse.value) { + // Process each account response + for (let i = 0; i < accountsToMonitor.length; i++) { + const accountIdString = accountsToMonitor[i]; + const accountInfo = rpcResponse.value[i]; + + if (!accountInfo) { + continue; + } + + const existingBufferAndSlot = + this.bufferAndSlotMap.get(accountIdString); + + if (!existingBufferAndSlot) { + // Account not in our map yet, add it let newBuffer: Buffer | undefined = undefined; - if (rpcResponse.value.data) { - if (Array.isArray(rpcResponse.value.data)) { - const [data, encoding] = rpcResponse.value.data; + if (accountInfo.data) { + if (Array.isArray(accountInfo.data)) { + const [data, encoding] = accountInfo.data; newBuffer = Buffer.from(data, encoding); } } @@ -465,17 +670,15 @@ export class WebSocketProgramAccountSubscriberV2 const accountId = new PublicKey(accountIdString); this.onChange(accountId, account, { slot: currentSlot }, newBuffer); } + continue; } - return; - } - // Check if we missed an update - if (currentSlot > existingBufferAndSlot.slot) { - let newBuffer: Buffer | undefined = undefined; - if (rpcResponse.value) { - if (rpcResponse.value.data) { - if (Array.isArray(rpcResponse.value.data)) { - const [data, encoding] = rpcResponse.value.data; + // Check if we missed an update + if (currentSlot > existingBufferAndSlot.slot) { + let newBuffer: Buffer | undefined = undefined; + if (accountInfo.data) { + if (Array.isArray(accountInfo.data)) { + const [data, encoding] = accountInfo.data; if (encoding === ('base58' as any)) { newBuffer = Buffer.from(bs58.decode(data)); } else { @@ -483,30 +686,215 @@ export class WebSocketProgramAccountSubscriberV2 } } } - } - // Check if buffer has changed - if ( - newBuffer && - (!existingBufferAndSlot.buffer || - !newBuffer.equals(existingBufferAndSlot.buffer)) - ) { - if (this.resubOpts?.logResubMessages) { - console.log( - `[${this.subscriptionName}] Polling detected missed update for account ${accountIdString}, resubscribing` - ); + // Check if buffer has changed + if ( + newBuffer && + (!existingBufferAndSlot.buffer || + !newBuffer.equals(existingBufferAndSlot.buffer)) + ) { + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.subscriptionName}] Batch polling detected missed update for account ${accountIdString}, resubscribing` + ); + } + // We missed an update, return true to indicate resubscription is needed + return true; } - // We missed an update, resubscribe - await this.unsubscribe(true); - this.receivingData = false; - await this.subscribe(this.onChange); - return; } } + + // No missed changes detected + return false; } catch (error) { if (this.resubOpts?.logResubMessages) { console.log( - `[${this.subscriptionName}] Error polling account ${accountIdString}:`, + `[${this.subscriptionName}] Error batch polling accounts:`, + error + ); + } + return false; + } + } + + private addToInitialFetchBatch(accountIdString: string): void { + // Add account to pending initial monitor fetch set + this.accountsPendingInitialMonitorFetch.add(accountIdString); + + // If this is the first account in the batch, start the buffer timer + if (this.accountsPendingInitialMonitorFetch.size === 1) { + this.startInitialFetchBuffer(); + } + } + + private startInitialFetchBuffer(): void { + // Clear existing initial fetch timeout + if (this.initialFetchTimeout) { + clearTimeout(this.initialFetchTimeout); + } + + // Set up buffer timeout to collect accounts + this.initialFetchTimeout = setTimeout(async () => { + await this.processInitialFetchBatch(); + }, this.initialFetchBufferMs); + } + + private async processInitialFetchBatch(): Promise { + try { + // Get all accounts pending initial monitor fetch + const accountsToFetch = Array.from( + this.accountsPendingInitialMonitorFetch + ); + if (accountsToFetch.length === 0) { + return; + } + + if (this.resubOpts?.logResubMessages) { + console.debug( + `[${this.subscriptionName}] Processing initial monitor fetch batch`, + accountsToFetch.length, + 'accounts' + ); + } + + // Use the same batch logic as pollAllAccounts + await this.fetchAccountsBatch(accountsToFetch); + + // Move accounts to polling set and start batch polling + accountsToFetch.forEach((accountIdString) => { + this.accountsCurrentlyPolling.add(accountIdString); + }); + + // Clear the pending set + this.accountsPendingInitialMonitorFetch.clear(); + + // If this is the first account being polled, start batch polling + if (this.accountsCurrentlyPolling.size === accountsToFetch.length) { + this.startBatchPolling(); + } else { + if (this.resubOpts?.logResubMessages) { + console.debug( + `[${this.subscriptionName}] Not starting initial batch polling, we think it is not the first account being polled`, + this.accountsCurrentlyPolling.size, + 'accounts currently polling', + accountsToFetch.length, + 'accounts to fetch' + ); + } + } + } catch (error) { + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.subscriptionName}] Error processing initial monitor fetch batch:`, + error + ); + } + } + } + + private async fetchAccountsBatch(accountIds: string[]): Promise { + try { + // Chunk account IDs into groups of 100 (getMultipleAccounts limit) + const chunkSize = 100; + const chunks: string[][] = []; + for (let i = 0; i < accountIds.length; i += chunkSize) { + chunks.push(accountIds.slice(i, i + chunkSize)); + } + + // Process all chunks concurrently + await Promise.all( + chunks.map(async (chunk) => { + const accountAddresses = chunk.map( + (accountId) => accountId as Address + ); + const rpcResponse = await this.rpc + .getMultipleAccounts(accountAddresses, { + commitment: this.options.commitment as GillCommitment, + encoding: 'base64', + }) + .send(); + + const currentSlot = Number(rpcResponse.context.slot); + + // Process each account response in this chunk + for (let i = 0; i < chunk.length; i++) { + const accountIdString = chunk[i]; + const accountInfo = rpcResponse.value[i]; + + if (!accountInfo) { + continue; + } + + const existingBufferAndSlot = + this.bufferAndSlotMap.get(accountIdString); + + if (!existingBufferAndSlot) { + // Account not in our map yet, add it + let newBuffer: Buffer | undefined = undefined; + if (accountInfo.data) { + if (Array.isArray(accountInfo.data)) { + const [data, encoding] = accountInfo.data; + newBuffer = Buffer.from(data, encoding); + } + } + + if (newBuffer) { + this.bufferAndSlotMap.set(accountIdString, { + buffer: newBuffer, + slot: currentSlot, + }); + const account = this.decodeBuffer( + this.accountDiscriminator, + newBuffer + ); + const accountId = new PublicKey(accountIdString); + this.onChange( + accountId, + account, + { slot: currentSlot }, + newBuffer + ); + } + continue; + } + + // Check if we missed an update + if (currentSlot > existingBufferAndSlot.slot) { + let newBuffer: Buffer | undefined = undefined; + if (accountInfo.data) { + if (Array.isArray(accountInfo.data)) { + const [data, encoding] = accountInfo.data; + if (encoding === ('base58' as any)) { + newBuffer = Buffer.from(bs58.decode(data)); + } else { + newBuffer = Buffer.from(data, 'base64'); + } + } + } + + // Check if buffer has changed + if ( + newBuffer && + (!existingBufferAndSlot.buffer || + !newBuffer.equals(existingBufferAndSlot.buffer)) + ) { + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.subscriptionName}] Batch polling detected missed update for account ${accountIdString}, signaling resubscription` + ); + } + // Signal missed change instead of immediately resubscribing + this.signalMissedChange(accountIdString); + return; + } + } + } + }) + ); + } catch (error) { + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.subscriptionName}] Error fetching accounts batch:`, error ); } @@ -525,8 +913,72 @@ export class WebSocketProgramAccountSubscriberV2 this.batchPollingTimeout = undefined; } + // Clear initial fetch timeout + if (this.initialFetchTimeout) { + clearTimeout(this.initialFetchTimeout); + this.initialFetchTimeout = undefined; + } + + // Clear resubscription timeout + if (this.resubscriptionTimeout) { + clearTimeout(this.resubscriptionTimeout); + this.resubscriptionTimeout = undefined; + } + // Clear accounts currently polling this.accountsCurrentlyPolling.clear(); + + // Clear accounts pending initial monitor fetch + this.accountsPendingInitialMonitorFetch.clear(); + + // Reset missed change flag and clear accounts with missed updates + this.missedChangeDetected = false; + this.accountsWithMissedUpdates.clear(); + } + + /** + * Centralized resubscription handler that only resubscribes once after checking all accounts + */ + private async handleResubscription(): Promise { + if (this.missedChangeDetected) { + if (this.resubOpts?.logResubMessages) { + console.log( + `[${this.subscriptionName}] Missed change detected for ${ + this.accountsWithMissedUpdates.size + } accounts: ${Array.from(this.accountsWithMissedUpdates).join( + ', ' + )}, resubscribing` + ); + } + await this.unsubscribe(true); + this.receivingData = false; + await this.subscribe(this.onChange); + this.missedChangeDetected = false; + this.accountsWithMissedUpdates.clear(); + } + } + + /** + * Signal that a missed change was detected and schedule resubscription + */ + private signalMissedChange(accountIdString: string): void { + if (!this.missedChangeDetected) { + this.missedChangeDetected = true; + this.accountsWithMissedUpdates.add(accountIdString); + + // Clear any existing resubscription timeout + if (this.resubscriptionTimeout) { + clearTimeout(this.resubscriptionTimeout); + } + + // Schedule resubscription after a short delay to allow for batch processing + this.resubscriptionTimeout = setTimeout(async () => { + await this.handleResubscription(); + }, 100); // 100ms delay to allow for batch processing + } else { + // If already detected, just add the account to the set + this.accountsWithMissedUpdates.add(accountIdString); + } } unsubscribe(onResub = false): Promise { @@ -553,6 +1005,11 @@ export class WebSocketProgramAccountSubscriberV2 } // Method to add accounts to the polling list + /** + * Add an account to the monitored set. + * - Monitored accounts are subject to initial fetch and periodic batch polls + * if WS notifications are not observed within `pollingIntervalMs`. + */ addAccountToMonitor(accountId: PublicKey): void { const accountIdString = accountId.toBase58(); this.accountsToMonitor.add(accountIdString); @@ -586,6 +1043,10 @@ export class WebSocketProgramAccountSubscriberV2 } // Method to set polling interval + /** + * Set the monitoring/polling interval for monitored accounts. + * Shorter intervals detect missed updates sooner but increase RPC load. + */ setPollingInterval(intervalMs: number): void { this.pollingIntervalMs = intervalMs; // Restart monitoring with new interval if already subscribed diff --git a/sdk/src/accounts/websocketProgramUserAccountSubscriber.ts b/sdk/src/accounts/websocketProgramUserAccountSubscriber.ts new file mode 100644 index 000000000..6c95afe2d --- /dev/null +++ b/sdk/src/accounts/websocketProgramUserAccountSubscriber.ts @@ -0,0 +1,94 @@ +import { + DataAndSlot, + NotSubscribedError, + UserAccountEvents, + UserAccountSubscriber, +} from './types'; +import { Program } from '@coral-xyz/anchor'; +import StrictEventEmitter from 'strict-event-emitter-types'; +import { EventEmitter } from 'events'; +import { Context, PublicKey } from '@solana/web3.js'; +import { WebSocketProgramAccountSubscriber } from './webSocketProgramAccountSubscriber'; +import { UserAccount } from '../types'; + +export class WebSocketProgramUserAccountSubscriber + implements UserAccountSubscriber +{ + isSubscribed: boolean; + eventEmitter: StrictEventEmitter; + + private userAccountPublicKey: PublicKey; + private program: Program; + private programSubscriber: WebSocketProgramAccountSubscriber; + private userAccountAndSlot?: DataAndSlot; + + public constructor( + program: Program, + userAccountPublicKey: PublicKey, + programSubscriber: WebSocketProgramAccountSubscriber + ) { + this.isSubscribed = false; + this.program = program; + this.userAccountPublicKey = userAccountPublicKey; + this.eventEmitter = new EventEmitter(); + this.programSubscriber = programSubscriber; + } + + async subscribe(userAccount?: UserAccount): Promise { + if (this.isSubscribed) { + return true; + } + + if (userAccount) { + this.updateData(userAccount, 0); + } + + this.programSubscriber.onChange = ( + accountId: PublicKey, + data: UserAccount, + context: Context + ) => { + if (accountId.equals(this.userAccountPublicKey)) { + this.updateData(data, context.slot); + this.eventEmitter.emit('userAccountUpdate', data); + this.eventEmitter.emit('update'); + } + }; + + this.isSubscribed = true; + return true; + } + + async fetch(): Promise { + if (!this.isSubscribed) { + throw new NotSubscribedError( + 'Must subscribe before fetching account updates' + ); + } + + const account = await this.program.account.user.fetch( + this.userAccountPublicKey + ); + this.updateData(account as UserAccount, 0); + } + + updateData(userAccount: UserAccount, slot: number): void { + this.userAccountAndSlot = { + data: userAccount, + slot, + }; + } + + async unsubscribe(): Promise { + this.isSubscribed = false; + } + + getUserAccountAndSlot(): DataAndSlot { + if (!this.userAccountAndSlot) { + throw new NotSubscribedError( + 'Must subscribe before getting user account data' + ); + } + return this.userAccountAndSlot; + } +} diff --git a/sdk/src/driftClient.ts b/sdk/src/driftClient.ts index fa5e44157..ba637b167 100644 --- a/sdk/src/driftClient.ts +++ b/sdk/src/driftClient.ts @@ -137,7 +137,6 @@ import { decodeName, DEFAULT_USER_NAME, encodeName } from './userName'; import { OraclePriceData } from './oracles/types'; import { DriftClientConfig } from './driftClientConfig'; import { PollingDriftClientAccountSubscriber } from './accounts/pollingDriftClientAccountSubscriber'; -import { WebSocketDriftClientAccountSubscriber } from './accounts/webSocketDriftClientAccountSubscriber'; import { RetryTxSender } from './tx/retryTxSender'; import { User } from './user'; import { UserSubscriptionConfig } from './userConfig'; @@ -194,6 +193,8 @@ import { getOracleId } from './oracles/oracleId'; import { SignedMsgOrderParams } from './types'; import { sha256 } from '@noble/hashes/sha256'; import { getOracleConfidenceFromMMOracleData } from './oracles/utils'; +import { Commitment } from 'gill'; +import { WebSocketDriftClientAccountSubscriber } from './accounts/webSocketDriftClientAccountSubscriber'; type RemainingAccountParams = { userAccounts: UserAccount[]; @@ -371,6 +372,8 @@ export class DriftClient { resubTimeoutMs: config.accountSubscription?.resubTimeoutMs, logResubMessages: config.accountSubscription?.logResubMessages, commitment: config.accountSubscription?.commitment, + programUserAccountSubscriber: + config.accountSubscription?.programUserAccountSubscriber, }; this.userStatsAccountSubscriptionConfig = { type: 'websocket', @@ -436,7 +439,10 @@ export class DriftClient { } ); } else { - this.accountSubscriber = new WebSocketDriftClientAccountSubscriber( + const accountSubscriberClass = + config.accountSubscription?.driftClientAccountSubscriber ?? + WebSocketDriftClientAccountSubscriber; + this.accountSubscriber = new accountSubscriberClass( this.program, config.perpMarketIndexes ?? [], config.spotMarketIndexes ?? [], @@ -447,9 +453,7 @@ export class DriftClient { resubTimeoutMs: config.accountSubscription?.resubTimeoutMs, logResubMessages: config.accountSubscription?.logResubMessages, }, - config.accountSubscription?.commitment, - config.accountSubscription?.perpMarketAccountSubscriber, - config.accountSubscription?.oracleAccountSubscriber + config.accountSubscription?.commitment as Commitment ); } this.eventEmitter = this.accountSubscriber.eventEmitter; @@ -610,7 +614,8 @@ export class DriftClient { public getSpotMarketAccount( marketIndex: number ): SpotMarketAccount | undefined { - return this.accountSubscriber.getSpotMarketAccountAndSlot(marketIndex).data; + return this.accountSubscriber.getSpotMarketAccountAndSlot(marketIndex) + ?.data; } /** @@ -621,7 +626,8 @@ export class DriftClient { marketIndex: number ): Promise { await this.accountSubscriber.fetch(); - return this.accountSubscriber.getSpotMarketAccountAndSlot(marketIndex).data; + return this.accountSubscriber.getSpotMarketAccountAndSlot(marketIndex) + ?.data; } public getSpotMarketAccounts(): SpotMarketAccount[] { diff --git a/sdk/src/driftClientConfig.ts b/sdk/src/driftClientConfig.ts index ebfa4b695..b3723a2ae 100644 --- a/sdk/src/driftClientConfig.ts +++ b/sdk/src/driftClientConfig.ts @@ -5,7 +5,7 @@ import { PublicKey, TransactionVersion, } from '@solana/web3.js'; -import { IWallet, TxParams } from './types'; +import { IWallet, TxParams, UserAccount } from './types'; import { OracleInfo } from './oracles/types'; import { BulkAccountLoader } from './accounts/bulkAccountLoader'; import { DriftEnv } from './config'; @@ -19,6 +19,9 @@ import { import { Coder, Program } from '@coral-xyz/anchor'; import { WebSocketAccountSubscriber } from './accounts/webSocketAccountSubscriber'; import { WebSocketAccountSubscriberV2 } from './accounts/webSocketAccountSubscriberV2'; +import { WebSocketProgramAccountSubscriber } from './accounts/webSocketProgramAccountSubscriber'; +import { WebSocketDriftClientAccountSubscriberV2 } from './accounts/webSocketDriftClientAccountSubscriberV2'; +import { WebSocketDriftClientAccountSubscriber } from './accounts/webSocketDriftClientAccountSubscriber'; export type DriftClientConfig = { connection: Connection; @@ -63,6 +66,7 @@ export type DriftClientSubscriptionConfig = resubTimeoutMs?: number; logResubMessages?: boolean; commitment?: Commitment; + programUserAccountSubscriber?: WebSocketProgramAccountSubscriber; perpMarketAccountSubscriber?: new ( accountName: string, program: Program, @@ -71,14 +75,17 @@ export type DriftClientSubscriptionConfig = resubOpts?: ResubOpts, commitment?: Commitment ) => WebSocketAccountSubscriberV2 | WebSocketAccountSubscriber; - oracleAccountSubscriber?: new ( - accountName: string, + /** If you use V2 here, whatever you pass for perpMarketAccountSubscriber and oracleAccountSubscriber will be ignored and it will use v2 under the hood regardless */ + driftClientAccountSubscriber?: new ( program: Program, - accountPublicKey: PublicKey, - decodeBuffer?: (buffer: Buffer) => any, - resubOpts?: ResubOpts, - commitment?: Commitment - ) => WebSocketAccountSubscriberV2 | WebSocketAccountSubscriber; + perpMarketIndexes: number[], + spotMarketIndexes: number[], + oracleInfos: OracleInfo[], + shouldFindAllMarketsAndOracles: boolean, + delistedMarketSetting: DelistedMarketSetting + ) => + | WebSocketDriftClientAccountSubscriber + | WebSocketDriftClientAccountSubscriberV2; } | { type: 'polling'; diff --git a/sdk/src/index.ts b/sdk/src/index.ts index 902f4a533..721dfc697 100644 --- a/sdk/src/index.ts +++ b/sdk/src/index.ts @@ -12,6 +12,10 @@ export * from './accounts/webSocketDriftClientAccountSubscriber'; export * from './accounts/webSocketInsuranceFundStakeAccountSubscriber'; export * from './accounts/webSocketHighLeverageModeConfigAccountSubscriber'; export { WebSocketAccountSubscriberV2 } from './accounts/webSocketAccountSubscriberV2'; +export { WebSocketProgramAccountSubscriber } from './accounts/webSocketProgramAccountSubscriber'; +export { WebSocketProgramUserAccountSubscriber } from './accounts/websocketProgramUserAccountSubscriber'; +export { WebSocketProgramAccountSubscriberV2 } from './accounts/webSocketProgramAccountSubscriberV2'; +export { WebSocketDriftClientAccountSubscriberV2 } from './accounts/webSocketDriftClientAccountSubscriberV2'; export * from './accounts/bulkAccountLoader'; export * from './accounts/bulkUserSubscription'; export * from './accounts/bulkUserStatsSubscription'; diff --git a/sdk/src/memcmp.ts b/sdk/src/memcmp.ts index 300f2a75d..896971ebb 100644 --- a/sdk/src/memcmp.ts +++ b/sdk/src/memcmp.ts @@ -112,3 +112,20 @@ export function getSignedMsgUserOrdersFilter(): MemcmpFilter { }, }; } + +export function getPerpMarketAccountsFilter(): MemcmpFilter { + return { + memcmp: { + offset: 0, + bytes: bs58.encode(BorshAccountsCoder.accountDiscriminator('PerpMarket')), + }, + }; +} +export function getSpotMarketAccountsFilter(): MemcmpFilter { + return { + memcmp: { + offset: 0, + bytes: bs58.encode(BorshAccountsCoder.accountDiscriminator('SpotMarket')), + }, + }; +} diff --git a/sdk/src/oracles/oracleId.ts b/sdk/src/oracles/oracleId.ts index c5b63e743..3999d3ce7 100644 --- a/sdk/src/oracles/oracleId.ts +++ b/sdk/src/oracles/oracleId.ts @@ -24,9 +24,43 @@ export function getOracleSourceNum(source: OracleSource): number { throw new Error('Invalid oracle source'); } +export function getOracleSourceFromNum(sourceNum: number): OracleSource { + if (sourceNum === OracleSourceNum.PYTH) return 'pyth'; + if (sourceNum === OracleSourceNum.PYTH_1K) return 'pyth1K'; + if (sourceNum === OracleSourceNum.PYTH_1M) return 'pyth1M'; + if (sourceNum === OracleSourceNum.PYTH_PULL) return 'pythPull'; + if (sourceNum === OracleSourceNum.PYTH_1K_PULL) return 'pyth1KPull'; + if (sourceNum === OracleSourceNum.PYTH_1M_PULL) return 'pyth1MPull'; + if (sourceNum === OracleSourceNum.SWITCHBOARD) return 'switchboard'; + if (sourceNum === OracleSourceNum.QUOTE_ASSET) return 'quoteAsset'; + if (sourceNum === OracleSourceNum.PYTH_STABLE_COIN) return 'pythStableCoin'; + if (sourceNum === OracleSourceNum.PYTH_STABLE_COIN_PULL) + return 'pythStableCoinPull'; + if (sourceNum === OracleSourceNum.PRELAUNCH) return 'prelaunch'; + if (sourceNum === OracleSourceNum.SWITCHBOARD_ON_DEMAND) + return 'switchboardOnDemand'; + if (sourceNum === OracleSourceNum.PYTH_LAZER) return 'pythLazer'; + if (sourceNum === OracleSourceNum.PYTH_LAZER_1K) return 'pythLazer1K'; + if (sourceNum === OracleSourceNum.PYTH_LAZER_1M) return 'pythLazer1M'; + if (sourceNum === OracleSourceNum.PYTH_LAZER_STABLE_COIN) + return 'pythLazerStableCoin'; + throw new Error('Invalid oracle source'); +} + export function getOracleId( publicKey: PublicKey, source: OracleSource ): string { return `${publicKey.toBase58()}-${getOracleSourceNum(source)}`; } + +export function getPublicKeyAndSourceFromOracleId(oracleId: string): { + publicKey: PublicKey; + source: OracleSource; +} { + const [publicKey, source] = oracleId.split('-'); + return { + publicKey: new PublicKey(publicKey), + source: getOracleSourceFromNum(parseInt(source)), + }; +} diff --git a/sdk/src/user.ts b/sdk/src/user.ts index 4f1c33cd1..435f54c42 100644 --- a/sdk/src/user.ts +++ b/sdk/src/user.ts @@ -75,6 +75,7 @@ import { } from './types'; import { standardizeBaseAssetAmount } from './math/orders'; import { UserStats } from './userStats'; +import { WebSocketProgramUserAccountSubscriber } from './accounts/websocketProgramUserAccountSubscriber'; import { calculateAssetWeight, calculateLiabilityWeight, @@ -149,15 +150,26 @@ export class User { } ); } else { - this.accountSubscriber = new WebSocketUserAccountSubscriber( - config.driftClient.program, - config.userAccountPublicKey, - { - resubTimeoutMs: config.accountSubscription?.resubTimeoutMs, - logResubMessages: config.accountSubscription?.logResubMessages, - }, - config.accountSubscription?.commitment - ); + if ( + config.accountSubscription?.type === 'websocket' && + config.accountSubscription?.programUserAccountSubscriber + ) { + this.accountSubscriber = new WebSocketProgramUserAccountSubscriber( + config.driftClient.program, + config.userAccountPublicKey, + config.accountSubscription.programUserAccountSubscriber + ); + } else { + this.accountSubscriber = new WebSocketUserAccountSubscriber( + config.driftClient.program, + config.userAccountPublicKey, + { + resubTimeoutMs: config.accountSubscription?.resubTimeoutMs, + logResubMessages: config.accountSubscription?.logResubMessages, + }, + config.accountSubscription?.commitment + ); + } } this.eventEmitter = this.accountSubscriber.eventEmitter; } diff --git a/sdk/src/userConfig.ts b/sdk/src/userConfig.ts index f575ea2b5..d5ba2147b 100644 --- a/sdk/src/userConfig.ts +++ b/sdk/src/userConfig.ts @@ -2,6 +2,8 @@ import { DriftClient } from './driftClient'; import { Commitment, PublicKey } from '@solana/web3.js'; import { BulkAccountLoader } from './accounts/bulkAccountLoader'; import { GrpcConfigs, UserAccountSubscriber } from './accounts/types'; +import { WebSocketProgramAccountSubscriber } from './accounts/webSocketProgramAccountSubscriber'; +import { UserAccount } from './types'; export type UserConfig = { accountSubscription?: UserSubscriptionConfig; @@ -21,6 +23,7 @@ export type UserSubscriptionConfig = resubTimeoutMs?: number; logResubMessages?: boolean; commitment?: Commitment; + programUserAccountSubscriber?: WebSocketProgramAccountSubscriber; } | { type: 'polling';