Skip to content

Commit d2f642c

Browse files
Copilot, npalm, stuartp44, Brend-Smits
authored
fix: Move publishRetryMessage to end of processing loop to avoid duplicate retries (#4966)
- [x] Understand the current flow and identify where `publishRetryMessage` is called
- [x] Remove the `publishRetryMessage` call from line 360 (early in the loop)
- [x] Add logic to track which messages should have retry messages published
- [x] Call `publishRetryMessage` at the end of the loop for messages not marked as invalid
- [x] Update tests to reflect the new behavior (publishRetryMessage after runner creation)
- [x] Validate changes with linting and testing
- [x] Address code review feedback: use Set for performance, extract helper function
- [x] Simplify naming and reduce complexity per reviewer feedback
- [x] Update lambda workflow to run on all PRs
- [x] Fix formatting issues
- [x] Fix merge conflicts from stu/fix_job_retry branch
- [x] Revert unnecessary changes to keep PR minimal and focused

Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: npalm <[email protected]>
Co-authored-by: Stuart Pearson <[email protected]>
Co-authored-by: Brend-Smits <[email protected]>
1 parent 7d99874 commit d2f642c

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1711,6 +1711,7 @@ describe('Retry mechanism tests', () => {
17111711

17121712
it('calls publishRetryMessage for each valid message when job is queued', async () => {
17131713
const messages = createTestMessages(3);
1714+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef']); // Create all requested runners
17141715

17151716
await scaleUpModule.scaleUp(messages);
17161717

@@ -1762,7 +1763,7 @@ describe('Retry mechanism tests', () => {
17621763
);
17631764
});
17641765

1765-
it('calls publishRetryMessage even when maximum runners is reached', async () => {
1766+
it('does not call publishRetryMessage when maximum runners is reached and messages are marked invalid', async () => {
17661767
process.env.RUNNERS_MAXIMUM_COUNT = '0'; // No runners can be created
17671768

17681769
const messages = createTestMessages(2);
@@ -1776,8 +1777,9 @@ describe('Retry mechanism tests', () => {
17761777
runnerOwner: TEST_DATA_SINGLE.repositoryOwner,
17771778
});
17781779

1779-
// publishRetryMessage should still be called even though no runners will be created
1780-
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(2);
1780+
// publishRetryMessage should NOT be called because messages are marked as invalid
1781+
// Invalid messages go back to the SQS queue and will be retried there
1782+
expect(mockPublishRetryMessage).not.toHaveBeenCalled();
17811783
expect(createRunner).not.toHaveBeenCalled();
17821784
});
17831785

@@ -1801,6 +1803,7 @@ describe('Retry mechanism tests', () => {
18011803

18021804
it('calls publishRetryMessage when ENABLE_JOB_QUEUED_CHECK is false', async () => {
18031805
process.env.ENABLE_JOB_QUEUED_CHECK = 'false';
1806+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890']); // Create all requested runners
18041807

18051808
const messages = createTestMessages(2);
18061809

@@ -1812,6 +1815,7 @@ describe('Retry mechanism tests', () => {
18121815
});
18131816

18141817
it('calls publishRetryMessage for each message in a multi-runner scenario', async () => {
1818+
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef', 'i-11111', 'i-22222']); // Create all requested runners
18151819
const messages = createTestMessages(5);
18161820

18171821
await scaleUpModule.scaleUp(messages);
@@ -1828,8 +1832,9 @@ describe('Retry mechanism tests', () => {
18281832
});
18291833
});
18301834

1831-
it('calls publishRetryMessage before runner creation', async () => {
1835+
it('calls publishRetryMessage after runner creation', async () => {
18321836
const messages = createTestMessages(1);
1837+
mockCreateRunner.mockResolvedValue(['i-12345']); // Create the requested runner
18331838

18341839
const callOrder: string[] = [];
18351840
mockPublishRetryMessage.mockImplementation(() => {
@@ -1843,7 +1848,7 @@ describe('Retry mechanism tests', () => {
18431848

18441849
await scaleUpModule.scaleUp(messages);
18451850

1846-
expect(callOrder).toEqual(['publishRetryMessage', 'createRunner']);
1851+
expect(callOrder).toEqual(['createRunner', 'publishRetryMessage']);
18471852
});
18481853
});
18491854

lambdas/functions/control-plane/src/scale-runners/scale-up.ts

Lines changed: 22 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -277,7 +277,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
277277
};
278278

279279
const validMessages = new Map<string, MessagesWithClient>();
280-
const invalidMessages: string[] = [];
280+
const rejectedMessageIds = new Set<string>();
281281
for (const payload of payloads) {
282282
const { eventType, messageId, repositoryName, repositoryOwner } = payload;
283283
if (ephemeralEnabled && eventType !== 'workflow_job') {
@@ -286,7 +286,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
286286
{ eventType, messageId },
287287
);
288288

289-
invalidMessages.push(messageId);
289+
rejectedMessageIds.add(messageId);
290290

291291
continue;
292292
}
@@ -341,6 +341,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
341341
for (const [group, { githubInstallationClient, messages }] of validMessages.entries()) {
342342
// Work out how much we want to scale up by.
343343
let scaleUp = 0;
344+
const queuedMessages: ActionRequestMessageSQS[] = [];
344345

345346
for (const message of messages) {
346347
const messageLogger = logger.createChild({
@@ -359,7 +360,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
359360
}
360361

361362
scaleUp++;
362-
await publishRetryMessage(message);
363+
queuedMessages.push(message);
363364
}
364365

365366
if (scaleUp === 0) {
@@ -395,11 +396,18 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
395396
if (ephemeralEnabled) {
396397
// This removes `missingInstanceCount` items from the start of the array
397398
// so that, if we retry more messages later, we pick fresh ones.
398-
invalidMessages.push(...messages.splice(0, missingInstanceCount).map(({ messageId }) => messageId));
399+
const removedMessages = messages.splice(0, missingInstanceCount);
400+
removedMessages.forEach(({ messageId }) => rejectedMessageIds.add(messageId));
399401
}
400402

401403
// No runners will be created, so skip calling the EC2 API.
402404
if (newRunners <= 0) {
405+
// Publish retry messages for messages that are not rejected
406+
for (const message of queuedMessages) {
407+
if (!rejectedMessageIds.has(message.messageId)) {
408+
await publishRetryMessage(message as ActionRequestMessageRetry);
409+
}
410+
}
403411
continue;
404412
}
405413
}
@@ -452,11 +460,19 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
452460
failedInstanceCount,
453461
});
454462

455-
invalidMessages.push(...messages.slice(0, failedInstanceCount).map(({ messageId }) => messageId));
463+
const failedMessages = messages.slice(0, failedInstanceCount);
464+
failedMessages.forEach(({ messageId }) => rejectedMessageIds.add(messageId));
465+
}
466+
467+
// Publish retry messages for messages that are not rejected
468+
for (const message of queuedMessages) {
469+
if (!rejectedMessageIds.has(message.messageId)) {
470+
await publishRetryMessage(message as ActionRequestMessageRetry);
471+
}
456472
}
457473
}
458474

459-
return invalidMessages;
475+
return Array.from(rejectedMessageIds);
460476
}
461477

462478
export function getGitHubEnterpriseApiUrl() {

0 commit comments

Comments (0)