Skip to content

Commit c0566eb

Browse files
authored
chore: Add S3 bucket configuration function for querycomparator (#20777)
1 parent 2de6e79 commit c0566eb

File tree

6 files changed

+91
-55
lines changed

6 files changed

+91
-55
lines changed

tools/querycomparator/compare.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ func addCompareCommand(app *kingpin.Application) {
3737
cmd.Flag("host2", "Second Loki host").Default("localhost:3102").StringVar(&host2)
3838

3939
cmd.Action(func(_ *kingpin.ParseContext) error {
40-
storageBucket = cfg.Bucket
4140
orgID = cfg.OrgID
4241

4342
parsed, err := parseTimeConfig(&cfg)

tools/querycomparator/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ func parseTimeConfig(cfg *Config) (*ParsedConfig, error) {
4747

4848
// Global variables for bucket and org ID (used by storage functions)
4949
var (
50-
storageBucket string
5150
orgID string
5251
indexStoragePrefix string
5352
logger log.Logger

tools/querycomparator/execute.go

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/grafana/dskit/user"
1717
"github.com/prometheus/client_golang/prometheus"
1818
"github.com/prometheus/common/model"
19+
"github.com/thanos-io/objstore"
1920
"golang.org/x/net/http2"
2021
"golang.org/x/net/http2/h2c"
2122

@@ -51,7 +52,6 @@ func addExecuteCommand(app *kingpin.Application) {
5152
cmd.Flag("engine", "Engine version (1 or 2)").Default("2").IntVar(&engineVersion)
5253

5354
cmd.Action(func(_ *kingpin.ParseContext) error {
54-
storageBucket = cfg.Bucket
5555
orgID = cfg.OrgID
5656

5757
parsed, err := parseTimeConfig(&cfg)
@@ -69,22 +69,29 @@ func addExecuteCommand(app *kingpin.Application) {
6969

7070
switch engineVersion {
7171
case 1:
72-
return doExecuteLocallyV1(params)
72+
return doExecuteLocallyV1(params, cfg.Bucket)
7373
case 2:
74-
return doExecuteLocallyV2(params)
74+
return doExecuteLocallyV2(params, MustGCSDataobjBucket(cfg.Bucket))
7575
default:
7676
return fmt.Errorf("unsupported engine version: %d (must be 1 or 2)", engineVersion)
7777
}
7878
})
7979
}
8080

81+
func initV2Settings() {
82+
if indexStoragePrefix == "" {
83+
level.Warn(logger).Log("msg", "index storage prefix is not set. using default value.")
84+
indexStoragePrefix = "index/v0"
85+
}
86+
}
87+
8188
// doExecuteLocallyV1 executes a query using the V1 engine
82-
func doExecuteLocallyV1(params logql.LiteralParams) error {
89+
func doExecuteLocallyV1(params logql.LiteralParams, bucketName string) error {
8390
if indexStoragePrefix == "" {
8491
level.Warn(logger).Log("msg", "index storage prefix is not set. v1 engine may not find any chunks.")
8592
}
8693
level.Info(logger).Log("msg", "executing local query with V1 engine")
87-
result, err := doLocalQueryWithV1Engine(params)
94+
result, err := doLocalQueryWithV1Engine(params, bucketName)
8895
if err != nil {
8996
level.Error(logger).Log("msg", "local query with V1 engine failed", "error", err)
9097
return fmt.Errorf("V1 query execution failed: %w", err)
@@ -93,29 +100,32 @@ func doExecuteLocallyV1(params logql.LiteralParams) error {
93100
}
94101

95102
// doExecuteLocallyV2 executes a query using the V2 engine
96-
func doExecuteLocallyV2(params logql.LiteralParams) error {
103+
func doExecuteLocallyV2(params logql.LiteralParams, bucket objstore.Bucket) error {
104+
initV2Settings()
97105
level.Info(logger).Log("msg", "executing local query with V2 engine")
98-
result, err := doLocalQueryWithV2Engine(params)
106+
result, err := doLocalQueryWithV2Engine(params, bucket)
99107
if err != nil {
100108
level.Error(logger).Log("msg", "V2 query execution failed", "error", err)
101109
return fmt.Errorf("V2 query execution failed: %w", err)
102110
}
103111
return checkResult(result)
104112
}
105113

106-
func doExecuteLocallyV2Scheduler(params logql.LiteralParams) error {
114+
func doExecuteLocallyV2Scheduler(params logql.LiteralParams, bucket objstore.Bucket) error {
115+
initV2Settings()
107116
level.Info(logger).Log("msg", "executing local query with V2 engine via local scheduler and worker")
108-
result, err := doLocalQueryWithV2EngineScheduler(params)
117+
result, err := doLocalQueryWithV2EngineScheduler(params, bucket)
109118
if err != nil {
110119
level.Error(logger).Log("msg", "v2 query execution failed", "error", err)
111120
return fmt.Errorf("v2 query execution failed: %w", err)
112121
}
113122
return checkResult(result)
114123
}
115124

116-
func doExecuteLocallyV2SchedulerRemote(params logql.LiteralParams) error {
125+
func doExecuteLocallyV2SchedulerRemote(params logql.LiteralParams, bucket objstore.Bucket) error {
126+
initV2Settings()
117127
level.Info(logger).Log("msg", "executing local query with V2 engine via remote scheduler and worker")
118-
result, err := doLocalQueryWithV2EngineSchedulerRemote(params)
128+
result, err := doLocalQueryWithV2EngineSchedulerRemote(params, bucket)
119129
if err != nil {
120130
level.Error(logger).Log("msg", "v2 query execution failed", "error", err)
121131
return fmt.Errorf("v2 query execution failed: %w", err)
@@ -138,23 +148,23 @@ func checkResult(result logqlmodel.Result) error {
138148
}
139149

140150
// doLocalQueryWithV2Engine executes a query using the V2 engine
141-
func doLocalQueryWithV2Engine(params logql.LiteralParams) (logqlmodel.Result, error) {
151+
func doLocalQueryWithV2Engine(params logql.LiteralParams, bucket objstore.Bucket) (logqlmodel.Result, error) {
142152
logger := glog.NewLogfmtLogger(os.Stderr)
143153
ms := metastore.NewObjectMetastore(
144-
MustDataobjBucket(),
154+
bucket,
145155
metastore.Config{IndexStoragePrefix: "index/v0"},
146156
logger,
147157
metastore.NewObjectMetastoreMetrics(prometheus.DefaultRegisterer),
148158
)
149159

150160
ctx := user.InjectOrgID(context.Background(), orgID)
151-
qe := engine.NewBasic(engine.ExecutorConfig{BatchSize: 512}, ms, MustDataobjBucket(), logql.NoLimits, prometheus.DefaultRegisterer, logger)
161+
qe := engine.NewBasic(engine.ExecutorConfig{BatchSize: 512}, ms, bucket, logql.NoLimits, prometheus.DefaultRegisterer, logger)
152162
query := qe.Query(params)
153163
return query.Exec(ctx)
154164
}
155165

156166
// doLocalQueryWithV1Engine executes a query using the V1 engine
157-
func doLocalQueryWithV1Engine(params logql.LiteralParams) (logqlmodel.Result, error) {
167+
func doLocalQueryWithV1Engine(params logql.LiteralParams, bucketName string) (logqlmodel.Result, error) {
158168
ctx := user.InjectOrgID(context.Background(), orgID)
159169

160170
l := &validation.Limits{}
@@ -175,7 +185,7 @@ func doLocalQueryWithV1Engine(params logql.LiteralParams) (logqlmodel.Result, er
175185
ObjectStore: bucket.ConfigWithNamedStores{
176186
Config: bucket.Config{
177187
GCS: gcs.Config{
178-
BucketName: storageBucket,
188+
BucketName: bucketName,
179189
},
180190
},
181191
},
@@ -226,7 +236,7 @@ func doLocalQueryWithV1Engine(params logql.LiteralParams) (logqlmodel.Result, er
226236
return query.Exec(ctx)
227237
}
228238

229-
func doLocalQueryWithV2EngineScheduler(params logql.LiteralParams) (logqlmodel.Result, error) {
239+
func doLocalQueryWithV2EngineScheduler(params logql.LiteralParams, bucket objstore.Bucket) (logqlmodel.Result, error) {
230240
ctx := user.InjectOrgID(context.Background(), orgID)
231241

232242
sched, err := engine.NewScheduler(engine.SchedulerParams{
@@ -239,16 +249,14 @@ func doLocalQueryWithV2EngineScheduler(params logql.LiteralParams) (logqlmodel.R
239249
return logqlmodel.Result{}, fmt.Errorf("starting scheduler service: %w", err)
240250
}
241251

242-
b := MustDataobjBucket()
243-
244252
metastoreMetrics := metastore.NewObjectMetastoreMetrics(prometheus.DefaultRegisterer)
245253
msConfig := metastore.Config{IndexStoragePrefix: "index/v0"}
246254
workerLogger := glog.With(logger, "component", "worker")
247255
worker, err := engine.NewWorker(engine.WorkerParams{
248256
Logger: workerLogger,
249257
AdvertiseAddr: nil,
250-
Bucket: b,
251-
Metastore: metastore.NewObjectMetastore(b, msConfig, workerLogger, metastoreMetrics),
258+
Bucket: bucket,
259+
Metastore: metastore.NewObjectMetastore(bucket, msConfig, workerLogger, metastoreMetrics),
252260
LocalScheduler: sched,
253261
Config: engine.WorkerConfig{
254262
SchedulerLookupAddress: "",
@@ -274,7 +282,7 @@ func doLocalQueryWithV2EngineScheduler(params logql.LiteralParams) (logqlmodel.R
274282
BatchSize: 128,
275283
},
276284
},
277-
Metastore: metastore.NewObjectMetastore(b, msConfig, engineLogger, metastoreMetrics),
285+
Metastore: metastore.NewObjectMetastore(bucket, msConfig, engineLogger, metastoreMetrics),
278286
Scheduler: sched,
279287
Limits: logql.NoLimits,
280288
})
@@ -285,7 +293,8 @@ func doLocalQueryWithV2EngineScheduler(params logql.LiteralParams) (logqlmodel.R
285293
return e.Execute(ctx, params)
286294
}
287295

288-
func doLocalQueryWithV2EngineSchedulerRemote(params logql.LiteralParams) (logqlmodel.Result, error) {
296+
// doLocalQueryWithV2EngineSchedulerRemote executes a query using the V2 engine via remote scheduler and workers so it also executes the serialization logic.
297+
func doLocalQueryWithV2EngineSchedulerRemote(params logql.LiteralParams, bucket objstore.Bucket) (logqlmodel.Result, error) {
289298
ctx := user.InjectOrgID(context.Background(), orgID)
290299

291300
schedSrv, schedSvc, err := newServerService("scheduler", 3101, logger, prometheus.NewRegistry())
@@ -313,19 +322,18 @@ func doLocalQueryWithV2EngineSchedulerRemote(params logql.LiteralParams) (logqlm
313322
return logqlmodel.Result{}, fmt.Errorf("starting worker service: %w", err)
314323
}
315324

316-
b := MustDataobjBucket()
317325
metastoreMetrics := metastore.NewObjectMetastoreMetrics(prometheus.DefaultRegisterer)
318-
msConfig := metastore.Config{IndexStoragePrefix: "index/v0"}
326+
msConfig := metastore.Config{IndexStoragePrefix: indexStoragePrefix}
319327
workerLogger := glog.With(logger, "component", "worker")
320328
worker, err := engine.NewWorker(engine.WorkerParams{
321329
Logger: workerLogger,
322330
AdvertiseAddr: workerSrv.HTTPListenAddr(),
323-
Bucket: b,
324-
Metastore: metastore.NewObjectMetastore(b, msConfig, workerLogger, metastoreMetrics),
331+
Bucket: bucket,
332+
Metastore: metastore.NewObjectMetastore(bucket, msConfig, workerLogger, metastoreMetrics),
325333
LocalScheduler: nil,
326334
Config: engine.WorkerConfig{
327335
SchedulerLookupAddress: schedSrv.HTTPListenAddr().String(),
328-
SchedulerLookupInterval: 60,
336+
SchedulerLookupInterval: time.Minute,
329337
WorkerThreads: 64,
330338
},
331339
Executor: engine.ExecutorConfig{
@@ -348,7 +356,7 @@ func doLocalQueryWithV2EngineSchedulerRemote(params logql.LiteralParams) (logqlm
348356
BatchSize: 128,
349357
},
350358
},
351-
Metastore: metastore.NewObjectMetastore(b, msConfig, engineLogger, metastoreMetrics),
359+
Metastore: metastore.NewObjectMetastore(bucket, msConfig, engineLogger, metastoreMetrics),
352360
Scheduler: sched,
353361
Limits: logql.NoLimits,
354362
})

tools/querycomparator/main_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,21 @@ import (
99
"github.com/stretchr/testify/require"
1010
)
1111

12-
func TestDebugCmd(t *testing.T) {
12+
func TestQueryComparatorCmd(t *testing.T) {
1313
t.Skip("Test for debugging purposes only")
1414

15-
storageBucket = "" // TODO: set bucket name for local engine support
16-
indexStoragePrefix = "" // TODO: set index storage prefix for local engine support
17-
orgID = "" // TODO: set org ID for querying remote instances
15+
bucket := MustGCSDataobjBucket("")
16+
orgID = "" // TODO: set org ID for querying remote instances
1817

19-
start := time.Date(2025, 10, 24, 0, 0, 0, 0, time.UTC)
20-
end := start.Add(45 * time.Minute)
21-
query := "{namespace=\"test\"}"
18+
start := time.Date(2026, 2, 1, 0, 0, 0, 0, time.UTC)
19+
end := start.Add(1 * time.Minute)
20+
query := `{container="distributor"}`
2221

2322
params, err := logql.NewLiteralParams(query, start, end, 0, 0, logproto.BACKWARD, 1000, nil, nil)
2423
require.NoError(t, err)
2524

2625
// Run subcommands as necessary
2726
require.NoError(t, doComparison(params, "localhost:3101", "localhost:3102"))
28-
require.NoError(t, queryMetastore(params))
29-
require.NoError(t, doExecuteLocallyV2(params))
27+
require.NoError(t, queryMetastore(params, bucket))
28+
require.NoError(t, doExecuteLocallyV2SchedulerRemote(params, bucket))
3029
}

tools/querycomparator/metastore.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/go-kit/log/level"
1313
"github.com/grafana/dskit/user"
1414
"github.com/prometheus/prometheus/model/labels"
15+
"github.com/thanos-io/objstore"
1516

1617
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
1718
"github.com/grafana/loki/v3/pkg/logproto"
@@ -31,33 +32,34 @@ func addMetastoreCommand(app *kingpin.Application) {
3132
cmd.Flag("query", "LogQL query to analyze").Required().StringVar(&cfg.Query)
3233

3334
cmd.Action(func(_ *kingpin.ParseContext) error {
34-
storageBucket = cfg.Bucket
3535
orgID = cfg.OrgID
3636

3737
parsed, err := parseTimeConfig(&cfg)
3838
if err != nil {
3939
return err
4040
}
4141

42+
bucket := MustGCSDataobjBucket(cfg.Bucket)
43+
4244
params, err := logql.NewLiteralParams(cfg.Query, parsed.StartTime, parsed.EndTime, 0, 0, logproto.BACKWARD, 10, nil, nil)
4345
if err != nil {
4446
return err
4547
}
4648

47-
return queryMetastore(params)
49+
return queryMetastore(params, bucket)
4850
})
4951
}
5052

5153
// queryMetastore queries the metastore for stream sections
52-
func queryMetastore(params logql.LiteralParams) error {
54+
func queryMetastore(params logql.LiteralParams, bucket objstore.Bucket) error {
5355
query := params.QueryString()
5456
closeIdx := strings.Index(query, "}")
5557
streamMatchers, err := syntax.ParseMatchers(query[:closeIdx+1], true)
5658
if err != nil {
5759
return err
5860
}
5961

60-
sections, err := getSections(params.Start(), params.End(), streamMatchers)
62+
sections, err := getSections(bucket, params.Start(), params.End(), streamMatchers)
6163
if err != nil {
6264
return err
6365
}
@@ -70,11 +72,11 @@ func queryMetastore(params logql.LiteralParams) error {
7072

7173
// getSections queries the metastore for dataobject sections matching the query selector
7274
// Currently, it does not pass structured metadata predicates
73-
func getSections(start, end time.Time, streamMatchers []*labels.Matcher) ([]*metastore.DataobjSectionDescriptor, error) {
75+
func getSections(bucket objstore.Bucket, start, end time.Time, streamMatchers []*labels.Matcher) ([]*metastore.DataobjSectionDescriptor, error) {
7476
ctx := user.InjectOrgID(context.Background(), orgID)
7577
ms := metastore.NewObjectMetastore(
76-
MustDataobjBucket(),
77-
metastore.Config{IndexStoragePrefix: "index/v0"},
78+
bucket,
79+
metastore.Config{IndexStoragePrefix: indexStoragePrefix},
7880
log.NewLogfmtLogger(os.Stderr),
7981
metastore.NewObjectMetastoreMetrics(nil),
8082
)

tools/querycomparator/storage.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,56 @@ import (
55
"log"
66

77
glog "github.com/go-kit/log"
8+
"github.com/grafana/dskit/flagext"
89
"github.com/grafana/loki/v3/pkg/storage/bucket/gcs"
10+
"github.com/grafana/loki/v3/pkg/storage/bucket/s3"
911
"github.com/thanos-io/objstore"
1012
)
1113

12-
// MustDataobjBucket creates a GCS bucket client for dataobj storage
13-
func MustDataobjBucket() objstore.Bucket {
14+
// MustS3DataobjBucket creates a S3 bucket client for dataobj storage
15+
// The access key id, secret access key, and session token are required for S3 dataobj bucket and must be provided.
16+
// The region endpoint follows the format "s3.<aws region name>.amazonaws.com" e.g. "s3.eu-south-2.amazonaws.com".
17+
func MustS3DataobjBucket(bucketName string, regionEndpoint string) objstore.Bucket {
18+
accessKeyID := ""
19+
secretAccessKey := ""
20+
sessionToken := ""
21+
22+
if accessKeyID == "" || secretAccessKey == "" || sessionToken == "" {
23+
log.Fatal("access key id, secret access key, and session token are required for S3 dataobj bucket")
24+
}
25+
26+
bkt, err := s3.NewBucketClient(s3.Config{
27+
Endpoint: regionEndpoint,
28+
BucketName: bucketName,
29+
AccessKeyID: accessKeyID,
30+
SecretAccessKey: flagext.SecretWithValue(secretAccessKey),
31+
SessionToken: flagext.SecretWithValue(sessionToken),
32+
}, "querycomparator", glog.NewNopLogger(), nil)
33+
if err != nil {
34+
log.Fatal(err)
35+
}
36+
37+
prefixedBkt := objstore.NewPrefixedBucket(bkt, "dataobj")
38+
return prefixedBkt
39+
}
40+
41+
// MustGCSDataobjBucket creates a GCS bucket client for dataobj storage
42+
func MustGCSDataobjBucket(bucketName string) objstore.Bucket {
1443
bkt, err := gcs.NewBucketClient(context.Background(), gcs.Config{
15-
BucketName: storageBucket,
16-
}, "testing", glog.NewNopLogger(), nil)
44+
BucketName: bucketName,
45+
}, "querycomparator", glog.NewNopLogger(), nil)
1746
if err != nil {
1847
log.Fatal(err)
1948
}
2049
objBucket := objstore.NewPrefixedBucket(bkt, "dataobj")
2150
return objBucket
2251
}
2352

24-
// MustRawBucket creates a GCS bucket client for raw storage
25-
func MustRawBucket() objstore.Bucket {
53+
// MustRawGCSBucket creates a GCS bucket client for raw storage
54+
func MustRawGCSBucket(bucketName string) objstore.Bucket {
2655
bkt, err := gcs.NewBucketClient(context.Background(), gcs.Config{
27-
BucketName: storageBucket,
28-
}, "testing", glog.NewNopLogger(), nil)
56+
BucketName: bucketName,
57+
}, "querycomparator", glog.NewNopLogger(), nil)
2958
if err != nil {
3059
log.Fatal(err)
3160
}

0 commit comments

Comments
 (0)