@@ -14,6 +14,7 @@ import (
1414 "github.com/digitalocean/godo"
1515 "github.com/google/uuid"
1616 "github.com/hashicorp/go-hclog"
17+ "github.com/hashicorp/nomad-autoscaler/sdk/helper/nomad"
1718 "github.com/hashicorp/nomad/api"
1819)
1920
@@ -24,6 +25,7 @@ const (
2425
2526type dropletTemplate struct {
2627 createReservedAddresses bool
28+ initGracePeriod time.Duration
2729 ipv6 bool
2830 name string
2931 region string
@@ -42,6 +44,8 @@ type dropletTemplate struct {
4244 vpc string
4345}
4446
// DropletIDs is a set of DigitalOcean droplet IDs. The empty-struct value
// carries no data; only key membership is meaningful.
type DropletIDs map[int]struct{}
48+
4549func (t * TargetPlugin ) scaleOut (
4650 ctx context.Context ,
4751 desired , diff int64 ,
@@ -52,8 +56,6 @@ func (t *TargetPlugin) scaleOut(
5256
5357 log .Debug ("creating DigitalOcean droplets" , "template" , fmt .Sprintf ("%+v" , template ))
5458
55- ctx , cancel := context .WithCancelCause (ctx )
56- defer cancel (nil )
5759 wg := & sync.WaitGroup {}
5860 var prereservedIPV4s []string
5961 var prereservedIPV6s []string
@@ -189,10 +191,22 @@ func (t *TargetPlugin) scaleOut(
189191 wg .Wait ()
190192 close (errorChannel )
191193 }()
194+
192195 errorList := make ([]error , 0 )
193196 for err := range errorChannel {
194197 errorList = append (errorList , err )
195198 }
199+
200+ // Regardless of whether there were failures, schedule an orphaned-droplet check
201+ if template .initGracePeriod > time .Second {
202+ // Wait until enough time has passed so that if the newly-created droplet(s) are not yet
203+ // participating in the nomad cluster, they will be considered orphaned.
204+ // Add a minute to the grace period to allow for minor time disparities between
205+ // the autoscaler clock and the droplets' creation times.
206+ delay := template .initGracePeriod + time .Minute
207+ go deleteOrphanedDroplets (ctx , t .logger , t .client .Droplets (), t .getReadyNomadClients , template , delay )
208+ }
209+
196210 if len (errorList ) > 0 {
197211 return errors .Join (errorList ... )
198212 }
@@ -208,6 +222,119 @@ func (t *TargetPlugin) scaleOut(
208222 return nil
209223}
210224
225+ // deleteOrphanedDroplets will destroy any droplets which are not whitelisted,
226+ // but only if they have the tag shared by all droplets managed by the autoscaler,
227+ // and which were not recently created.
228+ // whitelistGenerator: provides a set of droplet IDs which are expected; any other
229+ // droplets will be assessed and potentially deleted.
230+ // delay: the time to wait before looking for orphans
231+ func deleteOrphanedDroplets (ctx context.Context ,
232+ logger hclog.Logger ,
233+ dropletsService Droplets ,
234+ whitelistGenerator func (ctx context.Context ) (DropletIDs , error ),
235+ template * dropletTemplate ,
236+ delay time.Duration ,
237+ ) {
238+ if err := Sleep (ctx , delay ); err != nil {
239+ logger .Info ("context was cancelled, so not checking for orphaned droplets" )
240+ return
241+ }
242+
243+ whitelist , err := whitelistGenerator (ctx )
244+ if err != nil {
245+ logger .Error ("cannot determine which droplets are whitelisted: %w" , err )
246+ return
247+ }
248+ logger .Info ("checking for orphaned droplets" , "whitelist size" , len (whitelist ))
249+ for droplet , err := range Unpaginate (ctx , Unarg (dropletsService .ListByTag , template .name ), godo.ListOptions {}) {
250+ if err != nil {
251+ logger .Error ("cannot retrieve droplets" , "error" , err )
252+ return
253+ }
254+ // This could be done in parallel, but shouldn't be necessary
255+ if _ , exists := whitelist [droplet .ID ]; exists {
256+ logger .Debug ("droplet is a nomad client, so not considering an orphan" , "droplet ID" , droplet .ID )
257+ } else {
258+ dt , err := time .Parse (time .RFC3339 , droplet .Created )
259+ if err != nil {
260+ logger .Error ("cannot parse droplet creation time. Not treating as an orphan" , "error" , err , "droplet ID" , droplet .ID )
261+ continue
262+ }
263+ if time .Since (dt ) < template .initGracePeriod {
264+ logger .Debug ("Droplet was very recently created. Not treating as an orphan" , "droplet ID" , droplet .ID )
265+ continue
266+ }
267+
268+ if _ , err := dropletsService .Delete (ctx , droplet .ID ); err == nil {
269+ logger .Info ("deleted orphaned droplet" , "droplet ID" , droplet .ID )
270+ } else {
271+ logger .Error ("cannot delete droplet" , "error" , err , "droplet ID" , droplet .ID )
272+ }
273+ }
274+ }
275+ logger .Debug ("finished checking for orphaned droplets" )
276+ }
277+
278+ // getReadyNomadClients returns a set of droplet IDs
279+ // where the nomad client is running and has "ready" status.
280+ func (t * TargetPlugin ) getReadyNomadClients (ctx context.Context ) (DropletIDs , error ) {
281+ result := make (DropletIDs )
282+ cfg := nomad .ConfigFromNamespacedMap (t .config )
283+ client , err := api .NewClient (cfg )
284+ if err != nil {
285+ return nil , fmt .Errorf ("failed to instantiate Nomad client: %v" , err )
286+ }
287+ nodes , _ , err := client .Nodes ().List (& api.QueryOptions {Params : map [string ]string {"resources" : "true" }})
288+ if err != nil {
289+ return nil , fmt .Errorf ("failed to list Nomad nodes from API: %v" , err )
290+ }
291+
292+ q := api.QueryOptions {
293+ AllowStale : true ,
294+ }
295+ for _ , node := range nodes {
296+ // TODO: filter out nodes which are not part of our node pool.
297+ // This is just an optimisation, as it's only droplets which aren't in any node pool
298+ // which are susceptible to being considered orphans.
299+
300+ t .logger .Debug ("found node" ,
301+ "node_id" , node .ID , "datacenter" , node .Datacenter , "node_class" , node .NodeClass , "node_pool" , node .NodePool ,
302+ "status" , node .Status , "eligibility" , node .SchedulingEligibility , "draining" , node .Drain , "all" , fmt .Sprintf ("%+v" , node ),
303+ )
304+ if node .Status != "ready" {
305+ t .logger .Info ("node is known as a nomad client but its status is not ready" , "node ID" , node .ID , "status" , node .Status )
306+ continue
307+ }
308+
309+ if dropletID , exists := t .dropletMapping .Get (node .ID ); exists {
310+ // this node's droplet ID is already known, so include it
311+ result [dropletID ] = struct {}{}
312+ continue
313+ }
314+
315+ // The summary daa returned by client.Nodes() does not contain sufficient metadaa to determine
316+ // the droplet ID; a follow-up call to `Info()` is required.
317+ node , _ , err := client .Nodes ().Info (node .ID , & q )
318+ if err != nil {
319+ t .logger .Warn ("cannot get node info" , "node ID" , node .ID , "err" , err )
320+ continue
321+ }
322+ dropletID , ok := node .Attributes ["unique.platform.digitalocean.id" ]
323+ if ! ok || dropletID == "" {
324+ t .logger .Warn ("cannot find droplet ID" , "NodeID" , node .ID , "attributes" , node .Attributes , "node" , fmt .Sprintf ("%+v" , node ), "err" , err )
325+ continue
326+ }
327+ t .logger .Debug ("Found droplet ID for node" , "NodeID" , node .ID , "droplet ID" , dropletID )
328+ numericID , err := strconv .Atoi (dropletID )
329+ if err != nil {
330+ return nil , fmt .Errorf ("cannot convert %v to an integer: %w" , dropletID , err )
331+ }
332+ result [numericID ] = struct {}{}
333+ t .dropletMapping .Add (node .ID , numericID )
334+ }
335+ return result , nil
336+ }
337+
211338func (t * TargetPlugin ) scaleIn (
212339 ctx context.Context ,
213340 desired , diff int64 ,
@@ -319,7 +446,7 @@ func (t *TargetPlugin) ensureDropletsAreStable(
319446 cancel (err )
320447 return err
321448 } else {
322- return errors . New ("waiting for droplets to become stable" )
449+ return fmt . Errorf ("waiting for droplets to become stable. desired:%v active:%v" , desired , active )
323450 }
324451 },
325452 )
0 commit comments