Skip to content

Commit b4c6d90

Browse files
authored
Merge pull request #49 from cloudstruct/feature/pipelining
Send/receive message only when allowed by state map
2 parents d14ed3d + 3a95117 commit b4c6d90

File tree

1 file changed

+46
-14
lines changed

1 file changed

+46
-14
lines changed

protocol/protocol.go

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@ import (
1010
)
1111

1212
type Protocol struct {
13-
config ProtocolConfig
14-
sendChan chan *muxer.Segment
15-
recvChan chan *muxer.Segment
16-
state State
17-
stateMutex sync.Mutex
18-
recvBuffer *bytes.Buffer
13+
config ProtocolConfig
14+
sendChan chan *muxer.Segment
15+
recvChan chan *muxer.Segment
16+
state State
17+
stateMutex sync.Mutex
18+
recvBuffer *bytes.Buffer
19+
recvReadyChan chan bool
20+
sendReadyChan chan bool
1921
}
2022

2123
type ProtocolConfig struct {
@@ -60,13 +62,15 @@ type MessageFromCborFunc func(uint, []byte) (Message, error)
6062
func New(config ProtocolConfig) *Protocol {
6163
sendChan, recvChan := config.Muxer.RegisterProtocol(config.ProtocolId)
6264
p := &Protocol{
63-
config: config,
64-
sendChan: sendChan,
65-
recvChan: recvChan,
66-
recvBuffer: bytes.NewBuffer(nil),
65+
config: config,
66+
sendChan: sendChan,
67+
recvChan: recvChan,
68+
recvBuffer: bytes.NewBuffer(nil),
69+
recvReadyChan: make(chan bool, 1),
70+
sendReadyChan: make(chan bool, 1),
6771
}
6872
// Set initial state
69-
p.state = config.InitialState
73+
p.setState(config.InitialState)
7074
// Start our receiver Goroutine
7175
go p.recvLoop()
7276
return p
@@ -81,6 +85,8 @@ func (p *Protocol) Role() ProtocolRole {
8185
}
8286

8387
func (p *Protocol) SendMessage(msg Message, isResponse bool) error {
88+
// Wait until ready to send based on state map
89+
<-p.sendReadyChan
8490
// Lock the state to prevent collisions
8591
p.stateMutex.Lock()
8692
if err := p.checkCurrentState(); err != nil {
@@ -103,7 +109,7 @@ func (p *Protocol) SendMessage(msg Message, isResponse bool) error {
103109
segment := muxer.NewSegment(p.config.ProtocolId, data, isResponse)
104110
p.sendChan <- segment
105111
// Set new state and unlock
106-
p.state = newState
112+
p.setState(newState)
107113
p.stateMutex.Unlock()
108114
return nil
109115
}
@@ -127,6 +133,8 @@ func (p *Protocol) recvLoop() {
127133
isResponse = segment.IsResponse()
128134
}
129135
leftoverData = false
136+
// Wait until ready to receive based on state map
137+
<-p.recvReadyChan
130138
// Decode message into generic list until we can determine what type of message it is
131139
// This also lets us determine how many bytes the message is
132140
var tmpMsg []interface{}
@@ -135,13 +143,15 @@ func (p *Protocol) recvLoop() {
135143
if err == io.EOF && p.recvBuffer.Len() > 0 {
136144
// This is probably a multi-part message, so we wait until we get more of the message
137145
// before trying to process it
146+
p.recvReadyChan <- true
138147
continue
139148
}
140149
p.config.ErrorChan <- fmt.Errorf("%s: decode error: %s", p.config.Name, err)
141150
}
142151
// Create Message object from CBOR
143152
msgType := uint(tmpMsg[0].(uint64))
144-
msg, err := p.config.MessageFromCborFunc(msgType, p.recvBuffer.Bytes())
153+
msgData := p.recvBuffer.Bytes()[:numBytesRead]
154+
msg, err := p.config.MessageFromCborFunc(msgType, msgData)
145155
if err != nil {
146156
p.config.ErrorChan <- err
147157
}
@@ -192,6 +202,28 @@ func (p *Protocol) getNewState(msg Message) (State, error) {
192202
return newState, nil
193203
}
194204

205+
func (p *Protocol) setState(state State) {
206+
// Set the new state
207+
p.state = state
208+
// Mark protocol as ready to send/receive based on role and agency of the new state
209+
switch p.config.StateMap[p.state].Agency {
210+
case AGENCY_CLIENT:
211+
switch p.config.Role {
212+
case ProtocolRoleClient:
213+
p.sendReadyChan <- true
214+
case ProtocolRoleServer:
215+
p.recvReadyChan <- true
216+
}
217+
case AGENCY_SERVER:
218+
switch p.config.Role {
219+
case ProtocolRoleServer:
220+
p.sendReadyChan <- true
221+
case ProtocolRoleClient:
222+
p.recvReadyChan <- true
223+
}
224+
}
225+
}
226+
195227
func (p *Protocol) handleMessage(msg Message, isResponse bool) error {
196228
// Lock the state to prevent collisions
197229
p.stateMutex.Lock()
@@ -203,7 +235,7 @@ func (p *Protocol) handleMessage(msg Message, isResponse bool) error {
203235
return fmt.Errorf("%s: error handling message: %s", p.config.Name, err)
204236
}
205237
// Set new state and unlock
206-
p.state = newState
238+
p.setState(newState)
207239
p.stateMutex.Unlock()
208240
// Call handler function
209241
return p.config.MessageHandlerFunc(msg, isResponse)

0 commit comments

Comments
 (0)