Commit 36a9a93

feat: [wip] periodically fetch from ProofAggregatorService events
1 parent 1b22fd7 commit 36a9a93

explorer/lib/explorer/periodically.ex

Lines changed: 74 additions & 6 deletions
@@ -16,9 +16,13 @@ defmodule Explorer.Periodically do
     one_second = 1000
     seconds_in_an_hour = 60 * 60
 
-    :timer.send_interval(one_second * 60, :next_batch_progress) # every minute
-    :timer.send_interval(one_second * 12, :batches) # every 12 seconds, once per block
-    :timer.send_interval(one_second * seconds_in_an_hour, :restakings) # every 1 hour
+    # every minute
+    :timer.send_interval(one_second * 60, :next_batch_progress)
+    # every 12 seconds, once per block
+    :timer.send_interval(one_second * 12, :batches)
+    # every 1 hour
+    :timer.send_interval(one_second * seconds_in_an_hour, :restakings)
+    :timer.send_interval(one_second * seconds_in_an_hour, :aggregated_proofs)
   end
 
   # Reads and process last blocks for operators and restaking changes
@@ -37,13 +41,14 @@ defmodule Explorer.Periodically do
 
   def handle_info(:next_batch_progress, state) do
     Logger.debug("handling block progress timer")
-    remaining_time = ExplorerWeb.Helpers.get_next_scheduled_batch_remaining_time()
+    remaining_time = ExplorerWeb.Helpers.get_next_scheduled_batch_remaining_time()
+
     PubSub.broadcast(Explorer.PubSub, "update_views", %{
       next_scheduled_batch_remaining_time_percentage:
         ExplorerWeb.Helpers.get_next_scheduled_batch_remaining_time_percentage(remaining_time),
       next_scheduled_batch_remaining_time: remaining_time
-    })
-
+    })
+
     {:noreply, state}
   end
 
@@ -68,6 +73,68 @@ defmodule Explorer.Periodically do
     {:noreply, %{state | batches_count: new_count}}
   end
 
+  def handle_info(:aggregated_proofs, state) do
+    # This runs every hour; at one block every 12 seconds, reading 300 blocks goes back exactly one hour.
+    # We read a few extra blocks to make sure we don't miss anything.
+    read_block_qty = 310
+    latest_block_number = AlignedLayerServiceManager.get_latest_block_number()
+    read_from_block = max(0, latest_block_number - read_block_qty)
+
+    ## What we need to do:
+    ## 1. Calculate the block number to fetch logs from
+    ## 2. Fetch the NewAggregatedProof events
+    ## 3. For the successful verifications, query the blob from a beacon client
+    ## 4. Split the blob data into 32-byte chunks
+    ## 5. Store each chunk as a proof hash pointing to the aggregated proof number
+    ## 6. Store the info in the DB
+
+    process_aggregated_proofs(read_from_block)
+
+    {:noreply, state}
+  end
+
+  def process_aggregated_proofs(from_block) do
+    "Processing aggregated proofs" |> Logger.debug()
+
+    aggregated_proofs =
+      AlignedProofAggregationService.get_aggregated_proof_event()
+      |> Enum.map(fn x ->
+        Map.merge(
+          x,
+          %{
+            blob_data:
+              AlignedProofAggregationService.get_blob_data_from_versioned_hash(
+                x.blob_versioned_hash
+              )
+          }
+        )
+      end)
+
+    # Split the blob data into 32-byte chunks: each chunk is a leaf (proof hash) of the aggregated proof
+    proofs_leaves =
+      Enum.map(aggregated_proofs, fn x -> Enum.chunk_every(x.blob_data, 32) end)
+
+    # Store the aggregated proofs in the DB
+    aggregated_proofs
+    |> Enum.zip(proofs_leaves)
+    |> Enum.each(fn {agg_proof, leaves} ->
+      Map.merge(agg_proof, %{number_of_proofs: length(leaves)})
+      |> AggregatedProof.insert_or_update()
+    end)
+
+    # Store each individual proof
+    aggregated_proofs
+    |> Enum.zip(proofs_leaves)
+    |> Enum.each(fn {agg_proof, leaves} ->
+      Enum.each(leaves, fn leaf ->
+        AggregatedProof.insert_proof(%{
+          aggregated_proof_number: agg_proof.number,
+          proof_hash: leaf
+        })
+      end)
+    end)
+  end
+
   def process_batches(fromBlock, toBlock) do
     "Processing from block #{fromBlock} to block #{toBlock}..." |> Logger.debug()
 
@@ -108,6 +175,7 @@ defmodule Explorer.Periodically do
     else
       {:error, reason} ->
         Logger.error("Error processing batch #{batch.merkle_root}. Error: #{inspect(reason)}")
+
       # no changes in DB
       nil ->
         nil
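
The lookback window in handle_info(:aggregated_proofs, ...) follows from the block time the module already assumes (one block every 12 seconds). A minimal sketch of the arithmetic behind read_block_qty = 310, for reference only:

  seconds_per_block = 12
  blocks_per_hour = div(60 * 60, seconds_per_block)  # 3600 / 12 = 300 blocks cover one hour
  read_block_qty = blocks_per_hour + 10              # 310: small overlap so consecutive hourly runs miss nothing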

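Step 4 of the to-do list splits the blob fetched from the beacon client into 32-byte proof hashes; the WIP code above does this with Enum.chunk_every/2, which assumes blob_data is already an enumerable (for example a list of bytes). If the blob comes back as a raw binary instead, a sketch along these lines would produce the same leaves (the module name is hypothetical, not part of the commit):

  defmodule Explorer.BlobLeaves do
    @hash_size 32

    # Splits a raw blob binary into 32-byte leaves (candidate proof hashes).
    # Trailing bytes that do not fill a whole 32-byte chunk are ignored.
    def split(blob) when is_binary(blob) do
      for <<leaf::binary-size(@hash_size) <- blob>>, do: leaf
    end
  end

  # Example:
  #   Explorer.BlobLeaves.split(<<1::256, 2::256>>)
  #   #=> [<<1::256>>, <<2::256>>]

Since beacon blobs are fixed-size, a real implementation would likely also need to drop padding (for example all-zero leaves) before counting the proofs in an aggregation.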