Skip to content

Commit 237ebe4

Browse files
authored
fix: improve sticky peers and re-trigger pending queue (#2610)
this definitely has a huge improvement in distributing the txs across the network and reducing the number of evictions/sequence mismatches encountered by clients. however it doesn't appear to solve the problem entirely.
1 parent a8b0992 commit 237ebe4

File tree

18 files changed

+1095
-137
lines changed

18 files changed

+1095
-137
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,4 @@ proto/spec/**/*.pb.go
5959
.venv
6060
go.work.sum
6161
.gocache
62+
.gopath

consensus/wal_fuzz.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
//go:build gofuzz
2-
// +build gofuzz
32

43
package consensus
54

libs/sync/deadlock.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
//go:build deadlock
2-
// +build deadlock
32

43
package sync
54

libs/sync/sync.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
//go:build !deadlock
2-
// +build !deadlock
32

43
package sync
54

libs/trace/schema/mempool.go

Lines changed: 146 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ func MempoolTables() []string {
1212
MempoolPeerStateTable,
1313
MempoolRecoveredPartsTable,
1414
MempoolAddResultTable,
15+
MempoolTxStatusTable,
16+
MempoolRecheckTable,
1517
}
1618
}
1719

@@ -61,9 +63,10 @@ const (
6163
type MempoolStateUpdateType string
6264

6365
const (
64-
SeenTx MempoolStateUpdateType = "SeenTx"
65-
WantTx MempoolStateUpdateType = "WantTx"
66-
Unknown MempoolStateUpdateType = "Unknown"
66+
SeenTx MempoolStateUpdateType = "SeenTx"
67+
WantTx MempoolStateUpdateType = "WantTx"
68+
MissingSequence MempoolStateUpdateType = "MissingSequence"
69+
Unknown MempoolStateUpdateType = "Unknown"
6770
)
6871

6972
// MempoolPeerState describes the schema for the "mempool_peer_state" table.
@@ -72,6 +75,8 @@ type MempoolPeerState struct {
7275
StateUpdate MempoolStateUpdateType `json:"state_update"`
7376
TxHash string `json:"tx_hash"`
7477
TransferType TransferType `json:"transfer_type"`
78+
Signer string `json:"signer,omitempty"`
79+
Sequence uint64 `json:"sequence,omitempty"`
7580
}
7681

7782
// Table returns the table name for the MempoolPeerState struct.
@@ -87,17 +92,39 @@ func WriteMempoolPeerState(
8792
stateUpdate MempoolStateUpdateType,
8893
txHash []byte,
8994
transferType TransferType,
95+
) {
96+
WriteMempoolPeerStateWithSeq(client, peer, stateUpdate, txHash, transferType, nil, 0)
97+
}
98+
99+
// WriteMempoolPeerStateWithSeq writes a tracing point for the mempool state
100+
// including signer and sequence information.
101+
func WriteMempoolPeerStateWithSeq(
102+
client trace.Tracer,
103+
peer string,
104+
stateUpdate MempoolStateUpdateType,
105+
txHash []byte,
106+
transferType TransferType,
107+
signer []byte,
108+
sequence uint64,
90109
) {
91110
// this check is redundant to what is checked during client.Write, although it
92111
// is an optimization to avoid allocations from creating the map of fields.
93112
if !client.IsCollecting(MempoolPeerStateTable) {
94113
return
95114
}
115+
116+
signerStr := ""
117+
if len(signer) > 0 {
118+
signerStr = string(signer)
119+
}
120+
96121
client.Write(MempoolPeerState{
97122
Peer: peer,
98123
StateUpdate: stateUpdate,
99124
TransferType: transferType,
100125
TxHash: bytes.HexBytes(txHash).String(),
126+
Signer: signerStr,
127+
Sequence: sequence,
101128
})
102129
}
103130

@@ -192,3 +219,119 @@ func WriteMempoolRecoveredParts(client trace.Tracer, height int64, round int32,
192219
RecoveredParts: parts,
193220
})
194221
}
222+
223+
const (
224+
// MempoolTxStatusTable is the tracing "measurement" (aka table) for the
225+
// mempool that stores tracing data related to transaction status changes
226+
// (evictions, rejections, confirmations).
227+
MempoolTxStatusTable = "mempool_tx_status"
228+
)
229+
230+
type MempoolTxStatusType string
231+
232+
const (
233+
TxEvicted MempoolTxStatusType = "evicted"
234+
TxRejected MempoolTxStatusType = "rejected"
235+
TxConfirmed MempoolTxStatusType = "confirmed"
236+
TxExpired MempoolTxStatusType = "expired"
237+
)
238+
239+
// MempoolTxStatus describes the schema for the "mempool_tx_status" table.
240+
type MempoolTxStatus struct {
241+
TxHash string `json:"tx_hash"`
242+
Status MempoolTxStatusType `json:"status"`
243+
Error string `json:"error,omitempty"`
244+
Signer string `json:"signer,omitempty"`
245+
Sequence uint64 `json:"sequence,omitempty"`
246+
}
247+
248+
// Table returns the table name for the MempoolTxStatus struct.
249+
func (MempoolTxStatus) Table() string {
250+
return MempoolTxStatusTable
251+
}
252+
253+
// WriteMempoolTxStatus writes a tracing point for mempool transaction status
254+
// changes using the predetermined schema for mempool tracing.
255+
func WriteMempoolTxStatus(
256+
client trace.Tracer,
257+
txHash []byte,
258+
status MempoolTxStatusType,
259+
err error,
260+
signer []byte,
261+
sequence uint64,
262+
) {
263+
if !client.IsCollecting(MempoolTxStatusTable) {
264+
return
265+
}
266+
267+
errStr := ""
268+
if err != nil {
269+
errStr = err.Error()
270+
}
271+
272+
signerStr := ""
273+
if len(signer) > 0 {
274+
signerStr = string(signer)
275+
}
276+
277+
client.Write(MempoolTxStatus{
278+
TxHash: bytes.HexBytes(txHash).String(),
279+
Status: status,
280+
Error: errStr,
281+
Signer: signerStr,
282+
Sequence: sequence,
283+
})
284+
}
285+
286+
const (
287+
// MempoolRecheckTable is the tracing "measurement" (aka table) for the
288+
// mempool that stores tracing data related to transaction rechecks.
289+
MempoolRecheckTable = "mempool_recheck"
290+
)
291+
292+
// MempoolRecheck describes the schema for the "mempool_recheck" table.
293+
type MempoolRecheck struct {
294+
TxHash string `json:"tx_hash"`
295+
Signer string `json:"signer,omitempty"`
296+
Sequence uint64 `json:"sequence,omitempty"`
297+
Kept bool `json:"kept"`
298+
Error string `json:"error,omitempty"`
299+
}
300+
301+
// Table returns the table name for the MempoolRecheck struct.
302+
func (MempoolRecheck) Table() string {
303+
return MempoolRecheckTable
304+
}
305+
306+
// WriteMempoolRecheck writes a tracing point for mempool recheck events using
307+
// the predetermined schema for mempool tracing.
308+
func WriteMempoolRecheck(
309+
client trace.Tracer,
310+
txHash []byte,
311+
signer []byte,
312+
sequence uint64,
313+
kept bool,
314+
err error,
315+
) {
316+
if !client.IsCollecting(MempoolRecheckTable) {
317+
return
318+
}
319+
320+
errStr := ""
321+
if err != nil {
322+
errStr = err.Error()
323+
}
324+
325+
signerStr := ""
326+
if len(signer) > 0 {
327+
signerStr = string(signer)
328+
}
329+
330+
client.Write(MempoolRecheck{
331+
TxHash: bytes.HexBytes(txHash).String(),
332+
Signer: signerStr,
333+
Sequence: sequence,
334+
Kept: kept,
335+
Error: errStr,
336+
})
337+
}

0 commit comments

Comments
 (0)