llnl.util.lock: add type-hints (#38977)

Also uppercase global variables in the module
This commit is contained in:
Massimiliano Culpo 2023-07-19 11:23:08 +02:00 committed by GitHub
parent a7f2abf924
commit f34c93c5f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 114 additions and 71 deletions

View File

@ -9,9 +9,10 @@
import sys import sys
import time import time
from datetime import datetime from datetime import datetime
from types import TracebackType
from typing import IO, Any, Callable, ContextManager, Dict, Generator, Optional, Tuple, Type, Union
import llnl.util.tty as tty from llnl.util import lang, tty
from llnl.util.lang import pretty_seconds
import spack.util.string import spack.util.string
@ -34,9 +35,12 @@
] ]
#: A useful replacement for functions that should return True when not provided ReleaseFnType = Optional[Callable[[], bool]]
#: for example.
true_fn = lambda: True
def true_fn() -> bool:
"""A function that always returns True."""
return True
class OpenFile: class OpenFile:
@ -48,7 +52,7 @@ class OpenFile:
file descriptors as well in the future. file descriptors as well in the future.
""" """
def __init__(self, fh): def __init__(self, fh: IO) -> None:
self.fh = fh self.fh = fh
self.refs = 0 self.refs = 0
@ -78,11 +82,11 @@ class OpenFileTracker:
work in Python and assume the GIL. work in Python and assume the GIL.
""" """
def __init__(self): def __init__(self) -> None:
"""Create a new ``OpenFileTracker``.""" """Create a new ``OpenFileTracker``."""
self._descriptors = {} self._descriptors: Dict[Any, OpenFile] = {}
def get_fh(self, path): def get_fh(self, path: str) -> IO:
"""Get a filehandle for a lockfile. """Get a filehandle for a lockfile.
This routine will open writable files for read/write even if you're asking This routine will open writable files for read/write even if you're asking
@ -90,7 +94,7 @@ def get_fh(self, path):
(write) lock later if requested. (write) lock later if requested.
Arguments: Arguments:
path (str): path to lock file we want a filehandle for path: path to lock file we want a filehandle for
""" """
# Open writable files as 'r+' so we can upgrade to write later # Open writable files as 'r+' so we can upgrade to write later
os_mode, fh_mode = (os.O_RDWR | os.O_CREAT), "r+" os_mode, fh_mode = (os.O_RDWR | os.O_CREAT), "r+"
@ -157,7 +161,7 @@ def purge(self):
#: Open file descriptors for locks in this process. Used to prevent one process #: Open file descriptors for locks in this process. Used to prevent one process
#: from opening the sam file many times for different byte range locks #: from opening the sam file many times for different byte range locks
file_tracker = OpenFileTracker() FILE_TRACKER = OpenFileTracker()
def _attempts_str(wait_time, nattempts): def _attempts_str(wait_time, nattempts):
@ -166,7 +170,7 @@ def _attempts_str(wait_time, nattempts):
return "" return ""
attempts = spack.util.string.plural(nattempts, "attempt") attempts = spack.util.string.plural(nattempts, "attempt")
return " after {} and {}".format(pretty_seconds(wait_time), attempts) return " after {} and {}".format(lang.pretty_seconds(wait_time), attempts)
class LockType: class LockType:
@ -188,7 +192,7 @@ def to_module(tid):
return lock return lock
@staticmethod @staticmethod
def is_valid(op): def is_valid(op: int) -> bool:
return op == LockType.READ or op == LockType.WRITE return op == LockType.READ or op == LockType.WRITE
@ -207,7 +211,15 @@ class Lock:
overlapping byte ranges in the same file). overlapping byte ranges in the same file).
""" """
def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, desc=""): def __init__(
self,
path: str,
start: int = 0,
length: int = 0,
default_timeout: Optional[float] = None,
debug: bool = False,
desc: str = "",
) -> None:
"""Construct a new lock on the file at ``path``. """Construct a new lock on the file at ``path``.
By default, the lock applies to the whole file. Optionally, By default, the lock applies to the whole file. Optionally,
@ -220,17 +232,17 @@ def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, d
beginning of the file. beginning of the file.
Args: Args:
path (str): path to the lock path: path to the lock
start (int): optional byte offset at which the lock starts start: optional byte offset at which the lock starts
length (int): optional number of bytes to lock length: optional number of bytes to lock
default_timeout (int): number of seconds to wait for lock attempts, default_timeout: seconds to wait for lock attempts,
where None means to wait indefinitely where None means to wait indefinitely
debug (bool): debug mode specific to locking debug: debug mode specific to locking
desc (str): optional debug message lock description, which is desc: optional debug message lock description, which is
helpful for distinguishing between different Spack locks. helpful for distinguishing between different Spack locks.
""" """
self.path = path self.path = path
self._file = None self._file: Optional[IO] = None
self._reads = 0 self._reads = 0
self._writes = 0 self._writes = 0
@ -242,7 +254,7 @@ def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, d
self.debug = debug self.debug = debug
# optional debug description # optional debug description
self.desc = " ({0})".format(desc) if desc else "" self.desc = f" ({desc})" if desc else ""
# If the user doesn't set a default timeout, or if they choose # If the user doesn't set a default timeout, or if they choose
# None, 0, etc. then lock attempts will not time out (unless the # None, 0, etc. then lock attempts will not time out (unless the
@ -250,11 +262,15 @@ def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, d
self.default_timeout = default_timeout or None self.default_timeout = default_timeout or None
# PID and host of lock holder (only used in debug mode) # PID and host of lock holder (only used in debug mode)
self.pid = self.old_pid = None self.pid: Optional[int] = None
self.host = self.old_host = None self.old_pid: Optional[int] = None
self.host: Optional[str] = None
self.old_host: Optional[str] = None
@staticmethod @staticmethod
def _poll_interval_generator(_wait_times=None): def _poll_interval_generator(
_wait_times: Optional[Tuple[float, float, float]] = None
) -> Generator[float, None, None]:
"""This implements a backoff scheme for polling a contended resource """This implements a backoff scheme for polling a contended resource
by suggesting a succession of wait times between polls. by suggesting a succession of wait times between polls.
@ -277,21 +293,21 @@ def _poll_interval_generator(_wait_times=None):
num_requests += 1 num_requests += 1
yield wait_time yield wait_time
def __repr__(self): def __repr__(self) -> str:
"""Formal representation of the lock.""" """Formal representation of the lock."""
rep = "{0}(".format(self.__class__.__name__) rep = "{0}(".format(self.__class__.__name__)
for attr, value in self.__dict__.items(): for attr, value in self.__dict__.items():
rep += "{0}={1}, ".format(attr, value.__repr__()) rep += "{0}={1}, ".format(attr, value.__repr__())
return "{0})".format(rep.strip(", ")) return "{0})".format(rep.strip(", "))
def __str__(self): def __str__(self) -> str:
"""Readable string (with key fields) of the lock.""" """Readable string (with key fields) of the lock."""
location = "{0}[{1}:{2}]".format(self.path, self._start, self._length) location = "{0}[{1}:{2}]".format(self.path, self._start, self._length)
timeout = "timeout={0}".format(self.default_timeout) timeout = "timeout={0}".format(self.default_timeout)
activity = "#reads={0}, #writes={1}".format(self._reads, self._writes) activity = "#reads={0}, #writes={1}".format(self._reads, self._writes)
return "({0}, {1}, {2})".format(location, timeout, activity) return "({0}, {1}, {2})".format(location, timeout, activity)
def _lock(self, op, timeout=None): def _lock(self, op: int, timeout: Optional[float] = None) -> Tuple[float, int]:
"""This takes a lock using POSIX locks (``fcntl.lockf``). """This takes a lock using POSIX locks (``fcntl.lockf``).
The lock is implemented as a spin lock using a nonblocking call The lock is implemented as a spin lock using a nonblocking call
@ -310,7 +326,7 @@ def _lock(self, op, timeout=None):
# Create file and parent directories if they don't exist. # Create file and parent directories if they don't exist.
if self._file is None: if self._file is None:
self._ensure_parent_directory() self._ensure_parent_directory()
self._file = file_tracker.get_fh(self.path) self._file = FILE_TRACKER.get_fh(self.path)
if LockType.to_module(op) == fcntl.LOCK_EX and self._file.mode == "r": if LockType.to_module(op) == fcntl.LOCK_EX and self._file.mode == "r":
# Attempt to upgrade to write lock w/a read-only file. # Attempt to upgrade to write lock w/a read-only file.
@ -319,7 +335,7 @@ def _lock(self, op, timeout=None):
self._log_debug( self._log_debug(
"{} locking [{}:{}]: timeout {}".format( "{} locking [{}:{}]: timeout {}".format(
op_str.lower(), self._start, self._length, pretty_seconds(timeout or 0) op_str.lower(), self._start, self._length, lang.pretty_seconds(timeout or 0)
) )
) )
@ -343,15 +359,20 @@ def _lock(self, op, timeout=None):
total_wait_time = time.time() - start_time total_wait_time = time.time() - start_time
raise LockTimeoutError(op_str.lower(), self.path, total_wait_time, num_attempts) raise LockTimeoutError(op_str.lower(), self.path, total_wait_time, num_attempts)
def _poll_lock(self, op): def _poll_lock(self, op: int) -> bool:
"""Attempt to acquire the lock in a non-blocking manner. Return whether """Attempt to acquire the lock in a non-blocking manner. Return whether
the locking attempt succeeds the locking attempt succeeds
""" """
assert self._file is not None, "cannot poll a lock without the file being set"
module_op = LockType.to_module(op) module_op = LockType.to_module(op)
try: try:
# Try to get the lock (will raise if not available.) # Try to get the lock (will raise if not available.)
fcntl.lockf( fcntl.lockf(
self._file, module_op | fcntl.LOCK_NB, self._length, self._start, os.SEEK_SET self._file.fileno(),
module_op | fcntl.LOCK_NB,
self._length,
self._start,
os.SEEK_SET,
) )
# help for debugging distributed locking # help for debugging distributed locking
@ -377,7 +398,7 @@ def _poll_lock(self, op):
return False return False
def _ensure_parent_directory(self): def _ensure_parent_directory(self) -> str:
parent = os.path.dirname(self.path) parent = os.path.dirname(self.path)
# relative paths to lockfiles in the current directory have no parent # relative paths to lockfiles in the current directory have no parent
@ -396,20 +417,22 @@ def _ensure_parent_directory(self):
raise raise
return parent return parent
def _read_log_debug_data(self): def _read_log_debug_data(self) -> None:
"""Read PID and host data out of the file if it is there.""" """Read PID and host data out of the file if it is there."""
assert self._file is not None, "cannot read debug log without the file being set"
self.old_pid = self.pid self.old_pid = self.pid
self.old_host = self.host self.old_host = self.host
line = self._file.read() line = self._file.read()
if line: if line:
pid, host = line.strip().split(",") pid, host = line.strip().split(",")
_, _, self.pid = pid.rpartition("=") _, _, pid = pid.rpartition("=")
_, _, self.host = host.rpartition("=") _, _, self.host = host.rpartition("=")
self.pid = int(self.pid) self.pid = int(pid)
def _write_log_debug_data(self): def _write_log_debug_data(self) -> None:
"""Write PID and host data to the file, recording old values.""" """Write PID and host data to the file, recording old values."""
assert self._file is not None, "cannot write debug log without the file being set"
self.old_pid = self.pid self.old_pid = self.pid
self.old_host = self.host self.old_host = self.host
@ -423,20 +446,21 @@ def _write_log_debug_data(self):
self._file.flush() self._file.flush()
os.fsync(self._file.fileno()) os.fsync(self._file.fileno())
def _unlock(self): def _unlock(self) -> None:
"""Releases a lock using POSIX locks (``fcntl.lockf``) """Releases a lock using POSIX locks (``fcntl.lockf``)
Releases the lock regardless of mode. Note that read locks may Releases the lock regardless of mode. Note that read locks may
be masquerading as write locks, but this removes either. be masquerading as write locks, but this removes either.
""" """
fcntl.lockf(self._file, fcntl.LOCK_UN, self._length, self._start, os.SEEK_SET) assert self._file is not None, "cannot unlock without the file being set"
file_tracker.release_by_fh(self._file) fcntl.lockf(self._file.fileno(), fcntl.LOCK_UN, self._length, self._start, os.SEEK_SET)
FILE_TRACKER.release_by_fh(self._file)
self._file = None self._file = None
self._reads = 0 self._reads = 0
self._writes = 0 self._writes = 0
def acquire_read(self, timeout=None): def acquire_read(self, timeout: Optional[float] = None) -> bool:
"""Acquires a recursive, shared lock for reading. """Acquires a recursive, shared lock for reading.
Read and write locks can be acquired and released in arbitrary Read and write locks can be acquired and released in arbitrary
@ -461,7 +485,7 @@ def acquire_read(self, timeout=None):
self._reads += 1 self._reads += 1
return False return False
def acquire_write(self, timeout=None): def acquire_write(self, timeout: Optional[float] = None) -> bool:
"""Acquires a recursive, exclusive lock for writing. """Acquires a recursive, exclusive lock for writing.
Read and write locks can be acquired and released in arbitrary Read and write locks can be acquired and released in arbitrary
@ -491,7 +515,7 @@ def acquire_write(self, timeout=None):
self._writes += 1 self._writes += 1
return False return False
def is_write_locked(self): def is_write_locked(self) -> bool:
"""Check if the file is write locked """Check if the file is write locked
Return: Return:
@ -508,7 +532,7 @@ def is_write_locked(self):
return False return False
def downgrade_write_to_read(self, timeout=None): def downgrade_write_to_read(self, timeout: Optional[float] = None) -> None:
""" """
Downgrade from an exclusive write lock to a shared read. Downgrade from an exclusive write lock to a shared read.
@ -527,7 +551,7 @@ def downgrade_write_to_read(self, timeout=None):
else: else:
raise LockDowngradeError(self.path) raise LockDowngradeError(self.path)
def upgrade_read_to_write(self, timeout=None): def upgrade_read_to_write(self, timeout: Optional[float] = None) -> None:
""" """
Attempts to upgrade from a shared read lock to an exclusive write. Attempts to upgrade from a shared read lock to an exclusive write.
@ -546,7 +570,7 @@ def upgrade_read_to_write(self, timeout=None):
else: else:
raise LockUpgradeError(self.path) raise LockUpgradeError(self.path)
def release_read(self, release_fn=None): def release_read(self, release_fn: ReleaseFnType = None) -> bool:
"""Releases a read lock. """Releases a read lock.
Arguments: Arguments:
@ -582,7 +606,7 @@ def release_read(self, release_fn=None):
self._reads -= 1 self._reads -= 1
return False return False
def release_write(self, release_fn=None): def release_write(self, release_fn: ReleaseFnType = None) -> bool:
"""Releases a write lock. """Releases a write lock.
Arguments: Arguments:
@ -623,58 +647,58 @@ def release_write(self, release_fn=None):
else: else:
return False return False
def cleanup(self): def cleanup(self) -> None:
if self._reads == 0 and self._writes == 0: if self._reads == 0 and self._writes == 0:
os.unlink(self.path) os.unlink(self.path)
else: else:
raise LockError("Attempting to cleanup active lock.") raise LockError("Attempting to cleanup active lock.")
def _get_counts_desc(self): def _get_counts_desc(self) -> str:
return ( return (
"(reads {0}, writes {1})".format(self._reads, self._writes) if tty.is_verbose() else "" "(reads {0}, writes {1})".format(self._reads, self._writes) if tty.is_verbose() else ""
) )
def _log_acquired(self, locktype, wait_time, nattempts): def _log_acquired(self, locktype, wait_time, nattempts) -> None:
attempts_part = _attempts_str(wait_time, nattempts) attempts_part = _attempts_str(wait_time, nattempts)
now = datetime.now() now = datetime.now()
desc = "Acquired at %s" % now.strftime("%H:%M:%S.%f") desc = "Acquired at %s" % now.strftime("%H:%M:%S.%f")
self._log_debug(self._status_msg(locktype, "{0}{1}".format(desc, attempts_part))) self._log_debug(self._status_msg(locktype, "{0}{1}".format(desc, attempts_part)))
def _log_acquiring(self, locktype): def _log_acquiring(self, locktype) -> None:
self._log_debug(self._status_msg(locktype, "Acquiring"), level=3) self._log_debug(self._status_msg(locktype, "Acquiring"), level=3)
def _log_debug(self, *args, **kwargs): def _log_debug(self, *args, **kwargs) -> None:
"""Output lock debug messages.""" """Output lock debug messages."""
kwargs["level"] = kwargs.get("level", 2) kwargs["level"] = kwargs.get("level", 2)
tty.debug(*args, **kwargs) tty.debug(*args, **kwargs)
def _log_downgraded(self, wait_time, nattempts): def _log_downgraded(self, wait_time, nattempts) -> None:
attempts_part = _attempts_str(wait_time, nattempts) attempts_part = _attempts_str(wait_time, nattempts)
now = datetime.now() now = datetime.now()
desc = "Downgraded at %s" % now.strftime("%H:%M:%S.%f") desc = "Downgraded at %s" % now.strftime("%H:%M:%S.%f")
self._log_debug(self._status_msg("READ LOCK", "{0}{1}".format(desc, attempts_part))) self._log_debug(self._status_msg("READ LOCK", "{0}{1}".format(desc, attempts_part)))
def _log_downgrading(self): def _log_downgrading(self) -> None:
self._log_debug(self._status_msg("WRITE LOCK", "Downgrading"), level=3) self._log_debug(self._status_msg("WRITE LOCK", "Downgrading"), level=3)
def _log_released(self, locktype): def _log_released(self, locktype) -> None:
now = datetime.now() now = datetime.now()
desc = "Released at %s" % now.strftime("%H:%M:%S.%f") desc = "Released at %s" % now.strftime("%H:%M:%S.%f")
self._log_debug(self._status_msg(locktype, desc)) self._log_debug(self._status_msg(locktype, desc))
def _log_releasing(self, locktype): def _log_releasing(self, locktype) -> None:
self._log_debug(self._status_msg(locktype, "Releasing"), level=3) self._log_debug(self._status_msg(locktype, "Releasing"), level=3)
def _log_upgraded(self, wait_time, nattempts): def _log_upgraded(self, wait_time, nattempts) -> None:
attempts_part = _attempts_str(wait_time, nattempts) attempts_part = _attempts_str(wait_time, nattempts)
now = datetime.now() now = datetime.now()
desc = "Upgraded at %s" % now.strftime("%H:%M:%S.%f") desc = "Upgraded at %s" % now.strftime("%H:%M:%S.%f")
self._log_debug(self._status_msg("WRITE LOCK", "{0}{1}".format(desc, attempts_part))) self._log_debug(self._status_msg("WRITE LOCK", "{0}{1}".format(desc, attempts_part)))
def _log_upgrading(self): def _log_upgrading(self) -> None:
self._log_debug(self._status_msg("READ LOCK", "Upgrading"), level=3) self._log_debug(self._status_msg("READ LOCK", "Upgrading"), level=3)
def _status_msg(self, locktype, status): def _status_msg(self, locktype: str, status: str) -> str:
status_desc = "[{0}] {1}".format(status, self._get_counts_desc()) status_desc = "[{0}] {1}".format(status, self._get_counts_desc())
return "{0}{1.desc}: {1.path}[{1._start}:{1._length}] {2}".format( return "{0}{1.desc}: {1.path}[{1._start}:{1._length}] {2}".format(
locktype, self, status_desc locktype, self, status_desc
@ -709,7 +733,13 @@ class LockTransaction:
""" """
def __init__(self, lock, acquire=None, release=None, timeout=None): def __init__(
self,
lock: Lock,
acquire: Union[ReleaseFnType, ContextManager] = None,
release: Union[ReleaseFnType, ContextManager] = None,
timeout: Optional[float] = None,
) -> None:
self._lock = lock self._lock = lock
self._timeout = timeout self._timeout = timeout
self._acquire_fn = acquire self._acquire_fn = acquire
@ -724,15 +754,20 @@ def __enter__(self):
else: else:
return self._as return self._as
def __exit__(self, type, value, traceback): def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool:
suppress = False suppress = False
def release_fn(): def release_fn():
if self._release_fn is not None: if self._release_fn is not None:
return self._release_fn(type, value, traceback) return self._release_fn(exc_type, exc_value, traceback)
if self._as and hasattr(self._as, "__exit__"): if self._as and hasattr(self._as, "__exit__"):
if self._as.__exit__(type, value, traceback): if self._as.__exit__(exc_type, exc_value, traceback):
suppress = True suppress = True
if self._exit(release_fn): if self._exit(release_fn):
@ -740,6 +775,12 @@ def release_fn():
return suppress return suppress
def _enter(self) -> bool:
return NotImplemented
def _exit(self, release_fn: ReleaseFnType) -> bool:
return NotImplemented
class ReadTransaction(LockTransaction): class ReadTransaction(LockTransaction):
"""LockTransaction context manager that does a read and releases it.""" """LockTransaction context manager that does a read and releases it."""
@ -785,7 +826,7 @@ def __init__(self, lock_type, path, time, attempts):
super().__init__( super().__init__(
fmt.format( fmt.format(
lock_type, lock_type,
pretty_seconds(time), lang.pretty_seconds(time),
attempts, attempts,
"attempt" if attempts == 1 else "attempts", "attempt" if attempts == 1 else "attempts",
path, path,

View File

@ -1701,15 +1701,15 @@ def mock_test_stage(mutable_config, tmpdir):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def inode_cache(): def inode_cache():
llnl.util.lock.file_tracker.purge() llnl.util.lock.FILE_TRACKER.purge()
yield yield
# TODO: it is a bug when the file tracker is non-empty after a test, # TODO: it is a bug when the file tracker is non-empty after a test,
# since it means a lock was not released, or the inode was not purged # since it means a lock was not released, or the inode was not purged
# when acquiring the lock failed. So, we could assert that here, but # when acquiring the lock failed. So, we could assert that here, but
# currently there are too many issues to fix, so look for the more # currently there are too many issues to fix, so look for the more
# serious issue of having a closed file descriptor in the cache. # serious issue of having a closed file descriptor in the cache.
assert not any(f.fh.closed for f in llnl.util.lock.file_tracker._descriptors.values()) assert not any(f.fh.closed for f in llnl.util.lock.FILE_TRACKER._descriptors.values())
llnl.util.lock.file_tracker.purge() llnl.util.lock.FILE_TRACKER.purge()
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)

View File

@ -687,8 +687,8 @@ def test_upgrade_read_to_write_fails_with_readonly_file(private_lock_path):
with pytest.raises(lk.LockROFileError): with pytest.raises(lk.LockROFileError):
lock.acquire_write() lock.acquire_write()
# TODO: lk.file_tracker does not release private_lock_path # TODO: lk.FILE_TRACKER does not release private_lock_path
lk.file_tracker.release_by_stat(os.stat(private_lock_path)) lk.FILE_TRACKER.release_by_stat(os.stat(private_lock_path))
class ComplexAcquireAndRelease: class ComplexAcquireAndRelease:
@ -1345,8 +1345,7 @@ def _lockf(fd, cmd, len, start, whence):
with tmpdir.as_cwd(): with tmpdir.as_cwd():
lockfile = "lockfile" lockfile = "lockfile"
lock = lk.Lock(lockfile) lock = lk.Lock(lockfile)
lock.acquire_read()
touch(lockfile)
monkeypatch.setattr(fcntl, "lockf", _lockf) monkeypatch.setattr(fcntl, "lockf", _lockf)
@ -1356,6 +1355,9 @@ def _lockf(fd, cmd, len, start, whence):
with pytest.raises(IOError, match=err_msg): with pytest.raises(IOError, match=err_msg):
lock._poll_lock(fcntl.LOCK_EX) lock._poll_lock(fcntl.LOCK_EX)
monkeypatch.undo()
lock.release_read()
def test_upgrade_read_okay(tmpdir): def test_upgrade_read_okay(tmpdir):
"""Test the lock read-to-write upgrade operation.""" """Test the lock read-to-write upgrade operation."""