@@ -20,30 +20,62 @@ type RunServiceJob struct {
2020 Delete bool `default:"true"`
2121 Image string
2222 Network string
23+ Service string
2324}
2425
2526func NewRunServiceJob (c * docker.Client ) * RunServiceJob {
2627 return & RunServiceJob {Client : c }
2728}
2829
30+ // Main method for running a service-based job
31+ // If the service has been provided it will start a new task for the existing service
32+ // Otherwise it will create a new service based on the image and other parameters
2933func (j * RunServiceJob ) Run (ctx * Context ) error {
30- if err := j .pullImage (); err != nil {
31- return err
34+
35+ if j .Image != "" {
36+ if err := j .pullImage (); err != nil {
37+ return err
38+ }
3239 }
3340
34- svc , err := j .buildService ()
41+ var svcID string
42+ if j .Service == "" {
43+ svc , err := j .buildService ()
3544
36- if err != nil {
37- return err
38- }
45+ if err != nil {
46+ return err
47+ }
48+
49+ svcID = svc .ID
50+ ctx .Logger .Noticef ("Created service %s for job %s\n " , svcID , j .Name )
51+ } else {
52+ svc , err := j .inspectService (ctx , j .Service )
53+ if err != nil {
54+ return err
55+ }
56+ svcID = svc .ID
57+ ctx .Logger .Noticef ("Found service %s for job %s\n " , svcID , j .Name )
3958
40- ctx .Logger .Noticef ("Created service %s for job %s\n " , svc .ID , j .Name )
59+ _ , err = j .scaleService (ctx , svcID , false )
60+ if err != nil {
61+ return err
62+ }
4163
42- if err := j .watchContainer (ctx , svc .ID ); err != nil {
64+ _ , err = j .scaleService (ctx , svcID , true )
65+ if err != nil {
66+ return err
67+ }
68+ }
69+
70+ if err := j .watchContainer (ctx , svcID ); err != nil {
4371 return err
4472 }
4573
46- return j .deleteService (ctx , svc .ID )
74+ if j .Service == "" {
75+ return j .deleteService (ctx , svcID )
76+ } else {
77+ return nil
78+ }
4779}
4880
4981func (j * RunServiceJob ) pullImage () error {
@@ -96,6 +128,48 @@ func (j *RunServiceJob) buildService() (*swarm.Service, error) {
96128 return svc , err
97129}
98130
131+ // Scale an existing service one replica up or down
132+ func (j * RunServiceJob ) scaleService (ctx * Context , svcID string , up bool ) (* swarm.Service , error ) {
133+ svc , err := j .inspectService (ctx , j .Service )
134+ if err != nil {
135+ return nil , err
136+ }
137+
138+ replicas := * svc .Spec .Mode .Replicated .Replicas
139+ if up {
140+ replicas += 1
141+ } else {
142+ // If there already 0 replicas of a service, there is no need to scale down
143+ if replicas == 0 {
144+ return svc , err
145+ }
146+ replicas -= 1
147+ }
148+
149+ updateSvcOpts := docker.UpdateServiceOptions {}
150+
151+ updateSvcOpts .Name = svc .Spec .Name
152+ updateSvcOpts .Version = svc .Version .Index
153+
154+ // The old spec is required, otherwise defaults will override the service
155+ updateSvcOpts .ServiceSpec = svc .Spec
156+
157+ updateSvcOpts .Mode .Replicated =
158+ & swarm.ReplicatedService {
159+ Replicas : & replicas ,
160+ }
161+
162+ // Do the actual scaling
163+ err = j .Client .UpdateService (svcID , updateSvcOpts )
164+ if err != nil {
165+ return nil , err
166+ }
167+
168+ // Give docker the time to do the scaling
169+ time .Sleep (time .Millisecond * 1000 )
170+ return svc , err
171+ }
172+
99173const (
100174
101175 // TODO are these const defined somewhere in the docker API?
@@ -111,9 +185,9 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error {
111185
112186 ctx .Logger .Noticef ("Checking for service ID %s (%s) termination\n " , svcID , j .Name )
113187
114- svc , err := j .Client . InspectService ( svcID )
188+ svc , err := j .inspectService ( ctx , j . Service )
115189 if err != nil {
116- return fmt . Errorf ( "Failed to inspect service %s: %s" , svcID , err . Error ())
190+ return err
117191 }
118192
119193 // On every tick, check if all the services have completed, or have error out
@@ -124,12 +198,14 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error {
124198 defer wg .Done ()
125199 for _ = range svcChecker .C {
126200
201+ // TODO will not work with longer existing services
202+ // TODO doesn't work
127203 if svc .CreatedAt .After (time .Now ().Add (maxProcessDuration )) {
128204 err = ErrMaxTimeRunning
129205 return
130206 }
131207
132- taskExitCode , found := j .findtaskstatus (ctx , svc .ID )
208+ taskExitCode , found := j .findTaskStatus (ctx , svc .ID )
133209
134210 if found {
135211 exitCode = taskExitCode
@@ -141,19 +217,28 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error {
141217 wg .Wait ()
142218
143219 ctx .Logger .Noticef ("Service ID %s (%s) has completed\n " , svcID , j .Name )
220+
221+ switch exitCode {
222+ case 0 :
223+ return nil
224+ case - 1 :
225+ return ErrUnexpected
226+ default :
227+ return fmt .Errorf ("error non-zero exit code: %d" , exitCode )
228+ }
144229 return err
145230}
146231
147- func (j * RunServiceJob ) findtaskstatus (ctx * Context , taskID string ) (int , bool ) {
232+ func (j * RunServiceJob ) findTaskStatus (ctx * Context , svcID string ) (int , bool ) {
148233 taskFilters := make (map [string ][]string )
149- taskFilters ["service" ] = []string {taskID }
234+ taskFilters ["service" ] = []string {svcID }
150235
151236 tasks , err := j .Client .ListTasks (docker.ListTasksOptions {
152237 Filters : taskFilters ,
153238 })
154239
155240 if err != nil {
156- ctx .Logger .Errorf ("Failed to find task ID %s. Considering the task terminated: %s\n " , taskID , err .Error ())
241+ ctx .Logger .Errorf ("Failed to find tasks fo service %s. Considering the task terminated: %s\n " , svcID , err .Error ())
157242 return 0 , false
158243 }
159244
@@ -187,6 +272,20 @@ func (j *RunServiceJob) findtaskstatus(ctx *Context, taskID string) (int, bool)
187272 if exitCode == 0 && task .Status .State == swarm .TaskStateRejected {
188273 exitCode = 255 // force non-zero exit for task rejected
189274 }
275+
276+ err = j .Client .GetServiceLogs (docker.LogsServiceOptions {
277+ Service : svcID ,
278+ Stderr : true ,
279+ Stdout : true ,
280+ Follow : false ,
281+ ErrorStream : ctx .Execution .ErrorStream ,
282+ OutputStream : ctx .Execution .OutputStream ,
283+ })
284+ if err != nil {
285+ ctx .Logger .Errorf ("Error getting logs for service: %s - %s \n " , svcID , err .Error ())
286+ return 0 , false
287+ }
288+
190289 done = true
191290 break
192291 }
@@ -212,3 +311,12 @@ func (j *RunServiceJob) deleteService(ctx *Context, svcID string) error {
212311 return err
213312
214313}
314+
315+ // Convenience method for inspecting a service
316+ func (j * RunServiceJob ) inspectService (ctx * Context , svcID string ) (* swarm.Service , error ) {
317+ svc , err := j .Client .InspectService (j .Service )
318+ if err != nil {
319+ return nil , fmt .Errorf ("Failed to inspect service %s: %s" , j .Service , err .Error ())
320+ }
321+ return svc , err
322+ }
0 commit comments