refactor: migrate to blob api and remove da interface (approach 2) #2893
base: main
Conversation
- Introduced a new `fetchBlobs` function to streamline the process of retrieving blobs from the DA client.
- Replaced direct calls to `client.DA.GetIDs` and `client.DA.Get` with the new `fetchBlobs` function to handle errors more gracefully.
- Added custom error types for better clarity when blobs are not found or when a height is in the future.
- Updated blob processing logic to work with the new batch structure, improving efficiency and readability.
- Modified tests to reflect changes in ID creation and splitting, ensuring compatibility with the new blob handling logic.
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
Claude finished @randygrok's task.

# Code Review Complete

I've completed a comprehensive review of this large refactoring PR, which migrates from the DA interface to the Blob API. This is a significant architectural change that removes ~5000 lines while adding ~2200 new lines across 81 files.

## Summary

This PR successfully refactors the codebase to use a streamlined Blob API instead of the previous DA interface. The migration centralizes blob operations.

**Strengths:**

**Areas of Concern:**
## Critical Issues

### 1. Concurrency Safety in da_submitter.go (block/internal/submitting/da_submitter.go:534-581)

```go
for i, item := range items {
	go func(idx int, itm T) {
		sem <- struct{}{}
		defer func() { <-sem }()
		// ...
	}(i, item)
}
```

**Issue:** If the context is canceled or an error occurs, goroutines may not complete before the function returns, causing:
**Recommendation:** Use a proper worker pool pattern with context cancellation, or wait for all goroutines to complete even on error.

### 2. Missing Input Validation (block/internal/da/client.go:78-114)

The `Submit` method iterates over `data` without validating it:

```go
func (c *client) Submit(ctx context.Context, data [][]byte, namespace []byte, options []byte) datypes.ResultSubmit {
	var blobSize uint64
	for _, b := range data {
		blobSize += uint64(len(b)) // No check for nil or empty
	}
```

**Issues:**
**Recommendation:**

```go
if data == nil {
	return datypes.ResultSubmit{
		BaseResult: datypes.BaseResult{
			Code:    datypes.StatusError,
			Message: "data cannot be nil",
		},
	}
}
```

### 3. Error String Matching is Fragile (block/internal/da/client.go:131-144)

Error classification uses substring matching:

```go
switch {
case strings.Contains(err.Error(), datypes.ErrTxTimedOut.Error()):
	code = datypes.StatusNotIncludedInBlock
case strings.Contains(err.Error(), datypes.ErrTxAlreadyInMempool.Error()):
	code = datypes.StatusAlreadyInMempool
```

**Issues:**
**Recommendation:** Use `errors.Is` with wrapped sentinel errors instead of comparing error strings.

### 4. Unbounded Memory Growth (da/cmd/local-da/local.go:20-44)

```go
type LocalDA struct {
	mu         *sync.RWMutex
	blobs      map[uint64][]*blob.Blob
	timestamps map[uint64]time.Time
	// ...
}
```

**Issues:**
**Recommendation:** Add configurable pruning (e.g., keep the last N heights or blobs from the last X hours).

### 5. Potential Panic in Block Submission (block/internal/submitting/da_submitter.go:433-444)

```go
switch res.Code {
case datypes.StatusSuccess:
	submitted := items[:res.SubmittedCount] // Potential panic if SubmittedCount > len(items)
```

**Issue:** If the DA layer returns a `SubmittedCount` greater than `len(items)`, the slice expression panics.

**Recommendation:** Add bounds checking:

```go
if res.SubmittedCount > uint64(len(items)) {
	s.logger.Error().Msg("DA returned invalid submitted count")
	return fmt.Errorf("invalid submitted count: %d > %d", res.SubmittedCount, len(items))
}
```

## Security Concerns

### 1. Missing Namespace Validation (pkg/namespace/namespace.go:46-54)
```go
func NewNamespaceV0(data []byte) (*Namespace, error) {
	if len(data) > NamespaceVersionZeroDataSize {
		return nil, fmt.Errorf("data too long...")
	}
	ns := &Namespace{Version: NamespaceVersionZero}
	copy(ns.ID[NamespaceVersionZeroPrefixSize:], data) // Silently zero-padded if data is short
	return ns, nil
}
```

**Issue:** Short input data is silently accepted and zero-padded, which could lead to namespace collisions.

**Recommendation:** Explicitly document this behavior or require an exact length match.

### 2. JSON Unmarshalling Without Size Limits (block/internal/da/client.go:116-126)

```go
var submitOpts blob.SubmitOptions
if len(options) > 0 {
	if err := json.Unmarshal(options, &submitOpts); err != nil {
		// ...
	}
}
```
**Issue:** No maximum size check on `options` before unmarshalling.

**Recommendation:** Add size validation (e.g., max 1KB for options).

### 3. Address Selector Security (block/internal/submitting/da_submitter.go:317-353)

```go
optionsMap["signer_address"] = signingAddress // No validation
```

**Issue:** Invalid addresses could be silently accepted, leading to failed transactions or potential security issues.

**Recommendation:** Validate the address format before merging.

## Performance & Resource Management

### 1. Inefficient Blob Cloning (da/cmd/local-da/local.go:147-154)

```go
func cloneBlob(b *blob.Blob) *blob.Blob {
	if b == nil {
		return nil
	}
	// Creates new blob with deep copy
```

Every read returns a deep copy.

**Recommendation:** Consider returning read-only views or implementing copy-on-write semantics.

### 2. Context Timeout Management (block/internal/da/client.go:202-203)

```go
getIDsCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
defer cancel()
```

**Issue:** If … **Recommendation:** Use …

### 3. Concurrent Marshaling (block/internal/submitting/da_submitter.go:548)

Semaphore limits to 32 workers but doesn't consider actual resource usage:

```go
sem := make(chan struct{}, 32)
```

**Issue:** For very large marshaling operations, 32 concurrent goroutines might consume excessive CPU/memory.

**Recommendation:** Make the worker count configurable or base it on `runtime.NumCPU()`.

## Error Handling & Resilience

### 1. Insufficient Context Cancellation Checks (block/internal/da/client.go:78-186)

```go
func (c *client) Submit(ctx context.Context, data [][]byte, ...) datypes.ResultSubmit {
	// No ctx.Err() check before processing
	for _, raw := range data {
		blobs[i], err = blob.NewBlobV0(ns, raw) // Expensive operation
```

**Recommendation:** Add `ctx.Err()` checks before and during expensive processing loops.

### 2. Inconsistent Error Wrapping (block/internal/syncing/da_retriever.go)

Some errors are wrapped, others aren't:

```go
return fmt.Errorf("DA retrieval failed: %s", res.Message)                // String formatting
return fmt.Errorf("%w: height from future", datypes.ErrHeightFromFuture) // Proper wrapping
```

**Recommendation:** Consistently use `%w` so callers can match errors with `errors.Is`.

### 3. Silent Failures (block/internal/syncing/da_retriever.go:154-164)

Invalid headers/data are silently discarded:

```go
if header := r.tryDecodeHeader(bz, daHeight); header != nil {
	// ...
} else {
	// Silently continue - no logging of decode failure
}
```

**Recommendation:** Add debug logging for decode failures to aid troubleshooting.

## Code Quality & Architecture

### 1. Duplicate Namespace Conversion Logic

Namespace conversion appears in multiple places with slight variations:
**Recommendation:** Consolidate into shared utility functions.

### 2. Missing Documentation (pkg/blob/blob.go)

Critical functions lack detailed documentation:

```go
// MakeID constructs a blob ID by prefixing the commitment with the height (little endian).
func MakeID(height uint64, commitment Commitment) []byte {
```

**Issue:** Doesn't explain the ID format, when to use it, or how it's used across the system.

**Recommendation:** Add comprehensive package-level documentation explaining the blob lifecycle and ID scheme.

### 3. Magic Numbers (block/internal/submitting/da_submitter.go)

```go
submissionTimeout = 60 * time.Second
initialBackoff    = 100 * time.Millisecond
sem := make(chan struct{}, 32) // Why 32?
```

**Recommendation:** Document the rationale or make these configurable.

## Testing Concerns

### 1. Test Coverage Regression

Coverage dropped 6.84% (64.81% → 57.97%). Key areas with low coverage:
**Recommendation:** Add integration tests covering the full blob submission and retrieval flow.

### 2. Missing Negative Test Cases

Many tests only cover happy paths. Missing tests for:

### 3. Mock Overuse (block/internal/submitting/da_submitter_retry_test.go)

Tests use mocks extensively, which may not catch real integration issues:

```go
api.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("*blob.SubmitOptions"))
```

**Recommendation:** Balance unit tests with integration tests using real blob API implementations.

## Response to Reviewer's Question

@tac0turtle's question (da/jsonrpc/client.go:16): "are we able to use subscribe here?"

**Answer:** The current implementation uses synchronous RPC calls for blob operations (Submit, GetAll, GetProof, Included). Adding subscription support would require:

**Recommendation:** Unless there's a specific performance issue with polling or a need for real-time notifications, the current synchronous approach is simpler and more reliable. If subscriptions are added to celestia-node's blob RPC, we could add them in a future PR with proper connection lifecycle management.

## Minor Issues

### 1. Unused Field (da/jsonrpc/client.go:17-18)

```go
Logger      zerolog.Logger // Not used in Client struct
MaxBlobSize uint64         // Not validated in client methods
```

### 2. Inconsistent Error Messages
### 3. HTTP Server Security (da/jsonrpc/server.go:73-76)

```go
srv: &http.Server{
	Addr:              address + ":" + port,
	ReadHeaderTimeout: 2 * time.Second, // Good!
	// Missing: ReadTimeout, WriteTimeout, IdleTimeout, MaxHeaderBytes
}
```

**Recommendation:** Add comprehensive timeouts to prevent resource exhaustion attacks.

### 4. Nolint Comment Without Explanation (da/jsonrpc/server.go:105)

```go
//nolint:errcheck
go s.srv.Serve(listener)
```

**Recommendation:** Document why the error can be safely ignored.

## Positive Observations
## Recommendations Summary

**High Priority:**

**Medium Priority:**

**Low Priority:**

**Overall Assessment:** This is a solid refactoring that simplifies the architecture and improves code clarity. The main concerns are around production robustness (concurrency safety, input validation, error handling) and test coverage regression. With the critical issues addressed, this would be ready to merge.
Full nodes syncing from DA were not persisting the DAIncludedHeightKey metadata, causing the E2E test to fail when querying this value. The submitter (sequencer) already persists this key when verifying DA inclusion. This change adds the same logic to the syncer so full nodes also track which blocks have been DA included.
The LocalDA's GetAll method was missing the height-from-future check that existed in the old GetIDs method. This caused the syncer to iterate indefinitely instead of backing off when caught up with DA. Also simplified IsHeightDAIncluded by removing unused variable.
This reverts commit 3c0a15c.
From the diff in da/jsonrpc/client.go:

```go
// API defines the jsonrpc service module API
// API exposes the blob RPC methods used by the node.
type API struct {
```
are we able to use subscribe here?
Codecov Report

❌ Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #2893      +/-   ##
==========================================
- Coverage   64.81%   57.88%   -6.94%
==========================================
  Files          81       81
  Lines        7347     7303      -44
==========================================
- Hits         4762     4227     -535
- Misses       2043     2558     +515
+ Partials      542      518      -24
```

Flags with carried forward coverage won't be shown.
Fix broken markdown links by correcting the relative path depth from ../../../ to ../../ for linking to execution/grpc and sequencers/single READMEs.
Overview