
Commit 33aaa56

Merge pull request #61 from cloudstruct/feature/send-pipelining
Support for pipelining on send
2 parents 617e6f9 + ab1bf2b commit 33aaa56

File tree

7 files changed: +148 -87 lines changed


cmd/go-ouroboros-network/chainsync.go

Lines changed: 10 additions & 2 deletions
@@ -135,6 +135,15 @@ func testChainSync(f *globalFlags) {
 	}
 	// Wait until ready for next block
 	<-syncState.readyForNextBlockChan
+	// Pipeline the initial block requests to speed things up a bit
+	// Using a value higher than 10 seems to cause problems with NtN
+	for i := 0; i < 10; i++ {
+		err := o.ChainSync.RequestNext()
+		if err != nil {
+			fmt.Printf("ERROR: RequestNext: %s\n", err)
+			os.Exit(1)
+		}
+	}
 	for {
 		err := o.ChainSync.RequestNext()
 		if err != nil {
@@ -208,8 +217,8 @@ func chainSyncRollForwardHandler(blockType uint, blockData interface{}) error {
 			fmt.Printf("unsupported (so far) block type %d\n", blockType)
 			fmt.Printf("%s\n", utils.DumpCborStructure(blockData, ""))
 		}
-		syncState.readyForNextBlockChan <- true
 	}
+	syncState.readyForNextBlockChan <- true
 	return nil
 }
@@ -266,6 +275,5 @@ func blockFetchBlockHandler(blockType uint, blockData interface{}) error {
 }

 func blockFetchBatchDoneHandler() error {
-	syncState.readyForNextBlockChan <- true
 	return nil
 }
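
For context on the change above: with send pipelining, RequestNext only queues a MsgRequestNext (see protocol/protocol.go below), so the caller can keep several requests in flight and top the pipeline up as each block is handled. A minimal standalone sketch of that pattern, using a hypothetical requestNext function and readyChan channel in place of o.ChainSync.RequestNext and syncState.readyForNextBlockChan:

package main

import "fmt"

func main() {
	// readyChan stands in for syncState.readyForNextBlockChan; one value is
	// received for every block that the roll-forward handler has processed.
	readyChan := make(chan bool, 10)

	// requestNext stands in for o.ChainSync.RequestNext; with pipelining it
	// only enqueues the request and returns immediately.
	requestNext := func() error {
		go func() { readyChan <- true }() // simulate the reply arriving later
		return nil
	}

	// Prime the pipeline with a burst of requests (10, as in the example above).
	for i := 0; i < 10; i++ {
		if err := requestNext(); err != nil {
			fmt.Printf("ERROR: RequestNext: %s\n", err)
			return
		}
	}
	// Steady state: issue one new request per handled block, keeping ~10 in flight.
	for handled := 0; handled < 100; handled++ {
		<-readyChan
		if err := requestNext(); err != nil {
			fmt.Printf("ERROR: RequestNext: %s\n", err)
			return
		}
	}
	fmt.Println("done")
}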

protocol/blockfetch/blockfetch.go

Lines changed: 2 additions & 2 deletions
@@ -121,12 +121,12 @@ func (b *BlockFetch) messageHandler(msg protocol.Message, isResponse bool) error

 func (b *BlockFetch) RequestRange(start []interface{}, end []interface{}) error {
 	msg := NewMsgRequestRange(start, end)
-	return b.SendMessage(msg, false)
+	return b.SendMessage(msg)
 }

 func (b *BlockFetch) ClientDone() error {
 	msg := NewMsgClientDone()
-	return b.SendMessage(msg, false)
+	return b.SendMessage(msg)
 }

 func (b *BlockFetch) handleStartBatch() error {
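
Note for this and the remaining mini-protocol files: SendMessage no longer takes an isResponse argument. With queued sends, the response flag on outgoing muxer segments is derived from the protocol's role inside the new sendLoop (see protocol/protocol.go below). A tiny sketch of that derivation, where ProtocolRoleClient is an assumed counterpart to the ProtocolRoleServer constant used in protocol.go:

package main

import "fmt"

// ProtocolRole mirrors the role type used by the protocol package.
type ProtocolRole int

const (
	ProtocolRoleClient ProtocolRole = iota
	ProtocolRoleServer
)

// isResponse is no longer supplied by callers of SendMessage; the sendLoop
// marks segments as responses whenever the local side is the server.
func isResponse(role ProtocolRole) bool {
	return role == ProtocolRoleServer
}

func main() {
	fmt.Println(isResponse(ProtocolRoleClient)) // false: client-initiated segment
	fmt.Println(isResponse(ProtocolRoleServer)) // true: server response segment
}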

protocol/chainsync/chainsync.go

Lines changed: 2 additions & 2 deletions
@@ -160,12 +160,12 @@ func (c *ChainSync) messageHandler(msg protocol.Message, isResponse bool) error

 func (c *ChainSync) RequestNext() error {
 	msg := NewMsgRequestNext()
-	return c.SendMessage(msg, false)
+	return c.SendMessage(msg)
 }

 func (c *ChainSync) FindIntersect(points []interface{}) error {
 	msg := NewMsgFindIntersect(points)
-	return c.SendMessage(msg, false)
+	return c.SendMessage(msg)
 }

 func (c *ChainSync) handleAwaitReply() error {

protocol/handshake/handshake.go

Lines changed: 2 additions & 2 deletions
@@ -101,7 +101,7 @@ func (h *Handshake) ProposeVersions(versions []uint16, networkMagic uint32) erro
 		}
 	}
 	msg := NewMsgProposeVersions(versionMap)
-	return h.SendMessage(msg, false)
+	return h.SendMessage(msg)
 }

 func (h *Handshake) handleProposeVersions(msgGeneric protocol.Message) error {
@@ -121,7 +121,7 @@ func (h *Handshake) handleProposeVersions(msgGeneric protocol.Message) error {
 	}
 	if highestVersion > 0 {
 		resp := NewMsgAcceptVersion(highestVersion, versionData)
-		if err := h.SendMessage(resp, true); err != nil {
+		if err := h.SendMessage(resp); err != nil {
 			return err
 		}
 		h.Version = highestVersion

protocol/keepalive/keepalive.go

Lines changed: 2 additions & 2 deletions
@@ -118,7 +118,7 @@ func (k *KeepAlive) Stop() {

 func (k *KeepAlive) KeepAlive(cookie uint16) error {
 	msg := NewMsgKeepAlive(cookie)
-	return k.SendMessage(msg, false)
+	return k.SendMessage(msg)
 }

 func (k *KeepAlive) handleKeepAlive(msgGeneric protocol.Message) error {
@@ -129,7 +129,7 @@ func (k *KeepAlive) handleKeepAlive(msgGeneric protocol.Message) error {
 	} else {
 		// Send the keep-alive response
 		resp := NewMsgKeepAliveResponse(msg.Cookie)
-		return k.SendMessage(resp, true)
+		return k.SendMessage(resp)
 	}
 }

protocol/localtxsubmission/localtxsubmission.go

Lines changed: 2 additions & 2 deletions
@@ -101,12 +101,12 @@ func (l *LocalTxSubmission) messageHandler(msg protocol.Message, isResponse bool

 func (l *LocalTxSubmission) SubmitTx(eraId uint16, tx []byte) error {
 	msg := NewMsgSubmitTx(eraId, tx)
-	return l.SendMessage(msg, false)
+	return l.SendMessage(msg)
 }

 func (l *LocalTxSubmission) Done(tx interface{}) error {
 	msg := NewMsgDone()
-	return l.SendMessage(msg, false)
+	return l.SendMessage(msg)
 }

 func (l *LocalTxSubmission) handleSubmitTx(msgGeneric protocol.Message) error {

protocol/protocol.go

Lines changed: 128 additions & 75 deletions
@@ -9,15 +9,22 @@ import (
 	"sync"
 )

+const (
+	// This is completely arbitrary, but the line had to be drawn somewhere
+	MAX_MESSAGES_PER_SEGMENT = 20
+)
+
 type Protocol struct {
-	config        ProtocolConfig
-	sendChan      chan *muxer.Segment
-	recvChan      chan *muxer.Segment
-	state         State
-	stateMutex    sync.Mutex
-	recvBuffer    *bytes.Buffer
-	recvReadyChan chan bool
-	sendReadyChan chan bool
+	config             ProtocolConfig
+	muxerSendChan      chan *muxer.Segment
+	muxerRecvChan      chan *muxer.Segment
+	state              State
+	stateMutex         sync.Mutex
+	recvBuffer         *bytes.Buffer
+	sendQueueChan      chan Message
+	sendStateQueueChan chan Message
+	recvReadyChan      chan bool
+	sendReadyChan      chan bool
 }

 type ProtocolConfig struct {
@@ -62,19 +69,22 @@ type MessageHandlerFunc func(Message, bool) error
 type MessageFromCborFunc func(uint, []byte) (Message, error)

 func New(config ProtocolConfig) *Protocol {
-	sendChan, recvChan := config.Muxer.RegisterProtocol(config.ProtocolId)
+	muxerSendChan, muxerRecvChan := config.Muxer.RegisterProtocol(config.ProtocolId)
 	p := &Protocol{
-		config:        config,
-		sendChan:      sendChan,
-		recvChan:      recvChan,
-		recvBuffer:    bytes.NewBuffer(nil),
-		recvReadyChan: make(chan bool, 1),
-		sendReadyChan: make(chan bool, 1),
+		config:             config,
+		muxerSendChan:      muxerSendChan,
+		muxerRecvChan:      muxerRecvChan,
+		recvBuffer:         bytes.NewBuffer(nil),
+		sendQueueChan:      make(chan Message, 50),
+		sendStateQueueChan: make(chan Message, 50),
+		recvReadyChan:      make(chan bool, 1),
+		sendReadyChan:      make(chan bool, 1),
 	}
 	// Set initial state
 	p.setState(config.InitialState)
-	// Start our receiver Goroutine
+	// Start our send and receive Goroutines
 	go p.recvLoop()
+	go p.sendLoop()
 	return p
 }

@@ -86,56 +96,114 @@ func (p *Protocol) Role() ProtocolRole {
 	return p.config.Role
 }

-func (p *Protocol) SendMessage(msg Message, isResponse bool) error {
-	// Wait until ready to send based on state map
-	<-p.sendReadyChan
-	// Lock the state to prevent collisions
-	p.stateMutex.Lock()
-	if err := p.checkCurrentState(); err != nil {
-		return fmt.Errorf("%s: error sending message: %s", p.config.Name, err)
-	}
-	newState, err := p.getNewState(msg)
-	if err != nil {
-		return fmt.Errorf("%s: error sending message: %s", p.config.Name, err)
-	}
-	// Get raw CBOR from message
-	data := msg.Cbor()
-	// If message has no raw CBOR, encode the message
-	if data == nil {
-		var err error
-		data, err = utils.CborEncode(msg)
-		if err != nil {
-			return err
-		}
-	}
-	// Send message in multiple segments (if needed)
-	for {
-		// Determine segment payload length
-		segmentPayloadLength := len(data)
-		if segmentPayloadLength > muxer.SEGMENT_MAX_PAYLOAD_LENGTH {
-			segmentPayloadLength = muxer.SEGMENT_MAX_PAYLOAD_LENGTH
-		}
-		// Send current segment
-		segmentPayload := data[:segmentPayloadLength]
-		segment := muxer.NewSegment(p.config.ProtocolId, segmentPayload, isResponse)
-		p.sendChan <- segment
-		// Remove current segment's data from buffer
-		if len(data) > segmentPayloadLength {
-			data = data[segmentPayloadLength:]
-		} else {
-			break
-		}
-	}
-	// Set new state and unlock
-	p.setState(newState)
-	p.stateMutex.Unlock()
+func (p *Protocol) SendMessage(msg Message) error {
+	p.sendQueueChan <- msg
 	return nil
 }

 func (p *Protocol) SendError(err error) {
 	p.config.ErrorChan <- err
 }

+func (p *Protocol) sendLoop() {
+	var setNewState bool
+	var newState State
+	var err error
+	for {
+		// Wait until ready to send based on state map
+		<-p.sendReadyChan
+		// Lock the state to prevent collisions
+		p.stateMutex.Lock()
+		// Check for queued state changes from previous pipelined sends
+		setNewState = false
+		if len(p.sendStateQueueChan) > 0 {
+			msg := <-p.sendStateQueueChan
+			newState, err = p.getNewState(msg)
+			if err != nil {
+				p.SendError(fmt.Errorf("%s: error sending message: %s", p.config.Name, err))
+			}
+			setNewState = true
+			// If there are no queued messages, set the new state now
+			if len(p.sendQueueChan) == 0 {
+				p.setState(newState)
+				p.stateMutex.Unlock()
+				continue
+			}
+		}
+		// Read queued messages and write into buffer
+		payloadBuf := bytes.NewBuffer(nil)
+		msgCount := 0
+		for {
+			// Get next message from send queue
+			msg := <-p.sendQueueChan
+			msgCount = msgCount + 1
+			// Write the message into the send state queue if we already have a new state
+			if setNewState {
+				p.sendStateQueueChan <- msg
+			}
+			// Get raw CBOR from message
+			data := msg.Cbor()
+			// If message has no raw CBOR, encode the message
+			if data == nil {
+				var err error
+				data, err = utils.CborEncode(msg)
+				if err != nil {
+					p.SendError(err)
+				}
+			}
+			payloadBuf.Write(data)
+			if !setNewState {
+				newState, err = p.getNewState(msg)
+				if err != nil {
+					p.SendError(fmt.Errorf("%s: error sending message: %s", p.config.Name, err))
+				}
+				setNewState = true
+			}
+			// We don't want more than MAX_MESSAGES_PER_SEGMENT messages in a segment
+			if msgCount >= MAX_MESSAGES_PER_SEGMENT {
+				break
+			}
+			// We don't want to add more messages once we spill over into a second segment
+			if payloadBuf.Len() > muxer.SEGMENT_MAX_PAYLOAD_LENGTH {
+				break
+			}
+			// Check if there are any more queued messages
+			if len(p.sendQueueChan) == 0 {
+				break
+			}
+			// We don't want to block on writes to the send state queue
+			if len(p.sendStateQueueChan) == cap(p.sendStateQueueChan) {
+				break
+			}
+		}
+		// Send messages in multiple segments (if needed)
+		for {
+			// Determine segment payload length
+			segmentPayloadLength := payloadBuf.Len()
+			if segmentPayloadLength > muxer.SEGMENT_MAX_PAYLOAD_LENGTH {
+				segmentPayloadLength = muxer.SEGMENT_MAX_PAYLOAD_LENGTH
+			}
+			// Send current segment
+			segmentPayload := payloadBuf.Bytes()[:segmentPayloadLength]
+			isResponse := false
+			if p.Role() == ProtocolRoleServer {
+				isResponse = true
+			}
+			segment := muxer.NewSegment(p.config.ProtocolId, segmentPayload, isResponse)
+			p.muxerSendChan <- segment
+			// Remove current segment's data from buffer
+			if payloadBuf.Len() > segmentPayloadLength {
+				payloadBuf = bytes.NewBuffer(payloadBuf.Bytes()[segmentPayloadLength:])
+			} else {
+				break
+			}
+		}
+		// Set new state and unlock
+		p.setState(newState)
+		p.stateMutex.Unlock()
+	}
+}
+
 func (p *Protocol) recvLoop() {
 	leftoverData := false
 	isResponse := false
@@ -144,7 +212,7 @@ func (p *Protocol) recvLoop() {
 		// Don't grab the next segment from the muxer if we still have data in the buffer
 		if !leftoverData {
 			// Wait for segment
-			segment := <-p.recvChan
+			segment := <-p.muxerRecvChan
 			// Add segment payload to buffer
 			p.recvBuffer.Write(segment.Payload)
 			// Save whether it's a response
@@ -192,18 +260,6 @@ func (p *Protocol) recvLoop() {
 	}
 }

-func (p *Protocol) checkCurrentState() error {
-	if currentStateMapEntry, ok := p.config.StateMap[p.state]; ok {
-		if currentStateMapEntry.Agency == AGENCY_NONE {
-			return fmt.Errorf("protocol is in state with no agency")
-		}
-		// TODO: check client/server agency
-	} else {
-		return fmt.Errorf("protocol in unknown state")
-	}
-	return nil
-}
-
 func (p *Protocol) getNewState(msg Message) (State, error) {
 	var newState State
 	matchFound := false
@@ -251,9 +307,6 @@ func (p *Protocol) setState(state State) {
 func (p *Protocol) handleMessage(msg Message, isResponse bool) error {
 	// Lock the state to prevent collisions
 	p.stateMutex.Lock()
-	if err := p.checkCurrentState(); err != nil {
-		return fmt.Errorf("%s: error handling message: %s", p.config.Name, err)
-	}
 	newState, err := p.getNewState(msg)
 	if err != nil {
 		return fmt.Errorf("%s: error handling message: %s", p.config.Name, err)
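
To summarize the protocol.go rework: SendMessage now only enqueues a message, and the new sendLoop goroutine drains the queue, coalescing up to MAX_MESSAGES_PER_SEGMENT messages (or roughly one segment's worth of payload) into each muxer segment; this is what lets callers pipeline sends such as the burst of RequestNext calls in chainsync.go above. A minimal self-contained sketch of that enqueue-and-batch pattern, with simplified stand-ins for Message, the CBOR encoding, and the muxer send channel:

package main

import (
	"bytes"
	"fmt"
)

const (
	maxMessagesPerSegment = 20    // mirrors MAX_MESSAGES_PER_SEGMENT
	segmentMaxPayload     = 65535 // stand-in for muxer.SEGMENT_MAX_PAYLOAD_LENGTH
)

// message stands in for protocol.Message; here it is just pre-encoded bytes.
type message []byte

type protocol struct {
	sendQueueChan chan message // buffered, like make(chan Message, 50)
	segmentChan   chan []byte  // stand-in for the muxer send channel
}

// SendMessage only enqueues; callers can pipeline messages without blocking
// on the wire (up to the queue capacity).
func (p *protocol) SendMessage(msg message) error {
	p.sendQueueChan <- msg
	return nil
}

// sendLoop drains the queue and batches messages into one segment payload.
func (p *protocol) sendLoop() {
	for {
		payload := bytes.NewBuffer(nil)
		msgCount := 0
		// Block for the first message, then take whatever else is already queued.
		msg := <-p.sendQueueChan
		for {
			payload.Write(msg)
			msgCount++
			if msgCount >= maxMessagesPerSegment ||
				payload.Len() > segmentMaxPayload ||
				len(p.sendQueueChan) == 0 {
				break
			}
			msg = <-p.sendQueueChan
		}
		// The real sendLoop also tracks protocol state transitions and splits
		// oversized payloads across multiple segments; both are omitted here.
		p.segmentChan <- payload.Bytes()
	}
}

func main() {
	p := &protocol{
		sendQueueChan: make(chan message, 50),
		segmentChan:   make(chan []byte, 1),
	}
	// Queue a few messages before the sender runs, so they share one segment.
	for i := 0; i < 5; i++ {
		_ = p.SendMessage(message(fmt.Sprintf("msg%d|", i)))
	}
	go p.sendLoop()
	fmt.Printf("segment payload: %q\n", <-p.segmentChan)
}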
