GCC Code Coverage Report


Directory: ./
File: libs/capy/src/ex/thread_pool.cpp
Date: 2026-01-15 20:40:20
Exec Total Coverage
Lines: 96 104 92.3%
Functions: 19 20 95.0%
Branches: 32 37 86.5%

Line Branch Exec Source
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/capy
8 //
9
10 #include "src/work_allocator.hpp"
11
12 #include <boost/capy/ex/thread_pool.hpp>
13 #include <atomic>
14 #include <condition_variable>
15 #include <mutex>
16 #include <thread>
17 #include <vector>
18
19 namespace boost {
20 namespace capy {
21
22 //------------------------------------------------------------------------------
23
24 // Handler that wraps an any_coro for execution
25 class coro_handler : public execution_context::handler
26 {
27 any_coro h_;
28
29 public:
30 83 explicit coro_handler(any_coro h) noexcept
31 83 : h_(h)
32 {
33 83 }
34
35 83 void operator()() override
36 {
37 83 h_.resume();
38 83 }
39
40 void destroy() override
41 {
42 // Coroutine handle is not owned, nothing to destroy
43 }
44 };
45
46 //------------------------------------------------------------------------------
47
48 class thread_pool::impl
49 {
50 // Prepended to each work allocation to track metadata
51 struct header
52 {
53 header* next;
54 std::size_t size;
55 std::size_t align;
56 };
57
58 std::mutex mutex_;
59 std::condition_variable cv_;
60 header* head_;
61 header* tail_;
62 std::vector<std::thread> threads_;
63 work_allocator arena_;
64 std::atomic<std::size_t> work_count_;
65 bool stop_;
66
67 static header*
68 83 to_header(void* p) noexcept
69 {
70 83 return static_cast<header*>(p) - 1;
71 }
72
73 static void*
74 166 from_header(header* h) noexcept
75 {
76 166 return h + 1;
77 }
78
79 public:
80 14 ~impl()
81 {
82 {
83 14 std::lock_guard<std::mutex> lock(mutex_);
84 14 stop_ = true;
85 14 }
86 14 cv_.notify_all();
87
88
2/2
✓ Branch 5 taken 21 times.
✓ Branch 6 taken 14 times.
35 for(auto& t : threads_)
89 21 t.join();
90
91 // Drain remaining work (no lock needed, threads are joined)
92
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14 times.
14 while(head_)
93 {
94 header* h = head_;
95 head_ = head_->next;
96 auto* w = static_cast<execution_context::handler*>(from_header(h));
97 w->destroy();
98 arena_.deallocate(h, h->size, h->align);
99 }
100 14 }
101
102 explicit
103 14 impl(std::size_t num_threads)
104 28 : head_(nullptr)
105 14 , tail_(nullptr)
106 14 , work_count_(0)
107
1/1
✓ Branch 4 taken 14 times.
14 , stop_(false)
108 {
109
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 13 times.
14 if(num_threads == 0)
110 1 num_threads = std::thread::hardware_concurrency();
111
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14 times.
14 if(num_threads == 0)
112 num_threads = 1;
113
114
1/1
✓ Branch 1 taken 14 times.
14 threads_.reserve(num_threads);
115
2/2
✓ Branch 0 taken 21 times.
✓ Branch 1 taken 14 times.
35 for(std::size_t i = 0; i < num_threads; ++i)
116
1/1
✓ Branch 1 taken 21 times.
42 threads_.emplace_back([this]{ run(); });
117 14 }
118
119 void
120 2 on_work_started() noexcept
121 {
122 2 ++work_count_;
123 2 }
124
125 void
126 2 on_work_finished() noexcept
127 {
128 2 --work_count_;
129 2 }
130
131 void*
132 83 allocate(std::size_t size, std::size_t align)
133 {
134 // Allocate space for header + work object
135 83 std::size_t total = sizeof(header) + size;
136
1/1
✓ Branch 1 taken 83 times.
83 std::lock_guard<std::mutex> lock(mutex_);
137
1/1
✓ Branch 1 taken 83 times.
83 void* p = arena_.allocate(total, align);
138 83 auto* h = new(p) header{nullptr, total, align};
139 166 return from_header(h);
140 83 }
141
142 void
143 deallocate(void* p, std::size_t, std::size_t) noexcept
144 {
145 // Size/align from caller are ignored; we use stored values
146 header* h = to_header(p);
147 std::lock_guard<std::mutex> lock(mutex_);
148 arena_.deallocate(h, h->size, h->align);
149 }
150
151 void
152 83 submit(execution_context::handler* h)
153 {
154 83 header* hdr = to_header(h);
155 {
156
1/1
✓ Branch 1 taken 83 times.
83 std::lock_guard<std::mutex> lock(mutex_);
157 83 hdr->next = nullptr;
158
2/2
✓ Branch 0 taken 64 times.
✓ Branch 1 taken 19 times.
83 if(tail_)
159 64 tail_->next = hdr;
160 else
161 19 head_ = hdr;
162 83 tail_ = hdr;
163 83 }
164 83 cv_.notify_one();
165 83 }
166
167 void
168 83 post(any_coro h)
169 {
170 // Allocate handler and submit
171 83 void* p = allocate(sizeof(coro_handler), alignof(coro_handler));
172 83 auto* handler = new(p) coro_handler(h);
173 83 submit(handler);
174 83 }
175
176 private:
177 void
178 21 run()
179 {
180 for(;;)
181 {
182 104 header* h = nullptr;
183 {
184
1/1
✓ Branch 1 taken 104 times.
104 std::unique_lock<std::mutex> lock(mutex_);
185
1/1
✓ Branch 1 taken 104 times.
104 cv_.wait(lock, [this]{
186
4/4
✓ Branch 0 taken 105 times.
✓ Branch 1 taken 24 times.
✓ Branch 2 taken 80 times.
✓ Branch 3 taken 25 times.
129 return stop_ || head_ != nullptr;
187 });
188
189
4/4
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 80 times.
✓ Branch 2 taken 21 times.
✓ Branch 3 taken 3 times.
104 if(stop_ && !head_)
190 42 return;
191
192 83 h = head_;
193 83 head_ = head_->next;
194
2/2
✓ Branch 0 taken 19 times.
✓ Branch 1 taken 64 times.
83 if(!head_)
195 19 tail_ = nullptr;
196 104 }
197
198 83 auto* w = static_cast<execution_context::handler*>(from_header(h));
199 83 (*w)();
200
201 {
202
1/1
✓ Branch 1 taken 83 times.
83 std::lock_guard<std::mutex> lock(mutex_);
203 83 arena_.deallocate(h, h->size, h->align);
204 83 }
205 83 }
206 }
207 };
208
209 //------------------------------------------------------------------------------
210
211 14 thread_pool::
212 ~thread_pool()
213 {
214 14 shutdown();
215
1/2
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
14 delete impl_;
216 14 destroy();
217 14 }
218
219 14 thread_pool::
220 14 thread_pool(std::size_t num_threads)
221
2/4
✓ Branch 2 taken 14 times.
✓ Branch 5 taken 14 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
14 : impl_(new impl(num_threads))
222 {
223 14 }
224
225 //------------------------------------------------------------------------------
226
227 void
228 2 thread_pool::executor_type::
229 on_work_started() const noexcept
230 {
231 2 pool_->impl_->on_work_started();
232 2 }
233
234 void
235 2 thread_pool::executor_type::
236 on_work_finished() const noexcept
237 {
238 2 pool_->impl_->on_work_finished();
239 2 }
240
241 void
242 83 thread_pool::executor_type::
243 post(any_coro h) const
244 {
245 83 pool_->impl_->post(h);
246 83 }
247
248 } // capy
249 } // boost
250