Several changes to DB implementation.

1. Database stores a file version, so we can add to it in the future.
2. Database is indexed by hashes instead of numerical indexes (see the sketch below).
3. Specs built by the database have consistent hashes, and this is checked.
4. Minor naming and whitespace changes.
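
Concretely, the index now maps each spec's DAG hash to an install record and carries a format version. A rough sketch of the structure that the new _write_to_yaml method dumps, shown as the Python dict that gets serialized; the hash, package, and path below are invented, only the 'database'/'installs'/'version' nesting and the hash keying come from the diff:

# Hypothetical contents of .spack-db/index.yaml (as a Python dict).
index = {
    'database': {
        'installs': {
            'bbbb2222': {                                   # spec.dag_hash()
                'spec': {'mpich': {'version': '3.0.4',
                                   'dependencies': {}}},    # node-dict form of the spec
                'path': '/fake/prefix/mpich-3.0.4-bbbb2222',
            },
        },
        'version': '0.9',                                   # _db_version, checked on read
    },
}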
Todd Gamblin 2015-09-17 00:16:12 -07:00
parent cd23d2eaa2
commit ccf311c9c6
6 changed files with 220 additions and 139 deletions

View File

@@ -222,7 +222,7 @@ def working_dir(dirname, **kwargs):
 def touch(path):
     """Creates an empty file at the specified path."""
-    with closing(open(path, 'a')) as file:
+    with open(path, 'a') as file:
         os.utime(path, None)

View File

@@ -22,6 +22,7 @@
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 ##############################################################################
+"""Lock implementation for shared filesystems."""
 import os
 import fcntl
 import errno
@@ -34,11 +35,13 @@ class Read_Lock_Instance(object):
     A context manager for getting shared access to the object lock
     Arguments are lock and timeout (default 5 minutes)
     """
-    def __init__(self,lock,timeout = 300):
+    def __init__(self, lock, timeout=300):
         self._lock = lock
         self._timeout = timeout

     def __enter__(self):
         self._lock.acquire_read(self._timeout)

     def __exit__(self,type,value,traceback):
         self._lock.release_read()
@@ -48,17 +51,21 @@ class Write_Lock_Instance(object):
     A context manager for getting exclusive access to the object lock
     Arguments are lock and timeout (default 5 minutes)
     """
-    def __init__(self,lock,timeout = 300):
+    def __init__(self, lock, timeout=300):
         self._lock = lock
         self._timeout = timeout

     def __enter__(self):
         self._lock.acquire_write(self._timeout)

     def __exit__(self,type,value,traceback):
         self._lock.release_write()


 class Lock(object):
-    def __init__(self,file_path):
+    """Distributed file-based lock using ``flock``."""
+
+    def __init__(self, file_path):
         self._file_path = file_path
         self._fd = os.open(file_path,os.O_RDWR)
         self._reads = 0
@@ -71,20 +78,20 @@ def acquire_read(self,timeout):
         the write lock will be maintained until all locks are released
         """
         if self._reads == 0 and self._writes == 0:
-            self._lock(fcntl.LOCK_SH,timeout)
+            self._lock(fcntl.LOCK_SH, timeout)
         self._reads += 1

-    def acquire_write(self,timeout):
+    def acquire_write(self, timeout):
         """
         Implements recursive lock
         """
         if self._writes == 0:
-            self._lock(fcntl.LOCK_EX,timeout)
+            self._lock(fcntl.LOCK_EX, timeout)
         self._writes += 1

-    def _lock(self,op,timeout):
+    def _lock(self, op, timeout):
         """
         The timeout is implemented using nonblocking flock()
         to avoid using signals for timing
@@ -96,8 +103,8 @@ def _lock(self,op,timeout):
             try:
                 fcntl.flock(self._fd, op | fcntl.LOCK_NB)
                 if op == fcntl.LOCK_EX:
-                    with open(self._file_path,'w') as f:
-                        f.write("pid = "+str(os.getpid())+", host = "+socket.getfqdn())
+                    with open(self._file_path, 'w') as f:
+                        f.write("pid = " + str(os.getpid()) + ", host = " + socket.getfqdn())
                 return
             except IOError as error:
                 if error.errno == errno.EAGAIN or error.errno == EACCES:
@@ -133,4 +140,4 @@ def _unlock(self):
         Releases the lock regardless of mode. Note that read locks may be
         masquerading as write locks at times, but this removes either.
         """
-        fcntl.flock(self._fd,fcntl.LOCK_UN)
+        fcntl.flock(self._fd, fcntl.LOCK_UN)
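
For context, a minimal sketch of how these context managers are meant to be used; the lock-file path is invented for illustration, and the timeout argument is in seconds as in the calls elsewhere in this commit:

# Hypothetical usage of the Lock / *_Lock_Instance API shown in the diff.
# The path below is made up; Spack creates its own lock file under the DB dir.
from llnl.util.lock import Lock, Read_Lock_Instance, Write_Lock_Instance

lock = Lock('/tmp/example-db-lock')        # file must already exist (os.open with O_RDWR)

with Read_Lock_Instance(lock, 60):         # shared lock, 60 s timeout
    pass                                   # safe to read shared state here

with Write_Lock_Instance(lock, 60):        # exclusive lock, 60 s timeout
    pass                                   # safe to modify shared state here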

View File

@@ -139,10 +139,11 @@ def find(parser, args):
     # Get all the specs the user asked for
     if not query_specs:
-        with Read_Lock_Instance(spack.installed_db.lock,1800):
+        with Read_Lock_Instance(spack.installed_db.lock, 1800):
             specs = set(spack.installed_db.installed_package_specs())
     else:
-        with Read_Lock_Instance(spack.installed_db.lock,1800):
+        with Read_Lock_Instance(spack.installed_db.lock, 1800):
             results = [set(spack.installed_db.get_installed(qs)) for qs in query_specs]
             specs = set.union(*results)

View File

@@ -1,5 +1,5 @@
 ##############################################################################
-# Copyright (c) 2013, Lawrence Livermore National Security, LLC.
+# Copyright (c) 2013-2015, Lawrence Livermore National Security, LLC.
 # Produced at the Lawrence Livermore National Laboratory.
 #
 # This file is part of Spack.
@@ -23,95 +23,192 @@
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 ##############################################################################
 import os
-import sys
-import inspect
-import glob
-import imp
 import time
-import copy
-import errno

 from external import yaml
-from external.yaml.error import MarkedYAMLError
+from external.yaml.error import MarkedYAMLError, YAMLError

 import llnl.util.tty as tty
-from llnl.util.filesystem import join_path
+from llnl.util.filesystem import *
+from llnl.util.lang import *
 from llnl.util.lock import *

-import spack.error
 import spack.spec
+from spack.version import Version
 from spack.spec import Spec
 from spack.error import SpackError
-from spack.virtual import ProviderIndex
-from spack.util.naming import mod_to_class, validate_module_name
+
+# DB goes in this directory underneath the root
+_db_dirname = '.spack-db'
+
+# DB version. This is stuck in the DB file to track changes in format.
+_db_version = Version('0.9')


 def _autospec(function):
     """Decorator that automatically converts the argument of a single-arg
        function to a Spec."""
-    def converter(self, spec_like, **kwargs):
+    def converter(self, spec_like, *args, **kwargs):
         if not isinstance(spec_like, spack.spec.Spec):
             spec_like = spack.spec.Spec(spec_like)
-        return function(self, spec_like, **kwargs)
+        return function(self, spec_like, *args, **kwargs)
     return converter
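
The *args change lets decorated methods take extra positional arguments. As a rough, self-contained illustration of the decorator pattern (not Spack's actual classes; FakeSpec and FakeDB are invented names):

# Hypothetical illustration of an _autospec-style decorator: a string
# argument is converted to a richer object before the method runs.
class FakeSpec(object):
    def __init__(self, name):
        self.name = name

def autospec(function):
    def converter(self, spec_like, *args, **kwargs):
        if not isinstance(spec_like, FakeSpec):
            spec_like = FakeSpec(spec_like)
        return function(self, spec_like, *args, **kwargs)
    return converter

class FakeDB(object):
    @autospec
    def get_installed(self, spec, verbose=False):
        return spec.name          # always receives a FakeSpec

print(FakeDB().get_installed("mpich@3.0.4"))   # prints: mpich@3.0.4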
+class InstallRecord(object):
+    """A record represents one installation in the DB."""
+    def __init__(self, spec, path):
+        self.spec = spec
+        self.path = path
+
+    def to_dict(self):
+        return { 'spec' : self.spec.to_node_dict(),
+                 'path' : self.path }
+
+    @classmethod
+    def from_dict(cls, d):
+        return InstallRecord(d['spec'], d['path'])
 class Database(object):
-    def __init__(self,root,file_name="_index.yaml"):
-        """
-        Create an empty Database
-        Location defaults to root/specDB.yaml
+    def __init__(self, root):
+        """Create an empty Database.
+        Location defaults to root/_index.yaml
         The individual data are dicts containing
             spec: the top level spec of a package
             path: the path to the install of that package
             dep_hash: a hash of the dependence DAG for that package
         """
         self._root = root
-        self._file_name = file_name
-        self._file_path = join_path(self._root,self._file_name)
-        self._lock_name = "_db_lock"
-        self._lock_path = join_path(self._root,self._lock_name)
+
+        # Set up layout of database files.
+        self._db_dir = join_path(self._root, _db_dirname)
+        self._index_path = join_path(self._db_dir, 'index.yaml')
+        self._lock_path = join_path(self._db_dir, 'lock')
+
+        # Create needed directories and files
+        if not os.path.exists(self._db_dir):
+            mkdirp(self._db_dir)
+
         if not os.path.exists(self._lock_path):
-            open(self._lock_path,'w').close()
-        self.lock = Lock(self._lock_path)
-        self._data = []
+            touch(self._lock_path)
+
+        # initialize rest of state.
+        self.lock = Lock(self._lock_path)
+        self._data = {}
         self._last_write_time = 0
-    def _read_from_yaml(self,stream):
+    def _write_to_yaml(self, stream):
+        """Write out the database to a YAML file."""
+        # map from per-spec hash code to installation record.
+        installs = dict((k, v.to_dict()) for k, v in self._data.items())
+
+        # database includes installation list and version.
+        # NOTE: this DB version does not handle multiple installs of
+        # the same spec well.  If there are 2 identical specs with
+        # different paths, it can't differentiate.
+        # TODO: fix this before we support multiple install locations.
+        database = {
+            'database' : {
+                'installs' : installs,
+                'version' : str(_db_version)
+            }
+        }
+
+        try:
+            return yaml.dump(database, stream=stream, default_flow_style=False)
+        except YAMLError as e:
+            raise SpackYAMLError("error writing YAML database:", str(e))
+
+    def _read_spec_from_yaml(self, hash_key, installs):
+        """Recursively construct a spec from a hash in a YAML database."""
+        # TODO: check validity of hash_key records here.
+        spec_dict = installs[hash_key]['spec']
+
+        # Build spec from dict first.
+        spec = Spec.from_node_dict(spec_dict)
+
+        # Add dependencies from other records in the install DB to
+        # form a full spec.
+        for dep_hash in spec_dict[spec.name]['dependencies'].values():
+            spec._add_dependency(self._read_spec_from_yaml(dep_hash, installs))
+
+        return spec
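
To make the recursion above concrete, here is a rough sketch of how dependency edges are encoded: each node dict lists its dependencies as DAG hashes, which are themselves keys into installs and are looked up recursively. The package names, hashes, and paths are invented; only the nesting follows the code above.

# Hypothetical 'installs' mapping as _read_spec_from_yaml would see it.
# 'callpath' depends on 'mpich'; the dependency is stored as mpich's hash.
installs = {
    'aaaa1111': {
        'spec': {'callpath': {'version': '1.0.2',
                              'dependencies': {'mpich': 'bbbb2222'}}},
        'path': '/fake/prefix/callpath-1.0.2-aaaa1111',
    },
    'bbbb2222': {
        'spec': {'mpich': {'version': '3.0.4',
                           'dependencies': {}}},
        'path': '/fake/prefix/mpich-3.0.4-bbbb2222',
    },
}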
+    def _read_from_yaml(self, stream):
         """
         Fill database from YAML, do not maintain old data
         Translate the spec portions from node-dict form to spec form
         """
         try:
-            file = yaml.load(stream)
-        except MarkedYAMLError, e:
+            if isinstance(stream, basestring):
+                with open(stream, 'r') as f:
+                    yfile = yaml.load(f)
+            else:
+                yfile = yaml.load(stream)
+
+        except MarkedYAMLError as e:
             raise SpackYAMLError("error parsing YAML database:", str(e))

-        if file is None:
+        if yfile is None:
             return

+        def check(cond, msg):
+            if not cond: raise CorruptDatabaseError(self._index_path, msg)
+
+        check('database' in yfile, "No 'database' attribute in YAML.")
+
+        # High-level file checks.
+        db = yfile['database']
+        check('installs' in db, "No 'installs' in YAML DB.")
+        check('version' in db, "No 'version' in YAML DB.")
+
+        # TODO: better version check.
+        version = Version(db['version'])
+        if version != _db_version:
+            raise InvalidDatabaseVersionError(_db_version, version)
+
+        # Iterate through database and check each record.
+        installs = db['installs']
         data = {}
-        for index, sp in file['database'].items():
-            spec = Spec.from_node_dict(sp['spec'])
-            deps = sp['dependency_indices']
-            path = sp['path']
-            dep_hash = sp['hash']
-            db_entry = {'deps':deps, 'spec': spec, 'path': path, 'hash':dep_hash}
-            data[index] = db_entry
-
-        for sph in data.values():
-            for idx in sph['deps']:
-                sph['spec'].dependencies[data[idx]['spec'].name] = data[idx]['spec']
-
-        self._data = data.values()
+        for hash_key, rec in installs.items():
+            try:
+                spec = self._read_spec_from_yaml(hash_key, installs)
+                spec_hash = spec.dag_hash()
+                if not spec_hash == hash_key:
+                    tty.warn("Hash mismatch in database: %s -> spec with hash %s"
+                             % (hash_key, spec_hash))
+                    continue
+                data[hash_key] = InstallRecord(spec, rec['path'])
+
+            except Exception as e:
+                tty.warn("Invalid database record:",
+                         "file: %s" % self._index_path,
+                         "hash: %s" % hash_key,
+                         "cause: %s" % str(e))
+                raise
+
+        self._data = data
-    def read_database(self):
+    def reindex(self, directory_layout):
+        """Build database index from scratch based on a directory layout."""
+        with Write_Lock_Instance(self.lock, 60):
+            data = {}
+            for spec in directory_layout.all_specs():
+                path = directory_layout.path_for_spec(spec)
+                hash_key = spec.dag_hash()
+                data[hash_key] = InstallRecord(spec, path)
+            self._data = data
+            self.write()
+
+    def read(self):
         """
         Re-read Database from the data in the set location
         If the cache is fresh, return immediately.
@@ -119,43 +216,12 @@ def read_database(self):
         if not self.is_dirty():
             return

-        if os.path.isfile(self._file_path):
-            with open(self._file_path,'r') as f:
-                self._read_from_yaml(f)
+        if os.path.isfile(self._index_path):
+            # Read from YAML file if a database exists
+            self._read_from_yaml(self._index_path)
         else:
-            #The file doesn't exist, construct from file paths
-            self._data = []
-            specs = spack.install_layout.all_specs()
-            for spec in specs:
-                sph = {}
-                sph['spec']=spec
-                sph['hash']=spec.dag_hash()
-                sph['path']=spack.install_layout.path_for_spec(spec)
-                self._data.append(sph)
-
-    def _write_database_to_yaml(self,stream):
-        """
-        Replace each spec with its dict-node form
-        Then stream all data to YAML
-        """
-        node_list = []
-        spec_list = [sph['spec'] for sph in self._data]
-        for sph in self._data:
-            node = {}
-            deps = []
-            for name,spec in sph['spec'].dependencies.items():
-                deps.append(spec_list.index(spec))
-            node['spec']=Spec.to_node_dict(sph['spec'])
-            node['hash']=sph['hash']
-            node['path']=sph['path']
-            node['dependency_indices']=deps
-            node_list.append(node)
-        node_dict = dict(enumerate(node_list))
-        return yaml.dump({ 'database' : node_dict},
-                         stream=stream, default_flow_style=False)
+            # The file doesn't exist, try to traverse the directory.
+            self.reindex(spack.install_layout)

     def write(self):
@@ -165,39 +231,42 @@ def write(self):
         within the same lock, so there is no need to refresh
         the database within write()
         """
-        temp_name = str(os.getpid()) + socket.getfqdn() + ".temp"
-        temp_file = join_path(self._root,temp_name)
-        with open(temp_file,'w') as f:
-            self._last_write_time = int(time.time())
-            self._write_database_to_yaml(f)
-        os.rename(temp_file,self._file_path)
+        temp_name = '%s.%s.temp' % (socket.getfqdn(), os.getpid())
+        temp_file = join_path(self._db_dir, temp_name)
+
+        # Write a temporary database file, then move it into place
+        try:
+            with open(temp_file, 'w') as f:
+                self._last_write_time = int(time.time())
+                self._write_to_yaml(f)
+            os.rename(temp_file, self._index_path)
+
+        except:
+            # Clean up temp file if something goes wrong.
+            if os.path.exists(temp_file):
+                os.remove(temp_file)
+            raise

     def is_dirty(self):
         """
         Returns true iff the database file does not exist
         or was most recently written to by another spack instance.
         """
-        if not os.path.isfile(self._file_path):
-            return True
-        else:
-            return (os.path.getmtime(self._file_path) > self._last_write_time)
+        return (not os.path.isfile(self._index_path) or
+                (os.path.getmtime(self._index_path) > self._last_write_time))

-    # @_autospec
+    @_autospec
     def add(self, spec, path):
         """Read the database from the set location
         Add the specified entry as a dict
         Write the database back to memory
         """
-        sph = {}
-        sph['spec']=spec
-        sph['path']=path
-        sph['hash']=spec.dag_hash()
-
-        #Should always already be locked
-        with Write_Lock_Instance(self.lock,60):
-            self.read_database()
-            self._data.append(sph)
+        # Should always already be locked
+        with Write_Lock_Instance(self.lock, 60):
+            self.read()
+            self._data[spec.dag_hash()] = InstallRecord(spec, path)

             self.write()
@@ -208,23 +277,18 @@ def remove(self, spec):
         Searches for and removes the specified spec
         Writes the database back to memory
         """
-        #Should always already be locked
-        with Write_Lock_Instance(self.lock,60):
-            self.read_database()
+        # Should always already be locked
+        with Write_Lock_Instance(self.lock, 60):
+            self.read()

-            for sp in self._data:
-                #Not sure the hash comparison is necessary
-                if sp['hash'] == spec.dag_hash() and sp['spec'] == spec:
-                    self._data.remove(sp)
+            hash_key = spec.dag_hash()
+            if hash_key in self._data:
+                del self._data[hash_key]

             self.write()

     @_autospec
     def get_installed(self, spec):
-        """
-        Get all the installed specs that satisfy the provided spec constraint
-        """
+        """Get installed specs that satisfy the provided spec constraint."""
         return [s for s in self.installed_package_specs() if s.satisfies(spec)]
@@ -238,10 +302,10 @@ def installed_extensions_for(self, extendee_spec):
             try:
                 if s.package.extends(extendee_spec):
                     yield s.package
-            except UnknownPackageError, e:
+            except UnknownPackageError as e:
                 continue
-            #skips unknown packages
-            #TODO: conditional way to do this instead of catching exceptions
+            # skips unknown packages
+            # TODO: conditional way to do this instead of catching exceptions

     def installed_package_specs(self):
@@ -249,14 +313,10 @@ def installed_package_specs(self):
         Read installed package names from the database
         and return their specs
         """
-        #Should always already be locked
-        with Read_Lock_Instance(self.lock,60):
-            self.read_database()
-
-        installed = []
-        for sph in self._data:
-            installed.append(sph['spec'])
-        return installed
+        # Should always already be locked
+        with Read_Lock_Instance(self.lock, 60):
+            self.read()
+        return sorted(rec.spec for rec in self._data.values())

     def installed_known_package_specs(self):
@@ -265,5 +325,18 @@ def installed_known_package_specs(self):
         Return only the specs for which the package is known
         to this version of spack
         """
-        return [s for s in self.installed_package_specs() if spack.db.exists(s.name)]
+        return [s for s in self.installed_package_specs()
+                if spack.db.exists(s.name)]
+
+
+class CorruptDatabaseError(SpackError):
+    def __init__(self, path, msg=''):
+        super(CorruptDatabaseError, self).__init__(
+            "Spack database is corrupt: %s. %s" % (path, msg))
+
+
+class InvalidDatabaseVersionError(SpackError):
+    def __init__(self, expected, found):
+        super(InvalidDatabaseVersionError, self).__init__(
+            "Expected database version %s but found version %s"
+            % (expected, found))
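
Putting the pieces together, a rough usage sketch of the reworked query path, modeled on the spack.installed_db calls shown in cmd/find.py above; the spec string and timeout are illustrative only:

# Hypothetical walk-through of the Database API in this diff.
import spack
from llnl.util.lock import Read_Lock_Instance

# Queries take a shared lock; the DB re-reads index.yaml only if it is stale.
with Read_Lock_Instance(spack.installed_db.lock, 60):
    all_specs = spack.installed_db.installed_package_specs()
    mpich_installs = spack.installed_db.get_installed('mpich')  # @_autospec converts the string

hashes = [s.dag_hash() for s in mpich_installs]   # the index.yaml keys for these installs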

View File

@@ -55,8 +55,8 @@ def die(self):
     def __str__(self):
         msg = self.message
-        if self.long_message:
-            msg += "\n %s" % self.long_message
+        if self._long_message:
+            msg += "\n %s" % self._long_message
         return msg

 class UnsupportedPlatformError(SpackError):

View File

@@ -2000,4 +2000,4 @@ def __init__(self, provided, required):
 class SpackYAMLError(spack.error.SpackError):
     def __init__(self, msg, yaml_error):
-        super(SpackError, self).__init__(msg, str(yaml_error))
+        super(SpackYAMLError, self).__init__(msg, str(yaml_error))