Distributed builds (#13100)
Fixes #9394. Closes #13217.

## Background

Spack provides the ability to enable/disable parallel builds through two options: the package `parallel` property and the `build_jobs` configuration setting. This PR changes the algorithm to allow multiple simultaneous processes to coordinate the installation of the same spec (and of specs with overlapping dependencies).

The `parallel` (boolean) property sets the default for its package, though the value can be overridden in the `install` method. Spack's current parallel builds are limited to build tools that support `jobs` arguments (e.g., `Makefiles`). The number of jobs actually used is calculated as `min(config:build_jobs, # cores, 16)`, which can be overridden in the package or on the command line (i.e., `spack install -j <# jobs>`).

This PR adds support for distributed (single- and multi-node) parallel builds. The goals of this work include improving the efficiency of installing packages with many dependencies and reducing the repetition associated with concurrent installations of (dependency) packages.

## Approach

### File System Locks

Coordination between concurrent installs of overlapping packages to a Spack instance is accomplished through bottom-up dependency DAG processing and file system locks. The runs can be a combination of interactive and batch processes affecting the same file system. An exclusive prefix lock is required to install a package, while a shared prefix lock is required to check whether the package is installed.

Failures are communicated through a separate exclusive prefix failure lock (for concurrent processes) combined with a persistent store (for separate, related build processes). The resulting file contains the failing spec to facilitate manual debugging.

### Priority Queue

Management of dependency builds changed from reliance on recursion to use of a priority queue, where the priority of a spec is based on the number of its remaining uninstalled dependencies.
Using a queue required a change to dependency build exception handling, with the most visible issue being that the `install` method *must* install something in the prefix. Consequently, packages can no longer get away with an `install` method consisting of `pass`, for example.

## Caveats

- This still only parallelizes a single-rooted build. Multi-rooted installs (e.g., for environments) are TBD in a future PR.

Tasks:

- [x] Adjust package lock timeout to correspond to value used in the demo
- [x] Adjust database lock timeout to reduce contention on startup of concurrent `spack install <spec>` calls
- [x] Replace (test) packages' `install: pass` methods with file creation, since the post-install `sanity_check_prefix` will otherwise error out with `Install failed .. Nothing was installed!`
- [x] Resolve remaining existing test failures
- [x] Respond to alalazo's initial feedback
- [x] Remove `bin/demo-locks.py`
- [x] Add new tests to address new coverage issues
- [x] Replace built-in package's `def install(..): pass` to "install" something (i.e., only `apple-libunwind`)
- [x] Increase code coverage
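The queue-based scheduling described above can be sketched in a single process. This is an illustrative model only (the names `install_order`, `deps`, etc. are hypothetical, and Spack's actual implementation coordinates multiple processes with its own task classes): each spec is queued with a priority equal to its number of remaining uninstalled dependencies, and only zero-priority (ready) specs are ever "installed".

```python
import heapq


def install_order(deps):
    """Yield packages bottom-up using a priority queue keyed on the
    number of remaining uninstalled dependencies (0 == ready).

    ``deps`` maps each package name to the set of packages it depends on.
    """
    remaining = {pkg: set(ds) for pkg, ds in deps.items()}
    # dependents[d] = packages that must wait for d to be installed
    dependents = {}
    for pkg, ds in deps.items():
        for dep in ds:
            dependents.setdefault(dep, set()).add(pkg)

    heap = [(len(ds), pkg) for pkg, ds in remaining.items()]
    heapq.heapify(heap)
    installed = set()
    while heap:
        count, pkg = heapq.heappop(heap)
        if pkg in installed or count != len(remaining[pkg]):
            continue  # already handled, or a stale queue entry
        if count > 0:
            raise ValueError('dependency cycle detected at %s' % pkg)
        installed.add(pkg)
        yield pkg
        # Installing pkg lowers the priority of every dependent.
        for parent in dependents.get(pkg, ()):
            remaining[parent].discard(pkg)
            heapq.heappush(heap, (len(remaining[parent]), parent))


dag = {'zlib': set(), 'cmake': {'zlib'}, 'hdf5': {'zlib', 'cmake'}}
order = list(install_order(dag))
print(order)  # ['zlib', 'cmake', 'hdf5']
```

Stale heap entries are simply skipped because a fresh entry with the current count is pushed every time a dependency completes; the real implementation additionally has to re-check the database, since another process may have installed the spec in the meantime.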
```diff
@@ -8,14 +8,32 @@
 import errno
 import time
 import socket
+from datetime import datetime
 
 import llnl.util.tty as tty
+import spack.util.string
 
 
 __all__ = ['Lock', 'LockTransaction', 'WriteTransaction', 'ReadTransaction',
            'LockError', 'LockTimeoutError',
            'LockPermissionError', 'LockROFileError', 'CantCreateLockError']
 
+#: Mapping of supported locks to description
+lock_type = {fcntl.LOCK_SH: 'read', fcntl.LOCK_EX: 'write'}
+
+#: A useful replacement for functions that should return True when not provided
+#: for example.
+true_fn = lambda: True
+
+
+def _attempts_str(wait_time, nattempts):
+    # Don't print anything if we succeeded on the first try
+    if nattempts <= 1:
+        return ''
+
+    attempts = spack.util.string.plural(nattempts, 'attempt')
+    return ' after {0:0.2f}s and {1}'.format(wait_time, attempts)
+
+
 class Lock(object):
     """This is an implementation of a filesystem lock using Python's lockf.
@@ -31,8 +49,8 @@ class Lock(object):
     maintain multiple locks on the same file.
     """
 
-    def __init__(self, path, start=0, length=0, debug=False,
-                 default_timeout=None):
+    def __init__(self, path, start=0, length=0, default_timeout=None,
+                 debug=False, desc=''):
         """Construct a new lock on the file at ``path``.
 
         By default, the lock applies to the whole file. Optionally,
@@ -43,6 +61,16 @@ def __init__(self, path, start=0, length=0, debug=False,
         not currently expose the ``whence`` parameter -- ``whence`` is
         always ``os.SEEK_SET`` and ``start`` is always evaluated from the
         beginning of the file.
 
+        Args:
+            path (str): path to the lock
+            start (int): optional byte offset at which the lock starts
+            length (int): optional number of bytes to lock
+            default_timeout (int): number of seconds to wait for lock attempts,
+                where None means to wait indefinitely
+            debug (bool): debug mode specific to locking
+            desc (str): optional debug message lock description, which is
+                helpful for distinguishing between different Spack locks.
         """
         self.path = path
         self._file = None
@@ -56,6 +84,9 @@ def __init__(self, path, start=0, length=0, debug=False,
         # enable debug mode
         self.debug = debug
 
+        # optional debug description
+        self.desc = ' ({0})'.format(desc) if desc else ''
+
         # If the user doesn't set a default timeout, or if they choose
         # None, 0, etc. then lock attempts will not time out (unless the
         # user sets a timeout for each attempt)
@@ -89,6 +120,20 @@ def _poll_interval_generator(_wait_times=None):
             num_requests += 1
             yield wait_time
 
+    def __repr__(self):
+        """Formal representation of the lock."""
+        rep = '{0}('.format(self.__class__.__name__)
+        for attr, value in self.__dict__.items():
+            rep += '{0}={1}, '.format(attr, value.__repr__())
+        return '{0})'.format(rep.strip(', '))
+
+    def __str__(self):
+        """Readable string (with key fields) of the lock."""
+        location = '{0}[{1}:{2}]'.format(self.path, self._start, self._length)
+        timeout = 'timeout={0}'.format(self.default_timeout)
+        activity = '#reads={0}, #writes={1}'.format(self._reads, self._writes)
+        return '({0}, {1}, {2})'.format(location, timeout, activity)
+
     def _lock(self, op, timeout=None):
         """This takes a lock using POSIX locks (``fcntl.lockf``).
 
@@ -99,8 +144,9 @@ def _lock(self, op, timeout=None):
         successfully acquired, the total wait time and the number of attempts
         is returned.
         """
-        assert op in (fcntl.LOCK_SH, fcntl.LOCK_EX)
+        assert op in lock_type
 
+        self._log_acquiring('{0} LOCK'.format(lock_type[op].upper()))
         timeout = timeout or self.default_timeout
 
         # Create file and parent directories if they don't exist.
@@ -128,6 +174,9 @@ def _lock(self, op, timeout=None):
                 # If the file were writable, we'd have opened it 'r+'
                 raise LockROFileError(self.path)
 
+        tty.debug("{0} locking [{1}:{2}]: timeout {3} sec"
+                  .format(lock_type[op], self._start, self._length, timeout))
+
         poll_intervals = iter(Lock._poll_interval_generator())
         start_time = time.time()
         num_attempts = 0
@@ -139,17 +188,21 @@
 
             time.sleep(next(poll_intervals))
 
+        # TBD: Is an extra attempt after timeout needed/appropriate?
         num_attempts += 1
         if self._poll_lock(op):
             total_wait_time = time.time() - start_time
             return total_wait_time, num_attempts
 
-        raise LockTimeoutError("Timed out waiting for lock.")
+        raise LockTimeoutError("Timed out waiting for a {0} lock."
+                               .format(lock_type[op]))
 
     def _poll_lock(self, op):
         """Attempt to acquire the lock in a non-blocking manner. Return whether
        the locking attempt succeeds
        """
+        assert op in lock_type
+
         try:
             # Try to get the lock (will raise if not available.)
             fcntl.lockf(self._file, op | fcntl.LOCK_NB,
@@ -159,6 +212,9 @@ def _poll_lock(self, op):
             if self.debug:
                 # All locks read the owner PID and host
                 self._read_debug_data()
+                tty.debug('{0} locked {1} [{2}:{3}] (owner={4})'
+                          .format(lock_type[op], self.path,
+                                  self._start, self._length, self.pid))
 
             # Exclusive locks write their PID/host
             if op == fcntl.LOCK_EX:
@@ -167,12 +223,12 @@ def _poll_lock(self, op):
             return True
 
         except IOError as e:
-            if e.errno in (errno.EAGAIN, errno.EACCES):
-                # EAGAIN and EACCES == locked by another process
-                pass
-            else:
+            # EAGAIN and EACCES == locked by another process (so try again)
+            if e.errno not in (errno.EAGAIN, errno.EACCES):
                 raise
 
         return False
 
     def _ensure_parent_directory(self):
         parent = os.path.dirname(self.path)
 
@@ -227,6 +283,8 @@ def _unlock(self):
                          self._length, self._start, os.SEEK_SET)
         self._file.close()
         self._file = None
+        self._reads = 0
+        self._writes = 0
 
     def acquire_read(self, timeout=None):
         """Acquires a recursive, shared lock for reading.
@@ -242,15 +300,14 @@ def acquire_read(self, timeout=None):
         timeout = timeout or self.default_timeout
 
         if self._reads == 0 and self._writes == 0:
-            self._debug(
-                'READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
-                .format(self))
             # can raise LockError.
             wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout)
-            self._acquired_debug('READ LOCK', wait_time, nattempts)
             self._reads += 1
+            # Log if acquired, which includes counts when verbose
+            self._log_acquired('READ LOCK', wait_time, nattempts)
             return True
         else:
+            # Increment the read count for nested lock tracking
             self._reads += 1
             return False
 
@@ -268,13 +325,11 @@ def acquire_write(self, timeout=None):
         timeout = timeout or self.default_timeout
 
         if self._writes == 0:
-            self._debug(
-                'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
-                .format(self))
             # can raise LockError.
             wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout)
-            self._acquired_debug('WRITE LOCK', wait_time, nattempts)
             self._writes += 1
+            # Log if acquired, which includes counts when verbose
+            self._log_acquired('WRITE LOCK', wait_time, nattempts)
 
             # return True only if we weren't nested in a read lock.
             # TODO: we may need to return two values: whether we got
@@ -282,9 +337,65 @@
             # write lock for the first time. Now it returns the latter.
             return self._reads == 0
         else:
+            # Increment the write count for nested lock tracking
             self._writes += 1
             return False
 
+    def is_write_locked(self):
+        """Check if the file is write locked
+
+        Return:
+            (bool): ``True`` if the path is write locked, otherwise, ``False``
+        """
+        try:
+            self.acquire_read()
+
+            # If we have a read lock then no other process has a write lock.
+            self.release_read()
+        except LockTimeoutError:
+            # Another process is holding a write lock on the file
+            return True
+
+        return False
+
+    def downgrade_write_to_read(self, timeout=None):
+        """
+        Downgrade from an exclusive write lock to a shared read.
+
+        Raises:
+            LockDowngradeError: if this is an attempt at a nested transaction
+        """
+        timeout = timeout or self.default_timeout
+
+        if self._writes == 1 and self._reads == 0:
+            self._log_downgrading()
+            # can raise LockError.
+            wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout)
+            self._reads = 1
+            self._writes = 0
+            self._log_downgraded(wait_time, nattempts)
+        else:
+            raise LockDowngradeError(self.path)
+
+    def upgrade_read_to_write(self, timeout=None):
+        """
+        Attempts to upgrade from a shared read lock to an exclusive write.
+
+        Raises:
+            LockUpgradeError: if this is an attempt at a nested transaction
+        """
+        timeout = timeout or self.default_timeout
+
+        if self._reads == 1 and self._writes == 0:
+            self._log_upgrading()
+            # can raise LockError.
+            wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout)
+            self._reads = 0
+            self._writes = 1
+            self._log_upgraded(wait_time, nattempts)
+        else:
+            raise LockUpgradeError(self.path)
+
     def release_read(self, release_fn=None):
         """Releases a read lock.
 
@@ -305,17 +416,17 @@ def release_read(self, release_fn=None):
         """
         assert self._reads > 0
 
+        locktype = 'READ LOCK'
         if self._reads == 1 and self._writes == 0:
-            self._debug(
-                'READ LOCK: {0.path}[{0._start}:{0._length}] [Released]'
-                .format(self))
+            self._log_releasing(locktype)
 
-            result = True
-            if release_fn is not None:
-                result = release_fn()
+            # we need to call release_fn before releasing the lock
+            release_fn = release_fn or true_fn
+            result = release_fn()
 
             self._unlock()      # can raise LockError.
-            self._reads -= 1
+            self._reads = 0
+            self._log_released(locktype)
             return result
         else:
             self._reads -= 1
@@ -339,45 +450,91 @@ def release_write(self, release_fn=None):
 
         """
         assert self._writes > 0
+        release_fn = release_fn or true_fn
 
+        locktype = 'WRITE LOCK'
         if self._writes == 1 and self._reads == 0:
-            self._debug(
-                'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Released]'
-                .format(self))
+            self._log_releasing(locktype)
 
             # we need to call release_fn before releasing the lock
-            result = True
-            if release_fn is not None:
-                result = release_fn()
+            result = release_fn()
 
             self._unlock()      # can raise LockError.
-            self._writes -= 1
+            self._writes = 0
+            self._log_released(locktype)
             return result
 
         else:
             self._writes -= 1
 
             # when the last *write* is released, we call release_fn here
             # instead of immediately before releasing the lock.
            if self._writes == 0:
-                return release_fn() if release_fn is not None else True
+                return release_fn()
             else:
                 return False
 
     def _debug(self, *args):
         tty.debug(*args)
 
-    def _acquired_debug(self, lock_type, wait_time, nattempts):
-        attempts_format = 'attempt' if nattempts == 1 else 'attempts'
-        if nattempts > 1:
-            acquired_attempts_format = ' after {0:0.2f}s and {1:d} {2}'.format(
-                wait_time, nattempts, attempts_format)
-        else:
-            # Don't print anything if we succeeded immediately
-            acquired_attempts_format = ''
-        self._debug(
-            '{0}: {1.path}[{1._start}:{1._length}] [Acquired{2}]'
-            .format(lock_type, self, acquired_attempts_format))
+    def _get_counts_desc(self):
+        return '(reads {0}, writes {1})'.format(self._reads, self._writes) \
+            if tty.is_verbose() else ''
+
+    def _log_acquired(self, locktype, wait_time, nattempts):
+        attempts_part = _attempts_str(wait_time, nattempts)
+        now = datetime.now()
+        desc = 'Acquired at %s' % now.strftime("%H:%M:%S.%f")
+        self._debug(self._status_msg(locktype, '{0}{1}'.
+                                     format(desc, attempts_part)))
+
+    def _log_acquiring(self, locktype):
+        self._debug2(self._status_msg(locktype, 'Acquiring'))
+
+    def _log_downgraded(self, wait_time, nattempts):
+        attempts_part = _attempts_str(wait_time, nattempts)
+        now = datetime.now()
+        desc = 'Downgraded at %s' % now.strftime("%H:%M:%S.%f")
+        self._debug(self._status_msg('READ LOCK', '{0}{1}'
+                                     .format(desc, attempts_part)))
+
+    def _log_downgrading(self):
+        self._debug2(self._status_msg('WRITE LOCK', 'Downgrading'))
+
+    def _log_released(self, locktype):
+        now = datetime.now()
+        desc = 'Released at %s' % now.strftime("%H:%M:%S.%f")
+        self._debug(self._status_msg(locktype, desc))
+
+    def _log_releasing(self, locktype):
+        self._debug2(self._status_msg(locktype, 'Releasing'))
+
+    def _log_upgraded(self, wait_time, nattempts):
+        attempts_part = _attempts_str(wait_time, nattempts)
+        now = datetime.now()
+        desc = 'Upgraded at %s' % now.strftime("%H:%M:%S.%f")
+        self._debug(self._status_msg('WRITE LOCK', '{0}{1}'.
+                                     format(desc, attempts_part)))
+
+    def _log_upgrading(self):
+        self._debug2(self._status_msg('READ LOCK', 'Upgrading'))
+
+    def _status_msg(self, locktype, status):
+        status_desc = '[{0}] {1}'.format(status, self._get_counts_desc())
+        return '{0}{1.desc}: {1.path}[{1._start}:{1._length}] {2}'.format(
+            locktype, self, status_desc)
+
+    def _debug2(self, *args):
+        # TODO: Easy place to make a single, temporary change to the
+        # TODO: debug level associated with the more detailed messages.
+        # TODO:
+        # TODO: Someday it would be great if we could switch this to
+        # TODO: another level, perhaps _between_ debug and verbose, or
+        # TODO: some other form of filtering so the first level of
+        # TODO: debugging doesn't have to generate these messages. Using
+        # TODO: verbose here did not work as expected because tests like
        # TODO: test_spec_json will write the verbose messages to the
+        # TODO: output that is used to check test correctness.
+        tty.debug(*args)
 
 
 class LockTransaction(object):
@@ -462,10 +619,28 @@ class LockError(Exception):
     """Raised for any errors related to locks."""
 
 
+class LockDowngradeError(LockError):
+    """Raised when unable to downgrade from a write to a read lock."""
+    def __init__(self, path):
+        msg = "Cannot downgrade lock from write to read on file: %s" % path
+        super(LockDowngradeError, self).__init__(msg)
+
+
+class LockLimitError(LockError):
+    """Raised when exceed maximum attempts to acquire a lock."""
+
+
 class LockTimeoutError(LockError):
     """Raised when an attempt to acquire a lock times out."""
 
 
+class LockUpgradeError(LockError):
+    """Raised when unable to upgrade from a read to a write lock."""
+    def __init__(self, path):
+        msg = "Cannot upgrade lock from read to write on file: %s" % path
+        super(LockUpgradeError, self).__init__(msg)
+
+
 class LockPermissionError(LockError):
     """Raised when there are permission issues with a lock."""
```
```diff
@@ -135,7 +135,9 @@ def process_stacktrace(countback):
 def get_timestamp(force=False):
     """Get a string timestamp"""
     if _debug or _timestamp or force:
-        return datetime.now().strftime("[%Y-%m-%d-%H:%M:%S.%f] ")
+        # Note inclusion of the PID is useful for parallel builds.
+        return '[{0}, {1}] '.format(
+            datetime.now().strftime("%Y-%m-%d-%H:%M:%S.%f"), os.getpid())
     else:
         return ''
```
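The PID-bearing timestamp introduced above can be exercised standalone (stripped here of the `_debug`/`_timestamp` gating that `tty.py` applies):

```python
import os
from datetime import datetime


def get_timestamp():
    # Include the PID so interleaved output from concurrent build
    # processes can be attributed to the process that wrote it.
    return '[{0}, {1}] '.format(
        datetime.now().strftime("%Y-%m-%d-%H:%M:%S.%f"), os.getpid())


stamp = get_timestamp()
print(stamp)
```

With several `spack install` processes writing to a shared terminal or log, the PID is what makes the interleaved lines attributable.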