Skip to content

Commit dd8f8a4

Browse files
committed
logicalplan: distributed optimizer fuzzing
The distributed optimizer should only produce remote executions whose results preserve the partition labels. Add a fuzzing test to check that invariant. Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent da6a186 commit dd8f8a4

File tree

2 files changed

+313
-1
lines changed

2 files changed

+313
-1
lines changed

logicalplan/distribute.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,18 @@ func (r RemoteExecution) ReturnType() parser.ValueType { return r.Query.ReturnTy
102102

103103
// Deduplicate is a logical plan which deduplicates samples from multiple RemoteExecutions.
104104
type Deduplicate struct {
105-
LeafNode
106105
Expressions RemoteExecutions
107106
}
108107

108+
func (r Deduplicate) Children() []*Node {
109+
children := make([]*Node, len(r.Expressions))
110+
for i := range r.Expressions {
111+
var n Node = r.Expressions[i]
112+
children[i] = &n
113+
}
114+
return children
115+
}
116+
109117
func (r Deduplicate) Clone() Node {
110118
clone := r
111119
clone.Expressions = make(RemoteExecutions, len(r.Expressions))

logicalplan/distribute_test.go

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44
package logicalplan
55

66
import (
	"context"
	"math"
	"math/rand"
	"regexp"
	"sort"
	"testing"
	"time"

	"github.com/thanos-io/promql-engine/api"
	"github.com/thanos-io/promql-engine/query"

	"github.com/cortexproject/promqlsmith"
	"github.com/efficientgo/core/testutil"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/promql/promqltest"
)
1924

2025
var replacements = map[string]*regexp.Regexp{
@@ -1083,3 +1088,302 @@ func newEngineMock(mint, maxt int64, labelSets []labels.Labels) *engineMock {
10831088
// newEngineMockWithExplicitPartition returns an engineMock whose minT, maxT,
// labelSets and partitionLabelSets fields are set from the given arguments.
func newEngineMockWithExplicitPartition(mint, maxt int64, labelSets, partitionLabelSets []labels.Labels) *engineMock {
	mock := new(engineMock)
	mock.minT = mint
	mock.maxT = maxt
	mock.labelSets = labelSets
	mock.partitionLabelSets = partitionLabelSets
	return mock
}
1091+
1092+
func TestPreservesPartitionLabels(t *testing.T) {
1093+
partitionLabels := map[string]struct{}{"region": {}}
1094+
1095+
parse := func(t *testing.T, expr string) Node {
1096+
t.Helper()
1097+
parsed, err := parser.ParseExpr(expr)
1098+
testutil.Ok(t, err)
1099+
plan, err := NewFromAST(parsed, &query.Options{
1100+
Start: time.Unix(0, 0),
1101+
End: time.Unix(0, 0),
1102+
}, PlanOptions{})
1103+
testutil.Ok(t, err)
1104+
return plan.Root()
1105+
}
1106+
1107+
cases := []struct {
1108+
name string
1109+
expr string
1110+
partitionLabels map[string]struct{}
1111+
expected bool
1112+
}{
1113+
{
1114+
name: "vector selector preserves",
1115+
expr: `metric`,
1116+
expected: true,
1117+
},
1118+
{
1119+
name: "number literal preserves",
1120+
expr: `1`,
1121+
expected: true,
1122+
},
1123+
{
1124+
name: "sum by partition label preserves",
1125+
expr: `sum by (region) (metric)`,
1126+
expected: true,
1127+
},
1128+
{
1129+
name: "sum by non-partition label does not preserve",
1130+
expr: `sum by (pod) (metric)`,
1131+
expected: false,
1132+
},
1133+
{
1134+
name: "sum by both labels preserves",
1135+
expr: `sum by (pod, region) (metric)`,
1136+
expected: true,
1137+
},
1138+
{
1139+
name: "sum without partition label does not preserve",
1140+
expr: `sum without (region) (metric)`,
1141+
expected: false,
1142+
},
1143+
{
1144+
name: "sum without non-partition label preserves",
1145+
expr: `sum without (pod) (metric)`,
1146+
expected: true,
1147+
},
1148+
{
1149+
name: "sum with no grouping does not preserve",
1150+
expr: `sum(metric)`,
1151+
expected: false,
1152+
},
1153+
{
1154+
name: "binary with on(partition) preserves",
1155+
expr: `metric_a + on (region) metric_b`,
1156+
expected: true,
1157+
},
1158+
{
1159+
name: "binary with on(non-partition) does not preserve",
1160+
expr: `metric_a + on (pod) metric_b`,
1161+
expected: false,
1162+
},
1163+
{
1164+
name: "binary with ignoring(partition) does not preserve",
1165+
expr: `metric_a + ignoring (region) metric_b`,
1166+
expected: false,
1167+
},
1168+
{
1169+
name: "binary with ignoring(non-partition) preserves",
1170+
expr: `metric_a + ignoring (pod) metric_b`,
1171+
expected: true,
1172+
},
1173+
{
1174+
name: "binary with default matching preserves",
1175+
expr: `metric_a + metric_b`,
1176+
expected: true,
1177+
},
1178+
{
1179+
name: "binary with partition in group_left include preserves",
1180+
expr: `metric_a * on (pod) group_left(region) metric_b`,
1181+
expected: true,
1182+
},
1183+
{
1184+
name: "unary preserves",
1185+
expr: `-metric`,
1186+
expected: true,
1187+
},
1188+
{
1189+
name: "subquery preserves",
1190+
expr: `max_over_time(metric[5m:1m])`,
1191+
expected: true,
1192+
},
1193+
{
1194+
name: "label_replace targeting partition label does not preserve",
1195+
expr: `label_replace(metric, "region", "$1", "pod", "(.*)")`,
1196+
expected: false,
1197+
},
1198+
{
1199+
name: "label_replace targeting non-partition label preserves",
1200+
expr: `label_replace(metric, "zone", "$1", "pod", "(.*)")`,
1201+
expected: true,
1202+
},
1203+
{
1204+
name: "rate preserves",
1205+
expr: `rate(metric[5m])`,
1206+
expected: true,
1207+
},
1208+
{
1209+
name: "nested sum by(region)(sum by(pod)(X)) preserves at top level",
1210+
expr: `sum by (region) (sum by (pod) (metric))`,
1211+
expected: true,
1212+
},
1213+
{
1214+
name: "nested sum by(pod)(sum by(region)(X)) does not preserve",
1215+
expr: `sum by (pod) (sum by (region) (metric))`,
1216+
expected: false,
1217+
},
1218+
{
1219+
name: "binary with scalar preserves",
1220+
expr: `metric / 1000`,
1221+
expected: true,
1222+
},
1223+
{
1224+
name: "avg by partition label preserves",
1225+
expr: `avg by (region) (metric)`,
1226+
expected: true,
1227+
},
1228+
{
1229+
name: "avg by non-partition label does not preserve",
1230+
expr: `avg by (pod) (metric)`,
1231+
expected: false,
1232+
},
1233+
{
1234+
name: "empty partition labels returns false",
1235+
expr: `metric`,
1236+
partitionLabels: map[string]struct{}{},
1237+
expected: false,
1238+
},
1239+
}
1240+
1241+
for _, tc := range cases {
1242+
t.Run(tc.name, func(t *testing.T) {
1243+
pl := partitionLabels
1244+
if tc.partitionLabels != nil {
1245+
pl = tc.partitionLabels
1246+
}
1247+
result := preservesPartitionLabels(parse(t, tc.expr), pl)
1248+
testutil.Equals(t, tc.expected, result)
1249+
})
1250+
}
1251+
}
1252+
1253+
func FuzzDistributedExecutionPreservesPartitionLabels(f *testing.F) {
1254+
f.Add(int64(0))
1255+
f.Fuzz(func(t *testing.T, seed int64) {
1256+
rnd := rand.New(rand.NewSource(seed))
1257+
1258+
load := `load 30s
1259+
http_requests_total{pod="nginx-1", region="east"} 1+1x15
1260+
http_requests_total{pod="nginx-2", region="east"} 2+2x15
1261+
http_requests_total{pod="nginx-1", region="west"} 3+1x15
1262+
http_requests_total{pod="nginx-2", region="west"} 4+2x15`
1263+
1264+
testStorage := promqltest.LoadedStorage(t, load)
1265+
defer testStorage.Close()
1266+
1267+
engines := []api.RemoteEngine{
1268+
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "east")}),
1269+
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "west")}),
1270+
}
1271+
optimizers := []Optimizer{
1272+
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
1273+
}
1274+
1275+
lbls := []labels.Labels{
1276+
labels.FromStrings("__name__", "http_requests_total", "pod", "nginx-1", "region", "east"),
1277+
labels.FromStrings("__name__", "http_requests_total", "pod", "nginx-2", "region", "west"),
1278+
}
1279+
1280+
// Exclude functions that produce unlabeled series by design.
1281+
enabledFunctions := make([]*parser.Function, 0, len(parser.Functions))
1282+
for _, f := range parser.Functions {
1283+
switch f.Name {
1284+
case "vector", "absent", "absent_over_time":
1285+
continue
1286+
}
1287+
enabledFunctions = append(enabledFunctions, f)
1288+
}
1289+
1290+
psOpts := []promqlsmith.Option{
1291+
promqlsmith.WithEnableOffset(false),
1292+
promqlsmith.WithEnableAtModifier(false),
1293+
promqlsmith.WithEnabledAggrs([]parser.ItemType{
1294+
parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP,
1295+
parser.COUNT, parser.QUANTILE, parser.STDDEV, parser.STDVAR,
1296+
parser.COUNT_VALUES, parser.TOPK, parser.BOTTOMK,
1297+
}),
1298+
promqlsmith.WithEnabledFunctions(enabledFunctions),
1299+
promqlsmith.WithEnableVectorMatching(true),
1300+
}
1301+
ps := promqlsmith.New(rnd, lbls, psOpts...)
1302+
1303+
ng := promql.NewEngine(promql.EngineOpts{
1304+
Timeout: 1 * time.Hour,
1305+
MaxSamples: 1e10,
1306+
EnableNegativeOffset: true,
1307+
EnableAtModifier: true,
1308+
})
1309+
1310+
start := time.Unix(60, 0)
1311+
end := time.Unix(120, 0)
1312+
step := 30 * time.Second
1313+
1314+
opts := &query.Options{
1315+
Start: start,
1316+
End: end,
1317+
Step: step,
1318+
}
1319+
for range testRuns {
1320+
expr := ps.WalkRangeQuery()
1321+
exprStr := expr.Pretty(0)
1322+
1323+
parsed, err := parser.ParseExpr(exprStr)
1324+
if err != nil {
1325+
continue
1326+
}
1327+
1328+
plan, err := NewFromAST(parsed, opts, PlanOptions{})
1329+
if err != nil {
1330+
continue
1331+
}
1332+
1333+
optimizedPlan, _ := plan.Optimize(optimizers)
1334+
root := optimizedPlan.Root()
1335+
1336+
// For each remote query in the optimized plan, execute it
1337+
// against the test storage and verify that all result series
1338+
// still have the partition label "region".
1339+
Traverse(&root, func(node *Node) {
1340+
remote, ok := (*node).(RemoteExecution)
1341+
if !ok {
1342+
return
1343+
}
1344+
1345+
// Skip remote queries that don't touch real series data
1346+
// (e.g. scalar parameters to aggregations like quantile).
1347+
hasSelector := false
1348+
var remoteNode Node = remote.Query
1349+
Traverse(&remoteNode, func(n *Node) {
1350+
switch (*n).(type) {
1351+
case *VectorSelector, *MatrixSelector:
1352+
hasSelector = true
1353+
}
1354+
})
1355+
if !hasSelector {
1356+
return
1357+
}
1358+
1359+
remoteQuery := remote.Query.String()
1360+
qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, remoteQuery, start, end, step)
1361+
if err != nil {
1362+
return
1363+
}
1364+
result := qry.Exec(context.Background())
1365+
if result.Err != nil {
1366+
return
1367+
}
1368+
1369+
matrix, err := result.Matrix()
1370+
if err != nil {
1371+
return
1372+
}
1373+
1374+
for _, series := range matrix {
1375+
if !series.Metric.Has("region") {
1376+
t.Errorf(
1377+
"remote query result series missing partition label 'region'\n"+
1378+
" original: %s\n"+
1379+
" optimized: %s\n"+
1380+
" remote query: %s\n"+
1381+
" series: %s",
1382+
exprStr, root.String(), remoteQuery, series.Metric.String(),
1383+
)
1384+
}
1385+
}
1386+
})
1387+
}
1388+
})
1389+
}

0 commit comments

Comments
 (0)