Skip to content

Commit c364b17

Browse files
authored
fix(pkg/sync): fix block sync p2p for follower nodes (#2725)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview There was an issue with the block sync p2p where follower nodes were falling back on DA sync and never catching up to the aggregator. The root cause was that the sync service had started subscribing **before** the service was initialized in `initFromP2PWithRetry`. This PR separates the construction of the components from the starting of the service, so that the subscriber is started only after initialization has completed. Here is a snippet of test output from the p2p test using a commit with this fix ``` testsuite_test.go:215: Waiting for first follower to sync... testsuite_test.go:347: Follower 1: height=23, aggregator=24, delta=1 testsuite_test.go:355: All 1 follower nodes are now in sync (within 5 blocks of aggregator) testsuite_test.go:64: Evolve chain started liveness_test.go:57: Testing block production... liveness_test.go:61: Testing transaction submission and query... 
logger.go:146: 2025-10-02T13:39:26.869+0100 INFO Exec {"validator": true, "i": 0, "chain_id": "evolve-test", "test": "TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA", "image": "evolve-gm:latest", "test_name": "TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA", "command": "gmd keys add bob-gewnuv --coin-type 118 --keyring-backend test --home /var/cosmos-chain/evolve", "env,": [], "hostname": "TestDockerIntegrationTestSuite_._tLivenessWithCelestiaDA-sbyipj", "container": "TestDockerIntegrationTestSuite_TestLivenessWithCelestiaDA-sbyipj"} logger.go:146: 2025-10-02T13:39:27.261+0100 INFO Exec {"validator": true, "i": 0, "chain_id": "evolve-test", "test": "TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA", "image": "evolve-gm:latest", "test_name": "TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA", "command": "gmd keys show --address bob-gewnuv --home /var/cosmos-chain/evolve --keyring-backend test", "env,": [], "hostname": "TestDockerIntegrationTestSuite_._tLivenessWithCelestiaDA-awrotn", "container": "TestDockerIntegrationTestSuite_TestLivenessWithCelestiaDA-awrotn"} logger.go:146: 2025-10-02T13:39:32.715+0100 INFO broadcasted message {"wallet_address": "gm120wyqka8x8a6l2r72wdyjmhcn7w7740e963cpm", "message_types": ["MsgSend"], "tx_hash": "CE14E712CEB4C9D9382094146BA3C02C08F9273603767F0EF0FDB067E189C567"} logger.go:146: 2025-10-02T13:39:37.786+0100 INFO Exec {"validator": true, "i": 0, "chain_id": "evolve-test", "test": "TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA", "image": "evolve-gm:latest", "test_name": "TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA", "command": "gmd keys add carol --coin-type 118 --keyring-backend test --home /var/cosmos-chain/evolve", "env,": [], "hostname": "TestDockerIntegrationTestSuite_._tLivenessWithCelestiaDA-cerorr", "container": "TestDockerIntegrationTestSuite_TestLivenessWithCelestiaDA-cerorr"} logger.go:146: 2025-10-02T13:39:38.198+0100 INFO Exec {"validator": true, "i": 0, 
"chain_id": "evolve-test", "test": "TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA", "image": "evolve-gm:latest", "test_name": "TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA", "command": "gmd keys show --address carol --home /var/cosmos-chain/evolve --keyring-backend test", "env,": [], "hostname": "TestDockerIntegrationTestSuite_._tLivenessWithCelestiaDA-rhipbd", "container": "TestDockerIntegrationTestSuite_TestLivenessWithCelestiaDA-rhipbd"} liveness_test.go:78: Querying Bob's initial balance... liveness_test.go:85: Sending 100stake from Bob to Carol... logger.go:146: 2025-10-02T13:39:43.624+0100 INFO broadcasted message {"wallet_address": "gm1lve80wgtljp3rcqlfn8s8dv2wz8z6k9jttv3dd", "message_types": ["MsgSend"], "tx_hash": "0C76963C168E1BF24388CC1ACDBDD649978DFC3CE4E56F9102A6235AB31D8653"} liveness_test.go:64: Test completed successfully setup.go:188: Pruned 4 volumes, reclaiming approximately 103.3 MB setup.go:223: Pruned unused networks: [tastora-jvkkbvke] --- PASS: TestDockerIntegrationTestSuite (101.28s) --- PASS: TestDockerIntegrationTestSuite/TestLivenessWithCelestiaDA (101.28s) PASS ok github.com/evstack/ev-abci/tests/integration 101.849s ``` Previously the delta was always staying at ~10-15 and the nodes would never catch up. With this change we get `testsuite_test.go:347: Follower 1: height=23, aggregator=24, delta=1` <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent c045daf commit c364b17

File tree

1 file changed

+41
-13
lines changed

1 file changed

+41
-13
lines changed

pkg/sync/sync_service.go

Lines changed: 41 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -169,11 +169,13 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context,
169169

170170
// Start is a part of Service interface.
171171
func (syncService *SyncService[H]) Start(ctx context.Context) error {
172-
peerIDs, err := syncService.setupP2P(ctx)
172+
// setup P2P infrastructure, but don't start Subscriber yet.
173+
peerIDs, err := syncService.setupP2PInfrastructure(ctx)
173174
if err != nil {
174-
return fmt.Errorf("failed to setup syncer P2P: %w", err)
175+
return fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err)
175176
}
176177

178+
// create syncer, must be before initFromP2PWithRetry which calls startSyncer.
177179
if syncService.syncer, err = newSyncer(
178180
syncService.ex,
179181
syncService.store,
@@ -183,9 +185,21 @@ func (syncService *SyncService[H]) Start(ctx context.Context) error {
183185
return fmt.Errorf("failed to create syncer: %w", err)
184186
}
185187

186-
// Initialize from P2P, blocking until syncing can actually start for follower nodes.
188+
// initialize stores from P2P (blocking until genesis is fetched for followers)
187189
// Aggregators (no peers configured) return immediately and initialize on first produced block.
188-
return syncService.initFromP2PWithRetry(ctx, peerIDs)
190+
if err := syncService.initFromP2PWithRetry(ctx, peerIDs); err != nil {
191+
return fmt.Errorf("failed to initialize stores from P2P: %w", err)
192+
}
193+
194+
// start the subscriber, stores are guaranteed to have genesis for followers.
195+
//
196+
// NOTE: we must start the subscriber after the syncer is initialized in initFromP2PWithRetry to ensure p2p syncing
197+
// works correctly.
198+
if err := syncService.startSubscriber(ctx); err != nil {
199+
return fmt.Errorf("failed to start subscriber: %w", err)
200+
}
201+
202+
return nil
189203
}
190204

191205
// startSyncer starts the SyncService's syncer
@@ -222,11 +236,13 @@ func (syncService *SyncService[H]) initStore(ctx context.Context, initial H) err
222236
return nil
223237
}
224238

225-
// setupP2P sets up the P2P configuration for the SyncService and starts the necessary components.
226-
// it returns IDs of peers in configuration (seeds) and available in the network.
227-
func (syncService *SyncService[H]) setupP2P(ctx context.Context) ([]peer.ID, error) {
239+
// setupP2PInfrastructure sets up the P2P infrastructure (Exchange, ExchangeServer, Store)
240+
// but does not start the Subscriber. Returns peer IDs for later use.
241+
func (syncService *SyncService[H]) setupP2PInfrastructure(ctx context.Context) ([]peer.ID, error) {
228242
ps := syncService.p2p.PubSub()
229243
var err error
244+
245+
// Create subscriber but DON'T start it yet
230246
syncService.sub, err = goheaderp2p.NewSubscriber[H](
231247
ps,
232248
pubsub.DefaultMsgIdFn,
@@ -237,15 +253,10 @@ func (syncService *SyncService[H]) setupP2P(ctx context.Context) ([]peer.ID, err
237253
return nil, err
238254
}
239255

240-
if err := syncService.sub.Start(ctx); err != nil {
241-
return nil, fmt.Errorf("error while starting subscriber: %w", err)
242-
}
243-
if syncService.topicSubscription, err = syncService.sub.Subscribe(); err != nil {
244-
return nil, fmt.Errorf("error while subscribing: %w", err)
245-
}
246256
if err := syncService.store.Start(ctx); err != nil {
247257
return nil, fmt.Errorf("error while starting store: %w", err)
248258
}
259+
249260
_, _, network, err := syncService.p2p.Info()
250261
if err != nil {
251262
return nil, fmt.Errorf("error while fetching the network: %w", err)
@@ -258,7 +269,9 @@ func (syncService *SyncService[H]) setupP2P(ctx context.Context) ([]peer.ID, err
258269
if err := syncService.p2pServer.Start(ctx); err != nil {
259270
return nil, fmt.Errorf("error while starting p2p server: %w", err)
260271
}
272+
261273
peerIDs := syncService.getPeerIDs()
274+
262275
if syncService.ex, err = newP2PExchange[H](syncService.p2p.Host(), peerIDs, networkID, syncService.genesis.ChainID, syncService.p2p.ConnectionGater()); err != nil {
263276
return nil, fmt.Errorf("error while creating exchange: %w", err)
264277
}
@@ -268,6 +281,21 @@ func (syncService *SyncService[H]) setupP2P(ctx context.Context) ([]peer.ID, err
268281
return peerIDs, nil
269282
}
270283

284+
// startSubscriber starts the Subscriber and subscribes to the P2P topic.
285+
// This should be called AFTER stores are initialized to ensure proper validation.
286+
func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error {
287+
if err := syncService.sub.Start(ctx); err != nil {
288+
return fmt.Errorf("error while starting subscriber: %w", err)
289+
}
290+
291+
var err error
292+
if syncService.topicSubscription, err = syncService.sub.Subscribe(); err != nil {
293+
return fmt.Errorf("error while subscribing: %w", err)
294+
}
295+
296+
return nil
297+
}
298+
271299
// initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism.
272300
// If trusted hash is available, it fetches the trusted header/block (by hash) from peers.
273301
// Otherwise, it tries to fetch the genesis header/block by height.

0 commit comments

Comments (0)