1367 lines
42 KiB
Python
1367 lines
42 KiB
Python
# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
|
|
# Spack Project Developers. See the top-level COPYRIGHT file for details.
|
|
#
|
|
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
|
|
|
|
"""These tests ensure that our lock works correctly.
|
|
|
|
This can be run in two ways.
|
|
|
|
First, it can be run as a node-local test, with a typical invocation like
|
|
this::
|
|
|
|
spack test lock
|
|
|
|
You can *also* run it as an MPI program, which allows you to test locks
|
|
across nodes. So, e.g., you can run the test like this::
|
|
|
|
mpirun -n 7 spack test lock
|
|
|
|
And it will test locking correctness among MPI processes. Ideally, you
|
|
want the MPI processes to span across multiple nodes, so, e.g., for SLURM
|
|
you might do this::
|
|
|
|
srun -N 7 -n 7 -m cyclic spack test lock
|
|
|
|
You can use this to test whether your shared filesystem properly supports
|
|
POSIX reader-writer locking with byte ranges through fcntl.
|
|
|
|
If you want to test on multiple filesystems, you can modify the
|
|
``locations`` list below. By default it looks like this::
|
|
|
|
locations = [
|
|
tempfile.gettempdir(), # standard tmp directory (potentially local)
|
|
'/nfs/tmp2/%u', # NFS tmp mount
|
|
'/p/lscratch*/%u' # Lustre scratch mount
|
|
]
|
|
|
|
Add names and paths for your preferred filesystem mounts to test on them;
|
|
the tests are parametrized to run on all the filesystems listed in this
|
|
dict. Note that 'tmp' will be skipped for MPI testing, as it is often a
|
|
node-local filesystem, and multi-node tests will fail if the locks aren't
|
|
actually on a shared filesystem.
|
|
|
|
"""
|
|
import collections
|
|
import errno
|
|
import fcntl
|
|
import getpass
|
|
import glob
|
|
import os
|
|
import shutil
|
|
import socket
|
|
import tempfile
|
|
import traceback
|
|
from contextlib import contextmanager
|
|
from multiprocessing import Process, Queue
|
|
|
|
import pytest
|
|
|
|
import llnl.util.lock as lk
|
|
import llnl.util.multiproc as mp
|
|
from llnl.util.filesystem import touch
|
|
|
|
#
|
|
# This test can be run with MPI. MPI is "enabled" if we can import
|
|
# mpi4py and the number of total MPI processes is greater than 1.
|
|
# Otherwise it just runs as a node-local test.
|
|
#
|
|
# NOTE: MPI mode is different from node-local mode in that node-local
|
|
# mode will spawn its own test processes, while MPI mode assumes you've
|
|
# run this script as a SPMD application. In MPI mode, no additional
|
|
# processes are spawned, and you need to ensure that you mpirun the
|
|
# script with enough processes for all the multiproc_test cases below.
|
|
#
|
|
# If you don't run with enough processes, tests that require more
|
|
# processes than you currently have will be skipped.
|
|
#
|
|
mpi = False
|
|
comm = None
|
|
try:
|
|
from mpi4py import MPI
|
|
comm = MPI.COMM_WORLD
|
|
if comm.size > 1:
|
|
mpi = True
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
"""This is a list of filesystem locations to test locks in. Paths are
|
|
expanded so that %u is replaced with the current username. '~' is also
|
|
legal and will be expanded to the user's home directory.
|
|
|
|
Tests are skipped for directories that don't exist, so you'll need to
|
|
update this with the locations of NFS, Lustre, and other mounts on your
|
|
system.
|
|
"""
|
|
locations = [
|
|
tempfile.gettempdir(),
|
|
os.path.join('/nfs/tmp2/', getpass.getuser()),
|
|
os.path.join('/p/lscratch*/', getpass.getuser()),
|
|
]
|
|
|
|
"""This is the longest a failed multiproc test will take.
|
|
Barriers will time out and raise an exception after this interval.
|
|
In MPI mode, barriers don't time out (they hang). See mpi_multiproc_test.
|
|
"""
|
|
barrier_timeout = 5
|
|
|
|
"""This is the lock timeout for expected failures.
|
|
This may need to be higher for some filesystems."""
|
|
lock_fail_timeout = 0.1
|
|
|
|
|
|
def make_readable(*paths):
|
|
for path in paths:
|
|
mode = 0o555 if os.path.isdir(path) else 0o444
|
|
os.chmod(path, mode)
|
|
|
|
|
|
def make_writable(*paths):
|
|
for path in paths:
|
|
mode = 0o755 if os.path.isdir(path) else 0o744
|
|
os.chmod(path, mode)
|
|
|
|
|
|
@contextmanager
|
|
def read_only(*paths):
|
|
modes = [os.stat(p).st_mode for p in paths]
|
|
make_readable(*paths)
|
|
|
|
yield
|
|
|
|
for path, mode in zip(paths, modes):
|
|
os.chmod(path, mode)
|
|
|
|
|
|
@pytest.fixture(scope='session', params=locations)
|
|
def lock_test_directory(request):
|
|
"""This fixture causes tests to be executed for many different mounts.
|
|
|
|
See the ``locations`` dict above for details.
|
|
"""
|
|
return request.param
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def lock_dir(lock_test_directory):
|
|
parent = next((p for p in glob.glob(lock_test_directory)
|
|
if os.path.exists(p) and os.access(p, os.W_OK)), None)
|
|
if not parent:
|
|
# Skip filesystems that don't exist or aren't writable
|
|
pytest.skip("requires filesystem: '%s'" % lock_test_directory)
|
|
elif mpi and parent == tempfile.gettempdir():
|
|
# Skip local tmp test for MPI runs
|
|
pytest.skip("skipping local tmp directory for MPI test.")
|
|
|
|
tempdir = None
|
|
if not mpi or comm.rank == 0:
|
|
tempdir = tempfile.mkdtemp(dir=parent)
|
|
if mpi:
|
|
tempdir = comm.bcast(tempdir)
|
|
|
|
yield tempdir
|
|
|
|
if mpi:
|
|
# rank 0 may get here before others, in which case it'll try to
|
|
# remove the directory while other processes try to re-create the
|
|
# lock. This will give errno 39: directory not empty. Use a
|
|
# barrier to ensure everyone is done first.
|
|
comm.barrier()
|
|
|
|
if not mpi or comm.rank == 0:
|
|
make_writable(tempdir)
|
|
shutil.rmtree(tempdir)
|
|
|
|
|
|
@pytest.fixture
|
|
def private_lock_path(lock_dir):
|
|
"""In MPI mode, this is a private lock for each rank in a multiproc test.
|
|
|
|
For other modes, it is the same as a shared lock.
|
|
"""
|
|
lock_file = os.path.join(lock_dir, 'lockfile')
|
|
if mpi:
|
|
lock_file += '.%s' % comm.rank
|
|
|
|
yield lock_file
|
|
|
|
if os.path.exists(lock_file):
|
|
make_writable(lock_dir, lock_file)
|
|
os.unlink(lock_file)
|
|
|
|
|
|
@pytest.fixture
|
|
def lock_path(lock_dir):
|
|
"""This lock is shared among all processes in a multiproc test."""
|
|
lock_file = os.path.join(lock_dir, 'lockfile')
|
|
|
|
yield lock_file
|
|
|
|
if os.path.exists(lock_file):
|
|
make_writable(lock_dir, lock_file)
|
|
os.unlink(lock_file)
|
|
|
|
|
|
def test_poll_interval_generator():
|
|
interval_iter = iter(
|
|
lk.Lock._poll_interval_generator(_wait_times=[1, 2, 3]))
|
|
intervals = list(next(interval_iter) for i in range(100))
|
|
assert intervals == [1] * 20 + [2] * 40 + [3] * 40
|
|
|
|
|
|
def local_multiproc_test(*functions, **kwargs):
|
|
"""Order some processes using simple barrier synchronization."""
|
|
b = mp.Barrier(len(functions), timeout=barrier_timeout)
|
|
|
|
args = (b,) + tuple(kwargs.get('extra_args', ()))
|
|
procs = [Process(target=f, args=args, name=f.__name__)
|
|
for f in functions]
|
|
|
|
for p in procs:
|
|
p.start()
|
|
|
|
for p in procs:
|
|
p.join()
|
|
|
|
assert all(p.exitcode == 0 for p in procs)
|
|
|
|
|
|
def mpi_multiproc_test(*functions):
|
|
"""SPMD version of multiproc test.
|
|
|
|
This needs to be run like so:
|
|
|
|
srun spack test lock
|
|
|
|
Each process executes its corresponding function. This is different
|
|
from ``multiproc_test`` above, which spawns the processes. This will
|
|
skip tests if there are too few processes to run them.
|
|
"""
|
|
procs = len(functions)
|
|
if procs > comm.size:
|
|
pytest.skip("requires at least %d MPI processes" % procs)
|
|
|
|
comm.Barrier() # barrier before each MPI test
|
|
|
|
include = comm.rank < len(functions)
|
|
subcomm = comm.Split(include)
|
|
|
|
class subcomm_barrier(object):
|
|
"""Stand-in for multiproc barrier for MPI-parallel jobs."""
|
|
def wait(self):
|
|
subcomm.Barrier()
|
|
|
|
if include:
|
|
try:
|
|
functions[subcomm.rank](subcomm_barrier())
|
|
except BaseException:
|
|
# aborting is the best we can do for MPI tests without
|
|
# hanging, since we're using MPI barriers. This will fail
|
|
# early and it loses the nice pytest output, but at least it
|
|
# gets use a stacktrace on the processes that failed.
|
|
traceback.print_exc()
|
|
comm.Abort()
|
|
subcomm.Free()
|
|
|
|
comm.Barrier() # barrier after each MPI test.
|
|
|
|
|
|
"""``multiproc_test()`` should be called by tests below.
|
|
``multiproc_test()`` will work for either MPI runs or for local runs.
|
|
"""
|
|
multiproc_test = mpi_multiproc_test if mpi else local_multiproc_test
|
|
|
|
|
|
#
|
|
# Process snippets below can be composed into tests.
|
|
#
|
|
class AcquireWrite(object):
|
|
def __init__(self, lock_path, start=0, length=0):
|
|
self.lock_path = lock_path
|
|
self.start = start
|
|
self.length = length
|
|
|
|
@property
|
|
def __name__(self):
|
|
return self.__class__.__name__
|
|
|
|
def __call__(self, barrier):
|
|
lock = lk.Lock(self.lock_path, self.start, self.length)
|
|
lock.acquire_write() # grab exclusive lock
|
|
barrier.wait()
|
|
barrier.wait() # hold the lock until timeout in other procs.
|
|
|
|
|
|
class AcquireRead(object):
|
|
def __init__(self, lock_path, start=0, length=0):
|
|
self.lock_path = lock_path
|
|
self.start = start
|
|
self.length = length
|
|
|
|
@property
|
|
def __name__(self):
|
|
return self.__class__.__name__
|
|
|
|
def __call__(self, barrier):
|
|
lock = lk.Lock(self.lock_path, self.start, self.length)
|
|
lock.acquire_read() # grab shared lock
|
|
barrier.wait()
|
|
barrier.wait() # hold the lock until timeout in other procs.
|
|
|
|
|
|
class TimeoutWrite(object):
|
|
def __init__(self, lock_path, start=0, length=0):
|
|
self.lock_path = lock_path
|
|
self.start = start
|
|
self.length = length
|
|
|
|
@property
|
|
def __name__(self):
|
|
return self.__class__.__name__
|
|
|
|
def __call__(self, barrier):
|
|
lock = lk.Lock(self.lock_path, self.start, self.length)
|
|
barrier.wait() # wait for lock acquire in first process
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_write(lock_fail_timeout)
|
|
barrier.wait()
|
|
|
|
|
|
class TimeoutRead(object):
|
|
def __init__(self, lock_path, start=0, length=0):
|
|
self.lock_path = lock_path
|
|
self.start = start
|
|
self.length = length
|
|
|
|
@property
|
|
def __name__(self):
|
|
return self.__class__.__name__
|
|
|
|
def __call__(self, barrier):
|
|
lock = lk.Lock(self.lock_path, self.start, self.length)
|
|
barrier.wait() # wait for lock acquire in first process
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_read(lock_fail_timeout)
|
|
barrier.wait()
|
|
|
|
|
|
#
|
|
# Test that exclusive locks on other processes time out when an
|
|
# exclusive lock is held.
|
|
#
|
|
def test_write_lock_timeout_on_write(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_on_write_2(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_on_write_3(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_on_write_ranges(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path, 0, 1),
|
|
TimeoutWrite(lock_path, 0, 1))
|
|
|
|
|
|
def test_write_lock_timeout_on_write_ranges_2(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path, 0, 64),
|
|
AcquireWrite(lock_path, 65, 1),
|
|
TimeoutWrite(lock_path, 0, 1),
|
|
TimeoutWrite(lock_path, 63, 1))
|
|
|
|
|
|
def test_write_lock_timeout_on_write_ranges_3(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path, 0, 1),
|
|
AcquireWrite(lock_path, 1, 1),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_on_write_ranges_4(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path, 0, 1),
|
|
AcquireWrite(lock_path, 1, 1),
|
|
AcquireWrite(lock_path, 2, 456),
|
|
AcquireWrite(lock_path, 500, 64),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
#
|
|
# Test that shared locks on other processes time out when an
|
|
# exclusive lock is held.
|
|
#
|
|
def test_read_lock_timeout_on_write(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path),
|
|
TimeoutRead(lock_path))
|
|
|
|
|
|
def test_read_lock_timeout_on_write_2(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path),
|
|
TimeoutRead(lock_path),
|
|
TimeoutRead(lock_path))
|
|
|
|
|
|
def test_read_lock_timeout_on_write_3(lock_path):
|
|
multiproc_test(
|
|
AcquireWrite(lock_path),
|
|
TimeoutRead(lock_path),
|
|
TimeoutRead(lock_path),
|
|
TimeoutRead(lock_path))
|
|
|
|
|
|
def test_read_lock_timeout_on_write_ranges(lock_path):
|
|
"""small write lock, read whole file."""
|
|
multiproc_test(
|
|
AcquireWrite(lock_path, 0, 1),
|
|
TimeoutRead(lock_path))
|
|
|
|
|
|
def test_read_lock_timeout_on_write_ranges_2(lock_path):
|
|
"""small write lock, small read lock"""
|
|
multiproc_test(
|
|
AcquireWrite(lock_path, 0, 1),
|
|
TimeoutRead(lock_path, 0, 1))
|
|
|
|
|
|
def test_read_lock_timeout_on_write_ranges_3(lock_path):
|
|
"""two write locks, overlapping read locks"""
|
|
multiproc_test(
|
|
AcquireWrite(lock_path, 0, 1),
|
|
AcquireWrite(lock_path, 64, 128),
|
|
TimeoutRead(lock_path, 0, 1),
|
|
TimeoutRead(lock_path, 128, 256))
|
|
|
|
|
|
#
|
|
# Test that exclusive locks time out when shared locks are held.
|
|
#
|
|
def test_write_lock_timeout_on_read(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_on_read_2(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_on_read_3(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_on_read_ranges(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 0, 1),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_on_read_ranges_2(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 0, 1),
|
|
TimeoutWrite(lock_path, 0, 1))
|
|
|
|
|
|
def test_write_lock_timeout_on_read_ranges_3(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 0, 1),
|
|
AcquireRead(lock_path, 10, 1),
|
|
TimeoutWrite(lock_path, 0, 1),
|
|
TimeoutWrite(lock_path, 10, 1))
|
|
|
|
|
|
def test_write_lock_timeout_on_read_ranges_4(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 0, 64),
|
|
TimeoutWrite(lock_path, 10, 1),
|
|
TimeoutWrite(lock_path, 32, 1))
|
|
|
|
|
|
def test_write_lock_timeout_on_read_ranges_5(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 64, 128),
|
|
TimeoutWrite(lock_path, 65, 1),
|
|
TimeoutWrite(lock_path, 127, 1),
|
|
TimeoutWrite(lock_path, 90, 10))
|
|
|
|
|
|
#
|
|
# Test that exclusive locks time while lots of shared locks are held.
|
|
#
|
|
def test_write_lock_timeout_with_multiple_readers_2_1(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path),
|
|
AcquireRead(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_with_multiple_readers_2_2(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path),
|
|
AcquireRead(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_with_multiple_readers_3_1(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path),
|
|
AcquireRead(lock_path),
|
|
AcquireRead(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_with_multiple_readers_3_2(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path),
|
|
AcquireRead(lock_path),
|
|
AcquireRead(lock_path),
|
|
TimeoutWrite(lock_path),
|
|
TimeoutWrite(lock_path))
|
|
|
|
|
|
def test_write_lock_timeout_with_multiple_readers_2_1_ranges(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 0, 10),
|
|
AcquireRead(lock_path, 0.5, 10),
|
|
TimeoutWrite(lock_path, 5, 5))
|
|
|
|
|
|
def test_write_lock_timeout_with_multiple_readers_2_3_ranges(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 0, 10),
|
|
AcquireRead(lock_path, 5, 15),
|
|
TimeoutWrite(lock_path, 0, 1),
|
|
TimeoutWrite(lock_path, 11, 3),
|
|
TimeoutWrite(lock_path, 7, 1))
|
|
|
|
|
|
def test_write_lock_timeout_with_multiple_readers_3_1_ranges(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 0, 5),
|
|
AcquireRead(lock_path, 5, 5),
|
|
AcquireRead(lock_path, 10, 5),
|
|
TimeoutWrite(lock_path, 0, 15))
|
|
|
|
|
|
def test_write_lock_timeout_with_multiple_readers_3_2_ranges(lock_path):
|
|
multiproc_test(
|
|
AcquireRead(lock_path, 0, 5),
|
|
AcquireRead(lock_path, 5, 5),
|
|
AcquireRead(lock_path, 10, 5),
|
|
TimeoutWrite(lock_path, 3, 10),
|
|
TimeoutWrite(lock_path, 5, 1))
|
|
|
|
|
|
@pytest.mark.skipif(os.getuid() == 0, reason='user is root')
|
|
def test_read_lock_on_read_only_lockfile(lock_dir, lock_path):
|
|
"""read-only directory, read-only lockfile."""
|
|
touch(lock_path)
|
|
with read_only(lock_path, lock_dir):
|
|
lock = lk.Lock(lock_path)
|
|
|
|
with lk.ReadTransaction(lock):
|
|
pass
|
|
|
|
with pytest.raises(lk.LockROFileError):
|
|
with lk.WriteTransaction(lock):
|
|
pass
|
|
|
|
|
|
def test_read_lock_read_only_dir_writable_lockfile(lock_dir, lock_path):
|
|
"""read-only directory, writable lockfile."""
|
|
touch(lock_path)
|
|
with read_only(lock_dir):
|
|
lock = lk.Lock(lock_path)
|
|
|
|
with lk.ReadTransaction(lock):
|
|
pass
|
|
|
|
with lk.WriteTransaction(lock):
|
|
pass
|
|
|
|
|
|
@pytest.mark.skipif(os.getuid() == 0, reason='user is root')
|
|
def test_read_lock_no_lockfile(lock_dir, lock_path):
|
|
"""read-only directory, no lockfile (so can't create)."""
|
|
with read_only(lock_dir):
|
|
lock = lk.Lock(lock_path)
|
|
|
|
with pytest.raises(lk.CantCreateLockError):
|
|
with lk.ReadTransaction(lock):
|
|
pass
|
|
|
|
with pytest.raises(lk.CantCreateLockError):
|
|
with lk.WriteTransaction(lock):
|
|
pass
|
|
|
|
|
|
def test_upgrade_read_to_write(private_lock_path):
|
|
"""Test that a read lock can be upgraded to a write lock.
|
|
|
|
Note that to upgrade a read lock to a write lock, you have the be the
|
|
only holder of a read lock. Client code needs to coordinate that for
|
|
shared locks. For this test, we use a private lock just to test that an
|
|
upgrade is possible.
|
|
"""
|
|
# ensure lock file exists the first time, so we open it read-only
|
|
# to begin with.
|
|
touch(private_lock_path)
|
|
|
|
lock = lk.Lock(private_lock_path)
|
|
assert lock._reads == 0
|
|
assert lock._writes == 0
|
|
|
|
lock.acquire_read()
|
|
assert lock._reads == 1
|
|
assert lock._writes == 0
|
|
assert lock._file.mode == 'r+'
|
|
|
|
lock.acquire_write()
|
|
assert lock._reads == 1
|
|
assert lock._writes == 1
|
|
assert lock._file.mode == 'r+'
|
|
|
|
lock.release_write()
|
|
assert lock._reads == 1
|
|
assert lock._writes == 0
|
|
assert lock._file.mode == 'r+'
|
|
|
|
lock.release_read()
|
|
assert lock._reads == 0
|
|
assert lock._writes == 0
|
|
assert lock._file is None
|
|
|
|
|
|
def test_upgrade_read_to_write_fails_with_readonly_file(private_lock_path):
|
|
"""Test that read-only file can be read-locked but not write-locked."""
|
|
# ensure lock file exists the first time
|
|
touch(private_lock_path)
|
|
|
|
# open it read-only to begin with.
|
|
with read_only(private_lock_path):
|
|
lock = lk.Lock(private_lock_path)
|
|
assert lock._reads == 0
|
|
assert lock._writes == 0
|
|
|
|
lock.acquire_read()
|
|
assert lock._reads == 1
|
|
assert lock._writes == 0
|
|
assert lock._file.mode == 'r'
|
|
|
|
# upgrade to writ here
|
|
with pytest.raises(lk.LockROFileError):
|
|
lock.acquire_write()
|
|
|
|
|
|
class ComplexAcquireAndRelease(object):
|
|
def __init__(self, lock_path):
|
|
self.lock_path = lock_path
|
|
|
|
def p1(self, barrier):
|
|
lock = lk.Lock(self.lock_path)
|
|
|
|
lock.acquire_write()
|
|
barrier.wait() # ---------------------------------------- 1
|
|
# others test timeout
|
|
barrier.wait() # ---------------------------------------- 2
|
|
lock.release_write() # release and others acquire read
|
|
barrier.wait() # ---------------------------------------- 3
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_write(lock_fail_timeout)
|
|
lock.acquire_read()
|
|
barrier.wait() # ---------------------------------------- 4
|
|
lock.release_read()
|
|
barrier.wait() # ---------------------------------------- 5
|
|
|
|
# p2 upgrades read to write
|
|
barrier.wait() # ---------------------------------------- 6
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_write(lock_fail_timeout)
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_read(lock_fail_timeout)
|
|
barrier.wait() # ---------------------------------------- 7
|
|
# p2 releases write and read
|
|
barrier.wait() # ---------------------------------------- 8
|
|
|
|
# p3 acquires read
|
|
barrier.wait() # ---------------------------------------- 9
|
|
# p3 upgrades read to write
|
|
barrier.wait() # ---------------------------------------- 10
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_write(lock_fail_timeout)
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_read(lock_fail_timeout)
|
|
barrier.wait() # ---------------------------------------- 11
|
|
# p3 releases locks
|
|
barrier.wait() # ---------------------------------------- 12
|
|
lock.acquire_read()
|
|
barrier.wait() # ---------------------------------------- 13
|
|
lock.release_read()
|
|
|
|
def p2(self, barrier):
|
|
lock = lk.Lock(self.lock_path)
|
|
|
|
# p1 acquires write
|
|
barrier.wait() # ---------------------------------------- 1
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_write(lock_fail_timeout)
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_read(lock_fail_timeout)
|
|
barrier.wait() # ---------------------------------------- 2
|
|
lock.acquire_read()
|
|
barrier.wait() # ---------------------------------------- 3
|
|
# p1 tests shared read
|
|
barrier.wait() # ---------------------------------------- 4
|
|
# others release reads
|
|
barrier.wait() # ---------------------------------------- 5
|
|
|
|
lock.acquire_write() # upgrade read to write
|
|
barrier.wait() # ---------------------------------------- 6
|
|
# others test timeout
|
|
barrier.wait() # ---------------------------------------- 7
|
|
lock.release_write() # release read AND write (need both)
|
|
lock.release_read()
|
|
barrier.wait() # ---------------------------------------- 8
|
|
|
|
# p3 acquires read
|
|
barrier.wait() # ---------------------------------------- 9
|
|
# p3 upgrades read to write
|
|
barrier.wait() # ---------------------------------------- 10
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_write(lock_fail_timeout)
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_read(lock_fail_timeout)
|
|
barrier.wait() # ---------------------------------------- 11
|
|
# p3 releases locks
|
|
barrier.wait() # ---------------------------------------- 12
|
|
lock.acquire_read()
|
|
barrier.wait() # ---------------------------------------- 13
|
|
lock.release_read()
|
|
|
|
def p3(self, barrier):
|
|
lock = lk.Lock(self.lock_path)
|
|
|
|
# p1 acquires write
|
|
barrier.wait() # ---------------------------------------- 1
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_write(lock_fail_timeout)
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_read(lock_fail_timeout)
|
|
barrier.wait() # ---------------------------------------- 2
|
|
lock.acquire_read()
|
|
barrier.wait() # ---------------------------------------- 3
|
|
# p1 tests shared read
|
|
barrier.wait() # ---------------------------------------- 4
|
|
lock.release_read()
|
|
barrier.wait() # ---------------------------------------- 5
|
|
|
|
# p2 upgrades read to write
|
|
barrier.wait() # ---------------------------------------- 6
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_write(lock_fail_timeout)
|
|
with pytest.raises(lk.LockTimeoutError):
|
|
lock.acquire_read(lock_fail_timeout)
|
|
barrier.wait() # ---------------------------------------- 7
|
|
# p2 releases write & read
|
|
barrier.wait() # ---------------------------------------- 8
|
|
|
|
lock.acquire_read()
|
|
barrier.wait() # ---------------------------------------- 9
|
|
lock.acquire_write()
|
|
barrier.wait() # ---------------------------------------- 10
|
|
# others test timeout
|
|
barrier.wait() # ---------------------------------------- 11
|
|
lock.release_read() # release read AND write in opposite
|
|
lock.release_write() # order from before on p2
|
|
barrier.wait() # ---------------------------------------- 12
|
|
lock.acquire_read()
|
|
barrier.wait() # ---------------------------------------- 13
|
|
lock.release_read()
|
|
|
|
|
|
#
|
|
# Longer test case that ensures locks are reusable. Ordering is
|
|
# enforced by barriers throughout -- steps are shown with numbers.
|
|
#
|
|
def test_complex_acquire_and_release_chain(lock_path):
|
|
test_chain = ComplexAcquireAndRelease(lock_path)
|
|
multiproc_test(test_chain.p1,
|
|
test_chain.p2,
|
|
test_chain.p3)
|
|
|
|
|
|
class AssertLock(lk.Lock):
|
|
"""Test lock class that marks acquire/release events."""
|
|
def __init__(self, lock_path, vals):
|
|
super(AssertLock, self).__init__(lock_path)
|
|
self.vals = vals
|
|
|
|
# assert hooks for subclasses
|
|
assert_acquire_read = lambda self: None
|
|
assert_acquire_write = lambda self: None
|
|
assert_release_read = lambda self: None
|
|
assert_release_write = lambda self: None
|
|
|
|
def acquire_read(self, timeout=None):
|
|
self.assert_acquire_read()
|
|
result = super(AssertLock, self).acquire_read(timeout)
|
|
self.vals['acquired_read'] = True
|
|
return result
|
|
|
|
def acquire_write(self, timeout=None):
|
|
self.assert_acquire_write()
|
|
result = super(AssertLock, self).acquire_write(timeout)
|
|
self.vals['acquired_write'] = True
|
|
return result
|
|
|
|
def release_read(self, release_fn=None):
|
|
self.assert_release_read()
|
|
result = super(AssertLock, self).release_read(release_fn)
|
|
self.vals['released_read'] = True
|
|
return result
|
|
|
|
def release_write(self, release_fn=None):
|
|
self.assert_release_write()
|
|
result = super(AssertLock, self).release_write(release_fn)
|
|
self.vals['released_write'] = True
|
|
return result
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"transaction,type",
|
|
[(lk.ReadTransaction, "read"), (lk.WriteTransaction, "write")]
|
|
)
|
|
def test_transaction(lock_path, transaction, type):
|
|
class MockLock(AssertLock):
|
|
def assert_acquire_read(self):
|
|
assert not vals['entered_fn']
|
|
assert not vals['exited_fn']
|
|
|
|
def assert_release_read(self):
|
|
assert vals['entered_fn']
|
|
assert not vals['exited_fn']
|
|
|
|
def assert_acquire_write(self):
|
|
assert not vals['entered_fn']
|
|
assert not vals['exited_fn']
|
|
|
|
def assert_release_write(self):
|
|
assert vals['entered_fn']
|
|
assert not vals['exited_fn']
|
|
|
|
def enter_fn():
|
|
# assert enter_fn is called while lock is held
|
|
assert vals['acquired_%s' % type]
|
|
vals['entered_fn'] = True
|
|
|
|
def exit_fn(t, v, tb):
|
|
# assert exit_fn is called while lock is held
|
|
assert not vals['released_%s' % type]
|
|
vals['exited_fn'] = True
|
|
vals['exception'] = (t or v or tb)
|
|
|
|
vals = collections.defaultdict(lambda: False)
|
|
lock = MockLock(lock_path, vals)
|
|
|
|
with transaction(lock, acquire=enter_fn, release=exit_fn):
|
|
assert vals['acquired_%s' % type]
|
|
assert not vals['released_%s' % type]
|
|
|
|
assert vals['entered_fn']
|
|
assert vals['exited_fn']
|
|
assert vals['acquired_%s' % type]
|
|
assert vals['released_%s' % type]
|
|
assert not vals['exception']
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"transaction,type",
|
|
[(lk.ReadTransaction, "read"), (lk.WriteTransaction, "write")]
|
|
)
|
|
def test_transaction_with_exception(lock_path, transaction, type):
|
|
class MockLock(AssertLock):
|
|
def assert_acquire_read(self):
|
|
assert not vals['entered_fn']
|
|
assert not vals['exited_fn']
|
|
|
|
def assert_release_read(self):
|
|
assert vals['entered_fn']
|
|
assert not vals['exited_fn']
|
|
|
|
def assert_acquire_write(self):
|
|
assert not vals['entered_fn']
|
|
assert not vals['exited_fn']
|
|
|
|
def assert_release_write(self):
|
|
assert vals['entered_fn']
|
|
assert not vals['exited_fn']
|
|
|
|
def enter_fn():
|
|
assert vals['acquired_%s' % type]
|
|
vals['entered_fn'] = True
|
|
|
|
def exit_fn(t, v, tb):
|
|
assert not vals['released_%s' % type]
|
|
vals['exited_fn'] = True
|
|
vals['exception'] = (t or v or tb)
|
|
return exit_result
|
|
|
|
exit_result = False
|
|
vals = collections.defaultdict(lambda: False)
|
|
lock = MockLock(lock_path, vals)
|
|
|
|
with pytest.raises(Exception):
|
|
with transaction(lock, acquire=enter_fn, release=exit_fn):
|
|
raise Exception()
|
|
|
|
assert vals['entered_fn']
|
|
assert vals['exited_fn']
|
|
assert vals['exception']
|
|
|
|
# test suppression of exceptions from exit_fn
|
|
exit_result = True
|
|
vals.clear()
|
|
|
|
# should not raise now.
|
|
with transaction(lock, acquire=enter_fn, release=exit_fn):
|
|
raise Exception()
|
|
|
|
assert vals['entered_fn']
|
|
assert vals['exited_fn']
|
|
assert vals['exception']
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"transaction,type",
|
|
[(lk.ReadTransaction, "read"), (lk.WriteTransaction, "write")]
|
|
)
|
|
def test_transaction_with_context_manager(lock_path, transaction, type):
|
|
class MockLock(AssertLock):
|
|
def assert_acquire_read(self):
|
|
assert not vals['entered_ctx']
|
|
assert not vals['exited_ctx']
|
|
|
|
def assert_release_read(self):
|
|
assert vals['entered_ctx']
|
|
assert vals['exited_ctx']
|
|
|
|
def assert_acquire_write(self):
|
|
assert not vals['entered_ctx']
|
|
assert not vals['exited_ctx']
|
|
|
|
def assert_release_write(self):
|
|
assert vals['entered_ctx']
|
|
assert vals['exited_ctx']
|
|
|
|
class TestContextManager(object):
|
|
def __enter__(self):
|
|
vals['entered_ctx'] = True
|
|
|
|
def __exit__(self, t, v, tb):
|
|
assert not vals['released_%s' % type]
|
|
vals['exited_ctx'] = True
|
|
vals['exception_ctx'] = (t or v or tb)
|
|
return exit_ctx_result
|
|
|
|
def exit_fn(t, v, tb):
|
|
assert not vals['released_%s' % type]
|
|
vals['exited_fn'] = True
|
|
vals['exception_fn'] = (t or v or tb)
|
|
return exit_fn_result
|
|
|
|
exit_fn_result, exit_ctx_result = False, False
|
|
vals = collections.defaultdict(lambda: False)
|
|
lock = MockLock(lock_path, vals)
|
|
|
|
with transaction(lock, acquire=TestContextManager, release=exit_fn):
|
|
pass
|
|
|
|
assert vals['entered_ctx']
|
|
assert vals['exited_ctx']
|
|
assert vals['exited_fn']
|
|
assert not vals['exception_ctx']
|
|
assert not vals['exception_fn']
|
|
|
|
vals.clear()
|
|
with transaction(lock, acquire=TestContextManager):
|
|
pass
|
|
|
|
assert vals['entered_ctx']
|
|
assert vals['exited_ctx']
|
|
assert not vals['exited_fn']
|
|
assert not vals['exception_ctx']
|
|
assert not vals['exception_fn']
|
|
|
|
# below are tests for exceptions with and without suppression
|
|
def assert_ctx_and_fn_exception(raises=True):
|
|
vals.clear()
|
|
|
|
if raises:
|
|
with pytest.raises(Exception):
|
|
with transaction(
|
|
lock, acquire=TestContextManager, release=exit_fn):
|
|
raise Exception()
|
|
else:
|
|
with transaction(
|
|
lock, acquire=TestContextManager, release=exit_fn):
|
|
raise Exception()
|
|
|
|
assert vals['entered_ctx']
|
|
assert vals['exited_ctx']
|
|
assert vals['exited_fn']
|
|
assert vals['exception_ctx']
|
|
assert vals['exception_fn']
|
|
|
|
def assert_only_ctx_exception(raises=True):
|
|
vals.clear()
|
|
|
|
if raises:
|
|
with pytest.raises(Exception):
|
|
with transaction(lock, acquire=TestContextManager):
|
|
raise Exception()
|
|
else:
|
|
with transaction(lock, acquire=TestContextManager):
|
|
raise Exception()
|
|
|
|
assert vals['entered_ctx']
|
|
assert vals['exited_ctx']
|
|
assert not vals['exited_fn']
|
|
assert vals['exception_ctx']
|
|
assert not vals['exception_fn']
|
|
|
|
# no suppression
|
|
assert_ctx_and_fn_exception(raises=True)
|
|
assert_only_ctx_exception(raises=True)
|
|
|
|
# suppress exception only in function
|
|
exit_fn_result, exit_ctx_result = True, False
|
|
assert_ctx_and_fn_exception(raises=False)
|
|
assert_only_ctx_exception(raises=True)
|
|
|
|
# suppress exception only in context
|
|
exit_fn_result, exit_ctx_result = False, True
|
|
assert_ctx_and_fn_exception(raises=False)
|
|
assert_only_ctx_exception(raises=False)
|
|
|
|
# suppress exception in function and context
|
|
exit_fn_result, exit_ctx_result = True, True
|
|
assert_ctx_and_fn_exception(raises=False)
|
|
assert_only_ctx_exception(raises=False)
|
|
|
|
|
|
def test_nested_write_transaction(lock_path):
|
|
"""Ensure that the outermost write transaction writes."""
|
|
|
|
def write(t, v, tb):
|
|
vals['wrote'] = True
|
|
|
|
vals = collections.defaultdict(lambda: False)
|
|
lock = AssertLock(lock_path, vals)
|
|
|
|
# write/write
|
|
with lk.WriteTransaction(lock, release=write):
|
|
assert not vals['wrote']
|
|
with lk.WriteTransaction(lock, release=write):
|
|
assert not vals['wrote']
|
|
assert not vals['wrote']
|
|
assert vals['wrote']
|
|
|
|
# read/write
|
|
vals.clear()
|
|
with lk.ReadTransaction(lock):
|
|
assert not vals['wrote']
|
|
with lk.WriteTransaction(lock, release=write):
|
|
assert not vals['wrote']
|
|
assert vals['wrote']
|
|
|
|
# write/read/write
|
|
vals.clear()
|
|
with lk.WriteTransaction(lock, release=write):
|
|
assert not vals['wrote']
|
|
with lk.ReadTransaction(lock):
|
|
assert not vals['wrote']
|
|
with lk.WriteTransaction(lock, release=write):
|
|
assert not vals['wrote']
|
|
assert not vals['wrote']
|
|
assert not vals['wrote']
|
|
assert vals['wrote']
|
|
|
|
# read/write/read/write
|
|
vals.clear()
|
|
with lk.ReadTransaction(lock):
|
|
with lk.WriteTransaction(lock, release=write):
|
|
assert not vals['wrote']
|
|
with lk.ReadTransaction(lock):
|
|
assert not vals['wrote']
|
|
with lk.WriteTransaction(lock, release=write):
|
|
assert not vals['wrote']
|
|
assert not vals['wrote']
|
|
assert not vals['wrote']
|
|
assert vals['wrote']
|
|
|
|
|
|
def test_nested_reads(lock_path):
|
|
"""Ensure that write transactions won't re-read data."""
|
|
|
|
def read():
|
|
vals['read'] += 1
|
|
|
|
vals = collections.defaultdict(lambda: 0)
|
|
lock = AssertLock(lock_path, vals)
|
|
|
|
# read/read
|
|
vals.clear()
|
|
assert vals['read'] == 0
|
|
with lk.ReadTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
with lk.ReadTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
|
|
# write/write
|
|
vals.clear()
|
|
assert vals['read'] == 0
|
|
with lk.WriteTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
with lk.WriteTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
|
|
# read/write
|
|
vals.clear()
|
|
assert vals['read'] == 0
|
|
with lk.ReadTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
with lk.WriteTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
|
|
# write/read/write
|
|
vals.clear()
|
|
assert vals['read'] == 0
|
|
with lk.WriteTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
with lk.ReadTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
with lk.WriteTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
|
|
# read/write/read/write
|
|
vals.clear()
|
|
assert vals['read'] == 0
|
|
with lk.ReadTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
with lk.WriteTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
with lk.ReadTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
with lk.WriteTransaction(lock, acquire=read):
|
|
assert vals['read'] == 1
|
|
|
|
|
|
class LockDebugOutput(object):
|
|
def __init__(self, lock_path):
|
|
self.lock_path = lock_path
|
|
self.host = socket.gethostname()
|
|
|
|
def p1(self, barrier, q1, q2):
|
|
# exchange pids
|
|
p1_pid = os.getpid()
|
|
q1.put(p1_pid)
|
|
p2_pid = q2.get()
|
|
|
|
# set up lock
|
|
lock = lk.Lock(self.lock_path, debug=True)
|
|
|
|
with lk.WriteTransaction(lock):
|
|
# p1 takes write lock and writes pid/host to file
|
|
barrier.wait() # ------------------------------------ 1
|
|
|
|
assert lock.pid == p1_pid
|
|
assert lock.host == self.host
|
|
|
|
# wait for p2 to verify contents of file
|
|
barrier.wait() # ---------------------------------------- 2
|
|
|
|
# wait for p2 to take a write lock
|
|
barrier.wait() # ---------------------------------------- 3
|
|
|
|
# verify pid/host info again
|
|
with lk.ReadTransaction(lock):
|
|
assert lock.old_pid == p1_pid
|
|
assert lock.old_host == self.host
|
|
|
|
assert lock.pid == p2_pid
|
|
assert lock.host == self.host
|
|
|
|
barrier.wait() # ---------------------------------------- 4
|
|
|
|
def p2(self, barrier, q1, q2):
|
|
# exchange pids
|
|
p2_pid = os.getpid()
|
|
p1_pid = q1.get()
|
|
q2.put(p2_pid)
|
|
|
|
# set up lock
|
|
lock = lk.Lock(self.lock_path, debug=True)
|
|
|
|
# p1 takes write lock and writes pid/host to file
|
|
barrier.wait() # ---------------------------------------- 1
|
|
|
|
# verify that p1 wrote information to lock file
|
|
with lk.ReadTransaction(lock):
|
|
assert lock.pid == p1_pid
|
|
assert lock.host == self.host
|
|
|
|
barrier.wait() # ---------------------------------------- 2
|
|
|
|
# take a write lock on the file and verify pid/host info
|
|
with lk.WriteTransaction(lock):
|
|
assert lock.old_pid == p1_pid
|
|
assert lock.old_host == self.host
|
|
|
|
assert lock.pid == p2_pid
|
|
assert lock.host == self.host
|
|
|
|
barrier.wait() # ------------------------------------ 3
|
|
|
|
# wait for p1 to verify pid/host info
|
|
barrier.wait() # ---------------------------------------- 4
|
|
|
|
|
|
def test_lock_debug_output(lock_path):
|
|
test_debug = LockDebugOutput(lock_path)
|
|
q1, q2 = Queue(), Queue()
|
|
local_multiproc_test(test_debug.p2, test_debug.p1, extra_args=(q1, q2))
|
|
|
|
|
|
def test_lock_with_no_parent_directory(tmpdir):
|
|
"""Make sure locks work even when their parent directory does not exist."""
|
|
with tmpdir.as_cwd():
|
|
lock = lk.Lock('foo/bar/baz/lockfile')
|
|
with lk.WriteTransaction(lock):
|
|
pass
|
|
|
|
|
|
def test_lock_in_current_directory(tmpdir):
|
|
"""Make sure locks work even when their parent directory does not exist."""
|
|
with tmpdir.as_cwd():
|
|
# test we can create a lock in the current directory
|
|
lock = lk.Lock('lockfile')
|
|
for i in range(10):
|
|
with lk.ReadTransaction(lock):
|
|
pass
|
|
with lk.WriteTransaction(lock):
|
|
pass
|
|
|
|
# and that we can do the same thing after it's already there
|
|
lock = lk.Lock('lockfile')
|
|
for i in range(10):
|
|
with lk.ReadTransaction(lock):
|
|
pass
|
|
with lk.WriteTransaction(lock):
|
|
pass
|
|
|
|
|
|
def test_attempts_str():
|
|
assert lk._attempts_str(0, 0) == ''
|
|
assert lk._attempts_str(0.12, 1) == ''
|
|
assert lk._attempts_str(12.345, 2) == ' after 12.35s and 2 attempts'
|
|
|
|
|
|
def test_lock_str():
|
|
lock = lk.Lock('lockfile')
|
|
lockstr = str(lock)
|
|
assert 'lockfile[0:0]' in lockstr
|
|
assert 'timeout=None' in lockstr
|
|
assert '#reads=0, #writes=0' in lockstr
|
|
|
|
|
|
def test_downgrade_write_okay(tmpdir):
|
|
"""Test the lock write-to-read downgrade operation."""
|
|
with tmpdir.as_cwd():
|
|
lock = lk.Lock('lockfile')
|
|
lock.acquire_write()
|
|
lock.downgrade_write_to_read()
|
|
assert lock._reads == 1
|
|
assert lock._writes == 0
|
|
|
|
|
|
def test_downgrade_write_fails(tmpdir):
|
|
"""Test failing the lock write-to-read downgrade operation."""
|
|
with tmpdir.as_cwd():
|
|
lock = lk.Lock('lockfile')
|
|
lock.acquire_read()
|
|
msg = 'Cannot downgrade lock from write to read on file: lockfile'
|
|
with pytest.raises(lk.LockDowngradeError, match=msg):
|
|
lock.downgrade_write_to_read()
|
|
|
|
|
|
@pytest.mark.parametrize("err_num,err_msg",
|
|
[(errno.EACCES, "Fake EACCES error"),
|
|
(errno.EAGAIN, "Fake EAGAIN error"),
|
|
(errno.ENOENT, "Fake ENOENT error")])
|
|
def test_poll_lock_exception(tmpdir, monkeypatch, err_num, err_msg):
|
|
"""Test poll lock exception handling."""
|
|
def _lockf(fd, cmd, len, start, whence):
|
|
raise IOError(err_num, err_msg)
|
|
|
|
with tmpdir.as_cwd():
|
|
lockfile = 'lockfile'
|
|
lock = lk.Lock(lockfile)
|
|
|
|
touch(lockfile)
|
|
|
|
monkeypatch.setattr(fcntl, 'lockf', _lockf)
|
|
|
|
if err_num in [errno.EAGAIN, errno.EACCES]:
|
|
assert not lock._poll_lock(fcntl.LOCK_EX)
|
|
else:
|
|
with pytest.raises(IOError, match=err_msg):
|
|
lock._poll_lock(fcntl.LOCK_EX)
|
|
|
|
|
|
def test_upgrade_read_okay(tmpdir):
|
|
"""Test the lock read-to-write upgrade operation."""
|
|
with tmpdir.as_cwd():
|
|
lock = lk.Lock('lockfile')
|
|
lock.acquire_read()
|
|
lock.upgrade_read_to_write()
|
|
assert lock._reads == 0
|
|
assert lock._writes == 1
|
|
|
|
|
|
def test_upgrade_read_fails(tmpdir):
|
|
"""Test failing the lock read-to-write upgrade operation."""
|
|
with tmpdir.as_cwd():
|
|
lock = lk.Lock('lockfile')
|
|
lock.acquire_write()
|
|
msg = 'Cannot upgrade lock from read to write on file: lockfile'
|
|
with pytest.raises(lk.LockUpgradeError, match=msg):
|
|
lock.upgrade_read_to_write()
|