Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ jobs:
--health-start-period 30s

victorialogs:
image: victoriametrics/victoria-logs:v1.5.0-victorialogs
image: victoriametrics/victoria-logs:v1.35.0
ports:
- "9428:9428"
options: >-
Expand Down
116 changes: 73 additions & 43 deletions pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"maps"
"net/http"
"net/url"
"strconv"
Expand All @@ -18,7 +19,6 @@ import (
"gopkg.in/tomb.v2"

"github.com/crowdsecurity/crowdsec/pkg/apiclient/useragent"
"maps"
)

type VLClient struct {
Expand Down Expand Up @@ -294,65 +294,95 @@ func (lc *VLClient) Ready(ctx context.Context) error {
}
}

// Tail starts live-tailing of logs.
// See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing
func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) {
t := time.Now().Add(-1 * lc.config.Since)
u := lc.getURLFor("select/logsql/tail", map[string]string{
"limit": strconv.Itoa(lc.config.Limit),
"start": t.Format(time.RFC3339Nano),
"query": lc.config.Query,
})

lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t)
lc.Logger.Infof("Connecting to %s", u)

var (
resp *http.Response
err error
)
func (lc *VLClient) doTail(ctx context.Context, uri string, c chan *Log) error {
// These control the timing of retry requests when the active connection is lost
minBackoff := 100 * time.Millisecond
maxBackoff := 10 * time.Second
backoffInterval := minBackoff
vlURL, _ := url.Parse(uri)
lastDatapoint := time.Now().Add(-1 * lc.config.Since)

firstHandled := false
for {
resp, err = lc.Get(ctx, u)
lc.Logger.Tracef("Tail request done: %v | %s", resp, err)

if err != nil {
if errors.Is(err, context.Canceled) {
return nil, nil
select {
case <-ctx.Done():
return ctx.Err()
case <-lc.t.Dying():
return lc.t.Err()
default:
if firstHandled {
lc.Logger.Debugf("sleeping for %s before retry", backoffInterval)
time.Sleep(backoffInterval)
}
}
firstHandled = true

if ok := lc.shouldRetry(); !ok {
return nil, fmt.Errorf("error tailing logs: %w", err)
// callback to increase backoff interval on error
backoffError := func() {
backoffInterval *= 2
if backoffInterval > maxBackoff {
backoffInterval = maxBackoff
}
}

continue
q := vlURL.Query()
offset := time.Until(lastDatapoint).Abs()
if offset > time.Millisecond {
// do not use offset less than a millisecond, as VL does not support it
q.Set("start_offset", offset.String())
}
vlURL.RawQuery = q.Encode()

break
}
resp, err := lc.Get(ctx, vlURL.String())
if err != nil {
lc.Logger.Warnf("error tailing logs: %s", err)
backoffError()
continue
}

if resp.StatusCode != http.StatusOK {
lc.Logger.Warnf("bad HTTP response code for tail request: %d", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
// Verify the HTTP response code
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
lc.Logger.Warnf("bad HTTP response code for tail request: %d, expected: %d; response: %s;", resp.StatusCode, http.StatusOK, body)
backoffError()
continue
}

if ok := lc.shouldRetry(); !ok {
return nil, fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err)
// Read all the responses
n, largestTime, err := lc.readResponse(ctx, resp, c)
if err != nil {
lc.Logger.Warnf("error while reading tail response: %s", err)
backoffError()
} else if n > 0 {
// as long as we get results, reset the backoff interval
backoffInterval = minBackoff
// advance lastDatapoint if the latest result was later
if largestTime.After(lastDatapoint) {
lastDatapoint = largestTime
}
}
}
}

// Tail starts live-tailing of logs.
// See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing
func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) {
t := time.Now().Add(-1 * lc.config.Since)
u := lc.getURLFor("select/logsql/tail", map[string]string{
"query": lc.config.Query,
})

responseChan := make(chan *Log)
c := make(chan *Log)

lc.t.Go(func() error {
_, _, err = lc.readResponse(ctx, resp, responseChan)
if err != nil {
return fmt.Errorf("error while reading tail response: %w", err)
}
lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t)

return nil
lc.Logger.Infof("Connecting to %s", u)
lc.t.Go(func() error {
return lc.doTail(ctx, u, c)
})

return responseChan, nil
return c, nil
}

// QueryRange queries the logs
Expand Down
7 changes: 1 addition & 6 deletions pkg/acquisition/modules/victorialogs/victorialogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ func (l *VLSource) UnmarshalConfig(yamlConfig []byte) error {
l.Config.Limit = defaultLimit
}

if l.Config.Mode == configuration.TAIL_MODE {
l.logger.Infof("Resetting since")
l.Config.Since = 0
}

if l.Config.MaxFailureDuration == 0 {
l.Config.MaxFailureDuration = 30 * time.Second
}
Expand Down Expand Up @@ -298,7 +293,7 @@ func (l *VLSource) StreamingAcquisition(ctx context.Context, out chan types.Even
}

lctx, clientCancel := context.WithCancel(ctx)
//Don't defer clientCancel(), the client outlives this function call
// Don't defer clientCancel(), the client outlives this function call

t.Go(func() error {
<-t.Dying()
Expand Down
8 changes: 7 additions & 1 deletion pkg/acquisition/modules/victorialogs/victorialogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package victorialogs_test
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -461,6 +462,11 @@ query: >
}

out := make(chan types.Event, 10)
go func() {
for {
<-out
}
}()

vlTomb := &tomb.Tomb{}

Expand All @@ -479,7 +485,7 @@ query: >
vlTomb.Kill(nil)

err = vlTomb.Wait()
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("Unexpected error : %s", err)
}
}