diff --git a/examples/nodejs/examples/workflows/workflows.ts b/examples/nodejs/examples/workflows/workflows.ts new file mode 100644 index 0000000..4777b16 --- /dev/null +++ b/examples/nodejs/examples/workflows/workflows.ts @@ -0,0 +1,37 @@ +import 'dotenv/config'; +import {Langbase, Workflow} from 'langbase'; + +const langbase = new Langbase({ + apiKey: process.env.LANGBASE_API_KEY!, +}); + +async function main() { + const {step} = new Workflow({debug: true}); + + const result = await step({ + id: 'sumamrize', + run: async () => { + return langbase.llm.run({ + model: 'openai:gpt-4o-mini', + apiKey: process.env.OPENAI_API_KEY!, + messages: [ + { + role: 'system', + content: + 'You are an expert summarizer. Summarize the user input.', + }, + { + role: 'user', + content: + 'I am testing workflows. I just created an example of summarize workflow. Can you summarize this?', + }, + ], + stream: false, + }); + }, + }); + + console.log(result['completion']); +} + +main(); diff --git a/packages/langbase/src/index.ts b/packages/langbase/src/index.ts index a0b85d3..d583556 100644 --- a/packages/langbase/src/index.ts +++ b/packages/langbase/src/index.ts @@ -1,3 +1,4 @@ export * from './langbase/langbase'; export * from './pipes/pipes'; -export * from './lib/helpers' +export * from './lib/helpers'; +export * from './langbase/workflows'; diff --git a/packages/langbase/src/langbase/workflows.ts b/packages/langbase/src/langbase/workflows.ts new file mode 100644 index 0000000..63847c0 --- /dev/null +++ b/packages/langbase/src/langbase/workflows.ts @@ -0,0 +1,144 @@ +type WorkflowContext = { + outputs: Record; +}; + +type RetryConfig = { + limit: number; + delay: number; + backoff: 'exponential' | 'linear' | 'fixed'; +}; + +type StepConfig = { + id: string; + timeout?: number; + retries?: RetryConfig; + run: () => Promise; +}; + +class TimeoutError extends Error { + constructor(stepId: string, timeout: number) { + super(`Step "${stepId}" timed out after ${timeout}ms`); + this.name = 'TimeoutError'; + } +} + +export class Workflow { + private context: WorkflowContext; + private debug: boolean; + public readonly step: (config: StepConfig) => Promise; + + constructor(config: {debug?: boolean} = {debug: false}) { + this.context = {outputs: {}}; + this.debug = config.debug ?? false; + this.step = this._step.bind(this); + } + + private async _step(config: StepConfig): Promise { + if (this.debug) { + console.log(`\nšŸ”„ Starting step: ${config.id}`); + console.time(`ā±ļø Step ${config.id}`); + if (config.timeout) console.log(`ā³ Timeout: ${config.timeout}ms`); + if (config.retries) + console.log(`šŸ”„ Retries: ${JSON.stringify(config.retries)}`); + } + + let lastError: Error | null = null; + let attempt = 1; + const maxAttempts = config.retries?.limit + ? config.retries.limit + 1 + : 1; + + while (attempt <= maxAttempts) { + try { + let stepPromise = config.run(); + + if (config.timeout) { + stepPromise = this.withTimeout({ + promise: stepPromise, + timeout: config.timeout, + stepId: config.id, + }); + } + + const result = await stepPromise; + this.context.outputs[config.id] = result; + + if (this.debug) { + console.timeEnd(`ā±ļø Step ${config.id}`); + console.log(`šŸ“¤ Output:`, result); + console.log(`āœ… Completed step: ${config.id}\n`); + } + + return result; + } catch (error) { + lastError = error as Error; + + if (attempt < maxAttempts) { + const delay = config.retries + ? this.calculateDelay( + config.retries.delay, + attempt, + config.retries.backoff, + ) + : 0; + + if (this.debug) { + console.log( + `āš ļø Attempt ${attempt} failed, retrying in ${delay}ms...`, + ); + console.error(error); + } + + await this.sleep(delay); + attempt++; + } else { + if (this.debug) { + console.timeEnd(`ā±ļø Step ${config.id}`); + console.error(`āŒ Failed step: ${config.id}`, error); + } + throw lastError; + } + } + } + + throw lastError; + } + + private async withTimeout({ + promise, + timeout, + stepId, + }: { + promise: Promise; + timeout: number; + stepId: string; + }): Promise { + const timeoutPromise = new Promise((_, reject) => { + setTimeout( + () => reject(new TimeoutError(stepId, timeout)), + timeout, + ); + }); + return Promise.race([promise, timeoutPromise]); + } + + private calculateDelay( + baseDelay: number, + attempt: number, + backoff: RetryConfig['backoff'], + ): number { + switch (backoff) { + case 'exponential': + return baseDelay * Math.pow(2, attempt - 1); + case 'linear': + return baseDelay * attempt; + case 'fixed': + default: + return baseDelay; + } + } + + private async sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } +}