Skip to content

Commit efc1cdb

Browse files
authored
feat(compliance-chains): s3 polling + hash-store #NIT-4269 (#4234)
* feat(compliance-chains): s3 polling + hash-store # Conflicts: # arbnode/node.go * remove address-checker+add lru-cache * fix lint and add changelog * move restrictedaddr service to execution node * move restrictedaddr service to execution node 2 * apply suggestions and enhance s3 download to use memory buffer * Fix address-filter config ci * Remove un-needed md file * rename addressfilter and change copyright * update changelog * enhance PR, fix issues * update changelog and move service enable check mechanism * update changelog 2 * apply suggestions * apply suggestions 2 * apply suggestions 3 * fix lint * fix ci-test
1 parent 893facb commit efc1cdb

File tree

10 files changed

+1130
-0
lines changed

10 files changed

+1130
-0
lines changed

addressfilter/config.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2026, Offchain Labs, Inc.
2+
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
3+
4+
package addressfilter
5+
6+
import (
7+
"errors"
8+
"time"
9+
10+
"github.com/spf13/pflag"
11+
12+
"github.com/offchainlabs/nitro/util/s3syncer"
13+
)
14+
15+
// Config holds the settings for the address-filter service: whether it is
// enabled, where the hash list lives in S3, how often to poll for updates,
// and how large the address-lookup cache should be.
type Config struct {
	Enable       bool            `koanf:"enable"`        // turn the service on or off
	S3           s3syncer.Config `koanf:"s3"`            // S3 source of the hash list
	PollInterval time.Duration   `koanf:"poll-interval"` // how often to re-check S3 for updates
	CacheSize    int             `koanf:"cache-size"`    // LRU cache capacity for lookup results
}
21+
22+
// DefaultConfig is the baseline configuration: service disabled, 5-minute
// poll interval, a 10000-entry lookup cache, and the default S3 settings.
var DefaultConfig = Config{
	Enable:       false,
	PollInterval: 5 * time.Minute,
	CacheSize:    10000,
	S3:           s3syncer.DefaultS3Config,
}
28+
29+
// ConfigAddOptions registers the address-filter command-line flags under the
// given prefix (e.g. "<prefix>.enable") on the supplied flag set, with
// defaults taken from DefaultConfig.
func ConfigAddOptions(prefix string, f *pflag.FlagSet) {
	f.Bool(prefix+".enable", DefaultConfig.Enable, "enable restricted address synchronization service")
	s3syncer.ConfigAddOptions(prefix+".s3", f)
	f.Duration(prefix+".poll-interval", DefaultConfig.PollInterval, "interval between polling S3 for hash list updates")
	f.Int(prefix+".cache-size", DefaultConfig.CacheSize, "LRU cache size for address lookup results")
}
35+
36+
func (c *Config) Validate() error {
37+
if !c.Enable {
38+
return nil
39+
}
40+
41+
if err := c.S3.Validate(); err != nil {
42+
return err
43+
}
44+
45+
if c.PollInterval <= 0 {
46+
return errors.New("address-filter.poll-interval must be positive")
47+
}
48+
49+
if c.CacheSize <= 0 {
50+
return errors.New("address-filter.cache-size must be positive")
51+
}
52+
53+
return nil
54+
}

addressfilter/hash_store.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2026, Offchain Labs, Inc.
2+
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
3+
4+
package addressfilter
5+
6+
import (
7+
"crypto/sha256"
8+
"sync/atomic"
9+
"time"
10+
11+
"github.com/ethereum/go-ethereum/common"
12+
"github.com/ethereum/go-ethereum/common/lru"
13+
)
14+
15+
// hashData is one immutable snapshot of the restricted-hash list.
// Once built it is never mutated, so concurrent readers need no locking.
// The lookup cache lives inside the snapshot so that it is swapped
// atomically together with the hash data it memoizes.
type hashData struct {
	salt     []byte                           // salt prepended to the address bytes before hashing
	hashes   map[common.Hash]struct{}         // set of sha256(salt || address) values
	digest   string                           // digest (ETag) identifying this list version
	loadedAt time.Time                        // when this snapshot was stored
	cache    *lru.Cache[common.Address, bool] // LRU cache for address lookup results
}
25+
26+
// HashStore provides thread-safe access to restricted address hashes.
// It uses atomic.Pointer for lock-free reads during updates, implementing
// a double-buffering strategy where new data is prepared in the background
// and then atomically swapped in.
type HashStore struct {
	data      atomic.Pointer[hashData] // current immutable snapshot
	cacheSize int                      // capacity used for each snapshot's LRU cache
}
34+
35+
func NewHashStore(cacheSize int) *HashStore {
36+
h := &HashStore{
37+
cacheSize: cacheSize,
38+
}
39+
h.data.Store(&hashData{
40+
hashes: make(map[common.Hash]struct{}),
41+
cache: lru.NewCache[common.Address, bool](cacheSize),
42+
})
43+
return h
44+
}
45+
46+
// Store atomically swaps in a new hash list.
47+
// This is called after a new hash list has been downloaded and parsed.
48+
// A new LRU cache is created for the new data, ensuring atomic consistency.
49+
func (h *HashStore) Store(salt []byte, hashes []common.Hash, digest string) {
50+
newData := &hashData{
51+
salt: salt,
52+
hashes: make(map[common.Hash]struct{}, len(hashes)),
53+
digest: digest,
54+
loadedAt: time.Now(),
55+
cache: lru.NewCache[common.Address, bool](h.cacheSize),
56+
}
57+
for _, hash := range hashes {
58+
newData.hashes[hash] = struct{}{}
59+
}
60+
h.data.Store(newData) // Atomic pointer swap
61+
}
62+
63+
// IsRestricted checks if an address is in the restricted list.
64+
// Results are cached in the LRU cache for faster subsequent lookups.
65+
// This method is safe to call concurrently.
66+
func (h *HashStore) IsRestricted(addr common.Address) bool {
67+
data := h.data.Load() // Atomic load - no lock needed
68+
if len(data.salt) == 0 {
69+
return false // Not initialized
70+
}
71+
72+
// Check cache first (cache is per-data snapshot)
73+
if restricted, ok := data.cache.Get(addr); ok {
74+
return restricted
75+
}
76+
77+
saltedAddr := make([]byte, len(data.salt)+common.AddressLength)
78+
copy(saltedAddr, data.salt)
79+
copy(saltedAddr[len(data.salt):], addr.Bytes())
80+
saltedHash := sha256.Sum256(saltedAddr)
81+
82+
_, restricted := data.hashes[saltedHash]
83+
84+
// Cache the result
85+
data.cache.Add(addr, restricted)
86+
return restricted
87+
}
88+
89+
// Digest Return the digest of the current loaded hashstore.
90+
func (h *HashStore) Digest() string {
91+
return h.data.Load().digest
92+
}
93+
94+
func (h *HashStore) Size() int {
95+
return len(h.data.Load().hashes)
96+
}
97+
98+
func (h *HashStore) LoadedAt() time.Time {
99+
return h.data.Load().loadedAt
100+
}
101+
102+
// Salt returns a copy of the current salt.
103+
func (h *HashStore) Salt() []byte {
104+
data := h.data.Load()
105+
if len(data.salt) == 0 {
106+
return nil
107+
}
108+
salt := make([]byte, len(data.salt))
109+
copy(salt, data.salt)
110+
return salt
111+
}
112+
113+
// CacheLen returns the current number of entries in the LRU cache.
114+
func (h *HashStore) CacheLen() int {
115+
return h.data.Load().cache.Len()
116+
}

addressfilter/s3_sync.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2026, Offchain Labs, Inc.
2+
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
3+
4+
package addressfilter
5+
6+
import (
	"context"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"

	"github.com/offchainlabs/nitro/util/s3syncer"
)
17+
18+
// hashListPayload represents the JSON structure of the hash list file used
// for unmarshalling. The salt and every hash are hex-encoded strings.
type hashListPayload struct {
	Salt          string `json:"salt"`
	HashingScheme string `json:"hashing_scheme,omitempty"` // "Sha256" expected; other values only produce a warning
	AddressHashes []struct {
		Hash string `json:"hash"`
	} `json:"address_hashes"`
}
26+
27+
// S3SyncManager wires an s3syncer.Syncer to a HashStore: downloaded hash
// list payloads are parsed and atomically swapped into the store.
type S3SyncManager struct {
	Syncer    *s3syncer.Syncer // performs the S3 downloads / ETag checks
	hashStore *HashStore       // destination for parsed hash lists
}
31+
32+
func NewS3SyncManager(ctx context.Context, config *Config, hashStore *HashStore) (*S3SyncManager, error) {
33+
s := &S3SyncManager{
34+
hashStore: hashStore,
35+
}
36+
syncer, err := s3syncer.NewSyncer(
37+
ctx,
38+
&config.S3,
39+
s.handleHashListData,
40+
)
41+
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
s.Syncer = syncer
47+
return s, nil
48+
}
49+
50+
// handleHashListData parses the downloaded JSON data and loads it into the hashStore.
51+
func (s *S3SyncManager) handleHashListData(data []byte, digest string) error {
52+
salt, hashes, err := parseHashListJSON(data)
53+
if err != nil {
54+
return fmt.Errorf("failed to parse hash list: %w", err)
55+
}
56+
57+
s.hashStore.Store(salt, hashes, digest)
58+
log.Info("loaded restricted addr list", "hash_count", len(hashes), "etag", digest, "size_bytes", len(data))
59+
return nil
60+
}
61+
62+
// parseHashListJSON parses the JSON hash list file.
63+
// Expected format: {"salt": "hex...", "address_hashes": [{"hash": "hex1"}, {"hash": "hex2"}, ...]}
64+
func parseHashListJSON(data []byte) ([]byte, []common.Hash, error) {
65+
var payload hashListPayload
66+
if err := json.Unmarshal(data, &payload); err != nil {
67+
return nil, nil, fmt.Errorf("JSON unmarshal failed: %w", err)
68+
}
69+
70+
// Validate hashing scheme - warn if not Sha256 but continue for forward compatibility
71+
if payload.HashingScheme != "" && payload.HashingScheme != "Sha256" {
72+
log.Warn("unknown hashing scheme in address list, continuing with Sha256 assumption",
73+
"scheme", payload.HashingScheme)
74+
}
75+
76+
salt, err := hex.DecodeString(payload.Salt)
77+
if err != nil {
78+
return nil, nil, fmt.Errorf("invalid salt hex: %w", err)
79+
}
80+
if len(salt) == 0 {
81+
return nil, nil, fmt.Errorf("salt cannot be empty")
82+
}
83+
84+
hashes := make([]common.Hash, len(payload.AddressHashes))
85+
for i, h := range payload.AddressHashes {
86+
hashBytes, err := hex.DecodeString(h.Hash)
87+
if err != nil {
88+
return nil, nil, fmt.Errorf("invalid hash hex at index %d: %w", i, err)
89+
}
90+
if len(hashBytes) != 32 {
91+
return nil, nil, fmt.Errorf("invalid hash length at index %d: got %d, want 32", i, len(hashBytes))
92+
}
93+
copy(hashes[i][:], hashBytes)
94+
}
95+
96+
return salt, hashes, nil
97+
}

addressfilter/service.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2026, Offchain Labs, Inc.
2+
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
3+
4+
package addressfilter
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"time"
10+
11+
"github.com/ethereum/go-ethereum/log"
12+
13+
"github.com/offchainlabs/nitro/util/stopwaiter"
14+
)
15+
16+
// FilterService manages the restricted-address synchronization pipeline.
// It periodically polls S3 for hash list updates and maintains an in-memory
// copy for efficient address filtering.
type FilterService struct {
	stopwaiter.StopWaiter
	config    *Config       // validated service configuration
	hashStore *HashStore    // in-memory restricted-address hash set
	syncMgr   *S3SyncManager // downloads and parses hash lists from S3
}
25+
26+
// NewFilterService creates a new address-filteress service.
27+
// Returns nil if the service is not enabled in the configuration.
28+
func NewFilterService(ctx context.Context, config *Config) (*FilterService, error) {
29+
if !config.Enable {
30+
return nil, nil
31+
}
32+
33+
if err := config.Validate(); err != nil {
34+
return nil, fmt.Errorf("invalid config: %w", err)
35+
}
36+
37+
hashStore := NewHashStore(config.CacheSize)
38+
syncMgr, err := NewS3SyncManager(ctx, config, hashStore)
39+
if err != nil {
40+
return nil, fmt.Errorf("failed to create S3 syncer: %w", err)
41+
}
42+
43+
return &FilterService{
44+
config: config,
45+
hashStore: hashStore,
46+
syncMgr: syncMgr,
47+
}, nil
48+
}
49+
50+
// Initialize downloads the initial hash list from S3.
51+
// This method blocks until the hash list is successfully loaded.
52+
// If this fails, the node should not start.
53+
func (s *FilterService) Initialize(ctx context.Context) error {
54+
log.Info("initializing address-filter service, downloading initial hash list",
55+
"bucket", s.config.S3.Bucket,
56+
"key", s.config.S3.ObjectKey,
57+
)
58+
59+
// Force download (ignore ETag check for initial load)
60+
if err := s.syncMgr.Syncer.DownloadAndLoad(ctx); err != nil {
61+
return fmt.Errorf("failed to load initial hash list: %w", err)
62+
}
63+
64+
log.Info("address-filter service initialized",
65+
"hash_count", s.hashStore.Size(),
66+
"etag-digest", s.hashStore.Digest(),
67+
)
68+
return nil
69+
}
70+
71+
// Start begins the background polling goroutine.
72+
// This should be called after Initialize() succeeds.
73+
func (s *FilterService) Start(ctx context.Context) {
74+
s.StopWaiter.Start(ctx, s)
75+
76+
// Start periodic polling goroutine
77+
s.CallIteratively(func(ctx context.Context) time.Duration {
78+
if err := s.syncMgr.Syncer.CheckAndSync(ctx); err != nil {
79+
log.Error("failed to sync address-filter list", "err", err)
80+
}
81+
return s.config.PollInterval
82+
})
83+
84+
log.Info("address-filter service started",
85+
"poll_interval", s.config.PollInterval,
86+
)
87+
}
88+
89+
func (s *FilterService) GetHashCount() int {
90+
if !s.config.Enable {
91+
return 0
92+
}
93+
return s.hashStore.Size()
94+
}
95+
96+
// GetHashStoreDigest GetETag returns the S3 ETag Digest of the currently loaded hash list.
97+
func (s *FilterService) GetHashStoreDigest() string {
98+
if !s.config.Enable {
99+
return ""
100+
}
101+
return s.hashStore.Digest()
102+
}
103+
104+
func (s *FilterService) GetLoadedAt() time.Time {
105+
if !s.config.Enable {
106+
return time.Time{}
107+
}
108+
return s.hashStore.LoadedAt()
109+
}
110+
111+
func (s *FilterService) GetHashStore() *HashStore {
112+
if !s.config.Enable {
113+
return nil
114+
}
115+
return s.hashStore
116+
}

0 commit comments

Comments
 (0)