@@ -118,6 +118,11 @@ type handling struct {
118
118
type pendingMap map [ID ]chan * Response
119
119
type handlingMap map [ID ]handling
120
120
121
+ var (
122
+ errLoadPendingMap = xerrors .New ("failed to Load pendingMap" )
123
+ errLoadhandlingMap = xerrors .New ("failed to Load handlingMap" )
124
+ )
125
+
121
126
// NewConn creates a new connection object that reads and writes messages from
122
127
// the supplied stream and dispatches incoming messages to the supplied handler.
123
128
func NewConn (ctx context.Context , s Stream , options ... Options ) * Conn {
@@ -170,11 +175,17 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
170
175
}
171
176
172
177
rchan := make (chan * Response )
173
- m := c .pending .Load ().(pendingMap )
178
+ m , ok := c .pending .Load ().(pendingMap )
179
+ if ! ok {
180
+ return errLoadPendingMap
181
+ }
174
182
m [id ] = rchan
175
183
c .pending .Store (m )
176
184
defer func () {
177
- m := c .pending .Load ().(pendingMap )
185
+ m , ok := c .pending .Load ().(pendingMap )
186
+ if ! ok {
187
+ panic (errLoadPendingMap )
188
+ }
178
189
delete (m , id )
179
190
c .pending .Store (m )
180
191
}()
@@ -221,7 +232,10 @@ func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err
221
232
return xerrors .New ("reply not invoked with a valid call" )
222
233
}
223
234
224
- m := c .handling .Load ().(handlingMap )
235
+ m , ok := c .handling .Load ().(handlingMap )
236
+ if ! ok {
237
+ return errLoadhandlingMap
238
+ }
225
239
handling , found := m [* req .ID ]
226
240
if ! found {
227
241
return xerrors .Errorf ("not a call in progress: %v" , req .ID )
@@ -254,7 +268,8 @@ func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err
254
268
zap .Any ("resp.Result" , resp .Result ),
255
269
zap .Error (resp .Error ),
256
270
)
257
- if err = c .stream .Write (ctx , data ); err != nil {
271
+
272
+ if err := c .stream .Write (ctx , data ); err != nil {
258
273
return err
259
274
}
260
275
@@ -287,7 +302,10 @@ func (c *Conn) Notify(ctx context.Context, method string, params interface{}) er
287
302
288
303
// Cancel cancels a pending Call on the server side.
289
304
func (c * Conn ) Cancel (id ID ) {
290
- m := c .handling .Load ().(handlingMap )
305
+ m , ok := c .handling .Load ().(handlingMap )
306
+ if ! ok {
307
+ panic (errLoadhandlingMap )
308
+ }
291
309
handling , found := m [id ]
292
310
if found {
293
311
handling .cancel ()
@@ -374,7 +392,11 @@ func (c *Conn) Run(ctx context.Context) error {
374
392
} else {
375
393
// we have a Call, add to the processor queue
376
394
reqCtx , reqCancel := context .WithCancel (ctx )
377
- m := c .handling .Load ().(handlingMap )
395
+ defer reqCancel ()
396
+ m , ok := c .handling .Load ().(handlingMap )
397
+ if ! ok {
398
+ return errLoadhandlingMap
399
+ }
378
400
m [* req .ID ] = handling {
379
401
request : req ,
380
402
cancel : reqCancel ,
@@ -390,7 +412,10 @@ func (c *Conn) Run(ctx context.Context) error {
390
412
391
413
case msg .ID != nil :
392
414
// we have a response, get the pending entry from the map
393
- m := c .pending .Load ().(pendingMap )
415
+ m , ok := c .handling .Load ().(pendingMap )
416
+ if ! ok {
417
+ return errLoadPendingMap
418
+ }
394
419
rchan := m [* msg .ID ]
395
420
if rchan != nil {
396
421
delete (m , * msg .ID )
0 commit comments