Skip to content

Commit 4129480

Browse files
committed
feat(controlplane): implement dual-bucket write for execution configs
1 parent dc4388d commit 4129480

File tree

6 files changed

+251
-2
lines changed

6 files changed

+251
-2
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import type { BlobObject, BlobStorage } from './index.js';
2+
3+
/**
4+
* A BlobStorage implementation that writes to two underlying stores (primary + secondary).
5+
*
6+
* - Writes and deletes go to both stores concurrently; both must succeed.
7+
* - Reads try the primary first, falling back to the secondary on failure.
8+
*/
9+
export class DualBlobStorage implements BlobStorage {
10+
constructor(
11+
private primary: BlobStorage,
12+
private secondary: BlobStorage,
13+
) {}
14+
15+
async putObject<Metadata extends Record<string, string>>(data: {
16+
key: string;
17+
abortSignal?: AbortSignal;
18+
body: Buffer;
19+
contentType: string;
20+
metadata?: Metadata;
21+
}): Promise<void> {
22+
await Promise.all([this.primary.putObject(data), this.secondary.putObject(data)]);
23+
}
24+
25+
async getObject(data: { key: string; abortSignal?: AbortSignal }): Promise<BlobObject> {
26+
try {
27+
return await this.primary.getObject(data);
28+
} catch {
29+
return await this.secondary.getObject(data);
30+
}
31+
}
32+
33+
async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
34+
const results = await Promise.all([this.primary.removeDirectory(data), this.secondary.removeDirectory(data)]);
35+
return results[0];
36+
}
37+
38+
async deleteObject(data: { key: string; abortSignal?: AbortSignal }): Promise<void> {
39+
await Promise.all([this.primary.deleteObject(data), this.secondary.deleteObject(data)]);
40+
}
41+
}

controlplane/src/core/blobstorage/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export { S3BlobStorage } from './s3.js';
2+
export { DualBlobStorage } from './dual.js';
23

34
export class BlobNotFoundError extends Error {
45
constructor(message: string, cause?: Error) {

controlplane/src/core/build-server.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import Keycloak from './services/Keycloak.js';
2929
import { PlatformWebhookService } from './webhooks/PlatformWebhookService.js';
3030
import AccessTokenAuthenticator from './services/AccessTokenAuthenticator.js';
3131
import { GitHubRepository } from './repositories/GitHubRepository.js';
32-
import { S3BlobStorage } from './blobstorage/index.js';
32+
import { S3BlobStorage, DualBlobStorage, type BlobStorage } from './blobstorage/index.js';
3333
import Mailer from './services/Mailer.js';
3434
import { OrganizationInvitationRepository } from './repositories/OrganizationInvitationRepository.js';
3535
import { Authorization } from './services/Authorization.js';
@@ -114,6 +114,15 @@ export interface BuildConfig {
114114
forcePathStyle?: boolean;
115115
useIndividualDeletes?: boolean;
116116
};
117+
s3StorageFailover?: {
118+
url: string;
119+
endpoint?: string;
120+
region?: string;
121+
username?: string;
122+
password?: string;
123+
forcePathStyle?: boolean;
124+
useIndividualDeletes?: boolean;
125+
};
117126
mailer: {
118127
smtpEnabled: boolean;
119128
smtpHost?: string;
@@ -343,14 +352,31 @@ export default async function build(opts: BuildConfig) {
343352
const s3Config = createS3ClientConfig(bucketName, opts.s3Storage);
344353

345354
const s3Client = new S3Client(s3Config);
346-
const blobStorage = new S3BlobStorage(s3Client, bucketName, {
355+
const primaryBlobStorage = new S3BlobStorage(s3Client, bucketName, {
347356
// GCS does not support DeleteObjects; force individual deletes when detected.
348357
useIndividualDeletes:
349358
isGoogleCloudStorageUrl(opts.s3Storage.url) || isGoogleCloudStorageUrl(s3Config.endpoint as string)
350359
? true
351360
: (opts.s3Storage.useIndividualDeletes ?? false),
352361
});
353362

363+
let blobStorage: BlobStorage = primaryBlobStorage;
364+
365+
if (opts.s3StorageFailover?.url) {
366+
const failoverBucketName = extractS3BucketName(opts.s3StorageFailover);
367+
const failoverS3Config = createS3ClientConfig(failoverBucketName, opts.s3StorageFailover);
368+
const failoverS3Client = new S3Client(failoverS3Config);
369+
const failoverBlobStorage = new S3BlobStorage(failoverS3Client, failoverBucketName, {
370+
useIndividualDeletes:
371+
isGoogleCloudStorageUrl(opts.s3StorageFailover.url) ||
372+
isGoogleCloudStorageUrl(failoverS3Config.endpoint as string)
373+
? true
374+
: (opts.s3StorageFailover.useIndividualDeletes ?? false),
375+
});
376+
377+
blobStorage = new DualBlobStorage(primaryBlobStorage, failoverBlobStorage);
378+
}
379+
354380
const platformWebhooks = new PlatformWebhookService(
355381
opts.webhook?.url,
356382
opts.webhook?.key,

controlplane/src/core/env.schema.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,22 @@ export const envVariables = z
162162
.string()
163163
.transform((val) => val === 'true')
164164
.optional(),
165+
/**
166+
* S3 Failover Storage (optional secondary bucket for resilience)
167+
*/
168+
S3_FAILOVER_STORAGE_URL: z.string().optional(),
169+
S3_FAILOVER_ENDPOINT: z.string().optional(),
170+
S3_FAILOVER_REGION: z.string().optional(),
171+
S3_FAILOVER_ACCESS_KEY_ID: z.string().optional(),
172+
S3_FAILOVER_SECRET_ACCESS_KEY: z.string().optional(),
173+
S3_FAILOVER_FORCE_PATH_STYLE: z
174+
.string()
175+
.transform((val) => val === 'true')
176+
.optional(),
177+
S3_FAILOVER_USE_INDIVIDUAL_DELETES: z
178+
.string()
179+
.transform((val) => val === 'true')
180+
.optional(),
165181
/**
166182
* Email
167183
*/

controlplane/src/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ const {
5050
S3_SECRET_ACCESS_KEY,
5151
S3_FORCE_PATH_STYLE,
5252
S3_USE_INDIVIDUAL_DELETES,
53+
S3_FAILOVER_STORAGE_URL,
54+
S3_FAILOVER_ENDPOINT,
55+
S3_FAILOVER_REGION,
56+
S3_FAILOVER_ACCESS_KEY_ID,
57+
S3_FAILOVER_SECRET_ACCESS_KEY,
58+
S3_FAILOVER_FORCE_PATH_STYLE,
59+
S3_FAILOVER_USE_INDIVIDUAL_DELETES,
5360
SMTP_ENABLED,
5461
SMTP_HOST,
5562
SMTP_PORT,
@@ -142,6 +149,17 @@ const options: BuildConfig = {
142149
forcePathStyle: S3_FORCE_PATH_STYLE,
143150
useIndividualDeletes: S3_USE_INDIVIDUAL_DELETES,
144151
},
152+
s3StorageFailover: S3_FAILOVER_STORAGE_URL
153+
? {
154+
url: S3_FAILOVER_STORAGE_URL,
155+
region: S3_FAILOVER_REGION,
156+
endpoint: S3_FAILOVER_ENDPOINT,
157+
username: S3_FAILOVER_ACCESS_KEY_ID,
158+
password: S3_FAILOVER_SECRET_ACCESS_KEY,
159+
forcePathStyle: S3_FAILOVER_FORCE_PATH_STYLE,
160+
useIndividualDeletes: S3_FAILOVER_USE_INDIVIDUAL_DELETES,
161+
}
162+
: undefined,
145163
mailer: {
146164
smtpEnabled: SMTP_ENABLED,
147165
smtpHost: SMTP_HOST,
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import { describe, expect, test, vi } from 'vitest';
2+
import { DualBlobStorage } from '../src/core/blobstorage/dual.js';
3+
import type { BlobObject, BlobStorage } from '../src/core/blobstorage/index.js';
4+
5+
function createMockBlobStorage(overrides?: Partial<BlobStorage>): BlobStorage {
6+
return {
7+
putObject: vi.fn().mockResolvedValue(undefined),
8+
getObject: vi.fn().mockResolvedValue({ stream: new ReadableStream(), metadata: {} }),
9+
removeDirectory: vi.fn().mockResolvedValue(5),
10+
deleteObject: vi.fn().mockResolvedValue(undefined),
11+
...overrides,
12+
};
13+
}
14+
15+
describe('DualBlobStorage', () => {
16+
describe('putObject', () => {
17+
test('calls both primary and secondary', async () => {
18+
const primary = createMockBlobStorage();
19+
const secondary = createMockBlobStorage();
20+
const dual = new DualBlobStorage(primary, secondary);
21+
22+
const data = { key: 'test-key', body: Buffer.from('data'), contentType: 'text/plain' };
23+
await dual.putObject(data);
24+
25+
expect(primary.putObject).toHaveBeenCalledWith(data);
26+
expect(secondary.putObject).toHaveBeenCalledWith(data);
27+
});
28+
29+
test('rejects when primary fails', async () => {
30+
const primary = createMockBlobStorage({
31+
putObject: vi.fn().mockRejectedValue(new Error('primary write failed')),
32+
});
33+
const secondary = createMockBlobStorage();
34+
const dual = new DualBlobStorage(primary, secondary);
35+
36+
await expect(dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' })).rejects.toThrow(
37+
'primary write failed',
38+
);
39+
});
40+
41+
test('rejects when secondary fails', async () => {
42+
const primary = createMockBlobStorage();
43+
const secondary = createMockBlobStorage({
44+
putObject: vi.fn().mockRejectedValue(new Error('secondary write failed')),
45+
});
46+
const dual = new DualBlobStorage(primary, secondary);
47+
48+
await expect(dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' })).rejects.toThrow(
49+
'secondary write failed',
50+
);
51+
});
52+
});
53+
54+
describe('getObject', () => {
55+
test('returns primary result when primary succeeds', async () => {
56+
const primaryResult: BlobObject = { stream: new ReadableStream(), metadata: { source: 'primary' } };
57+
const primary = createMockBlobStorage({
58+
getObject: vi.fn().mockResolvedValue(primaryResult),
59+
});
60+
const secondary = createMockBlobStorage();
61+
const dual = new DualBlobStorage(primary, secondary);
62+
63+
const result = await dual.getObject({ key: 'k' });
64+
65+
expect(result).toBe(primaryResult);
66+
expect(secondary.getObject).not.toHaveBeenCalled();
67+
});
68+
69+
test('falls back to secondary when primary fails', async () => {
70+
const secondaryResult: BlobObject = { stream: new ReadableStream(), metadata: { source: 'secondary' } };
71+
const primary = createMockBlobStorage({
72+
getObject: vi.fn().mockRejectedValue(new Error('primary read failed')),
73+
});
74+
const secondary = createMockBlobStorage({
75+
getObject: vi.fn().mockResolvedValue(secondaryResult),
76+
});
77+
const dual = new DualBlobStorage(primary, secondary);
78+
79+
const result = await dual.getObject({ key: 'k' });
80+
81+
expect(result).toBe(secondaryResult);
82+
});
83+
84+
test('throws secondary error when both fail', async () => {
85+
const primary = createMockBlobStorage({
86+
getObject: vi.fn().mockRejectedValue(new Error('primary read failed')),
87+
});
88+
const secondary = createMockBlobStorage({
89+
getObject: vi.fn().mockRejectedValue(new Error('secondary read failed')),
90+
});
91+
const dual = new DualBlobStorage(primary, secondary);
92+
93+
await expect(dual.getObject({ key: 'k' })).rejects.toThrow('secondary read failed');
94+
});
95+
});
96+
97+
describe('deleteObject', () => {
98+
test('calls both primary and secondary', async () => {
99+
const primary = createMockBlobStorage();
100+
const secondary = createMockBlobStorage();
101+
const dual = new DualBlobStorage(primary, secondary);
102+
103+
await dual.deleteObject({ key: 'k' });
104+
105+
expect(primary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
106+
expect(secondary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
107+
});
108+
109+
test('rejects when one fails', async () => {
110+
const primary = createMockBlobStorage({
111+
deleteObject: vi.fn().mockRejectedValue(new Error('delete failed')),
112+
});
113+
const secondary = createMockBlobStorage();
114+
const dual = new DualBlobStorage(primary, secondary);
115+
116+
await expect(dual.deleteObject({ key: 'k' })).rejects.toThrow('delete failed');
117+
});
118+
});
119+
120+
describe('removeDirectory', () => {
121+
test('returns primary count when both succeed', async () => {
122+
const primary = createMockBlobStorage({
123+
removeDirectory: vi.fn().mockResolvedValue(10),
124+
});
125+
const secondary = createMockBlobStorage({
126+
removeDirectory: vi.fn().mockResolvedValue(10),
127+
});
128+
const dual = new DualBlobStorage(primary, secondary);
129+
130+
const count = await dual.removeDirectory({ key: 'dir/' });
131+
132+
expect(count).toBe(10);
133+
expect(primary.removeDirectory).toHaveBeenCalledWith({ key: 'dir/' });
134+
expect(secondary.removeDirectory).toHaveBeenCalledWith({ key: 'dir/' });
135+
});
136+
137+
test('rejects when one fails', async () => {
138+
const primary = createMockBlobStorage();
139+
const secondary = createMockBlobStorage({
140+
removeDirectory: vi.fn().mockRejectedValue(new Error('remove failed')),
141+
});
142+
const dual = new DualBlobStorage(primary, secondary);
143+
144+
await expect(dual.removeDirectory({ key: 'dir/' })).rejects.toThrow('remove failed');
145+
});
146+
});
147+
});

0 commit comments

Comments
 (0)