Skip to content

Commit ecb72a9

Browse files
julienrbrt and alpe authored
fix(syncing): combine data and headers from different blobs (#2696)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ -->
## Overview
Bug found by @alpe. Combine headers and data that arrive in different blobs.
<!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
---------
Co-authored-by: Alex Peters <[email protected]>
1 parent fc65124 commit ecb72a9

File tree

3 files changed

+147
-23
lines changed

3 files changed

+147
-23
lines changed

block/internal/syncing/da_retriever.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ type DARetriever struct {
3636
// calculate namespaces bytes once and reuse them
3737
namespaceBz []byte
3838
namespaceDataBz []byte
39+
40+
// transient cache: only complete events (header and data) need to be passed to the syncer
41+
// on restart, pending entries will be refetched, as the DA height is updated by the syncer
42+
pendingHeaders map[uint64]*types.SignedHeader
43+
pendingData map[uint64]*types.Data
44+
headerDAHeights map[uint64]uint64
3945
}
4046

4147
// NewDARetriever creates a new DA retriever
@@ -55,6 +61,9 @@ func NewDARetriever(
5561
logger: logger.With().Str("component", "da_retriever").Logger(),
5662
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
5763
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
64+
pendingHeaders: make(map[uint64]*types.SignedHeader),
65+
pendingData: make(map[uint64]*types.Data),
66+
headerDAHeights: make(map[uint64]uint64), // Track DA height for each header
5867
}
5968
}
6069

@@ -168,49 +177,53 @@ func (r *DARetriever) validateBlobResponse(res coreda.ResultRetrieve, daHeight u
168177

169178
// processBlobs processes retrieved blobs to extract headers and data and returns height events
170179
func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
171-
headers := make(map[uint64]*types.SignedHeader)
172-
dataMap := make(map[uint64]*types.Data)
173-
headerDAHeights := make(map[uint64]uint64) // Track DA height for each header
174-
175180
// Decode all blobs
176181
for _, bz := range blobs {
177182
if len(bz) == 0 {
178183
continue
179184
}
180185

181186
if header := r.tryDecodeHeader(bz, daHeight); header != nil {
182-
headers[header.Height()] = header
183-
headerDAHeights[header.Height()] = daHeight
187+
r.pendingHeaders[header.Height()] = header
188+
r.headerDAHeights[header.Height()] = daHeight
184189
continue
185190
}
186191

187192
if data := r.tryDecodeData(bz, daHeight); data != nil {
188-
dataMap[data.Height()] = data
193+
r.pendingData[data.Height()] = data
189194
}
190195
}
191196

192197
var events []common.DAHeightEvent
193198

194199
// Match headers with data and create events
195-
for height, header := range headers {
196-
data := dataMap[height]
200+
for height, header := range r.pendingHeaders {
201+
data := r.pendingData[height]
202+
includedHeight := r.headerDAHeights[height]
197203

198204
// Handle empty data case
199205
if data == nil {
200206
if r.isEmptyDataExpected(header) {
201207
data = r.createEmptyDataForHeader(ctx, header)
208+
delete(r.pendingHeaders, height)
209+
delete(r.headerDAHeights, height)
202210
} else {
211+
// keep header in pending headers until data lands
203212
r.logger.Debug().Uint64("height", height).Msg("header found but no matching data")
204213
continue
205214
}
215+
} else {
216+
delete(r.pendingHeaders, height)
217+
delete(r.pendingData, height)
218+
delete(r.headerDAHeights, height)
206219
}
207220

208221
// Create height event
209222
event := common.DAHeightEvent{
210223
Header: header,
211224
Data: data,
212225
DaHeight: daHeight,
213-
HeaderDaIncludedHeight: headerDAHeights[height],
226+
HeaderDaIncludedHeight: includedHeight,
214227
}
215228

216229
events = append(events, event)

block/internal/syncing/da_retriever_test.go

Lines changed: 123 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ import (
2828
)
2929

3030
// makeSignedHeaderBytes builds a valid SignedHeader and returns its binary encoding and the object
31-
func makeSignedHeaderBytes(t *testing.T, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, appHash []byte) ([]byte, *types.SignedHeader) {
31+
func makeSignedHeaderBytes(t *testing.T, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, appHash []byte, dataHash []byte) ([]byte, *types.SignedHeader) {
3232
hdr := &types.SignedHeader{
3333
Header: types.Header{
3434
BaseHeader: types.BaseHeader{ChainID: chainID, Height: height, Time: uint64(time.Now().Add(time.Duration(height) * time.Second).UnixNano())},
3535
AppHash: appHash,
36+
DataHash: dataHash,
3637
ProposerAddress: proposer,
3738
},
3839
Signer: types.Signer{PubKey: pub, Address: proposer},
@@ -115,12 +116,8 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
115116

116117
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
117118

118-
// Build one header and one data blob at same height
119-
_, lastState := types.State{}, types.State{}
120-
_ = lastState // placeholder to keep parity with helper pattern
121-
122-
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil)
123-
dataBin, _ := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
119+
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
120+
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, data.Hash())
124121

125122
events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77)
126123
require.Len(t, events, 1)
@@ -141,7 +138,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {
141138
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
142139

143140
// Header with no data hash present should trigger empty data creation (per current logic)
144-
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil)
141+
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil)
145142

146143
events := r.processBlobs(context.Background(), [][]byte{hb}, 88)
147144
require.Len(t, events, 1)
@@ -160,7 +157,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) {
160157
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
161158
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
162159

163-
hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil)
160+
hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil)
164161
gotH := r.tryDecodeHeader(hb, 123)
165162
require.NotNil(t, gotH)
166163
assert.Equal(t, sh.Hash().String(), gotH.Hash().String())
@@ -225,8 +222,8 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) {
225222
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
226223

227224
// Prepare header/data blobs
228-
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil)
229-
dataBin, _ := makeSignedDataBytes(t, gen.ChainID, 9, addr, pub, signer, 1)
225+
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 9, addr, pub, signer, 1)
226+
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, data.Hash())
230227

231228
cfg := config.DefaultConfig()
232229
cfg.DA.Namespace = "nsHdr"
@@ -255,3 +252,118 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) {
255252
assert.Equal(t, uint64(9), events[0].Header.Height())
256253
assert.Equal(t, uint64(9), events[0].Data.Height())
257254
}
255+
256+
func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {
257+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
258+
st := store.New(ds)
259+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
260+
require.NoError(t, err)
261+
262+
addr, pub, signer := buildSyncTestSigner(t)
263+
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
264+
265+
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
266+
267+
// Create header and data for the same block height but from different DA heights
268+
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2)
269+
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, data.Hash())
270+
271+
// Process header from DA height 100 first
272+
events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100)
273+
require.Len(t, events1, 0, "should not create event yet - data is missing")
274+
275+
// Verify header is stored in pending headers
276+
require.Contains(t, r.pendingHeaders, uint64(5), "header should be stored as pending")
277+
require.Contains(t, r.headerDAHeights, uint64(5), "header DA height should be tracked")
278+
assert.Equal(t, uint64(100), r.headerDAHeights[5])
279+
280+
// Process data from DA height 102
281+
events2 := r.processBlobs(context.Background(), [][]byte{dataBin}, 102)
282+
require.Len(t, events2, 1, "should create event when matching data arrives")
283+
284+
event := events2[0]
285+
assert.Equal(t, uint64(5), event.Header.Height())
286+
assert.Equal(t, uint64(5), event.Data.Height())
287+
assert.Equal(t, uint64(102), event.DaHeight, "DaHeight should be the height where data was processed")
288+
assert.Equal(t, uint64(100), event.HeaderDaIncludedHeight, "HeaderDaIncludedHeight should be where header was included")
289+
290+
// Verify pending maps are cleared
291+
require.NotContains(t, r.pendingHeaders, uint64(5), "header should be removed from pending")
292+
require.NotContains(t, r.pendingData, uint64(5), "data should be removed from pending")
293+
require.NotContains(t, r.headerDAHeights, uint64(5), "header DA height should be removed")
294+
}
295+
296+
func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testing.T) {
297+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
298+
st := store.New(ds)
299+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
300+
require.NoError(t, err)
301+
302+
addr, pub, signer := buildSyncTestSigner(t)
303+
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
304+
305+
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
306+
307+
// Create multiple headers and data for different block heights
308+
data3Bin, data3 := makeSignedDataBytes(t, gen.ChainID, 3, addr, pub, signer, 1)
309+
data4Bin, data4 := makeSignedDataBytes(t, gen.ChainID, 4, addr, pub, signer, 2)
310+
data5Bin, data5 := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 1)
311+
312+
hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, data3.Hash())
313+
hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, data4.Hash())
314+
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, data5.Hash())
315+
316+
// Process multiple headers from DA height 200 - should be stored as pending
317+
events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200)
318+
require.Len(t, events1, 0, "should not create events yet - all data is missing")
319+
320+
// Verify all headers are stored in pending
321+
require.Contains(t, r.pendingHeaders, uint64(3), "header 3 should be pending")
322+
require.Contains(t, r.pendingHeaders, uint64(4), "header 4 should be pending")
323+
require.Contains(t, r.pendingHeaders, uint64(5), "header 5 should be pending")
324+
assert.Equal(t, uint64(200), r.headerDAHeights[3])
325+
assert.Equal(t, uint64(200), r.headerDAHeights[4])
326+
assert.Equal(t, uint64(200), r.headerDAHeights[5])
327+
328+
// Process some data from DA height 203 - should create events only for the heights whose data arrived
329+
events2 := r.processBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203)
330+
require.Len(t, events2, 2, "should create events for heights 3 and 5")
331+
332+
// Sort events by height for consistent testing
333+
if events2[0].Header.Height() > events2[1].Header.Height() {
334+
events2[0], events2[1] = events2[1], events2[0]
335+
}
336+
337+
// Verify event for height 3
338+
assert.Equal(t, uint64(3), events2[0].Header.Height())
339+
assert.Equal(t, uint64(3), events2[0].Data.Height())
340+
assert.Equal(t, uint64(203), events2[0].DaHeight)
341+
assert.Equal(t, uint64(200), events2[0].HeaderDaIncludedHeight)
342+
343+
// Verify event for height 5
344+
assert.Equal(t, uint64(5), events2[1].Header.Height())
345+
assert.Equal(t, uint64(5), events2[1].Data.Height())
346+
assert.Equal(t, uint64(203), events2[1].DaHeight)
347+
assert.Equal(t, uint64(200), events2[1].HeaderDaIncludedHeight)
348+
349+
// Verify header 4 is still pending (no matching data yet)
350+
require.Contains(t, r.pendingHeaders, uint64(4), "header 4 should still be pending")
351+
require.NotContains(t, r.pendingHeaders, uint64(3), "header 3 should be removed from pending")
352+
require.NotContains(t, r.pendingHeaders, uint64(5), "header 5 should be removed from pending")
353+
354+
// Process remaining data from DA height 205
355+
events3 := r.processBlobs(context.Background(), [][]byte{data4Bin}, 205)
356+
require.Len(t, events3, 1, "should create event for height 4")
357+
358+
// Verify final event for height 4
359+
assert.Equal(t, uint64(4), events3[0].Header.Height())
360+
assert.Equal(t, uint64(4), events3[0].Data.Height())
361+
assert.Equal(t, uint64(205), events3[0].DaHeight)
362+
assert.Equal(t, uint64(200), events3[0].HeaderDaIncludedHeight)
363+
364+
// Verify all pending maps are now clear
365+
require.NotContains(t, r.pendingHeaders, uint64(4), "header 4 should be removed from pending")
366+
require.Len(t, r.pendingHeaders, 0, "all headers should be processed")
367+
require.Len(t, r.pendingData, 0, "all data should be processed")
368+
require.Len(t, r.headerDAHeights, 0, "all header DA heights should be cleared")
369+
}

block/internal/syncing/syncer_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,7 @@ func TestSyncLoopPersistState(t *testing.T) {
188188
// with n da blobs fetched
189189
for i := range myFutureDAHeight - myDAHeightOffset {
190190
chainHeight, daHeight := i, i+myDAHeightOffset
191-
_, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, nil)
192-
//_, sigData := makeSignedDataBytes(t, gen.ChainID, chainHeight, addr, pub, signer, 1)
191+
_, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, nil, nil)
193192
emptyData := types.Data{
194193
Metadata: &types.Metadata{
195194
ChainID: sigHeader.ChainID(),

0 commit comments

Comments
 (0)