15
15
#include < algorithm>
16
16
#include < cassert>
17
17
#include < cerrno>
18
+ #include < chrono>
18
19
#include < csignal>
19
20
#include < ctime>
20
21
#include < fcntl.h>
@@ -68,6 +69,30 @@ static void SignalHandler(int signo, siginfo_t *info, void *) {
68
69
(void )bytes_written;
69
70
}
70
71
72
+ class ToTimeSpec {
73
+ public:
74
+ explicit ToTimeSpec (std::optional<MainLoopPosix::TimePoint> point) {
75
+ using namespace std ::chrono;
76
+
77
+ if (!point) {
78
+ m_ts_ptr = nullptr ;
79
+ return ;
80
+ }
81
+ nanoseconds dur = std::max (*point - steady_clock::now (), nanoseconds (0 ));
82
+ m_ts_ptr = &m_ts;
83
+ m_ts.tv_sec = duration_cast<seconds>(dur).count ();
84
+ m_ts.tv_nsec = (dur % seconds (1 )).count ();
85
+ }
86
+ ToTimeSpec (const ToTimeSpec &) = delete ;
87
+ ToTimeSpec &operator =(const ToTimeSpec &) = delete ;
88
+
89
+ operator struct timespec *() { return m_ts_ptr; }
90
+
91
+ private:
92
+ struct timespec m_ts;
93
+ struct timespec *m_ts_ptr;
94
+ };
95
+
71
96
class MainLoopPosix ::RunImpl {
72
97
public:
73
98
RunImpl (MainLoopPosix &loop);
@@ -100,8 +125,9 @@ Status MainLoopPosix::RunImpl::Poll() {
100
125
for (auto &fd : loop.m_read_fds )
101
126
EV_SET (&in_events[i++], fd.first , EVFILT_READ, EV_ADD, 0 , 0 , 0 );
102
127
103
- num_events = kevent (loop.m_kqueue , in_events.data (), in_events.size (),
104
- out_events, std::size (out_events), nullptr );
128
+ num_events =
129
+ kevent (loop.m_kqueue , in_events.data (), in_events.size (), out_events,
130
+ std::size (out_events), ToTimeSpec (loop.GetNextWakeupTime ()));
105
131
106
132
if (num_events < 0 ) {
107
133
if (errno == EINTR) {
@@ -145,7 +171,7 @@ Status MainLoopPosix::RunImpl::Poll() {
145
171
}
146
172
147
173
if (ppoll (read_fds.data (), read_fds.size (),
148
- /* timeout= */ nullptr ,
174
+ ToTimeSpec (loop. GetNextWakeupTime ()) ,
149
175
/* sigmask=*/ nullptr ) == -1 &&
150
176
errno != EINTR)
151
177
return Status (errno, eErrorTypePOSIX);
@@ -166,27 +192,28 @@ void MainLoopPosix::RunImpl::ProcessReadEvents() {
166
192
}
167
193
#endif
168
194
169
- MainLoopPosix::MainLoopPosix () : m_triggering( false ) {
170
- Status error = m_trigger_pipe .CreateNew (/* child_process_inherit=*/ false );
195
+ MainLoopPosix::MainLoopPosix () {
196
+ Status error = m_interrupt_pipe .CreateNew (/* child_process_inherit=*/ false );
171
197
assert (error.Success ());
172
198
173
199
// Make the write end of the pipe non-blocking.
174
- int result = fcntl (m_trigger_pipe .GetWriteFileDescriptor (), F_SETFL,
175
- fcntl (m_trigger_pipe .GetWriteFileDescriptor (), F_GETFL) |
200
+ int result = fcntl (m_interrupt_pipe .GetWriteFileDescriptor (), F_SETFL,
201
+ fcntl (m_interrupt_pipe .GetWriteFileDescriptor (), F_GETFL) |
176
202
O_NONBLOCK);
177
203
assert (result == 0 );
178
204
UNUSED_IF_ASSERT_DISABLED (result);
179
205
180
- const int trigger_pipe_fd = m_trigger_pipe.GetReadFileDescriptor ();
181
- m_read_fds.insert ({trigger_pipe_fd, [trigger_pipe_fd](MainLoopBase &loop) {
182
- char c;
183
- ssize_t bytes_read = llvm::sys::RetryAfterSignal (
184
- -1 , ::read, trigger_pipe_fd, &c, 1 );
185
- assert (bytes_read == 1 );
186
- UNUSED_IF_ASSERT_DISABLED (bytes_read);
187
- // NB: This implicitly causes another loop iteration
188
- // and therefore the execution of pending callbacks.
189
- }});
206
+ const int interrupt_pipe_fd = m_interrupt_pipe.GetReadFileDescriptor ();
207
+ m_read_fds.insert (
208
+ {interrupt_pipe_fd, [interrupt_pipe_fd](MainLoopBase &loop) {
209
+ char c;
210
+ ssize_t bytes_read =
211
+ llvm::sys::RetryAfterSignal (-1 , ::read, interrupt_pipe_fd, &c, 1 );
212
+ assert (bytes_read == 1 );
213
+ UNUSED_IF_ASSERT_DISABLED (bytes_read);
214
+ // NB: This implicitly causes another loop iteration
215
+ // and therefore the execution of pending callbacks.
216
+ }});
190
217
#if HAVE_SYS_EVENT_H
191
218
m_kqueue = kqueue ();
192
219
assert (m_kqueue >= 0 );
@@ -197,8 +224,8 @@ MainLoopPosix::~MainLoopPosix() {
197
224
#if HAVE_SYS_EVENT_H
198
225
close (m_kqueue);
199
226
#endif
200
- m_read_fds.erase (m_trigger_pipe .GetReadFileDescriptor ());
201
- m_trigger_pipe .Close ();
227
+ m_read_fds.erase (m_interrupt_pipe .GetReadFileDescriptor ());
228
+ m_interrupt_pipe .Close ();
202
229
assert (m_read_fds.size () == 0 );
203
230
assert (m_signals.size () == 0 );
204
231
}
@@ -245,11 +272,9 @@ MainLoopPosix::RegisterSignal(int signo, const Callback &callback,
245
272
sigset_t old_set;
246
273
247
274
// Set signal info before installing the signal handler!
248
- g_signal_info[signo].pipe_fd = m_trigger_pipe .GetWriteFileDescriptor ();
275
+ g_signal_info[signo].pipe_fd = m_interrupt_pipe .GetWriteFileDescriptor ();
249
276
g_signal_info[signo].flag = 0 ;
250
277
251
- // Even if using kqueue, the signal handler will still be invoked, so it's
252
- // important to replace it with our "benign" handler.
253
278
int ret = sigaction (signo, &new_action, &info.old_action );
254
279
UNUSED_IF_ASSERT_DISABLED (ret);
255
280
assert (ret == 0 && " sigaction failed" );
@@ -308,8 +333,8 @@ Status MainLoopPosix::Run() {
308
333
309
334
ProcessSignals ();
310
335
311
- m_triggering = false ;
312
- ProcessPendingCallbacks ();
336
+ m_interrupting = false ;
337
+ ProcessCallbacks ();
313
338
}
314
339
return Status ();
315
340
}
@@ -347,13 +372,13 @@ void MainLoopPosix::ProcessSignal(int signo) {
347
372
}
348
373
}
349
374
350
- void MainLoopPosix::TriggerPendingCallbacks () {
351
- if (m_triggering .exchange (true ))
375
+ void MainLoopPosix::Interrupt () {
376
+ if (m_interrupting .exchange (true ))
352
377
return ;
353
378
354
379
char c = ' .' ;
355
380
size_t bytes_written;
356
- Status error = m_trigger_pipe .Write (&c, 1 , bytes_written);
381
+ Status error = m_interrupt_pipe .Write (&c, 1 , bytes_written);
357
382
assert (error.Success ());
358
383
UNUSED_IF_ASSERT_DISABLED (error);
359
384
assert (bytes_written == 1 );
0 commit comments