Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions controlplane/src/core/blobstorage/dual.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import type { BlobObject, BlobStorage } from './index.js';

/**
* A BlobStorage implementation that writes to two underlying stores (primary + secondary).
*
* - Writes and deletes go to both stores concurrently; both must succeed.
* - Reads try the primary first, falling back to the secondary on failure.
*/
export class DualBlobStorage implements BlobStorage {
constructor(
private primary: BlobStorage,
private secondary: BlobStorage,
) {}

async putObject<Metadata extends Record<string, string>>(data: {
key: string;
abortSignal?: AbortSignal;
body: Buffer;
contentType: string;
metadata?: Metadata;
}): Promise<void> {
const results = await Promise.allSettled([this.primary.putObject(data), this.secondary.putObject(data)]);
const [primaryResult, secondaryResult] = results;

if (primaryResult.status === 'fulfilled' && secondaryResult.status === 'fulfilled') {
return;
}

// Roll back successful writes before throwing, independent of the caller's signal
const rollbacks: Promise<void>[] = [];
if (primaryResult.status === 'fulfilled') {
rollbacks.push(this.primary.deleteObject({ key: data.key }));
}
if (secondaryResult.status === 'fulfilled') {
rollbacks.push(this.secondary.deleteObject({ key: data.key }));
}
const rollbackResults = await Promise.allSettled(rollbacks);

const putErrors = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected').map((r) => r.reason);
const rollbackErrors = rollbackResults
.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
.map((r) => r.reason);
throw new AggregateError([...putErrors, ...rollbackErrors], 'Failed to put object into storage');
}

async getObject(data: { key: string; abortSignal?: AbortSignal }): Promise<BlobObject> {
try {
return await this.primary.getObject(data);
} catch (primaryError) {
try {
return await this.secondary.getObject(data);
} catch (secondaryError) {
throw new AggregateError(
[primaryError, secondaryError],
'Both primary and secondary storage failed to get object',
);
}
}
}

async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
const results = await Promise.all([this.primary.removeDirectory(data), this.secondary.removeDirectory(data)]);
return results[0];
}

async deleteObject(data: { key: string; abortSignal?: AbortSignal }): Promise<void> {
await Promise.all([this.primary.deleteObject(data), this.secondary.deleteObject(data)]);
}
}
1 change: 1 addition & 0 deletions controlplane/src/core/blobstorage/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export { S3BlobStorage } from './s3.js';
export { DualBlobStorage } from './dual.js';

export class BlobNotFoundError extends Error {
constructor(message: string, cause?: Error) {
Expand Down
30 changes: 28 additions & 2 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import Keycloak from './services/Keycloak.js';
import { PlatformWebhookService } from './webhooks/PlatformWebhookService.js';
import AccessTokenAuthenticator from './services/AccessTokenAuthenticator.js';
import { GitHubRepository } from './repositories/GitHubRepository.js';
import { S3BlobStorage } from './blobstorage/index.js';
import { S3BlobStorage, DualBlobStorage, type BlobStorage } from './blobstorage/index.js';
import Mailer from './services/Mailer.js';
import { OrganizationInvitationRepository } from './repositories/OrganizationInvitationRepository.js';
import { Authorization } from './services/Authorization.js';
Expand Down Expand Up @@ -114,6 +114,15 @@ export interface BuildConfig {
forcePathStyle?: boolean;
useIndividualDeletes?: boolean;
};
s3StorageFailover?: {
url: string;
endpoint?: string;
region?: string;
username?: string;
password?: string;
forcePathStyle?: boolean;
useIndividualDeletes?: boolean;
};
mailer: {
smtpEnabled: boolean;
smtpHost?: string;
Expand Down Expand Up @@ -343,14 +352,31 @@ export default async function build(opts: BuildConfig) {
const s3Config = createS3ClientConfig(bucketName, opts.s3Storage);

const s3Client = new S3Client(s3Config);
const blobStorage = new S3BlobStorage(s3Client, bucketName, {
const primaryBlobStorage = new S3BlobStorage(s3Client, bucketName, {
// GCS does not support DeleteObjects; force individual deletes when detected.
useIndividualDeletes:
isGoogleCloudStorageUrl(opts.s3Storage.url) || isGoogleCloudStorageUrl(s3Config.endpoint as string)
? true
: (opts.s3Storage.useIndividualDeletes ?? false),
});

let blobStorage: BlobStorage = primaryBlobStorage;

if (opts.s3StorageFailover?.url) {
const failoverBucketName = extractS3BucketName(opts.s3StorageFailover);
const failoverS3Config = createS3ClientConfig(failoverBucketName, opts.s3StorageFailover);
const failoverS3Client = new S3Client(failoverS3Config);
const failoverBlobStorage = new S3BlobStorage(failoverS3Client, failoverBucketName, {
useIndividualDeletes:
isGoogleCloudStorageUrl(opts.s3StorageFailover.url) ||
isGoogleCloudStorageUrl(failoverS3Config.endpoint as string)
? true
: (opts.s3StorageFailover.useIndividualDeletes ?? false),
});

blobStorage = new DualBlobStorage(primaryBlobStorage, failoverBlobStorage);
}

const platformWebhooks = new PlatformWebhookService(
opts.webhook?.url,
opts.webhook?.key,
Expand Down
16 changes: 16 additions & 0 deletions controlplane/src/core/env.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,22 @@ export const envVariables = z
.string()
.transform((val) => val === 'true')
.optional(),
/**
* S3 Failover Storage (optional secondary bucket for resilience)
*/
S3_FAILOVER_STORAGE_URL: z.string().optional(),
S3_FAILOVER_ENDPOINT: z.string().optional(),
S3_FAILOVER_REGION: z.string().default('auto'),
S3_FAILOVER_ACCESS_KEY_ID: z.string().optional(),
S3_FAILOVER_SECRET_ACCESS_KEY: z.string().optional(),
S3_FAILOVER_FORCE_PATH_STYLE: z
.string()
.transform((val) => val === 'true')
.optional(),
S3_FAILOVER_USE_INDIVIDUAL_DELETES: z
.string()
.transform((val) => val === 'true')
.optional(),
/**
* Email
*/
Expand Down
18 changes: 18 additions & 0 deletions controlplane/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ const {
S3_SECRET_ACCESS_KEY,
S3_FORCE_PATH_STYLE,
S3_USE_INDIVIDUAL_DELETES,
S3_FAILOVER_STORAGE_URL,
S3_FAILOVER_ENDPOINT,
S3_FAILOVER_REGION,
S3_FAILOVER_ACCESS_KEY_ID,
S3_FAILOVER_SECRET_ACCESS_KEY,
S3_FAILOVER_FORCE_PATH_STYLE,
S3_FAILOVER_USE_INDIVIDUAL_DELETES,
SMTP_ENABLED,
SMTP_HOST,
SMTP_PORT,
Expand Down Expand Up @@ -142,6 +149,17 @@ const options: BuildConfig = {
forcePathStyle: S3_FORCE_PATH_STYLE,
useIndividualDeletes: S3_USE_INDIVIDUAL_DELETES,
},
s3StorageFailover: S3_FAILOVER_STORAGE_URL
? {
url: S3_FAILOVER_STORAGE_URL,
region: S3_FAILOVER_REGION,
endpoint: S3_FAILOVER_ENDPOINT,
username: S3_FAILOVER_ACCESS_KEY_ID,
password: S3_FAILOVER_SECRET_ACCESS_KEY,
forcePathStyle: S3_FAILOVER_FORCE_PATH_STYLE,
useIndividualDeletes: S3_FAILOVER_USE_INDIVIDUAL_DELETES,
}
: undefined,
mailer: {
smtpEnabled: SMTP_ENABLED,
smtpHost: SMTP_HOST,
Expand Down
185 changes: 185 additions & 0 deletions controlplane/test/dual-blob-storage.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import { describe, expect, test, vi } from 'vitest';
import { DualBlobStorage } from '../src/core/blobstorage/dual.js';
import type { BlobObject, BlobStorage } from '../src/core/blobstorage/index.js';

function createMockBlobStorage(overrides?: Partial<BlobStorage>): BlobStorage {
return {
putObject: vi.fn().mockResolvedValue(undefined),
getObject: vi.fn().mockResolvedValue({ stream: new ReadableStream(), metadata: {} }),
removeDirectory: vi.fn().mockResolvedValue(5),
deleteObject: vi.fn().mockResolvedValue(undefined),
...overrides,
};
}

describe('DualBlobStorage', () => {
describe('putObject', () => {
test('calls both primary and secondary', async () => {
const primary = createMockBlobStorage();
const secondary = createMockBlobStorage();
const dual = new DualBlobStorage(primary, secondary);

const data = { key: 'test-key', body: Buffer.from('data'), contentType: 'text/plain' };
await dual.putObject(data);

expect(primary.putObject).toHaveBeenCalledWith(data);
expect(secondary.putObject).toHaveBeenCalledWith(data);
});

test('rejects when primary fails and rolls back secondary', async () => {
const primaryError = new Error('primary write failed');
const primary = createMockBlobStorage({
putObject: vi.fn().mockRejectedValue(primaryError),
});
const secondary = createMockBlobStorage();
const dual = new DualBlobStorage(primary, secondary);

await expect(
dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }),
).rejects.toMatchObject({
message: 'Failed to put object into storage',
errors: [primaryError],
});

expect(primary.deleteObject).not.toHaveBeenCalled();
expect(secondary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
});

test('rejects when secondary fails and rolls back primary', async () => {
const secondaryError = new Error('secondary write failed');
const primary = createMockBlobStorage();
const secondary = createMockBlobStorage({
putObject: vi.fn().mockRejectedValue(secondaryError),
});
const dual = new DualBlobStorage(primary, secondary);

await expect(
dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }),
).rejects.toMatchObject({
message: 'Failed to put object into storage',
errors: [secondaryError],
});

expect(primary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
expect(secondary.deleteObject).not.toHaveBeenCalled();
});
test('includes rollback errors in aggregate when rollback also fails', async () => {
const secondaryError = new Error('secondary write failed');
const rollbackError = new Error('primary rollback failed');
const primary = createMockBlobStorage({
deleteObject: vi.fn().mockRejectedValue(rollbackError),
});
const secondary = createMockBlobStorage({
putObject: vi.fn().mockRejectedValue(secondaryError),
});
const dual = new DualBlobStorage(primary, secondary);

await expect(
dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }),
).rejects.toMatchObject({
message: 'Failed to put object into storage',
errors: [secondaryError, rollbackError],
});
});
});

describe('getObject', () => {
test('returns primary result when primary succeeds', async () => {
const primaryResult: BlobObject = { stream: new ReadableStream(), metadata: { source: 'primary' } };
const primary = createMockBlobStorage({
getObject: vi.fn().mockResolvedValue(primaryResult),
});
const secondary = createMockBlobStorage({
getObject: vi.fn().mockRejectedValue(new Error('secondary read failed')),
});
const dual = new DualBlobStorage(primary, secondary);

const result = await dual.getObject({ key: 'k' });

expect(result).toBe(primaryResult);
});

test('falls back to secondary when primary fails', async () => {
const secondaryResult: BlobObject = { stream: new ReadableStream(), metadata: { source: 'secondary' } };
const primary = createMockBlobStorage({
getObject: vi.fn().mockRejectedValue(new Error('primary read failed')),
});
const secondary = createMockBlobStorage({
getObject: vi.fn().mockResolvedValue(secondaryResult),
});
const dual = new DualBlobStorage(primary, secondary);

const result = await dual.getObject({ key: 'k' });

expect(result).toBe(secondaryResult);
});

test('throws aggregate error with both underlying errors when both fail', async () => {
const primaryError = new Error('primary read failed');
const secondaryError = new Error('secondary read failed');
const primary = createMockBlobStorage({
getObject: vi.fn().mockRejectedValue(primaryError),
});
const secondary = createMockBlobStorage({
getObject: vi.fn().mockRejectedValue(secondaryError),
});
const dual = new DualBlobStorage(primary, secondary);

await expect(dual.getObject({ key: 'k' })).rejects.toMatchObject({
message: 'Both primary and secondary storage failed to get object',
errors: [primaryError, secondaryError],
});
});
});

describe('deleteObject', () => {
test('calls both primary and secondary', async () => {
const primary = createMockBlobStorage();
const secondary = createMockBlobStorage();
const dual = new DualBlobStorage(primary, secondary);

await dual.deleteObject({ key: 'k' });

expect(primary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
expect(secondary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
});

test('rejects when one fails', async () => {
const primary = createMockBlobStorage({
deleteObject: vi.fn().mockRejectedValue(new Error('delete failed')),
});
const secondary = createMockBlobStorage();
const dual = new DualBlobStorage(primary, secondary);

await expect(dual.deleteObject({ key: 'k' })).rejects.toThrow('delete failed');
});
});

describe('removeDirectory', () => {
test('returns primary count when both succeed', async () => {
const primary = createMockBlobStorage({
removeDirectory: vi.fn().mockResolvedValue(10),
});
const secondary = createMockBlobStorage({
removeDirectory: vi.fn().mockResolvedValue(10),
});
const dual = new DualBlobStorage(primary, secondary);

const count = await dual.removeDirectory({ key: 'dir/' });

expect(count).toBe(10);
expect(primary.removeDirectory).toHaveBeenCalledWith({ key: 'dir/' });
expect(secondary.removeDirectory).toHaveBeenCalledWith({ key: 'dir/' });
});

test('rejects when one fails', async () => {
const primary = createMockBlobStorage();
const secondary = createMockBlobStorage({
removeDirectory: vi.fn().mockRejectedValue(new Error('remove failed')),
});
const dual = new DualBlobStorage(primary, secondary);

await expect(dual.removeDirectory({ key: 'dir/' })).rejects.toThrow('remove failed');
});
});
});
Loading