Skip to content

Commit b260fa9

Browse files
committed
Improve prefetcher
1 parent ae9d933 commit b260fa9

File tree

3 files changed

+122
-100
lines changed

3 files changed

+122
-100
lines changed

app/services/eth_block_importer.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,14 @@ def import_single_block(block_number)
329329
start = Time.current
330330

331331
# Fetch block data from prefetcher
332-
ImportProfiler.start("prefetch_fetch")
333-
response = @prefetcher.fetch(block_number)
334-
ImportProfiler.stop("prefetch_fetch")
332+
begin
333+
ImportProfiler.start('prefetcher_fetch')
334+
response = prefetcher.fetch(block_number)
335+
rescue L1RpcPrefetcher::BlockFetchError => e
336+
raise BlockNotReadyToImportError.new(e.message)
337+
ensure
338+
ImportProfiler.stop('prefetcher_fetch')
339+
end
335340

336341
# Handle cancellation, fetch failure, or block not ready
337342
if response.nil?

config/derive_ethscriptions_blocks.rb

Lines changed: 64 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -116,95 +116,86 @@ module Clockwork
116116
every(import_interval.seconds, 'import_ethscriptions_blocks') do
117117
importer = EthBlockImporter.new
118118

119-
# Handle Ctrl+C gracefully
120-
Signal.trap('INT') do
121-
puts "\n[#{Time.now}] Received INT signal, shutting down..."
122-
importer.shutdown
123-
exit 130
124-
end
125-
126-
Signal.trap('TERM') do
127-
puts "\n[#{Time.now}] Received TERM signal, shutting down..."
128-
importer.shutdown
129-
exit 143
130-
end
131-
132119
# Track statistics
133120
total_blocks_imported = 0
134-
total_ethscriptions = 0
135121
start_time = Time.now
136122

137-
loop do
138-
begin
139-
initial_block = importer.current_max_eth_block_number
123+
begin
124+
loop do
125+
begin
126+
initial_block = importer.current_max_eth_block_number
140127

141-
# Import blocks
142-
importer.import_blocks_until_done
128+
# Import blocks
129+
importer.import_blocks_until_done
143130

144-
final_block = importer.current_max_eth_block_number
145-
blocks_imported = final_block - initial_block
131+
final_block = importer.current_max_eth_block_number
132+
blocks_imported = final_block - initial_block
146133

147-
if blocks_imported > 0
148-
total_blocks_imported += blocks_imported
134+
if blocks_imported > 0
135+
total_blocks_imported += blocks_imported
149136

150-
puts "[#{Time.now}] Imported #{blocks_imported} blocks (#{initial_block + 1} to #{final_block})"
137+
puts "[#{Time.now}] Imported #{blocks_imported} blocks (#{initial_block + 1} to #{final_block})"
151138

152-
# Show validation summary if enabled
153-
if ENV.fetch('VALIDATION_ENABLED').casecmp?('true')
154-
puts importer.validation_summary
155-
end
156-
else
157-
# We're caught up
158-
elapsed = (Time.now - start_time).round(2)
139+
# Show validation summary if enabled
140+
if ENV.fetch('VALIDATION_ENABLED').casecmp?('true')
141+
puts importer.validation_summary
142+
end
143+
else
144+
# We're caught up
145+
elapsed = (Time.now - start_time).round(2)
146+
147+
if total_blocks_imported > 0
148+
puts "[#{Time.now}] Session summary: Imported #{total_blocks_imported} blocks in #{elapsed}s"
159149

160-
if total_blocks_imported > 0
161-
puts "[#{Time.now}] Session summary: Imported #{total_blocks_imported} blocks in #{elapsed}s"
150+
# Reset counters
151+
total_blocks_imported = 0
152+
start_time = Time.now
153+
end
162154

163-
# Reset counters
164-
total_blocks_imported = 0
165-
start_time = Time.now
155+
puts "[#{Time.now}] Caught up at block #{final_block}. Waiting #{import_interval}s..."
166156
end
167157

168-
puts "[#{Time.now}] Caught up at block #{final_block}. Waiting #{import_interval}s..."
158+
rescue EthBlockImporter::BlockNotReadyToImportError => e
159+
# This is normal when caught up
160+
current = importer.current_max_eth_block_number
161+
puts "[#{Time.now}] Waiting for new blocks (current: #{current})..."
162+
163+
rescue EthBlockImporter::ReorgDetectedError => e
164+
Rails.logger.warn "[#{Time.now}] ⚠️ Reorg detected! Reinitializing importer..."
165+
puts "[#{Time.now}] ⚠️ Reorg detected at block #{importer.current_max_eth_block_number}"
166+
167+
# Reinitialize importer to handle reorg
168+
importer.shutdown
169+
importer = EthBlockImporter.new
170+
puts "[#{Time.now}] Importer reinitialized. Continuing from block #{importer.current_max_eth_block_number}"
171+
172+
rescue EthBlockImporter::ValidationFailureError => e
173+
Rails.logger.fatal "[#{Time.now}] 🛑 VALIDATION FAILURE: #{e.message}"
174+
puts "[#{Time.now}] 🛑 VALIDATION FAILURE - System stopping for investigation"
175+
puts "[#{Time.now}] Fix the validation issue and restart manually"
176+
exit 1
177+
178+
rescue EthBlockImporter::ValidationStalledError => e
179+
# Validation is behind - wait longer and keep trying
180+
Rails.logger.info "[#{Time.now}] ⏸️ VALIDATION BEHIND: #{e.message}"
181+
puts "[#{Time.now}] ⏸️ Validation is behind - waiting #{import_interval * 2}s for validation to catch up..."
182+
sleep import_interval * 2 # Wait longer when validation is behind
183+
# Don't exit - continue the loop to retry
184+
185+
rescue => e
186+
Rails.logger.error "Import error: #{e.class} - #{e.message}"
187+
Rails.logger.error e.backtrace.first(20).join("\n")
188+
189+
puts "[#{Time.now}] ❌ Error: #{e.message}"
190+
191+
# For other errors, wait and retry
192+
puts "[#{Time.now}] Retrying in #{import_interval}s..."
169193
end
170194

171-
rescue EthBlockImporter::BlockNotReadyToImportError => e
172-
# This is normal when caught up
173-
current = importer.current_max_eth_block_number
174-
puts "[#{Time.now}] Waiting for new blocks (current: #{current})..."
175-
176-
rescue EthBlockImporter::ReorgDetectedError => e
177-
Rails.logger.warn "[#{Time.now}] ⚠️ Reorg detected! Reinitializing importer..."
178-
puts "[#{Time.now}] ⚠️ Reorg detected at block #{importer.current_max_eth_block_number}"
179-
180-
# Reinitialize importer to handle reorg
181-
importer = EthBlockImporter.new
182-
puts "[#{Time.now}] Importer reinitialized. Continuing from block #{importer.current_max_eth_block_number}"
183-
184-
rescue EthBlockImporter::ValidationFailureError => e
185-
Rails.logger.fatal "[#{Time.now}] 🛑 VALIDATION FAILURE: #{e.message}"
186-
puts "[#{Time.now}] 🛑 VALIDATION FAILURE - System stopping for investigation"
187-
puts "[#{Time.now}] Fix the validation issue and restart manually"
188-
exit 1
189-
190-
rescue EthBlockImporter::ValidationStalledError => e
191-
# Validation is behind - wait longer and keep trying
192-
Rails.logger.info "[#{Time.now}] ⏸️ VALIDATION BEHIND: #{e.message}"
193-
puts "[#{Time.now}] ⏸️ Validation is behind - waiting #{import_interval * 2}s for validation to catch up..."
194-
sleep import_interval * 2 # Wait longer when validation is behind
195-
# Don't exit - continue the loop to retry
196-
197-
rescue => e
198-
Rails.logger.error "Import error: #{e.class} - #{e.message}"
199-
Rails.logger.error e.backtrace.first(20).join("\n")
200-
201-
puts "[#{Time.now}] ❌ Error: #{e.message}"
202-
203-
# For other errors, wait and retry
204-
puts "[#{Time.now}] Retrying in #{import_interval}s..."
195+
sleep import_interval
205196
end
206-
207-
sleep import_interval
197+
ensure
198+
importer&.shutdown
208199
end
209200
end
210201
end

lib/l1_rpc_prefetcher.rb

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
class L1RpcPrefetcher
55
include Memery
6+
class BlockFetchError < StandardError; end
67
def initialize(ethereum_client:,
7-
ahead: ENV.fetch('L1_PREFETCH_FORWARD', Rails.env.test? ? 5 : 200).to_i,
8+
ahead: ENV.fetch('L1_PREFETCH_FORWARD', Rails.env.test? ? 5 : 20).to_i,
89
threads: ENV.fetch('L1_PREFETCH_THREADS', 2).to_i)
910
@eth = ethereum_client
1011
@ahead = ahead
@@ -13,13 +14,18 @@ def initialize(ethereum_client:,
1314
# Thread-safe collections and pool
1415
@pool = Concurrent::FixedThreadPool.new(threads)
1516
@promises = Concurrent::Map.new
17+
@last_chain_tip = current_l1_block_number
1618

1719
Rails.logger.info "L1RpcPrefetcher initialized with #{threads} threads"
1820
end
1921

2022
def ensure_prefetched(from_block)
21-
# Check current chain tip first
22-
latest = get_latest_block_number
23+
distance_from_last_tip = @last_chain_tip - from_block
24+
latest = if distance_from_last_tip > 10
25+
cached_l1_block_number
26+
else
27+
current_l1_block_number
28+
end
2329

2430
# Don't prefetch beyond chain tip
2531
to_block = [from_block + @ahead, latest].min
@@ -45,16 +51,18 @@ def fetch(block_number)
4551

4652
Rails.logger.debug "Fetching block #{block_number}, promise state: #{promise.state}"
4753

48-
begin
49-
result = promise.value!(timeout)
50-
Rails.logger.debug "Got result for block #{block_number}"
51-
52-
result
53-
rescue Concurrent::TimeoutError => e
54-
Rails.logger.error "Timeout fetching block #{block_number} after #{timeout}s"
54+
result = promise.value!(timeout)
55+
56+
if result.nil? || result == :not_ready_sentinel
5557
@promises.delete(block_number)
56-
raise
58+
message = result.nil? ?
59+
"Block #{block_number} fetch timed out after #{timeout}s" :
60+
"Block #{block_number} not yet available on L1"
61+
raise BlockFetchError.new(message)
5762
end
63+
64+
Rails.logger.debug "Got result for block #{block_number}"
65+
result
5866
end
5967

6068
def clear_older_than(min_keep)
@@ -96,21 +104,31 @@ def stats
96104

97105
def shutdown
98106
@pool.shutdown
99-
if @pool.wait_for_termination(30)
100-
Rails.logger.info "L1 RPC Prefetcher thread pool shut down successfully"
101-
else
102-
Rails.logger.warn "L1 RPC Prefetcher shutdown timed out, forcing kill"
103-
@pool.kill
107+
terminated = @pool.wait_for_termination(3)
108+
@pool.kill unless terminated
109+
@promises.each_pair do |_, promise|
110+
begin
111+
if promise.pending? && promise.respond_to?(:cancel)
112+
promise.cancel
113+
end
114+
rescue StandardError => e
115+
Rails.logger.warn "Failed cancelling promise during shutdown: #{e.message}"
116+
end
104117
end
118+
@promises.clear
119+
Rails.logger.info(
120+
terminated ?
121+
'L1 RPC Prefetcher thread pool shut down successfully' :
122+
'L1 RPC Prefetcher shutdown timed out after 3s, pool killed'
123+
)
124+
terminated
125+
rescue StandardError => e
126+
Rails.logger.error("Error during L1RpcPrefetcher shutdown: #{e.message}\n#{e.backtrace.join("\n")}")
127+
false
105128
end
106129

107130
private
108131

109-
def get_latest_block_number
110-
@eth.get_block_number
111-
end
112-
memoize :get_latest_block_number, ttl: 12.seconds
113-
114132
def enqueue_single(block_number)
115133
@promises.compute_if_absent(block_number) do
116134
Rails.logger.debug "Creating promise for block #{block_number}"
@@ -132,13 +150,12 @@ def fetch_job(block_number)
132150
client = @eth
133151

134152
Retriable.retriable(tries: 3, base_interval: 1, max_interval: 4) do
135-
136153
block = client.get_block(block_number, true)
137154

138155
# Handle case where block doesn't exist yet (normal when caught up)
139156
if block.nil?
140157
Rails.logger.debug "Block #{block_number} not yet available on L1"
141-
return { error: :not_ready, block_number: block_number }
158+
return :not_ready_sentinel
142159
end
143160

144161
receipts = client.get_transaction_receipts(block_number)
@@ -154,4 +171,13 @@ def fetch_job(block_number)
154171
}
155172
end
156173
end
157-
end
174+
175+
def current_l1_block_number
176+
@last_chain_tip = @eth.get_block_number
177+
end
178+
179+
def cached_l1_block_number
180+
current_l1_block_number
181+
end
182+
memoize :cached_l1_block_number, ttl: 12.seconds
183+
end

0 commit comments

Comments
 (0)