Commit ddaf146 (1 parent: 359c41b)

apply suggestions and enhance s3 download to use memory buffer

File tree

12 files changed: +681 −314 lines


address-filter/ARCHITECTURE.md

Lines changed: 148 additions & 0 deletions

@@ -0,0 +1,148 @@

# Restricted Address Filtering - Architecture

## Overview

The `addressfilter` package provides compliance-based address filtering for Nitro sequencers. It maintains a list of restricted address hashes (loaded from S3) and blocks transactions involving those addresses.
## Architecture Diagram

```mermaid
flowchart LR
    subgraph DataSync["Data Synchronization"]
        direction TB
        S3[("S3 Bucket")]
        S3Sync["S3Syncer"]
        S3 -->|"HeadObject<br/>(ETag check)"| S3Sync
        S3 -->|"GetObject<br/>(if changed)"| S3Sync
    end

    subgraph Storage["In-Memory Storage"]
        direction TB
        HashStore["HashStore<br/>atomic.Pointer"]
        LRU["LRU Cache<br/>(10k entries)"]
        HashStore --> LRU
    end

    subgraph TxProcessing["Transaction Processing"]
        direction TB
        UserTx(("User Tx"))
        ExecEngine["ExecutionEngine"]
        TxFilter["txfilter"]
        Sequencer["Sequencer"]

        UserTx --> ExecEngine
        ExecEngine --> TxFilter
        TxFilter -->|"allowed"| Sequencer
    end

    S3Sync -->|"atomic swap"| HashStore
    TxFilter -.->|"IsRestricted?"| HashStore

    style S3 fill:#f5f5f5,stroke:#424242,stroke-width:2px,color:#212121
    style S3Sync fill:#fff,stroke:#424242,stroke-width:1px,color:#212121
    style HashStore fill:#e3f2fd,stroke:#1565c0,stroke-width:2px,color:#0d47a1
    style LRU fill:#e3f2fd,stroke:#1565c0,stroke-width:1px,color:#0d47a1
    style UserTx fill:#fff,stroke:#424242,stroke-width:1px,color:#212121
    style ExecEngine fill:#f5f5f5,stroke:#424242,stroke-width:1px,color:#212121
    style TxFilter fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#bf360c
    style Sequencer fill:#f5f5f5,stroke:#424242,stroke-width:1px,color:#212121

    style DataSync fill:#fafafa,stroke:#9e9e9e,stroke-width:1px,color:#212121
    style Storage fill:#e3f2fd,stroke:#1565c0,stroke-width:1px,color:#0d47a1
    style TxProcessing fill:#fafafa,stroke:#9e9e9e,stroke-width:1px,color:#212121
```
## Data Flow

| Flow | Description |
|------|-------------|
| **Sync** | S3 → S3Syncer → HashStore (atomic swap on ETag change) |
| **Lookup** | txfilter → HashStore → LRU cache or `sha256(salt \|\| addr)` hash lookup |
| **Transaction** | User → ExecutionEngine → txfilter → Sequencer |

## Components

| Component | Role |
|-----------|------|
| **Service** | Orchestrates lifecycle: initialization, polling, shutdown |
| **S3Syncer** | Polls S3 via HeadObject, downloads on ETag change (multipart, 10 concurrent parts) |
| **HashStore** | Lock-free storage with `atomic.Pointer`, per-snapshot LRU cache (10k entries) |
| **txfilter** | Blocks transactions touching restricted addresses |
## Package Structure

```
address-filter/
├── config.go        # Configuration struct and validation
├── service.go       # Service lifecycle (Initialize, Start, Stop)
├── s3_sync.go       # S3 polling and concurrent download
├── hash_store.go    # Lock-free hash storage with LRU caching
└── service_test.go  # Unit and integration tests
```
## Configuration

| Option | Default | Description |
|--------|---------|-------------|
| `--restricted-addr.enable` | `false` | Enable the service |
| `--restricted-addr.s3-bucket` | - | S3 bucket name (required if enabled) |
| `--restricted-addr.s3-region` | - | AWS region (required if enabled) |
| `--restricted-addr.s3-object-key` | - | Path to hash list JSON (required if enabled) |
| `--restricted-addr.s3-access-key` | - | AWS access key (optional, uses default chain) |
| `--restricted-addr.s3-secret-key` | - | AWS secret key (optional) |
| `--restricted-addr.poll-interval` | `5m` | Interval between S3 ETag checks |
## S3 Hash List Format

```json
{
  "salt": "hex_encoded_salt",
  "address_hashes": [
    {"hash": "hex_encoded_32byte_sha256"},
    {"hash": "hex_encoded_32byte_sha256"}
  ]
}
```

**Hash computation:** `SHA256(salt || address_bytes)` - addresses are never stored in plaintext.
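A minimal Go sketch of this computation (standalone; the salt value and the zero address below are illustrative, not taken from any real list):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// restrictedHash mirrors the documented scheme: SHA256(salt || address_bytes).
func restrictedHash(salt []byte, addr [20]byte) [32]byte {
	buf := make([]byte, 0, len(salt)+len(addr))
	buf = append(buf, salt...)
	buf = append(buf, addr[:]...)
	return sha256.Sum256(buf)
}

func main() {
	salt, _ := hex.DecodeString("deadbeef") // illustrative salt
	var addr [20]byte                       // zero address, for illustration
	h := restrictedHash(salt, addr)
	fmt.Println(hex.EncodeToString(h[:]))
}
```

Because the salt is prepended before hashing, the same address produces different hashes under different lists, which prevents offline dictionary matching against a leaked hash file.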

## Service Lifecycle

1. **NewService** - Validates config, creates S3 client
2. **Initialize** - Blocking initial download (node won't start if this fails)
3. **Start** - Begins background polling goroutine
4. **StopAndWait** - Graceful shutdown

The service initializes **early** in the node startup sequence (before inbox tracker, transaction streamer, etc.) to ensure filtering is active before any transactions are processed.

## HashStore Design

- **Lock-free reads:** Uses `atomic.Pointer[hashData]` for concurrent access
- **Double-buffering:** New data prepared while old data still serves requests
- **Per-snapshot LRU:** Each atomic swap includes a fresh 10k-entry cache
- **Lookup methods:**
  - `IsRestricted(addr)` - Single address check
  - `IsAnyRestricted(addrs)` - True if any address restricted
  - `IsAllRestricted(addrs)` - True only if all addresses restricted
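A minimal sketch of the lock-free swap pattern (a simplified stand-in for `HashStore`, without the salt, digest, or LRU cache):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// snapshot is immutable once published, so readers never observe a
// partially built hash set.
type snapshot struct {
	hashes map[[32]byte]struct{}
}

// store publishes whole snapshots through an atomic pointer: lookups
// Load() the current snapshot, updates Store() a fresh one.
type store struct {
	data atomic.Pointer[snapshot]
}

func newStore() *store {
	s := &store{}
	s.data.Store(&snapshot{hashes: map[[32]byte]struct{}{}})
	return s
}

func (s *store) Contains(h [32]byte) bool {
	snap := s.data.Load() // lock-free read, no mutex
	_, ok := snap.hashes[h]
	return ok
}

// Swap builds the new map off to the side (double-buffering) while the old
// snapshot keeps serving reads, then publishes it in one atomic step.
func (s *store) Swap(hashes [][32]byte) {
	m := make(map[[32]byte]struct{}, len(hashes))
	for _, h := range hashes {
		m[h] = struct{}{}
	}
	s.data.Store(&snapshot{hashes: m})
}

func main() {
	s := newStore()
	h := [32]byte{1}
	fmt.Println(s.Contains(h)) // false before the swap
	s.Swap([][32]byte{h})
	fmt.Println(s.Contains(h)) // true after the swap
}
```

Bundling the cache into the same snapshot (as the real `hashData` does) means a swap can never pair a stale cache with fresh hashes.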
## Transaction Filtering Points

Transactions are blocked if any of these addresses are restricted:

| Operation | Addresses Checked |
|-----------|-------------------|
| Transfer | Sender, Recipient |
| CALL | Target contract |
| STATICCALL | Target contract |
| CREATE/CREATE2 | New contract address |
| SELFDESTRUCT | Beneficiary address |
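The checks in the table all reduce to "block if any touched address is restricted". A sketch of that reduction (the `Address` type and `shouldBlock` helper are illustrative, not the package's API):

```go
package main

import "fmt"

// Address is a stand-in for go-ethereum's common.Address.
type Address [20]byte

// shouldBlock mirrors the table above: the caller gathers every address a
// transaction touches (sender/recipient for transfers, CALL/STATICCALL
// targets, CREATE/CREATE2 result addresses, SELFDESTRUCT beneficiaries)
// and the transaction is rejected if any one of them is restricted.
func shouldBlock(touched []Address, isRestricted func(Address) bool) bool {
	for _, a := range touched {
		if isRestricted(a) {
			return true
		}
	}
	return false
}

func main() {
	var sender, recipient Address
	recipient[19] = 0x01
	denylist := map[Address]bool{recipient: true} // toy restricted set
	isRestricted := func(a Address) bool { return denylist[a] }

	fmt.Println(shouldBlock([]Address{sender, recipient}, isRestricted)) // true: recipient restricted
	fmt.Println(shouldBlock([]Address{sender}, isRestricted))            // false: nothing restricted
}
```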

## S3 Polling Strategy

1. **HeadObject** call to check ETag (lightweight, no data transfer)
2. If ETag unchanged, skip download
3. If ETag changed:
   - Download into an in-memory buffer (multipart: 100MB parts, 10 concurrent, 5 retries/part)
   - Parse and validate JSON
   - Atomic pointer swap into HashStore
4. Repeat after `poll-interval`
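
The steps above can be sketched as a single poll iteration (the `head`, `get`, and `apply` callbacks are stand-ins for the AWS SDK calls and the store swap, not the package's actual functions):

```go
package main

import "fmt"

// object is a toy stand-in for an S3 object: its ETag plus its contents.
type object struct {
	etag string
}

// pollOnce implements one iteration of the documented strategy: compare
// ETags via a cheap HEAD, and download + apply only when the ETag changed.
// It returns the ETag to remember for the next tick.
func pollOnce(lastETag string, head func() string, get func() object, apply func(object)) string {
	etag := head()
	if etag == lastETag {
		return lastETag // unchanged: skip the download entirely
	}
	obj := get()
	apply(obj) // parse, validate, then atomically swap into the store
	return obj.etag
}

func main() {
	remote := object{etag: "v1"}
	applied := 0
	head := func() string { return remote.etag }
	get := func() object { return remote }
	apply := func(object) { applied++ }

	etag := ""
	for i := 0; i < 3; i++ { // three poll ticks; a real loop sleeps poll-interval between them
		etag = pollOnce(etag, head, get, apply)
	}
	fmt.Println(applied) // 1: the list only changed before the first tick
}
```

Since HeadObject transfers no object data, an unchanged list costs one tiny request per `poll-interval` regardless of list size.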

address-filter/config.go

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@

// Copyright 2025, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package addressfilter

import (
	"errors"
	"time"

	"github.com/spf13/pflag"

	"github.com/offchainlabs/nitro/util/s3syncer"
)

type Config struct {
	Enable       bool            `koanf:"enable"`
	S3           s3syncer.Config `koanf:"s3"`
	PollInterval time.Duration   `koanf:"poll-interval"`
}

var DefaultConfig = Config{
	Enable:       false,
	PollInterval: 5 * time.Minute,
}

func ConfigAddOptions(prefix string, f *pflag.FlagSet) {
	f.Bool(prefix+".enable", DefaultConfig.Enable, "enable restricted address synchronization service")
	s3syncer.ConfigAddOptions(prefix+".s3", f)
	f.Duration(prefix+".poll-interval", DefaultConfig.PollInterval, "interval between polling S3 for hash list updates")
}

func (c *Config) Validate() error {
	if !c.Enable {
		return nil
	}

	if err := c.S3.Validate(); err != nil {
		return err
	}

	if c.PollInterval <= 0 {
		return errors.New("restricted-addr.poll-interval must be positive")
	}

	return nil
}
address-filter/hash_store.go

Lines changed: 5 additions & 57 deletions

@@ -1,7 +1,7 @@
 // Copyright 2025, Offchain Labs, Inc.
 // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
 
-package restrictedaddr
+package addressfilter
 
 import (
 	"crypto/sha256"
@@ -17,7 +17,7 @@ import (
 // The cache is included here so it gets swapped atomically with the hash data.
 type hashData struct {
 	salt     []byte
-	hashes   map[[32]byte]struct{}
+	hashes   map[common.Hash]struct{}
 	digest   string
 	loadedAt time.Time
 	cache    *lru.Cache[common.Address, bool] // LRU cache for address lookup results
@@ -43,7 +43,7 @@ func NewHashStoreWithCacheSize(cacheSize int) *HashStore {
 		cacheSize: cacheSize,
 	}
 	h.data.Store(&hashData{
-		hashes: make(map[[32]byte]struct{}),
+		hashes: make(map[common.Hash]struct{}),
 		cache:  lru.NewCache[common.Address, bool](cacheSize),
 	})
 	return h
@@ -52,10 +52,10 @@ func NewHashStoreWithCacheSize(cacheSize int) *HashStore {
 // Load atomically swaps in a new hash list.
 // This is called after a new hash list has been downloaded and parsed.
 // A new LRU cache is created for the new data, ensuring atomic consistency.
-func (h *HashStore) Load(salt []byte, hashes [][32]byte, digest string) {
+func (h *HashStore) Load(salt []byte, hashes []common.Hash, digest string) {
 	newData := &hashData{
 		salt:     salt,
-		hashes:   make(map[[32]byte]struct{}, len(hashes)),
+		hashes:   make(map[common.Hash]struct{}, len(hashes)),
 		digest:   digest,
 		loadedAt: time.Now(),
 		cache:    lru.NewCache[common.Address, bool](h.cacheSize),
@@ -88,58 +88,6 @@ func (h *HashStore) IsRestricted(addr common.Address) bool {
 	return restricted
 }
 
-// IsAllRestricted checks if all provided addresses are in the restricted list
-// from same hash-store snapshot. Results are cached in the LRU cache.
-func (h *HashStore) IsAllRestricted(addrs []common.Address) bool {
-	data := h.data.Load() // Atomic load - no lock needed
-	if len(data.salt) == 0 {
-		return false // Not initialized
-	}
-	for _, addr := range addrs {
-		// Check cache first (cache is per-data snapshot)
-		if restricted, ok := data.cache.Get(addr); ok {
-			if !restricted {
-				return false
-			}
-			continue
-		}
-
-		hash := sha256.Sum256(append(data.salt, addr.Bytes()...))
-		_, restricted := data.hashes[hash]
-		data.cache.Add(addr, restricted)
-		if !restricted {
-			return false
-		}
-	}
-	return true
-}
-
-// IsAnyRestricted checks if any of the provided addresses are in the restricted list
-// from same hash-store snapshot. Results are cached in the LRU cache.
-func (h *HashStore) IsAnyRestricted(addrs []common.Address) bool {
-	data := h.data.Load() // Atomic load - no lock needed
-	if len(data.salt) == 0 {
-		return false // Not initialized
-	}
-	for _, addr := range addrs {
-		// Check cache first (cache is per-data snapshot)
-		if restricted, ok := data.cache.Get(addr); ok {
-			if restricted {
-				return true
-			}
-			continue
-		}
-
-		hash := sha256.Sum256(append(data.salt, addr.Bytes()...))
-		_, restricted := data.hashes[hash]
-		data.cache.Add(addr, restricted)
-		if restricted {
-			return true
-		}
-	}
-	return false
-}
-
 // Digest Return the digest of the current loaded hashstore.
 func (h *HashStore) Digest() string {
 	return h.data.Load().digest

address-filter/s3_sync.go

Lines changed: 92 additions & 0 deletions

@@ -0,0 +1,92 @@

// Copyright 2025, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package addressfilter

import (
	"context"
	"encoding/hex"
	"encoding/json"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"

	"github.com/offchainlabs/nitro/util/s3syncer"
)

// hashListPayload represents the JSON structure of the hash list file used for unmarshalling.
type hashListPayload struct {
	Salt          string `json:"salt"`
	AddressHashes []struct {
		Hash string `json:"hash"`
	} `json:"address_hashes"`
}

type S3SyncManager struct {
	Syncer *s3syncer.Syncer
	store  *HashStore
}

func NewS3SyncManager(ctx context.Context, config *Config, store *HashStore) (*S3SyncManager, error) {
	s := &S3SyncManager{
		store: store,
	}
	syncer, err := s3syncer.NewSyncer(
		ctx,
		&config.S3,
		s.handleHashListData,
		// These are initial settings that can be tuned as needed.
		s3syncer.WithDownloadConfig(s3syncer.DownloadConfig{
			PartSizeMB:         100,
			Concurrency:        10,
			PartBodyMaxRetries: 5,
		}))
	if err != nil {
		return nil, err
	}

	s.Syncer = syncer
	return s, nil
}

// handleHashListData parses the downloaded JSON data and loads it into the store.
func (s *S3SyncManager) handleHashListData(data []byte, digest string) error {
	salt, hashes, err := parseHashListJSON(data)
	if err != nil {
		return fmt.Errorf("failed to parse hash list: %w", err)
	}

	s.store.Load(salt, hashes, digest)
	log.Info("loaded restricted addr list", "hash_count", len(hashes), "etag", digest, "size_bytes", len(data))
	return nil
}

// parseHashListJSON parses the JSON hash list file.
// Expected format: {"salt": "hex...", "address_hashes": [{"hash": "hex1"}, {"hash": "hex2"}, ...]}
func parseHashListJSON(data []byte) ([]byte, []common.Hash, error) {
	var payload hashListPayload
	if err := json.Unmarshal(data, &payload); err != nil {
		return nil, nil, fmt.Errorf("JSON unmarshal failed: %w", err)
	}

	salt, err := hex.DecodeString(payload.Salt)
	if err != nil {
		return nil, nil, fmt.Errorf("invalid salt hex: %w", err)
	}

	hashes := make([]common.Hash, len(payload.AddressHashes))
	for i, h := range payload.AddressHashes {
		hashBytes, err := hex.DecodeString(h.Hash)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid hash hex at index %d: %w", i, err)
		}
		if len(hashBytes) != 32 {
			return nil, nil, fmt.Errorf("invalid hash length at index %d: got %d, want 32", i, len(hashBytes))
		}
		copy(hashes[i][:], hashBytes)
	}

	return salt, hashes, nil
}

0 commit comments