|
| 1 | +require "http/client" |
| 2 | + |
| 3 | +module EasyAwscr::S3::Internals |
| 4 | + class ConnectionPool < Awscr::S3::HttpClientFactory |
| 5 | + getter created_at |
| 6 | + |
| 7 | + def initialize(*, @max_ttl : Time::Span? = 5.minutes, @max_size = 128) |
| 8 | + @pool = Hash(Fiber, {HTTP::Client, Time}).new |
| 9 | + @tls = OpenSSL::SSL::Context::Client.new |
| 10 | + @mutex = Mutex.new(:unchecked) |
| 11 | + @closed = false |
| 12 | + @created_at = Time.utc |
| 13 | + end |
| 14 | + |
| 15 | + def acquire_client(endpoint : URI, signer : Awscr::Signer::Signers::Interface) : HTTP::Client |
| 16 | + @mutex.synchronize { @pool.delete(Fiber.current) }.try do |client, last_checked| |
| 17 | + if expired?(last_checked) |
| 18 | + client.close |
| 19 | + else |
| 20 | + return client |
| 21 | + end |
| 22 | + end |
| 23 | + |
| 24 | + # creates a new client |
| 25 | + super |
| 26 | + end |
| 27 | + |
| 28 | + # Overwritten only to call "reset_headers" (see comment there for details). |
| 29 | + # |
| 30 | + # Note: At this point it is not clear if the workaround can be improved. Should |
| 31 | + # it become clear that it cannot, then this code should perhaps be moved into the |
| 32 | + # awscr-s3 library (since then every user would need to replicate the code here). |
| 33 | + protected def attach_signer(client, signer) |
| 34 | + if signer.is_a?(Awscr::Signer::Signers::V4) |
| 35 | + client.before_request do |req| |
| 36 | + reset_headers(req) |
| 37 | + signer.as(Awscr::Signer::Signers::V4).sign(req, encode_path: false) |
| 38 | + end |
| 39 | + else |
| 40 | + client.before_request do |req| |
| 41 | + reset_headers(req) |
| 42 | + signer.sign(req) |
| 43 | + end |
| 44 | + end |
| 45 | + end |
| 46 | + |
| 47 | + # Workaround to avoid signing errors when requests have to be repeated |
| 48 | + # after a TCPSocket has to be reconnected. |
| 49 | + # |
| 50 | + # Background: |
| 51 | + # * https://github.com/taylorfinnell/awscr-signer/issues/56 |
| 52 | + # * https://github.com/crystal-lang/crystal/issues/16028 |
| 53 | + private def reset_headers(req) |
| 54 | + req.headers.delete "Authorization" |
| 55 | + req.headers.delete "X-Amz-Content-Sha256" |
| 56 | + req.headers.delete "X-Amz-Date" |
| 57 | + end |
| 58 | + |
| 59 | + private def expired?(last_checked, now = Time.utc) |
| 60 | + @max_ttl.try { |ttl| now - last_checked > ttl } |
| 61 | + end |
| 62 | + |
| 63 | + def acquire_raw_client(endpoint : URI) : HTTP::Client |
| 64 | + HTTP::Client.new(endpoint, tls: @tls) |
| 65 | + end |
| 66 | + |
| 67 | + def release(client : HTTP::Client?) |
| 68 | + return unless client |
| 69 | + |
| 70 | + if @max_size == 0 |
| 71 | + client.close |
| 72 | + return |
| 73 | + end |
| 74 | + |
| 75 | + now = Time.utc |
| 76 | + dead1 = nil |
| 77 | + dead2 = nil |
| 78 | + dead3 = nil |
| 79 | + |
| 80 | + current_fiber = Fiber.current |
| 81 | + @mutex.synchronize do |
| 82 | + unless @closed |
| 83 | + @pool.first_key?.try do |fiber| |
| 84 | + dead1 = @pool.shift[1][0] if fiber.dead? || expired?(@pool.first_value[1], now) |
| 85 | + @pool.delete(current_fiber).try { |old_client, _| dead2 = old_client } |
| 86 | + end |
| 87 | + @pool[current_fiber] = {client, now} |
| 88 | + dead3 = @pool.shift[1][0] if @pool.size > @max_size |
| 89 | + end |
| 90 | + end |
| 91 | + ensure |
| 92 | + dead1.try &.close |
| 93 | + dead2.try &.close |
| 94 | + dead3.try &.close |
| 95 | + end |
| 96 | + |
| 97 | + def close |
| 98 | + @mutex.synchronize do |
| 99 | + return if @closed |
| 100 | + |
| 101 | + @closed = true |
| 102 | + @pool.values.each { |client, _| client.close } |
| 103 | + @pool.clear |
| 104 | + end |
| 105 | + end |
| 106 | + end |
| 107 | +end |
0 commit comments