From 4dbeffbe0ce11983ef4c8f245d4c05e13c02dd75 Mon Sep 17 00:00:00 2001 From: thebondo Date: Fri, 16 May 2025 15:22:49 +0000 Subject: [PATCH 1/9] update the client to retry the connection If the original HTTP get for the tail endpoint is successful, but then the connection is lost, no retry is done. I updated the Tail method to also retry in this case. --- .../internal/vlclient/vl_client.go | 76 ++++++++++++++----- 1 file changed, 57 insertions(+), 19 deletions(-) diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go index b091b25c163..6d7aad9feb2 100644 --- a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go @@ -286,9 +286,8 @@ func (lc *VLClient) Ready(ctx context.Context) error { } } -// Tail live-tailing for logs -// See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing -func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) { +func (lc *VLClient) getTailResponse(ctx context.Context) (*http.Response, error) { + t := time.Now().Add(-1 * lc.config.Since) u := lc.getURLFor("select/logsql/tail", map[string]string{ "limit": strconv.Itoa(lc.config.Limit), @@ -304,10 +303,37 @@ func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) { err error ) - for { - resp, err = lc.Get(ctx, u) - lc.Logger.Tracef("Tail request done: %v | %s", resp, err) + resp, err = lc.Get(ctx, u) + lc.Logger.Tracef("Tail request done: %v | %s", resp, err) + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + lc.Logger.Warnf("bad HTTP response code for tail request: %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if ok := lc.shouldRetry(); !ok { + return nil, fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err) + } + } + + return resp, nil +} + +func (lc *VLClient) 
getTailResponseWithRetry(ctx context.Context) (*http.Response, error) { + + retryInterval := 2*time.Second + + var ( + resp *http.Response + err error + ) + + for { + resp, err = lc.getTailResponse(ctx) if err != nil { if errors.Is(err, context.Canceled) { return nil, nil @@ -317,30 +343,42 @@ func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) { return nil, fmt.Errorf("error tailing logs: %w", err) } + // Wait a little bit to be nice + <- time.After(retryInterval) continue } - break + // we got a successful response, reset the retry state + lc.failStart = time.Time{} + return resp, nil } +} - if resp.StatusCode != http.StatusOK { - lc.Logger.Warnf("bad HTTP response code for tail request: %d", resp.StatusCode) - body, _ := io.ReadAll(resp.Body) - resp.Body.Close() +// Tail live-tailing for logs +// See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing +func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) { - if ok := lc.shouldRetry(); !ok { - return nil, fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err) - } + resp, err := lc.getTailResponseWithRetry(ctx) + if resp == nil { + return nil, err } responseChan := make(chan *Log) - lc.t.Go(func() error { - _, _, err = lc.readResponse(ctx, resp, responseChan) - if err != nil { - return fmt.Errorf("error while reading tail response: %w", err) - } + for { + _, _, err = lc.readResponse(ctx, resp, responseChan) + if err != nil { + lc.Logger.Warnf("error while reading tail response: %w", err) + } else if ctx.Err() != nil { + return nil + } + + resp, err = lc.getTailResponseWithRetry(ctx) + if resp == nil { + return err + } + } return nil }) From 106e0db7362bb74f2715955d3e100b066f4a7c57 Mon Sep 17 00:00:00 2001 From: thebondo Date: Fri, 16 May 2025 19:16:16 +0000 Subject: [PATCH 2/9] update fixes to be more consistent with the rest of the code Upon review, the added code was pretty different in the approach used to keep retrying compared to the 
approach for the QueryRange method. I updated the method to create a new doTail that has the same style as doQueryRange and updated Tail to use it. This has the following effects: - doTail will keep trying after losing a connection - the retry interval will grow (with an upper limit) and shrink (with a lower limit) as connections are made and broken - the time in the request is updated to avoid overlapping with previous data that was returned (missing in the first fix) --- .../internal/vlclient/vl_client.go | 124 +++++++----------- 1 file changed, 46 insertions(+), 78 deletions(-) diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go index 6d7aad9feb2..8e1f9a2897c 100644 --- a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go @@ -286,103 +286,71 @@ func (lc *VLClient) Ready(ctx context.Context) error { } } -func (lc *VLClient) getTailResponse(ctx context.Context) (*http.Response, error) { - - t := time.Now().Add(-1 * lc.config.Since) - u := lc.getURLFor("select/logsql/tail", map[string]string{ - "limit": strconv.Itoa(lc.config.Limit), - "start": t.Format(time.RFC3339Nano), - "query": lc.config.Query, - }) - - lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t) - lc.Logger.Infof("Connecting to %s", u) - - var ( - resp *http.Response - err error - ) - - resp, err = lc.Get(ctx, u) - lc.Logger.Tracef("Tail request done: %v | %s", resp, err) - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - lc.Logger.Warnf("bad HTTP response code for tail request: %d", resp.StatusCode) - body, _ := io.ReadAll(resp.Body) - resp.Body.Close() - - if ok := lc.shouldRetry(); !ok { - return nil, fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err) - } - } - - return resp, nil -} - -func (lc *VLClient) getTailResponseWithRetry(ctx 
context.Context) (*http.Response, error) { - - retryInterval := 2*time.Second +func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { + lc.currentTickerInterval = 100 * time.Millisecond + ticker := time.NewTicker(lc.currentTickerInterval) - var ( - resp *http.Response - err error - ) + defer ticker.Stop() for { - resp, err = lc.getTailResponse(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return nil, nil + select { + case <-ctx.Done(): + return ctx.Err() + case <-lc.t.Dying(): + return lc.t.Err() + case <-ticker.C: + // Attempt the HTTP request + resp, err := lc.Get(ctx, uri) + if err != nil { + lc.Logger.Warnf("error tailing logs: %w", err) + lc.increaseTicker(ticker) + continue } - if ok := lc.shouldRetry(); !ok { - return nil, fmt.Errorf("error tailing logs: %w", err) + // Verify the HTTP response code + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + lc.Logger.Warnf("bad HTTP response code for tail request: %d: %s: %w", resp.StatusCode, body, err) + lc.increaseTicker(ticker) + continue } - // Wait a little bit to be nice - <- time.After(retryInterval) - continue + n, largestTime, err := lc.readResponse(ctx, resp, c) + if err != nil { + lc.Logger.Warnf("error while reading tail response: %w", err) + lc.increaseTicker(ticker) + } else if n > 0 { + // as long as we get results, we keep lowest ticker + lc.decreaseTicker(ticker) + uri = updateURI(uri, largestTime) + } else { + lc.increaseTicker(ticker) + } } - - // we got a successful response, reset the retry state - lc.failStart = time.Time{} - return resp, nil } } // Tail live-tailing for logs // See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) { + t := time.Now().Add(-1 * lc.config.Since) + u := lc.getURLFor("select/logsql/tail", map[string]string{ + "limit": strconv.Itoa(lc.config.Limit), + "start": t.Format(time.RFC3339Nano), + 
"query": lc.config.Query, + }) - resp, err := lc.getTailResponseWithRetry(ctx) - if resp == nil { - return nil, err - } - - responseChan := make(chan *Log) - lc.t.Go(func() error { + c := make(chan *Log) - for { - _, _, err = lc.readResponse(ctx, resp, responseChan) - if err != nil { - lc.Logger.Warnf("error while reading tail response: %w", err) - } else if ctx.Err() != nil { - return nil - } + lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t) - resp, err = lc.getTailResponseWithRetry(ctx) - if resp == nil { - return err - } - } - return nil + lc.Logger.Infof("Connecting to %s", u) + lc.t.Go(func() error { + return lc.doTail(ctx, u, c) }) - return responseChan, nil + return c, nil } // QueryRange queries the logs From b1ff10775db016694f7c19beb4070aa8bd07181d Mon Sep 17 00:00:00 2001 From: thebondo Date: Fri, 30 May 2025 13:28:00 +0000 Subject: [PATCH 3/9] Updated tail method The use of the ticker was unnecessary. I updated doTail to use a backoff interval with time.After. --- .../internal/vlclient/vl_client.go | 69 +++++++++++-------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go index 8e1f9a2897c..e600f8207d1 100644 --- a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go @@ -287,47 +287,56 @@ func (lc *VLClient) Ready(ctx context.Context) error { } func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { - lc.currentTickerInterval = 100 * time.Millisecond - ticker := time.NewTicker(lc.currentTickerInterval) - defer ticker.Stop() + minBackoff := 100 * time.Millisecond + maxBackoff := 10 * time.Second + backoffInterval := minBackoff for { + // Wait for the backoff interval, respect context ending as well + // Putting this first keeps the logic simpler + if backoffInterval > maxBackoff { + 
backoffInterval = maxBackoff + } select { case <-ctx.Done(): return ctx.Err() case <-lc.t.Dying(): return lc.t.Err() - case <-ticker.C: - // Attempt the HTTP request - resp, err := lc.Get(ctx, uri) - if err != nil { - lc.Logger.Warnf("error tailing logs: %w", err) - lc.increaseTicker(ticker) - continue - } + case <-time.After(backoffInterval): + // now we can make the next request + } - // Verify the HTTP response code - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - resp.Body.Close() - lc.Logger.Warnf("bad HTTP response code for tail request: %d: %s: %w", resp.StatusCode, body, err) - lc.increaseTicker(ticker) - continue - } + // Make the HTTP request + resp, err := lc.Get(ctx, uri) + if err != nil { + lc.Logger.Warnf("error tailing logs: %w", err) + backoffInterval *= 2 + continue + } - n, largestTime, err := lc.readResponse(ctx, resp, c) - if err != nil { - lc.Logger.Warnf("error while reading tail response: %w", err) - lc.increaseTicker(ticker) - } else if n > 0 { - // as long as we get results, we keep lowest ticker - lc.decreaseTicker(ticker) - uri = updateURI(uri, largestTime) - } else { - lc.increaseTicker(ticker) - } + // Verify the HTTP response code + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + lc.Logger.Warnf("bad HTTP response code for tail request: %d: %s: %w", resp.StatusCode, body, err) + backoffInterval *= 2 + continue + } + + // Read all the responses + n, largestTime, err := lc.readResponse(ctx, resp, c) + if err != nil { + lc.Logger.Warnf("error while reading tail response: %w", err) + backoffInterval *= 2 + } else if n > 0 { + // as long as we get results, reset the backoff interval + backoffInterval = minBackoff + uri = updateURI(uri, largestTime) + } else { + backoffInterval *= 2 } + } } From 9e14e2f5720c0ade0a4573318fd5457836073389 Mon Sep 17 00:00:00 2001 From: thebondo Date: Thu, 17 Jul 2025 07:01:24 +0000 Subject: [PATCH 4/9] Fixed tail query to use the correct 
parameters. The tail query endpoint does not support start, but start_offset. I updated the doTail method to use this parameter, and calculate the required value each time the query is attempted based on the desired start time for the results returned from the query. --- .../internal/vlclient/vl_client.go | 36 +++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go index 72aad54cecb..f9ec96e7094 100644 --- a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go @@ -64,6 +64,28 @@ func updateURI(uri string, newStart time.Time) string { return u.String() } +// newStart should be the desired start time for the tail query +// start_offset is computed from this +func updateTailURI(uri string, newStart time.Time) string { + u, _ := url.Parse(uri) + queryParams := u.Query() + + queryTime := time.Now() + startOffset := 5000 + gracePeriod := 50*time.Millisecond + + if !newStart.IsZero() { + if newStart.Before(queryTime) { + startOffset = int((queryTime.Sub(newStart) + gracePeriod).Milliseconds()) + } + } else { + } + queryParams.Set("start_offset", strconv.Itoa(startOffset)) + + u.RawQuery = queryParams.Encode() + + return u.String() +} func (lc *VLClient) SetTomb(t *tomb.Tomb) { lc.t = t } @@ -289,9 +311,11 @@ func (lc *VLClient) Ready(ctx context.Context) error { func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { + // These control how the timing of requests when the active connection is lost minBackoff := 100 * time.Millisecond maxBackoff := 10 * time.Second backoffInterval := minBackoff + queryStart := time.Now() for { // Wait for the backoff interval, respect context ending as well @@ -308,6 +332,10 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { // now we 
can make the next request } + // update start_offset in the request + // This needs to be done just before the query is made, since + // the desired offset depends on the query time. + uri = updateTailURI(uri, queryStart) // Make the HTTP request resp, err := lc.Get(ctx, uri) if err != nil { @@ -333,7 +361,10 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { } else if n > 0 { // as long as we get results, reset the backoff interval backoffInterval = minBackoff - uri = updateURI(uri, largestTime) + // update the queryStart time if the latest result was later + if largestTime.After(queryStart) { + queryStart = largestTime + } } else { backoffInterval *= 2 } @@ -346,8 +377,7 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) { t := time.Now().Add(-1 * lc.config.Since) u := lc.getURLFor("select/logsql/tail", map[string]string{ - "limit": strconv.Itoa(lc.config.Limit), - "start": t.Format(time.RFC3339Nano), + "start_offset": "0", "query": lc.config.Query, }) From 1bb425cae852691f218ec83b7a7b02b145f7b6c7 Mon Sep 17 00:00:00 2001 From: Zakhar Bessarab Date: Thu, 25 Sep 2025 19:55:58 +0400 Subject: [PATCH 5/9] pkg/acquisition/modules/victorialogs: rework handling of backoff and "since" parameter Simplify code of backoff handling in order to reduce duplication. Also do not sleep when making the first request in order to avoid artificial delay for startup. While at it, implement proper handling of "since" parameter. Previously, it was reset to 0 and ignored in "tail" mode since VL API did not support tailing results from the past. Implemented full support to use "since" value when performing initial tailing and keeping track of last seen log item in order to not miss log lines when retrying the request.
--- .../internal/vlclient/vl_client.go | 81 ++++++++----------- .../modules/victorialogs/victorialogs.go | 7 +- 2 files changed, 34 insertions(+), 54 deletions(-) diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go index f9ec96e7094..8be6608d126 100644 --- a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "io" + "maps" "net/http" "net/url" "strconv" @@ -18,7 +19,6 @@ import ( "gopkg.in/tomb.v2" "github.com/crowdsecurity/crowdsec/pkg/apiclient/useragent" - "maps" ) type VLClient struct { @@ -64,28 +64,6 @@ func updateURI(uri string, newStart time.Time) string { return u.String() } -// newStart should be the desired start time for the tail query -// start_offset is computed from this -func updateTailURI(uri string, newStart time.Time) string { - u, _ := url.Parse(uri) - queryParams := u.Query() - - queryTime := time.Now() - startOffset := 5000 - gracePeriod := 50*time.Millisecond - - if !newStart.IsZero() { - if newStart.Before(queryTime) { - startOffset = int((queryTime.Sub(newStart) + gracePeriod).Milliseconds()) - } - } else { - } - queryParams.Set("start_offset", strconv.Itoa(startOffset)) - - u.RawQuery = queryParams.Encode() - - return u.String() -} func (lc *VLClient) SetTomb(t *tomb.Tomb) { lc.t = t } @@ -310,37 +288,48 @@ func (lc *VLClient) Ready(ctx context.Context) error { } func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { - // These control how the timing of requests when the active connection is lost minBackoff := 100 * time.Millisecond maxBackoff := 10 * time.Second backoffInterval := minBackoff - queryStart := time.Now() + vlURL, _ := url.Parse(uri) + lastDatapoint := time.Now().Add(-1 * lc.config.Since) + firstHandled := false for { - // Wait for the backoff interval, respect context ending 
as well - // Putting this first keeps the logic simpler - if backoffInterval > maxBackoff { - backoffInterval = maxBackoff - } select { case <-ctx.Done(): return ctx.Err() case <-lc.t.Dying(): return lc.t.Err() - case <-time.After(backoffInterval): - // now we can make the next request + default: + if firstHandled { + lc.Logger.Debugf("sleeping for %s before retry", backoffInterval) + time.Sleep(backoffInterval) + } + } + firstHandled = true + + // callback to increase backoff interval on error + backoffError := func() { + backoffInterval *= 2 + if backoffInterval > maxBackoff { + backoffInterval = maxBackoff + } } - // update start_offset in the request - // This needs to be done just before the query is made, since - // the desired offset depends on the query time. - uri = updateTailURI(uri, queryStart) - // Make the HTTP request - resp, err := lc.Get(ctx, uri) + q := vlURL.Query() + offset := lastDatapoint.Sub(time.Now()).Abs() + if offset > time.Millisecond { + // do not use offset less than a millisecond, as VL does not support it + q.Set("start_offset", offset.String()) + } + vlURL.RawQuery = q.Encode() + + resp, err := lc.Get(ctx, vlURL.String()) if err != nil { lc.Logger.Warnf("error tailing logs: %w", err) - backoffInterval *= 2 + backoffError() continue } @@ -348,8 +337,8 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) resp.Body.Close() - lc.Logger.Warnf("bad HTTP response code for tail request: %d: %s: %w", resp.StatusCode, body, err) - backoffInterval *= 2 + lc.Logger.Warnf("bad HTTP response code for tail request: %d, expected: %d; response: %s;", resp.StatusCode, http.StatusOK, body) + backoffError() continue } @@ -357,18 +346,15 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { n, largestTime, err := lc.readResponse(ctx, resp, c) if err != nil { lc.Logger.Warnf("error while reading tail response: %w", err) - 
backoffInterval *= 2 + backoffError() } else if n > 0 { // as long as we get results, reset the backoff interval backoffInterval = minBackoff // update the queryStart time if the latest result was later - if largestTime.After(queryStart) { - queryStart = largestTime + if largestTime.After(lastDatapoint) { + lastDatapoint = largestTime } - } else { - backoffInterval *= 2 } - } } @@ -377,7 +363,6 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) { t := time.Now().Add(-1 * lc.config.Since) u := lc.getURLFor("select/logsql/tail", map[string]string{ - "start_offset": "0", "query": lc.config.Query, }) diff --git a/pkg/acquisition/modules/victorialogs/victorialogs.go b/pkg/acquisition/modules/victorialogs/victorialogs.go index c6bb3b320ba..b94e0fae010 100644 --- a/pkg/acquisition/modules/victorialogs/victorialogs.go +++ b/pkg/acquisition/modules/victorialogs/victorialogs.go @@ -94,11 +94,6 @@ func (l *VLSource) UnmarshalConfig(yamlConfig []byte) error { l.Config.Limit = defaultLimit } - if l.Config.Mode == configuration.TAIL_MODE { - l.logger.Infof("Resetting since") - l.Config.Since = 0 - } - if l.Config.MaxFailureDuration == 0 { l.Config.MaxFailureDuration = 30 * time.Second } @@ -300,7 +295,7 @@ func (l *VLSource) StreamingAcquisition(ctx context.Context, out chan types.Even } lctx, clientCancel := context.WithCancel(ctx) - //Don't defer clientCancel(), the client outlives this function call + // Don't defer clientCancel(), the client outlives this function call t.Go(func() error { <-t.Dying() From 6b7e9f2ecf62c3d6d1dd8f6a49d02d842defe663 Mon Sep 17 00:00:00 2001 From: Zakhar Bessarab Date: Sat, 27 Sep 2025 20:17:08 +0400 Subject: [PATCH 6/9] pkg/acquisition/modules/victorialogs: do not use %w for error logging --- .../modules/victorialogs/internal/vlclient/vl_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go index 8be6608d126..211f75e4bdd 100644 --- a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go @@ -328,7 +328,7 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { resp, err := lc.Get(ctx, vlURL.String()) if err != nil { - lc.Logger.Warnf("error tailing logs: %w", err) + lc.Logger.Warnf("error tailing logs: %s", err) backoffError() continue } @@ -345,7 +345,7 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { // Read all the responses n, largestTime, err := lc.readResponse(ctx, resp, c) if err != nil { - lc.Logger.Warnf("error while reading tail response: %w", err) + lc.Logger.Warnf("error while reading tail response: %s", err) backoffError() } else if n > 0 { // as long as we get results, reset the backoff interval From b0bebb8f6f38ecd372801b46f2cd086cda9c1c28 Mon Sep 17 00:00:00 2001 From: Zakhar Bessarab Date: Sat, 27 Sep 2025 20:40:33 +0400 Subject: [PATCH 7/9] ci: update version of VictoriaLogs to the latest --- .github/workflows/go-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go-tests.yml b/.github/workflows/go-tests.yml index 9d58601ac93..01ab2691c88 100644 --- a/.github/workflows/go-tests.yml +++ b/.github/workflows/go-tests.yml @@ -114,7 +114,7 @@ jobs: --health-start-period 30s victorialogs: - image: victoriametrics/victoria-logs:v1.5.0-victorialogs + image: victoriametrics/victoria-logs:v1.35.0 ports: - "9428:9428" options: >- From 7c9d6ff3d95fd421e0c4cebd73ec85a3902675a1 Mon Sep 17 00:00:00 2001 From: Zakhar Bessarab Date: Sat, 27 Sep 2025 20:58:02 +0400 Subject: [PATCH 8/9] pkg/acquisition/modules/victorialogs: fix flaky test - read out channel in order to avoid blocking on client - do not treat context.Canceled
as a fatal error as client propagates it --- pkg/acquisition/modules/victorialogs/victorialogs_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/acquisition/modules/victorialogs/victorialogs_test.go b/pkg/acquisition/modules/victorialogs/victorialogs_test.go index 018f19a71b6..8da141c0d00 100644 --- a/pkg/acquisition/modules/victorialogs/victorialogs_test.go +++ b/pkg/acquisition/modules/victorialogs/victorialogs_test.go @@ -3,6 +3,7 @@ package victorialogs_test import ( "bytes" "context" + "errors" "fmt" "io" "math/rand" @@ -448,6 +449,11 @@ query: > } out := make(chan types.Event, 10) + go func() { + for { + <-out + } + }() vlTomb := &tomb.Tomb{} @@ -466,7 +472,7 @@ query: > vlTomb.Kill(nil) err = vlTomb.Wait() - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("Unexpected error : %s", err) } } From 7e679eefb7e3ccf3eedab8e5312700600036af7d Mon Sep 17 00:00:00 2001 From: Zakhar Bessarab Date: Sat, 27 Sep 2025 21:09:32 +0400 Subject: [PATCH 9/9] pkg/acquisition/modules/victorialogs: make linter happy --- .../modules/victorialogs/internal/vlclient/vl_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go index 211f75e4bdd..001e137fe31 100644 --- a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go @@ -319,7 +319,7 @@ func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error { } q := vlURL.Query() - offset := lastDatapoint.Sub(time.Now()).Abs() + offset := time.Until(lastDatapoint).Abs() if offset > time.Millisecond { // do not use offset less than a millisecond, as VL does not support it q.Set("start_offset", offset.String())