Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/boostorg/capy
8 : //
9 :
10 : #include "src/work_allocator.hpp"
11 :
12 : #include <boost/capy/ex/thread_pool.hpp>
13 : #include <atomic>
14 : #include <condition_variable>
15 : #include <mutex>
16 : #include <thread>
17 : #include <vector>
18 :
19 : namespace boost {
20 : namespace capy {
21 :
22 : //------------------------------------------------------------------------------
23 :
24 : // Handler that wraps an any_coro for execution
25 : class coro_handler : public execution_context::handler
26 : {
27 : any_coro h_;
28 :
29 : public:
30 83 : explicit coro_handler(any_coro h) noexcept
31 83 : : h_(h)
32 : {
33 83 : }
34 :
35 83 : void operator()() override
36 : {
37 83 : h_.resume();
38 83 : }
39 :
40 0 : void destroy() override
41 : {
42 : // Coroutine handle is not owned, nothing to destroy
43 0 : }
44 : };
45 :
46 : //------------------------------------------------------------------------------
47 :
48 : class thread_pool::impl
49 : {
50 : // Prepended to each work allocation to track metadata
51 : struct header
52 : {
53 : header* next;
54 : std::size_t size;
55 : std::size_t align;
56 : };
57 :
58 : std::mutex mutex_;
59 : std::condition_variable cv_;
60 : header* head_;
61 : header* tail_;
62 : std::vector<std::thread> threads_;
63 : work_allocator arena_;
64 : std::atomic<std::size_t> work_count_;
65 : bool stop_;
66 :
67 : static header*
68 83 : to_header(void* p) noexcept
69 : {
70 83 : return static_cast<header*>(p) - 1;
71 : }
72 :
73 : static void*
74 166 : from_header(header* h) noexcept
75 : {
76 166 : return h + 1;
77 : }
78 :
79 : public:
80 14 : ~impl()
81 : {
82 : {
83 14 : std::lock_guard<std::mutex> lock(mutex_);
84 14 : stop_ = true;
85 14 : }
86 14 : cv_.notify_all();
87 :
88 35 : for(auto& t : threads_)
89 21 : t.join();
90 :
91 : // Drain remaining work (no lock needed, threads are joined)
92 14 : while(head_)
93 : {
94 0 : header* h = head_;
95 0 : head_ = head_->next;
96 0 : auto* w = static_cast<execution_context::handler*>(from_header(h));
97 0 : w->destroy();
98 0 : arena_.deallocate(h, h->size, h->align);
99 : }
100 14 : }
101 :
102 : explicit
103 14 : impl(std::size_t num_threads)
104 28 : : head_(nullptr)
105 14 : , tail_(nullptr)
106 14 : , work_count_(0)
107 14 : , stop_(false)
108 : {
109 14 : if(num_threads == 0)
110 1 : num_threads = std::thread::hardware_concurrency();
111 14 : if(num_threads == 0)
112 0 : num_threads = 1;
113 :
114 14 : threads_.reserve(num_threads);
115 35 : for(std::size_t i = 0; i < num_threads; ++i)
116 42 : threads_.emplace_back([this]{ run(); });
117 14 : }
118 :
119 : void
120 2 : on_work_started() noexcept
121 : {
122 2 : ++work_count_;
123 2 : }
124 :
125 : void
126 2 : on_work_finished() noexcept
127 : {
128 2 : --work_count_;
129 2 : }
130 :
131 : void*
132 83 : allocate(std::size_t size, std::size_t align)
133 : {
134 : // Allocate space for header + work object
135 83 : std::size_t total = sizeof(header) + size;
136 83 : std::lock_guard<std::mutex> lock(mutex_);
137 83 : void* p = arena_.allocate(total, align);
138 83 : auto* h = new(p) header{nullptr, total, align};
139 166 : return from_header(h);
140 83 : }
141 :
142 : void
143 : deallocate(void* p, std::size_t, std::size_t) noexcept
144 : {
145 : // Size/align from caller are ignored; we use stored values
146 : header* h = to_header(p);
147 : std::lock_guard<std::mutex> lock(mutex_);
148 : arena_.deallocate(h, h->size, h->align);
149 : }
150 :
151 : void
152 83 : submit(execution_context::handler* h)
153 : {
154 83 : header* hdr = to_header(h);
155 : {
156 83 : std::lock_guard<std::mutex> lock(mutex_);
157 83 : hdr->next = nullptr;
158 83 : if(tail_)
159 64 : tail_->next = hdr;
160 : else
161 19 : head_ = hdr;
162 83 : tail_ = hdr;
163 83 : }
164 83 : cv_.notify_one();
165 83 : }
166 :
167 : void
168 83 : post(any_coro h)
169 : {
170 : // Allocate handler and submit
171 83 : void* p = allocate(sizeof(coro_handler), alignof(coro_handler));
172 83 : auto* handler = new(p) coro_handler(h);
173 83 : submit(handler);
174 83 : }
175 :
176 : private:
177 : void
178 21 : run()
179 : {
180 : for(;;)
181 : {
182 104 : header* h = nullptr;
183 : {
184 104 : std::unique_lock<std::mutex> lock(mutex_);
185 104 : cv_.wait(lock, [this]{
186 129 : return stop_ || head_ != nullptr;
187 : });
188 :
189 104 : if(stop_ && !head_)
190 42 : return;
191 :
192 83 : h = head_;
193 83 : head_ = head_->next;
194 83 : if(!head_)
195 19 : tail_ = nullptr;
196 104 : }
197 :
198 83 : auto* w = static_cast<execution_context::handler*>(from_header(h));
199 83 : (*w)();
200 :
201 : {
202 83 : std::lock_guard<std::mutex> lock(mutex_);
203 83 : arena_.deallocate(h, h->size, h->align);
204 83 : }
205 83 : }
206 : }
207 : };
208 :
209 : //------------------------------------------------------------------------------
210 :
211 14 : thread_pool::
212 : ~thread_pool()
213 : {
214 14 : shutdown();
215 14 : delete impl_;
216 14 : destroy();
217 14 : }
218 :
219 14 : thread_pool::
220 14 : thread_pool(std::size_t num_threads)
221 14 : : impl_(new impl(num_threads))
222 : {
223 14 : }
224 :
225 : //------------------------------------------------------------------------------
226 :
227 : void
228 2 : thread_pool::executor_type::
229 : on_work_started() const noexcept
230 : {
231 2 : pool_->impl_->on_work_started();
232 2 : }
233 :
234 : void
235 2 : thread_pool::executor_type::
236 : on_work_finished() const noexcept
237 : {
238 2 : pool_->impl_->on_work_finished();
239 2 : }
240 :
241 : void
242 83 : thread_pool::executor_type::
243 : post(any_coro h) const
244 : {
245 83 : pool_->impl_->post(h);
246 83 : }
247 :
248 : } // capy
249 : } // boost
|