Merged
132 changes: 132 additions & 0 deletions CircuitBreaker.js
@@ -0,0 +1,132 @@
/**
* Circuit Breaker implementation with configurable failure thresholds and cooldown periods
*/
class CircuitBreaker {
static #instances = new Map(); // bucketId -> instance

constructor({
failureThreshold = 5,
cooldownPeriod = 30000,
name = 'default'
}) {
this.failureThreshold = failureThreshold;
this.cooldownPeriod = cooldownPeriod;
this.name = name;

// State
this.failCount = 0;
this.isOpen = false;
this.openedAt = null;
this.lastFailureTime = null;
}

/**
* Get or create a circuit breaker instance for the given bucketId
* @param {string} bucketId - The service identifier
* @param {Object} config - Circuit breaker configuration
* @returns {CircuitBreaker} - The circuit breaker instance
*/
static getInstance(bucketId, config) {
if (!this.#instances.has(bucketId)) {
this.#instances.set(bucketId, new CircuitBreaker({
...config,
name: `CircuitBreaker-${bucketId}`
}));
}
return this.#instances.get(bucketId);
}

/**
* Clear a circuit breaker instance for the given bucketId
* @param {string} bucketId - The service identifier
*/
static clear(bucketId) {
this.#instances.delete(bucketId);
}

/**
* Check if the circuit breaker is open, reset if cooldown period has expired
* @returns {boolean} - True if circuit is open, false if closed
*/
isCircuitOpen() {
if (!this.isOpen) return false;

// Check if cooldown period has expired
if (Date.now() - this.openedAt > this.cooldownPeriod) {
this._reset();
return false;
}

return true;
}

/**
* Record a successful operation
*/
recordSuccess() {
this._reset();
}

/**
* Record a failed operation
*/
recordFailure() {
this.failCount++;
this.lastFailureTime = Date.now();

if (this.failCount >= this.failureThreshold) {
this._open();
}
}

/**
* Get current circuit breaker status
* @returns {Object} - Status information
*/
getStatus() {
return {
isOpen: this.isCircuitOpen(),
failCount: this.failCount,
failureThreshold: this.failureThreshold,
cooldownRemaining: this.isOpen ?
Math.max(0, this.cooldownPeriod - (Date.now() - this.openedAt)) : 0,
lastFailureTime: this.lastFailureTime,
name: this.name
};
}

/**
* Manually open the circuit breaker
*/
forceOpen() {
this._open();
}

/**
* Manually close the circuit breaker
*/
forceClose() {
this._reset();
}

/**
* Reset circuit breaker to closed state
* @private
*/
_reset() {
this.isOpen = false;
this.failCount = 0;
this.openedAt = null;
}

/**
* Open the circuit breaker
* @private
*/
_open() {
this.isOpen = true;
this.openedAt = Date.now();
}
}

export default CircuitBreaker;
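
A minimal usage sketch of the breaker API added above. The `fetchFromService` call is a hypothetical placeholder for whatever downstream request the breaker guards; everything else uses the methods defined in this file.

```js
import CircuitBreaker from './CircuitBreaker.js';

// One breaker per service bucket; reusing the same bucketId returns the same instance.
const breaker = CircuitBreaker.getInstance('openai', { failureThreshold: 3, cooldownPeriod: 10000 });

async function guardedCall() {
  if (breaker.isCircuitOpen()) {
    throw new Error(`Circuit open for ${breaker.getStatus().name}, retry later`);
  }
  try {
    const result = await fetchFromService(); // hypothetical downstream call
    breaker.recordSuccess();                 // closes the circuit and resets the fail count
    return result;
  } catch (err) {
    breaker.recordFailure();                 // opens the circuit once failureThreshold is hit
    throw err;
  }
}
```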
6 changes: 4 additions & 2 deletions README.md
@@ -1,6 +1,6 @@
# ResilientLLM

A robust LLM integration layer designed to ensure reliable, seamless interactions across multiple APIs by intelligently handling failures and rate limits.
A simple but robust LLM integration layer designed to ensure reliable, seamless interactions across multiple APIs by intelligently handling failures and rate limits.

## Motivation

@@ -34,7 +34,9 @@ const llm = new ResilientLLM({
rateLimitConfig: {
requestsPerMinute: 60, // Limit to 60 requests per minute
llmTokensPerMinute: 90000 // Limit to 90,000 LLM tokens per minute
}
},
retries: 3,
backoffFactor: 2
});
const conversationHistory = [
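
The new `retries` and `backoffFactor` options suggest exponential backoff between attempts. The snippet below is a hypothetical illustration of that schedule only; the base delay and the exact formula used inside ResilientOperation are not shown in this diff.

```js
// Assumed base delay; not part of the documented config.
const baseDelayMs = 1000;
const retries = 3;
const backoffFactor = 2;

for (let attempt = 1; attempt <= retries; attempt++) {
  const delayMs = baseDelayMs * Math.pow(backoffFactor, attempt - 1); // 1000, 2000, 4000
  console.log(`retry ${attempt} would wait ~${delayMs}ms`);
}
```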
52 changes: 44 additions & 8 deletions RateLimitManager.js
@@ -1,6 +1,9 @@
import TokenBucket from './TokenBucket.js';
import { sleep } from './Utility.js';

class RateLimitManager {
static #instances = new Map(); // bucketId -> instance

/**
* @param {Object} config
* @param {number} config.requestsPerMinute - Max requests per minute
@@ -12,15 +15,52 @@ class RateLimitManager {
// llmTokenBucket: limits number of LLM text tokens per minute
this.llmTokenBucket = new TokenBucket(llmTokensPerMinute, llmTokensPerMinute / 60); // refill per second
}

/**
* Get or create a rate limit manager instance for the given bucketId
* @param {string} bucketId - The service identifier
* @param {Object} config - Rate limit configuration
* @returns {RateLimitManager} - The rate limit manager instance
*/
static getInstance(bucketId, config) {
if (!this.#instances.has(bucketId)) {
this.#instances.set(bucketId, new RateLimitManager(config));
}
return this.#instances.get(bucketId);
}

/**
* Clear a rate limit manager instance for the given bucketId
* @param {string} bucketId - The service identifier
*/
static clear(bucketId) {
this.#instances.delete(bucketId);
}

/**
* Attempt to acquire a request slot and the required number of LLM tokens.
* Waits until both are available.
* @param {number} llmTokenCount
*/
async acquire(llmTokenCount = 1) {
while (!(this.requestBucket.tryRemoveToken() && this.llmTokenBucket.tryRemoveToken(llmTokenCount))) {
await this._sleep(100);
async acquire(llmTokenCount = 1, abortSignal) {
// Check abort signal before entering loop
if (abortSignal?.aborted) {
const error = new Error(abortSignal.reason || 'Operation was aborted');
error.name = 'AbortError';
throw error;
}

console.log('Awaiting rate limit...');
while (!abortSignal?.aborted && !(this.requestBucket.tryRemoveToken() && this.llmTokenBucket.tryRemoveToken(llmTokenCount))) {
await sleep(100, abortSignal);
}
console.log('Rate limit wait complete...');

// Final check after loop - if aborted during sleep, throw error
if (abortSignal?.aborted) {
const error = new Error(abortSignal.reason || 'Operation was aborted');
error.name = 'AbortError';
throw error;
}
}

@@ -55,11 +95,7 @@
if (info.llmTokensPerMinute) {
this.llmTokenBucket.update({ capacity: info.llmTokensPerMinute, refillRate: info.llmTokensPerMinute / 60 });
}
}

_sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
}

export default RateLimitManager;
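
A sketch of how the new `abortSignal` parameter on `acquire()` could be used, assuming a caller owns the `AbortController`. The bucket configuration values are illustrative; the error handling relies on the `AbortError` name set inside `acquire()`.

```js
import RateLimitManager from './RateLimitManager.js';

const limiter = RateLimitManager.getInstance('openai', {
  requestsPerMinute: 60,
  llmTokensPerMinute: 90000
});

const controller = new AbortController();

try {
  // Wait for a request slot plus ~1200 estimated LLM tokens, bailing out if aborted.
  await limiter.acquire(1200, controller.signal);
  // ...make the rate-limited API call here...
} catch (err) {
  if (err.name === 'AbortError') {
    console.log('Rate-limit wait cancelled');
  } else {
    throw err;
  }
}

// Elsewhere, e.g. on user cancellation:
// controller.abort('User cancelled the request');
```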
54 changes: 36 additions & 18 deletions ResilientLLM.js
@@ -4,6 +4,8 @@
* const llm = new LLM({ aiService: "anthropic", model: "claude-3-5-sonnet-20240620", maxTokens: 2048, temperature: 0 });
* const response = await llm.chat([{ role: "user", content: "Hello, world!" }]);
* console.log(response);
* // You may cancel all LLM operations for this instance by calling the abort() method on the ResilientLLM instance
* llm.abort();
*/
import { Tiktoken } from "js-tiktoken/lite";
import o200k_base from "js-tiktoken/ranks/o200k_base";
@@ -12,10 +14,10 @@ import ResilientOperation from "./ResilientOperation.js";
class ResilientLLM {
static encoder;
static DEFAULT_MODELS = {
anthropic: "claude-3-5-sonnet-20240620",
anthropic: "claude-3-5-sonnet-20240620",
openai: "gpt-4o-mini",
gemini: "gemini-2.0-flash",
ollama: "openai"
ollama: "llama3.1:8b"
}

constructor(options) {
@@ -29,16 +31,11 @@ class ResilientLLM {
this.topP = options?.topP || process.env.AI_TOP_P || 0.95;
// Add rate limit config options if provided
this.rateLimitConfig = options?.rateLimitConfig || { requestsPerMinute: 10, llmTokensPerMinute: 150000 };
// Instantiate ResilientOperation for LLM calls
this.resilientOperation = new ResilientOperation({
bucketId: this.aiService,
rateLimitConfig: this.rateLimitConfig,
retries: options?.retries || 3,
timeout: this.timeout,
backoffFactor: options?.backoffFactor || 2,
onRateLimitUpdate: options?.onRateLimitUpdate,
cacheStore: this.cacheStore
});
this.retries = options?.retries || 3;
this.backoffFactor = options?.backoffFactor || 2;
this.onRateLimitUpdate = options?.onRateLimitUpdate;
this._abortController = null;
this.resilientOperations = {}; // Store resilient operation instances for observability
}

getApiUrl(aiService) {
@@ -159,11 +156,25 @@
throw new Error('Invalid provider specified. Use "anthropic" or "openai" or "gemini" or "ollama".');
}
try{
// Instantiate ResilientOperation for LLM calls
const resilientOperation = new ResilientOperation({
bucketId: this.aiService,
rateLimitConfig: this.rateLimitConfig,
retries: this.retries,
timeout: this.timeout,
backoffFactor: this.backoffFactor,
onRateLimitUpdate: this.onRateLimitUpdate,
cacheStore: this.cacheStore
});
// Use single instance of abort controller for all operations
this._abortController = this._abortController || new AbortController();
this.resilientOperations[resilientOperation.id] = resilientOperation;
// Wrap the LLM API call in ResilientOperation for rate limiting, retries, etc.
const { data, statusCode } = await this.resilientOperation
const { data, statusCode } = await resilientOperation
.withTokens(estimatedLLMTokens)
.withCache()
.execute(this._makeHttpRequest, apiUrl, requestBody, headers);
.withAbortControl(this._abortController)
.execute(this._makeHttpRequest, apiUrl, requestBody, headers, this._abortController.signal);
/**
* OpenAI chat completion response
* {
@@ -223,6 +234,7 @@
content = this.parseOllamaChatCompletion(data, llmOptions?.tools);
break;
}
delete this.resilientOperations[resilientOperation.id];
return content;
} catch (error) {
console.error(`Error calling ${aiService} API:`, error);
@@ -256,6 +268,8 @@
* @returns {Promise<{data: any, statusCode: number}>}
*/
async _makeHttpRequest(apiUrl, requestBody, headers, abortSignal) {
console.log("Making HTTP request to:", apiUrl);
console.log("You may cancel it by calling abort() method on the ResilientLLM instance");
const startTime = Date.now();

try {
@@ -291,7 +305,8 @@

/**
* Parse errors from various LLM APIs to create uniform error communication
* @param {*} error
* @param {number|null} statusCode - HTTP status code or null for general errors
* @param {Error|Object|null} error - Error object
* @reference https://platform.openai.com/docs/guides/error-codes/api-error-codes
* @reference https://docs.anthropic.com/en/api/errors
*/
@@ -305,8 +320,6 @@
throw new Error(error?.message || "Invalid API Key");
case 403:
throw new Error(error?.message || "You are not authorized to access this resource");
case 400:
throw new Error(error?.message || "Bad request");
case 429:
throw new Error(error?.message || "Rate limit exceeded");
case 404:
@@ -380,7 +393,12 @@
return data?.choices?.[0]?.message?.content;
}


abort() {
this._abortController?.abort();
this._abortController = null;
this.resilientOperations = {};
}

/**
* Estimate the number of tokens in a text
Expand Down
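
A sketch of the cancellation flow this PR enables. The constructor options and the `chat()` call mirror the header comment and defaults in this file; whether `AbortError` surfaces directly from `chat()` depends on ResilientOperation's internals, so that branch is an assumption.

```js
import ResilientLLM from './ResilientLLM.js';

const llm = new ResilientLLM({
  aiService: 'anthropic',
  model: 'claude-3-5-sonnet-20240620',
  retries: 3,
  backoffFactor: 2
});

// Each chat() call now creates its own ResilientOperation internally
// and shares the instance-level AbortController.
const pending = llm.chat([{ role: 'user', content: 'Hello, world!' }]);

// Cancel every in-flight operation on this instance, e.g. on a timeout or user action.
setTimeout(() => llm.abort(), 5000);

try {
  const response = await pending;
  console.log(response);
} catch (err) {
  if (err.name === 'AbortError') {
    console.log('LLM call was aborted'); // assumes the abort propagates as AbortError
  } else {
    throw err;
  }
}
```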