Google OR-Tools: ortools/base/threadpool.cc Source File

1

2

3

4

5

6

7

8

9

10

11

12

13

15

16#include <optional>

17#include <utility>

18

19#include "absl/algorithm/container.h"

20#include "absl/base/nullability.h"

21#include "absl/base/optimization.h"

22#include "absl/base/thread_annotations.h"

23#include "absl/functional/any_invocable.h"

24#include "absl/log/check.h"

25#include "absl/strings/string_view.h"

26#include "absl/synchronization/mutex.h"

27

29

30

31

32

34 : max_threads_(num_threads == 0 ? 1 : num_threads) {

35 CHECK_GT(max_threads_, 0u);

36

37 absl::MutexLock lock(mutex_);

38 SpawnThread();

39}

40

43

45

46

47 {

48 absl::MutexLock l(mutex_);

49 stopping_ = true;

50 for (Waiter* absl_nonnull waiter : waiters_) {

51 waiter->cv.Signal();

52 }

53

54

55 auto queue_empty = [this]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {

56 return queue_.empty();

57 };

58 mutex_.Await(absl::Condition(&queue_empty));

59 }

60

61

62 for (auto& worker : threads_) {

63 worker.join();

64 }

65}

66

67void ThreadPool::SpawnThread() {

68 CHECK_LE(threads_.size(), max_threads_);

69 threads_.emplace_back([this] { RunWorker(); });

70}

71

72void ThreadPool::RunWorker() {

73 {

74 absl::MutexLock lock(mutex_);

75 ++running_threads_;

76 }

77 while (true) {

78 std::optional<absl::AnyInvocable<void() &&>> item = DequeueWork();

79 if (!item.has_value()) {

80 break;

81 }

82 DCHECK(item);

83 std::move (*item)();

84 }

85}

86

87void ThreadPool::SignalWaiter() {

88 DCHECK(!queue_.empty());

89 if (waiters_.empty()) {

90

91 if (running_threads_ == threads_.size() && threads_.size() < max_threads_) {

92 SpawnThread();

93 }

94 } else {

95

96

97

98 waiters_.back()->cv.Signal();

99 }

100}

101

102std::optional<absl::AnyInvocable<void() &&>> ThreadPool::DequeueWork() {

103

104 absl::MutexLock m(mutex_);

105 while (queue_.empty() && !stopping_) {

106 Waiter self;

107 waiters_.push_back(&self);

108 self.cv.Wait(&mutex_);

109 waiters_.erase(absl::c_find(waiters_, &self));

110 }

111 if (queue_.empty()) {

112 DCHECK(stopping_);

113 return std::nullopt;

114 }

115 absl::AnyInvocable<void() &&> result = std::move(queue_.front());

116 queue_.pop_front();

117 if (!queue_.empty()) {

118 SignalWaiter();

119 }

120 return std::move(result);

121}

122

124

125 absl::MutexLock m(mutex_);

126 DCHECK(!stopping_) << "Callback added after destructor started";

127 if (ABSL_PREDICT_FALSE(stopping_)) return;

128 queue_.push_back(std::move(callback));

129 SignalWaiter();

130}

131

132}

~ThreadPool()

Definition threadpool.cc:44

ThreadPool(int num_threads)

Definition threadpool.cc:33

void Schedule(absl::AnyInvocable< void() && > callback)

Definition threadpool.cc:123