Skip to content

Commit ecb69fa

Browse files
committed
Add example of connection limited server.
1 parent 317f078 commit ecb69fa

File tree

5 files changed

+258
-5
lines changed

5 files changed

+258
-5
lines changed

examples/limited/config.ru

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#!/usr/bin/env falcon --verbose serve -c
2+
# frozen_string_literal: true
3+
4+
def limited_semaphore_token(request)
5+
if request.respond_to?(:connection)
6+
io = request.connection.stream.io
7+
8+
if io.respond_to?(:token)
9+
return io.token
10+
end
11+
end
12+
13+
return nil
14+
end
15+
16+
run do |env|
17+
# This is not part of the rack specification, but is available when running under Falcon.
18+
request = env["protocol.http.request"]
19+
20+
# There is no guarantee that there is a connection or that the connection has a token:
21+
token = limited_semaphore_token(request)
22+
23+
Console.info(self, "Sleeping 10 seconds", token: token)
24+
25+
if env["PATH_INFO"] == "/fast"
26+
if token
27+
# Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already.
28+
token.release
29+
request.connection.persistent = false
30+
end
31+
32+
# Simulated "fast / non-blocking" request:
33+
sleep(1)
34+
else
35+
# Simulated "slow / blocking" request:
36+
sleep(10)
37+
end
38+
39+
[200, {}, ["Hello World"]]
40+
end

examples/limited/falcon.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/usr/bin/env falcon-host
2+
# frozen_string_literal: true
3+
4+
# Released under the MIT License.
5+
# Copyright, 2019-2024, by Samuel Williams.
6+
7+
require "falcon/environment/rack"
8+
require_relative "limited"
9+
10+
service "limited.localhost" do
11+
include Falcon::Environment::Rack
12+
13+
scheme "http"
14+
protocol {Async::HTTP::Protocol::HTTP}
15+
16+
# Extend the endpoint options to include the (connection) limited wrapper.
17+
endpoint_options do
18+
super().merge(wrapper: Limited::Wrapper.new)
19+
end
20+
21+
count 2
22+
23+
url "http://localhost:8080"
24+
25+
endpoint do
26+
::Async::HTTP::Endpoint.parse(url).with(**endpoint_options)
27+
end
28+
end

examples/limited/limited.rb

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
module Limited
2+
# Thread local storage for the semaphore (per-worker):
3+
Thread.attr_accessor :limited_semaphore
4+
5+
# We use a thread-safe semaphore to limit the number of connections that can be accepted at once.
6+
class Semaphore
7+
# Get the semaphore for the current thread.
8+
def self.instance
9+
Thread.current.limited_semaphore ||= new
10+
end
11+
12+
# Create a new semaphore with the given limit.
13+
def initialize(limit = 1)
14+
@queue = Thread::Queue.new
15+
limit.times{release}
16+
end
17+
18+
# Release the semaphore.
19+
def release
20+
@queue.push(true)
21+
end
22+
23+
# Acquire the semaphore. May block until the semaphore is available.
24+
def acquire
25+
@queue.pop
26+
27+
return Token.new(self)
28+
end
29+
30+
# A token that can be used to release the semaphore once and once only.
31+
class Token
32+
def initialize(semaphore)
33+
@semaphore = semaphore
34+
end
35+
36+
def release
37+
if semaphore = @semaphore
38+
@semaphore = nil
39+
semaphore.release
40+
end
41+
end
42+
end
43+
end
44+
45+
# A wrapper implementation for the endpoint that limits the number of connections that can be accepted.
46+
class Wrapper < IO::Endpoint::Wrapper
47+
def socket_accept(server)
48+
semaphore = Semaphore.instance
49+
50+
# Wait until there is a connection ready to be accepted:
51+
server.wait_readable
52+
53+
# Acquire the semaphore:
54+
Console.info(self, "Acquiring semaphore...")
55+
token = semaphore.acquire
56+
57+
# Accept the connection:
58+
socket, address = super
59+
Console.info(self, "Accepted connection from #{address.inspect}", socket: socket)
60+
61+
# Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server:
62+
socket.define_singleton_method :token do
63+
token
64+
end
65+
66+
# Provide a way to release the semaphore when the connection is closed:
67+
socket.define_singleton_method :close do
68+
super()
69+
ensure
70+
Console.info(self, "Closing connection from #{address.inspect}", socket: socket)
71+
token.release
72+
end
73+
74+
return socket, address
75+
end
76+
end
77+
end

examples/limited/readme.md

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# Limited Example
2+
3+
This example shows how to limit the number of connections to a server. It takes advantage of `IO::Endpoint`'s wrapper to inject the necessary logic. More specifically, we do the following:
4+
5+
1. Instead of `accept`ing a connection in a loop directly, we call `server.wait_readable` to wait for a connection to be available.
6+
2. We then try to acquire a semaphore token. If we can't, we wait for one to be available.
7+
3. Once we have a token, we accept the connection and process it.
8+
4. Once the connection is closed, we release the token.
9+
10+
This way, we can limit the number of connections to a server.
11+
12+
## Usage
13+
14+
Start the server:
15+
16+
```console
17+
> bundle exec falcon host falcon.rb
18+
0.0s info: Falcon::Command::Host [oid=0x4c8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
19+
| Falcon Host v0.49.0 taking flight!
20+
| - Configuration: falcon.rb
21+
| - To terminate: Ctrl-C or kill 99469
22+
| - To reload: kill -HUP 99469
23+
0.03s info: Async::Container::Notify::Console [oid=0x4d8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
24+
| {status: "Initializing..."}
25+
0.04s info: Falcon::Service::Server [oid=0x4e8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
26+
| Starting limited.localhost on #<Async::HTTP::Endpoint http://localhost:8080/ {reuse_address: true, timeout: nil, wrapper: #<Limited::Wrapper:0x000000011f5dfc30>}>
27+
0.04s info: Async::Service::Controller [oid=0x4f0] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
28+
| Controller starting...
29+
0.04s info: Async::Container::Notify::Console [oid=0x4d8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
30+
| {ready: true}
31+
0.04s info: Async::Service::Controller [oid=0x4f0] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
32+
| Controller started...
33+
```
34+
35+
Then, you can connect to it using `curl -v http://localhost:8080`. The default example includes two workers with a limit of one connection per worker.
36+
37+
```console
38+
> curl -v http://localhost:8080
39+
* Host localhost:8080 was resolved.
40+
* IPv6: ::1
41+
* IPv4: 127.0.0.1
42+
* Trying [::1]:8080...
43+
* Connected to localhost (::1) port 8080
44+
* using HTTP/1.x
45+
> GET / HTTP/1.1
46+
> Host: localhost:8080
47+
> User-Agent: curl/8.10.1
48+
> Accept: */*
49+
>
50+
* Request completely sent off
51+
< HTTP/1.1 200 OK
52+
< vary: accept-encoding
53+
< content-length: 11
54+
<
55+
* Connection #0 to host localhost left intact
56+
Hello World
57+
```
58+
59+
There is also a fast path which simulates requests that may not count towards the connection limit:
60+
61+
```console
62+
> curl -v http://localhost:8080/fast http://localhost:8080/fast
63+
* Host localhost:8080 was resolved.
64+
* IPv6: ::1
65+
* IPv4: 127.0.0.1
66+
* Trying [::1]:8080...
67+
* Connected to localhost (::1) port 8080
68+
* using HTTP/1.x
69+
> GET /fast HTTP/1.1
70+
> Host: localhost:8080
71+
> User-Agent: curl/8.10.1
72+
> Accept: */*
73+
>
74+
* Request completely sent off
75+
< HTTP/1.1 200 OK
76+
< vary: accept-encoding
77+
< connection: close
78+
< content-length: 11
79+
<
80+
* shutting down connection #0
81+
Hello World* Hostname localhost was found in DNS cache
82+
* Trying [::1]:8080...
83+
* Connected to localhost (::1) port 8080
84+
* using HTTP/1.x
85+
> GET /fast HTTP/1.1
86+
> Host: localhost:8080
87+
> User-Agent: curl/8.10.1
88+
> Accept: */*
89+
>
90+
* Request completely sent off
91+
< HTTP/1.1 200 OK
92+
< vary: accept-encoding
93+
< connection: close
94+
< content-length: 11
95+
<
96+
* shutting down connection #1
97+
Hello World
98+
```
99+
100+
Note that we use `connection: close` because we are using the fast path. This is to ensure that the connection is closed immediately after the response is sent such that a subsequent "slow" request won't double up.

lib/falcon/environment/server.rb

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ def count
3333

3434
# Options to use when creating the container.
3535
def container_options
36-
{restart: true, count: self.count, health_check_timeout: 30}.compact
36+
{
37+
restart: true,
38+
count: self.count,
39+
health_check_timeout: 30,
40+
}.compact
3741
end
3842

3943
# The host that this server will receive connections for.
@@ -45,13 +49,17 @@ def timeout
4549
nil
4650
end
4751

52+
def endpoint_options
53+
{
54+
reuse_address: true,
55+
timeout: timeout,
56+
}
57+
end
58+
4859
# The upstream endpoint that will handle incoming requests.
4960
# @returns [Async::HTTP::Endpoint]
5061
def endpoint
51-
::Async::HTTP::Endpoint.parse(url).with(
52-
reuse_address: true,
53-
timeout: timeout,
54-
)
62+
::Async::HTTP::Endpoint.parse(url).with(**endpoint_options)
5563
end
5664

5765
def verbose

0 commit comments

Comments
 (0)