-
-
Notifications
You must be signed in to change notification settings - Fork 853
v4: dequeue performance improvements (split concurrency from dequeue) #2127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
d8352f9
WIP
ericallam 9cec702
Run queue now works with the worker queue / master queue split
ericallam 9204ba0
Acking should also cause the master queue to be processed
ericallam 0ca8b0a
Convert run engine tests and run engine to use runQueue changes
ericallam 9193e53
Include the util files in the test tsconfig
ericallam a38f8e5
coordinator target should be es2020 as well
ericallam 6d5c8d2
providers target 2020
ericallam a25bae2
Fix the triggerTask tests in the webapp
ericallam f5d54b0
v4 now working with the new worker queues, and added the legacy maste…
ericallam a2aa2c8
report worker queue lengths via opentelemetry metrics
ericallam e2d7c13
Adding lock metrics
ericallam ff2768c
Release concurrency bucket metrics
ericallam 57ee381
• Updated RunQueue.removeEnvironmentQueuesFromMasterQueue() method si…
ericallam 2e0ead9
metrics now working, configure the run queue settings, additional met…
ericallam 5fd0efd
Fix CodeRabbit suggestions
ericallam 5b78b5d
return undefined from dequeueFromWorkerQueue, not null
ericallam 01f62df
Remove message from worker queue in certain circumstances when acking
ericallam 95a522b
Update log
ericallam be82a67
Ensure master queue consumers cannot stop from a processing error, an…
ericallam 4b073b2
Change how the run queue master queue consumers are disabled internally
ericallam d5cfa24
Fixed tests
ericallam 9037d3c
process the queue on nack
ericallam 79e7a15
Fix more tests
ericallam 7387c0e
Fix priority tests
ericallam 54f06f5
Fixed dequeueing test
ericallam File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
37 changes: 37 additions & 0 deletions
37
apps/webapp/app/routes/admin.api.v1.migrate-legacy-master-queues.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; | ||
import { prisma } from "~/db.server"; | ||
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; | ||
import { engine } from "~/v3/runEngine.server"; | ||
|
||
export async function action({ request }: ActionFunctionArgs) { | ||
// Next authenticate the request | ||
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); | ||
|
||
if (!authenticationResult) { | ||
return json({ error: "Invalid or Missing API key" }, { status: 401 }); | ||
} | ||
|
||
const user = await prisma.user.findUnique({ | ||
where: { | ||
id: authenticationResult.userId, | ||
}, | ||
}); | ||
|
||
if (!user) { | ||
return json({ error: "Invalid or Missing API key" }, { status: 401 }); | ||
} | ||
|
||
if (!user.admin) { | ||
return json({ error: "You must be an admin to perform this action" }, { status: 403 }); | ||
} | ||
|
||
try { | ||
await engine.migrateLegacyMasterQueues(); | ||
|
||
return json({ | ||
success: true, | ||
}); | ||
} catch (error) { | ||
return json({ error: error instanceof Error ? error.message : error }, { status: 400 }); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,92 +1,22 @@ | ||
import { json } from "@remix-run/server-runtime"; | ||
import { DequeuedMessage, DevDequeueRequestBody, MachineResources } from "@trigger.dev/core/v3"; | ||
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; | ||
import { env } from "~/env.server"; | ||
import { DevDequeueRequestBody } from "@trigger.dev/core/v3"; | ||
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||
import { engine } from "~/v3/runEngine.server"; | ||
|
||
const { action } = createActionApiRoute( | ||
{ | ||
body: DevDequeueRequestBody, | ||
body: DevDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility | ||
maxContentLength: 1024 * 10, // 10KB | ||
method: "POST", | ||
}, | ||
async ({ authentication, body }) => { | ||
//we won't return more runs than this in one API call | ||
let maxDequeueCount = env.DEV_DEQUEUE_MAX_RUNS_PER_PULL; | ||
|
||
//we can't use more than the max resources | ||
const availableResources = body.maxResources ?? { | ||
cpu: 8, | ||
memory: 16, | ||
}; | ||
|
||
let dequeuedMessages: DequeuedMessage[] = []; | ||
|
||
//we need to check the current worker, because a run might have been locked to it | ||
const workers = body.oldWorkers.concat(body.currentWorker); | ||
|
||
//first we want to clear out old runs | ||
for (const worker of workers) { | ||
//dequeue | ||
const latestResult = await engine.dequeueFromBackgroundWorkerMasterQueue({ | ||
consumerId: authentication.environment.id, | ||
//specific version | ||
backgroundWorkerId: BackgroundWorkerId.toId(worker), | ||
maxRunCount: maxDequeueCount, | ||
maxResources: availableResources, | ||
}); | ||
|
||
//add runs to the array | ||
dequeuedMessages.push(...latestResult); | ||
|
||
//update availableResources | ||
const consumedResources = latestResult.reduce( | ||
(acc, r) => { | ||
return { | ||
cpu: acc.cpu + r.run.machine.cpu, | ||
memory: acc.memory + r.run.machine.memory, | ||
}; | ||
}, | ||
{ cpu: 0, memory: 0 } | ||
); | ||
updateAvailableResources(availableResources, consumedResources); | ||
|
||
//update maxDequeueCount | ||
maxDequeueCount -= latestResult.length; | ||
|
||
//if we have no resources left, we exit the loop | ||
if (!hasAvailableResources(availableResources)) break; | ||
//we've already dequeued the max number of runs | ||
if (maxDequeueCount <= 0) break; | ||
} | ||
|
||
//dequeue from the current version if we still have space | ||
if (hasAvailableResources(availableResources) && maxDequeueCount > 0) { | ||
const latestResult = await engine.dequeueFromEnvironmentMasterQueue({ | ||
consumerId: authentication.environment.id, | ||
//current dev version (no specific version specified) | ||
environmentId: authentication.environment.id, | ||
maxRunCount: maxDequeueCount, | ||
maxResources: availableResources, | ||
}); | ||
dequeuedMessages.push(...latestResult); | ||
} | ||
async ({ authentication }) => { | ||
const dequeuedMessages = await engine.dequeueFromEnvironmentWorkerQueue({ | ||
consumerId: authentication.environment.id, | ||
environmentId: authentication.environment.id, | ||
}); | ||
|
||
return json({ dequeuedMessages }, { status: 200 }); | ||
} | ||
); | ||
|
||
function updateAvailableResources( | ||
availableResources: MachineResources, | ||
resources: MachineResources | ||
) { | ||
availableResources.cpu -= resources.cpu; | ||
availableResources.memory -= resources.memory; | ||
} | ||
|
||
function hasAvailableResources(availableResources: MachineResources) { | ||
return availableResources.cpu > 0 && availableResources.memory > 0; | ||
} | ||
|
||
export { action }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.