// This code was modified from https://github.com/progschj/ThreadPool // The original License is copied below: // // Copyright (c) 2012 Jakob Progsch, Václav Zeman // This software is provided 'as-is', without any express or implied // warranty. In no event will the authors be held liable for any damages // arising from the use of this software. // // Permission is granted to anyone to use this software for any purpose, // including commercial applications, and to alter it and redistribute it // freely, subject to the following restrictions: // // 1. The origin of this software must not be misrepresented; you must not // claim that you wrote the original software. If you use this software // in a product, an acknowledgment in the product documentation would be // appreciated but is not required. // // 2. Altered source versions must be plainly marked as such, and must not be // misrepresented as being the original software. // // 3. This notice may not be removed or altered from any source // distribution. #pragma once #include #include #include #include #include #include #include #include #include class ThreadPool { public: ThreadPool(size_t); template auto enqueue(F&& f, Args&&... args) -> std::future>; void resize(size_t); ~ThreadPool(); private: void stop_and_wait(); void start_threads(size_t); std::vector workers; std::queue> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; }; inline ThreadPool::ThreadPool(size_t threads) : stop(false) { start_threads(threads); } template auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future> { using return_type = typename std::invoke_result_t; auto task = std::make_shared>( std::bind(std::forward(f), std::forward(args)...)); std::future res = task->get_future(); { std::unique_lock lock(queue_mutex); if (stop) { throw std::runtime_error( "[ThreadPool::enqueue] Not allowed on stopped ThreadPool"); } tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; } inline void ThreadPool::resize(size_t threads) { if (workers.size() == threads) { return; } if (workers.size() > threads) { stop_and_wait(); } start_threads(threads - workers.size()); } inline ThreadPool::~ThreadPool() { stop_and_wait(); } inline void ThreadPool::stop_and_wait() { // Stop the current threads and wait until they finish { std::unique_lock lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { worker.join(); } // Reset the member variables so that the threadpool is reusable stop = false; workers.clear(); } inline void ThreadPool::start_threads(size_t threads) { for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { for (;;) { std::function task; { std::unique_lock lock(this->queue_mutex); this->condition.wait( lock, [this] { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } }); } }