@@ -8,10 +8,12 @@ import (
8
8
"context"
9
9
"net/url"
10
10
"sort"
11
+ "strconv"
11
12
12
13
"golang.org/x/xerrors"
13
14
"k8s.io/apimachinery/pkg/api/errors"
14
15
"k8s.io/apimachinery/pkg/runtime"
16
+ "k8s.io/apimachinery/pkg/util/uuid"
15
17
"k8s.io/client-go/tools/cache"
16
18
ctrl "sigs.k8s.io/controller-runtime"
17
19
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -20,6 +22,7 @@ import (
20
22
21
23
wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
22
24
"github.com/gitpod-io/gitpod/common-go/log"
25
+ "github.com/gitpod-io/gitpod/ws-manager/api"
23
26
wsapi "github.com/gitpod-io/gitpod/ws-manager/api"
24
27
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
25
28
"github.com/gitpod-io/gitpod/ws-proxy/pkg/common"
@@ -48,11 +51,19 @@ func getPortStr(urlStr string) string {
48
51
return portURL .Port ()
49
52
}
50
53
54
+ type ConnectionContext struct {
55
+ WorkspaceID string
56
+ Port string
57
+ UUID string
58
+ CancelFunc context.CancelCauseFunc
59
+ }
60
+
51
61
type CRDWorkspaceInfoProvider struct {
52
62
client.Client
53
63
Scheme * runtime.Scheme
54
64
55
- store cache.ThreadSafeStore
65
+ store cache.ThreadSafeStore
66
+ contextStore cache.ThreadSafeStore
56
67
}
57
68
58
69
// NewCRDWorkspaceInfoProvider creates a fresh WorkspaceInfoProvider.
@@ -67,12 +78,21 @@ func NewCRDWorkspaceInfoProvider(client client.Client, scheme *runtime.Scheme) (
67
78
return nil , xerrors .Errorf ("object is not a WorkspaceInfo" )
68
79
},
69
80
}
81
+ contextIndexers := cache.Indexers {
82
+ workspaceIndex : func (obj interface {}) ([]string , error ) {
83
+ if connCtx , ok := obj .(* ConnectionContext ); ok {
84
+ return []string {connCtx .WorkspaceID }, nil
85
+ }
86
+ return nil , xerrors .Errorf ("object is not a ConnectionContext" )
87
+ },
88
+ }
70
89
71
90
return & CRDWorkspaceInfoProvider {
72
91
Client : client ,
73
92
Scheme : scheme ,
74
93
75
- store : cache .NewThreadSafeStore (indexers , cache.Indices {}),
94
+ store : cache .NewThreadSafeStore (indexers , cache.Indices {}),
95
+ contextStore : cache .NewThreadSafeStore (contextIndexers , cache.Indices {}),
76
96
}, nil
77
97
}
78
98
@@ -101,6 +121,28 @@ func (r *CRDWorkspaceInfoProvider) WorkspaceInfo(workspaceID string) *common.Wor
101
121
return nil
102
122
}
103
123
124
+ func (r * CRDWorkspaceInfoProvider ) AcquireContext (ctx context.Context , workspaceID string , port string ) (context.Context , string , error ) {
125
+ ws := r .WorkspaceInfo (workspaceID )
126
+ if ws == nil {
127
+ return ctx , "" , xerrors .Errorf ("workspace %s not found" , workspaceID )
128
+ }
129
+ id := string (uuid .NewUUID ())
130
+ ctx , cancel := context .WithCancelCause (ctx )
131
+ connCtx := & ConnectionContext {
132
+ WorkspaceID : workspaceID ,
133
+ Port : port ,
134
+ CancelFunc : cancel ,
135
+ UUID : id ,
136
+ }
137
+
138
+ r .contextStore .Add (id , connCtx )
139
+ return ctx , id , nil
140
+ }
141
+
142
+ func (r * CRDWorkspaceInfoProvider ) ReleaseContext (id string ) {
143
+ r .contextStore .Delete (id )
144
+ }
145
+
104
146
func (r * CRDWorkspaceInfoProvider ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
105
147
var ws workspacev1.Workspace
106
148
err := r .Client .Get (context .Background (), req .NamespacedName , & ws )
@@ -162,11 +204,44 @@ func (r *CRDWorkspaceInfoProvider) Reconcile(ctx context.Context, req ctrl.Reque
162
204
}
163
205
164
206
r .store .Update (req .Name , wsinfo )
207
+ r .invalidateConnectionContext (wsinfo )
165
208
log .WithField ("workspace" , req .Name ).WithField ("details" , wsinfo ).Debug ("adding/updating workspace details" )
166
209
167
210
return ctrl.Result {}, nil
168
211
}
169
212
213
+ func (r * CRDWorkspaceInfoProvider ) invalidateConnectionContext (ws * common.WorkspaceInfo ) {
214
+ connCtxs , err := r .contextStore .ByIndex (workspaceIndex , ws .WorkspaceID )
215
+ if err != nil {
216
+ return
217
+ }
218
+ if len (connCtxs ) == 0 {
219
+ return
220
+ }
221
+
222
+ if ws .Auth != nil && ws .Auth .Admission == wsapi .AdmissionLevel_ADMIT_EVERYONE {
223
+ return
224
+ }
225
+ publicPorts := make (map [string ]struct {})
226
+ for _ , p := range ws .Ports {
227
+ if p .Visibility == api .PortVisibility_PORT_VISIBILITY_PUBLIC {
228
+ publicPorts [strconv .FormatUint (uint64 (p .Port ), 10 )] = struct {}{}
229
+ }
230
+ }
231
+
232
+ for _ , _connCtx := range connCtxs {
233
+ connCtx , ok := _connCtx .(* ConnectionContext )
234
+ if ! ok {
235
+ continue
236
+ }
237
+ if _ , ok := publicPorts [connCtx .Port ]; ok {
238
+ continue
239
+ }
240
+ connCtx .CancelFunc (xerrors .Errorf ("workspace %s is no longer public" , ws .WorkspaceID ))
241
+ r .contextStore .Delete (connCtx .UUID )
242
+ }
243
+ }
244
+
170
245
// SetupWithManager sets up the controller with the Manager.
171
246
func (r * CRDWorkspaceInfoProvider ) SetupWithManager (mgr ctrl.Manager ) error {
172
247
return ctrl .NewControllerManagedBy (mgr ).
@@ -177,28 +252,3 @@ func (r *CRDWorkspaceInfoProvider) SetupWithManager(mgr ctrl.Manager) error {
177
252
).
178
253
Complete (r )
179
254
}
180
-
181
- // CompositeInfoProvider checks each of its info providers and returns the first info found.
182
- type CompositeInfoProvider []common.WorkspaceInfoProvider
183
-
184
- func (c CompositeInfoProvider ) WorkspaceInfo (workspaceID string ) * common.WorkspaceInfo {
185
- for _ , ip := range c {
186
- res := ip .WorkspaceInfo (workspaceID )
187
- if res != nil {
188
- return res
189
- }
190
- }
191
- return nil
192
- }
193
-
194
- type fixedInfoProvider struct {
195
- Infos map [string ]* common.WorkspaceInfo
196
- }
197
-
198
- // WorkspaceInfo returns the workspace information of a workspace using it's workspace ID.
199
- func (fp * fixedInfoProvider ) WorkspaceInfo (workspaceID string ) * common.WorkspaceInfo {
200
- if fp .Infos == nil {
201
- return nil
202
- }
203
- return fp .Infos [workspaceID ]
204
- }
0 commit comments