Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions auditing/auditing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import (
type Config struct {
Component string
Log *slog.Logger

// Async indexes audit traces asynchronously if set to true. Whether this functionality is implemented depends on the audit backend implementation.
// If this is set to true it can occur that audit traces get lost in case the backend is not available for receiving the trace.
Async bool
// AsyncRetry defines the amount of attempts to retry sending an audit trace to a backend in case it failed.
AsyncRetry int
// AsyncBackoff defines the backoff after a failed attempt to index an audit trace to a backend.
AsyncBackoff time.Duration
// AsyncTimeout sets a timeout for indexing a trace for the backend.
AsyncTimeout time.Duration
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These configs are declared for all auditing providers, but are only implemented for splunk.
Either these should be moved to the SplunkConfig or a different solution should be considered.

What if we'd create an asyncAuditing which would then only implement the async mechanism while forwarding the index requests to the actual/wrapped backend?

// would be used like this:
splunk, err := auditing.Splunk(...)
if err != nil {
  return nil, err
}
asyncAuditing := auditing.Async(auditing.AsyncConfig{Retry: 1, /*...*/}, splunk)
return asyncAuditing, nil

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sounds good. I wanted to keep it small as a first attempt and not generalize it, but it is certainly much cleaner to do it like this. I'll give that a try.

}

type Interval string
Expand Down
204 changes: 204 additions & 0 deletions auditing/splunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package auditing

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"time"
)

// SplunkConfig holds the splunk-specific settings of the auditing backend.
type SplunkConfig struct {
	// Endpoint is the base URL of the HTTP event collector; the collector path is appended to it.
	Endpoint string
	// HECToken is the HTTP event collector token that is sent in the Authorization header.
	HECToken string
	// SourceType is the sourcetype assigned to every event.
	SourceType string
	// Index is the splunk index assigned to every event.
	Index string
	// Host is the host value assigned to every event.
	Host string
	// TlsConfig is the TLS client configuration used by the HTTP transport.
	TlsConfig *tls.Config
}

// splunkAuditing is the auditing backend that sends entries to a splunk HTTP event collector.
type splunkAuditing struct {
	// component is used as the source of every event.
	component string
	log       *slog.Logger

	// async switches indexing to fire-and-forget sending in a background goroutine.
	async bool
	// asyncRetry, asyncBackoff and asyncTimeout control the retry loop of the async mode.
	asyncRetry   int
	asyncBackoff time.Duration
	asyncTimeout time.Duration

	client *http.Client

	endpoint   string
	hecToken   string
	sourceType string
	index      string
	host       string
}

// splunkEvent is the JSON envelope in which an audit entry is sent to the HTTP event collector.
type splunkEvent struct {
	// Time is the event time. The default time format is UNIX time format.
	Time int64 `json:"time,omitempty"`
	// Host value to assign to the event data. This key is typically the hostname of the client from which you're sending data.
	Host string `json:"host,omitempty"`
	// Source value to assign to the event data. For example, if you're sending data from an app you're developing, set this key to the name of the app.
	Source string `json:"source,omitempty"`
	// Sourcetype value to assign to the event data.
	SourceType string `json:"sourcetype,omitempty"`
	// Index by which the event data is to be indexed.
	Index string `json:"index,omitempty"`
	// Event is the actual event data in whatever format you want: a string, a number, another JSON object, and so on.
	Event Entry `json:"event,omitempty"`
}

// NewSplunk returns a new auditing backend for splunk. It supports the HTTP event collector interface.
//
// If c.Component is empty, the default component is used. If c.Log is nil, slog.Default() is used.
// sc.HECToken is mandatory; an error is returned if it is empty. sc.Endpoint defaults to
// "http://localhost:8088" and sc.SourceType defaults to "_json".
//
// The async settings of c are only considered if c.Async is true: non-positive values for
// AsyncTimeout and AsyncBackoff fall back to 5s and 200ms respectively, a negative AsyncRetry
// is treated as 0 (a single attempt without retries).
func NewSplunk(c Config, sc SplunkConfig) (Auditing, error) {
	if c.Component == "" {
		component, err := defaultComponent()
		if err != nil {
			return nil, err
		}

		c.Component = component
	}

	if sc.HECToken == "" {
		return nil, errors.New("HEC token must be configured")
	}

	var (
		endpoint     = "http://localhost:8088"
		sourceType   = "_json"
		asyncBackoff = 200 * time.Millisecond
		asyncTimeout = 5 * time.Second
		asyncRetry   = c.AsyncRetry
	)

	if sc.Endpoint != "" {
		endpoint = sc.Endpoint
	}

	if sc.SourceType != "" {
		sourceType = sc.SourceType
	}

	if c.Async {
		if c.AsyncTimeout > 0 {
			asyncTimeout = c.AsyncTimeout
		}

		if c.AsyncBackoff > 0 {
			asyncBackoff = c.AsyncBackoff
		}
	}

	// A negative retry count would make the async sender give up before its first attempt.
	if asyncRetry < 0 {
		asyncRetry = 0
	}

	// Calling methods on a nil *slog.Logger panics, so fall back to the default logger.
	log := c.Log
	if log == nil {
		log = slog.Default()
	}

	a := &splunkAuditing{
		component:    c.Component,
		log:          log.WithGroup("auditing"),
		async:        c.Async,
		asyncRetry:   asyncRetry,
		asyncBackoff: asyncBackoff,
		asyncTimeout: asyncTimeout,
		client:       &http.Client{Transport: &http.Transport{TLSClientConfig: sc.TlsConfig}},
		endpoint:     endpoint,
		hecToken:     sc.HECToken,
		sourceType:   sourceType,
		index:        sc.Index,
		host:         sc.Host,
	}

	a.log.Info("initialized splunk client")

	return a, nil
}

// Flush does nothing for the splunk backend and always returns nil.
// It does not wait for sends that were started asynchronously by Index.
func (a *splunkAuditing) Flush() error {
return nil
}

func (a *splunkAuditing) Index(entry Entry) error {
if entry.Timestamp.IsZero() {
return errors.New("timestamp is not set")
}

splunkEvent := &splunkEvent{
Time: entry.Timestamp.Unix(),
Host: a.host,
Source: a.component,
SourceType: a.sourceType,
Index: a.index,
Event: entry,
}

e, err := json.Marshal(splunkEvent)
if err != nil {
return fmt.Errorf("error marshaling splunk event: %w", err)
}

if !a.async {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.endpoint+"/services/collector", bytes.NewBuffer(e))
if err != nil {
return err
}

req.Header.Add("Authorization", "Splunk "+a.hecToken)

resp, err := a.client.Do(req)
if err != nil {
return fmt.Errorf("error indexing audit entry in splunk: %w", err)
}
defer resp.Body.Close()

return nil
}

go func() {
count := 0

for {
if count > a.asyncRetry {
a.log.Error("maximum amount of retries reached for sending event to splunk, giving up", "retries", a.asyncRetry, "entry-id", entry.Id)
return
}

count++

ctx, cancel := context.WithTimeout(context.Background(), a.asyncTimeout)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.endpoint+"/services/collector", bytes.NewBuffer(e))
Copy link
Copy Markdown
Contributor

@vknabel vknabel May 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any actual difference between the core of this for loop and the non-async variant?
This could be extracted into one function which could receive a context. In case of extracting the async mechanism this is of course obsolete.

if err != nil {
cancel()
a.log.Error("error creating request", "error", err)
return
}

req.Header.Add("Authorization", "Splunk "+a.hecToken)

resp, err := a.client.Do(req)
cancel()
if err != nil {
a.log.Error("error indexing audit entry in splunk, retrying", "backoff", a.asyncBackoff.String(), "error", err)
time.Sleep(a.asyncBackoff)
continue
}
defer resp.Body.Close()

return
}
}()

return nil
}

// Search is not supported by the splunk audit backend; it always returns a nil result
// together with an error stating that.
func (a *splunkAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
	return nil, errors.New("search not implemented for splunk audit backend")
}
122 changes: 122 additions & 0 deletions auditing/splunk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package auditing

import (
"encoding/json"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/metal-stack/metal-lib/pkg/pointer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Test_splunkAuditing_Index verifies that Index delivers an entry, wrapped into the expected
// splunk event, to the collector endpoint in both synchronous and asynchronous mode.
func Test_splunkAuditing_Index(t *testing.T) {
	now := time.Now()

	entry := Entry{
		Component:    "entry-component",
		RequestId:    "request-id",
		Type:         "entry-type",
		Timestamp:    now,
		User:         "entry-user",
		Tenant:       "entry-tenant",
		Detail:       "entry-detail",
		Phase:        "entry-phase",
		Path:         "entry-path",
		ForwardedFor: "entry-forwarded",
		RemoteAddr:   "entry-remote-addr",
		Body:         nil,
		StatusCode:   pointer.Pointer(200),
		Error:        nil,
	}

	want := splunkEvent{
		Time:       now.Unix(),
		Host:       "test-host",
		Source:     "metal-lib",
		SourceType: "_json",
		Index:      "test-index",
		Event: Entry{
			Component:    "entry-component",
			RequestId:    "request-id",
			Type:         "entry-type",
			Timestamp:    now,
			User:         "entry-user",
			Tenant:       "entry-tenant",
			Detail:       "entry-detail",
			Phase:        "entry-phase",
			Path:         "entry-path",
			ForwardedFor: "entry-forwarded",
			RemoteAddr:   "entry-remote-addr",
			Body:         nil,
			StatusCode:   pointer.Pointer(200),
			Error:        nil,
		},
	}

	tests := []struct {
		name         string
		async        bool
		asyncRetry   int
		asyncTimeout time.Duration
		entry        Entry
		want         splunkEvent
	}{
		{
			name:  "index some entry synchronously",
			async: false,
			entry: entry,
			want:  want,
		},
		{
			name:         "index some entry with async",
			async:        true,
			asyncRetry:   0,
			asyncTimeout: 500 * time.Millisecond,
			entry:        entry,
			want:         want,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Buffered so that the handler never blocks, regardless of whether the test
			// is already waiting or how often the endpoint gets called.
			received := make(chan struct{}, 1)

			mux := http.NewServeMux()
			mux.HandleFunc("/services/collector", func(w http.ResponseWriter, r *http.Request) {
				body, err := io.ReadAll(r.Body)
				assert.NoError(t, err)

				var data splunkEvent
				err = json.Unmarshal(body, &data)
				assert.NoError(t, err)

				if diff := cmp.Diff(data, tt.want); diff != "" {
					t.Errorf("diff = %s", diff)
				}

				w.WriteHeader(http.StatusOK)

				select {
				case received <- struct{}{}:
				default:
				}
			})
			server := httptest.NewServer(mux)
			defer server.Close()

			a, err := NewSplunk(Config{
				Component:    "metal-lib",
				Log:          slog.Default(),
				Async:        tt.async,
				AsyncRetry:   tt.asyncRetry,
				AsyncTimeout: tt.asyncTimeout,
			}, SplunkConfig{
				Endpoint: server.URL,
				HECToken: "test-hec",
				Index:    "test-index",
				Host:     "test-host",
			})
			require.NoError(t, err)

			err = a.Index(tt.entry)
			require.NoError(t, err)

			// Fail instead of hanging forever if the event never reaches the collector.
			select {
			case <-received:
			case <-time.After(5 * time.Second):
				t.Fatal("timed out waiting for the event to be received by the collector")
			}
		})
	}
}
4 changes: 4 additions & 0 deletions auditing/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) {
c.Component = component
}

if c.Async {
return nil, fmt.Errorf("timescaledb backend does not support async indexing")
}

if tc.Port == "" {
tc.Port = "5432"
}
Expand Down
Loading