Skip to content

Commit 8b9a233

Browse files
committed
Add a snapshot JSON format for uploads and convert DataReadings to Snapshot format
Signed-off-by: Richard Wall <[email protected]>
1 parent 373c4f8 commit 8b9a233

File tree

17 files changed

+441
-80
lines changed

17 files changed

+441
-80
lines changed

api/datareading.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package api
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"time"
7+
8+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
9+
"k8s.io/apimachinery/pkg/version"
610
)
711

812
// DataReadingsPost is the payload in the upload request.
@@ -28,8 +32,8 @@ type DataReading struct {
2832
type GatheredResource struct {
2933
// Resource is a reference to a k8s object that was found by the informer
3034
// should be of type unstructured.Unstructured, raw Object
31-
Resource interface{}
32-
DeletedAt Time
35+
Resource interface{} `json:"resource"`
36+
DeletedAt Time `json:"deleted_at,omitempty"`
3337
}
3438

3539
func (v GatheredResource) MarshalJSON() ([]byte, error) {
@@ -48,3 +52,32 @@ func (v GatheredResource) MarshalJSON() ([]byte, error) {
4852

4953
return json.Marshal(data)
5054
}
55+
56+
func (v *GatheredResource) UnmarshalJSON(data []byte) error {
57+
var tmpResource struct {
58+
Resource *unstructured.Unstructured `json:"resource"`
59+
DeletedAt Time `json:"deleted_at,omitempty"`
60+
}
61+
62+
d := json.NewDecoder(bytes.NewReader(data))
63+
d.DisallowUnknownFields()
64+
65+
if err := d.Decode(&tmpResource); err != nil {
66+
return err
67+
}
68+
v.Resource = tmpResource.Resource
69+
v.DeletedAt = tmpResource.DeletedAt
70+
return nil
71+
}
72+
73+
// DynamicData is the DataReading.Data returned by the k8s.DataGathererDynamic
74+
// gatherer
75+
type DynamicData struct {
76+
Items []*GatheredResource `json:"items"`
77+
}
78+
79+
// DiscoveryData is the DataReading.Data returned by the k8s.ConfigDiscovery
80+
// gatherer
81+
type DiscoveryData struct {
82+
ServerVersion *version.Info `json:"server_version"`
83+
}

pkg/agent/run.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,14 @@ func gatherAndOutputData(ctx context.Context, eventf Eventf, config CombinedConf
321321
var readings []*api.DataReading
322322

323323
if config.InputPath != "" {
324+
// TODO(wallrj): The datareadings read from disk can not yet be pushed
325+
// to the CyberArk Discovery and Context API. Why? Because they have
326+
// simple data types such as map[string]interface{}. In contrast, the
327+
// data from data gatherers can be cast to rich types like DynamicData
328+
// or DiscoveryData The CyberArk dataupload client requires the data to
329+
// have rich types to convert it to the Discovery and Context snapshots
330+
// format. Consider refactoring testutil.ParseDataReadings so that it
331+
// can be used here.
324332
log.V(logs.Debug).Info("Reading data from local file", "inputPath", config.InputPath)
325333
data, err := os.ReadFile(config.InputPath)
326334
if err != nil {

pkg/client/client_cyberark.go

Lines changed: 0 additions & 9 deletions
This file was deleted.

pkg/datagatherer/k8s/discovery.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"k8s.io/client-go/discovery"
88

9+
"github.com/jetstack/preflight/api"
910
"github.com/jetstack/preflight/pkg/datagatherer"
1011
)
1112

@@ -59,15 +60,12 @@ func (g *DataGathererDiscovery) WaitForCacheSync(ctx context.Context) error {
5960

6061
// Fetch will fetch discovery data from the apiserver, or return an error
6162
func (g *DataGathererDiscovery) Fetch() (interface{}, int, error) {
62-
data, err := g.cl.ServerVersion()
63+
serverVersion, err := g.cl.ServerVersion()
6364
if err != nil {
6465
return nil, -1, fmt.Errorf("failed to get server version: %v", err)
6566
}
6667

67-
response := map[string]interface{}{
68-
// data has type Info: https://godoc.org/k8s.io/apimachinery/pkg/version#Info
69-
"server_version": data,
70-
}
71-
72-
return response, len(response), nil
68+
return &api.DiscoveryData{
69+
ServerVersion: serverVersion,
70+
}, 1, nil
7371
}

pkg/datagatherer/k8s/dynamic.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,6 @@ func (g *DataGathererDynamic) Fetch() (interface{}, int, error) {
314314
return nil, -1, fmt.Errorf("resource type must be specified")
315315
}
316316

317-
var list = map[string]interface{}{}
318317
var items = []*api.GatheredResource{}
319318

320319
fetchNamespaces := g.namespaces
@@ -344,10 +343,9 @@ func (g *DataGathererDynamic) Fetch() (interface{}, int, error) {
344343
return nil, -1, err
345344
}
346345

347-
// add gathered resources to items
348-
list["items"] = items
349-
350-
return list, len(items), nil
346+
return &api.DynamicData{
347+
Items: items,
348+
}, len(items), nil
351349
}
352350

353351
func redactList(list []*api.GatheredResource, excludeAnnotKeys, excludeLabelKeys []*regexp.Regexp) error {

pkg/datagatherer/k8s/dynamic_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -730,15 +730,12 @@ func TestDynamicGatherer_Fetch(t *testing.T) {
730730
}
731731

732732
if tc.expected != nil {
733-
items, ok := res.(map[string]interface{})
733+
data, ok := res.(*api.DynamicData)
734734
if !ok {
735-
t.Errorf("expected result be an map[string]interface{} but wasn't")
735+
t.Errorf("expected result be *api.DynamicData but wasn't")
736736
}
737737

738-
list, ok := items["items"].([]*api.GatheredResource)
739-
if !ok {
740-
t.Errorf("expected result be an []*api.GatheredResource but wasn't")
741-
}
738+
list := data.Items
742739
// sorting list of results by name
743740
sortGatheredResources(list)
744741
// sorting list of expected results by name
@@ -1045,10 +1042,9 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) {
10451042
}
10461043

10471044
if tc.expected != nil {
1048-
res, ok := rawRes.(map[string]interface{})
1049-
require.Truef(t, ok, "expected result be an map[string]interface{} but wasn't")
1050-
actual := res["items"].([]*api.GatheredResource)
1051-
require.Truef(t, ok, "expected result be an []*api.GatheredResource but wasn't")
1045+
res, ok := rawRes.(*api.DynamicData)
1046+
require.Truef(t, ok, "expected result be an *api.DynamicData but wasn't")
1047+
actual := res.Items
10521048

10531049
// sorting list of results by name
10541050
sortGatheredResources(actual)

pkg/datagatherer/k8s/fieldfilter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ var SecretSelectedFields = []FieldPath{
1616
{"metadata", "ownerReferences"},
1717
{"metadata", "selfLink"},
1818
{"metadata", "uid"},
19+
{"metadata", "creationTimestamp"},
20+
{"metadata", "deletionTimestamp"},
21+
{"metadata", "resourceVersion"},
1922

2023
{"type"},
2124
{"data", "tls.crt"},

pkg/internal/cyberark/dataupload/dataupload.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,19 @@ func NewCyberArkClient(trustedCAs *x509.CertPool, baseURL string, authenticateRe
5858
// PostDataReadingsWithOptions PUTs the supplied payload to an [AWS presigned URL] which it obtains via the CyberArk inventory API.
5959
//
6060
// [AWS presigned URL]: https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
61-
func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payload api.DataReadingsPost, opts Options) error {
61+
func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
6262
if opts.ClusterName == "" {
6363
return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty")
6464
}
6565

66+
snapshot, err := convertDataReadingsToCyberarkSnapshot(readings)
67+
if err != nil {
68+
return fmt.Errorf("while converting datareadings to Cyberark snapshot format: %s", err)
69+
}
70+
6671
encodedBody := &bytes.Buffer{}
6772
checksum := sha3.New256()
68-
if err := json.NewEncoder(io.MultiWriter(encodedBody, checksum)).Encode(payload); err != nil {
73+
if err := json.NewEncoder(io.MultiWriter(encodedBody, checksum)).Encode(snapshot); err != nil {
6974
return err
7075
}
7176

pkg/internal/cyberark/dataupload/dataupload_test.go

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dataupload_test
33
import (
44
"crypto/x509"
55
"encoding/pem"
6+
"errors"
67
"fmt"
78
"net/http"
89
"os"
@@ -17,28 +18,23 @@ import (
1718
"github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
1819
"github.com/jetstack/preflight/pkg/internal/cyberark/identity"
1920
"github.com/jetstack/preflight/pkg/internal/cyberark/servicediscovery"
21+
"github.com/jetstack/preflight/pkg/testutil"
2022

2123
_ "k8s.io/klog/v2/ktesting/init"
2224
)
2325

24-
func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
26+
func TestCyberArkClient_PostDataReadingsWithOptions_MockAPI(t *testing.T) {
2527
fakeTime := time.Unix(123, 0)
26-
defaultPayload := api.DataReadingsPost{
27-
AgentMetadata: &api.AgentMetadata{
28-
Version: "test-version",
29-
ClusterID: "test",
30-
},
31-
DataGatherTime: fakeTime,
32-
DataReadings: []*api.DataReading{
33-
{
34-
ClusterID: "success-cluster-id",
35-
DataGatherer: "test-gatherer",
36-
Timestamp: api.Time{Time: fakeTime},
37-
Data: map[string]interface{}{"test": "data"},
38-
SchemaVersion: "v1",
39-
},
28+
defaultDataReadings := []*api.DataReading{
29+
{
30+
ClusterID: "success-cluster-id",
31+
DataGatherer: "test-gatherer",
32+
Timestamp: api.Time{Time: fakeTime},
33+
Data: map[string]interface{}{"test": "data"},
34+
SchemaVersion: "v1",
4035
},
4136
}
37+
4238
defaultOpts := dataupload.Options{
4339
ClusterName: "success-cluster-id",
4440
}
@@ -52,14 +48,14 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
5248

5349
tests := []struct {
5450
name string
55-
payload api.DataReadingsPost
51+
readings []*api.DataReading
5652
authenticate func(req *http.Request) error
5753
opts dataupload.Options
5854
requireFn func(t *testing.T, err error)
5955
}{
6056
{
6157
name: "successful upload",
62-
payload: defaultPayload,
58+
readings: defaultDataReadings,
6359
opts: defaultOpts,
6460
authenticate: setToken("success-token"),
6561
requireFn: func(t *testing.T, err error) {
@@ -68,7 +64,7 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
6864
},
6965
{
7066
name: "error when cluster name is empty",
71-
payload: defaultPayload,
67+
readings: defaultDataReadings,
7268
opts: dataupload.Options{ClusterName: ""},
7369
authenticate: setToken("success-token"),
7470
requireFn: func(t *testing.T, err error) {
@@ -77,16 +73,27 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
7773
},
7874
{
7975
name: "error when bearer token is incorrect",
80-
payload: defaultPayload,
76+
readings: defaultDataReadings,
8177
opts: defaultOpts,
8278
authenticate: setToken("fail-token"),
8379
requireFn: func(t *testing.T, err error) {
8480
require.ErrorContains(t, err, "while retrieving snapshot upload URL: received response with status code 500: should authenticate using the correct bearer token")
8581
},
8682
},
83+
{
84+
name: "error contains authenticate error",
85+
readings: defaultDataReadings,
86+
opts: defaultOpts,
87+
authenticate: func(_ *http.Request) error {
88+
return errors.New("simulated-authenticate-error")
89+
},
90+
requireFn: func(t *testing.T, err error) {
91+
require.ErrorContains(t, err, "while retrieving snapshot upload URL: failed to authenticate request: simulated-authenticate-error")
92+
},
93+
},
8794
{
8895
name: "invalid JSON from server (RetrievePresignedUploadURL step)",
89-
payload: defaultPayload,
96+
readings: defaultDataReadings,
9097
opts: dataupload.Options{ClusterName: "invalid-json-retrieve-presigned"},
9198
authenticate: setToken("success-token"),
9299
requireFn: func(t *testing.T, err error) {
@@ -95,7 +102,7 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
95102
},
96103
{
97104
name: "500 from server (RetrievePresignedUploadURL step)",
98-
payload: defaultPayload,
105+
readings: defaultDataReadings,
99106
opts: dataupload.Options{ClusterName: "invalid-response-post-data"},
100107
authenticate: setToken("success-token"),
101108
requireFn: func(t *testing.T, err error) {
@@ -106,6 +113,9 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
106113

107114
for _, tc := range tests {
108115
t.Run(tc.name, func(t *testing.T) {
116+
logger := ktesting.NewLogger(t, ktesting.DefaultConfig)
117+
ctx := klog.NewContext(t.Context(), logger)
118+
109119
server := dataupload.MockDataUploadServer()
110120
defer server.Close()
111121

@@ -118,22 +128,22 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
118128
cyberArkClient, err := dataupload.NewCyberArkClient(certPool, server.Server.URL, tc.authenticate)
119129
require.NoError(t, err)
120130

121-
err = cyberArkClient.PostDataReadingsWithOptions(t.Context(), tc.payload, tc.opts)
131+
err = cyberArkClient.PostDataReadingsWithOptions(ctx, tc.readings, tc.opts)
122132
tc.requireFn(t, err)
123133
})
124134
}
125135
}
126136

127-
// TestPostDataReadingsWithOptionsWithRealAPI demonstrates that the dataupload code works with the real inventory API.
137+
// TestCyberArkClient_PostDataReadingsWithOptions_RealAPI demonstrates that the dataupload code works with the real inventory API.
128138
// An API token is obtained by authenticating with the ARK_USERNAME and ARK_SECRET from the environment.
129139
// ARK_SUBDOMAIN should be your tenant subdomain.
130140
// ARK_PLATFORM_DOMAIN should be either integration-cyberark.cloud or cyberark.cloud
131141
//
132142
// To enable verbose request logging:
133143
//
134144
// go test ./pkg/internal/cyberark/dataupload/... \
135-
// -v -count 1 -run TestPostDataReadingsWithOptionsWithRealAPI -args -testing.v 6
136-
func TestPostDataReadingsWithOptionsWithRealAPI(t *testing.T) {
145+
// -v -count 1 -run TestCyberArkClient_PostDataReadingsWithOptions_RealAPI -args -testing.v 6
146+
func TestCyberArkClient_PostDataReadingsWithOptions_RealAPI(t *testing.T) {
137147
platformDomain := os.Getenv("ARK_PLATFORM_DOMAIN")
138148
subdomain := os.Getenv("ARK_SUBDOMAIN")
139149
username := os.Getenv("ARK_USERNAME")
@@ -172,8 +182,13 @@ func TestPostDataReadingsWithOptionsWithRealAPI(t *testing.T) {
172182
cyberArkClient, err := dataupload.NewCyberArkClient(nil, serviceURL, identityClient.AuthenticateRequest)
173183
require.NoError(t, err)
174184

175-
err = cyberArkClient.PostDataReadingsWithOptions(ctx, api.DataReadingsPost{}, dataupload.Options{
176-
ClusterName: "bb068932-c80d-460d-88df-34bc7f3f3297",
177-
})
185+
dataReadings := testutil.ParseDataReadings(t, testutil.ReadGZIP(t, "testdata/example-1/datareadings.json.gz"))
186+
err = cyberArkClient.PostDataReadingsWithOptions(
187+
ctx,
188+
dataReadings,
189+
dataupload.Options{
190+
ClusterName: "bb068932-c80d-460d-88df-34bc7f3f3297",
191+
},
192+
)
178193
require.NoError(t, err)
179194
}

0 commit comments

Comments
 (0)