-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathasync.go
More file actions
88 lines (68 loc) · 2.13 KB
/
async.go
File metadata and controls
88 lines (68 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package auditing
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/metal-stack/metal-lib/pkg/pointer"
)
const (
asyncDefaultBackoff = 200 * time.Millisecond
)
type (
AsyncConfig struct {
// AsyncRetry defines the amount of attempts to retry sending an audit trace to a backend in case it failed.
AsyncRetry int
// AsyncBackoff defines the backoff after a failed attempt to index an audit trace to a backend.
AsyncBackoff *time.Duration
}
asyncAuditing struct {
log *slog.Logger
config *AsyncConfig
a Auditing
}
)
// NewAsync takes another audit backend and allows indexing audit traces asynchronously.
// If this is used it can occur that audit traces get lost in case the backend is not available for receiving the trace.
// The advantage is that it does not block.
//
// Dev note: For a backend wrapped in async, it is strictly required that the index function does not modify internal state
// as otherwise race conditions will occur!
func NewAsync(backend Auditing, log *slog.Logger, ac AsyncConfig) (Auditing, error) {
wrappedBackendType := fmt.Sprintf("%T", backend)
a := &asyncAuditing{
log: log.WithGroup("auditing").With("audit-backend", "async", "wrapped-backend", wrappedBackendType),
config: &ac,
a: backend,
}
if ac.AsyncBackoff == nil {
ac.AsyncBackoff = pointer.Pointer(asyncDefaultBackoff)
}
a.log.Info("wrapping audit backend in async")
return a, nil
}
func (a *asyncAuditing) Index(entry Entry) error {
go func() {
count := 0
log := a.log.With("entry-id", entry.Id, "retries", a.config.AsyncRetry, "backoff", a.config.AsyncBackoff.String())
for {
log.Debug("async index", "count", count)
err := a.a.Index(entry)
if err == nil {
return
}
if count >= a.config.AsyncRetry {
log.Error("maximum amount of retries reached for sending event to splunk, giving up", "error", err)
return
}
count++
log.Error("async indexing failed, retrying", "error", err)
time.Sleep(*a.config.AsyncBackoff)
continue
}
}()
return nil
}
func (a *asyncAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
return a.a.Search(ctx, filter)
}