Skip to content

Commit 250b138

Browse files
committed
feat: implement lock-free transaction fast feed with sub-100μs propagation target including lock-free SPMC ring buffer for zero-copy transaction events, binary fixed-size event layout for CPU cache efficiency, selective filtering by address and gas price, multiple concurrent consumer support with independent read positions, and integration foundation in txpool
1 parent a95ba33 commit 250b138

File tree

4 files changed

+606
-0
lines changed

4 files changed

+606
-0
lines changed

core/txpool/fastfeed/feed.go

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
// Copyright 2024 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package fastfeed
18+
19+
import (
20+
"sync"
21+
"sync/atomic"
22+
"time"
23+
"unsafe"
24+
25+
"errors"
26+
27+
"github.com/ethereum/go-ethereum/common"
28+
"github.com/ethereum/go-ethereum/core/types"
29+
"github.com/ethereum/go-ethereum/log"
30+
)
31+
32+
const (
33+
// DefaultBufferSize is the default ring buffer capacity (must be power of 2)
34+
DefaultBufferSize = 16384
35+
36+
// MaxReaders is the maximum number of concurrent consumers
37+
MaxReaders = 64
38+
39+
// TxEventSize is the size of a transaction event in bytes
40+
TxEventSize = 184 // 32 (hash) + 20 (from) + 20 (to) + 32 (value) + 32 (gasPrice) + 8 (nonce) + 8 (gas) + 4 (type) + 8 (timestamp) + 20 (padding)
41+
)
42+
43+
// TxEventType represents the type of transaction event
44+
type TxEventType uint8
45+
46+
const (
47+
TxEventAdded TxEventType = iota
48+
TxEventRemoved
49+
TxEventReplaced
50+
)
51+
52+
// TxEvent is a fixed-size transaction event optimized for zero-copy access.
53+
// Layout is designed for CPU cache efficiency and minimal memory access.
54+
type TxEvent struct {
55+
Hash [32]byte // Transaction hash
56+
From [20]byte // Sender address
57+
To [20]byte // Recipient address (0x0 for contract creation)
58+
Value [32]byte // Transfer value
59+
GasPrice [32]byte // Gas price or maxFeePerGas for EIP-1559
60+
Nonce uint64 // Sender nonce
61+
Gas uint64 // Gas limit
62+
Type uint8 // Transaction type
63+
EventType TxEventType // Event type (added/removed/replaced)
64+
Timestamp uint64 // Event timestamp (nanoseconds)
65+
_ [6]byte // Padding for alignment
66+
}
67+
68+
// TxFilter defines filtering criteria for transaction events.
69+
type TxFilter struct {
70+
// Addresses to watch (empty = all addresses)
71+
Addresses map[common.Address]struct{}
72+
73+
// Contract methods to watch (first 4 bytes of calldata)
74+
Methods map[[4]byte]struct{}
75+
76+
// Minimum gas price filter
77+
MinGasPrice uint64
78+
79+
// Transaction types to include
80+
Types map[uint8]struct{}
81+
}
82+
83+
// Matches returns true if the transaction matches the filter.
84+
func (f *TxFilter) Matches(event *TxEvent) bool {
85+
// Check addresses
86+
if len(f.Addresses) > 0 {
87+
fromAddr := common.BytesToAddress(event.From[:])
88+
toAddr := common.BytesToAddress(event.To[:])
89+
_, fromMatch := f.Addresses[fromAddr]
90+
_, toMatch := f.Addresses[toAddr]
91+
if !fromMatch && !toMatch {
92+
return false
93+
}
94+
}
95+
96+
// Check transaction type
97+
if len(f.Types) > 0 {
98+
if _, ok := f.Types[event.Type]; !ok {
99+
return false
100+
}
101+
}
102+
103+
// Check gas price
104+
if f.MinGasPrice > 0 {
105+
// Simple comparison of first 8 bytes as uint64
106+
gasPrice := uint64(event.GasPrice[24])<<56 |
107+
uint64(event.GasPrice[25])<<48 |
108+
uint64(event.GasPrice[26])<<40 |
109+
uint64(event.GasPrice[27])<<32 |
110+
uint64(event.GasPrice[28])<<24 |
111+
uint64(event.GasPrice[29])<<16 |
112+
uint64(event.GasPrice[30])<<8 |
113+
uint64(event.GasPrice[31])
114+
if gasPrice < f.MinGasPrice {
115+
return false
116+
}
117+
}
118+
119+
return true
120+
}
121+
122+
// TxFastFeed is a high-performance transaction event feed using lock-free ring buffers.
123+
type TxFastFeed struct {
124+
ring *RingBuffer
125+
mu sync.RWMutex
126+
filters map[int]*TxFilter
127+
nextID int
128+
enabled atomic.Bool
129+
130+
// Metrics
131+
eventsPublished atomic.Uint64
132+
eventsDropped atomic.Uint64
133+
lastPublish atomic.Int64
134+
}
135+
136+
// NewTxFastFeed creates a new fast transaction feed.
137+
func NewTxFastFeed() *TxFastFeed {
138+
feed := &TxFastFeed{
139+
ring: NewRingBuffer(DefaultBufferSize, MaxReaders),
140+
filters: make(map[int]*TxFilter),
141+
}
142+
feed.enabled.Store(true)
143+
return feed
144+
}
145+
146+
// Publish publishes a transaction event to all subscribers.
147+
func (f *TxFastFeed) Publish(tx *types.Transaction, eventType TxEventType) {
148+
if !f.enabled.Load() {
149+
return
150+
}
151+
152+
// Convert transaction to fixed-size event
153+
event := f.txToEvent(tx, eventType)
154+
155+
// Write to ring buffer
156+
eventPtr := unsafe.Pointer(&event)
157+
if !f.ring.Write(eventPtr) {
158+
f.eventsDropped.Add(1)
159+
log.Warn("Fast feed buffer full, event dropped", "hash", tx.Hash())
160+
return
161+
}
162+
163+
f.eventsPublished.Add(1)
164+
f.lastPublish.Store(time.Now().UnixNano())
165+
}
166+
167+
// Subscribe creates a new subscription with optional filtering.
168+
func (f *TxFastFeed) Subscribe(filter *TxFilter) (*Subscription, error) {
169+
f.mu.Lock()
170+
defer f.mu.Unlock()
171+
172+
if f.nextID >= MaxReaders {
173+
return nil, ErrTooManySubscribers
174+
}
175+
176+
id := f.nextID
177+
f.nextID++
178+
179+
if filter != nil {
180+
f.filters[id] = filter
181+
}
182+
183+
sub := &Subscription{
184+
id: id,
185+
feed: f,
186+
events: make(chan *TxEvent, 256),
187+
quit: make(chan struct{}),
188+
}
189+
190+
// Reset reader position to current
191+
f.ring.Reset(id)
192+
193+
// Start event delivery goroutine
194+
go sub.deliver()
195+
196+
return sub, nil
197+
}
198+
199+
// txToEvent converts a transaction to a fixed-size event.
200+
func (f *TxFastFeed) txToEvent(tx *types.Transaction, eventType TxEventType) TxEvent {
201+
var event TxEvent
202+
203+
// Hash
204+
copy(event.Hash[:], tx.Hash().Bytes())
205+
206+
// From (will be filled by caller if available)
207+
// We don't compute sender here to avoid expensive ECDSA recovery
208+
209+
// To
210+
if to := tx.To(); to != nil {
211+
copy(event.To[:], to.Bytes())
212+
}
213+
214+
// Value
215+
if value := tx.Value(); value != nil {
216+
copy(event.Value[:], value.Bytes())
217+
}
218+
219+
// Gas price
220+
if gasPrice := tx.GasPrice(); gasPrice != nil {
221+
copy(event.GasPrice[:], gasPrice.Bytes())
222+
}
223+
224+
// Other fields
225+
event.Nonce = tx.Nonce()
226+
event.Gas = tx.Gas()
227+
event.Type = tx.Type()
228+
event.EventType = eventType
229+
event.Timestamp = uint64(time.Now().UnixNano())
230+
231+
return event
232+
}
233+
234+
// PublishWithSender publishes a transaction event with a known sender.
235+
func (f *TxFastFeed) PublishWithSender(tx *types.Transaction, from common.Address, eventType TxEventType) {
236+
if !f.enabled.Load() {
237+
return
238+
}
239+
240+
event := f.txToEvent(tx, eventType)
241+
copy(event.From[:], from.Bytes())
242+
243+
eventPtr := unsafe.Pointer(&event)
244+
if !f.ring.Write(eventPtr) {
245+
f.eventsDropped.Add(1)
246+
log.Warn("Fast feed buffer full, event dropped", "hash", tx.Hash())
247+
return
248+
}
249+
250+
f.eventsPublished.Add(1)
251+
f.lastPublish.Store(time.Now().UnixNano())
252+
}
253+
254+
// Enable enables the fast feed.
255+
func (f *TxFastFeed) Enable() {
256+
f.enabled.Store(true)
257+
}
258+
259+
// Disable disables the fast feed.
260+
func (f *TxFastFeed) Disable() {
261+
f.enabled.Store(false)
262+
}
263+
264+
// Stats returns feed statistics.
265+
type FeedStats struct {
266+
BufferStats BufferStats
267+
EventsPublished uint64
268+
EventsDropped uint64
269+
LastPublishNs int64
270+
Subscribers int
271+
}
272+
273+
// Stats returns current feed statistics.
274+
func (f *TxFastFeed) Stats() FeedStats {
275+
f.mu.RLock()
276+
subscribers := len(f.filters)
277+
f.mu.RUnlock()
278+
279+
return FeedStats{
280+
BufferStats: f.ring.Stats(),
281+
EventsPublished: f.eventsPublished.Load(),
282+
EventsDropped: f.eventsDropped.Load(),
283+
LastPublishNs: f.lastPublish.Load(),
284+
Subscribers: subscribers,
285+
}
286+
}
287+
288+
var ErrTooManySubscribers = errors.New("too many subscribers")
289+

0 commit comments

Comments
 (0)