Skip to content

Commit a5f8a17

Browse files
committed
app/vlagent/kubernetescollector: add an ability to change default _stream fields
1 parent 22ab684 commit a5f8a17

File tree

6 files changed

+61
-45
lines changed

6 files changed

+61
-45
lines changed

app/vlagent/kubernetescollector/collector.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,6 @@ func (kc *kubernetesCollector) startReadPodLogs(pod pod) {
216216
}
217217
}
218218

219-
// streamFieldNames is a list of _stream fields.
220-
// Must be synced with getCommonFields.
221-
var streamFieldNames = []string{"kubernetes.container_name", "kubernetes.pod_name", "kubernetes.pod_namespace"}
222-
223219
func getCommonFields(n node, p pod, cs containerStatus) []logstorage.Field {
224220
var fs logstorage.Fields
225221

app/vlagent/kubernetescollector/logfile.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
1313
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
1414
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
15-
16-
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
1715
)
1816

1917
// The maximum log line size that VictoriaLogs can accept.
@@ -331,9 +329,3 @@ func symlinkExists(path string) bool {
331329
}
332330
return true
333331
}
334-
335-
func reportLogRowSizeExceeded(streamFields []logstorage.Field, size int) {
336-
stream := logstorage.MarshalFieldsToJSON(nil, streamFields)
337-
logger.Warnf("skipping log entry from stream %s: entry size %d bytes exceeds maximum allowed size of %d MiB",
338-
stream, size, maxLogLineSize/1024/1024)
339-
}

app/vlagent/kubernetescollector/processor.go

Lines changed: 40 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ var (
3232
"See https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field")
3333
extraFields = flag.String("kubernetesCollector.extraFields", "", "Extra fields to add to each log line collected from Kubernetes pods in JSON format. "+
3434
`For example: -kubernetesCollector.extraFields='{"cluster":"cluster-1","env":"production"}'`)
35+
streamFields = flagutil.NewArrayString("kubernetesCollector.streamFields", "Comma-separated list of fields to use as log stream fields for logs ingested from Kubernetes Pods. "+
36+
"Default: kubernetes.container_name,kubernetes.pod_name,kubernetes.pod_namespace. "+
37+
"See: https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields")
3538

3639
includePodLabels = flag.Bool("kubernetesCollector.includePodLabels", true, "Include Pod labels as additional fields in the log entries. "+
3740
"Even this setting is disabled, Pod labels are available for filtering via -kubernetes.excludeFilter flag")
@@ -48,9 +51,6 @@ type logFileProcessor struct {
4851
lr *logstorage.LogRows
4952
tenantID logstorage.TenantID
5053

51-
// streamFieldsLen is the number of stream fields at the beginning of commonFields.
52-
streamFieldsLen int
53-
5454
// commonFields are common fields for the given log file.
5555
commonFields []logstorage.Field
5656

@@ -83,36 +83,16 @@ func newLogFileProcessor(storage insertutil.LogRowsStorage, commonFields []logst
8383
commonFields = fields
8484
}
8585

86-
// move stream fields to the beginning of commonFields
87-
88-
streamFields := make([]logstorage.Field, 0, len(commonFields))
89-
for _, f := range commonFields {
90-
if slices.Contains(streamFieldNames, f.Name) {
91-
streamFields = append(streamFields, f)
92-
}
93-
}
94-
streamFieldsLen := len(streamFields)
95-
96-
fields := streamFields
97-
for _, f := range commonFields {
98-
if !slices.Contains(streamFieldNames, f.Name) {
99-
fields = append(fields, f)
100-
}
101-
}
102-
103-
tenantID := getTenantID()
104-
extraFields := getExtraFields()
105-
86+
sfs := getStreamFields()
87+
efs := getExtraFields()
10688
const defaultMsgValue = "missing _msg field; see https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field"
107-
lr := logstorage.GetLogRows(nil, *ignoreFields, *decolorizeFields, extraFields, defaultMsgValue)
89+
lr := logstorage.GetLogRows(sfs, *ignoreFields, *decolorizeFields, efs, defaultMsgValue)
10890

10991
return &logFileProcessor{
110-
storage: storage,
111-
lr: lr,
112-
tenantID: tenantID,
113-
114-
streamFieldsLen: streamFieldsLen,
115-
commonFields: fields,
92+
storage: storage,
93+
lr: lr,
94+
tenantID: getTenantID(),
95+
commonFields: commonFields,
11696
}
11797
}
11898

@@ -189,7 +169,7 @@ func (lfp *logFileProcessor) joinPartialLines(criLine criLine) (int64, []byte, b
189169
lfp.partialCRIContentSize += len(criLine.content)
190170
if lfp.partialCRIContentSize > maxLogLineSize {
191171
// Discard the too large log line.
192-
reportLogRowSizeExceeded(lfp.commonFields[:lfp.streamFieldsLen], lfp.partialCRIContentSize)
172+
reportLogRowSizeExceeded(lfp.commonFields, lfp.partialCRIContentSize)
193173

194174
lfp.partialCRIContent.Reset()
195175
lfp.partialCRIContentSize = 0
@@ -237,7 +217,7 @@ func (lfp *logFileProcessor) addRow(timestamp int64, fields []logstorage.Field)
237217
lfp.fieldsBuf = append(lfp.fieldsBuf[:0], lfp.commonFields...)
238218
lfp.fieldsBuf = append(lfp.fieldsBuf, fields...)
239219

240-
lfp.lr.MustAdd(lfp.tenantID, timestamp, lfp.fieldsBuf, lfp.streamFieldsLen)
220+
lfp.lr.MustAdd(lfp.tenantID, timestamp, lfp.fieldsBuf, -1)
241221
lfp.storage.MustAddRows(lfp.lr)
242222
lfp.lr.ResetKeepSettings()
243223
}
@@ -581,6 +561,34 @@ func getTimeFields() []string {
581561
return *timeField
582562
}
583563

564+
// defaultStreamFields is a list of default _stream fields.
565+
// Must be synced with getCommonFields.
566+
var defaultStreamFields = []string{"kubernetes.container_name", "kubernetes.pod_name", "kubernetes.pod_namespace"}
567+
568+
func getStreamFields() []string {
569+
if len(*streamFields) == 0 {
570+
return defaultStreamFields
571+
}
572+
return *streamFields
573+
}
574+
584575
var partialCRIContentBufPool bytesutil.ByteBufferPool
585576

586577
var criJSONParserPool fastjson.ParserPool
578+
579+
func reportLogRowSizeExceeded(commonFields []logstorage.Field, size int) {
580+
var pod, namespace string
581+
for _, f := range commonFields {
582+
if f.Name == "kubernetes.pod_namespace" {
583+
namespace = f.Value
584+
}
585+
if f.Name == "kubernetes.pod_name" {
586+
pod = f.Value
587+
}
588+
if pod != "" && namespace != "" {
589+
break
590+
}
591+
}
592+
logger.Warnf("skipping log entry from Pod %q in namespace %q: entry size of %.2f MiB exceeds the maximum allowed size of %d MiB",
593+
pod, namespace, float64(size)/1024/1024, maxLogLineSize/1024/1024)
594+
}

app/vlagent/kubernetescollector/processor_timing_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,23 @@ func BenchmarkProcessorKlog(b *testing.B) {
4141
benchmarkProcessor(b, in)
4242
}
4343

44+
func BenchmarkProcessorJSON(b *testing.B) {
45+
in := []string{
46+
`2025-12-15T10:34:25.637326000Z stderr F {"message":"Generated self-signed cert","file":"/tmp/apiserver.crt","key":"/tmp/apiserver.key","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
47+
`2025-12-15T10:34:25.872911000Z stderr F {"message":"Adding GroupVersion metrics.k8s.io v1beta1 to ResourceManager","component":"handler","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
48+
`2025-12-15T10:34:25.977313000Z stderr F {"message":"Starting RequestHeaderAuthRequestController","controller":"requestheader","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
49+
`2025-12-15T10:34:25.977317000Z stderr F {"message":"Starting controller","name":"client-ca::kube-system::extension-apiserver-authentication::client-ca-file","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
50+
`2025-12-15T10:34:25.977332000Z stderr F {"message":"Waiting for caches to sync for RequestHeaderAuthRequestController","controller":"shared_informer","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
51+
`2025-12-15T10:34:25.977336000Z stderr F {"message":"Waiting for caches to sync","controller":"client-ca::kube-system::extension-apiserver-authentication::requestheader-client-ca-file","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
52+
`2025-12-15T10:34:25.977526000Z stderr F {"message":"Starting controller","name":"serving-cert::/tmp/apiserver.crt::/tmp/apiserver.key","component":"dynamic_serving","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
53+
`2025-12-15T10:34:25.977591000Z stderr F {"message":"Serving securely on [::]:10250","component":"secure_serving","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
54+
`2025-12-15T10:34:25.977605000Z stderr F {"message":"Starting DynamicServingCertificateController","component":"tlsconfig","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
55+
`2025-12-15T10:34:26.077533000Z stderr F {"message":"Caches are synced for RequestHeaderAuthRequestController","controller":"shared_informer","severity":"INFO","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
56+
`2025-12-15T10:34:26.948143000Z stderr F {"message":"Failed probe","probe":"metric-storage-ready","error":"no metrics to serve","severity":"ERROR","kubernetes.container_name":"test-container","kubernetes.pod_name":"test-pod","kubernetes.pod_namespace":"test-namespace"}`,
57+
}
58+
benchmarkProcessor(b, in)
59+
}
60+
4461
func benchmarkProcessor(b *testing.B, logLines []string) {
4562
totalSize := 0
4663

docs/victorialogs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ according to the following docs:
2727
* FEATURE: add an ability to delete snapshots via `/internal/partition/snapshot/delete` endpoint. See [these docs](https://docs.victoriametrics.com/victorialogs/#partitions-lifecycle) and [#828](https://github.com/VictoriaMetrics/VictoriaLogs/issues/828).
2828
* FEATURE: [dashboards/internal](https://grafana.com/grafana/dashboards/24585): add Grafana dashboard for monitoring VictoriaLogs internal state. The source of the dashboard is available [here](https://github.com/VictoriaMetrics/VictoriaLogs/blob/master/dashboards/victorialogs-internal.json).
2929
* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add `pattern_match_prefix()` and `pattern_match_suffx()` filters for matching the given pattern at the beginning or at the end of the log field value. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#pattern-match-filter) and [#762](https://github.com/VictoriaMetrics/VictoriaLogs/issues/762).
30+
* FEATURE: [Kubernetes Collector](https://docs.victoriametrics.com/victorialogs/vlagent/#collect-kubernetes-pod-logs): add an ability to change default [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) fields via `-kubernetesCollector.streamFields` command-line flag. See [#998](https://github.com/VictoriaMetrics/VictoriaLogs/issues/998).
3031

3132
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): properly apply time offset according to the docs for the [`day_range`](https://docs.victoriametrics.com/victorialogs/logsql/#day-range-filter) and [`week_range`](https://docs.victoriametrics.com/victorialogs/logsql/#week-range-filter) filters. Previously `offset 2h` was incorrectly translated into `-02:00` timezone offset instead of the expected `+02:00` timezone offset. See [#796](https://github.com/VictoriaMetrics/VictoriaLogs/issues/796).
3233
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): use local time zone for the VictoriaLogs server when the [`day_range`](https://docs.victoriametrics.com/victorialogs/logsql/#day-range-filter) or [`week_range`](https://docs.victoriametrics.com/victorialogs/logsql/#week-range-filter) filter doesn't contain explicitly specified `offset ...` suffix. This aligns with the behaviour when the timezone information is missing in the [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter).

docs/victorialogs/vlagent.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,14 @@ To set the default [tenant](http://localhost:1313/victorialogs/#multitenancy) ID
134134
pass `-kubernetesCollector.tenantID` command-line flag with a tenant ID in the format `accountID:projectID`.
135135
See also [multitenancy docs for vlagent](https://docs.victoriametrics.com/victorialogs/vlagent/#multitenancy).
136136

137-
`vlagent` uses the following fields as [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) fields for Kubernetes Pod logs:
137+
By default, `vlagent` uses the following fields as [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) fields for Kubernetes Pod logs:
138+
138139
- `kubernetes.container_name`
139140
- `kubernetes.pod_name`
140141
- `kubernetes.pod_namespace`
141142

142143
Use these fields for fast filtering and grouping of logs in VictoriaLogs via [stream filters](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter).
144+
While it is recommended to keep the default stream fields, you can override them using the `-kubernetesCollector.streamFields` command-line flag if needed.
143145

144146
### Filtering Kubernetes logs
145147

0 commit comments

Comments
 (0)