Skip to content

Commit 7395836

Browse files
committed
[ws-daemon] Dispose all handlers running triggered by the "Dispatch" abstraction (fueled by updates from a separate PodInformer)
1 parent db692ec commit 7395836

File tree

7 files changed

+119
-21
lines changed

7 files changed

+119
-21
lines changed

components/ws-daemon/pkg/cgroup/cgroup.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,11 @@ func (host *PluginHost) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspa
7979
return xerrors.Errorf("no dispatch available")
8080
}
8181

82-
cgroupPath, err := disp.Runtime.ContainerCGroupPath(context.Background(), ws.ContainerID)
82+
cgroupPath, err := disp.Runtime.ContainerCGroupPath(ctx, ws.ContainerID)
8383
if err != nil {
84+
if err == context.Canceled {
85+
return nil
86+
}
8487
return xerrors.Errorf("cannot get cgroup path for container %s: %w", ws.ContainerID, err)
8588
}
8689

@@ -95,8 +98,11 @@ func (host *PluginHost) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspa
9598
if plg.Type() != host.CGroupVersion {
9699
continue
97100
}
101+
dispatch.GetDispatchWaitGroup(ctx).Add(1)
98102

99103
go func(plg Plugin) {
104+
defer dispatch.GetDispatchWaitGroup(ctx).Done()
105+
100106
err := plg.Apply(ctx, opts)
101107
if err == context.Canceled || err == context.DeadlineExceeded {
102108
err = nil

components/ws-daemon/pkg/controller/workspace_operations.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/gitpod-io/gitpod/content-service/pkg/logs"
2222
"github.com/gitpod-io/gitpod/content-service/pkg/storage"
2323
"github.com/gitpod-io/gitpod/ws-daemon/pkg/content"
24+
"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
2425
"github.com/gitpod-io/gitpod/ws-daemon/pkg/internal/session"
2526
"github.com/opentracing/opentracing-go"
2627
"github.com/prometheus/client_golang/prometheus"
@@ -81,6 +82,7 @@ type DefaultWorkspaceOperations struct {
8182
provider *WorkspaceProvider
8283
backupWorkspaceLimiter chan struct{}
8384
metrics *Metrics
85+
dispatch *dispatch.Dispatch
8486
}
8587

8688
var _ WorkspaceOperations = (*DefaultWorkspaceOperations)(nil)
@@ -106,7 +108,7 @@ type BackupOptions struct {
106108
SkipBackupContent bool
107109
}
108110

109-
func NewWorkspaceOperations(config content.Config, provider *WorkspaceProvider, reg prometheus.Registerer) (WorkspaceOperations, error) {
111+
func NewWorkspaceOperations(config content.Config, provider *WorkspaceProvider, reg prometheus.Registerer, dispatch *dispatch.Dispatch) (WorkspaceOperations, error) {
110112
waitingTimeHist, waitingTimeoutCounter, err := registerConcurrentBackupMetrics(reg, "_mk2")
111113
if err != nil {
112114
return nil, err
@@ -121,6 +123,7 @@ func NewWorkspaceOperations(config content.Config, provider *WorkspaceProvider,
121123
},
122124
// we permit five concurrent backups at any given time, hence the five in the channel
123125
backupWorkspaceLimiter: make(chan struct{}, 5),
126+
dispatch: dispatch,
124127
}, nil
125128
}
126129

@@ -185,6 +188,8 @@ func (wso *DefaultWorkspaceOperations) InitWorkspace(ctx context.Context, option
185188
return "cannot persist workspace", err
186189
}
187190

191+
glog.WithFields(ws.OWI()).Debug("content init done")
192+
188193
return "", nil
189194
}
190195

@@ -305,6 +310,10 @@ func (wso *DefaultWorkspaceOperations) WipeWorkspace(ctx context.Context, instan
305310
return err
306311
}
307312

313+
// dispose all running "dispatch handlers", e.g. all code running on the "pod informer"-triggered part of ws-daemon
314+
wso.dispatch.DisposeWorkspace(ctx, instanceID)
315+
316+
// remove the reference from the WorkspaceProvider, e.g. the "workspace controller" part of ws-daemon
308317
wso.provider.Remove(ctx, instanceID)
309318

310319
return nil

components/ws-daemon/pkg/cpulimit/dispatch.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,11 @@ func (d *DispatchListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Work
179179
return xerrors.Errorf("no dispatch available")
180180
}
181181

182-
cgroupPath, err := disp.Runtime.ContainerCGroupPath(context.Background(), ws.ContainerID)
182+
cgroupPath, err := disp.Runtime.ContainerCGroupPath(ctx, ws.ContainerID)
183183
if err != nil {
184+
if dispatch.IsCancelled(ctx) {
185+
return nil
186+
}
184187
return xerrors.Errorf("cannot start governer: %w", err)
185188
}
186189

@@ -194,7 +197,11 @@ func (d *DispatchListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Work
194197
OWI: ws.OWI(),
195198
Annotations: ws.Pod.Annotations,
196199
}
200+
201+
dispatch.GetDispatchWaitGroup(ctx).Add(1)
197202
go func() {
203+
defer dispatch.GetDispatchWaitGroup(ctx).Done()
204+
198205
<-ctx.Done()
199206

200207
d.mu.Lock()

components/ws-daemon/pkg/daemon/daemon.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,12 @@ func NewDaemon(config Config) (*Daemon, error) {
208208
config.CPULimit.CGroupBasePath,
209209
)
210210

211-
workspaceOps, err := controller.NewWorkspaceOperations(contentCfg, controller.NewWorkspaceProvider(contentCfg.WorkingArea, hooks), wrappedReg)
211+
dsptch, err := dispatch.NewDispatch(containerRuntime, clientset, config.Runtime.KubernetesNamespace, nodename, listener...)
212+
if err != nil {
213+
return nil, err
214+
}
215+
216+
workspaceOps, err := controller.NewWorkspaceOperations(contentCfg, controller.NewWorkspaceProvider(contentCfg.WorkingArea, hooks), wrappedReg, dsptch)
212217
if err != nil {
213218
return nil, err
214219
}
@@ -233,11 +238,6 @@ func NewDaemon(config Config) (*Daemon, error) {
233238
housekeeping := controller.NewHousekeeping(contentCfg.WorkingArea, 5*time.Minute)
234239
go housekeeping.Start(context.Background())
235240

236-
dsptch, err := dispatch.NewDispatch(containerRuntime, clientset, config.Runtime.KubernetesNamespace, nodename, listener...)
237-
if err != nil {
238-
return nil, err
239-
}
240-
241241
dsk := diskguard.FromConfig(config.DiskSpaceGuard, clientset, nodename)
242242

243243
return &Daemon{

components/ws-daemon/pkg/daemon/markunmount.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,24 @@ func (c *MarkUnmountFallback) WorkspaceUpdated(ctx context.Context, ws *dispatch
101101
}
102102
ttl := time.Duration(gracePeriod)*time.Second + propagationGracePeriod
103103

104+
dispatch.GetDispatchWaitGroup(ctx).Add(1)
104105
go func() {
105-
time.Sleep(ttl)
106+
defer dispatch.GetDispatchWaitGroup(ctx).Done()
107+
108+
defer func() {
109+
// We expect the container to be gone now. Don't keep its referenec in memory.
110+
c.mu.Lock()
111+
delete(c.handled, ws.InstanceID)
112+
c.mu.Unlock()
113+
}()
114+
115+
wait := time.NewTicker(ttl)
116+
defer wait.Stop()
117+
select {
118+
case <-ctx.Done():
119+
return
120+
case <-wait.C:
121+
}
106122

107123
dsp := dispatch.GetFromContext(ctx)
108124
if !dsp.WorkspaceExistsOnNode(ws.InstanceID) {
@@ -111,17 +127,12 @@ func (c *MarkUnmountFallback) WorkspaceUpdated(ctx context.Context, ws *dispatch
111127
}
112128

113129
err := unmountMark(ws.InstanceID)
114-
if err != nil {
130+
if err != nil && !dispatch.IsCancelled(ctx) {
115131
log.WithFields(ws.OWI()).WithError(err).Error("cannot unmount mark mount from within ws-daemon")
116132
c.activityCounter.WithLabelValues("false").Inc()
117133
} else {
118134
c.activityCounter.WithLabelValues("true").Inc()
119135
}
120-
121-
// We expect the container to be gone now. Don't keep its referenec in memory.
122-
c.mu.Lock()
123-
delete(c.handled, ws.InstanceID)
124-
c.mu.Unlock()
125136
}()
126137

127138
return nil

components/ws-daemon/pkg/dispatch/dispatch.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ type workspaceState struct {
8686
Context context.Context
8787
Cancel context.CancelFunc
8888
Workspace *Workspace
89+
90+
// this WaitGroup keeps track of when each handler is finished. It's only relied upon in DisposeWorkspace() to determine when work on a given instanceID has commenced.
91+
HandlerWaitGroup sync.WaitGroup
8992
}
9093

9194
type contextKey struct{}
@@ -99,6 +102,20 @@ func GetFromContext(ctx context.Context) *Dispatch {
99102
return ctx.Value(contextDispatch).(*Dispatch)
100103
}
101104

105+
type dispacthHandlerWaitGroupKey struct{}
106+
107+
var (
108+
contextDispatchWaitGroup = dispacthHandlerWaitGroupKey{}
109+
)
110+
111+
func GetDispatchWaitGroup(ctx context.Context) *sync.WaitGroup {
112+
return ctx.Value(contextDispatchWaitGroup).(*sync.WaitGroup)
113+
}
114+
115+
func IsCancelled(ctx context.Context) bool {
116+
return context.Cause(ctx) != nil
117+
}
118+
102119
// Start starts the dispatch
103120
func (d *Dispatch) Start() error {
104121
ifac := informers.NewSharedInformerFactoryWithOptions(d.Kubernetes, podInformerResyncInterval, informers.WithNamespace(d.KubernetesNamespace))
@@ -170,6 +187,29 @@ func (d *Dispatch) WorkspaceExistsOnNode(instanceID string) (ok bool) {
170187
return
171188
}
172189

190+
// DisposeWorkspace makes sure
191+
func (d *Dispatch) DisposeWorkspace(ctx context.Context, instanceID string) {
192+
d.mu.Lock()
193+
defer d.mu.Unlock()
194+
195+
log.WithField("instanceID", instanceID).Debugf("WS DISPOSE: %s", instanceID)
196+
197+
// If we have that instanceID present, cancel it's context
198+
state, present := d.ctxs[instanceID]
199+
if !present {
200+
return
201+
}
202+
if state.Cancel != nil {
203+
state.Cancel()
204+
}
205+
206+
// ...and wait for all long-running/async processes/go-routines to finish
207+
state.HandlerWaitGroup.Wait()
208+
209+
delete(d.ctxs, instanceID)
210+
log.WithField("instanceID", instanceID).Debugf("WS DISPOSE DONE: %s", instanceID)
211+
}
212+
173213
func (d *Dispatch) handlePodUpdate(oldPod, newPod *corev1.Pod) {
174214
workspaceID, ok := newPod.Labels[wsk8s.MetaIDLabel]
175215
if !ok {
@@ -182,6 +222,7 @@ func (d *Dispatch) handlePodUpdate(oldPod, newPod *corev1.Pod) {
182222
if d.NodeName != "" && newPod.Spec.NodeName != d.NodeName {
183223
return
184224
}
225+
log.WithField("instanceID", workspaceInstanceID).Debugf("POD UPDATE: %s", workspaceInstanceID)
185226

186227
d.mu.Lock()
187228
defer d.mu.Unlock()
@@ -190,19 +231,21 @@ func (d *Dispatch) handlePodUpdate(oldPod, newPod *corev1.Pod) {
190231
if !ok {
191232
// we haven't seen this pod before - add it, and wait for the container
192233
owi := wsk8s.GetOWIFromObject(&newPod.ObjectMeta)
193-
d.ctxs[workspaceInstanceID] = &workspaceState{
234+
s := &workspaceState{
194235
WorkspaceAdded: false,
195236
Workspace: &Workspace{
196237
InstanceID: workspaceInstanceID,
197238
WorkspaceID: workspaceID,
198239
Pod: newPod,
199240
},
200241
}
242+
d.ctxs[workspaceInstanceID] = s
201243

202-
// Important!!!!: ideally this timeout must be equal to ws-manager https://github.com/gitpod-io/gitpod/blob/main/components/ws-manager/pkg/manager/manager.go#L171
203-
waitForPodCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
204244
containerCtx, containerCtxCancel := context.WithCancel(context.Background())
205245
containerCtx = context.WithValue(containerCtx, contextDispatch, d)
246+
containerCtx = context.WithValue(containerCtx, contextDispatchWaitGroup, &s.HandlerWaitGroup)
247+
// Important!!!!: ideally this timeout must be equal to ws-manager https://github.com/gitpod-io/gitpod/blob/main/components/ws-manager/pkg/manager/manager.go#L171
248+
waitForPodCtx, cancel := context.WithTimeout(containerCtx, 10*time.Minute)
206249
go func() {
207250
containerID, err := d.Runtime.WaitForContainer(waitForPodCtx, workspaceInstanceID)
208251
if err != nil && err != context.Canceled {
@@ -217,12 +260,19 @@ func (d *Dispatch) handlePodUpdate(oldPod, newPod *corev1.Pod) {
217260
d.mu.Unlock()
218261
return
219262
}
263+
// Only register with the WaitGroup _after_ acquiring the lock to avoid DeadLocks
264+
s.HandlerWaitGroup.Add(1)
265+
defer s.HandlerWaitGroup.Done()
266+
220267
s.Context = containerCtx
221-
s.Cancel = containerCtxCancel
268+
s.Cancel = sync.OnceFunc(containerCtxCancel)
222269
s.Workspace.ContainerID = containerID
223270

224271
for _, l := range d.Listener {
272+
s.HandlerWaitGroup.Add(1)
225273
go func(listener Listener) {
274+
defer s.HandlerWaitGroup.Done()
275+
226276
err := listener.WorkspaceAdded(containerCtx, s.Workspace)
227277
if err != nil {
228278
log.WithError(err).WithFields(owi).Error("dispatch listener failed")
@@ -259,20 +309,25 @@ func (d *Dispatch) handlePodUpdate(oldPod, newPod *corev1.Pod) {
259309
continue
260310
}
261311

312+
state.HandlerWaitGroup.Add(1)
262313
go func() {
314+
defer state.HandlerWaitGroup.Done()
315+
263316
err := lu.WorkspaceUpdated(state.Context, state.Workspace)
264317
if err != nil {
265318
log.WithError(err).WithFields(wsk8s.GetOWIFromObject(&oldPod.ObjectMeta)).Error("dispatch listener failed")
266319
}
267320
}()
268321
}
322+
log.WithField("instanceID", workspaceInstanceID).Debugf("POD UPDATE DONE: %s", workspaceInstanceID)
269323
}
270324

271325
func (d *Dispatch) handlePodDeleted(pod *corev1.Pod) {
272326
instanceID, ok := pod.Labels[wsk8s.WorkspaceIDLabel]
273327
if !ok {
274328
return
275329
}
330+
log.WithField("instanceID", instanceID).Debugf("POD DELETED: %s", instanceID)
276331

277332
d.mu.Lock()
278333
defer d.mu.Unlock()
@@ -286,4 +341,5 @@ func (d *Dispatch) handlePodDeleted(pod *corev1.Pod) {
286341
state.Cancel()
287342
}
288343
delete(d.ctxs, instanceID)
344+
log.WithField("instanceID", instanceID).Debugf("POD DELETED DONE: %s", instanceID)
289345
}

components/ws-daemon/pkg/netlimit/netlimit.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,11 @@ func (c *ConnLimiter) limitWorkspace(ctx context.Context, ws *dispatch.Workspace
128128
return fmt.Errorf("no dispatch available")
129129
}
130130

131-
pid, err := disp.Runtime.ContainerPID(context.Background(), ws.ContainerID)
131+
pid, err := disp.Runtime.ContainerPID(ctx, ws.ContainerID)
132132
if err != nil {
133+
if dispatch.IsCancelled(ctx) {
134+
return nil
135+
}
133136
return fmt.Errorf("could not get pid for container %s of workspace %s", ws.ContainerID, ws.WorkspaceID)
134137
}
135138

@@ -141,12 +144,18 @@ func (c *ConnLimiter) limitWorkspace(ctx context.Context, ws *dispatch.Workspace
141144
}
142145
}, nsinsider.EnterMountNS(false), nsinsider.EnterNetNS(true))
143146
if err != nil {
147+
if dispatch.IsCancelled(ctx) {
148+
return nil
149+
}
144150
log.WithError(err).WithFields(ws.OWI()).Error("cannot enable connection limiting")
145151
return err
146152
}
147153
c.limited[ws.InstanceID] = struct{}{}
148154

155+
dispatch.GetDispatchWaitGroup(ctx).Add(1)
149156
go func(*dispatch.Workspace) {
157+
defer dispatch.GetDispatchWaitGroup(ctx).Done()
158+
150159
ticker := time.NewTicker(30 * time.Second)
151160
defer ticker.Stop()
152161

0 commit comments

Comments
 (0)