Skip to content

Commit e8f50da

Browse files
wallrj authored and wallrj-cyberark committed
Add a snapshot JSON format for uploads and convert DataReadings to Snapshot format
Signed-off-by: Richard Wall <[email protected]>
1 parent bc5648a commit e8f50da

File tree

16 files changed

+560
-57
lines changed

16 files changed

+560
-57
lines changed

api/datareading.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package api
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"time"
7+
8+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
9+
"k8s.io/apimachinery/pkg/version"
610
)
711

812
// DataReadingsPost is the payload in the upload request.
@@ -28,8 +32,8 @@ type DataReading struct {
2832
type GatheredResource struct {
2933
// Resource is a reference to a k8s object that was found by the informer
3034
// and should be of type unstructured.Unstructured (the raw object).
31-
Resource interface{}
32-
DeletedAt Time
35+
Resource interface{} `json:"resource"`
36+
DeletedAt Time `json:"deleted_at,omitempty"`
3337
}
3438

3539
func (v GatheredResource) MarshalJSON() ([]byte, error) {
@@ -48,3 +52,32 @@ func (v GatheredResource) MarshalJSON() ([]byte, error) {
4852

4953
return json.Marshal(data)
5054
}
55+
56+
func (v *GatheredResource) UnmarshalJSON(data []byte) error {
57+
var tmpResource struct {
58+
Resource *unstructured.Unstructured `json:"resource"`
59+
DeletedAt Time `json:"deleted_at,omitempty"`
60+
}
61+
62+
d := json.NewDecoder(bytes.NewReader(data))
63+
d.DisallowUnknownFields()
64+
65+
if err := d.Decode(&tmpResource); err != nil {
66+
return err
67+
}
68+
v.Resource = tmpResource.Resource
69+
v.DeletedAt = tmpResource.DeletedAt
70+
return nil
71+
}
72+
73+
// DynamicData is the DataReading.Data returned by the k8s.DataGathererDynamic
74+
// gatherer.
75+
type DynamicData struct {
76+
Items []*GatheredResource `json:"items"`
77+
}
78+
79+
// DiscoveryData is the DataReading.Data returned by the k8s.ConfigDiscovery
80+
// gatherer.
81+
type DiscoveryData struct {
82+
ServerVersion *version.Info `json:"server_version"`
83+
}

pkg/agent/run.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,14 @@ func gatherAndOutputData(ctx context.Context, eventf Eventf, config CombinedConf
305305
var readings []*api.DataReading
306306

307307
if config.InputPath != "" {
308+
// TODO(wallrj): The datareadings read from disk can not yet be pushed
309+
// to the CyberArk Discovery and Context API. Why? Because they have
310+
// simple data types such as map[string]interface{}. In contrast, the
311+
// data from data gatherers can be cast to rich types like DynamicData
312+
// or DiscoveryData. The CyberArk dataupload client requires the data to
313+
// have rich types to convert it to the Discovery and Context snapshots
314+
// format. Consider refactoring testutil.ParseDataReadings so that it
315+
// can be used here.
308316
log.V(logs.Debug).Info("Reading data from local file", "inputPath", config.InputPath)
309317
data, err := os.ReadFile(config.InputPath)
310318
if err != nil {

pkg/client/client_cyberark.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readin
3838
return err
3939
}
4040

41-
err = datauploadClient.PostDataReadingsWithOptions(ctx, api.DataReadingsPost{}, dataupload.Options{
41+
err = datauploadClient.PostDataReadingsWithOptions(ctx, readings, dataupload.Options{
4242
ClusterName: "bb068932-c80d-460d-88df-34bc7f3f3297",
4343
})
4444
if err != nil {

pkg/datagatherer/k8s/discovery.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"k8s.io/client-go/discovery"
88

9+
"github.com/jetstack/preflight/api"
910
"github.com/jetstack/preflight/pkg/datagatherer"
1011
)
1112

@@ -59,15 +60,12 @@ func (g *DataGathererDiscovery) WaitForCacheSync(ctx context.Context) error {
5960

6061
// Fetch will fetch discovery data from the apiserver, or return an error
6162
func (g *DataGathererDiscovery) Fetch() (interface{}, int, error) {
62-
data, err := g.cl.ServerVersion()
63+
serverVersion, err := g.cl.ServerVersion()
6364
if err != nil {
6465
return nil, -1, fmt.Errorf("failed to get server version: %v", err)
6566
}
6667

67-
response := map[string]interface{}{
68-
// data has type Info: https://godoc.org/k8s.io/apimachinery/pkg/version#Info
69-
"server_version": data,
70-
}
71-
72-
return response, len(response), nil
68+
return &api.DiscoveryData{
69+
ServerVersion: serverVersion,
70+
}, 1, nil
7371
}

pkg/datagatherer/k8s/dynamic.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,6 @@ func (g *DataGathererDynamic) Fetch() (interface{}, int, error) {
314314
return nil, -1, fmt.Errorf("resource type must be specified")
315315
}
316316

317-
var list = map[string]interface{}{}
318317
var items = []*api.GatheredResource{}
319318

320319
fetchNamespaces := g.namespaces
@@ -344,10 +343,9 @@ func (g *DataGathererDynamic) Fetch() (interface{}, int, error) {
344343
return nil, -1, err
345344
}
346345

347-
// add gathered resources to items
348-
list["items"] = items
349-
350-
return list, len(items), nil
346+
return &api.DynamicData{
347+
Items: items,
348+
}, len(items), nil
351349
}
352350

353351
// redactList removes sensitive and superfluous data from the supplied resource list.

pkg/datagatherer/k8s/dynamic_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -730,15 +730,12 @@ func TestDynamicGatherer_Fetch(t *testing.T) {
730730
}
731731

732732
if tc.expected != nil {
733-
items, ok := res.(map[string]interface{})
733+
data, ok := res.(*api.DynamicData)
734734
if !ok {
735-
t.Errorf("expected result be an map[string]interface{} but wasn't")
735+
t.Errorf("expected result be *api.DynamicData but wasn't")
736736
}
737737

738-
list, ok := items["items"].([]*api.GatheredResource)
739-
if !ok {
740-
t.Errorf("expected result be an []*api.GatheredResource but wasn't")
741-
}
738+
list := data.Items
742739
// sorting list of results by name
743740
sortGatheredResources(list)
744741
// sorting list of expected results by name
@@ -1045,10 +1042,9 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) {
10451042
}
10461043

10471044
if tc.expected != nil {
1048-
res, ok := rawRes.(map[string]interface{})
1049-
require.Truef(t, ok, "expected result be an map[string]interface{} but wasn't")
1050-
actual := res["items"].([]*api.GatheredResource)
1051-
require.Truef(t, ok, "expected result be an []*api.GatheredResource but wasn't")
1045+
res, ok := rawRes.(*api.DynamicData)
1046+
require.Truef(t, ok, "expected result be an *api.DynamicData but wasn't")
1047+
actual := res.Items
10521048

10531049
// sorting list of results by name
10541050
sortGatheredResources(actual)

pkg/internal/cyberark/dataupload/dataupload.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,19 @@ func NewCyberArkClient(trustedCAs *x509.CertPool, baseURL string, authenticateRe
7070
// If you omit that header, it is possible to PUT any data.
7171
// There is a work around listed in that issue which we have shared with the
7272
// CyberArk API team.
73-
func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payload api.DataReadingsPost, opts Options) error {
73+
func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
7474
if opts.ClusterName == "" {
7575
return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty")
7676
}
7777

78+
snapshot, err := convertDataReadingsToCyberarkSnapshot(readings)
79+
if err != nil {
80+
return fmt.Errorf("while converting datareadings to Cyberark snapshot format: %s", err)
81+
}
82+
7883
encodedBody := &bytes.Buffer{}
7984
hash := sha256.New()
80-
if err := json.NewEncoder(io.MultiWriter(encodedBody, hash)).Encode(payload); err != nil {
85+
if err := json.NewEncoder(io.MultiWriter(encodedBody, hash)).Encode(snapshot); err != nil {
8186
return err
8287
}
8388
checksum := hash.Sum(nil)

pkg/internal/cyberark/dataupload/dataupload_test.go

Lines changed: 98 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,38 @@ package dataupload_test
33
import (
44
"crypto/x509"
55
"encoding/pem"
6+
"errors"
67
"fmt"
78
"net/http"
9+
"os"
810
"testing"
911
"time"
1012

1113
"github.com/stretchr/testify/require"
1214

1315
"github.com/jetstack/preflight/api"
1416
"github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
17+
"github.com/jetstack/preflight/pkg/internal/cyberark/identity"
18+
"github.com/jetstack/preflight/pkg/internal/cyberark/servicediscovery"
19+
"github.com/jetstack/preflight/pkg/testutil"
20+
21+
"k8s.io/klog/v2"
22+
"k8s.io/klog/v2/ktesting"
23+
_ "k8s.io/klog/v2/ktesting/init"
1524
)
1625

17-
func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
26+
func TestCyberArkClient_PostDataReadingsWithOptions_MockAPI(t *testing.T) {
1827
fakeTime := time.Unix(123, 0)
19-
defaultPayload := api.DataReadingsPost{
20-
AgentMetadata: &api.AgentMetadata{
21-
Version: "test-version",
22-
ClusterID: "test",
23-
},
24-
DataGatherTime: fakeTime,
25-
DataReadings: []*api.DataReading{
26-
{
27-
ClusterID: "success-cluster-id",
28-
DataGatherer: "test-gatherer",
29-
Timestamp: api.Time{Time: fakeTime},
30-
Data: map[string]interface{}{"test": "data"},
31-
SchemaVersion: "v1",
32-
},
28+
defaultDataReadings := []*api.DataReading{
29+
{
30+
ClusterID: "success-cluster-id",
31+
DataGatherer: "test-gatherer",
32+
Timestamp: api.Time{Time: fakeTime},
33+
Data: map[string]interface{}{"test": "data"},
34+
SchemaVersion: "v1",
3335
},
3436
}
37+
3538
defaultOpts := dataupload.Options{
3639
ClusterName: "success-cluster-id",
3740
}
@@ -45,14 +48,14 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
4548

4649
tests := []struct {
4750
name string
48-
payload api.DataReadingsPost
51+
readings []*api.DataReading
4952
authenticate func(req *http.Request) error
5053
opts dataupload.Options
5154
requireFn func(t *testing.T, err error)
5255
}{
5356
{
5457
name: "successful upload",
55-
payload: defaultPayload,
58+
readings: defaultDataReadings,
5659
opts: defaultOpts,
5760
authenticate: setToken("success-token"),
5861
requireFn: func(t *testing.T, err error) {
@@ -61,7 +64,7 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
6164
},
6265
{
6366
name: "error when cluster name is empty",
64-
payload: defaultPayload,
67+
readings: defaultDataReadings,
6568
opts: dataupload.Options{ClusterName: ""},
6669
authenticate: setToken("success-token"),
6770
requireFn: func(t *testing.T, err error) {
@@ -70,16 +73,27 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
7073
},
7174
{
7275
name: "error when bearer token is incorrect",
73-
payload: defaultPayload,
76+
readings: defaultDataReadings,
7477
opts: defaultOpts,
7578
authenticate: setToken("fail-token"),
7679
requireFn: func(t *testing.T, err error) {
7780
require.ErrorContains(t, err, "while retrieving snapshot upload URL: received response with status code 500: should authenticate using the correct bearer token")
7881
},
7982
},
83+
{
84+
name: "error contains authenticate error",
85+
readings: defaultDataReadings,
86+
opts: defaultOpts,
87+
authenticate: func(_ *http.Request) error {
88+
return errors.New("simulated-authenticate-error")
89+
},
90+
requireFn: func(t *testing.T, err error) {
91+
require.ErrorContains(t, err, "while retrieving snapshot upload URL: failed to authenticate request: simulated-authenticate-error")
92+
},
93+
},
8094
{
8195
name: "invalid JSON from server (RetrievePresignedUploadURL step)",
82-
payload: defaultPayload,
96+
readings: defaultDataReadings,
8397
opts: dataupload.Options{ClusterName: "invalid-json-retrieve-presigned"},
8498
authenticate: setToken("success-token"),
8599
requireFn: func(t *testing.T, err error) {
@@ -88,7 +102,7 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
88102
},
89103
{
90104
name: "500 from server (RetrievePresignedUploadURL step)",
91-
payload: defaultPayload,
105+
readings: defaultDataReadings,
92106
opts: dataupload.Options{ClusterName: "invalid-response-post-data"},
93107
authenticate: setToken("success-token"),
94108
requireFn: func(t *testing.T, err error) {
@@ -99,6 +113,9 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
99113

100114
for _, tc := range tests {
101115
t.Run(tc.name, func(t *testing.T) {
116+
logger := ktesting.NewLogger(t, ktesting.DefaultConfig)
117+
ctx := klog.NewContext(t.Context(), logger)
118+
102119
server := dataupload.MockDataUploadServer()
103120
defer server.Close()
104121

@@ -111,8 +128,67 @@ func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
111128
cyberArkClient, err := dataupload.NewCyberArkClient(certPool, server.Server.URL, tc.authenticate)
112129
require.NoError(t, err)
113130

114-
err = cyberArkClient.PostDataReadingsWithOptions(t.Context(), tc.payload, tc.opts)
131+
err = cyberArkClient.PostDataReadingsWithOptions(ctx, tc.readings, tc.opts)
115132
tc.requireFn(t, err)
116133
})
117134
}
118135
}
136+
137+
// TestCyberArkClient_PostDataReadingsWithOptions_RealAPI demonstrates that the dataupload code works with the real inventory API.
138+
// An API token is obtained by authenticating with the ARK_USERNAME and ARK_SECRET from the environment.
139+
// ARK_SUBDOMAIN should be your tenant subdomain.
140+
// ARK_PLATFORM_DOMAIN should be either integration-cyberark.cloud or cyberark.cloud.
141+
//
142+
// To enable verbose request logging:
143+
//
144+
// go test ./pkg/internal/cyberark/dataupload/... \
145+
// -v -count 1 -run TestCyberArkClient_PostDataReadingsWithOptions_RealAPI -args -testing.v 6
146+
func TestCyberArkClient_PostDataReadingsWithOptions_RealAPI(t *testing.T) {
147+
platformDomain := os.Getenv("ARK_PLATFORM_DOMAIN")
148+
subdomain := os.Getenv("ARK_SUBDOMAIN")
149+
username := os.Getenv("ARK_USERNAME")
150+
secret := os.Getenv("ARK_SECRET")
151+
152+
if platformDomain == "" || subdomain == "" || username == "" || secret == "" {
153+
t.Skip("Skipping because one of the following environment variables is unset or empty: ARK_PLATFORM_DOMAIN, ARK_SUBDOMAIN, ARK_USERNAME, ARK_SECRET")
154+
return
155+
}
156+
157+
logger := ktesting.NewLogger(t, ktesting.DefaultConfig)
158+
ctx := klog.NewContext(t.Context(), logger)
159+
160+
const (
161+
discoveryContextServiceName = "inventory"
162+
separator = "."
163+
)
164+
165+
serviceURL := fmt.Sprintf("https://%s%s%s.%s", subdomain, separator, discoveryContextServiceName, platformDomain)
166+
167+
var (
168+
identityClient *identity.Client
169+
err error
170+
)
171+
if platformDomain == "cyberark.cloud" {
172+
identityClient, err = identity.New(ctx, subdomain)
173+
} else {
174+
discoveryClient := servicediscovery.New(servicediscovery.WithIntegrationEndpoint())
175+
identityClient, err = identity.NewWithDiscoveryClient(ctx, discoveryClient, subdomain)
176+
}
177+
require.NoError(t, err)
178+
179+
err = identityClient.LoginUsernamePassword(ctx, username, []byte(secret))
180+
require.NoError(t, err)
181+
182+
cyberArkClient, err := dataupload.NewCyberArkClient(nil, serviceURL, identityClient.AuthenticateRequest)
183+
require.NoError(t, err)
184+
185+
dataReadings := testutil.ParseDataReadings(t, testutil.ReadGZIP(t, "testdata/example-1/datareadings.json.gz"))
186+
err = cyberArkClient.PostDataReadingsWithOptions(
187+
ctx,
188+
dataReadings,
189+
dataupload.Options{
190+
ClusterName: "bb068932-c80d-460d-88df-34bc7f3f3297",
191+
},
192+
)
193+
require.NoError(t, err)
194+
}

0 commit comments

Comments
 (0)