Description
A note for the community
- Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
- If you are interested in working on this issue or have submitted a pull request, please leave a comment
Use Cases
My use case is to create metrics from logs and aggregate them. Vector's log_to_metric + aggregate transforms don't seem to use the event's timestamp.
Config:
[log_schema]
timestamp_key = "@timestamp"
[sources.file_input]
type = "file" # required
include = ["/logs/*.json"] # required
read_from = "beginning"
[transforms.parse_log_json]
type = "remap"
inputs = ["file_input"]
source = '''
. = parse_json!(string!(.message))
'''
#####################
[transforms.log_to_met]
type = "log_to_metric" # required
inputs = [ "parse_log_json" ] # required
[[transforms.log_to_met.metrics]]
type = "counter"
field = "statuscode"
name = "response_total"
[transforms.log_to_met.metrics.tags]
statuscode = "{{statuscode}}"
[transforms.msg_agg]
type = "aggregate"
inputs = [ "log_to_met" ]
interval_ms = 5_000
[sinks.console]
type = "console"
inputs = [ "msg_agg" ]
target = "stdout"
[sinks.console.encoding]
codec = "json"
Input file:
{"level":"info","@timestamp":"2022-07-29T11:39:29.230Z","caller":"esque/main.go:66","msg":"Hello World","reason":"Greeting from unidentifed species","statuscode":200,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64"}
{"level":"error","@timestamp":"2022-07-29T11:39:29.483Z","caller":"esque/main.go:68","msg":"End of World","reason":"Invasion of unidentifed species","statuscode":500,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64","stacktrace":"main.main\n\t/Users/p.kolhe/Documents/projects/esque/main.go:68\nruntime.main\n\t/usr/local/Cellar/go/1.18.4/libexec/src/runtime/proc.go:250"}
{"level":"info","@timestamp":"2022-07-30T03:48:20.325Z","caller":"esque/main.go:66","msg":"Hello World","reason":"Greeting from unidentifed species","statuscode":200,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64"}
{"level":"error","@timestamp":"2022-07-30T03:48:20.576Z","caller":"esque/main.go:68","msg":"End of World","reason":"Invasion of unidentifed species","statuscode":500,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64","stacktrace":"main.main\n\t/Users/p.kolhe/Documents/projects/esque/main.go:68\nruntime.main\n\t/usr/local/Cellar/go/1.18.4/libexec/src/runtime/proc.go:250"}
{"level":"info","@timestamp":"2022-07-31T05:16:33.945Z","caller":"esque/main.go:66","msg":"Hello World","reason":"Greeting from unidentifed species","statuscode":200,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64"}
{"level":"error","@timestamp":"2022-07-31T05:16:34.196Z","caller":"esque/main.go:68","msg":"End of World","reason":"Invasion of unidentifed species","statuscode":500,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64","stacktrace":"main.main\n\t/Users/p.kolhe/Documents/projects/esque/main.go:68\nruntime.main\n\t/usr/local/Cellar/go/1.18.4/libexec/src/runtime/proc.go:250"}
{"level":"info","@timestamp":"2022-07-31T06:19:10.245Z","caller":"esque/main.go:66","msg":"Hello World","reason":"Greeting from unidentifed species","statuscode":200,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64"}
{"level":"error","@timestamp":"2022-07-31T06:19:10.497Z","caller":"esque/main.go:68","msg":"End of World","reason":"Invasion of unidentifed species","statuscode":500,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64","stacktrace":"main.main\n\t/Users/p.kolhe/Documents/projects/esque/main.go:68\nruntime.main\n\t/usr/local/Cellar/go/1.18.4/libexec/src/runtime/proc.go:250"}
{"level":"error","@timestamp":"2022-08-05T09:09:23.877Z","caller":"esque/main.go:68","msg":"End of World","reason":"Invasion of unidentifed species","statuscode":500,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64","stacktrace":"main.main\n\t/Users/p.kolhe/Documents/projects/esque/main.go:68\nruntime.main\n\t/usr/local/Cellar/go/1.18.4/libexec/src/runtime/proc.go:250"}
{"level":"info","@timestamp":"2022-08-05T09:09:24.129Z","caller":"esque/main.go:66","msg":"Hello World","reason":"Greeting from unidentifed species","statuscode":200,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64"}
{"level":"info","@timestamp":"2022-08-08T13:15:09.057Z","caller":"esque/main.go:73","msg":"Hello World","reason":"Greeting from unidentifed species","statuscode":200,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64"}
{"level":"error","@timestamp":"2022-08-08T13:15:09.309Z","caller":"esque/main.go:75","msg":"End of World","reason":"Invasion of unidentifed species","statuscode":500,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64","stacktrace":"main.main\n\t/Users/p.kolhe/Documents/projects/esque/main.go:75\nruntime.main\n\t/usr/local/Cellar/go/1.18.5/libexec/src/runtime/proc.go:250"}
{"level":"info","@timestamp":"2022-08-10T07:18:33.044Z","caller":"esque/main.go:74","msg":"Hello World","reason":"Greeting from unidentifed species","statuscode":200,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64"}
{"level":"error","@timestamp":"2022-08-10T07:18:33.296Z","caller":"esque/main.go:76","msg":"End of World","reason":"Invasion of unidentifed species","statuscode":500,"method":"PUT","messageid":"j1ae77e5-915l-13e9-bc42-556nf7864d64","stacktrace":"main.main\n\t/Users/p.kolhe/Documents/projects/esque/main.go:76\nruntime.main\n\t/usr/local/Cellar/go/1.18.5/libexec/src/runtime/proc.go:250"}
Output:
{"@timestamp":"2022-08-16T13:25:03.393968870Z","counter":{"value":7.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"200"}}
{"@timestamp":"2022-08-16T13:25:03.393989063Z","counter":{"value":7.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"500"}}
Expected output:
{"@timestamp":"2022-07-29T11:39:29.230Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"200"}}
{"@timestamp":"2022-07-29T11:39:29.483Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"500"}}
{"@timestamp":"2022-07-30T03:48:20.325Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"200"}}
{"@timestamp":"2022-07-30T03:48:20.576Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"500"}}
{"@timestamp":"2022-07-31T05:16:33.945Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"200"}}
{"@timestamp":"2022-07-31T05:16:34.196Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"500"}}
{"@timestamp":"2022-07-31T06:19:10.245Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"200"}}
{"@timestamp":"2022-07-31T06:19:10.497Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"500"}}
{"@timestamp":"2022-08-05T09:09:23.877Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"500"}}
{"@timestamp":"2022-08-05T09:09:24.129Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"200"}}
{"@timestamp":"2022-08-08T13:15:09.057Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"200"}}
{"@timestamp":"2022-08-08T13:15:09.309Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"500"}}
{"@timestamp":"2022-08-10T07:18:33.044Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"200"}}
{"@timestamp":"2022-08-10T07:18:33.296Z","counter":{"value":1.0},"kind":"incremental","name":"response_total","tags":{"statuscode":"500"}}
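To make the expected behaviour concrete, here is a minimal Python sketch of windowing on the event's own timestamp instead of the wall clock. It is not Vector code: the function names `parse_ts` and `aggregate_by_event_time` are made up for illustration, and the choice to keep the first timestamp seen in a window as the metric timestamp is an assumption.

```python
from collections import OrderedDict
from datetime import datetime, timezone

WINDOW_SECONDS = 5  # mirrors interval_ms = 5_000 in the config above


def parse_ts(value):
    """Parse a timestamp such as 2022-07-29T11:39:29.230Z into an aware datetime."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


def aggregate_by_event_time(events, window_seconds=WINDOW_SECONDS):
    """Count events per (window, statuscode), windowing on the event timestamp."""
    buckets = OrderedDict()
    for event in events:
        # The window index comes from the event itself, not from time.time().
        window = int(parse_ts(event["@timestamp"]).timestamp()) // window_seconds
        key = (window, str(event["statuscode"]))
        if key not in buckets:
            # Assumption: the first timestamp in a window labels the whole window.
            buckets[key] = {"@timestamp": event["@timestamp"], "value": 0.0}
        buckets[key]["value"] += 1.0
    return [
        {
            "@timestamp": bucket["@timestamp"],
            "counter": {"value": bucket["value"]},
            "kind": "incremental",
            "name": "response_total",
            "tags": {"statuscode": statuscode},
        }
        for (_, statuscode), bucket in buckets.items()
    ]
```

With the input file above, no two events with the same statuscode fall into the same 5-second window, so this produces 14 metrics with a value of 1.0 each, which matches the expected output.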
Attempted Solutions
- log_to_metric with aggregate transform
  - uses wall clock for aggregation
- reduce transform
  - starts_when/ends_when conditions don't expose state
- lua
  - should work fine for this use-case but I believe this should be an option of the aggregate transform
Proposal
@tobz feedback:
Chatting with @fpytloun in Discord, we talked through the intended/expected behavior, which is actually windowing the metrics by their own timestamp, not the real clock time as Vector processes the event. Thus, the goal would be to aggregate each metric individually with a 5 second window, based on the original timestamp... and thus, with the example input, having timestamps more than 5 seconds apart, the expectation effectively has no reduction/aggregation.
With the intended behavior explained here, the answer is that the aggregate transform operates on wall clock time, so it has no concept of windowing incoming data in this way. Similarly, the reduce transform gets a little closer, since the reducer state holds data that could potentially be used for this type of windowing, but it doesn't expose a way to query that state in the starts_when/ends_when conditions.
At present, the best approach would likely be to use the lua transform as it can support holding state, as well as allowing custom logic to be written to do this type of windowing/reduction.
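The state such a transform would need to hold can be sketched as follows. This is written in Python purely to illustrate the logic, not as a lua transform or any Vector API. The class `EventTimeAggregator` and its methods are hypothetical, and the sketch assumes that metrics of the same series arrive in timestamp order (late events are not handled).

```python
from datetime import datetime, timezone


class EventTimeAggregator:
    """Hypothetical helper: sums incremental counters per series in event-time windows."""

    def __init__(self, window_seconds=5):
        self.window_seconds = window_seconds
        # series key -> [window index, first timestamp seen, running total]
        self.open_windows = {}

    def _window_of(self, timestamp):
        parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
        return int(parsed.replace(tzinfo=timezone.utc).timestamp()) // self.window_seconds

    @staticmethod
    def _emit(key, state):
        name, tags = key
        return {
            "@timestamp": state[1],
            "counter": {"value": state[2]},
            "kind": "incremental",
            "name": name,
            "tags": dict(tags),
        }

    def push(self, metric):
        """Add one metric; return the windows it closed (an empty list if none)."""
        key = (metric["name"], tuple(sorted(metric["tags"].items())))
        window = self._window_of(metric["@timestamp"])
        closed = []
        state = self.open_windows.get(key)
        if state is not None and state[0] != window:
            # The event belongs to another window, so the open one is complete.
            closed.append(self._emit(key, state))
            state = None
        if state is None:
            state = [window, metric["@timestamp"], 0.0]
            self.open_windows[key] = state
        state[2] += metric["counter"]["value"]
        return closed

    def flush(self):
        """Emit every window still open, e.g. at shutdown or end of input."""
        closed = [self._emit(key, state) for key, state in self.open_windows.items()]
        self.open_windows.clear()
        return closed
```

A window is only emitted once a later event for the same series arrives, so `flush` has to be called from whatever end-of-input or timer mechanism the hosting transform offers; otherwise the last window per series would never be emitted.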
References
Follow-up from question here: #13991
Version
No response