mirror of
https://github.com/ml-explore/mlx-examples.git
synced 2025-09-16 08:08:08 +08:00
make_sampler
creates sampler chain with all sampling parameters (#1330)
* top_p refactor * top_k and min_p refactor * Create sampler chain * Remove unnecessary mx.where * Use mx.allclose
This commit is contained in:
@@ -1,79 +1,97 @@
|
||||
import unittest
|
||||
|
||||
import mlx.core as mx
|
||||
from mlx_lm.sample_utils import min_p_sampling, top_k_sampling, top_p_sampling
|
||||
from mlx_lm.sample_utils import apply_min_p, apply_top_k, apply_top_p
|
||||
|
||||
|
||||
class TestSampleUtils(unittest.TestCase):
|
||||
def test_top_p_sampling(self):
|
||||
def test_apply_top_p(self):
|
||||
probs = mx.array([0.9, 0.0, 0.0, 0.1])[None]
|
||||
logits = mx.log(probs)
|
||||
temperature = 1.0
|
||||
|
||||
token = top_p_sampling(logits, 0.3, temperature).item()
|
||||
self.assertEqual(token, 0)
|
||||
new_logits = apply_top_p(logits, 0.3)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
self.assertEqual(actual_probs.tolist(), [1.0, 0.0, 0.0, 0.0])
|
||||
|
||||
token = top_p_sampling(logits, 0.95, temperature).item()
|
||||
self.assertTrue(token in (0, 3))
|
||||
new_logits = apply_top_p(logits, 0.95)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
self.assertTrue(mx.allclose(probs.squeeze(), actual_probs))
|
||||
|
||||
probs = mx.array([0.0, 0.5, 0.4, 0.1])[None]
|
||||
logits = mx.log(probs)
|
||||
new_logits = apply_top_p(logits, 0.4)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
self.assertEqual(actual_probs.tolist(), [0.0, 1.0, 0.0, 0.0])
|
||||
|
||||
token = top_p_sampling(logits, 0.4, temperature).item()
|
||||
self.assertEqual(token, 1)
|
||||
new_logits = apply_top_p(logits, 0.6)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
self.assertEqual(
|
||||
[round(p, 4) for p in actual_probs.tolist()], [0.0, 0.5556, 0.4444, 0.0]
|
||||
)
|
||||
|
||||
token = top_p_sampling(logits, 0.6, temperature).item()
|
||||
self.assertTrue(token in (1, 2))
|
||||
new_logits = apply_top_p(logits, 0.95)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
actual_rounded = [round(p, 4) for p in actual_probs.tolist()]
|
||||
expected_rounded = [0.0, 0.5, 0.4, 0.1]
|
||||
self.assertEqual(actual_rounded, expected_rounded)
|
||||
self.assertAlmostEqual(sum(actual_probs.tolist()), 1.0)
|
||||
|
||||
token = top_p_sampling(logits, 0.95, temperature).item()
|
||||
self.assertTrue(token in (1, 2, 3))
|
||||
# Batch mode works
|
||||
probs = mx.array([[0.9, 0.0, 0.0, 0.1], [0.0, 0.8, 0.1, 0.1]])
|
||||
logits = mx.log(probs)
|
||||
new_logits = apply_top_p(logits, 0.5)
|
||||
actual_probs = mx.softmax(new_logits, axis=-1)
|
||||
self.assertEqual(
|
||||
actual_probs.tolist(), [[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0]]
|
||||
)
|
||||
|
||||
def test_apply_min_p(self):
|
||||
probs = mx.array([0.9, 0.0, 0.0, 0.1])[None]
|
||||
logits = mx.log(probs)
|
||||
new_logits = apply_min_p(logits, 0.8)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
self.assertEqual(actual_probs.tolist(), [1.0, 0.0, 0.0, 0.0])
|
||||
|
||||
probs = mx.array([0.9, 0.0, 0.0, 0.1])[None]
|
||||
logits = mx.log(probs)
|
||||
new_logits = apply_min_p(logits, 0.05)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
self.assertTrue(mx.allclose(actual_probs, mx.squeeze(probs)))
|
||||
|
||||
# Batch mode works
|
||||
probs = mx.array([[0.9, 0.0, 0.0, 0.1], [0.0, 0.8, 0.0, 0.1]])
|
||||
logits = mx.log(probs)
|
||||
tokens = top_p_sampling(logits, 0.5, temperature)
|
||||
self.assertEqual(tokens.tolist(), [0, 1])
|
||||
new_logits = apply_min_p(logits, 0.7)
|
||||
actual_probs = mx.softmax(new_logits, axis=-1)
|
||||
self.assertEqual(
|
||||
actual_probs.tolist(), [[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0]]
|
||||
)
|
||||
|
||||
def test_min_p_sampling(self):
|
||||
probs = mx.array([0.9, 0.0, 0.0, 0.1])[None]
|
||||
logits = mx.log(probs)
|
||||
temperature = 1.0
|
||||
token = min_p_sampling(logits, 0.8)
|
||||
self.assertEqual(token, 0)
|
||||
|
||||
probs = mx.array([0.9, 0.0, 0.0, 0.1])[None]
|
||||
logits = mx.log(probs)
|
||||
temperature = 1.0
|
||||
for _ in range(5):
|
||||
token = min_p_sampling(logits, 0.05)
|
||||
self.assertTrue(token in (0, 3))
|
||||
|
||||
# Batch mode works
|
||||
probs = mx.array([[0.9, 0.0, 0.0, 0.1], [0.0, 0.8, 0.0, 0.1]])
|
||||
logits = mx.log(probs)
|
||||
tokens = min_p_sampling(logits, 0.7)
|
||||
self.assertEqual(tokens.tolist(), [0, 1])
|
||||
|
||||
def test_top_k_sampling(self):
|
||||
def test_apply_top_k(self):
|
||||
probs = mx.array([0.9, 0.0, 0.0, 0.1])[None]
|
||||
logits = mx.log(probs)
|
||||
|
||||
token = top_k_sampling(logits, 1).item()
|
||||
self.assertEqual(token, 0)
|
||||
new_logits = apply_top_k(logits, 1)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
self.assertEqual(actual_probs.tolist(), [1.0, 0.0, 0.0, 0.0])
|
||||
|
||||
probs = mx.array([0.5, 0.0, 0.0, 0.5])[None]
|
||||
tokens = set()
|
||||
for _ in range(100):
|
||||
token = top_k_sampling(logits, 2)
|
||||
tokens.add(token.item())
|
||||
self.assertEqual(tokens, {0, 3})
|
||||
probs = mx.array([0.6, 0.0, 0.1, 0.3])[None]
|
||||
logits = mx.log(probs)
|
||||
new_logits = apply_top_k(logits, 2)
|
||||
actual_probs = mx.softmax(new_logits.squeeze())
|
||||
self.assertEqual(
|
||||
[round(p, 4) for p in actual_probs.tolist()], [0.6667, 0.0, 0.0, 0.3333]
|
||||
)
|
||||
|
||||
# Batch mode works
|
||||
probs = mx.array([[0.9, 0.0, 0.0, 0.1], [0.0, 0.8, 0.0, 0.1]])
|
||||
logits = mx.log(probs)
|
||||
|
||||
tokens = top_k_sampling(logits, 1)
|
||||
self.assertEqual(tokens.tolist(), [0, 1])
|
||||
new_logits = apply_top_k(logits, 1)
|
||||
actual_probs = mx.softmax(new_logits, axis=-1)
|
||||
self.assertEqual(
|
||||
actual_probs.tolist(), [[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0]]
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Reference in New Issue
Block a user