Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 46 additions & 9 deletions pkg/internal/cyberark/dataupload/dataupload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"net/http"
"net/url"

"github.com/jetstack/preflight/api"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/jetstack/preflight/pkg/version"
)

Expand Down Expand Up @@ -46,7 +47,43 @@ func New(httpClient *http.Client, baseURL string, authenticateRequest func(req *
}
}

// PostDataReadingsWithOptions PUTs the supplied payload to an [AWS presigned URL] which it obtains via the CyberArk inventory API.
// Snapshot is the JSON that the CyberArk Discovery and Context API expects to
// be uploaded to the AWS presigned URL.
//
// PutSnapshot JSON-encodes a Snapshot value and computes the SHA256 checksum
// over exactly those encoded bytes, so the field order and JSON tags declared
// here determine both the uploaded document and its checksum.
//
// None of the fields use `omitempty`: every key is always present in the
// encoded JSON, and a nil resource slice is encoded as `null` rather than `[]`.
// NOTE(review): whether the API accepts `null` for a resource list is not
// visible here; confirm before relying on nil slices.
type Snapshot struct {
// AgentVersion is the version of the Venafi Kubernetes Agent which is uploading this snapshot.
// Encoded under the JSON key "agent_version".
AgentVersion string `json:"agent_version"`
// ClusterID is the unique ID of the Kubernetes cluster which this snapshot was taken from.
// PutSnapshot returns an error if it is empty, and also passes it along when
// requesting the presigned upload URL. Encoded under the JSON key "cluster_id".
ClusterID string `json:"cluster_id"`
// K8SVersion is the version of Kubernetes which the cluster is running.
// Encoded under the JSON key "k8s_version".
K8SVersion string `json:"k8s_version"`
// Secrets is a list of Secret resources in the cluster. Not all Secret
// types are included and only a subset of the Secret data is included.
// Like all the resource lists below, the items are untyped
// unstructured.Unstructured objects rather than typed API structs.
Secrets []*unstructured.Unstructured `json:"secrets"`
// ServiceAccounts is a list of ServiceAccount resources in the cluster.
ServiceAccounts []*unstructured.Unstructured `json:"serviceaccounts"`
// Roles is a list of Role resources in the cluster.
Roles []*unstructured.Unstructured `json:"roles"`
// ClusterRoles is a list of ClusterRole resources in the cluster.
ClusterRoles []*unstructured.Unstructured `json:"clusterroles"`
// RoleBindings is a list of RoleBinding resources in the cluster.
RoleBindings []*unstructured.Unstructured `json:"rolebindings"`
// ClusterRoleBindings is a list of ClusterRoleBinding resources in the cluster.
ClusterRoleBindings []*unstructured.Unstructured `json:"clusterrolebindings"`
// Jobs is a list of Job resources in the cluster.
Jobs []*unstructured.Unstructured `json:"jobs"`
// CronJobs is a list of CronJob resources in the cluster.
CronJobs []*unstructured.Unstructured `json:"cronjobs"`
// Deployments is a list of Deployment resources in the cluster.
Deployments []*unstructured.Unstructured `json:"deployments"`
// Statefulsets is a list of StatefulSet resources in the cluster.
// Encoded under the JSON key "statefulsets".
Statefulsets []*unstructured.Unstructured `json:"statefulsets"`
// Daemonsets is a list of DaemonSet resources in the cluster.
// Encoded under the JSON key "daemonsets".
Daemonsets []*unstructured.Unstructured `json:"daemonsets"`
// Pods is a list of Pod resources in the cluster.
Pods []*unstructured.Unstructured `json:"pods"`
}

// PutSnapshot PUTs the supplied snapshot to an [AWS presigned URL] which it
// obtains via the CyberArk inventory API.
//
// [AWS presigned URL]: https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
//
// A SHA256 checksum header is included in the request, to verify that the payload
Expand All @@ -60,20 +97,20 @@ func New(httpClient *http.Client, baseURL string, authenticateRequest func(req *
// If you omit that header, it is possible to PUT any data.
// There is a work around listed in that issue which we have shared with the
// CyberArk API team.
func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payload api.DataReadingsPost, opts Options) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing that. The Options struct was odd, it looks much better now.

if opts.ClusterName == "" {
return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty")
func (c *CyberArkClient) PutSnapshot(ctx context.Context, snapshot Snapshot) error {
if snapshot.ClusterID == "" {
return fmt.Errorf("programmer mistake: the snapshot cluster ID cannot be left empty")
}

encodedBody := &bytes.Buffer{}
hash := sha256.New()
if err := json.NewEncoder(io.MultiWriter(encodedBody, hash)).Encode(payload); err != nil {
if err := json.NewEncoder(io.MultiWriter(encodedBody, hash)).Encode(snapshot); err != nil {
return err
}
checksum := hash.Sum(nil)
checksumHex := hex.EncodeToString(checksum)
checksumBase64 := base64.StdEncoding.EncodeToString(checksum)
presignedUploadURL, err := c.retrievePresignedUploadURL(ctx, checksumHex, opts)
presignedUploadURL, err := c.retrievePresignedUploadURL(ctx, checksumHex, snapshot.ClusterID)
if err != nil {
return fmt.Errorf("while retrieving snapshot upload URL: %s", err)
}
Expand Down Expand Up @@ -103,7 +140,7 @@ func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payloa
return nil
}

func (c *CyberArkClient) retrievePresignedUploadURL(ctx context.Context, checksum string, opts Options) (string, error) {
func (c *CyberArkClient) retrievePresignedUploadURL(ctx context.Context, checksum string, clusterID string) (string, error) {
uploadURL, err := url.JoinPath(c.baseURL, apiPathSnapshotLinks)
if err != nil {
return "", err
Expand All @@ -114,7 +151,7 @@ func (c *CyberArkClient) retrievePresignedUploadURL(ctx context.Context, checksu
Checksum string `json:"checksum_sha256"`
AgentVersion string `json:"agent_version"`
}{
ClusterID: opts.ClusterName,
ClusterID: clusterID,
Checksum: checksum,
AgentVersion: version.PreflightVersion,
}
Expand Down
85 changes: 37 additions & 48 deletions pkg/internal/cyberark/dataupload/dataupload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"net/http"
"os"
"testing"
"time"

"github.com/jetstack/venafi-connection-lib/http_client"
"github.com/stretchr/testify/require"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"

"github.com/jetstack/preflight/api"
"github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
"github.com/jetstack/preflight/pkg/internal/cyberark/identity"
"github.com/jetstack/preflight/pkg/internal/cyberark/servicediscovery"
Expand All @@ -23,28 +21,10 @@ import (
_ "k8s.io/klog/v2/ktesting/init"
)

func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
fakeTime := time.Unix(123, 0)
defaultPayload := api.DataReadingsPost{
AgentMetadata: &api.AgentMetadata{
Version: "test-version",
ClusterID: "test",
},
DataGatherTime: fakeTime,
DataReadings: []*api.DataReading{
{
ClusterID: "success-cluster-id",
DataGatherer: "test-gatherer",
Timestamp: api.Time{Time: fakeTime},
Data: map[string]interface{}{"test": "data"},
SchemaVersion: "v1",
},
},
}
defaultOpts := dataupload.Options{
ClusterName: "success-cluster-id",
}

// TestCyberArkClient_PutSnapshot_MockAPI tests the dataupload code against a
// mock API server. The mock server is configured to return different responses
// based on the cluster ID and bearer token used in the request.
func TestCyberArkClient_PutSnapshot_MockAPI(t *testing.T) {
setToken := func(token string) func(*http.Request) error {
return func(req *http.Request) error {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
Expand All @@ -54,51 +34,60 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {

tests := []struct {
name string
payload api.DataReadingsPost
snapshot dataupload.Snapshot
authenticate func(req *http.Request) error
opts dataupload.Options
requireFn func(t *testing.T, err error)
}{
{
name: "successful upload",
payload: defaultPayload,
opts: defaultOpts,
name: "successful upload",
snapshot: dataupload.Snapshot{
ClusterID: "success-cluster-id",
AgentVersion: "test-version",
},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
name: "error when cluster name is empty",
payload: defaultPayload,
opts: dataupload.Options{ClusterName: ""},
name: "error when cluster ID is empty",
snapshot: dataupload.Snapshot{
ClusterID: "",
AgentVersion: "test-version",
},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "programmer mistake: the cluster name")
require.ErrorContains(t, err, "programmer mistake: the snapshot cluster ID cannot be left empty")
},
},
{
name: "error when bearer token is incorrect",
payload: defaultPayload,
opts: defaultOpts,
name: "error when bearer token is incorrect",
snapshot: dataupload.Snapshot{
ClusterID: "test",
AgentVersion: "test-version",
},
authenticate: setToken("fail-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "while retrieving snapshot upload URL: received response with status code 500: should authenticate using the correct bearer token")
},
},
{
name: "invalid JSON from server (RetrievePresignedUploadURL step)",
payload: defaultPayload,
opts: dataupload.Options{ClusterName: "invalid-json-retrieve-presigned"},
name: "invalid JSON from server (RetrievePresignedUploadURL step)",
snapshot: dataupload.Snapshot{
ClusterID: "invalid-json-retrieve-presigned",
AgentVersion: "test-version",
},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "while retrieving snapshot upload URL: rejecting JSON response from server as it was too large or was truncated")
},
},
{
name: "500 from server (RetrievePresignedUploadURL step)",
payload: defaultPayload,
opts: dataupload.Options{ClusterName: "invalid-response-post-data"},
name: "500 from server (RetrievePresignedUploadURL step)",
snapshot: dataupload.Snapshot{
ClusterID: "invalid-response-post-data",
AgentVersion: "test-version",
},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "while retrieving snapshot upload URL: received response with status code 500: mock error")
Expand All @@ -115,13 +104,13 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {

cyberArkClient := dataupload.New(httpClient, datauploadAPIBaseURL, tc.authenticate)

err := cyberArkClient.PostDataReadingsWithOptions(ctx, tc.payload, tc.opts)
err := cyberArkClient.PutSnapshot(ctx, tc.snapshot)
tc.requireFn(t, err)
})
}
}

// TestPostDataReadingsWithOptionsWithRealAPI demonstrates that the dataupload code works with the real inventory API.
// TestCyberArkClient_PutSnapshot_RealAPI demonstrates that the dataupload code works with the real inventory API.
// An API token is obtained by authenticating with the ARK_USERNAME and ARK_SECRET from the environment.
// ARK_SUBDOMAIN should be your tenant subdomain.
//
Expand All @@ -131,8 +120,8 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
// To enable verbose request logging:
//
// go test ./pkg/internal/cyberark/dataupload/... \
// -v -count 1 -run TestPostDataReadingsWithOptionsWithRealAPI -args -testing.v 6
func TestPostDataReadingsWithOptionsWithRealAPI(t *testing.T) {
// -v -count 1 -run TestCyberArkClient_PutSnapshot_RealAPI -args -testing.v 6
func TestCyberArkClient_PutSnapshot_RealAPI(t *testing.T) {
subdomain := os.Getenv("ARK_SUBDOMAIN")
username := os.Getenv("ARK_USERNAME")
secret := os.Getenv("ARK_SECRET")
Expand All @@ -159,8 +148,8 @@ func TestPostDataReadingsWithOptionsWithRealAPI(t *testing.T) {
require.NoError(t, err)

cyberArkClient := dataupload.New(httpClient, services.DiscoveryContext.API, identityClient.AuthenticateRequest)
err = cyberArkClient.PostDataReadingsWithOptions(ctx, api.DataReadingsPost{}, dataupload.Options{
ClusterName: "bb068932-c80d-460d-88df-34bc7f3f3297",
err = cyberArkClient.PutSnapshot(ctx, dataupload.Snapshot{
ClusterID: "bb068932-c80d-460d-88df-34bc7f3f3297",
})
require.NoError(t, err)
}
20 changes: 17 additions & 3 deletions pkg/internal/cyberark/dataupload/mock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dataupload

import (
"bytes"
"crypto/sha256"
"encoding/base64"
"encoding/json"
Expand All @@ -10,6 +11,7 @@ import (
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
"k8s.io/client-go/transport"

"github.com/jetstack/preflight/pkg/version"
Expand Down Expand Up @@ -162,15 +164,27 @@ func (mds *mockDataUploadServer) handlePresignedUpload(w http.ResponseWriter, r
return
}

checksum := sha256.New()
_, _ = io.Copy(checksum, r.Body)
body, err := io.ReadAll(r.Body)
require.NoError(mds.t, err)

hash := sha256.New()
_, err = hash.Write(body)
require.NoError(mds.t, err)

// AWS S3 responds with a BadDigest error if the request body has a
// different checksum than the checksum supplied in the request header.
if amzChecksum != base64.StdEncoding.EncodeToString(checksum.Sum(nil)) {
if amzChecksum != base64.StdEncoding.EncodeToString(hash.Sum(nil)) {
w.Header().Set("Content-Type", "application/xml")
http.Error(w, amzExampleChecksumError, http.StatusBadRequest)
}

// Verifies that the new Snapshot format is used in the request body.
var snapshot Snapshot
d := json.NewDecoder(bytes.NewBuffer(body))
d.DisallowUnknownFields()
err = d.Decode(&snapshot)
require.NoError(mds.t, err)

// AWS S3 responds with an empty body if the PUT succeeds
w.WriteHeader(http.StatusOK)
}