4 changes: 0 additions & 4 deletions app/vlagent/kubernetescollector/collector.go
@@ -216,10 +216,6 @@ func (kc *kubernetesCollector) startReadPodLogs(pod pod) {
}
}

// streamFieldNames is a list of _stream fields.
// Must be synced with getCommonFields.
var streamFieldNames = []string{"kubernetes.container_name", "kubernetes.pod_name", "kubernetes.pod_namespace"}

func getCommonFields(n node, p pod, cs containerStatus) []logstorage.Field {
var fs logstorage.Fields

8 changes: 0 additions & 8 deletions app/vlagent/kubernetescollector/logfile.go
@@ -12,8 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

// The maximum log line size that VictoriaLogs can accept.
@@ -331,9 +329,3 @@ func symlinkExists(path string) bool {
}
return true
}

func reportLogRowSizeExceeded(streamFields []logstorage.Field, size int) {
stream := logstorage.MarshalFieldsToJSON(nil, streamFields)
logger.Warnf("skipping log entry from stream %s: entry size %d bytes exceeds maximum allowed size of %d MiB",
stream, size, maxLogLineSize/1024/1024)
}
72 changes: 40 additions & 32 deletions app/vlagent/kubernetescollector/processor.go
@@ -32,6 +32,9 @@ var (
"See https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field")
extraFields = flag.String("kubernetesCollector.extraFields", "", "Extra fields to add to each log line collected from Kubernetes pods in JSON format. "+
`For example: -kubernetesCollector.extraFields='{"cluster":"cluster-1","env":"production"}'`)
streamFields = flagutil.NewArrayString("kubernetesCollector.streamFields", "Comma-separated list of fields to use as log stream fields for logs ingested from Kubernetes Pods. "+
"Default: kubernetes.container_name,kubernetes.pod_name,kubernetes.pod_namespace. "+
"See: https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields")

includePodLabels = flag.Bool("kubernetesCollector.includePodLabels", true, "Include Pod labels as additional fields in the log entries. "+
"Even this setting is disabled, Pod labels are available for filtering via -kubernetes.excludeFilter flag")
@@ -48,9 +51,6 @@ type logFileProcessor struct {
lr *logstorage.LogRows
tenantID logstorage.TenantID

// streamFieldsLen is the number of stream fields at the beginning of commonFields.
streamFieldsLen int

// commonFields are common fields for the given log file.
commonFields []logstorage.Field

@@ -83,36 +83,16 @@ func newLogFileProcessor(storage insertutil.LogRowsStorage, commonFields []logst
commonFields = fields
}

// move stream fields to the beginning of commonFields

streamFields := make([]logstorage.Field, 0, len(commonFields))
for _, f := range commonFields {
if slices.Contains(streamFieldNames, f.Name) {
streamFields = append(streamFields, f)
}
}
streamFieldsLen := len(streamFields)

fields := streamFields
for _, f := range commonFields {
if !slices.Contains(streamFieldNames, f.Name) {
fields = append(fields, f)
}
}

tenantID := getTenantID()
extraFields := getExtraFields()

sfs := getStreamFields()
efs := getExtraFields()
const defaultMsgValue = "missing _msg field; see https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field"
lr := logstorage.GetLogRows(nil, *ignoreFields, *decolorizeFields, extraFields, defaultMsgValue)
lr := logstorage.GetLogRows(sfs, *ignoreFields, *decolorizeFields, efs, defaultMsgValue)

return &logFileProcessor{
storage: storage,
lr: lr,
tenantID: tenantID,

streamFieldsLen: streamFieldsLen,
commonFields: fields,
storage: storage,
lr: lr,
tenantID: getTenantID(),
commonFields: commonFields,
}
}

@@ -189,7 +169,7 @@ func (lfp *logFileProcessor) joinPartialLines(criLine criLine) (int64, []byte, b
lfp.partialCRIContentSize += len(criLine.content)
if lfp.partialCRIContentSize > maxLogLineSize {
// Discard the too large log line.
reportLogRowSizeExceeded(lfp.commonFields[:lfp.streamFieldsLen], lfp.partialCRIContentSize)
reportLogRowSizeExceeded(lfp.commonFields, lfp.partialCRIContentSize)

lfp.partialCRIContent.Reset()
lfp.partialCRIContentSize = 0
@@ -237,7 +217,7 @@ func (lfp *logFileProcessor) addRow(timestamp int64, fields []logstorage.Field)
lfp.fieldsBuf = append(lfp.fieldsBuf[:0], lfp.commonFields...)
lfp.fieldsBuf = append(lfp.fieldsBuf, fields...)

lfp.lr.MustAdd(lfp.tenantID, timestamp, lfp.fieldsBuf, lfp.streamFieldsLen)
lfp.lr.MustAdd(lfp.tenantID, timestamp, lfp.fieldsBuf, -1)
lfp.storage.MustAddRows(lfp.lr)
lfp.lr.ResetKeepSettings()
}
@@ -581,6 +561,34 @@ func getTimeFields() []string {
return *timeField
}

// defaultStreamFields is a list of default _stream fields.
// Must be synced with getCommonFields.
var defaultStreamFields = []string{"kubernetes.container_name", "kubernetes.pod_name", "kubernetes.pod_namespace"}

func getStreamFields() []string {
if len(*streamFields) == 0 {
return defaultStreamFields
}
return *streamFields
}

var partialCRIContentBufPool bytesutil.ByteBufferPool

var criJSONParserPool fastjson.ParserPool

func reportLogRowSizeExceeded(commonFields []logstorage.Field, size int) {
var pod, namespace string
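// Extract the Pod name and namespace from commonFields by field name for the warning message below.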
for _, f := range commonFields {
if f.Name == "kubernetes.pod_namespace" {
namespace = f.Value
}
if f.Name == "kubernetes.pod_name" {
pod = f.Value
}
if pod != "" && namespace != "" {
break
}
}
logger.Warnf("skipping log entry from Pod %q in namespace %q: entry size of %.2f MiB exceeds the maximum allowed size of %d MiB",
pod, namespace, float64(size)/1024/1024, maxLogLineSize/1024/1024)
}
17 changes: 17 additions & 0 deletions app/vlagent/kubernetescollector/processor_timing_test.go
@@ -41,6 +41,23 @@ func BenchmarkProcessorKlog(b *testing.B) {
benchmarkProcessor(b, in)
}

func BenchmarkProcessorJSON(b *testing.B) {
in := []string{
`2025-12-15T10:34:25.637326000Z stderr F {"message":"Generated self-signed cert","file":"/tmp/apiserver.crt","key":"/tmp/apiserver.key","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:25.872911000Z stderr F {"message":"Adding GroupVersion metrics.k8s.io v1beta1 to ResourceManager","component":"handler","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:25.977313000Z stderr F {"message":"Starting RequestHeaderAuthRequestController","controller":"requestheader","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:25.977317000Z stderr F {"message":"Starting controller","name":"client-ca::kube-system::extension-apiserver-authentication::client-ca-file","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:25.977332000Z stderr F {"message":"Waiting for caches to sync for RequestHeaderAuthRequestController","controller":"shared_informer","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:25.977336000Z stderr F {"message":"Waiting for caches to sync","controller":"client-ca::kube-system::extension-apiserver-authentication::requestheader-client-ca-file","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:25.977526000Z stderr F {"message":"Starting controller","name":"serving-cert::/tmp/apiserver.crt::/tmp/apiserver.key","component":"dynamic_serving","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:25.977591000Z stderr F {"message":"Serving securely on [::]:10250","component":"secure_serving","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:25.977605000Z stderr F {"message":"Starting DynamicServingCertificateController","component":"tlsconfig","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:26.077533000Z stderr F {"message":"Caches are synced for RequestHeaderAuthRequestController","controller":"shared_informer","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
`2025-12-15T10:34:26.948143000Z stderr F {"message":"Failed probe","probe":"metric-storage-ready","error":"no metrics to serve","severity":"ERROR","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
}
benchmarkProcessor(b, in)
}

func benchmarkProcessor(b *testing.B, logLines []string) {
totalSize := 0

1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
@@ -27,6 +27,7 @@ according to the following docs:
* FEATURE: add an ability to delete snapshots via `/internal/partition/snapshot/delete` endpoint. See [these docs](https://docs.victoriametrics.com/victorialogs/#partitions-lifecycle) and [#828](https://github.com/VictoriaMetrics/VictoriaLogs/issues/828).
* FEATURE: [dashboards/internal](https://grafana.com/grafana/dashboards/24585): add Grafana dashboard for monitoring VictoriaLogs internal state. The source of the dashboard is available [here](https://github.com/VictoriaMetrics/VictoriaLogs/blob/master/dashboards/victorialogs-internal.json).
* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add `pattern_match_prefix()` and `pattern_match_suffix()` filters for matching the given pattern at the beginning or at the end of the log field value. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#pattern-match-filter) and [#762](https://github.com/VictoriaMetrics/VictoriaLogs/issues/762).
* FEATURE: [Kubernetes Collector](https://docs.victoriametrics.com/victorialogs/vlagent/#collect-kubernetes-pod-logs): add an ability to change default [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) fields via `-kubernetesCollector.streamFields` command-line flag. See [#998](https://github.com/VictoriaMetrics/VictoriaLogs/issues/998).

* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): properly apply time offset according to the docs for the [`day_range`](https://docs.victoriametrics.com/victorialogs/logsql/#day-range-filter) and [`week_range`](https://docs.victoriametrics.com/victorialogs/logsql/#week-range-filter) filters. Previously `offset 2h` was incorrectly translated into `-02:00` timezone offset instead of the expected `+02:00` timezone offset. See [#796](https://github.com/VictoriaMetrics/VictoriaLogs/issues/796).
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): use local time zone for the VictoriaLogs server when the [`day_range`](https://docs.victoriametrics.com/victorialogs/logsql/#day-range-filter) or [`week_range`](https://docs.victoriametrics.com/victorialogs/logsql/#week-range-filter) filter doesn't contain explicitly specified `offset ...` suffix. This aligns with the behaviour when the timezone information is missing in the [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter).
4 changes: 3 additions & 1 deletion docs/victorialogs/vlagent.md
@@ -134,12 +134,14 @@ To set the default [tenant](http://localhost:1313/victorialogs/#multitenancy) ID
pass `-kubernetesCollector.tenantID` command-line flag with a tenant ID in the format `accountID:projectID`.
See also [multitenancy docs for vlagent](https://docs.victoriametrics.com/victorialogs/vlagent/#multitenancy).

`vlagent` uses the following fields as [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) fields for Kubernetes Pod logs:
By default, `vlagent` uses the following fields as [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) fields for Kubernetes Pod logs:

- `kubernetes.container_name`
- `kubernetes.pod_name`
- `kubernetes.pod_namespace`

Use these fields for fast filtering and grouping of logs in VictoriaLogs via [stream filters](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter).
While it is recommended to keep the default stream fields, you can override them using the `-kubernetesCollector.streamFields` command-line flag.
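
For example, the following minimal sketch makes `vlagent` use only the Pod namespace and Pod name as stream fields; the binary path is a placeholder and the remaining flags (such as the remote write target) are omitted:

```sh
# Illustrative sketch: override the default stream fields for Kubernetes Pod logs.
/path/to/vlagent \
  -kubernetesCollector.streamFields=kubernetes.pod_namespace,kubernetes.pod_name
```

Logs from the resulting streams can then be selected with a [stream filter](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter) such as `_stream:{kubernetes.pod_namespace="my-namespace"}`, where the namespace value is illustrative.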

### Filtering Kubernetes logs
