diff --git a/.github/workflows/go-tests.yml b/.github/workflows/go-tests.yml
index 925293465c1..61626591674 100644
--- a/.github/workflows/go-tests.yml
+++ b/.github/workflows/go-tests.yml
@@ -114,7 +114,7 @@ jobs:
           --health-start-period 30s
 
       victorialogs:
-        image: victoriametrics/victoria-logs:v1.5.0-victorialogs
+        image: victoriametrics/victoria-logs:v1.35.0
         ports:
           - "9428:9428"
         options: >-
diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go
index 81df3f0bd1e..55732777c3c 100644
--- a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go
+++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go
@@ -9,6 +9,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"maps"
 	"net/http"
 	"net/url"
 	"strconv"
@@ -18,7 +19,6 @@ import (
 	"gopkg.in/tomb.v2"
 
 	"github.com/crowdsecurity/crowdsec/pkg/apiclient/useragent"
-	"maps"
 )
 
 type VLClient struct {
@@ -294,65 +294,116 @@ func (lc *VLClient) Ready(ctx context.Context) error {
 	}
 }
 
-// Tail live-tailing for logs
-// See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing
-func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) {
-	t := time.Now().Add(-1 * lc.config.Since)
-	u := lc.getURLFor("select/logsql/tail", map[string]string{
-		"limit": strconv.Itoa(lc.config.Limit),
-		"start": t.Format(time.RFC3339Nano),
-		"query": lc.config.Query,
-	})
-
-	lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t)
-	lc.Logger.Infof("Connecting to %s", u)
-
-	var (
-		resp *http.Response
-		err  error
-	)
+// doTail runs the live-tailing request loop against uri, passing every
+// successful response to readResponse together with c. A failed request, a
+// non-200 status or a read error is logged and retried with a doubling backoff
+// (capped at maxBackoff); the backoff is reset once readResponse reports n > 0.
+// Each request sets start_offset to the time elapsed since the newest
+// datapoint seen so far (initially now minus config.Since). It returns only
+// when uri cannot be parsed, ctx is done, or the tomb is dying.
+func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error {
+	// These control the timing of retry requests when the active connection is lost
+	minBackoff := 100 * time.Millisecond
+	maxBackoff := 10 * time.Second
+	backoffInterval := minBackoff
+
+	// A nil URL would panic on the first vlURL.Query() call, so fail early.
+	vlURL, err := url.Parse(uri)
+	if err != nil {
+		return fmt.Errorf("parsing tail URL %q: %w", uri, err)
+	}
+
+	lastDatapoint := time.Now().Add(-1 * lc.config.Since)
+	firstHandled := false
 
 	for {
-		resp, err = lc.Get(ctx, u)
-		lc.Logger.Tracef("Tail request done: %v | %s", resp, err)
-
-		if err != nil {
-			if errors.Is(err, context.Canceled) {
-				return nil, nil
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-lc.t.Dying():
+			return lc.t.Err()
+		default:
+			if firstHandled {
+				lc.Logger.Debugf("sleeping for %s before retry", backoffInterval)
+				// Wait out the backoff, but wake up immediately on cancellation or
+				// shutdown instead of blocking in time.Sleep.
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-lc.t.Dying():
+					return lc.t.Err()
+				case <-time.After(backoffInterval):
+				}
 			}
+		}
+		firstHandled = true
 
-			if ok := lc.shouldRetry(); !ok {
-				return nil, fmt.Errorf("error tailing logs: %w", err)
+		// callback to increase backoff interval on error
+		backoffError := func() {
+			backoffInterval *= 2
+			if backoffInterval > maxBackoff {
+				backoffInterval = maxBackoff
 			}
+		}
 
-			continue
+		q := vlURL.Query()
+		offset := time.Until(lastDatapoint).Abs()
+		if offset > time.Millisecond {
+			// do not use offset less than a millisecond, as VL does not support it
+			q.Set("start_offset", offset.String())
 		}
+		vlURL.RawQuery = q.Encode()
 
-		break
-	}
+		resp, err := lc.Get(ctx, vlURL.String())
+		if err != nil {
+			lc.Logger.Warnf("error tailing logs: %s", err)
+			backoffError()
+			continue
+		}
 
-	if resp.StatusCode != http.StatusOK {
-		lc.Logger.Warnf("bad HTTP response code for tail request: %d", resp.StatusCode)
-		body, _ := io.ReadAll(resp.Body)
-		resp.Body.Close()
+		// Verify the HTTP response code
+		if resp.StatusCode != http.StatusOK {
+			body, _ := io.ReadAll(resp.Body)
+			resp.Body.Close()
+			lc.Logger.Warnf("bad HTTP response code for tail request: %d, expected: %d; response: %s;", resp.StatusCode, http.StatusOK, body)
+			backoffError()
+			continue
+		}
 
-		if ok := lc.shouldRetry(); !ok {
-			return nil, fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err)
+		// Read all the responses
+		n, largestTime, err := lc.readResponse(ctx, resp, c)
+		if err != nil {
+			lc.Logger.Warnf("error while reading tail response: %s", err)
+			backoffError()
+		} else if n > 0 {
+			// as long as we get results, reset the backoff interval
+			backoffInterval = minBackoff
+			// advance lastDatapoint if the latest result was later
+			if largestTime.After(lastDatapoint) {
+				lastDatapoint = largestTime
+			}
 		}
 	}
+}
+
+// Tail live-tailing for logs
+// See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing
+func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) {
+	t := time.Now().Add(-1 * lc.config.Since)
+	u := lc.getURLFor("select/logsql/tail", map[string]string{
+		"query": lc.config.Query,
+	})
 
-	responseChan := make(chan *Log)
+	c := make(chan *Log)
 
-	lc.t.Go(func() error {
-		_, _, err = lc.readResponse(ctx, resp, responseChan)
-		if err != nil {
-			return fmt.Errorf("error while reading tail response: %w", err)
-		}
+	lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t)
 
-		return nil
+	lc.Logger.Infof("Connecting to %s", u)
+	lc.t.Go(func() error {
+		return lc.doTail(ctx, u, c)
 	})
 
-	return responseChan, nil
+	return c, nil
 }
 
 // QueryRange queries the logs
diff --git a/pkg/acquisition/modules/victorialogs/victorialogs.go b/pkg/acquisition/modules/victorialogs/victorialogs.go
index 5fdfc62f821..ab00e47ad4f 100644
--- a/pkg/acquisition/modules/victorialogs/victorialogs.go
+++ b/pkg/acquisition/modules/victorialogs/victorialogs.go
@@ -92,11 +92,6 @@ func (l *VLSource) UnmarshalConfig(yamlConfig []byte) error {
 		l.Config.Limit = defaultLimit
 	}
 
-	if l.Config.Mode == configuration.TAIL_MODE {
-		l.logger.Infof("Resetting since")
-		l.Config.Since = 0
-	}
-
 	if l.Config.MaxFailureDuration == 0 {
 		l.Config.MaxFailureDuration = 30 * time.Second
 	}
@@ -298,7 +293,7 @@ func (l *VLSource) StreamingAcquisition(ctx context.Context, out chan types.Even
 	}
 
 	lctx, clientCancel := context.WithCancel(ctx)
-	//Don't defer clientCancel(), the client outlives this function call
+	// Don't defer clientCancel(), the client outlives this function call
 
 	t.Go(func() error {
 		<-t.Dying()
diff --git a/pkg/acquisition/modules/victorialogs/victorialogs_test.go b/pkg/acquisition/modules/victorialogs/victorialogs_test.go
index 8d1a90e3ce0..c4df7a68dd1 100644
--- a/pkg/acquisition/modules/victorialogs/victorialogs_test.go
+++ b/pkg/acquisition/modules/victorialogs/victorialogs_test.go
@@ -3,6 +3,7 @@ package victorialogs_test
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"math/rand"
@@ -461,6 +462,11 @@ query: >
 	}
 
 	out := make(chan types.Event, 10)
+	go func() {
+		for {
+			<-out
+		}
+	}()
 
 	vlTomb := &tomb.Tomb{}
 
@@ -479,7 +485,7 @@ query: >
 	vlTomb.Kill(nil)
 
 	err = vlTomb.Wait()
-	if err != nil {
+	if err != nil && !errors.Is(err, context.Canceled) {
 		t.Fatalf("Unexpected error : %s", err)
 	}
 }