1- import { TaskQueueType } from "@trigger.dev/database" ;
1+ import type { RunEngine } from "@internal/run-engine" ;
2+ import { Prisma , TaskQueueType } from "@trigger.dev/database" ;
3+ import { type PrismaClientOrTransaction } from "~/db.server" ;
24import { type AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
35import { determineEngineVersion } from "~/v3/engineVersion.server" ;
46import { engine } from "~/v3/runEngine.server" ;
57import { BasePresenter } from "./basePresenter.server" ;
68import { toQueueItem } from "./QueueRetrievePresenter.server" ;
9+ import type { QueueListPagination } from "./queueListPagination.server" ;
710
8- const DEFAULT_ITEMS_PER_PAGE = 25 ;
11+ type QueueListEngine = Pick < RunEngine , "lengthOfQueues" | "currentConcurrencyOfQueues" > ;
12+
13+ export const QUEUE_LIST_DEFAULT_ITEMS_PER_PAGE = 25 ;
914const MAX_ITEMS_PER_PAGE = 100 ;
1015
1116const typeToDBQueueType : Record < "task" | "custom" , TaskQueueType > = {
1217 task : TaskQueueType . VIRTUAL ,
1318 custom : TaskQueueType . NAMED ,
1419} ;
1520
21+ const queueListSelect = {
22+ friendlyId : true ,
23+ name : true ,
24+ orderableName : true ,
25+ concurrencyLimit : true ,
26+ concurrencyLimitBase : true ,
27+ concurrencyLimitOverriddenAt : true ,
28+ concurrencyLimitOverriddenBy : true ,
29+ type : true ,
30+ paused : true ,
31+ } satisfies Prisma . TaskQueueSelect ;
32+
33+ function buildQueueListWhere (
34+ environmentId : string ,
35+ query : string | undefined ,
36+ type : "task" | "custom" | undefined
37+ ) : Prisma . TaskQueueWhereInput {
38+ const trimmedQuery = query ?. trim ( ) ;
39+
40+ return {
41+ runtimeEnvironmentId : environmentId ,
42+ version : "V2" ,
43+ name : trimmedQuery
44+ ? {
45+ contains : trimmedQuery ,
46+ mode : "insensitive" ,
47+ }
48+ : undefined ,
49+ type : type ? typeToDBQueueType [ type ] : undefined ,
50+ } ;
51+ }
52+
1653export class QueueListPresenter extends BasePresenter {
1754 private readonly perPage : number ;
55+ private readonly engineClient : QueueListEngine ;
1856
19- constructor ( perPage : number = DEFAULT_ITEMS_PER_PAGE ) {
20- super ( ) ;
57+ constructor (
58+ perPage : number = QUEUE_LIST_DEFAULT_ITEMS_PER_PAGE ,
59+ prismaClient ?: PrismaClientOrTransaction ,
60+ replicaClient ?: PrismaClientOrTransaction ,
61+ engineClient : QueueListEngine = engine
62+ ) {
63+ super ( prismaClient , replicaClient ) ;
2164 this . perPage = Math . min ( perPage , MAX_ITEMS_PER_PAGE ) ;
65+ this . engineClient = engineClient ;
2266 }
2367
2468 public async call ( {
@@ -33,26 +77,14 @@ export class QueueListPresenter extends BasePresenter {
3377 perPage ?: number ;
3478 type ?: "task" | "custom" ;
3579 } ) {
36- const hasFilters = ( query !== undefined && query . length > 0 ) || type !== undefined ;
37-
38- // Get total count for pagination
39- const totalQueues = await this . _replica . taskQueue . count ( {
40- where : {
41- runtimeEnvironmentId : environment . id ,
42- version : "V2" ,
43- name : query
44- ? {
45- contains : query ,
46- mode : "insensitive" ,
47- }
48- : undefined ,
49- type : type ? typeToDBQueueType [ type ] : undefined ,
50- } ,
51- } ) ;
80+ const hasFilters = Boolean ( query ?. trim ( ) ) || type !== undefined ;
5281
53- //check the engine is the correct version
5482 const engineVersion = await determineEngineVersion ( { environment } ) ;
5583 if ( engineVersion === "V1" ) {
84+ const totalQueues = await this . _replica . taskQueue . count ( {
85+ where : buildQueueListWhere ( environment . id , query , type ) ,
86+ } ) ;
87+
5688 if ( totalQueues === 0 ) {
5789 const oldQueue = await this . _replica . taskQueue . findFirst ( {
5890 where : {
@@ -78,10 +110,30 @@ export class QueueListPresenter extends BasePresenter {
78110 } ;
79111 }
80112
113+ if ( hasFilters ) {
114+ const { queues, hasMore } = await this . getFilteredQueues ( environment , query , page , type ) ;
115+
116+ return {
117+ success : true as const ,
118+ queues,
119+ pagination : {
120+ mode : "filtered" as const ,
121+ currentPage : page ,
122+ hasMore,
123+ } ,
124+ hasFilters,
125+ } ;
126+ }
127+
128+ const totalQueues = await this . _replica . taskQueue . count ( {
129+ where : buildQueueListWhere ( environment . id , query , type ) ,
130+ } ) ;
131+
81132 return {
82133 success : true as const ,
83- queues : await this . getQueuesWithPagination ( environment , query , page , type ) ,
134+ queues : await this . getUnfilteredQueues ( environment , page , type ) ,
84135 pagination : {
136+ mode : "unfiltered" as const ,
85137 currentPage : page ,
86138 totalPages : Math . ceil ( totalQueues / this . perPage ) ,
87139 count : totalQueues ,
@@ -91,48 +143,68 @@ export class QueueListPresenter extends BasePresenter {
91143 } ;
92144 }
93145
94- private async getQueuesWithPagination (
146+ private async getFilteredQueues (
95147 environment : AuthenticatedEnvironment ,
96148 query : string | undefined ,
97149 page : number ,
98150 type : "task" | "custom" | undefined
99151 ) {
100152 const queues = await this . _replica . taskQueue . findMany ( {
101- where : {
102- runtimeEnvironmentId : environment . id ,
103- version : "V2" ,
104- name : query
105- ? {
106- contains : query ,
107- mode : "insensitive" ,
108- }
109- : undefined ,
110- type : type ? typeToDBQueueType [ type ] : undefined ,
111- } ,
112- select : {
113- friendlyId : true ,
114- name : true ,
115- orderableName : true ,
116- concurrencyLimit : true ,
117- concurrencyLimitBase : true ,
118- concurrencyLimitOverriddenAt : true ,
119- concurrencyLimitOverriddenBy : true ,
120- type : true ,
121- paused : true ,
153+ where : buildQueueListWhere ( environment . id , query , type ) ,
154+ select : queueListSelect ,
155+ orderBy : {
156+ orderableName : "asc" ,
122157 } ,
158+ skip : ( page - 1 ) * this . perPage ,
159+ take : this . perPage + 1 ,
160+ } ) ;
161+
162+ const hasMore = queues . length > this . perPage ;
163+
164+ return {
165+ queues : await this . enrichQueues ( environment , queues . slice ( 0 , this . perPage ) ) ,
166+ hasMore,
167+ } ;
168+ }
169+
170+ private async getUnfilteredQueues (
171+ environment : AuthenticatedEnvironment ,
172+ page : number ,
173+ type : "task" | "custom" | undefined
174+ ) {
175+ const queues = await this . _replica . taskQueue . findMany ( {
176+ where : buildQueueListWhere ( environment . id , undefined , type ) ,
177+ select : queueListSelect ,
123178 orderBy : {
124179 orderableName : "asc" ,
125180 } ,
126181 skip : ( page - 1 ) * this . perPage ,
127182 take : this . perPage ,
128183 } ) ;
129184
130- const results = await Promise . all ( [
131- engine . lengthOfQueues (
185+ return this . enrichQueues ( environment , queues ) ;
186+ }
187+
188+ private async enrichQueues (
189+ environment : AuthenticatedEnvironment ,
190+ queues : {
191+ friendlyId : string ;
192+ name : string ;
193+ orderableName : string | null ;
194+ concurrencyLimit : number | null ;
195+ concurrencyLimitBase : number | null ;
196+ concurrencyLimitOverriddenAt : Date | null ;
197+ concurrencyLimitOverriddenBy : string | null ;
198+ type : TaskQueueType ;
199+ paused : boolean ;
200+ } [ ]
201+ ) {
202+ const [ queuedByQueue , runningByQueue ] = await Promise . all ( [
203+ this . engineClient . lengthOfQueues (
132204 environment ,
133205 queues . map ( ( q ) => q . name )
134206 ) ,
135- engine . currentConcurrencyOfQueues (
207+ this . engineClient . currentConcurrencyOfQueues (
136208 environment ,
137209 queues . map ( ( q ) => q . name )
138210 ) ,
@@ -149,14 +221,13 @@ export class QueueListPresenter extends BasePresenter {
149221
150222 const overriddenByMap = new Map ( overriddenByUsers . map ( ( u ) => [ u . id , u ] ) ) ;
151223
152- // Transform queues to include running and queued counts
153224 return queues . map ( ( queue ) =>
154225 toQueueItem ( {
155226 friendlyId : queue . friendlyId ,
156227 name : queue . name ,
157228 type : queue . type ,
158- running : results [ 1 ] [ queue . name ] ?? 0 ,
159- queued : results [ 0 ] [ queue . name ] ?? 0 ,
229+ running : runningByQueue [ queue . name ] ?? 0 ,
230+ queued : queuedByQueue [ queue . name ] ?? 0 ,
160231 concurrencyLimit : queue . concurrencyLimit ?? null ,
161232 concurrencyLimitBase : queue . concurrencyLimitBase ?? null ,
162233 concurrencyLimitOverriddenAt : queue . concurrencyLimitOverriddenAt ?? null ,
0 commit comments