
Commit 472232c

Merge pull request #10 from opentensor/restart-shovel-on-failure
Restart shovels on failure
2 parents 1c906af + 30ec993 commit 472232c

File tree

14 files changed: +1289 -804 lines changed

scraper_service/requirements.txt

Lines changed: 0 additions & 1 deletion
@@ -27,7 +27,6 @@ requests==2.32.3
 scalecodec==1.2.10
 six==1.16.0
 substrate-interface==1.7.10
-tenacity==9.0.0
 toolz==0.12.1
 tqdm==4.66.4
 typing_extensions==4.12.2

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+class ShovelException(Exception):
+    """Base exception for all shovel-related errors"""
+    pass
+
+class ShovelProcessingError(ShovelException):
+    """Fatal error that should crash the process"""
+    pass
+
+class DatabaseConnectionError(ShovelException):
+    """Retryable error for database connection issues"""
+    pass
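
The hierarchy splits failures into two categories the base class can act on: DatabaseConnectionError for transient infrastructure problems that are worth retrying, and ShovelProcessingError for fatal conditions that should crash the process. A rough sketch of the intended call sites (the helper functions below are hypothetical, not part of this commit):

    from shared.exceptions import DatabaseConnectionError, ShovelProcessingError

    def insert_row(client, table, row):
        # Hypothetical helper: a ClickHouse failure is treated as retryable.
        try:
            client.execute(f"INSERT INTO {table} VALUES", [row])
        except Exception as e:
            raise DatabaseConnectionError(f"Failed to insert into {table}: {e}")

    def decode_block(raw_block):
        # Hypothetical helper: malformed data is not retryable, so fail fast.
        if raw_block is None:
            raise ShovelProcessingError("Received an empty block")
        return raw_block
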
Lines changed: 91 additions & 45 deletions
@@ -1,21 +1,25 @@
 from shared.clickhouse.batch_insert import buffer_insert, flush_buffer, batch_insert_into_clickhouse_table
-from shared.substrate import get_substrate_client
+from shared.substrate import get_substrate_client, reconnect_substrate
 from time import sleep
 from shared.clickhouse.utils import (
     get_clickhouse_client,
     table_exists,
 )
+from shared.exceptions import DatabaseConnectionError, ShovelProcessingError
 from tqdm import tqdm
 import logging
 import threading
 from concurrent.futures import ThreadPoolExecutor
+import sys


 class ShovelBaseClass:
     checkpoint_block_number = 0
     last_buffer_flush_call_block_number = 0
     name = None
     skip_interval = 1
+    MAX_RETRIES = 3
+    RETRY_DELAY = 5

     def __init__(self, name, skip_interval=1):
         """
@@ -26,49 +30,80 @@ def __init__(self, name, skip_interval=1):
         self.starting_block = 0  # Default value, can be overridden by subclasses

     def start(self):
-        print("Initialising Substrate client")
-        substrate = get_substrate_client()
-
-        print("Fetching the finalized block")
-        finalized_block_hash = substrate.get_chain_finalised_head()
-        finalized_block_number = substrate.get_block_number(
-            finalized_block_hash)
-
-        # Start the clickhouse buffer
-        print("Starting Clickhouse buffer")
-        executor = ThreadPoolExecutor(max_workers=1)
-        threading.Thread(
-            target=flush_buffer,
-            args=(executor, self._buffer_flush_started, self._buffer_flush_done),
-        ).start()
-
-        last_scraped_block_number = self.get_checkpoint()
-        logging.info(f"Last scraped block is {last_scraped_block_number}")
-
-        # Create a list of block numbers to scrape
+        retry_count = 0
         while True:
-            block_numbers = tqdm(
-                range(last_scraped_block_number +
-                      1, finalized_block_number + 1, self.skip_interval)
-            )
-
-            if len(block_numbers) > 0:
-                logging.info(
-                    f"Catching up {len(block_numbers)} blocks")
-                for block_number in block_numbers:
-                    self.process_block(block_number)
-                    self.checkpoint_block_number = block_number
-            else:
-                logging.info(
-                    "Already up to latest finalized block, checking again in 12s...")
-
-            # Make sure to sleep so buffer with checkpoint update is flushed to Clickhouse
-            # before trying again
-            sleep(12)
-            last_scraped_block_number = self.get_checkpoint()
-            finalized_block_hash = substrate.get_chain_finalised_head()
-            finalized_block_number = substrate.get_block_number(
-                finalized_block_hash)
+            try:
+                print("Initialising Substrate client")
+                substrate = get_substrate_client()
+
+                print("Fetching the finalized block")
+                finalized_block_hash = substrate.get_chain_finalised_head()
+                finalized_block_number = substrate.get_block_number(finalized_block_hash)
+
+                # Start the clickhouse buffer
+                print("Starting Clickhouse buffer")
+                executor = ThreadPoolExecutor(max_workers=1)
+                buffer_thread = threading.Thread(
+                    target=flush_buffer,
+                    args=(executor, self._buffer_flush_started, self._buffer_flush_done),
+                    daemon=True  # Make it a daemon thread so it exits with the main thread
+                )
+                buffer_thread.start()
+
+                last_scraped_block_number = self.get_checkpoint()
+                logging.info(f"Last scraped block is {last_scraped_block_number}")
+
+                # Create a list of block numbers to scrape
+                while True:
+                    try:
+                        block_numbers = list(range(
+                            last_scraped_block_number + 1,
+                            finalized_block_number + 1,
+                            self.skip_interval
+                        ))
+
+                        if len(block_numbers) > 0:
+                            logging.info(f"Catching up {len(block_numbers)} blocks")
+                            for block_number in tqdm(block_numbers):
+                                try:
+                                    self.process_block(block_number)
+                                    self.checkpoint_block_number = block_number
+                                except DatabaseConnectionError as e:
+                                    logging.error(f"Database connection error while processing block {block_number}: {str(e)}")
+                                    raise  # Re-raise to be caught by outer try-except
+                                except Exception as e:
+                                    logging.error(f"Fatal error while processing block {block_number}: {str(e)}")
+                                    raise ShovelProcessingError(f"Failed to process block {block_number}: {str(e)}")
+                        else:
+                            logging.info("Already up to latest finalized block, checking again in 12s...")
+
+                        # Reset retry count on successful iteration
+                        retry_count = 0
+
+                        # Make sure to sleep so buffer with checkpoint update is flushed to Clickhouse
+                        sleep(12)
+                        last_scraped_block_number = self.get_checkpoint()
+                        finalized_block_hash = substrate.get_chain_finalised_head()
+                        finalized_block_number = substrate.get_block_number(finalized_block_hash)
+
+                    except DatabaseConnectionError as e:
+                        retry_count += 1
+                        if retry_count > self.MAX_RETRIES:
+                            logging.error(f"Max retries ({self.MAX_RETRIES}) exceeded for database connection. Exiting.")
+                            raise ShovelProcessingError("Max database connection retries exceeded")
+
+                        logging.warning(f"Database connection error (attempt {retry_count}/{self.MAX_RETRIES}): {str(e)}")
+                        logging.info(f"Retrying in {self.RETRY_DELAY} seconds...")
+                        sleep(self.RETRY_DELAY)
+                        reconnect_substrate()  # Try to reconnect to substrate
+                        continue
+
+            except ShovelProcessingError as e:
+                logging.error(f"Fatal shovel error: {str(e)}")
+                sys.exit(1)
+            except Exception as e:
+                logging.error(f"Unexpected error: {str(e)}")
+                sys.exit(1)

     def process_block(self, n):
         raise NotImplementedError(
@@ -106,7 +141,18 @@ def _buffer_flush_done(self, tables, rows):

     def get_checkpoint(self):
         if not table_exists("shovel_checkpoints"):
-            return self.starting_block - 1
+            return max(0, self.starting_block - 1)
+
+        # First check if our shovel has any entries
+        query = f"""
+            SELECT count(*)
+            FROM shovel_checkpoints
+            WHERE shovel_name = '{self.name}'
+        """
+        count = get_clickhouse_client().execute(query)[0][0]
+        if count == 0:
+            return max(0, self.starting_block - 1)
+
         query = f"""
             SELECT block_number
             FROM shovel_checkpoints
@@ -118,4 +164,4 @@ def get_checkpoint(self):
         if res:
             return res[0][0]
         else:
-            return self.starting_block - 1
+            return max(0, self.starting_block - 1)  # This case shouldn't happen due to count check above
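
Taken together, start() now treats the two exception types differently: a DatabaseConnectionError raised from process_block is retried up to MAX_RETRIES times, with a RETRY_DELAY-second pause and a reconnect_substrate() call between attempts, while any other failure is wrapped in ShovelProcessingError and ends the process with sys.exit(1) so the shovel can be restarted from a clean state. A minimal subclass sketch (the class and table names are made up for illustration) that plugs into this flow:

    class ExampleShovel(ShovelBaseClass):
        # Hypothetical shovel: only process_block() needs to be provided.
        table_name = "shovel_example"

        def process_block(self, n):
            try:
                buffer_insert(self.table_name, [n])
            except Exception as e:
                # Marks the failure as retryable for ShovelBaseClass.start()
                raise DatabaseConnectionError(str(e))

    ExampleShovel("example").start()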

scraper_service/shovel_block_timestamp/main.py

Lines changed: 45 additions & 25 deletions
@@ -5,6 +5,7 @@
     get_clickhouse_client,
     table_exists,
 )
+from shared.exceptions import DatabaseConnectionError, ShovelProcessingError
 import logging


@@ -20,31 +21,50 @@ def process_block(self, n):


 def do_process_block(self, n):
-    substrate = get_substrate_client()
-
-    # Create table if it doesn't exist
-    if not table_exists(self.table_name):
-        query = f"""
-            CREATE TABLE IF NOT EXISTS {self.table_name} (
-                block_number UInt64 CODEC(Delta, ZSTD),
-                timestamp DateTime CODEC(Delta, ZSTD),
-            ) ENGINE = ReplacingMergeTree()
-            PARTITION BY toYYYYMM(timestamp)
-            ORDER BY block_number
-        """
-        get_clickhouse_client().execute(query)
-
-    block_hash = substrate.get_block_hash(n)
-    block_timestamp = int(
-        substrate.query(
-            "Timestamp",
-            "Now",
-            block_hash=block_hash,
-        ).serialize()
-        / 1000
-    )
-
-    buffer_insert(self.table_name, [n, block_timestamp])
+    try:
+        substrate = get_substrate_client()
+
+        try:
+            if not table_exists(self.table_name):
+                query = f"""
+                    CREATE TABLE IF NOT EXISTS {self.table_name} (
+                        block_number UInt64 CODEC(Delta, ZSTD),
+                        timestamp DateTime CODEC(Delta, ZSTD),
+                    ) ENGINE = ReplacingMergeTree()
+                    PARTITION BY toYYYYMM(timestamp)
+                    ORDER BY block_number
+                """
+                get_clickhouse_client().execute(query)
+        except Exception as e:
+            raise DatabaseConnectionError(f"Failed to create/check table: {str(e)}")
+
+        try:
+            block_hash = substrate.get_block_hash(n)
+            block_timestamp = int(
+                substrate.query(
+                    "Timestamp",
+                    "Now",
+                    block_hash=block_hash,
+                ).serialize()
+                / 1000
+            )
+        except Exception as e:
+            raise ShovelProcessingError(f"Failed to get block timestamp from substrate: {str(e)}")
+
+        if block_timestamp == 0 and n != 0:
+            raise ShovelProcessingError(f"Invalid block timestamp (0) for block {n}")
+
+        try:
+            buffer_insert(self.table_name, [n, block_timestamp])
+        except Exception as e:
+            raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}")
+
+    except (DatabaseConnectionError, ShovelProcessingError):
+        # Re-raise these exceptions to be handled by the base class
+        raise
+    except Exception as e:
+        # Convert unexpected exceptions to ShovelProcessingError
+        raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}")


 def main():
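
The body of main() is outside this diff; presumably it just instantiates the shovel and calls start(), along the lines of the hypothetical sketch below (the class and shovel names are assumptions, not taken from the commit):

    def main():
        # Hypothetical wiring; the actual class and name are not shown in this diff.
        BlockTimestampShovel(name="block_timestamp").start()

    if __name__ == "__main__":
        main()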
