55 "fmt"
66 "net/http"
77
8+ "k8s.io/apimachinery/pkg/runtime"
9+ "k8s.io/apimachinery/pkg/util/sets"
10+
811 "github.com/jetstack/preflight/api"
912 "github.com/jetstack/preflight/pkg/internal/cyberark"
1013 "github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
@@ -36,10 +39,20 @@ func NewCyberArk(httpClient *http.Client) (*CyberArkClient, error) {
3639}
3740
3841// PostDataReadingsWithOptions uploads data readings to CyberArk.
42+ // It converts the supplied data readings into a snapshot format expected by CyberArk.
3943// It initializes a data upload client with the configured HTTP client and credentials,
4044// then uploads a snapshot.
4145// The supplied Options are not used by this publisher.
4246func (o * CyberArkClient ) PostDataReadingsWithOptions (ctx context.Context , readings []* api.DataReading , _ Options ) error {
47+ var snapshot dataupload.Snapshot
48+ if err := convertDataReadings (defaultExtractorFunctions , readings , & snapshot ); err != nil {
49+ return fmt .Errorf ("while converting data readings: %s" , err )
50+ }
51+ snapshot .AgentVersion = version .PreflightVersion
52+ // Temporary hard coded cluster ID.
53+ // TODO(wallrj): The clusterID will eventually be extracted from the supplied readings.
54+ snapshot .ClusterID = "success-cluster-id"
55+
4356 cfg , err := o .configLoader ()
4457 if err != nil {
4558 return err
@@ -49,14 +62,136 @@ func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readin
4962 return fmt .Errorf ("while initializing data upload client: %s" , err )
5063 }
5164
52- err = datauploadClient .PutSnapshot (ctx , dataupload.Snapshot {
53- // Temporary hard coded cluster ID.
54- // TODO(wallrj): The clusterID will eventually be extracted from the supplied readings.
55- ClusterID : "success-cluster-id" ,
56- AgentVersion : version .PreflightVersion ,
57- })
65+ err = datauploadClient .PutSnapshot (ctx , snapshot )
5866 if err != nil {
5967 return fmt .Errorf ("while uploading snapshot: %s" , err )
6068 }
6169 return nil
6270}
71+
72+ // extractServerVersionFromReading converts the opaque data from a DiscoveryData
73+ // data reading to allow access to the Kubernetes version fields within.
74+ func extractServerVersionFromReading (reading * api.DataReading , target * string ) error {
75+ if reading == nil {
76+ return fmt .Errorf ("programmer mistake: the DataReading must not be nil" )
77+ }
78+ data , ok := reading .Data .(* api.DiscoveryData )
79+ if ! ok {
80+ return fmt .Errorf (
81+ "programmer mistake: the DataReading must have data type *api.DiscoveryData. " +
82+ "This DataReading (%s) has data type %T" , reading .DataGatherer , reading .Data )
83+ }
84+ if data .ServerVersion == nil {
85+ return nil
86+ }
87+ * target = data .ServerVersion .GitVersion
88+ return nil
89+ }
90+
91+ // extractResourceListFromReading converts the opaque data from a DynamicData
92+ // data reading to runtime.Object resources, to allow access to the metadata and
93+ // other kubernetes API fields.
94+ func extractResourceListFromReading (reading * api.DataReading , target * []runtime.Object ) error {
95+ if reading == nil {
96+ return fmt .Errorf ("programmer mistake: the DataReading must not be nil" )
97+ }
98+ data , ok := reading .Data .(* api.DynamicData )
99+ if ! ok {
100+ return fmt .Errorf (
101+ "programmer mistake: the DataReading must have data type *api.DynamicData. " +
102+ "This DataReading (%s) has data type %T" , reading .DataGatherer , reading .Data )
103+ }
104+ resources := make ([]runtime.Object , len (data .Items ))
105+ for i , item := range data .Items {
106+ if resource , ok := item .Resource .(runtime.Object ); ok {
107+ resources [i ] = resource
108+ } else {
109+ return fmt .Errorf (
110+ "programmer mistake: the DynamicData items must have Resource type runtime.Object. " +
111+ "This item (%d) has Resource type %T" , i , item .Resource )
112+ }
113+ }
114+ * target = resources
115+ return nil
116+ }
117+
118+ var defaultExtractorFunctions = map [string ]func (* api.DataReading , * dataupload.Snapshot ) error {
119+ "ark/discovery" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
120+ return extractServerVersionFromReading (r , & s .K8SVersion )
121+ },
122+ "ark/secrets" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
123+ return extractResourceListFromReading (r , & s .Secrets )
124+ },
125+ "ark/serviceaccounts" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
126+ return extractResourceListFromReading (r , & s .ServiceAccounts )
127+ },
128+ "ark/roles" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
129+ return extractResourceListFromReading (r , & s .Roles )
130+ },
131+ "ark/clusterroles" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
132+ return extractResourceListFromReading (r , & s .ClusterRoles )
133+ },
134+ "ark/rolebindings" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
135+ return extractResourceListFromReading (r , & s .RoleBindings )
136+ },
137+ "ark/clusterrolebindings" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
138+ return extractResourceListFromReading (r , & s .ClusterRoleBindings )
139+ },
140+ "ark/jobs" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
141+ return extractResourceListFromReading (r , & s .Jobs )
142+ },
143+ "ark/cronjobs" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
144+ return extractResourceListFromReading (r , & s .CronJobs )
145+ },
146+ "ark/deployments" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
147+ return extractResourceListFromReading (r , & s .Deployments )
148+ },
149+ "ark/statefulsets" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
150+ return extractResourceListFromReading (r , & s .Statefulsets )
151+ },
152+ "ark/daemonsets" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
153+ return extractResourceListFromReading (r , & s .Daemonsets )
154+ },
155+ "ark/pods" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
156+ return extractResourceListFromReading (r , & s .Pods )
157+ },
158+ }
159+
160+ // convertDataReadings processes a list of DataReadings using the provided
161+ // extractor functions to populate the fields of the target snapshot.
162+ // It ensures that all expected data gatherers are handled and that there are
163+ // no unhandled data gatherers. If any discrepancies are found, or if any
164+ // extractor function returns an error, it returns an error.
165+ // The extractorFunctions map should contain functions for each expected
166+ // DataGatherer name, which will be called with the corresponding DataReading
167+ // and the target snapshot to populate the relevant fields.
168+ func convertDataReadings (
169+ extractorFunctions map [string ]func (* api.DataReading , * dataupload.Snapshot ) error ,
170+ readings []* api.DataReading ,
171+ target * dataupload.Snapshot ,
172+ ) error {
173+ expectedDataGatherers := sets .KeySet (extractorFunctions )
174+ unhandledDataGatherers := sets .New [string ]()
175+ missingDataGatherers := expectedDataGatherers .Clone ()
176+ for _ , reading := range readings {
177+ dataGathererName := reading .DataGatherer
178+ extractFunc , found := extractorFunctions [dataGathererName ]
179+ if ! found {
180+ unhandledDataGatherers .Insert (dataGathererName )
181+ continue
182+ }
183+ missingDataGatherers .Delete (dataGathererName )
184+ // Call the extractor function to populate the relevant field in the target snapshot.
185+ if err := extractFunc (reading , target ); err != nil {
186+ return fmt .Errorf ("while extracting data reading %s: %s" , dataGathererName , err )
187+ }
188+ }
189+ if missingDataGatherers .Len () > 0 || unhandledDataGatherers .Len () > 0 {
190+ return fmt .Errorf (
191+ "unexpected data gatherers, missing: %v, unhandled: %v" ,
192+ sets .List (missingDataGatherers ),
193+ sets .List (unhandledDataGatherers ),
194+ )
195+ }
196+ return nil
197+ }
0 commit comments