9#include <unordered_map> 
   20  std::queue<std::function<void()>> 
q;
 
   21  std::condition_variable 
cond;
 
   34      std::lock_guard<std::mutex> lk(
mtx);
 
 
   43      std::function<void()> task;
 
   45        std::unique_lock<std::mutex> lk(
mtx);
 
   46        cond.wait(lk, [
this] { 
return !this->q.empty() || this->
stop; });
 
   47        if (q.empty() && 
stop) {
 
   50        task = std::move(
q.front());
 
 
   61      std::lock_guard<std::mutex> lk(
mtx);
 
   63        throw std::runtime_error(
 
   64            "Cannot enqueue work after stream is stopped.");
 
   66      q.emplace(std::forward<F>(
f));
 
 
 
   88    auto stream = 
Stream(streams_.size(), d);
 
 
   97    return default_streams_.at(d.
type);
 
 
  106      std::lock_guard<std::mutex> lk(mtx);
 
  109    completion_cv.notify_all();
 
 
  114      std::lock_guard<std::mutex> lk(mtx);
 
  117    completion_cv.notify_all();
 
 
  121    return n_active_tasks_;
 
 
  125    std::unique_lock<std::mutex> lk(mtx);
 
  127    if (n_tasks_old > 1) {
 
  128      completion_cv.wait(lk, [
this, n_tasks_old] {
 
 
  135    for (
auto s : streams_) {
 
 
  142  std::vector<StreamThread*> streams_;
 
  143  std::unordered_map<Device::DeviceType, Stream> default_streams_;
 
  144  std::condition_variable completion_cv;
 
 
  150  streams_[stream.
index]->enqueue(std::forward<F>(
f));
 
 
 
Definition scheduler.h:72
 
void wait_for_one()
Definition scheduler.h:124
 
Scheduler & operator=(Scheduler &&)=delete
 
void enqueue(const Stream &stream, F &&f)
Definition scheduler.h:149
 
Stream new_stream(const Device &d)
Definition scheduler.h:87
 
Stream get_default_stream(const Device &d) const
Definition scheduler.h:96
 
Scheduler()
Definition scheduler.h:74
 
int n_active_tasks() const
Definition scheduler.h:120
 
Scheduler(const Scheduler &)=delete
 
~Scheduler()
Definition scheduler.h:134
 
void set_default_stream(const Stream &s)
Definition scheduler.h:100
 
Scheduler & operator=(const Scheduler &)=delete
 
void notify_task_completion(const Stream &stream)
Definition scheduler.h:112
 
Scheduler(Scheduler &&)=delete
 
void notify_new_task(const Stream &stream)
Definition scheduler.h:104
 
Definition scheduler.h:16
 
void notify_task_completion(const Stream &stream)
Definition scheduler.h:168
 
void notify_new_task(const Stream &stream)
Definition scheduler.h:164
 
void wait_for_one()
Definition scheduler.h:172
 
int n_active_tasks()
Definition scheduler.h:160
 
void enqueue(const Stream &stream, F &&f)
Definition scheduler.h:156
 
static constexpr DeviceType gpu
Definition device.h:14
 
static constexpr DeviceType cpu
Definition device.h:13
 
DeviceType type
Definition device.h:18
 
Device device
Definition stream.h:11
 
int index
Definition stream.h:10
 
Definition scheduler.h:18
 
void thread_fn()
Definition scheduler.h:41
 
std::thread thread
Definition scheduler.h:24
 
bool stop
Definition scheduler.h:22
 
void enqueue(F &&f)
Definition scheduler.h:59
 
std::condition_variable cond
Definition scheduler.h:21
 
std::mutex mtx
Definition scheduler.h:19
 
~StreamThread()
Definition scheduler.h:31
 
Stream stream
Definition scheduler.h:23
 
StreamThread(Stream stream)
Definition scheduler.h:26
 
std::queue< std::function< void()> > q
Definition scheduler.h:20
 
float f
Definition bf16.h:16