From 60e20bedb6d087a9fd253d78ab671de5ec5c3a0e Mon Sep 17 00:00:00 2001 From: Awni Hannun Date: Sat, 19 Jul 2025 13:35:57 -0700 Subject: [PATCH] Don't use shared event in worker --- mlx/backend/cuda/allocator.cpp | 3 +++ mlx/backend/cuda/device.cpp | 1 - mlx/backend/cuda/worker.cpp | 43 ++++++++++++++++++++-------------- mlx/backend/cuda/worker.h | 27 ++++++++++----------- 4 files changed, 41 insertions(+), 33 deletions(-) diff --git a/mlx/backend/cuda/allocator.cpp b/mlx/backend/cuda/allocator.cpp index 642cedb1f..6ca225a5f 100644 --- a/mlx/backend/cuda/allocator.cpp +++ b/mlx/backend/cuda/allocator.cpp @@ -30,6 +30,9 @@ SmallSizePool::SmallSizePool() { reinterpret_cast<char*>(buffer_) + small_pool_size); next_free_ = reinterpret_cast<Block*>(buffer_); + CHECK_CUDA_ERROR( + cudaMemAdvise(buffer_, small_pool_size, cudaMemAdviseSetReadMostly, 0)); + auto num_blocks = small_pool_size / small_block_size; auto curr = next_free_; for (size_t i = 0; i < num_blocks - 1; ++i) { diff --git a/mlx/backend/cuda/device.cpp b/mlx/backend/cuda/device.cpp index 47f1a44dc..366cdf826 100644 --- a/mlx/backend/cuda/device.cpp +++ b/mlx/backend/cuda/device.cpp @@ -306,7 +306,6 @@ void CommandEncoder::commit() { } // Put completion handlers in a batch.
- worker_.end_batch(); worker_.commit(stream_); } diff --git a/mlx/backend/cuda/worker.cpp b/mlx/backend/cuda/worker.cpp index 544928419..ee5c664e6 100644 --- a/mlx/backend/cuda/worker.cpp +++ b/mlx/backend/cuda/worker.cpp @@ -11,10 +11,10 @@ Worker::Worker() Worker::~Worker() { { - std::lock_guard lock(worker_mutex_); + std::lock_guard lock(mtx_); stop_ = true; } - worker_event_.signal(batch_ + 1); + cond_.notify_one(); worker_.join(); } @@ -22,35 +22,45 @@ void Worker::add_task(std::function<void()> task) { pending_tasks_.push_back(std::move(task)); } -void Worker::end_batch() { - batch_++; +void signal_worker(void* data) { + auto w = static_cast<Worker*>(data); + w->signal_(); +} + +void Worker::signal_() { { - std::lock_guard lock(worker_mutex_); - worker_tasks_[batch_] = std::move(pending_tasks_); + std::lock_guard lock(mtx_); + signaled_batch_++; } - uncommited_batches_++; + cond_.notify_one(); } void Worker::commit(cudaStream_t stream) { - if (uncommited_batches_ == 0) { + // Move pending tasks into tasks + if (pending_tasks_.empty()) { return; } - uncommited_batches_ = 0; - // Signal the |worker_event_| in |signal_stream_| after the kernels in - // |stream_| finish running. + { + std::lock_guard lock(mtx_); + // Move pending tasks into ready tasks + worker_tasks_[++committed_batch_] = std::move(pending_tasks_); + } signal_event_.record(stream); signal_event_.wait(signal_stream_); - worker_event_.signal(signal_stream_, batch_); + cudaLaunchHostFunc(signal_stream_, signal_worker, this); } void Worker::thread_fn() { while (!stop_) { - uint64_t batch = worker_event_.value(); + uint64_t current_batch = 0; Tasks tasks; { - std::lock_guard lock(worker_mutex_); - // Move tasks in signaled batches.
- auto end = worker_tasks_.upper_bound(batch); + std::unique_lock lk(mtx_); + cond_.wait(lk, [this, &current_batch] { + return this->signaled_batch_ > current_batch || this->stop_; + }); + current_batch = signaled_batch_; + auto end = worker_tasks_.upper_bound(current_batch); for (auto it = worker_tasks_.begin(); it != end; ++it) { if (tasks.empty()) { tasks = std::move(it->second); @@ -66,7 +76,6 @@ void Worker::thread_fn() { auto task = std::move(tasks[i]); task(); } - worker_event_.wait(batch + 1); } } diff --git a/mlx/backend/cuda/worker.h b/mlx/backend/cuda/worker.h index 6d39c3a61..9e6b7b5f4 100644 --- a/mlx/backend/cuda/worker.h +++ b/mlx/backend/cuda/worker.h @@ -5,6 +5,7 @@ #include "mlx/backend/cuda/event.h" #include "mlx/backend/cuda/utils.h" +#include <condition_variable> #include <functional> #include <map> #include <thread> @@ -12,6 +13,8 @@ namespace mlx::core::cu { +void signal_worker(void* data); + // Run tasks in worker thread, synchronized with cuda stream. class Worker { public: @@ -24,32 +27,25 @@ class Worker { // Add a pending |task| that will run when consumed or commited. void add_task(std::function<void()> task); - // Put pending tasks in a batch. - void end_batch(); - // Inform worker thread to run current batches after kernels in |stream| // finish running. void commit(cudaStream_t stream); - // Return how many batches have been added but not committed yet. - size_t uncommited_batches() const { return uncommited_batches_; } - private: - void thread_fn(); + friend void signal_worker(void*); - uint64_t batch_{0}; - size_t uncommited_batches_{0}; + void signal_(); + void thread_fn(); + std::mutex mtx_; + std::condition_variable cond_; + + uint64_t committed_batch_{0}; + uint64_t signaled_batch_{0}; // Cuda stream and event for signaling kernel completion. CudaStream signal_stream_; CudaEvent signal_event_; - // Worker thread.
- SharedEvent worker_event_; - std::thread worker_; - std::mutex worker_mutex_; bool stop_{false}; // Tasks are put in |pending_tasks_| first, and then moved to @@ -57,6 +53,7 @@ class Worker { using Tasks = std::vector<std::function<void()>>; Tasks pending_tasks_; std::map<uint64_t, Tasks> worker_tasks_; + std::thread worker_; }; } // namespace mlx::core::cu