Skip to content

Commit 58601b4

Browse files
committed
fix data races in proxy server
Signed-off-by: Matus Mrekaj <[email protected]> Change-Id: I932d560548ee816e28683243a7318a2a7fbbb24a
1 parent cc80dbc commit 58601b4

File tree

10 files changed

+442
-126
lines changed

10 files changed

+442
-126
lines changed

cmd/vpp-proxy/main.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ func runClient() {
120120
}
121121

122122
func runServer() {
123-
p := proxy.NewServer()
123+
p, err := proxy.NewServer()
124+
if err != nil {
125+
log.Fatalln(err)
126+
}
124127

125128
statsAdapter := statsclient.NewStatsClient(*statsSocket)
126129
binapiAdapter := socketclient.NewVppClient(*binapiSocket)
@@ -135,5 +138,7 @@ func runServer() {
135138
}
136139
defer p.DisconnectBinapi()
137140

138-
p.ListenAndServe(*proxyAddr)
141+
if err := p.ListenAndServe(*proxyAddr); err != nil {
142+
log.Fatalln(err)
143+
}
139144
}

core/connection.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ type Connection struct {
111111

112112
lastReplyLock sync.Mutex // lock for the last reply
113113
lastReply time.Time // time of the last received reply from VPP
114+
115+
msgControlPing api.Message
116+
msgControlPingReply api.Message
114117
}
115118

116119
func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
@@ -122,14 +125,16 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration)
122125
}
123126

124127
c := &Connection{
125-
vppClient: binapi,
126-
maxAttempts: attempts,
127-
recInterval: interval,
128-
codec: &codec.MsgCodec{},
129-
msgIDs: make(map[string]uint16),
130-
msgMap: make(map[uint16]api.Message),
131-
channels: make(map[uint16]*Channel),
132-
subscriptions: make(map[uint16][]*subscriptionCtx),
128+
vppClient: binapi,
129+
maxAttempts: attempts,
130+
recInterval: interval,
131+
codec: &codec.MsgCodec{},
132+
msgIDs: make(map[string]uint16),
133+
msgMap: make(map[uint16]api.Message),
134+
channels: make(map[uint16]*Channel),
135+
subscriptions: make(map[uint16][]*subscriptionCtx),
136+
msgControlPing: msgControlPing,
137+
msgControlPingReply: msgControlPingReply,
133138
}
134139
binapi.SetMsgCallback(c.msgCallback)
135140
return c
@@ -314,7 +319,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
314319
}
315320

316321
// send the control ping request
317-
ch.reqChan <- &vppRequest{msg: msgControlPing}
322+
ch.reqChan <- &vppRequest{msg: c.msgControlPing}
318323

319324
for {
320325
// expect response within timeout period
@@ -427,12 +432,12 @@ func (c *Connection) retrieveMessageIDs() (err error) {
427432
}
428433
n++
429434

430-
if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
435+
if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
431436
c.pingReqID = msgID
432-
msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
433-
} else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
437+
c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
438+
} else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
434439
c.pingReplyID = msgID
435-
msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
440+
c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
436441
}
437442

438443
if debugMsgIDs {

core/control_ping.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package core
22

3-
import "git.fd.io/govpp.git/api"
3+
import (
4+
"git.fd.io/govpp.git/api"
5+
)
46

57
var (
68
msgControlPing api.Message = new(ControlPing)

core/request_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
110110

111111
if req.multi {
112112
// send a control ping to determine end of the multipart response
113-
pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
113+
pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
114114

115115
log.WithFields(logger.Fields{
116116
"channel": ch.id,

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/golang/protobuf v1.3.2 // indirect
1010
github.com/hpcloud/tail v1.0.0 // indirect
1111
github.com/kr/pretty v0.1.0 // indirect
12-
github.com/lunixbochs/struc v0.0.0-20180408203800-02e4c2afbb2a
12+
github.com/lunixbochs/struc v0.0.0-20190916212049-a5c72983bc42
1313
github.com/onsi/ginkgo v1.8.0 // indirect
1414
github.com/onsi/gomega v1.1.0
1515
github.com/pkg/profile v1.2.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
1717
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
1818
github.com/lunixbochs/struc v0.0.0-20180408203800-02e4c2afbb2a h1:axFx97V2Lyke5LbeygrJlzc07mwVhHt2ZHeI/Nv8Aq4=
1919
github.com/lunixbochs/struc v0.0.0-20180408203800-02e4c2afbb2a/go.mod h1:iOJu9pApjjmEmNq7PqlA5R9mDu/HMF5EM3llWKX/TyA=
20+
github.com/lunixbochs/struc v0.0.0-20190916212049-a5c72983bc42 h1:PzBD7QuxXSgSu61TKXxRwVGzWO5d9QZ0HxFFpndZMCg=
21+
github.com/lunixbochs/struc v0.0.0-20190916212049-a5c72983bc42/go.mod h1:vy1vK6wD6j7xX6O6hXe621WabdtNkou2h7uRtTfRMyg=
2022
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
2123
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
2224
github.com/onsi/gomega v1.1.0 h1:e3YP4dN/HYPpGh29X1ZkcxcEICsOls9huyVCRBaxjq8=

proxy/client.go

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
1+
// Copyright (c) 2019 Cisco and/or its affiliates.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at:
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
package proxy
216

317
import (
418
"fmt"
5-
"log"
19+
"git.fd.io/govpp.git/core"
620
"net/rpc"
721
"reflect"
822
"time"
@@ -40,7 +54,8 @@ func (c *Client) NewStatsClient() (*StatsClient, error) {
4054
// NewBinapiClient returns new BinapiClient which implements api.Channel.
4155
func (c *Client) NewBinapiClient() (*BinapiClient, error) {
4256
binapi := &BinapiClient{
43-
rpc: c.rpc,
57+
rpc: c.rpc,
58+
timeout: core.DefaultReplyTimeout,
4459
}
4560
return binapi, nil
4661
}
@@ -103,27 +118,31 @@ func (s *StatsClient) GetBufferStats(bufStats *api.BufferStats) error {
103118
}
104119

105120
type BinapiClient struct {
106-
rpc *rpc.Client
121+
rpc *rpc.Client
122+
timeout time.Duration
107123
}
108124

109125
func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx {
110126
req := &requestCtx{
111-
rpc: b.rpc,
112-
req: msg,
127+
rpc: b.rpc,
128+
timeout: b.timeout,
129+
req: msg,
113130
}
114-
log.Printf("SendRequest: %T %+v", msg, msg)
131+
log.Debugf("SendRequest: %T %+v", msg, msg)
115132
return req
116133
}
117134

118135
type requestCtx struct {
119-
rpc *rpc.Client
120-
req api.Message
136+
rpc *rpc.Client
137+
req api.Message
138+
timeout time.Duration
121139
}
122140

123141
func (r *requestCtx) ReceiveReply(msg api.Message) error {
124142
req := BinapiRequest{
125143
Msg: r.req,
126144
ReplyMsg: msg,
145+
Timeout: r.timeout,
127146
}
128147
resp := BinapiResponse{}
129148

@@ -140,16 +159,18 @@ func (r *requestCtx) ReceiveReply(msg api.Message) error {
140159

141160
func (b *BinapiClient) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
142161
req := &multiRequestCtx{
143-
rpc: b.rpc,
144-
req: msg,
162+
rpc: b.rpc,
163+
timeout: b.timeout,
164+
req: msg,
145165
}
146-
log.Printf("SendMultiRequest: %T %+v", msg, msg)
166+
log.Debugf("SendMultiRequest: %T %+v", msg, msg)
147167
return req
148168
}
149169

150170
type multiRequestCtx struct {
151-
rpc *rpc.Client
152-
req api.Message
171+
rpc *rpc.Client
172+
req api.Message
173+
timeout time.Duration
153174

154175
index int
155176
replies []api.Message
@@ -162,6 +183,7 @@ func (r *multiRequestCtx) ReceiveReply(msg api.Message) (stop bool, err error) {
162183
Msg: r.req,
163184
ReplyMsg: msg,
164185
IsMulti: true,
186+
Timeout: r.timeout,
165187
}
166188
resp := BinapiResponse{}
167189

@@ -189,24 +211,23 @@ func (b *BinapiClient) SubscribeNotification(notifChan chan api.Message, event a
189211
}
190212

191213
func (b *BinapiClient) SetReplyTimeout(timeout time.Duration) {
192-
req := BinapiTimeoutRequest{Timeout: timeout}
193-
resp := BinapiTimeoutResponse{}
194-
if err := b.rpc.Call("BinapiRPC.SetTimeout", req, &resp); err != nil {
195-
log.Println(err)
196-
}
214+
b.timeout = timeout
197215
}
198216

199217
func (b *BinapiClient) CheckCompatiblity(msgs ...api.Message) error {
218+
msgNamesCrscs := make([]string, 0, len(msgs))
219+
200220
for _, msg := range msgs {
201-
req := BinapiCompatibilityRequest{
202-
MsgName: msg.GetMessageName(),
203-
Crc: msg.GetCrcString(),
204-
}
205-
resp := BinapiCompatibilityResponse{}
206-
if err := b.rpc.Call("BinapiRPC.Compatibility", req, &resp); err != nil {
207-
return err
208-
}
221+
msgNamesCrscs = append(msgNamesCrscs, msg.GetMessageName()+"_"+msg.GetCrcString())
209222
}
223+
224+
req := BinapiCompatibilityRequest{MsgNameCrcs: msgNamesCrscs}
225+
resp := BinapiCompatibilityResponse{}
226+
227+
if err := b.rpc.Call("BinapiRPC.Compatibility", req, &resp); err != nil {
228+
return err
229+
}
230+
210231
return nil
211232
}
212233

proxy/log.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package proxy
2+
3+
import (
4+
"github.com/sirupsen/logrus"
5+
"os"
6+
)
7+
8+
var (
9+
debug = os.Getenv("DEBUG_GOVPP_PROXY") != ""
10+
11+
log = logrus.New()
12+
)
13+
14+
func init() {
15+
log.Out = os.Stdout
16+
if debug {
17+
log.Level = logrus.DebugLevel
18+
log.Debugf("govpp/proxy: debug mode enabled")
19+
}
20+
}
21+
22+
// SetLogger sets global logger to l.
23+
func SetLogger(l *logrus.Logger) {
24+
log = l
25+
}
26+
27+
// SetLogLevel sets global logger level to lvl.
28+
func SetLogLevel(lvl logrus.Level) {
29+
log.Level = lvl
30+
}

0 commit comments

Comments
 (0)