Skip to content

Commit d0bcdc9

Browse files
committed
discovery: Remove EKS audit logs from EKS fetcher
Remove checking EKS clusters for audit log fetching from the EKS fetcher and move it up into the AWS resource sync. This has slightly more overhead in that we need to reconstruct a map to check if it should have audit logs fetched, but the code is much cleaner as a result. Get rid of the clwClient interface as it embedded only a single other interface - its a bit redundant even if it does follow the pattern of the other AWS service clients.
1 parent 3a2ee14 commit d0bcdc9

File tree

6 files changed

+47
-105
lines changed

6 files changed

+47
-105
lines changed

lib/srv/discovery/access_graph_aws.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,27 @@ func (s *Server) reconcileAccessGraph(
130130
// Each fetcher can return an error and a result.
131131
for range allFetchers {
132132
fetcherResult := <-resultsC
133-
if fetcherResult.err != nil {
134-
errs = append(errs, fetcherResult.err)
133+
fetcher, result, err := fetcherResult.fetcher, fetcherResult.result, fetcherResult.err
134+
if err != nil {
135+
errs = append(errs, err)
136+
}
137+
if result == nil {
138+
continue
135139
}
136-
if fetcherResult.result != nil {
137-
results = append(results, fetcherResult.result)
138-
for _, cluster := range fetcherResult.result.EKSAuditLogClusters {
139-
fetcherCluster := eksAuditLogCluster{fetcherResult.fetcher, cluster}
140-
auditLogClusters = append(auditLogClusters, fetcherCluster)
140+
results = append(results, result)
141+
// If the fetcher is configured for EKS audit logs, see if any
142+
// EKS clusters match the configured tags.
143+
if fetcher.EKSAuditLogs == nil {
144+
continue
145+
}
146+
for _, cluster := range result.EKSClusters {
147+
clusterTags := make(map[string]string, len(cluster.Tags))
148+
for _, tag := range cluster.Tags {
149+
clusterTags[tag.GetKey()] = tag.GetValue().GetValue()
150+
}
151+
match, _, _ := services.MatchLabels(fetcher.EKSAuditLogs.Tags, clusterTags)
152+
if match {
153+
auditLogClusters = append(auditLogClusters, eksAuditLogCluster{fetcher, cluster})
141154
}
142155
}
143156
}

lib/srv/discovery/fetchers/aws-sync/aws-sync.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ type awsClientProvider interface {
133133
getSTSClient(cfg aws.Config, optFns ...func(*sts.Options)) stsClient
134134
// getKMSClient provides a [kmsClient].
135135
getKMSClient(cfg aws.Config, optFns ...func(*kms.Options)) kmsClient
136-
// getCloudWatchLogsClient provides a [cwlClient].
137-
getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cwlClient
136+
// getCloudWatchLogsClient provides a [cloudwatchlogs.FilterLogEventsAPIClient].
137+
getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient
138138
}
139139

140140
type defaultAWSClients struct{}
@@ -159,7 +159,7 @@ func (defaultAWSClients) getKMSClient(cfg aws.Config, optFns ...func(*kms.Option
159159
return kms.NewFromConfig(cfg, optFns...)
160160
}
161161

162-
func (defaultAWSClients) getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cwlClient {
162+
func (defaultAWSClients) getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient {
163163
return cloudwatchlogs.NewFromConfig(cfg, optFns...)
164164
}
165165

@@ -234,11 +234,6 @@ type Resources struct {
234234
OIDCProviders []*accessgraphv1alpha.AWSOIDCProviderV1
235235
// KMSKeys is a list of KMS keys.
236236
KMSKeys []*accessgraphv1alpha.AWSKMSKeyV1
237-
238-
// EKSAuditLogClusters is a subset of the clusters in the field EKSClusters.
239-
// These are the clusters for which apiserver audit logs should be fetched.
240-
// These are not sent to access graph with the other resources.
241-
EKSAuditLogClusters []*accessgraphv1alpha.AWSEKSClusterV1
242237
}
243238

244239
func (r *Resources) count() int {

lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,6 @@ import (
3030
accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
3131
)
3232

33-
// cwlClient has the AWS interfaces required by this package to fetch
34-
// CloudWatch logs.
35-
type cwlClient interface {
36-
cloudwatchlogs.FilterLogEventsAPIClient
37-
}
38-
3933
// FetchEKSAuditLogs returns a slice of audit log events for the given cluster
4034
// starting from the given cursor.
4135
func (a *Fetcher) FetchEKSAuditLogs(ctx context.Context, cluster *accessgraphv1alpha.AWSEKSClusterV1, cursor *accessgraphv1alpha.KubeAuditLogCursor) ([]cwltypes.FilteredLogEvent, error) {

lib/srv/discovery/fetchers/aws-sync/eks.go

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package aws_sync
2020

2121
import (
2222
"context"
23-
"slices"
2423
"sync"
2524

2625
"github.com/aws/aws-sdk-go-v2/aws"
@@ -32,7 +31,6 @@ import (
3231

3332
accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
3433
"github.com/gravitational/teleport/lib/cloud/awsconfig"
35-
"github.com/gravitational/teleport/lib/services"
3634
)
3735

3836
// EKSClientGetter returns an EKS client for aws-sync.
@@ -60,7 +58,6 @@ func (a *Fetcher) pollAWSEKSClusters(ctx context.Context, result *Resources, col
6058
result.EKSClusters = output.clusters
6159
result.AssociatedAccessPolicies = output.associatedPolicies
6260
result.AccessEntries = output.accessEntry
63-
result.EKSAuditLogClusters = output.auditLogClusters
6461
return nil
6562
}
6663
}
@@ -70,7 +67,6 @@ type fetchAWSEKSClustersOutput struct {
7067
clusters []*accessgraphv1alpha.AWSEKSClusterV1
7168
associatedPolicies []*accessgraphv1alpha.AWSEKSAssociatedAccessPolicyV1
7269
accessEntry []*accessgraphv1alpha.AWSEKSClusterAccessEntryV1
73-
auditLogClusters []*accessgraphv1alpha.AWSEKSClusterV1
7470
}
7571

7672
// fetchAWSSEKSClusters fetches eks instances from all regions.
@@ -89,7 +85,6 @@ func (a *Fetcher) fetchAWSSEKSClusters(ctx context.Context) (fetchAWSEKSClusters
8985
collectClusters := func(cluster *accessgraphv1alpha.AWSEKSClusterV1,
9086
clusterAssociatedPolicies []*accessgraphv1alpha.AWSEKSAssociatedAccessPolicyV1,
9187
clusterAccessEntries []*accessgraphv1alpha.AWSEKSClusterAccessEntryV1,
92-
fetchAuditLogs bool,
9388
err error,
9489
) {
9590
hostsMu.Lock()
@@ -99,9 +94,6 @@ func (a *Fetcher) fetchAWSSEKSClusters(ctx context.Context) (fetchAWSEKSClusters
9994
}
10095
if cluster != nil {
10196
output.clusters = append(output.clusters, cluster)
102-
if fetchAuditLogs {
103-
output.auditLogClusters = append(output.auditLogClusters, cluster)
104-
}
10597
}
10698
output.associatedPolicies = append(output.associatedPolicies, clusterAssociatedPolicies...)
10799
output.accessEntry = append(output.accessEntry, clusterAccessEntries...)
@@ -111,8 +103,7 @@ func (a *Fetcher) fetchAWSSEKSClusters(ctx context.Context) (fetchAWSEKSClusters
111103
eG.Go(func() error {
112104
eksClient, err := a.GetEKSClient(ctx, region, a.getAWSOptions()...)
113105
if err != nil {
114-
fetchAuditLogs := false
115-
collectClusters(nil, nil, nil, fetchAuditLogs, trace.Wrap(err))
106+
collectClusters(nil, nil, nil, trace.Wrap(err))
116107
return nil
117108
}
118109

@@ -130,14 +121,10 @@ func (a *Fetcher) fetchAWSSEKSClusters(ctx context.Context) (fetchAWSEKSClusters
130121
oldAssociatedPolicies := sliceFilter(existing.AssociatedAccessPolicies, func(ap *accessgraphv1alpha.AWSEKSAssociatedAccessPolicyV1) bool {
131122
return ap.Cluster.Region == region && ap.AccountId == a.AccountID
132123
})
133-
oldAuditLogClusters := sliceFilter(existing.EKSAuditLogClusters, func(cluster *accessgraphv1alpha.AWSEKSClusterV1) bool {
134-
return cluster.Region == region && cluster.AccountId == a.AccountID
135-
})
136124
hostsMu.Lock()
137125
output.clusters = append(output.clusters, oldEKSClusters...)
138126
output.associatedPolicies = append(output.associatedPolicies, oldAssociatedPolicies...)
139127
output.accessEntry = append(output.accessEntry, oldAccessEntries...)
140-
output.auditLogClusters = append(output.auditLogClusters, oldAuditLogClusters...)
141128
hostsMu.Unlock()
142129
break
143130
}
@@ -160,27 +147,22 @@ func (a *Fetcher) fetchAWSSEKSClusters(ctx context.Context) (fetchAWSEKSClusters
160147
},
161148
)
162149
if err != nil {
163-
fetchAuditLogs := slices.Contains(existing.EKSAuditLogClusters, oldCluster)
164-
collectClusters(oldCluster, oldAssociatedPolicies, oldAccessEntries, fetchAuditLogs, trace.Wrap(err))
150+
collectClusters(oldCluster, oldAssociatedPolicies, oldAccessEntries, trace.Wrap(err))
165151
return nil
166152
}
167153
protoCluster := awsEKSClusterToProtoCluster(cluster.Cluster, region, a.AccountID)
168-
fetchAuditLogs := false
169-
if a.EKSAuditLogs != nil {
170-
fetchAuditLogs, _, _ = services.MatchLabels(a.EKSAuditLogs.Tags, cluster.Cluster.Tags)
171-
}
172154

173155
// if eks cluster only allows CONFIGMAP auth, skip polling of access entries and
174156
// associated policies.
175157
if cluster.Cluster != nil && cluster.Cluster.AccessConfig != nil &&
176158
cluster.Cluster.AccessConfig.AuthenticationMode == ekstypes.AuthenticationModeConfigMap {
177-
collectClusters(protoCluster, nil, nil, fetchAuditLogs, nil)
159+
collectClusters(protoCluster, nil, nil, nil)
178160
continue
179161
}
180162
// fetchAccessEntries retries the list of configured access entries
181163
accessEntries, err := a.fetchAccessEntries(ctx, eksClient, protoCluster)
182164
if err != nil {
183-
collectClusters(protoCluster, oldAssociatedPolicies, oldAccessEntries, fetchAuditLogs, trace.Wrap(err))
165+
collectClusters(protoCluster, oldAssociatedPolicies, oldAccessEntries, trace.Wrap(err))
184166
}
185167

186168
accessEntryARNs := make([]string, 0, len(accessEntries))
@@ -193,9 +175,9 @@ func (a *Fetcher) fetchAWSSEKSClusters(ctx context.Context) (fetchAWSEKSClusters
193175

194176
associatedPolicies, err := a.fetchAssociatedPolicies(ctx, eksClient, protoCluster, accessEntryARNs)
195177
if err != nil {
196-
collectClusters(protoCluster, oldAssociatedPolicies, accessEntries, fetchAuditLogs, trace.Wrap(err))
178+
collectClusters(protoCluster, oldAssociatedPolicies, accessEntries, trace.Wrap(err))
197179
}
198-
collectClusters(protoCluster, associatedPolicies, accessEntries, fetchAuditLogs, nil)
180+
collectClusters(protoCluster, associatedPolicies, accessEntries, nil)
199181
}
200182
return nil
201183
})

lib/srv/discovery/fetchers/aws-sync/eks_test.go

Lines changed: 16 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"google.golang.org/protobuf/types/known/timestamppb"
3434
"google.golang.org/protobuf/types/known/wrapperspb"
3535

36-
"github.com/gravitational/teleport/api/utils"
3736
accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
3837
"github.com/gravitational/teleport/lib/cloud/awsconfig"
3938
)
@@ -48,7 +47,7 @@ const (
4847
type mockedEKSClient struct {
4948
clusters []*ekstypes.Cluster
5049
accessEntries []*ekstypes.AccessEntry
51-
associatedAccessPolicies map[string][]ekstypes.AssociatedAccessPolicy
50+
associatedAccessPolicies []ekstypes.AssociatedAccessPolicy
5251
}
5352

5453
func (m *mockedEKSClient) DescribeCluster(ctx context.Context, input *eks.DescribeClusterInput, optFns ...func(*eks.Options)) (*eks.DescribeClusterOutput, error) {
@@ -75,9 +74,7 @@ func (m *mockedEKSClient) ListClusters(ctx context.Context, input *eks.ListClust
7574
func (m *mockedEKSClient) ListAccessEntries(ctx context.Context, input *eks.ListAccessEntriesInput, optFns ...func(*eks.Options)) (*eks.ListAccessEntriesOutput, error) {
7675
accessEntries := make([]string, 0, len(m.accessEntries))
7776
for _, accessEntry := range m.accessEntries {
78-
if *input.ClusterName == *accessEntry.ClusterName {
79-
accessEntries = append(accessEntries, aws.ToString(accessEntry.AccessEntryArn))
80-
}
77+
accessEntries = append(accessEntries, aws.ToString(accessEntry.AccessEntryArn))
8178
}
8279
return &eks.ListAccessEntriesOutput{
8380
AccessEntries: accessEntries,
@@ -86,7 +83,7 @@ func (m *mockedEKSClient) ListAccessEntries(ctx context.Context, input *eks.List
8683

8784
func (m *mockedEKSClient) ListAssociatedAccessPolicies(ctx context.Context, input *eks.ListAssociatedAccessPoliciesInput, optFns ...func(*eks.Options)) (*eks.ListAssociatedAccessPoliciesOutput, error) {
8885
return &eks.ListAssociatedAccessPoliciesOutput{
89-
AssociatedAccessPolicies: m.associatedAccessPolicies[*input.ClusterName],
86+
AssociatedAccessPolicies: m.associatedAccessPolicies,
9087
}, nil
9188
}
9289

@@ -113,7 +110,7 @@ func TestPollAWSEKSClusters(t *testing.T) {
113110
accountID = "12345678"
114111
)
115112
regions := []string{"eu-west-1"}
116-
cluster1 := &accessgraphv1alpha.AWSEKSClusterV1{
113+
cluster := &accessgraphv1alpha.AWSEKSClusterV1{
117114
Name: "cluster1",
118115
Arn: "arn:us-west1:eks:cluster1",
119116
CreatedAt: timestamppb.New(date),
@@ -131,24 +128,6 @@ func TestPollAWSEKSClusters(t *testing.T) {
131128
Region: "eu-west-1",
132129
AccountId: "12345678",
133130
}
134-
cluster2 := &accessgraphv1alpha.AWSEKSClusterV1{
135-
Name: "cluster2",
136-
Arn: "arn:us-west1:eks:cluster2",
137-
CreatedAt: timestamppb.New(date),
138-
Status: "ACTIVE",
139-
Tags: []*accessgraphv1alpha.AWSTag{
140-
{
141-
Key: "tag1",
142-
Value: wrapperspb.String("hello"),
143-
},
144-
{
145-
Key: "tag2",
146-
Value: wrapperspb.String("val2"),
147-
},
148-
},
149-
Region: "eu-west-1",
150-
AccountId: "12345678",
151-
}
152131
tests := []struct {
153132
name string
154133
want *Resources
@@ -157,14 +136,11 @@ func TestPollAWSEKSClusters(t *testing.T) {
157136
name: "poll eks clusters",
158137
want: &Resources{
159138
EKSClusters: []*accessgraphv1alpha.AWSEKSClusterV1{
160-
cluster1, cluster2,
161-
},
162-
EKSAuditLogClusters: []*accessgraphv1alpha.AWSEKSClusterV1{
163-
cluster2,
139+
cluster,
164140
},
165141
AccessEntries: []*accessgraphv1alpha.AWSEKSClusterAccessEntryV1{
166142
{
167-
Cluster: cluster1,
143+
Cluster: cluster,
168144
AccessEntryArn: "arn:iam:access_entry",
169145
CreatedAt: timestamppb.New(date),
170146
KubernetesGroups: []string{"teleport"},
@@ -183,7 +159,7 @@ func TestPollAWSEKSClusters(t *testing.T) {
183159
},
184160
AssociatedAccessPolicies: []*accessgraphv1alpha.AWSEKSAssociatedAccessPolicyV1{
185161
{
186-
Cluster: cluster1,
162+
Cluster: cluster,
187163
PrincipalArn: principalARN,
188164
Scope: &accessgraphv1alpha.AWSEKSAccessScopeV1{
189165
Type: string(ekstypes.AccessScopeTypeCluster),
@@ -226,12 +202,6 @@ func TestPollAWSEKSClusters(t *testing.T) {
226202
Regions: regions,
227203
Integration: accountID,
228204
GetEKSClient: getEKSClient,
229-
EKSAuditLogs: &EKSAuditLogs{
230-
Tags: map[string]utils.Strings{
231-
"tag1": utils.Strings{"hello", "world"},
232-
"tag2": utils.Strings{"val*"},
233-
},
234-
},
235205
},
236206
lastResult: &Resources{},
237207
}
@@ -266,16 +236,6 @@ func eksClusters() []*ekstypes.Cluster {
266236
"tag2": "val2",
267237
},
268238
},
269-
{
270-
Name: aws.String("cluster2"),
271-
Arn: aws.String("arn:us-west1:eks:cluster2"),
272-
CreatedAt: aws.Time(date),
273-
Status: ekstypes.ClusterStatusActive,
274-
Tags: map[string]string{
275-
"tag1": "hello",
276-
"tag2": "val2",
277-
},
278-
},
279239
}
280240
}
281241

@@ -297,18 +257,16 @@ func accessEntries() []*ekstypes.AccessEntry {
297257
}
298258
}
299259

300-
func associatedPolicies() map[string][]ekstypes.AssociatedAccessPolicy {
301-
return map[string][]ekstypes.AssociatedAccessPolicy{
302-
"cluster1": {
303-
{
304-
AccessScope: &ekstypes.AccessScope{
305-
Namespaces: []string{"ns1"},
306-
Type: ekstypes.AccessScopeTypeCluster,
307-
},
308-
ModifiedAt: aws.Time(date),
309-
AssociatedAt: aws.Time(date),
310-
PolicyArn: aws.String("policy_arn"),
260+
func associatedPolicies() []ekstypes.AssociatedAccessPolicy {
261+
return []ekstypes.AssociatedAccessPolicy{
262+
{
263+
AccessScope: &ekstypes.AccessScope{
264+
Namespaces: []string{"ns1"},
265+
Type: ekstypes.AccessScopeTypeCluster,
311266
},
267+
ModifiedAt: aws.Time(date),
268+
AssociatedAt: aws.Time(date),
269+
PolicyArn: aws.String("policy_arn"),
312270
},
313271
}
314272
}

lib/srv/discovery/fetchers/aws-sync/rds_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ type fakeAWSClients struct {
225225
s3Client s3Client
226226
stsClient stsClient
227227
kmsClient kmsClient
228-
cwlClient cwlClient
228+
cwlClient cloudwatchlogs.FilterLogEventsAPIClient
229229
}
230230

231231
func (f fakeAWSClients) getIAMClient(cfg aws.Config, optFns ...func(*iam.Options)) iamClient {
@@ -248,6 +248,6 @@ func (f fakeAWSClients) getKMSClient(cfg aws.Config, optFns ...func(*kms.Options
248248
return f.kmsClient
249249
}
250250

251-
func (f fakeAWSClients) getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cwlClient {
251+
func (f fakeAWSClients) getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient {
252252
return f.cwlClient
253253
}

0 commit comments

Comments
 (0)