Skip to content

Commit 4f4f2c0

Browse files
authored
feat(syncer): fetch from p2p and da (#2712)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview #2712 (comment) Always fetch both p2p and DA in order to mark height fetched by DA da included. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent ec4a475 commit 4f4f2c0

File tree

10 files changed

+165
-173
lines changed

10 files changed

+165
-173
lines changed

block/internal/cache/generic_cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64) {
104104
c.daIncluded.Store(hash, daHeight)
105105
}
106106

107+
// removeDAIncluded removes the DA-included status of the hash
108+
func (c *Cache[T]) removeDAIncluded(hash string) {
109+
c.daIncluded.Delete(hash)
110+
}
111+
107112
const (
108113
itemsByHeightFilename = "items_by_height.gob"
109114
hashesFilename = "hashes.gob"

block/internal/cache/manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Manager interface {
4343
SetHeaderSeen(hash string)
4444
GetHeaderDAIncluded(hash string) (uint64, bool)
4545
SetHeaderDAIncluded(hash string, daHeight uint64)
46+
RemoveHeaderDAIncluded(hash string)
4647

4748
// Data operations
4849
IsDataSeen(hash string) bool
@@ -141,6 +142,10 @@ func (m *implementation) SetHeaderDAIncluded(hash string, daHeight uint64) {
141142
m.headerCache.setDAIncluded(hash, daHeight)
142143
}
143144

145+
func (m *implementation) RemoveHeaderDAIncluded(hash string) {
146+
m.headerCache.removeDAIncluded(hash)
147+
}
148+
144149
// Data operations
145150
func (m *implementation) IsDataSeen(hash string) bool {
146151
return m.dataCache.isSeen(hash)

block/internal/common/types.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,4 @@ type DAHeightEvent struct {
88
Data *types.Data
99
// DaHeight corresponds to the highest DA included height between the Header and Data.
1010
DaHeight uint64
11-
// HeaderDaIncludedHeight corresponds to the DA height at which the Header was included.
12-
HeaderDaIncludedHeight uint64
1311
}

block/internal/syncing/da_retriever.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ type DARetriever struct {
3535

3636
// transient cache, only full event need to be passed to the syncer
3737
// on restart, will be refetch as da height is updated by syncer
38-
pendingHeaders map[uint64]*types.SignedHeader
39-
pendingData map[uint64]*types.Data
40-
headerDAHeights map[uint64]uint64
38+
pendingHeaders map[uint64]*types.SignedHeader
39+
pendingData map[uint64]*types.Data
4140
}
4241

4342
// NewDARetriever creates a new DA retriever
@@ -59,7 +58,6 @@ func NewDARetriever(
5958
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
6059
pendingHeaders: make(map[uint64]*types.SignedHeader),
6160
pendingData: make(map[uint64]*types.Data),
62-
headerDAHeights: make(map[uint64]uint64), // Track DA height for each header
6361
}
6462
}
6563

@@ -166,7 +164,6 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
166164

167165
if header := r.tryDecodeHeader(bz, daHeight); header != nil {
168166
r.pendingHeaders[header.Height()] = header
169-
r.headerDAHeights[header.Height()] = daHeight
170167
continue
171168
}
172169

@@ -180,14 +177,12 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
180177
// Match headers with data and create events
181178
for height, header := range r.pendingHeaders {
182179
data := r.pendingData[height]
183-
includedHeight := r.headerDAHeights[height]
184180

185181
// Handle empty data case
186182
if data == nil {
187183
if r.isEmptyDataExpected(header) {
188184
data = r.createEmptyDataForHeader(ctx, header)
189185
delete(r.pendingHeaders, height)
190-
delete(r.headerDAHeights, height)
191186
} else {
192187
// keep header in pending headers until data lands
193188
r.logger.Debug().Uint64("height", height).Msg("header found but no matching data")
@@ -196,15 +191,13 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
196191
} else {
197192
delete(r.pendingHeaders, height)
198193
delete(r.pendingData, height)
199-
delete(r.headerDAHeights, height)
200194
}
201195

202196
// Create height event
203197
event := common.DAHeightEvent{
204-
Header: header,
205-
Data: data,
206-
DaHeight: daHeight,
207-
HeaderDaIncludedHeight: includedHeight,
198+
Header: header,
199+
Data: data,
200+
DaHeight: daHeight,
208201
}
209202

210203
events = append(events, event)
@@ -240,9 +233,17 @@ func (r *DARetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH
240233
return nil
241234
}
242235

243-
// note, we cannot mark the header as DA included
244-
// we haven't done any signature verification check here
245-
// signature verification happens with data.
236+
// Optimistically mark as DA included
237+
// This has to be done for all fetched DA headers prior to validation because P2P does not confirm
238+
// da inclusion. This is not an issue, as an invalid header will be rejected. There cannot be hash collisions.
239+
headerHash := header.Hash().String()
240+
r.cache.SetHeaderDAIncluded(headerHash, daHeight)
241+
242+
r.logger.Info().
243+
Str("header_hash", headerHash).
244+
Uint64("da_height", daHeight).
245+
Uint64("height", header.Height()).
246+
Msg("optimistically marked header as DA included")
246247

247248
return header
248249
}

block/internal/syncing/da_retriever_test.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,14 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
196196
assert.Equal(t, uint64(2), events[0].Header.Height())
197197
assert.Equal(t, uint64(2), events[0].Data.Height())
198198
assert.Equal(t, uint64(77), events[0].DaHeight)
199-
assert.Equal(t, uint64(77), events[0].HeaderDaIncludedHeight)
199+
200+
hHeight, ok := r.cache.GetHeaderDAIncluded(events[0].Header.Hash().String())
201+
assert.True(t, ok)
202+
assert.Equal(t, uint64(77), hHeight)
203+
204+
dHeight, ok := r.cache.GetDataDAIncluded(events[0].Data.DACommitment().String())
205+
assert.True(t, ok)
206+
assert.Equal(t, uint64(77), dHeight)
200207
}
201208

202209
func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {
@@ -217,6 +224,14 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {
217224
assert.Equal(t, uint64(3), events[0].Header.Height())
218225
assert.NotNil(t, events[0].Data)
219226
assert.Equal(t, uint64(88), events[0].DaHeight)
227+
228+
hHeight, ok := r.cache.GetHeaderDAIncluded(events[0].Header.Hash().String())
229+
assert.True(t, ok)
230+
assert.Equal(t, uint64(88), hHeight)
231+
232+
// empty data is not marked as data included (the submitter components does handle the empty data case)
233+
_, ok = r.cache.GetDataDAIncluded(events[0].Data.DACommitment().String())
234+
assert.False(t, ok)
220235
}
221236

222237
func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) {
@@ -346,8 +361,6 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {
346361

347362
// Verify header is stored in pending headers
348363
require.Contains(t, r.pendingHeaders, uint64(5), "header should be stored as pending")
349-
require.Contains(t, r.headerDAHeights, uint64(5), "header DA height should be tracked")
350-
assert.Equal(t, uint64(100), r.headerDAHeights[5])
351364

352365
// Process data from DA height 102
353366
events2 := r.processBlobs(context.Background(), [][]byte{dataBin}, 102)
@@ -357,12 +370,10 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {
357370
assert.Equal(t, uint64(5), event.Header.Height())
358371
assert.Equal(t, uint64(5), event.Data.Height())
359372
assert.Equal(t, uint64(102), event.DaHeight, "DaHeight should be the height where data was processed")
360-
assert.Equal(t, uint64(100), event.HeaderDaIncludedHeight, "HeaderDaIncludedHeight should be where header was included")
361373

362374
// Verify pending maps are cleared
363375
require.NotContains(t, r.pendingHeaders, uint64(5), "header should be removed from pending")
364376
require.NotContains(t, r.pendingData, uint64(5), "data should be removed from pending")
365-
require.NotContains(t, r.headerDAHeights, uint64(5), "header DA height should be removed")
366377
}
367378

368379
func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testing.T) {
@@ -393,9 +404,6 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
393404
require.Contains(t, r.pendingHeaders, uint64(3), "header 3 should be pending")
394405
require.Contains(t, r.pendingHeaders, uint64(4), "header 4 should be pending")
395406
require.Contains(t, r.pendingHeaders, uint64(5), "header 5 should be pending")
396-
assert.Equal(t, uint64(200), r.headerDAHeights[3])
397-
assert.Equal(t, uint64(200), r.headerDAHeights[4])
398-
assert.Equal(t, uint64(200), r.headerDAHeights[5])
399407

400408
// Process some data from DA height 203 - should create partial events
401409
events2 := r.processBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203)
@@ -410,13 +418,11 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
410418
assert.Equal(t, uint64(3), events2[0].Header.Height())
411419
assert.Equal(t, uint64(3), events2[0].Data.Height())
412420
assert.Equal(t, uint64(203), events2[0].DaHeight)
413-
assert.Equal(t, uint64(200), events2[0].HeaderDaIncludedHeight)
414421

415422
// Verify event for height 5
416423
assert.Equal(t, uint64(5), events2[1].Header.Height())
417424
assert.Equal(t, uint64(5), events2[1].Data.Height())
418425
assert.Equal(t, uint64(203), events2[1].DaHeight)
419-
assert.Equal(t, uint64(200), events2[1].HeaderDaIncludedHeight)
420426

421427
// Verify header 4 is still pending (no matching data yet)
422428
require.Contains(t, r.pendingHeaders, uint64(4), "header 4 should still be pending")
@@ -431,11 +437,9 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
431437
assert.Equal(t, uint64(4), events3[0].Header.Height())
432438
assert.Equal(t, uint64(4), events3[0].Data.Height())
433439
assert.Equal(t, uint64(205), events3[0].DaHeight)
434-
assert.Equal(t, uint64(200), events3[0].HeaderDaIncludedHeight)
435440

436441
// Verify all pending maps are now clear
437442
require.NotContains(t, r.pendingHeaders, uint64(4), "header 4 should be removed from pending")
438443
require.Len(t, r.pendingHeaders, 0, "all headers should be processed")
439444
require.Len(t, r.pendingData, 0, "all data should be processed")
440-
require.Len(t, r.headerDAHeights, 0, "all header DA heights should be cleared")
441445
}

block/internal/syncing/p2p_handler.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,9 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei
8686

8787
// Create height event
8888
event := common.DAHeightEvent{
89-
Header: header,
90-
Data: data,
91-
DaHeight: 0, // P2P events don't have DA height context
92-
HeaderDaIncludedHeight: 0, // P2P events don't have DA height context
89+
Header: header,
90+
Data: data,
91+
DaHeight: 0, // P2P events don't have DA height context
9392
}
9493

9594
events = append(events, event)
@@ -139,10 +138,9 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh
139138

140139
// Create height event
141140
event := common.DAHeightEvent{
142-
Header: header,
143-
Data: data,
144-
DaHeight: 0, // P2P events don't have DA height context
145-
HeaderDaIncludedHeight: 0, // P2P events don't have DA height context
141+
Header: header,
142+
Data: data,
143+
DaHeight: 0, // P2P events don't have DA height context
146144
}
147145

148146
events = append(events, event)

0 commit comments

Comments
 (0)