Skip to content

Commit 3c2c269

Browse files
committed
improve test harness producer throughput and better organize run outputs
1 parent 5cfbb59 commit 3c2c269

File tree

6 files changed

+222
-46
lines changed

6 files changed

+222
-46
lines changed

apps/webapp/scripts/profile-runs-replication.ts

Lines changed: 121 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@ program
1010
.name("profile-runs-replication")
1111
.description("Profile RunsReplicationService performance and identify bottlenecks")
1212
.option("-c, --config <file>", "Config file path (JSON)")
13+
.option("-n, --name <name>", "Run name/label (e.g., 'baseline', 'optimized-v1')")
14+
.option("--description <text>", "Run description (what is being tested)")
1315
.option("-t, --throughput <number>", "Target throughput (records/sec)", "5000")
1416
.option("-d, --duration <number>", "Test duration per phase (seconds)", "60")
17+
.option("-w, --workers <number>", "Number of producer worker processes", "1")
1518
.option("--mock-clickhouse", "Use mock ClickHouse (CPU-only profiling)")
1619
.option(
1720
"--profile <tool>",
@@ -59,6 +62,18 @@ async function loadConfig(options: any): Promise<HarnessConfig> {
5962
}
6063
}
6164

65+
if (options.workers) {
66+
config.producer.workerCount = parseInt(options.workers, 10);
67+
}
68+
69+
if (options.name) {
70+
config.runName = options.name;
71+
}
72+
73+
if (options.description) {
74+
config.runDescription = options.description;
75+
}
76+
6277
if (options.mockClickhouse) {
6378
config.consumer.useMockClickhouse = true;
6479
}
@@ -76,9 +91,12 @@ async function loadConfig(options: any): Promise<HarnessConfig> {
7691
config.output.verbose = true;
7792
}
7893

79-
// Ensure output directory exists
80-
const timestamp = new Date().toISOString().replace(/[:.]/g, "-").split("T")[0];
81-
const outputDir = path.join(config.profiling.outputDir, timestamp);
94+
// Organize output directory: profiling-results/[runName]-[timestamp]/
95+
const timestamp = new Date().toISOString().replace(/[:.]/g, "-").replace("T", "_").split("_")[0];
96+
const timeWithSeconds = new Date().toISOString().replace(/[:.]/g, "-").replace("T", "_").substring(0, 19);
97+
const runFolder = `${config.runName}-${timeWithSeconds}`;
98+
const outputDir = path.join(config.profiling.outputDir, runFolder);
99+
82100
config.profiling.outputDir = outputDir;
83101
config.output.metricsFile = path.join(outputDir, "metrics.json");
84102

@@ -89,6 +107,10 @@ function printConfig(config: HarnessConfig): void {
89107
console.log("\n" + "=".repeat(60));
90108
console.log("RunsReplicationService Performance Test Harness");
91109
console.log("=".repeat(60));
110+
console.log(`\n🏷️ Run: ${config.runName}`);
111+
if (config.runDescription) {
112+
console.log(`📝 Description: ${config.runDescription}`);
113+
}
92114
console.log("\n📋 Configuration:");
93115
console.log(` Profiling DB: ${config.infrastructure.profilingDatabaseName}`);
94116
console.log(` Output Dir: ${config.profiling.outputDir}`);
@@ -102,6 +124,7 @@ function printConfig(config: HarnessConfig): void {
102124
}
103125

104126
console.log("\n⚙️ Producer Config:");
127+
console.log(` Worker Processes: ${config.producer.workerCount}`);
105128
console.log(` Insert/Update: ${(config.producer.insertUpdateRatio * 100).toFixed(0)}% inserts`);
106129
console.log(` Batch Size: ${config.producer.batchSize}`);
107130
console.log(` Payload Size: ${config.producer.payloadSizeKB} KB`);
@@ -133,6 +156,93 @@ function printSummary(phases: any[]): void {
133156
console.log("=".repeat(60) + "\n");
134157
}
135158

159+
/**
 * Builds a Markdown summary of a harness run and writes it to `outputPath`.
 *
 * The report contains, in order:
 *  1. a header with the run name, the current timestamp and the optional description,
 *  2. the producer/consumer/profiling configuration that was used,
 *  3. a "Key Results" throughput table (with an ELU column only when profiling is off),
 *  4. a "Detailed Metrics" list for every phase.
 *
 * Metric fields that are missing from a phase object are rendered as "n/a"
 * instead of throwing, so one incomplete phase cannot prevent the report from
 * being written.
 *
 * @param config - Harness configuration; `runName`, `runDescription`, `producer`,
 *   `consumer` and `profiling` are read.
 * @param phases - Per-phase metric objects as returned by the harness run.
 * @param outputPath - Destination file for the Markdown report.
 * @returns A promise that resolves once the file has been written.
 */
async function createSummaryReport(
  config: HarnessConfig,
  phases: any[],
  outputPath: string
): Promise<void> {
  const MISSING = "n/a";

  // Fixed-point formatting that tolerates absent / non-numeric metrics.
  const fixed = (value: unknown, digits: number, suffix = ""): string =>
    typeof value === "number" && !Number.isNaN(value)
      ? `${value.toFixed(digits)}${suffix}`
      : MISSING;

  // Milliseconds -> "12.3s".
  const seconds = (ms: unknown): string =>
    typeof ms === "number" ? fixed(ms / 1000, 1, "s") : MISSING;

  // Ratio in [0, 1] -> "45.6%".
  const percent = (ratio: unknown): string =>
    typeof ratio === "number" ? fixed(ratio * 100, 1, "%") : MISSING;

  // Locale-formatted counter (same formatting call as before when the value exists).
  const count = (value: number | bigint | null | undefined): string =>
    value == null ? MISSING : value.toLocaleString();

  // Shared builder for the throughput table; the ELU column is optional.
  const throughputTable = (includeElu: boolean): string[] => {
    const header = includeElu
      ? "| Phase | Duration | Producer (rec/sec) | Consumer (rec/sec) | ELU (%) |"
      : "| Phase | Duration | Producer (rec/sec) | Consumer (rec/sec) |";
    const separator = includeElu
      ? "|-------|----------|--------------------|--------------------|---------|"
      : "|-------|----------|--------------------|--------------------|";

    const rows = phases.map((phase) => {
      const base = `| ${phase.phase} | ${seconds(phase.durationMs)} | ${fixed(
        phase.producerThroughput,
        0
      )} | ${fixed(phase.consumerThroughput, 0)} |`;
      return includeElu ? `${base} ${percent(phase.eventLoopUtilization)} |` : base;
    });

    return [header, separator, ...rows];
  };

  const lines: string[] = [];

  // Header
  lines.push(`# Performance Test Run: ${config.runName}`);
  lines.push("");
  lines.push(`**Date**: ${new Date().toISOString()}`);
  if (config.runDescription) {
    lines.push(`**Description**: ${config.runDescription}`);
  }
  lines.push("");

  // Configuration
  lines.push(
    "## Configuration",
    "",
    `- **Producer Workers**: ${config.producer.workerCount}`,
    `- **Batch Size**: ${config.producer.batchSize}`,
    `- **Insert/Update Ratio**: ${(config.producer.insertUpdateRatio * 100).toFixed(0)}% inserts`,
    `- **Payload Size**: ${config.producer.payloadSizeKB} KB`,
    `- **Consumer Flush Batch**: ${config.consumer.flushBatchSize}`,
    `- **Consumer Flush Interval**: ${config.consumer.flushIntervalMs} ms`,
    `- **Consumer Max Concurrency**: ${config.consumer.maxFlushConcurrency}`,
    `- **ClickHouse Mode**: ${config.consumer.useMockClickhouse ? "Mock (CPU-only)" : "Real"}`,
    `- **Profiling Tool**: ${config.profiling.tool}`,
    ""
  );

  // Key Results - highlight most important metrics
  lines.push("## Key Results", "");

  const profilingActive = config.profiling.enabled && config.profiling.tool !== "none";
  if (profilingActive) {
    // For flamegraph runs, focus on throughput only
    lines.push("**Profiling Output**: See flamegraph/analysis files in this directory");
    lines.push("");
    lines.push("### Throughput", "");
    lines.push(...throughputTable(false));
  } else {
    // For non-profiling runs, show ELU prominently
    lines.push("### Throughput & Event Loop Utilization", "");
    lines.push(...throughputTable(true));
  }
  lines.push("");

  // Detailed Metrics
  lines.push("## Detailed Metrics", "");

  for (const phase of phases) {
    lines.push(
      `### ${phase.phase}`,
      "",
      `- **Duration**: ${seconds(phase.durationMs)}`,
      `- **Records Produced**: ${count(phase.recordsProduced)}`,
      `- **Records Consumed**: ${count(phase.recordsConsumed)}`,
      `- **Batches Flushed**: ${count(phase.batchesFlushed)}`,
      `- **Producer Throughput**: ${fixed(phase.producerThroughput, 1, " rec/sec")}`,
      `- **Consumer Throughput**: ${fixed(phase.consumerThroughput, 1, " rec/sec")}`,
      `- **Event Loop Utilization**: ${percent(phase.eventLoopUtilization)}`,
      `- **Heap Used**: ${fixed(phase.heapUsedMB, 1, " MB")}`,
      `- **Heap Total**: ${fixed(phase.heapTotalMB, 1, " MB")}`,
      `- **Replication Lag P50**: ${fixed(phase.replicationLagP50, 1, " ms")}`,
      `- **Replication Lag P95**: ${fixed(phase.replicationLagP95, 1, " ms")}`,
      `- **Replication Lag P99**: ${fixed(phase.replicationLagP99, 1, " ms")}`,
      ""
    );
  }

  // Write to file
  await fs.writeFile(outputPath, lines.join("\n"));
}
245+
136246
async function main() {
137247
const options = program.opts();
138248
const config = await loadConfig(options);
@@ -146,14 +256,20 @@ async function main() {
146256
const phases = await harness.run();
147257
await harness.teardown();
148258

149-
// Export metrics
259+
// Export metrics JSON
150260
await harness.exportMetrics(config.output.metricsFile);
151261

262+
// Create summary report
263+
const summaryPath = path.join(config.profiling.outputDir, "SUMMARY.md");
264+
await createSummaryReport(config, phases, summaryPath);
265+
152266
// Print summary
153267
printSummary(phases);
154268

155269
console.log("\n✅ Profiling complete!");
156-
console.log(`📊 Results saved to: ${config.profiling.outputDir}\n`);
270+
console.log(`📊 Results saved to: ${config.profiling.outputDir}`);
271+
console.log(`📄 Summary report: ${summaryPath}`);
272+
console.log(`📊 Detailed metrics: ${config.output.metricsFile}\n`);
157273

158274
if (config.profiling.enabled && config.profiling.tool !== "none") {
159275
console.log("🔥 Profiling data:");

apps/webapp/test/performance/config.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ export interface TestPhase {
1414

1515
export interface ProducerConfig {
1616
enabled: boolean;
17+
workerCount: number; // Number of parallel producer processes
18+
workerId?: string; // Unique identifier for this specific worker
1719
targetThroughput: number;
1820
insertUpdateRatio: number; // 0.0-1.0, e.g. 0.8 = 80% inserts, 20% updates
1921
batchSize: number;
@@ -57,6 +59,8 @@ export interface InfrastructureConfig {
5759
}
5860

5961
export interface HarnessConfig {
62+
runName: string; // Short identifier for this run (e.g. "baseline", "optimized-v1")
63+
runDescription?: string; // Optional longer description of what this run is testing
6064
phases: TestPhase[];
6165
producer: ProducerConfig;
6266
consumer: ConsumerConfig;
@@ -67,6 +71,8 @@ export interface HarnessConfig {
6771

6872
export function getDefaultConfig(): Partial<HarnessConfig> {
6973
return {
74+
runName: "default",
75+
runDescription: undefined,
7076
phases: [
7177
{
7278
name: "warmup",
@@ -81,6 +87,7 @@ export function getDefaultConfig(): Partial<HarnessConfig> {
8187
],
8288
producer: {
8389
enabled: true,
90+
workerCount: 1,
8491
targetThroughput: 5000,
8592
insertUpdateRatio: 0.8,
8693
batchSize: 500,
@@ -121,6 +128,7 @@ export function getDefaultConfig(): Partial<HarnessConfig> {
121128
}
122129

123130
export interface ProducerMetrics {
131+
workerId?: string; // Unique identifier for this producer worker
124132
totalInserts: number;
125133
totalUpdates: number;
126134
actualThroughput: number;

apps/webapp/test/performance/harness.ts

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import path from "path";
99
export class RunsReplicationHarness {
1010
private prisma: PrismaClient | null = null;
1111
private adminPrisma: PrismaClient | null = null;
12-
private producerProcess: ProducerProcessManager | null = null;
12+
private producerProcesses: ProducerProcessManager[] = [];
1313
private consumerProcess: ConsumerProcessManager | null = null;
1414
private metricsCollector: MetricsCollector;
1515

@@ -52,11 +52,16 @@ export class RunsReplicationHarness {
5252
console.log("Running database migrations...");
5353
await this.runMigrations();
5454

55-
// 5. Configure replication
55+
// 5. Truncate existing TaskRun data for clean slate
56+
console.log("Truncating existing TaskRun data...");
57+
await this.prisma!.$executeRawUnsafe(`TRUNCATE TABLE public."TaskRun" CASCADE;`);
58+
console.log("✅ TaskRun table truncated");
59+
60+
// 6. Configure replication
5661
console.log("Configuring logical replication...");
5762
await this.setupReplication();
5863

59-
// 6. Set up test fixtures
64+
// 7. Set up test fixtures
6065
console.log("Setting up test fixtures...");
6166
await this.setupTestFixtures();
6267

@@ -111,19 +116,31 @@ export class RunsReplicationHarness {
111116

112117
await this.consumerProcess.start();
113118

114-
// 10. Start producer process
115-
console.log("Starting producer process...");
116-
this.producerProcess = new ProducerProcessManager(this.config.producer);
119+
// 10. Start producer processes (multiple workers for high throughput)
120+
const workerCount = this.config.producer.workerCount;
121+
console.log(`Starting ${workerCount} producer process(es)...`);
117122

118-
this.producerProcess.setOnMetrics((metrics) => {
119-
this.metricsCollector.recordProducerMetrics(metrics);
120-
});
123+
for (let i = 0; i < workerCount; i++) {
124+
const workerConfig = { ...this.config.producer };
125+
// Each worker gets a unique ID and a portion of the total throughput
126+
workerConfig.workerId = `worker-${i + 1}`;
127+
workerConfig.targetThroughput = this.config.producer.targetThroughput / workerCount;
121128

122-
this.producerProcess.setOnError((error) => {
123-
console.error("Producer error:", error);
124-
});
129+
const producerProcess = new ProducerProcessManager(workerConfig);
130+
131+
producerProcess.setOnMetrics((metrics) => {
132+
this.metricsCollector.recordProducerMetrics(metrics);
133+
});
125134

126-
await this.producerProcess.start();
135+
producerProcess.setOnError((error) => {
136+
console.error(`Producer worker ${i + 1} error:`, error);
137+
});
138+
139+
await producerProcess.start();
140+
this.producerProcesses.push(producerProcess);
141+
142+
console.log(` ✓ Producer worker ${i + 1}/${workerCount} started (target: ${workerConfig.targetThroughput.toFixed(0)} rec/sec)`);
143+
}
127144

128145
console.log("\n✅ Profiling environment ready!\n");
129146
}
@@ -139,18 +156,23 @@ export class RunsReplicationHarness {
139156

140157
this.metricsCollector.startPhase(phase.name);
141158

142-
// Start producer
143-
this.producerProcess!.send({
144-
type: "start",
145-
throughput: phase.targetThroughput,
146-
});
159+
// Start all producer workers
160+
const perWorkerThroughput = phase.targetThroughput / this.producerProcesses.length;
161+
for (const producer of this.producerProcesses) {
162+
producer.send({
163+
type: "start",
164+
throughput: perWorkerThroughput,
165+
});
166+
}
147167

148168
// Wait for phase duration
149169
await this.waitWithProgress(phase.durationSec * 1000);
150170

151-
// Stop producer
152-
this.producerProcess!.send({ type: "stop" });
153-
console.log("\nProducer stopped, waiting for consumer to catch up...");
171+
// Stop all producer workers
172+
for (const producer of this.producerProcesses) {
173+
producer.send({ type: "stop" });
174+
}
175+
console.log("\nAll producers stopped, waiting for consumer to catch up...");
154176

155177
// Wait for consumer to catch up
156178
await this.waitForReplicationLag(100, 60000);
@@ -168,9 +190,9 @@ export class RunsReplicationHarness {
168190
async teardown(): Promise<void> {
169191
console.log("\n🧹 Tearing down...\n");
170192

171-
if (this.producerProcess) {
172-
console.log("Stopping producer process...");
173-
await this.producerProcess.stop();
193+
if (this.producerProcesses.length > 0) {
194+
console.log(`Stopping ${this.producerProcesses.length} producer process(es)...`);
195+
await Promise.all(this.producerProcesses.map((p) => p.stop()));
174196
}
175197

176198
if (this.consumerProcess) {
@@ -259,33 +281,41 @@ export class RunsReplicationHarness {
259281
`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`
260282
);
261283

262-
// Check if slot exists
284+
// Drop existing replication slot if it exists (ensures clean slate)
263285
const slotExists = await this.prisma!.$queryRaw<Array<{ slot_name: string }>>`
264286
SELECT slot_name FROM pg_replication_slots WHERE slot_name = ${this.config.consumer.slotName};
265287
`;
266288

267-
if (slotExists.length === 0) {
289+
if (slotExists.length > 0) {
290+
console.log(`🧹 Dropping existing replication slot: ${this.config.consumer.slotName}`);
268291
await this.prisma!.$executeRawUnsafe(
269-
`SELECT pg_create_logical_replication_slot('${this.config.consumer.slotName}', 'pgoutput');`
292+
`SELECT pg_drop_replication_slot('${this.config.consumer.slotName}');`
270293
);
271-
console.log(`✅ Created replication slot: ${this.config.consumer.slotName}`);
272-
} else {
273-
console.log(`✅ Replication slot exists: ${this.config.consumer.slotName}`);
274294
}
275295

276-
// Check if publication exists
296+
// Drop and recreate publication (ensures clean slate)
277297
const pubExists = await this.prisma!.$queryRaw<Array<{ pubname: string }>>`
278298
SELECT pubname FROM pg_publication WHERE pubname = ${this.config.consumer.publicationName};
279299
`;
280300

281-
if (pubExists.length === 0) {
301+
if (pubExists.length > 0) {
302+
console.log(`🧹 Dropping existing publication: ${this.config.consumer.publicationName}`);
282303
await this.prisma!.$executeRawUnsafe(
283-
`CREATE PUBLICATION ${this.config.consumer.publicationName} FOR TABLE "TaskRun";`
304+
`DROP PUBLICATION ${this.config.consumer.publicationName};`
284305
);
285-
console.log(`✅ Created publication: ${this.config.consumer.publicationName}`);
286-
} else {
287-
console.log(`✅ Publication exists: ${this.config.consumer.publicationName}`);
288306
}
307+
308+
// Create fresh publication
309+
await this.prisma!.$executeRawUnsafe(
310+
`CREATE PUBLICATION ${this.config.consumer.publicationName} FOR TABLE "TaskRun";`
311+
);
312+
console.log(`✅ Created publication: ${this.config.consumer.publicationName}`);
313+
314+
// Create fresh replication slot
315+
await this.prisma!.$executeRawUnsafe(
316+
`SELECT pg_create_logical_replication_slot('${this.config.consumer.slotName}', 'pgoutput');`
317+
);
318+
console.log(`✅ Created replication slot: ${this.config.consumer.slotName}`);
289319
}
290320

291321
private async setupTestFixtures(): Promise<void> {

0 commit comments

Comments
 (0)