-
Notifications
You must be signed in to change notification settings - Fork 0
[FOUND-114] Reduce GVL contention #23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: braze-main
Are you sure you want to change the base?
Changes from all commits
6b4c416
bb1e2f0
3537a70
0d2af2d
ae4c1a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| require 'resolv' | ||
|
|
||
| module Dalli | ||
| module Socket | ||
| module InstanceMethods | ||
|
|
@@ -21,6 +23,22 @@ def readfull(count) | |
| value | ||
| end | ||
|
|
||
| def writefull(bytes) | ||
| offset = 0 | ||
| while offset < bytes.bytesize | ||
| chunk = offset == 0 ? bytes : bytes.byteslice(offset..-1) | ||
| result = write_nonblock(chunk, exception: false) | ||
| if result == :wait_writable | ||
| raise Timeout::Error, "IO timeout: #{safe_options.inspect}" unless IO.select(nil, [self], nil, options[:socket_timeout]) | ||
| elsif result == :wait_readable | ||
| raise Timeout::Error, "IO timeout: #{safe_options.inspect}" unless IO.select([self], nil, nil, options[:socket_timeout]) | ||
| else | ||
| offset += result | ||
| end | ||
| end | ||
| offset | ||
| end | ||
|
|
||
| def read_available | ||
| value = +"" | ||
| loop do | ||
|
|
@@ -43,35 +61,97 @@ def safe_options | |
| end | ||
| end | ||
|
|
||
| class TCP < TCPSocket | ||
| class TCP < ::Socket | ||
| include Dalli::Socket::InstanceMethods | ||
| attr_accessor :options, :server | ||
|
|
||
| def self.open(host, port, server, options = {}) | ||
| Timeout.timeout(options[:socket_timeout]) do | ||
| sock = new(host, port) | ||
| sock.options = {host: host, port: port}.merge(options) | ||
| sock.server = server | ||
| sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, true) | ||
| sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true) if options[:keepalive] | ||
| sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVBUF, options[:rcvbuf]) if options[:rcvbuf] | ||
| sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF, options[:sndbuf]) if options[:sndbuf] | ||
| sock | ||
| addr_info = resolve_address(host, options[:socket_timeout]) | ||
| sock = new(addr_info[4], ::Socket::SOCK_STREAM, 0) # addr_info[4] == address family constant (e.g. AF_INET), expressed as an integer | ||
|
|
||
| sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, true) | ||
| sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true) if options[:keepalive] | ||
| sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVBUF, options[:rcvbuf]) if options[:rcvbuf] | ||
| sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF, options[:sndbuf]) if options[:sndbuf] | ||
|
|
||
| sockaddr = ::Socket.pack_sockaddr_in(port, addr_info[3]) # addr_info[3] == IP address string (e.g. "192.168.1.1") | ||
| result = sock.connect_nonblock(sockaddr, exception: false) | ||
| if result == :wait_writable | ||
| unless IO.select(nil, [sock], nil, options[:socket_timeout]) | ||
| raise Timeout::Error, "Connection timeout: #{host}:#{port}" | ||
| end | ||
| begin | ||
| sock.connect_nonblock(sockaddr) | ||
| rescue Errno::EISCONN | ||
| # already connected | ||
| end | ||
| end | ||
|
|
||
| sock.options = { host: host, port: port }.merge(options) | ||
| sock.server = server | ||
| sock | ||
| rescue | ||
| sock&.close rescue nil | ||
| raise | ||
| end | ||
|
|
||
| # Resolve a hostname to structured address info with timeout protection. | ||
| # getaddrinfo(3) is a blocking C library call that can block indefinitely | ||
| # on unresponsive DNS. For IP addresses (the common case with memcached), | ||
| # getaddrinfo returns immediately without DNS and is safe to call directly. | ||
| # For hostnames, we use Ruby's Resolv library which is pure Ruby and | ||
| # supports timeouts, then pass the resolved IP to getaddrinfo for the | ||
| # structured address info the caller expects. | ||
| def self.resolve_address(host, timeout) | ||
| if ip_address?(host) | ||
| return ::Socket.getaddrinfo(host, nil, ::Socket::AF_UNSPEC, ::Socket::SOCK_STREAM).first | ||
| end | ||
|
|
||
| dns = Resolv::DNS.new | ||
| dns.timeouts = timeout | ||
| resolver = Resolv.new([Resolv::Hosts.new, dns]) | ||
| resolved_ip = resolver.getaddress(host).to_s | ||
| ::Socket.getaddrinfo(resolved_ip, nil, ::Socket::AF_UNSPEC, ::Socket::SOCK_STREAM).first | ||
| rescue Resolv::ResolvError => e | ||
| raise SocketError, "getaddrinfo: Name or service not known - #{host} (#{e.message})" | ||
| ensure | ||
| dns&.close | ||
| end | ||
| private_class_method :resolve_address | ||
|
|
||
| # Returns true if host is an IP address (v4 or v6) rather than a hostname. | ||
| def self.ip_address?(host) | ||
| host.match?(Resolv::IPv4::Regex) || host.include?(':') | ||
| end | ||
| private_class_method :ip_address? | ||
| end | ||
|
|
||
| class UNIX < UNIXSocket | ||
| class UNIX < ::Socket | ||
| include Dalli::Socket::InstanceMethods | ||
| attr_accessor :options, :server | ||
|
|
||
| def self.open(path, server, options = {}) | ||
| Timeout.timeout(options[:socket_timeout]) do | ||
| sock = new(path) | ||
| sock.options = {path: path}.merge(options) | ||
| sock.server = server | ||
| sock | ||
| sock = new(::Socket::AF_UNIX, ::Socket::SOCK_STREAM, 0) | ||
| sockaddr = ::Socket.pack_sockaddr_un(path) | ||
|
|
||
| result = sock.connect_nonblock(sockaddr, exception: false) | ||
| if result == :wait_writable | ||
| unless IO.select(nil, [sock], nil, options[:socket_timeout]) | ||
| raise Timeout::Error, "Connection timeout: #{path}" | ||
| end | ||
| begin | ||
| sock.connect_nonblock(sockaddr) | ||
| rescue Errno::EISCONN | ||
| # already connected | ||
| end | ||
| end | ||
|
|
||
| sock.options = { path: path }.merge(options) | ||
| sock.server = server | ||
| sock | ||
| rescue | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. iirc, |
||
| sock&.close rescue nil | ||
| raise | ||
| end | ||
| end | ||
| end | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idk how costly it is but i generally like the idea of caching resolved hosts for a while instead of resolving on every conn
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At Braze, we shouldn't ever get down to this line because we use IP addresses to connect to memcached and will return on the guard clause. Also connection creation is not a hot path, so optimizing here is not so important.
Also also, this bit from Claude: if we were to use DNS, there is a risk of caching and using stale hosts if K8s should rotate DNS records to redirect traffic. Leaving as-is for now