Skip to content

Commit b8cb13a

Browse files
committed
fix: add getReplicatedTableName
1 parent 93e1a2d commit b8cb13a

File tree

3 files changed

+32
-7
lines changed

3 files changed

+32
-7
lines changed

apps/worker/src/jobs/cron.delete-projects.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { logger } from '@/utils/logger';
2-
import { TABLE_NAMES, ch, db } from '@openpanel/db';
2+
import { TABLE_NAMES, ch, db, getReplicatedTableName } from '@openpanel/db';
33
import type { CronQueuePayload } from '@openpanel/queue';
44
import type { Job } from 'bullmq';
55
import sqlstring from 'sqlstring';
@@ -46,10 +46,7 @@ export async function deleteProjects(job: Job<CronQueuePayload>) {
4646
];
4747

4848
for (const table of tables) {
49-
const query =
50-
process.env.SELF_HOSTED === 'true'
51-
? `ALTER TABLE ${table} DELETE WHERE ${where};`
52-
: `ALTER TABLE ${table}_replicated ON CLUSTER '{cluster}' DELETE WHERE ${where};`;
49+
const query = `ALTER TABLE ${getReplicatedTableName(table)} DELETE WHERE ${where};`;
5350

5451
await ch.command({
5552
query,

packages/db/src/clickhouse/client.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,29 @@ export const TABLE_NAMES = {
5757
events_imports: 'events_imports',
5858
};
5959

60+
/**
61+
* Check if ClickHouse is running in clustered mode
62+
* Clustered mode = production (not self-hosted)
63+
* Non-clustered mode = self-hosted environments
64+
*/
65+
export function isClickhouseClustered(): boolean {
66+
return !(
67+
process.env.SELF_HOSTED === 'true' || process.env.SELF_HOSTED === '1'
68+
);
69+
}
70+
71+
/**
72+
* Get the replicated table name for mutations
73+
* In clustered mode, returns table_name_replicated
74+
* In non-clustered mode, returns the original table name
75+
*/
76+
export function getReplicatedTableName(tableName: string): string {
77+
if (isClickhouseClustered()) {
78+
return `${tableName}_replicated ON CLUSTER '{cluster}'`;
79+
}
80+
return tableName;
81+
}
82+
6083
export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
6184
max_open_connections: 30,
6285
request_timeout: 300000,

packages/db/src/services/import.service.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
chInsertCSV,
77
convertClickhouseDateToJs,
88
formatClickhouseDate,
9+
getReplicatedTableName,
910
} from '../clickhouse/client';
1011
import { csvEscapeField, csvEscapeJson } from '../clickhouse/csv';
1112
import { type Prisma, db } from '../prisma-client';
@@ -107,8 +108,10 @@ export async function generateSessionIds(
107108

108109
// Use SQL to generate deterministic session IDs based on device_id + 30-min time windows
109110
// This ensures same events always get same session IDs regardless of import order
111+
// In clustered mode, we must use the replicated table for mutations
112+
const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports);
110113
const updateQuery = `
111-
ALTER TABLE ${TABLE_NAMES.events_imports}
114+
ALTER TABLE ${mutationTableName}
112115
UPDATE session_id = lower(hex(MD5(concat(
113116
device_id,
114117
'-',
@@ -572,8 +575,10 @@ export async function backfillSessionsToProduction(
572575
* Mark import as complete by updating status
573576
*/
574577
export async function markImportComplete(importId: string): Promise<void> {
578+
// In clustered mode, we must use the replicated table for mutations
579+
const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports);
575580
const updateQuery = `
576-
ALTER TABLE ${TABLE_NAMES.events_imports}
581+
ALTER TABLE ${mutationTableName}
577582
UPDATE import_status = 'processed'
578583
WHERE import_id = {importId:String}
579584
`;

0 commit comments

Comments
 (0)