4
4
"context"
5
5
"net"
6
6
"strings"
7
+ "sync"
7
8
8
9
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
9
10
"github.com/moby/buildkit/identity"
@@ -36,14 +37,16 @@ type Attachable interface {
36
37
37
38
// Session is a long running connection between client and a daemon
38
39
type Session struct {
39
- id string
40
- name string
41
- sharedKey string
42
- ctx context.Context
43
- cancelCtx func ()
44
- done chan struct {}
45
- grpcServer * grpc.Server
46
- conn net.Conn
40
+ mu sync.Mutex // synchronizes conn run and close
41
+ id string
42
+ name string
43
+ sharedKey string
44
+ ctx context.Context
45
+ cancelCtx func ()
46
+ done chan struct {}
47
+ grpcServer * grpc.Server
48
+ conn net.Conn
49
+ closeCalled bool
47
50
}
48
51
49
52
// NewSession returns a new long running session
@@ -99,6 +102,11 @@ func (s *Session) ID() string {
99
102
100
103
// Run activates the session
101
104
func (s * Session ) Run (ctx context.Context , dialer Dialer ) error {
105
+ s .mu .Lock ()
106
+ if s .closeCalled {
107
+ s .mu .Unlock ()
108
+ return nil
109
+ }
102
110
ctx , cancel := context .WithCancel (ctx )
103
111
s .cancelCtx = cancel
104
112
s .done = make (chan struct {})
@@ -118,22 +126,27 @@ func (s *Session) Run(ctx context.Context, dialer Dialer) error {
118
126
}
119
127
conn , err := dialer (ctx , "h2c" , meta )
120
128
if err != nil {
129
+ s .mu .Unlock ()
121
130
return errors .Wrap (err , "failed to dial gRPC" )
122
131
}
123
132
s .conn = conn
133
+ s .mu .Unlock ()
124
134
serve (ctx , s .grpcServer , conn )
125
135
return nil
126
136
}
127
137
128
138
// Close closes the session
129
139
func (s * Session ) Close () error {
140
+ s .mu .Lock ()
130
141
if s .cancelCtx != nil && s .done != nil {
131
142
if s .conn != nil {
132
143
s .conn .Close ()
133
144
}
134
145
s .grpcServer .Stop ()
135
146
<- s .done
136
147
}
148
+ s .closeCalled = true
149
+ s .mu .Unlock ()
137
150
return nil
138
151
}
139
152
0 commit comments