@@ -13,6 +13,7 @@ import (
1313 "github.com/operable/go-relay/relay/worker"
1414 "golang.org/x/net/context"
1515 "strings"
16+ "time"
1617)
1718
1819const (
@@ -45,6 +46,8 @@ type cogRelay struct {
4546 catalog * bundle.Catalog
4647 announcer Announcer
4748 directivesReplyTo string
49+ bundleTimer * time.Timer
50+ cleanTimer * time.Timer
4851}
4952
5053// NewRelay constructs a new Relay instance
@@ -84,10 +87,23 @@ func (r *cogRelay) Start() error {
8487 if err := conn .Connect (r .connOpts ); err != nil {
8588 return err
8689 }
90+ if r .config .DockerEnabled () {
91+ r .cleanTimer = time .AfterFunc (r .config .Docker .CleanDuration (), r .scheduledDockerCleanup )
92+ log .Infof ("Cleaning up Docker environment on %d second intervals." , r .config .Docker .CleanDuration ()/ time .Second )
93+ }
94+ log .Infof ("Refreshing bundle catalog on %d second intervals." , r .config .RefreshDuration ()/ time .Second )
8795 return nil
8896}
8997
9098func (r * cogRelay ) Stop () error {
99+ if r .bundleTimer != nil {
100+ r .bundleTimer .Stop ()
101+ }
102+ if r .config .DockerEnabled () {
103+ if r .bundleTimer != nil {
104+ r .cleanTimer .Stop ()
105+ }
106+ }
91107 return nil
92108}
93109
@@ -134,9 +150,11 @@ func (r *cogRelay) handleCommand(conn bus.Connection, topic string, message []by
134150 Payload : message ,
135151 }
136152 ctx := context .WithValue (context .Background (), "invoke" , invoke )
137- log .Debugf ("Queue stopped: %v" , r .queue .IsStopped ())
138- log .Debugf ("Eqneueud request: %s" , r .queue .Enqueue (ctx ))
139- log .Debugf ("Enqueued invocation request for %s" , topic )
153+ if err := r .queue .Enqueue (ctx ); err != nil {
154+ log .Debugf ("Failed enqueuing invocation request: %s." , err )
155+ } else {
156+ log .Debugf ("Enqueued invocation request for %s" , topic )
157+ }
140158}
141159
142160func (r * cogRelay ) handleDirective (conn bus.Connection , topic string , message []byte ) {
@@ -148,7 +166,7 @@ func (r *cogRelay) handleDirective(conn bus.Connection, topic string, message []
148166 // Dispatch on mesasge type
149167 switch tm .(type ) {
150168 case * messages.ListBundlesResponseEnvelope :
151- log .Info ("Processing bundle list " )
169+ log .Debug ("Processing bundle catalog updates. " )
152170 r .updateCatalog (tm .(* messages.ListBundlesResponseEnvelope ))
153171 }
154172}
@@ -174,21 +192,34 @@ func (r *cogRelay) updateCatalog(envelope *messages.ListBundlesResponseEnvelope)
174192 // TODO: This should be bi-directional sync to catch bundle unassignments too
175193 r .catalog .AddBatch (bundles )
176194 if r .catalog .IsChanged () {
177- r .refreshBundles ()
178- log .Info ("Changes to bundle assignments detected." )
179- r .announcer .SendAnnouncement ()
195+ if err := r .refreshBundles (); err != nil {
196+ log .Errorf ("Bundle catalog refresh failed: %s." , err )
197+ } else {
198+ log .Info ("Changes to bundle catalog detected." )
199+ r .announcer .SendAnnouncement ()
200+ }
180201 }
202+ r .bundleTimer = time .AfterFunc (r .config .RefreshDuration (), r .scheduledBundleRefresh )
181203}
182204
183205func (r * cogRelay ) refreshBundles () error {
184206 dockerEngine , err := engines .NewDockerEngine (* r .config )
185207 if err != nil {
186- return err
208+ if r .config .DockerEnabled () == false {
209+ dockerEngine = nil
210+ } else {
211+ return err
212+ }
187213 }
188214 for _ , name := range r .catalog .BundleNames () {
189215 if bundle := r .catalog .FindLatest (name ); bundle != nil {
190216 if bundle .NeedsRefresh () {
191217 if bundle .IsDocker () {
218+ if r .config .DockerEnabled () == false {
219+ log .Infof ("Skipping Docker-based bundle %s %s." , bundle .Name , bundle .Version )
220+ bundle .SetAvailable (false )
221+ continue
222+ }
192223 avail , _ := dockerEngine .IsAvailable (bundle .Docker .Image , bundle .Docker .Tag )
193224 bundle .SetAvailable (avail )
194225 } else {
@@ -208,10 +239,34 @@ func (r *cogRelay) requestBundles() error {
208239 },
209240 }
210241 raw , _ := json .Marshal (& msg )
211- log .Info ("Refreshing command bundles ." )
242+ log .Debug ("Refreshing command catalog ." )
212243 return r .conn .Publish (infoTopic , raw )
213244}
214245
246+ func (r * cogRelay ) scheduledBundleRefresh () {
247+ if err := r .requestBundles (); err != nil {
248+ log .Errorf ("Scheduled bundle catalog refresh failed: %s." , err )
249+ r .bundleTimer = time .AfterFunc (r .config .RefreshDuration (), r .scheduledBundleRefresh )
250+ }
251+ }
252+
253+ func (r * cogRelay ) scheduledDockerCleanup () {
254+ engine , err := engines .NewDockerEngine (* r .config )
255+ if err != nil {
256+ log .Errorf ("Scheduled clean up of Docker environment failed: %s." , err )
257+ } else {
258+ cleaned := engine .Clean ()
259+ container := "containers"
260+ if cleaned == 1 {
261+ container = "container"
262+ }
263+ if cleaned > 0 {
264+ log .Infof ("Scheduled Docker clean up removed %d %s." , cleaned , container )
265+ }
266+ }
267+ r .cleanTimer = time .AfterFunc (r .config .Docker .CleanDuration (), r .scheduledDockerCleanup )
268+ }
269+
215270func verifyDockerConfig (c * config.Config ) error {
216271 if c .DockerEnabled () == true {
217272 if err := engines .VerifyDockerConfig (c .Docker ); err != nil {
0 commit comments