Skip to content

Commit c920c57

Browse files
committed
Add parquet file job archiving target
1 parent a8194de commit c920c57

File tree

10 files changed

+755
-9
lines changed

10 files changed

+755
-9
lines changed

go.mod

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/ClusterCockpit/cc-backend
22

3-
go 1.24.0
4-
5-
toolchain go1.24.1
3+
go 1.24.9
64

75
tool (
86
github.com/99designs/gqlgen
@@ -47,6 +45,7 @@ require (
4745
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
4846
github.com/KyleBanks/depth v1.2.1 // indirect
4947
github.com/agnivade/levenshtein v1.2.1 // indirect
48+
github.com/andybalholm/brotli v1.1.1 // indirect
5049
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
5150
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect
5251
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 // indirect
@@ -98,6 +97,10 @@ require (
9897
github.com/nats-io/nkeys v0.4.12 // indirect
9998
github.com/nats-io/nuid v1.0.1 // indirect
10099
github.com/oapi-codegen/runtime v1.1.1 // indirect
100+
github.com/parquet-go/bitpack v1.0.0 // indirect
101+
github.com/parquet-go/jsonlite v1.0.0 // indirect
102+
github.com/parquet-go/parquet-go v0.27.0 // indirect
103+
github.com/pierrec/lz4/v4 v4.1.21 // indirect
101104
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
102105
github.com/prometheus/common v0.67.4 // indirect
103106
github.com/robfig/cron/v3 v3.0.1 // indirect
@@ -106,6 +109,7 @@ require (
106109
github.com/stmcginnis/gofish v0.20.0 // indirect
107110
github.com/stretchr/objx v0.5.2 // indirect
108111
github.com/swaggo/files v1.0.1 // indirect
112+
github.com/twpayne/go-geom v1.6.1 // indirect
109113
github.com/urfave/cli/v2 v2.27.7 // indirect
110114
github.com/urfave/cli/v3 v3.6.1 // indirect
111115
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 // indirect
@@ -118,6 +122,7 @@ require (
118122
golang.org/x/sys v0.39.0 // indirect
119123
golang.org/x/text v0.32.0 // indirect
120124
golang.org/x/tools v0.40.0 // indirect
125+
google.golang.org/protobuf v1.36.11 // indirect
121126
gopkg.in/yaml.v3 v3.0.1 // indirect
122127
sigs.k8s.io/yaml v1.6.0 // indirect
123128
)

go.sum

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e h1:4dAU9FXIyQktp
2121
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4=
2222
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
2323
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
24+
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
25+
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
2426
github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM=
2527
github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA=
2628
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM=
@@ -238,6 +240,14 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
238240
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
239241
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
240242
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
243+
github.com/parquet-go/bitpack v1.0.0 h1:AUqzlKzPPXf2bCdjfj4sTeacrUwsT7NlcYDMUQxPcQA=
244+
github.com/parquet-go/bitpack v1.0.0/go.mod h1:XnVk9TH+O40eOOmvpAVZ7K2ocQFrQwysLMnc6M/8lgs=
245+
github.com/parquet-go/jsonlite v1.0.0 h1:87QNdi56wOfsE5bdgas0vRzHPxfJgzrXGml1zZdd7VU=
246+
github.com/parquet-go/jsonlite v1.0.0/go.mod h1:nDjpkpL4EOtqs6NQugUsi0Rleq9sW/OtC1NnZEnxzF0=
247+
github.com/parquet-go/parquet-go v0.27.0 h1:vHWK2xaHbj+v1DYps03yDRpEsdtOeKbhiXUaixoPb3g=
248+
github.com/parquet-go/parquet-go v0.27.0/go.mod h1:navtkAYr2LGoJVp141oXPlO/sxLvaOe3la2JEoD8+rg=
249+
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
250+
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
241251
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
242252
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
243253
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
@@ -285,6 +295,8 @@ github.com/swaggo/http-swagger v1.3.4 h1:q7t/XLx0n15H1Q9/tk3Y9L4n210XzJF5WtnDX64
285295
github.com/swaggo/http-swagger v1.3.4/go.mod h1:9dAh0unqMBAlbp1uE2Uc2mQTxNMU/ha4UbucIg1MFkQ=
286296
github.com/swaggo/swag v1.16.6 h1:qBNcx53ZaX+M5dxVyTrgQ0PJ/ACK+NzhwcbieTt+9yI=
287297
github.com/swaggo/swag v1.16.6/go.mod h1:ngP2etMK5a0P3QBizic5MEwpRmluJZPHjXcMoj4Xesg=
298+
github.com/twpayne/go-geom v1.6.1 h1:iLE+Opv0Ihm/ABIcvQFGIiFBXd76oBIar9drAwHFhR4=
299+
github.com/twpayne/go-geom v1.6.1/go.mod h1:Kr+Nly6BswFsKM5sd31YaoWS5PeDDH2NftJTK7Gd028=
288300
github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU=
289301
github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4=
290302
github.com/urfave/cli/v3 v3.6.1 h1:j8Qq8NyUawj/7rTYdBGrxcH7A/j7/G8Q5LhWEW4G3Mo=
@@ -293,6 +305,7 @@ github.com/vektah/gqlparser/v2 v2.5.31 h1:YhWGA1mfTjID7qJhd1+Vxhpk5HTgydrGU9IgkW
293305
github.com/vektah/gqlparser/v2 v2.5.31/go.mod h1:c1I28gSOVNzlfc4WuDlqU7voQnsqI6OG2amkBAFmgts=
294306
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg=
295307
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
308+
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
296309
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
297310
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
298311
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

internal/taskmanager/retentionService.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/ClusterCockpit/cc-backend/pkg/archive"
12+
pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet"
1213
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
1314
"github.com/go-co-op/gocron/v2"
1415
)
@@ -66,3 +67,96 @@ func RegisterRetentionMoveService(age int, includeDB bool, location string, omit
6667
}
6768
}))
6869
}
70+
71+
func RegisterRetentionParquetService(retention Retention) {
72+
cclog.Info("Register retention parquet service")
73+
74+
maxFileSizeMB := retention.MaxFileSizeMB
75+
if maxFileSizeMB <= 0 {
76+
maxFileSizeMB = 512
77+
}
78+
79+
var target pqarchive.ParquetTarget
80+
var err error
81+
82+
switch retention.TargetKind {
83+
case "s3":
84+
target, err = pqarchive.NewS3Target(pqarchive.S3TargetConfig{
85+
Endpoint: retention.TargetEndpoint,
86+
Bucket: retention.TargetBucket,
87+
AccessKey: retention.TargetAccessKey,
88+
SecretKey: retention.TargetSecretKey,
89+
Region: retention.TargetRegion,
90+
UsePathStyle: retention.TargetUsePathStyle,
91+
})
92+
default:
93+
target, err = pqarchive.NewFileTarget(retention.TargetPath)
94+
}
95+
96+
if err != nil {
97+
cclog.Errorf("Parquet retention: failed to create target: %v", err)
98+
return
99+
}
100+
101+
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))),
102+
gocron.NewTask(
103+
func() {
104+
startTime := time.Now().Unix() - int64(retention.Age*24*3600)
105+
jobs, err := jobRepo.FindJobsBetween(0, startTime, retention.OmitTagged)
106+
if err != nil {
107+
cclog.Warnf("Parquet retention: error finding jobs: %v", err)
108+
return
109+
}
110+
if len(jobs) == 0 {
111+
return
112+
}
113+
114+
cclog.Infof("Parquet retention: processing %d jobs", len(jobs))
115+
ar := archive.GetHandle()
116+
pw := pqarchive.NewParquetWriter(target, maxFileSizeMB)
117+
118+
for _, job := range jobs {
119+
meta, err := ar.LoadJobMeta(job)
120+
if err != nil {
121+
cclog.Warnf("Parquet retention: load meta for job %d: %v", job.JobID, err)
122+
continue
123+
}
124+
125+
data, err := ar.LoadJobData(job)
126+
if err != nil {
127+
cclog.Warnf("Parquet retention: load data for job %d: %v", job.JobID, err)
128+
continue
129+
}
130+
131+
row, err := pqarchive.JobToParquetRow(meta, &data)
132+
if err != nil {
133+
cclog.Warnf("Parquet retention: convert job %d: %v", job.JobID, err)
134+
continue
135+
}
136+
137+
if err := pw.AddJob(*row); err != nil {
138+
cclog.Errorf("Parquet retention: add job %d to writer: %v", job.JobID, err)
139+
continue
140+
}
141+
}
142+
143+
if err := pw.Close(); err != nil {
144+
cclog.Errorf("Parquet retention: close writer: %v", err)
145+
return
146+
}
147+
148+
ar.CleanUp(jobs)
149+
150+
if retention.IncludeDB {
151+
cnt, err := jobRepo.DeleteJobsBefore(startTime, retention.OmitTagged)
152+
if err != nil {
153+
cclog.Errorf("Parquet retention: delete jobs from db: %v", err)
154+
} else {
155+
cclog.Infof("Parquet retention: removed %d jobs from db", cnt)
156+
}
157+
if err = jobRepo.Optimize(); err != nil {
158+
cclog.Errorf("Parquet retention: db optimization error: %v", err)
159+
}
160+
}
161+
}))
162+
}

internal/taskmanager/taskManager.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,20 @@ const (
2323

2424
// Retention defines the configuration for job retention policies.
//
// Fields:
//   - Policy: retention policy name; initArchiveServices switches on it and
//     "parquet" selects RegisterRetentionParquetService.
//   - Location: target location handed to the move retention service.
//   - Age: age threshold in days (converted to seconds as Age*24*3600).
//   - IncludeDB: when set, matching jobs are also deleted from the database.
//     NOTE(review): the JSON schema in pkg/archive/ConfigSchema.go names this
//     key "include-db" while the tag here is "includeDB" — confirm which
//     spelling config files are expected to use.
//   - OmitTagged: forwarded to the job repository queries; presumably excludes
//     tagged jobs from retention — confirm against the repository code.
//   - TargetKind: parquet target kind; "s3" selects the S3 target, any other
//     value selects the file target.
//   - TargetPath: directory used by the parquet file target.
//   - TargetEndpoint, TargetBucket, TargetAccessKey, TargetSecretKey,
//     TargetRegion, TargetUsePathStyle: settings passed to the parquet S3
//     target.
//   - MaxFileSizeMB: size limit passed to the parquet writer; values <= 0 are
//     replaced by 512 in RegisterRetentionParquetService.
type Retention struct {
	Policy             string `json:"policy"`
	Location           string `json:"location"`
	Age                int    `json:"age"`
	IncludeDB          bool   `json:"includeDB"`
	OmitTagged         bool   `json:"omitTagged"`
	TargetKind         string `json:"target-kind"`
	TargetPath         string `json:"target-path"`
	TargetEndpoint     string `json:"target-endpoint"`
	TargetBucket       string `json:"target-bucket"`
	TargetAccessKey    string `json:"target-access-key"`
	TargetSecretKey    string `json:"target-secret-key"`
	TargetRegion       string `json:"target-region"`
	TargetUsePathStyle bool   `json:"target-use-path-style"`
	MaxFileSizeMB      int    `json:"max-file-size-mb"`
}
3241

3342
// CronFrequency defines the execution intervals for various background workers.
@@ -87,6 +96,8 @@ func initArchiveServices(config json.RawMessage) {
8796
cfg.Retention.IncludeDB,
8897
cfg.Retention.Location,
8998
cfg.Retention.OmitTagged)
99+
case "parquet":
100+
RegisterRetentionParquetService(cfg.Retention)
90101
}
91102

92103
if cfg.Compression > 0 {

pkg/archive/ConfigSchema.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ var configSchema = `
5757
"policy": {
5858
"description": "Retention policy",
5959
"type": "string",
60-
"enum": ["none", "delete", "move"]
60+
"enum": ["none", "delete", "move", "parquet"]
6161
},
6262
"include-db": {
6363
"description": "Also remove jobs from database",
@@ -70,6 +70,43 @@ var configSchema = `
7070
"location": {
7171
"description": "The target directory for retention. Only applicable for retention move.",
7272
"type": "string"
73+
},
74+
"target-kind": {
75+
"description": "Target storage kind for parquet retention: file or s3",
76+
"type": "string",
77+
"enum": ["file", "s3"]
78+
},
79+
"target-path": {
80+
"description": "Target directory path for parquet file storage",
81+
"type": "string"
82+
},
83+
"target-endpoint": {
84+
"description": "S3 endpoint URL for parquet target",
85+
"type": "string"
86+
},
87+
"target-bucket": {
88+
"description": "S3 bucket name for parquet target",
89+
"type": "string"
90+
},
91+
"target-access-key": {
92+
"description": "S3 access key for parquet target",
93+
"type": "string"
94+
},
95+
"target-secret-key": {
96+
"description": "S3 secret key for parquet target",
97+
"type": "string"
98+
},
99+
"target-region": {
100+
"description": "S3 region for parquet target",
101+
"type": "string"
102+
},
103+
"target-use-path-style": {
104+
"description": "Use path-style S3 URLs for parquet target",
105+
"type": "boolean"
106+
},
107+
"max-file-size-mb": {
108+
"description": "Maximum parquet file size in MB before splitting",
109+
"type": "integer"
73110
}
74111
},
75112
"required": ["policy"]

pkg/archive/parquet/convert.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
2+
// All rights reserved. This file is part of cc-backend.
3+
// Use of this source code is governed by a MIT-style
4+
// license that can be found in the LICENSE file.
5+
6+
package parquet
7+
8+
import (
9+
"bytes"
10+
"compress/gzip"
11+
"encoding/json"
12+
"fmt"
13+
14+
"github.com/ClusterCockpit/cc-lib/v2/schema"
15+
)
16+
17+
// JobToParquetRow converts job metadata and metric data into a flat ParquetJobRow.
18+
// Nested fields are marshaled to JSON; metric data is gzip-compressed JSON.
19+
func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, error) {
20+
resourcesJSON, err := json.Marshal(meta.Resources)
21+
if err != nil {
22+
return nil, fmt.Errorf("marshal resources: %w", err)
23+
}
24+
25+
var statisticsJSON []byte
26+
if meta.Statistics != nil {
27+
statisticsJSON, err = json.Marshal(meta.Statistics)
28+
if err != nil {
29+
return nil, fmt.Errorf("marshal statistics: %w", err)
30+
}
31+
}
32+
33+
var tagsJSON []byte
34+
if len(meta.Tags) > 0 {
35+
tagsJSON, err = json.Marshal(meta.Tags)
36+
if err != nil {
37+
return nil, fmt.Errorf("marshal tags: %w", err)
38+
}
39+
}
40+
41+
var metaDataJSON []byte
42+
if meta.MetaData != nil {
43+
metaDataJSON, err = json.Marshal(meta.MetaData)
44+
if err != nil {
45+
return nil, fmt.Errorf("marshal metadata: %w", err)
46+
}
47+
}
48+
49+
var footprintJSON []byte
50+
if meta.Footprint != nil {
51+
footprintJSON, err = json.Marshal(meta.Footprint)
52+
if err != nil {
53+
return nil, fmt.Errorf("marshal footprint: %w", err)
54+
}
55+
}
56+
57+
var energyFootJSON []byte
58+
if meta.EnergyFootprint != nil {
59+
energyFootJSON, err = json.Marshal(meta.EnergyFootprint)
60+
if err != nil {
61+
return nil, fmt.Errorf("marshal energy footprint: %w", err)
62+
}
63+
}
64+
65+
metricDataGz, err := compressJobData(data)
66+
if err != nil {
67+
return nil, fmt.Errorf("compress metric data: %w", err)
68+
}
69+
70+
return &ParquetJobRow{
71+
JobID: meta.JobID,
72+
Cluster: meta.Cluster,
73+
SubCluster: meta.SubCluster,
74+
Partition: meta.Partition,
75+
Project: meta.Project,
76+
User: meta.User,
77+
State: string(meta.State),
78+
StartTime: meta.StartTime,
79+
Duration: meta.Duration,
80+
Walltime: meta.Walltime,
81+
NumNodes: meta.NumNodes,
82+
NumHWThreads: meta.NumHWThreads,
83+
NumAcc: meta.NumAcc,
84+
Exclusive: meta.Exclusive,
85+
Energy: meta.Energy,
86+
SMT: meta.SMT,
87+
ResourcesJSON: resourcesJSON,
88+
StatisticsJSON: statisticsJSON,
89+
TagsJSON: tagsJSON,
90+
MetaDataJSON: metaDataJSON,
91+
FootprintJSON: footprintJSON,
92+
EnergyFootJSON: energyFootJSON,
93+
MetricDataGz: metricDataGz,
94+
}, nil
95+
}
96+
97+
func compressJobData(data *schema.JobData) ([]byte, error) {
98+
jsonBytes, err := json.Marshal(data)
99+
if err != nil {
100+
return nil, err
101+
}
102+
103+
var buf bytes.Buffer
104+
gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
105+
if err != nil {
106+
return nil, err
107+
}
108+
if _, err := gz.Write(jsonBytes); err != nil {
109+
return nil, err
110+
}
111+
if err := gz.Close(); err != nil {
112+
return nil, err
113+
}
114+
115+
return buf.Bytes(), nil
116+
}

0 commit comments

Comments
 (0)