Skip to content

Commit c045daf

Browse files
authored
fix(types): add timeout per request in retrieve helper (#2726)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview Downloading many blobs would timeout after 10s. We should timeout per request and for all requests. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent 0679230 commit c045daf

File tree

3 files changed

+63
-9
lines changed

3 files changed

+63
-9
lines changed

block/internal/syncing/da_retriever.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import (
1919
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
2020
)
2121

22-
const dAFetcherTimeout = 10 * time.Second
22+
// defaultDATimeout is the default timeout for DA retrieval operations
23+
const defaultDATimeout = 10 * time.Second
2324

2425
// DARetriever handles DA retrieval operations for syncing
2526
type DARetriever struct {
@@ -64,9 +65,6 @@ func NewDARetriever(
6465
// RetrieveFromDA retrieves blocks from the specified DA height and returns height events
6566
func (r *DARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
6667
r.logger.Debug().Uint64("da_height", daHeight).Msg("retrieving from DA")
67-
ctx, cancel := context.WithTimeout(ctx, dAFetcherTimeout)
68-
defer cancel()
69-
7068
blobsResp, err := r.fetchBlobs(ctx, daHeight)
7169
if err != nil {
7270
return nil, err
@@ -84,14 +82,14 @@ func (r *DARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co
8482
// fetchBlobs retrieves blobs from the DA layer
8583
func (r *DARetriever) fetchBlobs(ctx context.Context, daHeight uint64) (coreda.ResultRetrieve, error) {
8684
// Retrieve from both namespaces
87-
headerRes := types.RetrieveWithHelpers(ctx, r.da, r.logger, daHeight, r.namespaceBz)
85+
headerRes := types.RetrieveWithHelpers(ctx, r.da, r.logger, daHeight, r.namespaceBz, defaultDATimeout)
8886

8987
// If namespaces are the same, return header result
9088
if bytes.Equal(r.namespaceBz, r.namespaceDataBz) {
9189
return headerRes, r.validateBlobResponse(headerRes, daHeight)
9290
}
9391

94-
dataRes := types.RetrieveWithHelpers(ctx, r.da, r.logger, daHeight, r.namespaceDataBz)
92+
dataRes := types.RetrieveWithHelpers(ctx, r.da, r.logger, daHeight, r.namespaceDataBz, defaultDATimeout)
9593

9694
// Validate responses
9795
headerErr := r.validateBlobResponse(headerRes, daHeight)

types/da.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,19 @@ func SubmitWithHelpers(
114114
// RetrieveWithHelpers performs blob retrieval using the underlying DA layer,
115115
// handling error mapping to produce a ResultRetrieve.
116116
// It mimics the logic previously found in da.DAClient.Retrieve.
117+
// requestTimeout defines the timeout for the each retrieval request.
117118
func RetrieveWithHelpers(
118119
ctx context.Context,
119120
da coreda.DA,
120121
logger zerolog.Logger,
121122
dataLayerHeight uint64,
122123
namespace []byte,
124+
requestTimeout time.Duration,
123125
) coreda.ResultRetrieve {
124126
// 1. Get IDs
125-
idsResult, err := da.GetIDs(ctx, dataLayerHeight, namespace)
127+
getIDsCtx, cancel := context.WithTimeout(ctx, requestTimeout)
128+
defer cancel()
129+
idsResult, err := da.GetIDs(getIDsCtx, dataLayerHeight, namespace)
126130
if err != nil {
127131
// Handle specific "not found" error
128132
if strings.Contains(err.Error(), coreda.ErrBlobNotFound.Error()) {
@@ -177,7 +181,9 @@ func RetrieveWithHelpers(
177181
for i := 0; i < len(idsResult.IDs); i += batchSize {
178182
end := min(i+batchSize, len(idsResult.IDs))
179183

180-
batchBlobs, err := da.Get(ctx, idsResult.IDs[i:end], namespace)
184+
getBlobsCtx, cancel := context.WithTimeout(ctx, requestTimeout)
185+
batchBlobs, err := da.Get(getBlobsCtx, idsResult.IDs[i:end], namespace)
186+
cancel()
181187
if err != nil {
182188
// Handle errors during Get
183189
logger.Error().Uint64("height", dataLayerHeight).Int("num_ids", len(idsResult.IDs)).Err(err).Msg("Retrieve helper: Failed to get blobs")

types/da_test.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func TestRetrieveWithHelpers(t *testing.T) {
229229
mockDA.On("Get", mock.Anything, tc.getIDsResult.IDs, mock.Anything).Return(mockBlobs, tc.getBlobsErr)
230230
}
231231

232-
result := types.RetrieveWithHelpers(context.Background(), mockDA, logger, dataLayerHeight, encodedNamespace.Bytes())
232+
result := types.RetrieveWithHelpers(context.Background(), mockDA, logger, dataLayerHeight, encodedNamespace.Bytes(), 5*time.Second)
233233

234234
assert.Equal(t, tc.expectedCode, result.Code)
235235
assert.Equal(t, tc.expectedHeight, result.Height)
@@ -246,3 +246,53 @@ func TestRetrieveWithHelpers(t *testing.T) {
246246
})
247247
}
248248
}
249+
250+
func TestRetrieveWithHelpers_Timeout(t *testing.T) {
251+
logger := zerolog.Nop()
252+
dataLayerHeight := uint64(100)
253+
encodedNamespace := coreda.NamespaceFromString("test-namespace")
254+
255+
t.Run("timeout during GetIDs", func(t *testing.T) {
256+
mockDA := mocks.NewMockDA(t)
257+
258+
// Mock GetIDs to block until context is cancelled
259+
mockDA.On("GetIDs", mock.Anything, dataLayerHeight, mock.Anything).Run(func(args mock.Arguments) {
260+
ctx := args.Get(0).(context.Context)
261+
<-ctx.Done() // Wait for context cancellation
262+
}).Return(nil, context.DeadlineExceeded)
263+
264+
// Use a very short timeout to ensure it triggers
265+
result := types.RetrieveWithHelpers(context.Background(), mockDA, logger, dataLayerHeight, encodedNamespace.Bytes(), 1*time.Millisecond)
266+
267+
assert.Equal(t, coreda.StatusError, result.Code)
268+
assert.Contains(t, result.Message, "failed to get IDs")
269+
assert.Contains(t, result.Message, "context deadline exceeded")
270+
mockDA.AssertExpectations(t)
271+
})
272+
273+
t.Run("timeout during Get", func(t *testing.T) {
274+
mockDA := mocks.NewMockDA(t)
275+
mockIDs := [][]byte{[]byte("id1")}
276+
mockTimestamp := time.Now()
277+
278+
// Mock GetIDs to succeed
279+
mockDA.On("GetIDs", mock.Anything, dataLayerHeight, mock.Anything).Return(&coreda.GetIDsResult{
280+
IDs: mockIDs,
281+
Timestamp: mockTimestamp,
282+
}, nil)
283+
284+
// Mock Get to block until context is cancelled
285+
mockDA.On("Get", mock.Anything, mockIDs, mock.Anything).Run(func(args mock.Arguments) {
286+
ctx := args.Get(0).(context.Context)
287+
<-ctx.Done() // Wait for context cancellation
288+
}).Return(nil, context.DeadlineExceeded)
289+
290+
// Use a very short timeout to ensure it triggers
291+
result := types.RetrieveWithHelpers(context.Background(), mockDA, logger, dataLayerHeight, encodedNamespace.Bytes(), 1*time.Millisecond)
292+
293+
assert.Equal(t, coreda.StatusError, result.Code)
294+
assert.Contains(t, result.Message, "failed to get blobs for batch")
295+
assert.Contains(t, result.Message, "context deadline exceeded")
296+
mockDA.AssertExpectations(t)
297+
})
298+
}

0 commit comments

Comments
 (0)