Skip to content

Commit eef9de7

Browse files
committed
WIP
1 parent daeb4b1 commit eef9de7

File tree

6 files changed

+121
-6
lines changed

6 files changed

+121
-6
lines changed

examples/behaviour/a.out

15.8 KB
Binary file not shown.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <unistd.h>
4+
#include <pthread.h>
5+
#include <sys/select.h>
6+
#include <errno.h>
7+
#include <string.h>
8+
9+
static int pipefd[2];
10+
11+
void* select_thread(void* arg) {
12+
fd_set readfds;
13+
struct timeval tv;
14+
int ret;
15+
16+
FD_ZERO(&readfds);
17+
FD_SET(pipefd[0], &readfds);
18+
19+
/* Set a timeout, so select won't block forever if something unexpected happens. */
20+
tv.tv_sec = 5;
21+
tv.tv_usec = 0;
22+
23+
printf("Thread: calling select()...\n");
24+
25+
ret = select(pipefd[0] + 1, &readfds, NULL, NULL, &tv);
26+
if (ret == -1) {
27+
/* You will often see errno = EBADF after the close() in the other thread. */
28+
printf("Thread: select() returned -1, errno=%d (%s)\n", errno, strerror(errno));
29+
} else if (ret == 0) {
30+
printf("Thread: select() timed out\n");
31+
} else {
32+
printf("Thread: select() returned %d (pipefd[0] is readable)\n", ret);
33+
}
34+
35+
return NULL;
36+
}
37+
38+
int main(void) {
39+
pthread_t tid;
40+
int ret;
41+
42+
if (pipe(pipefd) == -1) {
43+
perror("pipe");
44+
exit(EXIT_FAILURE);
45+
}
46+
47+
/* Create a thread that calls select() on the read end of the pipe. */
48+
ret = pthread_create(&tid, NULL, select_thread, NULL);
49+
if (ret != 0) {
50+
fprintf(stderr, "pthread_create failed: %s\n", strerror(ret));
51+
exit(EXIT_FAILURE);
52+
}
53+
54+
/* Give the select() thread time to block. */
55+
sleep(1);
56+
57+
/* Close the read end of the pipe from the main thread. */
58+
printf("Main: closing pipefd[0]...\n");
59+
close(pipefd[0]);
60+
61+
/* Wait for the select thread to finish. */
62+
pthread_join(tid, NULL);
63+
64+
/* Clean up the write end (though it's not strictly necessary here). */
65+
close(pipefd[1]);
66+
67+
return 0;
68+
}

fixtures/io/event/scheduler.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,10 @@ def io_wait(io, events, timeout = nil)
220220
end
221221
end
222222

223+
Fiber.blocking{$stderr.puts "-> (#{fiber}) io_wait(#{io}, #{events}, #{timeout})"}
223224
return @selector.io_wait(fiber, io, events)
224225
ensure
226+
Fiber.blocking{$stderr.puts "<- io_wait(#{io}, #{events}, #{timeout})"}
225227
timer&.cancel!
226228
end
227229

@@ -304,6 +306,7 @@ def transfer
304306
# @parameter fiber [Fiber] The fiber to raise the exception on.
305307
# @parameter exception [Exception] The exception to raise.
306308
def fiber_interrupt(fiber, exception)
309+
Fiber.blocking{$stderr.puts "fiber_interrupt(#{fiber}, #{exception})"}
307310
unblock(nil, RaiseException.new(fiber, exception))
308311
end
309312

lib/io/event/selector/select.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ def process_wait(fiber, pid, flags)
426426
end
427427

428428
def select(duration = nil)
429+
Fiber.blocking{$stderr.puts "-> Selecting for #{duration.inspect}..."}
429430
if pop_ready
430431
# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
431432
duration = 0
@@ -436,6 +437,10 @@ def select(duration = nil)
436437
priority = Array.new
437438

438439
@waiting.each do |io, waiter|
440+
# If an IO is closed, we can no longer wait on it:
441+
next if io.closed?
442+
# This is consistent with epoll/kqueue behavior, which will remove the IO from the set of watched IOs if it is closed. This can be a problem if the IO is closed during `pop_ready` above, while another Fiber is waiting on it. The interruption to `io_wait` will be delivered on the next iteration of the event loop, but we can't use it with `IO.select` as it will raise an exception and break the event loop.
443+
439444
waiter.each do |fiber, events|
440445
if (events & IO::READABLE) > 0
441446
readable << io
@@ -463,6 +468,7 @@ def select(duration = nil)
463468
# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
464469
Thread.handle_interrupt(::Exception => :on_blocking) do
465470
@blocked = true
471+
Fiber.blocking{$stderr.puts "-> IO.select(#{readable.inspect}, #{writable.inspect}, #{priority.inspect}, #{duration.inspect})..."}
466472
readable, writable, priority = ::IO.select(readable, writable, priority, duration)
467473
rescue ::Exception => error
468474
# Requeue below...

test.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#!/usr/bin/env ruby
2+
3+
r, w = IO.pipe
4+
5+
thread = Thread.new do
6+
# pp IO.select([r])
7+
pp r.wait_readable
8+
rescue => error
9+
pp error
10+
end
11+
12+
Thread.pass until thread.status == "sleep"
13+
14+
r.close
15+
16+
binding.irb

test/io/event/selector/cancellable.rb

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,28 +70,50 @@ def with_scheduler
7070
Fiber.set_scheduler(nil)
7171
end
7272

73-
it "can interrupt reads" do
73+
def skip_unless_interruptable
7474
skip("IO#close interruption unsupported") unless IO::Event::INTERRUPTABLE
75+
end
76+
77+
it "can interrupt read" do
78+
skip_unless_interruptable
7579

7680
error = nil
7781

7882
with_scheduler do
7983
buffer = IO::Buffer.new(64)
8084

8185
Fiber.schedule do
82-
$stderr.puts "Reading..."
8386
begin
8487
buffer.read(input, 1)
8588
rescue => error
86-
ensure
87-
$stderr.puts "Read: #{error}"
89+
# Ignore.
90+
end
91+
end
92+
93+
Fiber.schedule do
94+
input.close
95+
end
96+
end
97+
98+
expect(error).to be_a(IOError)
99+
end
100+
101+
it "can interrupt wait_readable" do
102+
skip_unless_interruptable
103+
104+
error = nil
105+
106+
with_scheduler do
107+
Fiber.schedule do
108+
begin
109+
input.wait_readable
110+
rescue => error
111+
# Ignore.
88112
end
89113
end
90114

91115
Fiber.schedule do
92-
$stderr.puts "Closing input..."
93116
input.close
94-
$stderr.puts "Closed input."
95117
end
96118
end
97119

0 commit comments

Comments
 (0)