Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ PATH
specs:
langfuse (0.1.0)
base64 (~> 0.2)
concurrent-ruby (~> 1.2)
faraday (~> 2.0)
faraday-retry (~> 2.0)
mustache (~> 1.1)
Expand All @@ -19,6 +20,7 @@ GEM
ast (2.4.3)
base64 (0.3.0)
bigdecimal (3.3.1)
concurrent-ruby (1.3.5)
crack (1.0.0)
bigdecimal
rexml
Expand Down Expand Up @@ -136,6 +138,7 @@ GEM

PLATFORMS
arm64-darwin-22
arm64-darwin-24
arm64-darwin-25
x86_64-linux

Expand Down
3 changes: 3 additions & 0 deletions langfuse.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Gem::Specification.new do |spec|
spec.add_dependency "faraday-retry", "~> 2.0"
spec.add_dependency "mustache", "~> 1.1"

# Runtime dependencies - Concurrency (for SWR caching)
spec.add_dependency "concurrent-ruby", "~> 1.2"

# Runtime dependencies - OpenTelemetry (for tracing)
spec.add_dependency "opentelemetry-api", "~> 1.2"
spec.add_dependency "opentelemetry-common", "~> 0.21"
Expand Down
72 changes: 58 additions & 14 deletions lib/langfuse/api_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "json"

module Langfuse
# rubocop:disable Metrics/ClassLength
# HTTP client for Langfuse API
#
# Handles authentication, connection management, and HTTP requests
Expand Down Expand Up @@ -108,26 +109,68 @@ def get_prompt(name, version: nil, label: nil)

cache_key = PromptCache.build_key(name, version: version, label: label)

# Use distributed lock if cache supports it (Rails.cache backend)
if cache.respond_to?(:fetch_with_lock)
cache.fetch_with_lock(cache_key) do
fetch_prompt_from_api(name, version: version, label: label)
end
elsif cache
# In-memory cache - use simple get/set pattern
cached_data = cache.get(cache_key)
return cached_data if cached_data
fetch_with_appropriate_caching_strategy(cache_key, name, version, label)
end

private

prompt_data = fetch_prompt_from_api(name, version: version, label: label)
cache.set(cache_key, prompt_data)
prompt_data
# Resolve a prompt through the best caching strategy the configured
# backend supports, in order of preference: stale-while-revalidate,
# distributed lock (stampede protection), plain get/set, or no cache.
#
# @param cache_key [String] The cache key for this prompt
# @param name [String] The name of the prompt
# @param version [Integer, nil] Optional specific version number
# @param label [String, nil] Optional label
# @return [Hash] The prompt data
def fetch_with_appropriate_caching_strategy(cache_key, name, version, label)
  return fetch_with_swr_cache(cache_key, name, version, label) if swr_cache_available?
  return fetch_with_distributed_cache(cache_key, name, version, label) if distributed_cache_available?
  return fetch_with_simple_cache(cache_key, name, version, label) if simple_cache_available?

  # No cache configured - hit the API directly on every call
  fetch_prompt_from_api(name, version: version, label: label)
end

private
# Whether the configured cache supports stale-while-revalidate.
# Returns nil (falsy) when no cache is configured, preserving the
# original safe-navigation result exactly.
def swr_cache_available?
  cache.respond_to?(:fetch_with_stale_while_revalidate) unless cache.nil?
end

# Whether the configured cache supports lock-based (stampede-protected)
# fetching. Returns nil (falsy) when no cache is configured, preserving
# the original safe-navigation result exactly.
def distributed_cache_available?
  cache.respond_to?(:fetch_with_lock) unless cache.nil?
end

# Whether any cache backend is configured at all - the last-resort
# strategy using plain get/set (see #fetch_with_simple_cache).
def simple_cache_available?
  !cache.nil?
end

# Fetch via the stale-while-revalidate cache. The block is invoked by
# the cache adapter to (re)load the prompt from the API; the exact
# stale/refresh semantics live in the adapter (presumably stale data is
# served while a background refresh runs - confirm in RailsCacheAdapter).
#
# @param cache_key [String] The cache key for this prompt
# @param name [String] The name of the prompt
# @param version [Integer, nil] Optional specific version number
# @param label [String, nil] Optional label
# @return [Hash] The prompt data
def fetch_with_swr_cache(cache_key, name, version, label)
  cache.fetch_with_stale_while_revalidate(cache_key) do
    fetch_prompt_from_api(name, version: version, label: label)
  end
end

# Fetch via the distributed cache (Rails.cache with stampede
# protection). The block is invoked by the adapter to load the prompt
# from the API on a miss; locking details (timeout, contention
# behaviour) live in the adapter's #fetch_with_lock.
#
# @param cache_key [String] The cache key for this prompt
# @param name [String] The name of the prompt
# @param version [Integer, nil] Optional specific version number
# @param label [String, nil] Optional label
# @return [Hash] The prompt data
def fetch_with_distributed_cache(cache_key, name, version, label)
  cache.fetch_with_lock(cache_key) do
    fetch_prompt_from_api(name, version: version, label: label)
  end
end

# Fetch via a plain get/set cache (in-memory backend): return the
# cached value on a hit, otherwise load from the API and store it.
#
# NOTE(review): a cached falsy value would read as a miss; prompt data
# is a Hash in practice, so this should not occur - confirm if the
# cache ever stores non-Hash values.
#
# @param cache_key [String] The cache key for this prompt
# @param name [String] The name of the prompt
# @param version [Integer, nil] Optional specific version number
# @param label [String, nil] Optional label
# @return [Hash] The prompt data
def fetch_with_simple_cache(cache_key, name, version, label)
  hit = cache.get(cache_key)
  return hit if hit

  fetch_prompt_from_api(name, version: version, label: label).tap do |prompt_data|
    cache.set(cache_key, prompt_data)
  end
end

# Fetch a prompt from the API (without caching)
#
Expand Down Expand Up @@ -260,4 +303,5 @@ def extract_error_message(response)
response.body["message"] || response.body["error"] || "Unknown error"
end
end
# rubocop:enable Metrics/ClassLength
end
15 changes: 11 additions & 4 deletions lib/langfuse/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,22 @@ def create_cache
max_size: config.cache_max_size
)
when :rails
RailsCacheAdapter.new(
ttl: config.cache_ttl,
lock_timeout: config.cache_lock_timeout
)
create_rails_cache_adapter
else
raise ConfigurationError, "Unknown cache backend: #{config.cache_backend}"
end
end

# Build a RailsCacheAdapter from the current configuration.
#
# stale_ttl is only passed through when stale-while-revalidate is
# enabled; nil tells the adapter to skip SWR behaviour.
#
# @return [RailsCacheAdapter] the configured adapter
def create_rails_cache_adapter
  stale_ttl = (config.cache_stale_ttl if config.cache_stale_while_revalidate)

  RailsCacheAdapter.new(
    ttl: config.cache_ttl,
    lock_timeout: config.cache_lock_timeout,
    stale_ttl: stale_ttl,
    refresh_threads: config.cache_refresh_threads,
    logger: config.logger
  )
end

# Build the appropriate prompt client based on prompt type
#
# @param prompt_data [Hash] The prompt data from API
Expand Down
32 changes: 32 additions & 0 deletions lib/langfuse/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ class Config
# @return [Integer] Lock timeout in seconds for distributed cache stampede protection
attr_accessor :cache_lock_timeout

# @return [Boolean] Enable stale-while-revalidate caching
attr_accessor :cache_stale_while_revalidate

# @return [Integer] Stale TTL in seconds (grace period for serving stale data)
attr_accessor :cache_stale_ttl

# @return [Integer] Number of background threads for cache refresh
attr_accessor :cache_refresh_threads

# @return [Boolean] Use async processing for traces (requires ActiveJob)
attr_accessor :tracing_async

Expand All @@ -65,6 +74,9 @@ class Config
DEFAULT_CACHE_MAX_SIZE = 1000
DEFAULT_CACHE_BACKEND = :memory
DEFAULT_CACHE_LOCK_TIMEOUT = 10
DEFAULT_CACHE_STALE_WHILE_REVALIDATE = false
DEFAULT_CACHE_STALE_TTL = 300
DEFAULT_CACHE_REFRESH_THREADS = 5
DEFAULT_TRACING_ASYNC = true
DEFAULT_BATCH_SIZE = 50
DEFAULT_FLUSH_INTERVAL = 10
Expand All @@ -83,6 +95,9 @@ def initialize
@cache_max_size = DEFAULT_CACHE_MAX_SIZE
@cache_backend = DEFAULT_CACHE_BACKEND
@cache_lock_timeout = DEFAULT_CACHE_LOCK_TIMEOUT
@cache_stale_while_revalidate = DEFAULT_CACHE_STALE_WHILE_REVALIDATE
@cache_stale_ttl = DEFAULT_CACHE_STALE_TTL
@cache_refresh_threads = DEFAULT_CACHE_REFRESH_THREADS
@tracing_async = DEFAULT_TRACING_ASYNC
@batch_size = DEFAULT_BATCH_SIZE
@flush_interval = DEFAULT_FLUSH_INTERVAL
Expand Down Expand Up @@ -110,6 +125,8 @@ def validate!
"cache_lock_timeout must be positive"
end

validate_swr_config!

validate_cache_backend!
end
# rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
Expand All @@ -131,5 +148,20 @@ def validate_cache_backend!
raise ConfigurationError,
"cache_backend must be one of #{valid_backends.inspect}, got #{cache_backend.inspect}"
end

# Validate the stale-while-revalidate cache settings.
#
# @raise [ConfigurationError] if cache_stale_ttl is nil or negative,
#   if cache_refresh_threads is nil or non-positive, or if SWR is
#   enabled with a backend other than :rails
def validate_swr_config!
  stale_ttl_invalid = cache_stale_ttl.nil? || cache_stale_ttl.negative?
  raise ConfigurationError, "cache_stale_ttl must be non-negative" if stale_ttl_invalid

  threads_invalid = cache_refresh_threads.nil? || cache_refresh_threads <= 0
  raise ConfigurationError, "cache_refresh_threads must be positive" if threads_invalid

  # SWR relies on Rails.cache features; reject any other backend
  return unless cache_stale_while_revalidate && cache_backend != :rails

  raise ConfigurationError,
        "cache_stale_while_revalidate requires cache_backend to be :rails"
end
end
end
Loading