Skip to content

feat(lambda): publish retry message from scale up if runner creation fails #4605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ScaleError from './ScaleError';
import * as scaleUpModule from './scale-up';
import { getParameter } from '@aws-github-runner/aws-ssm-util';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { publishRetryMessage } from './job-retry';

const mockOctokit = {
paginate: vi.fn(),
Expand All @@ -33,6 +34,7 @@ const mockCreateRunner = vi.mocked(createRunner);
const mockListRunners = vi.mocked(listEC2Runners);
const mockSSMClient = mockClient(SSMClient);
const mockSSMgetParameter = vi.mocked(getParameter);
const mockPublishRetryMessage = vi.mocked(publishRetryMessage);

vi.mock('@octokit/rest', () => ({
Octokit: vi.fn().mockImplementation(() => mockOctokit),
Expand Down Expand Up @@ -60,6 +62,10 @@ vi.mock('@aws-github-runner/aws-ssm-util', async () => {
};
});

vi.mock('./job-retry', async () => ({
publishRetryMessage: vi.fn(),
}));

export type RunnerType = 'ephemeral' | 'non-ephemeral';

// for ephemeral and non-ephemeral runners
Expand Down Expand Up @@ -134,6 +140,9 @@ beforeEach(() => {
owner: TEST_DATA.repositoryOwner,
},
]);
mockPublishRetryMessage.mockImplementation(async () => {
return;
});

mockedAppAuth.mockResolvedValue({
type: 'app',
Expand Down Expand Up @@ -328,6 +337,36 @@ describe('scaleUp with GHES', () => {
],
});
});

it('tries to publish a retry message when runner creation succeeds', async () => {
await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when runner creation fails', async () => {
const mockCreateRunners = vi.mocked(createRunner);
mockCreateRunners.mockRejectedValue(new Error('no retry'));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when maximum runners has been reached', async () => {
process.env.RUNNERS_MAXIMUM_COUNT = '1';
process.env.ENABLE_EPHEMERAL_RUNNERS = 'false';
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('does not publish a retry message when the job is not queued', async () => {
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
data: { status: 'completed' },
}));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).not.toBeCalled();
});

it.each(RUNNER_TYPES)(
'calls create start runner config of 40' + ' instances (ssm rate limit condition) to test time delay ',
async (type: RunnerType) => {
Expand Down Expand Up @@ -456,6 +495,34 @@ describe('scaleUp with GHES', () => {
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow('no retry');
mockCreateRunners.mockReset();
});

it('tries to publish a retry message when runner creation succeeds', async () => {
await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when runner creation fails', async () => {
const mockCreateRunners = vi.mocked(createRunner);
mockCreateRunners.mockRejectedValue(new Error('no retry'));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when maximum runners has been reached', async () => {
process.env.RUNNERS_MAXIMUM_COUNT = '1';
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('does not publish a retry message when the job is not queued', async () => {
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
data: { status: 'completed' },
}));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).not.toBeCalled();
});
});
});

Expand Down Expand Up @@ -530,6 +597,34 @@ describe('scaleUp with public GH', () => {
await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
expect(createRunner).toBeCalledWith(expectedRunnerParams);
});

it('tries to publish a retry message when runner creation succeeds', async () => {
await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when runner creation fails', async () => {
const mockCreateRunners = vi.mocked(createRunner);
mockCreateRunners.mockRejectedValue(new Error('no retry'));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when maximum runners has been reached', async () => {
process.env.RUNNERS_MAXIMUM_COUNT = '1';
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('does not publish a retry message when the job is not queued', async () => {
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
data: { status: 'completed' },
}));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).not.toBeCalled();
});
});

describe('on repo level', () => {
Expand Down Expand Up @@ -687,6 +782,34 @@ describe('scaleUp with public GH', () => {
process.env.ENABLE_EPHEMERAL_RUNNERS = 'true';
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toBeInstanceOf(ScaleError);
});

it('tries to publish a retry message when runner creation succeeds', async () => {
await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when runner creation fails', async () => {
const mockCreateRunners = vi.mocked(createRunner);
mockCreateRunners.mockRejectedValue(new Error('no retry'));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when maximum runners has been reached', async () => {
process.env.RUNNERS_MAXIMUM_COUNT = '1';
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('does not publish a retry message when the job is not queued', async () => {
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
data: { status: 'completed' },
}));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).not.toBeCalled();
});
});
});

Expand Down Expand Up @@ -863,6 +986,36 @@ describe('scaleUp with Github Data Residency', () => {
],
});
});

it('tries to publish a retry message when runner creation succeeds', async () => {
await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when runner creation fails', async () => {
const mockCreateRunners = vi.mocked(createRunner);
mockCreateRunners.mockRejectedValue(new Error('no retry'));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when maximum runners has been reached', async () => {
process.env.RUNNERS_MAXIMUM_COUNT = '1';
process.env.ENABLE_EPHEMERAL_RUNNERS = 'false';
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('does not publish a retry message when the job is not queued', async () => {
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
data: { status: 'completed' },
}));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).not.toBeCalled();
});

it.each(RUNNER_TYPES)(
'calls create start runner config of 40' + ' instances (ssm rate limit condition) to test time delay ',
async (type: RunnerType) => {
Expand Down Expand Up @@ -991,6 +1144,34 @@ describe('scaleUp with Github Data Residency', () => {
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow('no retry');
mockCreateRunners.mockReset();
});

it('tries to publish a retry message when runner creation succeeds', async () => {
await scaleUpModule.scaleUp('aws:sqs', TEST_DATA);
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when runner creation fails', async () => {
const mockCreateRunners = vi.mocked(createRunner);
mockCreateRunners.mockRejectedValue(new Error('no retry'));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).rejects.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('tries to publish a retry message when maximum runners has been reached', async () => {
process.env.RUNNERS_MAXIMUM_COUNT = '1';
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).toBeCalledWith(TEST_DATA);
});

it('does not publish a retry message when the job is not queued', async () => {
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
data: { status: 'completed' },
}));
await expect(scaleUpModule.scaleUp('aws:sqs', TEST_DATA)).resolves.not.toThrow();
expect(publishRetryMessage).not.toBeCalled();
});
});
});

Expand Down
78 changes: 40 additions & 38 deletions lambdas/functions/control-plane/src/scale-runners/scale-up.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,46 +305,48 @@ export async function scaleUp(eventSource: string, payload: ActionRequestMessage
scaleUp = currentRunners.length < maximumRunners;
}

if (scaleUp) {
logger.info(`Attempting to launch a new runner`);

await createRunners(
{
ephemeral,
enableJitConfig,
ghesBaseUrl,
runnerLabels,
runnerGroup,
runnerNamePrefix,
runnerOwner,
runnerType,
disableAutoUpdate,
ssmTokenPath,
ssmConfigPath,
},
{
ec2instanceCriteria: {
instanceTypes,
targetCapacityType: instanceTargetCapacityType,
maxSpotPrice: instanceMaxSpotPrice,
instanceAllocationStrategy: instanceAllocationStrategy,
try {
if (scaleUp) {
logger.info(`Attempting to launch a new runner`);

await createRunners(
{
ephemeral,
enableJitConfig,
ghesBaseUrl,
runnerLabels,
runnerGroup,
runnerNamePrefix,
runnerOwner,
runnerType,
disableAutoUpdate,
ssmTokenPath,
ssmConfigPath,
},
environment,
launchTemplateName,
subnets,
amiIdSsmParameterName,
tracingEnabled,
onDemandFailoverOnError,
},
githubInstallationClient,
);

await publishRetryMessage(payload);
} else {
logger.info('No runner will be created, maximum number of runners reached.');
if (ephemeral) {
throw new ScaleError('No runners create: maximum of runners reached.');
{
ec2instanceCriteria: {
instanceTypes,
targetCapacityType: instanceTargetCapacityType,
maxSpotPrice: instanceMaxSpotPrice,
instanceAllocationStrategy: instanceAllocationStrategy,
},
environment,
launchTemplateName,
subnets,
amiIdSsmParameterName,
tracingEnabled,
onDemandFailoverOnError,
},
githubInstallationClient,
);
} else {
logger.info('No runner will be created, maximum number of runners reached.');
if (ephemeral) {
throw new ScaleError('No runners create: maximum of runners reached.');
}
}
} finally {
await publishRetryMessage(payload);
}
} else {
logger.info('No runner will be created, job is not queued.');
Expand Down