Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions auditing/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package auditing

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/metal-stack/metal-lib/pkg/pointer"
)

const (
asyncDefaultBackoff = 200 * time.Millisecond
)

type (
AsyncConfig struct {
// AsyncRetry defines the amount of attempts to retry sending an audit trace to a backend in case it failed.
AsyncRetry int
// AsyncBackoff defines the backoff after a failed attempt to index an audit trace to a backend.
AsyncBackoff *time.Duration
}

asyncAuditing struct {
log *slog.Logger
config *AsyncConfig
a Auditing
}
)

// NewAsync takes another audit backend and allows indexing audit traces asynchronously.
// If this is used it can occur that audit traces get lost in case the backend is not available for receiving the trace.
// The advantage is that it does not block.
//
// Dev note: For a backend wrapped in async, it is strictly required that the index function does not modify internal state
// as otherwise race conditions will occur!
func NewAsync(backend Auditing, log *slog.Logger, ac AsyncConfig) (Auditing, error) {
wrappedBackendType := fmt.Sprintf("%T", backend)

a := &asyncAuditing{
log: log.WithGroup("auditing").With("audit-backend", "async", "wrapped-backend", wrappedBackendType),
config: &ac,
a: backend,
}

if ac.AsyncBackoff == nil {
ac.AsyncBackoff = pointer.Pointer(asyncDefaultBackoff)
}

a.log.Info("wrapping audit backend in async")

return a, nil
}

func (a *asyncAuditing) Index(entry Entry) error {
go func() {
count := 0

log := a.log.With("entry-id", entry.Id, "retries", a.config.AsyncRetry, "backoff", a.config.AsyncBackoff.String())

for {
log.Debug("async index", "count", count)

err := a.a.Index(entry)
if err == nil {
return
}

if count >= a.config.AsyncRetry {
log.Error("maximum amount of retries reached for sending event to splunk, giving up", "error", err)
return
}

count++

log.Error("async indexing failed, retrying", "error", err)
time.Sleep(*a.config.AsyncBackoff)

continue
}
}()

return nil
}

func (a *asyncAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
return a.a.Search(ctx, filter)
}
137 changes: 137 additions & 0 deletions auditing/async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package auditing

import (
"context"
"errors"
"log/slog"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_asyncAuditing_Index(t *testing.T) {
tests := []struct {
name string
asyncRetry int
asyncBackoff time.Duration

idxFn func(count int) error

wantCount int
wantTimeout bool
}{
{
name: "index without error",
asyncRetry: 0,
asyncBackoff: 5 * time.Millisecond,
idxFn: func(_ int) error {
return nil
},
wantCount: 0,
wantTimeout: false,
},
{
name: "index with error",
asyncRetry: 0,
asyncBackoff: 5 * time.Millisecond,
idxFn: func(_ int) error {
return errors.New("test backend error")
},
wantCount: 1,
wantTimeout: true,
},
{
name: "retry does work",
asyncRetry: 3,
asyncBackoff: 5 * time.Millisecond,
idxFn: func(count int) error {
switch count {
case 0, 1, 2:
return errors.New("test backend error")
default:
return nil
}
},
wantCount: 3,
wantTimeout: false,
},
{
name: "giving up on too many retries",
asyncRetry: 3,
asyncBackoff: 5 * time.Millisecond,
idxFn: func(count int) error {
switch count {
case 0, 1, 2, 3:
return errors.New("test backend error")
default:
return nil
}
},
wantCount: 4,
wantTimeout: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
done := make(chan bool)
defer close(done)

backend := &testBackend{idxFn: tt.idxFn, done: done}

async, err := NewAsync(backend, slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})), AsyncConfig{
AsyncRetry: tt.asyncRetry,
AsyncBackoff: &tt.asyncBackoff,
})
require.NoError(t, err)

err = async.Index(Entry{Id: "test"})
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

select {
case <-done:
require.False(t, tt.wantTimeout, "finished but timeout was expected")
case <-ctx.Done():
require.True(t, tt.wantTimeout, "unexpected timeout occurred")
}

backend.mutex.Lock()
defer backend.mutex.Unlock()

assert.Equal(t, tt.wantCount, backend.count)
})
}
}

type testBackend struct {
mutex sync.Mutex
done chan bool
count int
idxFn func(count int) error
}

func (t *testBackend) Index(e Entry) error {
t.mutex.Lock()
defer t.mutex.Unlock()

if t.idxFn != nil {
if err := t.idxFn(t.count); err != nil {
t.count++
return errors.New("test backend error")
}
}

t.done <- true

return nil
}

func (t *testBackend) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
panic("not required")
}
5 changes: 2 additions & 3 deletions auditing/auditing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
type Config struct {
Component string
Log *slog.Logger
// IndexTimeout sets a timeout for indexing a trace for the backend.
IndexTimeout time.Duration
}

type Interval string
Expand Down Expand Up @@ -126,9 +128,6 @@ type EntryFilter struct {
}

type Auditing interface {
// Commits all pending entries to the index.
// Should be called before shutting down the application.
Flush() error
// Adds the given entry to the index.
// Some fields like `Id`, `Component` and `Timestamp` will be filled by the auditing driver if not given.
Index(Entry) error
Expand Down
Loading
Loading