Skip to content

[VC-43753] Update the dataupload package to be compatible with the inventory API #681

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions pkg/internal/cyberark/dataupload/dataupload.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ const (
// maxRetrievePresignedUploadURLBodySize is the maximum allowed size for a response body from the
// Retrieve Presigned Upload URL service.
maxRetrievePresignedUploadURLBodySize = 10 * 1024

// apiPathSnapshotLinks is the URL path of the snapshot-links endpoint of the inventory API.
// This endpoint returns an AWS presigned URL.
// TODO(wallrj): Link to CyberArk API documentation when it is published.
apiPathSnapshotLinks = "/api/ingestions/kubernetes/snapshot-links"
)

type CyberArkClient struct {
Expand All @@ -32,8 +37,7 @@ type CyberArkClient struct {
}

type Options struct {
ClusterName string
ClusterDescription string
ClusterName string
}

func NewCyberArkClient(trustedCAs *x509.CertPool, baseURL string, authenticateRequest func(req *http.Request) error) (*CyberArkClient, error) {
Expand All @@ -51,6 +55,9 @@ func NewCyberArkClient(trustedCAs *x509.CertPool, baseURL string, authenticateRe
}, nil
}

// PostDataReadingsWithOptions PUTs the supplied payload to an [AWS presigned URL] which it obtains via the CyberArk inventory API.
//
// [AWS presigned URL]: https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payload api.DataReadingsPost, opts Options) error {
if opts.ClusterName == "" {
return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty")
Expand All @@ -64,15 +71,15 @@ func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payloa

presignedUploadURL, err := c.retrievePresignedUploadURL(ctx, hex.EncodeToString(checksum.Sum(nil)), opts)
if err != nil {
return err
return fmt.Errorf("while retrieving snapshot upload URL: %s", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During testing, this extra context around the error helped me identify which part of the post operation was failing. It will also be helpful for us to diagnose problems if customers encounter this error path in the field.
I updated the unit tests accordingly.

}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, presignedUploadURL, encodedBody)
// The snapshot-links endpoint returns an AWS presigned URL which only supports the PUT verb.
req, err := http.NewRequestWithContext(ctx, http.MethodPut, presignedUploadURL, encodedBody)
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AWS API is very picky about the request headers. With this content type header, the upload fails with the following error:

=== RUN   TestPostDataReadingsWithOptionsWithRealAPI
    identity.go:328: I0806 17:48:18.208472] made successful request to StartAuthentication source="Identity.doStartAuthentication" summary="NewPackage"
    identity.go:444: I0806 17:48:18.599177] successfully completed AdvanceAuthentication request to CyberArk Identity; login complete username="[email protected]"
    dataupload_test.go:172:
                Error Trace:    /home/richard/projects/jetstack/jetstack-secure/pkg/internal/cyberark/dataupload/dataupload_test.go:172
                Error:          Received unexpected error:
                                received response with status code 403: <?xml version="1.0" encoding="UTF-8"?>
                                <Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>ASIAR53UX6ITK7A63SVE</AWSAccessKeyId><StringToSign>PUT

                                application/json
                                1754499204
                                x-amz-security-token:IQoJb3JpZ2luX2VjEEEaCXVzLWVhc3QtMSJGMEQCID5ReOMYdbV6+xXUg1buIf83rg/BzGYJJMuBXB+CRk7ZAiBNC/fNLgruAsTg1NUkcdJ+Na63tmasBmp96D/GA4V8virTAwh6EAAaDDEzMjg1MTk1NDIxNCIM6RQ9ZMY
                Test:           TestPostDataReadingsWithOptionsWithRealAPI
--- FAIL: TestPostDataReadingsWithOptionsWithRealAPI (8.32s)
FAIL
FAIL    github.com/jetstack/preflight/pkg/internal/cyberark/dataupload  8.336s
FAIL

version.SetUserAgent(req)

res, err := c.client.Do(req)
Expand All @@ -93,19 +100,19 @@ func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payloa
}

func (c *CyberArkClient) retrievePresignedUploadURL(ctx context.Context, checksum string, opts Options) (string, error) {
uploadURL, err := url.JoinPath(c.baseURL, "/api/data/kubernetes/upload")
uploadURL, err := url.JoinPath(c.baseURL, apiPathSnapshotLinks)
if err != nil {
return "", err
}

request := struct {
ClusterID string `json:"cluster_id"`
ClusterDescription string `json:"cluster_description"`
Checksum string `json:"checksum_sha256"`
ClusterID string `json:"cluster_id"`
Checksum string `json:"checksum_sha256"`
AgentVersion string `json:"agent_version"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed ClusterDescription and added AgentVersion, as per the API design.

}{
ClusterID: opts.ClusterName,
ClusterDescription: opts.ClusterDescription,
Checksum: checksum,
ClusterID: opts.ClusterName,
Checksum: checksum,
AgentVersion: version.PreflightVersion,
}

encodedBody := &bytes.Buffer{}
Expand All @@ -120,7 +127,7 @@ func (c *CyberArkClient) retrievePresignedUploadURL(ctx context.Context, checksu

req.Header.Set("Content-Type", "application/json")
if err := c.authenticateRequest(req); err != nil {
return "", fmt.Errorf("failed to authenticate request")
return "", fmt.Errorf("failed to authenticate request: %s", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to see the authentication error here to know what has failed.
The only error that the current implementation returns is "no token cached", which means that you've forgotten to call Login on the identity client.

}
version.SetUserAgent(req)

Expand Down
69 changes: 61 additions & 8 deletions pkg/internal/cyberark/dataupload/dataupload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import (
"encoding/pem"
"fmt"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"

"github.com/jetstack/preflight/api"
"github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
"github.com/jetstack/preflight/pkg/internal/cyberark/identity"
"github.com/jetstack/preflight/pkg/internal/cyberark/servicediscovery"
)

func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
Expand All @@ -33,8 +38,7 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
},
}
defaultOpts := dataupload.Options{
ClusterName: "success-cluster-id",
ClusterDescription: "success-cluster-description",
ClusterName: "success-cluster-id",
}

setToken := func(token string) func(*http.Request) error {
Expand Down Expand Up @@ -75,25 +79,25 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
opts: defaultOpts,
authenticate: setToken("fail-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "received response with status code 500: should authenticate using the correct bearer token")
require.ErrorContains(t, err, "while retrieving snapshot upload URL: received response with status code 500: should authenticate using the correct bearer token")
},
},
{
name: "invalid JSON from server (RetrievePresignedUploadURL step)",
payload: defaultPayload,
opts: dataupload.Options{ClusterName: "invalid-json-retrieve-presigned", ClusterDescription: defaultOpts.ClusterDescription},
opts: dataupload.Options{ClusterName: "invalid-json-retrieve-presigned"},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "rejecting JSON response from server as it was too large or was truncated")
require.ErrorContains(t, err, "while retrieving snapshot upload URL: rejecting JSON response from server as it was too large or was truncated")
},
},
{
name: "500 from server (PostData step)",
name: "500 from server (RetrievePresignedUploadURL step)",
payload: defaultPayload,
opts: dataupload.Options{ClusterName: "invalid-response-post-data", ClusterDescription: defaultOpts.ClusterDescription},
opts: dataupload.Options{ClusterName: "invalid-response-post-data"},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "received response with status code 500: mock error")
require.ErrorContains(t, err, "while retrieving snapshot upload URL: received response with status code 500: mock error")
},
},
}
Expand All @@ -117,3 +121,52 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
})
}
}

// TestPostDataReadingsWithOptionsWithRealAPI demonstrates that the dataupload code works with the real inventory API.
// An API token is obtained by authenticating with the ARK_USERNAME and ARK_SECRET from the environment.
// ARK_SUBDOMAIN should be your tenant subdomain.
// ARK_PLATFORM_DOMAIN should be either integration-cyberark.cloud or cyberark.cloud.
func TestPostDataReadingsWithOptionsWithRealAPI(t *testing.T) {
platformDomain := os.Getenv("ARK_PLATFORM_DOMAIN")
subdomain := os.Getenv("ARK_SUBDOMAIN")
username := os.Getenv("ARK_USERNAME")
secret := os.Getenv("ARK_SECRET")

if platformDomain == "" || subdomain == "" || username == "" || secret == "" {
t.Skip("Skipping because one of the following environment variables is unset or empty: ARK_PLATFORM_DOMAIN, ARK_SUBDOMAIN, ARK_USERNAME, ARK_SECRET")
return
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The skip message looks like this:

$ go test ./pkg/internal/cyberark/dataupload/... -run Real -v
warning: both GOPATH and GOROOT are the same directory (/home/richard/go); see https://go.dev/wiki/InstallTroubleshooting
=== RUN   TestPostDataReadingsWithOptionsWithRealAPI
    dataupload_test.go:137: Skipping because one of the following environment variables is unset or empty: ARK_PLATFORM_DOMAIN, ARK_SUBDOMAIN, ARK_USERNAME, ARK_SECRET
--- SKIP: TestPostDataReadingsWithOptionsWithRealAPI (0.00s)
PASS
ok      github.com/jetstack/preflight/pkg/internal/cyberark/dataupload  0.009s


logger := ktesting.NewLogger(t, ktesting.NewConfig())
ctx := klog.NewContext(t.Context(), logger)

const (
discoveryContextServiceName = "inventory"
separator = "."
)

serviceURL := fmt.Sprintf("https://%s%s%s.%s", subdomain, separator, discoveryContextServiceName, platformDomain)

var (
identityClient *identity.Client
err error
)
if platformDomain == "cyberark.cloud" {
identityClient, err = identity.New(ctx, subdomain)
} else {
discoveryClient := servicediscovery.New(servicediscovery.WithIntegrationEndpoint())
identityClient, err = identity.NewWithDiscoveryClient(ctx, discoveryClient, subdomain)
}
require.NoError(t, err)

err = identityClient.LoginUsernamePassword(ctx, username, []byte(secret))
require.NoError(t, err)

cyberArkClient, err := dataupload.NewCyberArkClient(nil, serviceURL, identityClient.AuthenticateRequest)
require.NoError(t, err)

err = cyberArkClient.PostDataReadingsWithOptions(t.Context(), api.DataReadingsPost{}, dataupload.Options{
ClusterName: "bb068932-c80d-460d-88df-34bc7f3f3297",
})
require.NoError(t, err)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A successful run of this test looks like this:

$ go test ./pkg/internal/cyberark/dataupload/... -run Real -v  -test.v 6
warning: both GOPATH and GOROOT are the same directory (/home/richard/go); see https://go.dev/wiki/InstallTroubleshooting
=== RUN   TestPostDataReadingsWithOptionsReal
    identity.go:328: I0806 17:14:42.059639] made successful request to StartAuthentication source="Identity.doStartAuthentication" summary="NewPackage"
    identity.go:444: I0806 17:14:42.438060] successfully completed AdvanceAuthentication request to CyberArk Identity; login complete username="********@cyberark.cloud.****"
--- PASS: TestPostDataReadingsWithOptionsReal (2.67s)
PASS
ok      github.com/jetstack/preflight/pkg/internal/cyberark/dataupload  2.681s

23 changes: 9 additions & 14 deletions pkg/internal/cyberark/dataupload/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
Expand All @@ -16,8 +17,7 @@ import (
const (
successBearerToken = "success-token"

successClusterID = "success-cluster-id"
successClusterDescription = "success-cluster-description"
successClusterID = "success-cluster-id"
)

type mockDataUploadServer struct {
Expand All @@ -37,7 +37,7 @@ func (mds *mockDataUploadServer) Close() {

func (mds *mockDataUploadServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/data/kubernetes/upload":
case apiPathSnapshotLinks:
mds.handlePresignedUpload(w, r)
return
case "/presigned-upload":
Expand Down Expand Up @@ -80,17 +80,17 @@ func (mds *mockDataUploadServer) handlePresignedUpload(w http.ResponseWriter, r
}

var req struct {
ClusterID string `json:"cluster_id"`
ClusterDescription string `json:"Cluster_description"`
Checksum string `json:"checksum_sha256"`
ClusterID string `json:"cluster_id"`
Checksum string `json:"checksum_sha256"`
AgentVersion string `json:"agent_version"`
}
if err := json.Unmarshal(body, &req); err != nil {
http.Error(w, "failed to unmarshal post body", http.StatusInternalServerError)
return
}

if req.ClusterDescription != successClusterDescription {
http.Error(w, "post body contains unexpected description", http.StatusInternalServerError)
if req.AgentVersion != version.PreflightVersion {
http.Error(w, fmt.Sprintf("post body contains unexpected agent version: %s", req.AgentVersion), http.StatusInternalServerError)
return
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func (mds *mockDataUploadServer) handlePresignedUpload(w http.ResponseWriter, r
}

func (mds *mockDataUploadServer) handleUpload(w http.ResponseWriter, r *http.Request, invalidJSON bool) {
if r.Method != http.MethodPost {
if r.Method != http.MethodPut {
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = w.Write([]byte(`{"message":"method not allowed"}`))
return
Expand All @@ -134,11 +134,6 @@ func (mds *mockDataUploadServer) handleUpload(w http.ResponseWriter, r *http.Req
return
}

if r.Header.Get("Content-Type") != "application/json" {
http.Error(w, "should send JSON on all requests", http.StatusInternalServerError)
return
}

if invalidJSON {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
Expand Down
4 changes: 3 additions & 1 deletion pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ var GoVersion string

// UserAgent returns a standard user agent for use with all HTTP requests. This is implemented in one place so
// it's uniform across the Kubernetes Agent.
//
// TODO(wallrj): The prefix "Mozilla/5.0" is currently required by the CyberArk inventory API. Remove the prefix when CyberArk relax the API security settings.
func UserAgent() string {
return fmt.Sprintf("venafi-kubernetes-agent/%s", PreflightVersion)
return fmt.Sprintf("Mozilla/5.0 venafi-kubernetes-agent/%s", PreflightVersion)
}

// SetUserAgent augments an http.Request with a standard user agent.
Expand Down