Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/datareading.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package api
import (
"encoding/json"
"time"

"k8s.io/apimachinery/pkg/version"
)

// DataReadingsPost is the payload in the upload request.
Expand Down Expand Up @@ -48,3 +50,23 @@ func (v GatheredResource) MarshalJSON() ([]byte, error) {

return json.Marshal(data)
}

// DynamicData is the DataReading.Data returned by the k8s.DataGathererDynamic
// gatherer
type DynamicData struct {
// Items is a list of GatheredResource
Items []*GatheredResource `json:"items"`
}

// DiscoveryData is the DataReading.Data returned by the k8s.ConfigDiscovery
// gatherer
type DiscoveryData struct {
// ClusterID is the unique ID of the Kubernetes cluster which this snapshot was taken from.
// This is sourced from the kube-system namespace UID,
// which is assumed to be stable for the lifetime of the cluster.
// - https://github.com/kubernetes/kubernetes/issues/77487#issuecomment-489786023
ClusterID string `json:"cluster_id"`
// ServerVersion is the version information of the k8s apiserver
// See https://godoc.org/k8s.io/apimachinery/pkg/version#Info
ServerVersion *version.Info `json:"server_version"`
}
122 changes: 120 additions & 2 deletions examples/machinehub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,126 @@
# export ARK_SUBDOMAIN= # your CyberArk tenant subdomain
# export ARK_USERNAME= # your CyberArk username
# export ARK_SECRET= # your CyberArk password
#
# OPTIONAL: the URL for the CyberArk Discovery API if not using the production environment
# # export ARK_DISCOVERY_API=https://platform-discovery.integration-cyberark.cloud/api/v2
#
# go run . agent --one-shot --machine-hub -v 6 --agent-config-file ./examples/machinehub.yaml

data-gatherers:
- kind: "dummy"
name: "dummy"
# Gather Kubernetes API server version information
- name: ark/discovery
kind: k8s-discovery

# Gather Kubernetes secrets, excluding specific types
- name: ark/secrets
kind: k8s-dynamic
config:
resource-type:
version: v1
resource: secrets
field-selectors:
- type!=kubernetes.io/service-account-token
- type!=kubernetes.io/dockercfg
- type!=kubernetes.io/dockerconfigjson
- type!=kubernetes.io/basic-auth
- type!=kubernetes.io/ssh-auth
- type!=bootstrap.kubernetes.io/token
- type!=helm.sh/release.v1

# Gather Kubernetes service accounts
- name: ark/serviceaccounts
kind: k8s-dynamic
config:
resource-type:
resource: serviceaccounts
version: v1

# Gather Kubernetes roles
- name: ark/roles
kind: k8s-dynamic
config:
resource-type:
version: v1
group: rbac.authorization.k8s.io
resource: roles

# Gather Kubernetes cluster roles
- name: ark/clusterroles
kind: k8s-dynamic
config:
resource-type:
version: v1
group: rbac.authorization.k8s.io
resource: clusterroles

# Gather Kubernetes role bindings
- name: ark/rolebindings
kind: k8s-dynamic
config:
resource-type:
version: v1
group: rbac.authorization.k8s.io
resource: rolebindings

# Gather Kubernetes cluster role bindings
- name: ark/clusterrolebindings
kind: k8s-dynamic
config:
resource-type:
version: v1
group: rbac.authorization.k8s.io
resource: clusterrolebindings

# Gather Kubernetes jobs
- name: ark/jobs
kind: k8s-dynamic
config:
resource-type:
version: v1
group: batch
resource: jobs

# Gather Kubernetes cron jobs
- name: ark/cronjobs
kind: k8s-dynamic
config:
resource-type:
version: v1
group: batch
resource: cronjobs

# Gather Kubernetes deployments
- name: ark/deployments
kind: k8s-dynamic
config:
resource-type:
version: v1
group: apps
resource: deployments

# Gather Kubernetes stateful sets
- name: ark/statefulsets
kind: k8s-dynamic
config:
resource-type:
version: v1
group: apps
resource: statefulsets

# Gather Kubernetes daemon sets
- name: ark/daemonsets
kind: k8s-dynamic
config:
resource-type:
version: v1
group: apps
resource: daemonsets

# Gather Kubernetes pods
- name: ark/pods
kind: k8s-dynamic
config:
resource-type:
version: v1
resource: pods
142 changes: 136 additions & 6 deletions pkg/client/client_cyberark.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"net/http"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/jetstack/preflight/api"
"github.com/jetstack/preflight/pkg/internal/cyberark"
"github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
Expand Down Expand Up @@ -36,10 +39,17 @@ func NewCyberArk(httpClient *http.Client) (*CyberArkClient, error) {
}

// PostDataReadingsWithOptions uploads data readings to CyberArk.
// It converts the supplied data readings into a snapshot format expected by CyberArk.
// It initializes a data upload client with the configured HTTP client and credentials,
// then uploads a snapshot.
// The supplied Options are not used by this publisher.
func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, _ Options) error {
var snapshot dataupload.Snapshot
if err := convertDataReadings(defaultExtractorFunctions, readings, &snapshot); err != nil {
return fmt.Errorf("while converting data readings: %s", err)
}
snapshot.AgentVersion = version.PreflightVersion

cfg, err := o.configLoader()
if err != nil {
return err
Expand All @@ -49,14 +59,134 @@ func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readin
return fmt.Errorf("while initializing data upload client: %s", err)
}

err = datauploadClient.PutSnapshot(ctx, dataupload.Snapshot{
// Temporary hard coded cluster ID.
// TODO(wallrj): The clusterID will eventually be extracted from the supplied readings.
ClusterID: "success-cluster-id",
AgentVersion: version.PreflightVersion,
})
err = datauploadClient.PutSnapshot(ctx, snapshot)
if err != nil {
return fmt.Errorf("while uploading snapshot: %s", err)
}
return nil
}

// extractClusterIDAndServerVersionFromReading converts the opaque data from a DiscoveryData
// data reading to allow access to the Kubernetes version fields within.
func extractClusterIDAndServerVersionFromReading(reading *api.DataReading, target *dataupload.Snapshot) error {
if reading == nil {
return fmt.Errorf("programmer mistake: the DataReading must not be nil")
}
data, ok := reading.Data.(*api.DiscoveryData)
if !ok {
return fmt.Errorf(
"programmer mistake: the DataReading must have data type *api.DiscoveryData. "+
"This DataReading (%s) has data type %T", reading.DataGatherer, reading.Data)
}
target.ClusterID = data.ClusterID
if data.ServerVersion != nil {
target.K8SVersion = data.ServerVersion.GitVersion
}
return nil
}

// extractResourceListFromReading converts the opaque data from a DynamicData
// data reading to runtime.Object resources, to allow access to the metadata and
// other kubernetes API fields.
func extractResourceListFromReading(reading *api.DataReading, target *[]runtime.Object) error {
if reading == nil {
return fmt.Errorf("programmer mistake: the DataReading must not be nil")
}
data, ok := reading.Data.(*api.DynamicData)
if !ok {
return fmt.Errorf(
"programmer mistake: the DataReading must have data type *api.DynamicData. "+
"This DataReading (%s) has data type %T", reading.DataGatherer, reading.Data)
}
resources := make([]runtime.Object, len(data.Items))
for i, item := range data.Items {
if resource, ok := item.Resource.(runtime.Object); ok {
resources[i] = resource
} else {
return fmt.Errorf(
"programmer mistake: the DynamicData items must have Resource type runtime.Object. "+
"This item (%d) has Resource type %T", i, item.Resource)
}
}
*target = resources
return nil
}

var defaultExtractorFunctions = map[string]func(*api.DataReading, *dataupload.Snapshot) error{
"ark/discovery": extractClusterIDAndServerVersionFromReading,
"ark/secrets": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.Secrets)
},
"ark/serviceaccounts": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.ServiceAccounts)
},
"ark/roles": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.Roles)
},
"ark/clusterroles": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.ClusterRoles)
},
"ark/rolebindings": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.RoleBindings)
},
"ark/clusterrolebindings": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.ClusterRoleBindings)
},
"ark/jobs": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.Jobs)
},
"ark/cronjobs": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.CronJobs)
},
"ark/deployments": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.Deployments)
},
"ark/statefulsets": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.Statefulsets)
},
"ark/daemonsets": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.Daemonsets)
},
"ark/pods": func(r *api.DataReading, s *dataupload.Snapshot) error {
return extractResourceListFromReading(r, &s.Pods)
},
}

// convertDataReadings processes a list of DataReadings using the provided
// extractor functions to populate the fields of the target snapshot.
// It ensures that all expected data gatherers are handled and that there are
// no unhandled data gatherers. If any discrepancies are found, or if any
// extractor function returns an error, it returns an error.
// The extractorFunctions map should contain functions for each expected
// DataGatherer name, which will be called with the corresponding DataReading
// and the target snapshot to populate the relevant fields.
func convertDataReadings(
extractorFunctions map[string]func(*api.DataReading, *dataupload.Snapshot) error,
readings []*api.DataReading,
target *dataupload.Snapshot,
) error {
expectedDataGatherers := sets.KeySet(extractorFunctions)
unhandledDataGatherers := sets.New[string]()
missingDataGatherers := expectedDataGatherers.Clone()
for _, reading := range readings {
dataGathererName := reading.DataGatherer
extractFunc, found := extractorFunctions[dataGathererName]
if !found {
unhandledDataGatherers.Insert(dataGathererName)
continue
}
missingDataGatherers.Delete(dataGathererName)
// Call the extractor function to populate the relevant field in the target snapshot.
if err := extractFunc(reading, target); err != nil {
return fmt.Errorf("while extracting data reading %s: %s", dataGathererName, err)
}
}
if missingDataGatherers.Len() > 0 || unhandledDataGatherers.Len() > 0 {
return fmt.Errorf(
"unexpected data gatherers, missing: %v, unhandled: %v",
sets.List(missingDataGatherers),
sets.List(unhandledDataGatherers),
)
}
return nil
}
Loading