Skip to content

[FOUND-114] Reduce GVL contention#23

Open
mattwd7 wants to merge 5 commits into braze-main from
FOUND-114-reduce-gvl-contention
Open

[FOUND-114] Reduce GVL contention#23
mattwd7 wants to merge 5 commits into braze-main from
FOUND-114-reduce-gvl-contention

Conversation

@mattwd7
Copy link

@mattwd7 mattwd7 commented Feb 13, 2026

This PR reduces Global VM Lock (GVL) contention in the Dalli memcached client by switching socket operations to non-blocking I/O with explicit IO.select waits. When the socket would have previously blocked, the thread now yields the GVL instead of holding it across blocking syscalls, improving concurrency under load.

More details involving the changes, motivation, and benchmarking results provided by @radixdev - https://gist.github.com/radixdev/d601cf8a9676d10774463b63748fffe6

benchmark_async.rb
#!/usr/bin/env ruby
# frozen_string_literal: true

# Benchmark: Blocking vs Async IO for GVL contention
#
# Includes a forked TCP latency proxy to simulate real network conditions.
# The proxy runs in a separate process (separate GVL) so it does not
# interfere with the benchmark measurements.
#
# Usage: bundle exec ruby test/benchmark_async.rb

require 'bundler/setup'
require 'benchmark'
require 'timeout'
require 'socket'

$TESTING = true
require_relative 'memcached_mock'
require 'dalli'
require 'logger'

Dalli.logger = Logger.new(STDOUT)
Dalli.logger.level = Logger::ERROR

include MemcachedMock::Helper

MEMCACHED_PORT = 11_277
PROXY_PORT     = 11_278

# ---- TCP latency proxy (runs in a forked child — separate GVL) ----

# Forks a child process that accepts TCP connections on `listen_port` and
# relays bytes to/from `target_host:target_port`, sleeping
# `one_way_latency_s` before forwarding each chunk in each direction.
#
# The proxy runs in its own process (own GVL) so the injected latency does
# not perturb the benchmark's thread scheduling. Returns the child pid,
# which the caller must later pass to stop_proxy.
def start_latency_proxy(listen_port, target_host, target_port, one_way_latency_s)
  pid = fork do
    # Exit immediately (skipping at_exit hooks) when the parent stops us.
    trap("TERM") { exit! }
    trap("INT")  { exit! }

    server = TCPServer.new('127.0.0.1', listen_port)
    # NOTE(review): SO_REUSEADDR is applied after TCPServer.new has already
    # bound the port, so it has no effect on this bind. Harmless, but to be
    # effective it would need to be set before binding.
    server.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)

    loop do
      client = begin
        server.accept
      rescue IOError, Errno::EBADF
        break # server socket closed — shut down the accept loop
      end

      # Each connection gets two relay threads: client→target and target→client
      Thread.new(client) do |c|
        target = TCPSocket.new(target_host, target_port)
        # Disable Nagle so the proxy itself adds no extra batching delay.
        target.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)

        # One-directional pump: read a chunk, wait the injected latency,
        # then forward it. Reuses one buffer to avoid per-chunk allocations.
        relay = ->(src, dst) do
          Thread.new do
            begin
              buf = String.new(capacity: 65_536)
              loop do
                src.readpartial(65_536, buf)
                sleep one_way_latency_s if one_way_latency_s > 0
                dst.write(buf)
              end
            rescue EOFError, IOError, Errno::ECONNRESET, Errno::EPIPE
              # Peer hung up — the normal end of a relay direction.
            ensure
              # Propagate EOF downstream so the other side sees the close.
              dst.close_write rescue nil
            end
          end
        end

        t1 = relay.call(c, target)
        t2 = relay.call(target, c)
        t1.join
        t2.join
      rescue => e
        # connection error, ignore
      ensure
        c.close rescue nil
        target.close rescue nil
      end
    end
  end

  sleep 0.15 # let proxy bind
  pid
end

# Terminates the latency-proxy child process, if one was started.
# Tolerates the race where the child has already exited or been reaped.
def stop_proxy(pid)
  return unless pid

  begin
    Process.kill("TERM", pid)
    Process.wait(pid)
  rescue Errno::ECHILD, Errno::ESRCH
    # Child already gone — nothing left to clean up.
  end
end

# ---- Define blocking (old) socket classes for A/B comparison ----

# Reopen Dalli to define the legacy *blocking* TCP socket so the benchmark
# can A/B it against the new non-blocking implementation.
module Dalli
  module Socket
    # Pre-FOUND-114 socket behavior: plain blocking syscalls guarded by
    # Timeout.timeout, which holds the GVL while blocked in read/write.
    # Kept intentionally un-fixed — it is the "before" side of the A/B test.
    class BlockingTCP < ::TCPSocket
      include Dalli::Socket::InstanceMethods
      # options: per-connection settings hash; server: owning Dalli server.
      attr_accessor :options, :server

      # Opens a blocking TCP connection to a memcached host and applies the
      # usual socket tuning (TCP_NODELAY, optional keepalive/buffer sizes).
      def self.open(host, port, server, options = {})
        Timeout.timeout(options[:socket_timeout]) do
          sock = new(host, port)
          sock.options = { host: host, port: port }.merge(options)
          sock.server = server
          sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, true)
          sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true) if options[:keepalive]
          sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVBUF, options[:rcvbuf]) if options[:rcvbuf]
          sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF, options[:sndbuf]) if options[:sndbuf]
          sock
        end
      end

      # Blocking write (old behavior)
      def writefull(bytes)
        write(bytes)
      end
    end
  end
end

# Stash references to both socket implementations so the suite can flip
# Dalli between them at runtime.
ASYNC_TCP    = Dalli::Socket::TCP
BLOCKING_TCP = Dalli::Socket::BlockingTCP

# Point Dalli::Socket::TCP at the legacy blocking implementation.
# remove_const first avoids the "already initialized constant" warning.
def use_blocking_io!
  Dalli::Socket.send(:remove_const, :TCP)
  Dalli::Socket.const_set(:TCP, BLOCKING_TCP)
end

# Restore the new non-blocking (async) implementation under test.
def use_async_io!
  Dalli::Socket.send(:remove_const, :TCP)
  Dalli::Socket.const_set(:TCP, ASYNC_TCP)
end

# ---- Benchmark helpers ----

# Hammers memcached from `thread_count` threads at once.
#
# Every worker builds its own Dalli::Client, then all workers are held at a
# start line (ready/go queues) so the measured work begins simultaneously
# rather than staggered by connection setup. Each worker performs
# `ops_per_thread` set+get pairs over a rotating window of 200 keys.
# Errors are reported to stderr instead of raised so one bad thread does
# not abort the whole benchmark run.
def run_throughput(thread_count, ops_per_thread, servers, value)
  ready_line = Thread::Queue.new
  start_gun  = Thread::Queue.new

  workers = Array.new(thread_count) do |worker_id|
    Thread.new do
      begin
        client = Dalli::Client.new(servers)
        ready_line.push(true)
        start_gun.pop

        ops_per_thread.times do |op|
          key = "bench-#{worker_id}-#{op % 200}"
          client.set(key, value)
          client.get(key)
        end
        client.close
      rescue => e
        $stderr.puts "  !! Thread #{worker_id} error: #{e.class}: #{e.message}"
      end
    end
  end

  thread_count.times { ready_line.pop }
  thread_count.times { start_gun.push(:go) }
  workers.each(&:join)
end

# Measures connection churn: build a fresh client, perform a single op,
# then tear the connection down — repeated `iterations` times.
def run_connect_disconnect(iterations, servers)
  iterations.times do
    Dalli::Client.new(servers).tap do |client|
      client.set("ping", "pong")
      client.close
    end
  end
end

# Number of timing rounds per measurement; we keep only the fastest.
ROUNDS = 3

# Runs the given block `rounds` times and returns the smallest result,
# filtering GC pauses and scheduler noise out of the timings.
# Returns nil when rounds is zero (the block is never invoked).
def best_of(rounds)
  fastest = nil
  rounds.times do
    sample = yield
    fastest = sample if fastest.nil? || sample < fastest
  end
  fastest
end

# Naive doubly-recursive Fibonacci. Deliberately inefficient: it exists to
# pin a CPU core (and the GVL) during the IO-vs-CPU contention benchmark.
def fib(n)
  return n if n < 2

  fib(n - 1) + fib(n - 2)
end

# Runs IO-bound memcached traffic alongside one pure-CPU thread to measure
# how badly the socket implementation starves CPU work of the GVL.
#
# Returns [total_wall_seconds, cpu_thread_seconds]: if the IO threads hold
# the GVL across blocking syscalls, fib(35) on the CPU thread slows down.
def run_io_vs_cpu(thread_count, ops_per_thread, servers, value)
  cpu_elapsed = nil

  wall = Benchmark.realtime do
    io_workers = Array.new(thread_count) do |worker_id|
      Thread.new do
        client = Dalli::Client.new(servers)
        ops_per_thread.times do |op|
          key = "bench-#{worker_id}-#{op % 200}"
          client.set(key, value)
          client.get(key)
        end
        client.close
      end
    end

    cpu_worker = Thread.new do
      started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      fib(35)
      cpu_elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    end

    io_workers.each(&:join)
    cpu_worker.join
  end

  [wall, cpu_elapsed]
end

# Runs the full A/B suite (blocking vs async IO) against `servers`.
#
# label_prefix   - string prepended to section headers (e.g. "1" → "1A", "1B")
# thread_counts  - thread counts swept in the throughput section
# ops_per_thread - set+get pairs per thread in the throughput section
#
# Sections: A) connection churn, B) threaded throughput at two value sizes,
# C) IO threads competing with a CPU-bound thread.
def run_benchmark_suite(servers, label_prefix, thread_counts, ops_per_thread)
  # ----------------------------------------------------------
  # A) Connection churn
  # ----------------------------------------------------------
  connect_iters = 300

  puts "-" * 68
  puts "  #{label_prefix}A: Connect + single op + close (#{connect_iters} iters, best of #{ROUNDS})"
  puts "-" * 68
  puts

  results = {}
  [[:blocking, method(:use_blocking_io!)],
   [:async,    method(:use_async_io!)]].each do |mode, switcher|
    switcher.call
    # Warm up with 10 iterations and force GC so allocation debt from one
    # mode doesn't bleed into the other mode's timing.
    run_connect_disconnect(10, servers)
    GC.start
    results[mode] = best_of(ROUNDS) { Benchmark.realtime { run_connect_disconnect(connect_iters, servers) } }
  end

  printf "  Blocking: %8.3f s  (%6.1f connects/s)\n", results[:blocking], connect_iters / results[:blocking]
  printf "  Async:    %8.3f s  (%6.1f connects/s)\n", results[:async], connect_iters / results[:async]
  printf "  Speedup:  %.2fx\n", results[:blocking] / results[:async]
  puts

  # ----------------------------------------------------------
  # B) Threaded throughput
  # ----------------------------------------------------------
  # Two payload sizes: small values stress syscall overhead, large values
  # stress the write path.
  [
    ["100 B values",  "x" * 100],
    ["10 KB values",  "x" * 10_000],
  ].each do |val_label, value|
    puts "-" * 68
    puts "  #{label_prefix}B: Throughput — #{val_label}"
    puts "  #{ops_per_thread} set+get ops/thread, best of #{ROUNDS}"
    puts "-" * 68
    puts
    printf "  %-8s | %12s | %12s | %8s\n", "Threads", "Blocking", "Async", "Speedup"
    puts "  " + "-" * 56

    thread_counts.each do |tc|
      times = {}
      [[:blocking, method(:use_blocking_io!)],
       [:async,    method(:use_async_io!)]].each do |mode, switcher|
        switcher.call
        # Warm-up pass capped at 50 ops (or a tenth of the workload).
        run_throughput(tc, [50, ops_per_thread / 10].min, servers, value)
        GC.start
        times[mode] = best_of(ROUNDS) { Benchmark.realtime { run_throughput(tc, ops_per_thread, servers, value) } }
      end

      speedup = times[:blocking] / times[:async]
      printf "  %-8d | %9.3f s  | %9.3f s  | %6.2fx\n",
             tc, times[:blocking], times[:async], speedup
    end
    puts
  end

  # ----------------------------------------------------------
  # C) IO + CPU contention
  # ----------------------------------------------------------
  # Cap ops so the CPU thread's fib(35) spans most of the IO work.
  cpu_ops = [ops_per_thread, 1000].min
  value = "x" * 1_000

  puts "-" * 68
  puts "  #{label_prefix}C: 4 IO threads (#{cpu_ops} ops) + 1 CPU thread fib(35)"
  puts "  best of #{ROUNDS}"
  puts "-" * 68
  puts

  [[:blocking, method(:use_blocking_io!)],
   [:async,    method(:use_async_io!)]].each do |mode, switcher|
    switcher.call
    # Short warm-up run, then GC, as in the other sections.
    run_io_vs_cpu(4, 20, servers, value)
    GC.start

    # Track wall time and CPU-thread time independently: the best wall and
    # best CPU samples may come from different rounds.
    best_wall = Float::INFINITY
    best_cpu  = Float::INFINITY
    ROUNDS.times do
      w, c = run_io_vs_cpu(4, cpu_ops, servers, value)
      best_wall = w if w < best_wall
      best_cpu  = c if c < best_cpu
    end

    printf "  %-10s — wall: %7.3fs, CPU fib(35): %7.3fs\n", mode, best_wall, best_cpu
  end
  puts
end

# ========================================================================
# Main
# ========================================================================

puts "=" * 68
puts "  GVL Contention Benchmark: Blocking vs Async IO"
puts "=" * 68
puts
puts "  Ruby:  #{RUBY_DESCRIPTION}"
puts "  Dalli: #{Dalli::VERSION}"
puts

# Start memcached
# (memcached_server comes from MemcachedMock::Helper, included above —
# presumably boots a memcached instance bound to this port; confirm in
# test/memcached_mock.)
memcached_server(MEMCACHED_PORT)

# Quick sanity check
use_async_io!
c = Dalli::Client.new(["127.0.0.1:#{MEMCACHED_PORT}"])
c.set("warmup", "ok")
abort "ERROR: Cannot reach memcached on port #{MEMCACHED_PORT}" unless c.get("warmup") == "ok"
c.close

# ================================================================
# Part 1: No latency (localhost direct)
# ================================================================
puts "#" * 68
puts "  PART 1: Localhost (no added latency)"
puts "#" * 68
puts

direct_servers = ["127.0.0.1:#{MEMCACHED_PORT}"]
run_benchmark_suite(direct_servers, "1", [1, 4, 8, 16], 2000)

# ================================================================
# Part 2: With simulated latency via TCP proxy
# ================================================================
# One-way sleep injected per relayed chunk; effective RTT is roughly double.
latencies = {
  "0.5ms" => 0.0005,   # same-rack / fast LAN
  "2ms"   => 0.002,    # same-datacenter
  "5ms"   => 0.005,    # cross-datacenter
}

latencies.each_with_index do |(label, one_way), idx|
  rtt_ms = (one_way * 2 * 1000).round(1)
  # Each latency tier gets its own port so a stale proxy from a previous
  # tier can never answer for the current one.
  proxy_port = PROXY_PORT + idx

  puts "#" * 68
  puts "  PART #{idx + 2}: Simulated #{label} one-way latency (#{rtt_ms}ms RTT)"
  puts "#" * 68
  puts

  proxy_pid = start_latency_proxy(proxy_port, '127.0.0.1', MEMCACHED_PORT, one_way)

  proxy_servers = ["127.0.0.1:#{proxy_port}"]

  # Verify proxy works
  use_async_io!
  pc = Dalli::Client.new(proxy_servers, socket_timeout: 5)
  pc.set("proxy_test", "ok")
  unless pc.get("proxy_test") == "ok"
    stop_proxy(proxy_pid)
    abort "ERROR: Latency proxy on port #{proxy_port} is not working"
  end
  pc.close

  # Fewer ops for higher latency to keep runtime reasonable
  # NOTE(review): the ranges share the endpoint 0.001, so a boundary value
  # would land in the first matching `when`; none of the configured
  # latencies (0.0005 / 0.002 / 0.005) hits a boundary, so it's benign.
  ops = case one_way
        when 0..0.001  then 1000
        when 0.001..0.003 then 500
        else 200
        end

  run_benchmark_suite(proxy_servers, "#{idx + 2}", [1, 4, 8], ops)
  stop_proxy(proxy_pid)
end

puts "=" * 68
puts "  Done."
puts "=" * 68

memcached_kill(MEMCACHED_PORT)

Benchmark Results

Benchmarked on:

  • Ruby: 3.2.9 (arm64-darwin24)
  • Dalli: 2.7.11
  • Memcached: 1.6.40

Latency injected via a forked TCP proxy process (separate GVL) that adds a
configurable sleep per data relay in each direction.

Localhost (no added latency)

Scenario Threads Blocking Async Speedup
Connect + op + close (300 iters) 1 0.056s 0.054s 1.03x
Throughput, 100B values 1 0.152s 0.149s 1.02x
Throughput, 100B values 4 0.626s 0.737s 0.85x
Throughput, 100B values 16 2.143s 2.174s ~1.00x
Throughput, 10KB values 1 0.153s 0.153s ~1.00x
Throughput, 10KB values 8 1.109s 1.169s 0.95x
4 IO threads + CPU fib(35) 4+1 1.074s 1.081s ~1.00x

On localhost, the kernel socket buffer absorbs writes instantly and
IO.select in readfull already releases the GVL during reads. The async
path adds a small extra-syscall overhead that shows up at high thread counts
with small payloads. Performance is effectively equivalent.

1ms RTT (same-rack)

Scenario Threads Blocking Async Speedup
Throughput, 100B values 4 3.426s 3.280s 1.04x
Throughput, 100B values 8 2.836s 2.847s ~1.00x
Throughput, 10KB values 1 3.314s 3.212s 1.03x

4ms RTT (same-datacenter)

Scenario Threads Blocking Async Speedup
Connect churn (300 iters) 1 3.803s 3.635s 1.05x
Throughput, 100B values 1 5.596s 5.811s 0.96x
Throughput, 100B values 4 6.064s 5.539s 1.09x
Throughput, 10KB values 4 6.435s 6.308s 1.02x
Throughput, 10KB values 8 6.664s 6.185s 1.08x
4 IO threads + CPU fib(35) 4+1 7.148s 6.878s 1.04x

At realistic datacenter latency, the async approach shows consistent 5–9%
improvements
in multi-threaded throughput. The fixed overhead of the extra
non-blocking syscall becomes negligible relative to the per-operation network
time.

10ms RTT (cross-datacenter)

Scenario Threads Blocking Async Speedup
Throughput, 10KB values 4 5.321s 5.089s 1.05x
Throughput, 100B values 8 5.231s 5.246s ~1.00x

At high latency, operations are dominated by network round-trip time. Both
approaches spend nearly all time in IO.select (which releases the GVL
regardless), so throughput converges.

@mattwd7 mattwd7 changed the title Found 114 reduce gvl contention Found 114 reduce GVL contention Feb 13, 2026
@mattwd7 mattwd7 changed the title Found 114 reduce GVL contention [FOUND-114] Reduce GVL contention Feb 13, 2026
Base automatically changed from FOUND-112-prevent-timeout-induced-data-corruption to braze-main February 18, 2026 19:04
@mattwd7 mattwd7 force-pushed the FOUND-114-reduce-gvl-contention branch from a461b44 to 5dc45d8 Compare February 18, 2026 19:20
@mattwd7 mattwd7 force-pushed the FOUND-114-reduce-gvl-contention branch from 5dc45d8 to 3537a70 Compare February 19, 2026 21:47
@mattwd7 mattwd7 marked this pull request as ready for review February 20, 2026 18:37
@mattwd7 mattwd7 requested a review from a team as a code owner February 20, 2026 18:37
@mattwd7 mattwd7 requested review from jonhyman and naveg February 20, 2026 19:03

# Returns true if host is an IP address (v4 or v6) rather than a hostname.
def self.ip_address?(host)
host.match?(/\A\d{1,3}(\.\d{1,3}){3}\z/) || host.include?(':')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So Resolv::IPv4 and Ipv6 have Regexes, might be useful?

  host.match?(Resolv::IPv4::Regex) || host.match?(Resolv::IPv6::Regex)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea
host.include?(':') already captures IPv6 format, but is looser and will capture edge cases that Resolv::IPv6::Regex will miss (e.g. a scoped address like fe80::1%eth0)
Gonna move forward with

host.match?(Resolv::IPv4::Regex) || host.include?(':')

sock.options = { path: path }.merge(options)
sock.server = server
sock
rescue

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we `rescue => e` so that we at least only match StandardError? this sounds too catch-all-ish

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc, a bare rescue and rescue => e are the same — both rescue only StandardError and its descendants — with the latter just capturing the exception to a var e


dns = Resolv::DNS.new
dns.timeouts = timeout
resolver = Resolv.new([Resolv::Hosts.new, dns])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idk how costly it is but i generally like the idea of caching resolved hosts for a while instead of resolving on every conn

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At Braze, we shouldn't ever get down to this line because we use IP addresses to connect to memcached and will return on the guard clause. Also connection creation is not a hot path, so optimizing here is not so important.

Also also, this bit from Claude: if we were to use DNS, there is a risk of caching and using stale hosts if K8s should rotate DNS records to redirect traffic. Leaving as-is for now

Copy link

@radixdev radixdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't call myself an expert on this, but lgtm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants