Skip to content

Commit 45c9a50

Browse files
authored
Fix operator merkle root retrocompatibility (#996)
1 parent 047a65b commit 45c9a50

File tree

16 files changed

+4416
-32
lines changed

16 files changed

+4416
-32
lines changed

.github/workflows/build-go.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ jobs:
3535
run: make build_halo2_ipa_linux
3636
- name: Build Merkle Tree bindings
3737
run: make build_merkle_tree_linux
38+
- name: Build Old Merkle Tree bindings
39+
run: make build_merkle_tree_linux_old
3840
- name: Build operator
3941
run: go build operator/cmd/main.go
4042
- name: Build aggregator

Makefile

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,15 +541,29 @@ build_merkle_tree_macos:
541541
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.dylib operator/merkle_tree/lib/libmerkle_tree.dylib
542542
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree/lib/libmerkle_tree.a
543543

544+
build_merkle_tree_macos_old:
545+
@cd operator/merkle_tree_old/lib && cargo build $(RELEASE_FLAG)
546+
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.dylib operator/merkle_tree_old/lib/libmerkle_tree.dylib
547+
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree_old/lib/libmerkle_tree.a
548+
544549
build_merkle_tree_linux:
545550
@cd operator/merkle_tree/lib && cargo build $(RELEASE_FLAG)
546551
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.so operator/merkle_tree/lib/libmerkle_tree.so
547552
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree/lib/libmerkle_tree.a
548553

554+
build_merkle_tree_linux_old:
555+
@cd operator/merkle_tree_old/lib && cargo build $(RELEASE_FLAG)
556+
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.so operator/merkle_tree_old/lib/libmerkle_tree.so
557+
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree_old/lib/libmerkle_tree.a
558+
549559
test_merkle_tree_rust_ffi:
550560
@echo "Testing Merkle Tree Rust FFI source code..."
551561
@cd operator/merkle_tree/lib && RUST_MIN_STACK=83886080 cargo t --release
552562

563+
test_merkle_tree_rust_ffi_old:
564+
@echo "Testing Old Merkle Tree Rust FFI source code..."
565+
@cd operator/merkle_tree_old/lib && RUST_MIN_STACK=83886080 cargo t --release
566+
553567
test_merkle_tree_go_bindings_macos: build_merkle_tree_macos
554568
@echo "Testing Merkle Tree Go bindings..."
555569
go test ./operator/merkle_tree/... -v
@@ -558,6 +572,14 @@ test_merkle_tree_go_bindings_linux: build_merkle_tree_linux
558572
@echo "Testing Merkle Tree Go bindings..."
559573
go test ./operator/merkle_tree/... -v
560574

575+
test_merkle_tree_old_go_bindings_macos: build_merkle_tree_macos_old
576+
@echo "Testing Old Merkle Tree Go bindings..."
577+
go test ./operator/merkle_tree_old/... -v
578+
579+
test_merkle_tree_go_bindings_linux_old: build_merkle_tree_linux_old
580+
@echo "Testing Merkle Tree Go bindings..."
581+
go test ./operator/merkle_tree_old/... -v
582+
561583
__HALO2_KZG_FFI__: ##
562584
build_halo2_kzg_macos:
563585
@cd operator/halo2kzg/lib && cargo build $(RELEASE_FLAG)
@@ -632,6 +654,7 @@ build_all_ffi_macos: ## Build all FFIs for macOS
632654
@$(MAKE) build_sp1_macos
633655
@$(MAKE) build_risc_zero_macos
634656
@$(MAKE) build_merkle_tree_macos
657+
@$(MAKE) build_merkle_tree_macos_old
635658
@$(MAKE) build_halo2_ipa_macos
636659
@$(MAKE) build_halo2_kzg_macos
637660
@echo "All macOS FFIs built successfully."
@@ -641,6 +664,7 @@ build_all_ffi_linux: ## Build all FFIs for Linux
641664
@$(MAKE) build_sp1_linux
642665
@$(MAKE) build_risc_zero_linux
643666
@$(MAKE) build_merkle_tree_linux
667+
@$(MAKE) build_merkle_tree_linux_old
644668
@$(MAKE) build_halo2_ipa_linux
645669
@$(MAKE) build_halo2_kzg_linux
646670
@echo "All Linux FFIs built successfully."

aggregator/internal/pkg/subscriber.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
2424
func (agg *Aggregator) subscribeToNewTasks() error {
2525
var err error
2626

27-
agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasks(agg.NewBatchChan)
27+
agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan)
2828

2929
if err != nil {
3030
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)

core/chainio/avs_subscriber.go

Lines changed: 191 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,87 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
6464
logger: baseConfig.Logger,
6565
}, nil
6666
}
67+
68+
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) {
69+
// Create a new channel to receive new tasks
70+
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
71+
72+
// Subscribe to new tasks
73+
sub, err := subscribeToNewTasksV2(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
74+
if err != nil {
75+
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
76+
return nil, err
77+
}
78+
79+
subFallback, err := subscribeToNewTasksV2(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
80+
if err != nil {
81+
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
82+
return nil, err
83+
}
84+
85+
// create a new channel to foward errors
86+
errorChannel := make(chan error)
87+
88+
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
89+
90+
// Forward the new tasks to the provided channel
91+
go func() {
92+
defer pollLatestBatchTicker.Stop()
93+
newBatchMutex := &sync.Mutex{}
94+
batchesSet := make(map[[32]byte]struct{})
95+
for {
96+
select {
97+
case newBatch := <-internalChannel:
98+
s.processNewBatchV2(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
99+
case <-pollLatestBatchTicker.C:
100+
latestBatch, err := s.getLatestTaskFromEthereumV2()
101+
if err != nil {
102+
s.logger.Debug("Failed to get latest task from blockchain", "err", err)
103+
continue
104+
}
105+
s.processNewBatchV2(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
106+
}
107+
}
108+
109+
}()
110+
111+
// Handle errors and resubscribe
112+
go func() {
113+
for {
114+
select {
115+
case err := <-sub.Err():
116+
s.logger.Warn("Error in new task subscription", "err", err)
117+
sub.Unsubscribe()
118+
sub, err = subscribeToNewTasksV2(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
119+
if err != nil {
120+
errorChannel <- err
121+
}
122+
case err := <-subFallback.Err():
123+
s.logger.Warn("Error in fallback new task subscription", "err", err)
124+
subFallback.Unsubscribe()
125+
subFallback, err = subscribeToNewTasksV2(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
126+
if err != nil {
127+
errorChannel <- err
128+
}
129+
}
130+
}
131+
}()
132+
133+
return errorChannel, nil
134+
}
67135

68-
func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
136+
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
69137
// Create a new channel to receive new tasks
70138
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)
71139

72140
// Subscribe to new tasks
73-
sub, err := subscribeToNewTasks(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
141+
sub, err := subscribeToNewTasksV3(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
74142
if err != nil {
75143
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
76144
return nil, err
77145
}
78146

79-
subFallback, err := subscribeToNewTasks(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
147+
subFallback, err := subscribeToNewTasksV3(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
80148
if err != nil {
81149
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
82150
return nil, err
@@ -95,14 +163,14 @@ func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemana
95163
for {
96164
select {
97165
case newBatch := <-internalChannel:
98-
s.processNewBatch(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
166+
s.processNewBatchV3(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
99167
case <-pollLatestBatchTicker.C:
100-
latestBatch, err := s.getLatestTaskFromEthereum()
168+
latestBatch, err := s.getLatestTaskFromEthereumV3()
101169
if err != nil {
102170
s.logger.Debug("Failed to get latest task from blockchain", "err", err)
103171
continue
104172
}
105-
s.processNewBatch(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
173+
s.processNewBatchV3(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
106174
}
107175
}
108176

@@ -115,14 +183,14 @@ func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemana
115183
case err := <-sub.Err():
116184
s.logger.Warn("Error in new task subscription", "err", err)
117185
sub.Unsubscribe()
118-
sub, err = subscribeToNewTasks(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
186+
sub, err = subscribeToNewTasksV3(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
119187
if err != nil {
120188
errorChannel <- err
121189
}
122190
case err := <-subFallback.Err():
123191
s.logger.Warn("Error in fallback new task subscription", "err", err)
124192
subFallback.Unsubscribe()
125-
subFallback, err = subscribeToNewTasks(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
193+
subFallback, err = subscribeToNewTasksV3(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
126194
if err != nil {
127195
errorChannel <- err
128196
}
@@ -133,7 +201,29 @@ func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemana
133201
return errorChannel, nil
134202
}
135203

136-
func subscribeToNewTasks(
204+
func subscribeToNewTasksV2(
205+
serviceManager *servicemanager.ContractAlignedLayerServiceManager,
206+
newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2,
207+
logger sdklogging.Logger,
208+
) (event.Subscription, error) {
209+
for i := 0; i < MaxRetries; i++ {
210+
sub, err := serviceManager.WatchNewBatchV2(
211+
&bind.WatchOpts{}, newTaskCreatedChan, nil,
212+
)
213+
if err != nil {
214+
logger.Warn("Failed to subscribe to new AlignedLayer tasks", "err", err)
215+
time.Sleep(RetryInterval)
216+
continue
217+
}
218+
219+
logger.Info("Subscribed to new AlignedLayer tasks")
220+
return sub, nil
221+
}
222+
223+
return nil, fmt.Errorf("failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
224+
}
225+
226+
func subscribeToNewTasksV3(
137227
serviceManager *servicemanager.ContractAlignedLayerServiceManager,
138228
newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3,
139229
logger sdklogging.Logger,
@@ -155,7 +245,7 @@ func subscribeToNewTasks(
155245
return nil, fmt.Errorf("failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
156246
}
157247

158-
func (s *AvsSubscriber) processNewBatch(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) {
248+
func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {
159249
newBatchMutex.Lock()
160250
defer newBatchMutex.Unlock()
161251

@@ -181,10 +271,100 @@ func (s *AvsSubscriber) processNewBatch(batch *servicemanager.ContractAlignedLay
181271
}
182272
}
183273

274+
func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) {
275+
newBatchMutex.Lock()
276+
defer newBatchMutex.Unlock()
277+
278+
batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
279+
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
280+
281+
if _, ok := batchesSet[batchIdentifierHash]; !ok {
282+
s.logger.Info("Received new task",
283+
"batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
284+
"senderAddress", hex.EncodeToString(batch.SenderAddress[:]),
285+
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]),)
286+
287+
batchesSet[batchIdentifierHash] = struct{}{}
288+
newTaskCreatedChan <- batch
289+
290+
// Remove the batch from the set after RemoveBatchFromSetInterval time
291+
go func() {
292+
time.Sleep(RemoveBatchFromSetInterval)
293+
newBatchMutex.Lock()
294+
delete(batchesSet, batchIdentifierHash)
295+
newBatchMutex.Unlock()
296+
}()
297+
}
298+
}
299+
300+
// getLatestTaskFromEthereum queries the blockchain for the latest task using the FilterLogs method.
301+
// The alternative to this is using the FilterNewBatch method from the contract's filterer, but it requires
302+
// to iterate over all the logs, which is not efficient and not needed since we only need the latest task.
303+
func (s *AvsSubscriber) getLatestTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {
304+
latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(context.Background())
305+
if err != nil {
306+
latestBlock, err = s.AvsContractBindings.ethClientFallback.BlockNumber(context.Background())
307+
if err != nil {
308+
return nil, fmt.Errorf("failed to get latest block number: %w", err)
309+
}
310+
}
311+
312+
var fromBlock uint64
313+
314+
if latestBlock < BlockInterval {
315+
fromBlock = 0
316+
} else {
317+
fromBlock = latestBlock - BlockInterval
318+
}
319+
320+
alignedLayerServiceManagerABI, err := abi.JSON(strings.NewReader(servicemanager.ContractAlignedLayerServiceManagerMetaData.ABI))
321+
if err != nil {
322+
return nil, fmt.Errorf("failed to parse ABI: %w", err)
323+
}
324+
325+
// We just care about the NewBatch event
326+
newBatchEvent := alignedLayerServiceManagerABI.Events["NewBatchV2"]
327+
if newBatchEvent.ID == (ethcommon.Hash{}) {
328+
return nil, fmt.Errorf("NewBatch event not found in ABI")
329+
}
330+
331+
query := ethereum.FilterQuery{
332+
FromBlock: big.NewInt(int64(fromBlock)),
333+
ToBlock: big.NewInt(int64(latestBlock)),
334+
Addresses: []ethcommon.Address{s.AlignedLayerServiceManagerAddr},
335+
Topics: [][]ethcommon.Hash{{newBatchEvent.ID, {}}},
336+
}
337+
338+
logs, err := s.AvsContractBindings.ethClient.FilterLogs(context.Background(), query)
339+
if err != nil {
340+
logs, err = s.AvsContractBindings.ethClientFallback.FilterLogs(context.Background(), query)
341+
if err != nil {
342+
return nil, fmt.Errorf("failed to get logs: %w", err)
343+
}
344+
}
345+
346+
if len(logs) == 0 {
347+
return nil, fmt.Errorf("no logs found")
348+
}
349+
350+
lastLog := logs[len(logs)-1]
351+
352+
var latestTask servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
353+
err = alignedLayerServiceManagerABI.UnpackIntoInterface(&latestTask, "NewBatchV2", lastLog.Data)
354+
if err != nil {
355+
return nil, fmt.Errorf("failed to unpack log data: %w", err)
356+
}
357+
358+
// The second topic is the batch merkle root, as it is an indexed variable in the contract
359+
latestTask.BatchMerkleRoot = lastLog.Topics[1]
360+
361+
return &latestTask, nil
362+
}
363+
184364
// getLatestTaskFromEthereum queries the blockchain for the latest task using the FilterLogs method.
185365
// The alternative to this is using the FilterNewBatch method from the contract's filterer, but it requires
186366
// to iterate over all the logs, which is not efficient and not needed since we only need the latest task.
187-
func (s *AvsSubscriber) getLatestTaskFromEthereum() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
367+
func (s *AvsSubscriber) getLatestTaskFromEthereumV3() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
188368
latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(context.Background())
189369
if err != nil {
190370
latestBlock, err = s.AvsContractBindings.ethClientFallback.BlockNumber(context.Background())
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
lib/libmerkle_tree.a
2+
lib/libmerkle_tree.dylib

0 commit comments

Comments
 (0)