Skip to content

Commit 0e8ce5d

Browse files
wolovimfselmo
authored andcommitted
docs: introduce subscriptions guide
1 parent 8f1443d commit 0e8ce5d

File tree

3 files changed

+245
-0
lines changed

3 files changed

+245
-0
lines changed

docs/subscriptions.rst

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
.. _subscriptions:
2+
3+
Event Subscriptions
4+
===================
5+
6+
Most Ethereum clients include ``eth_subscribe`` support, allowing you to listen for specific events as they occur. This applies to a limited set of events: new block headers, the syncing status of a node, new pending transactions, and emitted logs from smart contracts.
7+
8+
.. warning::
9+
10+
Subscriptions require a persistent socket connection between you and the Ethereum client. For that reason, you must use web3.py's :class:`~web3.providers.persistent.WebSocketProvider` or :class:`~web3.providers.persistent.AsyncIPCProvider` to utilize subscriptions. As it is the more common of the two, examples in this guide will leverage the ``WebSocketProvider``.
11+
12+
An introduction to subscriptions
13+
--------------------------------
14+
15+
When you subscribe to an event – new block headers, for example – you'll receive a subscription ID. The Ethereum client will then maintain a connection to your application and send along any related event until you unsubscribe with that ID. That example in code:
16+
17+
.. code-block:: python
18+
19+
import asyncio
20+
from web3 import AsyncWeb3, WebSocketProvider
21+
22+
async def example():
23+
# connect to a node:
24+
async with AsyncWeb3(WebSocketProvider("wss://...")) as w3:
25+
26+
# subscribe to new block headers:
27+
subscription_id = await w3.eth.subscribe("newHeads")
28+
print(subscription_id)
29+
30+
# listen for events as they occur:
31+
async for response in w3.socket.process_subscriptions():
32+
# handle each event:
33+
print(response)
34+
35+
# unsubscribe:
36+
if response["number"] > 42012345:
37+
await w3.eth.unsubscribe(subscription_id)
38+
break
39+
40+
asyncio.run(example())
41+
42+
43+
web3.py's ``subscription_manager``
44+
----------------------------------
45+
46+
The example above is the "manual" approach to managing subscriptions. It's not so complicated in the case of listening for new block headers, but things get considerably more complex once you start listening for smart contract event logs or managing multiple subscriptions.
47+
As of v7.7.0, web3.py includes some additional convenient subscription management features. We'll step through them now.
48+
49+
1.) The subscription_manager
50+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
51+
52+
First, your w3 instance now includes a new module, subscription_manager. While you may still use the w3.eth.subscribe method from the previous example, the subscription_manager offers an additional way to start one or more subscriptions. We're going to pass in a list of events we want to subscribe to within the w3.subscription_manager.subscribe method.
53+
54+
.. code-block:: python
55+
56+
await w3.subscription_manager.subscribe([sub1, sub2, ...])
57+
58+
59+
2.) Subscription types
60+
~~~~~~~~~~~~~~~~~~~~~~
61+
62+
To aid in defining those subscriptions, subscription type classes have been introduced: NewHeadsSubscription, PendingTxSubscription, LogsSubscription, and SyncingSubscription. Each class is context aware, meaning it will throw an error if you provide an unexpected data type.
63+
64+
.. code-block:: python
65+
66+
from web3.utils.subscriptions import (
67+
NewHeadsSubscription,
68+
PendingTxSubscription,
69+
LogsSubscription,
70+
)
71+
72+
sub1 = NewHeadsSubscription(
73+
label="new-heads-mainnet", # optional label
74+
handler=new_heads_handler
75+
)
76+
77+
sub2 = PendingTxSubscription(
78+
label="pending-tx-mainnet", # optional label
79+
full_transactions=True,
80+
handler=pending_tx_handler,
81+
)
82+
83+
sub3 = LogsSubscription(
84+
label="WETH transfers", # optional label
85+
address=weth_contract.address,
86+
topics=[weth_contract.events.Transfer().topic],
87+
handler=log_handler,
88+
# optional `handler_context` args to help parse a response
89+
handler_context={"transfer_event": weth_contract.events.Transfer()},
90+
)
91+
92+
93+
3.) Handlers
94+
~~~~~~~~~~~~
95+
96+
In the example above, there is a handler specified for each subscription. These are context-aware functions that you can declare separate from the subscription logic. Within each handler, parse and perform whatever logic you require.
97+
Note that in addition to the result being processed, the handler_context in each handler provides access to your async_w3 instance, the subscription instance, and any custom values declared within the handler_context of the subscription. from web3.utils.subscriptions import LogsSubscriptionContext
98+
99+
.. code-block:: python
100+
101+
async def new_heads_handler(
102+
handler_context: LogsSubscriptionContext,
103+
) -> None:
104+
log_receipt = handler_context.result
105+
print(f"New log: {log_receipt}\n")
106+
107+
event_data = handler_context.transfer_event.process_log(log_receipt)
108+
print(f"Log event data: {event_data}\n")
109+
110+
if log_receipt["blockNumber"] > 42012345:
111+
await handler_context.subscription.unsubscribe()
112+
113+
114+
4.) handle_subscriptions
115+
~~~~~~~~~~~~~~~~~~~~~~~~
116+
117+
Finally, when all your subscriptions are configured, utilize the handle_subscriptions method to begin processing them. If you need to listen for events on multiple chains, create one w3 instance per chain.
118+
119+
.. code-block:: python
120+
121+
async def sub_manager():
122+
...
123+
124+
# handle subscriptions via configured handlers:
125+
await w3.subscription_manager.handle_subscriptions()
126+
127+
# or, gather one w3 instance per chain:
128+
await asyncio.gather(
129+
w3.subscription_manager.handle_subscriptions(),
130+
l2_w3.subscription_manager.handle_subscriptions(),
131+
)
132+
133+
asyncio.run(sub_manager())
134+
135+
136+
5.) Unsubscribing
137+
~~~~~~~~~~~~~~~~~
138+
139+
If you don't want to subscribe indefinitely to an event, you can unsubscribe at any point. The first example in this post demonstrated the manual approach:await w3.eth.unsubscribe(subscription_id)
140+
141+
142+
The new handler pattern will keep track of the subscription ID for you however, so the same can be accomplished via the handler_context without an ID:
143+
144+
.. code-block:: python
145+
146+
async def new_heads_handler(handler_context):
147+
...
148+
if some_condition:
149+
await handler_context.subscription.unsubscribe()
150+
151+
152+
Lastly, if you're wrapping up the whole show, you can reach for unsubscribe_all on the subscription_manager:
153+
154+
.. code-block:: python
155+
156+
await w3.subscription_manager.unsubscribe_all()
157+
assert subscription_manager.subscriptions == []
158+
159+
160+
An example
161+
----------
162+
163+
Let's put all the pieces together. This example will subscribe to new block headers and transfer events from the WETH contract. It should work as written if you provide a WebSocket RPC URL.
164+
165+
.. code-block:: python
166+
167+
import asyncio
168+
from web3 import AsyncWeb3, WebSocketProvider
169+
from web3.utils.subscriptions import (
170+
NewHeadsSubscription,
171+
NewHeadsSubscriptionContext,
172+
LogsSubscription,
173+
LogsSubscriptionContext,
174+
)
175+
176+
# -- declare handlers --
177+
async def new_heads_handler(
178+
handler_context: NewHeadsSubscriptionContext,
179+
) -> None:
180+
header = handler_context.result
181+
print(f"New block header: {header}\n")
182+
183+
async def log_handler(
184+
handler_context: LogsSubscriptionContext,
185+
) -> None:
186+
log_receipt = handler_context.result
187+
print(f"Log receipt: {log_receipt}\n")
188+
189+
async def sub_manager():
190+
191+
# -- initialize provider --
192+
w3 = await AsyncWeb3(WebSocketProvider("wss://..."))
193+
194+
# -- subscribe to event(s) --
195+
await w3.subscription_manager.subscribe(
196+
[
197+
NewHeadsSubscription(
198+
label="new-heads-mainnet",
199+
handler=new_heads_handler
200+
),
201+
LogsSubscription(
202+
label="WETH transfers",
203+
address=w3.to_checksum_address(
204+
"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
205+
),
206+
topics=["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],
207+
handler=log_handler,
208+
),
209+
]
210+
)
211+
212+
# -- listen for events --
213+
await w3.subscription_manager.handle_subscriptions()
214+
215+
asyncio.run(sub_manager())
216+
217+
218+
FAQ
219+
---
220+
221+
222+
How can I subscribe to additional events once my application is running?
223+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
224+
225+
Wherever you have an instance of the ``w3`` object, you can use the ``subscription_manager`` to subscribe to new events.
226+
227+
For example, the handler of one subscription could initialize a new subscription:
228+
229+
.. code-block:: python
230+
231+
async def log_handler(
232+
handler_context: LogsSubscriptionContext,
233+
) -> None:
234+
log_receipt = handler_context.result
235+
print(f"Log receipt: {log_receipt}\n")
236+
237+
# reference the w3 instance
238+
w3 = handler_context.async_w3
239+
240+
# initialize a new subscription
241+
await w3.subscription_manager.subscribe(
242+
NewHeadsSubscription(handler=new_heads_handler)
243+
)

docs/toc.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Table of Contents
1818
transactions
1919
web3.contract
2020
filters
21+
subscriptions
2122
middleware
2223
internals
2324
ens_overview

newsfragments/3600.docs.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Introduce Event Subscriptions docs guide

0 commit comments

Comments
 (0)