Skip to content

Commit 2fd948f

Browse files
committed
Document changes related to subscription management.
1 parent 38e11f6 commit 2fd948f

File tree

2 files changed

+140
-3
lines changed

2 files changed

+140
-3
lines changed

docs/internals.rst

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -430,10 +430,126 @@ request. The initial ``eth_subscribe`` request expects only one response, the
430430
subscription *id* value, but it also expects to receive many ``eth_subscription``
431431
messages if and when the request is successful. For this reason, the original request
432432
is considered a one-to-one request so that a subscription *id* can be returned to the
433-
user on the same line, but the ``process_subscriptions()`` method on the
433+
user on the same line. The many responses this call will produce can be handled in one
434+
of a few ways.
435+
436+
The recommended way to handle one-to-many responses is to use the subscription manager
437+
API. The subscription manager API is a public API on the ``AsyncWeb3`` class, when
438+
connected to a ``PersistentConnectionProvider`` instance, that allows the user to
439+
subscribe to a subscription and handle the many responses asynchronously. The
440+
``subscription_manager`` instance is responsible for handling the many responses that
441+
come in over the socket connection, as long as handlers are passed to each subscription
442+
call. The subscription manager can also be used to unsubscribe from a subscription when
443+
the user is done with it.
444+
445+
.. code-block:: python
446+
447+
>>> async def new_heads_handler(
448+
... async_w3: AsyncWeb3,
449+
... sx: EthSubscription,
450+
... response: BlockData,
451+
... ) -> None:
452+
... print(f"New block header: {response}\n")
453+
... if response["number"] > 1234567:
454+
... await sx.unsubscribe()
455+
456+
>>> async def ws_subscription_example():
457+
... async with AsyncWeb3(WebSocketProvider(f"ws://127.0.0.1:8546")) as w3:
458+
... # Subscribe to new block headers and receive the subscription_id.
459+
... # A one-to-one call with a trigger for many responses
460+
... subscription_id = await w3.eth.subscribe("newHeads", handler=new_heads_handler)
461+
...
462+
... # Handle the subscription messages asynchronously using the subscription
463+
... # manager. This will continue until no more subscriptions are present in
464+
... # the subscription manager, or indefinitely if the `run_forever` flag
465+
... # is set to `True`.
466+
... await w3.subscription_manager.handle_subscriptions(run_forever=False)
467+
>>> asyncio.run(ws_subscription_example())
468+
469+
The manager can also subscribe to many subscriptions at one time. The
470+
``EthSubscription`` classes, available via ``web3.utils.subscriptions``, provide a
471+
friendly API for managing subscriptions. Since each connection and provider instance
472+
has its own message listener task and subscription manager instance, you can subscribe
473+
to many subscriptions at once and handle the many responses that come in over the socket
474+
connections.
475+
476+
.. code-block:: python
477+
478+
>>> from web3 import (
479+
... AsyncWeb3,
480+
... WebSocketProvider,
481+
... AsyncIPCProvider,
482+
... )
483+
>>> from web3.utils.subscriptions import (
484+
... EthSubscription,
485+
... NewHeadsSubscription,
486+
... PendingTxSubscription,
487+
... LogsSubscription,
488+
... )
489+
490+
>>> async def new_heads_handler(
491+
... async_w3: AsyncWeb3,
492+
... sx: EthSubscription,
493+
... response: BlockData,
494+
... ) -> None:
495+
... print(f"New block header: {response}\n")
496+
... if response["number"] > 1234567:
497+
... await sx.unsubscribe()
498+
499+
>>> async def pending_txs_handler(
500+
... async_w3: AsyncWeb3,
501+
... sx: EthSubscription,
502+
... response: TxData,
503+
... ) -> None:
504+
... ...
505+
506+
>>> async def log_handler(
507+
... async_w3: AsyncWeb3,
508+
... sx: EthSubscription,
509+
... response: LogData,
510+
... ) -> None:
511+
... ...
512+
513+
>>> async def sx_manager():
514+
... local_w3 = await AsyncWeb3(AsyncIPCProvider(LOCAL_IPC, label="mainnet-ipc"))
515+
... # subscribe to many subscriptions via the subscription manager with handlers
516+
... await local_w3.subscription_manager.subscribe(
517+
... [
518+
... NewHeadsSubscription(label="new-heads-mainnet", handler=new_heads_handler),
519+
... PendingTxSubscription(
520+
... label="pending-tx-mainnet", # optional label
521+
... full_transactions=True,
522+
... handler=pending_tx_handler,
523+
... ),
524+
... LogsSubscription(
525+
... label="WETH transfers", # optional label
526+
... address=local_w3.to_checksum_address("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
527+
... topics=[HexStr("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")],
528+
... handler=log_handler,
529+
... ),
530+
... ]
531+
... )
532+
...
533+
... public_w3 = await AsyncWeb3(WebSocketProvider(PUBLIC_PROVIDER_WS, label="public-ws"))
534+
... # subscribe via eth_subscribe, with handler and label (optional)
535+
... await public_w3.eth.subscribe("public_newHeads", handler=pending_tx_handler, label="new-heads-public-ws")
536+
537+
>>> # This will handle all subscriptions until no more subscriptions are present
538+
... # in either subscription manager instance. If the `run_forever` flag is set
539+
... # to `True` on any manager instance, this will run indefinitely.
540+
>>> await asyncio.gather(
541+
... public_w3.subscription_manager.handle_subscriptions(),
542+
... local_w3.subscription_manager.handle_subscriptions(),
543+
... )
544+
545+
>>> asyncio.run(sx_manager())
546+
547+
548+
The ``process_subscriptions()`` method on the
434549
:class:`~web3.providers.persistent.PersistentConnection` class, the public API for
435-
interacting with the active persistent socket connection, is set up to receive
436-
``eth_subscription`` responses over an asynchronous interator pattern.
550+
interacting with the active persistent socket connection, is also set up to receive
551+
``eth_subscription`` responses over an asynchronous interator pattern. You can use this
552+
method to listen for raw messages and process them as they come in.
437553
438554
.. code-block:: python
439555

web3/providers/persistent/subscription_manager.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ async def subscribe(
7373
)
7474

7575
async def unsubscribe(self, subscription: EthSubscription) -> bool:
76+
"""
77+
Used to unsubscribe from a subscription that is being managed by the
78+
subscription manager.
79+
80+
:param subscription: The subscription to unsubscribe from.
81+
:return: True if unsubscribing was successful, False otherwise.
82+
"""
7683
if subscription not in self.subscriptions:
7784
raise Web3ValueError(
7885
f"Subscription not found or is not being managed by the subscription "
@@ -85,6 +92,12 @@ async def unsubscribe(self, subscription: EthSubscription) -> bool:
8592
return False
8693

8794
async def unsubscribe_all(self) -> None:
95+
"""
96+
Used to unsubscribe from all subscriptions that are being managed by the
97+
subscription manager.
98+
99+
:return: None
100+
"""
88101
for sx in self.subscriptions:
89102
await self.unsubscribe(sx)
90103

@@ -100,4 +113,12 @@ async def _handle_subscriptions(self, run_forever: bool = False) -> None:
100113
self._provider.logger.info("Subscription manager processing ended.")
101114

102115
async def handle_subscriptions(self, run_forever: bool = False) -> None:
116+
"""
117+
Used to process all subscriptions. It will run until all subscriptions are
118+
unsubscribed from or, if `run_forever` is set to `True`, it will run
119+
indefinitely.
120+
121+
:param run_forever: If `True`, the method will run indefinitely.
122+
:return: None
123+
"""
103124
await self._handle_subscriptions(run_forever=run_forever)

0 commit comments

Comments
 (0)