diff --git a/src/CLAUDE.md b/src/CLAUDE.md index c69b28041c..dac94dff47 100644 --- a/src/CLAUDE.md +++ b/src/CLAUDE.md @@ -105,6 +105,90 @@ CoCalc is organized as a monorepo with key packages: 5. **Authentication**: Each conat request includes account_id and is subject to permission checks at the hub level 6. **Subjects**: Messages are routed using hierarchical subjects like `hub.account.{uuid}.{service}` or `project.{uuid}.{compute_server_id}.{service}` +### Conat Message Patterns + +CoCalc's Conat messaging system uses hierarchical dot-separated subject patterns for routing messages between distributed services: + +#### User Account Messages +``` +hub.account.{account_id}.{service} +``` +- **account_id**: UUID v4 format (e.g., `123e4567-e89b-12d3-a456-426614174000`) +- **service**: API service name (`api`, `projects`, `db`, `purchases`, `jupyter`, `sync`, `org`, `messages`) +- **Examples**: + - `hub.account.123e4567-e89b-12d3-a456-426614174000.api` - Main API calls + - `hub.account.123e4567-e89b-12d3-a456-426614174000.projects` - Project operations + - `hub.account.123e4567-e89b-12d3-a456-426614174000.db` - Database operations + +#### Project Messages +``` +project.{project_id}.{compute_server_id}.{service}.{path} +``` +- **project_id**: UUID v4 format for the project +- **compute_server_id**: Numeric ID or `-` for default/no specific server +- **service**: Service name (`api`, `terminal`, `sync`, `jupyter`, etc.) 
+- **path**: Base64-encoded file path or `-` for no path +- **Examples**: + - `project.456e7890-e89b-12d3-a456-426614174001.1.api.-` - Project API (compute server 1) + - `project.456e7890-e89b-12d3-a456-426614174001.-.terminal.L2hvbWUvdXNlcg==` - Terminal service (path: `/home/user`) + - `project.456e7890-e89b-12d3-a456-426614174001.2.sync.-` - Sync service (compute server 2) + +#### Hub Project Messages +``` +hub.project.{project_id}.{service} +``` +- Used for hub-level project operations +- **Examples**: + - `hub.project.456e7890-e89b-12d3-a456-426614174001.api` - Project API calls + - `hub.project.456e7890-e89b-12d3-a456-426614174001.sync` - Project sync operations + +#### Browser Session Messages +``` +{sessionId}.account-{account_id}.{service} +``` +- Used for browser-specific sessions +- **sessionId**: Unique session identifier +- **Examples**: `session123.account-123e4567-e89b-12d3-a456-426614174000.sync` + +#### Service-Specific Messages +``` +{service}.account-{account_id}.api +{service}.project-{project_id}.api +``` +- Used by global services like time, LLM, etc. 
+- **Examples**: + - `time.account-123e4567-e89b-12d3-a456-426614174000.api` - Time service + - `llm.project-456e7890-e89b-12d3-a456-426614174001.api` - LLM service + +#### Pattern Matching +- `*` - Matches any single segment +- `>` - Matches the rest of the subject (catch-all) +- Used for subscribing to multiple related subjects + +#### Key Features +- **Automatic Chunking**: Large messages are automatically split and reassembled +- **Multiple Encodings**: MsgPack (compact) and JSON supported +- **Interest Awareness**: Wait for subscribers before sending messages +- **Delivery Confirmation**: Optional confirmation of message receipt +- **Authentication**: Per-subject permission checking with account/project IDs + +#### Usage in Code +```typescript +// Account message +const accountSubject = `hub.account.${accountId}.api`; + +// Project message using helper +import { projectSubject } from "@cocalc/conat/names"; +const projectSub = projectSubject({ + project_id: projectId, + compute_server_id: 1, + service: 'terminal', + path: '/home/user' +}); +``` + +These patterns ensure proper routing, authentication, and isolation between different users and projects in the distributed system. The hierarchical structure allows for efficient pattern matching and scalable message routing across the CoCalc platform. 
+ ### Key Technologies - **TypeScript**: Primary language for all new code diff --git a/src/packages/conat/benchmark.ts b/src/packages/conat/benchmark.ts new file mode 100644 index 0000000000..90757e3770 --- /dev/null +++ b/src/packages/conat/benchmark.ts @@ -0,0 +1,360 @@ +#!/usr/bin/env node + +import { AsciiTable3 } from "ascii-table3"; + +import { Patterns } from "./core/patterns"; +import { getSplitCacheStats, setSplitCacheEnabled } from "./core/split-cache"; +import { + clearConsistentHashCache, + consistentHashingChoice, + getConsistentHashCacheStats, + setConsistentHashCacheEnabled, +} from "./core/sticky"; + +const ITERATIONS = 10; +const MESSAGE_COUNT = 100_000; +const PATTERN_COUNT = 1_000; // base size: generateRealisticPatterns(n) yields about 13 patterns per unit of n +const NS_TO_MS = 1_000_000; + +// Helper functions for statistics (the standard deviation is the population form: it divides by N) +function calculateMean(values: number[]): number { + return values.reduce((sum, val) => sum + val, 0) / values.length; +} + +function calculateStdDev(values: number[], mean: number): number { + const variance = + values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / + values.length; + return Math.sqrt(variance); +} + +function formatStat(avg: number, std: number): string { + const relativeStd = (std / avg) * 100; + const n = Math.round(avg).toString().padStart(6); + if (isNaN(relativeStd) || !isFinite(relativeStd)) { + return `${n}`.padEnd(10); + } + const s = Math.round(relativeStd).toString().padStart(4); + return `${n} ±${s}%`; +} + +// Generate realistic CoCalc patterns based on CLAUDE.md patterns +function generateRealisticPatterns(num: number): string[] { + const patterns: string[] = []; + + // Generate `num` accounts with 10 interests each + for (let i = 0; i < num; i++) { + const accountId = `${i + .toString() + .padStart(8, "0")}-e89b-12d3-a456-426614174000`; + const services = [ + "api", + "projects", + "db", + "purchases", + "jupyter", + "sync", + "org", + "messages", + "llm", + "billing", + ]; + + for (const service of services) { + 
patterns.push(`hub.account.${accountId}.${service}`); + } + } + + // Generate `num` projects with 3 interests each (2 hub-level + 1 compute) + for (let i = 0; i < num; i++) { + const projectId = `${i + .toString() + .padStart(8, "0")}-proj-12d3-a456-426614174001`; + const services = ["api", "sync"]; + const computeServices = ["terminal"]; + + // Hub project patterns + for (const service of services) { + patterns.push(`hub.project.${projectId}.${service}`); + } + + // Project compute patterns + for (const service of computeServices) { + patterns.push(`project.${projectId}.1.${service}.-`); + } + } + + // Additional realistic patterns: 5 templates for each of floor(num / 100) ids + const additionalPatterns = [ + "time.account-*.api", + "llm.project-*.api", + "system.stats.>", + "browser.session.*.sync", + "notifications.account.*.alerts", + ]; + + for (let i = 0; i < Math.floor(num / 100); i++) { + for (const pattern of additionalPatterns) { + patterns.push(pattern.replace("*", `${i.toString().padStart(6, "0")}`)); + } + } + + return patterns; +} + +// Generate realistic message subjects for testing +function generateRealisticMessages(count: number): string[] { + const messages: string[] = []; + + for (let i = 0; i < count; i++) { + const rand = Math.random(); + + if (rand < 0.7) { + // 70% exact account/project matches + if (Math.random() < 0.6) { + const accountId = `${Math.floor(Math.random() * PATTERN_COUNT) + .toString() + .padStart(8, "0")}-e89b-12d3-a456-426614174000`; + const services = [ + "api", + "projects", + "db", + "purchases", + "jupyter", + "sync", + "org", + "messages", + ]; + const service = services[Math.floor(Math.random() * services.length)]; + messages.push(`hub.account.${accountId}.${service}`); + } else { + const projectId = `${Math.floor(Math.random() * PATTERN_COUNT) + .toString() + .padStart(8, "0")}-proj-12d3-a456-426614174001`; + const services = ["api", "sync", "terminal"]; + const service = services[Math.floor(Math.random() * services.length)]; + if (service === "terminal") { + 
messages.push(`project.${projectId}.1.${service}.-`); + } else { + messages.push(`hub.project.${projectId}.${service}`); + } + } + } else if (rand < 0.9) { + // 20% service-style subjects (of these only `time.account-*` can hit a generated pattern) + const streamId = Math.floor(Math.random() * Math.floor(PATTERN_COUNT / 100)) + .toString() + .padStart(6, "0"); + const services = ["time", "llm", "notifications", "browser", "system"]; + const service = services[Math.floor(Math.random() * services.length)]; + messages.push(`${service}.account-${streamId}.api`); + } else { + // 10% completely random subjects + const segments = Math.floor(Math.random() * 5) + 2; + const parts: string[] = []; + for (let j = 0; j < segments; j++) { + parts.push(`seg${Math.floor(Math.random() * 1000)}`); + } + messages.push(parts.join(".")); + } + } + + return messages; +} + +function benchmark() { + console.log("CoCalc Conat Routing Benchmark"); + console.log("==============================="); + + console.log( + `Running ${ITERATIONS} iterations with ${MESSAGE_COUNT.toLocaleString()} messages each...`, + ); + console.log(); + + // Data structures to collect results across iterations + const variantNames = [ + "No Caching", + "Split Cache", + "Hash Cache", + "Both Caches", + ]; + const variantConfigs = [ + [false, false], // No Caching + [true, false], // Split Cache + [false, true], // Hash Cache + [true, true], // Both Caches + ]; + + const results: { + name: string; + setupTimes: number[]; + matchTimes: number[]; + throughputs: number[]; + splitCacheHitRates: number[]; + hashCacheHitRates: number[]; + }[] = variantNames.map((name) => ({ + name, + setupTimes: [], + matchTimes: [], + throughputs: [], + splitCacheHitRates: [], + hashCacheHitRates: [], + })); + + // Run iterations + for (let iter = 0; iter < ITERATIONS; iter++) { + console.log(`Iteration ${iter + 1}/${ITERATIONS}...`); + + // Generate fresh patterns and messages for each iteration + const patterns = generateRealisticPatterns(PATTERN_COUNT); + const messages = 
generateRealisticMessages(MESSAGE_COUNT); + + // Run every variant on the same data + for ( + let variantIndex = 0; + variantIndex < variantNames.length; + variantIndex++ + ) { + const [splitCacheEnabled, hashCacheEnabled] = + variantConfigs[variantIndex]; + const result = results[variantIndex]; + + // Configure caches + setSplitCacheEnabled(splitCacheEnabled); + setConsistentHashCacheEnabled(hashCacheEnabled); + clearConsistentHashCache(); // Reset cache stats for accurate measurement + + const p = new Patterns(); + + // Setup timing + const startSetup = process.hrtime.bigint(); + for (let i = 0; i < patterns.length; i++) { + p.set(patterns[i], `handler-${i}`); + } + const endSetup = process.hrtime.bigint(); + const setupTime = Number(endSetup - startSetup) / NS_TO_MS; + result.setupTimes.push(setupTime); + + // Create a set of fake targets for consistent hashing simulation + const targets = new Set([ + "target1", + "target2", + "target3", + "target4", + "target5", + ]); + + // Realistic benchmark: pattern matching + target selection (when matches found) + const startMatch = process.hrtime.bigint(); + let totalMatches = 0; + let totalTargetSelections = 0; + let messagesWithMatches = 0; + + for (const message of messages) { + // Step 1: Pattern matching (uses split cache) + const matches = p.matches(message); + totalMatches += matches.length; + + // Step 2: One target selection per message that has at least one match (simulates realistic routing) + if (matches.length > 0) { + messagesWithMatches++; + // Always use consistent hashing - caching is controlled internally + const selectedTarget = consistentHashingChoice(targets, message); + totalTargetSelections++; + // Use the result to avoid optimization + if (selectedTarget.length === 0) totalTargetSelections--; + } + } + const endMatch = process.hrtime.bigint(); + + // Consistency check: totalTargetSelections should equal messagesWithMatches + if (totalTargetSelections !== messagesWithMatches) { + console.error( + `Consistency error in 
${result.name}: totalTargetSelections=${totalTargetSelections}, messagesWithMatches=${messagesWithMatches}`, + ); + } + + const matchTime = Number(endMatch - startMatch) / NS_TO_MS; + const throughput = messages.length / (matchTime / 1000); + + result.matchTimes.push(matchTime); + result.throughputs.push(throughput); + + // Get cache hit rates + const splitStats = getSplitCacheStats(); + const splitCacheHitRate = + splitCacheEnabled && splitStats.enabled ? splitStats.hitRate || 0 : 0; + const hashStats = getConsistentHashCacheStats(); + const hashCacheHitRate = + hashCacheEnabled && hashStats.enabled ? hashStats.hitRate || 0 : 0; + + result.splitCacheHitRates.push(splitCacheHitRate); + result.hashCacheHitRates.push(hashCacheHitRate); + } + } + + console.log(); + + // Calculate statistics and use average times for speedup calculation + const variantAvgTimes: number[] = []; + for (const result of results) { + variantAvgTimes.push(calculateMean(result.matchTimes)); + } + const baselineAvgTime = variantAvgTimes[0]; // No Caching average + + // Create results table using AsciiTable3 + const table = new AsciiTable3("Benchmark Results").setHeading( + "Variant", + "Setup (ms)", + "Match (ms)", + "Throughput", + "Split Hit %", + "Hash Hit %", + "Speedup", + ); + + for (let i = 0; i < results.length; i++) { + const result = results[i]; + const variantAvgTime = variantAvgTimes[i]; + + // Calculate averages and standard deviations + const setupMean = calculateMean(result.setupTimes); + const setupStd = calculateStdDev(result.setupTimes, setupMean); + const matchMean = calculateMean(result.matchTimes); + const matchStd = calculateStdDev(result.matchTimes, matchMean); + const throughputMean = calculateMean(result.throughputs); + const throughputStd = calculateStdDev(result.throughputs, throughputMean); + const splitCacheMean = calculateMean(result.splitCacheHitRates); + const splitCacheStd = calculateStdDev(result.splitCacheHitRates, splitCacheMean); + const hashCacheMean = 
calculateMean(result.hashCacheHitRates); + const hashCacheStd = calculateStdDev(result.hashCacheHitRates, hashCacheMean); + + // Use average time for speedup calculation + const speedup = baselineAvgTime / variantAvgTime; + + table.addRow( + result.name, + formatStat(setupMean, setupStd), + formatStat(matchMean, matchStd), + formatStat(throughputMean, throughputStd), + formatStat(splitCacheMean, splitCacheStd), + formatStat(hashCacheMean, hashCacheStd), + speedup.toFixed(2), + ); + } + + table.setStyle("unicode-round"); + console.log(table.toString()); + console.log(); + + + console.log( + `✅ Completed ${ITERATIONS} iterations with ${ + ITERATIONS * variantNames.length + } total benchmark runs`, + ); + console.log(" All variants ran on identical data for fair comparison"); +} + +if (require.main === module) { + benchmark(); +} diff --git a/src/packages/conat/core/patterns.test.ts b/src/packages/conat/core/patterns.test.ts index ff70b032a9..7eb22e3688 100644 --- a/src/packages/conat/core/patterns.test.ts +++ b/src/packages/conat/core/patterns.test.ts @@ -1,85 +1,112 @@ /* DEVELOPMENT: -pnpm test ./patterns.test.ts +pnpm test ./patterns.test.ts */ -import { Patterns } from "./patterns"; import { randomId } from "@cocalc/conat/names"; +import { Patterns } from "./patterns"; +import { setSplitCacheEnabled, clearSplitCache } from "./split-cache"; function expectEqual(actual: any[], expected: any[]) { expect(actual).toEqual(expect.arrayContaining(expected)); expect(actual).toHaveLength(expected.length); } -describe("test some basic pattern matching", () => { - it("tests some simple examples with just one or no matches", () => { - const p = new Patterns(); - p.set("x", 0); - p.set("a.b.>", 0); - p.set("a.*", 0); - expectEqual(p.matches("x"), ["x"]); - expectEqual(p.matches("y"), []); - expectEqual(p.matches("a.b.c"), ["a.b.>"]); - expectEqual(p.matches("a.b"), ["a.*"]); - }); +// Test both cached and non-cached variants +const SPLIT_CACHE_VARIANTS = [ + { name: "no-split-cache", enabled: 
false }, + { name: "with-split-cache", enabled: true }, +]; - it("some examples with several matches", () => { - const p = new Patterns(); - p.set("a.b.>", 0); - p.set("a.*.*", 0); - expectEqual(p.matches("a.b.c"), ["a.b.>", "a.*.*"]); - expectEqual(p.matches("a.b.c.d"), ["a.b.>"]); - }); +SPLIT_CACHE_VARIANTS.forEach(({ name: cacheName, enabled }) => { + describe(`test some basic pattern matching - Patterns (${cacheName})`, () => { + beforeEach(() => { + // Set split cache enabled state for this test variant + setSplitCacheEnabled(enabled); + // Clear cache before each test + clearSplitCache(); + }); + it("tests some simple examples with just one or no matches", () => { + const p = new Patterns(); + p.set("x", 0); + p.set("a.b.>", 0); + p.set("a.*", 0); + expectEqual(p.matches("x"), ["x"]); + expectEqual(p.matches("y"), []); + expectEqual(p.matches("a.b.c"), ["a.b.>"]); + expectEqual(p.matches("a.b"), ["a.*"]); + }); + + it("some examples with several matches", () => { + const p = new Patterns(); + p.set("a.b.>", 0); + p.set("a.*.*", 0); + expectEqual(p.matches("a.b.c"), ["a.b.>", "a.*.*"]); + expectEqual(p.matches("a.b.c.d"), ["a.b.>"]); + }); - it("example where we delete a pattern", () => { - const p = new Patterns(); - p.set("a.b.>", 0); - p.set("a.b.c", 0); - p.set("a.b.d", 0); - expectEqual(p.matches("a.b.c"), ["a.b.>", "a.b.c"]); - expectEqual(p.matches("a.b.d"), ["a.b.>", "a.b.d"]); - expectEqual(p.matches("a.b.c.d"), ["a.b.>"]); - p.delete("a.b.c"); - expectEqual(p.matches("a.b.d"), ["a.b.>", "a.b.d"]); - expectEqual(p.matches("a.b.c"), ["a.b.>"]); - p.delete("a.b.d"); - expectEqual(p.matches("a.b.d"), ["a.b.>"]); - p.delete("a.b.>"); - expectEqual(p.matches("a.b.d"), []); + it("example where we delete a pattern", () => { + const p = new Patterns(); + p.set("a.b.>", 0); + p.set("a.b.c", 0); + p.set("a.b.d", 0); + expectEqual(p.matches("a.b.c"), ["a.b.>", "a.b.c"]); + expectEqual(p.matches("a.b.d"), ["a.b.>", "a.b.d"]); + expectEqual(p.matches("a.b.c.d"), 
["a.b.>"]); + p.delete("a.b.c"); + expectEqual(p.matches("a.b.d"), ["a.b.>", "a.b.d"]); + expectEqual(p.matches("a.b.c"), ["a.b.>"]); + p.delete("a.b.d"); + expectEqual(p.matches("a.b.d"), ["a.b.>"]); + p.delete("a.b.>"); + expectEqual(p.matches("a.b.d"), []); + }); }); }); -describe("do some stress tests", () => { - const patterns = 1e5; +SPLIT_CACHE_VARIANTS.forEach(({ name: cacheName, enabled }) => { + describe(`do some stress tests - Patterns (${cacheName})`, () => { + beforeEach(() => { + // Set split cache enabled state for this test variant + setSplitCacheEnabled(enabled); + // Clear cache before each test + clearSplitCache(); + }); + // NOTE: This stress test uses highly diverse random patterns (service.*.randomId, hub.x.randomId) + // which may show different performance characteristics with the global split cache optimization. + // In contrast, realistic CoCalc patterns (hub.account.{id}.{service}) with repeated segments + // benefit from the global split cache due to high cache hit rates. 
+ const patterns = 1_000; - let p; - const knownIds: string[] = []; - it(`create ${patterns} patterns`, () => { - p = new Patterns(); - for (const seg1 of ["service", "hub", "project", "account", "global"]) { - for (const seg2 of ["*", "x"]) { - for (let i = 0; i < patterns / 10; i++) { - const id = randomId(); - knownIds.push(id); - const pattern = `${seg1}.${seg2}.${id}`; - p.set(pattern, 0); + let p; + const knownIds: string[] = []; + it(`create ${patterns} patterns`, () => { + p = new Patterns(); + for (const seg1 of ["service", "hub", "project", "account", "global"]) { + for (const seg2 of ["*", "x"]) { + for (let i = 0; i < patterns / 10; i++) { + const id = randomId(); + knownIds.push(id); + const pattern = `${seg1}.${seg2}.${id}`; + p.set(pattern, 0); + } } } - } - }); + }); - const count = 1e6; - let m = 0; - it(`match ${count} times against them`, () => { - for (const seg1 of ["service", "hub", "project", "account", "global"]) { - for (const seg2 of ["a", "x"]) { - for (let i = 0; i < count / 10; i++) { - const subject = `${seg1}.${seg2}.${knownIds[i] ?? randomId()}`; - m = Math.max(p.matches(subject).length, m); + const count = 10_000; + let m = 0; + it(`match ${count} times against them`, () => { + for (const seg1 of ["service", "hub", "project", "account", "global"]) { + for (const seg2 of ["a", "x"]) { + for (let i = 0; i < count / 10; i++) { + const subject = `${seg1}.${seg2}.${knownIds[i] ?? 
randomId()}`; + m = Math.max(p.matches(subject).length, m); + } } } - } - expect(m).toBeGreaterThan(0); + expect(m).toBeGreaterThan(0); + }); }); }); diff --git a/src/packages/conat/core/patterns.ts b/src/packages/conat/core/patterns.ts index 79eada9e5e..1234340a4a 100644 --- a/src/packages/conat/core/patterns.ts +++ b/src/packages/conat/core/patterns.ts @@ -1,7 +1,9 @@ +import { EventEmitter } from "events"; import { isEqual } from "lodash"; + import { getLogger } from "@cocalc/conat/client"; -import { EventEmitter } from "events"; import { hash_string } from "@cocalc/util/misc"; +import { splitSubject } from "./split-cache"; type Index = { [pattern: string]: Index | string }; @@ -68,12 +70,12 @@ export class Patterns extends EventEmitter { }; matches = (subject: string): string[] => { - return matchUsingIndex(this.index, subject.split(".")); + return matchUsingIndex(this.index, splitSubject(subject)); }; // return true if there is at least one match hasMatch = (subject: string): boolean => { - return matchUsingIndex(this.index, subject.split("."), true).length > 0; + return matchUsingIndex(this.index, splitSubject(subject), true).length > 0; }; hasPattern = (pattern: string): boolean => { @@ -113,13 +115,13 @@ export class Patterns extends EventEmitter { set = (pattern: string, t: T) => { this.patterns[pattern] = t; - setIndex(this.index, pattern.split("."), pattern); + setIndex(this.index, splitSubject(pattern), pattern); this.emit("change"); }; delete = (pattern: string) => { delete this.patterns[pattern]; - deleteIndex(this.index, pattern.split(".")); + deleteIndex(this.index, splitSubject(pattern)); }; } @@ -221,8 +223,8 @@ export function matchesSegment(pattern, subject): boolean { } export function matchesPattern(pattern, subject): boolean { - const subParts = subject.split("."); - const patParts = pattern.split("."); + const subParts = splitSubject(subject); + const patParts = splitSubject(pattern); let i = 0, j = 0; while (i < subParts.length && j < 
patParts.length) { diff --git a/src/packages/conat/core/server.ts b/src/packages/conat/core/server.ts index dc87dcf191..e7de654654 100644 --- a/src/packages/conat/core/server.ts +++ b/src/packages/conat/core/server.ts @@ -27,54 +27,55 @@ cd packages/server */ -import type { ConnectionStats, ServerInfo } from "./types"; +import { delay } from "awaiting"; +import { throttle } from "lodash"; +import { Server } from "socket.io"; + +import { getLogger } from "@cocalc/conat/client"; +import { UsageMonitor } from "@cocalc/conat/monitor/usage"; +import { type ConatSocketServer } from "@cocalc/conat/socket"; import { isValidSubject, isValidSubjectWithoutWildcards, } from "@cocalc/conat/util"; -import { Server } from "socket.io"; -import { delay } from "awaiting"; +import { once, until } from "@cocalc/util/async-utils"; +import { is_array } from "@cocalc/util/misc"; +import { reuseInFlight } from "@cocalc/util/reuse-in-flight"; +import { EventEmitter } from "events"; +import { Metrics } from "../types"; import { - ConatError, - connect, Client, type ClientOptions, + ConatError, + connect, MAX_INTEREST_TIMEOUT, STICKY_QUEUE_GROUP, } from "./client"; -import { - RESOURCE, - MAX_CONNECTIONS_PER_USER, - MAX_CONNECTIONS, - MAX_PAYLOAD, - MAX_SUBSCRIPTIONS_PER_CLIENT, - MAX_SUBSCRIPTIONS_PER_HUB, -} from "./constants"; -import { Patterns } from "./patterns"; -import { is_array } from "@cocalc/util/misc"; -import { UsageMonitor } from "@cocalc/conat/monitor/usage"; -import { once, until } from "@cocalc/util/async-utils"; import { clusterLink, type ClusterLink, clusterStreams, type ClusterStreams, - trimClusterStreams, createClusterPersistServer, - Sticky, - Interest, hashInterest, hashSticky, + Interest, + Sticky, + trimClusterStreams, } from "./cluster"; -import { type ConatSocketServer } from "@cocalc/conat/socket"; -import { throttle } from "lodash"; -import { getLogger } from "@cocalc/conat/client"; -import { reuseInFlight } from "@cocalc/util/reuse-in-flight"; -import { type 
SysConatServer, sysApiSubject, sysApi } from "./sys"; +import { + MAX_CONNECTIONS, + MAX_CONNECTIONS_PER_USER, + MAX_PAYLOAD, + MAX_SUBSCRIPTIONS_PER_CLIENT, + MAX_SUBSCRIPTIONS_PER_HUB, + RESOURCE, +} from "./constants"; +import { Patterns } from "./patterns"; import { forkedConatServer } from "./start-server"; import { stickyChoice } from "./sticky"; -import { EventEmitter } from "events"; -import { Metrics } from "../types"; +import { sysApi, sysApiSubject, type SysConatServer } from "./sys"; +import type { ConnectionStats, ServerInfo } from "./types"; const logger = getLogger("conat:core:server"); @@ -163,6 +164,20 @@ export interface Options { clusterIpAddress?: string; } +/** + * Create the pattern matcher used for the server's `interest` index. COCALC_CONAT_MATCHING_ALGO is read to select the algorithm, but only "original" exists so far, so every value (including unset or unknown) yields a new Patterns. + */ +function createPatternMatcher(): Patterns { + const algo = process.env.COCALC_CONAT_MATCHING_ALGO?.toLowerCase(); + + switch (algo) { + case "original": + case undefined: + default: + return new Patterns(); + } +} + type State = "init" | "ready" | "closed"; export class ConatServer extends EventEmitter { @@ -180,7 +195,7 @@ export class ConatServer extends EventEmitter { public state: State = "init"; private subscriptions: { [socketId: string]: Set } = {}; - public interest: Interest = new Patterns(); + public interest: Interest = createPatternMatcher(); // the target string is JSON.stringify({ id: string; subject: string }), // which is the socket.io room to send the messages to. public sticky: Sticky = {}; diff --git a/src/packages/conat/core/split-cache.ts b/src/packages/conat/core/split-cache.ts new file mode 100644 index 0000000000..503b2c48b7 --- /dev/null +++ b/src/packages/conat/core/split-cache.ts @@ -0,0 +1,97 @@ +import LRU from "lru-cache"; + +export const SPLIT_CACHE_SIZE_DEFAULT = 100_000; + +const SPLIT_CACHE_SIZE: number = parseInt( + process.env.COCALC_CONAT_SPLIT_CACHE_SIZE ?? 
`${SPLIT_CACHE_SIZE_DEFAULT}`, +); + +// Global LRU cache for string.split(".") operations +// This optimizes performance by avoiding repeated splitting of the same strings +let splitCache: LRU | null = null; +let cacheStats = { hits: 0, misses: 0 }; + +// Global flag for split cache enabled state +let splitCacheEnabled: boolean = + process.env.COCALC_CONAT_SPLIT_CACHE_ENABLED?.toLowerCase() !== "false"; // Enabled unless the env var is "false" (case-insensitive) + +function getSplitCache(): LRU | null { + if (!splitCacheEnabled) { + return null; + } + + if (!splitCache) { + splitCache = new LRU({ max: SPLIT_CACHE_SIZE }); + } + + return splitCache; +} + +/** + * Optimized string splitting on "." with global LRU caching + * Falls back to regular string.split(".") if caching is disabled. With caching enabled the array stored in the cache is returned as-is (no copy), so callers must treat the result as read-only. + */ +export function splitSubject(subject: string): string[] { + const cache = getSplitCache(); + if (!cache) { + return subject.split("."); + } + + const cached = cache.get(subject); + if (cached !== undefined) { + cacheStats.hits++; + return cached; + } + + cacheStats.misses++; + const segments = subject.split("."); + cache.set(subject, segments); + return segments; +} + +/** + * Set the split cache enabled state (useful for testing); also drops the current cache instance and resets the hit/miss counters + */ +export function setSplitCacheEnabled(enabled: boolean) { + splitCacheEnabled = enabled; + // Clear existing cache when toggling + if (splitCache) { + splitCache.clear(); + splitCache = null; + } + cacheStats = { hits: 0, misses: 0 }; +} + +/** + * Clear the split cache entries and reset the hit/miss counters + */ +export function clearSplitCache() { + if (splitCache) { + splitCache.clear(); + } + cacheStats = { hits: 0, misses: 0 }; +} + +/** + * Get split cache statistics (returns only { enabled: false, message } when the cache is disabled) + */ +export function getSplitCacheStats() { + const cache = getSplitCache(); + if (!cache) { + return { + enabled: false, + message: "Split cache is disabled", + }; + } + + const total = cacheStats.hits + cacheStats.misses; + return { + enabled: true, + size: cache.size, + maxSize: cache.max, + hits: cacheStats.hits, + misses: cacheStats.misses, + 
hitRate: total > 0 ? (cacheStats.hits / total) * 100 : 0, + utilization: (cache.size / cache.max) * 100, + }; +} diff --git a/src/packages/conat/core/sticky.test.ts b/src/packages/conat/core/sticky.test.ts index dbc0f7b409..d966746d46 100644 --- a/src/packages/conat/core/sticky.test.ts +++ b/src/packages/conat/core/sticky.test.ts @@ -1,33 +1,52 @@ -import { consistentHashingChoice } from "./sticky"; +import { + consistentHashingChoice, + clearConsistentHashCache, + setConsistentHashCacheEnabled, +} from "./sticky"; -describe("tests of consistentHashingChoice", () => { - it("throws when set has size 0", () => { - expect(() => consistentHashingChoice(new Set(), "x")).toThrow("size"); - }); +// Test both cached and non-cached variants +const CACHE_VARIANTS = [ + { name: "no-cache", enabled: false }, + { name: "with-cache", enabled: true }, +]; - it("for size one it just returns the unique item", () => { - expect(consistentHashingChoice(new Set(["foo"]), "bar")).toEqual("foo"); - }); +CACHE_VARIANTS.forEach(({ name, enabled }) => { + describe(`tests of consistentHashingChoice - ${name}`, () => { + beforeEach(() => { + // Set cache enabled state for this test variant + setConsistentHashCacheEnabled(enabled); + // Clear cache before each test + clearConsistentHashCache(); + }); - it("for size 3 it gives the same result every time for the same input (and also that it's not stupidly slow)", () => { - const v = new Set(["a", "b", "x"]); - const resource = "thing"; - const choice = consistentHashingChoice(v, resource); - expect(v.has(choice)).toBe(true); - for (let i = 0; i < 1000; i++) { - expect(consistentHashingChoice(v, resource)).toBe(choice); - } - }); + it("throws when set has size 0", () => { + expect(() => consistentHashingChoice(new Set(), "x")).toThrow("size"); + }); + + it("for size one it just returns the unique item", () => { + expect(consistentHashingChoice(new Set(["foo"]), "bar")).toEqual("foo"); + }); + + it("for size 3 it gives the same result every time 
for the same input (and also that it's not stupidly slow)", () => { + const v = new Set(["a", "b", "x"]); + const resource = "thing"; + const choice = consistentHashingChoice(v, resource); + expect(v.has(choice)).toBe(true); + for (let i = 0; i < 1000; i++) { + expect(consistentHashingChoice(v, resource)).toBe(choice); + } + }); - it("the results are uniformly distributed when the resources are different", () => { - const v = new Set(["a", "b", "x"]); - const c = { a: 0, b: 0, x: 0 }; - for (let i = 0; i < 1000; i++) { - c[consistentHashingChoice(v, `${i}`)] += 1; - } - // just roughly in the direction of uniform... - expect(c.a).toBeGreaterThan(250); - expect(c.b).toBeGreaterThan(250); - expect(c.x).toBeGreaterThan(250); + it("the results are uniformly distributed when the resources are different", () => { + const v = new Set(["a", "b", "x"]); + const c = { a: 0, b: 0, x: 0 }; + for (let i = 0; i < 1000; i++) { + c[consistentHashingChoice(v, `${i}`)] += 1; + } + // just roughly in the direction of uniform... + expect(c.a).toBeGreaterThan(250); + expect(c.b).toBeGreaterThan(250); + expect(c.x).toBeGreaterThan(250); + }); }); }); diff --git a/src/packages/conat/core/sticky.ts b/src/packages/conat/core/sticky.ts index 8c0d2a457b..e6c9e5dae4 100644 --- a/src/packages/conat/core/sticky.ts +++ b/src/packages/conat/core/sticky.ts @@ -1,5 +1,53 @@ import ConsistentHash from "consistent-hash"; +import LRU from "lru-cache"; + +import { getLogger } from "@cocalc/conat/client"; import { hash_string } from "@cocalc/util/misc"; +import { splitSubject } from "./split-cache"; + +const logger = getLogger("conat:consistent-hash-cache"); + +// Cache configuration +export const CONSISTENT_HASH_CACHE_SIZE_DEFAULT = 10_000; + +const CONSISTENT_HASH_CACHE_SIZE: number = parseInt( + process.env.COCALC_CONAT_CONSISTENT_HASH_CACHE_SIZE ?? 
+ `${CONSISTENT_HASH_CACHE_SIZE_DEFAULT}`, +); + +// Global flag for consistent hash cache enabled state +let consistentHashCacheEnabled: boolean = + process.env.COCALC_CONAT_CONSISTENT_HASH_CACHE_ENABLED?.toLowerCase() === + "true"; + +// LRU cache for consistent hashing results +// Key: hash of (sorted targets + resource) +// Value: chosen target +let consistentHashCache: LRU | null = null; +let cacheStats = { hits: 0, misses: 0 }; + +function getCache(): LRU | null { + if (!consistentHashCacheEnabled) { + return null; + } + + if (!consistentHashCache) { + consistentHashCache = new LRU({ + max: CONSISTENT_HASH_CACHE_SIZE, + }); + logger.debug( + `Initialized consistent hash cache with ${CONSISTENT_HASH_CACHE_SIZE} entries`, + ); + } + + return consistentHashCache; +} + +function getCacheKey(targets: Set, resource: string): string { + // Sort targets for consistent cache keys regardless of Set order + const sortedTargets = Array.from(targets).sort().join(","); + return `${sortedTargets}|${resource}`; +} export function consistentHashingChoice( v: Set, @@ -13,6 +61,20 @@ export function consistentHashingChoice( return x; } } + + // Check cache first + const cache = getCache(); + if (cache) { + const cacheKey = getCacheKey(v, resource); + const cached = cache.get(cacheKey); + if (cached !== undefined && v.has(cached)) { + cacheStats.hits++; + return cached; + } + cacheStats.misses++; + } + + // Cache miss or caching disabled - compute the expensive consistent hash const hr = new ConsistentHash({ distribution: "uniform" }); const w = Array.from(v); w.sort(); @@ -22,7 +84,64 @@ export function consistentHashingChoice( // we hash the resource so that the values are randomly distributed even // if the resources look very similar (e.g., subject.1, subject.2, etc.) // I thought that "consistent-hash" hashed the resource, but it doesn't really. 
- return hr.get(hash_string(resource)); + const result = hr.get(hash_string(resource)); + + // Store in cache if enabled + if (cache) { + const cacheKey = getCacheKey(v, resource); + cache.set(cacheKey, result); + } + + return result; +} + +/** + * Get statistics about consistent hash cache performance + */ +export function getConsistentHashCacheStats() { + const cache = getCache(); + if (!cache) { + return { + enabled: false, + message: "Consistent hash cache is disabled", + }; + } + + const total = cacheStats.hits + cacheStats.misses; + return { + enabled: true, + size: cache.size, + maxSize: cache.max, + hits: cacheStats.hits, + misses: cacheStats.misses, + hitRate: total > 0 ? (cacheStats.hits / total) * 100 : 0, + utilization: (cache.size / cache.max) * 100, + }; +} + +/** + * Clear the consistent hash cache (useful for testing) + */ +export function clearConsistentHashCache() { + if (consistentHashCache) { + consistentHashCache.clear(); + } + // Reset cache instance to force re-initialization with current env vars + consistentHashCache = null; + cacheStats = { hits: 0, misses: 0 }; +} + +/** + * Set the consistent hash cache enabled state (useful for testing) + */ +export function setConsistentHashCacheEnabled(enabled: boolean) { + consistentHashCacheEnabled = enabled; + // Clear existing cache when toggling to ensure clean state + if (consistentHashCache) { + consistentHashCache.clear(); + consistentHashCache = null; + } + cacheStats = { hits: 0, misses: 0 }; } export function stickyChoice({ @@ -42,7 +161,7 @@ export function stickyChoice({ targets: Set; }) => string | undefined; }) { - const v = subject.split("."); + const v = splitSubject(subject); subject = v.slice(0, v.length - 1).join("."); const currentTarget = getStickyTarget({ pattern, subject, targets }); if (currentTarget === undefined || !targets.has(currentTarget)) { diff --git a/src/packages/conat/hub/changefeeds/server.ts b/src/packages/conat/hub/changefeeds/server.ts index 
1d685c6c82..676e388826 100644 --- a/src/packages/conat/hub/changefeeds/server.ts +++ b/src/packages/conat/hub/changefeeds/server.ts @@ -11,6 +11,7 @@ import { KEEPALIVE_TIMEOUT, RESOURCE, } from "./util"; + export { type ConatSocketServer }; const logger = getLogger("hub:changefeeds:server"); diff --git a/src/packages/conat/package.json b/src/packages/conat/package.json index 3cc702d846..5b6b86a6cb 100644 --- a/src/packages/conat/package.json +++ b/src/packages/conat/package.json @@ -22,11 +22,20 @@ "clean": "rm -rf dist node_modules", "tsc": "pnpm exec tsc --watch --pretty --preserveWatchOutput", "test": "pnpm exec jest", + "benchmark": "pnpm build && node dist/benchmark.js", "depcheck": "pnpx depcheck --ignores events,bufferutil,utf-8-validate" }, - "files": ["dist/**", "README.md", "package.json"], + "files": [ + "dist/**", + "README.md", + "package.json" + ], "author": "SageMath, Inc.", - "keywords": ["utilities", "conat", "cocalc"], + "keywords": [ + "utilities", + "conat", + "cocalc" + ], "license": "SEE LICENSE.md", "dependencies": { "@cocalc/comm": "workspace:*", @@ -44,6 +53,7 @@ "js-base64": "^3.7.7", "json-stable-stringify": "^1.0.1", "lodash": "^4.17.21", + "lru-cache": "^7.18.3", "socket.io": "^4.8.1", "socket.io-client": "^4.8.1", "utf-8-validate": "^6.0.5" diff --git a/src/packages/conat/persist/auth.ts b/src/packages/conat/persist/auth.ts index abc28e8f76..161cef13d4 100644 --- a/src/packages/conat/persist/auth.ts +++ b/src/packages/conat/persist/auth.ts @@ -1,6 +1,7 @@ import { SERVICE } from "./util"; import { ConatError } from "@cocalc/conat/core/client"; import { normalize } from "path"; +import { splitSubject } from "../core/split-cache"; export const MAX_PATH_LENGTH = 4000; @@ -58,7 +59,7 @@ export function assertHasWritePermission({ { code: 403 }, ); } - const v = subject.split("."); + const v = splitSubject(subject); if (v[0] != service) { throw Error( `bug -- first segment of subject must be '${service}' -- subject='${subject}'`, diff --git 
a/src/packages/conat/persist/server.ts b/src/packages/conat/persist/server.ts index 5a270630e7..695680464f 100644 --- a/src/packages/conat/persist/server.ts +++ b/src/packages/conat/persist/server.ts @@ -70,6 +70,7 @@ import { throttle } from "lodash"; import { type SetOptions } from "./client"; import { once } from "@cocalc/util/async-utils"; import { UsageMonitor } from "@cocalc/conat/monitor/usage"; +import { splitSubject } from "../core/split-cache"; const logger = getLogger("persist:server"); @@ -139,7 +140,7 @@ export function server({ storage = data.storage; changefeed = data.changefeed; try { - user = socket.subject.split(".")[1]; + user = splitSubject(socket.subject)[1]; usage.add(user); added = true; stream = await getStream({ diff --git a/src/packages/conat/socket/server.ts b/src/packages/conat/socket/server.ts index 2f5ac21fba..e9c1ff3b98 100644 --- a/src/packages/conat/socket/server.ts +++ b/src/packages/conat/socket/server.ts @@ -9,6 +9,7 @@ import { ServerSocket } from "./server-socket"; import { delay } from "awaiting"; import { type Headers } from "@cocalc/conat/core/client"; import { getLogger } from "@cocalc/conat/client"; +import { splitSubject } from "../core/split-cache"; const logger = getLogger("socket:server"); @@ -63,7 +64,7 @@ export class ConatSocketServer extends ConatSocketBase { return; } const cmd = mesg.headers?.[SOCKET_HEADER_CMD]; - const id = mesg.subject.split(".").slice(-1)[0]; + const id = splitSubject(mesg.subject).slice(-1)[0]; let socket = this.sockets[id]; if (socket === undefined) { diff --git a/src/packages/conat/socket/util.ts b/src/packages/conat/socket/util.ts index 9e81f08439..83759b48db 100644 --- a/src/packages/conat/socket/util.ts +++ b/src/packages/conat/socket/util.ts @@ -1,3 +1,5 @@ +import { splitSubject } from "../core/split-cache"; + export const SOCKET_HEADER_CMD = "CN-SocketCmd"; export const SOCKET_HEADER_SEQ = "CN-SocketSeq"; @@ -69,7 +71,7 @@ export interface ConatSocketOptions extends 
SocketConfiguration { export const RECONNECT_DELAY = 500; export function clientSubject(subject: string) { - const segments = subject.split("."); + const segments = splitSubject(subject); segments[segments.length - 2] = "client"; return segments.join("."); } diff --git a/src/packages/conat/util.ts b/src/packages/conat/util.ts index 670fede4d9..24f3a5ee51 100644 --- a/src/packages/conat/util.ts +++ b/src/packages/conat/util.ts @@ -2,6 +2,7 @@ import jsonStableStringify from "json-stable-stringify"; import { encode as encodeBase64, decode as decodeBase64 } from "js-base64"; export { encodeBase64, decodeBase64 }; import { reuseInFlight } from "@cocalc/util/reuse-in-flight"; +import { splitSubject } from "./core/split-cache"; export function handleErrorMessage(mesg) { if (mesg?.error) { @@ -22,8 +23,8 @@ export function matchesPattern({ pattern: string; subject: string; }): boolean { - const subParts = subject.split("."); - const patParts = pattern.split("."); + const subParts = splitSubject(subject); + const patParts = splitSubject(pattern); let i = 0, j = 0; while (i < subParts.length && j < patParts.length) { @@ -41,7 +42,7 @@ export function matchesPattern({ export function isValidSubject(subject: string): boolean { if (typeof subject !== "string" || subject.length === 0) return false; if (subject.startsWith(".") || subject.endsWith(".")) return false; - const tokens = subject.split("."); + const tokens = splitSubject(subject); // No empty tokens if (tokens.some((t) => t.length === 0)) return false; for (let i = 0; i < tokens.length; ++i) { diff --git a/src/packages/pnpm-lock.yaml b/src/packages/pnpm-lock.yaml index 3c27e4b55d..fa8a4ef89b 100644 --- a/src/packages/pnpm-lock.yaml +++ b/src/packages/pnpm-lock.yaml @@ -212,6 +212,9 @@ importers: lodash: specifier: ^4.17.21 version: 4.17.21 + lru-cache: + specifier: ^7.18.3 + version: 7.18.3 socket.io: specifier: ^4.8.1 version: 4.8.1(bufferutil@4.0.9)(utf-8-validate@6.0.5)