Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ jobs:
with:
extra_args: --only-verified

- name: Setup Go 1.23
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6
with:
go-version-file: "go.mod"

- name: Run golangci-lint
if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-lint') }}
uses: golangci/golangci-lint-action@v6.0.1
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20 # v9.2.0
with:
args: -v --timeout=5m
version: v1.60.3
version: v2.8.0
only-new-issues: false

- name: Test
if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-test') }}
Expand Down
61 changes: 48 additions & 13 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -1,19 +1,54 @@
version: "2"
run:
tests: false
linters:
enable:
- asasalint
- asciicheck
- bidichk
- bodyclose
- durationcheck
- errchkjson
- errorlint
- gocheckcompilerdirectives
- gochecksumtype
- gosec
- gosmopolitan
- loggercheck
- makezero
- musttag
- nilerr
- nilnesserr
- noctx
- reassign
- recvcheck
- rowserrcheck
- spancheck
- sqlclosecheck
- testifylint
- unparam
- zerologlint
disable:
- wrapcheck
- err113
- contextcheck
- err113
- exhaustive
- protogetter
presets:
- bugs
- error
- unused

run:
tests: false

issues:
exclude-dirs:
- .github
- wrapcheck
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
paths:
- .github
- .claude

formatters:
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
132 changes: 132 additions & 0 deletions components/batch_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package components

import (
"context"
"errors"
"log"
"time"
)

func EnqueueTimeout(timeout time.Duration) func(*BatchClientConfig) {
return func(config *BatchClientConfig) {
config.EnqueueTimeout = timeout
}
}

func FlushInterval(interval time.Duration) func(*BatchClientConfig) {
return func(config *BatchClientConfig) {
config.FlushInterval = interval
}
}

func BatchSize(size int) func(*BatchClientConfig) {
return func(config *BatchClientConfig) {
config.BatchSize = size
}
}

type BatchClientConfig struct {
EnqueueTimeout time.Duration
FlushInterval time.Duration
BatchSize int
}

var _ APIClient = (*BatchClient)(nil)

type BatchClient struct {
buffer chan Entry
client APIClient
cfg BatchClientConfig
}

func NewBatchClient(client APIClient, opts ...func(*BatchClientConfig)) *BatchClient {
cfg := BatchClientConfig{
EnqueueTimeout: 5 * time.Second,
FlushInterval: 5 * time.Second,
BatchSize: 100,
}
for _, opt := range opts {
opt(&cfg)
}

b := &BatchClient{
buffer: make(chan Entry, cfg.BatchSize*2),
client: client,
cfg: cfg,
}

return b
}

func (b *BatchClient) IngestLogs(ctx context.Context, entries []Entry) error {
enqTimeout := time.After(b.cfg.EnqueueTimeout)
for _, entry := range entries {
select {
case b.buffer <- entry:
// Successfully enqueued.
case <-ctx.Done():
return ctx.Err()
case <-enqTimeout:
return errors.New("timeout: buffer is full, cannot enqueue log entry")
}
}

return nil
}

func (b *BatchClient) Run(ctx context.Context) error {
return b.run(ctx)
}

func (b *BatchClient) run(ctx context.Context) error {
ticker := time.NewTicker(b.cfg.FlushInterval)
defer ticker.Stop()

entries := make([]Entry, 0, b.cfg.BatchSize)
for {
select {
case entry := <-b.buffer:
if len(entry.Message) == 0 {
continue
}
entries = append(entries, entry)
if len(entries) >= b.cfg.BatchSize {
b.flush(ctx, entries)
entries = entries[:0]
}
case <-ticker.C:
b.flush(ctx, entries)
entries = entries[:0]
case <-ctx.Done():
b.drainBuffer(&entries)
// Use a new context with timeout for graceful shutdown.
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
b.flush(shutdownCtx, entries)
return ctx.Err()
}
}
}

func (b *BatchClient) drainBuffer(entries *[]Entry) {
for {
select {
case entry := <-b.buffer:
if len(entry.Message) > 0 {
*entries = append(*entries, entry)
}
default:
// Buffer is empty.
return
}
}
}

func (b *BatchClient) flush(ctx context.Context, e []Entry) {
if len(e) == 0 {
return
}
if err := b.client.IngestLogs(ctx, e); err != nil {
log.Printf("failed to publish logs: %v", err)
}
}
Loading