Skip to content

Commit 39c00bc

Browse files
feat(pro): cancel active up tasks if new one is started
1 parent 6cfb763 commit 39c00bc

File tree

5 files changed

+63
-86
lines changed

5 files changed

+63
-86
lines changed

cmd/up.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1430,7 +1430,7 @@ func WithSignals(ctx context.Context) (context.Context, func()) {
14301430
signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT)
14311431
go func() {
14321432
select {
1433-
case _ = <-signals:
1433+
case <-signals:
14341434
cancel()
14351435
case <-ctx.Done():
14361436
}

desktop/src/client/command.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ export class Command implements TCommand<ChildProcess<string>> {
200200
}),
201201
})
202202

203-
await sleep(2_000)
203+
await sleep(3_000)
204204
// the actual child process could be gone after sending a SIGINT
205205
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
206206
if (this.childProcess) {

pkg/client/clientimplementation/daemonclient/client.go

Lines changed: 3 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"fmt"
77
"io"
88
"net/netip"
9-
"os"
10-
"path/filepath"
119
"strings"
1210
"sync"
1311
"time"
@@ -62,8 +60,7 @@ func New(devPodConfig *config.Config, prov *provider.ProviderConfig, workspace *
6260
type client struct {
6361
m sync.Mutex
6462

65-
workspaceLockOnce sync.Once
66-
workspaceLock *flock.Flock
63+
workspaceLock *flock.Flock
6764

6865
devPodConfig *config.Config
6966
config *provider.ProviderConfig
@@ -74,27 +71,12 @@ type client struct {
7471
}
7572

7673
func (c *client) Lock(ctx context.Context) error {
77-
c.initLock()
78-
79-
// try to lock workspace
80-
c.log.Debugf("Acquire workspace lock...")
81-
err := tryLock(ctx, c.workspaceLock, "workspace", c.log)
82-
if err != nil {
83-
return fmt.Errorf("error locking workspace: %w", err)
84-
}
85-
c.log.Debugf("Acquired workspace lock...")
86-
74+
// noop
8775
return nil
8876
}
8977

9078
func (c *client) Unlock() {
91-
c.initLock()
92-
93-
// try to unlock workspace
94-
err := c.workspaceLock.Unlock()
95-
if err != nil {
96-
c.log.Warnf("Error unlocking workspace: %v", err)
97-
}
79+
// noop
9880
}
9981

10082
func (c *client) Provider() string {
@@ -207,23 +189,6 @@ func (c *client) DirectTunnel(ctx context.Context, stdin io.Reader, stdout io.Wr
207189
}
208190
}
209191

210-
func (c *client) initLock() {
211-
c.workspaceLockOnce.Do(func() {
212-
c.m.Lock()
213-
defer c.m.Unlock()
214-
215-
// get locks dir
216-
workspaceLocksDir, err := provider.GetLocksDir(c.workspace.Context)
217-
if err != nil {
218-
panic(fmt.Errorf("get workspaces dir: %w", err))
219-
}
220-
_ = os.MkdirAll(workspaceLocksDir, 0777)
221-
222-
// create workspace lock
223-
c.workspaceLock = flock.New(filepath.Join(workspaceLocksDir, c.workspace.ID+".workspace.lock"))
224-
})
225-
}
226-
227192
func (c *client) Ping(ctx context.Context, writer io.Writer) error {
228193
wAddr, err := c.getWorkspaceAddress()
229194
if err != nil {
@@ -291,43 +256,3 @@ func (c *client) getWorkspaceAddress() (ts.Addr, error) {
291256

292257
return ts.NewAddr(ts.GetWorkspaceHostname(c.workspace.Pro.InstanceName, c.workspace.Pro.Project), sshServer.DefaultUserPort), nil
293258
}
294-
295-
func printLogMessagePeriodically(message string, log log.Logger) chan struct{} {
296-
done := make(chan struct{})
297-
go func() {
298-
for {
299-
select {
300-
case <-done:
301-
return
302-
case <-time.After(time.Second * 5):
303-
log.Info(message)
304-
}
305-
}
306-
}()
307-
308-
return done
309-
}
310-
311-
func tryLock(ctx context.Context, lock *flock.Flock, name string, log log.Logger) error {
312-
done := printLogMessagePeriodically(fmt.Sprintf("Trying to lock %s, seems like another process is running that blocks this %s", name, name), log)
313-
defer close(done)
314-
315-
now := time.Now()
316-
for time.Since(now) < time.Minute*5 {
317-
locked, err := lock.TryLock()
318-
if err != nil {
319-
return err
320-
} else if locked {
321-
return nil
322-
}
323-
324-
select {
325-
case <-time.After(time.Second):
326-
continue
327-
case <-ctx.Done():
328-
return ctx.Err()
329-
}
330-
}
331-
332-
return fmt.Errorf("timed out waiting to lock %s, seems like there is another process running on this machine that blocks it", name)
333-
}

pkg/client/clientimplementation/daemonclient/up.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,24 @@ func (c *client) Up(ctx context.Context, opt clientpkg.UpOptions) (*config.Resul
5959
return nil, fmt.Errorf("error getting management client: %w", err)
6060
}
6161

62+
// prompt user to attach to active task or start new one
63+
c.log.Debug("Check active up task")
64+
activeUpTask, err := findActiveUpTask(ctx, managementClient, instance)
65+
if err != nil {
66+
return nil, fmt.Errorf("find active up task: %w", err)
67+
}
68+
69+
// if we have an active up task, cancel it before creating a new one
70+
if activeUpTask != nil {
71+
c.log.Warnf("Found active up task %s, attempting to cancel it", activeUpTask.ID)
72+
_, err = managementClient.Loft().ManagementV1().DevPodWorkspaceInstances(instance.Namespace).Cancel(ctx, instance.Name, &managementv1.DevPodWorkspaceInstanceCancel{
73+
TaskID: activeUpTask.ID,
74+
}, metav1.CreateOptions{})
75+
if err != nil {
76+
return nil, fmt.Errorf("cancel task: %w", err)
77+
}
78+
}
79+
6280
// create up task
6381
task, err := managementClient.Loft().ManagementV1().DevPodWorkspaceInstances(instance.Namespace).Up(ctx, instance.Name, &managementv1.DevPodWorkspaceInstanceUp{
6482
Spec: managementv1.DevPodWorkspaceInstanceUpSpec{
@@ -72,7 +90,11 @@ func (c *client) Up(ctx context.Context, opt clientpkg.UpOptions) (*config.Resul
7290
return nil, fmt.Errorf("no up task id returned from server")
7391
}
7492

75-
exitCode, err := observeTask(ctx, managementClient, instance, task.Status.TaskID, c.log)
93+
return waitTaskDone(ctx, managementClient, instance, task.Status.TaskID, c.log)
94+
}
95+
96+
func waitTaskDone(ctx context.Context, managementClient kube.Interface, instance *managementv1.DevPodWorkspaceInstance, taskID string, log log.Logger) (*config.Result, error) {
97+
exitCode, err := observeTask(ctx, managementClient, instance, taskID, log)
7698
if err != nil {
7799
return nil, fmt.Errorf("up: %w", err)
78100
} else if exitCode != 0 {
@@ -87,7 +109,7 @@ func (c *client) Up(ctx context.Context, opt clientpkg.UpOptions) (*config.Resul
87109
Name(instance.Name).
88110
SubResource("tasks").
89111
VersionedParams(&managementv1.DevPodWorkspaceInstanceTasksOptions{
90-
TaskID: task.Status.TaskID,
112+
TaskID: taskID,
91113
}, builders.ParameterCodec).
92114
Do(ctx).
93115
Into(tasks)
@@ -267,3 +289,33 @@ func printLogs(ctx context.Context, managementClient kube.Interface, workspace *
267289

268290
return 0, nil
269291
}
292+
293+
const (
294+
TaskStatusRunning = "Running"
295+
TaskStatusSucceed = "Succeeded"
296+
TaskStatusFailed = "Failed"
297+
)
298+
const (
299+
TaskTypeUp = "up"
300+
TaskTypeStop = "stop"
301+
TaskTypeDelete = "delete"
302+
)
303+
304+
func findActiveUpTask(ctx context.Context, managementClient kube.Interface, instance *managementv1.DevPodWorkspaceInstance) (*managementv1.DevPodWorkspaceInstanceTask, error) {
305+
tasks := &managementv1.DevPodWorkspaceInstanceTasks{}
306+
err := managementClient.Loft().ManagementV1().RESTClient().Get().
307+
Namespace(instance.Namespace).
308+
Resource("devpodworkspaceinstances").
309+
Name(instance.Name).
310+
SubResource("tasks").
311+
Do(ctx).
312+
Into(tasks)
313+
314+
for _, task := range tasks.Tasks {
315+
if task.Status == TaskStatusRunning && task.Type == TaskTypeUp {
316+
return &task, nil
317+
}
318+
}
319+
320+
return nil, err
321+
}

pkg/ts/workspace_server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,20 +377,20 @@ func (s *WorkspaceServer) sendHeartbeat(ctx context.Context, client *http.Client
377377

378378
discoveredRunner, err := s.discoverRunner(ctx, lc)
379379
if err != nil {
380-
return fmt.Errorf("failed to discover runner: %v", err)
380+
return fmt.Errorf("failed to discover runner: %w", err)
381381
}
382382

383383
heartbeatURL := fmt.Sprintf("http://%s.ts.loft/devpod/%s/%s/heartbeat", discoveredRunner, projectName, workspaceName)
384384
s.log.Infof("Sending heartbeat to %s, because there are %d active connections", heartbeatURL, connections)
385385
req, err := http.NewRequestWithContext(ctx, "GET", heartbeatURL, nil)
386386
if err != nil {
387-
return fmt.Errorf("failed to create request for %s: %v", heartbeatURL, err)
387+
return fmt.Errorf("failed to create request for %s: %w", heartbeatURL, err)
388388
}
389389

390390
req.Header.Set("Authorization", "Bearer "+s.config.AccessKey)
391391
resp, err := client.Do(req)
392392
if err != nil {
393-
return fmt.Errorf("request to %s failed: %v", heartbeatURL, err)
393+
return fmt.Errorf("request to %s failed: %w", heartbeatURL, err)
394394
}
395395
defer resp.Body.Close()
396396

@@ -405,7 +405,7 @@ func (s *WorkspaceServer) sendHeartbeat(ctx context.Context, client *http.Client
405405
func (s *WorkspaceServer) discoverRunner(ctx context.Context, lc *tailscale.LocalClient) (string, error) {
406406
status, err := lc.Status(ctx)
407407
if err != nil {
408-
return "", fmt.Errorf("failed to get status: %v", err)
408+
return "", fmt.Errorf("failed to get status: %w", err)
409409
}
410410

411411
var runner string

0 commit comments

Comments
 (0)