diff --git a/lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts b/lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts
index 0611a6e697..a9324e6374 100644
--- a/lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts
+++ b/lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts
@@ -13,6 +13,7 @@ import ScaleError from './ScaleError';
 import * as scaleUpModule from './scale-up';
 import { getParameter } from '@aws-github-runner/aws-ssm-util';
 import { describe, it, expect, beforeEach, vi } from 'vitest';
+import { publishRetryMessage } from './job-retry';

 const mockOctokit = {
   paginate: vi.fn(),
@@ -33,6 +34,7 @@ const mockCreateRunner = vi.mocked(createRunner);
 const mockListRunners = vi.mocked(listEC2Runners);
 const mockSSMClient = mockClient(SSMClient);
 const mockSSMgetParameter = vi.mocked(getParameter);
+const mockPublishRetryMessage = vi.mocked(publishRetryMessage);

 vi.mock('@octokit/rest', () => ({
   Octokit: vi.fn().mockImplementation(() => mockOctokit),
@@ -60,6 +62,10 @@ vi.mock('@aws-github-runner/aws-ssm-util', async () => {
   };
 });

+vi.mock('./job-retry', async () => ({
+  publishRetryMessage: vi.fn(),
+}));
+
 export type RunnerType = 'ephemeral' | 'non-ephemeral';

 // for ephemeral and non-ephemeral runners
@@ -134,6 +140,9 @@ beforeEach(() => {
       owner: TEST_DATA.repositoryOwner,
     },
   ]);
+  mockPublishRetryMessage.mockImplementation(async () => {
+    return;
+  });

   mockedAppAuth.mockResolvedValue({
     type: 'app',
@@ -328,6 +337,36 @@ describe('scaleUp with GHES', () => {
         ],
       });
     });
+
+    it('tries to publish a retry message when runner creation succeeds', async () => {
+      await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when runner creation fails', async () => {
+      const mockCreateRunners = vi.mocked(createRunner);
+      mockCreateRunners.mockRejectedValue(new Error('no retry'));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when maximum runners has been reached', async () => {
+      process.env.RUNNERS_MAXIMUM_COUNT = '1';
+      process.env.ENABLE_EPHEMERAL_RUNNERS = 'false';
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('does not publish a retry message when the job is not queued', async () => {
+      process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
+      mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
+        data: { status: 'completed' },
+      }));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).not.toBeCalled();
+    });
+
     it.each(RUNNER_TYPES)(
       'calls create start runner config of 40' + ' instances (ssm rate limit condition) to test time delay ',
       async (type: RunnerType) => {
@@ -456,6 +495,34 @@ describe('scaleUp with GHES', () => {
       await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow('no retry');
       mockCreateRunners.mockReset();
     });
+
+    it('tries to publish a retry message when runner creation succeeds', async () => {
+      await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when runner creation fails', async () => {
+      const mockCreateRunners = vi.mocked(createRunner);
+      mockCreateRunners.mockRejectedValue(new Error('no retry'));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when maximum runners has been reached', async () => {
+      process.env.RUNNERS_MAXIMUM_COUNT = '1';
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('does not publish a retry message when the job is not queued', async () => {
+      process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
+      mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
+        data: { status: 'completed' },
+      }));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).not.toBeCalled();
+    });
   });
 });

@@ -530,6 +597,34 @@ describe('scaleUp with public GH', () => {
       await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
       expect(createRunner).toBeCalledWith(expectedRunnerParams);
     });
+
+    it('tries to publish a retry message when runner creation succeeds', async () => {
+      await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when runner creation fails', async () => {
+      const mockCreateRunners = vi.mocked(createRunner);
+      mockCreateRunners.mockRejectedValue(new Error('no retry'));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when maximum runners has been reached', async () => {
+      process.env.RUNNERS_MAXIMUM_COUNT = '1';
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('does not publish a retry message when the job is not queued', async () => {
+      process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
+      mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
+        data: { status: 'completed' },
+      }));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).not.toBeCalled();
+    });
   });

   describe('on repo level', () => {
@@ -687,6 +782,34 @@ describe('scaleUp with public GH', () => {
       process.env.ENABLE_EPHEMERAL_RUNNERS = 'true';
       await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toBeInstanceOf(ScaleError);
     });
+
+    it('tries to publish a retry message when runner creation succeeds', async () => {
+      await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when runner creation fails', async () => {
+      const mockCreateRunners = vi.mocked(createRunner);
+      mockCreateRunners.mockRejectedValue(new Error('no retry'));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when maximum runners has been reached', async () => {
+      process.env.RUNNERS_MAXIMUM_COUNT = '1';
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('does not publish a retry message when the job is not queued', async () => {
+      process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
+      mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
+        data: { status: 'completed' },
+      }));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).not.toBeCalled();
+    });
   });
 });

@@ -863,6 +986,36 @@ describe('scaleUp with Github Data Residency', () => {
         ],
       });
     });
+
+    it('tries to publish a retry message when runner creation succeeds', async () => {
+      await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when runner creation fails', async () => {
+      const mockCreateRunners = vi.mocked(createRunner);
+      mockCreateRunners.mockRejectedValue(new Error('no retry'));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when maximum runners has been reached', async () => {
+      process.env.RUNNERS_MAXIMUM_COUNT = '1';
+      process.env.ENABLE_EPHEMERAL_RUNNERS = 'false';
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('does not publish a retry message when the job is not queued', async () => {
+      process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
+      mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
+        data: { status: 'completed' },
+      }));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).not.toBeCalled();
+    });
+
     it.each(RUNNER_TYPES)(
       'calls create start runner config of 40' + ' instances (ssm rate limit condition) to test time delay ',
       async (type: RunnerType) => {
@@ -991,6 +1144,34 @@ describe('scaleUp with Github Data Residency', () => {
       await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow('no retry');
       mockCreateRunners.mockReset();
     });
+
+    it('tries to publish a retry message when runner creation succeeds', async () => {
+      await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when runner creation fails', async () => {
+      const mockCreateRunners = vi.mocked(createRunner);
+      mockCreateRunners.mockRejectedValue(new Error('no retry'));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('tries to publish a retry message when maximum runners has been reached', async () => {
+      process.env.RUNNERS_MAXIMUM_COUNT = '1';
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
+    });
+
+    it('does not publish a retry message when the job is not queued', async () => {
+      process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
+      mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
+        data: { status: 'completed' },
+      }));
+      await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
+      expect(publishRetryMessage).not.toBeCalled();
+    });
   });
 });

diff --git a/lambdas/functions/control-plane/src/scale-runners/scale-up.ts b/lambdas/functions/control-plane/src/scale-runners/scale-up.ts
index 08d16d682a..08f098d6e9 100644
--- a/lambdas/functions/control-plane/src/scale-runners/scale-up.ts
+++ b/lambdas/functions/control-plane/src/scale-runners/scale-up.ts
@@ -305,46 +305,48 @@ export async function scaleUp(eventSource: string, payload: ActionRequestMessage
       scaleUp = currentRunners.length < maximumRunners;
     }

-    if (scaleUp) {
-      logger.info(`Attempting to launch a new runner`);
-
-      await createRunners(
-        {
-          ephemeral,
-          enableJitConfig,
-          ghesBaseUrl,
-          runnerLabels,
-          runnerGroup,
-          runnerNamePrefix,
-          runnerOwner,
-          runnerType,
-          disableAutoUpdate,
-          ssmTokenPath,
-          ssmConfigPath,
-        },
-        {
-          ec2instanceCriteria: {
-            instanceTypes,
-            targetCapacityType: instanceTargetCapacityType,
-            maxSpotPrice: instanceMaxSpotPrice,
-            instanceAllocationStrategy: instanceAllocationStrategy,
+    try {
+      if (scaleUp) {
+        logger.info(`Attempting to launch a new runner`);
+
+        await createRunners(
+          {
+            ephemeral,
+            enableJitConfig,
+            ghesBaseUrl,
+            runnerLabels,
+            runnerGroup,
+            runnerNamePrefix,
+            runnerOwner,
+            runnerType,
+            disableAutoUpdate,
+            ssmTokenPath,
+            ssmConfigPath,
           },
-          environment,
-          launchTemplateName,
-          subnets,
-          amiIdSsmParameterName,
-          tracingEnabled,
-          onDemandFailoverOnError,
-        },
-        githubInstallationClient,
-      );
-
-      await publishRetryMessage(payload);
-    } else {
-      logger.info('No runner will be created, maximum number of runners reached.');
-      if (ephemeral) {
-        throw new ScaleError('No runners create: maximum of runners reached.');
+          {
+            ec2instanceCriteria: {
+              instanceTypes,
+              targetCapacityType: instanceTargetCapacityType,
+              maxSpotPrice: instanceMaxSpotPrice,
+              instanceAllocationStrategy: instanceAllocationStrategy,
+            },
+            environment,
+            launchTemplateName,
+            subnets,
+            amiIdSsmParameterName,
+            tracingEnabled,
+            onDemandFailoverOnError,
+          },
+          githubInstallationClient,
+        );
+      } else {
+        logger.info('No runner will be created, maximum number of runners reached.');
+        if (ephemeral) {
+          throw new ScaleError('No runners create: maximum of runners reached.');
+        }
       }
+    } finally {
+      await publishRetryMessage(payload);
     }
   } else {
     logger.info('No runner will be created, job is not queued.');