Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion app/vlselect/logsql/logsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,25 +877,42 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r
return
}

labelFields, err := ca.q.GetStatsLabelsAddGroupingByTime(step, offset)
labelFields, metricFields, err := ca.q.GetStatsLabelsAndMetricFieldsAddGroupingByTime(step, offset)
if err != nil {
httpserver.SendPrometheusError(w, r, err)
return
}

m := make(map[string]*statsSeries)
var mLock sync.Mutex
metricOrder := make(map[string]int)
for i, name := range metricFields {
metricOrder[name] = i
}

addPoint := func(name string, labels []logstorage.Field, p statsPoint) {
dst := append([]byte{}, name...)
dst = logstorage.MarshalFieldsToJSON(dst, labels)
key := string(dst)

order, ok := metricOrder[name]
if !ok {
// Keep histogram() buckets grouped with the parent metric order.
if before, cutOk := strings.CutSuffix(name, "_bucket"); cutOk {
baseName := before
order, ok = metricOrder[baseName]
}
}
if !ok {
order = len(metricOrder)
}

mLock.Lock()
ss := m[key]
if ss == nil {
ss = &statsSeries{
key: key,
order: order,
Name: name,
Labels: labels,
}
Expand Down Expand Up @@ -996,6 +1013,9 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r
rows = append(rows, ss)
}
sort.Slice(rows, func(i, j int) bool {
if rows[i].order != rows[j].order {
return rows[i].order < rows[j].order
}
return rows[i].key < rows[j].key
})

Expand All @@ -1011,6 +1031,8 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r

type statsSeries struct {
key string
// order preserves metric result order as defined by the query.
order int

Name string
Labels []logstorage.Field
Expand Down
74 changes: 74 additions & 0 deletions apptest/tests/stats_query_range_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package tests

import (
"encoding/json"
"net/http"
"reflect"
"testing"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"

"github.com/VictoriaMetrics/VictoriaLogs/apptest"
)

type statsQueryResponse struct {
Data struct {
Result []struct {
Metric map[string]string `json:"metric"`
} `json:"result"`
} `json:"data"`
}

func getMetricNamesFromStatsResponse(t *testing.T, s string) []string {
t.Helper()

var resp statsQueryResponse
if err := json.Unmarshal([]byte(s), &resp); err != nil {
t.Fatalf("cannot unmarshal stats response %q: %s", s, err)
}

names := make([]string, 0, len(resp.Data.Result))
for _, r := range resp.Data.Result {
names = append(names, r.Metric["__name__"])
}
return names
}

func TestStatsQueryRangeMetricOrderConsistentWithStatsQuery(t *testing.T) {
fs.MustRemoveDir(t.Name())
tc := apptest.NewTestCase(t)
defer tc.Stop()

sut := tc.MustStartDefaultVlsingle()

records := []string{
`{"_time":"2025-01-01T00:00:01Z","source.bytes":1,"destination.bytes":10}`,
`{"_time":"2025-01-01T00:00:02Z","source.bytes":2,"destination.bytes":20}`,
}
sut.JSONLineWrite(t, records, apptest.IngestOpts{})
sut.ForceFlush(t)

query := "* | stats count() sessions, sum(source.bytes) source.bytes, sum(destination.bytes) destination.bytes"

instantResponse, statusCode := sut.StatsQueryRaw(t, query, apptest.StatsQueryOpts{
Time: "2025-01-01T00:01:00Z",
})
if statusCode != http.StatusOK {
t.Fatalf("unexpected statusCode when executing instant query %q; got %d; want %d", query, statusCode, http.StatusOK)
}

rangeResponse, statusCode := sut.StatsQueryRangeRaw(t, query, apptest.StatsQueryRangeOpts{
Start: "2025-01-01T00:00:00Z",
End: "2025-01-01T00:01:00Z",
Step: "1m",
})
if statusCode != http.StatusOK {
t.Fatalf("unexpected statusCode when executing range query %q; got %d; want %d", query, statusCode, http.StatusOK)
}

instantMetricNames := getMetricNamesFromStatsResponse(t, instantResponse)
rangeMetricNames := getMetricNamesFromStatsResponse(t, rangeResponse)
if !reflect.DeepEqual(rangeMetricNames, instantMetricNames) {
t.Fatalf("unexpected metric order for stats_query_range\ngot\n%q\nwant\n%q", rangeMetricNames, instantMetricNames)
}
}
2 changes: 2 additions & 0 deletions docs/victorialogs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ according to the following docs:

## tip

* BUGFIX: [`/select/logsql/stats_query_range` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats): preserve metric order in responses so it is consistent with [`/select/logsql/stats_query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats). See [#1011](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1011).

## [v1.45.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.45.0)

Released at 2026-02-05
Expand Down
39 changes: 31 additions & 8 deletions lib/logstorage/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,9 +1045,18 @@ func (q *Query) GetStatsLabels() ([]string, error) {
//
// if step > 0, then _time:step is added to the last `stats by (...)` pipe at q.
func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, error) {
labelFields, _, err := q.GetStatsLabelsAndMetricFieldsAddGroupingByTime(step, offset)
return labelFields, err
}

// GetStatsLabelsAndMetricFieldsAddGroupingByTime returns stats labels and metric fields from q
// for /select/logsql/stats_query and /select/logsql/stats_query_range endpoints.
//
// if step > 0, then _time:step is added to the last `stats by (...)` pipe at q.
func (q *Query) GetStatsLabelsAndMetricFieldsAddGroupingByTime(step, offset int64) ([]string, []string, error) {
idx := getLastPipeStatsIdx(q.pipes)
if idx < 0 {
return nil, fmt.Errorf("missing `| stats ...` pipe in the query [%s]", q)
return nil, nil, fmt.Errorf("missing `| stats ...` pipe in the query [%s]", q)
}
ps := q.pipes[idx].(*pipeStats)

Expand All @@ -1062,7 +1071,7 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, e
continue
}
if !p.canReturnLastNResults() {
return nil, fmt.Errorf("the pipe `| %q` cannot be put in front of `| %q`, since it may modify or delete `_time` field", p, ps)
return nil, nil, fmt.Errorf("the pipe `| %q` cannot be put in front of `| %q`, since it may modify or delete `_time` field", p, ps)
}
}
}
Expand All @@ -1078,6 +1087,7 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, e

labelFields := make([]string, 0, len(ps.byFields))
metricFields := make(map[string]struct{}, len(ps.funcs))
metricFieldsOrdered := make([]string, 0, len(ps.funcs))

addToLabelFields := func(f string) {
if !slices.Contains(labelFields, f) {
Expand All @@ -1090,6 +1100,9 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, e
if idx := slices.Index(labelFields, f); idx >= 0 {
labelFields = append(labelFields[:idx], labelFields[idx+1:]...)
}
if _, ok := metricFields[f]; !ok {
metricFieldsOrdered = append(metricFieldsOrdered, f)
}
metricFields[f] = struct{}{}
}

Expand Down Expand Up @@ -1120,7 +1133,7 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, e
case *pipeRunningStats:
// `| running_stats ...` pipe must contain the same labelFields as the preceding `stats` pipe.
if !hasNeededFieldsExceptTime(t.byFields, labelFields) {
return nil, fmt.Errorf("the %q must contain the same list of fields as `stats` pipe in the query [%s]", t, q)
return nil, nil, fmt.Errorf("the %q must contain the same list of fields as `stats` pipe in the query [%s]", t, q)
}
for _, f := range t.funcs {
addToMetricFields(f.resultName)
Expand Down Expand Up @@ -1213,24 +1226,34 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, e
case *pipeUnpackJSON:
// Assume that `| unpack_json ... fields (...)` pipe generates an additional by(...) labels from fields(...)
if len(t.fieldFilters) == 0 {
return nil, fmt.Errorf("missing fields(...) after %q in the query [%s]", t, q)
return nil, nil, fmt.Errorf("missing fields(...) after %q in the query [%s]", t, q)
}
for _, f := range t.fieldFilters {
if prefixfilter.IsWildcardFilter(f) {
return nil, fmt.Errorf("fields(...) at %q cannot contain wildcard filter; got %s; query [%s]", t, f, q)
return nil, nil, fmt.Errorf("fields(...) at %q cannot contain wildcard filter; got %s; query [%s]", t, f, q)
}
addToLabelFields(f)
}
default:
return nil, fmt.Errorf("the %q pipe cannot be put after %q pipe in the query [%s]", p, ps, q)
return nil, nil, fmt.Errorf("the %q pipe cannot be put after %q pipe in the query [%s]", p, ps, q)
}
}

if len(metricFields) == 0 {
return nil, fmt.Errorf("missing metric fields in the results of query [%s]", q)
return nil, nil, fmt.Errorf("missing metric fields in the results of query [%s]", q)
}

resultMetricFields := make([]string, 0, len(metricFields))
for _, f := range metricFieldsOrdered {
if _, ok := metricFields[f]; ok {
resultMetricFields = append(resultMetricFields, f)
}
}
if len(resultMetricFields) == 0 {
return nil, nil, fmt.Errorf("missing metric fields in the results of query [%s]", q)
}

return labelFields, nil
return labelFields, resultMetricFields, nil
}

func hasNeededFieldsExceptTime(fields, neededFields []string) bool {
Expand Down
25 changes: 25 additions & 0 deletions lib/logstorage/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,31 @@ func TestApplyOptionTimeOffsetToSubqueries(t *testing.T) {
})
}

func TestQueryGetStatsLabelsAndMetricFieldsOrder(t *testing.T) {
q, err := ParseQueryAtTimestamp(`* | stats count() sessions, sum(source.bytes) source.bytes, sum(destination.bytes) destination.bytes`, 0)
if err != nil {
t.Fatalf("cannot parse query: %s", err)
}

_, metricFields, err := q.GetStatsLabelsAndMetricFieldsAddGroupingByTime(0, 0)
if err != nil {
t.Fatalf("cannot get metric fields: %s", err)
}

metricFieldsExpected := []string{"sessions", "source.bytes", "destination.bytes"}
if !reflect.DeepEqual(metricFields, metricFieldsExpected) {
t.Fatalf("unexpected metric fields order\ngot\n%q\nwant\n%q", metricFields, metricFieldsExpected)
}

_, metricFields, err = q.GetStatsLabelsAndMetricFieldsAddGroupingByTime(1e9, 0)
if err != nil {
t.Fatalf("cannot get metric fields for step=1s: %s", err)
}
if !reflect.DeepEqual(metricFields, metricFieldsExpected) {
t.Fatalf("unexpected metric fields order for step=1s\ngot\n%q\nwant\n%q", metricFields, metricFieldsExpected)
}
}

func TestLexer(t *testing.T) {
f := func(s string, tokensExpected []string) {
t.Helper()
Expand Down