Skip to content

Commit 5ac94a5

Browse files
authored
feat: added state context to state transitions and enabled chainsync pipelining (#585)
1 parent 1b07d4f commit 5ac94a5

File tree

6 files changed

+110
-21
lines changed

6 files changed

+110
-21
lines changed

protocol/chainsync/chainsync.go

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package chainsync
1717

1818
import (
19+
"sync"
1920
"time"
2021

2122
"github.com/blinklabs-io/gouroboros/connection"
@@ -44,8 +45,9 @@ var StateMap = protocol.StateMap{
4445
Agency: protocol.AgencyClient,
4546
Transitions: []protocol.StateTransition{
4647
{
47-
MsgType: MessageTypeRequestNext,
48-
NewState: stateCanAwait,
48+
MsgType: MessageTypeRequestNext,
49+
NewState: stateCanAwait,
50+
MatchFunc: IncrementPipelineCount,
4951
},
5052
{
5153
MsgType: MessageTypeFindIntersect,
@@ -60,17 +62,34 @@ var StateMap = protocol.StateMap{
6062
stateCanAwait: protocol.StateMapEntry{
6163
Agency: protocol.AgencyServer,
6264
Transitions: []protocol.StateTransition{
65+
{
66+
MsgType: MessageTypeRequestNext,
67+
NewState: stateCanAwait,
68+
MatchFunc: IncrementPipelineCount,
69+
},
6370
{
6471
MsgType: MessageTypeAwaitReply,
6572
NewState: stateMustReply,
6673
},
6774
{
68-
MsgType: MessageTypeRollForward,
69-
NewState: stateIdle,
75+
MsgType: MessageTypeRollForward,
76+
NewState: stateIdle,
77+
MatchFunc: DecrementPipelineCountAndIsEmpty,
7078
},
7179
{
72-
MsgType: MessageTypeRollBackward,
73-
NewState: stateIdle,
80+
MsgType: MessageTypeRollForward,
81+
NewState: stateCanAwait,
82+
MatchFunc: DecrementPipelineCountAndIsNotEmpty,
83+
},
84+
{
85+
MsgType: MessageTypeRollBackward,
86+
NewState: stateIdle,
87+
MatchFunc: DecrementPipelineCountAndIsEmpty,
88+
},
89+
{
90+
MsgType: MessageTypeRollBackward,
91+
NewState: stateCanAwait,
92+
MatchFunc: DecrementPipelineCountAndIsNotEmpty,
7493
},
7594
},
7695
},
@@ -91,12 +110,24 @@ var StateMap = protocol.StateMap{
91110
Agency: protocol.AgencyServer,
92111
Transitions: []protocol.StateTransition{
93112
{
94-
MsgType: MessageTypeRollForward,
95-
NewState: stateIdle,
113+
MsgType: MessageTypeRollForward,
114+
NewState: stateIdle,
115+
MatchFunc: DecrementPipelineCountAndIsEmpty,
96116
},
97117
{
98-
MsgType: MessageTypeRollBackward,
99-
NewState: stateIdle,
118+
MsgType: MessageTypeRollForward,
119+
NewState: stateCanAwait,
120+
MatchFunc: DecrementPipelineCountAndIsNotEmpty,
121+
},
122+
{
123+
MsgType: MessageTypeRollBackward,
124+
NewState: stateIdle,
125+
MatchFunc: DecrementPipelineCountAndIsEmpty,
126+
},
127+
{
128+
MsgType: MessageTypeRollBackward,
129+
NewState: stateCanAwait,
130+
MatchFunc: DecrementPipelineCountAndIsNotEmpty,
100131
},
101132
},
102133
},
@@ -105,6 +136,60 @@ var StateMap = protocol.StateMap{
105136
},
106137
}
107138

139+
type StateContext struct {
140+
mu sync.Mutex
141+
pipelineCount int
142+
}
143+
144+
var IncrementPipelineCount = func(context interface{}, msg protocol.Message) bool {
145+
s := context.(*StateContext)
146+
s.mu.Lock()
147+
defer s.mu.Unlock()
148+
149+
s.pipelineCount++
150+
return true
151+
}
152+
153+
var DecrementPipelineCountAndIsEmpty = func(context interface{}, msg protocol.Message) bool {
154+
s := context.(*StateContext)
155+
s.mu.Lock()
156+
defer s.mu.Unlock()
157+
158+
if s.pipelineCount == 1 {
159+
s.pipelineCount--
160+
return true
161+
}
162+
return false
163+
}
164+
165+
var DecrementPipelineCountAndIsNotEmpty = func(context interface{}, msg protocol.Message) bool {
166+
s := context.(*StateContext)
167+
s.mu.Lock()
168+
defer s.mu.Unlock()
169+
170+
if s.pipelineCount > 1 {
171+
s.pipelineCount--
172+
return true
173+
}
174+
return false
175+
}
176+
177+
var PipelineIsEmtpy = func(context interface{}, msg protocol.Message) bool {
178+
s := context.(*StateContext)
179+
s.mu.Lock()
180+
defer s.mu.Unlock()
181+
182+
return s.pipelineCount == 0
183+
}
184+
185+
var PipelineIsNotEmpty = func(context interface{}, msg protocol.Message) bool {
186+
s := context.(*StateContext)
187+
s.mu.Lock()
188+
defer s.mu.Unlock()
189+
190+
return s.pipelineCount > 0
191+
}
192+
108193
// ChainSync is a wrapper object that holds the client and server instances
109194
type ChainSync struct {
110195
Client *Client
@@ -137,9 +222,11 @@ type RequestNextFunc func(CallbackContext) error
137222

138223
// New returns a new ChainSync object
139224
func New(protoOptions protocol.ProtocolOptions, cfg *Config) *ChainSync {
225+
stateContext := &StateContext{}
226+
140227
c := &ChainSync{
141-
Client: NewClient(protoOptions, cfg),
142-
Server: NewServer(protoOptions, cfg),
228+
Client: NewClient(stateContext, protoOptions, cfg),
229+
Server: NewServer(stateContext, protoOptions, cfg),
143230
}
144231
return c
145232
}

protocol/chainsync/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type Client struct {
4343
}
4444

4545
// NewClient returns a new ChainSync client object
46-
func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
46+
func NewClient(stateContext interface{}, protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
4747
// Use node-to-client protocol ID
4848
ProtocolId := ProtocolIdNtC
4949
msgFromCborFunc := NewMsgFromCborNtC
@@ -91,6 +91,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
9191
MessageHandlerFunc: c.messageHandler,
9292
MessageFromCborFunc: msgFromCborFunc,
9393
StateMap: stateMap,
94+
StateContext: stateContext,
9495
InitialState: stateIdle,
9596
}
9697
c.Protocol = protocol.New(protoConfig)

protocol/chainsync/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Server struct {
3030
}
3131

3232
// NewServer returns a new ChainSync server object
33-
func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
33+
func NewServer(stateContext interface{}, protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
3434
// Use node-to-client protocol ID
3535
ProtocolId := ProtocolIdNtC
3636
msgFromCborFunc := NewMsgFromCborNtC
@@ -56,6 +56,7 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
5656
MessageHandlerFunc: s.messageHandler,
5757
MessageFromCborFunc: msgFromCborFunc,
5858
StateMap: StateMap,
59+
StateContext: stateContext,
5960
InitialState: stateIdle,
6061
}
6162
s.Protocol = protocol.New(protoConfig)

protocol/protocol.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"bytes"
2020
"fmt"
2121
"io"
22-
"reflect"
2322
"sync"
2423
"time"
2524

@@ -58,6 +57,7 @@ type ProtocolConfig struct {
5857
MessageHandlerFunc MessageHandlerFunc
5958
MessageFromCborFunc MessageFromCborFunc
6059
StateMap StateMap
60+
StateContext interface{}
6161
InitialState State
6262
}
6363

@@ -495,7 +495,7 @@ func (p *Protocol) nextState(currentState State, msg Message) (State, error) {
495495
if transition.MsgType == msg.Type() {
496496
if transition.MatchFunc != nil {
497497
// Skip item if match function returns false
498-
if !transition.MatchFunc(msg) {
498+
if !transition.MatchFunc(p.config.StateContext, msg) {
499499
continue
500500
}
501501
}
@@ -504,8 +504,8 @@ func (p *Protocol) nextState(currentState State, msg Message) (State, error) {
504504
}
505505

506506
return State{}, fmt.Errorf(
507-
"message %s not allowed in current protocol state %s",
508-
reflect.TypeOf(msg).Name(),
507+
"message %T not allowed in current protocol state %s",
508+
msg,
509509
currentState,
510510
)
511511
}

protocol/state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type StateTransition struct {
5555

5656
// StateTransitionMatchFunc represents a function that will take a Message and return a bool
5757
// that indicates whether the message is a match for the state transition rule
58-
type StateTransitionMatchFunc func(Message) bool
58+
type StateTransitionMatchFunc func(interface{}, Message) bool
5959

6060
// StateMapEntry represents a protocol state, it's possible state transitions, and an optional timeout
6161
type StateMapEntry struct {

protocol/txsubmission/txsubmission.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var StateMap = protocol.StateMap{
5555
MsgType: MessageTypeRequestTxIds,
5656
NewState: stateTxIdsBlocking,
5757
// Match if blocking
58-
MatchFunc: func(msg protocol.Message) bool {
58+
MatchFunc: func(context interface{}, msg protocol.Message) bool {
5959
msgRequestTxIds := msg.(*MsgRequestTxIds)
6060
return msgRequestTxIds.Blocking
6161
},
@@ -64,7 +64,7 @@ var StateMap = protocol.StateMap{
6464
MsgType: MessageTypeRequestTxIds,
6565
NewState: stateTxIdsNonblocking,
6666
// Metch if non-blocking
67-
MatchFunc: func(msg protocol.Message) bool {
67+
MatchFunc: func(context interface{}, msg protocol.Message) bool {
6868
msgRequestTxIds := msg.(*MsgRequestTxIds)
6969
return !msgRequestTxIds.Blocking
7070
},

0 commit comments

Comments
 (0)