diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index b3cd703..b529cd1 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -20,17 +20,18 @@ jobs: with: extra_args: --only-verified - - name: Setup Go 1.23 - uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5 + - name: Setup Go + uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6 with: go-version-file: "go.mod" - name: Run golangci-lint if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-lint') }} - uses: golangci/golangci-lint-action@v6.0.1 + uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20 # v9.2.0 with: args: -v --timeout=5m - version: v1.60.3 + version: v2.8.0 + only-new-issues: false - name: Test if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-test') }} diff --git a/.golangci.yaml b/.golangci.yaml index f066f2d..9092060 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -1,19 +1,54 @@ +version: "2" +run: + tests: false linters: + enable: + - asasalint + - asciicheck + - bidichk + - bodyclose + - durationcheck + - errchkjson + - errorlint + - gocheckcompilerdirectives + - gochecksumtype + - gosec + - gosmopolitan + - loggercheck + - makezero + - musttag + - nilerr + - nilnesserr + - noctx + - reassign + - recvcheck + - rowserrcheck + - spancheck + - sqlclosecheck + - testifylint + - unparam + - zerologlint disable: - - wrapcheck - - err113 - contextcheck + - err113 - exhaustive - protogetter - presets: - - bugs - - error - - unused - -run: - tests: false - -issues: - exclude-dirs: - - .github + - wrapcheck + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - .github + - .claude +formatters: + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ \ No newline at end of file diff --git a/components/batch_client.go 
b/components/batch_client.go new file mode 100644 index 0000000..da1bacc --- /dev/null +++ b/components/batch_client.go @@ -0,0 +1,132 @@ +package components + +import ( + "context" + "errors" + "log" + "time" +) + +func EnqueueTimeout(timeout time.Duration) func(*BatchClientConfig) { + return func(config *BatchClientConfig) { + config.EnqueueTimeout = timeout + } +} + +func FlushInterval(interval time.Duration) func(*BatchClientConfig) { + return func(config *BatchClientConfig) { + config.FlushInterval = interval + } +} + +func BatchSize(size int) func(*BatchClientConfig) { + return func(config *BatchClientConfig) { + config.BatchSize = size + } +} + +type BatchClientConfig struct { + EnqueueTimeout time.Duration + FlushInterval time.Duration + BatchSize int +} + +var _ APIClient = (*BatchClient)(nil) + +type BatchClient struct { + buffer chan Entry + client APIClient + cfg BatchClientConfig +} + +func NewBatchClient(client APIClient, opts ...func(*BatchClientConfig)) *BatchClient { + cfg := BatchClientConfig{ + EnqueueTimeout: 5 * time.Second, + FlushInterval: 5 * time.Second, + BatchSize: 100, + } + for _, opt := range opts { + opt(&cfg) + } + + b := &BatchClient{ + buffer: make(chan Entry, cfg.BatchSize*2), + client: client, + cfg: cfg, + } + + return b +} + +func (b *BatchClient) IngestLogs(ctx context.Context, entries []Entry) error { + enqTimeout := time.After(b.cfg.EnqueueTimeout) + for _, entry := range entries { + select { + case b.buffer <- entry: + // Successfully enqueued. 
+ case <-ctx.Done(): + return ctx.Err() + case <-enqTimeout: + return errors.New("timeout: buffer is full, cannot enqueue log entry") + } + } + + return nil +} + +func (b *BatchClient) Run(ctx context.Context) error { + return b.run(ctx) +} + +func (b *BatchClient) run(ctx context.Context) error { + ticker := time.NewTicker(b.cfg.FlushInterval) + defer ticker.Stop() + + entries := make([]Entry, 0, b.cfg.BatchSize) + for { + select { + case entry := <-b.buffer: + if len(entry.Message) == 0 { + continue + } + entries = append(entries, entry) + if len(entries) >= b.cfg.BatchSize { + b.flush(ctx, entries) + entries = entries[:0] + } + case <-ticker.C: + b.flush(ctx, entries) + entries = entries[:0] + case <-ctx.Done(): + b.drainBuffer(&entries) + // Use a new context with timeout for graceful shutdown. + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + b.flush(shutdownCtx, entries) + return ctx.Err() + } + } +} + +func (b *BatchClient) drainBuffer(entries *[]Entry) { + for { + select { + case entry := <-b.buffer: + if len(entry.Message) > 0 { + *entries = append(*entries, entry) + } + default: + // Buffer is empty. 
+ return + } + } +} + +func (b *BatchClient) flush(ctx context.Context, e []Entry) { + if len(e) == 0 { + return + } + if err := b.client.IngestLogs(ctx, e); err != nil { + log.Printf("failed to publish logs: %v", err) + } +} diff --git a/components/batch_client_test.go b/components/batch_client_test.go new file mode 100644 index 0000000..11ce7ec --- /dev/null +++ b/components/batch_client_test.go @@ -0,0 +1,241 @@ +package components_test + +import ( + "context" + "slices" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/castai/logging/components" +) + +func Test_BufferedClient_PublishLogs(t *testing.T) { + t.Run("should not publish logs when flush interval is not reached", func(t *testing.T) { + r := require.New(t) + mockAPIClient := &apiClient{} + client := components.NewBatchClient(mockAPIClient, components.BatchSize(420), components.FlushInterval(time.Hour)) + ctx, cancel := context.WithCancel(context.Background()) + errc := make(chan error, 1) + go func() { + errc <- client.Run(ctx) + close(errc) + }() + + err := client.IngestLogs(ctx, []components.Entry{{ + Level: "meow", + Message: "meow-message", + Time: time.Now(), + }}) + r.NoError(err) + + sentLogs := mockAPIClient.waitForLogs(10 * time.Millisecond) + r.Len(sentLogs, 0) + + cancel() + r.ErrorIs(<-errc, context.Canceled) + }) + + t.Run("should publish logs when max entries per publish is reached but flush interval is not reached", func(t *testing.T) { + r := require.New(t) + mockAPIClient := &apiClient{} + client := components.NewBatchClient(mockAPIClient, components.BatchSize(1), components.FlushInterval(time.Hour)) + + ctx, cancel := context.WithCancel(context.Background()) + errc := make(chan error, 1) + go func() { + errc <- client.Run(ctx) + close(errc) + }() + + entries := []components.Entry{{ + Level: "meow", + Message: "meow-message", + Time: time.Now(), + }} + + err := client.IngestLogs(ctx, entries) + r.NoError(err) + + sentLogs := 
mockAPIClient.waitForLogs(time.Second) + r.Len(sentLogs, 1) + + cancel() + r.ErrorIs(<-errc, context.Canceled) + }) + + t.Run("should publish logs when flush interval is reached but not enough entries to publish", func(t *testing.T) { + r := require.New(t) + mockAPIClient := &apiClient{} + client := components.NewBatchClient(mockAPIClient, components.BatchSize(2), components.FlushInterval(time.Millisecond*10)) + + ctx, cancel := context.WithCancel(context.Background()) + errc := make(chan error, 1) + go func() { + errc <- client.Run(ctx) + close(errc) + }() + + entries := []components.Entry{{ + Level: "meow", + Message: "meow-message", + Time: time.Now(), + }} + + err := client.IngestLogs(ctx, entries) + r.NoError(err) + + sentLogs := mockAPIClient.waitForLogs(time.Second) + r.Len(sentLogs, 1) + + cancel() + r.ErrorIs(<-errc, context.Canceled) + }) + + t.Run("should publish remaining logs when no conditions are met but context is canceled", func(t *testing.T) { + r := require.New(t) + mockAPIClient := &apiClient{} + client := components.NewBatchClient(mockAPIClient, components.BatchSize(2), components.FlushInterval(time.Hour)) + + ctx, cancel := context.WithCancel(context.Background()) + errc := make(chan error, 1) + go func() { + errc <- client.Run(ctx) + close(errc) + }() + + entries := []components.Entry{{ + Level: "meow", + Message: "meow-message", + Time: time.Now(), + }} + + err := client.IngestLogs(ctx, entries) + r.NoError(err) + <-time.After(time.Millisecond * 100) + cancel() + + sentLogs := mockAPIClient.waitForLogs(time.Second) + r.Len(sentLogs, 1) + + r.ErrorIs(<-errc, context.Canceled) + }) + + t.Run("should drain all buffered entries on shutdown", func(t *testing.T) { + r := require.New(t) + mockAPIClient := &apiClient{} + client := components.NewBatchClient(mockAPIClient, components.BatchSize(100), components.FlushInterval(time.Hour)) + + done := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + _ = 
client.Run(ctx) + close(done) + }() + + // Ingest 10 entries + for i := 0; i < 10; i++ { + err := client.IngestLogs(ctx, []components.Entry{{ + Level: "info", + Message: "test message", + Time: time.Now(), + }}) + r.NoError(err) + } + + // Give time for entries to be queued + time.Sleep(50 * time.Millisecond) + + // Cancel context - should drain buffer and flush all entries + cancel() + <-done + + sentLogs := mockAPIClient.getLogs() + r.Len(sentLogs, 10, "all buffered entries should be flushed on shutdown") + }) + + t.Run("should timeout when buffer is full", func(t *testing.T) { + r := require.New(t) + mockAPIClient := &slowAPIClient{delay: 30 * time.Second} // Very slow to keep buffer full + client := components.NewBatchClient( + mockAPIClient, + components.BatchSize(1), + components.FlushInterval(time.Second), + components.EnqueueTimeout(time.Second), + ) + + ctx, cancel := context.WithCancel(context.Background()) + errc := make(chan error, 1) + go func() { + errc <- client.Run(ctx) + close(errc) + }() + defer cancel() + + // Quickly fill the buffer (capacity is BatchSize * 2 = 2) + // and trigger processing which will block on slow API call + for i := 0; i < 5; i++ { + err := client.IngestLogs(ctx, []components.Entry{{ + Level: "info", + Message: "test message", + Time: time.Now(), + }}) + if err != nil { + // We expect an error when buffer is full + r.Contains(err.Error(), "timeout") + return + } + // Don't sleep between sends to fill buffer quickly + } + + r.Fail("expected timeout error but got none") + }) +} + +type apiClient struct { + mu sync.Mutex + logs []components.Entry +} + +func (a *apiClient) IngestLogs(ctx context.Context, entries []components.Entry) error { + a.mu.Lock() + defer a.mu.Unlock() + + a.logs = append(a.logs, entries...) 
+ return nil +} + +func (a *apiClient) getLogs() []components.Entry { + a.mu.Lock() + defer a.mu.Unlock() + return slices.Clone(a.logs) +} + +func (a *apiClient) waitForLogs(waitDuration time.Duration) []components.Entry { + timeout := time.After(waitDuration) + + for { + select { + case <-time.After(time.Millisecond): + a.mu.Lock() + res := slices.Clone(a.logs) + a.mu.Unlock() + if len(res) > 0 { + return res + } + case <-timeout: + return nil + } + } +} + +type slowAPIClient struct { + delay time.Duration +} + +func (s *slowAPIClient) IngestLogs(ctx context.Context, entries []components.Entry) error { + time.Sleep(s.delay) + return nil +} diff --git a/components/client.go b/components/client.go new file mode 100644 index 0000000..6c18dfe --- /dev/null +++ b/components/client.go @@ -0,0 +1,250 @@ +package components + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "sync" + "time" +) + +type LogLevel string + +const ( + LogLevelDebug LogLevel = "LOG_LEVEL_DEBUG" + LogLevelInfo LogLevel = "LOG_LEVEL_INFO" + LogLevelWarning LogLevel = "LOG_LEVEL_WARNING" + LogLevelError LogLevel = "LOG_LEVEL_ERROR" + LogLevelUnknown LogLevel = "LOG_LEVEL_UNKNOWN" +) + +type IngestLogsRequest struct { + Version string `json:"version"` + Entries []Entry `json:"entries"` +} + +type Entry struct { + Level string `json:"level"` + Message string `json:"message"` + Time time.Time `json:"time"` + Fields map[string]string `json:"fields"` +} + +const ( + headerAPIKey = "X-API-Key" // #nosec G101 +) + +var gzipWriterPool = sync.Pool{ + New: func() interface{} { + return gzip.NewWriter(nil) + }, +} + +type APIClient interface { + IngestLogs(ctx context.Context, entries []Entry) error +} + +type Config struct { + APIBaseURL string + APIKey string + ClusterID string + Component string + Version string + TLSCert string + MaxRetries int // Number of retries on failure (-1 = no retries) + 
MaxRetryBackoffWait time.Duration +} + +var _ APIClient = (*APIClientImpl)(nil) + +type APIClientImpl struct { + httpClient *http.Client + cfg Config +} + +func NewAPIClient(cfg Config) (*APIClientImpl, error) { + if err := validateConfig(cfg); err != nil { + return nil, err + } + if cfg.MaxRetries == 0 { + cfg.MaxRetries = 3 + } + if cfg.MaxRetryBackoffWait == 0 { + cfg.MaxRetryBackoffWait = 5 * time.Second + } + + httpClient, err := createHTTPClient(cfg.TLSCert) + if err != nil { + return nil, err + } + return &APIClientImpl{ + cfg: cfg, + httpClient: httpClient, + }, nil +} + +func validateConfig(cfg Config) error { + if cfg.APIBaseURL == "" { + return errors.New("field APIBaseURL is required") + } + if cfg.APIKey == "" { + return errors.New("field APIKey is required") + } + if cfg.ClusterID == "" { + return errors.New("field ClusterID is required") + } + if cfg.Component == "" { + return errors.New("field Component is required") + } + if cfg.Version == "" { + return errors.New("field Version is required") + } + return nil +} + +func (a *APIClientImpl) IngestLogs(ctx context.Context, entries []Entry) error { + payload := &IngestLogsRequest{ + Version: a.cfg.Version, + Entries: entries, + } + + jsonBytes, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshaling ingest logs request: %w", err) + } + + maxRetries := a.cfg.MaxRetries + backoff := 100 * time.Millisecond + var lastErr error + if maxRetries < 0 { + maxRetries = 0 + } + for attempt := 0; attempt <= maxRetries; attempt++ { + if attempt > 0 { + waitTime := backoff * time.Duration(1< a.cfg.MaxRetryBackoffWait { + waitTime = a.cfg.MaxRetryBackoffWait + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(waitTime): + } + } + + err := a.doIngestRequest(ctx, jsonBytes) + if err == nil { + return nil + } + lastErr = err + + if !a.shouldRetry(err, attempt, maxRetries) { + return err + } + } + return fmt.Errorf("ingest logs failed after %d retries: %w", maxRetries, 
lastErr) +} + +func (a *APIClientImpl) shouldRetry(err error, attempt, maxRetries int) bool { + if attempt >= maxRetries { + return false + } + var httpErr *httpError + if errors.As(err, &httpErr) { + return httpErr.statusCode >= 500 + } + return true +} + +type httpError struct { + statusCode int + message string +} + +func (e *httpError) Error() string { + return e.message +} + +func (a *APIClientImpl) doIngestRequest(ctx context.Context, jsonBytes []byte) error { + var compressedBuf bytes.Buffer + gzipWriter := gzipWriterPool.Get().(*gzip.Writer) + defer gzipWriterPool.Put(gzipWriter) + + gzipWriter.Reset(&compressedBuf) + if _, err := gzipWriter.Write(jsonBytes); err != nil { + return err + } + if err := gzipWriter.Close(); err != nil { + return err + } + + endpoint := fmt.Sprintf("%s/v1/clusters/%s/components/%s/logs", a.cfg.APIBaseURL, a.cfg.ClusterID, a.cfg.Component) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, &compressedBuf) + if err != nil { + return err + } + + req.Header.Set(headerAPIKey, a.cfg.APIKey) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Content-Encoding", "gzip") + + resp, err := a.httpClient.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + respMsg, _ := io.ReadAll(resp.Body) + return &httpError{ + statusCode: resp.StatusCode, + message: fmt.Sprintf("ingest logs failed: expected status %d, got %d: %v", http.StatusOK, resp.StatusCode, string(respMsg)), + } + } + return nil +} + +func createHTTPClient(tlsCert string) (*http.Client, error) { + tlsConfig, err := createTLSConfig(tlsCert) + if err != nil { + return nil, err + } + return &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 15 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + 
TLSHandshakeTimeout: 5 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + TLSClientConfig: tlsConfig, + }, + }, nil +} + +func createTLSConfig(tlsCert string) (*tls.Config, error) { + if tlsCert == "" { + return nil, nil + } + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM([]byte(tlsCert)) { + return nil, fmt.Errorf("failed to add root certificate to CA pool") + } + + return &tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: certPool, + }, nil +} diff --git a/components/client_test.go b/components/client_test.go new file mode 100644 index 0000000..6ca1dd5 --- /dev/null +++ b/components/client_test.go @@ -0,0 +1,465 @@ +package components + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestClient_NewAPIClient(t *testing.T) { + t.Run("should create new APIClient", func(t *testing.T) { + _, err := NewAPIClient(Config{ + APIBaseURL: "http://localhost:1234", + APIKey: "key", + ClusterID: "clusterID", + Component: "omni-agent", + Version: "123", + }) + require.NoError(t, err) + }) + t.Run("should create new APIClient with valid CA cert", func(t *testing.T) { + caCert := ` +-----BEGIN CERTIFICATE----- +MIIDATCCAemgAwIBAgIUPUS4krHP49SF+yYMLHe4nCllKmEwDQYJKoZIhvcNAQEL +BQAwDzENMAsGA1UECgwEVGVzdDAgFw0yMzA5MTMwODM5MzhaGA8yMjE1MDUxMDA4 +MzkzOFowDzENMAsGA1UECgwEVGVzdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC +AQoCggEBAOVZbDa4/tf3N3VP4Ezvt18d++xrQ+bzjhuE7MWX36NWZ4wUzgmqQXd0 +OQWoxYqRGKyI847v29j2BWG17ZmbqarwZHjR98rn9gNtRJgeURlEyAh1pAprhFwb +IBS9vyyCNJtfFFF+lvWvJcU+VKIqWH/9413xDx+OE8tRWNRkS/1CVJg1Nnm3H/IF +lhWAKOYbeKY9q8RtIhb4xNqIc8nmUjDFIjRTarIuf+jDwfFQAPK5pNci+o9KCDgd +Y4lvnGfvPp9XAHnWzTRWNGJQyefZb/SdJjXlic10njfttzKBXi0x8IuV2x98AEPE +2jLXIvC+UBpvMhscdzPfahp5xkYJWx0CAwEAAaNTMFEwHQYDVR0OBBYEFFE48b+V +4E5PWqjpLcUnqWvDDgsuMB8GA1UdIwQYMBaAFFE48b+V4E5PWqjpLcUnqWvDDgsu 
+MA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAIe82ddHX61WHmyp +zeSiF25aXBqeOUA0ScArTL0fBGi9xZ/8gVU79BvJMyfkaeBKvV06ka6g9OnleWYB +zhBmHBvCL6PsgwLxgzt/dj5ES0K3Ml+7jGmhCKKryzYj/ZvhSMyLlxZqP/nRccBG +y6G3KK4bjzqY4TcEPNs8H4Akc+0SGcPl+AAe65mXPIQhtMkANFLoRuWxMf5JmJke +dYT1GoOjRJpEWCATM+KCXa3UEpRBcXNLeOHZivuqf7n0e1CUD6+0oK4TLxVsTqti +q276VYI/vYmMLRI/iE7Qjn9uGEeR1LWpVngE9jSzSdzByvzw3DwO4sL5B+rv7O1T +9Qgi/No= +-----END CERTIFICATE----- + ` + _, err := NewAPIClient(Config{ + APIBaseURL: "http://localhost:1234", + APIKey: "key", + ClusterID: "clusterID", + Component: "omni-agent", + Version: "123", + TLSCert: caCert, + }) + require.NoError(t, err) + }) + t.Run("should return err with invalid CA cert", func(t *testing.T) { + caCert := "invalid-ca-cert" + _, err := NewAPIClient(Config{ + APIBaseURL: "http://localhost:1234", + APIKey: "key", + ClusterID: "clusterID", + Component: "omni-agent", + Version: "123", + TLSCert: caCert, + }) + require.Error(t, err) + }) + + t.Run("should validate required fields", func(t *testing.T) { + tests := []struct { + name string + config Config + errMsg string + }{ + { + name: "missing APIBaseURL", + config: Config{APIKey: "key", ClusterID: "cluster", Component: "comp", Version: "v1"}, + errMsg: "APIBaseURL is required", + }, + { + name: "missing APIKey", + config: Config{APIBaseURL: "http://test", ClusterID: "cluster", Component: "comp", Version: "v1"}, + errMsg: "APIKey is required", + }, + { + name: "missing ClusterID", + config: Config{APIBaseURL: "http://test", APIKey: "key", Component: "comp", Version: "v1"}, + errMsg: "ClusterID is required", + }, + { + name: "missing Component", + config: Config{APIBaseURL: "http://test", APIKey: "key", ClusterID: "cluster", Version: "v1"}, + errMsg: "Component is required", + }, + { + name: "missing Version", + config: Config{APIBaseURL: "http://test", APIKey: "key", ClusterID: "cluster", Component: "comp"}, + errMsg: "Version is required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, 
func(t *testing.T) { + _, err := NewAPIClient(tt.config) + require.Error(t, err) + require.Contains(t, err.Error(), tt.errMsg) + }) + } + }) +} + +func TestClient_IngestLogs(t *testing.T) { + t.Run("happy path - should send valid request with correct headers and body", func(t *testing.T) { + // Setup test server + var receivedRequest *http.Request + var receivedBody []byte + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedRequest = r + var err error + receivedBody, err = io.ReadAll(r.Body) + require.NoError(t, err) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + // Create client + client, err := NewAPIClient(Config{ + APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: "cluster-123", + Component: "test-component", + Version: "v1.0.0", + }) + require.NoError(t, err) + + // Prepare test data + testTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + entries := []Entry{ + { + Level: "info", + Message: "test message 1", + Time: testTime, + Fields: map[string]string{ + "key1": "value1", + }, + }, + { + Level: "error", + Message: "test message 2", + Time: testTime.Add(time.Second), + Fields: map[string]string{ + "key2": "value2", + }, + }, + } + + // Execute + err = client.IngestLogs(context.Background(), entries) + require.NoError(t, err) + + // Verify request method and URL + require.NotNil(t, receivedRequest, "server did not receive request") + require.Equal(t, http.MethodPost, receivedRequest.Method, "wrong HTTP method") + require.Equal(t, "/v1/clusters/cluster-123/components/test-component/logs", receivedRequest.URL.Path, "wrong URL path") + + // Verify headers + require.Equal(t, "test-api-key", receivedRequest.Header.Get("X-API-Key"), "API key header missing or incorrect") + require.Equal(t, "application/json", receivedRequest.Header.Get("Content-Type"), "Content-Type header missing or incorrect") + require.Equal(t, "gzip", receivedRequest.Header.Get("Content-Encoding"), 
"Content-Encoding header should be gzip") + + // Decompress gzip body + gzipReader, err := gzip.NewReader(bytes.NewReader(receivedBody)) + require.NoError(t, err, "failed to create gzip reader") + defer gzipReader.Close() + + decompressedBody, err := io.ReadAll(gzipReader) + require.NoError(t, err, "failed to decompress body") + + // Verify body + var receivedPayload IngestLogsRequest + err = json.Unmarshal(decompressedBody, &receivedPayload) + require.NoError(t, err, "failed to parse request body") + + require.Equal(t, "v1.0.0", receivedPayload.Version, "wrong version in payload") + require.Len(t, receivedPayload.Entries, 2, "wrong number of entries") + + // Verify first entry + require.Equal(t, "info", receivedPayload.Entries[0].Level) + require.Equal(t, "test message 1", receivedPayload.Entries[0].Message) + require.Equal(t, testTime, receivedPayload.Entries[0].Time) + require.Equal(t, "value1", receivedPayload.Entries[0].Fields["key1"]) + + // Verify second entry + require.Equal(t, "error", receivedPayload.Entries[1].Level) + require.Equal(t, "test message 2", receivedPayload.Entries[1].Message) + require.Equal(t, testTime.Add(time.Second), receivedPayload.Entries[1].Time) + require.Equal(t, "value2", receivedPayload.Entries[1].Fields["key2"]) + }) + + t.Run("should handle server error responses", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("invalid request")) + })) + defer server.Close() + + client, err := NewAPIClient(Config{ + APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: "cluster-123", + Component: "test-component", + Version: "v1.0.0", + }) + require.NoError(t, err) + + entries := []Entry{ + { + Level: "info", + Message: "test message", + Time: time.Now(), + }, + } + + err = client.IngestLogs(context.Background(), entries) + require.Error(t, err) + require.Contains(t, err.Error(), "ingest logs failed") + 
require.Contains(t, err.Error(), "400") + }) + + t.Run("should respect context cancellation", func(t *testing.T) { + // Create a server that delays response + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client, err := NewAPIClient(Config{ + APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: "cluster-123", + Component: "test-component", + Version: "v1.0.0", + }) + require.NoError(t, err) + + // Create a context that's already cancelled + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + entries := []Entry{ + { + Level: "info", + Message: "test message", + Time: time.Now(), + }, + } + + err = client.IngestLogs(ctx, entries) + require.Error(t, err) + }) + + t.Run("should handle empty entries", func(t *testing.T) { + var receivedBody []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + receivedBody, err = io.ReadAll(r.Body) + require.NoError(t, err) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client, err := NewAPIClient(Config{ + APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: "cluster-123", + Component: "test-component", + Version: "v1.0.0", + }) + require.NoError(t, err) + + err = client.IngestLogs(context.Background(), []Entry{}) + require.NoError(t, err) + + // Decompress gzip body + gzipReader, err := gzip.NewReader(bytes.NewReader(receivedBody)) + require.NoError(t, err) + defer gzipReader.Close() + + decompressedBody, err := io.ReadAll(gzipReader) + require.NoError(t, err) + + var receivedPayload IngestLogsRequest + err = json.Unmarshal(decompressedBody, &receivedPayload) + require.NoError(t, err) + require.Empty(t, receivedPayload.Entries) + }) + + t.Run("should retry on server errors (5xx)", func(t *testing.T) { + attemptCount := 0 + server := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attemptCount++ + if attemptCount < 3 { + // First 2 attempts fail with 500 + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("server error")) + } else { + // Third attempt succeeds + w.WriteHeader(http.StatusOK) + } + })) + defer server.Close() + + client, err := NewAPIClient(Config{ + APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: "cluster-123", + Component: "test-component", + Version: "v1.0.0", + MaxRetries: 3, // Allow up to 3 retries + }) + require.NoError(t, err) + + entries := []Entry{{Level: "info", Message: "test", Time: time.Now()}} + err = client.IngestLogs(context.Background(), entries) + require.NoError(t, err) + require.Equal(t, 3, attemptCount, "should have retried 2 times before succeeding on 3rd attempt") + }) + + t.Run("should not retry on client errors (4xx)", func(t *testing.T) { + attemptCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attemptCount++ + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("bad request")) + })) + defer server.Close() + + client, err := NewAPIClient(Config{ + APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: "cluster-123", + Component: "test-component", + Version: "v1.0.0", + MaxRetries: 3, + }) + require.NoError(t, err) + + entries := []Entry{{Level: "info", Message: "test", Time: time.Now()}} + err = client.IngestLogs(context.Background(), entries) + require.Error(t, err) + require.Equal(t, 1, attemptCount, "should not retry on 4xx errors") + require.Contains(t, err.Error(), "400") + }) + + t.Run("should respect max retries", func(t *testing.T) { + attemptCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attemptCount++ + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("server error")) + })) + defer server.Close() + + client, err := NewAPIClient(Config{ + 
APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: "cluster-123", + Component: "test-component", + Version: "v1.0.0", + MaxRetries: 2, + }) + require.NoError(t, err) + + entries := []Entry{{Level: "info", Message: "test", Time: time.Now()}} + err = client.IngestLogs(context.Background(), entries) + require.Error(t, err) + require.Equal(t, 3, attemptCount, "should try initial + 2 retries = 3 attempts") + require.Contains(t, err.Error(), "500") + }) + + t.Run("should work with disabled retries", func(t *testing.T) { + attemptCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attemptCount++ + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client, err := NewAPIClient(Config{ + APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: "cluster-123", + Component: "test-component", + Version: "v1.0.0", + MaxRetries: -1, // No retries + }) + require.NoError(t, err) + + entries := []Entry{{Level: "info", Message: "test", Time: time.Now()}} + err = client.IngestLogs(context.Background(), entries) + require.Error(t, err) + require.Equal(t, 1, attemptCount, "should only try once with 0 retries") + }) + + t.Run("should compress payload with gzip", func(t *testing.T) { + var compressedSize int + var decompressedSize int + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + compressedBody, err := io.ReadAll(r.Body) + require.NoError(t, err) + compressedSize = len(compressedBody) + + // Decompress to get original size + gzipReader, err := gzip.NewReader(bytes.NewReader(compressedBody)) + require.NoError(t, err) + defer gzipReader.Close() + + decompressedBody, err := io.ReadAll(gzipReader) + require.NoError(t, err) + decompressedSize = len(decompressedBody) + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client, err := NewAPIClient(Config{ + APIBaseURL: server.URL, + APIKey: "test-api-key", + ClusterID: 
"cluster-123", + Component: "test-component", + Version: "v1.0.0", + }) + require.NoError(t, err) + + // Create many entries to make compression effective + entries := make([]Entry, 100) + for i := 0; i < 100; i++ { + entries[i] = Entry{ + Level: "info", + Message: "This is a test message that should compress well due to repetition", + Time: time.Now(), + Fields: map[string]string{ + "field1": "value1", + "field2": "value2", + "field3": "value3", + }, + } + } + + err = client.IngestLogs(context.Background(), entries) + require.NoError(t, err) + + // Verify compression is working (compressed should be significantly smaller) + require.Greater(t, decompressedSize, compressedSize, "decompressed size should be larger than compressed size") + compressionRatio := float64(decompressedSize) / float64(compressedSize) + require.Greater(t, compressionRatio, 2.0, "compression ratio should be at least 2x for repetitive data") + }) +} diff --git a/export_handler.go b/export_handler.go index 67619e1..ecf19f6 100644 --- a/export_handler.go +++ b/export_handler.go @@ -2,32 +2,42 @@ package logging import ( "context" + "errors" + "fmt" "log/slog" + "slices" + + "github.com/castai/logging/components" ) -type ExportHandlerConfig struct { - MinLevel slog.Level // Only export logs for this min log level. - BufferSize int // Logs channel size. +var _ Handler = new(ExportHandler) + +var DefaultExportHandlerConfig = ExportHandlerConfig{ + MinLevel: slog.LevelInfo, } -func NewExportHandler(cfg ExportHandlerConfig) *ExportHandler { - if cfg.BufferSize == 0 { - cfg.BufferSize = 1000 - } +type ExportHandlerConfig struct { + MinLevel slog.Level // Only export logs for this min log level. 
+} +func NewExportHandler(apiClient components.APIClient, cfg ExportHandlerConfig) *ExportHandler { handler := &ExportHandler{ - cfg: cfg, - ch: make(chan slog.Record, cfg.BufferSize), + apiClient: apiClient, + cfg: cfg, + attrs: []slog.Attr{}, + groups: []string{}, } return handler } -// ExportHandler exports logs to separate channel available via Records() +// ExportHandler export logs to remote. type ExportHandler struct { - next slog.Handler - cfg ExportHandlerConfig + cfg ExportHandlerConfig + next slog.Handler + apiClient components.APIClient - ch chan slog.Record + attrs []slog.Attr + groups []string } func (h *ExportHandler) Register(next slog.Handler) slog.Handler { @@ -35,10 +45,6 @@ func (h *ExportHandler) Register(next slog.Handler) slog.Handler { return h } -func (h *ExportHandler) Records() <-chan slog.Record { - return h.ch -} - func (h *ExportHandler) Enabled(ctx context.Context, level slog.Level) bool { if h.next == nil { return true @@ -47,24 +53,48 @@ func (h *ExportHandler) Enabled(ctx context.Context, level slog.Level) bool { } func (h *ExportHandler) Handle(ctx context.Context, record slog.Record) error { + var err error if record.Level >= h.cfg.MinLevel { - select { - case <-ctx.Done(): - return ctx.Err() - case h.ch <- record: - default: - } + err = h.ingestLogs(&record) } if h.next == nil { + return err + } + if handleErr := h.next.Handle(ctx, record); handleErr != nil { + err = errors.Join(err, handleErr) + } + return err +} + +func (h *ExportHandler) ingestLogs(record *slog.Record) error { + if len(record.Message) == 0 { return nil } - return h.next.Handle(ctx, record) + fieldsM := make(map[string]string) + + for _, attr := range h.attrs { + addAttrToMap(fieldsM, attr, h.groups) + } + + record.Attrs(func(attr slog.Attr) bool { + addAttrToMap(fieldsM, attr, h.groups) + return true + }) + + return h.apiClient.IngestLogs(context.Background(), []components.Entry{{ + Level: mapSlogLevel(record.Level), + Message: record.Message, + Time: 
record.Time, + Fields: fieldsM, + }}) } func (h *ExportHandler) WithAttrs(attrs []slog.Attr) slog.Handler { clone := &ExportHandler{ - cfg: h.cfg, - ch: h.ch, + cfg: h.cfg, + apiClient: h.apiClient, + attrs: slices.Clone(slices.Concat(h.attrs, attrs)), + groups: slices.Clone(h.groups), } if h.next != nil { clone.next = h.next.WithAttrs(attrs) @@ -74,11 +104,72 @@ func (h *ExportHandler) WithAttrs(attrs []slog.Attr) slog.Handler { func (h *ExportHandler) WithGroup(name string) slog.Handler { clone := &ExportHandler{ - cfg: h.cfg, - ch: h.ch, + cfg: h.cfg, + apiClient: h.apiClient, + attrs: slices.Clone(h.attrs), + groups: append(slices.Clone(h.groups), name), } if h.next != nil { clone.next = h.next.WithGroup(name) } return clone } + +func addAttrToMap(m map[string]string, attr slog.Attr, groups []string) { + key := attr.Key + if len(groups) > 0 { + prefix := "" + for _, g := range groups { + prefix += g + "." + } + key = prefix + key + } + + // Handle different value types + val := attr.Value + switch val.Kind() { + case slog.KindString: + m[key] = val.String() + case slog.KindInt64: + m[key] = fmt.Sprintf("%d", val.Int64()) + case slog.KindUint64: + m[key] = fmt.Sprintf("%d", val.Uint64()) + case slog.KindFloat64: + m[key] = fmt.Sprintf("%f", val.Float64()) + case slog.KindBool: + m[key] = fmt.Sprintf("%t", val.Bool()) + case slog.KindTime: + m[key] = val.Time().Format("2006-01-02T15:04:05.000Z07:00") + case slog.KindDuration: + m[key] = val.Duration().String() + case slog.KindAny: + // Handle error type specially + if err, ok := val.Any().(error); ok { + m[key] = err.Error() + } else { + m[key] = fmt.Sprintf("%v", val.Any()) + } + case slog.KindGroup: + // Flatten groups into the map + for _, groupAttr := range val.Group() { + addAttrToMap(m, groupAttr, append(groups, key)) + } + default: + m[key] = fmt.Sprintf("%v", val.Any()) + } +} + +func mapSlogLevel(level slog.Level) string { + switch { + case level >= slog.LevelError: + return 
string(components.LogLevelError) + case level >= slog.LevelWarn: + return string(components.LogLevelWarning) + case level >= slog.LevelInfo: + return string(components.LogLevelInfo) + case level >= slog.LevelDebug: + return string(components.LogLevelDebug) + default: + return string(components.LogLevelUnknown) + } +} diff --git a/export_handler_test.go b/export_handler_test.go index 2c494bb..5ab04d0 100644 --- a/export_handler_test.go +++ b/export_handler_test.go @@ -1,34 +1,59 @@ package logging_test import ( - "log/slog" + "context" + "github.com/stretchr/testify/require" "testing" "github.com/castai/logging" - "github.com/stretchr/testify/require" + "github.com/castai/logging/components" ) func TestExportHandler(t *testing.T) { r := require.New(t) - exportHandler := logging.NewExportHandler(logging.ExportHandlerConfig{ - MinLevel: slog.LevelWarn, - BufferSize: 2, - }) + client := &apiClient{} + exportHandler := logging.NewExportHandler(client, logging.DefaultExportHandlerConfig) log := logging.New(exportHandler) - log.Debug("msg1") - log.Info("msg2") - log.Warn("msg3") - log.WithField("k", "v").Error("msg4") - log.WithGroup("g").Error("msg5") + log.Info("msg1") + log.Warn("msg2") + log.WithField("k", "v").Error("msg3") + groupLogger := log.WithGroup("g") + groupLogger.WithField("k2", "v2").Error("msg4") + log.Debug("msg5 should not send") + + r.Len(client.logs, 4) + log1 := client.logs[0] + r.Equal("msg1", log1.Message) + r.Equal("LOG_LEVEL_INFO", log1.Level) + r.Empty(log1.Fields) + r.NotEmpty(log1.Time) + + log2 := client.logs[1] + r.Equal("msg2", log2.Message) + r.Equal("LOG_LEVEL_WARNING", log2.Level) + r.Empty(log2.Fields) + r.NotEmpty(log2.Time) - // Only warn and error should be inside the export channel. 
- msg1 := <-exportHandler.Records() - r.Equal("msg3", msg1.Message) - msg2 := <-exportHandler.Records() - r.Equal("msg4", msg2.Message) + log3 := client.logs[2] + r.Equal("msg3", log3.Message) + r.Equal("LOG_LEVEL_ERROR", log3.Level) + r.Equal(map[string]string{"k": "v"}, log3.Fields) + r.NotEmpty(log3.Time) + + log4 := client.logs[3] + r.Equal("msg4", log4.Message) + r.Equal("LOG_LEVEL_ERROR", log4.Level) + r.Equal(map[string]string{"g.k2": "v2"}, log4.Fields) + r.NotEmpty(log4.Time) +} + +type apiClient struct { + logs []components.Entry +} - // Ensure logs are not blocked. - log.Debug("msg5") +func (a *apiClient) IngestLogs(ctx context.Context, entries []components.Entry) error { + a.logs = append(a.logs, entries...) + return nil } diff --git a/go.mod b/go.mod index 018d0f7..a5f02c0 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/castai/logging -go 1.23.2 +go 1.24.0 require ( github.com/stretchr/testify v1.9.0 @@ -10,5 +10,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + golang.org/x/sync v0.19.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 724cb68..97687f5 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod 
h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/logging.go b/logging.go index ed975c9..923fbb2 100644 --- a/logging.go +++ b/logging.go @@ -112,7 +112,10 @@ func (l *Logger) doLog(lvl slog.Level, msg string, args ...any) { var pcs [1]uintptr runtime.Callers(3, pcs[:]) if len(args) > 0 { - r := slog.NewRecord(time.Now(), lvl, fmt.Sprintf(msg, args...), pcs[0]) + // Workaround to ignore go vet, see https://github.com/golang/go/issues/60529 + var format = fmt.Sprintf + formatted := format(msg, args...) + r := slog.NewRecord(time.Now(), lvl, formatted, pcs[0]) _ = l.Log.Handler().Handle(ctx, r) //nolint:contextcheck } else { r := slog.NewRecord(time.Now(), lvl, msg, pcs[0]) diff --git a/logging_test.go b/logging_test.go index 7076dc4..e40fcce 100644 --- a/logging_test.go +++ b/logging_test.go @@ -4,49 +4,56 @@ import ( "bytes" "context" "errors" - "fmt" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "io" "log/slog" "os" + "os/signal" "testing" - "time" "github.com/castai/logging" - "github.com/stretchr/testify/require" - "golang.org/x/time/rate" + "github.com/castai/logging/components" ) -func TestExampleLogger(t *testing.T) { - text := logging.NewTextHandler(logging.TextHandlerConfig{ - Level: logging.MustParseLevel("INFO"), - Output: os.Stdout, - AddSource: false, - }) - rt := logging.NewRateLimitHandler(logging.RateLimiterHandlerConfig{ - Limit: rate.Limit(100 * time.Millisecond), - Burst: 100, +func ExampleLogger() { + ingestClient, err := components.NewAPIClient(components.Config{ + APIBaseURL: "https://api.cast.ai", + APIKey: "", + ClusterID: "", + Component: "castware", + Version: "", }) - export := logging.NewExportHandler(logging.ExportHandlerConfig{ - MinLevel: logging.MustParseLevel("WARN"), - BufferSize: 1000, - }) - log := logging.New(text, export, rt) + if err != nil { + // Handle err ... 
+ return + } + + var errg errgroup.Group - // Print dropped logs due rate limit. - go logging.PrintDroppedLogs(context.Background(), 5*time.Second, rt, func(level slog.Level, count uint64) { - fmt.Println("dropped lines", level, count) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + ingestClientBatchClient := components.NewBatchClient(ingestClient) + errg.Go(func() error { + return ingestClientBatchClient.Run(ctx) }) - // Export logs to your destination. - go func() { - select { - case log := <-export.Records(): - fmt.Println(log) - } - }() + errg.Go(func() error { + text := logging.NewTextHandler(logging.DefaultTextHandlerConfig) + export := logging.NewExportHandler(ingestClientBatchClient, logging.DefaultExportHandlerConfig) + log := logging.New(text, export) + + // Log messages. + log.Infof("debug message with format value %s", "hello") + log.WithField("component", "agent").Errorf("something failed: %v", "unknown") + + return nil + }) - log.Infof("debug message with format value %s", "hello") - log.WithField("component", "agent").Errorf("something failed: %v", "unknown") + if err := errg.Wait(); err != nil { + // Handle err. + } } func TestLogger(t *testing.T) { diff --git a/ratelimit_handler.go b/ratelimit_handler.go index f0f0e58..f874d69 100644 --- a/ratelimit_handler.go +++ b/ratelimit_handler.go @@ -9,6 +9,13 @@ import ( "golang.org/x/time/rate" ) +var DefaultRateLimitHandlerConfig = RateLimiterHandlerConfig{ + Limit: 100, + Burst: 100, +} + +var _ Handler = new(RateLimitHandler) + type RateLimiterHandlerConfig struct { Limit rate.Limit Burst int diff --git a/text_handler.go b/text_handler.go index c2774d6..1799e37 100644 --- a/text_handler.go +++ b/text_handler.go @@ -7,6 +7,12 @@ import ( "path/filepath" ) +var DefaultTextHandlerConfig = TextHandlerConfig{ + Level: MustParseLevel("INFO"), + Output: os.Stdout, + AddSource: true, +} + type TextHandlerConfig struct { Level slog.Level Output io.Writer