Skip to content

Commit cc0cb2f

Browse files
mustafai-grGitHub Enterprise
authored andcommitted
ARMADA-3283 - Implementing GetActiveQueues on QueryAPI (#393)
* Implementing GetActiveQueuesByPool * Generating code for proto change * Changing code to respect proto change * Removing unused imports * Making index creation concurrent * Removing dependency exclusion
1 parent 498774c commit cc0cb2f

File tree

14 files changed

+1018
-99
lines changed

14 files changed

+1018
-99
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE job_run ADD COLUMN pool TEXT;
2+
3+
CREATE INDEX CONCURRENTLY idx_job_run_state_pool ON job_run (job_run_state, pool);

internal/lookoutingester/instructions/instructions.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ func (c *InstructionConverter) handleJobRunLeased(ts time.Time, event *armadaeve
337337
JobId: event.JobId,
338338
Cluster: util.Truncate(event.ExecutorId, maxClusterLen),
339339
Node: pointer.String(util.Truncate(event.NodeId, maxNodeLen)),
340+
Pool: event.Pool,
340341
Leased: &ts,
341342
JobRunState: lookout.JobRunLeasedOrdinal,
342343
}

internal/lookoutingester/instructions/instructions_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ var expectedLeasedRun = model.CreateJobRunInstruction{
6565
Cluster: testfixtures.ExecutorId,
6666
Leased: &testfixtures.BaseTime,
6767
Node: pointer.String(testfixtures.NodeName),
68+
Pool: testfixtures.Pool,
6869
JobRunState: lookout.JobRunLeasedOrdinal,
6970
}
7071

internal/lookoutingester/lookoutdb/insertion.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,8 @@ func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions
569569
node varchar(512),
570570
leased timestamp,
571571
pending timestamp,
572-
job_run_state smallint
572+
job_run_state smallint,
573+
pool text
573574
) ON COMMIT DROP;`, tmpTable))
574575
if err != nil {
575576
l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
@@ -588,6 +589,7 @@ func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions
588589
"leased",
589590
"pending",
590591
"job_run_state",
592+
"pool",
591593
},
592594
pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) {
593595
return []interface{}{
@@ -598,6 +600,7 @@ func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions
598600
instructions[i].Leased,
599601
instructions[i].Pending,
600602
instructions[i].JobRunState,
603+
instructions[i].Pool,
601604
}, nil
602605
}),
603606
)
@@ -615,7 +618,8 @@ func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions
615618
node,
616619
leased,
617620
pending,
618-
job_run_state
621+
job_run_state,
622+
pool
619623
) SELECT * from %s
620624
ON CONFLICT DO NOTHING`, tmpTable))
621625
if err != nil {
@@ -635,8 +639,9 @@ func (l *LookoutDb) CreateJobRunsScalar(ctx *armadacontext.Context, instructions
635639
node,
636640
leased,
637641
pending,
638-
job_run_state)
639-
VALUES ($1, $2, $3, $4, $5, $6, $7)
642+
job_run_state,
643+
pool)
644+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
640645
ON CONFLICT DO NOTHING`
641646
for _, i := range instructions {
642647
err := l.withDatabaseRetryInsert(func() error {
@@ -647,7 +652,8 @@ func (l *LookoutDb) CreateJobRunsScalar(ctx *armadacontext.Context, instructions
647652
i.Node,
648653
i.Leased,
649654
i.Pending,
650-
i.JobRunState)
655+
i.JobRunState,
656+
i.Pool)
651657
if err != nil {
652658
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
653659
}

internal/lookoutingester/model/model.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type CreateJobRunInstruction struct {
4747
RunId string
4848
JobId string
4949
Cluster string
50+
Pool string
5051
Node *string
5152
Leased *time.Time
5253
Pending *time.Time

internal/server/queryapi/database/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/server/queryapi/database/query.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,6 @@ SELECT job_id, state FROM job
2222
WHERE queue=sqlc.arg(queue)::text
2323
AND jobset = sqlc.arg(jobset)::text
2424
AND external_job_uri = sqlc.arg(external_job_uri)::text;
25+
26+
-- name: GetActiveQueuesByPool :many
27+
SELECT DISTINCT jr.pool, j.queue FROM job j JOIN job_run jr ON j.job_id = jr.job_id WHERE j.state IN (2, 3, 8) AND jr.job_run_state IN (1, 2, 11) ORDER BY jr.pool, j.queue;

internal/server/queryapi/database/query.sql.go

Lines changed: 33 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/server/queryapi/query_api.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,32 @@ func (q *QueryApi) GetJobStatusUsingExternalJobUri(ctx context.Context, req *api
242242
}, nil
243243
}
244244

245+
func (q *QueryApi) GetActiveQueues(ctx context.Context, _ *api.GetActiveQueuesRequest) (*api.GetActiveQueuesResponse, error) {
246+
queries := database.New(q.db)
247+
activeQueues, err := queries.GetActiveQueuesByPool(ctx)
248+
if err != nil {
249+
return nil, fmt.Errorf("could not get active queues by pool")
250+
}
251+
queuesByPool := map[string]*api.ActiveQueues{}
252+
for _, result := range activeQueues {
253+
if result.Pool == nil {
254+
continue
255+
}
256+
257+
pool := *result.Pool
258+
if _, ok := queuesByPool[pool]; !ok {
259+
queuesByPool[pool] = &api.ActiveQueues{
260+
Queues: []string{},
261+
}
262+
}
263+
queuesByPool[pool].Queues = append(queuesByPool[pool].Queues, result.Queue)
264+
}
265+
266+
return &api.GetActiveQueuesResponse{
267+
ActiveQueuesByPool: queuesByPool,
268+
}, nil
269+
}
270+
245271
func parseDbJobStateToApi(dbStatus int16) api.JobState {
246272
apiStatus, ok := JobStateMap[dbStatus]
247273
if !ok {

pkg/api/api.swagger.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,38 @@ func SwaggerJsonTemplate() string {
585585
" }\n" +
586586
" }\n" +
587587
" },\n" +
588+
" \"/v1/queues/active\": {\n" +
589+
" \"post\": {\n" +
590+
" \"tags\": [\n" +
591+
" \"Jobs\"\n" +
592+
" ],\n" +
593+
" \"operationId\": \"GetActiveQueues\",\n" +
594+
" \"parameters\": [\n" +
595+
" {\n" +
596+
" \"name\": \"body\",\n" +
597+
" \"in\": \"body\",\n" +
598+
" \"required\": true,\n" +
599+
" \"schema\": {\n" +
600+
" \"$ref\": \"#/definitions/apiGetActiveQueuesRequest\"\n" +
601+
" }\n" +
602+
" }\n" +
603+
" ],\n" +
604+
" \"responses\": {\n" +
605+
" \"200\": {\n" +
606+
" \"description\": \"A successful response.\",\n" +
607+
" \"schema\": {\n" +
608+
" \"$ref\": \"#/definitions/apiGetActiveQueuesResponse\"\n" +
609+
" }\n" +
610+
" },\n" +
611+
" \"default\": {\n" +
612+
" \"description\": \"An unexpected error response.\",\n" +
613+
" \"schema\": {\n" +
614+
" \"$ref\": \"#/definitions/runtimeError\"\n" +
615+
" }\n" +
616+
" }\n" +
617+
" }\n" +
618+
" }\n" +
619+
" },\n" +
588620
" \"/v1/run/details\": {\n" +
589621
" \"post\": {\n" +
590622
" \"tags\": [\n" +
@@ -647,6 +679,17 @@ func SwaggerJsonTemplate() string {
647679
" }\n" +
648680
" }\n" +
649681
" },\n" +
682+
" \"apiActiveQueues\": {\n" +
683+
" \"type\": \"object\",\n" +
684+
" \"properties\": {\n" +
685+
" \"queues\": {\n" +
686+
" \"type\": \"array\",\n" +
687+
" \"items\": {\n" +
688+
" \"type\": \"string\"\n" +
689+
" }\n" +
690+
" }\n" +
691+
" }\n" +
692+
" },\n" +
650693
" \"apiBatchQueueCreateResponse\": {\n" +
651694
" \"type\": \"object\",\n" +
652695
" \"properties\": {\n" +
@@ -788,6 +831,20 @@ func SwaggerJsonTemplate() string {
788831
" }\n" +
789832
" }\n" +
790833
" },\n" +
834+
" \"apiGetActiveQueuesRequest\": {\n" +
835+
" \"type\": \"object\"\n" +
836+
" },\n" +
837+
" \"apiGetActiveQueuesResponse\": {\n" +
838+
" \"type\": \"object\",\n" +
839+
" \"properties\": {\n" +
840+
" \"activeQueuesByPool\": {\n" +
841+
" \"type\": \"object\",\n" +
842+
" \"additionalProperties\": {\n" +
843+
" \"$ref\": \"#/definitions/apiActiveQueues\"\n" +
844+
" }\n" +
845+
" }\n" +
846+
" }\n" +
847+
" },\n" +
791848
" \"apiIngressConfig\": {\n" +
792849
" \"type\": \"object\",\n" +
793850
" \"properties\": {\n" +

0 commit comments

Comments
 (0)