Skip to content

Commit 32900cf

Browse files
feat: implement shutdown method for helios-ts (#778)
- Implement a proper cleanup so Helios instance can be safely garbage collected - Bonus: return Helios reference from on() and removeListener() methods as per EIP-1193 spec
1 parent f627340 commit 32900cf

File tree

8 files changed

+157
-37
lines changed

8 files changed

+157
-37
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

helios-ts/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ wasm-bindgen-futures = "0.4.33"
1414
wasm-bindgen-test = "0.3.0"
1515
serde-wasm-bindgen = "0.6.5"
1616
console_error_panic_hook = "0.1.7"
17+
futures-util = "0.3"
1718

1819
eyre.workspace = true
1920
alloy.workspace = true

helios-ts/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,19 @@ const blockNumber = await client.getBlockNumber();
127127
console.log('Latest block number:', blockNumber);
128128
```
129129

130+
## Cleanup
131+
132+
If Helios is used as a transport for `ethers` or `viem`, calling `EthersProvider.destroy()` will NOT destroy underlying Helios instance nor will stop the background tasks.
133+
134+
For proper cleanup, call `shutdown()` (or its alias `destroy()`) on your Helios provider instance to properly release resources:
135+
136+
```typescript
137+
// Clean up when done
138+
await heliosProvider.shutdown();
139+
```
140+
141+
This unsubscribes all active subscriptions, aborts background tasks, removes event listeners, and frees WASM memory. The method is idempotent—calling it multiple times is safe. After dereferencing Helios provider instance will be garbage collected.
142+
130143
## Documentation
131144

132145
See [helios github repo](https://github.com/a16z/helios/) for more details.

helios-ts/lib.ts

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ export class HeliosProvider {
8181
#client;
8282
#chainId;
8383
#eventEmitter;
84+
#closed = false;
85+
#subscriptionIds: Set<string> = new Set();
8486

8587
private constructor(config: Config, kind: NetworkKind) {
8688
const executionRpc = config.executionRpc;
@@ -116,8 +118,10 @@ export class HeliosProvider {
116118
}
117119

118120
#setHeliosEvents() {
121+
// Capture only the emitter reference, not `this`
122+
const emitter = this.#eventEmitter;
119123
this.#client.set_helios_events((event: string, data: any) => {
120-
this.#eventEmitter.emit(event, data);
124+
emitter.emit(event, data);
121125
});
122126
}
123127

@@ -175,6 +179,9 @@ export class HeliosProvider {
175179
* ```
176180
*/
177181
async request(req: Request): Promise<any> {
182+
if (this.#closed) {
183+
throw new Error("Provider has been shut down");
184+
}
178185
try {
179186
return await this.#req(req);
180187
} catch (err) {
@@ -292,7 +299,9 @@ export class HeliosProvider {
292299
return this.#handleSubscribe(req);
293300
}
294301
case "eth_unsubscribe": {
295-
return this.#client.unsubscribe(req.params[0]);
302+
const id = req.params[0];
303+
this.#subscriptionIds.delete(id);
304+
return this.#client.unsubscribe(id);
296305
}
297306
case "helios_getCurrentCheckpoint": {
298307
return this.#client.get_current_checkpoint();
@@ -305,18 +314,21 @@ export class HeliosProvider {
305314

306315
async #handleSubscribe(req: Request) {
307316
try {
308-
let id = uuidv4();
309-
await this.#client.subscribe(req.params[0], id, (data: any, id: string) => {
310-
let result = data instanceof Map ? mapToObj(data) : data;
311-
let payload = {
317+
const id = uuidv4();
318+
// Capture only the emitter reference, not `this`
319+
const emitter = this.#eventEmitter;
320+
await this.#client.subscribe(req.params[0], id, (data: any, subId: string) => {
321+
const result = data instanceof Map ? mapToObj(data) : data;
322+
const payload = {
312323
type: 'eth_subscription',
313324
data: {
314-
subscription: id,
325+
subscription: subId,
315326
result,
316327
},
317328
};
318-
this.#eventEmitter.emit("message", payload);
329+
emitter.emit("message", payload);
319330
});
331+
this.#subscriptionIds.add(id);
320332
return id;
321333
} catch (err) {
322334
throw new Error(err.toString());
@@ -353,8 +365,9 @@ export class HeliosProvider {
353365
on(
354366
eventName: string,
355367
handler: (data: any) => void
356-
): void {
368+
): this {
357369
this.#eventEmitter.on(eventName, handler);
370+
return this;
358371
}
359372

360373
/**
@@ -381,8 +394,59 @@ export class HeliosProvider {
381394
removeListener(
382395
eventName: string,
383396
handler: (data: any) => void
384-
): void {
397+
): this {
385398
this.#eventEmitter.off(eventName, handler);
399+
return this;
400+
}
401+
402+
/**
403+
* Shuts down the provider and releases all resources.
404+
*
405+
* @returns A promise that resolves when the provider has been shut down
406+
*
407+
* @remarks
408+
* After shutdown:
409+
* - All future `request()` calls will reject with an error
410+
* - All active subscriptions are unsubscribed
411+
* - All event listeners are removed
412+
* - Background tasks are stopped
413+
*
414+
* The provider instance will be garbage collected after the user drops all references.
415+
*
416+
* @example
417+
* ```typescript
418+
* const provider = await createHeliosProvider(config, "ethereum");
419+
*
420+
* // ... use the provider ...
421+
*
422+
* // Clean up when done
423+
* await provider.shutdown();
424+
* ```
425+
*/
426+
async shutdown(): Promise<void> {
427+
if (this.#closed) {
428+
return;
429+
}
430+
this.#closed = true;
431+
432+
for (const id of this.#subscriptionIds) {
433+
try {
434+
this.#client.unsubscribe(id);
435+
} catch {
436+
// Ignore errors during cleanup
437+
}
438+
}
439+
this.#subscriptionIds.clear();
440+
await this.#client.shutdown();
441+
this.#eventEmitter.removeAllListeners();
442+
this.#client.free();
443+
}
444+
445+
/**
446+
* This method is equivalent to `shutdown()`
447+
*/
448+
async destroy(): Promise<void> {
449+
await this.shutdown();
386450
}
387451
}
388452

helios-ts/src/ethereum.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use alloy::hex::{self, FromHex};
99
use alloy::primitives::{Address, B256, U256};
1010
use alloy::rpc::types::{state::StateOverride, Filter, TransactionRequest};
1111
use eyre::Result;
12+
use futures_util::future::{AbortHandle, Abortable};
1213
use url::Url;
1314
use wasm_bindgen::prelude::*;
1415
use web_sys::js_sys::Function;
@@ -67,6 +68,7 @@ pub struct EthereumClient {
6768
chain_id: u64,
6869
active_subscriptions: HashMap<String, Subscription<Ethereum>>,
6970
event_handler: Option<Function>,
71+
events_abort_handle: Option<AbortHandle>,
7072
}
7173

7274
#[wasm_bindgen]
@@ -141,6 +143,7 @@ impl EthereumClient {
141143
chain_id,
142144
active_subscriptions: HashMap::new(),
143145
event_handler: None,
146+
events_abort_handle: None,
144147
})
145148
}
146149

@@ -154,9 +157,7 @@ impl EthereumClient {
154157
let sub_type: SubscriptionType = serde_wasm_bindgen::from_value(sub_type)?;
155158
let rx = map_err(self.inner.subscribe(sub_type).await)?;
156159

157-
let subscription = Subscription::<Ethereum>::new(id.clone());
158-
159-
subscription.listen(rx, callback).await;
160+
let subscription = Subscription::<Ethereum>::spawn_listener(id.clone(), rx, callback);
160161
self.active_subscriptions.insert(id, subscription);
161162

162163
Ok(true)
@@ -453,7 +454,10 @@ impl EthereumClient {
453454

454455
let mut rx = map_err(self.inner.new_checkpoints_recv())?;
455456

456-
wasm_bindgen_futures::spawn_local(async move {
457+
let (abort_handle, abort_registration) = AbortHandle::new_pair();
458+
self.events_abort_handle = Some(abort_handle);
459+
460+
let future = async move {
457461
loop {
458462
if rx.changed().await.is_err() {
459463
break;
@@ -470,6 +474,10 @@ impl EthereumClient {
470474
}
471475
}
472476
}
477+
};
478+
479+
wasm_bindgen_futures::spawn_local(async move {
480+
let _ = Abortable::new(future, abort_registration).await;
473481
});
474482

475483
Ok(())
@@ -482,4 +490,19 @@ impl EthereumClient {
482490
&checkpoint.map(|c| format!("0x{}", hex::encode(c))),
483491
)?)
484492
}
493+
494+
#[wasm_bindgen]
495+
pub async fn shutdown(&mut self) {
496+
if let Some(abort_handle) = self.events_abort_handle.take() {
497+
abort_handle.abort();
498+
}
499+
500+
self.event_handler = None;
501+
502+
for (_, subscription) in self.active_subscriptions.drain() {
503+
subscription.abort();
504+
}
505+
506+
self.inner.shutdown().await;
507+
}
485508
}

helios-ts/src/linea.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,7 @@ impl LineaClient {
7676
let sub_type: SubscriptionType = serde_wasm_bindgen::from_value(sub_type)?;
7777
let rx = map_err(self.inner.subscribe(sub_type).await)?;
7878

79-
let subscription = Subscription::<Linea>::new(id.clone());
80-
81-
subscription.listen(rx, callback).await;
79+
let subscription = Subscription::<Linea>::spawn_listener(id.clone(), rx, callback);
8280
self.active_subscriptions.insert(id, subscription);
8381

8482
Ok(true)
@@ -378,4 +376,13 @@ impl LineaClient {
378376
// Linea does not support checkpoints
379377
Err(JsError::new("Linea does not support checkpoints"))
380378
}
379+
380+
#[wasm_bindgen]
381+
pub async fn shutdown(&mut self) {
382+
for (_, subscription) in self.active_subscriptions.drain() {
383+
subscription.abort();
384+
}
385+
386+
self.inner.shutdown().await;
387+
}
381388
}

helios-ts/src/opstack.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,7 @@ impl OpStackClient {
9494
let sub_type: SubscriptionType = serde_wasm_bindgen::from_value(sub_type)?;
9595
let rx = map_err(self.inner.subscribe(sub_type).await)?;
9696

97-
let subscription = Subscription::<OpStack>::new(id.clone());
98-
99-
subscription.listen(rx, callback).await;
97+
let subscription = Subscription::<OpStack>::spawn_listener(id.clone(), rx, callback);
10098
self.active_subscriptions.insert(id, subscription);
10199

102100
Ok(true)
@@ -396,4 +394,13 @@ impl OpStackClient {
396394
// OP Stack does not support checkpoints
397395
Err(JsError::new("OP Stack does not support checkpoints"))
398396
}
397+
398+
#[wasm_bindgen]
399+
pub async fn shutdown(&mut self) {
400+
for (_, subscription) in self.active_subscriptions.drain() {
401+
subscription.abort();
402+
}
403+
404+
self.inner.shutdown().await;
405+
}
399406
}

helios-ts/src/subscription.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,51 @@
1-
use std::cell::Cell;
21
use std::marker::PhantomData;
3-
use std::rc::Rc;
42
use wasm_bindgen::prelude::*;
53
use web_sys::js_sys::Function;
64

5+
use futures_util::future::{AbortHandle, Abortable};
6+
77
use helios_common::network_spec::NetworkSpec;
88
use helios_common::types::SubEventRx;
99

1010
pub struct Subscription<N: NetworkSpec> {
11-
id: String,
12-
active: Rc<Cell<bool>>,
11+
abort_handle: AbortHandle,
1312
_phantom: PhantomData<N>,
1413
}
1514

1615
impl<N: NetworkSpec> Subscription<N> {
17-
pub fn new(id: String) -> Self {
16+
pub fn new(abort_handle: AbortHandle) -> Self {
1817
Self {
19-
id,
20-
active: Rc::new(Cell::new(true)),
18+
abort_handle,
2119
_phantom: PhantomData,
2220
}
2321
}
2422

25-
pub async fn listen(&self, mut rx: SubEventRx<N>, callback: Function) {
26-
let id = self.id.clone();
27-
let active = self.active.clone();
23+
pub fn spawn_listener(id: String, mut rx: SubEventRx<N>, callback: Function) -> Self {
24+
let (abort_handle, abort_registration) = AbortHandle::new_pair();
2825

29-
wasm_bindgen_futures::spawn_local(async move {
26+
let listener_id = id.clone();
27+
let future = async move {
3028
while let Ok(msg) = rx.recv().await {
31-
if !active.get() {
32-
break;
33-
}
34-
3529
if let Ok(data) = serde_wasm_bindgen::to_value(&msg) {
36-
let _ = callback.call2(&JsValue::NULL, &data, &JsValue::from_str(&id));
30+
let _ = callback.call2(&JsValue::NULL, &data, &JsValue::from_str(&listener_id));
3731
}
3832
}
33+
};
34+
35+
wasm_bindgen_futures::spawn_local(async move {
36+
let _ = Abortable::new(future, abort_registration).await;
3937
});
38+
39+
Self::new(abort_handle)
40+
}
41+
42+
pub fn abort(&self) {
43+
self.abort_handle.abort();
4044
}
4145
}
4246

4347
impl<N: NetworkSpec> Drop for Subscription<N> {
4448
fn drop(&mut self) {
45-
self.active.set(false);
49+
self.abort_handle.abort();
4650
}
4751
}

0 commit comments

Comments
 (0)