diff --git a/apps/dev-playground/config/queries/.appkit-types-cache.json b/apps/dev-playground/config/queries/.appkit-types-cache.json
new file mode 100644
index 0000000..adf530d
--- /dev/null
+++ b/apps/dev-playground/config/queries/.appkit-types-cache.json
@@ -0,0 +1,37 @@
+{
+  "version": "1",
+  "queries": {
+    "apps_list": {
+      "hash": "e2c65853cf4b332d638bdd30a3aefb69",
+      "type": "{\n name: \"apps_list\";\n parameters: Record<string, never>;\n result: Array<{\n /** @sqlType STRING */\n id: string;\n /** @sqlType STRING */\n name: string;\n /** @sqlType STRING */\n creator: string;\n /** @sqlType STRING */\n tags: string;\n /** @sqlType DECIMAL(38,6) */\n totalSpend: number;\n /** @sqlType DATE */\n createdAt: string;\n }>;\n }"
+    },
+    "cost_recommendations": {
+      "hash": "730c7d8b5e2726981088b5975157b0da",
+      "type": "{\n name: \"cost_recommendations\";\n parameters: Record<string, never>;\n result: Array<{\n /** @sqlType INT */\n dummy: number;\n }>;\n }"
+    },
+    "example": {
+      "hash": "aeb02ed3e8a6c77279099406f8709543",
+      "type": "{\n name: \"example\";\n parameters: Record<string, never>;\n result: Array<{\n /** @sqlType BOOLEAN */\n \"(1 = 1)\": boolean;\n }>;\n }"
+    },
+    "spend_data": {
+      "hash": "caa0430652fe15eff658e48e6dac2446",
+      "type": "{\n name: \"spend_data\";\n parameters: {\n /** STRING - use sql.string() */\n groupBy: SQLStringMarker;\n /** STRING - use sql.string() */\n aggregationLevel: SQLStringMarker;\n /** DATE - use sql.date() */\n startDate: SQLDateMarker;\n /** DATE - use sql.date() */\n endDate: SQLDateMarker;\n /** STRING - use sql.string() */\n appId: SQLStringMarker;\n /** STRING - use sql.string() */\n creator: SQLStringMarker;\n };\n result: Array<{\n /** @sqlType STRING */\n group_key: string;\n /** @sqlType TIMESTAMP */\n aggregation_period: string;\n /** @sqlType DECIMAL(38,6) */\n cost_usd: number;\n }>;\n }"
+    },
+    "spend_summary": {
+      "hash": "bbe188624c3f5904c3a7593cb32982d5",
+      "type": "{\n name: \"spend_summary\";\n parameters: {\n /** STRING - use sql.string() */\n aggregationLevel: SQLStringMarker;\n /** DATE - use sql.date() */\n endDate: SQLDateMarker;\n /** DATE - use sql.date() */\n startDate: SQLDateMarker;\n };\n result: Array<{\n /** @sqlType DECIMAL(33,0) */\n total: number;\n /** @sqlType DECIMAL(33,0) */\n average: number;\n /** @sqlType DECIMAL(33,0) */\n forecasted: number;\n }>;\n }"
+    },
+    "sql_helpers_test": {
+      "hash": "1322df4ba9c107e8d23e2a04bae860c5",
+      "type": "{\n name: \"sql_helpers_test\";\n parameters: {\n /** STRING - use sql.string() */\n stringParam: SQLStringMarker;\n /** NUMERIC - use sql.number() */\n numberParam: SQLNumberMarker;\n /** BOOLEAN - use sql.boolean() */\n booleanParam: SQLBooleanMarker;\n /** DATE - use sql.date() */\n dateParam: SQLDateMarker;\n /** TIMESTAMP - use sql.timestamp() */\n timestampParam: SQLTimestampMarker;\n /** STRING - use sql.string() */\n binaryParam: SQLStringMarker;\n };\n result: Array<{\n /** @sqlType STRING */\n string_value: string;\n /** @sqlType STRING */\n number_value: string;\n /** @sqlType STRING */\n boolean_value: string;\n /** @sqlType STRING */\n date_value: string;\n /** @sqlType STRING */\n timestamp_value: string;\n /** @sqlType BINARY */\n binary_value: string;\n /** @sqlType STRING */\n binary_hex: string;\n /** @sqlType INT */\n binary_length: number;\n }>;\n }"
+    },
+    "top_contributors": {
+      "hash": "2d58759cca2fe31dae06475a23080120",
+      "type": "{\n name: \"top_contributors\";\n parameters: {\n /** STRING - use sql.string() */\n aggregationLevel: SQLStringMarker;\n /** DATE - use sql.date() */\n startDate: SQLDateMarker;\n /** DATE - use sql.date() */\n endDate: SQLDateMarker;\n };\n result: Array<{\n /** @sqlType STRING */\n app_name: string;\n /** @sqlType DECIMAL(38,6) */\n total_cost_usd: number;\n }>;\n }"
+    },
+    "untagged_apps": {
+      "hash": "5946262b49710b8ab458d1bf950ff8c9",
+      "type": "{\n name: \"untagged_apps\";\n parameters: {\n /** STRING - use sql.string() */\n aggregationLevel: SQLStringMarker;\n /** DATE - use sql.date() */\n startDate: SQLDateMarker;\n /** DATE - use sql.date() */\n endDate: SQLDateMarker;\n };\n result: Array<{\n /** @sqlType STRING */\n app_name: string;\n /** @sqlType STRING */\n creator: string;\n /** @sqlType DECIMAL(38,6) */\n total_cost_usd: number;\n /** @sqlType DECIMAL(38,10) */\n avg_period_cost_usd: number;\n }>;\n }"
+    }
+  }
+}
diff --git a/package.json b/package.json
index 4039aac..23d8532 100644
--- a/package.json
+++ b/package.json
@@ -32,7 +32,8 @@
   },
   "lint-staged": {
     "(*.ts|*.tsx|*.js|*.jsx|*.json|*.md|*.yml|*.yaml|*.css)": [
-      "pnpm lint:fix && pnpm format"
+      "biome lint --write",
+      "biome format --write"
     ]
   },
   "devDependencies": {
@@ -48,6 +49,7 @@
     "jsdom": "^27.0.0",
     "lint-staged": "^15.5.1",
     "plop": "^4.0.4",
+    "pg": "^8.16.3",
     "publint": "^0.3.15",
     "tsdown": "^0.15.7",
     "tsx": "^4.20.6",
diff --git a/packages/app-kit/package.json b/packages/app-kit/package.json
index d411e41..b8324f2 100644
--- a/packages/app-kit/package.json
+++ b/packages/app-kit/package.json
@@ -50,6 +50,7 @@
     "@opentelemetry/semantic-conventions": "^1.38.0",
     "dotenv": "^16.6.1",
     "express": "^4.22.0",
+    "pg": "^8.16.3",
     "shared": "workspace:*",
     "vite": "npm:rolldown-vite@7.1.14",
     "ws": "^8.18.3",
@@ -57,6 +58,7 @@
   },
   "devDependencies": {
     "@types/express": "^4.17.25",
+    "@types/pg": "^8.15.6",
     "@types/ws": "^8.18.1",
     "@vitejs/plugin-react": "^5.1.1"
   },
diff --git a/packages/app-kit/src/analytics/analytics.ts b/packages/app-kit/src/analytics/analytics.ts
index 2b91b68..9fd8b34 100644
--- a/packages/app-kit/src/analytics/analytics.ts
+++ b/packages/app-kit/src/analytics/analytics.ts
@@ -35,7 +35,7 @@ export class AnalyticsPlugin extends Plugin {
 
     this.SQLClient = new SQLWarehouseConnector({
       timeout: config.timeout,
-      telemetry: this.telemetry,
+      telemetry: config.telemetry,
     });
   }
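The `analytics.ts` hunk is one line but load-bearing: the SQL connector now receives the telemetry provider from the plugin's config rather than from `this.telemetry`, presumably because the instance field is not reliably populated at this point in construction. A hedged sketch of the intended wiring (constructor shape inferred from the hunk above):

```typescript
// Sketch only - option names taken from the diff, reasoning assumed.
// `config.telemetry` is the caller-supplied provider and is already set
// when the constructor runs; `this.telemetry` plausibly was not yet
// assigned, so the connector could have received undefined and
// silently dropped spans and metrics.
this.SQLClient = new SQLWarehouseConnector({
  timeout: config.timeout,     // per-query timeout in ms
  telemetry: config.telemetry, // forward the provider from the config
});
```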
diff --git a/packages/app-kit/src/analytics/tests/analytics.test.ts b/packages/app-kit/src/analytics/tests/analytics.test.ts
index 3eb105a..1fd11ab 100644
--- a/packages/app-kit/src/analytics/tests/analytics.test.ts
+++ b/packages/app-kit/src/analytics/tests/analytics.test.ts
@@ -10,12 +10,53 @@ import { beforeEach, describe, expect, test, vi } from "vitest";
 import { AnalyticsPlugin, analytics } from "../analytics";
 import type { IAnalyticsConfig } from "../types";
 
+// Mock CacheManager singleton with actual caching behavior
+const { mockCacheStore, mockCacheInstance } = vi.hoisted(() => {
+  const store = new Map();
+
+  const generateKey = (parts: unknown[], userKey: string): string => {
+    const { createHash } = require("node:crypto");
+    const allParts = [userKey, ...parts];
+    const serialized = JSON.stringify(allParts);
+    return createHash("sha256").update(serialized).digest("hex");
+  };
+
+  const instance = {
+    get: vi.fn(),
+    set: vi.fn(),
+    delete: vi.fn(),
+    getOrExecute: vi.fn(
+      async (key: unknown[], fn: () => Promise<unknown>, userKey: string) => {
+        const cacheKey = generateKey(key, userKey);
+        if (store.has(cacheKey)) {
+          return store.get(cacheKey);
+        }
+        const result = await fn();
+        store.set(cacheKey, result);
+        return result;
+      },
+    ),
+    generateKey: vi.fn((parts: unknown[], userKey: string) =>
+      generateKey(parts, userKey),
+    ),
+  };
+
+  return { mockCacheStore: store, mockCacheInstance: instance };
+});
+
+vi.mock("../../cache", () => ({
+  CacheManager: {
+    getInstanceSync: vi.fn(() => mockCacheInstance),
+  },
+}));
+
 describe("Analytics Plugin", () => {
   let config: IAnalyticsConfig;
 
   beforeEach(() => {
     config = { timeout: 5000 };
     setupDatabricksEnv();
+    mockCacheStore.clear();
   });
 
   test("Analytics plugin data should have correct name", () => {
@@ -180,7 +221,7 @@ describe("Analytics Plugin", () => {
         },
         {
           userDatabricksClient: mockUserClient as any,
-          userName: "user-token-123",
+          userId: "user-token-123",
         },
       );
 
@@ -277,7 +318,7 @@ describe("Analytics Plugin", () => {
       async () => {
         await handler(mockReq1, mockRes1);
       },
-      { userName: "user-token-1" },
+      { userId: "user-token-1" },
     );
 
     const mockReq2 = createMockRequest({
@@ -290,7 +331,7 @@ describe("Analytics Plugin", () => {
       async () => {
         await handler(mockReq2, mockRes2);
       },
-      { userName: "user-token-2" },
+      { userId: "user-token-2" },
    );
 
     const mockReq1Again = createMockRequest({
@@ -303,7 +344,7 @@ describe("Analytics Plugin", () => {
       async () => {
         await handler(mockReq1Again, mockRes1Again);
       },
-      { userName: "user-token-1" },
+      { userId: "user-token-1" },
     );
 
     expect(executeMock).toHaveBeenCalledTimes(2);
diff --git a/packages/app-kit/src/analytics/types.ts b/packages/app-kit/src/analytics/types.ts
index 6e14607..a8efd5d 100644
--- a/packages/app-kit/src/analytics/types.ts
+++ b/packages/app-kit/src/analytics/types.ts
@@ -2,7 +2,6 @@ import type { BasePluginConfig } from "shared";
 
 export interface IAnalyticsConfig extends BasePluginConfig {
   timeout?: number;
-  typePath?: string;
 }
 
 export interface IAnalyticsQueryRequest {
diff --git a/packages/app-kit/src/cache/defaults.ts b/packages/app-kit/src/cache/defaults.ts
new file mode 100644
index 0000000..3a48b95
--- /dev/null
+++ b/packages/app-kit/src/cache/defaults.ts
@@ -0,0 +1,11 @@
+import type { CacheConfig } from "shared";
+
+/** Default configuration for cache */
+export const cacheDefaults: CacheConfig = {
+  enabled: true,
+  ttl: 3600, // 1 hour
+  maxSize: 1000, // 1000 entries
+  cacheKey: [], // no cache key by default
+  cleanupProbability: 0.01, // 1% probability of triggering cleanup on each get operation
+  strictPersistence: false, // if false, fall back to in-memory storage when Lakebase is unavailable
+};
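For reference, a consumer-side sketch of how these defaults combine with a user override. `deepMerge` (imported in `index.ts` below) is assumed to behave like a spread for these flat fields; the override values are made up:

```typescript
import { cacheDefaults } from "./defaults";

// Hypothetical override: only the fields you set change, the rest keep
// the defaults above (CacheManager runs deepMerge(cacheDefaults, userConfig)).
const userConfig = { ttl: 600, strictPersistence: true };
const effective = { ...cacheDefaults, ...userConfig };
// effective = { enabled: true, ttl: 600, maxSize: 1000, cacheKey: [],
//               cleanupProbability: 0.01, strictPersistence: true }
```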
diff --git a/packages/app-kit/src/cache/index.ts b/packages/app-kit/src/cache/index.ts
index 3016704..a8f0c9f 100644
--- a/packages/app-kit/src/cache/index.ts
+++ b/packages/app-kit/src/cache/index.ts
@@ -1,60 +1,202 @@
-import type { CacheConfig } from "shared";
-import type { ITelemetry } from "../telemetry";
-import { type Counter, SpanStatusCode } from "../telemetry";
-
-export interface CacheEntry<T> {
-  value: T;
-  expiry: number;
-}
+import { createHash } from "node:crypto";
+import { WorkspaceClient } from "@databricks/sdk-experimental";
+import type { CacheConfig, CacheStorage } from "shared";
+import { LakebaseConnector } from "@/connectors";
+import type { Counter, TelemetryProvider } from "../telemetry";
+import { SpanStatusCode, TelemetryManager } from "../telemetry";
+import { deepMerge } from "../utils";
+import { cacheDefaults } from "./defaults";
+import { InMemoryStorage, PersistentStorage } from "./storage";
 
+/**
+ * Cache manager class to handle cache operations.
+ * Can be used with in-memory storage or persistent storage (Lakebase).
+ *
+ * The cache is automatically initialized by AppKit. Use `getInstanceSync()` to access
+ * the singleton instance after initialization.
+ *
+ * @example
+ * const cache = CacheManager.getInstanceSync();
+ * const result = await cache.getOrExecute(["users", userId], () => fetchUser(userId), userKey);
+ */
 export class CacheManager {
-  private static readonly TELEMETRY_INSTRUMENT_CONFIG = {
-    name: "cache-manager",
-    includePrefix: true,
-  };
-  private static readonly DEFAULT_ENABLED = true;
-  private static readonly DEFAULT_TTL = 3600; // 1 hour
-  private static readonly DEFAULT_MAX_SIZE = 1000; // default max 1000 entries
-
-  private cache = new Map();
-  private accessOrder = new Map();
-  private accessCounter = 0;
-  private config: Required<CacheConfig>;
-  private inFlightRequests = new Map<string, Promise<unknown>>();
-  private telemetry: ITelemetry;
-
-  // Create metrics once at class level
-  private cacheHitCounter: Counter;
-  private cacheMissCounter: Counter;
-
-  constructor(config: CacheConfig = {}, telemetry: ITelemetry) {
-    this.config = {
-      enabled: config.enabled ?? CacheManager.DEFAULT_ENABLED,
-      ttl: config.ttl ?? CacheManager.DEFAULT_TTL,
-      maxSize: config.maxSize ?? CacheManager.DEFAULT_MAX_SIZE,
-      cacheKey: config.cacheKey ?? [],
-    };
-    this.telemetry = telemetry;
-    const meter = this.telemetry.getMeter(
-      CacheManager.TELEMETRY_INSTRUMENT_CONFIG,
-    );
-    this.cacheHitCounter = meter.createCounter("cache.hit", {
-      description: "Total number of cache hits",
-      unit: "1",
-    });
-    this.cacheMissCounter = meter.createCounter("cache.miss", {
-      description: "Total number of cache misses",
-      unit: "1",
-    });
+  private static readonly MIN_CLEANUP_INTERVAL_MS = 60_000;
+  private readonly name: string = "cache-manager";
+  private static instance: CacheManager | null = null;
+  private static initPromise: Promise<CacheManager> | null = null;
+
+  private storage: CacheStorage;
+  private config: CacheConfig;
+  private inFlightRequests: Map<string, Promise<unknown>>;
+  private cleanupInProgress: boolean;
+  private lastCleanupAttempt: number;
+
+  // Telemetry
+  private telemetry: TelemetryProvider;
+  private telemetryMetrics: {
+    cacheHitCount: Counter;
+    cacheMissCount: Counter;
+  };
+
+  private constructor(storage: CacheStorage, config: CacheConfig) {
+    this.storage = storage;
+    this.config = config;
+    this.inFlightRequests = new Map();
+    this.cleanupInProgress = false;
+    this.lastCleanupAttempt = 0;
+
+    this.telemetry = TelemetryManager.getProvider(
+      this.name,
+      this.config.telemetry,
+    );
+    this.telemetryMetrics = {
+      cacheHitCount: this.telemetry.getMeter().createCounter("cache.hit", {
+        description: "Total number of cache hits",
+        unit: "1",
+      }),
+      cacheMissCount: this.telemetry.getMeter().createCounter("cache.miss", {
+        description: "Total number of cache misses",
+        unit: "1",
+      }),
+    };
   }
 
-  // Get or execute a function and cache the result
+  /**
+   * Get the singleton instance of the cache manager (sync version).
+   *
+   * Throws if not initialized - ensure AppKit.create() has completed first.
+   * @returns CacheManager instance
+   */
+  static getInstanceSync(): CacheManager {
+    if (!CacheManager.instance) {
+      throw new Error(
+        "CacheManager not initialized. Ensure AppKit.create() has completed before accessing the cache.",
+      );
+    }
+
+    return CacheManager.instance;
+  }
+
+  /**
+   * Initialize and get the singleton instance of the cache manager.
+   * Called internally by AppKit - prefer `getInstanceSync()` for plugin access.
+   * @param userConfig - User configuration for the cache manager
+   * @returns CacheManager instance
+   * @internal
+   */
+  static async getInstance(
+    userConfig?: Partial<CacheConfig>,
+  ): Promise<CacheManager> {
+    if (CacheManager.instance) {
+      return CacheManager.instance;
+    }
+
+    if (!CacheManager.initPromise) {
+      CacheManager.initPromise = CacheManager.create(userConfig).then(
+        (instance) => {
+          CacheManager.instance = instance;
+          return instance;
+        },
+      );
+    }
+
+    return CacheManager.initPromise;
+  }
+
+  /**
+   * Create a new cache manager instance.
+   *
+   * Storage selection logic:
+   * 1. If `storage` provided and healthy → use provided storage
+   * 2. If `storage` provided but unhealthy → fall back to InMemory (or disable if strictPersistence)
+   * 3. If no `storage` provided and Lakebase available → use Lakebase
+   * 4. If no `storage` provided and Lakebase unavailable → fall back to InMemory (or disable if strictPersistence)
+   *
+   * @param userConfig - User configuration for the cache manager
+   * @returns CacheManager instance
+   */
+  private static async create(
+    userConfig?: Partial<CacheConfig>,
+  ): Promise<CacheManager> {
+    const config = deepMerge(cacheDefaults, userConfig);
+
+    if (config.storage) {
+      const isHealthy = await config.storage.healthCheck();
+      if (isHealthy) {
+        return new CacheManager(config.storage, config);
+      }
+
+      console.warn("[Cache] Provided storage health check failed");
+
+      if (config.strictPersistence) {
+        console.warn(
+          "[Cache] strictPersistence enabled but provided storage unhealthy. Cache disabled.",
+        );
+        const disabledConfig = { ...config, enabled: false };
+        return new CacheManager(
+          new InMemoryStorage(disabledConfig),
+          disabledConfig,
+        );
+      }
+
+      console.warn("[Cache] Falling back to in-memory cache.");
+      return new CacheManager(new InMemoryStorage(config), config);
+    }
+
+    // try to use lakebase storage
+    try {
+      const workspaceClient = new WorkspaceClient({});
+      const connector = new LakebaseConnector({ workspaceClient });
+      const isHealthy = await connector.healthCheck();
+
+      if (isHealthy) {
+        const persistentStorage = new PersistentStorage(config, connector);
+        await persistentStorage.initialize();
+        return new CacheManager(persistentStorage, config);
+      }
+
+      console.warn(
+        "[Cache] Lakebase health check failed, default storage unhealthy",
+      );
+    } catch (error) {
+      const errorMessage =
+        error instanceof Error ? error.message : String(error);
+      console.warn(`[Cache] Lakebase unavailable: ${errorMessage}`);
+    }
+
+    if (config.strictPersistence) {
+      console.warn(
+        "[Cache] strictPersistence enabled but lakebase unavailable. Cache disabled.",
+      );
+      const disabledConfig = { ...config, enabled: false };
+      return new CacheManager(
+        new InMemoryStorage(disabledConfig),
+        disabledConfig,
+      );
+    }
+
+    console.warn("[Cache] Falling back to in-memory cache.");
+    return new CacheManager(new InMemoryStorage(config), config);
+  }
+
+  /**
+   * Get or execute a function and cache the result
+   * @param key - Cache key
+   * @param fn - Function to execute
+   * @param userKey - User key
+   * @param options - Options for the cache
+   * @returns Promise of the result
+   */
   async getOrExecute<T>(
     key: (string | number | object)[],
     fn: () => Promise<T>,
     userKey: string,
     options?: { ttl?: number },
   ): Promise<T> {
+    if (!this.config.enabled) return fn();
+
     const cacheKey = this.generateKey(key, userKey);
 
     return this.telemetry.startActiveSpan(
@@ -63,20 +205,23 @@ export class CacheManager {
       {
         attributes: {
           "cache.key": cacheKey,
           "cache.enabled": this.config.enabled,
+          "cache.persistent": this.storage.isPersistent(),
         },
       },
       async (span) => {
         try {
-          // Check cache first
-          const cached = this.get(cacheKey);
-          if (cached) {
+          // check if the value is in the cache
+          const cached = await this.storage.get(cacheKey);
+          if (cached !== null) {
             span.setAttribute("cache.hit", true);
             span.setStatus({ code: SpanStatusCode.OK });
-            this.cacheHitCounter.add(1, { "cache.key": cacheKey });
-            return cached;
+            this.telemetryMetrics.cacheHitCount.add(1, {
+              "cache.key": cacheKey,
+            });
+            return cached.value as T;
           }
 
-          // Check in-flight requests for deduplication
+          // check if the value is being processed by another request
           const inFlight = this.inFlightRequests.get(cacheKey);
           if (inFlight) {
             span.setAttribute("cache.hit", true);
@@ -85,25 +230,27 @@ export class CacheManager {
               "cache.key": cacheKey,
             });
             span.setStatus({ code: SpanStatusCode.OK });
-            this.cacheHitCounter.add(1, {
+            this.telemetryMetrics.cacheHitCount.add(1, {
               "cache.key": cacheKey,
               "cache.deduplication": "true",
             });
             span.end();
-            return inFlight;
+            return inFlight as Promise<T>;
           }
 
-          // Cache miss - execute function
+          // cache miss - execute function
           span.setAttribute("cache.hit", false);
           span.addEvent("cache.miss", { "cache.key": cacheKey });
-          this.cacheMissCounter.add(1, { "cache.key": cacheKey });
+          this.telemetryMetrics.cacheMissCount.add(1, {
+            "cache.key": cacheKey,
+          });
 
           const promise = fn()
-            .then((result) => {
-              this.set(cacheKey, result, options);
+            .then(async (result) => {
+              await this.set(cacheKey, result, options);
               span.addEvent("cache.value_stored", {
                 "cache.key": cacheKey,
-                "cache.ttl": options?.ttl ?? this.config.ttl,
+                "cache.ttl": options?.ttl ?? this.config.ttl ?? 3600,
               });
               return result;
             })
@@ -129,84 +276,131 @@
           span.end();
         }
       },
-      CacheManager.TELEMETRY_INSTRUMENT_CONFIG,
+      { name: this.name, includePrefix: true },
     );
   }
 
-  get(key: string): T | null {
+  /**
+   * Get a cached value
+   * @param key - Cache key
+   * @returns Promise of the value, or null if not found or expired
+   */
+  async get<T>(key: string): Promise<T | null> {
     if (!this.config.enabled) return null;
 
-    const entry = this.cache.get(key);
+    // probabilistic cleanup trigger
+    this.maybeCleanup();
+
+    const entry = await this.storage.get(key);
     if (!entry) return null;
 
     if (Date.now() > entry.expiry) {
-      this.cache.delete(key);
-      this.accessOrder.delete(key);
+      await this.storage.delete(key);
       return null;
     }
-
-    // Update access order for LRU
-    this.accessOrder.set(key, ++this.accessCounter);
     return entry.value as T;
   }
 
-  set(key: string, value: T, options?: { ttl?: number }): void {
-    if (!this.config.enabled) return;
+  /** Probabilistically trigger cleanup of expired entries (fire-and-forget) */
+  private maybeCleanup(): void {
+    if (this.cleanupInProgress) return;
+    if (!this.storage.isPersistent()) return;
+    const now = Date.now();
+    if (now - this.lastCleanupAttempt < CacheManager.MIN_CLEANUP_INTERVAL_MS)
+      return;
 
-    if (this.cache.size >= this.config.maxSize && !this.cache.has(key)) {
-      this.evictLRU();
-    }
+    const probability = this.config.cleanupProbability ?? 0.01;
 
-    const expiryTime = Date.now() + (options?.ttl ?? this.config.ttl) * 1000;
-    this.cache.set(key, { value, expiry: expiryTime });
-    this.accessOrder.set(key, ++this.accessCounter);
+    if (Math.random() > probability) return;
+
+    this.lastCleanupAttempt = now;
+
+    this.cleanupInProgress = true;
+    (this.storage as PersistentStorage)
+      .cleanupExpired()
+      .catch((error) => {
+        console.debug("Error cleaning up expired entries:", error);
+      })
+      .finally(() => {
+        this.cleanupInProgress = false;
+      });
+  }
+
+  /**
+   * Set a value in the cache
+   * @param key - Cache key
+   * @param value - Value to set
+   * @param options - Options for the cache
+   * @returns Promise that resolves once the value is stored
+   */
+  async set<T>(
+    key: string,
+    value: T,
+    options?: { ttl?: number },
+  ): Promise<void> {
+    if (!this.config.enabled) return;
+
+    const ttl = options?.ttl ?? this.config.ttl ?? 3600;
+    const expiryTime = Date.now() + ttl * 1000;
+    await this.storage.set(key, { value, expiry: expiryTime });
   }
 
-  delete(key: string): void {
-    this.cache.delete(key);
-    this.accessOrder.delete(key);
+  /**
+   * Delete a value from the cache
+   * @param key - Cache key
+   * @returns Promise that resolves once the value is removed
+   */
+  async delete(key: string): Promise<void> {
+    if (!this.config.enabled) return;
+    await this.storage.delete(key);
   }
 
-  clear(): void {
-    this.cache.clear();
-    this.accessOrder.clear();
-    this.accessCounter = 0;
+  /** Clear the cache */
+  async clear(): Promise<void> {
+    await this.storage.clear();
     this.inFlightRequests.clear();
   }
 
-  has(key: string): boolean {
+  /**
+   * Check if a value exists in the cache
+   * @param key - Cache key
+   * @returns Promise of true if the value exists, false otherwise
+   */
+  async has(key: string): Promise<boolean> {
     if (!this.config.enabled) return false;
-    const entry = this.cache.get(key);
+
+    const entry = await this.storage.get(key);
     if (!entry) return false;
 
     if (Date.now() > entry.expiry) {
-      this.cache.delete(key);
-      this.accessOrder.delete(key);
+      await this.storage.delete(key);
       return false;
     }
     return true;
   }
 
+  /**
+   * Generate a cache key
+   * @param parts - Parts of the key
+   * @param userKey - User key
+   * @returns Cache key
+   */
   generateKey(parts: (string | number | object)[], userKey: string): string {
-    parts = [userKey, ...parts];
-    return parts.map((p) => JSON.stringify(p)).join(":");
+    const allParts = [userKey, ...parts];
+    const serialized = JSON.stringify(allParts);
+    return createHash("sha256").update(serialized).digest("hex");
   }
 
-  // Evict the least recently used entry (LRU)
-  private evictLRU(): void {
-    let oldestKey: string | null = null;
-    let oldestAccess = Infinity;
-
-    for (const [key, accessTime] of this.accessOrder) {
-      if (accessTime < oldestAccess) {
-        oldestAccess = accessTime;
-        oldestKey = key;
-      }
-    }
+  /** Close the cache */
+  async close(): Promise<void> {
+    await this.storage.close();
+  }
 
-    if (oldestKey) {
-      this.cache.delete(oldestKey);
-      this.accessOrder.delete(oldestKey);
-    }
+  /**
+   * Check if the storage is healthy
+   * @returns Promise of true if the storage is healthy, false otherwise
+   */
+  async isStorageHealthy(): Promise<boolean> {
+    return this.storage.healthCheck();
   }
 }
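`getInstanceSync` plus `getOrExecute` is essentially the whole consumer surface of this class. A usage sketch of the read-through and in-flight-deduplication behavior implemented above (the import path, query name, and `userId` value are made up for illustration):

```typescript
import { CacheManager } from "@/cache"; // assumed export path

const cache = CacheManager.getInstanceSync();
const userId = "user-123";
const runQuery = async () => ({ rows: [] as unknown[] });

// Three concurrent calls with the same key parts and user: the first
// executes runQuery; the other two find it in inFlightRequests and await
// the same promise, so runQuery runs exactly once (the
// "cache.deduplication" span path above).
const [a, b, c] = await Promise.all([
  cache.getOrExecute(["spend", { month: "2025-01" }], runQuery, userId),
  cache.getOrExecute(["spend", { month: "2025-01" }], runQuery, userId),
  cache.getOrExecute(["spend", { month: "2025-01" }], runQuery, userId),
]);

// Per-call TTL override, in seconds:
await cache.getOrExecute(["apps_list"], runQuery, userId, { ttl: 300 });
```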
diff --git a/packages/app-kit/src/cache/storage/defaults.ts b/packages/app-kit/src/cache/storage/defaults.ts
new file mode 100644
index 0000000..2db682a
--- /dev/null
+++ b/packages/app-kit/src/cache/storage/defaults.ts
@@ -0,0 +1,21 @@
+/** Default configuration for in-memory storage */
+export const inMemoryStorageDefaults = {
+  /** Maximum number of entries in the cache */
+  maxSize: 1000,
+};
+
+/** Default configuration for Lakebase storage */
+export const lakebaseStorageDefaults = {
+  /** Table name for the cache */
+  tableName: "appkit_cache_entries",
+  /** Maximum number of bytes in the cache */
+  maxBytes: 256 * 1024 * 1024, // 256MB
+  /** Maximum number of bytes per entry in the cache */
+  maxEntryBytes: 10 * 1024 * 1024, // 10MB
+  /** Maximum number of entries in the cache */
+  maxSize: 1000,
+  /** Number of entries to evict when cache is full */
+  evictionBatchSize: 100,
+  /** Probability (0-1) of checking total bytes on each write operation */
+  evictionCheckProbability: 0.1,
+};
diff --git a/packages/app-kit/src/cache/storage/index.ts b/packages/app-kit/src/cache/storage/index.ts
new file mode 100644
index 0000000..9d213c7
--- /dev/null
+++ b/packages/app-kit/src/cache/storage/index.ts
@@ -0,0 +1,2 @@
+export { InMemoryStorage } from "./memory";
+export { PersistentStorage } from "./persistent";
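The two probability knobs in these defaults trade per-operation overhead against how quickly stale data and oversized tables get trimmed. With the values above, the expected intervals work out as in this sketch:

```typescript
// Expected number of operations between probabilistic checks is 1/p.
const cleanupProbability = 0.01; // CacheManager.get(): cleanup attempt
                                 // on ~1 in 100 gets, further rate-limited
                                 // by MIN_CLEANUP_INTERVAL_MS to once per 60s
const evictionCheckProbability = 0.1; // PersistentStorage.set(): total-bytes
                                      // check on ~1 in 10 writes

const expectedGetsBetweenCleanups = 1 / cleanupProbability; // 100
const expectedWritesBetweenChecks = 1 / evictionCheckProbability; // 10
```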
diff --git a/packages/app-kit/src/cache/storage/memory.ts b/packages/app-kit/src/cache/storage/memory.ts
new file mode 100644
index 0000000..23f36e0
--- /dev/null
+++ b/packages/app-kit/src/cache/storage/memory.ts
@@ -0,0 +1,106 @@
+import type { CacheConfig, CacheEntry, CacheStorage } from "shared";
+import { inMemoryStorageDefaults } from "./defaults";
+
+/**
+ * In-memory cache storage implementation. Uses a least recently used (LRU) eviction policy
+ * to manage memory usage and ensure efficient cache operations.
+ */
+export class InMemoryStorage implements CacheStorage {
+  private cache: Map<string, CacheEntry<unknown>> = new Map();
+  private accessOrder: Map<string, number> = new Map();
+  private accessCounter: number;
+  private maxSize: number;
+
+  constructor(config: CacheConfig) {
+    this.cache = new Map();
+    this.accessOrder = new Map();
+    this.maxSize = config.maxSize ?? inMemoryStorageDefaults.maxSize;
+    this.accessCounter = 0;
+  }
+
+  /** Get an entry from the cache */
+  async get<T>(key: string): Promise<CacheEntry<T> | null> {
+    const entry = this.cache.get(key);
+    if (!entry) return null;
+
+    this.accessOrder.set(key, ++this.accessCounter);
+    return entry as CacheEntry<T>;
+  }
+
+  /** Set an entry in the cache */
+  async set<T>(key: string, entry: CacheEntry<T>): Promise<void> {
+    if (this.cache.size >= this.maxSize && !this.cache.has(key)) {
+      this.evictLRU();
+    }
+
+    this.cache.set(key, entry);
+    this.accessOrder.set(key, ++this.accessCounter);
+  }
+
+  /** Delete an entry from the cache */
+  async delete(key: string): Promise<void> {
+    this.cache.delete(key);
+    this.accessOrder.delete(key);
+  }
+
+  /** Clear the in-memory cache */
+  async clear(): Promise<void> {
+    this.cache.clear();
+    this.accessOrder.clear();
+    this.accessCounter = 0;
+  }
+
+  /** Check if the cache has an entry */
+  async has(key: string): Promise<boolean> {
+    const entry = this.cache.get(key);
+    if (!entry) return false;
+
+    if (Date.now() > entry.expiry) {
+      this.cache.delete(key);
+      this.accessOrder.delete(key);
+      return false;
+    }
+
+    return true;
+  }
+
+  /** Get the size of the cache */
+  async size(): Promise<number> {
+    return this.cache.size;
+  }
+
+  /** Check if the cache is persistent */
+  isPersistent(): boolean {
+    return false;
+  }
+
+  /** Check the health of the cache */
+  async healthCheck(): Promise<boolean> {
+    return true;
+  }
+
+  /** Close the cache */
+  async close(): Promise<void> {
+    this.cache.clear();
+    this.accessOrder.clear();
+    this.accessCounter = 0;
+  }
+
+  /** Evict the least recently used entry (LRU) */
+  private evictLRU(): void {
+    let oldestKey: string | null = null;
+    let oldestAccess = Infinity;
+
+    for (const [key, accessTime] of this.accessOrder) {
+      if (accessTime < oldestAccess) {
+        oldestAccess = accessTime;
+        oldestKey = key;
+      }
+    }
+
+    if (oldestKey) {
+      this.cache.delete(oldestKey);
+      this.accessOrder.delete(oldestKey);
+    }
+  }
+}
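A quick behavioral sketch of the LRU policy above, mirroring the unit tests further down (entry values are arbitrary):

```typescript
const storage = new InMemoryStorage({ maxSize: 2 });

await storage.set("a", { value: 1, expiry: Date.now() + 60_000 });
await storage.set("b", { value: 2, expiry: Date.now() + 60_000 });
await storage.get("a"); // "a" becomes the most recently used entry
await storage.set("c", { value: 3, expiry: Date.now() + 60_000 }); // evicts "b"

console.log(await storage.get("b")); // null - "b" had the oldest access counter
console.log(await storage.size()); // 2 - still at maxSize
```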
diff --git a/packages/app-kit/src/cache/storage/persistent.ts b/packages/app-kit/src/cache/storage/persistent.ts
new file mode 100644
index 0000000..9309a8b
--- /dev/null
+++ b/packages/app-kit/src/cache/storage/persistent.ts
@@ -0,0 +1,312 @@
+import { createHash } from "node:crypto";
+import type { CacheConfig, CacheEntry, CacheStorage } from "shared";
+import type { LakebaseConnector } from "../../connectors";
+import { lakebaseStorageDefaults } from "./defaults";
+
+/**
+ * Persistent cache storage implementation backed by Lakebase. Uses a least
+ * recently used (LRU) eviction policy to keep the cache table within its
+ * configured size budget.
+ *
+ * @example
+ * const persistentStorage = new PersistentStorage(config, connector);
+ * await persistentStorage.initialize();
+ * await persistentStorage.get("my-key");
+ * await persistentStorage.set("my-key", { value: "my-value", expiry: Date.now() + 60_000 });
+ * await persistentStorage.delete("my-key");
+ * await persistentStorage.clear();
+ * await persistentStorage.has("my-key");
+ */
+export class PersistentStorage implements CacheStorage {
+  private readonly connector: LakebaseConnector;
+  private readonly tableName: string;
+  private readonly maxBytes: number;
+  private readonly maxEntryBytes: number;
+  private readonly evictionBatchSize: number;
+  private readonly evictionCheckProbability: number;
+  private initialized: boolean;
+
+  constructor(config: CacheConfig, connector: LakebaseConnector) {
+    this.connector = connector;
+    this.maxBytes = config.maxBytes ?? lakebaseStorageDefaults.maxBytes;
+    this.maxEntryBytes =
+      config.maxEntryBytes ?? lakebaseStorageDefaults.maxEntryBytes;
+    this.evictionBatchSize = lakebaseStorageDefaults.evictionBatchSize;
+    this.evictionCheckProbability =
+      config.evictionCheckProbability ??
+      lakebaseStorageDefaults.evictionCheckProbability;
+    this.tableName = lakebaseStorageDefaults.tableName; // hardcoded, safe for now
+    this.initialized = false;
+  }
+
+  /** Initialize the persistent storage and run migrations if necessary */
+  async initialize(): Promise<void> {
+    if (this.initialized) return;
+
+    try {
+      await this.runMigrations();
+      this.initialized = true;
+    } catch (error) {
+      console.error("Error in persistent storage initialization:", error);
+      throw error;
+    }
+  }
+
+  /**
+   * Get a cached value from the persistent storage
+   * @param key - Cache key
+   * @returns Promise of the cached value, or null if not found
+   */
+  async get<T>(key: string): Promise<CacheEntry<T> | null> {
+    await this.ensureInitialized();
+
+    const keyHash = this.hashKey(key);
+
+    const result = await this.connector.query<{
+      value: Buffer;
+      expiry: string;
+    }>(`SELECT value, expiry FROM ${this.tableName} WHERE key_hash = $1`, [
+      keyHash,
+    ]);
+
+    if (result.rows.length === 0) return null;
+
+    const entry = result.rows[0];
+
+    // fire-and-forget update
+    this.connector
+      .query(
+        `UPDATE ${this.tableName} SET last_accessed = NOW() WHERE key_hash = $1`,
+        [keyHash],
+      )
+      .catch(() => {
+        console.debug("Error updating last_accessed time for key:", key);
+      });
+
+    return {
+      value: this.deserializeValue(entry.value),
+      expiry: Number(entry.expiry),
+    };
+  }
+
+  /**
+   * Set a value in the persistent storage
+   * @param key - Cache key
+   * @param entry - Cache entry
+   * @returns Promise that resolves once the entry is written
+   */
+  async set<T>(key: string, entry: CacheEntry<T>): Promise<void> {
+    await this.ensureInitialized();
+
+    const keyHash = this.hashKey(key);
+    const keyBytes = Buffer.from(key, "utf-8");
+    const valueBytes = this.serializeValue(entry.value);
+    const byteSize = keyBytes.length + valueBytes.length;
+
+    if (byteSize > this.maxEntryBytes) {
+      throw new Error(
+        `Cache entry too large: ${byteSize} bytes exceeds maximum of ${this.maxEntryBytes} bytes`,
+      );
+    }
+
+    // probabilistic eviction check
+    if (Math.random() < this.evictionCheckProbability) {
+      const totalBytes = await this.totalBytes();
+      if (totalBytes + byteSize > this.maxBytes) {
+        await this.evictBySize(byteSize);
+      }
+    }
+
+    await this.connector.query(
+      `INSERT INTO ${this.tableName} (key_hash, key, value, byte_size, expiry, created_at, last_accessed)
+       VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
+       ON CONFLICT (key_hash)
+       DO UPDATE SET value = $3, byte_size = $4, expiry = $5, last_accessed = NOW()
+      `,
+      [keyHash, keyBytes, valueBytes, byteSize, entry.expiry],
+    );
+  }
+
+  /**
+   * Delete a value from the persistent storage
+   * @param key - Cache key
+   * @returns Promise that resolves once the entry is removed
+   */
+  async delete(key: string): Promise<void> {
+    await this.ensureInitialized();
+    const keyHash = this.hashKey(key);
+    await this.connector.query(
+      `DELETE FROM ${this.tableName} WHERE key_hash = $1`,
+      [keyHash],
+    );
+  }
+
+  /** Clear the persistent storage */
+  async clear(): Promise<void> {
+    await this.ensureInitialized();
+    await this.connector.query(`TRUNCATE TABLE ${this.tableName}`);
+  }
+
+  /**
+   * Check if a value exists in the persistent storage
+   * @param key - Cache key
+   * @returns Promise of true if the value exists, false otherwise
+   */
+  async has(key: string): Promise<boolean> {
+    await this.ensureInitialized();
+    const keyHash = this.hashKey(key);
+
+    const result = await this.connector.query<{ exists: boolean }>(
+      `SELECT EXISTS(SELECT 1 FROM ${this.tableName} WHERE key_hash = $1) as exists`,
+      [keyHash],
+    );
+
+    return result.rows[0]?.exists ?? false;
+  }
+
+  /**
+   * Get the number of entries in the persistent storage
+   * @returns Promise of the entry count
+   */
+  async size(): Promise<number> {
+    await this.ensureInitialized();
+
+    const result = await this.connector.query<{ count: string }>(
+      `SELECT COUNT(*) as count FROM ${this.tableName}`,
+    );
+    return parseInt(result.rows[0]?.count ?? "0", 10);
+  }
+
+  /** Get the total number of bytes in the persistent storage */
+  async totalBytes(): Promise<number> {
+    await this.ensureInitialized();
+
+    const result = await this.connector.query<{ total: string }>(
+      `SELECT COALESCE(SUM(byte_size), 0) as total FROM ${this.tableName}`,
+    );
+    return parseInt(result.rows[0]?.total ?? "0", 10);
+  }
+
+  /**
+   * Whether this storage is persistent
+   * @returns always true for Lakebase-backed storage
+   */
+  isPersistent(): boolean {
+    return true;
+  }
+
+  /**
+   * Check if the persistent storage is healthy
+   * @returns Promise of true if the storage is healthy, false otherwise
+   */
+  async healthCheck(): Promise<boolean> {
+    try {
+      return await this.connector.healthCheck();
+    } catch {
+      return false;
+    }
+  }
+
+  /** Close the persistent storage */
+  async close(): Promise<void> {
+    await this.connector.close();
+  }
+
+  /**
+   * Cleanup expired entries from the persistent storage
+   * @returns Promise of the number of entries removed
+   */
+  async cleanupExpired(): Promise<number> {
+    await this.ensureInitialized();
+    const result = await this.connector.query<{ count: string }>(
+      `WITH deleted as (DELETE FROM ${this.tableName} WHERE expiry < $1 RETURNING *) SELECT COUNT(*) as count FROM deleted`,
+      [Date.now()],
+    );
+    return parseInt(result.rows[0]?.count ?? "0", 10);
+  }
+
+  /** Evict entries from the persistent storage by size */
+  private async evictBySize(requiredBytes: number): Promise<void> {
+    const freedByExpiry = await this.cleanupExpired();
+    if (freedByExpiry > 0) {
+      const currentBytes = await this.totalBytes();
+      if (currentBytes + requiredBytes <= this.maxBytes) {
+        return;
+      }
+    }
+
+    await this.connector.query(
+      `DELETE FROM ${this.tableName} WHERE key_hash IN
+       (SELECT key_hash FROM ${this.tableName} ORDER BY last_accessed ASC LIMIT $1)`,
+      [this.evictionBatchSize],
+    );
+  }
+
+  /** Ensure the persistent storage is initialized */
+  private async ensureInitialized(): Promise<void> {
+    if (!this.initialized) {
+      await this.initialize();
+    }
+  }
+
+  /** Generate a 64-bit hash for the cache key using SHA256 */
+  private hashKey(key: string): bigint {
+    if (!key) throw new Error("Cache key cannot be empty");
+    const hash = createHash("sha256").update(key).digest();
+    return hash.readBigInt64BE(0);
+  }
+
+  /** Serialize a value to a buffer */
+  private serializeValue<T>(value: T): Buffer {
+    return Buffer.from(JSON.stringify(value), "utf-8");
+  }
+
+  /** Deserialize a value from a buffer */
+  private deserializeValue<T>(buffer: Buffer): T {
+    return JSON.parse(buffer.toString("utf-8")) as T;
+  }
+
+  /** Run migrations for the persistent storage */
+  private async runMigrations(): Promise<void> {
+    try {
+      await this.connector.query(`
+        CREATE TABLE IF NOT EXISTS ${this.tableName} (
+          id BIGSERIAL PRIMARY KEY,
+          key_hash BIGINT NOT NULL,
+          key BYTEA NOT NULL,
+          value BYTEA NOT NULL,
+          byte_size INTEGER NOT NULL,
+          expiry BIGINT NOT NULL,
+          created_at TIMESTAMP NOT NULL DEFAULT NOW(),
+          last_accessed TIMESTAMP NOT NULL DEFAULT NOW()
+        )
+      `);
+
+      // unique index on key_hash for fast lookups
+      await this.connector.query(
+        `CREATE UNIQUE INDEX IF NOT EXISTS idx_${this.tableName}_key_hash ON ${this.tableName} (key_hash);`,
+      );
+
+      // index on expiry for cleanup queries
+      await this.connector.query(
+        `CREATE INDEX IF NOT EXISTS idx_${this.tableName}_expiry ON ${this.tableName} (expiry);`,
+      );
+
+      // index on last_accessed for LRU eviction
+      await this.connector.query(
+        `CREATE INDEX IF NOT EXISTS idx_${this.tableName}_last_accessed ON ${this.tableName} (last_accessed);`,
+      );
+
+      // index on byte_size for monitoring
+      await this.connector.query(
+        `CREATE INDEX IF NOT EXISTS idx_${this.tableName}_byte_size ON ${this.tableName} (byte_size);`,
+      );
+    } catch (error) {
+      console.error(
+        "Error in running migrations for persistent storage:",
+        error,
+      );
+      throw error;
+    }
+  }
+}
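The `key_hash BIGINT` column works because `hashKey` folds the SHA-256 digest down to its first 8 bytes, read as a signed 64-bit integer. A standalone sketch of that derivation (the collision estimate is a back-of-envelope birthday bound, not from this diff):

```typescript
import { createHash } from "node:crypto";

// First 8 bytes of the SHA-256 digest as a signed 64-bit integer:
// fits a Postgres BIGINT, and for a 64-bit hash collisions only become
// likely on the order of 2^32 (~4 billion) distinct keys.
function hashKey(key: string): bigint {
  const digest = createHash("sha256").update(key).digest();
  return digest.readBigInt64BE(0);
}

console.log(hashKey("users:123")); // stable bigint across runs and processes
```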
"0", 10); + } + + /** Evict entries from the persistent storage by size */ + private async evictBySize(requiredBytes: number): Promise { + const freedByExpiry = await this.cleanupExpired(); + if (freedByExpiry > 0) { + const currentBytes = await this.totalBytes(); + if (currentBytes + requiredBytes <= this.maxBytes) { + return; + } + } + + await this.connector.query( + `DELETE FROM ${this.tableName} WHERE key_hash IN + (SELECT key_hash FROM ${this.tableName} ORDER BY last_accessed ASC LIMIT $1)`, + [this.evictionBatchSize], + ); + } + + /** Ensure the persistent storage is initialized */ + private async ensureInitialized(): Promise { + if (!this.initialized) { + await this.initialize(); + } + } + + /** Generate a 64-bit hash for the cache key using SHA256 */ + private hashKey(key: string): bigint { + if (!key) throw new Error("Cache key cannot be empty"); + const hash = createHash("sha256").update(key).digest(); + return hash.readBigInt64BE(0); + } + + /** Serialize a value to a buffer */ + private serializeValue(value: T): Buffer { + return Buffer.from(JSON.stringify(value), "utf-8"); + } + + /** Deserialize a value from a buffer */ + private deserializeValue(buffer: Buffer): T { + return JSON.parse(buffer.toString("utf-8")) as T; + } + + /** Run migrations for the persistent storage */ + private async runMigrations(): Promise { + try { + await this.connector.query(` + CREATE TABLE IF NOT EXISTS ${this.tableName} ( + id BIGSERIAL PRIMARY KEY, + key_hash BIGINT NOT NULL, + key BYTEA NOT NULL, + value BYTEA NOT NULL, + byte_size INTEGER NOT NULL, + expiry BIGINT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + last_accessed TIMESTAMP NOT NULL DEFAULT NOW() + ) + `); + + // unique index on key_hash for fast lookups + await this.connector.query( + `CREATE UNIQUE INDEX IF NOT EXISTS idx_${this.tableName}_key_hash ON ${this.tableName} (key_hash);`, + ); + + // index on expiry for cleanup queries + await this.connector.query( + `CREATE INDEX IF NOT EXISTS idx_${this.tableName}_expiry ON ${this.tableName} (expiry); `, + ); + + // index on last_accessed for LRU eviction + await this.connector.query( + `CREATE INDEX IF NOT EXISTS idx_${this.tableName}_last_accessed ON ${this.tableName} (last_accessed); `, + ); + + // index on byte_size for monitoring + await this.connector.query( + `CREATE INDEX IF NOT EXISTS idx_${this.tableName}_byte_size ON ${this.tableName} (byte_size); `, + ); + } catch (error) { + console.error( + "Error in running migrations for persistent storage:", + error, + ); + throw error; + } + } +} diff --git a/packages/app-kit/src/cache/tests/cache-manager.test.ts b/packages/app-kit/src/cache/tests/cache-manager.test.ts new file mode 100644 index 0000000..7fc45c5 --- /dev/null +++ b/packages/app-kit/src/cache/tests/cache-manager.test.ts @@ -0,0 +1,760 @@ +import type { CacheStorage } from "shared"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { CacheManager } from "../../index"; + +// Mock LakebaseConnector +const mockLakebaseHealthCheck = vi.fn(); +vi.mock("@/connectors", () => ({ + LakebaseConnector: vi.fn().mockImplementation(() => ({ + healthCheck: mockLakebaseHealthCheck, + close: vi.fn().mockResolvedValue(undefined), + })), +})); + +// Mock PersistentStorage +vi.mock("../storage/persistent", () => ({ + PersistentStorage: vi.fn().mockImplementation(() => { + const cache = new Map(); + return { + initialize: vi.fn().mockResolvedValue(undefined), + get: vi + .fn() + .mockImplementation(async (key: string) => cache.get(key) || 
null), + set: vi + .fn() + .mockImplementation(async (key: string, entry: any) => + cache.set(key, entry), + ), + delete: vi + .fn() + .mockImplementation(async (key: string) => cache.delete(key)), + clear: vi.fn().mockImplementation(async () => cache.clear()), + has: vi.fn().mockImplementation(async (key: string) => cache.has(key)), + size: vi.fn().mockImplementation(async () => cache.size), + isPersistent: vi.fn().mockReturnValue(true), + healthCheck: vi.fn().mockResolvedValue(true), + close: vi.fn().mockResolvedValue(undefined), + cleanupExpired: vi.fn().mockResolvedValue(0), + }; + }), +})); + +// Mock WorkspaceClient +vi.mock("@databricks/sdk-experimental", () => ({ + WorkspaceClient: vi.fn().mockImplementation(() => ({})), +})); + +/** Create a mock storage for testing */ +function createMockStorage(persistent = false): CacheStorage { + const cache = new Map(); + + return { + get: vi.fn().mockImplementation(async (key: string) => { + return cache.get(key) || null; + }), + set: vi.fn().mockImplementation(async (key: string, entry: any) => { + cache.set(key, entry); + }), + delete: vi.fn().mockImplementation(async (key: string) => { + cache.delete(key); + }), + clear: vi.fn().mockImplementation(async () => { + cache.clear(); + }), + has: vi.fn().mockImplementation(async (key: string) => { + return cache.has(key); + }), + size: vi.fn().mockImplementation(async () => { + return cache.size; + }), + isPersistent: vi.fn().mockReturnValue(persistent), + healthCheck: vi.fn().mockResolvedValue(true), + close: vi.fn().mockResolvedValue(undefined), + }; +} + +/** Create a mock storage with healthCheck returning false */ +function createUnhealthyMockStorage(): CacheStorage { + const storage = createMockStorage(); + storage.healthCheck = vi.fn().mockResolvedValue(false); + return storage; +} + +describe("CacheManager", () => { + // Reset singleton between tests + beforeEach(() => { + // Access private static fields to reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + // Default: Lakebase unavailable (most tests pass explicit storage) + mockLakebaseHealthCheck.mockResolvedValue(false); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe("singleton pattern", () => { + test("getInstanceSync should throw when not initialized", () => { + expect(() => CacheManager.getInstanceSync()).toThrow( + "CacheManager not initialized", + ); + }); + + test("getInstance should create singleton", async () => { + const instance1 = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + const instance2 = await CacheManager.getInstance(); + + expect(instance1).toBe(instance2); + }); + + test("getInstanceSync should return instance after initialization", async () => { + await CacheManager.getInstance({ storage: createMockStorage() }); + + const instance = CacheManager.getInstanceSync(); + + expect(instance).toBeInstanceOf(CacheManager); + }); + }); + + describe("generateKey", () => { + test("should generate consistent hash for same inputs", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const key1 = cache.generateKey(["users", 123], "user1"); + const key2 = cache.generateKey(["users", 123], "user1"); + + expect(key1).toBe(key2); + }); + + test("should generate different hash for different inputs", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const key1 = cache.generateKey(["users", 123], "user1"); + const key2 = 
cache.generateKey(["users", 456], "user1"); + const key3 = cache.generateKey(["users", 123], "user2"); + + expect(key1).not.toBe(key2); + expect(key1).not.toBe(key3); + expect(key2).not.toBe(key3); + }); + + test("should handle objects in key parts", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const key1 = cache.generateKey([{ filter: "active" }], "user1"); + const key2 = cache.generateKey([{ filter: "active" }], "user1"); + const key3 = cache.generateKey([{ filter: "inactive" }], "user1"); + + expect(key1).toBe(key2); + expect(key1).not.toBe(key3); + }); + }); + + describe("get/set operations", () => { + test("should return null for non-existent key", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const result = await cache.get("non-existent"); + + expect(result).toBeNull(); + }); + + test("should set and get value", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + await cache.set("test-key", { data: "test-value" }); + const result = await cache.get("test-key"); + + expect(result).toEqual({ data: "test-value" }); + }); + + test("should respect TTL expiry", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + // Set with very short TTL + await cache.set("test-key", "value", { ttl: 0.001 }); // 1ms + + // Wait for expiry + await new Promise((resolve) => setTimeout(resolve, 10)); + + const result = await cache.get("test-key"); + + expect(result).toBeNull(); + }); + }); + + describe("delete operation", () => { + test("should delete existing key", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + await cache.set("test-key", "value"); + await cache.delete("test-key"); + + const result = await cache.get("test-key"); + expect(result).toBeNull(); + }); + }); + + describe("has operation", () => { + test("should return true for existing key", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + await cache.set("test-key", "value"); + + const exists = await cache.has("test-key"); + expect(exists).toBe(true); + }); + + test("should return false for non-existent key", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const exists = await cache.has("non-existent"); + expect(exists).toBe(false); + }); + + test("should return false for expired key", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + await cache.set("test-key", "value", { ttl: 0.001 }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const exists = await cache.has("test-key"); + expect(exists).toBe(false); + }); + }); + + describe("clear operation", () => { + test("should clear all entries", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + await cache.set("key1", "value1"); + await cache.set("key2", "value2"); + + await cache.clear(); + + expect(await cache.get("key1")).toBeNull(); + expect(await cache.get("key2")).toBeNull(); + }); + }); + + describe("getOrExecute", () => { + test("should execute function on cache miss", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + const fn = vi.fn().mockResolvedValue("result"); + + const result = await cache.getOrExecute(["key"], fn, "user1"); 
+ + expect(result).toBe("result"); + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("should return cached value on cache hit", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + const fn = vi.fn().mockResolvedValue("new-result"); + + // First call - populates cache + await cache.getOrExecute(["key"], async () => "cached-result", "user1"); + + // Second call - should use cache + const result = await cache.getOrExecute(["key"], fn, "user1"); + + expect(result).toBe("cached-result"); + expect(fn).not.toHaveBeenCalled(); + }); + + test("should deduplicate concurrent requests", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + let callCount = 0; + const fn = vi.fn().mockImplementation(async () => { + callCount++; + await new Promise((resolve) => setTimeout(resolve, 50)); + return `result-${callCount}`; + }); + + // Fire multiple concurrent requests + const promises = [ + cache.getOrExecute(["key"], fn, "user1"), + cache.getOrExecute(["key"], fn, "user1"), + cache.getOrExecute(["key"], fn, "user1"), + ]; + + const results = await Promise.all(promises); + + // All should return same result + expect(results[0]).toBe(results[1]); + expect(results[1]).toBe(results[2]); + // Function should only be called once + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("should use different cache keys for different users", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + await cache.getOrExecute(["key"], async () => "user1-data", "user1"); + await cache.getOrExecute(["key"], async () => "user2-data", "user2"); + + const result1 = await cache.getOrExecute( + ["key"], + async () => "should-not-be-called", + "user1", + ); + const result2 = await cache.getOrExecute( + ["key"], + async () => "should-not-be-called", + "user2", + ); + + expect(result1).toBe("user1-data"); + expect(result2).toBe("user2-data"); + }); + }); + + describe("disabled cache", () => { + test("should bypass cache when disabled", async () => { + const cache = await CacheManager.getInstance({ + enabled: false, + storage: createMockStorage(), + }); + const fn = vi.fn().mockResolvedValue("result"); + + const result1 = await cache.getOrExecute(["key"], fn, "user1"); + const result2 = await cache.getOrExecute(["key"], fn, "user1"); + + expect(result1).toBe("result"); + expect(result2).toBe("result"); + expect(fn).toHaveBeenCalledTimes(2); // Called twice because cache is disabled + }); + + test("should return null for get when disabled", async () => { + const cache = await CacheManager.getInstance({ + enabled: false, + storage: createMockStorage(), + }); + + await cache.set("test-key", "value"); + const result = await cache.get("test-key"); + + expect(result).toBeNull(); + }); + + test("should return false for has when disabled", async () => { + const cache = await CacheManager.getInstance({ + enabled: false, + storage: createMockStorage(), + }); + + await cache.set("test-key", "value"); + const exists = await cache.has("test-key"); + + expect(exists).toBe(false); + }); + }); + + describe("storage health", () => { + test("should check storage health", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const isHealthy = await cache.isStorageHealthy(); + + expect(isHealthy).toBe(true); + }); + }); + + describe("close", () => { + test("should close storage", async () => { + const cache = await CacheManager.getInstance({ + storage: 
createMockStorage(), + }); + + await expect(cache.close()).resolves.not.toThrow(); + }); + }); + + describe("maybeCleanup", () => { + test("should not trigger cleanup for non-persistent storage", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(false), + cleanupProbability: 1, // 100% probability + }); + + // Access private method via reflection + const maybeCleanup = (cache as any).maybeCleanup.bind(cache); + const storage = (cache as any).storage; + + maybeCleanup(); + + // cleanupExpired should not exist on in-memory storage + expect(storage.isPersistent()).toBe(false); + }); + + test("should respect MIN_CLEANUP_INTERVAL_MS", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + cleanupProbability: 1, + }); + + // Simulate persistent storage + const mockStorage = { + isPersistent: vi.fn().mockReturnValue(true), + cleanupExpired: vi.fn().mockResolvedValue(5), + }; + (cache as any).storage = mockStorage; + (cache as any).lastCleanupAttempt = Date.now(); // Just cleaned up + + const maybeCleanup = (cache as any).maybeCleanup.bind(cache); + maybeCleanup(); + + // Should not trigger cleanup due to interval + expect(mockStorage.cleanupExpired).not.toHaveBeenCalled(); + }); + + test("should trigger cleanup when probability allows and interval passed", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + cleanupProbability: 1, // 100% probability + }); + + // Simulate persistent storage + const mockStorage = { + isPersistent: vi.fn().mockReturnValue(true), + cleanupExpired: vi.fn().mockResolvedValue(5), + }; + (cache as any).storage = mockStorage; + (cache as any).lastCleanupAttempt = 0; // Long time ago + + const maybeCleanup = (cache as any).maybeCleanup.bind(cache); + maybeCleanup(); + + // Should trigger cleanup + expect(mockStorage.cleanupExpired).toHaveBeenCalled(); + }); + + test("should not trigger cleanup when already in progress", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + cleanupProbability: 1, + }); + + const mockStorage = { + isPersistent: vi.fn().mockReturnValue(true), + cleanupExpired: vi.fn().mockResolvedValue(5), + }; + (cache as any).storage = mockStorage; + (cache as any).lastCleanupAttempt = 0; + (cache as any).cleanupInProgress = true; // Already running + + const maybeCleanup = (cache as any).maybeCleanup.bind(cache); + maybeCleanup(); + + expect(mockStorage.cleanupExpired).not.toHaveBeenCalled(); + }); + + test("should handle cleanup errors gracefully", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + cleanupProbability: 1, + }); + + const mockStorage = { + isPersistent: vi.fn().mockReturnValue(true), + cleanupExpired: vi.fn().mockRejectedValue(new Error("Cleanup failed")), + }; + (cache as any).storage = mockStorage; + (cache as any).lastCleanupAttempt = 0; + + const maybeCleanup = (cache as any).maybeCleanup.bind(cache); + + // Should not throw + expect(() => maybeCleanup()).not.toThrow(); + + // Wait for async cleanup to complete + await new Promise((resolve) => setTimeout(resolve, 10)); + + // cleanupInProgress should be reset + expect((cache as any).cleanupInProgress).toBe(false); + }); + }); + + describe("getOrExecute error handling", () => { + test("should propagate errors from executed function", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + const error = new Error("Execution 
failed"); + const fn = vi.fn().mockRejectedValue(error); + + await expect(cache.getOrExecute(["key"], fn, "user1")).rejects.toThrow( + "Execution failed", + ); + }); + + test("should remove in-flight request on error", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + const error = new Error("Execution failed"); + const fn = vi.fn().mockRejectedValue(error); + + try { + await cache.getOrExecute(["key"], fn, "user1"); + } catch { + // Expected + } + + // Verify in-flight request was cleaned up + const cacheKey = cache.generateKey(["key"], "user1"); + expect((cache as any).inFlightRequests.has(cacheKey)).toBe(false); + }); + + test("should allow retry after error", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + const fn = vi + .fn() + .mockRejectedValueOnce(new Error("First attempt failed")) + .mockResolvedValueOnce("success"); + + // First call fails + await expect(cache.getOrExecute(["key"], fn, "user1")).rejects.toThrow(); + + // Second call succeeds + const result = await cache.getOrExecute(["key"], fn, "user1"); + expect(result).toBe("success"); + }); + }); + + describe("strictPersistence mode", () => { + test("should disable cache when strictPersistence is true and storage unhealthy", async () => { + // Reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + // Pass an unhealthy storage with strictPersistence: true + const cache = await CacheManager.getInstance({ + storage: createUnhealthyMockStorage(), + strictPersistence: true, + }); + + // Should have logged about strictPersistence + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining("strictPersistence"), + ); + + // Cache should be disabled + const fn = vi.fn().mockResolvedValue("result"); + await cache.getOrExecute(["key"], fn, "user1"); + await cache.getOrExecute(["key"], fn, "user1"); + + // Function called twice because cache is disabled + expect(fn).toHaveBeenCalledTimes(2); + + consoleSpy.mockRestore(); + }); + }); + + describe("storage fallback", () => { + test("should fallback to in-memory when provided storage is unhealthy", async () => { + // Reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + // Pass an unhealthy storage, should fallback to in-memory + const cache = await CacheManager.getInstance({ + storage: createUnhealthyMockStorage(), + strictPersistence: false, + }); + + // Should log fallback message + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining("[Cache]"), + ); + + // Cache should still work (in-memory fallback) + await cache.set("test-key", "value"); + const result = await cache.get("test-key"); + expect(result).toBe("value"); + + consoleSpy.mockRestore(); + }); + + test("should log warning when provided storage health check fails", async () => { + // Reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + await CacheManager.getInstance({ + storage: createUnhealthyMockStorage(), + strictPersistence: false, + }); + + // Should have logged about storage health check failing + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining("[Cache] Provided storage health check failed"), 
+ ); + + consoleSpy.mockRestore(); + }); + }); + + describe("lakebase default storage", () => { + test("should use Lakebase when no storage provided and Lakebase is available", async () => { + // Reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + + // Make Lakebase healthy + mockLakebaseHealthCheck.mockResolvedValue(true); + + const cache = await CacheManager.getInstance({}); + + // Storage should be persistent (Lakebase) + const storage = (cache as any).storage; + expect(storage.isPersistent()).toBe(true); + }); + + test("should fallback to in-memory when Lakebase is unavailable", async () => { + // Reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + // Lakebase unhealthy (default in beforeEach) + mockLakebaseHealthCheck.mockResolvedValue(false); + + const cache = await CacheManager.getInstance({}); + + // Should log fallback message + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining("[Cache] Falling back to in-memory cache"), + ); + + // Cache should work (in-memory fallback) + await cache.set("test-key", "value"); + const result = await cache.get("test-key"); + expect(result).toBe("value"); + + // Storage should not be persistent + const storage = (cache as any).storage; + expect(storage.isPersistent()).toBe(false); + + consoleSpy.mockRestore(); + }); + + test("should disable cache when Lakebase unavailable and strictPersistence is true", async () => { + // Reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + // Lakebase unhealthy + mockLakebaseHealthCheck.mockResolvedValue(false); + + const cache = await CacheManager.getInstance({ + strictPersistence: true, + }); + + // Should have logged about strictPersistence + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining( + "strictPersistence enabled but lakebase unavailable", + ), + ); + + // Cache should be disabled + const fn = vi.fn().mockResolvedValue("result"); + await cache.getOrExecute(["key"], fn, "user1"); + await cache.getOrExecute(["key"], fn, "user1"); + + // Function called twice because cache is disabled + expect(fn).toHaveBeenCalledTimes(2); + + consoleSpy.mockRestore(); + }); + + test("should log warning when Lakebase health check fails", async () => { + // Reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + // Lakebase unhealthy + mockLakebaseHealthCheck.mockResolvedValue(false); + + await CacheManager.getInstance({}); + + // Should have logged about Lakebase health check + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining("[Cache] Lakebase health check failed"), + ); + + consoleSpy.mockRestore(); + }); + + test("should log warning when Lakebase throws an error", async () => { + // Reset singleton + (CacheManager as any).instance = null; + (CacheManager as any).initPromise = null; + + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + // Lakebase throws + mockLakebaseHealthCheck.mockRejectedValue( + new Error("Connection refused"), + ); + + await CacheManager.getInstance({}); + + // Should have logged about Lakebase being unavailable + expect(consoleSpy).toHaveBeenCalledWith( + 
expect.stringContaining("[Cache] Lakebase unavailable"), + ); + + consoleSpy.mockRestore(); + }); + }); +}); diff --git a/packages/app-kit/src/cache/tests/memory.test.ts b/packages/app-kit/src/cache/tests/memory.test.ts new file mode 100644 index 0000000..5f4f0f2 --- /dev/null +++ b/packages/app-kit/src/cache/tests/memory.test.ts @@ -0,0 +1,233 @@ +import { beforeEach, describe, expect, test } from "vitest"; +import { InMemoryStorage } from "../storage"; + +describe("InMemoryStorage", () => { + let storage: InMemoryStorage; + + beforeEach(() => { + storage = new InMemoryStorage({ maxSize: 5 }); + }); + + describe("basic operations", () => { + test("should set and get a value", async () => { + const entry = { value: "test-value", expiry: Date.now() + 10000 }; + await storage.set("key1", entry); + + const result = await storage.get("key1"); + + expect(result).toEqual(entry); + }); + + test("should return null for non-existent key", async () => { + const result = await storage.get("non-existent"); + + expect(result).toBeNull(); + }); + + test("should delete a value", async () => { + const entry = { value: "test-value", expiry: Date.now() + 10000 }; + await storage.set("key1", entry); + + await storage.delete("key1"); + + const result = await storage.get("key1"); + expect(result).toBeNull(); + }); + + test("should check if key exists", async () => { + const entry = { value: "test-value", expiry: Date.now() + 10000 }; + await storage.set("key1", entry); + + expect(await storage.has("key1")).toBe(true); + expect(await storage.has("non-existent")).toBe(false); + }); + + test("should return correct size", async () => { + expect(await storage.size()).toBe(0); + + await storage.set("key1", { value: "v1", expiry: Date.now() + 10000 }); + expect(await storage.size()).toBe(1); + + await storage.set("key2", { value: "v2", expiry: Date.now() + 10000 }); + expect(await storage.size()).toBe(2); + }); + + test("should clear all entries", async () => { + await storage.set("key1", { value: "v1", expiry: Date.now() + 10000 }); + await storage.set("key2", { value: "v2", expiry: Date.now() + 10000 }); + + await storage.clear(); + + expect(await storage.size()).toBe(0); + expect(await storage.get("key1")).toBeNull(); + expect(await storage.get("key2")).toBeNull(); + }); + }); + + describe("expiry handling", () => { + test("should return entry if not expired", async () => { + const entry = { value: "test-value", expiry: Date.now() + 10000 }; + await storage.set("key1", entry); + + const result = await storage.get("key1"); + + expect(result).toEqual(entry); + }); + + test("should return expired entry on get (expiry check is done by CacheManager)", async () => { + const entry = { value: "test-value", expiry: Date.now() - 1000 }; + await storage.set("key1", entry); + + // InMemoryStorage.get() returns the entry even if expired + // The expiry check is done by CacheManager + const result = await storage.get("key1"); + expect(result).toEqual(entry); + }); + + test("should return false for expired key on has()", async () => { + const entry = { value: "test-value", expiry: Date.now() - 1000 }; + await storage.set("key1", entry); + + const exists = await storage.has("key1"); + + expect(exists).toBe(false); + }); + + test("should delete expired entry when checking has()", async () => { + const entry = { value: "test-value", expiry: Date.now() - 1000 }; + await storage.set("key1", entry); + + await storage.has("key1"); + + // Entry should be deleted after has() check + expect(await storage.size()).toBe(0); + }); + }); + + 
describe("LRU eviction", () => { + test("should evict least recently used entry when full", async () => { + // Fill storage to capacity (maxSize = 5) + for (let i = 1; i <= 5; i++) { + await storage.set(`key${i}`, { + value: `value${i}`, + expiry: Date.now() + 10000, + }); + } + + expect(await storage.size()).toBe(5); + + // Add one more entry, should evict key1 (least recently used) + await storage.set("key6", { + value: "value6", + expiry: Date.now() + 10000, + }); + + expect(await storage.size()).toBe(5); + expect(await storage.get("key1")).toBeNull(); // evicted + expect(await storage.get("key6")).not.toBeNull(); // new entry exists + }); + + test("should update access order on get", async () => { + // Fill storage + for (let i = 1; i <= 5; i++) { + await storage.set(`key${i}`, { + value: `value${i}`, + expiry: Date.now() + 10000, + }); + } + + // Access key1 to make it recently used + await storage.get("key1"); + + // Add new entry, should evict key2 (now least recently used) + await storage.set("key6", { + value: "value6", + expiry: Date.now() + 10000, + }); + + expect(await storage.get("key1")).not.toBeNull(); // still exists (was accessed) + expect(await storage.get("key2")).toBeNull(); // evicted + }); + + test("should update access order on set (existing key)", async () => { + // Fill storage + for (let i = 1; i <= 5; i++) { + await storage.set(`key${i}`, { + value: `value${i}`, + expiry: Date.now() + 10000, + }); + } + + // Update key1 to make it recently used + await storage.set("key1", { + value: "updated-value1", + expiry: Date.now() + 10000, + }); + + // Add new entry, should evict key2 (now least recently used) + await storage.set("key6", { + value: "value6", + expiry: Date.now() + 10000, + }); + + expect(await storage.get("key1")).not.toBeNull(); // still exists (was updated) + expect(await storage.get("key2")).toBeNull(); // evicted + }); + + test("should not evict when updating existing key", async () => { + // Fill storage + for (let i = 1; i <= 5; i++) { + await storage.set(`key${i}`, { + value: `value${i}`, + expiry: Date.now() + 10000, + }); + } + + // Update existing key should not trigger eviction + await storage.set("key3", { + value: "updated-value3", + expiry: Date.now() + 10000, + }); + + expect(await storage.size()).toBe(5); + // All keys should still exist + for (let i = 1; i <= 5; i++) { + expect(await storage.get(`key${i}`)).not.toBeNull(); + } + }); + }); + + describe("storage properties", () => { + test("should report as non-persistent", () => { + expect(storage.isPersistent()).toBe(false); + }); + + test("should always return true for healthCheck", async () => { + expect(await storage.healthCheck()).toBe(true); + }); + + test("should clear storage on close", async () => { + await storage.set("key1", { value: "v1", expiry: Date.now() + 10000 }); + + await storage.close(); + + expect(await storage.size()).toBe(0); + }); + }); + + describe("default maxSize", () => { + test("should use default maxSize when not provided", async () => { + const defaultStorage = new InMemoryStorage({}); + + // Default is 1000, just verify it accepts many entries + for (let i = 1; i <= 100; i++) { + await defaultStorage.set(`key${i}`, { + value: `value${i}`, + expiry: Date.now() + 10000, + }); + } + + expect(await defaultStorage.size()).toBe(100); + }); + }); +}); diff --git a/packages/app-kit/src/cache/tests/persistent.test.ts b/packages/app-kit/src/cache/tests/persistent.test.ts new file mode 100644 index 0000000..2623e25 --- /dev/null +++ 
b/packages/app-kit/src/cache/tests/persistent.test.ts @@ -0,0 +1,449 @@ +import { beforeEach, describe, expect, test, vi } from "vitest"; +import { PersistentStorage } from "../storage"; + +/** Mock LakebaseConnector for testing */ +const createMockConnector = () => ({ + query: vi.fn(), + healthCheck: vi.fn().mockResolvedValue(true), + close: vi.fn().mockResolvedValue(undefined), +}); + +describe("PersistentStorage", () => { + let storage: PersistentStorage; + let mockConnector: ReturnType<typeof createMockConnector>; + + beforeEach(() => { + mockConnector = createMockConnector(); + + // Default: migrations succeed + mockConnector.query.mockResolvedValue({ rows: [] }); + + storage = new PersistentStorage( + { maxBytes: 1024 * 1024 }, // 1MB + mockConnector as any, + ); + }); + + describe("initialization", () => { + test("should run migrations on initialize", async () => { + await storage.initialize(); + + // Should create table + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("CREATE TABLE IF NOT EXISTS"), + ); + + // Should create unique index on key_hash + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("CREATE UNIQUE INDEX IF NOT EXISTS"), + ); + }); + + test("should only initialize once", async () => { + await storage.initialize(); + await storage.initialize(); + + // CREATE TABLE should only be called once (first initialization) + const createTableCalls = mockConnector.query.mock.calls.filter((call) => + call[0].includes("CREATE TABLE"), + ); + expect(createTableCalls.length).toBe(1); + }); + + test("should throw on migration error", async () => { + const consoleErrorSpy = vi + .spyOn(console, "error") + .mockImplementation(() => {}); + + mockConnector.query.mockRejectedValue(new Error("migration failed")); + + await expect(storage.initialize()).rejects.toThrow("migration failed"); + + consoleErrorSpy.mockRestore(); + }); + }); + + describe("get", () => { + beforeEach(async () => { + await storage.initialize(); + mockConnector.query.mockClear(); + }); + + test("should return cached entry", async () => { + const expiry = Date.now() + 10000; + const valueBuffer = Buffer.from( + JSON.stringify({ data: "test" }), + "utf-8", + ); + + mockConnector.query.mockResolvedValueOnce({ + rows: [{ value: valueBuffer, expiry: String(expiry) }], + }); + + const result = await storage.get("test-key"); + + expect(result).toEqual({ + value: { data: "test" }, + expiry, + }); + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("SELECT value, expiry"), + [expect.any(BigInt)], // key_hash is bigint + ); + }); + + test("should return null for non-existent key", async () => { + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + + const result = await storage.get("non-existent"); + + expect(result).toBeNull(); + }); + + test("should update last_accessed on get (fire-and-forget)", async () => { + const expiry = Date.now() + 10000; + const valueBuffer = Buffer.from( + JSON.stringify({ data: "test" }), + "utf-8", + ); + + mockConnector.query + .mockResolvedValueOnce({ + rows: [{ value: valueBuffer, expiry: String(expiry) }], + }) + .mockResolvedValue({ rows: [] }); + + await storage.get("test-key"); + + // Wait for fire-and-forget update + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("UPDATE"), + [expect.any(BigInt)], // key_hash + ); + }); + }); + + describe("set", () => { + beforeEach(async () => { + await storage.initialize();
mockConnector.query.mockClear(); + }); + + test("should insert new entry", async () => { + // Mock Math.random to skip eviction check (>= evictionCheckProbability) + const randomSpy = vi.spyOn(Math, "random").mockReturnValue(0.5); + + // INSERT succeeds + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + + await storage.set("test-key", { + value: { data: "test" }, + expiry: Date.now() + 10000, + }); + + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("INSERT INTO"), + expect.arrayContaining([ + expect.any(BigInt), // key_hash + expect.any(Buffer), // key + expect.any(Buffer), // value + expect.any(Number), // byte_size + expect.any(Number), // expiry + ]), + ); + + randomSpy.mockRestore(); + }); + + test("should evict when maxBytes exceeded", async () => { + // Mock Math.random to ensure eviction check runs (< evictionCheckProbability) + const randomSpy = vi.spyOn(Math, "random").mockReturnValue(0.05); + + // totalBytes() returns maxBytes (triggers eviction) + mockConnector.query.mockResolvedValueOnce({ + rows: [{ total: String(1024 * 1024) }], // 1MB (at limit) + }); + // cleanupExpired returns 0 + mockConnector.query.mockResolvedValueOnce({ + rows: [{ count: "0" }], + }); + // eviction DELETE succeeds + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + // INSERT succeeds + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + + await storage.set("new-key", { + value: { data: "new" }, + expiry: Date.now() + 10000, + }); + + // Should have called DELETE for LRU eviction + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("DELETE FROM"), + expect.any(Array), + ); + + randomSpy.mockRestore(); + }); + + test("should serialize value to Buffer", async () => { + // Mock Math.random to skip eviction check (>= evictionCheckProbability) + const randomSpy = vi.spyOn(Math, "random").mockReturnValue(0.5); + + // INSERT succeeds + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + + const value = { nested: { array: [1, 2, 3] } }; + await storage.set("test-key", { + value, + expiry: Date.now() + 10000, + }); + + const insertCall = mockConnector.query.mock.calls.find((call) => + call[0].includes("INSERT"), + ); + + // value is at index 2 (key_hash, key, value, ...) 
+ const valueBuffer = insertCall?.[1]?.[2] as Buffer; + expect(valueBuffer).toBeInstanceOf(Buffer); + expect(valueBuffer.toString("utf-8")).toBe(JSON.stringify(value)); + + randomSpy.mockRestore(); + }); + }); + + describe("delete", () => { + beforeEach(async () => { + await storage.initialize(); + mockConnector.query.mockClear(); + }); + + test("should delete entry by key_hash", async () => { + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + + await storage.delete("test-key"); + + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("DELETE FROM"), + [expect.any(BigInt)], // key_hash + ); + }); + }); + + describe("clear", () => { + beforeEach(async () => { + await storage.initialize(); + mockConnector.query.mockClear(); + }); + + test("should truncate table", async () => { + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + + await storage.clear(); + + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("TRUNCATE TABLE"), + ); + }); + }); + + describe("has", () => { + beforeEach(async () => { + await storage.initialize(); + mockConnector.query.mockClear(); + }); + + test("should return true when key exists", async () => { + mockConnector.query.mockResolvedValueOnce({ + rows: [{ exists: true }], + }); + + const result = await storage.has("test-key"); + + expect(result).toBe(true); + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("SELECT EXISTS"), + [expect.any(BigInt)], // key_hash + ); + }); + + test("should return false when key does not exist", async () => { + mockConnector.query.mockResolvedValueOnce({ + rows: [{ exists: false }], + }); + + const result = await storage.has("non-existent"); + + expect(result).toBe(false); + }); + + test("should return false when query returns no rows", async () => { + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + + const result = await storage.has("test-key"); + + expect(result).toBe(false); + }); + }); + + describe("size", () => { + beforeEach(async () => { + await storage.initialize(); + mockConnector.query.mockClear(); + }); + + test("should return count of entries", async () => { + mockConnector.query.mockResolvedValueOnce({ + rows: [{ count: "42" }], + }); + + const result = await storage.size(); + + expect(result).toBe(42); + }); + + test("should return 0 when empty", async () => { + mockConnector.query.mockResolvedValueOnce({ + rows: [{ count: "0" }], + }); + + const result = await storage.size(); + + expect(result).toBe(0); + }); + + test("should return 0 when no rows", async () => { + mockConnector.query.mockResolvedValueOnce({ rows: [] }); + + const result = await storage.size(); + + expect(result).toBe(0); + }); + }); + + describe("totalBytes", () => { + beforeEach(async () => { + await storage.initialize(); + mockConnector.query.mockClear(); + }); + + test("should return sum of byte_size", async () => { + mockConnector.query.mockResolvedValueOnce({ + rows: [{ total: "1048576" }], // 1MB + }); + + const result = await storage.totalBytes(); + + expect(result).toBe(1048576); + }); + + test("should return 0 when empty", async () => { + mockConnector.query.mockResolvedValueOnce({ + rows: [{ total: "0" }], + }); + + const result = await storage.totalBytes(); + + expect(result).toBe(0); + }); + }); + + describe("cleanupExpired", () => { + beforeEach(async () => { + await storage.initialize(); + mockConnector.query.mockClear(); + }); + + test("should delete expired entries", async () => { + mockConnector.query.mockResolvedValueOnce({ + rows: 
[{ count: "5" }], + }); + + const deleted = await storage.cleanupExpired(); + + expect(deleted).toBe(5); + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("DELETE FROM"), + expect.arrayContaining([expect.any(Number)]), + ); + }); + + test("should return 0 when no expired entries", async () => { + mockConnector.query.mockResolvedValueOnce({ + rows: [{ count: "0" }], + }); + + const deleted = await storage.cleanupExpired(); + + expect(deleted).toBe(0); + }); + }); + + describe("storage properties", () => { + test("should report as persistent", () => { + expect(storage.isPersistent()).toBe(true); + }); + + test("should delegate healthCheck to connector", async () => { + mockConnector.healthCheck.mockResolvedValueOnce(true); + + const result = await storage.healthCheck(); + + expect(result).toBe(true); + expect(mockConnector.healthCheck).toHaveBeenCalled(); + }); + + test("should return false on healthCheck error", async () => { + mockConnector.healthCheck.mockRejectedValueOnce(new Error("failed")); + + const result = await storage.healthCheck(); + + expect(result).toBe(false); + }); + + test("should close connector on close", async () => { + await storage.close(); + + expect(mockConnector.close).toHaveBeenCalled(); + }); + }); + + describe("auto-initialization", () => { + test("should auto-initialize on get if not initialized", async () => { + const uninitializedStorage = new PersistentStorage( + { maxBytes: 1024 * 1024 }, + mockConnector as any, + ); + + mockConnector.query.mockResolvedValue({ rows: [] }); + + await uninitializedStorage.get("test-key"); + + // Should have run migrations + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("CREATE TABLE"), + ); + }); + + test("should auto-initialize on set if not initialized", async () => { + const uninitializedStorage = new PersistentStorage( + { maxBytes: 1024 * 1024 }, + mockConnector as any, + ); + + mockConnector.query.mockResolvedValue({ rows: [] }); + + await uninitializedStorage.set("test-key", { + value: "test", + expiry: Date.now() + 10000, + }); + + // Should have run migrations + expect(mockConnector.query).toHaveBeenCalledWith( + expect.stringContaining("CREATE TABLE"), + ); + }); + }); +}); diff --git a/packages/app-kit/src/connectors/index.ts b/packages/app-kit/src/connectors/index.ts index 37f2a12..70702b4 100644 --- a/packages/app-kit/src/connectors/index.ts +++ b/packages/app-kit/src/connectors/index.ts @@ -1 +1,2 @@ +export * from "./lakebase"; export * from "./sql-warehouse"; diff --git a/packages/app-kit/src/connectors/lakebase/client.ts b/packages/app-kit/src/connectors/lakebase/client.ts new file mode 100644 index 0000000..34e57c2 --- /dev/null +++ b/packages/app-kit/src/connectors/lakebase/client.ts @@ -0,0 +1,535 @@ +import { randomUUID } from "node:crypto"; +import type { WorkspaceClient } from "@databricks/sdk-experimental"; +import { ApiClient, Config } from "@databricks/sdk-experimental"; +import pg from "pg"; +import { + type Counter, + type Histogram, + SpanStatusCode, + TelemetryManager, + type TelemetryProvider, +} from "@/telemetry"; +import { deepMerge } from "../../utils"; +import { lakebaseDefaults } from "./defaults"; +import type { + LakebaseConfig, + LakebaseConnectionConfig, + LakebaseCredentials, +} from "./types"; + +/** + * Enterprise-grade connector for Databricks Lakebase + * @example Simplest - everything from env/context + * ```typescript + * const connector = new LakebaseConnector(); + * await connector.query('SELECT * FROM users'); + * ``` + 
* + * @example With explicit connection string + * ```typescript + * const connector = new LakebaseConnector({ + *   connectionString: 'postgresql://...' + * }); + * ``` + */ +export class LakebaseConnector { + private readonly name: string = "lakebase"; + private readonly CACHE_BUFFER_MS = 2 * 60 * 1000; + private readonly config: LakebaseConfig; + private readonly connectionConfig: LakebaseConnectionConfig; + private pool: pg.Pool | null = null; + private credentials: LakebaseCredentials | null = null; + + // telemetry + private readonly telemetry: TelemetryProvider; + private readonly telemetryMetrics: { + queryCount: Counter; + queryDuration: Histogram; + }; + + constructor(userConfig?: Partial<LakebaseConfig>) { + this.config = deepMerge(lakebaseDefaults, userConfig); + this.connectionConfig = this.parseConnectionConfig(); + + this.telemetry = TelemetryManager.getProvider( + this.name, + this.config.telemetry, + ); + this.telemetryMetrics = { + queryCount: this.telemetry + .getMeter() + .createCounter("lakebase.query.count", { + description: "Total number of queries executed", + unit: "1", + }), + queryDuration: this.telemetry + .getMeter() + .createHistogram("lakebase.query.duration", { + description: "Duration of queries executed", + unit: "ms", + }), + }; + + // validate configuration + if (this.config.maxPoolSize < 1) { + throw new Error("maxPoolSize must be at least 1"); + } + } + + /** + * Execute a SQL query + * + * @example + * ```typescript + * const users = await connector.query('SELECT * FROM users'); + * const user = await connector.query('SELECT * FROM users WHERE id = $1', [123]); + * ``` + */ + async query<T extends pg.QueryResultRow = any>( + sql: string, + params?: any[], + retryCount: number = 0, + ): Promise<pg.QueryResult<T>> { + const startTime = Date.now(); + + return this.telemetry.startActiveSpan( + "lakebase.query", + { + attributes: { + "db.system": "lakebase", + "db.statement": sql.substring(0, 500), + "db.retry_count": retryCount, + }, + }, + async (span) => { + try { + const pool = await this.getPool(); + const result = await pool.query(sql, params); + span.setAttribute("db.rows_affected", result.rowCount ?? 0); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + // retry on auth failure + if (this.isAuthError(error)) { + span.addEvent("auth_error_retry"); + await this.rotateCredentials(); + const newPool = await this.getPool(); + const result = await newPool.query(sql, params); + span.setAttribute("db.rows_affected", result.rowCount ?? 0); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } + + // retry on transient errors, but only once + if (this.isTransientError(error) && retryCount < 1) { + span.addEvent("transient_error_retry"); + await new Promise((resolve) => setTimeout(resolve, 100)); + return await this.query(sql, params, retryCount + 1); + } + + span.recordException(error as Error); + span.setStatus({ code: SpanStatusCode.ERROR }); + + throw error; + } finally { + const duration = Date.now() - startTime; + this.telemetryMetrics.queryCount.add(1); + this.telemetryMetrics.queryDuration.record(duration); + span.end(); + } + }, + ); + }
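To make the retry policy above concrete: a single transient failure such as `ECONNRESET` is absorbed by the one-shot delayed retry, an auth failure (`28P01`) triggers credential rotation plus one replay, and everything else surfaces immediately. A usage sketch (the table and parameter values are made up for illustration):

```typescript
// Usage sketch: callers never see a single transient blip, but persistent
// failures still reject after the one retry allowed by retryCount < 1.
const connector = new LakebaseConnector();

try {
  // One ECONNRESET mid-flight is retried once after 100 ms; a second
  // consecutive failure propagates to this catch block.
  const { rows } = await connector.query<{ id: string }>(
    "SELECT id FROM apps WHERE creator = $1",
    ["alice@example.com"],
  );
  console.log(`fetched ${rows.length} rows`);
} catch (error) {
  // Non-retriable errors (e.g. a 42601 syntax error) arrive here untouched.
  console.error("query failed permanently", error);
}
```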
+ + /** + * Execute a transaction + * + * BEGIN, COMMIT, and ROLLBACK are issued automatically by this method; the callback should contain only its own statements. + * + * @param callback - Callback function to execute within the transaction context + * @example + * ```typescript + * await connector.transaction(async (client) => { + *   await client.query('INSERT INTO accounts (name) VALUES ($1)', ['Alice']); + *   await client.query('INSERT INTO logs (action) VALUES ($1)', ['Created Alice']); + * }); + * ``` + */ + async transaction<T>( + callback: (client: pg.PoolClient) => Promise<T>, + retryCount: number = 0, + ): Promise<T> { + const startTime = Date.now(); + return this.telemetry.startActiveSpan( + "lakebase.transaction", + { + attributes: { + "db.system": "lakebase", + "db.retry_count": retryCount, + }, + }, + async (span) => { + const pool = await this.getPool(); + const client = await pool.connect(); + let clientReleased = false; + try { + await client.query("BEGIN"); + const result = await callback(client); + await client.query("COMMIT"); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + try { + await client.query("ROLLBACK"); + } catch {} + // retry on auth failure + if (this.isAuthError(error)) { + span.addEvent("auth_error_retry"); + client.release(); + clientReleased = true; + await this.rotateCredentials(); + const newPool = await this.getPool(); + const retryClient = await newPool.connect(); + try { + await retryClient.query("BEGIN"); + const result = await callback(retryClient); + await retryClient.query("COMMIT"); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (retryError) { + try { + await retryClient.query("ROLLBACK"); + } catch {} + throw retryError; + } finally { + retryClient.release(); + } + } + + // retry on transient errors, but only once + if (this.isTransientError(error) && retryCount < 1) { + span.addEvent("transaction_error_retry"); + client.release(); + clientReleased = true; + await new Promise((resolve) => setTimeout(resolve, 100)); + return await this.transaction(callback, retryCount + 1); + } + span.recordException(error as Error); + span.setStatus({ code: SpanStatusCode.ERROR }); + throw error; + } finally { + if (!clientReleased) { + client.release(); + } + const duration = Date.now() - startTime; + this.telemetryMetrics.queryCount.add(1); + this.telemetryMetrics.queryDuration.record(duration); + span.end(); + } + }, + ); + } + + /** Check if database connection is healthy */ + async healthCheck(): Promise<boolean> { + return this.telemetry.startActiveSpan( + "lakebase.healthCheck", + {}, + async (span) => { + try { + const result = await this.query<{ result: number }>( + "SELECT 1 as result", + ); + const healthy = result.rows[0]?.result === 1; + span.setAttribute("db.healthy", healthy); + span.setStatus({ code: SpanStatusCode.OK }); + return healthy; + } catch { + span.setAttribute("db.healthy", false); + span.setStatus({ code: SpanStatusCode.ERROR }); + return false; + } finally { + span.end(); + } + }, + ); + } + + /** Close connection pool (call on shutdown) */ + async close(): Promise<void> { + if (this.pool) { + await this.pool.end().catch((error: unknown) => { + console.error("Error closing connection pool:", error); + }); + this.pool = null; + } + this.credentials = null; + } + + /** Register graceful shutdown handlers that close the connection pool */ + shutdown(): void { + process.on("SIGTERM", () => this.close()); + process.on("SIGINT", () => this.close()); + } + + /** Get Databricks workspace client - from config or request context */ + private getWorkspaceClient(): WorkspaceClient { + if (this.config.workspaceClient) { + return this.config.workspaceClient; + } + + try { + const { getRequestContext } = require("../../utils"); + const { serviceDatabricksClient } = getRequestContext(); + + // cache it for subsequent calls + this.config.workspaceClient = serviceDatabricksClient; + return serviceDatabricksClient; + } catch (_error) { + throw new Error( + "Databricks workspace client not available. Either pass it in config or use within App Kit request context.", + ); + } + }
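The credential plumbing that follows caches the fetched OAuth token and proactively refreshes it two minutes before expiry (`CACHE_BUFFER_MS`), so a pool is never built on a token that is about to lapse. A worked restatement of the reuse check in `getCredentials` (sketch only, mirroring the constant defined earlier in the class):

```typescript
// Sketch: the reuse predicate applied by getCredentials() below.
const CACHE_BUFFER_MS = 2 * 60 * 1000;

function canReuse(creds: { expiresAt: number }, now: number): boolean {
  // reuse only while we are clear of the two-minute safety window
  return now < creds.expiresAt - CACHE_BUFFER_MS;
}

// Example: a token expiring at 12:00:00 stops being reused at 11:58:00,
// so a fresh token is fetched before in-flight work can hit 28P01.
const expiresAt = Date.parse("2025-01-01T12:00:00Z");
console.log(canReuse({ expiresAt }, Date.parse("2025-01-01T11:57:59Z"))); // true
console.log(canReuse({ expiresAt }, Date.parse("2025-01-01T11:58:00Z"))); // false
```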
+ + /** Get or create connection pool */ + private async getPool(): Promise<pg.Pool> { + if (!this.connectionConfig) { + throw new Error( + "Lakebase connection not configured. " + + "Set PGHOST, PGDATABASE, PGAPPNAME env vars, provide a connectionString, or pass explicit config.", + ); + } + + if (!this.pool) { + const creds = await this.getCredentials(); + this.pool = this.createPool(creds); + } + return this.pool; + } + + /** Create PostgreSQL pool */ + private createPool(credentials: { + username: string; + password: string; + }): pg.Pool { + const { host, database, port, sslMode } = this.connectionConfig; + + const pool = new pg.Pool({ + host, + port, + database, + user: credentials.username, + password: credentials.password, + max: this.config.maxPoolSize, + idleTimeoutMillis: this.config.idleTimeoutMs, + connectionTimeoutMillis: this.config.connectionTimeoutMs, + ssl: sslMode === "require" ? { rejectUnauthorized: true } : false, + }); + + pool.on("error", (error: Error & { code?: string }) => { + console.error("Connection pool error:", error.message, { + code: error.code, + }); + }); + + return pool; + } + + /** Get or fetch credentials with caching */ + private async getCredentials(): Promise<{ + username: string; + password: string; + }> { + const now = Date.now(); + + // return cached if still valid + if ( + this.credentials && + now < this.credentials.expiresAt - this.CACHE_BUFFER_MS + ) { + return this.credentials; + } + + // fetch new credentials + const username = await this.fetchUsername(); + const { token, expiresAt } = await this.fetchPassword(); + + this.credentials = { + username, + password: token, + expiresAt, + }; + + return { username, password: token }; + } + + /** Rotate credentials and recreate pool */ + private async rotateCredentials(): Promise<void> { + // clear cached credentials + this.credentials = null; + + if (this.pool) { + const oldPool = this.pool; + this.pool = null; + oldPool.end().catch((error: unknown) => { + console.error( + "Error closing old connection pool during rotation:", + error, + ); + }); + } + } + + /** Fetch username from Databricks */ + private async fetchUsername(): Promise<string> { + const workspaceClient = this.getWorkspaceClient(); + const user = await workspaceClient.currentUser.me(); + if (!user.userName) { + throw new Error("Failed to get current user from Databricks workspace"); + } + return user.userName; + } + + /** Fetch password (OAuth token) from Databricks */ + private async fetchPassword(): Promise<{ token: string; expiresAt: number }> { + const workspaceClient = this.getWorkspaceClient(); + const config = new Config({ host: workspaceClient.config.host }); + const apiClient = new ApiClient(config); + + if (!this.connectionConfig.appName) { + throw new Error(`Database app name not found in connection config`); + } + + const credentials = await apiClient.request({ + path: `/api/2.0/database/credentials`, + method: "POST", + headers: new Headers(), + raw: false, + payload: { + instance_names: [this.connectionConfig.appName], + request_id: randomUUID(), + }, + }); + + if (!this.validateCredentials(credentials)) { + throw new Error( + `Failed to generate credentials for instance: ${this.connectionConfig.appName}`, + ); + } + + const expiresAt = new
Date(credentials.expiration_time).getTime(); + + return { token: credentials.token, expiresAt }; + } + + /** Check if error is auth failure */ + private isAuthError(error: unknown): boolean { + return ( + typeof error === "object" && + error !== null && + "code" in error && + (error as any).code === "28P01" + ); + } + + /** Check if error is transient */ + private isTransientError(error: unknown): boolean { + if (typeof error !== "object" || error === null || !("code" in error)) { + return false; + } + + const code = (error as any).code; + return ( + code === "ECONNRESET" || + code === "ECONNREFUSED" || + code === "ETIMEDOUT" || + code === "57P01" || // admin_shutdown + code === "57P03" || // cannot_connect_now + code === "08006" || // connection_failure + code === "08003" || // connection_does_not_exist + code === "08000" // connection_exception + ); + } + + /** Type guard for credentials */ + private validateCredentials( + value: unknown, + ): value is { token: string; expiration_time: string } { + if (typeof value !== "object" || value === null) { + return false; + } + + const credentials = value as { token: string; expiration_time: string }; + return ( + "token" in credentials && + typeof credentials.token === "string" && + "expiration_time" in credentials && + typeof credentials.expiration_time === "string" && + new Date(credentials.expiration_time).getTime() > Date.now() + ); + } + + /** Parse connection configuration from config or environment */ + private parseConnectionConfig(): LakebaseConnectionConfig { + if (this.config.connectionString) { + return this.parseConnectionString(this.config.connectionString); + } + + // get connection from config + if (this.config.host && this.config.database && this.config.appName) { + return { + host: this.config.host, + database: this.config.database, + port: this.config.port ?? 5432, + sslMode: this.config.sslMode ?? "require", + appName: this.config.appName, + }; + } + + // get connection from environment variables + const pgHost = process.env.PGHOST; + const pgDatabase = process.env.PGDATABASE; + const pgAppName = process.env.PGAPPNAME; + if (!pgHost || !pgDatabase || !pgAppName) { + throw new Error( + "Lakebase connection not configured. Required env vars: PGHOST, PGDATABASE, PGAPPNAME. " + + "Optional: PGPORT (default: 5432), PGSSLMODE (default: require).", + ); + } + const pgPort = process.env.PGPORT; + const port = pgPort ? parseInt(pgPort, 10) : 5432; + + if (Number.isNaN(port)) { + throw new Error(`Invalid port: ${pgPort}. Must be a number.`); + } + + const pgSSLMode = process.env.PGSSLMODE; + const sslMode = + (pgSSLMode as "require" | "disable" | "prefer") || "require"; + + return { + host: pgHost, + database: pgDatabase, + port, + sslMode, + appName: pgAppName, + }; + } + + private parseConnectionString( + connectionString: string, + ): LakebaseConnectionConfig { + const url = new URL(connectionString); + const appName = url.searchParams.get("appName"); + if (!appName) { + throw new Error("Connection string must include appName parameter"); + } + + return { + host: url.hostname, + database: url.pathname.slice(1), // remove leading slash + port: url.port ? parseInt(url.port, 10) : 5432, + sslMode: + (url.searchParams.get("sslmode") as "require" | "disable" | "prefer") ?? 
+ "require", + appName: appName, + }; + } +} diff --git a/packages/app-kit/src/connectors/lakebase/defaults.ts b/packages/app-kit/src/connectors/lakebase/defaults.ts new file mode 100644 index 0000000..c025334 --- /dev/null +++ b/packages/app-kit/src/connectors/lakebase/defaults.ts @@ -0,0 +1,10 @@ +import type { LakebaseConfig } from "./types"; + +/** Default configuration for Lakebase connector */ +export const lakebaseDefaults: LakebaseConfig = { + port: 5432, + sslMode: "require", + maxPoolSize: 10, + idleTimeoutMs: 30_000, + connectionTimeoutMs: 10_000, +}; diff --git a/packages/app-kit/src/connectors/lakebase/index.ts b/packages/app-kit/src/connectors/lakebase/index.ts new file mode 100644 index 0000000..fe028c4 --- /dev/null +++ b/packages/app-kit/src/connectors/lakebase/index.ts @@ -0,0 +1,2 @@ +export { LakebaseConnector } from "./client"; +export type { LakebaseConfig } from "./types"; diff --git a/packages/app-kit/src/connectors/lakebase/types.ts b/packages/app-kit/src/connectors/lakebase/types.ts new file mode 100644 index 0000000..a8591b5 --- /dev/null +++ b/packages/app-kit/src/connectors/lakebase/types.ts @@ -0,0 +1,65 @@ +import type { WorkspaceClient } from "@databricks/sdk-experimental"; +import type { TelemetryOptions } from "shared"; + +/** Configuration for LakebaseConnector */ +export interface LakebaseConfig { + /** Databricks workspace client */ + workspaceClient?: WorkspaceClient; + + /** Connection string */ + connectionString?: string; + + /** Database host (e.g., instance-uuid.database.region.databricks.com) */ + host?: string; + + /** Database name */ + database?: string; + + /** Database port */ + port: number; + + /** App name */ + appName?: string; + + /** SSL mode */ + sslMode: "require" | "disable" | "prefer"; + + /** Maximum number of connections in the pool */ + maxPoolSize: number; + + /** Close idle connections after this time (milliseconds) */ + idleTimeoutMs: number; + + /** Connection timeout (milliseconds) */ + connectionTimeoutMs: number; + + /** Telemetry configuration */ + telemetry?: TelemetryOptions; + + /** Additional configuration options */ + [key: string]: unknown; +} + +/** Lakebase credentials for authentication */ +export interface LakebaseCredentials { + /** Username */ + username: string; + /** Password */ + password: string; + /** Expires at */ + expiresAt: number; +} + +/** Internal connection configuration */ +export interface LakebaseConnectionConfig { + /** Database host */ + readonly host: string; + /** Database name */ + readonly database: string; + /** Database port */ + readonly port: number; + /** SSL mode */ + readonly sslMode: "require" | "disable" | "prefer"; + /** App name */ + readonly appName?: string; +} diff --git a/packages/app-kit/src/connectors/lakebase/utils.ts b/packages/app-kit/src/connectors/lakebase/utils.ts new file mode 100644 index 0000000..e7d79c1 --- /dev/null +++ b/packages/app-kit/src/connectors/lakebase/utils.ts @@ -0,0 +1,92 @@ +import type { LakebaseConnectionConfig } from "./types"; + +export interface ParsedConnectionString { + connectionParams: LakebaseConnectionConfig; + originalConnectionString: string; +} + +/** Parse connection string or environment variables */ +export function parseConnectionString( + connectionStringOrHost: string, + database?: string, + port?: number, +): ParsedConnectionString { + if ( + connectionStringOrHost.startsWith("postgresql://") || + connectionStringOrHost.startsWith("postgres://") + ) { + // parse full connection string + const cleanedString = 
connectionStringOrHost.replace( + /:?\$\{PGPASSWORD\}@/, + "@", + ); + const url = new URL(cleanedString); + + if (url.protocol !== "postgresql:" && url.protocol !== "postgres:") { + throw new Error( + `Invalid connection string protocol: ${url.protocol}. Expected postgresql: or postgres:`, + ); + } + + if (!url.hostname) { + throw new Error("Connection string must include a hostname"); + } + + const dbName = url.pathname.slice(1) || "databricks_postgres"; + const sslMode = + (url.searchParams.get("sslmode") as "require" | "disable" | "prefer") || + "require"; + + const connectionParams: LakebaseConnectionConfig = { + host: url.hostname, + database: dbName, + port: url.port ? parseInt(url.port, 10) : 5432, + sslMode, + }; + + return { + connectionParams, + originalConnectionString: connectionStringOrHost, + }; + } + + if (!database) { + throw new Error( + "Database name is required when using hostname directly (PGHOST format)", + ); + } + + const connectionParams: LakebaseConnectionConfig = { + host: connectionStringOrHost, + database, + port: port || 5432, + sslMode: "require", + }; + + return { + connectionParams, + originalConnectionString: `postgresql://:@${connectionStringOrHost}:${port || 5432}/${database}`, + }; +} + +/** Parse connection configuration from environment variables */ +export function parseFromEnv(): LakebaseConnectionConfig { + const host = process.env.PGHOST; + const database = process.env.PGDATABASE; + const port = process.env.PGPORT ? parseInt(process.env.PGPORT, 10) : 5432; + + if (!host) { + throw new Error("PGHOST environment variable is required"); + } + + if (!database) { + throw new Error("PGDATABASE environment variable is required"); + } + + return { + host, + database, + port, + sslMode: "require", + }; +} diff --git a/packages/app-kit/src/connectors/sql-warehouse/client.ts b/packages/app-kit/src/connectors/sql-warehouse/client.ts index 3caa6f2..e7afe28 100644 --- a/packages/app-kit/src/connectors/sql-warehouse/client.ts +++ b/packages/app-kit/src/connectors/sql-warehouse/client.ts @@ -3,48 +3,54 @@ import { type sql, type WorkspaceClient, } from "@databricks/sdk-experimental"; -import type { ITelemetry } from "../../telemetry"; +import type { TelemetryOptions } from "shared"; +import type { TelemetryProvider } from "../../telemetry"; import { type Counter, type Histogram, - type Meter, type Span, SpanKind, SpanStatusCode, + TelemetryManager, } from "../../telemetry"; import { executeStatementDefaults } from "./defaults"; export interface SQLWarehouseConfig { timeout?: number; - telemetry: ITelemetry; + telemetry?: TelemetryOptions; } export class SQLWarehouseConnector { - private static readonly TELEMETRY_INSTRUMENT_CONFIG = { - name: "sql-warehouse", - includePrefix: true, - }; + private readonly name = "sql-warehouse"; private config: SQLWarehouseConfig; - private meter: Meter; - private queryCounter: Counter; - private queryDuration: Histogram; + // telemetry + private readonly telemetry: TelemetryProvider; + private readonly telemetryMetrics: { + queryCount: Counter; + queryDuration: Histogram; + }; constructor(config: SQLWarehouseConfig) { this.config = config; - this.meter = this.config.telemetry.getMeter( - SQLWarehouseConnector.TELEMETRY_INSTRUMENT_CONFIG, - ); - this.queryCounter = this.meter.createCounter("db.query.count", { - description: "Total number of database queries", - unit: "1", - }); - this.queryDuration = this.meter.createHistogram("db.query.duration", { - description: "Duration of database queries", - unit: "ms", - }); + 
this.telemetry = TelemetryManager.getProvider( + this.name, + this.config.telemetry, + ); + this.telemetryMetrics = { + queryCount: this.telemetry.getMeter().createCounter("query.count", { + description: "Total number of queries executed", + unit: "1", + }), + queryDuration: this.telemetry + .getMeter() + .createHistogram("query.duration", { + description: "Duration of queries executed", + unit: "ms", + }), + }; } async executeStatement( @@ -55,7 +61,7 @@ export class SQLWarehouseConnector { const startTime = Date.now(); let success = false; - return this.config.telemetry.startActiveSpan( + return this.telemetry.startActiveSpan( "sql.query", { kind: SpanKind.CLIENT, @@ -185,11 +191,11 @@ export class SQLWarehouseConnector { success: success.toString(), }; - this.queryCounter.add(1, attributes); - this.queryDuration.record(duration, attributes); + this.telemetryMetrics.queryCount.add(1, attributes); + this.telemetryMetrics.queryDuration.record(duration, attributes); } }, - SQLWarehouseConnector.TELEMETRY_INSTRUMENT_CONFIG, + { name: this.name, includePrefix: true }, ); } @@ -199,7 +205,7 @@ export class SQLWarehouseConnector { timeout = executeStatementDefaults.timeout, signal?: AbortSignal, ) { - return this.config.telemetry.startActiveSpan( + return this.telemetry.startActiveSpan( "sql.poll", { attributes: { @@ -303,7 +309,7 @@ export class SQLWarehouseConnector { span.end(); } }, - SQLWarehouseConnector.TELEMETRY_INSTRUMENT_CONFIG, + { name: this.name, includePrefix: true }, ); } diff --git a/packages/app-kit/src/connectors/tests/lakebase.test.ts b/packages/app-kit/src/connectors/tests/lakebase.test.ts new file mode 100644 index 0000000..22322ac --- /dev/null +++ b/packages/app-kit/src/connectors/tests/lakebase.test.ts @@ -0,0 +1,494 @@ +import { beforeEach, describe, expect, test, vi } from "vitest"; +import { LakebaseConnector } from "../lakebase"; + +// Mock pg module +vi.mock("pg", () => { + const mockQuery = vi.fn(); + const mockConnect = vi.fn(); + const mockEnd = vi.fn().mockResolvedValue(undefined); + const mockRelease = vi.fn(); + const mockOn = vi.fn(); + + const MockPool = vi.fn(() => ({ + query: mockQuery, + connect: mockConnect, + end: mockEnd, + on: mockOn, + })); + + return { + default: { Pool: MockPool }, + Pool: MockPool, + __mockQuery: mockQuery, + __mockConnect: mockConnect, + __mockEnd: mockEnd, + __mockRelease: mockRelease, + __mockOn: mockOn, + __MockPool: MockPool, + }; +}); + +// Mock Databricks SDK +vi.mock("@databricks/sdk-experimental", () => { + const mockMe = vi.fn(); + const mockRequest = vi.fn(); + + const MockWorkspaceClient = vi.fn(() => ({ + currentUser: { me: mockMe }, + config: { host: "https://test.databricks.com" }, + })); + + const MockApiClient = vi.fn(() => ({ + request: mockRequest, + })); + + const MockConfig = vi.fn(() => ({})); + + return { + WorkspaceClient: MockWorkspaceClient, + ApiClient: MockApiClient, + Config: MockConfig, + __mockMe: mockMe, + __mockRequest: mockRequest, + __MockWorkspaceClient: MockWorkspaceClient, + __MockApiClient: MockApiClient, + }; +}); + +describe("LakebaseConnector", () => { + beforeEach(() => { + vi.clearAllMocks(); + // Set required env vars + process.env.PGHOST = "test-host.databricks.com"; + process.env.PGDATABASE = "test-db"; + process.env.PGAPPNAME = "test-app"; + }); + + describe("configuration", () => { + test("should throw error when maxPoolSize is less than 1", () => { + expect( + () => + new LakebaseConnector({ + maxPoolSize: 0, + workspaceClient: {} as any, + }), + ).toThrow("maxPoolSize must be at 
least 1"); + }); + + test("should create connector with valid config", () => { + const connector = new LakebaseConnector({ + workspaceClient: {} as any, + }); + + expect(connector).toBeInstanceOf(LakebaseConnector); + }); + + test("should throw when env vars are missing", () => { + delete process.env.PGHOST; + delete process.env.PGDATABASE; + delete process.env.PGAPPNAME; + + expect(() => new LakebaseConnector()).toThrow( + "Lakebase connection not configured", + ); + }); + + test("should throw when PGPORT is invalid", () => { + process.env.PGPORT = "invalid"; + + expect(() => new LakebaseConnector()).toThrow("Invalid port"); + }); + + test("should parse env vars correctly", () => { + process.env.PGPORT = "5433"; + process.env.PGSSLMODE = "disable"; + + const connector = new LakebaseConnector(); + + expect(connector).toBeInstanceOf(LakebaseConnector); + }); + + test("should use explicit config over env vars", () => { + const connector = new LakebaseConnector({ + host: "explicit-host.databricks.com", + database: "explicit-db", + appName: "explicit-app", + port: 5434, + sslMode: "prefer", + workspaceClient: {} as any, + }); + + expect(connector).toBeInstanceOf(LakebaseConnector); + }); + }); + + describe("query", () => { + let connector: LakebaseConnector; + let mockQuery: ReturnType; + let mockMe: ReturnType; + let mockRequest: ReturnType; + + beforeEach(async () => { + const pg = await import("pg"); + const sdk = await import("@databricks/sdk-experimental"); + + mockQuery = (pg as any).__mockQuery; + mockMe = (sdk as any).__mockMe; + mockRequest = (sdk as any).__mockRequest; + + // Setup default mocks + mockMe.mockResolvedValue({ userName: "test-user@example.com" }); + mockRequest.mockResolvedValue({ + token: "test-oauth-token", + expiration_time: new Date(Date.now() + 3600000).toISOString(), // 1 hour from now + }); + mockQuery.mockResolvedValue({ rows: [{ result: 1 }] }); + + connector = new LakebaseConnector({ + workspaceClient: { + currentUser: { me: mockMe }, + config: { host: "https://test.databricks.com" }, + } as any, + }); + }); + + test("should execute query successfully", async () => { + const result = await connector.query("SELECT 1 as result"); + + expect(result.rows).toEqual([{ result: 1 }]); + expect(mockQuery).toHaveBeenCalledWith("SELECT 1 as result", undefined); + }); + + test("should execute query with parameters", async () => { + mockQuery.mockResolvedValue({ rows: [{ id: 1, name: "test" }] }); + + const result = await connector.query( + "SELECT * FROM users WHERE id = $1", + [1], + ); + + expect(result.rows).toEqual([{ id: 1, name: "test" }]); + expect(mockQuery).toHaveBeenCalledWith( + "SELECT * FROM users WHERE id = $1", + [1], + ); + }); + + test("should retry on auth error (28P01)", async () => { + const authError = new Error("auth failed") as any; + authError.code = "28P01"; + + mockQuery + .mockRejectedValueOnce(authError) + .mockResolvedValue({ rows: [{ result: 1 }] }); + + const result = await connector.query("SELECT 1"); + + expect(result.rows).toEqual([{ result: 1 }]); + expect(mockQuery).toHaveBeenCalledTimes(2); + }); + + test("should retry once on transient error", async () => { + const transientError = new Error("connection reset") as any; + transientError.code = "ECONNRESET"; + + mockQuery + .mockRejectedValueOnce(transientError) + .mockResolvedValue({ rows: [{ result: 1 }] }); + + const result = await connector.query("SELECT 1"); + + expect(result.rows).toEqual([{ result: 1 }]); + expect(mockQuery).toHaveBeenCalledTimes(2); + }); + + test("should not 
retry transient error more than once", async () => { + const transientError = new Error("connection reset") as any; + transientError.code = "ECONNRESET"; + + mockQuery.mockRejectedValue(transientError); + + await expect(connector.query("SELECT 1")).rejects.toThrow( + "connection reset", + ); + expect(mockQuery).toHaveBeenCalledTimes(2); + }); + + test("should throw non-retriable errors immediately", async () => { + const syntaxError = new Error("syntax error") as any; + syntaxError.code = "42601"; + + mockQuery.mockRejectedValue(syntaxError); + + await expect(connector.query("SELEC 1")).rejects.toThrow("syntax error"); + expect(mockQuery).toHaveBeenCalledTimes(1); + }); + }); + + describe("transaction", () => { + let connector: LakebaseConnector; + let mockConnect: ReturnType<typeof vi.fn>; + let mockMe: ReturnType<typeof vi.fn>; + let mockRequest: ReturnType<typeof vi.fn>; + + beforeEach(async () => { + const pg = await import("pg"); + const sdk = await import("@databricks/sdk-experimental"); + + mockConnect = (pg as any).__mockConnect; + mockMe = (sdk as any).__mockMe; + mockRequest = (sdk as any).__mockRequest; + + mockMe.mockResolvedValue({ userName: "test-user@example.com" }); + mockRequest.mockResolvedValue({ + token: "test-oauth-token", + expiration_time: new Date(Date.now() + 3600000).toISOString(), + }); + + const mockClient = { + query: vi.fn().mockResolvedValue({ rows: [] }), + release: vi.fn(), + }; + mockConnect.mockResolvedValue(mockClient); + + connector = new LakebaseConnector({ + workspaceClient: { + currentUser: { me: mockMe }, + config: { host: "https://test.databricks.com" }, + } as any, + }); + }); + + test("should execute transaction successfully", async () => { + const result = await connector.transaction(async (client) => { + await client.query("BEGIN"); + await client.query("INSERT INTO test VALUES (1)"); + await client.query("COMMIT"); + return "success"; + }); + + expect(result).toBe("success"); + }); + + test("should release client after transaction", async () => { + const mockClient = { + query: vi.fn().mockResolvedValue({ rows: [] }), + release: vi.fn(), + }; + mockConnect.mockResolvedValue(mockClient); + + await connector.transaction(async (client) => { + await client.query("SELECT 1"); + return "done"; + }); + + expect(mockClient.release).toHaveBeenCalled(); + }); + }); + + describe("healthCheck", () => { + let connector: LakebaseConnector; + let mockQuery: ReturnType<typeof vi.fn>; + let mockMe: ReturnType<typeof vi.fn>; + let mockRequest: ReturnType<typeof vi.fn>; + + beforeEach(async () => { + const pg = await import("pg"); + const sdk = await import("@databricks/sdk-experimental"); + + mockQuery = (pg as any).__mockQuery; + mockMe = (sdk as any).__mockMe; + mockRequest = (sdk as any).__mockRequest; + + mockMe.mockResolvedValue({ userName: "test-user@example.com" }); + mockRequest.mockResolvedValue({ + token: "test-oauth-token", + expiration_time: new Date(Date.now() + 3600000).toISOString(), + }); + + connector = new LakebaseConnector({ + workspaceClient: { + currentUser: { me: mockMe }, + config: { host: "https://test.databricks.com" }, + } as any, + }); + }); + + test("should return true when database is healthy", async () => { + mockQuery.mockResolvedValue({ rows: [{ result: 1 }] }); + + const isHealthy = await connector.healthCheck(); + + expect(isHealthy).toBe(true); + }); + + test("should return false when database is unhealthy", async () => { + mockQuery.mockRejectedValue(new Error("connection failed")); + + const isHealthy = await connector.healthCheck(); + + expect(isHealthy).toBe(false); + }); + + test("should return false
when result is unexpected", async () => { + mockQuery.mockResolvedValue({ rows: [{ result: 0 }] }); + + const isHealthy = await connector.healthCheck(); + + expect(isHealthy).toBe(false); + }); + }); + + describe("close", () => { + let connector: LakebaseConnector; + let mockEnd: ReturnType<typeof vi.fn>; + let mockQuery: ReturnType<typeof vi.fn>; + let mockMe: ReturnType<typeof vi.fn>; + let mockRequest: ReturnType<typeof vi.fn>; + + beforeEach(async () => { + const pg = await import("pg"); + const sdk = await import("@databricks/sdk-experimental"); + + mockEnd = (pg as any).__mockEnd; + mockQuery = (pg as any).__mockQuery; + mockMe = (sdk as any).__mockMe; + mockRequest = (sdk as any).__mockRequest; + + mockMe.mockResolvedValue({ userName: "test-user@example.com" }); + mockRequest.mockResolvedValue({ + token: "test-oauth-token", + expiration_time: new Date(Date.now() + 3600000).toISOString(), + }); + mockQuery.mockResolvedValue({ rows: [{ result: 1 }] }); + mockEnd.mockResolvedValue(undefined); + + connector = new LakebaseConnector({ + workspaceClient: { + currentUser: { me: mockMe }, + config: { host: "https://test.databricks.com" }, + } as any, + }); + }); + + test("should close connection pool", async () => { + // Initialize pool by making a query + await connector.query("SELECT 1"); + + await connector.close(); + + expect(mockEnd).toHaveBeenCalled(); + }); + + test("should handle close when pool not initialized", async () => { + // Don't make any queries, pool is not initialized + await expect(connector.close()).resolves.not.toThrow(); + }); + }); + + describe("credentials", () => { + let mockMe: ReturnType<typeof vi.fn>; + let mockRequest: ReturnType<typeof vi.fn>; + let mockQuery: ReturnType<typeof vi.fn>; + + beforeEach(async () => { + const pg = await import("pg"); + const sdk = await import("@databricks/sdk-experimental"); + + mockQuery = (pg as any).__mockQuery; + mockMe = (sdk as any).__mockMe; + mockRequest = (sdk as any).__mockRequest; + + mockQuery.mockResolvedValue({ rows: [{ result: 1 }] }); + }); + + test("should throw when username cannot be fetched", async () => { + mockMe.mockResolvedValue({ userName: null }); + mockRequest.mockResolvedValue({ token: "test-token" }); + + const connector = new LakebaseConnector({ + workspaceClient: { + currentUser: { me: mockMe }, + config: { host: "https://test.databricks.com" }, + } as any, + }); + + await expect(connector.query("SELECT 1")).rejects.toThrow( + "Failed to get current user", + ); + }); + + test("should throw when token cannot be fetched", async () => { + mockMe.mockResolvedValue({ userName: "test-user@example.com" }); + mockRequest.mockResolvedValue({ error: "unauthorized" }); // missing token and expiration_time + + const connector = new LakebaseConnector({ + workspaceClient: { + currentUser: { me: mockMe }, + config: { host: "https://test.databricks.com" }, + } as any, + }); + + await expect(connector.query("SELECT 1")).rejects.toThrow( + "Failed to generate credentials", + ); + }); + }); + + describe("transient error codes", () => { + let connector: LakebaseConnector; + let mockQuery: ReturnType<typeof vi.fn>; + let mockMe: ReturnType<typeof vi.fn>; + let mockRequest: ReturnType<typeof vi.fn>; + + beforeEach(async () => { + const pg = await import("pg"); + const sdk = await import("@databricks/sdk-experimental"); + + mockQuery = (pg as any).__mockQuery; + mockMe = (sdk as any).__mockMe; + mockRequest = (sdk as any).__mockRequest; + + mockMe.mockResolvedValue({ userName: "test-user@example.com" }); + mockRequest.mockResolvedValue({ + token: "test-oauth-token", + expiration_time: new Date(Date.now() + 3600000).toISOString(), + }); + + connector = new
LakebaseConnector({ + workspaceClient: { + currentUser: { me: mockMe }, + config: { host: "https://test.databricks.com" }, + } as any, + }); + }); + + const transientCodes = [ + "ECONNRESET", + "ECONNREFUSED", + "ETIMEDOUT", + "57P01", // admin_shutdown + "57P03", // cannot_connect_now + "08006", // connection_failure + "08003", // connection_does_not_exist + "08000", // connection_exception + ]; + + test.each(transientCodes)( + "should retry on transient error code: %s", + async (code) => { + const error = new Error(`transient error ${code}`) as any; + error.code = code; + + mockQuery + .mockRejectedValueOnce(error) + .mockResolvedValue({ rows: [{ result: 1 }] }); + + const result = await connector.query("SELECT 1"); + + expect(result.rows).toEqual([{ result: 1 }]); + expect(mockQuery).toHaveBeenCalledTimes(2); + }, + ); + }); +}); diff --git a/packages/app-kit/src/core/app-kit.ts b/packages/app-kit/src/core/app-kit.ts index afb2c32..7aecaf7 100644 --- a/packages/app-kit/src/core/app-kit.ts +++ b/packages/app-kit/src/core/app-kit.ts @@ -1,11 +1,13 @@ import type { BasePlugin, + CacheConfig, InputPluginMap, OptionalConfigPluginDef, PluginConstructor, PluginData, PluginMap, } from "shared"; +import { CacheManager } from "../cache"; import type { TelemetryConfig } from "../telemetry"; import { TelemetryManager } from "../telemetry"; @@ -83,9 +85,14 @@ export class AppKit { static async _createApp< T extends PluginData[], >( - config: { plugins?: T; telemetry?: TelemetryConfig } = {}, + config: { + plugins?: T; + telemetry?: TelemetryConfig; + cache?: CacheConfig; + } = {}, ): Promise> { - TelemetryManager.initialize(config.telemetry); + TelemetryManager.initialize(config?.telemetry); + await CacheManager.getInstance(config?.cache); const rawPlugins = config.plugins as T; const preparedPlugins = AppKit.preparePlugins(rawPlugins); @@ -116,6 +123,12 @@ export class AppKit { export async function createApp< T extends PluginData[], ->(config: { plugins?: T } = {}): Promise> { +>( + config: { + plugins?: T; + telemetry?: TelemetryConfig; + cache?: CacheConfig; + } = {}, +): Promise> { return AppKit._createApp(config); } diff --git a/packages/app-kit/src/core/tests/databricks.test.ts b/packages/app-kit/src/core/tests/databricks.test.ts index 1396a8e..09d7772 100644 --- a/packages/app-kit/src/core/tests/databricks.test.ts +++ b/packages/app-kit/src/core/tests/databricks.test.ts @@ -6,6 +6,25 @@ import { createApp, AppKit } from "../app-kit"; // Mock environment validation vi.mock("../utils", () => ({ validateEnv: vi.fn(), + deepMerge: vi.fn((a, b) => ({ ...a, ...b })), +})); + +// Mock CacheManager +vi.mock("@databricks-apps/cache", () => ({ + CacheManager: { + getInstance: vi.fn().mockResolvedValue({ + get: vi.fn(), + set: vi.fn(), + delete: vi.fn(), + getOrExecute: vi.fn(), + }), + getInstanceSync: vi.fn().mockReturnValue({ + get: vi.fn(), + set: vi.fn(), + delete: vi.fn(), + getOrExecute: vi.fn(), + }), + }, })); // Test plugin classes for different phases diff --git a/packages/app-kit/src/plugin/plugin.ts b/packages/app-kit/src/plugin/plugin.ts index 734ea40..d39cbbb 100644 --- a/packages/app-kit/src/plugin/plugin.ts +++ b/packages/app-kit/src/plugin/plugin.ts @@ -48,7 +48,7 @@ export abstract class Plugin< this.name = config.name ?? 
"plugin"; this.telemetry = TelemetryManager.getProvider(this.name, config.telemetry); this.streamManager = new StreamManager(); - this.cache = new CacheManager(undefined, this.telemetry); + this.cache = CacheManager.getInstanceSync(); this.app = new AppManager(); this.devFileReader = DevFileReader.getInstance(); diff --git a/packages/app-kit/src/plugin/tests/cache.test.ts b/packages/app-kit/src/plugin/tests/cache.test.ts index 8e3b145..3acf17f 100644 --- a/packages/app-kit/src/plugin/tests/cache.test.ts +++ b/packages/app-kit/src/plugin/tests/cache.test.ts @@ -1,6 +1,3 @@ -import { TelemetryManager, type TelemetryProvider } from "../../telemetry"; -import { createMockTelemetry } from "@tools/test-helpers"; -import { CacheManager } from "../../cache"; import type { CacheConfig } from "shared"; import { beforeEach, describe, expect, test, vi } from "vitest"; import { CacheInterceptor } from "../interceptors/cache"; @@ -16,21 +13,75 @@ vi.mock("../../telemetry", () => ({ }, })); +/** Mock CacheManager for testing */ +class MockCacheManager { + private cache = new Map(); + private inFlightRequests = new Map>(); + + async getOrExecute( + key: (string | number | object)[], + fn: () => Promise, + userKey: string, + options?: { ttl?: number }, + ): Promise { + const cacheKey = this.generateKey(key, userKey); + const cached = await this.get(cacheKey); + if (cached !== null) { + return cached; + } + + const inFlight = this.inFlightRequests.get(cacheKey); + if (inFlight) { + return inFlight as Promise; + } + + const promise = fn() + .then(async (result) => { + await this.set(cacheKey, result, options); + return result; + }) + .finally(() => { + this.inFlightRequests.delete(cacheKey); + }); + + this.inFlightRequests.set(cacheKey, promise); + return promise; + } + + async get(key: string): Promise { + const entry = this.cache.get(key); + if (!entry) return null; + + if (Date.now() > entry.expiry) { + this.cache.delete(key); + return null; + } + return entry.value as T; + } + + async set( + key: string, + value: T, + options?: { ttl?: number }, + ): Promise { + const expiryTime = Date.now() + (options?.ttl ?? 
diff --git a/packages/app-kit/src/plugin/tests/cache.test.ts b/packages/app-kit/src/plugin/tests/cache.test.ts
index 8e3b145..3acf17f 100644
--- a/packages/app-kit/src/plugin/tests/cache.test.ts
+++ b/packages/app-kit/src/plugin/tests/cache.test.ts
@@ -1,6 +1,3 @@
-import { TelemetryManager, type TelemetryProvider } from "../../telemetry";
-import { createMockTelemetry } from "@tools/test-helpers";
-import { CacheManager } from "../../cache";
 import type { CacheConfig } from "shared";
 import { beforeEach, describe, expect, test, vi } from "vitest";
 import { CacheInterceptor } from "../interceptors/cache";
@@ -16,21 +13,75 @@ vi.mock("../../telemetry", () => ({
   },
 }));
 
+/** Mock CacheManager for testing */
+class MockCacheManager {
+  private cache = new Map<string, { value: unknown; expiry: number }>();
+  private inFlightRequests = new Map<string, Promise<unknown>>();
+
+  async getOrExecute<T>(
+    key: (string | number | object)[],
+    fn: () => Promise<T>,
+    userKey: string,
+    options?: { ttl?: number },
+  ): Promise<T> {
+    const cacheKey = this.generateKey(key, userKey);
+    const cached = await this.get<T>(cacheKey);
+    if (cached !== null) {
+      return cached;
+    }
+
+    const inFlight = this.inFlightRequests.get(cacheKey);
+    if (inFlight) {
+      return inFlight as Promise<T>;
+    }
+
+    const promise = fn()
+      .then(async (result) => {
+        await this.set(cacheKey, result, options);
+        return result;
+      })
+      .finally(() => {
+        this.inFlightRequests.delete(cacheKey);
+      });
+
+    this.inFlightRequests.set(cacheKey, promise);
+    return promise;
+  }
+
+  async get<T>(key: string): Promise<T | null> {
+    const entry = this.cache.get(key);
+    if (!entry) return null;
+
+    if (Date.now() > entry.expiry) {
+      this.cache.delete(key);
+      return null;
+    }
+    return entry.value as T;
+  }
+
+  async set<T>(
+    key: string,
+    value: T,
+    options?: { ttl?: number },
+  ): Promise<void> {
+    const expiryTime = Date.now() + (options?.ttl ?? 3600) * 1000;
+    this.cache.set(key, { value, expiry: expiryTime });
+  }
+
+  generateKey(parts: (string | number | object)[], userKey: string): string {
+    const { createHash } = require("node:crypto");
+    const allParts = [userKey, ...parts];
+    const serialized = JSON.stringify(allParts);
+    return createHash("sha256").update(serialized).digest("hex");
+  }
+}
+
 describe("CacheInterceptor", () => {
-  let cacheManager: CacheManager;
+  let cacheManager: MockCacheManager;
   let context: ExecutionContext;
 
   beforeEach(() => {
-    const mockTelemetry = createMockTelemetry();
-    vi.mocked(TelemetryManager.getProvider).mockReturnValue(
-      mockTelemetry as TelemetryProvider,
-    );
-    const telemetry = TelemetryManager.getProvider("cache-test", {
-      traces: false,
-      metrics: false,
-      logs: false,
-    });
-    cacheManager = new CacheManager({}, telemetry);
+    cacheManager = new MockCacheManager();
     context = {
       metadata: new Map(),
       userKey: "service",
@@ -42,7 +93,12 @@ describe("CacheInterceptor", () => {
       enabled: false,
       cacheKey: ["test"],
     };
-    const interceptor = new CacheInterceptor(cacheManager, config);
+    const interceptor = new CacheInterceptor(
+      cacheManager as unknown as ConstructorParameters<
+        typeof CacheInterceptor
+      >[0],
+      config,
+    );
     const fn = vi.fn().mockResolvedValue("result");
 
     const result = await interceptor.intercept(fn, context);
@@ -56,7 +112,12 @@
       enabled: true,
       cacheKey: [],
     };
-    const interceptor = new CacheInterceptor(cacheManager, config);
+    const interceptor = new CacheInterceptor(
+      cacheManager as unknown as ConstructorParameters<
+        typeof CacheInterceptor
+      >[0],
+      config,
+    );
     const fn = vi.fn().mockResolvedValue("result");
 
     const result = await interceptor.intercept(fn, context);
@@ -70,11 +131,16 @@
       enabled: true,
       cacheKey: ["test", "key"],
     };
-    const interceptor = new CacheInterceptor(cacheManager, config);
+    const interceptor = new CacheInterceptor(
+      cacheManager as unknown as ConstructorParameters<
+        typeof CacheInterceptor
+      >[0],
+      config,
+    );
 
     // Pre-populate cache
     const cacheKey = cacheManager.generateKey(["test", "key"], "service");
-    cacheManager.set(cacheKey, "cached-result");
+    await cacheManager.set(cacheKey, "cached-result");
 
     const fn = vi.fn().mockResolvedValue("new-result");
 
@@ -90,7 +156,12 @@
       cacheKey: ["test", "key"],
       ttl: 3600,
     };
-    const interceptor = new CacheInterceptor(cacheManager, config);
+    const interceptor = new CacheInterceptor(
+      cacheManager as unknown as ConstructorParameters<
+        typeof CacheInterceptor
+      >[0],
+      config,
+    );
     const fn = vi.fn().mockResolvedValue("fresh-result");
 
     const result = await interceptor.intercept(fn, context);
@@ -100,7 +171,7 @@
 
     // Verify result was cached
     const cacheKey = cacheManager.generateKey(["test", "key"], "service");
-    const cached = cacheManager.get(cacheKey);
+    const cached = await cacheManager.get(cacheKey);
     expect(cached).toBe("fresh-result");
   });
 
@@ -113,14 +184,19 @@
       metadata: new Map(),
       userKey: "user1",
     };
-    const interceptor = new CacheInterceptor(cacheManager, config);
+    const interceptor = new CacheInterceptor(
+      cacheManager as unknown as ConstructorParameters<
+        typeof CacheInterceptor
+      >[0],
+      config,
+    );
     const fn = vi.fn().mockResolvedValue("user-result");
 
     await interceptor.intercept(fn, contextWithToken);
 
     // Cache key should include userKey
     const cacheKey = cacheManager.generateKey(["query", "sales"], "user1");
-    const cached = cacheManager.get(cacheKey);
+    const cached = await cacheManager.get(cacheKey);
     expect(cached).toBe("user-result");
   });
 
@@ -129,7 +205,12 @@
       enabled: true,
       cacheKey: ["query", "profile"],
     };
-    const interceptor = new CacheInterceptor(cacheManager, config);
+    const interceptor = new CacheInterceptor(
+      cacheManager as unknown as ConstructorParameters<
+        typeof CacheInterceptor
+      >[0],
+      config,
+    );
 
     // Service account context
     const context1: ExecutionContext = {
@@ -154,8 +235,8 @@
     // Verify separate cache entries
     const key1 = cacheManager.generateKey(["query", "profile"], "service");
     const key2 = cacheManager.generateKey(["query", "profile"], "user1");
-    expect(cacheManager.get(key1)).toBe("service-account-data");
-    expect(cacheManager.get(key2)).toBe("user-data");
+    expect(await cacheManager.get(key1)).toBe("service-account-data");
+    expect(await cacheManager.get(key2)).toBe("user-data");
   });
 
   test("should respect TTL setting", async () => {
@@ -164,7 +245,12 @@
       enabled: true,
       cacheKey: ["test"],
       ttl: 1, // 1 second
     };
-    const interceptor = new CacheInterceptor(cacheManager, config);
+    const interceptor = new CacheInterceptor(
+      cacheManager as unknown as ConstructorParameters<
+        typeof CacheInterceptor
+      >[0],
+      config,
+    );
     const fn = vi.fn().mockResolvedValue("result");
 
     await interceptor.intercept(fn, context);
@@ -177,29 +263,4 @@
 
     expect(fn).toHaveBeenCalledTimes(2);
   });
-
-  test("should work correctly with telemetry enabled", async () => {
-    // Create telemetry with traces enabled
-    const mockTelemetryWithTraces = createMockTelemetry();
-    vi.mocked(TelemetryManager.getProvider).mockReturnValue(
-      mockTelemetryWithTraces as TelemetryProvider,
-    );
-
-    const telemetryProvider =
-      TelemetryManager.getProvider("cache-test-enabled");
-    const cacheManagerWithTelemetry = new CacheManager({}, telemetryProvider);
-
-    const config: CacheConfig = {
-      enabled: true,
-      cacheKey: ["telemetry-test"],
-    };
-    const interceptor = new CacheInterceptor(cacheManagerWithTelemetry, config);
-    const fn = vi.fn().mockResolvedValue("result");
-
-    const result = await interceptor.intercept(fn, context);
-
-    // Verify the cache works correctly with telemetry
-    expect(result).toBe("result");
-    expect(fn).toHaveBeenCalledTimes(1);
-  });
 });
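Beyond TTL expiry, the mock reproduces the manager's in-flight coalescing: concurrent `getOrExecute` calls for the same key share one execution. Illustrative usage against the mock (the `fetchSpend` loader is hypothetical):

```ts
const fetchSpend = async () => ({ totalUsd: 42 }); // hypothetical loader

const cache = new MockCacheManager();
const [a, b] = await Promise.all([
  cache.getOrExecute(["spend", 2024], fetchSpend, "service"),
  cache.getOrExecute(["spend", 2024], fetchSpend, "service"),
]);
// fetchSpend ran once; a and b resolve to the same value
```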
diff --git a/packages/app-kit/src/plugin/tests/plugin.test.ts b/packages/app-kit/src/plugin/tests/plugin.test.ts
index b4c665d..b431b20 100644
--- a/packages/app-kit/src/plugin/tests/plugin.test.ts
+++ b/packages/app-kit/src/plugin/tests/plugin.test.ts
@@ -17,7 +17,11 @@ import { Plugin } from "../plugin";
 
 // Mock all dependencies
 vi.mock("../../app");
-vi.mock("../../cache");
+vi.mock("../../cache", () => ({
+  CacheManager: {
+    getInstanceSync: vi.fn(),
+  },
+}));
 vi.mock("../../stream");
 vi.mock("../../utils", () => ({
   validateEnv: vi.fn(),
@@ -152,7 +156,7 @@ describe("Plugin", () => {
     };
 
     // Setup constructor mocks
-    vi.mocked(CacheManager).mockImplementation(() => mockCache);
+    vi.mocked(CacheManager.getInstanceSync).mockReturnValue(mockCache);
     vi.mocked(AppManager).mockImplementation(() => mockApp);
     vi.mocked(StreamManager).mockImplementation(() => mockStreamManager);
     vi.mocked(TelemetryManager.getProvider).mockReturnValue(
@@ -187,7 +191,7 @@ describe("Plugin", () => {
   test("should initialize managers", () => {
     new TestPlugin(config);
 
-    expect(CacheManager).toHaveBeenCalledTimes(1);
+    expect(CacheManager.getInstanceSync).toHaveBeenCalledTimes(1);
     expect(AppManager).toHaveBeenCalledTimes(1);
     expect(StreamManager).toHaveBeenCalledTimes(1);
   });
diff --git a/packages/shared/src/cache.ts b/packages/shared/src/cache.ts
index 0df05c9..485b568 100644
--- a/packages/shared/src/cache.ts
+++ b/packages/shared/src/cache.ts
@@ -1,6 +1,64 @@
+import type { TelemetryOptions } from "./plugin";
+
+/** Cache entry interface */
+export interface CacheEntry<T = unknown> {
+  value: T;
+  expiry: number;
+}
+
+/**
+ * Cache storage interface for custom implementations
+ * - InMemoryStorage
+ * - PersistentStorage (Lakebase)
+ */
+export interface CacheStorage {
+  /** Get a cached value from the storage */
+  get<T>(key: string): Promise<CacheEntry<T> | null>;
+  /** Set a value in the storage */
+  set<T>(key: string, entry: CacheEntry<T>): Promise<void>;
+  /** Delete a value from the storage */
+  delete(key: string): Promise<void>;
+  /** Clear the storage */
+  clear(): Promise<void>;
+  /** Check if a value exists in the storage */
+  has(key: string): Promise<boolean>;
+  /** Get the size of the storage */
+  size(): Promise<number>;
+  /** Check if the storage is persistent */
+  isPersistent(): boolean;
+  /** Check if the storage is healthy */
+  healthCheck(): Promise<boolean>;
+  /** Close the storage */
+  close(): Promise<void>;
+}
+
+/** Configuration for caching */
 export interface CacheConfig {
+  /** Whether caching is enabled */
   enabled?: boolean;
-  ttl?: number; // time to live in seconds
-  maxSize?: number; // maximum number of entries in the cache
+  /** Time to live in seconds */
+  ttl?: number;
+  /** Maximum number of bytes in the cache */
+  maxBytes?: number;
+  /** Maximum number of entries in the cache */
+  maxSize?: number;
+  /** Cache key */
   cacheKey?: (string | number | object)[];
+  /** Cache Storage provider instance */
+  storage?: CacheStorage;
+  /** Whether to enforce strict persistence */
+  strictPersistence?: boolean;
+  /** Telemetry configuration */
+  telemetry?: TelemetryOptions;
+
+  /** Probability (0-1) of triggering cleanup on each get operation */
+  cleanupProbability?: number;
+
+  /** Probability (0-1) of checking total bytes on each write operation */
+  evictionCheckProbability?: number;
+
+  /** Maximum number of bytes per entry in the cache */
+  maxEntryBytes?: number;
+
+  [key: string]: unknown;
 }
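A custom backend only has to satisfy `CacheStorage`. A minimal in-memory sketch under the generic signatures above (the real `InMemoryStorage` presumably adds the byte accounting and probabilistic cleanup that `CacheConfig` exposes):

```ts
import type { CacheEntry, CacheStorage } from "shared";

class SimpleMemoryStorage implements CacheStorage {
  private store = new Map<string, CacheEntry<unknown>>();

  async get<T>(key: string): Promise<CacheEntry<T> | null> {
    const entry = this.store.get(key) as CacheEntry<T> | undefined;
    if (!entry) return null;
    if (Date.now() > entry.expiry) {
      this.store.delete(key); // lazily drop expired entries
      return null;
    }
    return entry;
  }

  async set<T>(key: string, entry: CacheEntry<T>): Promise<void> {
    this.store.set(key, entry);
  }

  async delete(key: string): Promise<void> {
    this.store.delete(key);
  }

  async clear(): Promise<void> {
    this.store.clear();
  }

  async has(key: string): Promise<boolean> {
    return (await this.get(key)) !== null; // expiry-aware existence check
  }

  async size(): Promise<number> {
    return this.store.size;
  }

  isPersistent(): boolean {
    return false; // in-memory only
  }

  async healthCheck(): Promise<boolean> {
    return true;
  }

  async close(): Promise<void> {
    this.store.clear();
  }
}
```

Wired in through config, this would look like `createApp({ cache: { enabled: true, storage: new SimpleMemoryStorage() } })`.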
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 1a0072f..f8a3172 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -41,6 +41,9 @@ importers:
       lint-staged:
         specifier: ^15.5.1
         version: 15.5.2
+      pg:
+        specifier: ^8.16.3
+        version: 8.16.3
       plop:
         specifier: ^4.0.4
         version: 4.0.4(@types/node@24.7.2)
@@ -150,6 +153,9 @@ importers:
       express:
         specifier: ^4.22.0
         version: 4.22.0
+      pg:
+        specifier: ^8.16.3
+        version: 8.16.3
       shared:
         specifier: workspace:*
         version: link:../shared
@@ -166,6 +172,9 @@ importers:
       '@types/express':
         specifier: ^4.17.25
        version: 4.17.25
+      '@types/pg':
+        specifier: ^8.15.6
+        version: 8.15.6
       '@types/ws':
         specifier: ^8.18.1
         version: 8.18.1
@@ -4266,10 +4275,21 @@ packages:
     resolution: {integrity: sha512-//nshmD55c46FuFw26xV/xFAaB5HF9Xdap7HJBBnrKdAd6/GxDBaNA1870O79+9ueg61cZLSVc+OaFlfmObYVQ==}
     engines: {node: '>= 14.16'}
 
+  pg-cloudflare@1.2.7:
+    resolution: {integrity: sha512-YgCtzMH0ptvZJslLM1ffsY4EuGaU0cx4XSdXLRFae8bPP4dS5xL1tNB3k2o/N64cHJpwU7dxKli/nZ2lUa5fLg==}
+
+  pg-connection-string@2.9.1:
+    resolution: {integrity: sha512-nkc6NpDcvPVpZXxrreI/FOtX3XemeLl8E0qFr6F2Lrm/I8WOnaWNhIPK2Z7OHpw7gh5XJThi6j6ppgNoaT1w4w==}
+
   pg-int8@1.0.1:
     resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==}
     engines: {node: '>=4.0.0'}
 
+  pg-pool@3.10.1:
+    resolution: {integrity: sha512-Tu8jMlcX+9d8+QVzKIvM/uJtp07PKr82IUOYEphaWcoBhIYkoHpLXN3qO59nAI11ripznDsEzEv8nUxBVWajGg==}
+    peerDependencies:
+      pg: '>=8.0'
+
   pg-protocol@1.10.3:
     resolution: {integrity: sha512-6DIBgBQaTKDJyxnXaLiLR8wBpQQcGWuAESkRBX/t6OwA8YsqP+iVSiond2EDy6Y/dsGk8rh/jtax3js5NeV7JQ==}
 
@@ -4277,6 +4297,18 @@
     resolution: {integrity: sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==}
     engines: {node: '>=4'}
 
+  pg@8.16.3:
+    resolution: {integrity: sha512-enxc1h0jA/aq5oSDMvqyW3q89ra6XIIDZgCX9vkMrnz5DFTw/Ny3Li2lFQ+pt3L6MCgm/5o2o8HW9hiJji+xvw==}
+    engines: {node: '>= 16.0.0'}
+    peerDependencies:
+      pg-native: '>=3.0.1'
+    peerDependenciesMeta:
+      pg-native:
+        optional: true
+
+  pgpass@1.0.5:
+    resolution: {integrity: sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==}
+
   picocolors@1.1.1:
     resolution: {integrity: sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA==}
 
@@ -9472,8 +9504,17 @@ snapshots:
 
   pathval@2.0.1: {}
 
+  pg-cloudflare@1.2.7:
+    optional: true
+
+  pg-connection-string@2.9.1: {}
+
   pg-int8@1.0.1: {}
 
+  pg-pool@3.10.1(pg@8.16.3):
+    dependencies:
+      pg: 8.16.3
+
   pg-protocol@1.10.3: {}
 
   pg-types@2.2.0:
     dependencies:
       pg-int8: 1.0.1
       postgres-array: 2.0.0
       postgres-bytea: 1.0.0
       postgres-date: 1.0.7
       postgres-interval: 1.2.0
 
+  pg@8.16.3:
+    dependencies:
+      pg-connection-string: 2.9.1
+      pg-pool: 3.10.1(pg@8.16.3)
+      pg-protocol: 1.10.3
+      pg-types: 2.2.0
+      pgpass: 1.0.5
+    optionalDependencies:
+      pg-cloudflare: 1.2.7
+
+  pgpass@1.0.5:
+    dependencies:
+      split2: 4.2.0
+
   picocolors@1.1.1: {}
 
   picomatch@2.3.1: {}
diff --git a/principles.md b/principles.md
new file mode 100644
index 0000000..34e8845
--- /dev/null
+++ b/principles.md
@@ -0,0 +1,23 @@
+SDK Core Principles
+1. Highly Opinionated
+   The SDK must provide a clear path with best practices for building Databricks
+   applications. We provide strong defaults, with advanced customization when needed.
+2. Built for Application Use Cases
+   This SDK is for application development, not infrastructure management.
+   Databricks' internal implementation details must be abstracted. We're building an
+   application SDK, not a service wrapper.
+3. Delightful Developer Experience
+   Every interface, doc, example, tool, and implementation must provide developer joy. Combined with the Highly Opinionated principle, this creates a true plug-and-play experience.
+4. Zero-Trust Security
+   Minimize exposed surface area, fail safely by default, and validate all inputs.
+   The SDK must always have a zero-trust mindset.
+5. Optimized for Humans and AI
+   Developers and LLMs both use this SDK. Every API must be discoverable,
+   self-documenting, and inferable by both types of users. Test with both.
+6. Production-Ready from Day One
+   Even the smallest feature can be used by enterprise users, so everything
+   shipped must be production-ready, with observability, reliability, and
+   scalability built in from day one.
+7. Layered Extensibility
+   The SDK provides high-level plugins, low-level primitives, and extension points for custom plugins. It integrates into any application architecture and never blocks your path forward.