Skip to content

Commit b439838

Browse files
committed
Query all cloudtrail events for stop and terminate
This commit uses the Paginator for CloudTrail events, instead of just running a single query. This should prevent missing events, even though it might mean longer runtimes for CAD in case of very many events matching stop/terminate events. It also will only retry the event-gathering instead of the all the actions that follow on the event, to keep the retry-loop smaller.
1 parent 8cf32ed commit b439838

File tree

1 file changed

+91
-72
lines changed

1 file changed

+91
-72
lines changed

pkg/aws/aws.go

Lines changed: 91 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ func NewClient(accessID, accessSecret, token, region string) (*SdkClient, error)
7979
configv2.WithRegion(region),
8080
configv2.WithCredentialsProvider(staticCredentials),
8181
configv2.WithRetryer(func() awsv2.Retryer {
82-
return retry.AddWithMaxBackoffDelay(retry.AddWithMaxAttempts(retry.NewStandard(), maxRetries),
83-
time.Second*5)
82+
return retry.AddWithMaxBackoffDelay(retry.AddWithMaxAttempts(retry.NewStandard(), maxRetries), time.Second*5)
8483
}),
8584
)
8685
if err != nil {
@@ -229,88 +228,100 @@ func (c *SdkClient) PollInstanceStopEventsFor(instances []ec2v2types.Instance, r
229228
idToCloudtrailEvent := make(map[string]cloudtrailv2types.Event)
230229

231230
var executionError error
231+
var stoppedInstanceEvents []cloudtrailv2types.Event = make([]cloudtrailv2types.Event, 0)
232+
var terminatedInstanceEvents []cloudtrailv2types.Event = make([]cloudtrailv2types.Event, 0)
232233
err = wait.ExponentialBackoff(backoffOptions, func() (bool, error) {
233234
executionError = nil
234235

235-
stoppedInstanceEvents, err := c.ListAllInstanceStopEventsV2()
236-
if err != nil {
237-
executionError = fmt.Errorf("an error occurred in ListAllInstanceStopEvents: %w", err)
238-
return false, nil
239-
}
240-
241-
terminatedInstanceEvents, err := c.ListAllTerminatedInstancesV2()
242-
if err != nil {
243-
executionError = fmt.Errorf("an error occurred in ListAllTerminatedInstances: %w", err)
244-
return false, nil
236+
// Retry only in case we haven't retrieved these events in the previous iteration.
237+
if len(stoppedInstanceEvents) == 0 {
238+
stoppedInstanceEvents, err = c.ListAllInstanceStopEventsV2()
239+
if err != nil {
240+
executionError = fmt.Errorf("an error occurred in ListAllInstanceStopEvents: %w", err)
241+
//nolint:nilerr
242+
return false, nil
243+
}
245244
}
246245

247-
// only add events to our investigation, if these events contain
248-
// at least one of our cluster instances.
249-
var clusterInstanceEvents []cloudtrailv2types.Event
250-
for _, event := range stoppedInstanceEvents {
251-
if eventContainsInstances(instances, event) {
252-
clusterInstanceEvents = append(clusterInstanceEvents, event)
253-
continue
246+
// Retry only in case we haven't retrieved these events in the previous
247+
// iteration (this should never be != 0 as the code is sequential and
248+
// this is the only part that can trigger the partial-retry)
249+
if len(terminatedInstanceEvents) == 0 {
250+
terminatedInstanceEvents, err = c.ListAllTerminatedInstancesV2()
251+
if err != nil {
252+
executionError = fmt.Errorf("an error occurred in ListAllTerminatedInstances: %w", err)
253+
//nolint:nilerr
254+
return false, nil
254255
}
255-
// Event is not for one of our cluster instances.
256-
logging.Debugf("Ignoring event with id '%s', as it is unrelated to the cluster.", *event.EventId)
257256
}
257+
return true, nil
258+
})
258259

259-
for _, event := range terminatedInstanceEvents {
260-
if eventContainsInstances(instances, event) {
261-
clusterInstanceEvents = append(clusterInstanceEvents, event)
262-
continue
263-
}
264-
// Event is not for one of our cluster instances.
265-
logging.Debugf("Ignoring event with id '%s', as it is unrelated to the cluster.", *event.EventId)
260+
// only add events to our investigation, if these events contain
261+
// at least one of our cluster instances.
262+
var clusterInstanceEvents []cloudtrailv2types.Event
263+
for _, event := range stoppedInstanceEvents {
264+
if eventContainsInstances(instances, event) {
265+
clusterInstanceEvents = append(clusterInstanceEvents, event)
266+
continue
266267
}
268+
// Event is not for one of our cluster instances.
269+
logging.Debugf("Ignoring event with id '%s', as it is unrelated to the cluster.", *event.EventId)
270+
}
267271

268-
// Loop over all stopped and terminate events for our cluster instancs
269-
for _, event := range clusterInstanceEvents {
270-
// we have to loop over each resource in each event
271-
for _, resource := range event.Resources {
272-
instanceID := *resource.ResourceName
273-
_, ok := idToStopTime[instanceID]
274-
if ok {
275-
storedEvent, ok := idToCloudtrailEvent[instanceID]
276-
if !ok {
277-
idToCloudtrailEvent[instanceID] = event
278-
} else if storedEvent.EventTime.Before(*event.EventTime) {
279-
// here the event exists already, and we compared it with the eventTime of the current event.
280-
// We only jump into this else if clause, if the storedEvent happened BEFORE the current event.
281-
// This means we are overwriting the idToCloudTrailEvent with the "newest" event.
282-
idToCloudtrailEvent[instanceID] = event
283-
}
272+
for _, event := range terminatedInstanceEvents {
273+
if eventContainsInstances(instances, event) {
274+
clusterInstanceEvents = append(clusterInstanceEvents, event)
275+
continue
276+
}
277+
// Event is not for one of our cluster instances.
278+
logging.Debugf("Ignoring event with id '%s', as it is unrelated to the cluster.", *event.EventId)
279+
}
280+
281+
// Loop over all stopped and terminate events for our cluster instancs
282+
for _, event := range clusterInstanceEvents {
283+
// we have to loop over each resource in each event
284+
for _, resource := range event.Resources {
285+
instanceID := *resource.ResourceName
286+
_, ok := idToStopTime[instanceID]
287+
if ok {
288+
storedEvent, ok := idToCloudtrailEvent[instanceID]
289+
if !ok {
290+
idToCloudtrailEvent[instanceID] = event
291+
} else if storedEvent.EventTime.Before(*event.EventTime) {
292+
// here the event exists already, and we compared it with the eventTime of the current event.
293+
// We only jump into this else if clause, if the storedEvent happened BEFORE the current event.
294+
// This means we are overwriting the idToCloudTrailEvent with the "newest" event.
295+
idToCloudtrailEvent[instanceID] = event
284296
}
285297
}
286298
}
287-
logging.Debugf("%+v", idToCloudtrailEvent)
288-
for _, instance := range instances {
289-
instanceID := *instance.InstanceId
290-
event, ok := idToCloudtrailEvent[instanceID]
291-
if !ok {
292-
executionError = fmt.Errorf("the stopped instance %s does not have a StopInstanceEvent", instanceID)
293-
return false, nil
294-
}
295-
logging.Debug("event in idToCloudtrailEvent[instanceID]", instanceID)
299+
}
300+
logging.Debugf("%+v", idToCloudtrailEvent)
301+
for _, instance := range instances {
302+
instanceID := *instance.InstanceId
303+
event, ok := idToCloudtrailEvent[instanceID]
304+
if !ok {
305+
executionError = fmt.Errorf("the stopped instance %s does not have a StopInstanceEvent", instanceID)
306+
return nil, executionError
307+
}
308+
logging.Debug("event in idToCloudtrailEvent[instanceID]", instanceID)
296309

297-
// not checking if the item is in the array as it's a 1-1 mapping
298-
extractedTime := idToStopTime[instanceID]
310+
// not checking if the item is in the array as it's a 1-1 mapping
311+
extractedTime := idToStopTime[instanceID]
299312

300-
if event.EventTime.Before(extractedTime) {
301-
executionError = fmt.Errorf("most up to date time is before the instance stopped time")
302-
return false, nil
303-
}
313+
if event.EventTime.Before(extractedTime) {
314+
executionError = fmt.Errorf("most up to date time is before the instance stopped time")
315+
return nil, executionError
304316
}
317+
}
305318

306-
for _, event := range idToCloudtrailEvent {
307-
if !containsEvent(event, events) {
308-
events = append(events, event)
309-
}
319+
for _, event := range idToCloudtrailEvent {
320+
if !containsEvent(event, events) {
321+
events = append(events, event)
310322
}
323+
}
311324

312-
return true, nil
313-
})
314325
logging.Debugf("%+v", idToCloudtrailEvent)
315326

316327
if err != nil || executionError != nil {
@@ -493,18 +504,26 @@ func (c *SdkClient) defaultRouteTableForVpc(vpcId string) (*ec2v2types.RouteTabl
493504
func (c *SdkClient) listAllInstancesAttribute(att cloudtrailv2types.LookupAttribute) ([]cloudtrailv2types.Event, error) {
494505
// We only look up events that are not older than 2 hours
495506
since := time.Now().UTC().Add(time.Duration(-2) * time.Hour)
496-
// We only look up 1000 events maximum
497-
var maxResults int32 = 1000
507+
// We will only capture this many events via pagination - looping till we
508+
// exhaust 90 days of events might take *very* long in big accounts
509+
// otherwise.
510+
maxNumberEvents := 1000
511+
var events []cloudtrailv2types.Event = make([]cloudtrailv2types.Event, 0)
498512
in := &cloudtrailv2.LookupEventsInput{
499513
LookupAttributes: []cloudtrailv2types.LookupAttribute{att},
500-
MaxResults: &maxResults,
501514
StartTime: &since,
502515
}
503-
out, err := c.CloudtrailClient.LookupEvents(context.TODO(), in)
504-
if err != nil {
505-
return nil, err
516+
paginator := cloudtrailv2.NewLookupEventsPaginator(c.CloudtrailClient, in)
517+
// FIXME: Decide if we should just always retrieve *all* events which could
518+
// be wasteful
519+
for paginator.HasMorePages() && len(events) < maxNumberEvents {
520+
out, err := paginator.NextPage(context.TODO())
521+
if err != nil {
522+
return nil, err
523+
}
524+
events = append(events, out.Events...)
506525
}
507-
return out.Events, nil
526+
return events, nil
508527
}
509528

510529
// findVpcIDForSubnet returns the VPC ID for the subnet

0 commit comments

Comments
 (0)