libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

82.7% Lines (405/490) 93.5% Functions (43/46) 69.5% Branches (210/302)
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include "src/detail/epoll/scheduler.hpp"
#include "src/detail/epoll/op.hpp"
#include "src/detail/make_err.hpp"
#include "src/detail/posix/resolver_service.hpp"
#include "src/detail/posix/signals.hpp"

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <chrono>
#include <limits>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>
/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler uses a thread coordination strategy to provide handler
parallelism and avoid the thundering herd problem.
Instead of all threads blocking on epoll_wait(), one thread becomes the
"reactor" while others wait on a condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics where N posted items wake N threads

Event Loop Structure (do_one)
-----------------------------
1. Lock mutex, try to pop handler from queue
2. If got handler: execute it (unlocked), return
3. If queue empty and no reactor running: become reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If queue empty and reactor running: wait on condvar for work

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try getting
a handler, giving priority to handler execution over more I/O polling.
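
A condensed sketch of one pass through this loop (illustrative
pseudocode only; do_one() below is authoritative):

    lock(mutex_);
    op = completed_ops_.pop();
    if (op == &task_op_)             // reactor sentinel
    {
        run_task(...);               // epoll_wait() without the mutex
        completed_ops_.push(&task_op_);
        // loop back and try to pop a handler first
    }
    else if (op != nullptr)
    {
        unlock(mutex_);              // handlers always run unlocked
        (*op)();                     // execute exactly one handler
        return 1;
    }
    else
    {
        wait_for_signal(...);        // block until work is posted
    }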

Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination:
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check if already signaled before blocking (fast-path)

Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if reactor running: interrupt via eventfd write
- Else: no-op (thread will find work when it checks queue)

This avoids waking threads unnecessarily. With cascading wakes,
each handler execution wakes at most one additional thread if
more work exists in the queue.
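
In pseudocode (mirroring wake_one_thread_and_unlock() below):

    state_ |= 1;
    if (state_ > 1)                          // a waiter is blocked on cond_
        { unlock(); cond_.notify_one(); }
    else if (task_running_ && !task_interrupted_)
        { task_interrupted_ = true; unlock(); interrupt_reactor(); }
    else
        unlock();                            // queue alone holds the work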

Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments on start, decrements on completion.
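
An illustrative trace with two concurrent operations:

    work_started();    // first op submitted,  outstanding_work_: 0 -> 1
    work_started();    // second op submitted, outstanding_work_: 1 -> 2
    work_finished();   // first completes,     outstanding_work_: 2 -> 1
    work_finished();   // second completes,    outstanding_work_: 1 -> 0,
                       // stop() runs and run() returns on every thread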

Timer Integration
-----------------
Timers are handled by timer_service and delivered through a timerfd
registered with epoll. When a new timer is scheduled earlier than the
current nearest expiry, timer_service marks the timerfd stale and calls
interrupt_reactor() so the reactor re-arms the timerfd before blocking
again.
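
Sketch of the earliest-changed callback registered in the constructor:

    timerfd_stale_ = true;       // reactor re-arms via update_timerfd()
    if (task_running_)           // reactor may be parked in epoll_wait()
        interrupt_reactor();     // eventfd write forces it to wake and re-arm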
*/
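
// Illustrative multi-threaded driver. The setup below is hypothetical
// (users normally drive the scheduler through its owning execution
// context), but it shows the intended threading shape:
//
//     epoll_scheduler sched(ctx, 0);
//     std::vector<std::thread> pool;
//     for (int i = 0; i < 4; ++i)
//         pool.emplace_back([&] { sched.run(); });  // one becomes the reactor
//     sched.post(op);  // wakes at most one thread or interrupts the reactor
//     for (auto& t : pool)
//         t.join();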

namespace boost::corosio::detail {

struct scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
    {
    }
};

namespace {

corosio::detail::thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace

void
descriptor_state::
operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    epoll_op* rd = nullptr;
    epoll_op* wr = nullptr;
    epoll_op* cn = nullptr;
    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            rd = std::exchange(read_op, nullptr);
            if (!rd)
                read_ready = true;
        }
        if (ev & EPOLLOUT)
        {
            cn = std::exchange(connect_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            if (!cn && !wr)
                write_ready = true;
        }
        if (err && !(ev & (EPOLLIN | EPOLLOUT)))
        {
            rd = std::exchange(read_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            cn = std::exchange(connect_op, nullptr);
        }
    }

    // Non-null after I/O means EAGAIN; re-register under lock below
    if (rd)
    {
        if (err)
            rd->complete(err, 0);
        else
            rd->perform_io();

        if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
        {
            rd->errn = 0;
        }
        else
        {
            local_ops.push(rd);
            rd = nullptr;
        }
    }

    if (cn)
    {
        if (err)
            cn->complete(err, 0);
        else
            cn->perform_io();
        local_ops.push(cn);
        cn = nullptr;
    }

    if (wr)
    {
        if (err)
            wr->complete(err, 0);
        else
            wr->perform_io();

        if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
        {
            wr->errn = 0;
        }
        else
        {
            local_ops.push(wr);
            wr = nullptr;
        }
    }

    if (rd || wr)
    {
        std::lock_guard lock(mutex);
        if (rd)
            read_op = rd;
        if (wr)
            write_op = wr;
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}

epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) {
                auto* self = static_cast<epoll_scheduler*>(p);
                self->timerfd_stale_.store(true, std::memory_order_release);
                if (self->task_running_.load(std::memory_order_acquire))
                    self->interrupt_reactor();
            }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

epoll_scheduler::
~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

void
epoll_scheduler::
post(capy::coro h) const
{
    struct post_handler final
        : scheduler_op
    {
        capy::coro h_;

        explicit
        post_handler(capy::coro h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

bool
epoll_scheduler::
running_in_this_thread() const noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

bool
epoll_scheduler::
stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

void
epoll_scheduler::
restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

std::size_t
epoll_scheduler::
run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}

void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}

void
epoll_scheduler::
compensating_work_started() const noexcept
{
    auto* ctx = find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
        std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}

void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}

void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

/** RAII guard for handler execution work accounting.

    Handler consumes 1 work item, may produce N new items via fast-path posts.
    Net change = N - 1:
    - If N > 1: add (N-1) to global (more work produced than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed, may trigger stop)

    Also drains private queue to global for other threads to process.
*/
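// A worked example of the accounting above (values illustrative):
// a handler that fast-path posts two continuations ends with
// private_outstanding_work == 2, so net = 2 - 1 = +1 and one unit is
// added to outstanding_work_; a handler that posts nothing ends with
// 0, so net = -1 and work_finished() runs (possibly stopping run()).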
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(
                    produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};

/** RAII guard for reactor work accounting.

    Reactor only produces work via timer/signal callbacks posting handlers.
    Unlike handler execution which consumes 1, the reactor consumes nothing.
    All produced work must be flushed to the global counter.
*/
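// Example (illustrative): a timer callback posted three handlers while
// the reactor ran; this destructor adds all three to outstanding_work_
// (the reactor consumed nothing) and splices them onto completed_ops_
// for any worker thread to pick up.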
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (!ctx)
            return;

        if (ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }

        if (!ctx->private_queue.empty())
        {
            if (!lock->owns_lock())
                lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
};

void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}

void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(
                timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}

std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif