Increase and customize lock timeouts (#9219)
Fixes #9166 This is intended to reduce errors related to lock timeouts by making the following changes: * Improves error reporting when acquiring a lock fails (addressing #9166) - there is no longer an attempt to release the lock if an acquire fails * By default locks taken on individual packages no longer have a timeout. This allows multiple spack instances to install overlapping dependency DAGs. For debugging purposes, a timeout can be added by setting 'package_lock_timeout' in config.yaml * Reduces the polling frequency when trying to acquire a lock, to reduce impact in the case where NFS is overtaxed. A simple adaptive strategy is implemented, which starts with a polling interval of .1 seconds and quickly increases to .5 seconds (originally it would poll up to 10^5 times per second). A test is added to check the polling interval generation logic. * The timeout for Spack's whole-database lock (e.g. for managing information about installed packages) is increased from 60s to 120s * Users can configure the whole-database lock timeout using the 'db_lock_timout' setting in config.yaml Generally, Spack locks (those created using spack.llnl.util.lock.Lock) now have no timeout by default This does not address implementations of NFS that do not support file locking, or detect cases where services that may be required (nfslock/statd) aren't running. Users may want to be able to more-aggressively release locks when they know they are the only one using their Spack instance, and they encounter lock errors after a crash (e.g. a remote terminal disconnect mentioned in #8915).
This commit is contained in:
parent
6c0f0dbdfd
commit
28c0dd9148
@ -94,3 +94,16 @@ config:
|
||||
|
||||
# If set to true, spack will use ccache to cache c compiles.
|
||||
ccache: false
|
||||
|
||||
# How long to wait to lock the Spack installation database. This lock is used
|
||||
# when spack needs to manage its own package metadata and all operations are
|
||||
# expected to complete within the default time limit. The timeout should
|
||||
# therefore generally be left untouched.
|
||||
db_lock_timeout: 120
|
||||
|
||||
# How long to wait when attempting to modify a package (e.g. to install it).
|
||||
# This value should typically be 'null' (never time out) unless the Spack
|
||||
# instance only ever has a single user at a time, and only if the user
|
||||
# anticipates that a significant delay indicates that the lock attempt will
|
||||
# never succeed.
|
||||
package_lock_timeout: null
|
||||
|
@ -36,13 +36,6 @@
|
||||
'LockPermissionError', 'LockROFileError', 'CantCreateLockError']
|
||||
|
||||
|
||||
# Default timeout in seconds, after which locks will raise exceptions.
|
||||
_default_timeout = 60
|
||||
|
||||
# Sleep time per iteration in spin loop (in seconds)
|
||||
_sleep_time = 1e-5
|
||||
|
||||
|
||||
class Lock(object):
|
||||
"""This is an implementation of a filesystem lock using Python's lockf.
|
||||
|
||||
@ -50,9 +43,15 @@ class Lock(object):
|
||||
any filesystem implementation that supports locking through the fcntl
|
||||
calls. This includes distributed filesystems like Lustre (when flock
|
||||
is enabled) and recent NFS versions.
|
||||
|
||||
Note that this is for managing contention over resources *between*
|
||||
processes and not for managing contention between threads in a process: the
|
||||
functions of this object are not thread-safe. A process also must not
|
||||
maintain multiple locks on the same file.
|
||||
"""
|
||||
|
||||
def __init__(self, path, start=0, length=0, debug=False):
|
||||
def __init__(self, path, start=0, length=0, debug=False,
|
||||
default_timeout=None):
|
||||
"""Construct a new lock on the file at ``path``.
|
||||
|
||||
By default, the lock applies to the whole file. Optionally,
|
||||
@ -76,11 +75,40 @@ def __init__(self, path, start=0, length=0, debug=False):
|
||||
# enable debug mode
|
||||
self.debug = debug
|
||||
|
||||
# If the user doesn't set a default timeout, or if they choose
|
||||
# None, 0, etc. then lock attempts will not time out (unless the
|
||||
# user sets a timeout for each attempt)
|
||||
self.default_timeout = default_timeout or None
|
||||
|
||||
# PID and host of lock holder (only used in debug mode)
|
||||
self.pid = self.old_pid = None
|
||||
self.host = self.old_host = None
|
||||
|
||||
def _lock(self, op, timeout=_default_timeout):
|
||||
@staticmethod
|
||||
def _poll_interval_generator(_wait_times=None):
|
||||
"""This implements a backoff scheme for polling a contended resource
|
||||
by suggesting a succession of wait times between polls.
|
||||
|
||||
It suggests a poll interval of .1s until 2 seconds have passed,
|
||||
then a poll interval of .2s until 10 seconds have passed, and finally
|
||||
(for all requests after 10s) suggests a poll interval of .5s.
|
||||
|
||||
This doesn't actually track elapsed time, it estimates the waiting
|
||||
time as though the caller always waits for the full length of time
|
||||
suggested by this function.
|
||||
"""
|
||||
num_requests = 0
|
||||
stage1, stage2, stage3 = _wait_times or (1e-1, 2e-1, 5e-1)
|
||||
wait_time = stage1
|
||||
while True:
|
||||
if num_requests >= 60: # 40 * .2 = 8
|
||||
wait_time = stage3
|
||||
elif num_requests >= 20: # 20 * .1 = 2
|
||||
wait_time = stage2
|
||||
num_requests += 1
|
||||
yield wait_time
|
||||
|
||||
def _lock(self, op, timeout=None):
|
||||
"""This takes a lock using POSIX locks (``fcntl.lockf``).
|
||||
|
||||
The lock is implemented as a spin lock using a nonblocking call
|
||||
@ -90,64 +118,84 @@ def _lock(self, op, timeout=_default_timeout):
|
||||
pid and host to the lock file, in case the holding process needs
|
||||
to be killed later.
|
||||
|
||||
If the lock times out, it raises a ``LockError``.
|
||||
If the lock times out, it raises a ``LockError``. If the lock is
|
||||
successfully acquired, the total wait time and the number of attempts
|
||||
is returned.
|
||||
"""
|
||||
assert op in (fcntl.LOCK_SH, fcntl.LOCK_EX)
|
||||
|
||||
timeout = timeout or self.default_timeout
|
||||
|
||||
# Create file and parent directories if they don't exist.
|
||||
if self._file is None:
|
||||
parent = self._ensure_parent_directory()
|
||||
|
||||
# Open writable files as 'r+' so we can upgrade to write later
|
||||
os_mode, fd_mode = (os.O_RDWR | os.O_CREAT), 'r+'
|
||||
if os.path.exists(self.path):
|
||||
if not os.access(self.path, os.W_OK):
|
||||
if op == fcntl.LOCK_SH:
|
||||
# can still lock read-only files if we open 'r'
|
||||
os_mode, fd_mode = os.O_RDONLY, 'r'
|
||||
else:
|
||||
raise LockROFileError(self.path)
|
||||
|
||||
elif not os.access(parent, os.W_OK):
|
||||
raise CantCreateLockError(self.path)
|
||||
|
||||
fd = os.open(self.path, os_mode)
|
||||
self._file = os.fdopen(fd, fd_mode)
|
||||
|
||||
elif op == fcntl.LOCK_EX and self._file.mode == 'r':
|
||||
# Attempt to upgrade to write lock w/a read-only file.
|
||||
# If the file were writable, we'd have opened it 'r+'
|
||||
raise LockROFileError(self.path)
|
||||
|
||||
poll_intervals = iter(Lock._poll_interval_generator())
|
||||
start_time = time.time()
|
||||
while (time.time() - start_time) < timeout:
|
||||
# Create file and parent directories if they don't exist.
|
||||
if self._file is None:
|
||||
parent = self._ensure_parent_directory()
|
||||
num_attempts = 0
|
||||
while (not timeout) or (time.time() - start_time) < timeout:
|
||||
num_attempts += 1
|
||||
if self._poll_lock(op):
|
||||
total_wait_time = time.time() - start_time
|
||||
return total_wait_time, num_attempts
|
||||
|
||||
# Open writable files as 'r+' so we can upgrade to write later
|
||||
os_mode, fd_mode = (os.O_RDWR | os.O_CREAT), 'r+'
|
||||
if os.path.exists(self.path):
|
||||
if not os.access(self.path, os.W_OK):
|
||||
if op == fcntl.LOCK_SH:
|
||||
# can still lock read-only files if we open 'r'
|
||||
os_mode, fd_mode = os.O_RDONLY, 'r'
|
||||
else:
|
||||
raise LockROFileError(self.path)
|
||||
time.sleep(next(poll_intervals))
|
||||
|
||||
elif not os.access(parent, os.W_OK):
|
||||
raise CantCreateLockError(self.path)
|
||||
|
||||
fd = os.open(self.path, os_mode)
|
||||
self._file = os.fdopen(fd, fd_mode)
|
||||
|
||||
elif op == fcntl.LOCK_EX and self._file.mode == 'r':
|
||||
# Attempt to upgrade to write lock w/a read-only file.
|
||||
# If the file were writable, we'd have opened it 'r+'
|
||||
raise LockROFileError(self.path)
|
||||
|
||||
try:
|
||||
# Try to get the lock (will raise if not available.)
|
||||
fcntl.lockf(self._file, op | fcntl.LOCK_NB,
|
||||
self._length, self._start, os.SEEK_SET)
|
||||
|
||||
# help for debugging distributed locking
|
||||
if self.debug:
|
||||
# All locks read the owner PID and host
|
||||
self._read_debug_data()
|
||||
|
||||
# Exclusive locks write their PID/host
|
||||
if op == fcntl.LOCK_EX:
|
||||
self._write_debug_data()
|
||||
|
||||
return
|
||||
|
||||
except IOError as e:
|
||||
if e.errno in (errno.EAGAIN, errno.EACCES):
|
||||
# EAGAIN and EACCES == locked by another process
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
time.sleep(_sleep_time)
|
||||
num_attempts += 1
|
||||
if self._poll_lock(op):
|
||||
total_wait_time = time.time() - start_time
|
||||
return total_wait_time, num_attempts
|
||||
|
||||
raise LockTimeoutError("Timed out waiting for lock.")
|
||||
|
||||
def _poll_lock(self, op):
|
||||
"""Attempt to acquire the lock in a non-blocking manner. Return whether
|
||||
the locking attempt succeeds
|
||||
"""
|
||||
try:
|
||||
# Try to get the lock (will raise if not available.)
|
||||
fcntl.lockf(self._file, op | fcntl.LOCK_NB,
|
||||
self._length, self._start, os.SEEK_SET)
|
||||
|
||||
# help for debugging distributed locking
|
||||
if self.debug:
|
||||
# All locks read the owner PID and host
|
||||
self._read_debug_data()
|
||||
|
||||
# Exclusive locks write their PID/host
|
||||
if op == fcntl.LOCK_EX:
|
||||
self._write_debug_data()
|
||||
|
||||
return True
|
||||
|
||||
except IOError as e:
|
||||
if e.errno in (errno.EAGAIN, errno.EACCES):
|
||||
# EAGAIN and EACCES == locked by another process
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
def _ensure_parent_directory(self):
|
||||
parent = os.path.dirname(self.path)
|
||||
|
||||
@ -203,7 +251,7 @@ def _unlock(self):
|
||||
self._file.close()
|
||||
self._file = None
|
||||
|
||||
def acquire_read(self, timeout=_default_timeout):
|
||||
def acquire_read(self, timeout=None):
|
||||
"""Acquires a recursive, shared lock for reading.
|
||||
|
||||
Read and write locks can be acquired and released in arbitrary
|
||||
@ -214,21 +262,22 @@ def acquire_read(self, timeout=_default_timeout):
|
||||
the POSIX lock, False if it is a nested transaction.
|
||||
|
||||
"""
|
||||
timeout = timeout or self.default_timeout
|
||||
|
||||
if self._reads == 0 and self._writes == 0:
|
||||
self._debug(
|
||||
'READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
|
||||
.format(self))
|
||||
self._lock(fcntl.LOCK_SH, timeout=timeout) # can raise LockError.
|
||||
self._debug(
|
||||
'READ LOCK: {0.path}[{0._start}:{0._length}] [Acquired]'
|
||||
.format(self))
|
||||
# can raise LockError.
|
||||
wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout)
|
||||
self._acquired_debug('READ LOCK', wait_time, nattempts)
|
||||
self._reads += 1
|
||||
return True
|
||||
else:
|
||||
self._reads += 1
|
||||
return False
|
||||
|
||||
def acquire_write(self, timeout=_default_timeout):
|
||||
def acquire_write(self, timeout=None):
|
||||
"""Acquires a recursive, exclusive lock for writing.
|
||||
|
||||
Read and write locks can be acquired and released in arbitrary
|
||||
@ -239,14 +288,15 @@ def acquire_write(self, timeout=_default_timeout):
|
||||
the POSIX lock, False if it is a nested transaction.
|
||||
|
||||
"""
|
||||
timeout = timeout or self.default_timeout
|
||||
|
||||
if self._writes == 0:
|
||||
self._debug(
|
||||
'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
|
||||
.format(self))
|
||||
self._lock(fcntl.LOCK_EX, timeout=timeout) # can raise LockError.
|
||||
self._debug(
|
||||
'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquired]'
|
||||
.format(self))
|
||||
# can raise LockError.
|
||||
wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout)
|
||||
self._acquired_debug('WRITE LOCK', wait_time, nattempts)
|
||||
self._writes += 1
|
||||
return True
|
||||
else:
|
||||
@ -302,6 +352,18 @@ def release_write(self):
|
||||
def _debug(self, *args):
|
||||
tty.debug(*args)
|
||||
|
||||
def _acquired_debug(self, lock_type, wait_time, nattempts):
|
||||
attempts_format = 'attempt' if nattempts == 1 else 'attempt'
|
||||
if nattempts > 1:
|
||||
acquired_attempts_format = ' after {0:0.2f}s and {1:d} {2}'.format(
|
||||
wait_time, nattempts, attempts_format)
|
||||
else:
|
||||
# Dont print anything if we succeeded immediately
|
||||
acquired_attempts_format = ''
|
||||
self._debug(
|
||||
'{0}: {1.path}[{1._start}:{1._length}] [Acquired{2}]'
|
||||
.format(lock_type, self, acquired_attempts_format))
|
||||
|
||||
|
||||
class LockTransaction(object):
|
||||
"""Simple nested transaction context manager that uses a file lock.
|
||||
@ -323,7 +385,7 @@ class LockTransaction(object):
|
||||
"""
|
||||
|
||||
def __init__(self, lock, acquire_fn=None, release_fn=None,
|
||||
timeout=_default_timeout):
|
||||
timeout=None):
|
||||
self._lock = lock
|
||||
self._timeout = timeout
|
||||
self._acquire_fn = acquire_fn
|
||||
|
@ -63,7 +63,7 @@
|
||||
from spack.directory_layout import DirectoryLayoutError
|
||||
from spack.error import SpackError
|
||||
from spack.version import Version
|
||||
from spack.util.lock import Lock, WriteTransaction, ReadTransaction
|
||||
from spack.util.lock import Lock, WriteTransaction, ReadTransaction, LockError
|
||||
|
||||
|
||||
# DB goes in this directory underneath the root
|
||||
@ -73,7 +73,7 @@
|
||||
_db_version = Version('0.9.3')
|
||||
|
||||
# Timeout for spack database locks in seconds
|
||||
_db_lock_timeout = 60
|
||||
_db_lock_timeout = 120
|
||||
|
||||
# Types of dependencies tracked by the database
|
||||
_tracked_deps = ('link', 'run')
|
||||
@ -203,19 +203,30 @@ def __init__(self, root, db_dir=None):
|
||||
mkdirp(self._db_dir)
|
||||
|
||||
# initialize rest of state.
|
||||
self.lock = Lock(self._lock_path)
|
||||
self.db_lock_timeout = (
|
||||
spack.config.get('config:db_lock_timeout') or _db_lock_timeout)
|
||||
self.package_lock_timeout = (
|
||||
spack.config.get('config:package_lock_timeout') or None)
|
||||
tty.debug('DATABASE LOCK TIMEOUT: {0}s'.format(
|
||||
str(self.db_lock_timeout)))
|
||||
timeout_format_str = ('{0}s'.format(str(self.package_lock_timeout))
|
||||
if self.package_lock_timeout else 'No timeout')
|
||||
tty.debug('PACKAGE LOCK TIMEOUT: {0}'.format(
|
||||
str(timeout_format_str)))
|
||||
self.lock = Lock(self._lock_path,
|
||||
default_timeout=self.db_lock_timeout)
|
||||
self._data = {}
|
||||
|
||||
# whether there was an error at the start of a read transaction
|
||||
self._error = None
|
||||
|
||||
def write_transaction(self, timeout=_db_lock_timeout):
|
||||
def write_transaction(self):
|
||||
"""Get a write lock context manager for use in a `with` block."""
|
||||
return WriteTransaction(self.lock, self._read, self._write, timeout)
|
||||
return WriteTransaction(self.lock, self._read, self._write)
|
||||
|
||||
def read_transaction(self, timeout=_db_lock_timeout):
|
||||
def read_transaction(self):
|
||||
"""Get a read lock context manager for use in a `with` block."""
|
||||
return ReadTransaction(self.lock, self._read, timeout=timeout)
|
||||
return ReadTransaction(self.lock, self._read)
|
||||
|
||||
def prefix_lock(self, spec):
|
||||
"""Get a lock on a particular spec's installation directory.
|
||||
@ -236,26 +247,44 @@ def prefix_lock(self, spec):
|
||||
if prefix not in self._prefix_locks:
|
||||
self._prefix_locks[prefix] = Lock(
|
||||
self.prefix_lock_path,
|
||||
spec.dag_hash_bit_prefix(bit_length(sys.maxsize)), 1)
|
||||
start=spec.dag_hash_bit_prefix(bit_length(sys.maxsize)),
|
||||
length=1,
|
||||
default_timeout=self.package_lock_timeout)
|
||||
|
||||
return self._prefix_locks[prefix]
|
||||
|
||||
@contextlib.contextmanager
|
||||
def prefix_read_lock(self, spec):
|
||||
prefix_lock = self.prefix_lock(spec)
|
||||
prefix_lock.acquire_read()
|
||||
|
||||
try:
|
||||
prefix_lock.acquire_read(60)
|
||||
yield self
|
||||
finally:
|
||||
except LockError:
|
||||
# This addresses the case where a nested lock attempt fails inside
|
||||
# of this context manager
|
||||
raise
|
||||
except (Exception, KeyboardInterrupt):
|
||||
prefix_lock.release_read()
|
||||
raise
|
||||
else:
|
||||
prefix_lock.release_read()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def prefix_write_lock(self, spec):
|
||||
prefix_lock = self.prefix_lock(spec)
|
||||
prefix_lock.acquire_write()
|
||||
|
||||
try:
|
||||
prefix_lock.acquire_write(60)
|
||||
yield self
|
||||
finally:
|
||||
except LockError:
|
||||
# This addresses the case where a nested lock attempt fails inside
|
||||
# of this context manager
|
||||
raise
|
||||
except (Exception, KeyboardInterrupt):
|
||||
prefix_lock.release_write()
|
||||
raise
|
||||
else:
|
||||
prefix_lock.release_write()
|
||||
|
||||
def _write_to_file(self, stream):
|
||||
@ -435,7 +464,7 @@ def _read_suppress_error():
|
||||
self._data = {}
|
||||
|
||||
transaction = WriteTransaction(
|
||||
self.lock, _read_suppress_error, self._write, _db_lock_timeout
|
||||
self.lock, _read_suppress_error, self._write
|
||||
)
|
||||
|
||||
with transaction:
|
||||
@ -599,7 +628,7 @@ def _read(self):
|
||||
if os.access(self._db_dir, os.R_OK | os.W_OK):
|
||||
# if we can write, then read AND write a JSON file.
|
||||
self._read_from_file(self._old_yaml_index_path, format='yaml')
|
||||
with WriteTransaction(self.lock, timeout=_db_lock_timeout):
|
||||
with WriteTransaction(self.lock):
|
||||
self._write(None, None, None)
|
||||
else:
|
||||
# Read chck for a YAML file if we can't find JSON.
|
||||
@ -608,7 +637,7 @@ def _read(self):
|
||||
else:
|
||||
# The file doesn't exist, try to traverse the directory.
|
||||
# reindex() takes its own write lock, so no lock here.
|
||||
with WriteTransaction(self.lock, timeout=_db_lock_timeout):
|
||||
with WriteTransaction(self.lock):
|
||||
self._write(None, None, None)
|
||||
self.reindex(spack.store.layout)
|
||||
|
||||
|
@ -70,6 +70,13 @@
|
||||
'dirty': {'type': 'boolean'},
|
||||
'build_jobs': {'type': 'integer', 'minimum': 1},
|
||||
'ccache': {'type': 'boolean'},
|
||||
'db_lock_timeout': {'type': 'integer', 'minimum': 1},
|
||||
'package_lock_timeout': {
|
||||
'anyOf': [
|
||||
{'type': 'integer', 'minimum': 1},
|
||||
{'type': 'null'}
|
||||
],
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
|
@ -220,6 +220,13 @@ def lock_path(lock_dir):
|
||||
os.unlink(lock_file)
|
||||
|
||||
|
||||
def test_poll_interval_generator():
|
||||
interval_iter = iter(
|
||||
lk.Lock._poll_interval_generator(_wait_times=[1, 2, 3]))
|
||||
intervals = list(next(interval_iter) for i in range(100))
|
||||
assert intervals == [1] * 20 + [2] * 40 + [3] * 40
|
||||
|
||||
|
||||
def local_multiproc_test(*functions, **kwargs):
|
||||
"""Order some processes using simple barrier synchronization."""
|
||||
b = mp.Barrier(len(functions), timeout=barrier_timeout)
|
||||
|
@ -42,17 +42,24 @@ class FileCache(object):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, root):
|
||||
def __init__(self, root, timeout=120):
|
||||
"""Create a file cache object.
|
||||
|
||||
This will create the cache directory if it does not exist yet.
|
||||
|
||||
Args:
|
||||
root: specifies the root directory where the cache stores files
|
||||
|
||||
timeout: when there is contention among multiple Spack processes
|
||||
for cache files, this specifies how long Spack should wait
|
||||
before assuming that there is a deadlock.
|
||||
"""
|
||||
self.root = root.rstrip(os.path.sep)
|
||||
if not os.path.exists(self.root):
|
||||
mkdirp(self.root)
|
||||
|
||||
self._locks = {}
|
||||
self.lock_timeout = timeout
|
||||
|
||||
def destroy(self):
|
||||
"""Remove all files under the cache root."""
|
||||
@ -77,7 +84,8 @@ def _lock_path(self, key):
|
||||
def _get_lock(self, key):
|
||||
"""Create a lock for a key, if necessary, and return a lock object."""
|
||||
if key not in self._locks:
|
||||
self._locks[key] = Lock(self._lock_path(key))
|
||||
self._locks[key] = Lock(self._lock_path(key),
|
||||
default_timeout=self.lock_timeout)
|
||||
return self._locks[key]
|
||||
|
||||
def init_entry(self, key):
|
||||
|
@ -47,7 +47,9 @@ def __init__(self, *args, **kwargs):
|
||||
|
||||
def _lock(self, op, timeout=0):
|
||||
if self._enable:
|
||||
super(Lock, self)._lock(op, timeout)
|
||||
return super(Lock, self)._lock(op, timeout)
|
||||
else:
|
||||
return 0, 0
|
||||
|
||||
def _unlock(self):
|
||||
"""Unlock call that always succeeds."""
|
||||
|
Loading…
Reference in New Issue
Block a user