Skip to content

Commit 00e4e91

Browse files
CyberArk(client): add CyberArk snapshot conversion
- Introduced `convertDataReadings` to process `DataReading` objects into snapshots. - Added support for extracting Kubernetes server version and dynamic resources. - Updated `CyberArkClient` to use the new data conversion logic. - Refactored `DiscoveryData` and `DynamicData` structures for better type safety. - Replaced `unstructured.Unstructured` with `runtime.Object` in `Snapshot` fields. - Enhanced `DataGathererDiscovery` and `DataGathererDynamic` to return strongly typed data. - Added unit tests for new data extraction and conversion functions. Signed-off-by: Richard Wall <[email protected]>
1 parent 2d030d4 commit 00e4e91

File tree

12 files changed

+638
-47
lines changed

12 files changed

+638
-47
lines changed

api/datareading.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package api
33
import (
44
"encoding/json"
55
"time"
6+
7+
"k8s.io/apimachinery/pkg/version"
68
)
79

810
// DataReadingsPost is the payload in the upload request.
@@ -48,3 +50,18 @@ func (v GatheredResource) MarshalJSON() ([]byte, error) {
4850

4951
return json.Marshal(data)
5052
}
53+
54+
// DynamicData is the DataReading.Data returned by the k8s.DataGathererDynamic
55+
// gatherer
56+
type DynamicData struct {
57+
// Items is a list of GatheredResource
58+
Items []*GatheredResource `json:"items"`
59+
}
60+
61+
// DiscoveryData is the DataReading.Data returned by the k8s.ConfigDiscovery
62+
// gatherer
63+
type DiscoveryData struct {
64+
// ServerVersion is the version information of the k8s apiserver
65+
// See https://godoc.org/k8s.io/apimachinery/pkg/version#Info
66+
ServerVersion *version.Info `json:"server_version"`
67+
}

examples/machinehub.yaml

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,126 @@
55
# export ARK_SUBDOMAIN= # your CyberArk tenant subdomain
66
# export ARK_USERNAME= # your CyberArk username
77
# export ARK_SECRET= # your CyberArk password
8+
#
9+
# OPTIONAL: the URL for the CyberArk Discovery API if not using the production environment
10+
# # export ARK_DISCOVERY_API=https://platform-discovery.integration-cyberark.cloud/api/v2
11+
#
812
# go run . agent --one-shot --machine-hub -v 6 --agent-config-file ./examples/machinehub.yaml
913

1014
data-gatherers:
11-
- kind: "dummy"
12-
name: "dummy"
15+
# Gather Kubernetes API server version information
16+
- name: ark/discovery
17+
kind: k8s-discovery
18+
19+
# Gather Kubernetes secrets, excluding specific types
20+
- name: ark/secrets
21+
kind: k8s-dynamic
22+
config:
23+
resource-type:
24+
version: v1
25+
resource: secrets
26+
field-selectors:
27+
- type!=kubernetes.io/service-account-token
28+
- type!=kubernetes.io/dockercfg
29+
- type!=kubernetes.io/dockerconfigjson
30+
- type!=kubernetes.io/basic-auth
31+
- type!=kubernetes.io/ssh-auth
32+
- type!=bootstrap.kubernetes.io/token
33+
- type!=helm.sh/release.v1
34+
35+
# Gather Kubernetes service accounts
36+
- name: ark/serviceaccounts
37+
kind: k8s-dynamic
38+
config:
39+
resource-type:
40+
resource: serviceaccounts
41+
version: v1
42+
43+
# Gather Kubernetes roles
44+
- name: ark/roles
45+
kind: k8s-dynamic
46+
config:
47+
resource-type:
48+
version: v1
49+
group: rbac.authorization.k8s.io
50+
resource: roles
51+
52+
# Gather Kubernetes cluster roles
53+
- name: ark/clusterroles
54+
kind: k8s-dynamic
55+
config:
56+
resource-type:
57+
version: v1
58+
group: rbac.authorization.k8s.io
59+
resource: clusterroles
60+
61+
# Gather Kubernetes role bindings
62+
- name: ark/rolebindings
63+
kind: k8s-dynamic
64+
config:
65+
resource-type:
66+
version: v1
67+
group: rbac.authorization.k8s.io
68+
resource: rolebindings
69+
70+
# Gather Kubernetes cluster role bindings
71+
- name: ark/clusterrolebindings
72+
kind: k8s-dynamic
73+
config:
74+
resource-type:
75+
version: v1
76+
group: rbac.authorization.k8s.io
77+
resource: clusterrolebindings
78+
79+
# Gather Kubernetes jobs
80+
- name: ark/jobs
81+
kind: k8s-dynamic
82+
config:
83+
resource-type:
84+
version: v1
85+
group: batch
86+
resource: jobs
87+
88+
# Gather Kubernetes cron jobs
89+
- name: ark/cronjobs
90+
kind: k8s-dynamic
91+
config:
92+
resource-type:
93+
version: v1
94+
group: batch
95+
resource: cronjobs
96+
97+
# Gather Kubernetes deployments
98+
- name: ark/deployments
99+
kind: k8s-dynamic
100+
config:
101+
resource-type:
102+
version: v1
103+
group: apps
104+
resource: deployments
105+
106+
# Gather Kubernetes stateful sets
107+
- name: ark/statefulsets
108+
kind: k8s-dynamic
109+
config:
110+
resource-type:
111+
version: v1
112+
group: apps
113+
resource: statefulsets
114+
115+
# Gather Kubernetes daemon sets
116+
- name: ark/daemonsets
117+
kind: k8s-dynamic
118+
config:
119+
resource-type:
120+
version: v1
121+
group: apps
122+
resource: daemonsets
123+
124+
# Gather Kubernetes pods
125+
- name: ark/pods
126+
kind: k8s-dynamic
127+
config:
128+
resource-type:
129+
version: v1
130+
resource: pods

pkg/client/client_cyberark.go

Lines changed: 141 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import (
55
"fmt"
66
"net/http"
77

8+
"k8s.io/apimachinery/pkg/runtime"
9+
"k8s.io/apimachinery/pkg/util/sets"
10+
811
"github.com/jetstack/preflight/api"
912
"github.com/jetstack/preflight/pkg/internal/cyberark"
1013
"github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
@@ -36,10 +39,20 @@ func NewCyberArk(httpClient *http.Client) (*CyberArkClient, error) {
3639
}
3740

3841
// PostDataReadingsWithOptions uploads data readings to CyberArk.
42+
// It converts the supplied data readings into a snapshot format expected by CyberArk.
3943
// It initializes a data upload client with the configured HTTP client and credentials,
4044
// then uploads a snapshot.
4145
// The supplied Options are not used by this publisher.
4246
func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, _ Options) error {
47+
var snapshot dataupload.Snapshot
48+
if err := convertDataReadings(defaultExtractorFunctions, readings, &snapshot); err != nil {
49+
return fmt.Errorf("while converting data readings: %s", err)
50+
}
51+
snapshot.AgentVersion = version.PreflightVersion
52+
// Temporary hard coded cluster ID.
53+
// TODO(wallrj): The clusterID will eventually be extracted from the supplied readings.
54+
snapshot.ClusterID = "success-cluster-id"
55+
4356
cfg, err := o.configLoader()
4457
if err != nil {
4558
return err
@@ -49,14 +62,136 @@ func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readin
4962
return fmt.Errorf("while initializing data upload client: %s", err)
5063
}
5164

52-
err = datauploadClient.PutSnapshot(ctx, dataupload.Snapshot{
53-
// Temporary hard coded cluster ID.
54-
// TODO(wallrj): The clusterID will eventually be extracted from the supplied readings.
55-
ClusterID: "success-cluster-id",
56-
AgentVersion: version.PreflightVersion,
57-
})
65+
err = datauploadClient.PutSnapshot(ctx, snapshot)
5866
if err != nil {
5967
return fmt.Errorf("while uploading snapshot: %s", err)
6068
}
6169
return nil
6270
}
71+
72+
// extractServerVersionFromReading converts the opaque data from a DiscoveryData
73+
// data reading to allow access to the Kubernetes version fields within.
74+
func extractServerVersionFromReading(reading *api.DataReading, target *string) error {
75+
if reading == nil {
76+
return fmt.Errorf("programmer mistake: the DataReading must not be nil")
77+
}
78+
data, ok := reading.Data.(*api.DiscoveryData)
79+
if !ok {
80+
return fmt.Errorf(
81+
"programmer mistake: the DataReading must have data type *api.DiscoveryData. "+
82+
"This DataReading (%s) has data type %T", reading.DataGatherer, reading.Data)
83+
}
84+
if data.ServerVersion == nil {
85+
return nil
86+
}
87+
*target = data.ServerVersion.GitVersion
88+
return nil
89+
}
90+
91+
// extractResourceListFromReading converts the opaque data from a DynamicData
92+
// data reading to runtime.Object resources, to allow access to the metadata and
93+
// other kubernetes API fields.
94+
func extractResourceListFromReading(reading *api.DataReading, target *[]runtime.Object) error {
95+
if reading == nil {
96+
return fmt.Errorf("programmer mistake: the DataReading must not be nil")
97+
}
98+
data, ok := reading.Data.(*api.DynamicData)
99+
if !ok {
100+
return fmt.Errorf(
101+
"programmer mistake: the DataReading must have data type *api.DynamicData. "+
102+
"This DataReading (%s) has data type %T", reading.DataGatherer, reading.Data)
103+
}
104+
resources := make([]runtime.Object, len(data.Items))
105+
for i, item := range data.Items {
106+
if resource, ok := item.Resource.(runtime.Object); ok {
107+
resources[i] = resource
108+
} else {
109+
return fmt.Errorf(
110+
"programmer mistake: the DynamicData items must have Resource type runtime.Object. "+
111+
"This item (%d) has Resource type %T", i, item.Resource)
112+
}
113+
}
114+
*target = resources
115+
return nil
116+
}
117+
118+
var defaultExtractorFunctions = map[string]func(*api.DataReading, *dataupload.Snapshot) error{
119+
"ark/discovery": func(r *api.DataReading, s *dataupload.Snapshot) error {
120+
return extractServerVersionFromReading(r, &s.K8SVersion)
121+
},
122+
"ark/secrets": func(r *api.DataReading, s *dataupload.Snapshot) error {
123+
return extractResourceListFromReading(r, &s.Secrets)
124+
},
125+
"ark/serviceaccounts": func(r *api.DataReading, s *dataupload.Snapshot) error {
126+
return extractResourceListFromReading(r, &s.ServiceAccounts)
127+
},
128+
"ark/roles": func(r *api.DataReading, s *dataupload.Snapshot) error {
129+
return extractResourceListFromReading(r, &s.Roles)
130+
},
131+
"ark/clusterroles": func(r *api.DataReading, s *dataupload.Snapshot) error {
132+
return extractResourceListFromReading(r, &s.ClusterRoles)
133+
},
134+
"ark/rolebindings": func(r *api.DataReading, s *dataupload.Snapshot) error {
135+
return extractResourceListFromReading(r, &s.RoleBindings)
136+
},
137+
"ark/clusterrolebindings": func(r *api.DataReading, s *dataupload.Snapshot) error {
138+
return extractResourceListFromReading(r, &s.ClusterRoleBindings)
139+
},
140+
"ark/jobs": func(r *api.DataReading, s *dataupload.Snapshot) error {
141+
return extractResourceListFromReading(r, &s.Jobs)
142+
},
143+
"ark/cronjobs": func(r *api.DataReading, s *dataupload.Snapshot) error {
144+
return extractResourceListFromReading(r, &s.CronJobs)
145+
},
146+
"ark/deployments": func(r *api.DataReading, s *dataupload.Snapshot) error {
147+
return extractResourceListFromReading(r, &s.Deployments)
148+
},
149+
"ark/statefulsets": func(r *api.DataReading, s *dataupload.Snapshot) error {
150+
return extractResourceListFromReading(r, &s.Statefulsets)
151+
},
152+
"ark/daemonsets": func(r *api.DataReading, s *dataupload.Snapshot) error {
153+
return extractResourceListFromReading(r, &s.Daemonsets)
154+
},
155+
"ark/pods": func(r *api.DataReading, s *dataupload.Snapshot) error {
156+
return extractResourceListFromReading(r, &s.Pods)
157+
},
158+
}
159+
160+
// convertDataReadings processes a list of DataReadings using the provided
161+
// extractor functions to populate the fields of the target snapshot.
162+
// It ensures that all expected data gatherers are handled and that there are
163+
// no unhandled data gatherers. If any discrepancies are found, or if any
164+
// extractor function returns an error, it returns an error.
165+
// The extractorFunctions map should contain functions for each expected
166+
// DataGatherer name, which will be called with the corresponding DataReading
167+
// and the target snapshot to populate the relevant fields.
168+
func convertDataReadings[T any](
169+
extractorFunctions map[string]func(*api.DataReading, *T) error,
170+
readings []*api.DataReading,
171+
target *T,
172+
) error {
173+
expectedDataGatherers := sets.StringKeySet(extractorFunctions)
174+
unhandledDataGatherers := sets.New[string]()
175+
missingDataGatherers := sets.New[string](expectedDataGatherers.UnsortedList()...)
176+
for _, reading := range readings {
177+
dataGathererName := reading.DataGatherer
178+
extractFunc, found := extractorFunctions[dataGathererName]
179+
if !found {
180+
unhandledDataGatherers.Insert(dataGathererName)
181+
continue
182+
}
183+
missingDataGatherers.Delete(dataGathererName)
184+
// Call the extractor function to populate the relevant field in the target snapshot.
185+
if err := extractFunc(reading, target); err != nil {
186+
return fmt.Errorf("while extracting data reading %s: %s", dataGathererName, err)
187+
}
188+
}
189+
if missingDataGatherers.Len() > 0 || unhandledDataGatherers.Len() > 0 {
190+
return fmt.Errorf(
191+
"unexpected data gatherers, missing: %v, unhandled: %v",
192+
sets.List(missingDataGatherers),
193+
sets.List(unhandledDataGatherers),
194+
)
195+
}
196+
return nil
197+
}

0 commit comments

Comments
 (0)