Google OR-Tools: ortools/base/threadpool.cc Source File
19#include "absl/algorithm/container.h"
20#include "absl/base/nullability.h"
21#include "absl/base/optimization.h"
22#include "absl/base/thread_annotations.h"
23#include "absl/functional/any_invocable.h"
25#include "absl/strings/string_view.h"
26#include "absl/synchronization/mutex.h"
34 : max_threads_(num_threads == 0 ? 1 : num_threads) {
35 CHECK_GT(max_threads_, 0u);
50 for (Waiter* absl_nonnull waiter : waiters_) {
55 auto queue_empty = [this]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
58 mutex_.Await(absl::Condition(&queue_empty));
67void ThreadPool::SpawnThread() {
68 CHECK_LE(threads_.size(), max_threads_);
69 threads_.emplace_back([this] { RunWorker(); });
72void ThreadPool::RunWorker() {
74 absl::MutexLock lock(mutex_);
78 std::optional<absl::AnyInvocable<void() &&>> item = DequeueWork();
87void ThreadPool::SignalWaiter() {
91 if (running_threads_ == threads_.size() && threads_.size() < max_threads_) {
98 waiters_.back()->cv.Signal();
102std::optional<absl::AnyInvocable<void() &&>> ThreadPool::DequeueWork() {
104 absl::MutexLock m(mutex_);
105 while (queue_.empty() && !stopping_) {
107 waiters_.push_back(&self);
109 waiters_.erase(absl::c_find(waiters_, &self));
115 absl::AnyInvocable<void() &&> result = std::move(queue_.front());
125 absl::MutexLock m(mutex_);
126 DCHECK(!stopping_) << "Callback added after destructor started";
127 if (ABSL_PREDICT_FALSE(stopping_)) return;
~ThreadPool()
Definition threadpool.cc:44
ThreadPool(int num_threads)
Definition threadpool.cc:33
void Schedule(absl::AnyInvocable< void() && > callback)
Definition threadpool.cc:123