Skip to content

Commit 7dc9061

Browse files
authored
Add batched export logs support (#3)
1 parent 857e57b commit 7dc9061

File tree

14 files changed

+1361
-95
lines changed

14 files changed

+1361
-95
lines changed

.github/workflows/build.yaml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,18 @@ jobs:
2020
with:
2121
extra_args: --only-verified
2222

23-
- name: Setup Go 1.23
24-
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5
23+
- name: Setup Go
24+
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6
2525
with:
2626
go-version-file: "go.mod"
2727

2828
- name: Run golangci-lint
2929
if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-lint') }}
30-
uses: golangci/golangci-lint-action@v6.0.1
30+
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20 # v9.2.0
3131
with:
3232
args: -v --timeout=5m
33-
version: v1.60.3
33+
version: v2.8.0
34+
only-new-issues: false
3435

3536
- name: Test
3637
if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-test') }}

.golangci.yaml

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,54 @@
1+
version: "2"
2+
run:
3+
tests: false
14
linters:
5+
enable:
6+
- asasalint
7+
- asciicheck
8+
- bidichk
9+
- bodyclose
10+
- durationcheck
11+
- errchkjson
12+
- errorlint
13+
- gocheckcompilerdirectives
14+
- gochecksumtype
15+
- gosec
16+
- gosmopolitan
17+
- loggercheck
18+
- makezero
19+
- musttag
20+
- nilerr
21+
- nilnesserr
22+
- noctx
23+
- reassign
24+
- recvcheck
25+
- rowserrcheck
26+
- spancheck
27+
- sqlclosecheck
28+
- testifylint
29+
- unparam
30+
- zerologlint
231
disable:
3-
- wrapcheck
4-
- err113
532
- contextcheck
33+
- err113
634
- exhaustive
735
- protogetter
8-
presets:
9-
- bugs
10-
- error
11-
- unused
12-
13-
run:
14-
tests: false
15-
16-
issues:
17-
exclude-dirs:
18-
- .github
36+
- wrapcheck
37+
exclusions:
38+
generated: lax
39+
presets:
40+
- comments
41+
- common-false-positives
42+
- legacy
43+
- std-error-handling
44+
paths:
45+
- .github
46+
- .claude
1947

48+
formatters:
49+
exclusions:
50+
generated: lax
51+
paths:
52+
- third_party$
53+
- builtin$
54+
- examples$

components/batch_client.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package components
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log"
7+
"time"
8+
)
9+
10+
func EnqueueTimeout(timeout time.Duration) func(*BatchClientConfig) {
11+
return func(config *BatchClientConfig) {
12+
config.EnqueueTimeout = timeout
13+
}
14+
}
15+
16+
func FlushInterval(interval time.Duration) func(*BatchClientConfig) {
17+
return func(config *BatchClientConfig) {
18+
config.FlushInterval = interval
19+
}
20+
}
21+
22+
func BatchSize(size int) func(*BatchClientConfig) {
23+
return func(config *BatchClientConfig) {
24+
config.BatchSize = size
25+
}
26+
}
27+
28+
type BatchClientConfig struct {
29+
EnqueueTimeout time.Duration
30+
FlushInterval time.Duration
31+
BatchSize int
32+
}
33+
34+
var _ APIClient = (*BatchClient)(nil)
35+
36+
type BatchClient struct {
37+
buffer chan Entry
38+
client APIClient
39+
cfg BatchClientConfig
40+
}
41+
42+
func NewBatchClient(client APIClient, opts ...func(*BatchClientConfig)) *BatchClient {
43+
cfg := BatchClientConfig{
44+
EnqueueTimeout: 5 * time.Second,
45+
FlushInterval: 5 * time.Second,
46+
BatchSize: 100,
47+
}
48+
for _, opt := range opts {
49+
opt(&cfg)
50+
}
51+
52+
b := &BatchClient{
53+
buffer: make(chan Entry, cfg.BatchSize*2),
54+
client: client,
55+
cfg: cfg,
56+
}
57+
58+
return b
59+
}
60+
61+
func (b *BatchClient) IngestLogs(ctx context.Context, entries []Entry) error {
62+
enqTimeout := time.After(b.cfg.EnqueueTimeout)
63+
for _, entry := range entries {
64+
select {
65+
case b.buffer <- entry:
66+
// Successfully enqueued.
67+
case <-ctx.Done():
68+
return ctx.Err()
69+
case <-enqTimeout:
70+
return errors.New("timeout: buffer is full, cannot enqueue log entry")
71+
}
72+
}
73+
74+
return nil
75+
}
76+
77+
func (b *BatchClient) Run(ctx context.Context) error {
78+
return b.run(ctx)
79+
}
80+
81+
func (b *BatchClient) run(ctx context.Context) error {
82+
ticker := time.NewTicker(b.cfg.FlushInterval)
83+
defer ticker.Stop()
84+
85+
entries := make([]Entry, 0, b.cfg.BatchSize)
86+
for {
87+
select {
88+
case entry := <-b.buffer:
89+
if len(entry.Message) == 0 {
90+
continue
91+
}
92+
entries = append(entries, entry)
93+
if len(entries) >= b.cfg.BatchSize {
94+
b.flush(ctx, entries)
95+
entries = entries[:0]
96+
}
97+
case <-ticker.C:
98+
b.flush(ctx, entries)
99+
entries = entries[:0]
100+
case <-ctx.Done():
101+
b.drainBuffer(&entries)
102+
// Use a new context with timeout for graceful shutdown.
103+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
104+
defer cancel()
105+
b.flush(shutdownCtx, entries)
106+
return ctx.Err()
107+
}
108+
}
109+
}
110+
111+
func (b *BatchClient) drainBuffer(entries *[]Entry) {
112+
for {
113+
select {
114+
case entry := <-b.buffer:
115+
if len(entry.Message) > 0 {
116+
*entries = append(*entries, entry)
117+
}
118+
default:
119+
// Buffer is empty.
120+
return
121+
}
122+
}
123+
}
124+
125+
func (b *BatchClient) flush(ctx context.Context, e []Entry) {
126+
if len(e) == 0 {
127+
return
128+
}
129+
if err := b.client.IngestLogs(ctx, e); err != nil {
130+
log.Printf("failed to publish logs: %v", err)
131+
}
132+
}

0 commit comments

Comments
 (0)