|
3 | 3 | import asyncio
|
4 | 4 | import logging
|
5 | 5 | import time
|
| 6 | +from collections.abc import Awaitable, Sequence |
6 | 7 | from pathlib import Path
|
7 | 8 | from typing import TYPE_CHECKING, ClassVar, Optional, cast
|
8 | 9 |
|
@@ -65,6 +66,47 @@ def _plot_passes_filter(self, plot_info: PlotInfo, challenge: harvester_protocol
|
65 | 66 | challenge.sp_hash,
|
66 | 67 | )
|
67 | 68 |
|
| 69 | + async def _handle_v1_responses( |
| 70 | + self, |
| 71 | + awaitables: Sequence[Awaitable[tuple[Path, list[harvester_protocol.NewProofOfSpace]]]], |
| 72 | + start_time: float, |
| 73 | + peer: WSChiaConnection, |
| 74 | + ) -> int: |
| 75 | + proofs_found = 0 |
| 76 | + for filename_sublist_awaitable in asyncio.as_completed(awaitables): |
| 77 | + filename, sublist = await filename_sublist_awaitable |
| 78 | + time_taken = time.monotonic() - start_time |
| 79 | + if time_taken > 8: |
| 80 | + self.harvester.log.warning( |
| 81 | + f"Looking up qualities on {filename} took: {time_taken}. This should be below 8 seconds" |
| 82 | + f" to minimize risk of losing rewards." |
| 83 | + ) |
| 84 | + for response in sublist: |
| 85 | + proofs_found += 1 |
| 86 | + msg = make_msg(ProtocolMessageTypes.new_proof_of_space, response) |
| 87 | + await peer.send_message(msg) |
| 88 | + return proofs_found |
| 89 | + |
| 90 | + async def _handle_v2_responses( |
| 91 | + self, v2_awaitables: Sequence[Awaitable[Optional[PartialProofsData]]], start_time: float, peer: WSChiaConnection |
| 92 | + ) -> int: |
| 93 | + partial_proofs_found = 0 |
| 94 | + for quality_awaitable in asyncio.as_completed(v2_awaitables): |
| 95 | + partial_proofs_data = await quality_awaitable |
| 96 | + if partial_proofs_data is None: |
| 97 | + continue |
| 98 | + time_taken = time.monotonic() - start_time |
| 99 | + if time_taken > 8: |
| 100 | + self.harvester.log.warning( |
| 101 | + f"Looking up partial proofs on {partial_proofs_data.plot_identifier}" |
| 102 | + f"took: {time_taken}. This should be below 8 seconds" |
| 103 | + f"to minimize risk of losing rewards." |
| 104 | + ) |
| 105 | + partial_proofs_found += len(partial_proofs_data.partial_proofs) |
| 106 | + msg = make_msg(ProtocolMessageTypes.partial_proofs, partial_proofs_data) |
| 107 | + await peer.send_message(msg) |
| 108 | + return partial_proofs_found |
| 109 | + |
68 | 110 | @metadata.request(peer_required=True)
|
69 | 111 | async def harvester_handshake(
|
70 | 112 | self, harvester_handshake: harvester_protocol.HarvesterHandshake, peer: WSChiaConnection
|
@@ -354,30 +396,22 @@ async def lookup_challenge(
|
354 | 396 | total_proofs_found = 0
|
355 | 397 | total_v2_partial_proofs_found = 0
|
356 | 398 |
|
357 |
| - # Process V1 plot responses (existing flow) |
358 |
| - for filename_sublist_awaitable in asyncio.as_completed(awaitables): |
359 |
| - filename, sublist = await filename_sublist_awaitable |
360 |
| - time_taken = time.monotonic() - start |
361 |
| - if time_taken > 8: |
362 |
| - self.harvester.log.warning( |
363 |
| - f"Looking up qualities on {filename} took: {time_taken}. This should be below 8 seconds" |
364 |
| - f" to minimize risk of losing rewards." |
365 |
| - ) |
366 |
| - else: |
367 |
| - pass |
368 |
| - # self.harvester.log.info(f"Looking up qualities on {filename} took: {time_taken}") |
369 |
| - for response in sublist: |
370 |
| - total_proofs_found += 1 |
371 |
| - msg = make_msg(ProtocolMessageTypes.new_proof_of_space, response) |
372 |
| - await peer.send_message(msg) |
373 |
| - |
374 |
| - # Process V2 plot quality collections (new flow) |
375 |
| - for quality_awaitable in asyncio.as_completed(v2_awaitables): |
376 |
| - partial_proofs_data = await quality_awaitable |
377 |
| - if partial_proofs_data is not None: |
378 |
| - total_v2_partial_proofs_found += len(partial_proofs_data.partial_proofs) |
379 |
| - msg = make_msg(ProtocolMessageTypes.partial_proofs, partial_proofs_data) |
380 |
| - await peer.send_message(msg) |
| 399 | + # run both concurrently |
| 400 | + tasks = [] |
| 401 | + if awaitables: |
| 402 | + tasks.append(self._handle_v1_responses(awaitables, start, peer)) |
| 403 | + if v2_awaitables: |
| 404 | + tasks.append(self._handle_v2_responses(v2_awaitables, start, peer)) |
| 405 | + |
| 406 | + if tasks: |
| 407 | + results = await asyncio.gather(*tasks) |
| 408 | + if len(results) == 2: |
| 409 | + total_proofs_found, total_v2_partial_proofs_found = results |
| 410 | + elif len(results) == 1: |
| 411 | + if awaitables: |
| 412 | + total_proofs_found = results[0] |
| 413 | + else: |
| 414 | + total_v2_partial_proofs_found = results[0] |
381 | 415 |
|
382 | 416 | now = uint64(time.time())
|
383 | 417 |
|
|
0 commit comments