LCOV - code coverage report
Current view: top level - libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 92.3 % 104 96
Test Date: 2026-01-15 20:40:20 Functions: 95.0 % 20 19

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/boostorg/capy
       8              : //
       9              : 
      10              : #include "src/work_allocator.hpp"
      11              : 
      12              : #include <boost/capy/ex/thread_pool.hpp>
      13              : #include <atomic>
      14              : #include <condition_variable>
      15              : #include <mutex>
      16              : #include <thread>
      17              : #include <vector>
      18              : 
      19              : namespace boost {
      20              : namespace capy {
      21              : 
      22              : //------------------------------------------------------------------------------
      23              : 
      24              : // Handler that wraps an any_coro for execution
      25              : class coro_handler : public execution_context::handler
      26              : {
      27              :     any_coro h_;
      28              : 
      29              : public:
      30           83 :     explicit coro_handler(any_coro h) noexcept
      31           83 :         : h_(h)
      32              :     {
      33           83 :     }
      34              : 
      35           83 :     void operator()() override
      36              :     {
      37           83 :         h_.resume();
      38           83 :     }
      39              : 
      40            0 :     void destroy() override
      41              :     {
      42              :         // Coroutine handle is not owned, nothing to destroy
      43            0 :     }
      44              : };
      45              : 
      46              : //------------------------------------------------------------------------------
      47              : 
      48              : class thread_pool::impl
      49              : {
      50              :     // Prepended to each work allocation to track metadata
      51              :     struct header
      52              :     {
      53              :         header* next;
      54              :         std::size_t size;
      55              :         std::size_t align;
      56              :     };
      57              : 
      58              :     std::mutex mutex_;
      59              :     std::condition_variable cv_;
      60              :     header* head_;
      61              :     header* tail_;
      62              :     std::vector<std::thread> threads_;
      63              :     work_allocator arena_;
      64              :     std::atomic<std::size_t> work_count_;
      65              :     bool stop_;
      66              : 
      67              :     static header*
      68           83 :     to_header(void* p) noexcept
      69              :     {
      70           83 :         return static_cast<header*>(p) - 1;
      71              :     }
      72              : 
      73              :     static void*
      74          166 :     from_header(header* h) noexcept
      75              :     {
      76          166 :         return h + 1;
      77              :     }
      78              : 
      79              : public:
      80           14 :     ~impl()
      81              :     {
      82              :         {
      83           14 :             std::lock_guard<std::mutex> lock(mutex_);
      84           14 :             stop_ = true;
      85           14 :         }
      86           14 :         cv_.notify_all();
      87              : 
      88           35 :         for(auto& t : threads_)
      89           21 :             t.join();
      90              : 
      91              :         // Drain remaining work (no lock needed, threads are joined)
      92           14 :         while(head_)
      93              :         {
      94            0 :             header* h = head_;
      95            0 :             head_ = head_->next;
      96            0 :             auto* w = static_cast<execution_context::handler*>(from_header(h));
      97            0 :             w->destroy();
      98            0 :             arena_.deallocate(h, h->size, h->align);
      99              :         }
     100           14 :     }
     101              : 
     102              :     explicit
     103           14 :     impl(std::size_t num_threads)
     104           28 :         : head_(nullptr)
     105           14 :         , tail_(nullptr)
     106           14 :         , work_count_(0)
     107           14 :         , stop_(false)
     108              :     {
     109           14 :         if(num_threads == 0)
     110            1 :             num_threads = std::thread::hardware_concurrency();
     111           14 :         if(num_threads == 0)
     112            0 :             num_threads = 1;
     113              : 
     114           14 :         threads_.reserve(num_threads);
     115           35 :         for(std::size_t i = 0; i < num_threads; ++i)
     116           42 :             threads_.emplace_back([this]{ run(); });
     117           14 :     }
     118              : 
     119              :     void
     120            2 :     on_work_started() noexcept
     121              :     {
     122            2 :         ++work_count_;
     123            2 :     }
     124              : 
     125              :     void
     126            2 :     on_work_finished() noexcept
     127              :     {
     128            2 :         --work_count_;
     129            2 :     }
     130              : 
     131              :     void*
     132           83 :     allocate(std::size_t size, std::size_t align)
     133              :     {
     134              :         // Allocate space for header + work object
     135           83 :         std::size_t total = sizeof(header) + size;
     136           83 :         std::lock_guard<std::mutex> lock(mutex_);
     137           83 :         void* p = arena_.allocate(total, align);
     138           83 :         auto* h = new(p) header{nullptr, total, align};
     139          166 :         return from_header(h);
     140           83 :     }
     141              : 
     142              :     void
     143              :     deallocate(void* p, std::size_t, std::size_t) noexcept
     144              :     {
     145              :         // Size/align from caller are ignored; we use stored values
     146              :         header* h = to_header(p);
     147              :         std::lock_guard<std::mutex> lock(mutex_);
     148              :         arena_.deallocate(h, h->size, h->align);
     149              :     }
     150              : 
     151              :     void
     152           83 :     submit(execution_context::handler* h)
     153              :     {
     154           83 :         header* hdr = to_header(h);
     155              :         {
     156           83 :             std::lock_guard<std::mutex> lock(mutex_);
     157           83 :             hdr->next = nullptr;
     158           83 :             if(tail_)
     159           64 :                 tail_->next = hdr;
     160              :             else
     161           19 :                 head_ = hdr;
     162           83 :             tail_ = hdr;
     163           83 :         }
     164           83 :         cv_.notify_one();
     165           83 :     }
     166              : 
     167              :     void
     168           83 :     post(any_coro h)
     169              :     {
     170              :         // Allocate handler and submit
     171           83 :         void* p = allocate(sizeof(coro_handler), alignof(coro_handler));
     172           83 :         auto* handler = new(p) coro_handler(h);
     173           83 :         submit(handler);
     174           83 :     }
     175              : 
     176              : private:
     177              :     void
     178           21 :     run()
     179              :     {
     180              :         for(;;)
     181              :         {
     182          104 :             header* h = nullptr;
     183              :             {
     184          104 :                 std::unique_lock<std::mutex> lock(mutex_);
     185          104 :                 cv_.wait(lock, [this]{
     186          129 :                     return stop_ || head_ != nullptr;
     187              :                 });
     188              : 
     189          104 :                 if(stop_ && !head_)
     190           42 :                     return;
     191              : 
     192           83 :                 h = head_;
     193           83 :                 head_ = head_->next;
     194           83 :                 if(!head_)
     195           19 :                     tail_ = nullptr;
     196          104 :             }
     197              : 
     198           83 :             auto* w = static_cast<execution_context::handler*>(from_header(h));
     199           83 :             (*w)();
     200              : 
     201              :             {
     202           83 :                 std::lock_guard<std::mutex> lock(mutex_);
     203           83 :                 arena_.deallocate(h, h->size, h->align);
     204           83 :             }
     205           83 :         }
     206              :     }
     207              : };
     208              : 
     209              : //------------------------------------------------------------------------------
     210              : 
     211           14 : thread_pool::
     212              : ~thread_pool()
     213              : {
     214           14 :     shutdown();
     215           14 :     delete impl_;
     216           14 :     destroy();
     217           14 : }
     218              : 
     219           14 : thread_pool::
     220           14 : thread_pool(std::size_t num_threads)
     221           14 :     : impl_(new impl(num_threads))
     222              : {
     223           14 : }
     224              : 
     225              : //------------------------------------------------------------------------------
     226              : 
     227              : void
     228            2 : thread_pool::executor_type::
     229              : on_work_started() const noexcept
     230              : {
     231            2 :     pool_->impl_->on_work_started();
     232            2 : }
     233              : 
     234              : void
     235            2 : thread_pool::executor_type::
     236              : on_work_finished() const noexcept
     237              : {
     238            2 :     pool_->impl_->on_work_finished();
     239            2 : }
     240              : 
     241              : void
     242           83 : thread_pool::executor_type::
     243              : post(any_coro h) const
     244              : {
     245           83 :     pool_->impl_->post(h);
     246           83 : }
     247              : 
     248              : } // capy
     249              : } // boost
        

Generated by: LCOV version 2.3