Skip to content

Commit d2b8f12

Browse files
committed
Merge branch 'slurm-refactor'
2 parents 510648e + ce140e5 commit d2b8f12

File tree

7 files changed

+672
-404
lines changed

7 files changed

+672
-404
lines changed

cmd/cc-slurm-adapter/daemon.go

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ func DaemonMain() error {
106106
profiler.Begin()
107107

108108
trace.Info("Job Event timer triggered (%d events queued)", len(jobEvents))
109-
slurmApi.ClearJobCache()
110109
jobEventsProcess()
111110
if len(jobEvents) > 0 {
112111
jobEventTimer.Reset(queryDelay)
@@ -124,7 +123,6 @@ func DaemonMain() error {
124123
pollEventTicker.Reset(pollEventInterval)
125124
}
126125

127-
slurmApi.ClearJobCache()
128126
err = ccApi.CacheUpdate()
129127
if err != nil {
130128
trace.Error("Unable to update cc-backend cache. Trying later...")
@@ -270,10 +268,10 @@ func jobEventsProcess() {
270268
newJobEvents := make([]prep.SlurmctldEnv, 0)
271269

272270
// map[cluster]jobId
273-
clusterQueries := make(map[string][]uint32, 0)
271+
clusterQueries := make(map[string][]int64, 0)
274272

275273
for _, jobEvent := range jobEvents {
276-
jobEventId, err := strconv.ParseUint(jobEvent.SLURM_JOB_ID, 10, 32)
274+
jobEventId, err := strconv.ParseInt(jobEvent.SLURM_JOB_ID, 10, 64)
277275
if err != nil {
278276
trace.Warn("SLURM_JOB_ID contains non-integer value: %v", err)
279277
continue
@@ -285,10 +283,12 @@ func jobEventsProcess() {
285283
continue
286284
}
287285

288-
clusterQueries[jobEventCluster] = append(clusterQueries[jobEventCluster], uint32(jobEventId))
286+
clusterQueries[jobEventCluster] = append(clusterQueries[jobEventCluster], jobEventId)
289287
}
290288

291289
for cluster, jobIds := range clusterQueries {
290+
// Strictly speaking, QueryJobs does an sacct call for the job list, which becomes
291+
// redundant once we do the squeue call on the job list later. This could be optimized.
292292
jobs, err := slurmApi.QueryJobs(cluster, jobIds)
293293
if err != nil {
294294
jobEventSacctAttempts += 1
@@ -302,11 +302,9 @@ func jobEventsProcess() {
302302
}
303303
}
304304

305-
for _, job := range jobs {
306-
err := ccApi.SyncJob(job, false)
307-
if err != nil {
308-
trace.Warn("Syncing job (%s, %d) via PrEp hook failed (we will try again later during regular poll): %v", cluster, job.GetJobId(), err)
309-
}
305+
err = ccApi.SyncJobs(cluster, jobs, false)
306+
if err != nil {
307+
trace.Warn("Syncing job (%s, %v) via PrEp hook failed (we will try again later during regular poll): %v", cluster, jobIdsOfJobs(jobs), err)
310308
}
311309
}
312310

@@ -343,12 +341,10 @@ func processSlurmSacctPoll() {
343341
return
344342
}
345343

346-
for _, job := range jobs {
347-
err = ccApi.SyncJob(job, false)
348-
if err != nil {
349-
trace.Error("Syncing job to ClusterCockpit failed (%s). Trying later...", err)
350-
return
351-
}
344+
err = ccApi.SyncJobs(cluster, jobs, false)
345+
if err != nil {
346+
trace.Error("Syncing job (%s, %v) to ClusterCockpit failed (%s). Trying later...", cluster, jobIdsOfJobs(jobs), err)
347+
return
352348
}
353349

354350
if len(jobs) > 0 {
@@ -379,7 +375,7 @@ func processSlurmSqueuePoll() {
379375
// Check if there are any stale jobs in cc-backend, which are no longer known to Slurm.
380376
// This should usually not happen, but in the past Slurm would occasionally lie to use and we would miss
381377
// job stops.
382-
jobIdsToQuery := make([]uint32, 0)
378+
jobIdsToQuery := make([]int64, 0)
383379

384380
for jobId, cachedJobState := range ccApi.JobCache[cluster] {
385381
if !cachedJobState.Running {
@@ -397,31 +393,39 @@ func processSlurmSqueuePoll() {
397393
continue
398394
}
399395

400-
jobIdsToQuery = append(jobIdsToQuery, uint32(jobId))
396+
jobIdsToQuery = append(jobIdsToQuery, jobId)
401397
}
402398

403399
if len(jobIdsToQuery) == 0 {
404400
continue
405401
}
406402

407403
trace.Warn("Detected stale jobs in cc-backend (%s, %v). Trying to synchronize...", cluster, jobIdsToQuery)
408-
saJobs, err := slurmApi.QueryJobs(cluster, jobIdsToQuery)
404+
jobs, err := slurmApi.QueryJobs(cluster, jobIdsToQuery)
409405
if err != nil {
410406
trace.Error("Failed to query cc-backend's stale job from Slurm: %v", err)
411407
continue
412408
}
413409

414-
for _, job := range saJobs {
415-
trace.Warn("Stale job state is: %s", job.GetState())
410+
for _, job := range jobs {
411+
trace.Warn("Queuing sync of stale job (%s, %d), which is in state '%s'", job.GetCluster(), job.GetJobId(), job.GetState())
412+
}
416413

417-
err = ccApi.SyncJob(job, false)
418-
if err != nil {
419-
trace.Error("Failed to sync cc-backend's stale job from Slurm: %v", err)
420-
}
414+
err = ccApi.SyncJobs(cluster, jobs, false)
415+
if err != nil {
416+
trace.Error("Failed to sync cc-backend's stale job from Slurm: %v", err)
421417
}
422418
}
423419
}
424420

421+
func jobIdsOfJobs(jobs []slurm_common.Job) []int64 {
422+
result := make([]int64, len(jobs))
423+
for i, job := range jobs {
424+
result[i] = job.GetJobId()
425+
}
426+
return result
427+
}
428+
425429
func lastRunGet() time.Time {
426430
statInfo, err := os.Stat(config.Config.LastRunPath)
427431
if errors.Is(err, os.ErrNotExist) {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.24.0
55
toolchain go1.24.4
66

77
require (
8-
github.com/ClusterCockpit/cc-lib v1.0.2
8+
github.com/ClusterCockpit/cc-lib/v2 v2.4.0
99
github.com/nats-io/nats.go v1.48.0
1010
)
1111

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/ClusterCockpit/cc-lib v1.0.2 h1:ZWn3oZkXgxrr3zSigBdlOOfayZ4Om4xL20DhmritPPg=
2-
github.com/ClusterCockpit/cc-lib v1.0.2/go.mod h1:UGdOvXEnjFqlnPSxtvtFwO6BtXYW6NnXFoud9FtN93k=
1+
github.com/ClusterCockpit/cc-lib/v2 v2.4.0 h1:OnZlvqSatg7yCQ2NtSR7AddpUVSiuSMZ8scF1a7nfOk=
2+
github.com/ClusterCockpit/cc-lib/v2 v2.4.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
33
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
44
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
55
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=

0 commit comments

Comments
 (0)