diff --git a/controlplane/src/core/blobstorage/dual.ts b/controlplane/src/core/blobstorage/dual.ts new file mode 100644 index 0000000000..14c2467fcb --- /dev/null +++ b/controlplane/src/core/blobstorage/dual.ts @@ -0,0 +1,69 @@ +import type { BlobObject, BlobStorage } from './index.js'; + +/** + * A BlobStorage implementation that writes to two underlying stores (primary + secondary). + * + * - Writes and deletes go to both stores concurrently; both must succeed. + * - Reads try the primary first, falling back to the secondary on failure. + */ +export class DualBlobStorage implements BlobStorage { + constructor( + private primary: BlobStorage, + private secondary: BlobStorage, + ) {} + + async putObject>(data: { + key: string; + abortSignal?: AbortSignal; + body: Buffer; + contentType: string; + metadata?: Metadata; + }): Promise { + const results = await Promise.allSettled([this.primary.putObject(data), this.secondary.putObject(data)]); + const [primaryResult, secondaryResult] = results; + + if (primaryResult.status === 'fulfilled' && secondaryResult.status === 'fulfilled') { + return; + } + + // Roll back successful writes before throwing, independent of the caller's signal + const rollbacks: Promise[] = []; + if (primaryResult.status === 'fulfilled') { + rollbacks.push(this.primary.deleteObject({ key: data.key })); + } + if (secondaryResult.status === 'fulfilled') { + rollbacks.push(this.secondary.deleteObject({ key: data.key })); + } + const rollbackResults = await Promise.allSettled(rollbacks); + + const putErrors = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected').map((r) => r.reason); + const rollbackErrors = rollbackResults + .filter((r): r is PromiseRejectedResult => r.status === 'rejected') + .map((r) => r.reason); + throw new AggregateError([...putErrors, ...rollbackErrors], 'Failed to put object into storage'); + } + + async getObject(data: { key: string; abortSignal?: AbortSignal }): Promise { + try { + return await this.primary.getObject(data); + } catch (primaryError) { + try { + return await this.secondary.getObject(data); + } catch (secondaryError) { + throw new AggregateError( + [primaryError, secondaryError], + 'Both primary and secondary storage failed to get object', + ); + } + } + } + + async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise { + const results = await Promise.all([this.primary.removeDirectory(data), this.secondary.removeDirectory(data)]); + return results[0]; + } + + async deleteObject(data: { key: string; abortSignal?: AbortSignal }): Promise { + await Promise.all([this.primary.deleteObject(data), this.secondary.deleteObject(data)]); + } +} diff --git a/controlplane/src/core/blobstorage/index.ts b/controlplane/src/core/blobstorage/index.ts index 33d7fcba0d..7b64ec1baa 100644 --- a/controlplane/src/core/blobstorage/index.ts +++ b/controlplane/src/core/blobstorage/index.ts @@ -1,4 +1,5 @@ export { S3BlobStorage } from './s3.js'; +export { DualBlobStorage } from './dual.js'; export class BlobNotFoundError extends Error { constructor(message: string, cause?: Error) { diff --git a/controlplane/src/core/build-server.ts b/controlplane/src/core/build-server.ts index 18b90ea916..a6f9d8f9d0 100644 --- a/controlplane/src/core/build-server.ts +++ b/controlplane/src/core/build-server.ts @@ -29,7 +29,7 @@ import Keycloak from './services/Keycloak.js'; import { PlatformWebhookService } from './webhooks/PlatformWebhookService.js'; import AccessTokenAuthenticator from './services/AccessTokenAuthenticator.js'; import { GitHubRepository } from './repositories/GitHubRepository.js'; -import { S3BlobStorage } from './blobstorage/index.js'; +import { S3BlobStorage, DualBlobStorage, type BlobStorage } from './blobstorage/index.js'; import Mailer from './services/Mailer.js'; import { OrganizationInvitationRepository } from './repositories/OrganizationInvitationRepository.js'; import { Authorization } from './services/Authorization.js'; @@ -114,6 +114,15 @@ export interface BuildConfig { forcePathStyle?: boolean; useIndividualDeletes?: boolean; }; + s3StorageFailover?: { + url: string; + endpoint?: string; + region?: string; + username?: string; + password?: string; + forcePathStyle?: boolean; + useIndividualDeletes?: boolean; + }; mailer: { smtpEnabled: boolean; smtpHost?: string; @@ -343,7 +352,7 @@ export default async function build(opts: BuildConfig) { const s3Config = createS3ClientConfig(bucketName, opts.s3Storage); const s3Client = new S3Client(s3Config); - const blobStorage = new S3BlobStorage(s3Client, bucketName, { + const primaryBlobStorage = new S3BlobStorage(s3Client, bucketName, { // GCS does not support DeleteObjects; force individual deletes when detected. useIndividualDeletes: isGoogleCloudStorageUrl(opts.s3Storage.url) || isGoogleCloudStorageUrl(s3Config.endpoint as string) @@ -351,6 +360,23 @@ export default async function build(opts: BuildConfig) { : (opts.s3Storage.useIndividualDeletes ?? false), }); + let blobStorage: BlobStorage = primaryBlobStorage; + + if (opts.s3StorageFailover?.url) { + const failoverBucketName = extractS3BucketName(opts.s3StorageFailover); + const failoverS3Config = createS3ClientConfig(failoverBucketName, opts.s3StorageFailover); + const failoverS3Client = new S3Client(failoverS3Config); + const failoverBlobStorage = new S3BlobStorage(failoverS3Client, failoverBucketName, { + useIndividualDeletes: + isGoogleCloudStorageUrl(opts.s3StorageFailover.url) || + isGoogleCloudStorageUrl(failoverS3Config.endpoint as string) + ? true + : (opts.s3StorageFailover.useIndividualDeletes ?? false), + }); + + blobStorage = new DualBlobStorage(primaryBlobStorage, failoverBlobStorage); + } + const platformWebhooks = new PlatformWebhookService( opts.webhook?.url, opts.webhook?.key, diff --git a/controlplane/src/core/env.schema.ts b/controlplane/src/core/env.schema.ts index f505323d3f..9612c54bbf 100644 --- a/controlplane/src/core/env.schema.ts +++ b/controlplane/src/core/env.schema.ts @@ -162,6 +162,22 @@ export const envVariables = z .string() .transform((val) => val === 'true') .optional(), + /** + * S3 Failover Storage (optional secondary bucket for resilience) + */ + S3_FAILOVER_STORAGE_URL: z.string().optional(), + S3_FAILOVER_ENDPOINT: z.string().optional(), + S3_FAILOVER_REGION: z.string().default('auto'), + S3_FAILOVER_ACCESS_KEY_ID: z.string().optional(), + S3_FAILOVER_SECRET_ACCESS_KEY: z.string().optional(), + S3_FAILOVER_FORCE_PATH_STYLE: z + .string() + .transform((val) => val === 'true') + .optional(), + S3_FAILOVER_USE_INDIVIDUAL_DELETES: z + .string() + .transform((val) => val === 'true') + .optional(), /** * Email */ diff --git a/controlplane/src/index.ts b/controlplane/src/index.ts index e541a48ce0..33dc16c093 100644 --- a/controlplane/src/index.ts +++ b/controlplane/src/index.ts @@ -50,6 +50,13 @@ const { S3_SECRET_ACCESS_KEY, S3_FORCE_PATH_STYLE, S3_USE_INDIVIDUAL_DELETES, + S3_FAILOVER_STORAGE_URL, + S3_FAILOVER_ENDPOINT, + S3_FAILOVER_REGION, + S3_FAILOVER_ACCESS_KEY_ID, + S3_FAILOVER_SECRET_ACCESS_KEY, + S3_FAILOVER_FORCE_PATH_STYLE, + S3_FAILOVER_USE_INDIVIDUAL_DELETES, SMTP_ENABLED, SMTP_HOST, SMTP_PORT, @@ -142,6 +149,17 @@ const options: BuildConfig = { forcePathStyle: S3_FORCE_PATH_STYLE, useIndividualDeletes: S3_USE_INDIVIDUAL_DELETES, }, + s3StorageFailover: S3_FAILOVER_STORAGE_URL + ? { + url: S3_FAILOVER_STORAGE_URL, + region: S3_FAILOVER_REGION, + endpoint: S3_FAILOVER_ENDPOINT, + username: S3_FAILOVER_ACCESS_KEY_ID, + password: S3_FAILOVER_SECRET_ACCESS_KEY, + forcePathStyle: S3_FAILOVER_FORCE_PATH_STYLE, + useIndividualDeletes: S3_FAILOVER_USE_INDIVIDUAL_DELETES, + } + : undefined, mailer: { smtpEnabled: SMTP_ENABLED, smtpHost: SMTP_HOST, diff --git a/controlplane/test/dual-blob-storage.test.ts b/controlplane/test/dual-blob-storage.test.ts new file mode 100644 index 0000000000..86809a11b7 --- /dev/null +++ b/controlplane/test/dual-blob-storage.test.ts @@ -0,0 +1,185 @@ +import { describe, expect, test, vi } from 'vitest'; +import { DualBlobStorage } from '../src/core/blobstorage/dual.js'; +import type { BlobObject, BlobStorage } from '../src/core/blobstorage/index.js'; + +function createMockBlobStorage(overrides?: Partial): BlobStorage { + return { + putObject: vi.fn().mockResolvedValue(undefined), + getObject: vi.fn().mockResolvedValue({ stream: new ReadableStream(), metadata: {} }), + removeDirectory: vi.fn().mockResolvedValue(5), + deleteObject: vi.fn().mockResolvedValue(undefined), + ...overrides, + }; +} + +describe('DualBlobStorage', () => { + describe('putObject', () => { + test('calls both primary and secondary', async () => { + const primary = createMockBlobStorage(); + const secondary = createMockBlobStorage(); + const dual = new DualBlobStorage(primary, secondary); + + const data = { key: 'test-key', body: Buffer.from('data'), contentType: 'text/plain' }; + await dual.putObject(data); + + expect(primary.putObject).toHaveBeenCalledWith(data); + expect(secondary.putObject).toHaveBeenCalledWith(data); + }); + + test('rejects when primary fails and rolls back secondary', async () => { + const primaryError = new Error('primary write failed'); + const primary = createMockBlobStorage({ + putObject: vi.fn().mockRejectedValue(primaryError), + }); + const secondary = createMockBlobStorage(); + const dual = new DualBlobStorage(primary, secondary); + + await expect( + dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }), + ).rejects.toMatchObject({ + message: 'Failed to put object into storage', + errors: [primaryError], + }); + + expect(primary.deleteObject).not.toHaveBeenCalled(); + expect(secondary.deleteObject).toHaveBeenCalledWith({ key: 'k' }); + }); + + test('rejects when secondary fails and rolls back primary', async () => { + const secondaryError = new Error('secondary write failed'); + const primary = createMockBlobStorage(); + const secondary = createMockBlobStorage({ + putObject: vi.fn().mockRejectedValue(secondaryError), + }); + const dual = new DualBlobStorage(primary, secondary); + + await expect( + dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }), + ).rejects.toMatchObject({ + message: 'Failed to put object into storage', + errors: [secondaryError], + }); + + expect(primary.deleteObject).toHaveBeenCalledWith({ key: 'k' }); + expect(secondary.deleteObject).not.toHaveBeenCalled(); + }); + test('includes rollback errors in aggregate when rollback also fails', async () => { + const secondaryError = new Error('secondary write failed'); + const rollbackError = new Error('primary rollback failed'); + const primary = createMockBlobStorage({ + deleteObject: vi.fn().mockRejectedValue(rollbackError), + }); + const secondary = createMockBlobStorage({ + putObject: vi.fn().mockRejectedValue(secondaryError), + }); + const dual = new DualBlobStorage(primary, secondary); + + await expect( + dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }), + ).rejects.toMatchObject({ + message: 'Failed to put object into storage', + errors: [secondaryError, rollbackError], + }); + }); + }); + + describe('getObject', () => { + test('returns primary result when primary succeeds', async () => { + const primaryResult: BlobObject = { stream: new ReadableStream(), metadata: { source: 'primary' } }; + const primary = createMockBlobStorage({ + getObject: vi.fn().mockResolvedValue(primaryResult), + }); + const secondary = createMockBlobStorage({ + getObject: vi.fn().mockRejectedValue(new Error('secondary read failed')), + }); + const dual = new DualBlobStorage(primary, secondary); + + const result = await dual.getObject({ key: 'k' }); + + expect(result).toBe(primaryResult); + }); + + test('falls back to secondary when primary fails', async () => { + const secondaryResult: BlobObject = { stream: new ReadableStream(), metadata: { source: 'secondary' } }; + const primary = createMockBlobStorage({ + getObject: vi.fn().mockRejectedValue(new Error('primary read failed')), + }); + const secondary = createMockBlobStorage({ + getObject: vi.fn().mockResolvedValue(secondaryResult), + }); + const dual = new DualBlobStorage(primary, secondary); + + const result = await dual.getObject({ key: 'k' }); + + expect(result).toBe(secondaryResult); + }); + + test('throws aggregate error with both underlying errors when both fail', async () => { + const primaryError = new Error('primary read failed'); + const secondaryError = new Error('secondary read failed'); + const primary = createMockBlobStorage({ + getObject: vi.fn().mockRejectedValue(primaryError), + }); + const secondary = createMockBlobStorage({ + getObject: vi.fn().mockRejectedValue(secondaryError), + }); + const dual = new DualBlobStorage(primary, secondary); + + await expect(dual.getObject({ key: 'k' })).rejects.toMatchObject({ + message: 'Both primary and secondary storage failed to get object', + errors: [primaryError, secondaryError], + }); + }); + }); + + describe('deleteObject', () => { + test('calls both primary and secondary', async () => { + const primary = createMockBlobStorage(); + const secondary = createMockBlobStorage(); + const dual = new DualBlobStorage(primary, secondary); + + await dual.deleteObject({ key: 'k' }); + + expect(primary.deleteObject).toHaveBeenCalledWith({ key: 'k' }); + expect(secondary.deleteObject).toHaveBeenCalledWith({ key: 'k' }); + }); + + test('rejects when one fails', async () => { + const primary = createMockBlobStorage({ + deleteObject: vi.fn().mockRejectedValue(new Error('delete failed')), + }); + const secondary = createMockBlobStorage(); + const dual = new DualBlobStorage(primary, secondary); + + await expect(dual.deleteObject({ key: 'k' })).rejects.toThrow('delete failed'); + }); + }); + + describe('removeDirectory', () => { + test('returns primary count when both succeed', async () => { + const primary = createMockBlobStorage({ + removeDirectory: vi.fn().mockResolvedValue(10), + }); + const secondary = createMockBlobStorage({ + removeDirectory: vi.fn().mockResolvedValue(10), + }); + const dual = new DualBlobStorage(primary, secondary); + + const count = await dual.removeDirectory({ key: 'dir/' }); + + expect(count).toBe(10); + expect(primary.removeDirectory).toHaveBeenCalledWith({ key: 'dir/' }); + expect(secondary.removeDirectory).toHaveBeenCalledWith({ key: 'dir/' }); + }); + + test('rejects when one fails', async () => { + const primary = createMockBlobStorage(); + const secondary = createMockBlobStorage({ + removeDirectory: vi.fn().mockRejectedValue(new Error('remove failed')), + }); + const dual = new DualBlobStorage(primary, secondary); + + await expect(dual.removeDirectory({ key: 'dir/' })).rejects.toThrow('remove failed'); + }); + }); +});