diff --git a/examples/nodejs/workflows/workflows.ts b/examples/nodejs/workflows/workflows.ts index c99897f..b257817 100644 --- a/examples/nodejs/workflows/workflows.ts +++ b/examples/nodejs/workflows/workflows.ts @@ -1,40 +1,85 @@ -// Experimental upcoming beta AI primitve. -// Please refer to the documentation for more information: https://langbase.com/docs for more information. - +// Test script for the simplified proxy approach import 'dotenv/config'; -import {Langbase, Workflow} from 'langbase'; +import {Langbase} from 'langbase'; +// Create Langbase instance const langbase = new Langbase({ apiKey: process.env.LANGBASE_API_KEY!, }); async function main() { - const {step} = new Workflow({debug: true}); - - const result = await step({ - id: 'sumamrize', - run: async () => { - return langbase.llm.run({ - model: 'openai:gpt-4o-mini', - apiKey: process.env.OPENAI_API_KEY!, - messages: [ - { - role: 'system', - content: - 'You are an expert summarizer. Summarize the user input.', - }, - { - role: 'user', - content: - 'I am testing workflows. I just created an example of summarize workflow. Can you summarize this?', - }, - ], - stream: false, - }); - }, + // Create a workflow with debug mode enabled + const workflow = langbase.workflow({ + name: 'simplified-proxy-test', + debug: true, // Enable debug logging }); - console.log(result['completion']); + try { + // STEP 1: Call langbase.agent.run but don't return its result directly + const step1Result = await workflow.step({ + id: 'call-but-return-custom', + run: async () => { + // Return custom result instead + return { + customField: 'Custom result from simplified proxy', + timestamp: new Date().toISOString(), + }; + }, + }); + + // STEP 2: Return agent.run result directly + const step2Result = await workflow.step({ + id: 'return-agent-run-directly', + run: async () => { + // Call Langbase API and return the result directly + return langbase.agent.run({ + model: 'openai:gpt-4o-mini', + apiKey: process.env.OPENAI_API_KEY!, + instructions: 'Be brief and concise.', + input: 'What is 2+2?', + stream: false, + }); + }, + }); + + // STEP 3: Make multiple Langbase calls in one step + const step3Result = await workflow.step({ + id: 'multiple-calls', + run: async () => { + // First call + const call1 = await langbase.agent.run({ + model: 'openai:gpt-4o-mini', + apiKey: process.env.OPENAI_API_KEY!, + instructions: 'Be brief.', + input: 'First proxy test', + stream: false, + }); + + // Second call with different method + const call2 = await langbase.agent.run({ + model: 'openai:gpt-4o-mini', + apiKey: process.env.OPENAI_API_KEY!, + instructions: 'Be brief.', + input: 'Second proxy test', + stream: false, + }); + + // Return combined result + return { + summary: 'Multiple calls completed with simplified proxy', + calls: 2, + firstOutput: call1.output, + secondOutput: call2.output, + }; + }, + }); + } catch (error) { + console.error('āŒ Workflow error:', error); + } finally { + // End the workflow to show trace report + workflow.end(); + } } -main(); +// Run the test +main().catch(console.error); diff --git a/packages/langbase/src/common/request.ts b/packages/langbase/src/common/request.ts index cf2d817..b6a6a90 100644 --- a/packages/langbase/src/common/request.ts +++ b/packages/langbase/src/common/request.ts @@ -62,6 +62,17 @@ export class Request { const isLllmGenerationEndpoint = GENERATION_ENDPOINTS.includes(endpoint); + // All endpoints should return headers if rawResponse is true + if (!isLllmGenerationEndpoint && options.body?.rawResponse) { + const responseData = await response.json(); + return { + ...responseData, + rawResponse: { + headers: Object.fromEntries(response.headers.entries()), + }, + } as T; + } + if (isLllmGenerationEndpoint) { const threadId = response.headers.get('lb-thread-id'); diff --git a/packages/langbase/src/langbase/langbase.ts b/packages/langbase/src/langbase/langbase.ts index 011c1a5..f58f9ed 100644 --- a/packages/langbase/src/langbase/langbase.ts +++ b/packages/langbase/src/langbase/langbase.ts @@ -1,5 +1,6 @@ import {convertDocToFormData} from '@/lib/utils/doc-to-formdata'; import {Request} from '../common/request'; +import {Workflow} from './workflows'; export type Role = 'user' | 'assistant' | 'system' | 'tool'; @@ -602,6 +603,12 @@ export class Langbase { }; }; + public workflow: (config: {debug?: boolean; name: string}) => Workflow; + + public traces: { + create: (trace: any) => Promise; + }; + constructor(options?: LangbaseOptions) { this.baseUrl = options?.baseUrl ?? 'https://api.langbase.com'; this.apiKey = options?.apiKey ?? ''; @@ -684,6 +691,12 @@ export class Langbase { this.agent = { run: this.runAgent.bind(this), }; + + this.workflow = config => new Workflow({...config, langbase: this}); + + this.traces = { + create: this.createTrace.bind(this), + }; } private async runPipe( @@ -1131,4 +1144,17 @@ export class Langbase { }, }); } + + /** + * Creates a new trace on Langbase. + * + * @param {any} trace - The trace data to send. + * @returns {Promise} A promise that resolves to the response of the trace creation. + */ + private async createTrace(trace: any): Promise { + return this.request.post({ + endpoint: '/v1/traces', + body: trace, + }); + } } diff --git a/packages/langbase/src/langbase/trace.ts b/packages/langbase/src/langbase/trace.ts new file mode 100644 index 0000000..e97f47b --- /dev/null +++ b/packages/langbase/src/langbase/trace.ts @@ -0,0 +1,133 @@ +export interface Trace { + name: string; + startTime: number; + endTime?: number; + duration?: number; + steps: StepTrace[]; + error?: string; +} + +export interface StepTrace { + name: string; + output: any; + error?: string; + traces: string[] | null; + duration: number; + startTime: number; + endTime: number; +} + +export type TraceType = + | 'workflow' + | 'agent' + | 'chunk' + | 'memory' + | 'parse' + | 'embed'; + +export type PrimitiveTrace = + | {chunk: any} + | {agent: any} + | {memory: any} + | {parse: any} + | {embed: any} + | {workflow: WorkflowTrace; entityAuthId: string}; + +type WorkflowTrace = { + createdAt: string; + id: string; + agentWorkflowId: string; + name: string; + startTime: number; + endTime?: number; + duration?: number; + steps: StepTrace[]; + error?: string; +}; + +export class TraceManager { + private traces: Map = new Map(); + + createTrace(type: TraceType, traceData: any = {}): string { + const traceId = crypto.randomUUID(); + let trace: PrimitiveTrace; + const createdAt = new Date().toISOString(); + if (type === 'workflow') { + trace = { + workflow: { + createdAt, + id: traceId, + agentWorkflowId: process.env.LANGBASE_AGENT_ID || '', + name: traceData.name || '', + startTime: Date.now(), + steps: [], + }, + entityAuthId: '', + }; + } else if (type === 'agent') { + trace = {agent: {...traceData, createdAt, id: traceId}}; + } else if (type === 'chunk') { + trace = {chunk: {...traceData, createdAt, id: traceId}}; + } else if (type === 'memory') { + trace = {memory: {...traceData, createdAt, id: traceId}}; + } else if (type === 'parse') { + trace = {parse: {...traceData, createdAt, id: traceId}}; + } else if (type === 'embed') { + trace = {embed: {...traceData, createdAt, id: traceId}}; + } else { + throw new Error('Unknown trace type'); + } + this.traces.set(traceId, trace); + return traceId; + } + + addStep(traceId: string, step: StepTrace) { + const trace = this.traces.get(traceId); + if (trace && 'workflow' in trace) { + trace.workflow.steps.push(step); + } + } + + endTrace(traceId: string) { + const trace = this.traces.get(traceId); + if (trace && 'workflow' in trace) { + trace.workflow.endTime = Date.now(); + trace.workflow.duration = + trace.workflow.endTime - trace.workflow.startTime; + } + } + + getTrace(traceId: string): PrimitiveTrace | undefined { + return this.traces.get(traceId); + } + + printTrace(traceId: string) { + const trace = this.traces.get(traceId); + if (!trace) return; + if ('workflow' in trace) { + const wf = trace.workflow; + const duration = wf.endTime + ? wf.endTime - wf.startTime + : Date.now() - wf.startTime; + console.log('\nšŸ“Š Workflow Trace:'); + console.log(`Name: ${wf.name}`); + console.log(`Duration: ${duration}ms`); + console.log(`Start Time: ${new Date(wf.startTime).toISOString()}`); + if (wf.endTime) { + console.log(`End Time: ${new Date(wf.endTime).toISOString()}`); + } + console.log('\nSteps:'); + wf.steps.forEach(step => { + console.log(`\n Step: ${step.name}`); + console.log(` Duration: ${step.duration}ms`); + if (step.traces && step.traces.length > 0) { + console.log(` Traces:`, step.traces); + } + console.log(` Output:`, step.output); + }); + } else { + console.log('\nšŸ“Š Primitive Trace:'); + console.dir(trace, {depth: 4}); + } + } +} diff --git a/packages/langbase/src/langbase/workflows.ts b/packages/langbase/src/langbase/workflows.ts index 63847c0..d48a070 100644 --- a/packages/langbase/src/langbase/workflows.ts +++ b/packages/langbase/src/langbase/workflows.ts @@ -1,3 +1,13 @@ +import {TraceManager, StepTrace} from './trace'; +import {Langbase} from './langbase'; + +// Declare the global langbase instance +declare global { + var langbase: Langbase; + var _activeTraceCollector: ((traceId: string) => void) | null; + var _workflowDebugEnabled: boolean; +} + type WorkflowContext = { outputs: Record; }; @@ -15,6 +25,12 @@ type StepConfig = { run: () => Promise; }; +type WorkflowConfig = { + debug?: boolean; + name: string; + langbase: Langbase; +}; + class TimeoutError extends Error { constructor(stepId: string, timeout: number) { super(`Step "${stepId}" timed out after ${timeout}ms`); @@ -22,18 +38,196 @@ class TimeoutError extends Error { } } +// Setup the global trace collector for cross-instance communication +if (typeof global._activeTraceCollector === 'undefined') { + global._activeTraceCollector = null; +} + +// For debug flag +if (typeof global._workflowDebugEnabled === 'undefined') { + global._workflowDebugEnabled = false; +} + export class Workflow { private context: WorkflowContext; private debug: boolean; + private name: string; + private traceManager: TraceManager; + private traceId: string; + private langbase: Langbase; + + private originalMethods: Map = new Map(); public readonly step: (config: StepConfig) => Promise; - constructor(config: {debug?: boolean} = {debug: false}) { + constructor(config: WorkflowConfig) { this.context = {outputs: {}}; this.debug = config.debug ?? false; + this.name = config.name; + this.langbase = config.langbase; + this.traceManager = new TraceManager(); + this.traceId = this.traceManager.createTrace('workflow', { + name: this.name, + }); this.step = this._step.bind(this); + + // Set global debug flag + global._workflowDebugEnabled = this.debug; + } + + /** + * Replace a method in the Langbase instance with a traced version + */ + private interceptMethod(obj: any, method: string, path: string = ''): void { + if (!obj || typeof obj[method] !== 'function') return; + + const fullPath = path ? `${path}.${method}` : method; + const originalMethod = obj[method]; + + // Only replace if not already intercepted + if (!this.originalMethods.has(fullPath)) { + this.originalMethods.set(fullPath, originalMethod); + + const debug = this.debug; + + // Replace with intercepted version + obj[method] = async function (...args: any[]) { + // Add custom arguments for tracing + // Add rawResponse to the options if it's an object + const lastArg = args[args.length - 1]; + const newArgs = [...args]; + + if (lastArg && typeof lastArg === 'object') { + newArgs[newArgs.length - 1] = { + ...lastArg, + rawResponse: true, + }; + } + // Append a new object if the last argument is not an object + else { + newArgs.push({rawResponse: true}); + } + + const result = await originalMethod.apply(this, newArgs); + console.log(`šŸ”„ Intercepted method: ${fullPath}`, result); + + // Process result for tracing if we have an active collector + if (global._activeTraceCollector) { + // Extract or create traceId + let traceId: string | undefined; + + // Check if result is an object with response headers + if (result && typeof result === 'object') { + // Extract from response headers + if ('rawResponse' in result && result.rawResponse) { + // Check for lb-trace-id in headers + if (result.rawResponse.headers['lb-trace-id']) { + // Plain object headers + traceId = + result.rawResponse.headers['lb-trace-id']; + } + } + + // Notify collector if traceId was found + if (traceId && global._activeTraceCollector) { + if (debug) + console.log( + `šŸ” Trace ID extracted: ${traceId}`, + ); + global._activeTraceCollector(traceId); + } + } + } + + return result; + }; + } + } + + /** + * Restore all original methods that were intercepted + */ + private restoreOriginalMethods(): void { + this.originalMethods.forEach((originalMethod, path) => { + // Parse the path to find the object and method + const parts = path.split('.'); + const methodName = parts.pop()!; + let obj: any = this.langbase; + + // Navigate to the correct object + for (const part of parts) { + if (obj && typeof obj === 'object' && part in obj) { + obj = obj[part as keyof typeof obj]; // Type safe access + } else { + return; // Skip if path no longer exists + } + } + + // Restore original method + if ( + obj && + methodName in obj && + typeof obj[methodName] === 'function' + ) { + obj[methodName] = originalMethod; + } + }); + + // Clear the map + this.originalMethods.clear(); + } + + /** + * Intercept all important methods in the Langbase instance + */ + private setupMethodInterceptors(): void { + // Agent methods + this.interceptMethod(this.langbase.agent, 'run', 'agent'); + + // Pipes methods + this.interceptMethod(this.langbase.pipes, 'run', 'pipes'); + this.interceptMethod(this.langbase.pipe, 'run', 'pipe'); + + // Memory methods + if (this.langbase.memories) { + this.interceptMethod( + this.langbase.memories, + 'retrieve', + 'memories', + ); + } + if (this.langbase.memory) { + this.interceptMethod(this.langbase.memory, 'retrieve', 'memory'); + } + + // Tool methods + if (this.langbase.tools) { + this.interceptMethod(this.langbase.tools, 'webSearch', 'tools'); + this.interceptMethod(this.langbase.tools, 'crawl', 'tools'); + } + if (this.langbase.tool) { + this.interceptMethod(this.langbase.tool, 'webSearch', 'tool'); + this.interceptMethod(this.langbase.tool, 'crawl', 'tool'); + } + + // Top-level methods + this.interceptMethod(this.langbase, 'embed'); + this.interceptMethod(this.langbase, 'chunk'); + this.interceptMethod(this.langbase, 'parse'); } private async _step(config: StepConfig): Promise { + const stepStartTime = Date.now(); + // Initialize an array to collect trace IDs + const stepTraces: string[] = []; + + // Function to collect trace IDs + const collectTrace = (traceId: string) => { + if (this.debug) { + console.log(`šŸ“‹ Collected trace ID: ${traceId}`); + } + stepTraces.push(traceId); + }; + if (this.debug) { console.log(`\nšŸ”„ Starting step: ${config.id}`); console.time(`ā±ļø Step ${config.id}`); @@ -48,60 +242,103 @@ export class Workflow { ? config.retries.limit + 1 : 1; - while (attempt <= maxAttempts) { - try { - let stepPromise = config.run(); + // Set up method interceptors before running the step + this.setupMethodInterceptors(); - if (config.timeout) { - stepPromise = this.withTimeout({ - promise: stepPromise, - timeout: config.timeout, - stepId: config.id, - }); + // Set the global active trace collector + const previousTraceCollector = global._activeTraceCollector; + global._activeTraceCollector = collectTrace; + + try { + // Execute the step function directly + let stepPromise: Promise = config.run(); + + // Apply timeout if configured + if (config.timeout) { + stepPromise = this.withTimeout({ + promise: stepPromise, + timeout: config.timeout, + stepId: config.id, + }); + } + + // Wait for the step to complete + const result = await stepPromise; + + // Store step result in context + this.context.outputs[config.id] = result; + + if (this.debug) { + console.timeEnd(`ā±ļø Step ${config.id}`); + console.log(`šŸ“¤ Output:`, result); + + if (stepTraces.length > 0) { + console.log( + `šŸ“‹ Trace IDs (${stepTraces.length}):`, + stepTraces, + ); + } else { + console.log(`šŸ” No trace IDs captured for this step`); } + } + + // Create step trace + const stepEndTime = Date.now(); + const stepTrace: StepTrace = { + name: config.id, + output: result, + traces: stepTraces.length > 0 ? stepTraces : null, + duration: stepEndTime - stepStartTime, + startTime: stepStartTime, + endTime: stepEndTime, + }; - const result = await stepPromise; - this.context.outputs[config.id] = result; + // Add step to trace manager + this.traceManager.addStep(this.traceId, stepTrace); + + // Restore original methods and trace collector + this.restoreOriginalMethods(); + global._activeTraceCollector = previousTraceCollector; + + return result; + } catch (error) { + // Restore original methods and trace collector on error + this.restoreOriginalMethods(); + global._activeTraceCollector = previousTraceCollector; + + // Store error for potential retry or final throw + lastError = error as Error; + + // If retries are configured, try again + if (attempt < maxAttempts) { + const delay = config.retries + ? this.calculateDelay( + config.retries.delay, + attempt, + config.retries.backoff, + ) + : 0; if (this.debug) { - console.timeEnd(`ā±ļø Step ${config.id}`); - console.log(`šŸ“¤ Output:`, result); - console.log(`āœ… Completed step: ${config.id}\n`); + console.log( + `āš ļø Attempt ${attempt} failed, retrying in ${delay}ms...`, + ); + console.error(error); } - return result; - } catch (error) { - lastError = error as Error; - - if (attempt < maxAttempts) { - const delay = config.retries - ? this.calculateDelay( - config.retries.delay, - attempt, - config.retries.backoff, - ) - : 0; - - if (this.debug) { - console.log( - `āš ļø Attempt ${attempt} failed, retrying in ${delay}ms...`, - ); - console.error(error); - } + await this.sleep(delay); + attempt++; - await this.sleep(delay); - attempt++; - } else { - if (this.debug) { - console.timeEnd(`ā±ļø Step ${config.id}`); - console.error(`āŒ Failed step: ${config.id}`, error); - } - throw lastError; + // Try again with the next attempt + return this._step(config); + } else { + if (this.debug) { + console.timeEnd(`ā±ļø Step ${config.id}`); + console.error(`āŒ Failed step: ${config.id}`, error); } + throw lastError; } } - - throw lastError; } private async withTimeout({ @@ -141,4 +378,32 @@ export class Workflow { private async sleep(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)); } + + public async end(): Promise { + // Finalise and grab the trace + this.traceManager.endTrace(this.traceId); + this.traceManager.printTrace(this.traceId); + const traceData = this.traceManager.getTrace(this.traceId); + + // --- send to LB API v1/traces/create using SDK method --- + try { + const res = await this.langbase.traces.create(traceData); + + if (!res || res.error) { + console.error( + `āŒ Trace upload failed: ${res?.status || ''} ${res?.statusText || ''}`, + ); + } else if (this.debug) { + console.log(`āœ… Trace ${this.traceId} sent to collector`); + } + } catch (err) { + console.error('āŒ Error while sending trace', err); + } + // ------------------------------------------------------------------------- + + if (this.debug) { + console.log('\nšŸ” DEBUG: Final trace data:'); + console.log(JSON.stringify(traceData, null, 2)); + } + } }