Skip to content

Commit 50bc0ce

Browse files
committed
feat: polling check on websocket acct subscriber v2 + naming
1 parent 78bf21f commit 50bc0ce

9 files changed

+275
-109
lines changed

sdk/src/accounts/README_WebSocketAccountSubscriberV2.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,46 @@ await subscriber.subscribe((data) => {
3131
await subscriber.unsubscribe();
3232
```
3333

34+
### Polling Instead of Resubscribing
35+
36+
For accounts that rarely update (like long-tail markets), you can use polling instead of resubscribing to reduce resource usage:
37+
38+
```typescript
39+
const resubOpts = {
40+
resubTimeoutMs: 30000, // 30 seconds
41+
logResubMessages: true,
42+
usePollingInsteadOfResub: true, // Enable polling mode
43+
pollingIntervalMs: 30000, // Poll every 30 seconds (optional, defaults to 30000)
44+
};
45+
46+
const subscriber = new WebSocketAccountSubscriberV2(
47+
'perpMarket', // account name
48+
program,
49+
marketPublicKey,
50+
undefined, // decodeBuffer
51+
resubOpts
52+
);
53+
```
54+
55+
**How it works:**
56+
1. Initially subscribes to WebSocket updates
57+
2. If no WebSocket data is received for `resubTimeoutMs` (30s), switches to polling mode
58+
3. Polls every `pollingIntervalMs` (30s) to check for updates by:
59+
- Storing current account buffer state
60+
- Fetching latest account data
61+
- Comparing buffers to detect any missed updates
62+
4. If polling detects new data (indicating missed WebSocket events):
63+
- Immediately stops polling
64+
- Resubscribes to WebSocket to restore real-time updates
65+
- This helps recover from degraded WebSocket connections
66+
5. If a WebSocket event is received while polling:
67+
- Polling is automatically stopped
68+
- System continues with normal WebSocket updates
69+
6. This approach provides:
70+
- Efficient handling of rarely-updated accounts
71+
- Automatic recovery from WebSocket connection issues
72+
- Seamless fallback between polling and WebSocket modes
73+
3474
## Implementation Details
3575

3676
### Gill Integration
@@ -52,3 +92,4 @@ const { rpc, rpcSubscriptions } = createSolanaClient({
5292
3. **Address Handling**: Converts `PublicKey` to gill's `Address` type for compatibility
5393
4. **Response Formatting**: Converts gill responses to match the expected `AccountInfo<Buffer>` format
5494
5. **Abort Signal**: Utilizes AbortSignal nodejs/web class to shutdown websocket connection synchronously
95+
6. **Polling Mode**: Optional polling mechanism for accounts that rarely update

sdk/src/accounts/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ export type DataAndSlot<T> = {
202202
export type ResubOpts = {
203203
resubTimeoutMs?: number;
204204
logResubMessages?: boolean;
205+
// New options for polling-based resubscription
206+
usePollingInsteadOfResub?: boolean;
207+
pollingIntervalMs?: number;
205208
};
206209

207210
export interface UserStatsAccountEvents {

sdk/src/accounts/webSocketAccountSubscriberV2.ts

Lines changed: 86 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export class WebSocketAccountSubscriberV2<T> implements AccountSubscriber<T> {
3535
isUnsubscribing = false;
3636

3737
timeoutId?: ReturnType<typeof setTimeout>;
38+
pollingTimeoutId?: ReturnType<typeof setTimeout>;
3839

3940
receivingData: boolean;
4041

@@ -175,29 +176,102 @@ export class WebSocketAccountSubscriberV2<T> implements AccountSubscriber<T> {
175176
}
176177

177178
if (this.receivingData) {
179+
if (this.resubOpts?.usePollingInsteadOfResub) {
180+
// Use polling instead of resubscribing
181+
if (this.resubOpts?.logResubMessages) {
182+
console.log(
183+
`[${this.logAccountName}] No ws data in ${this.resubOpts.resubTimeoutMs}ms, starting polling - listenerId=${this.listenerId}`
184+
);
185+
}
186+
this.startPolling();
187+
} else {
188+
// Original resubscribe behavior
189+
if (this.resubOpts?.logResubMessages) {
190+
console.log(
191+
`No ws data from ${this.logAccountName} in ${this.resubOpts.resubTimeoutMs}ms, resubscribing - listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}`
192+
);
193+
}
194+
await this.unsubscribe(true);
195+
this.receivingData = false;
196+
await this.subscribe(this.onChange);
197+
if (this.resubOpts?.logResubMessages) {
198+
console.log(
199+
`[${this.logAccountName}] Resubscribe completed - receivingData=${this.receivingData}, listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}`
200+
);
201+
}
202+
}
203+
} else {
178204
if (this.resubOpts?.logResubMessages) {
179205
console.log(
180-
`No ws data from ${this.logAccountName} in ${this.resubOpts.resubTimeoutMs}ms, resubscribing - listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}`
206+
`[${this.logAccountName}] Timeout fired but receivingData=false, skipping resubscribe`
181207
);
182208
}
183-
await this.unsubscribe(true);
184-
this.receivingData = false;
185-
await this.subscribe(this.onChange);
209+
}
210+
},
211+
this.resubOpts?.resubTimeoutMs
212+
);
213+
}
214+
215+
private startPolling(): void {
216+
const pollingInterval = this.resubOpts?.pollingIntervalMs || 30000; // Default to 30s
217+
218+
const poll = async () => {
219+
if (this.isUnsubscribing) {
220+
return;
221+
}
222+
223+
try {
224+
// Store current data and buffer before polling
225+
const currentBuffer = this.bufferAndSlot?.buffer;
226+
227+
// Fetch latest account data
228+
await this.fetch();
229+
230+
// Check if we got new data by comparing buffers
231+
const newBuffer = this.bufferAndSlot?.buffer;
232+
const hasNewData =
233+
newBuffer && (!currentBuffer || !newBuffer.equals(currentBuffer));
234+
235+
if (hasNewData) {
236+
// New data received, stop polling and resubscribe to websocket
186237
if (this.resubOpts?.logResubMessages) {
187238
console.log(
188-
`[${this.logAccountName}] Resubscribe completed - receivingData=${this.receivingData}, listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}`
239+
`[${this.logAccountName}] Polling detected account data change, resubscribing to websocket`
189240
);
190241
}
242+
this.stopPolling();
243+
this.receivingData = false;
244+
await this.subscribe(this.onChange);
191245
} else {
246+
// No new data, continue polling
192247
if (this.resubOpts?.logResubMessages) {
193248
console.log(
194-
`[${this.logAccountName}] Timeout fired but receivingData=false, skipping resubscribe`
249+
`[${this.logAccountName}] Polling found no account changes, continuing to poll every ${pollingInterval}ms`
195250
);
196251
}
252+
this.pollingTimeoutId = setTimeout(poll, pollingInterval);
197253
}
198-
},
199-
this.resubOpts?.resubTimeoutMs
200-
);
254+
} catch (error) {
255+
if (this.resubOpts?.logResubMessages) {
256+
console.error(
257+
`[${this.logAccountName}] Error during polling:`,
258+
error
259+
);
260+
}
261+
// On error, continue polling
262+
this.pollingTimeoutId = setTimeout(poll, pollingInterval);
263+
}
264+
};
265+
266+
// Start polling immediately
267+
poll();
268+
}
269+
270+
private stopPolling(): void {
271+
if (this.pollingTimeoutId) {
272+
clearTimeout(this.pollingTimeoutId);
273+
this.pollingTimeoutId = undefined;
274+
}
201275
}
202276

203277
async fetch(): Promise<void> {
@@ -301,6 +375,9 @@ export class WebSocketAccountSubscriberV2<T> implements AccountSubscriber<T> {
301375
clearTimeout(this.timeoutId);
302376
this.timeoutId = undefined;
303377

378+
// Stop polling if active
379+
this.stopPolling();
380+
304381
// Abort the WebSocket subscription
305382
if (this.abortController) {
306383
this.abortController.abort('unsubscribing');

0 commit comments

Comments
 (0)