Skip to content

Commit dc0c4da

Browse files
committed
Improved implementation of blocking_operation_wait.
1 parent acabc87 commit dc0c4da

File tree

3 files changed

+91
-6
lines changed

3 files changed

+91
-6
lines changed

fixtures/io/event/test_scheduler.rb

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ module IO::Event
3232
class TestScheduler
3333
def initialize(selector: nil, worker_pool: nil, maximum_worker_count: nil)
3434
@selector = selector || ::IO::Event::Selector.new(Fiber.current)
35-
@worker_pool = worker_pool || WorkerPool.new(maximum_worker_count: maximum_worker_count)
35+
36+
if ::IO::Event.const_defined?(:WorkerPool)
37+
@worker_pool = worker_pool || ::IO::Event::WorkerPool.new(maximum_worker_count: maximum_worker_count)
38+
end
39+
3640
@timers = ::IO::Event::Timers.new
3741

3842
# Track the number of fibers that are blocked.
@@ -45,10 +49,15 @@ def initialize(selector: nil, worker_pool: nil, maximum_worker_count: nil)
4549
# @attribute [IO::Event::Selector] The I/O event selector used for managing fiber scheduling.
4650
attr_reader :selector
4751

48-
# Required fiber scheduler hook - delegates to WorkerPool
49-
def blocking_operation_wait(operation)
50-
# Submit the operation to the worker pool and wait for completion
51-
@worker_pool.call(operation)
52+
if ::IO::Event.const_defined?(:WorkerPool)
53+
# Optional fiber scheduler hook - delegates to WorkerPool:
54+
def blocking_operation_wait(operation)
55+
@blocked += 1
56+
# Submit the operation to the worker pool and wait for completion
57+
@worker_pool&.call(operation)
58+
ensure
59+
@blocked -= 1
60+
end
5261
end
5362

5463
# Required fiber scheduler hooks
@@ -114,7 +123,12 @@ def io_wait(io, events, timeout = nil)
114123
end
115124
end
116125

117-
return @selector.io_wait(fiber, io, events)
126+
begin
127+
@blocked += 1
128+
return @selector.io_wait(fiber, io, events)
129+
ensure
130+
@blocked -= 1
131+
end
118132
ensure
119133
timer&.cancel!
120134
end

lib/io/event/debug/selector.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ def ready?
128128
@selector.ready?
129129
end
130130

131+
# Run the given blocking operation and wait for its completion.
132+
def blocking_operation_wait(operation)
133+
log("Waiting for blocking operation #{operation.inspect}")
134+
@selector.blocking_operation_wait(operation)
135+
end
136+
131137
# Wait for the given process, forwarded to the underlying selector.
132138
def process_wait(*arguments)
133139
log("Waiting for process with #{arguments.inspect}")

test/tcp_socket.rb

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "io/event"
7+
require "io/event/test_scheduler"
8+
9+
require "socket"
10+
11+
describe TCPSocket do
12+
let(:scheduler) {IO::Event::TestScheduler.new}
13+
14+
it "can read and write data" do
15+
chunk_size = 1024*6
16+
buffer_size = 1024*64
17+
18+
server_socket = TCPServer.new("localhost", 0)
19+
port = server_socket.addr[1]
20+
21+
client = TCPSocket.new("localhost", port)
22+
server = server_socket.accept
23+
24+
Fiber.set_scheduler(scheduler)
25+
26+
writers = Thread::Queue.new
27+
2.times do |i|
28+
Fiber.schedule do
29+
buffer = i.to_s * chunk_size
30+
31+
128.times do
32+
server.write(buffer)
33+
server.flush
34+
end
35+
36+
writers << :done
37+
end
38+
end
39+
40+
Fiber.schedule do
41+
2.times do
42+
writers.pop
43+
end
44+
45+
server.close
46+
end
47+
48+
Fiber.schedule do
49+
while result = client.read_nonblock(buffer_size, exception: false)
50+
case result
51+
when :wait_readable
52+
client.wait_readable
53+
when :wait_writable
54+
client.wait_writable
55+
else
56+
# Done.
57+
end
58+
end
59+
end
60+
61+
scheduler.run
62+
ensure
63+
Fiber.set_scheduler(nil)
64+
end
65+
end

0 commit comments

Comments
 (0)