Skip to content

Commit b187f8e

Browse files
committed
Subscription manager bugfixes:
- Fix ``run_forever`` logic so that it doesn't require subscriptions to start and doesn't end when no subscriptions are present. - Fix the ``unsubscribe`` method so that it can accept subscription ids. - Fix the ``unsubscribe`` method so it behaves like the ``subscribe`` method, accepting both single or multiple subscriptions or subscription ids. Note this does change the signature from ``subscription`` to ``subscriptions``, but I think this can be a bugfix since the ``subscribe`` method accepts multiple subscriptions and ``unsubscribe`` should behave similarly. - Fix ``subscribe`` so that it returns a list of subscription ids when a list of subscriptions is provided. Add tests: - Add a test for ``run_forever`` to make sure it can start without subscriptions and that the task does not stop when no subscriptions are present. - Add a test for ``unsubscribe`` to make sure it can accept multiple subscriptions as hexstr ids or as subscription objects. - Update subscriptions tests to check that a list of ids is returned when multiple subscriptions are provided to the ``subscribe`` method.
1 parent f4bd3f8 commit b187f8e

File tree

3 files changed

+152
-37
lines changed

3 files changed

+152
-37
lines changed

tests/core/subscriptions/test_subscription_manager.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ async def test_subscription_default_labels_are_unique(subscription_manager):
4646
sub3 = NewHeadsSubscription()
4747
sub4 = NewHeadsSubscription()
4848

49-
await subscription_manager.subscribe([sub1, sub2, sub3, sub4])
49+
sub_ids = await subscription_manager.subscribe([sub1, sub2, sub3, sub4])
50+
assert sub_ids == ["0x0", "0x1", "0x2", "0x3"]
5051

5152
assert sub1.label != sub2.label != sub3.label != sub4.label
5253
assert sub1.label == "NewHeadsSubscription('newHeads',)"
@@ -133,3 +134,23 @@ async def test_unsubscribe_all_clears_all_subscriptions(subscription_manager):
133134
assert sub_container.subscriptions == []
134135
assert sub_container.subscriptions_by_id == {}
135136
assert sub_container.subscriptions_by_label == {}
137+
138+
139+
@pytest.mark.asyncio
140+
async def test_unsubscribe_with_hex_ids(subscription_manager):
141+
sub1 = NewHeadsSubscription()
142+
sub2 = PendingTxSubscription()
143+
sub3 = NewHeadsSubscription()
144+
sub4 = LogsSubscription()
145+
146+
sub_id1, sub_id2, sub_id3, sub_id4 = await subscription_manager.subscribe(
147+
[sub1, sub2, sub3, sub4]
148+
)
149+
150+
assert subscription_manager.subscriptions == [sub1, sub2, sub3, sub4]
151+
152+
assert await subscription_manager.unsubscribe(sub_id1) is True
153+
assert subscription_manager.subscriptions == [sub2, sub3, sub4]
154+
155+
assert await subscription_manager.unsubscribe([sub_id2, sub_id3]) is True
156+
assert subscription_manager.subscriptions == [sub4]

web3/_utils/module_testing/persistent_connection_provider.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,3 +876,41 @@ async def unsubscribe_subs(
876876

877877
assert_no_subscriptions_left(sub_manager._subscription_container)
878878
await clean_up_task(unsubscribe_task)
879+
880+
@pytest.mark.asyncio
881+
async def test_run_forever_starts_with_0_subs_and_runs_until_task_cancelled(
882+
self, async_w3: AsyncWeb3
883+
) -> None:
884+
sub_manager = async_w3.subscription_manager
885+
assert_no_subscriptions_left(sub_manager._subscription_container)
886+
887+
run_forever_task = asyncio.create_task(
888+
sub_manager.handle_subscriptions(run_forever=True)
889+
)
890+
891+
await asyncio.sleep(0.1)
892+
assert run_forever_task.done() is False
893+
assert sub_manager.subscriptions == []
894+
895+
# subscribe to newHeads and validate it
896+
new_heads_handler_test = SubscriptionHandlerTest()
897+
sub1 = NewHeadsSubscription(
898+
label="foo",
899+
handler=new_heads_handler,
900+
handler_context={"new_heads_handler_test": new_heads_handler_test},
901+
)
902+
sub_id = await sub_manager.subscribe(sub1)
903+
assert is_hexstr(sub_id)
904+
assert len(sub_manager.subscriptions) == 1
905+
assert sub_manager.subscriptions[0] == sub1
906+
907+
# wait for the handler to unsubscribe
908+
while sub_manager.subscriptions:
909+
await asyncio.sleep(0.1)
910+
911+
assert new_heads_handler_test.passed
912+
assert run_forever_task.done() is False
913+
assert run_forever_task.cancelled() is False
914+
915+
# cleanup
916+
await clean_up_task(run_forever_task)

web3/providers/persistent/subscription_manager.py

Lines changed: 92 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -133,34 +133,88 @@ async def subscribe(
133133

134134
sub_ids: List[HexStr] = []
135135
for sub in subscriptions:
136-
await self.subscribe(sub)
136+
sub_ids.append(await self.subscribe(sub))
137137
return sub_ids
138138
raise Web3TypeError("Expected a Subscription or a sequence of Subscriptions.")
139139

140-
async def unsubscribe(self, subscription: EthSubscription[Any]) -> bool:
140+
@overload
141+
async def unsubscribe(self, subscriptions: EthSubscription[Any]) -> bool:
142+
...
143+
144+
@overload
145+
async def unsubscribe(self, subscriptions: HexStr) -> bool:
146+
...
147+
148+
@overload
149+
async def unsubscribe(
150+
self,
151+
subscriptions: Sequence[Union[EthSubscription[Any], HexStr]],
152+
) -> bool:
153+
...
154+
155+
async def unsubscribe(
156+
self,
157+
subscriptions: Union[
158+
EthSubscription[Any],
159+
HexStr,
160+
Sequence[Union[EthSubscription[Any], HexStr]],
161+
],
162+
) -> bool:
141163
"""
142-
Used to unsubscribe from a subscription.
164+
Used to unsubscribe from one or multiple subscriptions.
143165
144-
:param subscription: The subscription to unsubscribe from.
145-
:type subscription: EthSubscription
146-
:return: ``True`` if unsubscribing was successful, ``False`` otherwise.
166+
:param subscriptions: The subscription(s) to unsubscribe from.
167+
:type subscriptions: Union[EthSubscription, Sequence[EthSubscription], HexStr,
168+
Sequence[HexStr]]
169+
:return: ``True`` if unsubscribing to all was successful, ``False`` otherwise
170+
with a warning.
147171
:rtype: bool
148172
"""
149-
if subscription not in self.subscriptions:
150-
raise Web3ValueError(
151-
"Subscription not found or is not being managed by the subscription "
152-
f"manager.\n label: {subscription.label}\n id: {subscription._id}"
153-
)
154-
if await self._w3.eth._unsubscribe(subscription.id):
155-
self._remove_subscription(subscription)
156-
self.logger.info(
157-
"Successfully unsubscribed from subscription:\n "
158-
f"label: {subscription.label}\n id: {subscription.id}"
159-
)
160-
if len(self._subscription_container.handler_subscriptions) == 0:
161-
queue = self._provider._request_processor._handler_subscription_queue
162-
await queue.put(SubscriptionProcessingFinished())
163-
return True
173+
if isinstance(subscriptions, EthSubscription) or isinstance(subscriptions, str):
174+
if isinstance(subscriptions, str):
175+
subscription_id = subscriptions
176+
subscriptions = self.get_by_id(subscription_id)
177+
if subscriptions is None:
178+
raise Web3ValueError(
179+
"Subscription not found or is not being managed by the "
180+
f"subscription manager.\n id: {subscription_id}"
181+
)
182+
183+
if subscriptions not in self.subscriptions:
184+
raise Web3ValueError(
185+
"Subscription not found or is not being managed by the "
186+
"subscription manager.\n "
187+
f"label: {subscriptions.label}\n id: {subscriptions._id}"
188+
)
189+
190+
if await self._w3.eth._unsubscribe(subscriptions.id):
191+
self._remove_subscription(subscriptions)
192+
self.logger.info(
193+
"Successfully unsubscribed from subscription:\n "
194+
f"label: {subscriptions.label}\n id: {subscriptions.id}"
195+
)
196+
197+
if len(self._subscription_container.handler_subscriptions) == 0:
198+
queue = (
199+
self._provider._request_processor._handler_subscription_queue
200+
)
201+
await queue.put(SubscriptionProcessingFinished())
202+
return True
203+
204+
elif isinstance(subscriptions, Sequence):
205+
if len(subscriptions) == 0:
206+
raise Web3ValueError("No subscriptions provided.")
207+
208+
unsubscribed: List[bool] = []
209+
for sub in subscriptions:
210+
if isinstance(sub, str):
211+
sub = HexStr(sub)
212+
unsubscribed.append(await self.unsubscribe(sub))
213+
return all(unsubscribed)
214+
215+
self.logger.warning(
216+
f"Failed to unsubscribe from subscription\n subscription={subscriptions}"
217+
)
164218
return False
165219

166220
async def unsubscribe_all(self) -> bool:
@@ -195,15 +249,15 @@ async def handle_subscriptions(self, run_forever: bool = False) -> None:
195249
:type run_forever: bool
196250
:return: None
197251
"""
198-
if not self._subscription_container.handler_subscriptions:
252+
if not self._subscription_container.handler_subscriptions and not run_forever:
199253
self.logger.warning(
200254
"No handler subscriptions found. Subscription handler did not run."
201255
)
202256
return
203257

204258
queue = self._provider._request_processor._handler_subscription_queue
205-
try:
206-
while run_forever or self._subscription_container.handler_subscriptions:
259+
while run_forever or self._subscription_container.handler_subscriptions:
260+
try:
207261
response = cast(RPCResponse, await queue.get())
208262
formatted_sub_response = cast(
209263
FormattedEthSubscriptionResponse,
@@ -225,18 +279,20 @@ async def handle_subscriptions(self, run_forever: bool = False) -> None:
225279
**sub._handler_context,
226280
)
227281
)
228-
except SubscriptionProcessingFinished:
229-
self.logger.info(
230-
"All handler subscriptions have been unsubscribed from. "
231-
"Stopping subscription handling."
232-
)
233-
except TaskNotRunning:
234-
await asyncio.sleep(0)
235-
self._provider._handle_listener_task_exceptions()
236-
self.logger.error(
237-
"Message listener background task for the provider has stopped "
238-
"unexpectedly. Stopping subscription handling."
239-
)
282+
except SubscriptionProcessingFinished:
283+
if not run_forever:
284+
self.logger.info(
285+
"All handler subscriptions have been unsubscribed from. "
286+
"Stopping subscription handling."
287+
)
288+
break
289+
except TaskNotRunning:
290+
await asyncio.sleep(0)
291+
self._provider._handle_listener_task_exceptions()
292+
self.logger.error(
293+
"Message listener background task for the provider has stopped "
294+
"unexpectedly. Stopping subscription handling."
295+
)
240296

241297
# no active handler subscriptions, clear the handler subscription queue
242298
self._provider._request_processor._reset_handler_subscription_queue()

0 commit comments

Comments
 (0)