Compare commits

4 Commits

Author       SHA1        Message                               Date
Awni Hannun  d107d8d495  add cuda gemv (#2400)                 2025-07-22 08:24:13 -07:00
Awni Hannun  1e496ddb82  [CUDA] Simplify allocator (#2392)     2025-07-22 08:24:01 -07:00
                         * simplify allocator and fixe race with small pool
                         * Don't use shared event in worker
                         * use cuda buffer in small pool
                         * comment
                         * comment
Awni Hannun  74eccbf3fa  use size option in binary (#2399)     2025-07-22 07:00:53 -07:00
Awni Hannun  08638223ca  Fix including stubs in wheel (#2398)  2025-07-22 06:30:17 -07:00
                         * fix including stubs in wheel
                         * fix bool_
22 changed files with 322 additions and 191 deletions

View File

@@ -20,6 +20,7 @@ target_sources(
   ${CMAKE_CURRENT_SOURCE_DIR}/eval.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/event.cu
   ${CMAKE_CURRENT_SOURCE_DIR}/fence.cpp
+  ${CMAKE_CURRENT_SOURCE_DIR}/gemv.cu
   ${CMAKE_CURRENT_SOURCE_DIR}/jit_module.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/indexing.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/kernel_utils.cu

@@ -87,6 +88,11 @@ endif()
 target_compile_options(
   mlx PRIVATE "$<$<COMPILE_LANGUAGE:CUDA>:--Wno-deprecated-gpu-targets>")
+if(CMAKE_CUDA_COMPILER_VERSION VERSION_GREATER_EQUAL 12.4.0)
+  target_compile_options(
+    mlx PRIVATE "$<$<COMPILE_LANGUAGE:CUDA>:--compress-mode=size>")
+endif()
 # Compute capability 7 is required for synchronization between CPU/GPU with
 # managed memory. TODO: Add more architectures for potential performance gain.
 set(MLX_CUDA_ARCHITECTURES

View File

@@ -2,7 +2,6 @@

 #include "mlx/backend/cuda/allocator.h"
 #include "mlx/backend/cuda/utils.h"
-#include "mlx/backend/cuda/worker.h"
 #include "mlx/utils.h"

 #include <cuda_runtime.h>

@@ -25,52 +24,58 @@ constexpr int small_block_size = 8;
 constexpr int small_pool_size = 4 * page_size;

 SmallSizePool::SmallSizePool() {
-  CHECK_CUDA_ERROR(cudaMallocManaged(&buffer_, small_pool_size));
-  end_ = reinterpret_cast<void*>(
-      reinterpret_cast<char*>(buffer_) + small_pool_size);
-  next_free_ = reinterpret_cast<Block*>(buffer_);
   auto num_blocks = small_pool_size / small_block_size;
+  buffer_ = new Block[num_blocks];
+  next_free_ = buffer_;
+  CHECK_CUDA_ERROR(cudaMallocManaged(&data_, small_pool_size));
+  CHECK_CUDA_ERROR(
+      cudaMemAdvise(data_, small_pool_size, cudaMemAdviseSetReadMostly, 0));

   auto curr = next_free_;
-  for (size_t i = 0; i < num_blocks - 1; ++i) {
-    curr->next = reinterpret_cast<Block*>(
-        reinterpret_cast<char*>(buffer_) + (i + 1) * small_block_size);
+  for (size_t i = 1; i < num_blocks; ++i) {
+    curr->next = buffer_ + i;
     curr = curr->next;
   }
   curr->next = nullptr;
 }

 SmallSizePool::~SmallSizePool() {
-  CHECK_CUDA_ERROR(cudaFree(buffer_));
+  CHECK_CUDA_ERROR(cudaFree(data_));
+  delete[] buffer_;
 }

-void* SmallSizePool::malloc() {
+CudaBuffer* SmallSizePool::malloc() {
   if (next_free_ == nullptr) {
     return nullptr;
   }
   Block* b = next_free_;
+  uint64_t i = next_free_ - buffer_;
   next_free_ = next_free_->next;
-  return static_cast<void*>(b);
+  b->buf.data = static_cast<char*>(data_) + i * small_block_size;
+  b->buf.size = small_block_size;
+  return &b->buf;
 }

-void SmallSizePool::free(void* p) {
-  auto b = static_cast<Block*>(p);
+void SmallSizePool::free(CudaBuffer* buf) {
+  auto b = reinterpret_cast<Block*>(buf);
   b->next = next_free_;
   next_free_ = b;
 }

-bool SmallSizePool::in_pool(void* p) {
-  return (p >= buffer_) && (p < end_);
+bool SmallSizePool::in_pool(CudaBuffer* buf) {
+  constexpr int num_blocks = (small_pool_size / small_block_size);
+  auto b = reinterpret_cast<Block*>(buf);
+  int64_t block_num = b - buffer_;
+  return block_num >= 0 && block_num < num_blocks;
 }

 CudaAllocator::CudaAllocator()
     : buffer_cache_(
           page_size,
           [](CudaBuffer* buf) { return buf->size; },
-          [this](CudaBuffer* buf) {
-            cuda_free(buf->data);
-            delete buf;
-          }) {
+          [this](CudaBuffer* buf) { cuda_free(buf); }) {
   // TODO: Set memory limit for multi-device.
   size_t free, total;
   CHECK_CUDA_ERROR(cudaMemGetInfo(&free, &total));

@@ -92,28 +97,26 @@ Buffer CudaAllocator::malloc(size_t size) {
   CudaBuffer* buf = buffer_cache_.reuse_from_cache(size);
   if (!buf) {
-    // If we have a lot of memory pressure or are over the maximum cache size,
-    // try to reclaim memory from the cache.
-    size_t mem_required = get_active_memory() + get_cache_memory() + size;
-    if (mem_required >= memory_limit_) {
-      buffer_cache_.release_cached_buffers(mem_required - memory_limit_);
+    // If we have a lot of memory pressure try to reclaim memory from the cache.
+    int64_t mem_to_free =
+        get_active_memory() + get_cache_memory() + size - memory_limit_;
+    if (mem_to_free > 0) {
+      buffer_cache_.release_cached_buffers(mem_to_free);
     }
-    lock.unlock();
-    buf = new CudaBuffer{nullptr, size};

     // Try the scalar pool first
     if (size <= small_block_size) {
-      buf->data = scalar_pool_.malloc();
+      buf = scalar_pool_.malloc();
     }
-    if (!buf->data) {
+    lock.unlock();
+    if (!buf) {
+      buf = new CudaBuffer{nullptr, size};
       cudaError_t err = cudaMallocManaged(&buf->data, size);
       if (err != cudaSuccess && err != cudaErrorMemoryAllocation) {
         throw std::runtime_error(fmt::format(
             "cudaMallocManaged failed: {}.", cudaGetErrorString(err)));
       }
     }
     lock.lock();
   }
   active_memory_ += size;

@@ -123,7 +126,6 @@ Buffer CudaAllocator::malloc(size_t size) {
   if (get_cache_memory() > max_pool_size_) {
     buffer_cache_.release_cached_buffers(get_cache_memory() - max_pool_size_);
   }
-
   return Buffer{buf};
 }

@@ -138,9 +140,7 @@ void CudaAllocator::free(Buffer buffer) {
   if (get_cache_memory() < max_pool_size_) {
     buffer_cache_.recycle_to_cache(buf);
   } else {
-    lock.unlock();
-    cuda_free(buf->data);
-    delete buf;
+    cuda_free(buf);
   }
 }

@@ -152,30 +152,13 @@ size_t CudaAllocator::size(Buffer buffer) const {
   return buf->size;
 }

-void CudaAllocator::register_this_thread() {
-  std::lock_guard lock(worker_mutex_);
-  allowed_threads_.insert(std::this_thread::get_id());
-}
-
-void CudaAllocator::cuda_free(void* buf) {
-  // If cuda_free() is called from a unregistered thread, reschedule the call to
-  // worker.
-  {
-    std::lock_guard lock(worker_mutex_);
-    if (allowed_threads_.count(std::this_thread::get_id()) == 0) {
-      if (!worker_) {
-        worker_.reset(new Worker);
-      }
-      worker_->add_task([this, buf]() { this->cuda_free(buf); });
-      worker_->end_batch();
-      worker_->commit();
-      return;
-    }
-  }
+// This must be called with mutex_ aquired
+void CudaAllocator::cuda_free(CudaBuffer* buf) {
   if (scalar_pool_.in_pool(buf)) {
     scalar_pool_.free(buf);
   } else {
-    cudaFree(buf);
+    cudaFree(buf->data);
+    delete buf;
   }
 }
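
Note: the reworked pool keeps its free-list nodes in ordinary host memory and hands out fixed-size slices of a single cudaMallocManaged region, so freeing a small block never touches the CUDA driver. Below is a minimal, self-contained sketch of the union-based free list idea; illustrative only, not the MLX code (`Payload` stands in for `CudaBuffer`, and the pool size is arbitrary):

    #include <cstddef>

    struct Payload {
      void* data;
      size_t size;
    };

    // A slot is either a link in the free list or a live Payload,
    // never both at once, so the union costs no extra memory.
    union Block {
      Block* next;
      Payload buf;
    };

    struct Pool {
      Block blocks[64];
      Block* next_free;

      Pool() {
        // Thread every block onto the free list.
        for (int i = 0; i < 63; ++i) {
          blocks[i].next = &blocks[i + 1];
        }
        blocks[63].next = nullptr;
        next_free = &blocks[0];
      }

      Payload* alloc() {
        if (!next_free) {
          return nullptr;
        }
        Block* b = next_free;
        next_free = b->next;
        return &b->buf;
      }

      void free(Payload* p) {
        // Union members share an address, so p also locates the Block.
        Block* b = reinterpret_cast<Block*>(p);
        b->next = next_free;
        next_free = b;
      }
    };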

View File

@@ -7,13 +7,10 @@

 #include <mutex>
 #include <set>
-#include <thread>
 #include <utility>

 namespace mlx::core::cu {

-class Worker;
-
 using allocator::Buffer;

 // Stores cuda-managed unified memory.

@@ -24,13 +21,14 @@ struct CudaBuffer {
 class SmallSizePool {
  private:
-  struct Block {
+  union Block {
     Block* next;
+    CudaBuffer buf;
   };

-  void* buffer_{nullptr};
+  Block* buffer_{nullptr};
+  void* data_{nullptr};
   Block* next_free_{nullptr};
-  void* end_{nullptr};

  public:
   SmallSizePool();

@@ -39,9 +37,9 @@ class SmallSizePool {
   SmallSizePool(const SmallSizePool&) = delete;
   SmallSizePool& operator=(const SmallSizePool&) = delete;

-  void* malloc();
-  void free(void* p);
-  bool in_pool(void* p);
+  CudaBuffer* malloc();
+  void free(CudaBuffer* buf);
+  bool in_pool(CudaBuffer* buf);
 };

 class CudaAllocator : public allocator::Allocator {

@@ -50,15 +48,6 @@ class CudaAllocator : public allocator::Allocator {
   void free(Buffer buffer) override;
   size_t size(Buffer buffer) const override;

-  // Register current thread as safe to free buffers.
-  //
-  // In cuda freeing a buffer implicitly synchronizes stream, and for threads
-  // that may be waited by gpu stream (for example cpu stream threads), freeing
-  // buffers there would result in dead lock.
-  void register_this_thread();
-
-  // Call cudaFree in the safe thread.
-  void cuda_free(void* buf);
-
   size_t get_active_memory() const;
   size_t get_peak_memory() const;
   void reset_peak_memory();

@@ -69,13 +58,11 @@ class CudaAllocator : public allocator::Allocator {
   void clear_cache();

  private:
+  void cuda_free(CudaBuffer* buf);
+
   CudaAllocator();
   friend CudaAllocator& allocator();

-  std::mutex worker_mutex_;
-  std::unique_ptr<Worker> worker_;
-  std::set<std::thread::id> allowed_threads_;
-
   std::mutex mutex_;
   size_t memory_limit_;
   size_t max_pool_size_;

View File

@@ -128,7 +128,7 @@ __global__ void binary_g(
     int ndim) {
   IdxT index = cg::this_grid().thread_rank();
   if (index < size) {
-    auto [a_idx, b_idx] = elem_to_loc_4d(
+    auto [a_idx, b_idx] = elem_to_loc(
         index, shape.data(), a_strides.data(), b_strides.data(), ndim);
     out[index] = Op{}(a[a_idx], b[b_idx]);
   }

View File

@@ -160,7 +160,7 @@ __global__ void binary_two_g(
     int ndim) {
   IdxT index = cg::this_grid().thread_rank();
   if (index < size) {
-    auto [a_idx, b_idx] = elem_to_loc_4d(
+    auto [a_idx, b_idx] = elem_to_loc(
         index, shape.data(), a_strides.data(), b_strides.data(), ndim);
     auto out = Op{}(a[a_idx], b[b_idx]);
     out_a[index] = out[0];

View File

@@ -37,7 +37,7 @@ __global__ void copy_gg(
     int ndim) {
   IdxT index = cg::this_grid().thread_rank();
   if (index < size) {
-    auto [idx_in, idx_out] = elem_to_loc_4d(
+    auto [idx_in, idx_out] = elem_to_loc(
         index, shape.data(), strides_in.data(), strides_out.data(), ndim);
     out[idx_out] = CastOp<In, Out>{}(in[idx_in]);
   }

View File

@@ -41,7 +41,7 @@ __global__ void copy_gg_dynamic(
     const int64_t* offset_out) {
   IdxT index = cg::this_grid().thread_rank();
   if (index < size) {
-    auto [idx_in, idx_out] = elem_to_loc_4d(
+    auto [idx_in, idx_out] = elem_to_loc(
         index, shape.data(), strides_in.data(), strides_out.data(), ndim);
     out[idx_out + *offset_out] = CastOp<In, Out>{}(in[idx_in + *offset_in]);
   }

View File

@@ -34,7 +34,7 @@ __global__ void copy_g(
     int ndim) {
   IdxT index = cg::this_grid().thread_rank();
   if (index < size) {
-    IdxT idx_in = elem_to_loc_4d(index, shape.data(), strides_in.data(), ndim);
+    IdxT idx_in = elem_to_loc(index, shape.data(), strides_in.data(), ndim);
     out[index] = CastOp<In, Out>{}(in[idx_in]);
   }
 }

View File

@@ -306,7 +306,6 @@ void CommandEncoder::commit() {
   }

   // Put completion handlers in a batch.
-  worker_.end_batch();
   worker_.commit(stream_);
 }

@@ -315,7 +314,6 @@ void CommandEncoder::synchronize() {
   auto p = std::make_shared<std::promise<void>>();
   std::future<void> f = p->get_future();
   add_completed_handler([p = std::move(p)]() { p->set_value(); });
-  worker_.end_batch();
   commit();
   f.wait();
 }
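
Note: with end_batch() gone, synchronize() reduces to a plain promise/future handoff: the completion handler added here runs on the worker thread once the committed stream work drains, unblocking the waiter. A generic sketch of that idiom (not the MLX API, just the pattern):

    #include <functional>
    #include <future>
    #include <memory>
    #include <thread>

    int main() {
      auto p = std::make_shared<std::promise<void>>();
      std::future<void> f = p->get_future();

      // Stand-in for add_completed_handler(): the handler fires on
      // another thread once the queued work is done.
      std::function<void()> handler = [p = std::move(p)]() { p->set_value(); };

      std::thread worker([handler = std::move(handler)]() {
        // ... GPU work would complete here ...
        handler();
      });

      f.wait();  // Blocks until the handler has run.
      worker.join();
      return 0;
    }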

View File

@@ -218,20 +218,8 @@ inline __host__ __device__ cuda::std::tuple<IdxT, IdxT, IdxT> elem_to_loc_nd(
   return cuda::std::make_tuple(a_loc, b_loc, c_loc);
 }

-// Optimized version when ndim is larger than 4.
-template <typename IdxT = int64_t>
-inline __host__ __device__ IdxT
-elem_to_loc_4d(IdxT elem, const int* shape, const int64_t* strides, int ndim) {
-  IdxT loc = 0;
-  for (int i = ndim - 1; i >= 0; --i) {
-    loc += (elem % shape[i]) * IdxT(strides[i]);
-    elem /= shape[i];
-  }
-  return loc;
-}
-
 template <typename IdxT = int64_t>
-inline __host__ __device__ cuda::std::tuple<IdxT, IdxT> elem_to_loc_4d(
+inline __host__ __device__ cuda::std::tuple<IdxT, IdxT> elem_to_loc(
     IdxT elem,
     const int* shape,
     const int64_t* a_strides,

@@ -249,7 +237,7 @@ inline __host__ __device__ cuda::std::tuple<IdxT, IdxT> elem_to_loc(
 }

 template <typename IdxT = int64_t>
-inline __host__ __device__ cuda::std::tuple<IdxT, IdxT, IdxT> elem_to_loc_4d(
+inline __host__ __device__ cuda::std::tuple<IdxT, IdxT, IdxT> elem_to_loc(
     IdxT elem,
     const int* shape,
     const int64_t* a_strides,
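
Note: the renamed helpers all share the same arithmetic: peel the flat element index into per-dimension coordinates from the innermost axis outward, multiplying each coordinate by that array's stride. A host-only illustration of the same loop, with assumed shapes and strides for the example:

    #include <cstdint>
    #include <cstdio>

    // Same structure as the device helpers: walk dims innermost-first.
    int64_t elem_to_loc(
        int64_t elem, const int* shape, const int64_t* strides, int ndim) {
      int64_t loc = 0;
      for (int i = ndim - 1; i >= 0; --i) {
        loc += (elem % shape[i]) * strides[i];
        elem /= shape[i];
      }
      return loc;
    }

    int main() {
      int shape[] = {2, 3, 4};
      int64_t contig[] = {12, 4, 1};    // row-major strides for 2x3x4
      int64_t broadcast[] = {0, 4, 1};  // first axis broadcast (stride 0)
      // Flat index 17 corresponds to coordinates (1, 1, 1).
      std::printf("%lld\n", (long long)elem_to_loc(17, shape, contig, 3));     // 17
      std::printf("%lld\n", (long long)elem_to_loc(17, shape, broadcast, 3));  // 5
      return 0;
    }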

View File

@@ -19,8 +19,6 @@ void new_stream(Stream s) {
   cudaFree(nullptr);
   // Ensure the static stream objects get created.
   cu::get_command_encoder(s);
-  // The main thread is safe to free buffers.
-  cu::allocator().register_this_thread();
 }

 void eval(array& arr) {

View File

@@ -110,24 +110,26 @@ __global__ void event_signal_kernel(SharedEvent::Atomic* ac, uint64_t value) {
   event_signal(ac, value);
 }

+SharedEvent::Atomic* to_atomic(std::shared_ptr<Buffer> buf) {
+  return static_cast<SharedEvent::Atomic*>(buf->raw_ptr());
+}
+
 SharedEvent::SharedEvent() {
-  // Allocate cuda::atomic on managed memory.
-  Atomic* ac;
-  CHECK_CUDA_ERROR(cudaMallocManaged(&ac, sizeof(Atomic)));
-  new (ac) Atomic(0);
-  ac_ = std::shared_ptr<Atomic>(ac, [](Atomic* ptr) {
-    ptr->~Atomic();
-    allocator().cuda_free(ptr);
-  });
+  buf_ = std::shared_ptr<Buffer>(
+      new Buffer{allocator().malloc(sizeof(Atomic))}, [](Buffer* ptr) {
+        allocator().free(*ptr);
+        delete ptr;
+      });
+  *static_cast<uint64_t*>(buf_->raw_ptr()) = 0;
 }

 void SharedEvent::wait(uint64_t value) {
   nvtx3::scoped_range r("cu::SharedEvent::wait");
-  event_wait(ac_.get(), value);
+  event_wait(to_atomic(buf_), value);
 }

 void SharedEvent::wait(cudaStream_t stream, uint64_t value) {
-  event_wait_kernel<<<1, 1, 0, stream>>>(ac_.get(), value);
+  event_wait_kernel<<<1, 1, 0, stream>>>(to_atomic(buf_), value);
 }

 void SharedEvent::wait(Stream s, uint64_t value) {

@@ -138,17 +140,17 @@ void SharedEvent::wait(Stream s, uint64_t value) {
     auto& encoder = get_command_encoder(s);
     encoder.commit();
     wait(encoder.stream(), value);
-    encoder.add_completed_handler([ac = ac_]() {});
+    encoder.add_completed_handler([buf = buf_]() {});
   }
 }

 void SharedEvent::signal(uint64_t value) {
   nvtx3::scoped_range r("cu::SharedEvent::signal");
-  event_signal(ac_.get(), value);
+  event_signal(to_atomic(buf_), value);
 }

 void SharedEvent::signal(cudaStream_t stream, uint64_t value) {
-  event_signal_kernel<<<1, 1, 0, stream>>>(ac_.get(), value);
+  event_signal_kernel<<<1, 1, 0, stream>>>(to_atomic(buf_), value);
 }

 void SharedEvent::signal(Stream s, uint64_t value) {

@@ -162,18 +164,18 @@ void SharedEvent::signal(Stream s, uint64_t value) {
     auto& encoder = get_command_encoder(s);
     encoder.commit();
     signal(encoder.stream(), value);
-    encoder.add_completed_handler([ac = ac_]() {});
+    encoder.add_completed_handler([buf = buf_]() {});
   }
 }

 bool SharedEvent::is_signaled(uint64_t value) const {
   nvtx3::scoped_range r("cu::SharedEvent::is_signaled");
-  return ac_->load() >= value;
+  return to_atomic(buf_)->load() >= value;
 }

 uint64_t SharedEvent::value() const {
   nvtx3::scoped_range r("cu::SharedEvent::value");
-  return ac_->load();
+  return to_atomic(buf_)->load();
 }

 } // namespace cu
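
Note: the event's storage is now an ordinary allocator buffer instead of a bespoke cudaMallocManaged allocation, with to_atomic() reinterpreting the managed bytes as the cuda::atomic. A simplified sketch of the wait/signal mechanics on a system-scope atomic in managed memory; this is the underlying idea, not the MLX implementation (error checks omitted):

    #include <cstdint>
    #include <new>

    #include <cuda/atomic>
    #include <cuda_runtime.h>

    using Atomic = cuda::atomic<uint64_t, cuda::thread_scope_system>;

    __host__ __device__ void event_wait(Atomic* ac, uint64_t value) {
      uint64_t current;
      while ((current = ac->load()) < value) {
        ac->wait(current);  // Sleep until the stored value changes.
      }
    }

    __host__ __device__ void event_signal(Atomic* ac, uint64_t value) {
      ac->store(value);
      ac->notify_all();  // Wake host- and device-side waiters.
    }

    int main() {
      Atomic* ac;
      cudaMallocManaged(&ac, sizeof(Atomic));
      new (ac) Atomic(0);  // Placement-new the atomic into managed memory.
      event_signal(ac, 1);
      event_wait(ac, 1);   // Returns immediately; already signaled.
      ac->~Atomic();
      cudaFree(ac);
      return 0;
    }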

View File

@@ -2,6 +2,7 @@

 #pragma once

+#include "mlx/allocator.h"
 #include "mlx/stream.h"

 #include <cuda_runtime.h>

@@ -55,12 +56,8 @@ class SharedEvent {
   bool is_signaled(uint64_t value) const;
   uint64_t value() const;

-  const std::shared_ptr<Atomic>& atomic() const {
-    return ac_;
-  }
-
  private:
-  std::shared_ptr<Atomic> ac_;
+  std::shared_ptr<mlx::core::allocator::Buffer> buf_;
 };

 } // namespace mlx::core::cu

mlx/backend/cuda/gemv.cu (new file, 147 lines)

View File

@@ -0,0 +1,147 @@
// Copyright © 2025 Apple Inc.

#include "mlx/backend/cuda/gemv.h"
#include "mlx/backend/cuda/kernel_utils.cuh"
#include "mlx/dtype_utils.h"

#include <cooperative_groups.h>
#include <cooperative_groups/reduce.h>

namespace mlx::core::cu {

namespace cg = cooperative_groups;

static constexpr int n_per_thread = 4;
static constexpr int rows_per_block = 8;

template <typename T, int rows_per_block, int n_per_thread>
__device__ void
gemv_impl(const T* mat, const T* vec, T* out, int rows, int cols) {
  auto block = cg::this_thread_block();
  auto warp = cg::tiled_partition<WARP_SIZE>(block);

  auto g_idx = block.group_index();
  auto t_idx = block.thread_index();
  int row = g_idx.x * rows_per_block + t_idx.y;

  if (row < rows) {
    float sum = 0.0f;
    for (int col = n_per_thread * warp.thread_rank(); col < cols;
         col += (WARP_SIZE * n_per_thread)) {
      auto local_mat = load_vector<n_per_thread>(mat + row * cols + col, 0);
      auto local_vec = load_vector<n_per_thread>(vec + col, 0);
#pragma unroll
      for (int j = 0; j < n_per_thread; ++j) {
        sum += static_cast<float>(local_mat.val[j]) *
            static_cast<float>(local_vec.val[j]);
      }
    }

    sum = cg::reduce(warp, sum, cg::plus<float>{});
    if (warp.thread_rank() == 0) {
      out[row] = static_cast<T>(sum);
    }
  }
}

template <typename T, int rows_per_block, int n_per_thread>
__global__ void
gemv_single(const T* mat, const T* vec, T* out, int rows, int cols) {
  gemv_impl<T, rows_per_block, n_per_thread>(mat, vec, out, rows, cols);
}

template <typename T, int rows_per_block, int n_per_thread>
__global__ void gemv_batched(
    const T* mat,
    const T* vec,
    T* out,
    int rows,
    int cols,
    const __grid_constant__ Shape batch_shape,
    const __grid_constant__ Strides mat_batch_strides,
    const __grid_constant__ Strides vec_batch_strides,
    int batch_ndim) {
  auto block = cg::this_thread_block();
  auto batch_idx = block.group_index().y;
  auto [vec_offset, mat_offset] = elem_to_loc(
      batch_idx,
      batch_shape.data(),
      vec_batch_strides.data(),
      mat_batch_strides.data(),
      batch_ndim);
  gemv_impl<T, rows_per_block, n_per_thread>(
      mat + mat_offset, vec + vec_offset, out + batch_idx * rows, rows, cols);
}

bool can_use_gemv(int M, int N, int K, bool a_transposed, bool b_transposed) {
  return K % (WARP_SIZE * n_per_thread) == 0 &&
      ((M == 1 && b_transposed) || (N == 1 && !a_transposed));
}

void gemv(
    const array& a,
    const array& b,
    array& out,
    int M,
    int N,
    int K,
    uint32_t batch_count,
    const mlx::core::Shape& batch_shape,
    const mlx::core::Strides& a_batch_strides,
    const mlx::core::Strides& b_batch_strides,
    CommandEncoder& encoder) {
  encoder.set_input_array(a);
  encoder.set_input_array(b);
  encoder.set_output_array(out);
  dispatch_float_types(out.dtype(), "gemv", [&](auto type_tag) {
    using DataType = cuda_type_t<MLX_GET_TYPE(type_tag)>;
    dim3 block_dims{WARP_SIZE, rows_per_block};
    const DataType* mat;
    const DataType* vec;
    int rows;
    int cols = K;
    auto mat_strides = const_param(a_batch_strides);
    auto vec_strides = const_param(b_batch_strides);
    if (M == 1) {
      mat = b.data<DataType>();
      vec = a.data<DataType>();
      rows = N;
      std::swap(mat_strides, vec_strides);
    } else {
      mat = a.data<DataType>();
      vec = b.data<DataType>();
      rows = M;
    }
    uint32_t num_blocks_x = (rows + rows_per_block - 1) / rows_per_block;
    if (batch_count == 1) {
      auto kernel = gemv_single<DataType, rows_per_block, n_per_thread>;
      encoder.add_kernel_node(
          kernel,
          num_blocks_x,
          block_dims,
          mat,
          vec,
          out.data<DataType>(),
          rows,
          cols);
    } else {
      auto kernel = gemv_batched<DataType, rows_per_block, n_per_thread>;
      encoder.add_kernel_node(
          kernel,
          dim3{num_blocks_x, batch_count},
          block_dims,
          mat,
          vec,
          out.data<DataType>(),
          rows,
          cols,
          const_param(batch_shape),
          mat_strides,
          vec_strides,
          batch_shape.size());
    }
  });
}

} // namespace mlx::core::cu
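
Note: the kernel assigns one warp per output row: each lane accumulates a vectorized, strided slice of the dot product and cg::reduce folds the WARP_SIZE partial sums, which is why can_use_gemv requires K to be a multiple of WARP_SIZE * n_per_thread. When M == 1 (row vector times a transposed matrix), the mat/vec roles and their batch strides swap. A plain CPU reference for what each row computes, useful for spot-checking the kernel; illustrative only, not part of the change:

    #include <vector>

    // out[r] = dot(mat[r, :], vec) -- the quantity each warp reduces.
    template <typename T>
    std::vector<float> gemv_reference(
        const std::vector<T>& mat,  // rows x cols, row-major
        const std::vector<T>& vec,  // cols entries
        int rows,
        int cols) {
      std::vector<float> out(rows, 0.0f);
      for (int r = 0; r < rows; ++r) {
        float sum = 0.0f;  // Accumulate in float, as the kernel does.
        for (int c = 0; c < cols; ++c) {
          sum += static_cast<float>(mat[r * cols + c]) *
              static_cast<float>(vec[c]);
        }
        out[r] = sum;
      }
      return out;
    }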

mlx/backend/cuda/gemv.h (new file, 24 lines)

View File

@@ -0,0 +1,24 @@
// Copyright © 2025 Apple Inc.

#pragma once

#include "mlx/backend/cuda/device.h"

namespace mlx::core::cu {

bool can_use_gemv(int M, int N, int K, bool a_transposed, bool b_transposed);

void gemv(
    const array& a,
    const array& b,
    array& out,
    int M,
    int N,
    int K,
    uint32_t batch_count,
    const mlx::core::Shape& batch_shape,
    const mlx::core::Strides& a_batch_strides,
    const mlx::core::Strides& b_batch_strides,
    CommandEncoder& encoder);

} // namespace mlx::core::cu

View File

@@ -2,6 +2,7 @@

 #include "mlx/backend/common/matmul.h"
 #include "mlx/backend/cuda/device.h"
+#include "mlx/backend/cuda/gemv.h"
 #include "mlx/backend/gpu/copy.h"
 #include "mlx/dtype_utils.h"
 #include "mlx/primitives.h"

@@ -353,6 +354,22 @@ void Matmul::eval_gpu(const std::vector<array>& inputs, array& out) {
     batch_shape = {1};
   }

+  if (cu::can_use_gemv(M, N, K, a_transposed, b_transposed)) {
+    cu::gemv(
+        a,
+        b,
+        out,
+        M,
+        N,
+        K,
+        batch_count,
+        batch_shape,
+        a_batch_strides,
+        b_batch_strides,
+        encoder);
+    return;
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Invoke cublasLt

View File

@@ -76,7 +76,7 @@ __global__ void ternary_g(
     int ndim) {
   IdxT index = cg::this_grid().thread_rank();
   if (index < size) {
-    auto [a_idx, b_idx, c_idx] = elem_to_loc_4d(
+    auto [a_idx, b_idx, c_idx] = elem_to_loc(
         index,
         shape.data(),
         a_strides.data(),

View File

@@ -47,7 +47,7 @@ __global__ void unary_g(
     int ndim) {
   IdxT index = cg::this_grid().thread_rank();
   if (index < size) {
-    auto idx = elem_to_loc_4d(index, shape.data(), strides.data(), ndim);
+    auto idx = elem_to_loc(index, shape.data(), strides.data(), ndim);
     out[index] = Op{}(in[idx]);
   }
 }

View File

@@ -1,7 +1,6 @@
 // Copyright © 2025 Apple Inc.

 #include "mlx/backend/cuda/worker.h"
-#include "mlx/backend/cuda/allocator.h"
 #include "mlx/backend/cuda/device.h"

 namespace mlx::core::cu {

@@ -12,10 +11,10 @@ Worker::Worker()
 Worker::~Worker() {
   {
-    std::lock_guard lock(worker_mutex_);
+    std::lock_guard lock(mtx_);
     stop_ = true;
   }
-  worker_event_.signal(batch_ + 1);
+  cond_.notify_one();
   worker_.join();
 }

@@ -23,53 +22,41 @@ void Worker::add_task(std::function<void()> task) {
   pending_tasks_.push_back(std::move(task));
 }

-void Worker::consume_in_this_thread() {
-  for (auto& task : pending_tasks_) {
-    task();
-  }
-  pending_tasks_.clear();
-}
-
-void Worker::end_batch() {
-  batch_++;
-  {
-    std::lock_guard lock(worker_mutex_);
-    worker_tasks_[batch_] = std::move(pending_tasks_);
-  }
-  uncommited_batches_++;
-}
-
-void Worker::commit() {
-  if (uncommited_batches_ == 0) {
-    return;
-  }
-  uncommited_batches_ = 0;
-  worker_event_.signal(batch_);
+void Worker::signal(void* data) {
+  auto w = static_cast<Worker*>(data);
+  {
+    std::lock_guard lock(w->mtx_);
+    w->signaled_batch_++;
+  }
+  w->cond_.notify_one();
 }

 void Worker::commit(cudaStream_t stream) {
-  if (uncommited_batches_ == 0) {
+  // Move pending tasks into tasks
+  if (pending_tasks_.empty()) {
     return;
   }
-  uncommited_batches_ = 0;
-  // Signal the |worker_event_| in |signal_stream_| after the kernels in
-  // |stream_| finish running.
+  {
+    std::lock_guard lock(mtx_);
+    // Move pending tasks into ready tasks
+    worker_tasks_[++committed_batch_] = std::move(pending_tasks_);
+  }
   signal_event_.record(stream);
   signal_event_.wait(signal_stream_);
-  worker_event_.signal(signal_stream_, batch_);
+  cudaLaunchHostFunc(signal_stream_, signal, this);
 }

 void Worker::thread_fn() {
-  // The worker thread is safe to free buffers.
-  allocator().register_this_thread();
   while (!stop_) {
-    uint64_t batch = worker_event_.value();
+    uint64_t current_batch = 0;
     Tasks tasks;
     {
-      std::lock_guard lock(worker_mutex_);
-      // Move tasks in signaled batches.
-      auto end = worker_tasks_.upper_bound(batch);
+      std::unique_lock<std::mutex> lk(mtx_);
+      cond_.wait(lk, [this, &current_batch] {
+        return this->signaled_batch_ > current_batch || this->stop_;
+      });
+      current_batch = signaled_batch_;
+      auto end = worker_tasks_.upper_bound(current_batch);
       for (auto it = worker_tasks_.begin(); it != end; ++it) {
         if (tasks.empty()) {
           tasks = std::move(it->second);

@@ -85,7 +72,6 @@ void Worker::thread_fn() {
       auto task = std::move(tasks[i]);
       task();
     }
-    worker_event_.wait(batch + 1);
   }
 }
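
Note: the SharedEvent-based wakeup is replaced by a plain condition variable: cudaLaunchHostFunc() enqueues the static signal() callback on signal_stream_, so signaled_batch_ only advances after the recorded GPU work completes. A reduced sketch of that handoff (task bookkeeping elided; assumes a valid CUDA context, error checks omitted):

    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    #include <cuda_runtime.h>

    struct Waiter {
      std::mutex mtx;
      std::condition_variable cond;
      uint64_t signaled_batch{0};

      // Matches the cudaHostFn_t signature cudaLaunchHostFunc expects.
      static void signal(void* data) {
        auto* w = static_cast<Waiter*>(data);
        {
          std::lock_guard<std::mutex> lock(w->mtx);
          w->signaled_batch++;
        }
        w->cond.notify_one();
      }

      void wait_for(uint64_t batch) {
        std::unique_lock<std::mutex> lk(mtx);
        cond.wait(lk, [&] { return signaled_batch >= batch; });
      }
    };

    int main() {
      Waiter w;
      cudaStream_t stream;
      cudaStreamCreate(&stream);
      // Runs on a CUDA-internal thread after prior work in |stream| drains.
      cudaLaunchHostFunc(stream, &Waiter::signal, &w);
      w.wait_for(1);
      cudaStreamDestroy(stream);
      return 0;
    }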

View File

@@ -5,6 +5,7 @@

 #include "mlx/backend/cuda/event.h"
 #include "mlx/backend/cuda/utils.h"

+#include <condition_variable>
 #include <functional>
 #include <map>
 #include <mutex>

@@ -24,38 +25,24 @@ class Worker {
   // Add a pending |task| that will run when consumed or commited.
   void add_task(std::function<void()> task);

-  // Run pending tasks immediately in current thread.
-  void consume_in_this_thread();
-
-  // Put pending tasks in a batch.
-  void end_batch();
-
-  // Inform worker thread to run current batches now.
-  void commit();
-
   // Inform worker thread to run current batches after kernels in |stream|
   // finish running.
   void commit(cudaStream_t stream);

-  // Return how many batches have been added but not committed yet.
-  size_t uncommited_batches() const {
-    return uncommited_batches_;
-  }
-
  private:
+  static void signal(void*);
+
   void thread_fn();

-  uint64_t batch_{0};
-  size_t uncommited_batches_{0};
+  std::mutex mtx_;
+  std::condition_variable cond_;
+
+  uint64_t committed_batch_{0};
+  uint64_t signaled_batch_{0};

   // Cuda stream and event for signaling kernel completion.
   CudaStream signal_stream_;
   CudaEvent signal_event_;

-  // Worker thread.
-  SharedEvent worker_event_;
-  std::thread worker_;
-  std::mutex worker_mutex_;
   bool stop_{false};

   // Tasks are put in |pending_tasks_| first, and then moved to

@@ -63,6 +50,7 @@ class Worker {
   using Tasks = std::vector<std::function<void()>>;
   Tasks pending_tasks_;
   std::map<uint64_t, Tasks> worker_tasks_;
+  std::thread worker_;
 };

 } // namespace mlx::core::cu

View File

@@ -128,8 +128,7 @@ Buffer MetalAllocator::malloc(size_t size) {
   auto pool = metal::new_scoped_memory_pool();

-  // If we have a lot of memory pressure or are over the maximum cache size,
-  // try to reclaim memory from the cache
+  // If we have a lot of memory pressure try to reclaim memory from the cache
   if (mem_required >= gc_limit_ || num_resources_ >= resource_limit_) {
     num_resources_ -=
         buffer_cache_.release_cached_buffers(mem_required - gc_limit_);

View File

@@ -9,7 +9,7 @@ from functools import partial
 from pathlib import Path
 from subprocess import run

-from setuptools import Command, Extension, setup
+from setuptools import Command, Extension, find_namespace_packages, setup
 from setuptools.command.bdist_wheel import bdist_wheel
 from setuptools.command.build_ext import build_ext

@@ -166,6 +166,10 @@ class GenerateStubs(Command):
         # Run again without recursive to specify output file name
         subprocess.run(["rm", f"{out_path}/mlx.pyi"])
         subprocess.run(stub_cmd + ["-o", f"{out_path}/__init__.pyi"])
+        # mx.bool_ gets filtered by nanobind because of the trailing
+        # underscore, add it manually:
+        with open(f"{out_path}/__init__.pyi", "a") as fid:
+            fid.write("\nbool_: Dtype = ...")

 class MLXBdistWheel(bdist_wheel):

@@ -184,12 +188,19 @@ with open(Path(__file__).parent / "README.md", encoding="utf-8") as f:
 if __name__ == "__main__":
     package_dir = {"": "python"}
-    packages = [
-        "mlx",
-        "mlx.nn",
-        "mlx.nn.layers",
-        "mlx.optimizers",
-    ]
+    packages = find_namespace_packages(
+        where="python",
+        exclude=[
+            "src",
+            "tests",
+            "scripts",
+            "mlx.lib",
+            "mlx.include",
+            "mlx.share",
+            "mlx.share.**",
+            "mlx.include.**",
+        ],
+    )

     build_macos = platform.system() == "Darwin"
     build_cuda = "MLX_BUILD_CUDA=ON" in os.environ.get("CMAKE_ARGS", "")

@@ -221,7 +232,7 @@ if __name__ == "__main__":
         },
     )

-    package_data = {"mlx": ["lib/*", "include/*", "share/*"], "mlx.core": ["*.pyi"]}
+    package_data = {"mlx.core": ["*.pyi"]}

     extras = {
         "dev": [