Skip to content

Commit 985d82c

Browse files
authored
Merge branch 'main' into ale/eng-8365-enable-email-level-breakdown-for-custom-events-in-posthog
2 parents eb1fd35 + 0b0d42d commit 985d82c

File tree

30 files changed

+4595
-3502
lines changed

30 files changed

+4595
-3502
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import type { BlobObject, BlobStorage } from './index.js';
2+
3+
/**
4+
* A BlobStorage implementation that writes to two underlying stores (primary + secondary).
5+
*
6+
* - Writes and deletes go to both stores concurrently; both must succeed.
7+
* - Reads try the primary first, falling back to the secondary on failure.
8+
*/
9+
export class DualBlobStorage implements BlobStorage {
10+
constructor(
11+
private primary: BlobStorage,
12+
private secondary: BlobStorage,
13+
) {}
14+
15+
async putObject<Metadata extends Record<string, string>>(data: {
16+
key: string;
17+
abortSignal?: AbortSignal;
18+
body: Buffer;
19+
contentType: string;
20+
metadata?: Metadata;
21+
}): Promise<void> {
22+
const results = await Promise.allSettled([this.primary.putObject(data), this.secondary.putObject(data)]);
23+
const [primaryResult, secondaryResult] = results;
24+
25+
if (primaryResult.status === 'fulfilled' && secondaryResult.status === 'fulfilled') {
26+
return;
27+
}
28+
29+
// Roll back successful writes before throwing, independent of the caller's signal
30+
const rollbacks: Promise<void>[] = [];
31+
if (primaryResult.status === 'fulfilled') {
32+
rollbacks.push(this.primary.deleteObject({ key: data.key }));
33+
}
34+
if (secondaryResult.status === 'fulfilled') {
35+
rollbacks.push(this.secondary.deleteObject({ key: data.key }));
36+
}
37+
const rollbackResults = await Promise.allSettled(rollbacks);
38+
39+
const putErrors = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected').map((r) => r.reason);
40+
const rollbackErrors = rollbackResults
41+
.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
42+
.map((r) => r.reason);
43+
throw new AggregateError([...putErrors, ...rollbackErrors], 'Failed to put object into storage');
44+
}
45+
46+
async getObject(data: { key: string; abortSignal?: AbortSignal }): Promise<BlobObject> {
47+
try {
48+
return await this.primary.getObject(data);
49+
} catch (primaryError) {
50+
try {
51+
return await this.secondary.getObject(data);
52+
} catch (secondaryError) {
53+
throw new AggregateError(
54+
[primaryError, secondaryError],
55+
'Both primary and secondary storage failed to get object',
56+
);
57+
}
58+
}
59+
}
60+
61+
async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
62+
const results = await Promise.all([this.primary.removeDirectory(data), this.secondary.removeDirectory(data)]);
63+
return results[0];
64+
}
65+
66+
async deleteObject(data: { key: string; abortSignal?: AbortSignal }): Promise<void> {
67+
await Promise.all([this.primary.deleteObject(data), this.secondary.deleteObject(data)]);
68+
}
69+
}

controlplane/src/core/blobstorage/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export { S3BlobStorage } from './s3.js';
2+
export { DualBlobStorage } from './dual.js';
23

34
export class BlobNotFoundError extends Error {
45
constructor(message: string, cause?: Error) {

controlplane/src/core/build-server.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import Keycloak from './services/Keycloak.js';
2929
import { PlatformWebhookService } from './webhooks/PlatformWebhookService.js';
3030
import AccessTokenAuthenticator from './services/AccessTokenAuthenticator.js';
3131
import { GitHubRepository } from './repositories/GitHubRepository.js';
32-
import { S3BlobStorage } from './blobstorage/index.js';
32+
import { S3BlobStorage, DualBlobStorage, type BlobStorage } from './blobstorage/index.js';
3333
import Mailer from './services/Mailer.js';
3434
import { OrganizationInvitationRepository } from './repositories/OrganizationInvitationRepository.js';
3535
import { Authorization } from './services/Authorization.js';
@@ -114,6 +114,15 @@ export interface BuildConfig {
114114
forcePathStyle?: boolean;
115115
useIndividualDeletes?: boolean;
116116
};
117+
s3StorageFailover?: {
118+
url: string;
119+
endpoint?: string;
120+
region?: string;
121+
username?: string;
122+
password?: string;
123+
forcePathStyle?: boolean;
124+
useIndividualDeletes?: boolean;
125+
};
117126
mailer: {
118127
smtpEnabled: boolean;
119128
smtpHost?: string;
@@ -343,14 +352,31 @@ export default async function build(opts: BuildConfig) {
343352
const s3Config = createS3ClientConfig(bucketName, opts.s3Storage);
344353

345354
const s3Client = new S3Client(s3Config);
346-
const blobStorage = new S3BlobStorage(s3Client, bucketName, {
355+
const primaryBlobStorage = new S3BlobStorage(s3Client, bucketName, {
347356
// GCS does not support DeleteObjects; force individual deletes when detected.
348357
useIndividualDeletes:
349358
isGoogleCloudStorageUrl(opts.s3Storage.url) || isGoogleCloudStorageUrl(s3Config.endpoint as string)
350359
? true
351360
: (opts.s3Storage.useIndividualDeletes ?? false),
352361
});
353362

363+
let blobStorage: BlobStorage = primaryBlobStorage;
364+
365+
if (opts.s3StorageFailover?.url) {
366+
const failoverBucketName = extractS3BucketName(opts.s3StorageFailover);
367+
const failoverS3Config = createS3ClientConfig(failoverBucketName, opts.s3StorageFailover);
368+
const failoverS3Client = new S3Client(failoverS3Config);
369+
const failoverBlobStorage = new S3BlobStorage(failoverS3Client, failoverBucketName, {
370+
useIndividualDeletes:
371+
isGoogleCloudStorageUrl(opts.s3StorageFailover.url) ||
372+
isGoogleCloudStorageUrl(failoverS3Config.endpoint as string)
373+
? true
374+
: (opts.s3StorageFailover.useIndividualDeletes ?? false),
375+
});
376+
377+
blobStorage = new DualBlobStorage(primaryBlobStorage, failoverBlobStorage);
378+
}
379+
354380
const platformWebhooks = new PlatformWebhookService(
355381
opts.webhook?.url,
356382
opts.webhook?.key,

controlplane/src/core/env.schema.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,22 @@ export const envVariables = z
162162
.string()
163163
.transform((val) => val === 'true')
164164
.optional(),
165+
/**
166+
* S3 Failover Storage (optional secondary bucket for resilience)
167+
*/
168+
S3_FAILOVER_STORAGE_URL: z.string().optional(),
169+
S3_FAILOVER_ENDPOINT: z.string().optional(),
170+
S3_FAILOVER_REGION: z.string().default('auto'),
171+
S3_FAILOVER_ACCESS_KEY_ID: z.string().optional(),
172+
S3_FAILOVER_SECRET_ACCESS_KEY: z.string().optional(),
173+
S3_FAILOVER_FORCE_PATH_STYLE: z
174+
.string()
175+
.transform((val) => val === 'true')
176+
.optional(),
177+
S3_FAILOVER_USE_INDIVIDUAL_DELETES: z
178+
.string()
179+
.transform((val) => val === 'true')
180+
.optional(),
165181
/**
166182
* Email
167183
*/

controlplane/src/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ const {
5050
S3_SECRET_ACCESS_KEY,
5151
S3_FORCE_PATH_STYLE,
5252
S3_USE_INDIVIDUAL_DELETES,
53+
S3_FAILOVER_STORAGE_URL,
54+
S3_FAILOVER_ENDPOINT,
55+
S3_FAILOVER_REGION,
56+
S3_FAILOVER_ACCESS_KEY_ID,
57+
S3_FAILOVER_SECRET_ACCESS_KEY,
58+
S3_FAILOVER_FORCE_PATH_STYLE,
59+
S3_FAILOVER_USE_INDIVIDUAL_DELETES,
5360
SMTP_ENABLED,
5461
SMTP_HOST,
5562
SMTP_PORT,
@@ -142,6 +149,17 @@ const options: BuildConfig = {
142149
forcePathStyle: S3_FORCE_PATH_STYLE,
143150
useIndividualDeletes: S3_USE_INDIVIDUAL_DELETES,
144151
},
152+
s3StorageFailover: S3_FAILOVER_STORAGE_URL
153+
? {
154+
url: S3_FAILOVER_STORAGE_URL,
155+
region: S3_FAILOVER_REGION,
156+
endpoint: S3_FAILOVER_ENDPOINT,
157+
username: S3_FAILOVER_ACCESS_KEY_ID,
158+
password: S3_FAILOVER_SECRET_ACCESS_KEY,
159+
forcePathStyle: S3_FAILOVER_FORCE_PATH_STYLE,
160+
useIndividualDeletes: S3_FAILOVER_USE_INDIVIDUAL_DELETES,
161+
}
162+
: undefined,
145163
mailer: {
146164
smtpEnabled: SMTP_ENABLED,
147165
smtpHost: SMTP_HOST,
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import { describe, expect, test, vi } from 'vitest';
2+
import { DualBlobStorage } from '../src/core/blobstorage/dual.js';
3+
import type { BlobObject, BlobStorage } from '../src/core/blobstorage/index.js';
4+
5+
function createMockBlobStorage(overrides?: Partial<BlobStorage>): BlobStorage {
6+
return {
7+
putObject: vi.fn().mockResolvedValue(undefined),
8+
getObject: vi.fn().mockResolvedValue({ stream: new ReadableStream(), metadata: {} }),
9+
removeDirectory: vi.fn().mockResolvedValue(5),
10+
deleteObject: vi.fn().mockResolvedValue(undefined),
11+
...overrides,
12+
};
13+
}
14+
15+
describe('DualBlobStorage', () => {
16+
describe('putObject', () => {
17+
test('calls both primary and secondary', async () => {
18+
const primary = createMockBlobStorage();
19+
const secondary = createMockBlobStorage();
20+
const dual = new DualBlobStorage(primary, secondary);
21+
22+
const data = { key: 'test-key', body: Buffer.from('data'), contentType: 'text/plain' };
23+
await dual.putObject(data);
24+
25+
expect(primary.putObject).toHaveBeenCalledWith(data);
26+
expect(secondary.putObject).toHaveBeenCalledWith(data);
27+
});
28+
29+
test('rejects when primary fails and rolls back secondary', async () => {
30+
const primaryError = new Error('primary write failed');
31+
const primary = createMockBlobStorage({
32+
putObject: vi.fn().mockRejectedValue(primaryError),
33+
});
34+
const secondary = createMockBlobStorage();
35+
const dual = new DualBlobStorage(primary, secondary);
36+
37+
await expect(
38+
dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }),
39+
).rejects.toMatchObject({
40+
message: 'Failed to put object into storage',
41+
errors: [primaryError],
42+
});
43+
44+
expect(primary.deleteObject).not.toHaveBeenCalled();
45+
expect(secondary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
46+
});
47+
48+
test('rejects when secondary fails and rolls back primary', async () => {
49+
const secondaryError = new Error('secondary write failed');
50+
const primary = createMockBlobStorage();
51+
const secondary = createMockBlobStorage({
52+
putObject: vi.fn().mockRejectedValue(secondaryError),
53+
});
54+
const dual = new DualBlobStorage(primary, secondary);
55+
56+
await expect(
57+
dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }),
58+
).rejects.toMatchObject({
59+
message: 'Failed to put object into storage',
60+
errors: [secondaryError],
61+
});
62+
63+
expect(primary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
64+
expect(secondary.deleteObject).not.toHaveBeenCalled();
65+
});
66+
test('includes rollback errors in aggregate when rollback also fails', async () => {
67+
const secondaryError = new Error('secondary write failed');
68+
const rollbackError = new Error('primary rollback failed');
69+
const primary = createMockBlobStorage({
70+
deleteObject: vi.fn().mockRejectedValue(rollbackError),
71+
});
72+
const secondary = createMockBlobStorage({
73+
putObject: vi.fn().mockRejectedValue(secondaryError),
74+
});
75+
const dual = new DualBlobStorage(primary, secondary);
76+
77+
await expect(
78+
dual.putObject({ key: 'k', body: Buffer.from('d'), contentType: 'text/plain' }),
79+
).rejects.toMatchObject({
80+
message: 'Failed to put object into storage',
81+
errors: [secondaryError, rollbackError],
82+
});
83+
});
84+
});
85+
86+
describe('getObject', () => {
87+
test('returns primary result when primary succeeds', async () => {
88+
const primaryResult: BlobObject = { stream: new ReadableStream(), metadata: { source: 'primary' } };
89+
const primary = createMockBlobStorage({
90+
getObject: vi.fn().mockResolvedValue(primaryResult),
91+
});
92+
const secondary = createMockBlobStorage({
93+
getObject: vi.fn().mockRejectedValue(new Error('secondary read failed')),
94+
});
95+
const dual = new DualBlobStorage(primary, secondary);
96+
97+
const result = await dual.getObject({ key: 'k' });
98+
99+
expect(result).toBe(primaryResult);
100+
});
101+
102+
test('falls back to secondary when primary fails', async () => {
103+
const secondaryResult: BlobObject = { stream: new ReadableStream(), metadata: { source: 'secondary' } };
104+
const primary = createMockBlobStorage({
105+
getObject: vi.fn().mockRejectedValue(new Error('primary read failed')),
106+
});
107+
const secondary = createMockBlobStorage({
108+
getObject: vi.fn().mockResolvedValue(secondaryResult),
109+
});
110+
const dual = new DualBlobStorage(primary, secondary);
111+
112+
const result = await dual.getObject({ key: 'k' });
113+
114+
expect(result).toBe(secondaryResult);
115+
});
116+
117+
test('throws aggregate error with both underlying errors when both fail', async () => {
118+
const primaryError = new Error('primary read failed');
119+
const secondaryError = new Error('secondary read failed');
120+
const primary = createMockBlobStorage({
121+
getObject: vi.fn().mockRejectedValue(primaryError),
122+
});
123+
const secondary = createMockBlobStorage({
124+
getObject: vi.fn().mockRejectedValue(secondaryError),
125+
});
126+
const dual = new DualBlobStorage(primary, secondary);
127+
128+
await expect(dual.getObject({ key: 'k' })).rejects.toMatchObject({
129+
message: 'Both primary and secondary storage failed to get object',
130+
errors: [primaryError, secondaryError],
131+
});
132+
});
133+
});
134+
135+
describe('deleteObject', () => {
136+
test('calls both primary and secondary', async () => {
137+
const primary = createMockBlobStorage();
138+
const secondary = createMockBlobStorage();
139+
const dual = new DualBlobStorage(primary, secondary);
140+
141+
await dual.deleteObject({ key: 'k' });
142+
143+
expect(primary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
144+
expect(secondary.deleteObject).toHaveBeenCalledWith({ key: 'k' });
145+
});
146+
147+
test('rejects when one fails', async () => {
148+
const primary = createMockBlobStorage({
149+
deleteObject: vi.fn().mockRejectedValue(new Error('delete failed')),
150+
});
151+
const secondary = createMockBlobStorage();
152+
const dual = new DualBlobStorage(primary, secondary);
153+
154+
await expect(dual.deleteObject({ key: 'k' })).rejects.toThrow('delete failed');
155+
});
156+
});
157+
158+
describe('removeDirectory', () => {
159+
test('returns primary count when both succeed', async () => {
160+
const primary = createMockBlobStorage({
161+
removeDirectory: vi.fn().mockResolvedValue(10),
162+
});
163+
const secondary = createMockBlobStorage({
164+
removeDirectory: vi.fn().mockResolvedValue(10),
165+
});
166+
const dual = new DualBlobStorage(primary, secondary);
167+
168+
const count = await dual.removeDirectory({ key: 'dir/' });
169+
170+
expect(count).toBe(10);
171+
expect(primary.removeDirectory).toHaveBeenCalledWith({ key: 'dir/' });
172+
expect(secondary.removeDirectory).toHaveBeenCalledWith({ key: 'dir/' });
173+
});
174+
175+
test('rejects when one fails', async () => {
176+
const primary = createMockBlobStorage();
177+
const secondary = createMockBlobStorage({
178+
removeDirectory: vi.fn().mockRejectedValue(new Error('remove failed')),
179+
});
180+
const dual = new DualBlobStorage(primary, secondary);
181+
182+
await expect(dual.removeDirectory({ key: 'dir/' })).rejects.toThrow('remove failed');
183+
});
184+
});
185+
});

demo/pkg/subgraphs/employees/subgraph/generated/generated.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

demo/pkg/subgraphs/employees/subgraph/schema.graphqls

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,6 @@ type WorkSetup @shareable {
277277

278278
input FindEmployeeCriteria @oneOf {
279279
id: Int
280-
department: Department
281-
title: String
280+
department: Department @cost(weight: 17)
281+
title: String @cost(weight: -3) # totally made-up example for testing
282282
}

0 commit comments

Comments
 (0)