Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/kubewarden/audit-scanner/internal/k8s"
Expand Down Expand Up @@ -30,13 +31,24 @@ const (
//nolint:gocognit,funlen // This function is the CLI entrypoint and it's expected to be long.
func NewRootCommand() *cobra.Command {
var (
level logconfig.Level // log level.
outputScan bool // print result of scan as JSON to stdout.
skippedNs []string // list of namespaces to be skipped from scan.
insecureSSL bool // skip SSL cert validation when connecting to PolicyServers endpoints.
disableStore bool // disable storing the results in the k8s cluster.
level logconfig.Level // log level.
outputScan bool // print result of scan as JSON to stdout.
skippedNs []string // list of namespaces to be skipped from scan.
insecureSSL bool // skip SSL cert validation when connecting to PolicyServers endpoints.
disableStore bool // disable storing the results in the k8s cluster.
suseObsURL string // URL to the SUSE OBS API.
suseObsApiKey string // API key to authenticate with the SUSE OBS API.
suseObsUrn string // API key to authenticate with the SUSE OBS API.
suseObsCluster string // API key to authenticate with the SUSE OBS API.
suseObsRepeatInterval time.Duration
suseObsExpireInterval time.Duration
)

defaultInterval, err := time.ParseDuration("30m")
if err != nil {
log.Logger.Err(err).Msg("cannot parse default suseob interval value ")
}

// rootCmd represents the base command when called without any subcommands.
rootCmd := &cobra.Command{
Use: "audit-scanner",
Expand Down Expand Up @@ -112,7 +124,14 @@ There will be a ClusterPolicyReport with results for cluster-wide resources.`,
if err != nil {
return err
}
policyReportStore := report.NewPolicyReportStore(client)
var policyReportStore report.ReportStore
if len(suseObsURL) > 0 && len(suseObsApiKey) > 0 && len(suseObsUrn) > 0 && len(suseObsCluster) > 0 {
log.Debug().Msg("Using SUSE Observability as report store")
policyReportStore = report.NewSuseObsStore(suseObsApiKey, suseObsURL, suseObsUrn, suseObsCluster, suseObsRepeatInterval, suseObsExpireInterval)
} else {
log.Debug().Msg("Using Kubernetes as report store")
policyReportStore = report.NewPolicyReportStore(client)
}
Comment on lines +127 to +134
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is very ugly. And it needs to be refactored.

What do you think about having multiple stores? For example, the audit scanner could create policy reports and send the data to SUSE Obs.


scannerConfig := scanner.Config{
PoliciesClient: policiesClient,
Expand Down Expand Up @@ -162,6 +181,12 @@ There will be a ClusterPolicyReport with results for cluster-wide resources.`,
rootCmd.Flags().IntP("parallel-resources", "", defaultParallelResources, "number of resources to scan in parallel")
rootCmd.Flags().IntP("parallel-policies", "", defaultParallelPolicies, "number of policies to evaluate for a given resource in parallel")
rootCmd.Flags().IntP("page-size", "", defaultPageSize, "number of resources to fetch from the Kubernetes API server when paginating")
rootCmd.Flags().StringVar(&suseObsURL, "suseobs-url", "", "URL to the SUSE OBS API")
rootCmd.Flags().StringVar(&suseObsApiKey, "suseobs-apikey", "", "API key to authenticate with the SUSE OBS API")
rootCmd.Flags().StringVar(&suseObsUrn, "suseobs-urn", "", "SUSE Observability health check stream urn")
rootCmd.Flags().StringVar(&suseObsCluster, "suseobs-cluster", "", "SUSE Observability cluster name where audit scanner is running")
rootCmd.Flags().DurationVar(&suseObsRepeatInterval, "suseobs-repeat-interval", defaultInterval, "The frequency with which audit scanner will send health data to SUSE Observability. Max allowed value is 1800 (30 minutes)")
rootCmd.Flags().DurationVar(&suseObsExpireInterval, "suseobs-expire-interval", defaultInterval, "The time to wait after the last update before an audit scanner check is deleted by SUSE Observability if the check isn't observed again")
Comment on lines +184 to +189
Copy link
Owner Author

@jvanz jvanz Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide to allow multiple store types. I think we should move the store configuration to a file.


return rootCmd
}
Expand Down
7 changes: 7 additions & 0 deletions internal/report/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ import (
wgpolicy "sigs.k8s.io/wg-policy-prototypes/policy-report/pkg/api/wgpolicyk8s.io/v1alpha2"
)

type ReportStore interface {
CreateOrPatchPolicyReport(ctx context.Context, policyReport *wgpolicy.PolicyReport) error
DeleteOldPolicyReports(ctx context.Context, scanRunID, namespace string) error
CreateOrPatchClusterPolicyReport(ctx context.Context, clusterPolicyReport *wgpolicy.ClusterPolicyReport) error
DeleteOldClusterPolicyReports(ctx context.Context, scanRunID string) error
}

// PolicyReportStore is a store for PolicyReport and ClusterPolicyReport.
type PolicyReportStore struct {
// client is a controller-runtime client that knows about PolicyReport and ClusterPolicyReport CRDs
Expand Down
220 changes: 220 additions & 0 deletions internal/report/suseobs_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package report

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"net/http"
"net/url"
"strconv"
"strings"
"time"

// "github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
wgpolicy "sigs.k8s.io/wg-policy-prototypes/policy-report/pkg/api/wgpolicyk8s.io/v1alpha2"
)

// https://docs.stackstate.com/health/health-synchronization#consistency-models
const DEFAULT_CONSISTENCY_MODEL = "REPEAT_STATES"

// SuseObsStore is a store for PolicyReport and ClusterPolicyReport.
type SuseObsStore struct {
client *http.Client
apiKey string
internalHostname string
urn string
cluster string
repeatInterval string
expireInterval string
}

type SuseObsExpireConfiguration struct {
RepeatInterval string `json:"repeat_interval_s"`
ExpireInterval string `json:"expiry_interval_s"`
}

type SuseObsStream struct {
Urn string `json:"urn"`
}

type SuseObsCheckState struct {
CheckStateId string `json:"checkStateId"`
Message string `json:"message"`
Health string `json:"health"`
TopologyElementIdentifier string `json:"topologyElementIdentifier"`
Name string `json:"name"`
}

type SuseObsHealthCheck struct {
ConsistencyModel string `json:"consistency_model"`
Expire SuseObsExpireConfiguration `json:"expire"`
Stream SuseObsStream `json:"stream"`
CheckStates []SuseObsCheckState `json:"check_states"`
}

type SuseObsJsonPayload struct {
ApiKey string `json:"apiKey"`
CollectionTimestamp int64 `json:"collection_timestamp"`
InternalHostname string `json:"internalHostname"`
Events interface{} `json:"events,omitempty"`
Metrics []interface{} `json:"metrics"`
ServiceChecks []interface{} `json:"service_checks"`
Health []SuseObsHealthCheck `json:"health"`
Topologies []interface{} `json:"topoligies"`
}

// NewSuseObsStore creates a new SuseObsStore.
func NewSuseObsStore(apiKey, internalHostname, urn, cluster string, repeatInterval, expireInterval time.Duration) *SuseObsStore {
repeatIntervalStr := strconv.FormatFloat(repeatInterval.Seconds(), 'f', 0, 32)
expireIntervalStr := strconv.FormatFloat(expireInterval.Seconds(), 'f', 0, 32)
return &SuseObsStore{
client: &http.Client{
Transport: &http.Transport{
// FIXME - configure certicates for a secure communication
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
apiKey: apiKey,
internalHostname: internalHostname,
urn: urn,
cluster: cluster,
repeatInterval: repeatIntervalStr,
expireInterval: expireIntervalStr,
}
}

func (s *SuseObsStore) generateCheckStates(policyReport *wgpolicy.PolicyReport) []SuseObsCheckState {
checkStates := []SuseObsCheckState{}
for _, result := range policyReport.Results {
healthCheckStatus := "Clear"
if result.Result == "fail" {
healthCheckStatus = "Deviating"
}
checkState := SuseObsCheckState{
CheckStateId: generateCheckStateId(result.Policy, policyReport.Scope),
Message: result.Description,
Health: healthCheckStatus,
TopologyElementIdentifier: strings.ToLower("urn:kubernetes:/" + s.cluster + ":" + policyReport.Scope.Namespace + ":" + policyReport.Scope.Kind + "/" + policyReport.Scope.Name),
Name: result.Policy,
}
checkStates = append(checkStates, checkState)
}
return checkStates
}

func generateCheckStateId(policy string, scope *corev1.ObjectReference) string {
return strings.ToLower(policy + "-" + scope.Namespace + "-" + scope.Kind + "-" + scope.Name + "-" + policy)
}

func (s *SuseObsStore) generateCheckStatesFromClusterPolicyReport(policyReport *wgpolicy.ClusterPolicyReport) []SuseObsCheckState {
checkStates := []SuseObsCheckState{}
for _, result := range policyReport.Results {
healthCheckStatus := "Clear"
if result.Result == "fail" {
healthCheckStatus = "Deviating"
}
checkState := SuseObsCheckState{
CheckStateId: generateCheckStateId(result.Policy, policyReport.Scope),
Message: result.Description,
Health: healthCheckStatus,
TopologyElementIdentifier: strings.ToLower("urn:kubernetes:/" + s.cluster + ":" + policyReport.Scope.Kind + "/" + policyReport.Scope.Name),
Name: result.Policy,
}
checkStates = append(checkStates, checkState)
}
return checkStates
}

func (s *SuseObsStore) generateSuseObsJsonPayload(checkStates []SuseObsCheckState) (*SuseObsJsonPayload, error) {
url, err := url.Parse(s.internalHostname)
if err != nil {
return nil, errors.New("failed to parse SUSE OBS URL")
}
payload := &SuseObsJsonPayload{
ApiKey: s.apiKey,
InternalHostname: url.Hostname(),
CollectionTimestamp: time.Now().Unix(),
Events: nil,
Metrics: []interface{}{},
ServiceChecks: []interface{}{},
Topologies: []interface{}{},
Health: []SuseObsHealthCheck{{
ConsistencyModel: DEFAULT_CONSISTENCY_MODEL,
Expire: SuseObsExpireConfiguration{
RepeatInterval: s.repeatInterval,
ExpireInterval: s.expireInterval,
},
Stream: SuseObsStream{
Urn: s.urn,
},
CheckStates: checkStates,
}},
}
return payload, nil
}

func (s *SuseObsStore) convertPolicyReportIntoSuseObsJsonPayload(policyReport *wgpolicy.PolicyReport) (*SuseObsJsonPayload, error) {
return s.generateSuseObsJsonPayload(s.generateCheckStates(policyReport))
}

func (s *SuseObsStore) convertClusterPolicyReportIntoSuseObsJsonPayload(policyReport *wgpolicy.ClusterPolicyReport) (*SuseObsJsonPayload, error) {
return s.generateSuseObsJsonPayload(s.generateCheckStatesFromClusterPolicyReport(policyReport))
}

// CreateOrPatchPolicyReport creates or patches a PolicyReport.
func (s *SuseObsStore) CreateOrPatchPolicyReport(ctx context.Context, policyReport *wgpolicy.PolicyReport) error {
payload, err := s.convertPolicyReportIntoSuseObsJsonPayload(policyReport)
if err != nil {
return err
}
return s.sendRequest(payload)
}

func (s *SuseObsStore) DeleteOldPolicyReports(ctx context.Context, scanRunID, namespace string) error {
// No need to delete SUSE Obs will remove the check states after the expiry interval
return nil
}

// CreateOrPatchClusterPolicyReport creates or patches a ClusterPolicyReport.
func (s *SuseObsStore) CreateOrPatchClusterPolicyReport(ctx context.Context, clusterPolicyReport *wgpolicy.ClusterPolicyReport) error {
payload, err := s.convertClusterPolicyReportIntoSuseObsJsonPayload(clusterPolicyReport)
if err != nil {
return err
}
return s.sendRequest(payload)
}

func (s *SuseObsStore) DeleteOldClusterPolicyReports(ctx context.Context, scanRunID string) error {
// No need to delete SUSE Obs will remove the check states after the expiry interval
return nil
}

func (s *SuseObsStore) sendRequest(payload *SuseObsJsonPayload) error {
if len(payload.Health) > 0 && len(payload.Health[0].CheckStates) == 0 {
return nil
}
jsonPayload, err := json.Marshal(payload)
if err != nil {
return errors.New("failed to marshal SUSE OBS payload")
}
url := s.internalHostname + "/receiver/stsAgent/intake?api_key="
log.Debug().Dict("dict", zerolog.Dict()).
Str("SUSE Obs URL", url).
RawJSON("payload", jsonPayload).
Msg("Sending SUSE OBS healch check request")

response, err := s.client.Post(url+s.apiKey, "application/json", bytes.NewReader(jsonPayload))
if err != nil {
return errors.Join(errors.New("failed to send SUSE OBS payload"), err)
}
if response.StatusCode != http.StatusOK {
return errors.New("SUSE Obs returned an error. Status code: " + response.Status)
}
return nil

}
Loading
Loading