Merge pull request #1562 from LLNL/features/db-locking

Finer-grained locking
This commit is contained in:
Todd Gamblin 2016-10-11 02:35:46 -07:00 committed by GitHub
commit f9d8325cc2
14 changed files with 567 additions and 185 deletions

View File

@ -39,15 +39,34 @@
import llnl.util.tty as tty
from llnl.util.lang import dedupe
__all__ = ['set_install_permissions', 'install', 'install_tree',
'traverse_tree',
'expand_user', 'working_dir', 'touch', 'touchp', 'mkdirp',
'force_remove', 'join_path', 'ancestor', 'can_access',
'filter_file',
'FileFilter', 'change_sed_delimiter', 'is_exe', 'force_symlink',
'set_executable', 'copy_mode', 'unset_executable_mode',
'remove_dead_links', 'remove_linked_tree',
'fix_darwin_install_name', 'find_libraries', 'LibraryList']
__all__ = [
'FileFilter',
'LibraryList',
'ancestor',
'can_access',
'change_sed_delimiter',
'copy_mode',
'expand_user',
'filter_file',
'find_libraries',
'fix_darwin_install_name',
'force_remove',
'force_symlink',
'install',
'install_tree',
'is_exe',
'join_path',
'mkdirp',
'remove_dead_links',
'remove_if_dead_link',
'remove_linked_tree',
'set_executable',
'set_install_permissions',
'touch',
'touchp',
'traverse_tree',
'unset_executable_mode',
'working_dir']
def filter_file(regex, repl, *filenames, **kwargs):
@ -388,10 +407,20 @@ def remove_dead_links(root):
"""
for file in os.listdir(root):
path = join_path(root, file)
if os.path.islink(path):
real_path = os.path.realpath(path)
if not os.path.exists(real_path):
os.unlink(path)
remove_if_dead_link(path)
def remove_if_dead_link(path):
"""
Removes the argument if it is a dead link; does nothing otherwise.
Args:
path: the potential dead link
"""
if os.path.islink(path):
real_path = os.path.realpath(path)
if not os.path.exists(real_path):
os.unlink(path)
def remove_linked_tree(path):

View File

@ -28,9 +28,13 @@
import time
import socket
import llnl.util.tty as tty
__all__ = ['Lock', 'LockTransaction', 'WriteTransaction', 'ReadTransaction',
'LockError']
# Default timeout in seconds, after which locks will raise exceptions.
_default_timeout = 60
@ -41,51 +45,86 @@
class Lock(object):
"""This is an implementation of a filesystem lock using Python's lockf.
In Python, `lockf` actually calls `fcntl`, so this should work with any
filesystem implementation that supports locking through the fcntl calls.
This includes distributed filesystems like Lustre (when flock is enabled)
and recent NFS versions.
In Python, `lockf` actually calls `fcntl`, so this should work with
any filesystem implementation that supports locking through the fcntl
calls. This includes distributed filesystems like Lustre (when flock
is enabled) and recent NFS versions.
"""
def __init__(self, file_path):
self._file_path = file_path
self._fd = None
def __init__(self, path, start=0, length=0):
"""Construct a new lock on the file at ``path``.
By default, the lock applies to the whole file. Optionally,
the caller can specify a byte range beginning ``start`` bytes from
the start of the file and extending ``length`` bytes from there.
This exposes a subset of fcntl locking functionality. It does
not currently expose the ``whence`` parameter -- ``whence`` is
always os.SEEK_SET and ``start`` is always evaluated from the
beginning of the file.
"""
self.path = path
self._file = None
self._reads = 0
self._writes = 0
def _lock(self, op, timeout):
# byte range parameters
self._start = start
self._length = length
# PID and host of lock holder
self.pid = self.old_pid = None
self.host = self.old_host = None
def _lock(self, op, timeout=_default_timeout):
"""This takes a lock using POSIX locks (``fnctl.lockf``).
The lock is implemented as a spin lock using a nonblocking
call to lockf().
The lock is implemented as a spin lock using a nonblocking call
to lockf().
On acquiring an exclusive lock, the lock writes this process's
pid and host to the lock file, in case the holding process
needs to be killed later.
pid and host to the lock file, in case the holding process needs
to be killed later.
If the lock times out, it raises a ``LockError``.
"""
start_time = time.time()
while (time.time() - start_time) < timeout:
try:
# If this is already open read-only and we want to
# upgrade to an exclusive write lock, close first.
if self._fd is not None:
flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
if op == fcntl.LOCK_EX and flags | os.O_RDONLY:
os.close(self._fd)
self._fd = None
if self._fd is None:
mode = os.O_RDWR if op == fcntl.LOCK_EX else os.O_RDONLY
self._fd = os.open(self._file_path, mode)
fcntl.lockf(self._fd, op | fcntl.LOCK_NB)
# If we could write the file, we'd have opened it 'r+'.
# Raise an error when we attempt to upgrade to a write lock.
if op == fcntl.LOCK_EX:
os.write(
self._fd,
"pid=%s,host=%s" % (os.getpid(), socket.getfqdn()))
if self._file and self._file.mode == 'r':
raise LockError(
"Can't take exclusive lock on read-only file: %s"
% self.path)
# Create file and parent directories if they don't exist.
if self._file is None:
self._ensure_parent_directory()
# Prefer to open 'r+' to allow upgrading to write
# lock later if possible. Open read-only if we can't
# write the lock file at all.
os_mode, fd_mode = (os.O_RDWR | os.O_CREAT), 'r+'
if os.path.exists(self.path) and not os.access(
self.path, os.W_OK):
os_mode, fd_mode = os.O_RDONLY, 'r'
fd = os.open(self.path, os_mode)
self._file = os.fdopen(fd, fd_mode)
# Try to get the lock (will raise if not available).
fcntl.lockf(self._file, op | fcntl.LOCK_NB,
self._length, self._start, os.SEEK_SET)
# All locks read the owner PID and host
self._read_lock_data()
# Exclusive locks write their PID/host
if op == fcntl.LOCK_EX:
self._write_lock_data()
return
except IOError as error:
@ -97,6 +136,40 @@ def _lock(self, op, timeout):
raise LockError("Timed out waiting for lock.")
def _ensure_parent_directory(self):
parent = os.path.dirname(self.path)
try:
os.makedirs(parent)
return True
except OSError as e:
# makedirs can fail when the directory already exists.
if not (e.errno == errno.EEXIST and os.path.isdir(parent) or
e.errno == errno.EISDIR):
raise
def _read_lock_data(self):
"""Read PID and host data out of the file if it is there."""
line = self._file.read()
if line:
pid, host = line.strip().split(',')
_, _, self.pid = pid.rpartition('=')
_, _, self.host = host.rpartition('=')
def _write_lock_data(self):
"""Write PID and host data to the file, recording old values."""
self.old_pid = self.pid
self.old_host = self.host
self.pid = os.getpid()
self.host = socket.getfqdn()
# write pid, host to disk to sync over FS
self._file.seek(0)
self._file.write("pid=%s,host=%s" % (self.pid, self.host))
self._file.truncate()
self._file.flush()
os.fsync(self._file.fileno())
def _unlock(self):
"""Releases a lock using POSIX locks (``fcntl.lockf``)
@ -104,9 +177,10 @@ def _unlock(self):
be masquerading as write locks, but this removes either.
"""
fcntl.lockf(self._fd, fcntl.LOCK_UN)
os.close(self._fd)
self._fd = None
fcntl.lockf(self._file, fcntl.LOCK_UN,
self._length, self._start, os.SEEK_SET)
self._file.close()
self._file = None
def acquire_read(self, timeout=_default_timeout):
"""Acquires a recursive, shared lock for reading.
@ -120,7 +194,9 @@ def acquire_read(self, timeout=_default_timeout):
"""
if self._reads == 0 and self._writes == 0:
self._lock(fcntl.LOCK_SH, timeout) # can raise LockError.
tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
.format(self))
self._lock(fcntl.LOCK_SH, timeout=timeout) # can raise LockError.
self._reads += 1
return True
else:
@ -139,7 +215,10 @@ def acquire_write(self, timeout=_default_timeout):
"""
if self._writes == 0:
self._lock(fcntl.LOCK_EX, timeout) # can raise LockError.
tty.debug(
'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
.format(self))
self._lock(fcntl.LOCK_EX, timeout=timeout) # can raise LockError.
self._writes += 1
return True
else:
@ -159,6 +238,8 @@ def release_read(self):
assert self._reads > 0
if self._reads == 1 and self._writes == 0:
tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Released]'
.format(self))
self._unlock() # can raise LockError.
self._reads -= 1
return True
@ -179,6 +260,8 @@ def release_write(self):
assert self._writes > 0
if self._writes == 1 and self._reads == 0:
tty.debug('WRITE LOCK: {0.path}[{0._start}:{0._length}] [Released]'
.format(self))
self._unlock() # can raise LockError.
self._writes -= 1
return True
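A minimal usage sketch of the reworked Lock API; the lock-file path and the 60-second timeout below are placeholders, not values Spack uses. Byte ranges only conflict across processes; within one process the read/write counters allow nesting and upgrading:

import llnl.util.lock

lock = llnl.util.lock.Lock('/tmp/example.lock', start=0, length=1)  # lock byte 0 only
try:
    lock.acquire_read(timeout=60)    # shared lock; creates the file if needed
    lock.acquire_write(timeout=60)   # upgrade to exclusive on the same range
    # ... critical section ...
    lock.release_write()             # the read lock is still held
    lock.release_read()              # last release unlocks and closes the file
except llnl.util.lock.LockError:
    pass  # another process held an overlapping byte range past the timeout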

View File

@ -23,6 +23,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##############################################################################
import os
import re
from datetime import datetime
from glob import glob
@ -53,8 +54,12 @@ def _debug_tarball_suffix():
if not os.path.isdir('.git'):
return 'nobranch.nogit.%s' % suffix
# Get symbolic branch name and strip any special chars (mainly '/')
symbolic = git(
'rev-parse', '--abbrev-ref', '--short', 'HEAD', output=str).strip()
symbolic = re.sub(r'[^\w.-]', '-', symbolic)
# Get the commit hash too.
commit = git(
'rev-parse', '--short', 'HEAD', output=str).strip()
@ -69,12 +74,23 @@ def create_db_tarball(args):
tarball_name = "spack-db.%s.tar.gz" % _debug_tarball_suffix()
tarball_path = os.path.abspath(tarball_name)
with working_dir(spack.spack_root):
base = os.path.basename(spack.install_path)
transform_args = []
if 'GNU' in tar('--version', output=str):
transform_args = ['--transform', 's/^%s/%s/' % (base, tarball_name)]
else:
transform_args = ['-s', '/^%s/%s/' % (base, tarball_name)]
wd = os.path.dirname(spack.install_path)
with working_dir(wd):
files = [spack.installed_db._index_path]
files += glob('%s/*/*/*/.spack/spec.yaml' % spack.install_path)
files += glob('%s/*/*/*/.spack/spec.yaml' % base)
files = [os.path.relpath(f) for f in files]
tar('-czf', tarball_path, *files)
args = ['-czf', tarball_path]
args += transform_args
args += files
tar(*args)
tty.msg('Created %s' % tarball_name)

View File

@ -65,43 +65,40 @@ def diy(self, args):
if len(specs) > 1:
tty.die("spack diy only takes one spec.")
# Take a write lock before checking for existence.
with spack.installed_db.write_transaction():
spec = specs[0]
if not spack.repo.exists(spec.name):
tty.warn("No such package: %s" % spec.name)
create = tty.get_yes_or_no("Create this package?", default=False)
if not create:
tty.msg("Exiting without creating.")
sys.exit(1)
else:
tty.msg("Running 'spack edit -f %s'" % spec.name)
edit_package(spec.name, spack.repo.first_repo(), None, True)
return
if not spec.versions.concrete:
tty.die(
"spack diy spec must have a single, concrete version. "
"Did you forget a package version number?")
spec.concretize()
package = spack.repo.get(spec)
if package.installed:
tty.error("Already installed in %s" % package.prefix)
tty.msg("Uninstall or try adding a version suffix for this "
"DIY build.")
spec = specs[0]
if not spack.repo.exists(spec.name):
tty.warn("No such package: %s" % spec.name)
create = tty.get_yes_or_no("Create this package?", default=False)
if not create:
tty.msg("Exiting without creating.")
sys.exit(1)
else:
tty.msg("Running 'spack edit -f %s'" % spec.name)
edit_package(spec.name, spack.repo.first_repo(), None, True)
return
# Forces the build to run out of the current directory.
package.stage = DIYStage(os.getcwd())
if not spec.versions.concrete:
tty.die(
"spack diy spec must have a single, concrete version. "
"Did you forget a package version number?")
# TODO: make this an argument, not a global.
spack.do_checksum = False
spec.concretize()
package = spack.repo.get(spec)
package.do_install(
keep_prefix=args.keep_prefix,
install_deps=not args.ignore_deps,
verbose=not args.quiet,
keep_stage=True, # don't remove source dir for DIY.
dirty=args.dirty)
if package.installed:
tty.error("Already installed in %s" % package.prefix)
tty.msg("Uninstall or try adding a version suffix for this DIY build.")
sys.exit(1)
# Forces the build to run out of the current directory.
package.stage = DIYStage(os.getcwd())
# TODO: make this an argument, not a global.
spack.do_checksum = False
package.do_install(
keep_prefix=args.keep_prefix,
install_deps=not args.ignore_deps,
verbose=not args.quiet,
keep_stage=True, # don't remove source dir for DIY.
dirty=args.dirty)

View File

@ -84,15 +84,14 @@ def install(parser, args):
specs = spack.cmd.parse_specs(args.packages, concretize=True)
for spec in specs:
package = spack.repo.get(spec)
with spack.installed_db.write_transaction():
package.do_install(
keep_prefix=args.keep_prefix,
keep_stage=args.keep_stage,
install_deps=not args.ignore_deps,
install_self=not args.deps_only,
make_jobs=args.jobs,
run_tests=args.run_tests,
verbose=args.verbose,
fake=args.fake,
dirty=args.dirty,
explicit=True)
package.do_install(
keep_prefix=args.keep_prefix,
keep_stage=args.keep_stage,
install_deps=not args.ignore_deps,
install_self=not args.deps_only,
make_jobs=args.jobs,
run_tests=args.run_tests,
verbose=args.verbose,
fake=args.fake,
dirty=args.dirty,
explicit=True)

View File

@ -193,16 +193,14 @@ def uninstall(parser, args):
if not args.packages and not args.all:
tty.die("uninstall requires at least one package argument.")
with spack.installed_db.write_transaction():
uninstall_list = get_uninstall_list(args)
uninstall_list = get_uninstall_list(args)
if not args.yes_to_all:
tty.msg("The following packages will be uninstalled : ")
print('')
spack.cmd.display_specs(uninstall_list, **display_args)
print('')
spack.cmd.ask_for_confirmation('Do you want to proceed ? ')
if not args.yes_to_all:
tty.msg("The following packages will be uninstalled : ")
print('')
spack.cmd.display_specs(uninstall_list, **display_args)
print('')
spack.cmd.ask_for_confirmation('Do you want to proceed ? ')
# Uninstall everything on the list
do_uninstall(uninstall_list, args.force)
# Uninstall everything on the list
do_uninstall(uninstall_list, args.force)

View File

@ -33,7 +33,7 @@
2. It will allow us to track external installations as well as lost
packages and their dependencies.
Prior ot the implementation of this store, a direcotry layout served
Prior to the implementation of this store, a directory layout served
as the authoritative database of packages in Spack. This module
provides a cache and a sanity checking mechanism for what is in the
filesystem.
@ -156,13 +156,13 @@ def __init__(self, root, db_dir=None):
self._index_path = join_path(self._db_dir, 'index.yaml')
self._lock_path = join_path(self._db_dir, 'lock')
# This is for other classes to use to lock prefix directories.
self.prefix_lock_path = join_path(self._db_dir, 'prefix_lock')
# Create needed directories and files
if not os.path.exists(self._db_dir):
mkdirp(self._db_dir)
if not os.path.exists(self._lock_path):
touch(self._lock_path)
# initialize rest of state.
self.lock = Lock(self._lock_path)
self._data = {}

View File

@ -77,10 +77,7 @@ def _lock_path(self, key):
def _get_lock(self, key):
"""Create a lock for a key, if necessary, and return a lock object."""
if key not in self._locks:
lock_file = self._lock_path(key)
if not os.path.exists(lock_file):
touch(lock_file)
self._locks[key] = Lock(lock_file)
self._locks[key] = Lock(self._lock_path(key))
return self._locks[key]
def init_entry(self, key):

View File

@ -39,8 +39,16 @@
import textwrap
import time
import string
import contextlib
from StringIO import StringIO
import llnl.util.lock
import llnl.util.tty as tty
from llnl.util.filesystem import *
from llnl.util.lang import *
from llnl.util.link_tree import LinkTree
from llnl.util.tty.log import log_output
import spack
import spack.build_environment
import spack.compilers
@ -53,12 +61,8 @@
import spack.url
import spack.util.web
from StringIO import StringIO
from llnl.util.filesystem import *
from llnl.util.lang import *
from llnl.util.link_tree import LinkTree
from llnl.util.tty.log import log_output
from spack.stage import Stage, ResourceStage, StageComposite
from spack.util.crypto import bit_length
from spack.util.environment import dump_environment
from spack.util.executable import ProcessError, which
from spack.version import *
@ -305,6 +309,7 @@ class SomePackage(Package):
Package creators override functions like install() (all of them do this),
clean() (some of them do this), and others to provide custom behavior.
"""
#
# These are default values for instance variables.
#
@ -336,6 +341,9 @@ class SomePackage(Package):
"""
sanity_check_is_dir = []
"""Per-process lock objects for each install prefix."""
prefix_locks = {}
class __metaclass__(type):
"""Ensure attributes required by Spack directives are present."""
def __init__(cls, name, bases, dict):
@ -346,6 +354,9 @@ def __init__(self, spec):
# this determines how the package should be built.
self.spec = spec
# Lock on the prefix shared resource. Will be set in prefix property
self._prefix_lock = None
# Name of package is the name of its module, without the
# containing module names.
self.name = self.module.__name__
@ -691,6 +702,29 @@ def installed_dependents(self):
dependents.append(spec)
return dependents
@property
def prefix_lock(self):
"""Prefix lock is a byte range lock on the nth byte of a file.
The lock file is ``spack.installed_db.prefix_lock`` -- the DB
tells us what to call it and it lives alongside the install DB.
n is the sys.maxsize-bit prefix of the DAG hash. This makes the
likelihood of collision very low AND it gives us
readers-writer lock semantics with just a single lockfile, so no
cleanup is required.
"""
if self._prefix_lock is None:
prefix = self.spec.prefix
if prefix not in Package.prefix_locks:
Package.prefix_locks[prefix] = llnl.util.lock.Lock(
spack.installed_db.prefix_lock_path,
self.spec.dag_hash_bit_prefix(bit_length(sys.maxsize)), 1)
self._prefix_lock = Package.prefix_locks[prefix]
return self._prefix_lock
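A sketch of how the byte offset for a prefix lock is computed, using one of the sample hashes from the test suite below; the final Lock call is left as a comment because spack.installed_db only exists inside a configured Spack:

import sys
import spack.spec
from spack.util.crypto import bit_length

nbits = bit_length(sys.maxsize)                  # 63 on 64-bit platforms
dag_hash = '35orsd4cenv743hg4i5vxha2lzayycby'    # sample hash from the tests
offset = spack.spec.base32_prefix_bits(dag_hash, nbits)
# The prefix lock is then a one-byte fcntl range lock at that offset:
#   llnl.util.lock.Lock(spack.installed_db.prefix_lock_path, offset, 1)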
@property
def prefix(self):
"""Get the prefix into which this package should be installed."""
@ -875,6 +909,22 @@ def _resource_stage(self, resource):
resource_stage_folder = '-'.join(pieces)
return resource_stage_folder
@contextlib.contextmanager
def _prefix_read_lock(self):
try:
self.prefix_lock.acquire_read(60)
yield self
finally:
self.prefix_lock.release_read()
@contextlib.contextmanager
def _prefix_write_lock(self):
try:
self.prefix_lock.acquire_write(60)
yield self
finally:
self.prefix_lock.release_write()
install_phases = set(['configure', 'build', 'install', 'provenance'])
def do_install(self,
@ -926,14 +976,18 @@ def do_install(self,
# Ensure package is not already installed
layout = spack.install_layout
if 'install' in install_phases and layout.check_installed(self.spec):
tty.msg("%s is already installed in %s" % (self.name, self.prefix))
rec = spack.installed_db.get_record(self.spec)
if (not rec.explicit) and explicit:
with spack.installed_db.write_transaction():
rec = spack.installed_db.get_record(self.spec)
rec.explicit = True
return
with self._prefix_read_lock():
if ('install' in install_phases and
layout.check_installed(self.spec)):
tty.msg(
"%s is already installed in %s" % (self.name, self.prefix))
rec = spack.installed_db.get_record(self.spec)
if (not rec.explicit) and explicit:
with spack.installed_db.write_transaction():
rec = spack.installed_db.get_record(self.spec)
rec.explicit = True
return
tty.msg("Installing %s" % self.name)
@ -983,7 +1037,7 @@ def build_process():
self.build_directory = join_path(self.stage.path, 'spack-build')
self.source_directory = self.stage.source_path
with self.stage:
with contextlib.nested(self.stage, self._prefix_write_lock()):
# Run the pre-install hook in the child process after
# the directory is created.
spack.hooks.pre_install(self)
@ -1077,8 +1131,9 @@ def build_process():
wrap=False)
raise
# note: PARENT of the build process adds the new package to
# Parent of the build process adds the new package to
# the database, so that we don't need to re-read from file.
# NOTE: add() implicitly acquires a write-lock
spack.installed_db.add(
self.spec, spack.install_layout, explicit=explicit)
@ -1259,11 +1314,12 @@ def do_uninstall(self, force=False):
raise PackageStillNeededError(self.spec, dependents)
# Pre-uninstall hook runs first.
spack.hooks.pre_uninstall(self)
# Uninstalling in Spack only requires removing the prefix.
self.remove_prefix()
spack.installed_db.remove(self.spec)
with self._prefix_write_lock():
spack.hooks.pre_uninstall(self)
# Uninstalling in Spack only requires removing the prefix.
self.remove_prefix()
#
spack.installed_db.remove(self.spec)
tty.msg("Successfully uninstalled %s" % self.spec.short_spec)
# Once everything else is done, run post install hooks

View File

@ -120,6 +120,7 @@
from spack.util.string import *
import spack.util.spack_yaml as syaml
from spack.util.spack_yaml import syaml_dict
from spack.util.crypto import prefix_bits
from spack.version import *
from spack.provider_index import ProviderIndex
@ -963,13 +964,10 @@ def prefix(self):
return Prefix(spack.install_layout.path_for_spec(self))
def dag_hash(self, length=None):
"""
Return a hash of the entire spec DAG, including connectivity.
"""
"""Return a hash of the entire spec DAG, including connectivity."""
if self._hash:
return self._hash[:length]
else:
# XXX(deptype): ignore 'build' dependencies here
yaml_text = syaml.dump(
self.to_node_dict(), default_flow_style=True, width=sys.maxint)
sha = hashlib.sha1(yaml_text)
@ -978,6 +976,10 @@ def dag_hash(self, length=None):
self._hash = b32_hash
return b32_hash
def dag_hash_bit_prefix(self, bits):
"""Get the first <bits> bits of the DAG hash as an integer type."""
return base32_prefix_bits(self.dag_hash(), bits)
def to_node_dict(self):
d = syaml_dict()
@ -999,6 +1001,8 @@ def to_node_dict(self):
if self.architecture:
d['arch'] = self.architecture.to_dict()
# TODO: restore build dependencies here once we have less picky
# TODO: concretization.
deps = self.dependencies_dict(deptype=('link', 'run'))
if deps:
d['dependencies'] = syaml_dict([
@ -2723,6 +2727,16 @@ def parse_anonymous_spec(spec_like, pkg_name):
return anon_spec
def base32_prefix_bits(hash_string, bits):
"""Return the first <bits> bits of a base32 string as an integer."""
if bits > len(hash_string) * 5:
raise ValueError("Too many bits! Requested %d bit prefix of '%s'."
% (bits, hash_string))
hash_bytes = base64.b32decode(hash_string, casefold=True)
return prefix_bits(hash_bytes, bits)
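A small worked example of this helper; the eight-character string is a contrived base32 value, not a real spec hash:

# '7' is 31 (0b11111) in RFC 4648 base32 and 'a' is 0, so the decoded
# bytes begin 11111000 00000000 ...
import spack.spec
assert spack.spec.base32_prefix_bits('7aaaaaaa', 5) == 0b11111      # 31
assert spack.spec.base32_prefix_bits('7aaaaaaa', 8) == 0b11111000   # 248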
class SpecError(spack.error.SpackError):
"""Superclass for all errors that occur while constructing specs."""

View File

@ -23,12 +23,15 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##############################################################################
import os
import sys
import errno
import hashlib
import shutil
import tempfile
from urlparse import urljoin
import llnl.util.tty as tty
import llnl.util.lock
from llnl.util.filesystem import *
import spack.util.pattern as pattern
@ -38,6 +41,7 @@
import spack.fetch_strategy as fs
import spack.error
from spack.version import *
from spack.util.crypto import prefix_bits, bit_length
STAGE_PREFIX = 'spack-stage-'
@ -88,8 +92,12 @@ class Stage(object):
similar, and are intended to persist for only one run of spack.
"""
def __init__(self, url_or_fetch_strategy,
name=None, mirror_path=None, keep=False, path=None):
"""Shared dict of all stage locks."""
stage_locks = {}
def __init__(
self, url_or_fetch_strategy,
name=None, mirror_path=None, keep=False, path=None, lock=True):
"""Create a stage object.
Parameters:
url_or_fetch_strategy
@ -147,6 +155,20 @@ def __init__(self, url_or_fetch_strategy,
# Flag to decide whether to delete the stage folder on exit or not
self.keep = keep
# File lock for the stage directory. We use one file for all
# stage locks. See Spec.prefix_lock for details on this approach.
self._lock = None
if lock:
if self.name not in Stage.stage_locks:
sha1 = hashlib.sha1(self.name).digest()
lock_id = prefix_bits(sha1, bit_length(sys.maxsize))
stage_lock_path = join_path(spack.stage_path, '.lock')
Stage.stage_locks[self.name] = llnl.util.lock.Lock(
stage_lock_path, lock_id, 1)
self._lock = Stage.stage_locks[self.name]
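A sketch of how a stage's byte offset into the shared lock file is derived; the stage name is hypothetical and this assumes Python 2 byte strings, as used throughout this change:

import sys
import hashlib
from spack.util.crypto import prefix_bits, bit_length

name = 'zlib-1.2.8-hypothetical'                       # hypothetical stage name
sha1 = hashlib.sha1(name).digest()                     # 20 raw bytes
lock_id = prefix_bits(sha1, bit_length(sys.maxsize))   # first 63 bits as an int
# Every stage locks the single byte at offset lock_id in
# join_path(spack.stage_path, '.lock').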
def __enter__(self):
"""
Entering a stage context will create the stage directory
@ -154,6 +176,8 @@ def __enter__(self):
Returns:
self
"""
if self._lock is not None:
self._lock.acquire_write(timeout=60)
self.create()
return self
@ -175,6 +199,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None and not self.keep:
self.destroy()
if self._lock is not None:
self._lock.release_write()
def _need_to_create_path(self):
"""Makes sure nothing weird has happened since the last time we
looked at path. Returns True if path already exists and is ok.
@ -416,7 +443,8 @@ def create(self):
"""
# Create the top-level stage directory
mkdirp(spack.stage_path)
remove_dead_links(spack.stage_path)
remove_if_dead_link(self.path)
# If a tmp_root exists then create a directory there and then link it
# in the stage area, otherwise create the stage directory in self.path
if self._need_to_create_path():

View File

@ -25,6 +25,7 @@
"""
These tests ensure that our lock works correctly.
"""
import os
import shutil
import tempfile
import unittest
@ -44,7 +45,6 @@ class LockTest(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.lock_path = join_path(self.tempdir, 'lockfile')
touch(self.lock_path)
def tearDown(self):
shutil.rmtree(self.tempdir, ignore_errors=True)
@ -64,98 +64,185 @@ def multiproc_test(self, *functions):
#
# Process snippets below can be composed into tests.
#
def acquire_write(self, barrier):
lock = Lock(self.lock_path)
lock.acquire_write() # grab exclusive lock
barrier.wait()
barrier.wait() # hold the lock until exception raises in other procs.
def acquire_write(self, start=0, length=0):
def fn(barrier):
lock = Lock(self.lock_path, start, length)
lock.acquire_write() # grab exclusive lock
barrier.wait()
barrier.wait() # hold the lock until timeout in other procs.
return fn
def acquire_read(self, barrier):
lock = Lock(self.lock_path)
lock.acquire_read() # grab shared lock
barrier.wait()
barrier.wait() # hold the lock until exception raises in other procs.
def acquire_read(self, start=0, length=0):
def fn(barrier):
lock = Lock(self.lock_path, start, length)
lock.acquire_read() # grab shared lock
barrier.wait()
barrier.wait() # hold the lock until timeout in other procs.
return fn
def timeout_write(self, barrier):
lock = Lock(self.lock_path)
barrier.wait() # wait for lock acquire in first process
self.assertRaises(LockError, lock.acquire_write, 0.1)
barrier.wait()
def timeout_write(self, start=0, length=0):
def fn(barrier):
lock = Lock(self.lock_path, start, length)
barrier.wait() # wait for lock acquire in first process
self.assertRaises(LockError, lock.acquire_write, 0.1)
barrier.wait()
return fn
def timeout_read(self, barrier):
lock = Lock(self.lock_path)
barrier.wait() # wait for lock acquire in first process
self.assertRaises(LockError, lock.acquire_read, 0.1)
barrier.wait()
def timeout_read(self, start=0, length=0):
def fn(barrier):
lock = Lock(self.lock_path, start, length)
barrier.wait() # wait for lock acquire in first process
self.assertRaises(LockError, lock.acquire_read, 0.1)
barrier.wait()
return fn
#
# Test that exclusive locks on other processes time out when an
# exclusive lock is held.
#
def test_write_lock_timeout_on_write(self):
self.multiproc_test(self.acquire_write, self.timeout_write)
self.multiproc_test(self.acquire_write(), self.timeout_write())
def test_write_lock_timeout_on_write_2(self):
self.multiproc_test(
self.acquire_write, self.timeout_write, self.timeout_write)
self.acquire_write(), self.timeout_write(), self.timeout_write())
def test_write_lock_timeout_on_write_3(self):
self.multiproc_test(
self.acquire_write, self.timeout_write, self.timeout_write,
self.timeout_write)
self.acquire_write(), self.timeout_write(), self.timeout_write(),
self.timeout_write())
def test_write_lock_timeout_on_write_ranges(self):
self.multiproc_test(
self.acquire_write(0, 1), self.timeout_write(0, 1))
def test_write_lock_timeout_on_write_ranges_2(self):
self.multiproc_test(
self.acquire_write(0, 64), self.acquire_write(65, 1),
self.timeout_write(0, 1), self.timeout_write(63, 1))
def test_write_lock_timeout_on_write_ranges_3(self):
self.multiproc_test(
self.acquire_write(0, 1), self.acquire_write(1, 1),
self.timeout_write(), self.timeout_write(), self.timeout_write())
def test_write_lock_timeout_on_write_ranges_4(self):
self.multiproc_test(
self.acquire_write(0, 1), self.acquire_write(1, 1),
self.acquire_write(2, 456), self.acquire_write(500, 64),
self.timeout_write(), self.timeout_write(), self.timeout_write())
#
# Test that shared locks on other processes time out when an
# exclusive lock is held.
#
def test_read_lock_timeout_on_write(self):
self.multiproc_test(self.acquire_write, self.timeout_read)
self.multiproc_test(self.acquire_write(), self.timeout_read())
def test_read_lock_timeout_on_write_2(self):
self.multiproc_test(
self.acquire_write, self.timeout_read, self.timeout_read)
self.acquire_write(), self.timeout_read(), self.timeout_read())
def test_read_lock_timeout_on_write_3(self):
self.multiproc_test(
self.acquire_write, self.timeout_read, self.timeout_read,
self.timeout_read)
self.acquire_write(), self.timeout_read(), self.timeout_read(),
self.timeout_read())
def test_read_lock_timeout_on_write_ranges(self):
"""small write lock, read whole file."""
self.multiproc_test(self.acquire_write(0, 1), self.timeout_read())
def test_read_lock_timeout_on_write_ranges_2(self):
"""small write lock, small read lock"""
self.multiproc_test(self.acquire_write(0, 1), self.timeout_read(0, 1))
def test_read_lock_timeout_on_write_ranges_3(self):
"""two write locks, overlapping read locks"""
self.multiproc_test(
self.acquire_write(0, 1), self.acquire_write(64, 128),
self.timeout_read(0, 1), self.timeout_read(128, 256))
#
# Test that exclusive locks time out when shared locks are held.
#
def test_write_lock_timeout_on_read(self):
self.multiproc_test(self.acquire_read, self.timeout_write)
self.multiproc_test(self.acquire_read(), self.timeout_write())
def test_write_lock_timeout_on_read_2(self):
self.multiproc_test(
self.acquire_read, self.timeout_write, self.timeout_write)
self.acquire_read(), self.timeout_write(), self.timeout_write())
def test_write_lock_timeout_on_read_3(self):
self.multiproc_test(
self.acquire_read, self.timeout_write, self.timeout_write,
self.timeout_write)
self.acquire_read(), self.timeout_write(), self.timeout_write(),
self.timeout_write())
def test_write_lock_timeout_on_read_ranges(self):
self.multiproc_test(self.acquire_read(0, 1), self.timeout_write())
def test_write_lock_timeout_on_read_ranges_2(self):
self.multiproc_test(self.acquire_read(0, 1), self.timeout_write(0, 1))
def test_write_lock_timeout_on_read_ranges_3(self):
self.multiproc_test(
self.acquire_read(0, 1), self.acquire_read(10, 1),
self.timeout_write(0, 1), self.timeout_write(10, 1))
def test_write_lock_timeout_on_read_ranges_4(self):
self.multiproc_test(
self.acquire_read(0, 64),
self.timeout_write(10, 1), self.timeout_write(32, 1))
def test_write_lock_timeout_on_read_ranges_5(self):
self.multiproc_test(
self.acquire_read(64, 128),
self.timeout_write(65, 1), self.timeout_write(127, 1),
self.timeout_write(90, 10))
#
# Test that exclusive locks time out while lots of shared locks are held.
#
def test_write_lock_timeout_with_multiple_readers_2_1(self):
self.multiproc_test(
self.acquire_read, self.acquire_read, self.timeout_write)
self.acquire_read(), self.acquire_read(), self.timeout_write())
def test_write_lock_timeout_with_multiple_readers_2_2(self):
self.multiproc_test(
self.acquire_read, self.acquire_read, self.timeout_write,
self.timeout_write)
self.acquire_read(), self.acquire_read(), self.timeout_write(),
self.timeout_write())
def test_write_lock_timeout_with_multiple_readers_3_1(self):
self.multiproc_test(
self.acquire_read, self.acquire_read, self.acquire_read,
self.timeout_write)
self.acquire_read(), self.acquire_read(), self.acquire_read(),
self.timeout_write())
def test_write_lock_timeout_with_multiple_readers_3_2(self):
self.multiproc_test(
self.acquire_read, self.acquire_read, self.acquire_read,
self.timeout_write, self.timeout_write)
self.acquire_read(), self.acquire_read(), self.acquire_read(),
self.timeout_write(), self.timeout_write())
def test_write_lock_timeout_with_multiple_readers_2_1_ranges(self):
self.multiproc_test(
self.acquire_read(0, 10), self.acquire_read(5, 10),
self.timeout_write(5, 5))
def test_write_lock_timeout_with_multiple_readers_2_3_ranges(self):
self.multiproc_test(
self.acquire_read(0, 10), self.acquire_read(5, 15),
self.timeout_write(0, 1), self.timeout_write(11, 3),
self.timeout_write(7, 1))
def test_write_lock_timeout_with_multiple_readers_3_1_ranges(self):
self.multiproc_test(
self.acquire_read(0, 5), self.acquire_read(5, 5),
self.acquire_read(10, 5),
self.timeout_write(0, 15))
def test_write_lock_timeout_with_multiple_readers_3_2_ranges(self):
self.multiproc_test(
self.acquire_read(0, 5), self.acquire_read(5, 5),
self.acquire_read(10, 5),
self.timeout_write(3, 10), self.timeout_write(5, 1))
#
# Test that read can be upgraded to write.
@ -172,19 +259,42 @@ def test_upgrade_read_to_write(self):
lock.acquire_read()
self.assertTrue(lock._reads == 1)
self.assertTrue(lock._writes == 0)
self.assertTrue(lock._file.mode == 'r+')
lock.acquire_write()
self.assertTrue(lock._reads == 1)
self.assertTrue(lock._writes == 1)
self.assertTrue(lock._file.mode == 'r+')
lock.release_write()
self.assertTrue(lock._reads == 1)
self.assertTrue(lock._writes == 0)
self.assertTrue(lock._file.mode == 'r+')
lock.release_read()
self.assertTrue(lock._reads == 0)
self.assertTrue(lock._writes == 0)
self.assertTrue(lock._fd is None)
self.assertTrue(lock._file is None)
#
# Test that read-only file can be read-locked but not write-locked.
#
def test_upgrade_read_to_write_fails_with_readonly_file(self):
# ensure lock file exists the first time, so we open it read-only
# to begin with.
touch(self.lock_path)
os.chmod(self.lock_path, 0444)
lock = Lock(self.lock_path)
self.assertTrue(lock._reads == 0)
self.assertTrue(lock._writes == 0)
lock.acquire_read()
self.assertTrue(lock._reads == 1)
self.assertTrue(lock._writes == 0)
self.assertTrue(lock._file.mode == 'r')
self.assertRaises(LockError, lock.acquire_write)
#
# Longer test case that ensures locks are reusable. Ordering is

View File

@ -523,3 +523,37 @@ def descend_and_check(iterable, level=0):
level = descend_and_check(dag.to_node_dict())
# level just makes sure we are doing something here
self.assertTrue(level >= 5)
def test_hash_bits(self):
"""Ensure getting first n bits of a base32-encoded DAG hash works."""
# RFC 4648 base32 decode table
b32 = dict((j, i) for i, j in enumerate('abcdefghijklmnopqrstuvwxyz'))
b32.update(dict((j, i) for i, j in enumerate('234567', 26)))
# some package hashes
tests = [
'35orsd4cenv743hg4i5vxha2lzayycby',
'6kfqtj7dap3773rxog6kkmoweix5gpwo',
'e6h6ff3uvmjbq3azik2ckr6ckwm3depv',
'snz2juf4ij7sv77cq3vs467q6acftmur',
'4eg47oedi5bbkhpoxw26v3oe6vamkfd7',
'vrwabwj6umeb5vjw6flx2rnft3j457rw']
for test_hash in tests:
# string containing raw bits of hash ('1' and '0')
expected = ''.join([format(b32[c], '#07b').replace('0b', '')
for c in test_hash])
for bits in (1, 2, 3, 4, 7, 8, 9, 16, 64, 117, 128, 160):
actual_int = spack.spec.base32_prefix_bits(test_hash, bits)
fmt = "#0%sb" % (bits + 2)
actual = format(actual_int, fmt).replace('0b', '')
self.assertEqual(expected[:bits], actual)
self.assertRaises(
ValueError, spack.spec.base32_prefix_bits, test_hash, 161)
self.assertRaises(
ValueError, spack.spec.base32_prefix_bits, test_hash, 256)

View File

@ -100,3 +100,24 @@ def check(self, filename):
self.sum = checksum(
self.hash_fun, filename, block_size=self.block_size)
return self.sum == self.hexdigest
def prefix_bits(byte_array, bits):
"""Return the first <bits> bits of a byte array as an integer."""
result = 0
n = 0
for i, b in enumerate(byte_array):
n += 8
result = (result << 8) | ord(b)
if n >= bits:
break
result >>= (n - bits)
return result
def bit_length(num):
"""Number of bits required to represent an integer in binary."""
s = bin(num)
s = s.lstrip('-0b')
return len(s)
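A short worked example of the two helpers above, assuming Python 2 byte strings:

# prefix_bits reads two bytes (0xABCD == 43981), then shifts right by
# 16 - 12 = 4 bits to keep only the first 12 bits: 0xABC == 2748.
assert prefix_bits('\xab\xcd', 12) == 0xabc
# bit_length strips the '0b' prefix from bin(); bin(2748) has 12 digits.
assert bit_length(2748) == 12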