Skip to content

Commit 7354e3a

Browse files
committed
restart subnets shovel on failure
1 parent a27d37b commit 7354e3a

File tree

2 files changed

+365
-217
lines changed

2 files changed

+365
-217
lines changed

scraper_service/shovel_subnets/main.py

Lines changed: 101 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,22 @@
55
from shared.shovel_base_class import ShovelBaseClass
66
import logging
77
import rust_bindings
8-
import time
98
from shovel_subnets.utils import create_table, get_axon_cache, get_coldkeys_and_stakes, refresh_axon_cache, default_axon
9+
from shared.exceptions import DatabaseConnectionError, ShovelProcessingError
1010

1111

1212
logging.basicConfig(level=logging.INFO,
1313
format="%(asctime)s %(process)d %(message)s")
1414

15-
# temporary hack around memory leak: restart shovel every 10mins
16-
# TODO: find and fix the actual issue causing the leak!
17-
start_time = time.time()
18-
1915

2016
class SubnetsShovel(ShovelBaseClass):
2117
def process_block(self, n):
22-
cur_time = time.time()
23-
if cur_time - start_time > 600:
24-
logging.info("Restarting shovel to avoid memory leak.")
25-
exit(0)
26-
do_process_block(n)
18+
try:
19+
do_process_block(n)
20+
except Exception as e:
21+
if isinstance(e, (DatabaseConnectionError, ShovelProcessingError)):
22+
raise
23+
raise ShovelProcessingError(f"Failed to process block {n}: {str(e)}")
2724

2825

2926
@retry(
@@ -32,63 +29,100 @@ def process_block(self, n):
3229
stop=stop_after_attempt(15)
3330
)
3431
def do_process_block(n):
35-
# Create table if it doesn't exist
36-
create_table()
37-
38-
(block_timestamp, block_hash) = get_block_metadata(n)
39-
40-
(neurons, hotkeys) = rust_bindings.query_neuron_info(block_hash)
41-
42-
coldkeys_and_stakes = get_coldkeys_and_stakes(
43-
hotkeys, block_timestamp, block_hash, n
44-
)
45-
46-
refresh_axon_cache(block_timestamp, block_hash, n)
47-
48-
axon_cache = get_axon_cache()
49-
for neuron in neurons:
50-
subnet_id = neuron.subnet_id
51-
hotkey = neuron.hotkey
52-
axon = axon_cache.get((subnet_id, hotkey)) or default_axon
53-
coldkey_and_stake = coldkeys_and_stakes.get(hotkey)
54-
if coldkey_and_stake is None:
55-
print(f"{hotkey} has no coldkey and stake!")
56-
exit(1)
57-
58-
buffer_insert("shovel_subnets", [
59-
n, # block_number UInt64 CODEC(Delta, ZSTD),
60-
block_timestamp, # timestamp DateTime CODEC(Delta, ZSTD),
61-
neuron.subnet_id, # subnet_id UInt16 CODEC(Delta, ZSTD),
62-
neuron.neuron_id, # neuron_id UInt16 CODEC(Delta, ZSTD),
63-
64-
f"'{neuron.hotkey}'", # hotkey String CODEC(ZSTD),
65-
f"'{coldkey_and_stake[0]}'", # coldkey String CODEC(ZSTD),
66-
neuron.active, # active Bool CODEC(ZSTD),
67-
68-
axon.block, # axon_block UInt64 CODEC(Delta, ZSTD),
69-
axon.version, # axon_version UInt32 CODEC(Delta, ZSTD),
70-
f'{axon.ip}', # axon_ip String CODEC(ZSTD),
71-
axon.port, # axon_port UInt16 CODEC(Delta, ZSTD),
72-
axon.ip_type, # axon_ip_type UInt8 CODEC(Delta, ZSTD),
73-
axon.protocol, # axon_protocol UInt8 CODEC(Delta, ZSTD),
74-
axon.placeholder1, # axon_placeholder1 UInt8
75-
axon.placeholder2, # axon_placeholder2 UInt8
76-
77-
neuron.rank, # rank UInt16 CODEC(Delta, ZSTD),
78-
neuron.emission, # emission UInt64 CODEC(Delta, ZSTD),
79-
neuron.incentive, # incentive UInt16 CODEC(Delta, ZSTD),
80-
neuron.consensus, # consensus UInt16 CODEC(Delta, ZSTD),
81-
neuron.trust, # trust UInt16 CODEC(Delta, ZSTD),
82-
neuron.validator_trust, # validator_trust UInt16
83-
neuron.dividends, # dividends UInt16 CODEC(Delta, ZSTD),
84-
coldkey_and_stake[1], # stake UInt64 CODEC(Delta, ZSTD),
85-
neuron.weights, # weights Array(Tuple(UInt16, UInt16)),
86-
neuron.bonds, # bonds Array(Tuple(UInt16, UInt16)) CODEC(ZSTD),
87-
neuron.last_update, # last_update UInt64 CODEC(Delta, ZSTD),
88-
89-
neuron.validator_permit, # validator_permit Bool
90-
neuron.pruning_scores # pruning_score UInt16 CODEC(Delta, ZSTD)
91-
])
32+
try:
33+
# Create table if it doesn't exist
34+
try:
35+
create_table()
36+
except Exception as e:
37+
raise DatabaseConnectionError(f"Failed to create/verify table: {str(e)}")
38+
39+
try:
40+
(block_timestamp, block_hash) = get_block_metadata(n)
41+
except Exception as e:
42+
raise ShovelProcessingError(f"Failed to get block metadata: {str(e)}")
43+
44+
try:
45+
(neurons, hotkeys) = rust_bindings.query_neuron_info(block_hash)
46+
if neurons is None or hotkeys is None:
47+
raise ShovelProcessingError("Received None response from query_neuron_info")
48+
except Exception as e:
49+
raise ShovelProcessingError(f"Failed to query neuron info: {str(e)}")
50+
51+
try:
52+
coldkeys_and_stakes = get_coldkeys_and_stakes(
53+
hotkeys, block_timestamp, block_hash, n
54+
)
55+
if coldkeys_and_stakes is None:
56+
raise ShovelProcessingError("Received None response from get_coldkeys_and_stakes")
57+
except Exception as e:
58+
if isinstance(e, DatabaseConnectionError):
59+
raise
60+
raise ShovelProcessingError(f"Failed to get coldkeys and stakes: {str(e)}")
61+
62+
try:
63+
refresh_axon_cache(block_timestamp, block_hash, n)
64+
except Exception as e:
65+
if isinstance(e, DatabaseConnectionError):
66+
raise
67+
raise ShovelProcessingError(f"Failed to refresh axon cache: {str(e)}")
68+
69+
axon_cache = get_axon_cache()
70+
71+
try:
72+
for neuron in neurons:
73+
subnet_id = neuron.subnet_id
74+
hotkey = neuron.hotkey
75+
axon = axon_cache.get((subnet_id, hotkey)) or default_axon
76+
coldkey_and_stake = coldkeys_and_stakes.get(hotkey)
77+
if coldkey_and_stake is None:
78+
logging.error(f"{hotkey} has no coldkey and stake!")
79+
raise ShovelProcessingError(f"Neuron {hotkey} has no coldkey and stake data")
80+
81+
buffer_insert("shovel_subnets", [
82+
n, # block_number UInt64 CODEC(Delta, ZSTD),
83+
block_timestamp, # timestamp DateTime CODEC(Delta, ZSTD),
84+
neuron.subnet_id, # subnet_id UInt16 CODEC(Delta, ZSTD),
85+
neuron.neuron_id, # neuron_id UInt16 CODEC(Delta, ZSTD),
86+
87+
f"'{neuron.hotkey}'", # hotkey String CODEC(ZSTD),
88+
f"'{coldkey_and_stake[0]}'", # coldkey String CODEC(ZSTD),
89+
neuron.active, # active Bool CODEC(ZSTD),
90+
91+
axon.block, # axon_block UInt64 CODEC(Delta, ZSTD),
92+
axon.version, # axon_version UInt32 CODEC(Delta, ZSTD),
93+
f'{axon.ip}', # axon_ip String CODEC(ZSTD),
94+
axon.port, # axon_port UInt16 CODEC(Delta, ZSTD),
95+
axon.ip_type, # axon_ip_type UInt8 CODEC(Delta, ZSTD),
96+
axon.protocol, # axon_protocol UInt8 CODEC(Delta, ZSTD),
97+
axon.placeholder1, # axon_placeholder1 UInt8
98+
axon.placeholder2, # axon_placeholder2 UInt8
99+
100+
neuron.rank, # rank UInt16 CODEC(Delta, ZSTD),
101+
neuron.emission, # emission UInt64 CODEC(Delta, ZSTD),
102+
neuron.incentive, # incentive UInt16 CODEC(Delta, ZSTD),
103+
neuron.consensus, # consensus UInt16 CODEC(Delta, ZSTD),
104+
neuron.trust, # trust UInt16 CODEC(Delta, ZSTD),
105+
neuron.validator_trust, # validator_trust UInt16
106+
neuron.dividends, # dividends UInt16 CODEC(Delta, ZSTD),
107+
coldkey_and_stake[1], # stake UInt64 CODEC(Delta, ZSTD),
108+
neuron.weights, # weights Array(Tuple(UInt16, UInt16)),
109+
neuron.bonds, # bonds Array(Tuple(UInt16, UInt16)) CODEC(ZSTD),
110+
neuron.last_update, # last_update UInt64 CODEC(Delta, ZSTD),
111+
112+
neuron.validator_permit,
113+
neuron.pruning_scores # pruning_score UInt16 CODEC(Delta, ZSTD)
114+
])
115+
except Exception as e:
116+
if isinstance(e, DatabaseConnectionError):
117+
raise
118+
raise ShovelProcessingError(f"Failed to process neuron data: {str(e)}")
119+
120+
except (DatabaseConnectionError, ShovelProcessingError):
121+
# Re-raise these exceptions to be handled by the base class
122+
raise
123+
except Exception as e:
124+
# Catch any other unexpected errors and wrap them
125+
raise ShovelProcessingError(f"Unexpected error in do_process_block: {str(e)}")
92126

93127

94128
def main():

0 commit comments

Comments
 (0)