Skip to content

Commit b1e6dd0

Browse files
committed
use a less CPU-intensive way of inserting task runs
1 parent 3c2c269 commit b1e6dd0

File tree

8 files changed

+204
-22
lines changed

8 files changed

+204
-22
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ export class RunsReplicationService {
772772

773773
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
774774
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
775-
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
775+
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertUnsafe(
776776
taskRunInserts,
777777
{
778778
params: {
@@ -797,7 +797,7 @@ export class RunsReplicationService {
797797

798798
async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) {
799799
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
800-
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
800+
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloadsUnsafe(
801801
payloadInserts,
802802
{
803803
params: {

apps/webapp/scripts/profile-runs-replication.ts

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import { program } from "commander";
44
import path from "path";
55
import fs from "fs/promises";
6+
import { spawn } from "child_process";
67
import { RunsReplicationHarness } from "../test/performance/harness";
78
import { getDefaultConfig, type HarnessConfig } from "../test/performance/config";
89

@@ -243,6 +244,48 @@ async function createSummaryReport(
243244
await fs.writeFile(outputPath, lines.join("\n"));
244245
}
245246

247+
async function generateVisualization(config: HarnessConfig): Promise<void> {
248+
console.log("\n🔥 Generating flamegraph visualization...");
249+
250+
// Find the clinic flame data file
251+
const files = await fs.readdir(config.profiling.outputDir);
252+
const flameDataFile = files.find((f) => f.endsWith(".clinic-flame"));
253+
254+
if (!flameDataFile) {
255+
console.warn("⚠️ No clinic flame data file found. Skipping visualization.");
256+
return;
257+
}
258+
259+
const dataPath = path.join(config.profiling.outputDir, flameDataFile);
260+
console.log(` Data file: ${flameDataFile}`);
261+
262+
// Run clinic flame --visualize-only
263+
const clinicPath = path.join(__dirname, "../node_modules/.bin/clinic");
264+
265+
await new Promise<void>((resolve, reject) => {
266+
const proc = spawn(clinicPath, ["flame", "--visualize-only", dataPath], {
267+
stdio: "inherit",
268+
cwd: path.join(__dirname, ".."),
269+
});
270+
271+
proc.on("exit", (code) => {
272+
if (code === 0) {
273+
console.log("✅ Flamegraph generated successfully!");
274+
console.log(` Open: ${dataPath.replace(".clinic-flame", ".clinic-flame.html")}\n`);
275+
resolve();
276+
} else {
277+
console.error(`⚠️ Flamegraph generation exited with code ${code}`);
278+
resolve(); // Don't fail the whole run
279+
}
280+
});
281+
282+
proc.on("error", (error) => {
283+
console.error("⚠️ Error generating flamegraph:", error.message);
284+
resolve(); // Don't fail the whole run
285+
});
286+
});
287+
}
288+
246289
async function main() {
247290
const options = program.opts();
248291
const config = await loadConfig(options);
@@ -272,8 +315,8 @@ async function main() {
272315
console.log(`📊 Detailed metrics: ${config.output.metricsFile}\n`);
273316

274317
if (config.profiling.enabled && config.profiling.tool !== "none") {
275-
console.log("🔥 Profiling data:");
276-
console.log(` View flamegraph/analysis in: ${config.profiling.outputDir}\n`);
318+
// Generate visualization from collected data
319+
await generateVisualization(config);
277320
}
278321

279322
process.exit(0);

apps/webapp/test/performance/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export interface ConsumerConfig {
3838
redisOptions: RedisOptions;
3939
slotName: string;
4040
publicationName: string;
41+
outputDir?: string; // For shutdown signal file
4142
}
4243

4344
export interface ProfilingConfig {

apps/webapp/test/performance/consumer-runner.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,24 @@
33
import { RunsReplicationService } from "~/services/runsReplicationService.server";
44
import { MockClickHouse } from "./clickhouse-mock";
55
import type { ConsumerConfig } from "./config";
6+
import fs from "fs";
7+
import path from "path";
68

79
async function main() {
810
const hasIPC = !!process.send;
911

1012
if (!hasIPC) {
11-
console.log("Warning: IPC not available (likely running under profiler) - metrics will not be sent to parent");
13+
console.log(
14+
"Warning: IPC not available (likely running under profiler) - metrics will not be sent to parent"
15+
);
1216
}
1317

1418
// Parse configuration from environment variable
1519
const config: ConsumerConfig = JSON.parse(process.env.CONSUMER_CONFIG!);
1620

21+
// Create shutdown signal file path
22+
const shutdownFilePath = path.join(config.outputDir || "/tmp", ".shutdown-signal");
23+
1724
console.log("Consumer process starting with config:", {
1825
useMockClickhouse: config.useMockClickhouse,
1926
flushBatchSize: config.flushBatchSize,
@@ -34,6 +41,7 @@ async function main() {
3441
compression: {
3542
request: true,
3643
},
44+
logLevel: "info",
3745
});
3846
}
3947

@@ -64,6 +72,20 @@ async function main() {
6472
});
6573
}
6674

75+
// Watch for shutdown signal file (works even when IPC is unavailable)
76+
const shutdownCheckInterval = setInterval(() => {
77+
if (fs.existsSync(shutdownFilePath)) {
78+
console.log("Consumer: Shutdown signal file detected, exiting...");
79+
clearInterval(shutdownCheckInterval);
80+
clearInterval(metricsInterval);
81+
// Clean up the signal file
82+
try {
83+
fs.unlinkSync(shutdownFilePath);
84+
} catch (e) {}
85+
process.exit(0);
86+
}
87+
}, 500);
88+
6789
// Send periodic metrics to parent (if IPC available)
6890
const metricsInterval = setInterval(() => {
6991
const memUsage = process.memoryUsage();
@@ -82,14 +104,13 @@ async function main() {
82104
}
83105
}, 1000);
84106

85-
// Listen for shutdown signal from parent (if IPC available)
107+
// Listen for shutdown signal from parent (if IPC available, for non-profiling mode)
86108
if (hasIPC) {
87109
process.on("message", async (msg: any) => {
88110
if (msg.type === "shutdown") {
89-
console.log("Consumer: Received shutdown signal");
111+
console.log("Consumer: Received IPC shutdown message");
112+
clearInterval(shutdownCheckInterval);
90113
clearInterval(metricsInterval);
91-
await service.shutdown();
92-
await service.stop();
93114
process.exit(0);
94115
}
95116
});

apps/webapp/test/performance/consumer.ts

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { ChildProcess, spawn } from "child_process";
22
import path from "path";
3+
import fs from "fs";
34
import type { ConsumerConfig, ProfilingConfig } from "./config";
45

56
interface ConsumerMetrics {
@@ -37,6 +38,7 @@ export class ConsumerProcessManager {
3738
const isProfiling = this.profiling.enabled && this.profiling.tool !== "none";
3839

3940
this.process = spawn(args[0], args.slice(1), {
41+
cwd: path.join(__dirname, "../.."), // Run from webapp directory
4042
env: {
4143
...process.env,
4244
CONSUMER_CONFIG: JSON.stringify(this.config),
@@ -99,18 +101,45 @@ export class ConsumerProcessManager {
99101
}
100102

101103
console.log("Stopping consumer process");
102-
this.process.send({ type: "shutdown" });
104+
105+
const isProfiling = this.profiling.enabled && this.profiling.tool !== "none";
106+
107+
if (isProfiling) {
108+
// When profiling, IPC doesn't work - use a shutdown signal file instead
109+
const outputDir = this.config.outputDir || "/tmp";
110+
const shutdownFilePath = path.join(outputDir, ".shutdown-signal");
111+
console.log("Creating shutdown signal file for consumer (profiling mode)");
112+
113+
// Ensure directory exists
114+
if (!fs.existsSync(outputDir)) {
115+
fs.mkdirSync(outputDir, { recursive: true });
116+
}
117+
118+
fs.writeFileSync(shutdownFilePath, "shutdown");
119+
} else {
120+
// For non-profiling runs, use IPC message
121+
try {
122+
this.process.send({ type: "shutdown" });
123+
} catch (error) {
124+
console.warn("Could not send shutdown message, process may have already exited");
125+
}
126+
}
103127

104128
// Wait for process to exit
105129
await new Promise<void>((resolve) => {
130+
// With shutdown signal file, consumer-runner should exit within a few seconds
131+
// With --collect-only, Clinic.js then quickly packages the data and exits
132+
const timeoutMs = isProfiling ? 15000 : 30000;
133+
106134
const timeout = setTimeout(() => {
107-
console.warn("Consumer process did not exit gracefully, killing");
135+
console.warn(`Consumer process did not exit after ${timeoutMs}ms, killing`);
108136
this.process?.kill("SIGKILL");
109137
resolve();
110-
}, 30000); // 30 second timeout
138+
}, timeoutMs);
111139

112-
this.process?.on("exit", () => {
140+
this.process?.on("exit", (code, signal) => {
113141
clearTimeout(timeout);
142+
console.log(`Consumer process exited with code ${code}, signal ${signal}`);
114143
resolve();
115144
});
116145
});
@@ -144,21 +173,30 @@ export class ConsumerProcessManager {
144173
const tool = this.profiling.tool === "both" ? "doctor" : this.profiling.tool;
145174
const runnerPath = path.join(__dirname, "consumer-runner.ts");
146175

176+
// Use clinic from node_modules/.bin directly (more reliable than pnpm exec)
177+
const clinicPath = path.join(__dirname, "../../node_modules/.bin/clinic");
178+
179+
// Point --dest to the output directory itself
180+
// Clinic.js will create PID.clinic-flame inside this directory
181+
const destPath = path.resolve(this.profiling.outputDir);
182+
147183
// Clinic.js requires node, so use node with tsx/register loader
148184
const args = [
149-
"pnpm",
150-
"exec",
151-
"clinic",
185+
clinicPath,
152186
tool,
187+
"--collect-only", // Only collect data, don't generate visualization immediately
188+
"--open=false", // Don't try to open in browser
153189
"--dest",
154-
this.profiling.outputDir,
190+
destPath,
155191
"--",
156192
"node",
157193
"--import",
158194
"tsx",
159195
runnerPath,
160196
];
161197

198+
console.log(`Clinic.js will save profiling data to: ${destPath}`);
199+
162200
return args;
163201
}
164202

apps/webapp/test/performance/harness.ts

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ export class RunsReplicationHarness {
9797

9898
// 9. Start consumer process
9999
console.log("\nStarting consumer process...");
100+
// Set outputDir for shutdown signal file (needed when IPC isn't available under Clinic.js)
101+
this.config.consumer.outputDir = this.config.profiling.outputDir;
102+
100103
this.consumerProcess = new ConsumerProcessManager(
101104
this.config.consumer,
102105
this.config.profiling
@@ -282,15 +285,59 @@ export class RunsReplicationHarness {
282285
);
283286

284287
// Drop existing replication slot if it exists (ensures clean slate)
285-
const slotExists = await this.prisma!.$queryRaw<Array<{ slot_name: string }>>`
286-
SELECT slot_name FROM pg_replication_slots WHERE slot_name = ${this.config.consumer.slotName};
288+
const slotExists = await this.prisma!.$queryRaw<
289+
Array<{ slot_name: string; active: boolean; active_pid: number | null }>
290+
>`
291+
SELECT slot_name, active, active_pid FROM pg_replication_slots WHERE slot_name = ${this.config.consumer.slotName};
287292
`;
288293

289294
if (slotExists.length > 0) {
295+
const slot = slotExists[0];
290296
console.log(`🧹 Dropping existing replication slot: ${this.config.consumer.slotName}`);
291-
await this.prisma!.$executeRawUnsafe(
292-
`SELECT pg_drop_replication_slot('${this.config.consumer.slotName}');`
293-
);
297+
298+
// If the slot is active, terminate the backend process first
299+
if (slot.active && slot.active_pid) {
300+
console.log(
301+
`⚠️ Replication slot is active for PID ${slot.active_pid}, terminating backend...`
302+
);
303+
try {
304+
await this.prisma!.$executeRawUnsafe(
305+
`SELECT pg_terminate_backend(${slot.active_pid});`
306+
);
307+
console.log(`✅ Terminated backend process ${slot.active_pid}`);
308+
309+
// Wait a bit for the termination to complete
310+
await new Promise((resolve) => setTimeout(resolve, 1000));
311+
} catch (error) {
312+
console.warn(
313+
`⚠️ Could not terminate backend ${slot.active_pid}, it may have already exited`
314+
);
315+
}
316+
}
317+
318+
// Now try to drop the slot with retry logic
319+
let attempts = 0;
320+
const maxAttempts = 5;
321+
while (attempts < maxAttempts) {
322+
try {
323+
await this.prisma!.$executeRawUnsafe(
324+
`SELECT pg_drop_replication_slot('${this.config.consumer.slotName}');`
325+
);
326+
console.log(`✅ Dropped replication slot: ${this.config.consumer.slotName}`);
327+
break;
328+
} catch (error: any) {
329+
attempts++;
330+
if (attempts === maxAttempts) {
331+
throw new Error(
332+
`Failed to drop replication slot after ${maxAttempts} attempts: ${error.message}`
333+
);
334+
}
335+
console.log(
336+
`⚠️ Slot still active, waiting 2s before retry (attempt ${attempts}/${maxAttempts})...`
337+
);
338+
await new Promise((resolve) => setTimeout(resolve, 2000));
339+
}
340+
}
294341
}
295342

296343
// Drop and recreate publication (ensures clean slate)

internal-packages/clickhouse/src/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import { ClickhouseReader, ClickhouseWriter } from "./client/types.js";
44
import { NoopClient } from "./client/noop.js";
55
import {
66
insertTaskRuns,
7+
insertTaskRunsUnsafe,
78
insertRawTaskRunPayloads,
9+
insertRawTaskRunPayloadsUnsafe,
810
getTaskRunsQueryBuilder,
911
getTaskActivityQueryBuilder,
1012
getCurrentRunningStats,
@@ -169,7 +171,9 @@ export class ClickHouse {
169171
get taskRuns() {
170172
return {
171173
insert: insertTaskRuns(this.writer),
174+
insertUnsafe: insertTaskRunsUnsafe(this.writer),
172175
insertPayloads: insertRawTaskRunPayloads(this.writer),
176+
insertPayloadsUnsafe: insertRawTaskRunPayloadsUnsafe(this.writer),
173177
queryBuilder: getTaskRunsQueryBuilder(this.reader),
174178
countQueryBuilder: getTaskRunsCountQueryBuilder(this.reader),
175179
tagQueryBuilder: getTaskRunTagsQueryBuilder(this.reader),

0 commit comments

Comments
 (0)