Skip to content

Commit 0d9cc27

Browse files
[Feature]: Aborting jobs on synapse (#203)
* add build abort functionality for synapse * uncomment auto remove for removing container * remove not required log msg * correct comments * fix comment on processAbortBuild * fix typo * apply suggestion on comment from code review Co-authored-by: Vikrant Kumar Sinha <[email protected]> Co-authored-by: Vikrant Kumar Sinha <[email protected]>
1 parent 9506bd2 commit 0d9cc27

File tree

4 files changed

+43
-1
lines changed

4 files changed

+43
-1
lines changed

pkg/core/runner.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ type DockerRunner interface {
6464
// KillRunningDocker kills container spawn by synapse
6565
KillRunningDocker(ctx context.Context)
6666

67+
// KillContainerForBuildID kills synapse container which is running for given buildID
68+
KillContainerForBuildID(buildID string) error
69+
6770
CreateVolume(ctx context.Context, r *RunnerOptions) error
6871

6972
// RemoveOldVolumes removes volumes that are older than X hours

pkg/core/wsproto.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ const (
1818
MsgError MessageType = "error"
1919
MsgResourceStats MessageType = "resourcestats"
2020
MsgJobInfo MessageType = "jobinfo"
21+
MsgBuildAbort MessageType = "build_abort"
2122
)
2223

2324
// JobInfo types
2425
const (
2526
JobCompleted StatusType = "complete"
2627
JobStarted StatusType = "started"
2728
JobFailed StatusType = "failed"
29+
JobAborted StatusType = "aborted"
2830
)
2931

3032
// ResourceStats types
@@ -66,3 +68,8 @@ type JobInfo struct {
6668
BuildID string `json:"build_id"`
6769
Message string `json:"message"`
6870
}
71+
72+
// BuildAbortMsg struct defines message for aborting a build
73+
type BuildAbortMsg struct {
74+
BuildID string `json:"build_id"`
75+
}

pkg/runner/docker/docker.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
const (
3232
buildCacheExpiry time.Duration = 4 * time.Hour
33+
BuildID = "build-id"
3334
)
3435

3536
var gracefulyContainerStopDuration = time.Second * 10
@@ -337,6 +338,19 @@ func (d *docker) KillRunningDocker(ctx context.Context) {
337338
}
338339
}
339340

341+
func (d *docker) KillContainerForBuildID(buildID string) error {
342+
for _, r := range d.RunningContainers {
343+
if r.Label[BuildID] == buildID {
344+
if err := d.Destroy(context.Background(), r); err != nil {
345+
d.logger.Errorf("error while destroying container: %v", err)
346+
return err
347+
}
348+
return nil
349+
}
350+
}
351+
return nil
352+
}
353+
340354
func (d *docker) PullImage(containerImageConfig *core.ContainerImageConfig, r *core.RunnerOptions) error {
341355
if containerImageConfig.PullPolicy == config.PullNever && r.PodType == core.NucleusPod {
342356
d.logger.Infof("pull policy %s pod type %s, not pulling any image",

pkg/synapse/synapse.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ const (
2828
duplicateConnectionSleepDuration = 15 * time.Second
2929
)
3030

31+
var buildAbortMap = make(map[string]bool)
32+
3133
type synapse struct {
3234
conn *websocket.Conn
3335
runner core.DockerRunner
@@ -46,7 +48,6 @@ func New(
4648
logger lumber.Logger,
4749
secretsManager core.SecretsManager,
4850
) core.SynapseManager {
49-
5051
return &synapse{
5152
runner: runner,
5253
logger: logger,
@@ -215,6 +216,9 @@ func (s *synapse) processMessage(msg []byte, duplicateConnectionChan chan struct
215216
case core.MsgTask:
216217
s.logger.Debugf("task message received from server")
217218
go s.processTask(message)
219+
case core.MsgBuildAbort:
220+
s.logger.Debugf("abort-build message received from server")
221+
go s.processAbortBuild(message)
218222
default:
219223
s.logger.Errorf("message type not found")
220224
}
@@ -232,6 +236,17 @@ func (s *synapse) processErrorMessage(message core.Message, duplicateConnectionC
232236
}
233237
}
234238

239+
// processAbortBuild handles aborting a running build
240+
func (s *synapse) processAbortBuild(message core.Message) {
241+
buildID := string(message.Content)
242+
buildAbortMap[buildID] = true
243+
s.logger.Debugf("message received to abort build %s", buildID)
244+
if err := s.runner.KillContainerForBuildID(buildID); err != nil {
245+
s.logger.Errorf("error while terminating container for buildID: %s, error: %v", buildID, err)
246+
return
247+
}
248+
}
249+
235250
// processTask handles task type message
236251
func (s *synapse) processTask(message core.Message) {
237252
var runnerOpts core.RunnerOptions
@@ -270,6 +285,9 @@ func (s *synapse) runAndUpdateJobStatus(runnerOpts *core.RunnerOptions) {
270285
if status.Done {
271286
jobStatus = core.JobCompleted
272287
}
288+
if buildAbortMap[runnerOpts.Label[BuildID]] {
289+
jobStatus = core.JobAborted
290+
}
273291
jobInfo := CreateJobInfo(jobStatus, runnerOpts, status.Error.Message)
274292
s.logger.Infof("Sending update to neuron %+v", jobInfo)
275293
resourceStatsMessage := CreateJobUpdateMessage(jobInfo)

0 commit comments

Comments
 (0)