Skip to content

Commit b8fa4a5

Browse files
feat(api): add dynamic JSON unmarshaling for DataReading
- Implemented custom UnmarshalJSON for DataReading to handle dynamic parsing of the Data field based on the DataGatherer type. - Added new test cases in agent_test.go to validate output modes for machinehub. - Introduced example configuration and input files for machinehub. - Refactored CyberArkClient to move data conversion logic earlier in the PostDataReadingsWithOptions method.
1 parent 20ec1c5 commit b8fa4a5

File tree

5 files changed

+225
-4
lines changed

5 files changed

+225
-4
lines changed

api/datareading.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api
33
import (
44
"bytes"
55
"encoding/json"
6+
"fmt"
67
"time"
78

89
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -28,6 +29,58 @@ type DataReading struct {
2829
SchemaVersion string `json:"schema_version"`
2930
}
3031

32+
// UnmarshalJSON implements the json.Unmarshaler interface for DataReading.
33+
// It handles the dynamic parsing of the Data field based on the DataGatherer.
34+
func (o *DataReading) UnmarshalJSON(data []byte) error {
35+
var tmp struct {
36+
ClusterID string `json:"cluster_id,omitempty"`
37+
DataGatherer string `json:"data-gatherer"`
38+
Timestamp Time `json:"timestamp"`
39+
Data json.RawMessage `json:"data"`
40+
SchemaVersion string `json:"schema_version"`
41+
}
42+
43+
d := json.NewDecoder(bytes.NewReader(data))
44+
d.DisallowUnknownFields()
45+
46+
if err := d.Decode(&tmp); err != nil {
47+
return err
48+
}
49+
o.ClusterID = tmp.ClusterID
50+
o.DataGatherer = tmp.DataGatherer
51+
o.Timestamp = tmp.Timestamp
52+
o.SchemaVersion = tmp.SchemaVersion
53+
54+
{
55+
var discoveryData DiscoveryData
56+
d := json.NewDecoder(bytes.NewReader(tmp.Data))
57+
d.DisallowUnknownFields()
58+
if err := d.Decode(&discoveryData); err == nil {
59+
o.Data = &discoveryData
60+
return nil
61+
}
62+
}
63+
{
64+
var dynamicData DynamicData
65+
d := json.NewDecoder(bytes.NewReader(tmp.Data))
66+
d.DisallowUnknownFields()
67+
if err := d.Decode(&dynamicData); err == nil {
68+
o.Data = &dynamicData
69+
return nil
70+
}
71+
}
72+
{
73+
var genericData map[string]interface{}
74+
d := json.NewDecoder(bytes.NewReader(tmp.Data))
75+
d.DisallowUnknownFields()
76+
if err := d.Decode(&genericData); err == nil {
77+
o.Data = genericData
78+
return nil
79+
}
80+
}
81+
return fmt.Errorf("failed to parse DataReading.Data for gatherer %s", o.DataGatherer)
82+
}
83+
3184
// GatheredResource wraps the raw k8s resource that is sent to the jetstack secure backend
3285
type GatheredResource struct {
3386
// Resource is a reference to a k8s object that was found by the informer

cmd/agent_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"os"
77
"os/exec"
8+
"path/filepath"
89
"testing"
910
"time"
1011

@@ -49,3 +50,64 @@ func TestAgentRunOneShot(t *testing.T) {
4950
t.Logf("STDERR\n%s\n", stderrStr)
5051
require.NoError(t, err, context.Cause(ctx))
5152
}
53+
54+
func TestOutputModes(t *testing.T) {
55+
cwd, err := os.Getwd()
56+
require.NoError(t, err)
57+
repoRoot, err := filepath.Abs(filepath.Join(cwd, ".."))
58+
require.NoError(t, err)
59+
60+
type testCase struct {
61+
args []string
62+
}
63+
64+
tests := map[string]testCase{
65+
"machinehub": {
66+
args: []string{
67+
"--agent-config-file", "examples/machinehub/config.yaml",
68+
"--input-path", "examples/machinehub/input.json",
69+
"--machine-hub",
70+
},
71+
},
72+
}
73+
74+
for name, testSpec := range tests {
75+
t.Run(name, func(t *testing.T) {
76+
if _, found := os.LookupEnv("GO_CHILD"); found {
77+
os.Args = append([]string{
78+
"preflight",
79+
"agent",
80+
"--log-level", "6",
81+
"--one-shot",
82+
}, testSpec.args...)
83+
84+
Execute()
85+
return
86+
}
87+
t.Log("Running child process")
88+
ctx, cancel := context.WithTimeout(t.Context(), time.Second*10)
89+
defer cancel()
90+
91+
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=^"+t.Name()+"$")
92+
var (
93+
stdout bytes.Buffer
94+
stderr bytes.Buffer
95+
)
96+
cmd.Stdout = &stdout
97+
cmd.Stderr = &stderr
98+
cmd.Dir = repoRoot
99+
100+
cmd.Env = append(
101+
os.Environ(),
102+
"GO_CHILD=true",
103+
)
104+
err := cmd.Run()
105+
106+
stdoutStr := stdout.String()
107+
stderrStr := stderr.String()
108+
t.Logf("STDOUT\n%s\n", stdoutStr)
109+
t.Logf("STDERR\n%s\n", stderrStr)
110+
require.NoError(t, err, context.Cause(ctx))
111+
})
112+
}
113+
}

examples/machinehub/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Not used

examples/machinehub/input.json

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
[
2+
{
3+
"data-gatherer": "ark/discovery",
4+
"data": {
5+
"server_version": {
6+
"gitVersion": "v1.27.6"
7+
}
8+
}
9+
},
10+
{
11+
"data-gatherer": "ark/secrets",
12+
"data": {
13+
"items": [
14+
{
15+
"resource": {
16+
"kind": "Secret",
17+
"apiVersion": "v1",
18+
"metadata": {
19+
"name": "app-1-secret-1",
20+
"namespace": "team-1"
21+
}
22+
}
23+
}
24+
]
25+
}
26+
},
27+
{
28+
"data-gatherer": "ark/pods",
29+
"data": {
30+
"items": [
31+
{
32+
"resource": {
33+
"kind": "Pod",
34+
"apiVersion": "v1",
35+
"metadata": {
36+
"name": "app-1-pod-1",
37+
"namespace": "team-1"
38+
}
39+
}
40+
}
41+
]
42+
}
43+
},
44+
{
45+
"data-gatherer": "ark/statefulsets",
46+
"data": {
47+
"items": []
48+
}
49+
},
50+
{
51+
"data-gatherer": "ark/deployments",
52+
"data": {
53+
"items": []
54+
}
55+
},
56+
{
57+
"data-gatherer": "ark/clusterroles",
58+
"data": {
59+
"items": []
60+
}
61+
},
62+
{
63+
"data-gatherer": "ark/roles",
64+
"data": {
65+
"items": []
66+
}
67+
},
68+
{
69+
"data-gatherer": "ark/clusterrolebindings",
70+
"data": {
71+
"items": []
72+
}
73+
},
74+
{
75+
"data-gatherer": "ark/rolebindings",
76+
"data": {
77+
"items": []
78+
}
79+
},
80+
{
81+
"data-gatherer": "ark/cronjobs",
82+
"data": {
83+
"items": []
84+
}
85+
},
86+
{
87+
"data-gatherer": "ark/jobs",
88+
"data": {
89+
"items": []
90+
}
91+
},
92+
{
93+
"data-gatherer": "ark/daemonsets",
94+
"data": {
95+
"items": []
96+
}
97+
},
98+
{
99+
"data-gatherer": "ark/serviceaccounts",
100+
"data": {
101+
"items": []
102+
}
103+
}
104+
]

pkg/client/client_cyberark.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ func NewCyberArk(httpClient *http.Client) (*CyberArkClient, error) {
4242
// then uploads a snapshot.
4343
// The supplied Options are not used by this publisher.
4444
func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, _ Options) error {
45+
var snapshot dataupload.Snapshot
46+
if err := convertDataReadings(defaultExtractorFunctions, readings, &snapshot); err != nil {
47+
return fmt.Errorf("while converting data readings: %s", err)
48+
}
49+
4550
cfg, err := o.configLoader()
4651
if err != nil {
4752
return err
@@ -50,10 +55,6 @@ func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readin
5055
if err != nil {
5156
return fmt.Errorf("while initializing data upload client: %s", err)
5257
}
53-
var snapshot dataupload.Snapshot
54-
if err := convertDataReadings(defaultExtractorFunctions, readings, &snapshot); err != nil {
55-
return fmt.Errorf("while converting data readings: %s", err)
56-
}
5758
// Temporary hard coded cluster ID.
5859
// TODO(wallrj): The clusterID will eventually be extracted from the supplied readings.
5960
snapshot.ClusterID = "success-cluster-id"

0 commit comments

Comments
 (0)